vortex_array/
iter.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Iterator over slices of an array, and related utilities.
5
6use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect, VortexResult};
11
12use crate::arrays::{ChunkedArray, ChunkedVTable};
13use crate::stream::{ArrayStream, ArrayStreamAdapter};
14use crate::{Array, ArrayRef, IntoArray};
15
16/// Iterator of array with a known [`DType`].
17///
18/// It's up to implementations to guarantee all arrays have the same [`DType`].
19pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
20    fn dtype(&self) -> &DType;
21}
22
23impl ArrayIterator for Box<dyn ArrayIterator + Send> {
24    fn dtype(&self) -> &DType {
25        self.as_ref().dtype()
26    }
27}
28
29pub struct ArrayIteratorAdapter<I> {
30    dtype: DType,
31    inner: I,
32}
33
34impl<I> ArrayIteratorAdapter<I> {
35    pub fn new(dtype: DType, inner: I) -> Self {
36        Self { dtype, inner }
37    }
38}
39
40impl<I> Iterator for ArrayIteratorAdapter<I>
41where
42    I: Iterator<Item = VortexResult<ArrayRef>>,
43{
44    type Item = VortexResult<ArrayRef>;
45
46    fn next(&mut self) -> Option<Self::Item> {
47        self.inner.next()
48    }
49}
50
51impl<I> ArrayIterator for ArrayIteratorAdapter<I>
52where
53    I: Iterator<Item = VortexResult<ArrayRef>>,
54{
55    fn dtype(&self) -> &DType {
56        &self.dtype
57    }
58}
59
60pub trait ArrayIteratorExt: ArrayIterator {
61    fn into_array_stream(self) -> impl ArrayStream
62    where
63        Self: Sized,
64    {
65        ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
66    }
67
68    /// Collect the iterator into a single `Array`.
69    ///
70    /// If the iterator yields multiple chunks, they will be returned as a [`ChunkedArray`].
71    fn read_all(self) -> VortexResult<ArrayRef>
72    where
73        Self: Sized,
74    {
75        let dtype = self.dtype().clone();
76        let mut chunks: Vec<ArrayRef> = self.try_collect()?;
77        if chunks.len() == 1 {
78            Ok(chunks.remove(0))
79        } else {
80            Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
81        }
82    }
83}
84
85impl<I: ArrayIterator> ArrayIteratorExt for I {}
86
87impl dyn Array + '_ {
88    /// Create an [`ArrayIterator`] over the array.
89    pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
90        let dtype = self.dtype().clone();
91        let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
92            ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
93        } else {
94            ArrayChunkIterator::Single(Some(self.to_array()))
95        };
96        ArrayIteratorAdapter::new(dtype, iter)
97    }
98}
99
100/// We define a single iterator that can handle both chunked and non-chunked arrays.
101/// This avoids the need to create boxed static iterators for the two chunked and non-chunked cases.
102enum ArrayChunkIterator {
103    Single(Option<ArrayRef>),
104    Chunked(Arc<ChunkedArray>, usize),
105}
106
107impl Iterator for ArrayChunkIterator {
108    type Item = VortexResult<ArrayRef>;
109
110    fn next(&mut self) -> Option<Self::Item> {
111        match self {
112            ArrayChunkIterator::Single(array) => array.take().map(Ok),
113            ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
114                let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
115                *idx += 1;
116                Ok(chunk.clone())
117            }),
118        }
119    }
120}