arroyo_udf_common/
lib.rs

1pub mod async_udf;
2pub mod parse;
3
4use arrow::array::{
5    ArrayBuilder, ArrayData, BinaryBuilder, BooleanBuilder, Float32Builder, Float64Builder,
6    Int32Builder, Int64Builder, StringBuilder, TimestampNanosecondBuilder, UInt32Builder,
7    UInt64Builder,
8};
9use arrow::ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema};
10use std::time::{SystemTime, UNIX_EPOCH};
11
12#[repr(C)]
13#[derive(Debug)]
14pub struct FfiArraySchema(pub FFI_ArrowArray, pub FFI_ArrowSchema);
15
16impl FfiArraySchema {
17    pub fn from_data(data: ArrayData) -> Self {
18        let (array, schema) = to_ffi(&data).unwrap();
19        Self(array, schema)
20    }
21}
22
23#[repr(C)]
24pub struct FfiArrays {
25    ptr: *mut FfiArraySchema,
26    len: usize,
27    capacity: usize,
28    error: bool,
29}
30
31unsafe impl Send for FfiArrays {}
32
33impl FfiArrays {
34    pub fn from_vec(value: Vec<ArrayData>) -> Self {
35        let vec: Vec<_> = value
36            .into_iter()
37            .map(|a| to_ffi(&a).unwrap())
38            .map(|(data, schema)| FfiArraySchema(data, schema))
39            .collect();
40
41        let len = vec.len();
42        let capacity = vec.capacity();
43        // the UDF dylib is responsible for freeing the memory of the args -- we leak it before
44        // calling the udf so that if it panics, we don't try to double-free the args
45        let ptr = vec.leak().as_mut_ptr();
46
47        Self {
48            ptr,
49            len,
50            capacity,
51            error: false,
52        }
53    }
54
55    pub fn into_vec(self) -> Vec<ArrayData> {
56        let vec = unsafe { Vec::from_raw_parts(self.ptr, self.len, self.capacity) };
57
58        vec.into_iter()
59            .map(|FfiArraySchema(array, schema)| unsafe { from_ffi(array, &schema).unwrap() })
60            .collect()
61    }
62}
63
64#[repr(C)]
65pub enum RunResult {
66    Ok(FfiArraySchema),
67    Err,
68}
69
/// A single optional scalar value tagged with its type.
///
/// Every variant wraps an `Option` of the corresponding Rust type.
pub enum ArrowDatum {
    /// Optional boolean.
    Bool(Option<bool>),
    /// Optional unsigned 32-bit integer.
    U32(Option<u32>),
    /// Optional unsigned 64-bit integer.
    U64(Option<u64>),
    /// Optional signed 32-bit integer.
    I32(Option<i32>),
    /// Optional signed 64-bit integer.
    I64(Option<i64>),
    /// Optional 32-bit float.
    F32(Option<f32>),
    /// Optional 64-bit float.
    F64(Option<f64>),
    /// Optional owned UTF-8 string.
    String(Option<String>),
    /// Optional owned byte buffer.
    Bytes(Option<Vec<u8>>),
    /// Optional wall-clock instant.
    Timestamp(Option<SystemTime>),
}
82
/// Converts `time` to signed nanoseconds relative to the UNIX epoch.
///
/// Instants before the epoch produce negative values instead of panicking.
/// Offsets whose magnitude does not fit in an `i64` saturate (to `i64::MAX`,
/// or `-i64::MAX` before the epoch) rather than silently wrapping.
fn to_nanos(time: SystemTime) -> i64 {
    // Clamp before the narrowing cast so that `as i64` can never wrap.
    fn clamp(nanos: u128) -> i64 {
        nanos.min(i64::MAX as u128) as i64
    }

    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => clamp(since_epoch.as_nanos()),
        // `duration_since` errors when `time` is earlier than the epoch; the
        // error carries the (positive) distance between the two instants.
        Err(before_epoch) => -clamp(before_epoch.duration().as_nanos()),
    }
}
86impl ArrowDatum {
87    pub fn append_to(self, builder: &mut dyn ArrayBuilder) {
88        match self {
89            ArrowDatum::Bool(x) => builder
90                .as_any_mut()
91                .downcast_mut::<BooleanBuilder>()
92                .unwrap()
93                .append_option(x),
94            ArrowDatum::U32(x) => builder
95                .as_any_mut()
96                .downcast_mut::<UInt32Builder>()
97                .unwrap()
98                .append_option(x),
99            ArrowDatum::U64(x) => builder
100                .as_any_mut()
101                .downcast_mut::<UInt64Builder>()
102                .unwrap()
103                .append_option(x),
104            ArrowDatum::I32(x) => builder
105                .as_any_mut()
106                .downcast_mut::<Int32Builder>()
107                .unwrap()
108                .append_option(x),
109            ArrowDatum::I64(x) => builder
110                .as_any_mut()
111                .downcast_mut::<Int64Builder>()
112                .unwrap()
113                .append_option(x),
114            ArrowDatum::F32(x) => builder
115                .as_any_mut()
116                .downcast_mut::<Float32Builder>()
117                .unwrap()
118                .append_option(x),
119            ArrowDatum::F64(x) => builder
120                .as_any_mut()
121                .downcast_mut::<Float64Builder>()
122                .unwrap()
123                .append_option(x),
124            ArrowDatum::String(x) => builder
125                .as_any_mut()
126                .downcast_mut::<StringBuilder>()
127                .unwrap()
128                .append_option(x),
129            ArrowDatum::Bytes(x) => builder
130                .as_any_mut()
131                .downcast_mut::<BinaryBuilder>()
132                .unwrap()
133                .append_option(x),
134            ArrowDatum::Timestamp(x) => builder
135                .as_any_mut()
136                .downcast_mut::<TimestampNanosecondBuilder>()
137                .unwrap()
138                .append_option(x.map(to_nanos)),
139        }
140    }
141}