vortex_layout/layouts/chunked/writer.rs

1use vortex_array::arcref::ArcRef;
2use vortex_array::{ArrayContext, ArrayRef};
3use vortex_dtype::DType;
4use vortex_error::{VortexExpect, VortexResult};
5
6use crate::data::Layout;
7use crate::layouts::chunked::ChunkedLayout;
8use crate::layouts::flat::FlatLayout;
9use crate::segments::SegmentWriter;
10use crate::strategy::LayoutStrategy;
11use crate::writer::LayoutWriter;
12use crate::{LayoutVTableRef, LayoutWriterExt};
13
14#[derive(Clone)]
15pub struct ChunkedLayoutStrategy {
16    /// The layout strategy for each chunk.
17    pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
18}
19
20impl Default for ChunkedLayoutStrategy {
21    fn default() -> Self {
22        Self {
23            chunk_strategy: ArcRef::new_ref(&FlatLayout),
24        }
25    }
26}
27
28impl LayoutStrategy for ChunkedLayoutStrategy {
29    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
30        Ok(ChunkedLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
31    }
32}
33
34/// A basic implementation of a chunked layout writer that writes each batch into its own chunk.
35///
36/// TODO(ngates): introduce more sophisticated layout writers with different chunking strategies.
37pub struct ChunkedLayoutWriter {
38    ctx: ArrayContext,
39    options: ChunkedLayoutStrategy,
40    chunks: Vec<Box<dyn LayoutWriter>>,
41    dtype: DType,
42    row_count: u64,
43}
44
45impl ChunkedLayoutWriter {
46    pub fn new(ctx: ArrayContext, dtype: DType, options: ChunkedLayoutStrategy) -> Self {
47        Self {
48            ctx,
49            options,
50            chunks: vec![],
51            dtype,
52            row_count: 0,
53        }
54    }
55}
56
57impl LayoutWriter for ChunkedLayoutWriter {
58    fn push_chunk(
59        &mut self,
60        segment_writer: &mut dyn SegmentWriter,
61        chunk: ArrayRef,
62    ) -> VortexResult<()> {
63        self.row_count += chunk.len() as u64;
64
65        // We write each chunk, but don't call finish quite yet to ensure that chunks have an
66        // opportunity to write messages at the end of the file.
67        let mut chunk_writer = self
68            .options
69            .chunk_strategy
70            .new_writer(&self.ctx, chunk.dtype())?;
71        chunk_writer.push_chunk(segment_writer, chunk)?;
72        chunk_writer.flush(segment_writer)?;
73        self.chunks.push(chunk_writer);
74
75        Ok(())
76    }
77
78    fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
79        // We flush each chunk as we write it, so there's nothing to do here.
80        Ok(())
81    }
82
83    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
84        // Call finish on each chunk's writer
85        let mut children = vec![];
86        for writer in self.chunks.iter_mut() {
87            // FIXME(ngates): we should try calling finish after each chunk.
88            children.push(writer.finish(segment_writer)?);
89        }
90
91        // If there's only one child, there's no point even writing a stats table since
92        // there's no pruning for us to do.
93        if children.len() == 1 {
94            return Ok(children.pop().vortex_expect("child layout"));
95        }
96
97        Ok(Layout::new_owned(
98            "chunked".into(),
99            LayoutVTableRef::new_ref(&ChunkedLayout),
100            self.dtype.clone(),
101            self.row_count,
102            vec![],
103            children,
104            None,
105        ))
106    }
107}