vortex_array/stream/
ext.rs1use std::future::Future;
5
6use futures_util::TryStreamExt;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::arrays::ChunkedArray;
11use crate::stream::{ArrayStream, SendableArrayStream};
12
13pub trait ArrayStreamExt: ArrayStream {
14 fn boxed(self) -> SendableArrayStream
16 where
17 Self: Sized + Send + 'static,
18 {
19 Box::pin(self)
20 }
21
22 fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
26 where
27 Self: Sized,
28 {
29 async move {
30 let dtype = self.dtype().clone();
31 let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
32 if chunks.len() == 1 {
33 Ok(chunks.remove(0))
34 } else {
35 Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
36 }
37 }
38 }
39}
40
41impl<S: ArrayStream> ArrayStreamExt for S {}