vortex_serde/stream_writer/
mod.rs

1use std::fmt::{Display, Formatter};
2use std::ops::Range;
3
4use futures_util::{Stream, TryStreamExt};
5use vortex_array::array::ChunkedArray;
6use vortex_array::stream::ArrayStream;
7use vortex_array::Array;
8use vortex_buffer::Buffer;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::io::VortexWrite;
13use crate::MessageWriter;
14
15#[cfg(test)]
16mod tests;
17
18pub struct StreamArrayWriter<W: VortexWrite> {
19    msgs: MessageWriter<W>,
20
21    array_layouts: Vec<ArrayLayout>,
22    page_ranges: Vec<ByteRange>,
23}
24
25impl<W: VortexWrite> StreamArrayWriter<W> {
26    pub fn new(write: W) -> Self {
27        Self {
28            msgs: MessageWriter::new(write),
29            array_layouts: vec![],
30            page_ranges: vec![],
31        }
32    }
33
34    pub fn array_layouts(&self) -> &[ArrayLayout] {
35        &self.array_layouts
36    }
37
38    pub fn page_ranges(&self) -> &[ByteRange] {
39        &self.page_ranges
40    }
41
42    pub fn into_inner(self) -> W {
43        self.msgs.into_inner()
44    }
45
46    async fn write_dtype(&mut self, dtype: &DType) -> VortexResult<ByteRange> {
47        let begin = self.msgs.tell();
48        self.msgs.write_dtype(dtype).await?;
49        let end = self.msgs.tell();
50        Ok(ByteRange { begin, end })
51    }
52
53    async fn write_array_chunks<S>(&mut self, mut stream: S) -> VortexResult<ChunkOffsets>
54    where
55        S: Stream<Item = VortexResult<Array>> + Unpin,
56    {
57        let mut byte_offsets = vec![self.msgs.tell()];
58        let mut row_offsets = vec![0];
59        let mut row_offset = 0;
60
61        while let Some(chunk) = stream.try_next().await? {
62            row_offset += chunk.len() as u64;
63            row_offsets.push(row_offset);
64            self.msgs.write_batch(chunk).await?;
65            byte_offsets.push(self.msgs.tell());
66        }
67
68        Ok(ChunkOffsets::new(byte_offsets, row_offsets))
69    }
70
71    pub async fn write_array_stream<S: ArrayStream + Unpin>(
72        mut self,
73        mut array_stream: S,
74    ) -> VortexResult<Self> {
75        let dtype_pos = self.write_dtype(array_stream.dtype()).await?;
76        let chunk_pos = self.write_array_chunks(&mut array_stream).await?;
77        self.array_layouts.push(ArrayLayout {
78            dtype: dtype_pos,
79            chunks: chunk_pos,
80        });
81        Ok(self)
82    }
83
84    pub async fn write_array(self, array: Array) -> VortexResult<Self> {
85        if let Ok(chunked) = ChunkedArray::try_from(&array) {
86            self.write_array_stream(chunked.array_stream()).await
87        } else {
88            self.write_array_stream(array.into_array_stream()).await
89        }
90    }
91
92    pub async fn write_page(mut self, buffer: Buffer) -> VortexResult<Self> {
93        let begin = self.msgs.tell();
94        self.msgs.write_page(buffer).await?;
95        let end = self.msgs.tell();
96        self.page_ranges.push(ByteRange { begin, end });
97        Ok(self)
98    }
99}
100
/// A half-open interval of byte offsets, `[begin, end)`.
///
/// An empty range (`begin == end`) is valid and reports `is_empty() == true`.
#[derive(Copy, Clone, Debug)]
pub struct ByteRange {
    /// Offset of the first byte covered by the range.
    pub begin: u64,
    /// Offset one past the last byte covered by the range.
    pub end: u64,
}

impl Display for ByteRange {
    /// Formats the range in half-open interval notation, e.g. `[4, 16)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}, {})", self.begin, self.end)
    }
}

impl ByteRange {
    /// Creates the range `[begin, end)`.
    ///
    /// Empty ranges (`begin == end`) are accepted so that this constructor
    /// agrees with [`ByteRange::is_empty`] and with ranges built directly from
    /// the public fields.
    ///
    /// # Panics
    ///
    /// Panics if `end < begin`.
    pub fn new(begin: u64, end: u64) -> Self {
        assert!(
            end >= begin,
            "Buffer end must not be before its beginning"
        );
        Self { begin, end }
    }

    /// Returns the number of bytes covered by the range.
    ///
    /// The public fields allow constructing an inverted range; for such a
    /// value the subtraction overflows. The `as usize` cast truncates on
    /// targets where `usize` is narrower than `u64`.
    pub fn len(&self) -> usize {
        (self.end - self.begin) as usize
    }

    /// Returns `true` when the range covers no bytes (`begin == end`).
    pub fn is_empty(&self) -> bool {
        self.begin == self.end
    }

    /// Converts to a `Range<usize>` suitable for slicing.
    ///
    /// Both bounds are cast with `as usize`, which truncates on targets where
    /// `usize` is narrower than `u64`.
    pub fn to_range(&self) -> Range<usize> {
        self.begin as usize..self.end as usize
    }
}
131
132#[derive(Clone, Debug)]
133pub struct ArrayLayout {
134    pub dtype: ByteRange,
135    pub chunks: ChunkOffsets,
136}
137
/// Parallel vectors of byte and row offsets describing a sequence of chunks.
///
/// As built by `StreamArrayWriter`, each vector has one leading entry followed
/// by one cumulative entry per chunk.
#[derive(Debug, Clone)]
pub struct ChunkOffsets {
    /// Writer byte positions: before the first chunk, then after each chunk.
    pub byte_offsets: Vec<u64>,
    /// Cumulative row counts: `0`, then the running total after each chunk.
    pub row_offsets: Vec<u64>,
}

impl ChunkOffsets {
    /// Bundles the two offset vectors without validating or modifying them.
    pub fn new(byte_offsets: Vec<u64>, row_offsets: Vec<u64>) -> Self {
        ChunkOffsets {
            byte_offsets,
            row_offsets,
        }
    }
}