vortex_serde/stream_writer/
mod.rs1use std::fmt::{Display, Formatter};
2use std::ops::Range;
3
4use futures_util::{Stream, TryStreamExt};
5use vortex_array::array::ChunkedArray;
6use vortex_array::stream::ArrayStream;
7use vortex_array::Array;
8use vortex_buffer::Buffer;
9use vortex_dtype::DType;
10use vortex_error::VortexResult;
11
12use crate::io::VortexWrite;
13use crate::MessageWriter;
14
15#[cfg(test)]
16mod tests;
17
18pub struct StreamArrayWriter<W: VortexWrite> {
19 msgs: MessageWriter<W>,
20
21 array_layouts: Vec<ArrayLayout>,
22 page_ranges: Vec<ByteRange>,
23}
24
25impl<W: VortexWrite> StreamArrayWriter<W> {
26 pub fn new(write: W) -> Self {
27 Self {
28 msgs: MessageWriter::new(write),
29 array_layouts: vec![],
30 page_ranges: vec![],
31 }
32 }
33
34 pub fn array_layouts(&self) -> &[ArrayLayout] {
35 &self.array_layouts
36 }
37
38 pub fn page_ranges(&self) -> &[ByteRange] {
39 &self.page_ranges
40 }
41
42 pub fn into_inner(self) -> W {
43 self.msgs.into_inner()
44 }
45
46 async fn write_dtype(&mut self, dtype: &DType) -> VortexResult<ByteRange> {
47 let begin = self.msgs.tell();
48 self.msgs.write_dtype(dtype).await?;
49 let end = self.msgs.tell();
50 Ok(ByteRange { begin, end })
51 }
52
53 async fn write_array_chunks<S>(&mut self, mut stream: S) -> VortexResult<ChunkOffsets>
54 where
55 S: Stream<Item = VortexResult<Array>> + Unpin,
56 {
57 let mut byte_offsets = vec![self.msgs.tell()];
58 let mut row_offsets = vec![0];
59 let mut row_offset = 0;
60
61 while let Some(chunk) = stream.try_next().await? {
62 row_offset += chunk.len() as u64;
63 row_offsets.push(row_offset);
64 self.msgs.write_batch(chunk).await?;
65 byte_offsets.push(self.msgs.tell());
66 }
67
68 Ok(ChunkOffsets::new(byte_offsets, row_offsets))
69 }
70
71 pub async fn write_array_stream<S: ArrayStream + Unpin>(
72 mut self,
73 mut array_stream: S,
74 ) -> VortexResult<Self> {
75 let dtype_pos = self.write_dtype(array_stream.dtype()).await?;
76 let chunk_pos = self.write_array_chunks(&mut array_stream).await?;
77 self.array_layouts.push(ArrayLayout {
78 dtype: dtype_pos,
79 chunks: chunk_pos,
80 });
81 Ok(self)
82 }
83
84 pub async fn write_array(self, array: Array) -> VortexResult<Self> {
85 if let Ok(chunked) = ChunkedArray::try_from(&array) {
86 self.write_array_stream(chunked.array_stream()).await
87 } else {
88 self.write_array_stream(array.into_array_stream()).await
89 }
90 }
91
92 pub async fn write_page(mut self, buffer: Buffer) -> VortexResult<Self> {
93 let begin = self.msgs.tell();
94 self.msgs.write_page(buffer).await?;
95 let end = self.msgs.tell();
96 self.page_ranges.push(ByteRange { begin, end });
97 Ok(self)
98 }
99}
100
/// A half-open span of byte positions, `[begin, end)`.
///
/// The fields are public, so a value built with a struct literal is not
/// subject to the ordering check performed by [`ByteRange::new`].
#[derive(Copy, Clone, Debug)]
pub struct ByteRange {
    /// Position of the first byte covered by the range.
    pub begin: u64,
    /// Position one past the last byte covered by the range.
    pub end: u64,
}

impl Display for ByteRange {
    /// Formats the range in half-open interval notation, e.g. `[3, 10)`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "[{}, {})", self.begin, self.end)
    }
}

impl ByteRange {
    /// Creates the range `[begin, end)`.
    ///
    /// A zero-length range (`begin == end`) is accepted, which keeps the
    /// constructor consistent with [`ByteRange::is_empty`] and with the
    /// half-open notation used by `Display`.
    ///
    /// # Panics
    ///
    /// Panics if `end < begin`.
    pub fn new(begin: u64, end: u64) -> Self {
        assert!(end >= begin, "Buffer end must not be before its beginning");
        Self { begin, end }
    }

    /// Number of bytes covered by the range.
    ///
    /// The subtraction assumes `end >= begin`; a value built by struct literal
    /// with `end < begin` overflows here. The result is cast to `usize`, which
    /// truncates on targets where `usize` is narrower than 64 bits.
    pub fn len(&self) -> usize {
        (self.end - self.begin) as usize
    }

    /// Returns `true` when the range covers no bytes (`begin == end`).
    pub fn is_empty(&self) -> bool {
        self.begin == self.end
    }

    /// Converts to a `Range<usize>`, e.g. for slicing an in-memory buffer.
    ///
    /// Both bounds are cast to `usize`, which truncates on targets where
    /// `usize` is narrower than 64 bits.
    pub fn to_range(&self) -> Range<usize> {
        self.begin as usize..self.end as usize
    }
}
131
132#[derive(Clone, Debug)]
133pub struct ArrayLayout {
134 pub dtype: ByteRange,
135 pub chunks: ChunkOffsets,
136}
137
/// Paired offset tables describing a sequence of chunks.
#[derive(Clone, Debug)]
pub struct ChunkOffsets {
    /// Byte positions at chunk boundaries.
    pub byte_offsets: Vec<u64>,
    /// Cumulative row counts at chunk boundaries.
    pub row_offsets: Vec<u64>,
}

impl ChunkOffsets {
    /// Bundles the two offset tables as given.
    ///
    /// No validation is performed; in particular the vectors are not checked
    /// for equal length or for being sorted.
    pub fn new(byte_offsets: Vec<u64>, row_offsets: Vec<u64>) -> Self {
        ChunkOffsets {
            byte_offsets,
            row_offsets,
        }
    }
}
151}