1use futures::StreamExt;
2use vortex_array::ArrayContext;
3use vortex_array::stats::{PRUNING_STATS, Stat};
4use vortex_array::stream::ArrayStream;
5use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
6use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
7use vortex_io::VortexWrite;
8use vortex_layout::layouts::file_stats::FileStatsLayoutWriter;
9use vortex_layout::{LayoutStrategy, LayoutWriter};
10
11use crate::footer::{FileLayoutFlatBufferWriter, FileStatistics, Postscript, PostscriptSegment};
12use crate::segments::writer::BufferedSegmentWriter;
13use crate::strategy::VortexLayoutStrategy;
14use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
15
16pub struct VortexWriteOptions {
21 strategy: Box<dyn LayoutStrategy>,
22 exclude_dtype: bool,
23 file_statistics: Vec<Stat>,
24}
25
26impl Default for VortexWriteOptions {
27 fn default() -> Self {
28 Self {
29 strategy: Box::new(VortexLayoutStrategy),
30 exclude_dtype: false,
31 file_statistics: PRUNING_STATS.to_vec(),
32 }
33 }
34}
35
36impl VortexWriteOptions {
37 pub fn with_strategy<S: LayoutStrategy>(mut self, strategy: S) -> Self {
39 self.strategy = Box::new(strategy);
40 self
41 }
42
43 pub fn exclude_dtype(mut self) -> Self {
47 self.exclude_dtype = true;
48 self
49 }
50
51 pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
53 self.file_statistics = file_statistics;
54 self
55 }
56}
57
58impl VortexWriteOptions {
59 pub async fn write<W: VortexWrite, S: ArrayStream + Unpin>(
61 self,
62 write: W,
63 mut stream: S,
64 ) -> VortexResult<W> {
65 let ctx = ArrayContext::empty();
67
68 let mut layout_writer = FileStatsLayoutWriter::new(
70 self.strategy.new_writer(&ctx, stream.dtype())?,
71 stream.dtype(),
72 self.file_statistics.clone().into(),
73 )?;
74
75 let mut write = futures::io::Cursor::new(write);
77 write.write_all(MAGIC_BYTES).await?;
78
79 let mut segment_writer = BufferedSegmentWriter::default();
82 let mut segment_specs = vec![];
83
84 while let Some(chunk) = stream.next().await {
86 let chunk = chunk?;
87 layout_writer.push_chunk(&mut segment_writer, chunk)?;
88 segment_writer
90 .flush_async(&mut write, &mut segment_specs)
91 .await?;
92 }
93
94 layout_writer.flush(&mut segment_writer)?;
96 segment_writer
97 .flush_async(&mut write, &mut segment_specs)
98 .await?;
99
100 let layout = layout_writer.finish(&mut segment_writer)?;
102 segment_writer
103 .flush_async(&mut write, &mut segment_specs)
104 .await?;
105
106 let dtype_segment = if self.exclude_dtype {
110 None
111 } else {
112 Some(self.write_flatbuffer(&mut write, stream.dtype()).await?)
113 };
114
115 let statistics_segment = if self.file_statistics.is_empty() {
116 None
117 } else {
118 let file_statistics = FileStatistics(layout_writer.into_stats_sets().into());
119 Some(self.write_flatbuffer(&mut write, &file_statistics).await?)
120 };
121
122 let layout_segment = self
123 .write_flatbuffer(
124 &mut write,
125 &FileLayoutFlatBufferWriter {
126 ctx,
127 layout,
128 segment_specs: segment_specs.into(),
129 },
130 )
131 .await?;
132
133 let postscript = Postscript {
135 dtype: dtype_segment,
136 statistics: statistics_segment,
137 layout: layout_segment,
138 };
139 let postscript_buffer = postscript.write_flatbuffer_bytes();
140 if postscript_buffer.len() > MAX_FOOTER_SIZE as usize {
141 vortex_bail!(
142 "Postscript is too large ({} bytes); max postscript size is {}",
143 postscript_buffer.len(),
144 MAX_FOOTER_SIZE
145 );
146 }
147 let postscript_len = u16::try_from(postscript_buffer.len())
148 .vortex_expect("Postscript already verified to fit into u16");
149 write.write_all(postscript_buffer).await?;
150
151 let mut eof = [0u8; EOF_SIZE];
153 eof[0..2].copy_from_slice(&VERSION.to_le_bytes());
154 eof[2..4].copy_from_slice(&postscript_len.to_le_bytes());
155 eof[4..8].copy_from_slice(&MAGIC_BYTES);
156 write.write_all(eof).await?;
157
158 write.flush().await?;
159
160 Ok(write.into_inner())
161 }
162
163 async fn write_flatbuffer<W: VortexWrite, F: FlatBufferRoot + WriteFlatBuffer>(
164 &self,
165 write: &mut futures::io::Cursor<W>,
166 flatbuffer: &F,
167 ) -> VortexResult<PostscriptSegment> {
168 let layout_offset = write.position();
169 write.write_all(flatbuffer.write_flatbuffer_bytes()).await?;
170 Ok(PostscriptSegment {
171 offset: layout_offset,
172 length: u32::try_from(write.position() - layout_offset)
173 .map_err(|_| vortex_err!("segment length exceeds maximum u32"))?,
174 alignment: FlatBuffer::alignment(),
175 })
176 }
177}