vortex_array/
iter.rs

1//! Iterator over slices of an array, and related utilities.
2
3use std::sync::Arc;
4
5use itertools::Itertools;
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::arrays::ChunkedArray;
10use crate::stream::{ArrayStream, ArrayStreamAdapter};
11use crate::{Array, ArrayExt, ArrayRef};
12
13/// Iterator of array with a known [`DType`].
14///
15/// It's up to implementations to guarantee all arrays have the same [`DType`].
16pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
17    fn dtype(&self) -> &DType;
18}
19
20pub struct ArrayIteratorAdapter<I> {
21    dtype: DType,
22    inner: I,
23}
24
25impl<I> ArrayIteratorAdapter<I> {
26    pub fn new(dtype: DType, inner: I) -> Self {
27        Self { dtype, inner }
28    }
29}
30
31impl<I> Iterator for ArrayIteratorAdapter<I>
32where
33    I: Iterator<Item = VortexResult<ArrayRef>>,
34{
35    type Item = VortexResult<ArrayRef>;
36
37    fn next(&mut self) -> Option<Self::Item> {
38        self.inner.next()
39    }
40}
41
42impl<I> ArrayIterator for ArrayIteratorAdapter<I>
43where
44    I: Iterator<Item = VortexResult<ArrayRef>>,
45{
46    fn dtype(&self) -> &DType {
47        &self.dtype
48    }
49}
50
51pub trait ArrayIteratorExt: ArrayIterator {
52    fn into_stream(self) -> impl ArrayStream
53    where
54        Self: Sized,
55    {
56        ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
57    }
58
59    /// Collect the iterator into a single `Array`.
60    ///
61    /// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
62    fn into_array(self) -> VortexResult<ArrayRef>
63    where
64        Self: Sized,
65    {
66        let dtype = self.dtype().clone();
67        let mut chunks: Vec<ArrayRef> = self.try_collect()?;
68        if chunks.len() == 1 {
69            Ok(chunks.remove(0))
70        } else {
71            Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
72        }
73    }
74}
75
76impl<I: ArrayIterator> ArrayIteratorExt for I {}
77
78pub trait ArrayIteratorArrayExt: Array {
79    /// Create an [`ArrayIterator`] over the array.
80    fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
81        let dtype = self.dtype().clone();
82        let iter = if let Some(chunked) = self.as_opt::<ChunkedArray>() {
83            ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
84        } else {
85            ArrayChunkIterator::Single(Some(self.to_array()))
86        };
87        ArrayIteratorAdapter::new(dtype, iter)
88    }
89}
90
91impl<A: ?Sized + Array> ArrayIteratorArrayExt for A {}
92
93/// We define a single iterator that can handle both chunked and non-chunked arrays.
94/// This avoids the need to create boxed static iterators for the two chunked and non-chunked cases.
95enum ArrayChunkIterator {
96    Single(Option<ArrayRef>),
97    Chunked(Arc<ChunkedArray>, usize),
98}
99
100impl Iterator for ArrayChunkIterator {
101    type Item = VortexResult<ArrayRef>;
102
103    fn next(&mut self) -> Option<Self::Item> {
104        match self {
105            ArrayChunkIterator::Single(array) => array.take().map(Ok),
106            ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
107                let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
108                *idx += 1;
109                Ok(chunk.clone())
110            }),
111        }
112    }
113}