vortex_array/stream/
ext.rs1use std::future::Future;
5
6use futures::TryStreamExt;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::IntoArray;
11use crate::arrays::ChunkedArray;
12use crate::stream::ArrayStream;
13use crate::stream::SendableArrayStream;
14
15pub trait ArrayStreamExt: ArrayStream {
16 fn boxed(self) -> SendableArrayStream
18 where
19 Self: Sized + Send + 'static,
20 {
21 Box::pin(self)
22 }
23
24 fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
28 where
29 Self: Sized,
30 {
31 async move {
32 let dtype = self.dtype().clone();
33 let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
34 if chunks.len() == 1 {
35 Ok(chunks.remove(0))
36 } else {
37 Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
38 }
39 }
40 }
41}
42
43impl<S: ArrayStream> ArrayStreamExt for S {}