1use std::fmt::Display;
2use std::sync::Arc;
3
4use arrow_array::{Array, ArrayRef};
5use arrow_cast::cast;
6use arrow_cast::display::ArrayFormatter;
7use arrow_schema::{ArrowError, DataType, Field, FieldRef};
8use arrow_select::concat::concat;
9use pyo3::exceptions::{PyIndexError, PyTypeError, PyValueError};
10use pyo3::intern;
11use pyo3::prelude::*;
12use pyo3::types::{PyCapsule, PyTuple, PyType};
13
14use crate::error::{PyArrowError, PyArrowResult};
15use crate::export::{Arro3Array, Arro3ChunkedArray, Arro3DataType, Arro3Field};
16use crate::ffi::from_python::ffi_stream::ArrowArrayStreamReader;
17use crate::ffi::from_python::utils::import_stream_pycapsule;
18use crate::ffi::to_python::chunked::ArrayIterator;
19use crate::ffi::to_python::nanoarrow::to_nanoarrow_array_stream;
20use crate::ffi::to_python::to_stream_pycapsule;
21use crate::ffi::to_schema_pycapsule;
22use crate::input::AnyArray;
23use crate::interop::numpy::to_numpy::chunked_to_numpy;
24use crate::utils::default_repr_options;
25use crate::{PyArray, PyDataType, PyField, PyScalar};
26
/// A chunked Arrow array: an ordered sequence of same-typed [`ArrayRef`]
/// chunks plus the [`FieldRef`] describing their common type.
///
/// Exposed to Python as `arro3.core.ChunkedArray` (frozen, subclassable).
#[derive(Debug)]
#[pyclass(module = "arro3.core._core", name = "ChunkedArray", subclass, frozen)]
pub struct PyChunkedArray {
    // Invariant (enforced by `try_new`): every chunk's data type equals
    // `field.data_type()`.
    chunks: Vec<ArrayRef>,
    field: FieldRef,
}
36
37impl PyChunkedArray {
38 pub fn try_new(chunks: Vec<ArrayRef>, field: FieldRef) -> PyResult<Self> {
40 if !chunks
41 .iter()
42 .all(|chunk| chunk.data_type().equals_datatype(field.data_type()))
43 {
44 return Err(PyTypeError::new_err("All chunks must have same data type"));
45 }
46
47 Ok(Self { chunks, field })
48 }
49
50 pub fn data_type(&self) -> &DataType {
52 self.field.data_type()
53 }
54
55 pub fn from_array_refs(chunks: Vec<ArrayRef>) -> PyArrowResult<Self> {
58 if chunks.is_empty() {
59 return Err(ArrowError::SchemaError(
60 "Cannot infer data type from empty Vec<ArrayRef>".to_string(),
61 )
62 .into());
63 }
64
65 if !chunks
66 .windows(2)
67 .all(|w| w[0].data_type() == w[1].data_type())
68 {
69 return Err(ArrowError::SchemaError("Mismatched data types".to_string()).into());
70 }
71
72 let field = Field::new("", chunks.first().unwrap().data_type().clone(), true);
73 Ok(Self::try_new(chunks, Arc::new(field))?)
74 }
75
76 pub fn from_arrow_pycapsule(capsule: &Bound<PyCapsule>) -> PyResult<Self> {
78 let stream = import_stream_pycapsule(capsule)?;
79
80 let stream_reader = ArrowArrayStreamReader::try_new(stream)
81 .map_err(|err| PyValueError::new_err(err.to_string()))?;
82
83 let field = stream_reader.field();
84
85 let mut chunks = vec![];
86 for array in stream_reader {
87 let array = array.map_err(|err| PyTypeError::new_err(err.to_string()))?;
88 chunks.push(array);
89 }
90
91 PyChunkedArray::try_new(chunks, field)
92 }
93
94 pub fn chunks(&self) -> &[ArrayRef] {
96 &self.chunks
97 }
98
99 pub fn field(&self) -> &FieldRef {
101 &self.field
102 }
103
104 pub fn into_inner(self) -> (Vec<ArrayRef>, FieldRef) {
106 (self.chunks, self.field)
107 }
108
109 #[allow(dead_code)]
110 pub(crate) fn is_empty(&self) -> bool {
111 self.len() == 0
112 }
113
114 pub(crate) fn len(&self) -> usize {
115 self.chunks.iter().fold(0, |acc, arr| acc + arr.len())
116 }
117
118 pub(crate) fn rechunk(&self, chunk_lengths: Vec<usize>) -> PyArrowResult<Self> {
119 let total_chunk_length = chunk_lengths.iter().sum::<usize>();
120 if total_chunk_length != self.length() {
121 return Err(PyValueError::new_err(
122 "Chunk lengths do not add up to chunked array length",
123 )
124 .into());
125 }
126
127 let matches_existing_chunking = chunk_lengths
129 .iter()
130 .zip(self.chunks())
131 .all(|(length, arr)| *length == arr.len());
132 if matches_existing_chunking {
133 return Ok(Self::try_new(self.chunks.clone(), self.field.clone())?);
134 }
135
136 let mut offset = 0;
137 let chunks = chunk_lengths
138 .iter()
139 .map(|chunk_length| {
140 let sliced_chunked_array = self.slice(offset, *chunk_length)?;
141 let arr_refs = sliced_chunked_array
142 .chunks
143 .iter()
144 .map(|a| a.as_ref())
145 .collect::<Vec<_>>();
146 let sliced_concatted = concat(&arr_refs)?;
147 offset += chunk_length;
148 Ok(sliced_concatted)
149 })
150 .collect::<PyArrowResult<Vec<_>>>()?;
151
152 Ok(PyChunkedArray::try_new(chunks, self.field.clone())?)
153 }
154
155 pub(crate) fn slice(&self, mut offset: usize, mut length: usize) -> PyArrowResult<Self> {
156 if offset + length > self.length() {
157 return Err(
158 PyValueError::new_err("offset + length may not exceed length of array").into(),
159 );
160 }
161
162 let mut sliced_chunks: Vec<ArrayRef> = vec![];
163 for chunk in self.chunks() {
164 if chunk.is_empty() {
165 continue;
166 }
167
168 if offset >= chunk.len() {
171 offset -= chunk.len();
172 continue;
173 }
174
175 let take_count = length.min(chunk.len() - offset);
176 let sliced_chunk = chunk.slice(offset, take_count);
177 sliced_chunks.push(sliced_chunk);
178
179 length -= take_count;
180
181 if length == 0 {
183 break;
184 } else {
185 offset = 0;
186 }
187 }
188
189 Ok(Self::try_new(sliced_chunks, self.field.clone())?)
190 }
191
192 pub fn to_arro3<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
194 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
195 arro3_mod
196 .getattr(intern!(py, "ChunkedArray"))?
197 .call_method1(
198 intern!(py, "from_arrow_pycapsule"),
199 PyTuple::new(py, vec![self.__arrow_c_stream__(py, None)?])?,
200 )
201 }
202
203 pub fn into_arro3(self, py: Python) -> PyResult<Bound<PyAny>> {
205 let arro3_mod = py.import(intern!(py, "arro3.core"))?;
206 let capsule = Self::to_stream_pycapsule(py, self.chunks.clone(), self.field.clone(), None)?;
207 arro3_mod
208 .getattr(intern!(py, "ChunkedArray"))?
209 .call_method1(
210 intern!(py, "from_arrow_pycapsule"),
211 PyTuple::new(py, vec![capsule])?,
212 )
213 }
214 pub fn to_nanoarrow<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
216 to_nanoarrow_array_stream(py, &self.__arrow_c_stream__(py, None)?)
217 }
218
219 pub fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
223 let pyarrow_mod = py.import(intern!(py, "pyarrow"))?;
224 pyarrow_mod
225 .getattr(intern!(py, "chunked_array"))?
226 .call1(PyTuple::new(py, vec![self.into_pyobject(py)?])?)
227 }
228
229 pub(crate) fn to_stream_pycapsule<'py>(
230 py: Python<'py>,
231 chunks: Vec<ArrayRef>,
232 field: FieldRef,
233 requested_schema: Option<Bound<'py, PyCapsule>>,
234 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
235 let array_reader = Box::new(ArrayIterator::new(chunks.into_iter().map(Ok), field));
236 to_stream_pycapsule(py, array_reader, requested_schema)
237 }
238}
239
240impl TryFrom<Vec<ArrayRef>> for PyChunkedArray {
241 type Error = PyArrowError;
242
243 fn try_from(value: Vec<ArrayRef>) -> Result<Self, Self::Error> {
244 Self::from_array_refs(value)
245 }
246}
247
248impl AsRef<[ArrayRef]> for PyChunkedArray {
249 fn as_ref(&self) -> &[ArrayRef] {
250 &self.chunks
251 }
252}
253
254impl Display for PyChunkedArray {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 write!(f, "arro3.core.ChunkedArray<")?;
257 self.field.data_type().fmt(f)?;
258 writeln!(f, ">")?;
259
260 let options = default_repr_options();
261
262 writeln!(f, "[")?;
263 for chunk in self.chunks.iter().take(5) {
264 writeln!(f, " [")?;
265 let formatter =
266 ArrayFormatter::try_new(chunk, &options).map_err(|_| std::fmt::Error)?;
267 for i in 0..chunk.len().min(10) {
268 let row = formatter.value(i);
269 writeln!(f, " {},", row)?;
270 }
271 writeln!(f, " ]")?;
272 }
273 writeln!(f, "]")?;
274
275 Ok(())
276 }
277}
278
279#[pymethods]
280impl PyChunkedArray {
281 #[new]
282 #[pyo3(signature = (arrays, r#type=None))]
283 fn init(py: Python, arrays: &Bound<PyAny>, r#type: Option<PyField>) -> PyArrowResult<Self> {
284 if arrays.hasattr(intern!(py, "__arrow_c_array__"))?
285 || arrays.hasattr(intern!(py, "__arrow_c_stream__"))?
286 {
287 Ok(arrays.extract::<AnyArray>()?.into_chunked_array()?)
288 } else if let Ok(data) = arrays.extract::<AnyArray>() {
289 Ok(data.into_chunked_array()?)
290 } else if let Ok(arrays) = arrays.extract::<Vec<PyArray>>() {
291 let (chunks, fields): (Vec<_>, Vec<_>) =
293 arrays.into_iter().map(|arr| arr.into_inner()).unzip();
294 if !fields
295 .windows(2)
296 .all(|w| w[0].data_type().equals_datatype(w[1].data_type()))
297 {
298 return Err(PyTypeError::new_err(
299 "Cannot create a ChunkedArray with differing data types.",
300 )
301 .into());
302 }
303
304 let field = r#type
305 .map(|py_data_type| py_data_type.into_inner())
306 .unwrap_or_else(|| fields[0].clone());
307
308 Ok(PyChunkedArray::try_new(
309 chunks,
310 Field::new("", field.data_type().clone(), true)
311 .with_metadata(field.metadata().clone())
312 .into(),
313 )?)
314 } else {
315 Err(
316 PyTypeError::new_err("Expected ChunkedArray-like input or sequence of arrays.")
317 .into(),
318 )
319 }
320 }
321
322 #[pyo3(signature = (dtype=None, copy=None))]
323 #[allow(unused_variables)]
324 fn __array__<'py>(
325 &'py self,
326 py: Python<'py>,
327 dtype: Option<Bound<PyAny>>,
328 copy: Option<Bound<PyAny>>,
329 ) -> PyResult<Bound<'py, PyAny>> {
330 let chunk_refs = self
331 .chunks
332 .iter()
333 .map(|arr| arr.as_ref())
334 .collect::<Vec<_>>();
335 chunked_to_numpy(py, chunk_refs)
336 }
337
338 fn __arrow_c_schema__<'py>(&'py self, py: Python<'py>) -> PyArrowResult<Bound<'py, PyCapsule>> {
339 to_schema_pycapsule(py, self.field.as_ref())
340 }
341
342 #[pyo3(signature = (requested_schema=None))]
343 fn __arrow_c_stream__<'py>(
344 &'py self,
345 py: Python<'py>,
346 requested_schema: Option<Bound<'py, PyCapsule>>,
347 ) -> PyArrowResult<Bound<'py, PyCapsule>> {
348 Self::to_stream_pycapsule(
349 py,
350 self.chunks.clone(),
351 self.field.clone(),
352 requested_schema,
353 )
354 }
355
356 fn __eq__(&self, other: &PyChunkedArray) -> bool {
357 self.field == other.field && self.chunks == other.chunks
358 }
359
360 fn __getitem__(&self, i: isize) -> PyArrowResult<PyScalar> {
361 let mut i = if i < 0 {
363 let i = self.len() as isize + i;
364 if i < 0 {
365 return Err(PyIndexError::new_err("Index out of range").into());
366 }
367 i as usize
368 } else {
369 i as usize
370 };
371 if i >= self.len() {
372 return Err(PyIndexError::new_err("Index out of range").into());
373 }
374 for chunk in self.chunks() {
375 if i < chunk.len() {
376 return PyScalar::try_new(chunk.slice(i, 1), self.field.clone());
377 }
378 i -= chunk.len();
379 }
380 unreachable!("index in range but past end of last chunk")
381 }
382
383 fn __len__(&self) -> usize {
384 self.chunks.iter().fold(0, |acc, x| acc + x.len())
385 }
386
387 fn __repr__(&self) -> String {
388 self.to_string()
389 }
390
391 #[classmethod]
392 fn from_arrow(_cls: &Bound<PyType>, input: AnyArray) -> PyArrowResult<Self> {
393 input.into_chunked_array()
394 }
395
396 #[classmethod]
397 #[pyo3(name = "from_arrow_pycapsule")]
398 fn from_arrow_pycapsule_py(_cls: &Bound<PyType>, capsule: &Bound<PyCapsule>) -> PyResult<Self> {
399 Self::from_arrow_pycapsule(capsule)
400 }
401
402 fn cast(&self, target_type: PyField) -> PyArrowResult<Arro3ChunkedArray> {
403 let new_field = target_type.into_inner();
404 let new_chunks = self
405 .chunks
406 .iter()
407 .map(|chunk| cast(&chunk, new_field.data_type()))
408 .collect::<Result<Vec<_>, ArrowError>>()?;
409 Ok(PyChunkedArray::try_new(new_chunks, new_field)?.into())
410 }
411
412 fn chunk(&self, i: usize) -> PyResult<Arro3Array> {
413 let field = self.field().clone();
414 let array = self
415 .chunks
416 .get(i)
417 .ok_or(PyValueError::new_err("out of index"))?
418 .clone();
419 Ok(PyArray::new(array, field).into())
420 }
421
422 #[getter]
423 #[pyo3(name = "chunks")]
424 fn chunks_py(&self) -> Vec<Arro3Array> {
425 let field = self.field().clone();
426 self.chunks
427 .iter()
428 .map(|array| PyArray::new(array.clone(), field.clone()).into())
429 .collect()
430 }
431
432 fn combine_chunks(&self) -> PyArrowResult<Arro3Array> {
433 let field = self.field().clone();
434 let arrays: Vec<&dyn Array> = self.chunks.iter().map(|arr| arr.as_ref()).collect();
435 Ok(PyArray::new(concat(&arrays)?, field).into())
436 }
437
438 fn equals(&self, other: PyChunkedArray) -> bool {
439 self.field == other.field && self.chunks == other.chunks
440 }
441
442 #[getter]
443 #[pyo3(name = "field")]
444 fn py_field(&self) -> Arro3Field {
445 PyField::new(self.field.clone()).into()
446 }
447
448 fn length(&self) -> usize {
449 self.len()
450 }
451
452 #[getter]
453 fn nbytes(&self) -> usize {
454 self.chunks
455 .iter()
456 .fold(0, |acc, batch| acc + batch.get_array_memory_size())
457 }
458
459 #[getter]
460 fn null_count(&self) -> usize {
461 self.chunks
462 .iter()
463 .fold(0, |acc, arr| acc + arr.null_count())
464 }
465
466 #[getter]
467 fn num_chunks(&self) -> usize {
468 self.chunks.len()
469 }
470
471 #[pyo3(signature = (*, max_chunksize=None))]
472 #[pyo3(name = "rechunk")]
473 fn rechunk_py(&self, max_chunksize: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
474 let max_chunksize = max_chunksize.unwrap_or(self.len());
475 let mut chunk_lengths = vec![];
476 let mut offset = 0;
477 while offset < self.len() {
478 let chunk_length = max_chunksize.min(self.len() - offset);
479 offset += chunk_length;
480 chunk_lengths.push(chunk_length);
481 }
482 Ok(self.rechunk(chunk_lengths)?.into())
483 }
484
485 #[pyo3(signature = (offset=0, length=None))]
486 #[pyo3(name = "slice")]
487 fn slice_py(&self, offset: usize, length: Option<usize>) -> PyArrowResult<Arro3ChunkedArray> {
488 let length = length.unwrap_or_else(|| self.len() - offset);
489 Ok(self.slice(offset, length)?.into())
490 }
491
492 fn to_numpy<'py>(&'py self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
493 self.__array__(py, None, None)
494 }
495
496 fn to_pylist(&self, py: Python) -> PyResult<Vec<Py<PyAny>>> {
497 let mut scalars = Vec::with_capacity(self.len());
498 for chunk in &self.chunks {
499 for i in 0..chunk.len() {
500 let scalar =
501 unsafe { PyScalar::new_unchecked(chunk.slice(i, 1), self.field.clone()) };
502 scalars.push(scalar.as_py(py)?);
503 }
504 }
505 Ok(scalars)
506 }
507
508 #[getter]
509 fn r#type(&self) -> Arro3DataType {
510 PyDataType::new(self.field.data_type().clone()).into()
511 }
512}