1use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::Array;
13use crate::ArrayRef;
14use crate::IntoArray;
15use crate::arrays::ChunkedArray;
16use crate::arrays::ChunkedVTable;
17use crate::stream::ArrayStream;
18use crate::stream::ArrayStreamAdapter;
19
20pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
24 fn dtype(&self) -> &DType;
25}
26
27impl ArrayIterator for Box<dyn ArrayIterator + Send> {
28 #[inline]
29 fn dtype(&self) -> &DType {
30 self.as_ref().dtype()
31 }
32}
33
34pub struct ArrayIteratorAdapter<I> {
35 dtype: DType,
36 inner: I,
37}
38
39impl<I> ArrayIteratorAdapter<I> {
40 #[inline]
41 pub fn new(dtype: DType, inner: I) -> Self {
42 Self { dtype, inner }
43 }
44}
45
46impl<I> Iterator for ArrayIteratorAdapter<I>
47where
48 I: Iterator<Item = VortexResult<ArrayRef>>,
49{
50 type Item = VortexResult<ArrayRef>;
51
52 #[inline]
53 fn next(&mut self) -> Option<Self::Item> {
54 self.inner.next()
55 }
56}
57
58impl<I> ArrayIterator for ArrayIteratorAdapter<I>
59where
60 I: Iterator<Item = VortexResult<ArrayRef>>,
61{
62 #[inline]
63 fn dtype(&self) -> &DType {
64 &self.dtype
65 }
66}
67
68pub trait ArrayIteratorExt: ArrayIterator {
69 fn into_array_stream(self) -> impl ArrayStream
70 where
71 Self: Sized,
72 {
73 ArrayStreamAdapter::new(self.dtype().clone(), futures::stream::iter(self))
74 }
75
76 fn read_all(self) -> VortexResult<ArrayRef>
80 where
81 Self: Sized,
82 {
83 let dtype = self.dtype().clone();
84 let mut chunks: Vec<ArrayRef> = self.try_collect()?;
85 if chunks.len() == 1 {
86 Ok(chunks.remove(0))
87 } else {
88 Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
89 }
90 }
91}
92
93impl<I: ArrayIterator> ArrayIteratorExt for I {}
94
95impl dyn Array + '_ {
96 pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
98 let dtype = self.dtype().clone();
99 let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
100 ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
101 } else {
102 ArrayChunkIterator::Single(Some(self.to_array()))
103 };
104 ArrayIteratorAdapter::new(dtype, iter)
105 }
106}
107
108enum ArrayChunkIterator {
111 Single(Option<ArrayRef>),
112 Chunked(Arc<ChunkedArray>, usize),
113}
114
115impl Iterator for ArrayChunkIterator {
116 type Item = VortexResult<ArrayRef>;
117
118 #[inline]
119 fn next(&mut self) -> Option<Self::Item> {
120 match self {
121 ArrayChunkIterator::Single(array) => array.take().map(Ok),
122 ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
123 let chunk = chunked.chunk(*idx);
124 *idx += 1;
125 Ok(chunk.clone())
126 }),
127 }
128 }
129}