1use std::fmt::Display;
2use std::sync::Arc;
3
4use arrow::compute::concat;
5use arrow_array::{Array, ArrayRef};
6use arrow_schema::{ArrowError, DataType, Field, FieldRef};
7use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
8use pyo3::prelude::*;
9use pyo3::types::{PyCapsule, PyTuple, PyType};
10use pyo3::{intern, IntoPyObjectExt};
11
12use crate::error::{PyArrowError, PyArrowResult};
13use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
14use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
15use crate::ffi::from_python::utils::import_stream_pycapsule;
16use crate::ffi::to_python::chunked::ArrayIterator;
17use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
18use crate::ffi::to_python::to_stream_pycapsule;
19use crate::ffi::to_schema_pycapsule;
20use crate::input::AnyArray;
21use crate::interop::numpy::to_numpy::chunked_to_numpy;
22use crate::{PyArray, PyDataType, PyField, PyScalar};
23
/// A chunked Arrow array: an ordered sequence of same-typed array chunks.
///
/// Mirrors pyarrow's `ChunkedArray`. The `field` carries the shared data type
/// plus name/metadata for every chunk.
#[derive(Debug)]
#[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass, frozen)]
pub struct PyChunkedArray {
    // Invariant (enforced by `try_new`): each chunk's data type equals
    // `field.data_type()`.
    chunks: Vec<ArrayRef>,
    field: FieldRef,
}
33
34impl PyChunkedArray {
35 pub fn try_new(chunks: Vec<ArrayRef>, field: FieldRef) -> PyResult<Self> {
37 if !chunks
38 .iter()
39 .all(|chunk| chunk.data_type().equals_datatype(field.data_type()))
40 {
41 return Err(PyTypeError::new_err("All chunks must have same data type"));
42 }
43
44 Ok(Self { chunks, field })
45 }
46
47 pub fn data_type(&self) -> &DataType {
49 self.field.data_type()
50 }
51
52 pub fn from_array_refs(chunks: Vec<ArrayRef>) -> PyArrowResult<Self> {
55 if chunks.is_empty() {
56 return Err(ArrowError::SchemaError(
57 "Cannot infer data type from empty Vec<ArrayRef>".to_string(),
58 )
59 .into());
60 }
61
62 if !chunks
63 .windows(2)
64 .all(|w| w[0].data_type() == w[1].data_type())
65 {
66 return Err(ArrowError::SchemaError("Mismatched data types".to_string()).into());
67 }
68
69 let field = Field::new("", chunks.first().unwrap().data_type().clone(), true);
70 Ok(Self::try_new(chunks, Arc::new(field))?)
71 }
72
73 pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
75 let stream = import_stream_pycapsule(capsule)?;
76
77 let stream_reader = ArrowArrayStreamReader::try_new(stream)
78 .map_err(|err| PyValueError::new_err(err.to_string()))?;
79
80 let field = stream_reader.field();
81
82 let mut chunks = vec![];
83 for array in stream_reader {
84 let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
85 chunks.push(array);
86 }
87
88 PyChunkedArray::try_new(chunks, field)
89 }
90
91 pub fn chunks(&self) -> &[ArrayRef] {
93 &self.chunks
94 }
95
96 pub fn field(&self) -> &FieldRef {
98 &self.field
99 }
100
101 pub fn into_inner(self) -> (Vec<ArrayRef>, FieldRef) {
103 (self.chunks, self.field)
104 }
105
106 #[allow(dead_code)]
107 pub(crate) fn is_empty(&self) -> bool {
108 self.len() == 0
109 }
110
111 pub(crate) fn len(&self) -> usize {
112 self.chunks.iter().fold(0, |acc, arr| acc + arr.len())
113 }
114
115 pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
116 let total_chunk_length = chunk_lengths.iter().sum::<usize>();
117 if total_chunk_length != self.length() {
118 return Err(PyValueError::new_err(
119 "Chunk lengths do not add up to chunked array length",
120 )
121 .into());
122 }
123
124 let matches_existing_chunking = chunk_lengths
126 .iter()
127 .zip(self.chunks())
128 .all(|(length, arr)| *length == arr.len());
129 if matches_existing_chunking {
130 return Ok(Self::try_new(self.chunks.clone(), self.field.clone())?);
131 }
132
133 let mut offset = 0;
134 let chunks = chunk_lengths
135 .iter()
136 .map(|chunk_length| {
137 let sliced_chunked_array = self.slice(offset, *chunk_length)?;
138 let arr_refs = sliced_chunked_array
139 .chunks
140 .iter()
141 .map(|a| a.as_ref())
142 .collect::<Vec<_>>();
143 let sliced_concatted = concat(&arr_refs)?;
144 offset += chunk_length;
145 Ok(sliced_concatted)
146 })
147 .collect::<PyArrowResult<Vec<_>>>()?;
148
149 Ok(PyChunkedArray::try_new(chunks, self.field.clone())?)
150 }
151
152 pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
153 if offset + length > self.length() {
154 return Err(
155 PyValueError::new_err("offset + length may not exceed length of array").into(),
156 );
157 }
158
159 let mut sliced_chunks: Vec<ArrayRef> = vec![];
160 for chunk in self.chunks() {
161 if chunk.is_empty() {
162 continue;
163 }
164
165 if offset >= chunk.len() {
168 offset -= chunk.len();
169 continue;
170 }
171
172 let take_count = length.min(chunk.len() - offset);
173 let sliced_chunk = chunk.slice(offset, take_count);
174 sliced_chunks.push(sliced_chunk);
175
176 length -= take_count;
177
178 if length == 0 {
180 break;
181 } else {
182 offset = 0;
183 }
184 }
185
186 Ok(Self::try_new(sliced_chunks, self.field.clone())?)
187 }
188
189 pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
191 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
192 arro3_mod
193 .getattr(intern!(py, "ChunkedArray"))?
194 .call_method1(
195 intern!(py, "from_arrow_pycapsule"),
196 PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
197 )
198 }
199
200 pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
202 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
203 let capsule = Self::to_stream_pycapsule(py, self.chunks.clone(), self.field.clone(), None)?;
204 arro3_mod
205 .getattr(intern!(py, "ChunkedArray"))?
206 .call_method1(
207 intern!(py, "from_arrow_pycapsule"),
208 PyTuple::new(py, vec![capsule])?,
209 )
210 }
211 pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
213 to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
214 }
215
216 pub fn to_pyarrow(self, py: Python) -> PyResult<PyObject> {
220 let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
221 let pyarrow_obj = pyarrow_mod
222 .getattr(intern!(py, "chunked_array"))?
223 .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)?;
224 pyarrow_obj.into_py_any(py)
225 }
226
227 pub(crate) fn to_stream_pycapsule<'py>(
228 py: Python<'py>,
229 chunks: Vec<ArrayRef>,
230 field: FieldRef,
231 requested_schema: Option<Bound<'py, PyCapsule>>,
232 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
233 let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
234 to_stream_pycapsule(py, array_reader, requested_schema)
235 }
236}
237
238impl TryFrom<Vec<ArrayRef>> for PyChunkedArray {
239 type Error = PyArrowError;
240
241 fn try_from(value: Vec<ArrayRef>) -> Result<Self, Self::Error> {
242 Self::from_array_refs(value)
243 }
244}
245
246impl AsRef<[ArrayRef]> for PyChunkedArray {
247 fn as_ref(&self) -> &[ArrayRef] {
248 &self.chunks
249 }
250}
251
252impl Display for PyChunkedArray {
253 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
254 write!(f, "arro3.core.ChunkedArray<")?;
255 self.field.data_type().fmt(f)?;
256 writeln!(f, ">")?;
257 Ok(())
258 }
259}
260
261#[pymethods]
262impl PyChunkedArray {
263 #[new]
264 #[pyo3(signature = (arrays, r#type=None))]
265 fn init(arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
266 if let Ok(data) = arrays.extract::<AnyArray>() {
267 Ok(data.into_chunked_array()?)
268 } else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
269 let (chunks, fields): (Vec<_>, Vec<_>) =
271 arrays.into_iter().map(|arr| arr.into_inner()).unzip();
272 if !fields
273 .windows(2)
274 .all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
275 {
276 return Err(PyTypeError::new_err(
277 "Cannot create a ChunkedArray with differing data types.",
278 )
279 .into());
280 }
281
282 let field = r#type
283 .map(|py_data_type| py_data_type.into_inner())
284 .unwrap_or_else(|| fields[0].clone());
285
286 Ok(PyChunkedArray::try_new(
287 chunks,
288 Field::new("", field.data_type().clone(), true)
289 .with_metadata(field.metadata().clone())
290 .into(),
291 )?)
292 } else {
293 Err(
294 PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
295 .into(),
296 )
297 }
298 }
299
300 #[pyo3(signature = (dtype=None, copy=None))]
301 #[allow(unused_variables)]
302 fn __array__<'py>(
303 &'py self,
304 py: Python<'py>,
305 dtype: Option<PyObject>,
306 copy: Option<PyObject>,
307 ) -> PyResult<Bound<'py, PyAny>> {
308 let chunk_refs = self
309 .chunks
310 .iter()
311 .map(|arr| arr.as_ref())
312 .collect::<Vec<_>>();
313 chunked_to_numpy(py, chunk_refs)
314 }
315
316 fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
317 to_schema_pycapsule(py, self.field.as_ref())
318 }
319
320 #[pyo3(signature = (requested_schema=None))]
321 fn __arrow_c_stream__<'py>(
322 &'py self,
323 py: Python<'py>,
324 requested_schema: Option<Bound<'py, PyCapsule>>,
325 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
326 Self::to_stream_pycapsule(
327 py,
328 self.chunks.clone(),
329 self.field.clone(),
330 requested_schema,
331 )
332 }
333
334 fn __eq__(&self, other: &PyChunkedArray) -> bool {
335 self.field == other.field && self.chunks == other.chunks
336 }
337
338 fn __getitem__(&self, i: isize) -> PyArrowResult<PyScalar> {
339 let mut i = if i < 0 {
341 let i = self.len() as isize + i;
342 if i < 0 {
343 return Err(PyIndexError::new_err("Index out of range").into());
344 }
345 i as usize
346 } else {
347 i as usize
348 };
349 if i >= self.len() {
350 return Err(PyIndexError::new_err("Index out of range").into());
351 }
352 for chunk in self.chunks() {
353 if i < chunk.len() {
354 return PyScalar::try_new(chunk.slice(i, 1), self.field.clone());
355 }
356 i -= chunk.len();
357 }
358 unreachable!("index in range but past end of last chunk")
359 }
360
361 fn __len__(&self) -> usize {
362 self.chunks.iter().fold(0, |acc, x| acc + x.len())
363 }
364
365 fn __repr__(&self) -> String {
366 self.to_string()
367 }
368
369 #[classmethod]
370 fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
371 input.into_chunked_array()
372 }
373
374 #[classmethod]
375 #[pyo3(name = "from_arrow_pycapsule")]
376 fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
377 Self::from_arrow_pycapsule(capsule)
378 }
379
380 fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
381 let new_field = target_type.into_inner();
382 let new_chunks = self
383 .chunks
384 .iter()
385 .map(|chunk| arrow::compute::cast(&chunk, new_field.data_type()))
386 .collect::<Result<Vec<_>, ArrowError>>()?;
387 Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
388 }
389
390 fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
391 let field = self.field().clone();
392 let array = self
393 .chunks
394 .get(i)
395 .ok_or(PyValueError::new_err("out of index"))?
396 .clone();
397 Ok(PyArray::new(array, field).into())
398 }
399
400 #[getter]
401 #[pyo3(name = "chunks")]
402 fn chunks_py(&self) -> Vec<Arro3Array> {
403 let field = self.field().clone();
404 self.chunks
405 .iter()
406 .map(|array| PyArray::new(array.clone(), field.clone()).into())
407 .collect()
408 }
409
410 fn combine_chunks(&self) -> PyArrowResult<Arro3Array> {
411 let field = self.field().clone();
412 let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect();
413 Ok(PyArray::new(concat(&arrays)?, field).into())
414 }
415
416 fn equals(&self, other: PyChunkedArray) -> bool {
417 self.field == other.field && self.chunks == other.chunks
418 }
419
420 #[getter]
421 #[pyo3(name = "field")]
422 fn py_field(&self) -> Arro3Field {
423 PyField::new(self.field.clone()).into()
424 }
425
426 fn length(&self) -> usize {
427 self.len()
428 }
429
430 #[getter]
431 fn nbytes(&self) -> usize {
432 self.chunks
433 .iter()
434 .fold(0, |acc, batch| acc + batch.get_array_memory_size())
435 }
436
437 #[getter]
438 fn null_count(&self) -> usize {
439 self.chunks
440 .iter()
441 .fold(0, |acc, arr| acc + arr.null_count())
442 }
443
444 #[getter]
445 fn num_chunks(&self) -> usize {
446 self.chunks.len()
447 }
448
449 #[pyo3(signature = (*, max_chunksize=None))]
450 #[pyo3(name = "rechunk")]
451 fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
452 let max_chunksize = max_chunksize.unwrap_or(self.len());
453 let mut chunk_lengths = vec![];
454 let mut offset = 0;
455 while offset < self.len() {
456 let chunk_length = max_chunksize.min(self.len() - offset);
457 offset += chunk_length;
458 chunk_lengths.push(chunk_length);
459 }
460 Ok(self.rechunk(chunk_lengths)?.into())
461 }
462
463 #[pyo3(signature = (offset=0, length=None))]
464 #[pyo3(name = "slice")]
465 fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
466 let length = length.unwrap_or_else(|| self.len() - offset);
467 Ok(self.slice(offset, length)?.into())
468 }
469
470 fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
471 self.__array__(py, None, None)
472 }
473
474 fn to_pylist(&self, py: Python) -> PyResult<PyObject> {
475 let mut scalars = Vec::with_capacity(self.len());
476 for chunk in &self.chunks {
477 for i in 0..chunk.len() {
478 let scalar =
479 unsafe { PyScalar::new_unchecked(chunk.slice(i, 1), self.field.clone()) };
480 scalars.push(scalar.as_py(py)?);
481 }
482 }
483 scalars.into_py_any(py)
484 }
485
486 #[getter]
487 fn r#type(&self) -> Arro3DataType {
488 PyDataType::new(self.field.data_type().clone()).into()
489 }
490}