use std::ffi::{CStr, CString};
use std::sync::Arc;
use crate::{array::Array, datatypes::Field, error::ArrowError};
use super::{export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c};
use super::{ArrowArray, ArrowArrayStream, ArrowSchema};
impl Drop for ArrowArrayStream {
    /// Invokes the stream's `release` callback on itself, if one is set.
    ///
    /// A stream whose `release` is `None` is dropped without any further action.
    fn drop(&mut self) {
        if let Some(release_fn) = self.release {
            // SAFETY: the callback is handed a pointer to this very struct, which is
            // still alive for the duration of the call.
            unsafe { release_fn(self) };
        }
    }
}
impl ArrowArrayStream {
    /// Creates a stream with every callback unset and a null `private_data` pointer.
    ///
    /// Because `release` is `None`, dropping the returned value does not invoke
    /// any callback.
    pub fn empty() -> Self {
        let no_private_data = std::ptr::null_mut();
        Self {
            private_data: no_private_data,
            release: None,
            get_last_error: None,
            get_next: None,
            get_schema: None,
        }
    }
}
/// Builds an [`ArrowError`] describing the last failure reported by a C stream.
///
/// The stream's `get_last_error` callback is queried for a message. When the
/// callback is absent or returns a null pointer, the message falls back to
/// `"an unspecified error"`. The message is produced by a foreign producer, so
/// it is decoded lossily: bytes that are not valid UTF-8 are replaced instead of
/// causing a panic.
///
/// # Safety
/// `iter` must be a valid stream: its `get_last_error` callback (if set) must be
/// safe to call with `iter`, and a non-null pointer it returns must reference a
/// NUL-terminated string that stays valid for the duration of this call.
unsafe fn handle_error(iter: &mut ArrowArrayStream) -> ArrowError {
    let reported = match iter.get_last_error {
        Some(get_last_error) => {
            // SAFETY: the caller guarantees the callback may be invoked on `iter`.
            let raw = unsafe { get_last_error(&mut *iter) };
            if raw.is_null() {
                None
            } else {
                // SAFETY: `raw` is non-null and, per the caller's contract, points to
                // a NUL-terminated string that outlives this call.
                let message = unsafe { CStr::from_ptr(raw) };
                Some(message.to_string_lossy().into_owned())
            }
        }
        None => None,
    };
    let message = reported.unwrap_or_else(|| "an unspecified error".to_string());
    ArrowError::External(
        "C stream".to_string(),
        Box::new(ArrowError::ExternalFormat(message)),
    )
}
/// Consumer-side reader over an [`ArrowArrayStream`].
///
/// Holds the C stream together with the [`Field`] imported from it at construction
/// time; that field's data type is used when importing every array the stream yields.
pub struct ArrowArrayStreamReader {
// The C stream whose callbacks are invoked to fetch arrays and error messages.
iter: Box<ArrowArrayStream>,
// Field imported from the schema the stream reported in `try_new`.
field: Field,
}
impl ArrowArrayStreamReader {
pub unsafe fn try_new(mut iter: Box<ArrowArrayStream>) -> Result<Self, ArrowError> {
let mut field = Box::new(ArrowSchema::empty());
if iter.get_next.is_none() {
return Err(ArrowError::OutOfSpec(
"The C stream MUST contain a non-null get_next".to_string(),
));
};
if iter.get_last_error.is_none() {
return Err(ArrowError::OutOfSpec(
"The C stream MUST contain a non-null get_last_error".to_string(),
));
};
let status = if let Some(f) = iter.get_schema {
unsafe { (f)(&mut *iter, &mut *field) }
} else {
return Err(ArrowError::OutOfSpec(
"The C stream MUST contain a non-null get_schema".to_string(),
));
};
if status != 0 {
return Err(unsafe { handle_error(&mut iter) });
}
let field = unsafe { import_field_from_c(&field)? };
Ok(Self { iter, field })
}
pub fn field(&self) -> &Field {
&self.field
}
pub unsafe fn next(&mut self) -> Option<Result<Box<dyn Array>, ArrowError>> {
let mut array = Box::new(ArrowArray::empty());
let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut *array) };
if status != 0 {
return Some(Err(unsafe { handle_error(&mut self.iter) }));
}
array.release?;
unsafe { import_array_from_c(array, self.field.data_type.clone()) }
.map(Some)
.transpose()
}
}
/// Producer-side state stored behind `ArrowArrayStream::private_data` by
/// `export_iterator` and read back by the exported callbacks.
struct PrivateData {
// Source of the arrays (or errors) handed out by `get_next`.
iter: Box<dyn Iterator<Item = Result<Arc<dyn Array>, ArrowError>>>,
// Field exported by `get_schema`; `get_next` checks each item's data type against it.
field: Field,
// Message of the most recent failure, exposed through `get_last_error`;
// reset to `None` by `get_next` whenever it succeeds.
error: Option<CString>,
}
/// Converts an error message into a `CString` without panicking.
///
/// Interior NUL bytes (which `CString::new` rejects) are removed first, so the
/// conversion cannot fail inside an `extern "C"` callback.
fn message_to_cstring(message: String) -> CString {
    let mut bytes = message.into_bytes();
    bytes.retain(|&byte| byte != 0);
    // No NUL byte is left, so `CString::new` succeeds; the default (empty string)
    // only exists to avoid a panicking path.
    CString::new(bytes).unwrap_or_default()
}

/// `get_next` callback installed by `export_iterator`.
///
/// Pulls the next item from the wrapped iterator and:
/// * on `Some(Ok(item))` whose data type matches the exported field, exports it
///   into `array`, clears the stored error and returns `0`;
/// * on a data type mismatch or `Some(Err(_))`, stores the message for
///   `get_last_error` and returns `2001`;
/// * on `None`, writes an empty (released) array into `array`, clears the stored
///   error and returns `0`.
///
/// Returns `2001` without touching the iterator when `iter`, `array` or the
/// stream's `private_data` is null.
///
/// # Safety
/// When non-null, `iter` must point to a stream created by `export_iterator` that
/// has not been released, and `array` must be valid for writes of an `ArrowArray`.
unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
    if iter.is_null() || array.is_null() {
        return 2001;
    }
    let state = (*iter).private_data as *mut PrivateData;
    if state.is_null() {
        return 2001;
    }
    let private = &mut *state;

    match private.iter.next() {
        Some(Ok(item)) => {
            let item_dt = item.data_type();
            let expected_dt = private.field.data_type();
            if item_dt != expected_dt {
                let message = format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}");
                private.error = Some(message_to_cstring(message));
                return 2001;
            }
            export_array_to_c(item, array);
            private.error = None;
            0
        }
        Some(Err(err)) => {
            private.error = Some(message_to_cstring(err.to_string()));
            2001
        }
        None => {
            // End of stream: hand back an array whose `release` is unset.
            std::ptr::write_unaligned(array, ArrowArray::empty());
            private.error = None;
            0
        }
    }
}
/// `get_schema` callback installed by `export_iterator`.
///
/// Exports the field stored in the stream's private data into `schema` and
/// returns `0`; returns `2001` when `iter` is null.
///
/// # Safety
/// When non-null, `iter` must point to a stream whose `private_data` references a
/// live `PrivateData`, and `schema` must be a pointer `export_field_to_c` may
/// write to.
unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
    if iter.is_null() {
        return 2001;
    }
    let state = (*iter).private_data.cast::<PrivateData>();
    export_field_to_c(&(*state).field, schema);
    0
}
/// `get_last_error` callback installed by `export_iterator`.
///
/// Returns a pointer to the error message currently stored in the stream's
/// private data, or null when `iter` is null or no error is stored. The pointer
/// refers to memory owned by the private data, so it is only usable until that
/// stored error is replaced or the private data is freed.
///
/// # Safety
/// When non-null, `iter` must point to a stream whose `private_data` references a
/// live `PrivateData`.
unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
    if iter.is_null() {
        return std::ptr::null();
    }
    let state = (*iter).private_data.cast::<PrivateData>();
    match &(*state).error {
        Some(message) => message.as_ptr(),
        None => std::ptr::null(),
    }
}
unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
if iter.is_null() {
return;
}
let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
(*iter).release = None;
}
/// Exports an iterator of arrays as a C stream written into `consumer`.
///
/// The iterator and `field` are moved into a heap-allocated `PrivateData` whose
/// ownership is transferred to the stream through `private_data`; the stream's
/// `release` callback frees it again.
///
/// # Safety
/// `consumer` must be non-null, properly aligned and point to an *initialized*
/// `ArrowArrayStream` (for example one built with `ArrowArrayStream::empty()`):
/// the assignment drops the value previously stored there, which invokes its
/// `release` callback when one is set.
pub unsafe fn export_iterator(
    iter: Box<dyn Iterator<Item = Result<Arc<dyn Array>, ArrowError>>>,
    field: Field,
    consumer: *mut ArrowArrayStream,
) {
    let state = PrivateData {
        error: None,
        field,
        iter,
    };
    // Hand ownership of the state to the stream; `release` reclaims it.
    let state_ptr = Box::into_raw(Box::new(state));

    let stream = ArrowArrayStream {
        private_data: state_ptr as *mut ::std::os::raw::c_void,
        release: Some(release),
        get_last_error: Some(get_last_error),
        get_next: Some(get_next),
        get_schema: Some(get_schema),
    };
    *consumer = stream;
}