redpanda_transform_sdk_sr_sys/
lib.rs

1// Copyright 2024 Redpanda Data, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An internal crate providing the ABI contract for Redpanda's Data Transform Schema Registry
16//! Client.
17//!
18//! If you are looking to use transform's schema registry client you probably want crate
19//! [redpanda-transform-sdk-sr](https://crates.io/crates/redpanda-transform-sdk-sr).
20
21#[cfg(target_os = "wasi")]
22mod abi;
23#[cfg(not(target_os = "wasi"))]
24mod stub_abi;
25use abi::{get_schema_definition, get_subject_schema};
26#[cfg(not(target_os = "wasi"))]
27use stub_abi as abi;
28mod serde;
29
30use redpanda_transform_sdk_sr_types::*;
31use redpanda_transform_sdk_varint as varint;
32
33#[derive(Default, Debug)]
34pub struct AbiSchemaRegistryClient {}
35
36impl AbiSchemaRegistryClient {
37    pub fn new() -> AbiSchemaRegistryClient {
38        unsafe {
39            abi::check();
40        }
41        Self {}
42    }
43}
44
45impl SchemaRegistryClientImpl for AbiSchemaRegistryClient {
46    fn lookup_schema_by_id(&self, id: SchemaId) -> Result<Schema> {
47        let mut length: i32 = 0;
48        let errno = unsafe { abi::get_schema_definition_len(id.0, &mut length) };
49        if errno != 0 {
50            return Err(SchemaRegistryError::Unknown(errno));
51        }
52        let mut buf = vec![0; length as usize];
53        let errno_or_amt =
54            unsafe { get_schema_definition(id.0, buf.as_mut_ptr(), buf.len() as i32) };
55        if errno_or_amt < 0 {
56            return Err(SchemaRegistryError::Unknown(errno_or_amt));
57        }
58        serde::decode_schema_def(&buf[0..errno_or_amt as usize])
59    }
60
61    fn lookup_schema_by_version(
62        &self,
63        subject: &str,
64        version: SchemaVersion,
65    ) -> Result<SubjectSchema> {
66        let mut length: i32 = 0;
67        let errno = unsafe {
68            abi::get_subject_schema_len(
69                subject.as_ptr(),
70                subject.len() as i32,
71                version.0,
72                &mut length,
73            )
74        };
75        if errno != 0 {
76            return Err(SchemaRegistryError::Unknown(errno));
77        }
78        let mut buf = vec![0; length as usize];
79        let errno_or_amt = unsafe {
80            get_subject_schema(
81                subject.as_ptr(),
82                subject.len() as i32,
83                version.0,
84                buf.as_mut_ptr(),
85                buf.len() as i32,
86            )
87        };
88        if errno_or_amt < 0 {
89            return Err(SchemaRegistryError::Unknown(errno_or_amt));
90        }
91        serde::decode_schema(subject, &buf[0..errno_or_amt as usize])
92    }
93
94    fn lookup_latest_schema(&self, subject: &str) -> Result<SubjectSchema> {
95        self.lookup_schema_by_version(subject, SchemaVersion(-1))
96    }
97
98    fn create_schema(&mut self, subject: &str, schema: Schema) -> Result<SubjectSchema> {
99        let mut buf = Vec::with_capacity(schema.schema().len() + (varint::MAX_LENGTH * 2));
100        serde::encode_schema_def(&mut buf, &schema);
101        let mut schema_id: i32 = 0;
102        // TODO: It was currently an oversight that schema registry does not correctly set the
103        // version here (take it as a parameter to this method), we should bump the ABI and
104        // expose that method.
105        let schema_version: i32 = 0;
106        let errno = unsafe {
107            abi::create_subject_schema(
108                subject.as_ptr(),
109                subject.len() as i32,
110                buf.as_ptr(),
111                buf.len() as i32,
112                &mut schema_id,
113            )
114        };
115        if errno != 0 {
116            return Err(SchemaRegistryError::Unknown(errno));
117        }
118        Ok(SubjectSchema::new(
119            schema,
120            subject,
121            SchemaVersion(schema_version),
122            SchemaId(schema_id),
123        ))
124    }
125}