vortex_array/
iter.rs

1//! Iterator over slices of an array, and related utilities.
2
3use std::sync::Arc;
4
5use itertools::Itertools;
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::arrays::ChunkedArray;
10use crate::stream::{ArrayStream, ArrayStreamAdapter};
11use crate::{Array, ArrayExt, ArrayRef};
12
13/// Iterator of array with a known [`DType`].
14///
15/// It's up to implementations to guarantee all arrays have the same [`DType`].
16pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
17    fn dtype(&self) -> &DType;
18}
19
20impl ArrayIterator for Box<dyn ArrayIterator + Send> {
21    fn dtype(&self) -> &DType {
22        self.as_ref().dtype()
23    }
24}
25
26pub struct ArrayIteratorAdapter<I> {
27    dtype: DType,
28    inner: I,
29}
30
31impl<I> ArrayIteratorAdapter<I> {
32    pub fn new(dtype: DType, inner: I) -> Self {
33        Self { dtype, inner }
34    }
35}
36
37impl<I> Iterator for ArrayIteratorAdapter<I>
38where
39    I: Iterator<Item = VortexResult<ArrayRef>>,
40{
41    type Item = VortexResult<ArrayRef>;
42
43    fn next(&mut self) -> Option<Self::Item> {
44        self.inner.next()
45    }
46}
47
48impl<I> ArrayIterator for ArrayIteratorAdapter<I>
49where
50    I: Iterator<Item = VortexResult<ArrayRef>>,
51{
52    fn dtype(&self) -> &DType {
53        &self.dtype
54    }
55}
56
57pub trait ArrayIteratorExt: ArrayIterator {
58    fn into_array_stream(self) -> impl ArrayStream
59    where
60        Self: Sized,
61    {
62        ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
63    }
64
65    /// Collect the iterator into a single `Array`.
66    ///
67    /// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
68    fn read_all(self) -> VortexResult<ArrayRef>
69    where
70        Self: Sized,
71    {
72        let dtype = self.dtype().clone();
73        let mut chunks: Vec<ArrayRef> = self.try_collect()?;
74        if chunks.len() == 1 {
75            Ok(chunks.remove(0))
76        } else {
77            Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
78        }
79    }
80}
81
82impl<I: ArrayIterator> ArrayIteratorExt for I {}
83
84pub trait ArrayIteratorArrayExt: Array {
85    /// Create an [`ArrayIterator`] over the array.
86    fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
87        let dtype = self.dtype().clone();
88        let iter = if let Some(chunked) = self.as_opt::<ChunkedArray>() {
89            ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
90        } else {
91            ArrayChunkIterator::Single(Some(self.to_array()))
92        };
93        ArrayIteratorAdapter::new(dtype, iter)
94    }
95}
96
97impl<A: ?Sized + Array> ArrayIteratorArrayExt for A {}
98
99/// We define a single iterator that can handle both chunked and non-chunked arrays.
100/// This avoids the need to create boxed static iterators for the two chunked and non-chunked cases.
101enum ArrayChunkIterator {
102    Single(Option<ArrayRef>),
103    Chunked(Arc<ChunkedArray>, usize),
104}
105
106impl Iterator for ArrayChunkIterator {
107    type Item = VortexResult<ArrayRef>;
108
109    fn next(&mut self) -> Option<Self::Item> {
110        match self {
111            ArrayChunkIterator::Single(array) => array.take().map(Ok),
112            ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
113                let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
114                *idx += 1;
115                Ok(chunk.clone())
116            }),
117        }
118    }
119}