vortex_array/stream/
ext.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future::Future;
5
6use futures::TryStreamExt;
7use vortex_error::VortexResult;
8
9use crate::ArrayRef;
10use crate::arrays::ChunkedArray;
11use crate::stream::ArrayStream;
12use crate::stream::SendableArrayStream;
13
14pub trait ArrayStreamExt: ArrayStream {
15    /// Box the [`ArrayStream`] so that it can be sent between threads.
16    fn boxed(self) -> SendableArrayStream
17    where
18        Self: Sized + Send + 'static,
19    {
20        Box::pin(self)
21    }
22
23    /// Collect the stream into a single `Array`.
24    ///
25    /// If the stream yields multiple chunks, they will be returned as a [`ChunkedArray`].
26    fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
27    where
28        Self: Sized,
29    {
30        async move {
31            let dtype = self.dtype().clone();
32            let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
33            if chunks.len() == 1 {
34                Ok(chunks.remove(0))
35            } else {
36                Ok(ChunkedArray::try_new(chunks, dtype)?.to_array())
37            }
38        }
39    }
40}
41
42impl<S: ArrayStream> ArrayStreamExt for S {}