pyo3_arrow/
array_reader.rs1use std::fmt::Display;
2use std::sync::Mutex;
3
4use arrow_schema::FieldRef;
5use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
6use pyo3::intern;
7use pyo3::prelude::*;
8use pyo3::types::{PyCapsule, PyTuple, PyType};
9
10use crate::error::PyArrowResult;
11use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3Field};
12use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
13use crate::ffi::from_python::utils::import_stream_pycapsule;
14use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
15use crate::ffi::to_python::to_stream_pycapsule;
16use crate::ffi::{to_schema_pycapsule, ArrayIterator, ArrayReader};
17use crate::input::AnyArray;
18use crate::{PyArray, PyChunkedArray, PyField};
19
20#[pyclass(module = "arro3.core._core", name = "ArrayReader", subclass, frozen)]
24pub struct PyArrayReader(pub(crate) Mutex<Option<Box<dyn ArrayReader + Send>>>);
25
26impl PyArrayReader {
27 pub fn new(reader: Box<dyn ArrayReader + Send>) -> Self {
29 Self(Mutex::new(Some(reader)))
30 }
31
32 pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
34 let stream = import_stream_pycapsule(capsule)?;
35 let stream_reader = ArrowArrayStreamReader::try_new(stream)
36 .map_err(|err| PyValueError::new_err(err.to_string()))?;
37 Ok(Self::new(Box::new(stream_reader)))
38 }
39
40 pub fn into_reader(self) -> PyResult<Box<dyn ArrayReader + Send>> {
44 self.to_reader()
45 }
46
47 pub fn into_chunked_array(self) -> PyArrowResult<PyChunkedArray> {
49 self.to_chunked_array()
50 }
51
52 pub fn to_reader(&self) -> PyResult<Box<dyn ArrayReader + Send>> {
56 let stream = self
57 .0
58 .lock()
59 .unwrap()
60 .take()
61 .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
62 Ok(stream)
63 }
64
65 pub fn to_chunked_array(&self) -> PyArrowResult<PyChunkedArray> {
67 let stream = self
68 .0
69 .lock()
70 .unwrap()
71 .take()
72 .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
73 let field = stream.field();
74 let mut arrays = vec![];
75 for array in stream {
76 arrays.push(array?);
77 }
78 Ok(PyChunkedArray::try_new(arrays, field)?)
79 }
80
81 pub fn field_ref(&self) -> PyResult<FieldRef> {
85 let inner = self.0.lock().unwrap();
86 let stream = inner
87 .as_ref()
88 .ok_or(PyIOError::new_err("Stream already closed."))?;
89 Ok(stream.field())
90 }
91
92 pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
94 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
95 arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
96 intern!(py, "from_arrow_pycapsule"),
97 PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
98 )
99 }
100
101 pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
103 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
104 let array_reader = self
105 .0
106 .lock()
107 .unwrap()
108 .take()
109 .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
110 let stream_pycapsule = to_stream_pycapsule(py, array_reader, None)?;
111 arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
112 intern!(py, "from_arrow_pycapsule"),
113 PyTuple::new(py, vec![stream_pycapsule])?,
114 )
115 }
116
117 pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
119 to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
120 }
121}
122
123impl AsRef<Mutex<Option<Box<dyn ArrayReader + Send>>>> for PyArrayReader {
124 fn as_ref(&self) -> &Mutex<Option<Box<dyn ArrayReader + Send>>> {
125 &self.0
126 }
127}
128
129impl From<Box<dyn ArrayReader + Send>> for PyArrayReader {
130 fn from(value: Box<dyn ArrayReader + Send>) -> Self {
131 Self::new(value)
132 }
133}
134
135impl Display for PyArrayReader {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 writeln!(f, "arro3.core.ArrayReader")?;
138 writeln!(f, "-----------------------")?;
139 if let Ok(field) = self.field_ref() {
140 field.data_type().fmt(f)
141 } else {
142 writeln!(f, "Closed stream")
143 }
144 }
145}
146
147#[pymethods]
148impl PyArrayReader {
149 fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
150 to_schema_pycapsule(py, self.field_ref()?.as_ref())
151 }
152
153 #[pyo3(signature = (requested_schema=None))]
154 fn __arrow_c_stream__<'py>(
155 &'py self,
156 py: Python<'py>,
157 requested_schema: Option<Bound<'py, PyCapsule>>,
158 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
159 let array_reader = self
160 .0
161 .lock()
162 .unwrap()
163 .take()
164 .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
165 to_stream_pycapsule(py, array_reader, requested_schema)
166 }
167
168 fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
171 slf
172 }
173
174 fn __next__(&self) -> PyArrowResult<Arro3Array> {
175 self.read_next_array()
176 }
177
178 fn __repr__(&self) -> String {
179 self.to_string()
180 }
181
182 #[getter]
183 fn closed(&self) -> bool {
184 self.0.lock().unwrap().is_none()
185 }
186
187 #[classmethod]
188 fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
189 let reader = input.into_reader()?;
190 Ok(Self::new(reader))
191 }
192
193 #[classmethod]
194 #[pyo3(name = "from_arrow_pycapsule")]
195 fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
196 Self::from_arrow_pycapsule(capsule)
197 }
198
199 #[classmethod]
200 fn from_arrays(_cls: &Bound<PyType>, field: PyField, arrays: Vec<PyArray>) -> Self {
201 let arrays = arrays
202 .into_iter()
203 .map(|array| {
204 let (arr, _field) = array.into_inner();
205 arr
206 })
207 .collect::<Vec<_>>();
208 Self::new(Box::new(ArrayIterator::new(
209 arrays.into_iter().map(Ok),
210 field.into_inner(),
211 )))
212 }
213
214 #[classmethod]
215 fn from_stream(_cls: &Bound<PyType>, data: &Bound<PyAny>) -> PyResult<Self> {
216 data.extract()
217 }
218
219 #[getter]
220 fn field(&self) -> PyResult<Arro3Field> {
221 Ok(PyField::new(self.field_ref()?).into())
222 }
223
224 fn read_all(&self) -> PyArrowResult<Arro3ChunkedArray> {
225 let stream = self
226 .0
227 .lock()
228 .unwrap()
229 .take()
230 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
231 let field = stream.field();
232 let mut arrays = vec![];
233 for array in stream {
234 arrays.push(array?);
235 }
236 Ok(PyChunkedArray::try_new(arrays, field)?.into())
237 }
238
239 fn read_next_array(&self) -> PyArrowResult<Arro3Array> {
240 let mut inner = self.0.lock().unwrap();
241 let stream = inner
242 .as_mut()
243 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
244
245 if let Some(next_batch) = stream.next() {
246 Ok(PyArray::new(next_batch?, stream.field()).into())
247 } else {
248 Err(PyStopIteration::new_err("").into())
249 }
250 }
251}