redpanda_transform_sdk_sys/
lib.rs1use std::{
21 fmt::Debug,
22 time::{Duration, SystemTime},
23};
24
25#[cfg(target_os = "wasi")]
26mod abi;
27#[cfg(not(target_os = "wasi"))]
28mod stub_abi;
29#[cfg(not(target_os = "wasi"))]
30use stub_abi as abi;
31mod serde;
32
33use redpanda_transform_sdk_types::*;
34
35extern crate redpanda_transform_sdk_varint as varint;
36
37#[cfg(test)]
38#[macro_use]
39extern crate quickcheck;
40
41#[cfg(test)]
42extern crate rand;
43
44pub fn process<E, F>(cb: F) -> !
45where
46 E: Debug,
47 F: Fn(WriteEvent, &mut RecordWriter) -> Result<(), E>,
48{
49 unsafe {
50 abi::check_abi();
51 }
52 let mut input_buffer: Vec<u8> = vec![];
53 let mut sink = AbiRecordWriter::new();
54 let mut writer = RecordWriter::new(&mut sink);
55 loop {
56 process_batch(&mut input_buffer, &mut writer, &cb);
57 }
58}
59
/// Batch metadata populated by `abi::read_batch_header`.
///
/// Each field is handed to the host as an out-parameter and overwritten by it.
/// Within this file only `record_count` is read back (it bounds the record loop in
/// `process_batch`); the remaining fields are carried along unchanged.
struct BatchHeader {
    /// Out-parameter: base offset of the batch.
    pub base_offset: i64,
    /// Out-parameter: number of records in the batch; drives the per-record loop.
    pub record_count: i32,
    /// Out-parameter: partition leader epoch.
    pub partition_leader_epoch: i32,
    /// Out-parameter: batch attributes.
    pub attributes: i16,
    /// Out-parameter: last offset delta.
    pub last_offset_delta: i32,
    /// Out-parameter: base timestamp.
    pub base_timestamp: i64,
    /// Out-parameter: max timestamp.
    pub max_timestamp: i64,
    /// Out-parameter: producer id.
    pub producer_id: i64,
    /// Out-parameter: producer epoch.
    pub producer_epoch: i16,
    /// Out-parameter: base sequence.
    pub base_sequence: i32,
}
72
/// Record sink that forwards serialized records to the host through the `abi` module.
///
/// Both buffers are scratch space that is cleared and refilled on each write, so their
/// allocations are reused between records.
struct AbiRecordWriter {
    /// Holds the serialized payload of the record currently being written.
    pub output_buffer: Vec<u8>,
    /// Holds the encoded write options (used only when a target topic is given).
    pub options_buffer: Vec<u8>,
}
77
78impl AbiRecordWriter {
79 fn new() -> Self {
80 Self {
81 output_buffer: Vec::new(),
82 options_buffer: Vec::new(),
83 }
84 }
85}
86
87impl RecordSink for AbiRecordWriter {
88 fn write(&mut self, r: BorrowedRecord, opts: WriteOptions) -> Result<(), WriteError> {
89 self.output_buffer.clear();
90 serde::write_record_payload(r, &mut self.output_buffer);
91 let errno_or_amt = match opts.topic {
92 Some(topic) => {
93 self.options_buffer.clear();
94 self.options_buffer.push(0x01);
96 varint::write_sized_buffer(&mut self.options_buffer, Some(topic.as_bytes()));
97 unsafe {
98 abi::write_record_with_options(
99 self.output_buffer.as_ptr(),
100 self.output_buffer.len() as u32,
101 self.options_buffer.as_ptr(),
102 self.options_buffer.len() as u32,
103 )
104 }
105 }
106 None => unsafe {
107 abi::write_record(self.output_buffer.as_ptr(), self.output_buffer.len() as u32)
108 },
109 };
110 if errno_or_amt == self.output_buffer.len() as i32 {
111 Ok(())
112 } else {
113 Err(WriteError::Unknown(errno_or_amt))
114 }
115 }
116}
117
118fn process_batch<E, F>(input_buffer: &mut Vec<u8>, writer: &mut RecordWriter, cb: &F)
119where
120 E: Debug,
121 F: Fn(WriteEvent, &mut RecordWriter) -> Result<(), E>,
122{
123 let mut header = BatchHeader {
124 base_offset: 0,
125 record_count: 0,
126 partition_leader_epoch: 0,
127 attributes: 0,
128 last_offset_delta: 0,
129 base_timestamp: 0,
130 max_timestamp: 0,
131 producer_id: 0,
132 producer_epoch: 0,
133 base_sequence: 0,
134 };
135 let errno_or_buf_size = unsafe {
136 abi::read_batch_header(
137 &mut header.base_offset,
138 &mut header.record_count,
139 &mut header.partition_leader_epoch,
140 &mut header.attributes,
141 &mut header.last_offset_delta,
142 &mut header.base_timestamp,
143 &mut header.max_timestamp,
144 &mut header.producer_id,
145 &mut header.producer_epoch,
146 &mut header.base_sequence,
147 )
148 };
149 assert!(
150 errno_or_buf_size >= 0,
151 "failed to read batch header (errno: {errno_or_buf_size})"
152 );
153 let buf_size = errno_or_buf_size as usize;
154 input_buffer.resize(buf_size, 0);
155 for _ in 0..header.record_count {
156 let mut attr: u8 = 0;
157 let mut timestamp: i64 = 0;
158 let mut offset: i64 = 0;
159 let errno_or_amt = unsafe {
160 abi::read_next_record(
161 &mut attr,
162 &mut timestamp,
163 &mut offset,
164 input_buffer.as_mut_ptr(),
165 input_buffer.len() as u32,
166 )
167 };
168 assert!(
169 errno_or_amt >= 0,
170 "reading record failed (errno: {errno_or_amt}, buffer_size: {buf_size})"
171 );
172 let amt = errno_or_amt as usize;
173 let ts = SystemTime::UNIX_EPOCH + Duration::from_millis(timestamp as u64);
174 let record = serde::read_record_from_payload(&input_buffer[0..amt])
175 .expect("deserializing record failed");
176 cb(
177 WriteEvent {
178 record: WrittenRecord::from_record(record, ts),
179 },
180 writer,
181 )
182 .expect("transforming record failed");
183 }
184}