use std::fmt::Display;
use std::sync::Mutex;

use arrow_schema::FieldRef;
use pyo3::exceptions::{PyIOError, PyStopIteration, PyValueError};
use pyo3::intern;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyTuple, PyType};

use crate::error::PyArrowResult;
use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3Field};
use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
use crate::ffi::from_python::utils::import_stream_pycapsule;
use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
use crate::ffi::to_python::to_stream_pycapsule;
use crate::ffi::{to_schema_pycapsule, ArrayIterator, ArrayReader};
use crate::input::AnyArray;
use crate::{PyArray, PyChunkedArray, PyField};

20#[pyclass(module = "arro3.core._core", name = "ArrayReader", subclass, frozen)]
24pub struct PyArrayReader(pub(crate) Mutex<Option<Box<dyn ArrayReader + Send>>>);
25
26impl PyArrayReader {
27 pub fn new(reader: Box<dyn ArrayReader + Send>) -> Self {
29 Self(Mutex::new(Some(reader)))
30 }
31
32 pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
34 let stream = import_stream_pycapsule(capsule)?;
35 let stream_reader = ArrowArrayStreamReader::try_new(stream)
36 .map_err(|err| PyValueError::new_err(err.to_string()))?;
37 Ok(Self::new(Box::new(stream_reader)))
38 }
39
40 pub fn into_reader(self) -> PyResult<Box<dyn ArrayReader + Send>> {
44 let stream = self
45 .0
46 .lock()
47 .unwrap()
48 .take()
49 .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
50 Ok(stream)
51 }
52
53 pub fn into_chunked_array(self) -> PyArrowResult<PyChunkedArray> {
55 let stream = self
56 .0
57 .lock()
58 .unwrap()
59 .take()
60 .ok_or(PyIOError::new_err("Cannot write from closed stream."))?;
61 let field = stream.field();
62 let mut arrays = vec![];
63 for array in stream {
64 arrays.push(array?);
65 }
66 Ok(PyChunkedArray::try_new(arrays, field)?)
67 }
68
69 pub fn field_ref(&self) -> PyResult<FieldRef> {
73 let inner = self.0.lock().unwrap();
74 let stream = inner
75 .as_ref()
76 .ok_or(PyIOError::new_err("Stream already closed."))?;
77 Ok(stream.field())
78 }
79
80 pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
82 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
83 arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
84 intern!(py, "from_arrow_pycapsule"),
85 PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
86 )
87 }
88
89 pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
91 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
92 let array_reader = self
93 .0
94 .lock()
95 .unwrap()
96 .take()
97 .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
98 let stream_pycapsule = to_stream_pycapsule(py, array_reader, None)?;
99 arro3_mod.getattr(intern!(py, "ArrayReader"))?.call_method1(
100 intern!(py, "from_arrow_pycapsule"),
101 PyTuple::new(py, vec![stream_pycapsule])?,
102 )
103 }
104
105 pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
107 to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
108 }
109}
110
111impl From<Box<dyn ArrayReader + Send>> for PyArrayReader {
112 fn from(value: Box<dyn ArrayReader + Send>) -> Self {
113 Self::new(value)
114 }
115}
116
117impl Display for PyArrayReader {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 writeln!(f, "arro3.core.ArrayReader")?;
120 writeln!(f, "-----------------------")?;
121 if let Ok(field) = self.field_ref() {
122 field.data_type().fmt(f)
123 } else {
124 writeln!(f, "Closed stream")
125 }
126 }
127}
128
129#[pymethods]
130impl PyArrayReader {
131 fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
132 to_schema_pycapsule(py, self.field_ref()?.as_ref())
133 }
134
135 #[pyo3(signature = (requested_schema=None))]
136 fn __arrow_c_stream__<'py>(
137 &'py self,
138 py: Python<'py>,
139 requested_schema: Option<Bound<'py, PyCapsule>>,
140 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
141 let array_reader = self
142 .0
143 .lock()
144 .unwrap()
145 .take()
146 .ok_or(PyIOError::new_err("Cannot read from closed stream"))?;
147 to_stream_pycapsule(py, array_reader, requested_schema)
148 }
149
150 fn __iter__(slf: PyRef<Self>) -> PyRef<Self> {
153 slf
154 }
155
156 fn __next__(&self) -> PyArrowResult<Arro3Array> {
157 self.read_next_array()
158 }
159
160 fn __repr__(&self) -> String {
161 self.to_string()
162 }
163
164 #[getter]
165 fn closed(&self) -> bool {
166 self.0.lock().unwrap().is_none()
167 }
168
169 #[classmethod]
170 fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
171 let reader = input.into_reader()?;
172 Ok(Self::new(reader))
173 }
174
175 #[classmethod]
176 #[pyo3(name = "from_arrow_pycapsule")]
177 fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
178 Self::from_arrow_pycapsule(capsule)
179 }
180
181 #[classmethod]
182 fn from_arrays(_cls: &Bound<PyType>, field: PyField, arrays: Vec<PyArray>) -> Self {
183 let arrays = arrays
184 .into_iter()
185 .map(|array| {
186 let (arr, _field) = array.into_inner();
187 arr
188 })
189 .collect::<Vec<_>>();
190 Self::new(Box::new(ArrayIterator::new(
191 arrays.into_iter().map(Ok),
192 field.into_inner(),
193 )))
194 }
195
196 #[classmethod]
197 fn from_stream(_cls: &Bound<PyType>, data: &Bound<PyAny>) -> PyResult<Self> {
198 data.extract()
199 }
200
201 #[getter]
202 fn field(&self) -> PyResult<Arro3Field> {
203 Ok(PyField::new(self.field_ref()?).into())
204 }
205
206 fn read_all(&self) -> PyArrowResult<Arro3ChunkedArray> {
207 let stream = self
208 .0
209 .lock()
210 .unwrap()
211 .take()
212 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
213 let field = stream.field();
214 let mut arrays = vec![];
215 for array in stream {
216 arrays.push(array?);
217 }
218 Ok(PyChunkedArray::try_new(arrays, field)?.into())
219 }
220
221 fn read_next_array(&self) -> PyArrowResult<Arro3Array> {
222 let mut inner = self.0.lock().unwrap();
223 let stream = inner
224 .as_mut()
225 .ok_or(PyIOError::new_err("Cannot read from closed stream."))?;
226
227 if let Some(next_batch) = stream.next() {
228 Ok(PyArray::new(next_batch?, stream.field()).into())
229 } else {
230 Err(PyStopIteration::new_err("").into())
231 }
232 }
233}