vortex_array/stream/
ext.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5
6use futures_util::TryStreamExt;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::arrays::ChunkedArray;
11use crate::stream::{ArrayStream, SendableArrayStream};
12
13pub trait ArrayStreamExt: ArrayStream {
14    /// Box the [`ArrayStream`] so that it can be sent between threads.
15    fn boxed(self) -> SendableArrayStream
16    where
17        Self: Sized + Send + 'static,
18    {
19        Box::pin(self)
20    }
21
22    /// Collect the stream into a single `Array`.
23    ///
24    /// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
25    fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
26    where
27        Self: Sized,
28    {
29        async move {
30            let dtype = self.dtype().clone();
31            let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
32            if chunks.len() == 1 {
33                Ok(chunks.remove(0))
34            } else {
35                Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
36            }
37        }
38    }
39}
40
41impl<S: ArrayStream> ArrayStreamExt for S {}