1pub mod async_udf;
2pub mod parse;
3
4use arrow::array::{
5 ArrayBuilder, ArrayData, BinaryBuilder, BooleanBuilder, Float32Builder, Float64Builder,
6 Int32Builder, Int64Builder, StringBuilder, TimestampNanosecondBuilder, UInt32Builder,
7 UInt64Builder,
8};
9use arrow::ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema};
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// An Arrow array together with its schema, both in their `arrow::ffi` representations,
/// stored as a single C-layout value.
///
/// Field `0` is the array (`FFI_ArrowArray`); field `1` is the schema (`FFI_ArrowSchema`).
#[repr(C)]
#[derive(Debug)]
pub struct FfiArraySchema(pub FFI_ArrowArray, pub FFI_ArrowSchema);
15
16impl FfiArraySchema {
17 pub fn from_data(data: ArrayData) -> Self {
18 let (array, schema) = to_ffi(&data).unwrap();
19 Self(array, schema)
20 }
21}
22
/// C-layout record describing a heap buffer of [`FfiArraySchema`] values.
///
/// `from_vec` fills `ptr`, `len` and `capacity` from a `Vec<FfiArraySchema>` whose
/// allocation it gives up ownership of; `into_vec` rebuilds that `Vec` from the same
/// three fields. The type has no `Drop` impl, so a value that is never passed to
/// `into_vec` does not release the buffer.
#[repr(C)]
pub struct FfiArrays {
    /// Start of the buffer of exported arrays.
    ptr: *mut FfiArraySchema,
    /// Number of initialized elements behind `ptr`.
    len: usize,
    /// Capacity of the allocation behind `ptr`, in elements.
    capacity: usize,
    /// Always `false` when built by `from_vec`; not read anywhere in this file.
    /// NOTE(review): presumably set by a producer to signal failure — confirm against
    /// the code that creates or inspects these values.
    error: bool,
}

// NOTE(review): the raw pointer field makes this type `!Send` by default; this impl asserts
// that moving it to another thread is sound. The justification is not visible in this file —
// presumably the buffer is exclusively owned by the value. Confirm.
unsafe impl Send for FfiArrays {}
32
33impl FfiArrays {
34 pub fn from_vec(value: Vec<ArrayData>) -> Self {
35 let vec: Vec<_> = value
36 .into_iter()
37 .map(|a| to_ffi(&a).unwrap())
38 .map(|(data, schema)| FfiArraySchema(data, schema))
39 .collect();
40
41 let len = vec.len();
42 let capacity = vec.capacity();
43 let ptr = vec.leak().as_mut_ptr();
46
47 Self {
48 ptr,
49 len,
50 capacity,
51 error: false,
52 }
53 }
54
55 pub fn into_vec(self) -> Vec<ArrayData> {
56 let vec = unsafe { Vec::from_raw_parts(self.ptr, self.len, self.capacity) };
57
58 vec.into_iter()
59 .map(|FfiArraySchema(array, schema)| unsafe { from_ffi(array, &schema).unwrap() })
60 .collect()
61 }
62}
63
/// C-layout outcome of a run: either an exported array with its schema, or a bare failure.
///
/// NOTE(review): presumably the value returned across the FFI boundary by a UDF
/// invocation — the producer is not visible in this file; confirm against callers.
#[repr(C)]
pub enum RunResult {
    /// The run succeeded and produced this array/schema pair.
    Ok(FfiArraySchema),
    /// The run failed; no further detail is carried.
    Err,
}
69
/// A single optional value of one of the supported scalar types.
///
/// Each variant wraps an `Option`; `append_to` forwards that `Option` to the
/// `append_option` method of the builder type matching the variant.
#[derive(Debug, Clone, PartialEq)]
pub enum ArrowDatum {
    /// Boolean value, appended to a `BooleanBuilder`.
    Bool(Option<bool>),
    /// Unsigned 32-bit integer, appended to a `UInt32Builder`.
    U32(Option<u32>),
    /// Unsigned 64-bit integer, appended to a `UInt64Builder`.
    U64(Option<u64>),
    /// Signed 32-bit integer, appended to an `Int32Builder`.
    I32(Option<i32>),
    /// Signed 64-bit integer, appended to an `Int64Builder`.
    I64(Option<i64>),
    /// 32-bit float, appended to a `Float32Builder`.
    F32(Option<f32>),
    /// 64-bit float, appended to a `Float64Builder`.
    F64(Option<f64>),
    /// Owned string, appended to a `StringBuilder`.
    String(Option<String>),
    /// Owned byte buffer, appended to a `BinaryBuilder`.
    Bytes(Option<Vec<u8>>),
    /// Wall-clock instant, converted to nanoseconds relative to the Unix epoch and
    /// appended to a `TimestampNanosecondBuilder`.
    Timestamp(Option<SystemTime>),
}
82
/// Converts `time` to a signed count of nanoseconds relative to the Unix epoch.
///
/// Instants before the epoch produce negative values instead of panicking. Instants whose
/// distance from the epoch does not fit in an `i64` saturate at `i64::MAX` (after the
/// epoch) or `i64::MIN` (before it) rather than wrapping.
fn to_nanos(time: SystemTime) -> i64 {
    match time.duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => i64::try_from(since_epoch.as_nanos()).unwrap_or(i64::MAX),
        // `duration_since` fails when `time` is earlier than the epoch; the error carries
        // how far before the epoch it lies.
        Err(before_epoch) => i64::try_from(before_epoch.duration().as_nanos())
            .map(|nanos| -nanos)
            .unwrap_or(i64::MIN),
    }
}
86impl ArrowDatum {
87 pub fn append_to(self, builder: &mut dyn ArrayBuilder) {
88 match self {
89 ArrowDatum::Bool(x) => builder
90 .as_any_mut()
91 .downcast_mut::<BooleanBuilder>()
92 .unwrap()
93 .append_option(x),
94 ArrowDatum::U32(x) => builder
95 .as_any_mut()
96 .downcast_mut::<UInt32Builder>()
97 .unwrap()
98 .append_option(x),
99 ArrowDatum::U64(x) => builder
100 .as_any_mut()
101 .downcast_mut::<UInt64Builder>()
102 .unwrap()
103 .append_option(x),
104 ArrowDatum::I32(x) => builder
105 .as_any_mut()
106 .downcast_mut::<Int32Builder>()
107 .unwrap()
108 .append_option(x),
109 ArrowDatum::I64(x) => builder
110 .as_any_mut()
111 .downcast_mut::<Int64Builder>()
112 .unwrap()
113 .append_option(x),
114 ArrowDatum::F32(x) => builder
115 .as_any_mut()
116 .downcast_mut::<Float32Builder>()
117 .unwrap()
118 .append_option(x),
119 ArrowDatum::F64(x) => builder
120 .as_any_mut()
121 .downcast_mut::<Float64Builder>()
122 .unwrap()
123 .append_option(x),
124 ArrowDatum::String(x) => builder
125 .as_any_mut()
126 .downcast_mut::<StringBuilder>()
127 .unwrap()
128 .append_option(x),
129 ArrowDatum::Bytes(x) => builder
130 .as_any_mut()
131 .downcast_mut::<BinaryBuilder>()
132 .unwrap()
133 .append_option(x),
134 ArrowDatum::Timestamp(x) => builder
135 .as_any_mut()
136 .downcast_mut::<TimestampNanosecondBuilder>()
137 .unwrap()
138 .append_option(x.map(to_nanos)),
139 }
140 }
141}