// pyo3_arrow/chunked.rs

1use std::fmt::Display;
2use std::sync::Arc;
3
4use arrow::compute::concat;
5use arrow_array::{Array, ArrayRef};
6use arrow_schema::{ArrowError, DataType, Field, FieldRef};
7use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
8use pyo3::prelude::*;
9use pyo3::types::{PyCapsule, PyTuple, PyType};
10use pyo3::{intern, IntoPyObjectExt};
11
12use crate::error::{PyArrowError, PyArrowResult};
13use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
14use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
15use crate::ffi::from_python::utils::import_stream_pycapsule;
16use crate::ffi::to_python::chunked::ArrayIterator;
17use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
18use crate::ffi::to_python::to_stream_pycapsule;
19use crate::ffi::to_schema_pycapsule;
20use crate::input::AnyArray;
21use crate::interop::numpy::to_numpy::chunked_to_numpy;
22use crate::{PyArray, PyDataType, PyField, PyScalar};
23
/// A Python-facing Arrow chunked array.
///
/// This is a wrapper around a [FieldRef] and a `Vec` of [ArrayRef].
#[derive(Debug)]
#[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass, frozen)]
pub struct PyChunkedArray {
    // Invariant (enforced by `try_new`): every chunk's data type matches `field`.
    chunks: Vec<ArrayRef>,
    // Field (name, data type, nullability, metadata) shared by all chunks.
    field: FieldRef,
}
33
impl PyChunkedArray {
    /// Construct a new [PyChunkedArray] from existing chunks and a field.
    ///
    /// Returns a Python `TypeError` if any chunk's data type differs from the
    /// field's data type.
    pub fn try_new(chunks: Vec<ArrayRef>, field: FieldRef) -> PyResult<Self> {
        if !chunks
            .iter()
            .all(|chunk| chunk.data_type().equals_datatype(field.data_type()))
        {
            return Err(PyTypeError::new_err("All chunks must have same data type"));
        }

        Ok(Self { chunks, field })
    }

    /// Access the [DataType] of this ChunkedArray
    pub fn data_type(&self) -> &DataType {
        self.field.data_type()
    }

    /// Create a new PyChunkedArray from a vec of [ArrayRef]s, inferring their data type
    /// automatically.
    ///
    /// Errors if `chunks` is empty (there is no type to infer from) or if the
    /// chunks have mismatched data types. The inferred field has an empty name
    /// and is nullable.
    pub fn from_array_refs(chunks: Vec<ArrayRef>) -> PyArrowResult<Self> {
        if chunks.is_empty() {
            return Err(ArrowError::SchemaError(
                "Cannot infer data type from empty Vec<ArrayRef>".to_string(),
            )
            .into());
        }

        // Adjacent-pair check: all chunks must share one data type.
        if !chunks
            .windows(2)
            .all(|w| w[0].data_type() == w[1].data_type())
        {
            return Err(ArrowError::SchemaError("Mismatched data types".to_string()).into());
        }

        let field = Field::new("", chunks.first().unwrap().data_type().clone(), true);
        Ok(Self::try_new(chunks, Arc::new(field))?)
    }

    /// Import from a raw Arrow C Stream capsule
    pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
        let stream = import_stream_pycapsule(capsule)?;

        let stream_reader = ArrowArrayStreamReader::try_new(stream)
            .map_err(|err| PyValueError::new_err(err.to_string()))?;

        let field = stream_reader.field();

        // Drain the stream eagerly; each yielded array becomes one chunk.
        let mut chunks = vec![];
        for array in stream_reader {
            let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
            chunks.push(array);
        }

        PyChunkedArray::try_new(chunks, field)
    }

    /// Access the underlying chunks.
    pub fn chunks(&self) -> &[ArrayRef] {
        &self.chunks
    }

    /// Access the underlying field.
    pub fn field(&self) -> &FieldRef {
        &self.field
    }

    /// Consume this and return its inner parts.
    pub fn into_inner(self) -> (Vec<ArrayRef>, FieldRef) {
        (self.chunks, self.field)
    }

    /// Whether this chunked array contains zero rows.
    #[allow(dead_code)]
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Total number of rows, summed across all chunks.
    pub(crate) fn len(&self) -> usize {
        self.chunks.iter().fold(0, |acc, arr| acc + arr.len())
    }

    /// Re-slice this array into chunks of exactly the given lengths.
    ///
    /// `chunk_lengths` must sum to the total length of the array. Existing
    /// chunks spanning a requested boundary are concatenated.
    pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
        let total_chunk_length = chunk_lengths.iter().sum::<usize>();
        if total_chunk_length != self.length() {
            return Err(PyValueError::new_err(
                "Chunk lengths do not add up to chunked array length",
            )
            .into());
        }

        // If the desired rechunking is the existing chunking, return early
        let matches_existing_chunking = chunk_lengths
            .iter()
            .zip(self.chunks())
            .all(|(length, arr)| *length == arr.len());
        if matches_existing_chunking {
            return Ok(Self::try_new(self.chunks.clone(), self.field.clone())?);
        }

        // For each requested length: slice that window out of the chunked
        // array, then concatenate whatever chunks the window spans into a
        // single contiguous chunk. `offset` tracks the running start row.
        let mut offset = 0;
        let chunks = chunk_lengths
            .iter()
            .map(|chunk_length| {
                let sliced_chunked_array = self.slice(offset, *chunk_length)?;
                let arr_refs = sliced_chunked_array
                    .chunks
                    .iter()
                    .map(|a| a.as_ref())
                    .collect::<Vec<_>>();
                let sliced_concatted = concat(&arr_refs)?;
                offset += chunk_length;
                Ok(sliced_concatted)
            })
            .collect::<PyArrowResult<Vec<_>>>()?;

        Ok(PyChunkedArray::try_new(chunks, self.field.clone())?)
    }

    /// Zero-copy slice of `length` rows starting at `offset`.
    ///
    /// Chunk boundaries falling inside the window are preserved. `offset` and
    /// `length` are consumed in place as the walk proceeds chunk by chunk.
    pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
        if offset + length > self.length() {
            return Err(
                PyValueError::new_err("offset + length may not exceed length of array").into(),
            );
        }

        let mut sliced_chunks: Vec<ArrayRef> = vec![];
        for chunk in self.chunks() {
            if chunk.is_empty() {
                continue;
            }

            // If the offset is greater than the len of this chunk, don't include any rows from
            // this chunk
            if offset >= chunk.len() {
                offset -= chunk.len();
                continue;
            }

            // Take as many rows as still requested, capped by this chunk.
            let take_count = length.min(chunk.len() - offset);
            let sliced_chunk = chunk.slice(offset, take_count);
            sliced_chunks.push(sliced_chunk);

            length -= take_count;

            // If we've selected all rows, exit
            if length == 0 {
                break;
            } else {
                // Subsequent chunks are taken from their beginning.
                offset = 0;
            }
        }

        Ok(Self::try_new(sliced_chunks, self.field.clone())?)
    }

    /// Export this to a Python `arro3.core.ChunkedArray`.
    pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        arro3_mod
            .getattr(intern!(py, "ChunkedArray"))?
            .call_method1(
                intern!(py, "from_arrow_pycapsule"),
                PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
            )
    }

    /// Export this to a Python `arro3.core.ChunkedArray`, consuming self.
    ///
    /// Note: the chunks and field are still cloned to build the stream capsule.
    pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
        let arro3_mod = py.import(intern!(py, "arro3.core"))?;
        let capsule = Self::to_stream_pycapsule(py, self.chunks.clone(), self.field.clone(), None)?;
        arro3_mod
            .getattr(intern!(py, "ChunkedArray"))?
            .call_method1(
                intern!(py, "from_arrow_pycapsule"),
                PyTuple::new(py, vec![capsule])?,
            )
    }

    /// Export this to a Python `nanoarrow.ArrayStream`.
    pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
        to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
    }

    /// Export to a pyarrow.ChunkedArray
    ///
    /// Requires pyarrow >=14
    pub fn to_pyarrow(self, py: Python) -> PyResult<PyObject> {
        let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
        let pyarrow_obj = pyarrow_mod
            .getattr(intern!(py, "chunked_array"))?
            .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?;
        pyarrow_obj.into_py_any(py)
    }

    /// Build an Arrow C Stream PyCapsule from the given chunks and field.
    pub(crate) fn to_stream_pycapsule<'py>(
        py: Python<'py>,
        chunks: Vec<ArrayRef>,
        field: FieldRef,
        requested_schema: Option<Bound<'py, PyCapsule>>,
    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
        let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
        to_stream_pycapsule(py, array_reader, requested_schema)
    }
}
237
238impl TryFrom<Vec<ArrayRef>> for PyChunkedArray {
239    type Error = PyArrowError;
240
241    fn try_from(value: Vec<ArrayRef>) -> Result<Self, Self::Error> {
242        Self::from_array_refs(value)
243    }
244}
245
246impl AsRef<[ArrayRef]> for PyChunkedArray {
247    fn as_ref(&self) -> &[ArrayRef] {
248        &self.chunks
249    }
250}
251
252impl Display for PyChunkedArray {
253    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254        write!(f, "arro3.core.ChunkedArray<")?;
255        self.field.data_type().fmt(f)?;
256        writeln!(f, ">")?;
257        Ok(())
258    }
259}
260
261#[pymethods]
262impl PyChunkedArray {
263    #[new]
264    #[pyo3(signature = (arrays, r#type=None))]
265    fn init(arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
266        if let Ok(data) = arrays.extract::<AnyArray>() {
267            Ok(data.into_chunked_array()?)
268        } else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
269            // TODO: move this into from_arrays?
270            let (chunks, fields): (Vec<_>, Vec<_>) =
271                arrays.into_iter().map(|arr| arr.into_inner()).unzip();
272            if !fields
273                .windows(2)
274                .all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
275            {
276                return Err(PyTypeError::new_err(
277                    "Cannot create a ChunkedArray with differing data types.",
278                )
279                .into());
280            }
281
282            let field = r#type
283                .map(|py_data_type| py_data_type.into_inner())
284                .unwrap_or_else(|| fields[0].clone());
285
286            Ok(PyChunkedArray::try_new(
287                chunks,
288                Field::new("", field.data_type().clone(), true)
289                    .with_metadata(field.metadata().clone())
290                    .into(),
291            )?)
292        } else {
293            Err(
294                PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
295                    .into(),
296            )
297        }
298    }
299
300    #[pyo3(signature = (dtype=None, copy=None))]
301    #[allow(unused_variables)]
302    fn __array__<'py>(
303        &'py self,
304        py: Python<'py>,
305        dtype: Option<PyObject>,
306        copy: Option<PyObject>,
307    ) -> PyResult<Bound<'py, PyAny>> {
308        let chunk_refs = self
309            .chunks
310            .iter()
311            .map(|arr| arr.as_ref())
312            .collect::<Vec<_>>();
313        chunked_to_numpy(py, chunk_refs)
314    }
315
316    fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
317        to_schema_pycapsule(py, self.field.as_ref())
318    }
319
320    #[pyo3(signature = (requested_schema=None))]
321    fn __arrow_c_stream__<'py>(
322        &'py self,
323        py: Python<'py>,
324        requested_schema: Option<Bound<'py, PyCapsule>>,
325    ) -> PyArrowResult<Bound<'py, PyCapsule>> {
326        Self::to_stream_pycapsule(
327            py,
328            self.chunks.clone(),
329            self.field.clone(),
330            requested_schema,
331        )
332    }
333
334    fn __eq__(&self, other: &PyChunkedArray) -> bool {
335        self.field == other.field && self.chunks == other.chunks
336    }
337
338    fn __getitem__(&self, i: isize) -> PyArrowResult<PyScalar> {
339        // Handle negative indexes from the end
340        let mut i = if i < 0 {
341            let i = self.len() as isize + i;
342            if i < 0 {
343                return Err(PyIndexError::new_err("Index out of range").into());
344            }
345            i as usize
346        } else {
347            i as usize
348        };
349        if i >= self.len() {
350            return Err(PyIndexError::new_err("Index out of range").into());
351        }
352        for chunk in self.chunks() {
353            if i < chunk.len() {
354                return PyScalar::try_new(chunk.slice(i, 1), self.field.clone());
355            }
356            i -= chunk.len();
357        }
358        unreachable!("index in range but past end of last chunk")
359    }
360
361    fn __len__(&self) -> usize {
362        self.chunks.iter().fold(0, |acc, x| acc + x.len())
363    }
364
365    fn __repr__(&self) -> String {
366        self.to_string()
367    }
368
369    #[classmethod]
370    fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
371        input.into_chunked_array()
372    }
373
374    #[classmethod]
375    #[pyo3(name = "from_arrow_pycapsule")]
376    fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
377        Self::from_arrow_pycapsule(capsule)
378    }
379
380    fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
381        let new_field = target_type.into_inner();
382        let new_chunks = self
383            .chunks
384            .iter()
385            .map(|chunk| arrow::compute::cast(&chunk, new_field.data_type()))
386            .collect::<Result<Vec<_>, ArrowError>>()?;
387        Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
388    }
389
390    fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
391        let field = self.field().clone();
392        let array = self
393            .chunks
394            .get(i)
395            .ok_or(PyValueError::new_err("out of index"))?
396            .clone();
397        Ok(PyArray::new(array, field).into())
398    }
399
400    #[getter]
401    #[pyo3(name = "chunks")]
402    fn chunks_py(&self) -> Vec<Arro3Array> {
403        let field = self.field().clone();
404        self.chunks
405            .iter()
406            .map(|array| PyArray::new(array.clone(), field.clone()).into())
407            .collect()
408    }
409
410    fn combine_chunks(&self) -> PyArrowResult<Arro3Array> {
411        let field = self.field().clone();
412        let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect();
413        Ok(PyArray::new(concat(&arrays)?, field).into())
414    }
415
416    fn equals(&self, other: PyChunkedArray) -> bool {
417        self.field == other.field && self.chunks == other.chunks
418    }
419
420    #[getter]
421    #[pyo3(name = "field")]
422    fn py_field(&self) -> Arro3Field {
423        PyField::new(self.field.clone()).into()
424    }
425
426    fn length(&self) -> usize {
427        self.len()
428    }
429
430    #[getter]
431    fn nbytes(&self) -> usize {
432        self.chunks
433            .iter()
434            .fold(0, |acc, batch| acc + batch.get_array_memory_size())
435    }
436
437    #[getter]
438    fn null_count(&self) -> usize {
439        self.chunks
440            .iter()
441            .fold(0, |acc, arr| acc + arr.null_count())
442    }
443
444    #[getter]
445    fn num_chunks(&self) -> usize {
446        self.chunks.len()
447    }
448
449    #[pyo3(signature = (*, max_chunksize=None))]
450    #[pyo3(name = "rechunk")]
451    fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
452        let max_chunksize = max_chunksize.unwrap_or(self.len());
453        let mut chunk_lengths = vec![];
454        let mut offset = 0;
455        while offset < self.len() {
456            let chunk_length = max_chunksize.min(self.len() - offset);
457            offset += chunk_length;
458            chunk_lengths.push(chunk_length);
459        }
460        Ok(self.rechunk(chunk_lengths)?.into())
461    }
462
463    #[pyo3(signature = (offset=0, length=None))]
464    #[pyo3(name = "slice")]
465    fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
466        let length = length.unwrap_or_else(|| self.len() - offset);
467        Ok(self.slice(offset, length)?.into())
468    }
469
470    fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
471        self.__array__(py, None, None)
472    }
473
474    fn to_pylist(&self, py: Python) -> PyResult<PyObject> {
475        let mut scalars = Vec::with_capacity(self.len());
476        for chunk in &self.chunks {
477            for i in 0..chunk.len() {
478                let scalar =
479                    unsafe { PyScalar::new_unchecked(chunk.slice(i, 1), self.field.clone()) };
480                scalars.push(scalar.as_py(py)?);
481            }
482        }
483        scalars.into_py_any(py)
484    }
485
486    #[getter]
487    fn r#type(&self) -> Arro3DataType {
488        PyDataType::new(self.field.data_type().clone()).into()
489    }
490}