vortex_array/stream/
ext.rs1use std::future::Future;
2
3use futures_util::TryStreamExt;
4use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::arrays::ChunkedArray;
8use crate::stream::{ArrayStream, SendableArrayStream};
9
10pub trait ArrayStreamExt: ArrayStream {
11 fn boxed(self) -> SendableArrayStream
13 where
14 Self: Sized + Send + 'static,
15 {
16 Box::pin(self)
17 }
18
19 fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
23 where
24 Self: Sized,
25 {
26 async move {
27 let dtype = self.dtype().clone();
28 let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
29 if chunks.len() == 1 {
30 Ok(chunks.remove(0))
31 } else {
32 Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
33 }
34 }
35 }
36}
37
38impl<S: ArrayStream> ArrayStreamExt for S {}