1use std::ffi::{CStr, CString};
2use std::ops::DerefMut;
3
4use crate::{array::Array, datatypes::Field, error::Error};
5
6use super::{export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c};
7use super::{ArrowArray, ArrowArrayStream, ArrowSchema};
8
9impl Drop for ArrowArrayStream {
10 fn drop(&mut self) {
11 match self.release {
12 None => (),
13 Some(release) => unsafe { release(self) },
14 };
15 }
16}
17
18impl ArrowArrayStream {
19 pub fn empty() -> Self {
21 Self {
22 get_schema: None,
23 get_next: None,
24 get_last_error: None,
25 release: None,
26 private_data: std::ptr::null_mut(),
27 }
28 }
29}
30
31unsafe fn handle_error(iter: &mut ArrowArrayStream) -> Error {
32 let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
33
34 if error.is_null() {
35 return Error::External(
36 "C stream".to_string(),
37 Box::new(Error::ExternalFormat("an unspecified error".to_string())),
38 );
39 }
40
41 let error = unsafe { CStr::from_ptr(error) };
42 Error::External(
43 "C stream".to_string(),
44 Box::new(Error::ExternalFormat(error.to_str().unwrap().to_string())),
45 )
46}
47
48pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
50 iter: Iter,
51 field: Field,
52}
53
54impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
55 pub unsafe fn try_new(mut iter: Iter) -> Result<Self, Error> {
66 if iter.release.is_none() {
67 return Err(Error::InvalidArgumentError(
68 "The C stream was already released".to_string(),
69 ));
70 };
71
72 if iter.get_next.is_none() {
73 return Err(Error::OutOfSpec(
74 "The C stream MUST contain a non-null get_next".to_string(),
75 ));
76 };
77
78 if iter.get_last_error.is_none() {
79 return Err(Error::OutOfSpec(
80 "The C stream MUST contain a non-null get_last_error".to_string(),
81 ));
82 };
83
84 let mut field = ArrowSchema::empty();
85 let status = if let Some(f) = iter.get_schema {
86 unsafe { (f)(&mut *iter, &mut field) }
87 } else {
88 return Err(Error::OutOfSpec(
89 "The C stream MUST contain a non-null get_schema".to_string(),
90 ));
91 };
92
93 if status != 0 {
94 return Err(unsafe { handle_error(&mut iter) });
95 }
96
97 let field = unsafe { import_field_from_c(&field)? };
98
99 Ok(Self { iter, field })
100 }
101
102 pub fn field(&self) -> &Field {
104 &self.field
105 }
106
107 pub unsafe fn next(&mut self) -> Option<Result<Box<dyn Array>, Error>> {
116 let mut array = ArrowArray::empty();
117 let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
118
119 if status != 0 {
120 return Some(Err(unsafe { handle_error(&mut self.iter) }));
121 }
122
123 array.release?;
125
126 unsafe { import_array_from_c(array, self.field.data_type.clone()) }
128 .map(Some)
129 .transpose()
130 }
131}
132
133struct PrivateData {
134 iter: Box<dyn Iterator<Item = Result<Box<dyn Array>, Error>>>,
135 field: Field,
136 error: Option<CString>,
137}
138
139unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
140 if iter.is_null() {
141 return 2001;
142 }
143 let private = &mut *((*iter).private_data as *mut PrivateData);
144
145 match private.iter.next() {
146 Some(Ok(item)) => {
147 let item_dt = item.data_type();
149 let expected_dt = private.field.data_type();
150 if item_dt != expected_dt {
151 private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap());
152 return 2001; }
154
155 std::ptr::write(array, export_array_to_c(item));
156
157 private.error = None;
158 0
159 }
160 Some(Err(err)) => {
161 private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap());
162 2001 }
164 None => {
165 let a = ArrowArray::empty();
166 std::ptr::write_unaligned(array, a);
167 private.error = None;
168 0
169 }
170 }
171}
172
173unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
174 if iter.is_null() {
175 return 2001;
176 }
177 let private = &mut *((*iter).private_data as *mut PrivateData);
178
179 std::ptr::write(schema, export_field_to_c(&private.field));
180 0
181}
182
183unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
184 if iter.is_null() {
185 return std::ptr::null();
186 }
187 let private = &mut *((*iter).private_data as *mut PrivateData);
188
189 private
190 .error
191 .as_ref()
192 .map(|x| x.as_ptr())
193 .unwrap_or(std::ptr::null())
194}
195
196unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
197 if iter.is_null() {
198 return;
199 }
200 let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
201 (*iter).release = None;
202 }
204
205pub fn export_iterator(
207 iter: Box<dyn Iterator<Item = Result<Box<dyn Array>, Error>>>,
208 field: Field,
209) -> ArrowArrayStream {
210 let private_data = Box::new(PrivateData {
211 iter,
212 field,
213 error: None,
214 });
215
216 ArrowArrayStream {
217 get_schema: Some(get_schema),
218 get_next: Some(get_next),
219 get_last_error: Some(get_last_error),
220 release: Some(release),
221 private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
222 }
223}