redpanda_transform_sdk_sys/
lib.rs

1// Copyright 2023 Redpanda Data, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! An internal crate providing the ABI contract for Redpanda's Data Transforms.
16//!
17//! If you are looking to use transforms you probably want crate
18//! [redpanda-transform-sdk](https://crates.io/crates/redpanda-transform-sdk).
19
20use std::{
21    fmt::Debug,
22    time::{Duration, SystemTime},
23};
24
25#[cfg(target_os = "wasi")]
26mod abi;
27#[cfg(not(target_os = "wasi"))]
28mod stub_abi;
29#[cfg(not(target_os = "wasi"))]
30use stub_abi as abi;
31mod serde;
32
33use redpanda_transform_sdk_types::*;
34
35extern crate redpanda_transform_sdk_varint as varint;
36
37#[cfg(test)]
38#[macro_use]
39extern crate quickcheck;
40
41#[cfg(test)]
42extern crate rand;
43
44pub fn process<E, F>(cb: F) -> !
45where
46    E: Debug,
47    F: Fn(WriteEvent, &mut RecordWriter) -> Result<(), E>,
48{
49    unsafe {
50        abi::check_abi();
51    }
52    let mut input_buffer: Vec<u8> = vec![];
53    let mut sink = AbiRecordWriter::new();
54    let mut writer = RecordWriter::new(&mut sink);
55    loop {
56        process_batch(&mut input_buffer, &mut writer, &cb);
57    }
58}
59
/// Batch-level metadata reported by the host for the batch currently being processed.
///
/// Filled in through the out-parameters of `abi::read_batch_header`. The field names
/// appear to mirror the Kafka record batch header — NOTE(review): confirm the exact
/// semantics against the host ABI.
///
/// `Default` yields an all-zero header, which is the initial value the out-parameters
/// need before the host overwrites them. `Debug` makes the header printable in diagnostics.
#[derive(Debug, Default)]
struct BatchHeader {
    pub base_offset: i64,
    /// Number of records to read from the current batch.
    pub record_count: i32,
    pub partition_leader_epoch: i32,
    pub attributes: i16,
    pub last_offset_delta: i32,
    pub base_timestamp: i64,
    pub max_timestamp: i64,
    pub producer_id: i64,
    pub producer_epoch: i16,
    pub base_sequence: i32,
}
72
/// Record sink that hands serialized records to the host through the ABI.
///
/// Both buffers live on the struct so their allocations can be reused: each
/// write clears and refills them rather than allocating fresh vectors.
struct AbiRecordWriter {
    /// Scratch space for the serialized payload of the record being written.
    pub output_buffer: Vec<u8>,
    /// Scratch space for the encoded write options (currently only a topic override).
    pub options_buffer: Vec<u8>,
}

impl AbiRecordWriter {
    /// Builds a writer whose scratch buffers start out empty and unallocated.
    fn new() -> Self {
        let output_buffer = Vec::new();
        let options_buffer = Vec::new();
        Self {
            output_buffer,
            options_buffer,
        }
    }
}
86
87impl RecordSink for AbiRecordWriter {
88    fn write(&mut self, r: BorrowedRecord, opts: WriteOptions) -> Result<(), WriteError> {
89        self.output_buffer.clear();
90        serde::write_record_payload(r, &mut self.output_buffer);
91        let errno_or_amt = match opts.topic {
92            Some(topic) => {
93                self.options_buffer.clear();
94                // Encode the options buffer:
95                self.options_buffer.push(0x01);
96                varint::write_sized_buffer(&mut self.options_buffer, Some(topic.as_bytes()));
97                unsafe {
98                    abi::write_record_with_options(
99                        self.output_buffer.as_ptr(),
100                        self.output_buffer.len() as u32,
101                        self.options_buffer.as_ptr(),
102                        self.options_buffer.len() as u32,
103                    )
104                }
105            }
106            None => unsafe {
107                abi::write_record(self.output_buffer.as_ptr(), self.output_buffer.len() as u32)
108            },
109        };
110        if errno_or_amt == self.output_buffer.len() as i32 {
111            Ok(())
112        } else {
113            Err(WriteError::Unknown(errno_or_amt))
114        }
115    }
116}
117
118fn process_batch<E, F>(input_buffer: &mut Vec<u8>, writer: &mut RecordWriter, cb: &F)
119where
120    E: Debug,
121    F: Fn(WriteEvent, &mut RecordWriter) -> Result<(), E>,
122{
123    let mut header = BatchHeader {
124        base_offset: 0,
125        record_count: 0,
126        partition_leader_epoch: 0,
127        attributes: 0,
128        last_offset_delta: 0,
129        base_timestamp: 0,
130        max_timestamp: 0,
131        producer_id: 0,
132        producer_epoch: 0,
133        base_sequence: 0,
134    };
135    let errno_or_buf_size = unsafe {
136        abi::read_batch_header(
137            &mut header.base_offset,
138            &mut header.record_count,
139            &mut header.partition_leader_epoch,
140            &mut header.attributes,
141            &mut header.last_offset_delta,
142            &mut header.base_timestamp,
143            &mut header.max_timestamp,
144            &mut header.producer_id,
145            &mut header.producer_epoch,
146            &mut header.base_sequence,
147        )
148    };
149    assert!(
150        errno_or_buf_size >= 0,
151        "failed to read batch header (errno: {errno_or_buf_size})"
152    );
153    let buf_size = errno_or_buf_size as usize;
154    input_buffer.resize(buf_size, 0);
155    for _ in 0..header.record_count {
156        let mut attr: u8 = 0;
157        let mut timestamp: i64 = 0;
158        let mut offset: i64 = 0;
159        let errno_or_amt = unsafe {
160            abi::read_next_record(
161                &mut attr,
162                &mut timestamp,
163                &mut offset,
164                input_buffer.as_mut_ptr(),
165                input_buffer.len() as u32,
166            )
167        };
168        assert!(
169            errno_or_amt >= 0,
170            "reading record failed (errno: {errno_or_amt}, buffer_size: {buf_size})"
171        );
172        let amt = errno_or_amt as usize;
173        let ts = SystemTime::UNIX_EPOCH + Duration::from_millis(timestamp as u64);
174        let record = serde::read_record_from_payload(&input_buffer[0..amt])
175            .expect("deserializing record failed");
176        cb(
177            WriteEvent {
178                record: WrittenRecord::from_record(record, ts),
179            },
180            writer,
181        )
182        .expect("transforming record failed");
183    }
184}