re_arrow2/ffi/
stream.rs

1use std::ffi::{CStr, CString};
2use std::ops::DerefMut;
3
4use crate::{array::Array, datatypes::Field, error::Error};
5
6use super::{export_array_to_c, export_field_to_c, import_array_from_c, import_field_from_c};
7use super::{ArrowArray, ArrowArrayStream, ArrowSchema};
8
9impl Drop for ArrowArrayStream {
10    fn drop(&mut self) {
11        match self.release {
12            None => (),
13            Some(release) => unsafe { release(self) },
14        };
15    }
16}
17
18impl ArrowArrayStream {
19    /// Creates an empty [`ArrowArrayStream`] used to import from a producer.
20    pub fn empty() -> Self {
21        Self {
22            get_schema: None,
23            get_next: None,
24            get_last_error: None,
25            release: None,
26            private_data: std::ptr::null_mut(),
27        }
28    }
29}
30
31unsafe fn handle_error(iter: &mut ArrowArrayStream) -> Error {
32    let error = unsafe { (iter.get_last_error.unwrap())(&mut *iter) };
33
34    if error.is_null() {
35        return Error::External(
36            "C stream".to_string(),
37            Box::new(Error::ExternalFormat("an unspecified error".to_string())),
38        );
39    }
40
41    let error = unsafe { CStr::from_ptr(error) };
42    Error::External(
43        "C stream".to_string(),
44        Box::new(Error::ExternalFormat(error.to_str().unwrap().to_string())),
45    )
46}
47
48/// Implements an iterator of [`Array`] consumed from the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html).
49pub struct ArrowArrayStreamReader<Iter: DerefMut<Target = ArrowArrayStream>> {
50    iter: Iter,
51    field: Field,
52}
53
54impl<Iter: DerefMut<Target = ArrowArrayStream>> ArrowArrayStreamReader<Iter> {
55    /// Returns a new [`ArrowArrayStreamReader`]
56    /// # Error
57    /// Errors iff the [`ArrowArrayStream`] is out of specification,
58    /// or was already released prior to calling this function.
59    /// # Safety
60    /// This method is intrinsically `unsafe` since it assumes that the `ArrowArrayStream`
61    /// contains a valid Arrow C stream interface.
62    /// In particular:
63    /// * The `ArrowArrayStream` fulfills the invariants of the C stream interface
64    /// * The schema `get_schema` produces fulfills the C data interface
65    pub unsafe fn try_new(mut iter: Iter) -> Result<Self, Error> {
66        if iter.release.is_none() {
67            return Err(Error::InvalidArgumentError(
68                "The C stream was already released".to_string(),
69            ));
70        };
71
72        if iter.get_next.is_none() {
73            return Err(Error::OutOfSpec(
74                "The C stream MUST contain a non-null get_next".to_string(),
75            ));
76        };
77
78        if iter.get_last_error.is_none() {
79            return Err(Error::OutOfSpec(
80                "The C stream MUST contain a non-null get_last_error".to_string(),
81            ));
82        };
83
84        let mut field = ArrowSchema::empty();
85        let status = if let Some(f) = iter.get_schema {
86            unsafe { (f)(&mut *iter, &mut field) }
87        } else {
88            return Err(Error::OutOfSpec(
89                "The C stream MUST contain a non-null get_schema".to_string(),
90            ));
91        };
92
93        if status != 0 {
94            return Err(unsafe { handle_error(&mut iter) });
95        }
96
97        let field = unsafe { import_field_from_c(&field)? };
98
99        Ok(Self { iter, field })
100    }
101
102    /// Returns the field provided by the stream
103    pub fn field(&self) -> &Field {
104        &self.field
105    }
106
107    /// Advances this iterator by one array
108    /// # Error
109    /// Errors iff:
110    /// * The C stream interface returns an error
111    /// * The C stream interface returns an invalid array (that we can identify, see Safety below)
112    /// # Safety
113    /// Calling this iterator's `next` assumes that the [`ArrowArrayStream`] produces arrow arrays
114    /// that fulfill the C data interface
115    pub unsafe fn next(&mut self) -> Option<Result<Box<dyn Array>, Error>> {
116        let mut array = ArrowArray::empty();
117        let status = unsafe { (self.iter.get_next.unwrap())(&mut *self.iter, &mut array) };
118
119        if status != 0 {
120            return Some(Err(unsafe { handle_error(&mut self.iter) }));
121        }
122
123        // last paragraph of https://arrow.apache.org/docs/format/CStreamInterface.html#c.ArrowArrayStream.get_next
124        array.release?;
125
126        // Safety: assumed from the C stream interface
127        unsafe { import_array_from_c(array, self.field.data_type.clone()) }
128            .map(Some)
129            .transpose()
130    }
131}
132
133struct PrivateData {
134    iter: Box<dyn Iterator<Item = Result<Box<dyn Array>, Error>>>,
135    field: Field,
136    error: Option<CString>,
137}
138
139unsafe extern "C" fn get_next(iter: *mut ArrowArrayStream, array: *mut ArrowArray) -> i32 {
140    if iter.is_null() {
141        return 2001;
142    }
143    let private = &mut *((*iter).private_data as *mut PrivateData);
144
145    match private.iter.next() {
146        Some(Ok(item)) => {
147            // check that the array has the same data_type as field
148            let item_dt = item.data_type();
149            let expected_dt = private.field.data_type();
150            if item_dt != expected_dt {
151                private.error = Some(CString::new(format!("The iterator produced an item of data type {item_dt:?} but the producer expects data type {expected_dt:?}").as_bytes().to_vec()).unwrap());
152                return 2001; // custom application specific error (since this is never a result of this interface)
153            }
154
155            std::ptr::write(array, export_array_to_c(item));
156
157            private.error = None;
158            0
159        }
160        Some(Err(err)) => {
161            private.error = Some(CString::new(err.to_string().as_bytes().to_vec()).unwrap());
162            2001 // custom application specific error (since this is never a result of this interface)
163        }
164        None => {
165            let a = ArrowArray::empty();
166            std::ptr::write_unaligned(array, a);
167            private.error = None;
168            0
169        }
170    }
171}
172
173unsafe extern "C" fn get_schema(iter: *mut ArrowArrayStream, schema: *mut ArrowSchema) -> i32 {
174    if iter.is_null() {
175        return 2001;
176    }
177    let private = &mut *((*iter).private_data as *mut PrivateData);
178
179    std::ptr::write(schema, export_field_to_c(&private.field));
180    0
181}
182
183unsafe extern "C" fn get_last_error(iter: *mut ArrowArrayStream) -> *const ::std::os::raw::c_char {
184    if iter.is_null() {
185        return std::ptr::null();
186    }
187    let private = &mut *((*iter).private_data as *mut PrivateData);
188
189    private
190        .error
191        .as_ref()
192        .map(|x| x.as_ptr())
193        .unwrap_or(std::ptr::null())
194}
195
196unsafe extern "C" fn release(iter: *mut ArrowArrayStream) {
197    if iter.is_null() {
198        return;
199    }
200    let _ = Box::from_raw((*iter).private_data as *mut PrivateData);
201    (*iter).release = None;
202    // private drops automatically
203}
204
205/// Exports an iterator to the [C stream interface](https://arrow.apache.org/docs/format/CStreamInterface.html)
206pub fn export_iterator(
207    iter: Box<dyn Iterator<Item = Result<Box<dyn Array>, Error>>>,
208    field: Field,
209) -> ArrowArrayStream {
210    let private_data = Box::new(PrivateData {
211        iter,
212        field,
213        error: None,
214    });
215
216    ArrowArrayStream {
217        get_schema: Some(get_schema),
218        get_next: Some(get_next),
219        get_last_error: Some(get_last_error),
220        release: Some(release),
221        private_data: Box::into_raw(private_data) as *mut ::std::os::raw::c_void,
222    }
223}