// redpanda_transform_sdk_sr_sys/lib.rs
#[cfg(target_os = "wasi")]
22mod abi;
23#[cfg(not(target_os = "wasi"))]
24mod stub_abi;
25use abi::{get_schema_definition, get_subject_schema};
26#[cfg(not(target_os = "wasi"))]
27use stub_abi as abi;
28mod serde;
29
30use redpanda_transform_sdk_sr_types::*;
31use redpanda_transform_sdk_varint as varint;
32
/// Schema registry client whose operations are performed through the functions of the
/// `abi` module.
///
/// The type carries no state. Note that the derived `Default` builds a value directly and
/// therefore does not run the `abi::check()` call that `new` performs.
#[derive(Debug, Default)]
pub struct AbiSchemaRegistryClient {}
35
36impl AbiSchemaRegistryClient {
37 pub fn new() -> AbiSchemaRegistryClient {
38 unsafe {
39 abi::check();
40 }
41 Self {}
42 }
43}
44
45impl SchemaRegistryClientImpl for AbiSchemaRegistryClient {
46 fn lookup_schema_by_id(&self, id: SchemaId) -> Result<Schema> {
47 let mut length: i32 = 0;
48 let errno = unsafe { abi::get_schema_definition_len(id.0, &mut length) };
49 if errno != 0 {
50 return Err(SchemaRegistryError::Unknown(errno));
51 }
52 let mut buf = vec![0; length as usize];
53 let errno_or_amt =
54 unsafe { get_schema_definition(id.0, buf.as_mut_ptr(), buf.len() as i32) };
55 if errno_or_amt < 0 {
56 return Err(SchemaRegistryError::Unknown(errno_or_amt));
57 }
58 serde::decode_schema_def(&buf[0..errno_or_amt as usize])
59 }
60
61 fn lookup_schema_by_version(
62 &self,
63 subject: &str,
64 version: SchemaVersion,
65 ) -> Result<SubjectSchema> {
66 let mut length: i32 = 0;
67 let errno = unsafe {
68 abi::get_subject_schema_len(
69 subject.as_ptr(),
70 subject.len() as i32,
71 version.0,
72 &mut length,
73 )
74 };
75 if errno != 0 {
76 return Err(SchemaRegistryError::Unknown(errno));
77 }
78 let mut buf = vec![0; length as usize];
79 let errno_or_amt = unsafe {
80 get_subject_schema(
81 subject.as_ptr(),
82 subject.len() as i32,
83 version.0,
84 buf.as_mut_ptr(),
85 buf.len() as i32,
86 )
87 };
88 if errno_or_amt < 0 {
89 return Err(SchemaRegistryError::Unknown(errno_or_amt));
90 }
91 serde::decode_schema(subject, &buf[0..errno_or_amt as usize])
92 }
93
94 fn lookup_latest_schema(&self, subject: &str) -> Result<SubjectSchema> {
95 self.lookup_schema_by_version(subject, SchemaVersion(-1))
96 }
97
98 fn create_schema(&mut self, subject: &str, schema: Schema) -> Result<SubjectSchema> {
99 let mut buf = Vec::with_capacity(schema.schema().len() + (varint::MAX_LENGTH * 2));
100 serde::encode_schema_def(&mut buf, &schema);
101 let mut schema_id: i32 = 0;
102 let schema_version: i32 = 0;
106 let errno = unsafe {
107 abi::create_subject_schema(
108 subject.as_ptr(),
109 subject.len() as i32,
110 buf.as_ptr(),
111 buf.len() as i32,
112 &mut schema_id,
113 )
114 };
115 if errno != 0 {
116 return Err(SchemaRegistryError::Unknown(errno));
117 }
118 Ok(SubjectSchema::new(
119 schema,
120 subject,
121 SchemaVersion(schema_version),
122 SchemaId(schema_id),
123 ))
124 }
125}