pyo3_arrow/
chunked.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use arrow_array::{Array, ArrayRef};
5use arrow_cast::cast;
6use arrow_schema::{ArrowError, DataType, Field, FieldRef};
7use arrow_select::concat::concat;
8use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
9use pyo3::prelude::*;
10use pyo3::types::{PyCapsule, PyTuple, PyType};
11use pyo3::{intern, IntoPyObjectExt};
12
13use crate::error::{PyArrowError, PyArrowResult};
14use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
15use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
16use crate::ffi::from_python::utils::import_stream_pycapsule;
17use crate::ffi::to_python::chunked::ArrayIterator;
18use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
19use crate::ffi::to_python::to_stream_pycapsule;
20use crate::ffi::to_schema_pycapsule;
21use crate::input::AnyArray;
22use crate::interop::numpy::to_numpy::chunked_to_numpy;
23use crate::{PyArray, PyDataType, PyField, PyScalar};
24
/// A Python-facing Arrow chunked array.
///
/// This is a wrapper around a [FieldRef] and a `Vec` of [ArrayRef].
#[derive(Debug)]
// `frozen`: PyO3 never hands out `&mut` access to instances; accordingly every instance
// method of this type takes `&self`.
#[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass, frozen)]
pub struct PyChunkedArray {
    /// The array's data, in order. The struct literal is only built in
    /// [`PyChunkedArray::try_new`], which checks each chunk's data type against `field`.
    chunks: Vec<ArrayRef>,
    /// Describes the chunks: their data type, plus name, nullability and metadata.
    field: FieldRef,
}
34
35impl PyChunkedArray {
36    /// Construct a new [PyChunkedArray] from existing chunks and a field.
37    pub fn try_new(chunks: Vec<ArrayRef>, field: FieldRef) -> PyResult<Self> {
38        if !chunks
39            .iter()
40            .all(|chunk| chunk.data_type().equals_datatype(field.data_type()))
41        {
42            return Err(PyTypeError::new_err("All chunks must have same data type"));
43        }
44
45        Ok(Self { chunks, field })
46    }
47
48    /// Access the [DataType] of this ChunkedArray
49    pub fn data_type(&self) -> &DataType {
50        self.field.data_type()
51    }
52
53    /// Create a new PyChunkedArray from a vec of [ArrayRef]s, inferring their data type
54    /// automatically.
55    pub fn from_array_refs(chunks: Vec<ArrayRef>) -> PyArrowResult<Self> {
56        if chunks.is_empty() {
57            return Err(ArrowError::SchemaError(
58                "Cannot infer data type from empty Vec<ArrayRef>".to_string(),
59            )
60            .into());
61        }
62
63        if !chunks
64            .windows(2)
65            .all(|w| w[0].data_type() == w[1].data_type())
66        {
67            return Err(ArrowError::SchemaError("Mismatched data types".to_string()).into());
68        }
69
70        let field = Field::new("", chunks.first().unwrap().data_type().clone(), true);
71        Ok(Self::try_new(chunks, Arc::new(field))?)
72    }
73
74    /// Import from a raw Arrow C Stream capsule
75    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
76        let stream = import_stream_pycapsule(capsule)?;
77
78        let stream_reader = ArrowArrayStreamReader::try_new(stream)
79            .map_err(|err| PyValueError::new_err(err.to_string()))?;
80
81        let field = stream_reader.field();
82
83        let mut chunks = vec![];
84        for array in stream_reader {
85            let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
86            chunks.push(array);
87        }
88
89        PyChunkedArray::try_new(chunks, field)
90    }
91
92    /// Access the underlying chunks.
93    pub fn chunks(&self) -> &[ArrayRef] {
94        &self.chunks
95    }
96
97    /// Access the underlying field.
98    pub fn field(&self) -> &FieldRef {
99        &self.field
100    }
101
102    /// Consume this and return its inner parts.
103    pub fn into_inner(self) -> (Vec<ArrayRef>, FieldRef) {
104        (self.chunks, self.field)
105    }
106
107    #[allow(dead_code)]
108    pub(crate) fn is_empty(&self) -> bool {
109        self.len() == 0
110    }
111
112    pub(crate) fn len(&self) -> usize {
113        self.chunks.iter().fold(0, |acc, arr| acc + arr.len())
114    }
115
116    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
117        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
118        if total_chunk_length != self.length() {
119            return Err(PyValueError::new_err(
120                "Chunk lengths do not add up to chunked array length",
121            )
122            .into());
123        }
124
125        // If the desired rechunking is the existing chunking, return early
126        let matches_existing_chunking = chunk_lengths
127            .iter()
128            .zip(self.chunks())
129            .all(|(length, arr)| *length == arr.len());
130        if matches_existing_chunking {
131            return Ok(Self::try_new(self.chunks.clone(), self.field.clone())?);
132        }
133
134        let mut offset = 0;
135        let chunks = chunk_lengths
136            .iter()
137            .map(|chunk_length| {
138                let sliced_chunked_array = self.slice(offset, *chunk_length)?;
139                let arr_refs = sliced_chunked_array
140                    .chunks
141                    .iter()
142                    .map(|a| a.as_ref())
143                    .collect::<Vec<_>>();
144                let sliced_concatted = concat(&arr_refs)?;
145                offset += chunk_length;
146                Ok(sliced_concatted)
147            })
148            .collect::<PyArrowResult<Vec<_>>>()?;
149
150        Ok(PyChunkedArray::try_new(chunks, self.field.clone())?)
151    }
152
153    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
154        if offset + length > self.length() {
155            return Err(
156                PyValueError::new_err("offset + length may not exceed length of array").into(),
157            );
158        }
159
160        let mut sliced_chunks: Vec<ArrayRef> = vec![];
161        for chunk in self.chunks() {
162            if chunk.is_empty() {
163                continue;
164            }
165
166            // If the offset is greater than the len of this chunk, don't include any rows from
167            // this chunk
168            if offset >= chunk.len() {
169                offset -= chunk.len();
170                continue;
171            }
172
173            let take_count = length.min(chunk.len() - offset);
174            let sliced_chunk = chunk.slice(offset, take_count);
175            sliced_chunks.push(sliced_chunk);
176
177            length -= take_count;
178
179            // If we've selected all rows, exit
180            if length == 0 {
181                break;
182            } else {
183                offset = 0;
184            }
185        }
186
187        Ok(Self::try_new(sliced_chunks, self.field.clone())?)
188    }
189
190    /// Export this to a Python `arro3.core.ChunkedArray`.
191    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
192        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
193        arro3_mod
194            .getattr(intern!(py, "ChunkedArray"))?
195            .call_method1(
196                intern!(py, "from_arrow_pycapsule"),
197                PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
198            )
199    }
200
201    /// Export this to a Python `arro3.core.ChunkedArray`.
202    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
203        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
204        let capsule = Self::to_stream_pycapsule(py, self.chunks.clone(), self.field.clone(), None)?;
205        arro3_mod
206            .getattr(intern!(py, "ChunkedArray"))?
207            .call_method1(
208                intern!(py, "from_arrow_pycapsule"),
209                PyTuple::new(py, vec![capsule])?,
210            )
211    }
212    /// Export this to a Python `nanoarrow.ArrayStream`.
213    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
214        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
215    }
216
217    /// Export to a pyarrow.ChunkedArray
218    ///
219    /// Requires pyarrow >=14
220    pub fn to_pyarrow(self, py: Python) -> PyResult<PyObject> {
221        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
222        let pyarrow_obj = pyarrow_mod
223            .getattr(intern!(py, "chunked_array"))?
224            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?;
225        pyarrow_obj.into_py_any(py)
226    }
227
228    pub(crate) fn to_stream_pycapsule<'py>(
229        py: Python<'py>,
230        chunks: Vec<ArrayRef>,
231        field: FieldRef,
232        requested_schema: Option<Bound<'py, PyCapsule>>,
233    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
234        let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
235        to_stream_pycapsule(py, array_reader, requested_schema)
236    }
237}
238
239impl TryFrom<Vec<ArrayRef>> for PyChunkedArray {
240    type Error = PyArrowError;
241
242    fn try_from(value: Vec<ArrayRef>) -> Result<Self, Self::Error> {
243        Self::from_array_refs(value)
244    }
245}
246
247impl AsRef<[ArrayRef]> for PyChunkedArray {
248    fn as_ref(&self) -> &[ArrayRef] {
249        &self.chunks
250    }
251}
252
253impl Display for PyChunkedArray {
254    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255        write!(f, "arro3.core.ChunkedArray<")?;
256        self.field.data_type().fmt(f)?;
257        writeln!(f, ">")?;
258        Ok(())
259    }
260}
261
262#[pymethods]
263impl PyChunkedArray {
264    #[new]
265    #[pyo3(signature = (arrays, r#type=None))]
266    fn init(arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
267        if let Ok(data) = arrays.extract::<AnyArray>() {
268            Ok(data.into_chunked_array()?)
269        } else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
270            // TODO: move this into from_arrays?
271            let (chunks, fields): (Vec<_>, Vec<_>) =
272                arrays.into_iter().map(|arr| arr.into_inner()).unzip();
273            if !fields
274                .windows(2)
275                .all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
276            {
277                return Err(PyTypeError::new_err(
278                    "Cannot create a ChunkedArray with differing data types.",
279                )
280                .into());
281            }
282
283            let field = r#type
284                .map(|py_data_type| py_data_type.into_inner())
285                .unwrap_or_else(|| fields[0].clone());
286
287            Ok(PyChunkedArray::try_new(
288                chunks,
289                Field::new("", field.data_type().clone(), true)
290                    .with_metadata(field.metadata().clone())
291                    .into(),
292            )?)
293        } else {
294            Err(
295                PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
296                    .into(),
297            )
298        }
299    }
300
301    #[pyo3(signature = (dtype=None, copy=None))]
302    #[allow(unused_variables)]
303    fn __array__<'py>(
304        &'py self,
305        py: Python<'py>,
306        dtype: Option<PyObject>,
307        copy: Option<PyObject>,
308    ) -> PyResult<Bound<'py, PyAny>> {
309        let chunk_refs = self
310            .chunks
311            .iter()
312            .map(|arr| arr.as_ref())
313            .collect::<Vec<_>>();
314        chunked_to_numpy(py, chunk_refs)
315    }
316
317    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
318        to_schema_pycapsule(py, self.field.as_ref())
319    }
320
321    #[pyo3(signature = (requested_schema=None))]
322    fn __arrow_c_stream__<'py>(
323        &'py self,
324        py: Python<'py>,
325        requested_schema: Option<Bound<'py, PyCapsule>>,
326    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
327        Self::to_stream_pycapsule(
328            py,
329            self.chunks.clone(),
330            self.field.clone(),
331            requested_schema,
332        )
333    }
334
335    fn __eq__(&self, other: &PyChunkedArray) -> bool {
336        self.field == other.field && self.chunks == other.chunks
337    }
338
339    fn __getitem__(&self, i: isize) -> PyArrowResult<PyScalar> {
340        // Handle negative indexes from the end
341        let mut i = if i < 0 {
342            let i = self.len() as isize + i;
343            if i < 0 {
344                return Err(PyIndexError::new_err("Index out of range").into());
345            }
346            i as usize
347        } else {
348            i as usize
349        };
350        if i >= self.len() {
351            return Err(PyIndexError::new_err("Index out of range").into());
352        }
353        for chunk in self.chunks() {
354            if i < chunk.len() {
355                return PyScalar::try_new(chunk.slice(i, 1), self.field.clone());
356            }
357            i -= chunk.len();
358        }
359        unreachable!("index in range but past end of last chunk")
360    }
361
362    fn __len__(&self) -> usize {
363        self.chunks.iter().fold(0, |acc, x| acc + x.len())
364    }
365
366    fn __repr__(&self) -> String {
367        self.to_string()
368    }
369
370    #[classmethod]
371    fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
372        input.into_chunked_array()
373    }
374
375    #[classmethod]
376    #[pyo3(name = "from_arrow_pycapsule")]
377    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
378        Self::from_arrow_pycapsule(capsule)
379    }
380
381    fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
382        let new_field = target_type.into_inner();
383        let new_chunks = self
384            .chunks
385            .iter()
386            .map(|chunk| cast(&chunk, new_field.data_type()))
387            .collect::<Result<Vec<_>, ArrowError>>()?;
388        Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
389    }
390
391    fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
392        let field = self.field().clone();
393        let array = self
394            .chunks
395            .get(i)
396            .ok_or(PyValueError::new_err("out of index"))?
397            .clone();
398        Ok(PyArray::new(array, field).into())
399    }
400
401    #[getter]
402    #[pyo3(name = "chunks")]
403    fn chunks_py(&self) -> Vec<Arro3Array> {
404        let field = self.field().clone();
405        self.chunks
406            .iter()
407            .map(|array| PyArray::new(array.clone(), field.clone()).into())
408            .collect()
409    }
410
411    fn combine_chunks(&self) -> PyArrowResult<Arro3Array> {
412        let field = self.field().clone();
413        let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect();
414        Ok(PyArray::new(concat(&arrays)?, field).into())
415    }
416
417    fn equals(&self, other: PyChunkedArray) -> bool {
418        self.field == other.field && self.chunks == other.chunks
419    }
420
421    #[getter]
422    #[pyo3(name = "field")]
423    fn py_field(&self) -> Arro3Field {
424        PyField::new(self.field.clone()).into()
425    }
426
427    fn length(&self) -> usize {
428        self.len()
429    }
430
431    #[getter]
432    fn nbytes(&self) -> usize {
433        self.chunks
434            .iter()
435            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
436    }
437
438    #[getter]
439    fn null_count(&self) -> usize {
440        self.chunks
441            .iter()
442            .fold(0, |acc, arr| acc + arr.null_count())
443    }
444
445    #[getter]
446    fn num_chunks(&self) -> usize {
447        self.chunks.len()
448    }
449
450    #[pyo3(signature = (*, max_chunksize=None))]
451    #[pyo3(name = "rechunk")]
452    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
453        let max_chunksize = max_chunksize.unwrap_or(self.len());
454        let mut chunk_lengths = vec![];
455        let mut offset = 0;
456        while offset < self.len() {
457            let chunk_length = max_chunksize.min(self.len() - offset);
458            offset += chunk_length;
459            chunk_lengths.push(chunk_length);
460        }
461        Ok(self.rechunk(chunk_lengths)?.into())
462    }
463
464    #[pyo3(signature = (offset=0, length=None))]
465    #[pyo3(name = "slice")]
466    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
467        let length = length.unwrap_or_else(|| self.len() - offset);
468        Ok(self.slice(offset, length)?.into())
469    }
470
471    fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
472        self.__array__(py, None, None)
473    }
474
475    fn to_pylist(&self, py: Python) -> PyResult<PyObject> {
476        let mut scalars = Vec::with_capacity(self.len());
477        for chunk in &self.chunks {
478            for i in 0..chunk.len() {
479                let scalar =
480                    unsafe { PyScalar::new_unchecked(chunk.slice(i, 1), self.field.clone()) };
481                scalars.push(scalar.as_py(py)?);
482            }
483        }
484        scalars.into_py_any(py)
485    }
486
487    #[getter]
488    fn r#type(&self) -> Arro3DataType {
489        PyDataType::new(self.field.data_type().clone()).into()
490    }
491}