vortex_array/stream/
ext.rs1use std::future::Future;
5
6use futures::TryStreamExt;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::arrays::ChunkedArray;
11use crate::stream::ArrayStream;
12use crate::stream::SendableArrayStream;
13
14pub trait ArrayStreamExt: ArrayStream {
15 fn boxed(self) -> SendableArrayStream
17 where
18 Self: Sized + Send + 'static,
19 {
20 Box::pin(self)
21 }
22
23 fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
27 where
28 Self: Sized,
29 {
30 async move {
31 let dtype = self.dtype().clone();
32 let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
33 if chunks.len() == 1 {
34 Ok(chunks.remove(0))
35 } else {
36 Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
37 }
38 }
39 }
40}
41
42impl<S: ArrayStream> ArrayStreamExt for S {}