vortex_array/stream/
ext.rs

1use std::future::Future;
2
3use futures_util::TryStreamExt;
4use vortex_error::VortexResult;
5
6use crate::arrays::ChunkedArray;
7use crate::stream::take_rows::TakeRows;
8use crate::stream::{ArrayStream, ArrayStreamAdapter, SendableArrayStream};
9use crate::{Array, ArrayRef};
10
11pub trait ArrayStreamExt: ArrayStream {
12    /// Box the [`ArrayStream`] so that it can be sent between threads.
13    fn boxed(self) -> SendableArrayStream
14    where
15        Self: Sized + Send + 'static,
16    {
17        Box::pin(self)
18    }
19
20    /// Collect the stream into a single `Array`.
21    ///
22    /// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
23    fn into_array(self) -> impl Future<Output = VortexResult<ArrayRef>>
24    where
25        Self: Sized,
26    {
27        async move {
28            let dtype = self.dtype().clone();
29            let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
30            if chunks.len() == 1 {
31                Ok(chunks.remove(0))
32            } else {
33                Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
34            }
35        }
36    }
37
38    /// Perform a row-wise selection on the stream from an array of sorted indicessss.
39    fn take_rows(self, indices: ArrayRef) -> VortexResult<impl ArrayStream>
40    where
41        Self: Sized,
42    {
43        Ok(ArrayStreamAdapter::new(
44            self.dtype().clone(),
45            TakeRows::try_new(self, indices)?,
46        ))
47    }
48}
49
50impl<S: ArrayStream> ArrayStreamExt for S {}