Skip to main content

vortex_array/stream/
ext.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5
6use futures::TryStreamExt;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::IntoArray;
11use crate::arrays::ChunkedArray;
12use crate::stream::ArrayStream;
13use crate::stream::SendableArrayStream;
14
15pub trait ArrayStreamExt: ArrayStream {
16    /// Box the [`ArrayStream`] so that it can be sent between threads.
17    fn boxed(self) -> SendableArrayStream
18    where
19        Self: Sized + Send + 'static,
20    {
21        Box::pin(self)
22    }
23
24    /// Collect the stream into a single `Array`.
25    ///
26    /// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
27    fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
28    where
29        Self: Sized,
30    {
31        async move {
32            let dtype = self.dtype().clone();
33            let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
34            if chunks.len() == 1 {
35                Ok(chunks.remove(0))
36            } else {
37                Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
38            }
39        }
40    }
41}
42
43impl<S: ArrayStream> ArrayStreamExt for S {}