vortex_file/
writer.rs

1use futures::StreamExt;
2use vortex_array::ArrayContext;
3use vortex_array::stats::{PRUNING_STATS, Stat};
4use vortex_array::stream::ArrayStream;
5use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err};
6use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, WriteFlatBufferExt};
7use vortex_io::VortexWrite;
8use vortex_layout::layouts::file_stats::FileStatsLayoutWriter;
9use vortex_layout::{LayoutStrategy, LayoutWriter};
10
11use crate::footer::{FileLayoutFlatBufferWriter, FileStatistics, Postscript, PostscriptSegment};
12use crate::segments::writer::BufferedSegmentWriter;
13use crate::strategy::VortexLayoutStrategy;
14use crate::{EOF_SIZE, MAGIC_BYTES, MAX_FOOTER_SIZE, VERSION};
15
16/// Configure a new writer, which can eventually be used to write an [`ArrayStream`] into a sink that implements [`VortexWrite`].
17///
18/// By default, the [`LayoutStrategy`] will be the [`VortexLayoutStrategy`], which includes re-chunking and will also
19/// uncompress all data back to its canonical form before compressing it using the [`BtrBlocksCompressor`](vortex_btrblocks::BtrBlocksCompressor).
20pub struct VortexWriteOptions {
21    strategy: Box<dyn LayoutStrategy>,
22    exclude_dtype: bool,
23    file_statistics: Vec<Stat>,
24}
25
26impl Default for VortexWriteOptions {
27    fn default() -> Self {
28        Self {
29            strategy: Box::new(VortexLayoutStrategy),
30            exclude_dtype: false,
31            file_statistics: PRUNING_STATS.to_vec(),
32        }
33    }
34}
35
36impl VortexWriteOptions {
37    /// Replace the default layout strategy with the provided one.
38    pub fn with_strategy<S: LayoutStrategy>(mut self, strategy: S) -> Self {
39        self.strategy = Box::new(strategy);
40        self
41    }
42
43    /// Exclude the DType from the Vortex file. You must provide the DType to the reader.
44    // TODO(ngates): Should we store some sort of DType checksum to make sure the one passed at
45    //  read-time is sane? I guess most layouts will have some reasonable validation.
46    pub fn exclude_dtype(mut self) -> Self {
47        self.exclude_dtype = true;
48        self
49    }
50
51    /// Configure which statistics to compute at the file-level.
52    pub fn with_file_statistics(mut self, file_statistics: Vec<Stat>) -> Self {
53        self.file_statistics = file_statistics;
54        self
55    }
56}
57
58impl VortexWriteOptions {
59    /// Perform an async write of the provided stream of `Array`.
60    pub async fn write<W: VortexWrite, S: ArrayStream + Unpin>(
61        self,
62        write: W,
63        mut stream: S,
64    ) -> VortexResult<W> {
65        // Set up a Context to capture the encodings used in the file.
66        let ctx = ArrayContext::empty();
67
68        // Set up the root layout
69        let mut layout_writer = FileStatsLayoutWriter::new(
70            self.strategy.new_writer(&ctx, stream.dtype())?,
71            stream.dtype(),
72            self.file_statistics.clone().into(),
73        )?;
74
75        // First we write the magic number
76        let mut write = futures::io::Cursor::new(write);
77        write.write_all(MAGIC_BYTES).await?;
78
79        // Our buffered message writer accumulates messages for each batch so we can flush them
80        // into the file.
81        let mut segment_writer = BufferedSegmentWriter::default();
82        let mut segment_specs = vec![];
83
84        // Then write the stream via the root layout
85        while let Some(chunk) = stream.next().await {
86            let chunk = chunk?;
87            layout_writer.push_chunk(&mut segment_writer, chunk)?;
88            // NOTE(ngates): we could spawn this task and continue to compress the next chunk.
89            segment_writer
90                .flush_async(&mut write, &mut segment_specs)
91                .await?;
92        }
93
94        // Flush the final layout messages into the file
95        layout_writer.flush(&mut segment_writer)?;
96        segment_writer
97            .flush_async(&mut write, &mut segment_specs)
98            .await?;
99
100        // Finish the layouts and flush the finishing messages into the file
101        let layout = layout_writer.finish(&mut segment_writer)?;
102        segment_writer
103            .flush_async(&mut write, &mut segment_specs)
104            .await?;
105
106        // We write our footer components in order of least likely to be needed to most likely.
107        // DType is the least likely to be needed, as many readers may provide this from an
108        // external source.
109        let dtype_segment = if self.exclude_dtype {
110            None
111        } else {
112            Some(self.write_flatbuffer(&mut write, stream.dtype()).await?)
113        };
114
115        let statistics_segment = if self.file_statistics.is_empty() {
116            None
117        } else {
118            let file_statistics = FileStatistics(layout_writer.into_stats_sets().into());
119            Some(self.write_flatbuffer(&mut write, &file_statistics).await?)
120        };
121
122        let layout_segment = self
123            .write_flatbuffer(
124                &mut write,
125                &FileLayoutFlatBufferWriter {
126                    ctx,
127                    layout,
128                    segment_specs: segment_specs.into(),
129                },
130            )
131            .await?;
132
133        // Assemble the postscript, and write it manually to avoid any framing.
134        let postscript = Postscript {
135            dtype: dtype_segment,
136            statistics: statistics_segment,
137            layout: layout_segment,
138        };
139        let postscript_buffer = postscript.write_flatbuffer_bytes();
140        if postscript_buffer.len() > MAX_FOOTER_SIZE as usize {
141            vortex_bail!(
142                "Postscript is too large ({} bytes); max postscript size is {}",
143                postscript_buffer.len(),
144                MAX_FOOTER_SIZE
145            );
146        }
147        let postscript_len = u16::try_from(postscript_buffer.len())
148            .vortex_expect("Postscript already verified to fit into u16");
149        write.write_all(postscript_buffer).await?;
150
151        // And finally, the EOF 8-byte footer.
152        let mut eof = [0u8; EOF_SIZE];
153        eof[0..2].copy_from_slice(&VERSION.to_le_bytes());
154        eof[2..4].copy_from_slice(&postscript_len.to_le_bytes());
155        eof[4..8].copy_from_slice(&MAGIC_BYTES);
156        write.write_all(eof).await?;
157
158        write.flush().await?;
159
160        Ok(write.into_inner())
161    }
162
163    async fn write_flatbuffer<W: VortexWrite, F: FlatBufferRoot + WriteFlatBuffer>(
164        &self,
165        write: &mut futures::io::Cursor<W>,
166        flatbuffer: &F,
167    ) -> VortexResult<PostscriptSegment> {
168        let layout_offset = write.position();
169        write.write_all(flatbuffer.write_flatbuffer_bytes()).await?;
170        Ok(PostscriptSegment {
171            offset: layout_offset,
172            length: u32::try_from(write.position() - layout_offset)
173                .map_err(|_| vortex_err!("segment length exceeds maximum u32"))?,
174            alignment: FlatBuffer::alignment(),
175        })
176    }
177}