vortex_array/
iter.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterator over slices of an array, and related utilities.
5
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::arrays::{ChunkedArray, ChunkedVTable};
13use crate::stream::{ArrayStream, ArrayStreamAdapter};
14use crate::{Array, ArrayRef, IntoArray};
15
/// Iterator of array with a known [`DType`].
///
/// Every item is a [`VortexResult`] wrapping an [`ArrayRef`], so iteration itself can yield
/// errors in addition to arrays.
///
/// It's up to implementations to guarantee all arrays have the same [`DType`].
pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
    /// Returns the [`DType`] associated with this iterator.
    fn dtype(&self) -> &DType;
}
22
23impl ArrayIterator for Box<dyn ArrayIterator + Send> {
24    #[inline]
25    fn dtype(&self) -> &DType {
26        self.as_ref().dtype()
27    }
28}
29
/// Pairs an arbitrary inner iterator with a [`DType`].
///
/// When the inner iterator yields `VortexResult<ArrayRef>` items, the adapter implements
/// [`ArrayIterator`], reporting the stored [`DType`] and forwarding iteration to the inner
/// iterator.
pub struct ArrayIteratorAdapter<I> {
    /// The dtype reported by [`ArrayIterator::dtype`].
    dtype: DType,
    /// The wrapped iterator that produces the items.
    inner: I,
}
34
35impl<I> ArrayIteratorAdapter<I> {
36    #[inline]
37    pub fn new(dtype: DType, inner: I) -> Self {
38        Self { dtype, inner }
39    }
40}
41
42impl<I> Iterator for ArrayIteratorAdapter<I>
43where
44    I: Iterator<Item = VortexResult<ArrayRef>>,
45{
46    type Item = VortexResult<ArrayRef>;
47
48    #[inline]
49    fn next(&mut self) -> Option<Self::Item> {
50        self.inner.next()
51    }
52}
53
54impl<I> ArrayIterator for ArrayIteratorAdapter<I>
55where
56    I: Iterator<Item = VortexResult<ArrayRef>>,
57{
58    #[inline]
59    fn dtype(&self) -> &DType {
60        &self.dtype
61    }
62}
63
64pub trait ArrayIteratorExt: ArrayIterator {
65    fn into_array_stream(self) -> impl ArrayStream
66    where
67        Self: Sized,
68    {
69        ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
70    }
71
72    /// Collect the iterator into a single `Array`.
73    ///
74    /// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
75    fn read_all(self) -> VortexResult<ArrayRef>
76    where
77        Self: Sized,
78    {
79        let dtype = self.dtype().clone();
80        let mut chunks: Vec<ArrayRef> = self.try_collect()?;
81        if chunks.len() == 1 {
82            Ok(chunks.remove(0))
83        } else {
84            Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
85        }
86    }
87}
88
89impl<I: ArrayIterator> ArrayIteratorExt for I {}
90
91impl dyn Array + '_ {
92    /// Create an [`ArrayIterator`] over the array.
93    pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
94        let dtype = self.dtype().clone();
95        let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
96            ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
97        } else {
98            ArrayChunkIterator::Single(Some(self.to_array()))
99        };
100        ArrayIteratorAdapter::new(dtype, iter)
101    }
102}
103
/// We define a single iterator that can handle both chunked and non-chunked arrays.
/// This avoids the need to create boxed static iterators for the two chunked and non-chunked cases.
enum ArrayChunkIterator {
    /// A non-chunked array that is yielded once; `None` after it has been taken.
    Single(Option<ArrayRef>),
    /// A chunked array together with the index of the next chunk to yield.
    Chunked(Arc<ChunkedArray>, usize),
}
110
111impl Iterator for ArrayChunkIterator {
112    type Item = VortexResult<ArrayRef>;
113
114    #[inline]
115    fn next(&mut self) -> Option<Self::Item> {
116        match self {
117            ArrayChunkIterator::Single(array) => array.take().map(Ok),
118            ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
119                let chunk = chunked.chunk(*idx);
120                *idx += 1;
121                Ok(chunk.clone())
122            }),
123        }
124    }
125}