vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::arrays::StructArray;
8use vortex_array::compute::sum;
9use vortex_array::expr::Expression;
10use vortex_array::stats::{Precision, Stat, StatsProvider, StatsSet};
11use vortex_array::validity::Validity;
12use vortex_array::{Array, ArrayRef};
13use vortex_dtype::{DType, Nullability, PType, StructFields};
14use vortex_error::{VortexExpect, VortexResult, vortex_bail};
15use vortex_mask::Mask;
16
17use crate::layouts::zoned::builder::{
18 MAX_IS_TRUNCATED, MIN_IS_TRUNCATED, StatsArrayBuilder, stats_builder_with_capacity,
19};
20
21#[derive(Clone)]
26pub struct ZoneMap {
27 array: StructArray,
29 stats: Arc<[Stat]>,
31}
32
33impl ZoneMap {
34 pub fn try_new(
37 column_dtype: DType,
38 array: StructArray,
39 stats: Arc<[Stat]>,
40 ) -> VortexResult<Self> {
41 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
42 if &expected_dtype != array.dtype() {
43 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
44 }
45
46 Ok(unsafe { Self::new_unchecked(array, stats) })
48 }
49
50 pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
57 Self { array, stats }
58 }
59
60 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
62 assert!(present_stats.is_sorted(), "Stats must be sorted");
63 DType::Struct(
64 StructFields::from_iter(
65 present_stats
66 .iter()
67 .filter_map(|stat| {
68 stat.dtype(column_dtype)
69 .map(|dtype| (stat, dtype.as_nullable()))
70 })
71 .flat_map(|(s, dt)| match s {
72 Stat::Max => vec![
73 (s.name(), dt),
74 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
75 ],
76 Stat::Min => vec![
77 (s.name(), dt),
78 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
79 ],
80 _ => vec![(s.name(), dt)],
81 }),
82 ),
83 Nullability::NonNullable,
84 )
85 }
86
87 pub fn array(&self) -> &StructArray {
89 &self.array
90 }
91
92 pub fn present_stats(&self) -> &Arc<[Stat]> {
94 &self.stats
95 }
96
97 pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
99 let mut stats_set = StatsSet::default();
100 for &stat in stats {
101 let Some(array) = self.get_stat(stat)? else {
102 continue;
103 };
104
105 match stat {
107 Stat::Min | Stat::Max | Stat::Sum => {
109 if let Some(s) = array.statistics().compute_stat(stat)? {
110 stats_set.set(stat, Precision::exact(s.into_value()))
111 }
112 }
113 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
115 let sum = sum(&array)?
116 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
117 .into_value();
118 stats_set.set(stat, Precision::exact(sum));
119 }
120 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
122 }
123 }
124 Ok(stats_set)
125 }
126
127 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
129 Ok(self.array.field_by_name_opt(stat.name()).cloned())
130 }
131
132 pub fn prune(&self, predicate: &Expression) -> VortexResult<Mask> {
141 predicate
142 .evaluate(&self.array.to_array())?
143 .try_to_mask_fill_null_false()
144 }
145}
146
147pub struct StatsAccumulator {
153 builders: Vec<Box<dyn StatsArrayBuilder>>,
154 length: usize,
155}
156
157impl StatsAccumulator {
158 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
159 let builders = stats
160 .iter()
161 .filter_map(|&s| {
162 s.dtype(dtype).map(|stat_dtype| {
163 stats_builder_with_capacity(
164 s,
165 &stat_dtype.as_nullable(),
166 1024,
167 max_variable_length_statistics_size,
168 )
169 })
170 })
171 .collect::<Vec<_>>();
172
173 Self {
174 builders,
175 length: 0,
176 }
177 }
178
179 pub fn push_chunk_without_compute(&mut self, array: &dyn Array) -> VortexResult<()> {
180 for builder in self.builders.iter_mut() {
181 if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
182 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
183 } else {
184 builder.append_null();
185 }
186 }
187 self.length += 1;
188 Ok(())
189 }
190
191 pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
192 for builder in self.builders.iter_mut() {
193 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
194 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
195 } else {
196 builder.append_null();
197 }
198 }
199 self.length += 1;
200 Ok(())
201 }
202
203 pub fn as_stats_table(&mut self) -> Option<ZoneMap> {
208 let mut names = Vec::new();
209 let mut fields = Vec::new();
210 let mut stats = Vec::new();
211
212 for builder in self
213 .builders
214 .iter_mut()
215 .sorted_unstable_by_key(|b| b.stat())
217 {
218 let values = builder.finish();
219
220 if values.all_invalid() {
222 continue;
223 }
224
225 stats.push(builder.stat());
226 names.extend(values.names);
227 fields.extend(values.arrays);
228 }
229
230 if names.is_empty() {
231 return None;
232 }
233
234 Some(ZoneMap {
235 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
236 .vortex_expect("Failed to create zone map"),
237 stats: stats.into(),
238 })
239 }
240}
241
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::Itertools;
    use rstest::rstest;
    use vortex_array::arrays::{BoolArray, PrimitiveArray, StructArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::{gt, gt_eq, lit, lt, root};
    use vortex_array::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::{BitBuffer, buffer};
    use vortex_dtype::{DType, FieldPath, FieldPathSet, Nullability, PType};
    use vortex_error::{VortexExpect, VortexUnwrap};

    use crate::layouts::zoned::zone_map::{StatsAccumulator, ZoneMap};
    use crate::layouts::zoned::{MAX_IS_TRUNCATED, MIN_IS_TRUNCATED};

    /// Min/max values of variable-length columns are flagged when they exceed the size limit
    /// given to the accumulator (12 here).
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");

        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let requested = [Stat::Max, Stat::Min, Stat::Sum];
        let mut acc = StatsAccumulator::new(first_chunk.dtype(), &requested, 12);
        acc.push_chunk(&first_chunk.finish()).vortex_unwrap();
        acc.push_chunk(&second_chunk.finish()).vortex_unwrap();

        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");

        // No sum column is expected for these dtypes.
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );

        // Field 1 is the max truncation flag: only the second chunk's max is too long.
        let max_flags = stats_table.array.fields()[1].to_bool();
        assert_eq!(max_flags.bit_buffer(), &BitBuffer::from(vec![false, true]));

        // Field 3 is the min truncation flag: only the first chunk's min is too long.
        let min_flags = stats_table.array.fields()[3].to_bool();
        assert_eq!(min_flags.bit_buffer(), &BitBuffer::from(vec![true, false]));
    }

    /// The truncation flag columns are emitted for a primitive column as well.
    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let requested = [Stat::Max, Stat::Min, Stat::Sum];
        let mut acc = StatsAccumulator::new(array.dtype(), &requested, 12);
        acc.push_chunk(&array).vortex_unwrap();

        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );

        let max_flags = stats_table.array.fields()[1].to_bool();
        assert_eq!(max_flags.bit_buffer(), &BitBuffer::from(vec![false]));

        let min_flags = stats_table.array.fields()[3].to_bool();
        assert_eq!(min_flags.bit_buffer(), &BitBuffer::from(vec![false]));
    }

    /// Pruning expressions evaluated over a hand-built zone map flag the expected zones.
    #[rstest]
    fn test_zone_map_prunes() {
        // Stats that the pruning expressions are allowed to reference.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones with value ranges [1, 5], [2, 6] and [3, 7].
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // Each case pairs a filter with the mask expected from pruning.
        let cases = [
            (gt_eq(root(), lit(6i32)), vec![true, false, false]),
            (gt(root(), lit(5i32)), vec![true, false, false]),
            (lt(root(), lit(2i32)), vec![false, true, true]),
        ];

        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
            let mask = zone_map.prune(&pruning_expr).unwrap();
            assert_eq!(mask.to_bit_buffer().into_iter().collect_vec(), expected);
        }
    }
}