vortex_array/stream/
ext.rs1use std::future::Future;
2
3use futures_util::TryStreamExt;
4use vortex_error::VortexResult;
5
6use crate::arrays::ChunkedArray;
7use crate::stream::take_rows::TakeRows;
8use crate::stream::{ArrayStream, ArrayStreamAdapter, SendableArrayStream};
9use crate::{Array, ArrayRef};
10
11pub trait ArrayStreamExt: ArrayStream {
12 fn boxed(self) -> SendableArrayStream
14 where
15 Self: Sized + Send + 'static,
16 {
17 Box::pin(self)
18 }
19
20 fn into_array(self) -> impl Future<Output = VortexResult<ArrayRef>>
24 where
25 Self: Sized,
26 {
27 async move {
28 let dtype = self.dtype().clone();
29 let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
30 if chunks.len() == 1 {
31 Ok(chunks.remove(0))
32 } else {
33 Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
34 }
35 }
36 }
37
38 fn take_rows(self, indices: ArrayRef) -> VortexResult<impl ArrayStream>
40 where
41 Self: Sized,
42 {
43 Ok(ArrayStreamAdapter::new(
44 self.dtype().clone(),
45 TakeRows::try_new(self, indices)?,
46 ))
47 }
48}
49
50impl<S: ArrayStream> ArrayStreamExt for S {}