vortex_layout/layouts/zoned/writer.rs

1use std::sync::Arc;
2
3use arcref::ArcRef;
4use itertools::Itertools;
5use vortex_array::stats::{PRUNING_STATS, Stat};
6use vortex_array::{Array, ArrayContext, ArrayRef};
7use vortex_dtype::DType;
8use vortex_error::{VortexResult, vortex_bail};
9
10use crate::layouts::zoned::ZonedLayout;
11use crate::layouts::zoned::zone_map::StatsAccumulator;
12use crate::segments::SegmentWriter;
13use crate::writer::{LayoutWriter, LayoutWriterExt};
14use crate::{IntoLayout, LayoutRef, LayoutStrategy};
15
16pub struct ZonedLayoutOptions {
17    /// The size of a statistics block
18    pub block_size: usize,
19    /// The statistics to collect for each block.
20    pub stats: Arc<[Stat]>,
21}
22
23impl Default for ZonedLayoutOptions {
24    fn default() -> Self {
25        Self {
26            block_size: 8192,
27            stats: PRUNING_STATS.into(),
28        }
29    }
30}
31
32pub struct ZonedLayoutWriter {
33    ctx: ArrayContext,
34    options: ZonedLayoutOptions,
35    data_writer: Box<dyn LayoutWriter>,
36    zone_map_strategy: ArcRef<dyn LayoutStrategy>,
37    stats_accumulator: StatsAccumulator,
38    dtype: DType,
39
40    nblocks: usize,
41    // Whether we've seen a block with a len < block_size.
42    final_block: bool,
43}
44
45impl ZonedLayoutWriter {
46    pub fn new(
47        ctx: ArrayContext,
48        dtype: &DType,
49        // TODO(ngates): we should arrive at a convention on this. I think we should maybe just
50        //  impl LayoutStrategy for StatsLayoutStrategy, which holds options, and options contain
51        //  other layout strategies?
52        child_writer: Box<dyn LayoutWriter>,
53        stats_strategy: ArcRef<dyn LayoutStrategy>,
54        options: ZonedLayoutOptions,
55    ) -> Self {
56        let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
57        let stats_accumulator = StatsAccumulator::new(dtype.clone(), &present_stats);
58
59        Self {
60            ctx,
61            options,
62            data_writer: child_writer,
63            zone_map_strategy: stats_strategy,
64            stats_accumulator,
65            dtype: dtype.clone(),
66            nblocks: 0,
67            final_block: false,
68        }
69    }
70}
71
72impl LayoutWriter for ZonedLayoutWriter {
73    fn push_chunk(
74        &mut self,
75        segment_writer: &mut dyn SegmentWriter,
76        chunk: ArrayRef,
77    ) -> VortexResult<()> {
78        assert_eq!(
79            chunk.dtype(),
80            &self.dtype,
81            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
82            chunk.dtype(),
83            self.dtype
84        );
85        if chunk.len() > self.options.block_size {
86            vortex_bail!(
87                "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
88            );
89        }
90        if self.final_block {
91            vortex_bail!(
92                "Cannot push chunks to StatsLayoutWriter after the final block has been written."
93            );
94        }
95        if chunk.len() < self.options.block_size {
96            self.final_block = true;
97        }
98
99        self.nblocks += 1;
100        self.stats_accumulator.push_chunk(&chunk)?;
101        self.data_writer.push_chunk(segment_writer, chunk)
102    }
103
104    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
105        self.data_writer.flush(segment_writer)
106    }
107
108    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
109        let data = self.data_writer.finish(segment_writer)?;
110
111        // Collect together the statistics
112        let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
113            // If we have no stats (e.g. the DType doesn't support them), then we just return the
114            // child layout.
115            return Ok(data);
116        };
117
118        // We must defer creating the stats table LayoutWriter until now, because the DType of
119        // the table depends on which stats were successfully computed.
120        let stats_array = stats_table.array();
121        let mut stats_writer = self
122            .zone_map_strategy
123            .new_writer(&self.ctx, stats_array.dtype())?;
124        let zones_layout = stats_writer.push_one(segment_writer, stats_table.array().to_array())?;
125
126        Ok(ZonedLayout::new(
127            data,
128            zones_layout,
129            self.options.block_size,
130            stats_table.present_stats().clone(),
131        )
132        .into_layout())
133    }
134}