vortex_layout/layouts/zoned/
writer.rs

1use std::sync::Arc;
2
3use arcref::ArcRef;
4use itertools::Itertools;
5use vortex_array::stats::{PRUNING_STATS, Stat};
6use vortex_array::{Array, ArrayContext, ArrayRef};
7use vortex_dtype::DType;
8use vortex_error::{VortexResult, vortex_bail};
9
10use crate::layouts::zoned::ZonedLayout;
11use crate::layouts::zoned::zone_map::StatsAccumulator;
12use crate::segments::SegmentWriter;
13use crate::writer::{LayoutWriter, LayoutWriterExt};
14use crate::{IntoLayout, LayoutRef, LayoutStrategy};
15
16pub struct ZonedLayoutOptions {
17    /// The size of a statistics block
18    pub block_size: usize,
19    /// The statistics to collect for each block.
20    pub stats: Arc<[Stat]>,
21    /// Maximum length of a variable length statistics
22    pub max_variable_length_statistics_size: usize,
23}
24
25impl Default for ZonedLayoutOptions {
26    fn default() -> Self {
27        Self {
28            block_size: 8192,
29            stats: PRUNING_STATS.into(),
30            max_variable_length_statistics_size: 64,
31        }
32    }
33}
34
35pub struct ZonedLayoutWriter {
36    ctx: ArrayContext,
37    options: ZonedLayoutOptions,
38    data_writer: Box<dyn LayoutWriter>,
39    zone_map_strategy: ArcRef<dyn LayoutStrategy>,
40    stats_accumulator: StatsAccumulator,
41    dtype: DType,
42
43    nblocks: usize,
44    // Whether we've seen a block with a len < block_size.
45    final_block: bool,
46}
47
48impl ZonedLayoutWriter {
49    pub fn new(
50        ctx: ArrayContext,
51        dtype: &DType,
52        // TODO(ngates): we should arrive at a convention on this. I think we should maybe just
53        //  impl LayoutStrategy for StatsLayoutStrategy, which holds options, and options contain
54        //  other layout strategies?
55        child_writer: Box<dyn LayoutWriter>,
56        stats_strategy: ArcRef<dyn LayoutStrategy>,
57        options: ZonedLayoutOptions,
58    ) -> Self {
59        let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
60        let stats_accumulator = StatsAccumulator::new(
61            dtype,
62            &present_stats,
63            options.max_variable_length_statistics_size,
64        );
65
66        Self {
67            ctx,
68            options,
69            data_writer: child_writer,
70            zone_map_strategy: stats_strategy,
71            stats_accumulator,
72            dtype: dtype.clone(),
73            nblocks: 0,
74            final_block: false,
75        }
76    }
77}
78
79impl LayoutWriter for ZonedLayoutWriter {
80    fn push_chunk(
81        &mut self,
82        segment_writer: &mut dyn SegmentWriter,
83        chunk: ArrayRef,
84    ) -> VortexResult<()> {
85        assert_eq!(
86            chunk.dtype(),
87            &self.dtype,
88            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
89            chunk.dtype(),
90            self.dtype
91        );
92        if chunk.len() > self.options.block_size {
93            vortex_bail!(
94                "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
95            );
96        }
97        if self.final_block {
98            vortex_bail!(
99                "Cannot push chunks to StatsLayoutWriter after the final block has been written."
100            );
101        }
102        if chunk.len() < self.options.block_size {
103            self.final_block = true;
104        }
105
106        self.nblocks += 1;
107        self.stats_accumulator.push_chunk(&chunk)?;
108        self.data_writer.push_chunk(segment_writer, chunk)
109    }
110
111    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
112        self.data_writer.flush(segment_writer)
113    }
114
115    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
116        let data = self.data_writer.finish(segment_writer)?;
117
118        // Collect together the statistics
119        let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
120            // If we have no stats (e.g. the DType doesn't support them), then we just return the
121            // child layout.
122            return Ok(data);
123        };
124
125        // We must defer creating the stats table LayoutWriter until now, because the DType of
126        // the table depends on which stats were successfully computed.
127        let stats_array = stats_table.array();
128        let mut stats_writer = self
129            .zone_map_strategy
130            .new_writer(&self.ctx, stats_array.dtype())?;
131        let zones_layout = stats_writer.push_one(segment_writer, stats_table.array().to_array())?;
132
133        Ok(ZonedLayout::new(
134            data,
135            zones_layout,
136            self.options.block_size,
137            stats_table.present_stats().clone(),
138        )
139        .into_layout())
140    }
141}