vortex_layout/layouts/stats/
writer.rs1use std::sync::Arc;
2
3use arcref::ArcRef;
4use itertools::Itertools;
5use vortex_array::stats::{PRUNING_STATS, Stat, as_stat_bitset_bytes};
6use vortex_array::{Array, ArrayContext, ArrayRef};
7use vortex_buffer::ByteBufferMut;
8use vortex_dtype::DType;
9use vortex_error::{VortexResult, vortex_bail};
10
11use crate::data::Layout;
12use crate::layouts::stats::StatsLayout;
13use crate::layouts::stats::stats_table::StatsAccumulator;
14use crate::segments::SegmentWriter;
15use crate::writer::{LayoutWriter, LayoutWriterExt};
16use crate::{LayoutStrategy, LayoutVTableRef};
17
18pub struct StatsLayoutOptions {
19 pub block_size: usize,
21 pub stats: Arc<[Stat]>,
23}
24
25impl Default for StatsLayoutOptions {
26 fn default() -> Self {
27 Self {
28 block_size: 8192,
29 stats: PRUNING_STATS.into(),
30 }
31 }
32}
33
34pub struct StatsLayoutWriter {
35 ctx: ArrayContext,
36 options: StatsLayoutOptions,
37 child_writer: Box<dyn LayoutWriter>,
38 stats_strategy: ArcRef<dyn LayoutStrategy>,
39 stats_accumulator: StatsAccumulator,
40 dtype: DType,
41
42 nblocks: usize,
43 final_block: bool,
45}
46
47impl StatsLayoutWriter {
48 pub fn new(
49 ctx: ArrayContext,
50 dtype: &DType,
51 child_writer: Box<dyn LayoutWriter>,
55 stats_strategy: ArcRef<dyn LayoutStrategy>,
56 options: StatsLayoutOptions,
57 ) -> Self {
58 let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
59 let stats_accumulator = StatsAccumulator::new(dtype.clone(), &present_stats);
60
61 Self {
62 ctx,
63 options,
64 child_writer,
65 stats_strategy,
66 stats_accumulator,
67 dtype: dtype.clone(),
68 nblocks: 0,
69 final_block: false,
70 }
71 }
72}
73
74impl LayoutWriter for StatsLayoutWriter {
75 fn push_chunk(
76 &mut self,
77 segment_writer: &mut dyn SegmentWriter,
78 chunk: ArrayRef,
79 ) -> VortexResult<()> {
80 assert_eq!(
81 chunk.dtype(),
82 &self.dtype,
83 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
84 chunk.dtype(),
85 self.dtype
86 );
87 if chunk.len() > self.options.block_size {
88 vortex_bail!(
89 "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
90 );
91 }
92 if self.final_block {
93 vortex_bail!(
94 "Cannot push chunks to StatsLayoutWriter after the final block has been written."
95 );
96 }
97 if chunk.len() < self.options.block_size {
98 self.final_block = true;
99 }
100
101 self.nblocks += 1;
102 self.stats_accumulator.push_chunk(&chunk)?;
103 self.child_writer.push_chunk(segment_writer, chunk)
104 }
105
106 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
107 self.child_writer.flush(segment_writer)
108 }
109
110 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
111 let child = self.child_writer.finish(segment_writer)?;
112
113 let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
115 return Ok(child);
118 };
119
120 let stats_array = stats_table.array();
123 let mut stats_writer = self
124 .stats_strategy
125 .new_writer(&self.ctx, stats_array.dtype())?;
126 let stats_layout = stats_writer.push_one(segment_writer, stats_table.array().to_array())?;
127
128 let mut metadata = ByteBufferMut::empty();
129
130 let block_size = u32::try_from(self.options.block_size)?;
132 metadata.extend_from_slice(&block_size.to_le_bytes());
133
134 metadata.extend_from_slice(&as_stat_bitset_bytes(stats_table.present_stats()));
136
137 Ok(Layout::new_owned(
138 "stats".into(),
139 LayoutVTableRef::new_ref(&StatsLayout),
140 self.dtype.clone(),
141 child.row_count(),
143 vec![],
144 vec![child, stats_layout],
145 Some(metadata.freeze().into_inner()),
146 ))
147 }
148}