Skip to main content

pyo3_geoarrow/
array_reader.rs

1use std::sync::{Arc, Mutex};
2
3use geoarrow_array::array::from_arrow_array;
4use geoarrow_array::{GeoArrowArrayIterator, GeoArrowArrayReader};
5use geoarrow_schema::GeoArrowType;
6use geoarrow_schema::error::GeoArrowResult;
7use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
8use pyo3::intern;
9use pyo3::prelude::*;
10use pyo3::types::{PyCapsule, PyTuple, PyType};
11use pyo3_arrow::PyArrayReader;
12use pyo3_arrow::error::PyArrowResult;
13use pyo3_arrow::ffi::{ArrayIterator, ArrayReader, to_schema_pycapsule, to_stream_pycapsule};
14use pyo3_arrow::input::AnyArray;
15
16use crate::data_type::PyGeoType;
17use crate::utils::text_repr::text_repr;
18use crate::{PyGeoArray, PyGeoArrowError, PyGeoArrowResult, PyGeoChunkedArray};
19
20/// Python wrapper for a GeoArrow array reader (stream).
21///
22/// This type represents a stream of GeoArrow arrays that can be read incrementally. It implements
23/// the Arrow C Stream Interface, allowing zero-copy data exchange with Arrow-compatible Python
24/// libraries.
25///
26/// The reader can be iterated over to yield individual [`PyGeoArray`] chunks, or materialized
27/// into a [`PyGeoChunkedArray`] using the [`into_chunked_array()`][Self::into_chunked_array]
28/// method. For stream processing, prefer [`into_reader()`][Self::into_reader].
29#[pyclass(
30    module = "geoarrow.rust.core",
31    name = "GeoArrayReader",
32    subclass,
33    frozen
34)]
35pub struct PyGeoArrayReader {
36    iter: Mutex<Option<Box<dyn GeoArrowArrayReader + Send>>>,
37    data_type: GeoArrowType,
38}
39
40impl PyGeoArrayReader {
41    /// Create a new [`PyGeoArrayReader`] from a GeoArrow array reader.
42    pub fn new(reader: Box<dyn GeoArrowArrayReader + Send>) -> Self {
43        let data_type = reader.data_type();
44        Self {
45            iter: Mutex::new(Some(reader)),
46            data_type,
47        }
48    }
49
50    /// Import from a raw Arrow C Stream capsule
51    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyGeoArrowResult<Self> {
52        let reader = PyArrayReader::from_arrow_pycapsule(capsule)?;
53        Ok(Self::new(array_reader_to_geoarrow_array_reader(
54            reader.into_reader()?,
55        )?))
56    }
57
58    // pub fn into_inner(self) -> (PyArrayReader, GeoArrowType) {
59    //     (self.iter, self.data_type)
60    // }
61
62    /// Get the GeoArrow data type of arrays in this stream.
63    pub fn data_type(&self) -> &GeoArrowType {
64        &self.data_type
65    }
66
67    /// Consume this reader and convert into a [ArrayReader].
68    ///
69    /// The reader can only be consumed once. Calling `into_reader`
70    pub fn into_reader(self) -> PyResult<Box<dyn GeoArrowArrayReader + Send>> {
71        let stream = self
72            .iter
73            .lock()
74            .unwrap()
75            .take()
76            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
77        Ok(stream)
78    }
79
80    /// Consume this reader and create a [PyGeoChunkedArray] object
81    pub fn into_chunked_array(self) -> PyGeoArrowResult<PyGeoChunkedArray> {
82        self.read_all()
83    }
84
85    /// Export to a geoarrow.rust.core.GeoArrowArrayReader.
86    ///
87    /// This requires that you depend on geoarrow-rust-core from your Python package.
88    pub fn to_geoarrow_py<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
89        let geoarrow_mod = py.import(intern!(py, "geoarrow.rust.core"))?;
90        geoarrow_mod
91            .getattr(intern!(py, "GeoArrayReader"))?
92            .call_method1(
93                intern!(py, "from_arrow_pycapsule"),
94                PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
95            )
96    }
97
98    /// Export to a geoarrow.rust.core.GeoArrowArrayReader.
99    ///
100    /// This requires that you depend on geoarrow-rust-core from your Python package.
101    pub fn into_geoarrow_py(self, py: Python) -> PyResult<Bound<PyAny>> {
102        let geoarrow_mod = py.import(intern!(py, "geoarrow.rust.core"))?;
103        let geoarray_reader = self
104            .iter
105            .lock()
106            .unwrap()
107            .take()
108            .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
109        let array_reader = geoarrow_array_reader_to_array_reader(geoarray_reader)?;
110        let stream_pycapsule = to_stream_pycapsule(py, array_reader, None)?;
111        geoarrow_mod
112            .getattr(intern!(py, "GeoArrayReader"))?
113            .call_method1(
114                intern!(py, "from_arrow_pycapsule"),
115                PyTuple::new(py, vec![stream_pycapsule])?,
116            )
117    }
118}
119
120impl TryFrom<Box<dyn ArrayReader + Send>> for PyGeoArrayReader {
121    type Error = PyGeoArrowError;
122
123    fn try_from(value: Box<dyn ArrayReader + Send>) -> Result<Self, Self::Error> {
124        Ok(Self::new(array_reader_to_geoarrow_array_reader(value)?))
125    }
126}
127
128impl TryFrom<PyArrayReader> for PyGeoArrayReader {
129    type Error = PyGeoArrowError;
130
131    fn try_from(value: PyArrayReader) -> Result<Self, Self::Error> {
132        value.into_reader()?.try_into()
133    }
134}
135
136#[pymethods]
137impl PyGeoArrayReader {
138    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
139        let field = self.data_type.to_field("", true);
140        to_schema_pycapsule(py, field)
141    }
142
143    #[pyo3(signature = (requested_schema=None))]
144    fn __arrow_c_stream__<'py>(
145        &'py self,
146        py: Python<'py>,
147        requested_schema: Option<Bound<'py, PyCapsule>>,
148    ) -> PyGeoArrowResult<Bound<'py, PyCapsule>> {
149        let geoarray_reader = self
150            .iter
151            .lock()
152            .unwrap()
153            .take()
154            .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
155        let array_reader = geoarrow_array_reader_to_array_reader(geoarray_reader)?;
156        Ok(to_stream_pycapsule(py, array_reader, requested_schema)?)
157    }
158
159    // Return self
160    // https://stackoverflow.com/a/52056290
161    fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
162        slf
163    }
164
165    fn __next__(&self) -> PyGeoArrowResult<PyGeoArray> {
166        self.read_next_array()
167    }
168
169    fn __repr__(&self) -> String {
170        format!("GeoArrayReader({})", text_repr(self.data_type()))
171    }
172
173    #[getter]
174    fn closed(&self) -> bool {
175        self.iter.lock().unwrap().is_none()
176    }
177
178    #[classmethod]
179    fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyGeoArrowResult<Self> {
180        input.into_reader()?.try_into()
181    }
182
183    #[classmethod]
184    #[pyo3(name = "from_arrow_pycapsule")]
185    fn from_arrow_pycapsule_py(
186        _cls: &Bound<PyType>,
187        capsule: &Bound<PyCapsule>,
188    ) -> PyGeoArrowResult<Self> {
189        Self::from_arrow_pycapsule(capsule)
190    }
191
192    #[classmethod]
193    fn from_arrays(
194        _cls: &Bound<PyType>,
195        r#type: PyGeoType,
196        arrays: Vec<PyGeoArray>,
197    ) -> PyGeoArrowResult<Self> {
198        let typ = r#type.into_inner();
199        let arrays = arrays
200            .into_iter()
201            .map(|array| {
202                let array = array.into_inner();
203                if array.data_type() != typ {
204                    return Err(PyValueError::new_err(format!(
205                        "Array data type does not match expected type: got {:?}, expected {:?}",
206                        array.data_type(),
207                        typ
208                    )));
209                }
210                Ok(array.to_array_ref())
211            })
212            .collect::<PyResult<Vec<_>>>()?;
213        PyArrayReader::new(Box::new(ArrayIterator::new(
214            arrays.into_iter().map(Ok),
215            typ.to_field("", true).into(),
216        )))
217        .try_into()
218    }
219
220    #[classmethod]
221    fn from_stream(_cls: &Bound<PyType>, reader: Self) -> Self {
222        reader
223    }
224
225    #[getter]
226    fn r#type(&self) -> PyGeoType {
227        self.data_type.clone().into()
228    }
229
230    fn read_all(&self) -> PyGeoArrowResult<PyGeoChunkedArray> {
231        let stream = self
232            .iter
233            .lock()
234            .unwrap()
235            .take()
236            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
237        let data_type = stream.data_type();
238        let arrays = stream.collect::<GeoArrowResult<_>>()?;
239        Ok(PyGeoChunkedArray::try_new(arrays, data_type)?)
240    }
241
242    fn read_next_array(&self) -> PyGeoArrowResult<PyGeoArray> {
243        let mut inner = self.iter.lock().unwrap();
244        let stream = inner
245            .as_mut()
246            .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
247
248        if let Some(next_array) = stream.next() {
249            Ok(PyGeoArray::new(next_array?))
250        } else {
251            Err(PyStopIteration::new_err("").into())
252        }
253    }
254}
255
256impl<'py> FromPyObject<'_, 'py> for PyGeoArrayReader {
257    type Error = PyErr;
258
259    fn extract(ob: Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
260        let reader = ob.extract::<PyArrayReader>()?;
261        Ok(Self::new(array_reader_to_geoarrow_array_reader(
262            reader.into_reader()?,
263        )?))
264    }
265}
266
267fn array_reader_to_geoarrow_array_reader(
268    reader: Box<dyn ArrayReader + Send>,
269) -> PyGeoArrowResult<Box<dyn GeoArrowArrayReader + Send>> {
270    let field = reader.field();
271    let data_type = GeoArrowType::try_from(field.as_ref())?;
272    let iter = reader
273        .into_iter()
274        .map(move |array| from_arrow_array(array?.as_ref(), field.as_ref()));
275    Ok(Box::new(GeoArrowArrayIterator::new(iter, data_type)))
276}
277
278fn geoarrow_array_reader_to_array_reader(
279    reader: Box<dyn GeoArrowArrayReader + Send>,
280) -> PyGeoArrowResult<Box<dyn ArrayReader + Send>> {
281    let field = Arc::new(reader.data_type().to_field("", true));
282    let iter = reader
283        .into_iter()
284        .map(move |array| Ok(array?.to_array_ref()));
285    Ok(Box::new(ArrayIterator::new(iter, field)))
286}