vortex_layout/layouts/stats/
writer.rs1use std::sync::Arc;
2
3use itertools::Itertools;
4use vortex_array::arcref::ArcRef;
5use vortex_array::stats::{PRUNING_STATS, Stat, as_stat_bitset_bytes};
6use vortex_array::{ArrayContext, ArrayRef};
7use vortex_buffer::ByteBufferMut;
8use vortex_dtype::DType;
9use vortex_error::{VortexResult, vortex_bail};
10
11use crate::data::Layout;
12use crate::layouts::stats::StatsLayout;
13use crate::layouts::stats::stats_table::StatsAccumulator;
14use crate::segments::SegmentWriter;
15use crate::writer::{LayoutWriter, LayoutWriterExt};
16use crate::{LayoutStrategy, LayoutVTableRef};
17
18pub struct StatsLayoutOptions {
19 pub block_size: usize,
21 pub stats: Arc<[Stat]>,
23}
24
25impl Default for StatsLayoutOptions {
26 fn default() -> Self {
27 Self {
28 block_size: 8192,
29 stats: PRUNING_STATS.into(),
30 }
31 }
32}
33
34pub struct StatsLayoutWriter {
35 ctx: ArrayContext,
36 options: StatsLayoutOptions,
37 child_writer: Box<dyn LayoutWriter>,
38 stats_strategy: ArcRef<dyn LayoutStrategy>,
39 stats_accumulator: StatsAccumulator,
40 dtype: DType,
41
42 nblocks: usize,
43 final_block: bool,
45}
46
47impl StatsLayoutWriter {
48 pub fn new(
49 ctx: ArrayContext,
50 dtype: &DType,
51 child_writer: Box<dyn LayoutWriter>,
55 stats_strategy: ArcRef<dyn LayoutStrategy>,
56 options: StatsLayoutOptions,
57 ) -> Self {
58 let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
59 let stats_accumulator = StatsAccumulator::new(dtype.clone(), &present_stats);
60
61 Self {
62 ctx,
63 options,
64 child_writer,
65 stats_strategy,
66 stats_accumulator,
67 dtype: dtype.clone(),
68 nblocks: 0,
69 final_block: false,
70 }
71 }
72}
73
74impl LayoutWriter for StatsLayoutWriter {
75 fn push_chunk(
76 &mut self,
77 segment_writer: &mut dyn SegmentWriter,
78 chunk: ArrayRef,
79 ) -> VortexResult<()> {
80 if chunk.len() > self.options.block_size {
81 vortex_bail!(
82 "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
83 );
84 }
85 if self.final_block {
86 vortex_bail!(
87 "Cannot push chunks to StatsLayoutWriter after the final block has been written."
88 );
89 }
90 if chunk.len() < self.options.block_size {
91 self.final_block = true;
92 }
93
94 self.nblocks += 1;
95 self.stats_accumulator.push_chunk(&chunk)?;
96 self.child_writer.push_chunk(segment_writer, chunk)
97 }
98
99 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
100 self.child_writer.flush(segment_writer)
101 }
102
103 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
104 let child = self.child_writer.finish(segment_writer)?;
105
106 let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
108 return Ok(child);
111 };
112
113 let stats_array = stats_table.array();
116 let mut stats_writer = self
117 .stats_strategy
118 .new_writer(&self.ctx, stats_array.dtype())?;
119 let stats_layout = stats_writer.push_one(segment_writer, stats_table.array().clone())?;
120
121 let mut metadata = ByteBufferMut::empty();
122
123 let block_size = u32::try_from(self.options.block_size)?;
125 metadata.extend_from_slice(&block_size.to_le_bytes());
126
127 metadata.extend_from_slice(&as_stat_bitset_bytes(stats_table.present_stats()));
129
130 Ok(Layout::new_owned(
131 "stats".into(),
132 LayoutVTableRef::new_ref(&StatsLayout),
133 self.dtype.clone(),
134 child.row_count(),
136 vec![],
137 vec![child, stats_layout],
138 Some(metadata.freeze().into_inner()),
139 ))
140 }
141}