vortex_array/stream/
ext.rs

1use std::future::Future;
2
3use futures_util::TryStreamExt;
4use vortex_error::VortexResult;
5
6use crate::ArrayRef;
7use crate::arrays::ChunkedArray;
8use crate::stream::{ArrayStream, SendableArrayStream};
9
10pub trait ArrayStreamExt: ArrayStream {
11    /// Box the [`ArrayStream`] so that it can be sent between threads.
12    fn boxed(self) -> SendableArrayStream
13    where
14        Self: Sized + Send + 'static,
15    {
16        Box::pin(self)
17    }
18
19    /// Collect the stream into a single `Array`.
20    ///
21    /// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
22    fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
23    where
24        Self: Sized,
25    {
26        async move {
27            let dtype = self.dtype().clone();
28            let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
29            if chunks.len() == 1 {
30                Ok(chunks.remove(0))
31            } else {
32                Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
33            }
34        }
35    }
36}
37
38impl<S: ArrayStream> ArrayStreamExt for S {}