vortex_layout/layouts/stats/writer.rs

1use std::sync::Arc;
2
3use arcref::ArcRef;
4use itertools::Itertools;
5use vortex_array::stats::{PRUNING_STATS, Stat, as_stat_bitset_bytes};
6use vortex_array::{Array, ArrayContext, ArrayRef};
7use vortex_buffer::ByteBufferMut;
8use vortex_dtype::DType;
9use vortex_error::{VortexResult, vortex_bail};
10
11use crate::data::Layout;
12use crate::layouts::stats::StatsLayout;
13use crate::layouts::stats::stats_table::StatsAccumulator;
14use crate::segments::SegmentWriter;
15use crate::writer::{LayoutWriter, LayoutWriterExt};
16use crate::{LayoutStrategy, LayoutVTableRef};
17
18pub struct StatsLayoutOptions {
19    /// The size of a statistics block
20    pub block_size: usize,
21    /// The statistics to collect for each block.
22    pub stats: Arc<[Stat]>,
23}
24
25impl Default for StatsLayoutOptions {
26    fn default() -> Self {
27        Self {
28            block_size: 8192,
29            stats: PRUNING_STATS.into(),
30        }
31    }
32}
33
/// A [`LayoutWriter`] that forwards every chunk to a child writer while accumulating
/// statistics for each block of rows.
///
/// When finished, the accumulated statistics are written as a separate table (using a writer
/// built from `stats_strategy`). The child layout and the stats-table layout then become the
/// two children of a "stats" layout.
pub struct StatsLayoutWriter {
    /// Array context passed to the stats-table writer when it is created in `finish`.
    ctx: ArrayContext,
    /// Block size and the set of statistics requested by the caller.
    options: StatsLayoutOptions,
    /// Receives every data chunk pushed to this writer.
    child_writer: Box<dyn LayoutWriter>,
    /// Strategy used to build the writer for the statistics table.
    stats_strategy: ArcRef<dyn LayoutStrategy>,
    /// Collects statistics for each pushed block.
    stats_accumulator: StatsAccumulator,
    /// The dtype that every pushed chunk must have.
    dtype: DType,

    /// Number of blocks pushed so far.
    nblocks: usize,
    /// Whether we've seen a block with a len < block_size. Once set, further blocks are
    /// rejected, since only the last block may be short.
    final_block: bool,
}
46
47impl StatsLayoutWriter {
48    pub fn new(
49        ctx: ArrayContext,
50        dtype: &DType,
51        // TODO(ngates): we should arrive at a convention on this. I think we should maybe just
52        //  impl LayoutStrategy for StatsLayoutStrategy, which holds options, and options contain
53        //  other layout strategies?
54        child_writer: Box<dyn LayoutWriter>,
55        stats_strategy: ArcRef<dyn LayoutStrategy>,
56        options: StatsLayoutOptions,
57    ) -> Self {
58        let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
59        let stats_accumulator = StatsAccumulator::new(dtype.clone(), &present_stats);
60
61        Self {
62            ctx,
63            options,
64            child_writer,
65            stats_strategy,
66            stats_accumulator,
67            dtype: dtype.clone(),
68            nblocks: 0,
69            final_block: false,
70        }
71    }
72}
73
74impl LayoutWriter for StatsLayoutWriter {
75    fn push_chunk(
76        &mut self,
77        segment_writer: &mut dyn SegmentWriter,
78        chunk: ArrayRef,
79    ) -> VortexResult<()> {
80        assert_eq!(
81            chunk.dtype(),
82            &self.dtype,
83            "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
84            chunk.dtype(),
85            self.dtype
86        );
87        if chunk.len() > self.options.block_size {
88            vortex_bail!(
89                "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
90            );
91        }
92        if self.final_block {
93            vortex_bail!(
94                "Cannot push chunks to StatsLayoutWriter after the final block has been written."
95            );
96        }
97        if chunk.len() < self.options.block_size {
98            self.final_block = true;
99        }
100
101        self.nblocks += 1;
102        self.stats_accumulator.push_chunk(&chunk)?;
103        self.child_writer.push_chunk(segment_writer, chunk)
104    }
105
106    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
107        self.child_writer.flush(segment_writer)
108    }
109
110    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
111        let child = self.child_writer.finish(segment_writer)?;
112
113        // Collect together the statistics
114        let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
115            // If we have no stats (e.g. the DType doesn't support them), then we just return the
116            // child layout.
117            return Ok(child);
118        };
119
120        // We must defer creating the stats table LayoutWriter until now, because the DType of
121        // the table depends on which stats were successfully computed.
122        let stats_array = stats_table.array();
123        let mut stats_writer = self
124            .stats_strategy
125            .new_writer(&self.ctx, stats_array.dtype())?;
126        let stats_layout = stats_writer.push_one(segment_writer, stats_table.array().to_array())?;
127
128        let mut metadata = ByteBufferMut::empty();
129
130        // First, write the block size to the metadata.
131        let block_size = u32::try_from(self.options.block_size)?;
132        metadata.extend_from_slice(&block_size.to_le_bytes());
133
134        // Then write the bit-set of statistics.
135        metadata.extend_from_slice(&as_stat_bitset_bytes(stats_table.present_stats()));
136
137        Ok(Layout::new_owned(
138            "stats".into(),
139            LayoutVTableRef::new_ref(&StatsLayout),
140            self.dtype.clone(),
141            // We report our child data's row count, not the stats table.
142            child.row_count(),
143            vec![],
144            vec![child, stats_layout],
145            Some(metadata.freeze().into_inner()),
146        ))
147    }
148}