vortex_serde/stream_reader/
mod.rs

1use std::ops::Deref;
2use std::sync::Arc;
3
4use futures_util::stream::try_unfold;
5use futures_util::Stream;
6use vortex_array::stream::ArrayStream;
7use vortex_array::Context;
8use vortex_buffer::Buffer;
9use vortex_dtype::DType;
10use vortex_error::{VortexExpect as _, VortexResult};
11
12use crate::io::VortexRead;
13use crate::MessageReader;
14
15pub struct StreamArrayReader<R: VortexRead> {
16    msgs: MessageReader<R>,
17    ctx: Arc<Context>,
18    dtype: Option<Arc<DType>>,
19}
20
21impl<R: VortexRead> StreamArrayReader<R> {
22    pub async fn try_new(read: R, ctx: Arc<Context>) -> VortexResult<Self> {
23        Ok(Self {
24            msgs: MessageReader::try_new(read).await?,
25            ctx,
26            dtype: None,
27        })
28    }
29
30    pub fn with_dtype(mut self, dtype: Arc<DType>) -> Self {
31        assert!(self.dtype.is_none(), "DType already set");
32        self.dtype = Some(dtype);
33        self
34    }
35
36    pub async fn load_dtype(mut self) -> VortexResult<Self> {
37        assert!(self.dtype.is_none(), "DType already set");
38        self.dtype = Some(Arc::new(self.msgs.read_dtype().await?));
39        Ok(self)
40    }
41
42    /// Reads a single array from the stream.
43    pub fn array_stream(&mut self) -> impl ArrayStream + '_ {
44        let dtype = self
45            .dtype
46            .as_ref()
47            .vortex_expect("Cannot read array from stream: DType not set")
48            .deref()
49            .clone();
50        self.msgs.array_stream(self.ctx.clone(), dtype)
51    }
52
53    pub fn into_array_stream(self) -> impl ArrayStream {
54        let dtype = self
55            .dtype
56            .as_ref()
57            .vortex_expect("Cannot read array from stream: DType not set")
58            .deref()
59            .clone();
60        self.msgs.into_array_stream(self.ctx.clone(), dtype)
61    }
62
63    /// Reads a single page from the stream.
64    pub async fn next_page(&mut self) -> VortexResult<Option<Buffer>> {
65        self.msgs.maybe_read_page().await
66    }
67
68    /// Reads consecutive pages from the stream until the message type changes.
69    pub async fn page_stream(&mut self) -> impl Stream<Item = VortexResult<Buffer>> + '_ {
70        try_unfold(self, |reader| async {
71            match reader.next_page().await? {
72                Some(page) => Ok(Some((page, reader))),
73                None => Ok(None),
74            }
75        })
76    }
77}