polars-arrow 0.53.0

Minimal implementation of the Arrow specification forked from arrow2
Documentation
use std::ffi::{CStr, CString};
use std::ops::DerefMut;

use polars_error::{PolarsError, PolarsResult, polars_bail, polars_err};

use super::{
    ArrowArray, ArrowArrayStream, ArrowSchema, export_array_to_c, export_field_to_c,
    import_array_from_c, import_field_from_c,
};
use crate::array::Array;
use crate::datatypes::Field;

impl Drop for ArrowArrayStream {
    /// Invokes the stream's `release` callback, if one is still set.
    fn drop(&mut self) {
        // With no `release` callback there is nothing to call.
        if let Some(release_fn) = self.release {
            // SAFETY: the callback was stored in this very stream and is handed the stream
            // it belongs to; its soundness is the responsibility of whoever installed it.
            unsafe { release_fn(self) }
        }
    }
}

// Declares that an `ArrowArrayStream` may be moved to another thread. The compiler cannot
// check this for the raw `private_data` pointer or the callbacks.
// NOTE(review): `export_iterator` stores a `Box<dyn Iterator>` without a `Send` bound behind
// `private_data`, so this impl also lets such iterators cross threads — confirm this is intended.
unsafe impl Send for ArrowArrayStream {}

impl ArrowArrayStream {
    /// Returns an [`ArrowArrayStream`] with every callback unset and a null `private_data`,
    /// meant to be filled in by a producer when importing.
    pub fn empty() -> Self {
        let private_data = std::ptr::null_mut();
        Self {
            private_data,
            release: None,
            get_last_error: None,
            get_next: None,
            get_schema: None,
        }
    }
}

unsafe fn handle_error(iter: &mut ArrowArrayStream) -> PolarsError {
    let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };

    if error.is_null() {
        return polars_err!(ComputeError: "got unspecified external error");
    }

    let error = unsafe { CStr::from_ptr(error) };
    polars_err!(ComputeError: "got external error: {}", error.to_str().unwrap())
}

/// Implements an iterator of [`Array`] consumed from the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
///
/// `Iter` is any handle that dereferences mutably to an [`ArrowArrayStream`].
pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
    /// Handle to the underlying C stream whose callbacks are invoked.
    iter: Iter,
    /// Field imported from the stream's schema at construction; its dtype is used to import
    /// every array.
    field: Field,
}

impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
    /// Returns a new [`ArrowArrayStreamReader`]
    /// # Error
    /// Errors iff the [`ArrowArrayStream`] is out of specification,
    /// or was already released prior to calling this function.
    ///
    /// # Safety
    /// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream`
    /// contains a valid Arrow C stream interface.
    /// In particular:
    /// * The `ArrowArrayStream` fulfills the invariants of the C stream interface
    /// * The schema `get_schema` produces fulfills the C data interface
    pub unsafe fn try_new(mut iter: Iter) -> PolarsResult<Self> {
        if iter.release.is_none() {
            polars_bail!(InvalidOperation: "the C stream was already released")
        };

        if iter.get_next.is_none() {
            polars_bail!(InvalidOperation: "the C stream must contain a non-null get_next")
        };

        if iter.get_last_error.is_none() {
            polars_bail!(InvalidOperation: "The C stream MUST contain a non-null get_last_error")
        };

        let mut field = ArrowSchema::empty();
        let status = if let Some(f) = iter.get_schema {
            unsafe { (f)(&mut *iter, &mut field) }
        } else {
            polars_bail!(InvalidOperation:
                            "The C stream MUST contain a non-null get_schema"
            )
        };

        if status != 0 {
            return Err(unsafe { handle_error(&mut iter) });
        }

        let field = unsafe { import_field_from_c(&field)? };

        Ok(Self { iter, field })
    }

    /// Returns the field provided by the stream
    pub fn field(&self) -> &Field {
        &self.field
    }

    /// Advances this iterator by one array
    /// # Error
    /// Errors iff:
    /// * The C stream interface returns an error
    /// * The C stream interface returns an invalid array (that we can identify, see Safety below)
    ///
    /// # Safety
    /// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays
    /// that fulfill the C data interface
    pub unsafe fn next(&mut self) -> Option<PolarsResult<Box<dyn Array>>> {
        let mut array = ArrowArray::empty();
        let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };

        if status != 0 {
            return Some(Err(unsafe { handle_error(&mut self.iter) }));
        }

        // last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next
        array.release?;

        // SAFETY: assumed from the C stream interface
        unsafe { import_array_from_c(array, self.field.dtype.clone()) }
            .map(Some)
            .transpose()
    }
}

/// State stored behind `private_data` of a stream created by `export_iterator` and accessed
/// by the `get_next`, `get_schema`, `get_last_error` and `release` callbacks.
struct PrivateData {
    /// Source of the arrays (or errors) handed out by `get_next`.
    iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
    /// Field exported by `get_schema`; `get_next` checks every item's dtype against it.
    field: Field,
    /// Message of the most recent failure, returned by `get_last_error`; reset to `None` by
    /// `get_next` when a call succeeds.
    error: Option<CString>,
}

/// `get_next` callback of streams created by `export_iterator`.
///
/// Pulls one item from the wrapped iterator and writes it to `array`. When the iterator is
/// exhausted an empty [`ArrowArray`] is written instead. Returns `0` on success and `2001`
/// on failure; after a failure that recorded a message, it can be read through
/// `get_last_error`.
///
/// # Safety
/// `iter` must be null or point to a stream created by `export_iterator` that has not been
/// released, and `array` must be valid for writing an [`ArrowArray`].
unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
    // Converts an error message into the C string handed out by `get_last_error`.
    // Interior NUL bytes are stripped first: `CString::new` rejects them, and unwrapping that
    // failure would panic inside an `extern "C"` callback.
    fn error_cstring(message: String) -> CString {
        let mut bytes = message.into_bytes();
        bytes.retain(|&byte| byte != 0);
        // Cannot fail once the NUL bytes are gone; fall back to an empty string regardless.
        CString::new(bytes).unwrap_or_default()
    }

    if iter.is_null() {
        return 2001;
    }
    // SAFETY: `iter` is non-null (checked above) and, per this function's contract, its
    // `private_data` points to the live `PrivateData` installed by `export_iterator`.
    let private = unsafe { &mut *((*iter).private_data as *mut PrivateData) };

    match private.iter.next() {
        Some(Ok(item)) => {
            // check that the array has the same dtype as field
            let item_dt = item.dtype();
            let expected_dt = private.field.dtype();
            if item_dt != expected_dt {
                private.error = Some(error_cstring(format!(
                    "The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}"
                )));
                return 2001; // custom application specific error (since this is never a result of this interface)
            }

            // SAFETY: the caller guarantees `array` is valid for writes.
            unsafe { std::ptr::write(array, export_array_to_c(item)) };

            private.error = None;
            0
        },
        Some(Err(err)) => {
            private.error = Some(error_cstring(err.to_string()));
            2001 // custom application specific error (since this is never a result of this interface)
        },
        None => {
            // Iterator exhausted: hand back an empty array to signal the end of the stream.
            // SAFETY: the caller guarantees `array` is valid for writes.
            unsafe { std::ptr::write_unaligned(array, ArrowArray::empty()) };
            private.error = None;
            0
        },
    }
}

/// `get_schema` callback of streams created by `export_iterator`.
///
/// Writes the exported form of the stream's field to `schema` and returns `0`, or returns
/// `2001` without writing anything when `iter` is null.
///
/// # Safety
/// `iter` must be null or point to a stream created by `export_iterator` that has not been
/// released, and `schema` must be valid for writing an [`ArrowSchema`].
unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
    if iter.is_null() {
        return 2001;
    }

    // SAFETY: `iter` is non-null (checked above) and, per this function's contract, its
    // `private_data` points to the live `PrivateData` installed by `export_iterator`.
    let state = unsafe { &mut *((*iter).private_data as *mut PrivateData) };
    let exported = export_field_to_c(&state.field);

    // SAFETY: the caller guarantees `schema` is valid for writes.
    unsafe { std::ptr::write(schema, exported) };
    0
}

/// `get_last_error` callback of streams created by `export_iterator`.
///
/// Returns a pointer to the stored error message, or null when `iter` is null or no error
/// is currently stored. The pointer refers to memory owned by the stream's `PrivateData`.
///
/// # Safety
/// `iter` must be null or point to a stream created by `export_iterator` that has not been
/// released.
unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
    if iter.is_null() {
        return std::ptr::null();
    }

    // SAFETY: `iter` is non-null (checked above) and, per this function's contract, its
    // `private_data` points to the live `PrivateData` installed by `export_iterator`.
    let state = unsafe { &mut *((*iter).private_data as *mut PrivateData) };

    match state.error.as_ref() {
        Some(message) => message.as_ptr(),
        None => std::ptr::null(),
    }
}

/// `release` callback of streams created by `export_iterator`.
///
/// Frees the `PrivateData` behind `private_data` and marks the stream as released by
/// clearing its `release` callback. Does nothing when `iter` is null.
///
/// # Safety
/// `iter` must be null or point to a stream created by `export_iterator` that has not been
/// released yet.
unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
    if iter.is_null() {
        return;
    }

    // SAFETY: `iter` is non-null (checked above) and, per this function's contract, its
    // `private_data` is the pointer produced by `Box::into_raw` in `export_iterator`.
    unsafe {
        let state = (*iter).private_data as *mut PrivateData;
        // Re-box the state and drop it right away, which frees it.
        drop(Box::from_raw(state));
        (*iter).release = None;
    }
}

/// Wraps an iterator of arrays into an [`ArrowArrayStream`] following the
/// [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
///
/// `field` is what the stream's `get_schema` callback exports. The iterator and the field are
/// moved to the heap and owned by the returned stream until its `release` callback runs.
pub fn export_iterator(
    iter: Box<dyn Iterator<Item = PolarsResult<Box<dyn Array>>>>,
    field: Field,
) -> ArrowArrayStream {
    let state = PrivateData {
        iter,
        field,
        error: None,
    };
    // Ownership moves into the raw pointer; the `release` callback reclaims it.
    let private_data = Box::into_raw(Box::new(state)) as *mut ::std::os::raw::c_void;

    ArrowArrayStream {
        get_schema: Some(get_schema),
        get_next: Some(get_next),
        get_last_error: Some(get_last_error),
        release: Some(release),
        private_data,
    }
}