1use std::fmt::Display;
2use std::sync::Arc;
3
4use arrow_array::{Array, ArrayRef};
5use arrow_cast::cast;
6use arrow_schema::{ArrowError, DataType, Field, FieldRef};
7use arrow_select::concat::concat;
8use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
9use pyo3::prelude::*;
10use pyo3::types::{PyCapsule, PyTuple, PyType};
11use pyo3::{intern, IntoPyObjectExt};
12
13use crate::error::{PyArrowError, PyArrowResult};
14use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
15use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
16use crate::ffi::from_python::utils::import_stream_pycapsule;
17use crate::ffi::to_python::chunked::ArrayIterator;
18use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
19use crate::ffi::to_python::to_stream_pycapsule;
20use crate::ffi::to_schema_pycapsule;
21use crate::input::AnyArray;
22use crate::interop::numpy::to_numpy::chunked_to_numpy;
23use crate::{PyArray, PyDataType, PyField, PyScalar};
24
25#[derive(Debug)]
29#[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass, frozen)]
30pub struct PyChunkedArray {
31 chunks: Vec<ArrayRef>,
32 field: FieldRef,
33}
34
35impl PyChunkedArray {
36 pub fn try_new(chunks: Vec<ArrayRef>, field: FieldRef) -> PyResult<Self> {
38 if !chunks
39 .iter()
40 .all(|chunk| chunk.data_type().equals_datatype(field.data_type()))
41 {
42 return Err(PyTypeError::new_err("All chunks must have same data type"));
43 }
44
45 Ok(Self { chunks, field })
46 }
47
48 pub fn data_type(&self) -> &DataType {
50 self.field.data_type()
51 }
52
53 pub fn from_array_refs(chunks: Vec<ArrayRef>) -> PyArrowResult<Self> {
56 if chunks.is_empty() {
57 return Err(ArrowError::SchemaError(
58 "Cannot infer data type from empty Vec<ArrayRef>".to_string(),
59 )
60 .into());
61 }
62
63 if !chunks
64 .windows(2)
65 .all(|w| w[0].data_type() == w[1].data_type())
66 {
67 return Err(ArrowError::SchemaError("Mismatched data types".to_string()).into());
68 }
69
70 let field = Field::new("", chunks.first().unwrap().data_type().clone(), true);
71 Ok(Self::try_new(chunks, Arc::new(field))?)
72 }
73
74 pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
76 let stream = import_stream_pycapsule(capsule)?;
77
78 let stream_reader = ArrowArrayStreamReader::try_new(stream)
79 .map_err(|err| PyValueError::new_err(err.to_string()))?;
80
81 let field = stream_reader.field();
82
83 let mut chunks = vec![];
84 for array in stream_reader {
85 let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
86 chunks.push(array);
87 }
88
89 PyChunkedArray::try_new(chunks, field)
90 }
91
92 pub fn chunks(&self) -> &[ArrayRef] {
94 &self.chunks
95 }
96
97 pub fn field(&self) -> &FieldRef {
99 &self.field
100 }
101
102 pub fn into_inner(self) -> (Vec<ArrayRef>, FieldRef) {
104 (self.chunks, self.field)
105 }
106
107 #[allow(dead_code)]
108 pub(crate) fn is_empty(&self) -> bool {
109 self.len() == 0
110 }
111
112 pub(crate) fn len(&self) -> usize {
113 self.chunks.iter().fold(0, |acc, arr| acc + arr.len())
114 }
115
116 pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
117 let total_chunk_length = chunk_lengths.iter().sum::<usize>();
118 if total_chunk_length != self.length() {
119 return Err(PyValueError::new_err(
120 "Chunk lengths do not add up to chunked array length",
121 )
122 .into());
123 }
124
125 let matches_existing_chunking = chunk_lengths
127 .iter()
128 .zip(self.chunks())
129 .all(|(length, arr)| *length == arr.len());
130 if matches_existing_chunking {
131 return Ok(Self::try_new(self.chunks.clone(), self.field.clone())?);
132 }
133
134 let mut offset = 0;
135 let chunks = chunk_lengths
136 .iter()
137 .map(|chunk_length| {
138 let sliced_chunked_array = self.slice(offset, *chunk_length)?;
139 let arr_refs = sliced_chunked_array
140 .chunks
141 .iter()
142 .map(|a| a.as_ref())
143 .collect::<Vec<_>>();
144 let sliced_concatted = concat(&arr_refs)?;
145 offset += chunk_length;
146 Ok(sliced_concatted)
147 })
148 .collect::<PyArrowResult<Vec<_>>>()?;
149
150 Ok(PyChunkedArray::try_new(chunks, self.field.clone())?)
151 }
152
153 pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
154 if offset + length > self.length() {
155 return Err(
156 PyValueError::new_err("offset + length may not exceed length of array").into(),
157 );
158 }
159
160 let mut sliced_chunks: Vec<ArrayRef> = vec![];
161 for chunk in self.chunks() {
162 if chunk.is_empty() {
163 continue;
164 }
165
166 if offset >= chunk.len() {
169 offset -= chunk.len();
170 continue;
171 }
172
173 let take_count = length.min(chunk.len() - offset);
174 let sliced_chunk = chunk.slice(offset, take_count);
175 sliced_chunks.push(sliced_chunk);
176
177 length -= take_count;
178
179 if length == 0 {
181 break;
182 } else {
183 offset = 0;
184 }
185 }
186
187 Ok(Self::try_new(sliced_chunks, self.field.clone())?)
188 }
189
190 pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
192 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
193 arro3_mod
194 .getattr(intern!(py, "ChunkedArray"))?
195 .call_method1(
196 intern!(py, "from_arrow_pycapsule"),
197 PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
198 )
199 }
200
201 pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
203 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
204 let capsule = Self::to_stream_pycapsule(py, self.chunks.clone(), self.field.clone(), None)?;
205 arro3_mod
206 .getattr(intern!(py, "ChunkedArray"))?
207 .call_method1(
208 intern!(py, "from_arrow_pycapsule"),
209 PyTuple::new(py, vec![capsule])?,
210 )
211 }
212 pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
214 to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
215 }
216
217 pub fn to_pyarrow(self, py: Python) -> PyResult<PyObject> {
221 let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
222 let pyarrow_obj = pyarrow_mod
223 .getattr(intern!(py, "chunked_array"))?
224 .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?;
225 pyarrow_obj.into_py_any(py)
226 }
227
228 pub(crate) fn to_stream_pycapsule<'py>(
229 py: Python<'py>,
230 chunks: Vec<ArrayRef>,
231 field: FieldRef,
232 requested_schema: Option<Bound<'py, PyCapsule>>,
233 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
234 let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
235 to_stream_pycapsule(py, array_reader, requested_schema)
236 }
237}
238
239impl TryFrom<Vec<ArrayRef>> for PyChunkedArray {
240 type Error = PyArrowError;
241
242 fn try_from(value: Vec<ArrayRef>) -> Result<Self, Self::Error> {
243 Self::from_array_refs(value)
244 }
245}
246
247impl AsRef<[ArrayRef]> for PyChunkedArray {
248 fn as_ref(&self) -> &[ArrayRef] {
249 &self.chunks
250 }
251}
252
253impl Display for PyChunkedArray {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 write!(f, "arro3.core.ChunkedArray<")?;
256 self.field.data_type().fmt(f)?;
257 writeln!(f, ">")?;
258 Ok(())
259 }
260}
261
262#[pymethods]
263impl PyChunkedArray {
264 #[new]
265 #[pyo3(signature = (arrays, r#type=None))]
266 fn init(arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
267 if let Ok(data) = arrays.extract::<AnyArray>() {
268 Ok(data.into_chunked_array()?)
269 } else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
270 let (chunks, fields): (Vec<_>, Vec<_>) =
272 arrays.into_iter().map(|arr| arr.into_inner()).unzip();
273 if !fields
274 .windows(2)
275 .all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
276 {
277 return Err(PyTypeError::new_err(
278 "Cannot create a ChunkedArray with differing data types.",
279 )
280 .into());
281 }
282
283 let field = r#type
284 .map(|py_data_type| py_data_type.into_inner())
285 .unwrap_or_else(|| fields[0].clone());
286
287 Ok(PyChunkedArray::try_new(
288 chunks,
289 Field::new("", field.data_type().clone(), true)
290 .with_metadata(field.metadata().clone())
291 .into(),
292 )?)
293 } else {
294 Err(
295 PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
296 .into(),
297 )
298 }
299 }
300
301 #[pyo3(signature = (dtype=None, copy=None))]
302 #[allow(unused_variables)]
303 fn __array__<'py>(
304 &'py self,
305 py: Python<'py>,
306 dtype: Option<PyObject>,
307 copy: Option<PyObject>,
308 ) -> PyResult<Bound<'py, PyAny>> {
309 let chunk_refs = self
310 .chunks
311 .iter()
312 .map(|arr| arr.as_ref())
313 .collect::<Vec<_>>();
314 chunked_to_numpy(py, chunk_refs)
315 }
316
317 fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
318 to_schema_pycapsule(py, self.field.as_ref())
319 }
320
321 #[pyo3(signature = (requested_schema=None))]
322 fn __arrow_c_stream__<'py>(
323 &'py self,
324 py: Python<'py>,
325 requested_schema: Option<Bound<'py, PyCapsule>>,
326 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
327 Self::to_stream_pycapsule(
328 py,
329 self.chunks.clone(),
330 self.field.clone(),
331 requested_schema,
332 )
333 }
334
335 fn __eq__(&self, other: &PyChunkedArray) -> bool {
336 self.field == other.field && self.chunks == other.chunks
337 }
338
339 fn __getitem__(&self, i: isize) -> PyArrowResult<PyScalar> {
340 let mut i = if i < 0 {
342 let i = self.len() as isize + i;
343 if i < 0 {
344 return Err(PyIndexError::new_err("Index out of range").into());
345 }
346 i as usize
347 } else {
348 i as usize
349 };
350 if i >= self.len() {
351 return Err(PyIndexError::new_err("Index out of range").into());
352 }
353 for chunk in self.chunks() {
354 if i < chunk.len() {
355 return PyScalar::try_new(chunk.slice(i, 1), self.field.clone());
356 }
357 i -= chunk.len();
358 }
359 unreachable!("index in range but past end of last chunk")
360 }
361
362 fn __len__(&self) -> usize {
363 self.chunks.iter().fold(0, |acc, x| acc + x.len())
364 }
365
366 fn __repr__(&self) -> String {
367 self.to_string()
368 }
369
370 #[classmethod]
371 fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
372 input.into_chunked_array()
373 }
374
375 #[classmethod]
376 #[pyo3(name = "from_arrow_pycapsule")]
377 fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
378 Self::from_arrow_pycapsule(capsule)
379 }
380
381 fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
382 let new_field = target_type.into_inner();
383 let new_chunks = self
384 .chunks
385 .iter()
386 .map(|chunk| cast(&chunk, new_field.data_type()))
387 .collect::<Result<Vec<_>, ArrowError>>()?;
388 Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
389 }
390
391 fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
392 let field = self.field().clone();
393 let array = self
394 .chunks
395 .get(i)
396 .ok_or(PyValueError::new_err("out of index"))?
397 .clone();
398 Ok(PyArray::new(array, field).into())
399 }
400
401 #[getter]
402 #[pyo3(name = "chunks")]
403 fn chunks_py(&self) -> Vec<Arro3Array> {
404 let field = self.field().clone();
405 self.chunks
406 .iter()
407 .map(|array| PyArray::new(array.clone(), field.clone()).into())
408 .collect()
409 }
410
411 fn combine_chunks(&self) -> PyArrowResult<Arro3Array> {
412 let field = self.field().clone();
413 let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect();
414 Ok(PyArray::new(concat(&arrays)?, field).into())
415 }
416
417 fn equals(&self, other: PyChunkedArray) -> bool {
418 self.field == other.field && self.chunks == other.chunks
419 }
420
421 #[getter]
422 #[pyo3(name = "field")]
423 fn py_field(&self) -> Arro3Field {
424 PyField::new(self.field.clone()).into()
425 }
426
427 fn length(&self) -> usize {
428 self.len()
429 }
430
431 #[getter]
432 fn nbytes(&self) -> usize {
433 self.chunks
434 .iter()
435 .fold(0, |acc, batch| acc + batch.get_array_memory_size())
436 }
437
438 #[getter]
439 fn null_count(&self) -> usize {
440 self.chunks
441 .iter()
442 .fold(0, |acc, arr| acc + arr.null_count())
443 }
444
445 #[getter]
446 fn num_chunks(&self) -> usize {
447 self.chunks.len()
448 }
449
450 #[pyo3(signature = (*, max_chunksize=None))]
451 #[pyo3(name = "rechunk")]
452 fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
453 let max_chunksize = max_chunksize.unwrap_or(self.len());
454 let mut chunk_lengths = vec![];
455 let mut offset = 0;
456 while offset < self.len() {
457 let chunk_length = max_chunksize.min(self.len() - offset);
458 offset += chunk_length;
459 chunk_lengths.push(chunk_length);
460 }
461 Ok(self.rechunk(chunk_lengths)?.into())
462 }
463
464 #[pyo3(signature = (offset=0, length=None))]
465 #[pyo3(name = "slice")]
466 fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
467 let length = length.unwrap_or_else(|| self.len() - offset);
468 Ok(self.slice(offset, length)?.into())
469 }
470
471 fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
472 self.__array__(py, None, None)
473 }
474
475 fn to_pylist(&self, py: Python) -> PyResult<PyObject> {
476 let mut scalars = Vec::with_capacity(self.len());
477 for chunk in &self.chunks {
478 for i in 0..chunk.len() {
479 let scalar =
480 unsafe { PyScalar::new_unchecked(chunk.slice(i, 1), self.field.clone()) };
481 scalars.push(scalar.as_py(py)?);
482 }
483 }
484 scalars.into_py_any(py)
485 }
486
487 #[getter]
488 fn r#type(&self) -> Arro3DataType {
489 PyDataType::new(self.field.data_type().clone()).into()
490 }
491}