1use crate::{Error, ScalarFunction, TableFunction};
18use arrow_array::RecordBatchReader;
19use arrow_ipc::{reader::FileReader, writer::FileWriter};
20
/// Zero-sized marker symbol exported under its unmangled name.
///
/// `#[no_mangle]` keeps the symbol name exactly `ARROWUDF_VERSION_3_0`, and
/// `#[used]` tells the compiler to keep the static even though nothing in the
/// crate reads it. The value itself (`()`) carries no data.
///
/// NOTE(review): presumably the host inspects the exported symbol name to
/// detect which ABI version (3.0) this module speaks — confirm against the
/// runtime that loads it.
#[no_mangle]
#[used]
pub static ARROWUDF_VERSION_3_0: () = ();
35
/// Allocates `len` bytes aligned to `align` using the global allocator.
///
/// Returns a null pointer when the request cannot be satisfied:
///
/// * `align` is zero or not a power of two,
/// * `len`, rounded up to `align`, overflows `isize::MAX`,
/// * `len` is zero (the global allocator must never be asked for a
///   zero-sized block),
/// * the allocator itself reports failure.
///
/// # Safety
///
/// The returned block is uninitialized. It must be released exactly once
/// with [`dealloc`], passing the same `len` and `align`.
#[no_mangle]
pub unsafe extern "C" fn alloc(len: usize, align: usize) -> *mut u8 {
    // The size/alignment pair comes from the other side of the FFI boundary,
    // so validate it instead of trusting it blindly.
    match std::alloc::Layout::from_size_align(len, align) {
        // SAFETY: the layout has been validated and has a non-zero size,
        // which is everything `std::alloc::alloc` requires.
        Ok(layout) if layout.size() != 0 => std::alloc::alloc(layout),
        _ => std::ptr::null_mut(),
    }
}
45
/// Releases a block of `len` bytes aligned to `align` back to the global
/// allocator.
///
/// The call is a no-op when there is nothing that could validly be freed:
///
/// * `ptr` is null,
/// * `len` is zero — an empty boxed slice or string owns no allocation and
///   its pointer is merely a dangling, well-aligned address,
/// * `len`/`align` do not form a valid layout, in which case no allocation
///   can ever have been made with them.
///
/// # Safety
///
/// When none of the cases above apply, `ptr` must denote a live block that
/// was allocated by the global allocator with exactly this `len` and `align`,
/// and it must not be used after this call.
#[no_mangle]
pub unsafe extern "C" fn dealloc(ptr: *mut u8, len: usize, align: usize) {
    if ptr.is_null() || len == 0 {
        return;
    }
    if let Ok(layout) = std::alloc::Layout::from_size_align(len, align) {
        // SAFETY: the caller guarantees `ptr` was allocated with this layout;
        // the layout itself has just been validated.
        std::alloc::dealloc(ptr, layout);
    }
}
58
/// A raw pointer/length pair with C-compatible layout.
///
/// The wrapper functions in this file write a `CSlice` through an out-pointer
/// to describe the buffer they produced (encoded batch bytes, an error
/// message, or a boxed iterator handle).
///
/// The struct holds only a raw pointer and has no `Drop` logic of its own, so
/// it never frees the memory it refers to.
#[repr(C)]
#[derive(Debug)]
pub struct CSlice {
    /// Address of the first byte; written as null when there is no data.
    pub ptr: *const u8,
    /// Number of bytes reported for `ptr`.
    pub len: usize,
}
66
67pub unsafe fn scalar_wrapper(
82 function: ScalarFunction,
83 ptr: *const u8,
84 len: usize,
85 out_slice: *mut CSlice,
86) -> i32 {
87 let input = std::slice::from_raw_parts(ptr, len);
88 match call_scalar(function, input) {
89 Ok(data) => {
90 out_slice.write(CSlice {
91 ptr: data.as_ptr(),
92 len: data.len(),
93 });
94 std::mem::forget(data);
95 0
96 }
97 Err(err) => {
98 let msg = err.to_string().into_boxed_str();
99 out_slice.write(CSlice {
100 ptr: msg.as_ptr(),
101 len: msg.len(),
102 });
103 std::mem::forget(msg);
104 -1
105 }
106 }
107}
108
109fn call_scalar(function: ScalarFunction, input_bytes: &[u8]) -> Result<Box<[u8]>, Error> {
111 let mut reader = FileReader::try_new(std::io::Cursor::new(input_bytes), None)?;
112 let input_batch = reader
113 .next()
114 .ok_or_else(|| Error::IpcError("no record batch".into()))??;
115
116 let output_batch = function(&input_batch)?;
117
118 let mut buf = vec![];
120 let mut writer = FileWriter::try_new(&mut buf, &output_batch.schema())?;
121 writer.write(&output_batch)?;
122 writer.finish()?;
123 drop(writer);
124
125 Ok(buf.into())
126}
127
/// Handle wrapping the record-batch reader produced by a table function.
///
/// `call_table` boxes one of these; `table_wrapper` exposes the box as a raw
/// pointer, `record_batch_iterator_next` pulls batches from it, and
/// `record_batch_iterator_drop` releases it.
pub struct RecordBatchIter {
    /// The boxed reader that yields the output batches.
    iter: Box<dyn RecordBatchReader + Send>,
}
132
133pub unsafe fn table_wrapper(
147 function: TableFunction,
148 ptr: *const u8,
149 len: usize,
150 out_slice: *mut CSlice,
151) -> i32 {
152 let input = std::slice::from_raw_parts(ptr, len);
153 match call_table(function, input) {
154 Ok(iter) => {
155 out_slice.write(CSlice {
156 ptr: Box::into_raw(iter) as *const u8,
157 len: std::mem::size_of::<RecordBatchIter>(),
158 });
159 0
160 }
161 Err(err) => {
162 let msg = err.to_string().into_boxed_str();
163 out_slice.write(CSlice {
164 ptr: msg.as_ptr(),
165 len: msg.len(),
166 });
167 std::mem::forget(msg);
168 -1
169 }
170 }
171}
172
173fn call_table(function: TableFunction, input_bytes: &[u8]) -> Result<Box<RecordBatchIter>, Error> {
174 let mut reader = FileReader::try_new(std::io::Cursor::new(input_bytes), None)?;
175 let input_batch = reader
176 .next()
177 .ok_or_else(|| Error::IpcError("no record batch".into()))??;
178
179 let iter = function(&input_batch)?;
180 Ok(Box::new(RecordBatchIter { iter }))
181}
182
183#[no_mangle]
192pub unsafe extern "C" fn record_batch_iterator_next(iter: *mut RecordBatchIter, out: *mut CSlice) {
193 let iter = iter.as_mut().expect("null pointer");
194 if let Some(Ok(batch)) = iter.iter.next() {
195 let mut buf = vec![];
196 let mut writer = FileWriter::try_new(&mut buf, &batch.schema()).unwrap();
197 writer.write(&batch).unwrap();
198 writer.finish().unwrap();
199 drop(writer);
200 let buf = buf.into_boxed_slice();
201
202 out.write(CSlice {
203 ptr: buf.as_ptr(),
204 len: buf.len(),
205 });
206 std::mem::forget(buf);
207 } else {
208 out.write(CSlice {
210 ptr: std::ptr::null(),
211 len: 0,
212 });
213 }
214}
215
216#[no_mangle]
222pub unsafe extern "C" fn record_batch_iterator_drop(iter: *mut RecordBatchIter) {
223 drop(Box::from_raw(iter));
224}