// pyo3_arrow/array_reader.rs

1use std::fmt::Display;
2use std::sync::Mutex;
3
4use arrow_schema::FieldRef;
5use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
6use pyo3::intern;
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyTuple, PyType};
9
10use crate::error::PyArrowResult;
11use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3Field};
12use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
13use crate::ffi::from_python::utils::import_stream_pycapsule;
14use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
15use crate::ffi::to_python::to_stream_pycapsule;
16use crate::ffi::{to_schema_pycapsule, ArrayIterator, ArrayReader};
17use crate::input::AnyArray;
18use crate::{PyArray, PyChunkedArray, PyField};
19
20/// A Python-facing Arrow array reader.
21///
22/// This is a wrapper around a [ArrayReader].
23#[pyclass(module = "arro3.core._core", name = "ArrayReader", subclass, frozen)]
24pub struct PyArrayReader(pub(crate) Mutex<Option<Box<dyn ArrayReader + Send>>>);
25
26impl PyArrayReader {
27    /// Construct a new [PyArrayReader] from an existing [ArrayReader].
28    pub fn new(reader: Box<dyn ArrayReader + Send>) -> Self {
29        Self(Mutex::new(Some(reader)))
30    }
31
32    /// Import from a raw Arrow C Stream capsule
33    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
34        let stream = import_stream_pycapsule(capsule)?;
35        let stream_reader = ArrowArrayStreamReader::try_new(stream)
36            .map_err(|err| PyValueError::new_err(err.to_string()))?;
37        Ok(Self::new(Box::new(stream_reader)))
38    }
39
40    /// Consume this reader and convert into a [ArrayReader].
41    ///
42    /// The reader can only be consumed once. Calling `into_reader`
43    pub fn into_reader(self) -> PyResult<Box<dyn ArrayReader + Send>> {
44        let stream = self
45            .0
46            .lock()
47            .unwrap()
48            .take()
49            .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
50        Ok(stream)
51    }
52
53    /// Consume this reader and create a [PyChunkedArray] object
54    pub fn into_chunked_array(self) -> PyArrowResult<PyChunkedArray> {
55        let stream = self
56            .0
57            .lock()
58            .unwrap()
59            .take()
60            .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
61        let field = stream.field();
62        let mut arrays = vec![];
63        for array in stream {
64            arrays.push(array?);
65        }
66        Ok(PyChunkedArray::try_new(arrays, field)?)
67    }
68
69    /// Access the [FieldRef] of this ArrayReader.
70    ///
71    /// If the stream has already been consumed, this method will error.
72    pub fn field_ref(&self) -> PyResult<FieldRef> {
73        let inner = self.0.lock().unwrap();
74        let stream = inner
75            .as_ref()
76            .ok_or(PyIOError::new_err("Stream already closed."))?;
77        Ok(stream.field())
78    }
79
80    /// Export this to a Python `arro3.core.ArrayReader`.
81    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
82        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
83        arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
84            intern!(py, "from_arrow_pycapsule"),
85            PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
86        )
87    }
88
89    /// Export this to a Python `arro3.core.ArrayReader`.
90    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
91        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
92        let array_reader = self
93            .0
94            .lock()
95            .unwrap()
96            .take()
97            .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
98        let stream_pycapsule = to_stream_pycapsule(py, array_reader, None)?;
99        arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
100            intern!(py, "from_arrow_pycapsule"),
101            PyTuple::new(py, vec![stream_pycapsule])?,
102        )
103    }
104
105    /// Export this to a Python `nanoarrow.ArrayStream`.
106    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
107        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
108    }
109}
110
111impl From<Box<dyn ArrayReader + Send>> for PyArrayReader {
112    fn from(value: Box<dyn ArrayReader + Send>) -> Self {
113        Self::new(value)
114    }
115}
116
117impl Display for PyArrayReader {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        writeln!(f, "arro3.core.ArrayReader")?;
120        writeln!(f, "-----------------------")?;
121        if let Ok(field) = self.field_ref() {
122            field.data_type().fmt(f)
123        } else {
124            writeln!(f, "Closed stream")
125        }
126    }
127}
128
129#[pymethods]
130impl PyArrayReader {
131    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
132        to_schema_pycapsule(py, self.field_ref()?.as_ref())
133    }
134
135    #[pyo3(signature = (requested_schema=None))]
136    fn __arrow_c_stream__<'py>(
137        &'py self,
138        py: Python<'py>,
139        requested_schema: Option<Bound<'py, PyCapsule>>,
140    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
141        let array_reader = self
142            .0
143            .lock()
144            .unwrap()
145            .take()
146            .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
147        to_stream_pycapsule(py, array_reader, requested_schema)
148    }
149
150    // Return self
151    // https://stackoverflow.com/a/52056290
152    fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
153        slf
154    }
155
156    fn __next__(&self) -> PyArrowResult<Arro3Array> {
157        self.read_next_array()
158    }
159
160    fn __repr__(&self) -> String {
161        self.to_string()
162    }
163
164    #[getter]
165    fn closed(&self) -> bool {
166        self.0.lock().unwrap().is_none()
167    }
168
169    #[classmethod]
170    fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
171        let reader = input.into_reader()?;
172        Ok(Self::new(reader))
173    }
174
175    #[classmethod]
176    #[pyo3(name = "from_arrow_pycapsule")]
177    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
178        Self::from_arrow_pycapsule(capsule)
179    }
180
181    #[classmethod]
182    fn from_arrays(_cls: &Bound<PyType>, field: PyField, arrays: Vec<PyArray>) -> Self {
183        let arrays = arrays
184            .into_iter()
185            .map(|array| {
186                let (arr, _field) = array.into_inner();
187                arr
188            })
189            .collect::<Vec<_>>();
190        Self::new(Box::new(ArrayIterator::new(
191            arrays.into_iter().map(Ok),
192            field.into_inner(),
193        )))
194    }
195
196    #[classmethod]
197    fn from_stream(_cls: &Bound<PyType>, data: &Bound<PyAny>) -> PyResult<Self> {
198        data.extract()
199    }
200
201    #[getter]
202    fn field(&self) -> PyResult<Arro3Field> {
203        Ok(PyField::new(self.field_ref()?).into())
204    }
205
206    fn read_all(&self) -> PyArrowResult<Arro3ChunkedArray> {
207        let stream = self
208            .0
209            .lock()
210            .unwrap()
211            .take()
212            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
213        let field = stream.field();
214        let mut arrays = vec![];
215        for array in stream {
216            arrays.push(array?);
217        }
218        Ok(PyChunkedArray::try_new(arrays, field)?.into())
219    }
220
221    fn read_next_array(&self) -> PyArrowResult<Arro3Array> {
222        let mut inner = self.0.lock().unwrap();
223        let stream = inner
224            .as_mut()
225            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
226
227        if let Some(next_batch) = stream.next() {
228            Ok(PyArray::new(next_batch?, stream.field()).into())
229        } else {
230            Err(PyStopIteration::new_err("").into())
231        }
232    }
233}