pyo3_arrow/
array_reader.rs

1use std::fmt::Display;
2use std::sync::Mutex;
3
4use arrow_schema::FieldRef;
5use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
6use pyo3::intern;
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyTuple, PyType};
9
10use crate::error::PyArrowResult;
11use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3Field};
12use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
13use crate::ffi::from_python::utils::import_stream_pycapsule;
14use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
15use crate::ffi::to_python::to_stream_pycapsule;
16use crate::ffi::{to_schema_pycapsule, ArrayIterator, ArrayReader};
17use crate::input::AnyArray;
18use crate::{PyArray, PyChunkedArray, PyField};
19
/// A Python-facing Arrow array reader.
///
/// This is a wrapper around a [ArrayReader].
//
// Implementation note (plain comment so the Python-visible class docstring is unchanged):
// the class is declared `frozen` and every method in this file receives `&self`, so the
// reader is kept behind a `Mutex` to allow advancing or removing it. The `Option` is `None`
// once the stream has been handed off via one of the `take()` calls; methods then report
// the stream as closed.
#[pyclass(module = "arro3.core._core", name = "ArrayReader", subclass, frozen)]
pub struct PyArrayReader(pub(crate) Mutex<Option<Box<dyn ArrayReader + Send>>>);
25
26impl PyArrayReader {
27    /// Construct a new [PyArrayReader] from an existing [ArrayReader].
28    pub fn new(reader: Box<dyn ArrayReader + Send>) -> Self {
29        Self(Mutex::new(Some(reader)))
30    }
31
32    /// Import from a raw Arrow C Stream capsule
33    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
34        let stream = import_stream_pycapsule(capsule)?;
35        let stream_reader = ArrowArrayStreamReader::try_new(stream)
36            .map_err(|err| PyValueError::new_err(err.to_string()))?;
37        Ok(Self::new(Box::new(stream_reader)))
38    }
39
40    /// Consume this reader and convert into a [ArrayReader].
41    ///
42    /// The reader can only be consumed once. Calling `into_reader`
43    pub fn into_reader(self) -> PyResult<Box<dyn ArrayReader + Send>> {
44        self.to_reader()
45    }
46
47    /// Consume this reader and create a [PyChunkedArray] object
48    pub fn into_chunked_array(self) -> PyArrowResult<PyChunkedArray> {
49        self.to_chunked_array()
50    }
51
52    /// Consume this reader and convert into a [ArrayReader].
53    ///
54    /// The reader can only be consumed once. Calling `into_reader`
55    pub fn to_reader(&self) -> PyResult<Box<dyn ArrayReader + Send>> {
56        let stream = self
57            .0
58            .lock()
59            .unwrap()
60            .take()
61            .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
62        Ok(stream)
63    }
64
65    /// Consume this reader and create a [PyChunkedArray] object
66    pub fn to_chunked_array(&self) -> PyArrowResult<PyChunkedArray> {
67        let stream = self
68            .0
69            .lock()
70            .unwrap()
71            .take()
72            .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
73        let field = stream.field();
74        let mut arrays = vec![];
75        for array in stream {
76            arrays.push(array?);
77        }
78        Ok(PyChunkedArray::try_new(arrays, field)?)
79    }
80
81    /// Access the [FieldRef] of this ArrayReader.
82    ///
83    /// If the stream has already been consumed, this method will error.
84    pub fn field_ref(&self) -> PyResult<FieldRef> {
85        let inner = self.0.lock().unwrap();
86        let stream = inner
87            .as_ref()
88            .ok_or(PyIOError::new_err("Stream already closed."))?;
89        Ok(stream.field())
90    }
91
92    /// Export this to a Python `arro3.core.ArrayReader`.
93    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
94        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
95        arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
96            intern!(py, "from_arrow_pycapsule"),
97            PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
98        )
99    }
100
101    /// Export this to a Python `arro3.core.ArrayReader`.
102    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
103        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
104        let array_reader = self
105            .0
106            .lock()
107            .unwrap()
108            .take()
109            .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
110        let stream_pycapsule = to_stream_pycapsule(py, array_reader, None)?;
111        arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
112            intern!(py, "from_arrow_pycapsule"),
113            PyTuple::new(py, vec![stream_pycapsule])?,
114        )
115    }
116
117    /// Export this to a Python `nanoarrow.ArrayStream`.
118    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
119        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
120    }
121}
122
123impl AsRef<Mutex<Option<Box<dyn ArrayReader + Send>>>> for PyArrayReader {
124    fn as_ref(&self) -> &Mutex<Option<Box<dyn ArrayReader + Send>>> {
125        &self.0
126    }
127}
128
129impl From<Box<dyn ArrayReader + Send>> for PyArrayReader {
130    fn from(value: Box<dyn ArrayReader + Send>) -> Self {
131        Self::new(value)
132    }
133}
134
135impl Display for PyArrayReader {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        writeln!(f, "arro3.core.ArrayReader")?;
138        writeln!(f, "-----------------------")?;
139        if let Ok(field) = self.field_ref() {
140            field.data_type().fmt(f)
141        } else {
142            writeln!(f, "Closed stream")
143        }
144    }
145}
146
147#[pymethods]
148impl PyArrayReader {
149    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
150        to_schema_pycapsule(py, self.field_ref()?.as_ref())
151    }
152
153    #[pyo3(signature = (requested_schema=None))]
154    fn __arrow_c_stream__<'py>(
155        &'py self,
156        py: Python<'py>,
157        requested_schema: Option<Bound<'py, PyCapsule>>,
158    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
159        let array_reader = self
160            .0
161            .lock()
162            .unwrap()
163            .take()
164            .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
165        to_stream_pycapsule(py, array_reader, requested_schema)
166    }
167
    // The reader acts as its own Python iterator: `__iter__` hands back the same object,
    // and `__next__` on it yields the arrays.
    // Pattern for returning `self` from a pyo3 method: https://stackoverflow.com/a/52056290
    fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
        slf
    }
173
174    fn __next__(&self) -> PyArrowResult<Arro3Array> {
175        self.read_next_array()
176    }
177
178    fn __repr__(&self) -> String {
179        self.to_string()
180    }
181
182    #[getter]
183    fn closed(&self) -> bool {
184        self.0.lock().unwrap().is_none()
185    }
186
187    #[classmethod]
188    fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
189        let reader = input.into_reader()?;
190        Ok(Self::new(reader))
191    }
192
193    #[classmethod]
194    #[pyo3(name = "from_arrow_pycapsule")]
195    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
196        Self::from_arrow_pycapsule(capsule)
197    }
198
199    #[classmethod]
200    fn from_arrays(_cls: &Bound<PyType>, field: PyField, arrays: Vec<PyArray>) -> Self {
201        let arrays = arrays
202            .into_iter()
203            .map(|array| {
204                let (arr, _field) = array.into_inner();
205                arr
206            })
207            .collect::<Vec<_>>();
208        Self::new(Box::new(ArrayIterator::new(
209            arrays.into_iter().map(Ok),
210            field.into_inner(),
211        )))
212    }
213
214    #[classmethod]
215    fn from_stream(_cls: &Bound<PyType>, data: &Bound<PyAny>) -> PyResult<Self> {
216        data.extract()
217    }
218
219    #[getter]
220    fn field(&self) -> PyResult<Arro3Field> {
221        Ok(PyField::new(self.field_ref()?).into())
222    }
223
224    fn read_all(&self) -> PyArrowResult<Arro3ChunkedArray> {
225        let stream = self
226            .0
227            .lock()
228            .unwrap()
229            .take()
230            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
231        let field = stream.field();
232        let mut arrays = vec![];
233        for array in stream {
234            arrays.push(array?);
235        }
236        Ok(PyChunkedArray::try_new(arrays, field)?.into())
237    }
238
239    fn read_next_array(&self) -> PyArrowResult<Arro3Array> {
240        let mut inner = self.0.lock().unwrap();
241        let stream = inner
242            .as_mut()
243            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
244
245        if let Some(next_batch) = stream.next() {
246            Ok(PyArray::new(next_batch?, stream.field()).into())
247        } else {
248            Err(PyStopIteration::new_err("").into())
249        }
250    }
251}