1use std::sync::Arc;
4
5use itertools::Itertools;
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::arrays::ChunkedArray;
10use crate::stream::{ArrayStream, ArrayStreamAdapter};
11use crate::{Array, ArrayExt, ArrayRef};
12
13pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
17 fn dtype(&self) -> &DType;
18}
19
20pub struct ArrayIteratorAdapter<I> {
21 dtype: DType,
22 inner: I,
23}
24
25impl<I> ArrayIteratorAdapter<I> {
26 pub fn new(dtype: DType, inner: I) -> Self {
27 Self { dtype, inner }
28 }
29}
30
31impl<I> Iterator for ArrayIteratorAdapter<I>
32where
33 I: Iterator<Item = VortexResult<ArrayRef>>,
34{
35 type Item = VortexResult<ArrayRef>;
36
37 fn next(&mut self) -> Option<Self::Item> {
38 self.inner.next()
39 }
40}
41
42impl<I> ArrayIterator for ArrayIteratorAdapter<I>
43where
44 I: Iterator<Item = VortexResult<ArrayRef>>,
45{
46 fn dtype(&self) -> &DType {
47 &self.dtype
48 }
49}
50
51pub trait ArrayIteratorExt: ArrayIterator {
52 fn into_stream(self) -> impl ArrayStream
53 where
54 Self: Sized,
55 {
56 ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
57 }
58
59 fn into_array(self) -> VortexResult<ArrayRef>
63 where
64 Self: Sized,
65 {
66 let dtype = self.dtype().clone();
67 let mut chunks: Vec<ArrayRef> = self.try_collect()?;
68 if chunks.len() == 1 {
69 Ok(chunks.remove(0))
70 } else {
71 Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
72 }
73 }
74}
75
76impl<I: ArrayIterator> ArrayIteratorExt for I {}
77
78pub trait ArrayIteratorArrayExt: Array {
79 fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
81 let dtype = self.dtype().clone();
82 let iter = if let Some(chunked) = self.as_opt::<ChunkedArray>() {
83 ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
84 } else {
85 ArrayChunkIterator::Single(Some(self.to_array()))
86 };
87 ArrayIteratorAdapter::new(dtype, iter)
88 }
89}
90
91impl<A: ?Sized + Array> ArrayIteratorArrayExt for A {}
92
93enum ArrayChunkIterator {
96 Single(Option<ArrayRef>),
97 Chunked(Arc<ChunkedArray>, usize),
98}
99
100impl Iterator for ArrayChunkIterator {
101 type Item = VortexResult<ArrayRef>;
102
103 fn next(&mut self) -> Option<Self::Item> {
104 match self {
105 ArrayChunkIterator::Single(array) => array.take().map(Ok),
106 ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
107 let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
108 *idx += 1;
109 Ok(chunk.clone())
110 }),
111 }
112 }
113}