// vortex_layout/layouts/chunked/writer.rs

1use std::sync::Arc;
2
3use vortex_array::arcref::ArcRef;
4use vortex_array::{ArrayContext, ArrayRef};
5use vortex_dtype::DType;
6use vortex_error::{VortexExpect, VortexResult};
7
8use crate::data::Layout;
9use crate::layouts::chunked::ChunkedLayout;
10use crate::layouts::flat::writer::FlatLayoutStrategy;
11use crate::segments::SegmentWriter;
12use crate::strategy::LayoutStrategy;
13use crate::writer::LayoutWriter;
14use crate::{LayoutVTableRef, LayoutWriterExt};
15
16#[derive(Clone)]
17pub struct ChunkedLayoutStrategy {
18    /// The layout strategy for each chunk.
19    pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
20}
21
22impl Default for ChunkedLayoutStrategy {
23    fn default() -> Self {
24        Self {
25            chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
26        }
27    }
28}
29
30impl LayoutStrategy for ChunkedLayoutStrategy {
31    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
32        Ok(ChunkedLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
33    }
34}
35
36/// A basic implementation of a chunked layout writer that writes each batch into its own chunk.
37///
38/// TODO(ngates): introduce more sophisticated layout writers with different chunking strategies.
39pub struct ChunkedLayoutWriter {
40    ctx: ArrayContext,
41    options: ChunkedLayoutStrategy,
42    chunks: Vec<Box<dyn LayoutWriter>>,
43    dtype: DType,
44    row_count: u64,
45}
46
47impl ChunkedLayoutWriter {
48    pub fn new(ctx: ArrayContext, dtype: DType, options: ChunkedLayoutStrategy) -> Self {
49        Self {
50            ctx,
51            options,
52            chunks: vec![],
53            dtype,
54            row_count: 0,
55        }
56    }
57}
58
59impl LayoutWriter for ChunkedLayoutWriter {
60    fn push_chunk(
61        &mut self,
62        segment_writer: &mut dyn SegmentWriter,
63        chunk: ArrayRef,
64    ) -> VortexResult<()> {
65        self.row_count += chunk.len() as u64;
66
67        // We write each chunk, but don't call finish quite yet to ensure that chunks have an
68        // opportunity to write messages at the end of the file.
69        let mut chunk_writer = self
70            .options
71            .chunk_strategy
72            .new_writer(&self.ctx, chunk.dtype())?;
73        chunk_writer.push_chunk(segment_writer, chunk)?;
74        chunk_writer.flush(segment_writer)?;
75        self.chunks.push(chunk_writer);
76
77        Ok(())
78    }
79
80    fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
81        // We flush each chunk as we write it, so there's nothing to do here.
82        Ok(())
83    }
84
85    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
86        // Call finish on each chunk's writer
87        let mut children = vec![];
88        for writer in self.chunks.iter_mut() {
89            // FIXME(ngates): we should try calling finish after each chunk.
90            children.push(writer.finish(segment_writer)?);
91        }
92
93        // If there's only one child, there's no point even writing a stats table since
94        // there's no pruning for us to do.
95        if children.len() == 1 {
96            return Ok(children.pop().vortex_expect("child layout"));
97        }
98        Ok(chunked_layout(self.dtype.clone(), self.row_count, children))
99    }
100}
101
102pub(crate) fn chunked_layout(dtype: DType, row_count: u64, children: Vec<Layout>) -> Layout {
103    Layout::new_owned(
104        "chunked".into(),
105        LayoutVTableRef::new_ref(&ChunkedLayout),
106        dtype,
107        row_count,
108        vec![],
109        children,
110        None,
111    )
112}