vortex_layout/layouts/chunked/writer.rs

1use std::sync::Arc;
2
3use arcref::ArcRef;
4use vortex_array::{ArrayContext, ArrayRef};
5use vortex_dtype::DType;
6use vortex_error::{VortexExpect, VortexResult};
7
8use crate::data::Layout;
9use crate::layouts::chunked::ChunkedLayout;
10use crate::layouts::flat::writer::FlatLayoutStrategy;
11use crate::segments::SegmentWriter;
12use crate::strategy::LayoutStrategy;
13use crate::writer::LayoutWriter;
14use crate::{LayoutVTableRef, LayoutWriterExt};
15
16#[derive(Clone)]
17pub struct ChunkedLayoutStrategy {
18    /// The layout strategy for each chunk.
19    pub chunk_strategy: ArcRef<dyn LayoutStrategy>,
20}
21
22impl Default for ChunkedLayoutStrategy {
23    fn default() -> Self {
24        Self {
25            chunk_strategy: ArcRef::new_arc(Arc::new(FlatLayoutStrategy::default())),
26        }
27    }
28}
29
30impl LayoutStrategy for ChunkedLayoutStrategy {
31    fn new_writer(&self, ctx: &ArrayContext, dtype: &DType) -> VortexResult<Box<dyn LayoutWriter>> {
32        Ok(ChunkedLayoutWriter::new(ctx.clone(), dtype.clone(), self.clone()).boxed())
33    }
34}
35
36/// A basic implementation of a chunked layout writer that writes each batch into its own chunk.
37///
38/// TODO(ngates): introduce more sophisticated layout writers with different chunking strategies.
39pub struct ChunkedLayoutWriter {
40    ctx: ArrayContext,
41    options: ChunkedLayoutStrategy,
42    chunks: Vec<Box<dyn LayoutWriter>>,
43    dtype: DType,
44    row_count: u64,
45}
46
47impl ChunkedLayoutWriter {
48    pub fn new(ctx: ArrayContext, dtype: DType, options: ChunkedLayoutStrategy) -> Self {
49        Self {
50            ctx,
51            options,
52            chunks: vec![],
53            dtype,
54            row_count: 0,
55        }
56    }
57}
58
59impl LayoutWriter for ChunkedLayoutWriter {
60    fn push_chunk(
61        &mut self,
62        segment_writer: &mut dyn SegmentWriter,
63        chunk: ArrayRef,
64    ) -> VortexResult<()> {
65        assert_eq!(
66            chunk.dtype(),
67            &self.dtype,
68            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
69            chunk.dtype(),
70            self.dtype
71        );
72
73        self.row_count += chunk.len() as u64;
74
75        // We write each chunk, but don't call finish quite yet to ensure that chunks have an
76        // opportunity to write messages at the end of the file.
77        let mut chunk_writer = self
78            .options
79            .chunk_strategy
80            .new_writer(&self.ctx, chunk.dtype())?;
81        chunk_writer.push_chunk(segment_writer, chunk)?;
82        chunk_writer.flush(segment_writer)?;
83        self.chunks.push(chunk_writer);
84
85        Ok(())
86    }
87
88    fn flush(&mut self, _segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
89        // We flush each chunk as we write it, so there's nothing to do here.
90        Ok(())
91    }
92
93    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
94        // Call finish on each chunk's writer
95        let mut children = vec![];
96        for writer in self.chunks.iter_mut() {
97            // FIXME(ngates): we should try calling finish after each chunk.
98            children.push(writer.finish(segment_writer)?);
99        }
100
101        // If there's only one child, there's no point even writing a stats table since
102        // there's no pruning for us to do.
103        if children.len() == 1 {
104            return Ok(children.pop().vortex_expect("child layout"));
105        }
106        Ok(chunked_layout(self.dtype.clone(), self.row_count, children))
107    }
108}
109
110pub(crate) fn chunked_layout(dtype: DType, row_count: u64, children: Vec<Layout>) -> Layout {
111    Layout::new_owned(
112        "chunked".into(),
113        LayoutVTableRef::new_ref(&ChunkedLayout),
114        dtype,
115        row_count,
116        vec![],
117        children,
118        None,
119    )
120}