1use std::sync::Arc;
4
5use itertools::Itertools;
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::arrays::{ChunkedArray, ChunkedVTable};
10use crate::stream::{ArrayStream, ArrayStreamAdapter};
11use crate::{Array, ArrayRef, IntoArray};
12
13pub trait ArrayIterator: Iterator<Item = VortexResult<ArrayRef>> {
17 fn dtype(&self) -> &DType;
18}
19
20impl ArrayIterator for Box<dyn ArrayIterator + Send> {
21 fn dtype(&self) -> &DType {
22 self.as_ref().dtype()
23 }
24}
25
26pub struct ArrayIteratorAdapter<I> {
27 dtype: DType,
28 inner: I,
29}
30
31impl<I> ArrayIteratorAdapter<I> {
32 pub fn new(dtype: DType, inner: I) -> Self {
33 Self { dtype, inner }
34 }
35}
36
37impl<I> Iterator for ArrayIteratorAdapter<I>
38where
39 I: Iterator<Item = VortexResult<ArrayRef>>,
40{
41 type Item = VortexResult<ArrayRef>;
42
43 fn next(&mut self) -> Option<Self::Item> {
44 self.inner.next()
45 }
46}
47
48impl<I> ArrayIterator for ArrayIteratorAdapter<I>
49where
50 I: Iterator<Item = VortexResult<ArrayRef>>,
51{
52 fn dtype(&self) -> &DType {
53 &self.dtype
54 }
55}
56
57pub trait ArrayIteratorExt: ArrayIterator {
58 fn into_array_stream(self) -> impl ArrayStream
59 where
60 Self: Sized,
61 {
62 ArrayStreamAdapter::new(self.dtype().clone(), futures_util::stream::iter(self))
63 }
64
65 fn read_all(self) -> VortexResult<ArrayRef>
69 where
70 Self: Sized,
71 {
72 let dtype = self.dtype().clone();
73 let mut chunks: Vec<ArrayRef> = self.try_collect()?;
74 if chunks.len() == 1 {
75 Ok(chunks.remove(0))
76 } else {
77 Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
78 }
79 }
80}
81
82impl<I: ArrayIterator> ArrayIteratorExt for I {}
83
84impl dyn Array + '_ {
85 pub fn to_array_iterator(&self) -> impl ArrayIterator + 'static {
87 let dtype = self.dtype().clone();
88 let iter = if let Some(chunked) = self.as_opt::<ChunkedVTable>() {
89 ArrayChunkIterator::Chunked(Arc::new(chunked.clone()), 0)
90 } else {
91 ArrayChunkIterator::Single(Some(self.to_array()))
92 };
93 ArrayIteratorAdapter::new(dtype, iter)
94 }
95}
96
97enum ArrayChunkIterator {
100 Single(Option<ArrayRef>),
101 Chunked(Arc<ChunkedArray>, usize),
102}
103
104impl Iterator for ArrayChunkIterator {
105 type Item = VortexResult<ArrayRef>;
106
107 fn next(&mut self) -> Option<Self::Item> {
108 match self {
109 ArrayChunkIterator::Single(array) => array.take().map(Ok),
110 ArrayChunkIterator::Chunked(chunked, idx) => (*idx < chunked.nchunks()).then(|| {
111 let chunk = chunked.chunk(*idx).vortex_expect("not out of bounds");
112 *idx += 1;
113 Ok(chunk.clone())
114 }),
115 }
116 }
117}