vortex_array/
iter.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterator over slices of an array, and related utilities.
5
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::Array;
13use crate::ArrayRef;
14use crate::IntoArray;
15use crate::arrays::ChunkedArray;
16use crate::arrays::ChunkedVTable;
17use crate::stream::ArrayStream;
18use crate::stream::ArrayStreamAdapter;
19
/// Iterator of array with a known [`DType`].
///
/// It's up to implementations to guarantee all arrays have the same [`DType`].
pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
    /// Returns the [`DType`] this iterator declares for the arrays it yields.
    fn dtype(&self) -> &DType;
}
26
27impl ArrayIterator for Box<dyn ArrayIterator + Send> {
28    #[inline]
29    fn dtype(&self) -> &DType {
30        self.as_ref().dtype()
31    }
32}
33
/// Pairs an iterator with a [`DType`].
///
/// When the wrapped iterator yields `VortexResult<ArrayRef>` items, the adapter implements
/// [`ArrayIterator`]: items are forwarded unchanged and `dtype()` returns the stored [`DType`].
/// The adapter itself performs no check that the yielded arrays match that [`DType`].
pub struct ArrayIteratorAdapter<I> {
    // The dtype reported by `ArrayIterator::dtype`.
    dtype: DType,
    // The wrapped iterator that produces the items.
    inner: I,
}
38
39impl<I> ArrayIteratorAdapter<I> {
40    #[inline]
41    pub fn new(dtype: DType, inner: I) -> Self {
42        Self { dtype, inner }
43    }
44}
45
46impl<I> Iterator for ArrayIteratorAdapter<I>
47where
48    I: Iterator<Item = VortexResult<ArrayRef>>,
49{
50    type Item = VortexResult<ArrayRef>;
51
52    #[inline]
53    fn next(&mut self) -> Option<Self::Item> {
54        self.inner.next()
55    }
56}
57
58impl<I> ArrayIterator for ArrayIteratorAdapter<I>
59where
60    I: Iterator<Item = VortexResult<ArrayRef>>,
61{
62    #[inline]
63    fn dtype(&self) -> &DType {
64        &self.dtype
65    }
66}
67
68pub trait ArrayIteratorExt: ArrayIterator {
69    fn into_array_stream(self) -> impl ArrayStream
70    where
71        Self: Sized,
72    {
73        ArrayStreamAdapter::new(self.dtype().clone(), futures::stream::iter(self))
74    }
75
76    /// Collect the iterator into a single `Array`.
77    ///
78    /// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
79    fn read_all(self) -> VortexResult<ArrayRef>
80    where
81        Self: Sized,
82    {
83        let dtype = self.dtype().clone();
84        let mut chunks: Vec<ArrayRef> = self.try_collect()?;
85        if chunks.len() == 1 {
86            Ok(chunks.remove(0))
87        } else {
88            Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
89        }
90    }
91}
92
93impl<I: ArrayIterator> ArrayIteratorExt for I {}
94
95impl dyn Array + '_ {
96    /// Create an [`ArrayIterator`] over the array.
97    pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
98        let dtype = self.dtype().clone();
99        let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
100            ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
101        } else {
102            ArrayChunkIterator::Single(Some(self.to_array()))
103        };
104        ArrayIteratorAdapter::new(dtype, iter)
105    }
106}
107
/// A single iterator type that handles both chunked and non-chunked arrays.
///
/// Using one enum avoids having to create boxed iterators for the chunked and
/// non-chunked cases.
enum ArrayChunkIterator {
    /// A non-chunked array that is yielded once; `None` after it has been taken.
    Single(Option<ArrayRef>),
    /// A chunked array together with the index of the next chunk to yield.
    Chunked(Arc<ChunkedArray>, usize),
}
114
115impl Iterator for ArrayChunkIterator {
116    type Item = VortexResult<ArrayRef>;
117
118    #[inline]
119    fn next(&mut self) -> Option<Self::Item> {
120        match self {
121            ArrayChunkIterator::Single(array) => array.take().map(Ok),
122            ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
123                let chunk = chunked.chunk(*idx);
124                *idx += 1;
125                Ok(chunk.clone())
126            }),
127        }
128    }
129}