1use std::sync::Arc;
7
8use itertools::Itertools;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::arrays::{ChunkedArray, ChunkedVTable};
13use crate::stream::{ArrayStream, ArrayStreamAdapter};
14use crate::{Array, ArrayRef, IntoArray};
15
16pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
20 fn dtype(&self) -> &DType;
21}
22
23impl ArrayIterator for Box<dyn ArrayIterator + Send> {
24 #[inline]
25 fn dtype(&self) -> &DType {
26 self.as_ref().dtype()
27 }
28}
29
30pub struct ArrayIteratorAdapter<I> {
31 dtype: DType,
32 inner: I,
33}
34
35impl<I> ArrayIteratorAdapter<I> {
36 #[inline]
37 pub fn new(dtype: DType, inner: I) -> Self {
38 Self { dtype, inner }
39 }
40}
41
42impl<I> Iterator for ArrayIteratorAdapter<I>
43where
44 I: Iterator<Item = VortexResult<ArrayRef>>,
45{
46 type Item = VortexResult<ArrayRef>;
47
48 #[inline]
49 fn next(&mut self) -> Option<Self::Item> {
50 self.inner.next()
51 }
52}
53
54impl<I> ArrayIterator for ArrayIteratorAdapter<I>
55where
56 I: Iterator<Item = VortexResult<ArrayRef>>,
57{
58 #[inline]
59 fn dtype(&self) -> &DType {
60 &self.dtype
61 }
62}
63
64pub trait ArrayIteratorExt: ArrayIterator {
65 fn into_array_stream(self) -> impl ArrayStream
66 where
67 Self: Sized,
68 {
69 ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
70 }
71
72 fn read_all(self) -> VortexResult<ArrayRef>
76 where
77 Self: Sized,
78 {
79 let dtype = self.dtype().clone();
80 let mut chunks: Vec<ArrayRef> = self.try_collect()?;
81 if chunks.len() == 1 {
82 Ok(chunks.remove(0))
83 } else {
84 Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
85 }
86 }
87}
88
89impl<I: ArrayIterator> ArrayIteratorExt for I {}
90
91impl dyn Array + '_ {
92 pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
94 let dtype = self.dtype().clone();
95 let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
96 ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
97 } else {
98 ArrayChunkIterator::Single(Some(self.to_array()))
99 };
100 ArrayIteratorAdapter::new(dtype, iter)
101 }
102}
103
104enum ArrayChunkIterator {
107 Single(Option<ArrayRef>),
108 Chunked(Arc<ChunkedArray>, usize),
109}
110
111impl Iterator for ArrayChunkIterator {
112 type Item = VortexResult<ArrayRef>;
113
114 #[inline]
115 fn next(&mut self) -> Option<Self::Item> {
116 match self {
117 ArrayChunkIterator::Single(array) => array.take().map(Ok),
118 ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
119 let chunk = chunked.chunk(*idx);
120 *idx += 1;
121 Ok(chunk.clone())
122 }),
123 }
124 }
125}