pyo3_geoarrow/
array_reader.rs1use std::sync::{Arc, Mutex};
2
3use geoarrow_array::array::from_arrow_array;
4use geoarrow_array::{GeoArrowArrayIterator, GeoArrowArrayReader};
5use geoarrow_schema::GeoArrowType;
6use geoarrow_schema::error::GeoArrowResult;
7use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
8use pyo3::intern;
9use pyo3::prelude::*;
10use pyo3::types::{PyCapsule, PyTuple, PyType};
11use pyo3_arrow::PyArrayReader;
12use pyo3_arrow::error::PyArrowResult;
13use pyo3_arrow::ffi::{ArrayIterator, ArrayReader, to_schema_pycapsule, to_stream_pycapsule};
14use pyo3_arrow::input::AnyArray;
15
16use crate::data_type::PyGeoType;
17use crate::utils::text_repr::text_repr;
18use crate::{PyGeoArray, PyGeoArrowError, PyGeoArrowResult, PyGeoChunkedArray};
19
/// A one-shot, Python-visible reader over a stream of GeoArrow arrays.
///
/// Registered with Python as `GeoArrayReader` in the `geoarrow.rust.core`
/// module. The class is `frozen`, so all mutation of the wrapped reader goes
/// through the internal [`Mutex`].
#[pyclass(
    module = "geoarrow.rust.core",
    name = "GeoArrayReader",
    subclass,
    frozen
)]
pub struct PyGeoArrayReader {
    /// The wrapped reader. It becomes `None` once a consuming operation has
    /// taken the reader out; the `closed` getter reports exactly this state.
    iter: Mutex<Option<Box<dyn GeoArrowArrayReader + Send>>>,
    /// The GeoArrow type reported by the reader at construction time. Stored
    /// separately so it stays available after the reader has been taken.
    data_type: GeoArrowType,
}
39
40impl PyGeoArrayReader {
41 pub fn new(reader: Box<dyn GeoArrowArrayReader + Send>) -> Self {
43 let data_type = reader.data_type();
44 Self {
45 iter: Mutex::new(Some(reader)),
46 data_type,
47 }
48 }
49
50 pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyGeoArrowResult<Self> {
52 let reader = PyArrayReader::from_arrow_pycapsule(capsule)?;
53 Ok(Self::new(array_reader_to_geoarrow_array_reader(
54 reader.into_reader()?,
55 )?))
56 }
57
58 pub fn data_type(&self) -> &GeoArrowType {
64 &self.data_type
65 }
66
67 pub fn into_reader(self) -> PyResult<Box<dyn GeoArrowArrayReader + Send>> {
71 let stream = self
72 .iter
73 .lock()
74 .unwrap()
75 .take()
76 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
77 Ok(stream)
78 }
79
80 pub fn into_chunked_array(self) -> PyGeoArrowResult<PyGeoChunkedArray> {
82 self.read_all()
83 }
84
85 pub fn to_geoarrow_py<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
89 let geoarrow_mod = py.import(intern!(py, "geoarrow.rust.core"))?;
90 geoarrow_mod
91 .getattr(intern!(py, "GeoArrayReader"))?
92 .call_method1(
93 intern!(py, "from_arrow_pycapsule"),
94 PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
95 )
96 }
97
98 pub fn into_geoarrow_py(self, py: Python) -> PyResult<Bound<PyAny>> {
102 let geoarrow_mod = py.import(intern!(py, "geoarrow.rust.core"))?;
103 let geoarray_reader = self
104 .iter
105 .lock()
106 .unwrap()
107 .take()
108 .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
109 let array_reader = geoarrow_array_reader_to_array_reader(geoarray_reader)?;
110 let stream_pycapsule = to_stream_pycapsule(py, array_reader, None)?;
111 geoarrow_mod
112 .getattr(intern!(py, "GeoArrayReader"))?
113 .call_method1(
114 intern!(py, "from_arrow_pycapsule"),
115 PyTuple::new(py, vec![stream_pycapsule])?,
116 )
117 }
118}
119
120impl TryFrom<Box<dyn ArrayReader + Send>> for PyGeoArrayReader {
121 type Error = PyGeoArrowError;
122
123 fn try_from(value: Box<dyn ArrayReader + Send>) -> Result<Self, Self::Error> {
124 Ok(Self::new(array_reader_to_geoarrow_array_reader(value)?))
125 }
126}
127
128impl TryFrom<PyArrayReader> for PyGeoArrayReader {
129 type Error = PyGeoArrowError;
130
131 fn try_from(value: PyArrayReader) -> Result<Self, Self::Error> {
132 value.into_reader()?.try_into()
133 }
134}
135
136#[pymethods]
137impl PyGeoArrayReader {
138 fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
139 let field = self.data_type.to_field("", true);
140 to_schema_pycapsule(py, field)
141 }
142
143 #[pyo3(signature = (requested_schema=None))]
144 fn __arrow_c_stream__<'py>(
145 &'py self,
146 py: Python<'py>,
147 requested_schema: Option<Bound<'py, PyCapsule>>,
148 ) -> PyGeoArrowResult<Bound<'py, PyCapsule>> {
149 let geoarray_reader = self
150 .iter
151 .lock()
152 .unwrap()
153 .take()
154 .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
155 let array_reader = geoarrow_array_reader_to_array_reader(geoarray_reader)?;
156 Ok(to_stream_pycapsule(py, array_reader, requested_schema)?)
157 }
158
159 fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
162 slf
163 }
164
165 fn __next__(&self) -> PyGeoArrowResult<PyGeoArray> {
166 self.read_next_array()
167 }
168
169 fn __repr__(&self) -> String {
170 format!("GeoArrayReader({})", text_repr(self.data_type()))
171 }
172
173 #[getter]
174 fn closed(&self) -> bool {
175 self.iter.lock().unwrap().is_none()
176 }
177
178 #[classmethod]
179 fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyGeoArrowResult<Self> {
180 input.into_reader()?.try_into()
181 }
182
183 #[classmethod]
184 #[pyo3(name = "from_arrow_pycapsule")]
185 fn from_arrow_pycapsule_py(
186 _cls: &Bound<PyType>,
187 capsule: &Bound<PyCapsule>,
188 ) -> PyGeoArrowResult<Self> {
189 Self::from_arrow_pycapsule(capsule)
190 }
191
192 #[classmethod]
193 fn from_arrays(
194 _cls: &Bound<PyType>,
195 r#type: PyGeoType,
196 arrays: Vec<PyGeoArray>,
197 ) -> PyGeoArrowResult<Self> {
198 let typ = r#type.into_inner();
199 let arrays = arrays
200 .into_iter()
201 .map(|array| {
202 let array = array.into_inner();
203 if array.data_type() != typ {
204 return Err(PyValueError::new_err(format!(
205 "Array data type does not match expected type: got {:?}, expected {:?}",
206 array.data_type(),
207 typ
208 )));
209 }
210 Ok(array.to_array_ref())
211 })
212 .collect::<PyResult<Vec<_>>>()?;
213 PyArrayReader::new(Box::new(ArrayIterator::new(
214 arrays.into_iter().map(Ok),
215 typ.to_field("", true).into(),
216 )))
217 .try_into()
218 }
219
220 #[classmethod]
221 fn from_stream(_cls: &Bound<PyType>, reader: Self) -> Self {
222 reader
223 }
224
225 #[getter]
226 fn r#type(&self) -> PyGeoType {
227 self.data_type.clone().into()
228 }
229
230 fn read_all(&self) -> PyGeoArrowResult<PyGeoChunkedArray> {
231 let stream = self
232 .iter
233 .lock()
234 .unwrap()
235 .take()
236 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
237 let data_type = stream.data_type();
238 let arrays = stream.collect::<GeoArrowResult<_>>()?;
239 Ok(PyGeoChunkedArray::try_new(arrays, data_type)?)
240 }
241
242 fn read_next_array(&self) -> PyGeoArrowResult<PyGeoArray> {
243 let mut inner = self.iter.lock().unwrap();
244 let stream = inner
245 .as_mut()
246 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
247
248 if let Some(next_array) = stream.next() {
249 Ok(PyGeoArray::new(next_array?))
250 } else {
251 Err(PyStopIteration::new_err("").into())
252 }
253 }
254}
255
256impl<'py> FromPyObject<'_, 'py> for PyGeoArrayReader {
257 type Error = PyErr;
258
259 fn extract(ob: Borrowed<'_, 'py, PyAny>) -> PyResult<Self> {
260 let reader = ob.extract::<PyArrayReader>()?;
261 Ok(Self::new(array_reader_to_geoarrow_array_reader(
262 reader.into_reader()?,
263 )?))
264 }
265}
266
267fn array_reader_to_geoarrow_array_reader(
268 reader: Box<dyn ArrayReader + Send>,
269) -> PyGeoArrowResult<Box<dyn GeoArrowArrayReader + Send>> {
270 let field = reader.field();
271 let data_type = GeoArrowType::try_from(field.as_ref())?;
272 let iter = reader
273 .into_iter()
274 .map(move |array| from_arrow_array(array?.as_ref(), field.as_ref()));
275 Ok(Box::new(GeoArrowArrayIterator::new(iter, data_type)))
276}
277
278fn geoarrow_array_reader_to_array_reader(
279 reader: Box<dyn GeoArrowArrayReader + Send>,
280) -> PyGeoArrowResult<Box<dyn ArrayReader + Send>> {
281 let field = Arc::new(reader.data_type().to_field("", true));
282 let iter = reader
283 .into_iter()
284 .map(move |array| Ok(array?.to_array_ref()));
285 Ok(Box::new(ArrayIterator::new(iter, field)))
286}