vortex_array/
iter.rs

1//! Iterator over slices of an array, and related utilities.
2
3use std::sync::Arc;
4
5use itertools::Itertools;
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::arrays::{ChunkedArray, ChunkedVTable};
10use crate::stream::{ArrayStream, ArrayStreamAdapter};
11use crate::{Array, ArrayRef, IntoArray};
12
/// An iterator of arrays that also reports a single [`DType`] for the whole sequence.
///
/// Items are `VortexResult<ArrayRef>`, so each step may yield either an array or an error.
///
/// It's up to implementations to guarantee all arrays have the same [`DType`].
pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
    /// The [`DType`] associated with this iterator.
    fn dtype(&self) -> &DType;
}
19
20impl ArrayIterator for Box<dyn ArrayIterator + Send> {
21    fn dtype(&self) -> &DType {
22        self.as_ref().dtype()
23    }
24}
25
/// Pairs an inner iterator with a [`DType`].
///
/// When `I` yields `VortexResult<ArrayRef>` items, the adapter implements [`ArrayIterator`]
/// and reports the stored [`DType`]. The adapter does not inspect the yielded arrays.
pub struct ArrayIteratorAdapter<I> {
    // The dtype reported by `ArrayIterator::dtype`.
    dtype: DType,
    // The wrapped iterator that items are pulled from.
    inner: I,
}
30
31impl<I> ArrayIteratorAdapter<I> {
32    pub fn new(dtype: DType, inner: I) -> Self {
33        Self { dtype, inner }
34    }
35}
36
37impl<I> Iterator for ArrayIteratorAdapter<I>
38where
39    I: Iterator<Item = VortexResult<ArrayRef>>,
40{
41    type Item = VortexResult<ArrayRef>;
42
43    fn next(&mut self) -> Option<Self::Item> {
44        self.inner.next()
45    }
46}
47
48impl<I> ArrayIterator for ArrayIteratorAdapter<I>
49where
50    I: Iterator<Item = VortexResult<ArrayRef>>,
51{
52    fn dtype(&self) -> &DType {
53        &self.dtype
54    }
55}
56
57pub trait ArrayIteratorExt: ArrayIterator {
58    fn into_array_stream(self) -> impl ArrayStream
59    where
60        Self: Sized,
61    {
62        ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
63    }
64
65    /// Collect the iterator into a single `Array`.
66    ///
67    /// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
68    fn read_all(self) -> VortexResult<ArrayRef>
69    where
70        Self: Sized,
71    {
72        let dtype = self.dtype().clone();
73        let mut chunks: Vec<ArrayRef> = self.try_collect()?;
74        if chunks.len() == 1 {
75            Ok(chunks.remove(0))
76        } else {
77            Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
78        }
79    }
80}
81
82impl<I: ArrayIterator> ArrayIteratorExt for I {}
83
84impl dyn Array + '_ {
85    /// Create an [`ArrayIterator`] over the array.
86    pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
87        let dtype = self.dtype().clone();
88        let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
89            ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
90        } else {
91            ArrayChunkIterator::Single(Some(self.to_array()))
92        };
93        ArrayIteratorAdapter::new(dtype, iter)
94    }
95}
96
/// A single iterator type that handles both chunked and non-chunked arrays.
///
/// Using one enum avoids having to box two different iterator types for the chunked and
/// non-chunked cases.
enum ArrayChunkIterator {
    /// A non-chunked array: holds the array until it has been yielded, then `None`.
    Single(Option<ArrayRef>),
    /// A chunked array together with the index of the next chunk to yield.
    Chunked(Arc<ChunkedArray>, usize),
}
103
104impl Iterator for ArrayChunkIterator {
105    type Item = VortexResult<ArrayRef>;
106
107    fn next(&mut self) -> Option<Self::Item> {
108        match self {
109            ArrayChunkIterator::Single(array) => array.take().map(Ok),
110            ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
111                let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
112                *idx += 1;
113                Ok(chunk.clone())
114            }),
115        }
116    }
117}