// pyo3_arrow/chunked.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use arrow_array::{Array, ArrayRef};
5use arrow_cast::cast;
6use arrow_cast::display::ArrayFormatter;
7use arrow_schema::{ArrowError, DataType, Field, FieldRef};
8use arrow_select::concat::concat;
9use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
10use pyo3::intern;
11use pyo3::prelude::*;
12use pyo3::types::{PyCapsule, PyTuple, PyType};
13
14use crate::error::{PyArrowError, PyArrowResult};
15use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
16use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
17use crate::ffi::from_python::utils::import_stream_pycapsule;
18use crate::ffi::to_python::chunked::ArrayIterator;
19use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
20use crate::ffi::to_python::to_stream_pycapsule;
21use crate::ffi::to_schema_pycapsule;
22use crate::input::AnyArray;
23use crate::interop::numpy::to_numpy::chunked_to_numpy;
24use crate::utils::default_repr_options;
25use crate::{PyArray, PyDataType, PyField, PyScalar};
26
/// A Python-facing Arrow chunked array.
///
/// This is a wrapper around a [FieldRef] and a `Vec` of [ArrayRef].
#[derive(Debug)]
#[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass, frozen)]
pub struct PyChunkedArray {
    // Invariant (enforced by `try_new`): every chunk's data type equals
    // `field`'s data type.
    chunks: Vec<ArrayRef>,
    field: FieldRef,
}
36
37impl PyChunkedArray {
38    /// Construct a new [PyChunkedArray] from existing chunks and a field.
39    pub fn try_new(chunks: Vec<ArrayRef>, field: FieldRef) -> PyResult<Self> {
40        if !chunks
41            .iter()
42            .all(|chunk| chunk.data_type().equals_datatype(field.data_type()))
43        {
44            return Err(PyTypeError::new_err("All chunks must have same data type"));
45        }
46
47        Ok(Self { chunks, field })
48    }
49
50    /// Access the [DataType] of this ChunkedArray
51    pub fn data_type(&self) -> &DataType {
52        self.field.data_type()
53    }
54
55    /// Create a new PyChunkedArray from a vec of [ArrayRef]s, inferring their data type
56    /// automatically.
57    pub fn from_array_refs(chunks: Vec<ArrayRef>) -> PyArrowResult<Self> {
58        if chunks.is_empty() {
59            return Err(ArrowError::SchemaError(
60                "Cannot infer data type from empty Vec<ArrayRef>".to_string(),
61            )
62            .into());
63        }
64
65        if !chunks
66            .windows(2)
67            .all(|w| w[0].data_type() == w[1].data_type())
68        {
69            return Err(ArrowError::SchemaError("Mismatched data types".to_string()).into());
70        }
71
72        let field = Field::new("", chunks.first().unwrap().data_type().clone(), true);
73        Ok(Self::try_new(chunks, Arc::new(field))?)
74    }
75
76    /// Import from a raw Arrow C Stream capsule
77    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
78        let stream = import_stream_pycapsule(capsule)?;
79
80        let stream_reader = ArrowArrayStreamReader::try_new(stream)
81            .map_err(|err| PyValueError::new_err(err.to_string()))?;
82
83        let field = stream_reader.field();
84
85        let mut chunks = vec![];
86        for array in stream_reader {
87            let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
88            chunks.push(array);
89        }
90
91        PyChunkedArray::try_new(chunks, field)
92    }
93
94    /// Access the underlying chunks.
95    pub fn chunks(&self) -> &[ArrayRef] {
96        &self.chunks
97    }
98
99    /// Access the underlying field.
100    pub fn field(&self) -> &FieldRef {
101        &self.field
102    }
103
104    /// Consume this and return its inner parts.
105    pub fn into_inner(self) -> (Vec<ArrayRef>, FieldRef) {
106        (self.chunks, self.field)
107    }
108
109    #[allow(dead_code)]
110    pub(crate) fn is_empty(&self) -> bool {
111        self.len() == 0
112    }
113
114    pub(crate) fn len(&self) -> usize {
115        self.chunks.iter().fold(0, |acc, arr| acc + arr.len())
116    }
117
118    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
119        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
120        if total_chunk_length != self.length() {
121            return Err(PyValueError::new_err(
122                "Chunk lengths do not add up to chunked array length",
123            )
124            .into());
125        }
126
127        // If the desired rechunking is the existing chunking, return early
128        let matches_existing_chunking = chunk_lengths
129            .iter()
130            .zip(self.chunks())
131            .all(|(length, arr)| *length == arr.len());
132        if matches_existing_chunking {
133            return Ok(Self::try_new(self.chunks.clone(), self.field.clone())?);
134        }
135
136        let mut offset = 0;
137        let chunks = chunk_lengths
138            .iter()
139            .map(|chunk_length| {
140                let sliced_chunked_array = self.slice(offset, *chunk_length)?;
141                let arr_refs = sliced_chunked_array
142                    .chunks
143                    .iter()
144                    .map(|a| a.as_ref())
145                    .collect::<Vec<_>>();
146                let sliced_concatted = concat(&arr_refs)?;
147                offset += chunk_length;
148                Ok(sliced_concatted)
149            })
150            .collect::<PyArrowResult<Vec<_>>>()?;
151
152        Ok(PyChunkedArray::try_new(chunks, self.field.clone())?)
153    }
154
155    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
156        if offset + length > self.length() {
157            return Err(
158                PyValueError::new_err("offset + length may not exceed length of array").into(),
159            );
160        }
161
162        let mut sliced_chunks: Vec<ArrayRef> = vec![];
163        for chunk in self.chunks() {
164            if chunk.is_empty() {
165                continue;
166            }
167
168            // If the offset is greater than the len of this chunk, don't include any rows from
169            // this chunk
170            if offset >= chunk.len() {
171                offset -= chunk.len();
172                continue;
173            }
174
175            let take_count = length.min(chunk.len() - offset);
176            let sliced_chunk = chunk.slice(offset, take_count);
177            sliced_chunks.push(sliced_chunk);
178
179            length -= take_count;
180
181            // If we've selected all rows, exit
182            if length == 0 {
183                break;
184            } else {
185                offset = 0;
186            }
187        }
188
189        Ok(Self::try_new(sliced_chunks, self.field.clone())?)
190    }
191
192    /// Export this to a Python `arro3.core.ChunkedArray`.
193    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
194        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
195        arro3_mod
196            .getattr(intern!(py, "ChunkedArray"))?
197            .call_method1(
198                intern!(py, "from_arrow_pycapsule"),
199                PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
200            )
201    }
202
203    /// Export this to a Python `arro3.core.ChunkedArray`.
204    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
205        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
206        let capsule = Self::to_stream_pycapsule(py, self.chunks.clone(), self.field.clone(), None)?;
207        arro3_mod
208            .getattr(intern!(py, "ChunkedArray"))?
209            .call_method1(
210                intern!(py, "from_arrow_pycapsule"),
211                PyTuple::new(py, vec![capsule])?,
212            )
213    }
214    /// Export this to a Python `nanoarrow.ArrayStream`.
215    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
216        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
217    }
218
219    /// Export to a pyarrow.ChunkedArray
220    ///
221    /// Requires pyarrow >=14
222    pub fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
223        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
224        pyarrow_mod
225            .getattr(intern!(py, "chunked_array"))?
226            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)
227    }
228
229    pub(crate) fn to_stream_pycapsule<'py>(
230        py: Python<'py>,
231        chunks: Vec<ArrayRef>,
232        field: FieldRef,
233        requested_schema: Option<Bound<'py, PyCapsule>>,
234    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
235        let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
236        to_stream_pycapsule(py, array_reader, requested_schema)
237    }
238}
239
240impl TryFrom<Vec<ArrayRef>> for PyChunkedArray {
241    type Error = PyArrowError;
242
243    fn try_from(value: Vec<ArrayRef>) -> Result<Self, Self::Error> {
244        Self::from_array_refs(value)
245    }
246}
247
248impl AsRef<[ArrayRef]> for PyChunkedArray {
249    fn as_ref(&self) -> &[ArrayRef] {
250        &self.chunks
251    }
252}
253
254impl Display for PyChunkedArray {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        write!(f, "arro3.core.ChunkedArray<")?;
257        self.field.data_type().fmt(f)?;
258        writeln!(f, ">")?;
259
260        let options = default_repr_options();
261
262        writeln!(f, "[")?;
263        for chunk in self.chunks.iter().take(5) {
264            writeln!(f, "  [")?;
265            let formatter =
266                ArrayFormatter::try_new(chunk, &options).map_err(|_| std::fmt::Error)?;
267            for i in 0..chunk.len().min(10) {
268                let row = formatter.value(i);
269                writeln!(f, "    {},", row)?;
270            }
271            writeln!(f, "  ]")?;
272        }
273        writeln!(f, "]")?;
274
275        Ok(())
276    }
277}
278
279#[pymethods]
280impl PyChunkedArray {
281    #[new]
282    #[pyo3(signature = (arrays, r#type=None))]
283    fn init(py: Python, arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
284        if arrays.hasattr(intern!(py, "__arrow_c_array__"))?
285            || arrays.hasattr(intern!(py, "__arrow_c_stream__"))?
286        {
287            Ok(arrays.extract::<AnyArray>()?.into_chunked_array()?)
288        } else if let Ok(data) = arrays.extract::<AnyArray>() {
289            Ok(data.into_chunked_array()?)
290        } else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
291            // TODO: move this into from_arrays?
292            let (chunks, fields): (Vec<_>, Vec<_>) =
293                arrays.into_iter().map(|arr| arr.into_inner()).unzip();
294            if !fields
295                .windows(2)
296                .all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
297            {
298                return Err(PyTypeError::new_err(
299                    "Cannot create a ChunkedArray with differing data types.",
300                )
301                .into());
302            }
303
304            let field = r#type
305                .map(|py_data_type| py_data_type.into_inner())
306                .unwrap_or_else(|| fields[0].clone());
307
308            Ok(PyChunkedArray::try_new(
309                chunks,
310                Field::new("", field.data_type().clone(), true)
311                    .with_metadata(field.metadata().clone())
312                    .into(),
313            )?)
314        } else {
315            Err(
316                PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
317                    .into(),
318            )
319        }
320    }
321
322    #[pyo3(signature = (dtype=None, copy=None))]
323    #[allow(unused_variables)]
324    fn __array__<'py>(
325        &'py self,
326        py: Python<'py>,
327        dtype: Option<Bound<PyAny>>,
328        copy: Option<Bound<PyAny>>,
329    ) -> PyResult<Bound<'py, PyAny>> {
330        let chunk_refs = self
331            .chunks
332            .iter()
333            .map(|arr| arr.as_ref())
334            .collect::<Vec<_>>();
335        chunked_to_numpy(py, chunk_refs)
336    }
337
338    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
339        to_schema_pycapsule(py, self.field.as_ref())
340    }
341
342    #[pyo3(signature = (requested_schema=None))]
343    fn __arrow_c_stream__<'py>(
344        &'py self,
345        py: Python<'py>,
346        requested_schema: Option<Bound<'py, PyCapsule>>,
347    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
348        Self::to_stream_pycapsule(
349            py,
350            self.chunks.clone(),
351            self.field.clone(),
352            requested_schema,
353        )
354    }
355
356    fn __eq__(&self, other: &PyChunkedArray) -> bool {
357        self.field == other.field && self.chunks == other.chunks
358    }
359
360    fn __getitem__(&self, i: isize) -> PyArrowResult<PyScalar> {
361        // Handle negative indexes from the end
362        let mut i = if i < 0 {
363            let i = self.len() as isize + i;
364            if i < 0 {
365                return Err(PyIndexError::new_err("Index out of range").into());
366            }
367            i as usize
368        } else {
369            i as usize
370        };
371        if i >= self.len() {
372            return Err(PyIndexError::new_err("Index out of range").into());
373        }
374        for chunk in self.chunks() {
375            if i < chunk.len() {
376                return PyScalar::try_new(chunk.slice(i, 1), self.field.clone());
377            }
378            i -= chunk.len();
379        }
380        unreachable!("index in range but past end of last chunk")
381    }
382
383    fn __len__(&self) -> usize {
384        self.chunks.iter().fold(0, |acc, x| acc + x.len())
385    }
386
387    fn __repr__(&self) -> String {
388        self.to_string()
389    }
390
391    #[classmethod]
392    fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
393        input.into_chunked_array()
394    }
395
396    #[classmethod]
397    #[pyo3(name = "from_arrow_pycapsule")]
398    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
399        Self::from_arrow_pycapsule(capsule)
400    }
401
402    fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
403        let new_field = target_type.into_inner();
404        let new_chunks = self
405            .chunks
406            .iter()
407            .map(|chunk| cast(&chunk, new_field.data_type()))
408            .collect::<Result<Vec<_>, ArrowError>>()?;
409        Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
410    }
411
412    fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
413        let field = self.field().clone();
414        let array = self
415            .chunks
416            .get(i)
417            .ok_or(PyValueError::new_err("out of index"))?
418            .clone();
419        Ok(PyArray::new(array, field).into())
420    }
421
422    #[getter]
423    #[pyo3(name = "chunks")]
424    fn chunks_py(&self) -> Vec<Arro3Array> {
425        let field = self.field().clone();
426        self.chunks
427            .iter()
428            .map(|array| PyArray::new(array.clone(), field.clone()).into())
429            .collect()
430    }
431
432    fn combine_chunks(&self) -> PyArrowResult<Arro3Array> {
433        let field = self.field().clone();
434        let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect();
435        Ok(PyArray::new(concat(&arrays)?, field).into())
436    }
437
438    fn equals(&self, other: PyChunkedArray) -> bool {
439        self.field == other.field && self.chunks == other.chunks
440    }
441
442    #[getter]
443    #[pyo3(name = "field")]
444    fn py_field(&self) -> Arro3Field {
445        PyField::new(self.field.clone()).into()
446    }
447
448    fn length(&self) -> usize {
449        self.len()
450    }
451
452    #[getter]
453    fn nbytes(&self) -> usize {
454        self.chunks
455            .iter()
456            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
457    }
458
459    #[getter]
460    fn null_count(&self) -> usize {
461        self.chunks
462            .iter()
463            .fold(0, |acc, arr| acc + arr.null_count())
464    }
465
466    #[getter]
467    fn num_chunks(&self) -> usize {
468        self.chunks.len()
469    }
470
471    #[pyo3(signature = (*, max_chunksize=None))]
472    #[pyo3(name = "rechunk")]
473    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
474        let max_chunksize = max_chunksize.unwrap_or(self.len());
475        let mut chunk_lengths = vec![];
476        let mut offset = 0;
477        while offset < self.len() {
478            let chunk_length = max_chunksize.min(self.len() - offset);
479            offset += chunk_length;
480            chunk_lengths.push(chunk_length);
481        }
482        Ok(self.rechunk(chunk_lengths)?.into())
483    }
484
485    #[pyo3(signature = (offset=0, length=None))]
486    #[pyo3(name = "slice")]
487    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
488        let length = length.unwrap_or_else(|| self.len() - offset);
489        Ok(self.slice(offset, length)?.into())
490    }
491
492    fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
493        self.__array__(py, None, None)
494    }
495
496    fn to_pylist(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
497        let mut scalars = Vec::with_capacity(self.len());
498        for chunk in &self.chunks {
499            for i in 0..chunk.len() {
500                let scalar =
501                    unsafe { PyScalar::new_unchecked(chunk.slice(i, 1), self.field.clone()) };
502                scalars.push(scalar.as_py(py)?);
503            }
504        }
505        Ok(scalars)
506    }
507
508    #[getter]
509    fn r#type(&self) -> Arro3DataType {
510        PyDataType::new(self.field.data_type().clone()).into()
511    }
512}