1use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect, VortexResult};
11
12use crate::arrays::{ChunkedArray, ChunkedVTable};
13use crate::stream::{ArrayStream, ArrayStreamAdapter};
14use crate::{Array, ArrayRef, IntoArray};
15
16pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
20 fn dtype(&self) -> &DType;
21}
22
23impl ArrayIterator for Box<dyn ArrayIterator + Send> {
24 fn dtype(&self) -> &DType {
25 self.as_ref().dtype()
26 }
27}
28
29pub struct ArrayIteratorAdapter<I> {
30 dtype: DType,
31 inner: I,
32}
33
34impl<I> ArrayIteratorAdapter<I> {
35 pub fn new(dtype: DType, inner: I) -> Self {
36 Self { dtype, inner }
37 }
38}
39
40impl<I> Iterator for ArrayIteratorAdapter<I>
41where
42 I: Iterator<Item = VortexResult<ArrayRef>>,
43{
44 type Item = VortexResult<ArrayRef>;
45
46 fn next(&mut self) -> Option<Self::Item> {
47 self.inner.next()
48 }
49}
50
51impl<I> ArrayIterator for ArrayIteratorAdapter<I>
52where
53 I: Iterator<Item = VortexResult<ArrayRef>>,
54{
55 fn dtype(&self) -> &DType {
56 &self.dtype
57 }
58}
59
60pub trait ArrayIteratorExt: ArrayIterator {
61 fn into_array_stream(self) -> impl ArrayStream
62 where
63 Self: Sized,
64 {
65 ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
66 }
67
68 fn read_all(self) -> VortexResult<ArrayRef>
72 where
73 Self: Sized,
74 {
75 let dtype = self.dtype().clone();
76 let mut chunks: Vec<ArrayRef> = self.try_collect()?;
77 if chunks.len() == 1 {
78 Ok(chunks.remove(0))
79 } else {
80 Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
81 }
82 }
83}
84
85impl<I: ArrayIterator> ArrayIteratorExt for I {}
86
87impl dyn Array + '_ {
88 pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
90 let dtype = self.dtype().clone();
91 let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
92 ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
93 } else {
94 ArrayChunkIterator::Single(Some(self.to_array()))
95 };
96 ArrayIteratorAdapter::new(dtype, iter)
97 }
98}
99
100enum ArrayChunkIterator {
103 Single(Option<ArrayRef>),
104 Chunked(Arc<ChunkedArray>, usize),
105}
106
107impl Iterator for ArrayChunkIterator {
108 type Item = VortexResult<ArrayRef>;
109
110 fn next(&mut self) -> Option<Self::Item> {
111 match self {
112 ArrayChunkIterator::Single(array) => array.take().map(Ok),
113 ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
114 let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
115 *idx += 1;
116 Ok(chunk.clone())
117 }),
118 }
119 }
120}