use std::ffi::CStr;
use std::sync::Arc;
use arrow_array::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::{make_array, Array};
use arrow_schema::{ArrowError, Field, FieldRef};
use crate::ffi::ArrayReader;
#[derive(Debug)]
pub struct ArrowArrayStreamReader {
stream: FFI_ArrowArrayStream,
field: FieldRef,
}
fn get_stream_schema(stream_ptr: *mut FFI_ArrowArrayStream) -> Result<FieldRef, ArrowError> {
let mut schema = FFI_ArrowSchema::empty();
let ret_code = unsafe { (*stream_ptr).get_schema.unwrap()(stream_ptr, &mut schema) };
if ret_code == 0 {
let field = Field::try_from(&schema)?;
Ok(Arc::new(field))
} else {
Err(ArrowError::CDataInterface(format!(
"Cannot get schema from input stream. Error code: {ret_code:?}"
)))
}
}
impl ArrowArrayStreamReader {
#[allow(dead_code)]
pub fn try_new(mut stream: FFI_ArrowArrayStream) -> Result<Self, ArrowError> {
if stream.release.is_none() {
return Err(ArrowError::CDataInterface(
"input stream is already released".to_string(),
));
}
let field = get_stream_schema(&mut stream)?;
Ok(Self { stream, field })
}
pub fn field(&self) -> FieldRef {
self.field.clone()
}
fn get_stream_last_error(&mut self) -> Option<String> {
let get_last_error = self.stream.get_last_error?;
let error_str = unsafe { get_last_error(&mut self.stream) };
if error_str.is_null() {
return None;
}
let error_str = unsafe { CStr::from_ptr(error_str) };
Some(error_str.to_string_lossy().to_string())
}
}
impl Iterator for ArrowArrayStreamReader {
type Item = Result<Arc<dyn Array>, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
let mut array = FFI_ArrowArray::empty();
let ret_code = unsafe { self.stream.get_next.unwrap()(&mut self.stream, &mut array) };
if ret_code == 0 {
if array.is_released() {
return None;
}
let result = unsafe { from_ffi_and_data_type(array, self.field().data_type().clone()) };
Some(result.map(make_array))
} else {
let last_error = self.get_stream_last_error();
let err = ArrowError::CDataInterface(last_error.unwrap());
Some(Err(err))
}
}
}
impl ArrayReader for ArrowArrayStreamReader {
fn field(&self) -> FieldRef {
self.field.clone()
}
}