// vortex_serde/stream_reader/mod.rs
use std::ops::Deref;
use std::sync::Arc;

use futures_util::stream::try_unfold;
use futures_util::Stream;
use vortex_array::stream::ArrayStream;
use vortex_array::Context;
use vortex_buffer::Buffer;
use vortex_dtype::DType;
use vortex_error::{VortexExpect as _, VortexResult};

use crate::io::VortexRead;
use crate::MessageReader;

15pub struct StreamArrayReader<R: VortexRead> {
16 msgs: MessageReader<R>,
17 ctx: Arc<Context>,
18 dtype: Option<Arc<DType>>,
19}
20
21impl<R: VortexRead> StreamArrayReader<R> {
22 pub async fn try_new(read: R, ctx: Arc<Context>) -> VortexResult<Self> {
23 Ok(Self {
24 msgs: MessageReader::try_new(read).await?,
25 ctx,
26 dtype: None,
27 })
28 }
29
30 pub fn with_dtype(mut self, dtype: Arc<DType>) -> Self {
31 assert!(self.dtype.is_none(), "DType already set");
32 self.dtype = Some(dtype);
33 self
34 }
35
36 pub async fn load_dtype(mut self) -> VortexResult<Self> {
37 assert!(self.dtype.is_none(), "DType already set");
38 self.dtype = Some(Arc::new(self.msgs.read_dtype().await?));
39 Ok(self)
40 }
41
42 pub fn array_stream(&mut self) -> impl ArrayStream + '_ {
44 let dtype = self
45 .dtype
46 .as_ref()
47 .vortex_expect("Cannot read array from stream: DType not set")
48 .deref()
49 .clone();
50 self.msgs.array_stream(self.ctx.clone(), dtype)
51 }
52
53 pub fn into_array_stream(self) -> impl ArrayStream {
54 let dtype = self
55 .dtype
56 .as_ref()
57 .vortex_expect("Cannot read array from stream: DType not set")
58 .deref()
59 .clone();
60 self.msgs.into_array_stream(self.ctx.clone(), dtype)
61 }
62
63 pub async fn next_page(&mut self) -> VortexResult<Option<Buffer>> {
65 self.msgs.maybe_read_page().await
66 }
67
68 pub async fn page_stream(&mut self) -> impl Stream<Item = VortexResult<Buffer>> + '_ {
70 try_unfold(self, |reader| async {
71 match reader.next_page().await? {
72 Some(page) => Ok(Some((page, reader))),
73 None => Ok(None),
74 }
75 })
76 }
77}