use std::future::Future;
use futures::TryStreamExt;
use vortex_error::VortexResult;
use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::ChunkedArray;
use crate::stream::ArrayStream;
use crate::stream::SendableArrayStream;
pub trait ArrayStreamExt: ArrayStream {
fn boxed(self) -> SendableArrayStream
where
Self: Sized + Send + 'static,
{
Box::pin(self)
}
fn read_all(self) -> impl Future<Output = VortexResult<ArrayRef>>
where
Self: Sized,
{
async move {
let dtype = self.dtype().clone();
let mut chunks: Vec<ArrayRef> = self.try_collect().await?;
if chunks.len() == 1 {
Ok(chunks.remove(0))
} else {
Ok(ChunkedArray::try_new(chunks, dtype)?.into_array())
}
}
}
}
impl<S: ArrayStream> ArrayStreamExt for S {}