vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::ArrayRef;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::LEGACY_SESSION;
11use vortex_array::VortexSessionExecute;
12use vortex_array::aggregate_fn::fns::sum::sum;
13use vortex_array::arrays::StructArray;
14use vortex_array::arrays::struct_::StructArrayExt;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::Nullability;
17use vortex_array::dtype::PType;
18use vortex_array::dtype::StructFields;
19use vortex_array::expr::Expression;
20use vortex_array::expr::stats::Precision;
21use vortex_array::expr::stats::Stat;
22use vortex_array::expr::stats::StatsProvider;
23use vortex_array::stats::StatsSet;
24use vortex_array::validity::Validity;
25use vortex_error::VortexExpect;
26use vortex_error::VortexResult;
27use vortex_error::vortex_bail;
28use vortex_mask::Mask;
29use vortex_session::VortexSession;
30
31use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
32use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
33use crate::layouts::zoned::builder::StatsArrayBuilder;
34use crate::layouts::zoned::builder::stats_builder_with_capacity;
35
36#[derive(Clone)]
41pub struct ZoneMap {
42 array: StructArray,
44 stats: Arc<[Stat]>,
46}
47
48impl ZoneMap {
49 pub fn try_new(
52 column_dtype: DType,
53 array: StructArray,
54 stats: Arc<[Stat]>,
55 ) -> VortexResult<Self> {
56 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
57 if &expected_dtype != array.dtype() {
58 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
59 }
60
61 Ok(unsafe { Self::new_unchecked(array, stats) })
63 }
64
65 pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
72 Self { array, stats }
73 }
74
75 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
77 assert!(present_stats.is_sorted(), "Stats must be sorted");
78 DType::Struct(
79 StructFields::from_iter(
80 present_stats
81 .iter()
82 .filter_map(|stat| {
83 stat.dtype(column_dtype)
84 .or_else(|| {
85 if let DType::Extension(ext) = column_dtype {
88 stat.dtype(ext.storage_dtype())
89 } else {
90 None
91 }
92 })
93 .map(|dtype| (stat, dtype.as_nullable()))
94 })
95 .flat_map(|(s, dt)| match s {
96 Stat::Max => vec![
97 (s.name(), dt),
98 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
99 ],
100 Stat::Min => vec![
101 (s.name(), dt),
102 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
103 ],
104 _ => vec![(s.name(), dt)],
105 }),
106 ),
107 Nullability::NonNullable,
108 )
109 }
110
111 pub fn array(&self) -> &StructArray {
113 &self.array
114 }
115
116 pub fn present_stats(&self) -> &Arc<[Stat]> {
118 &self.stats
119 }
120
121 pub fn to_stats_set(&self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult<StatsSet> {
123 let mut stats_set = StatsSet::default();
124 for &stat in stats {
125 let Some(array) = self.get_stat(stat)? else {
126 continue;
127 };
128
129 match stat {
131 Stat::Min | Stat::Max | Stat::Sum => {
133 if let Some(s) = array.statistics().compute_stat(stat, ctx)?
134 && let Some(v) = s.into_value()
135 {
136 stats_set.set(stat, Precision::exact(v))
137 }
138 }
139 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
141 if let Some(sum_value) = sum(&array, ctx)?
142 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
143 .into_value()
144 {
145 stats_set.set(stat, Precision::exact(sum_value));
146 }
147 }
148 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
150 }
151 }
152 Ok(stats_set)
153 }
154
155 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
157 Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
158 }
159
160 pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
169 let mut ctx = session.create_execution_ctx();
170 self.array
171 .clone()
172 .into_array()
173 .apply(predicate)?
174 .execute::<Mask>(&mut ctx)
175 }
176}
177
178pub struct StatsAccumulator {
184 builders: Vec<Box<dyn StatsArrayBuilder>>,
185 length: usize,
186}
187
188impl StatsAccumulator {
189 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
190 let builders = stats
191 .iter()
192 .filter_map(|&s| {
193 s.dtype(dtype).map(|stat_dtype| {
194 stats_builder_with_capacity(
195 s,
196 &stat_dtype.as_nullable(),
197 1024,
198 max_variable_length_statistics_size,
199 )
200 })
201 })
202 .collect::<Vec<_>>();
203
204 Self {
205 builders,
206 length: 0,
207 }
208 }
209
210 pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
211 for builder in self.builders.iter_mut() {
212 if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
213 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
214 } else {
215 builder.append_null();
216 }
217 }
218 self.length += 1;
219 Ok(())
220 }
221
222 pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
223 for builder in self.builders.iter_mut() {
224 if let Some(v) = array
225 .statistics()
226 .compute_stat(builder.stat(), &mut LEGACY_SESSION.create_execution_ctx())?
227 {
228 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
229 } else {
230 builder.append_null();
231 }
232 }
233 self.length += 1;
234 Ok(())
235 }
236
237 pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
242 let mut names = Vec::new();
243 let mut fields = Vec::new();
244 let mut stats = Vec::new();
245
246 for builder in self
247 .builders
248 .iter_mut()
249 .sorted_unstable_by_key(|b| b.stat())
251 {
252 let values = builder.finish();
253
254 if values.all_invalid()? {
256 continue;
257 }
258
259 stats.push(builder.stat());
260 names.extend(values.names);
261 fields.extend(values.arrays);
262 }
263
264 if names.is_empty() {
265 return Ok(None);
266 }
267
268 Ok(Some(ZoneMap {
269 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
270 .vortex_expect("Failed to create zone map"),
271 stats: stats.into(),
272 }))
273 }
274}
275
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::arrays::bool::BoolArrayExt;
    use vortex_array::arrays::struct_::StructArrayExt;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Asserts that boolean field `idx` of the zone map's stats table holds `expected`.
    #[track_caller]
    fn assert_bool_field(zone_map: &ZoneMap, idx: usize, expected: &[bool]) {
        assert_eq!(
            zone_map.array.unmasked_field(idx).to_bool().to_bit_buffer(),
            BitBuffer::from(expected.to_vec())
        );
    }

    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        // Zone 0: max "untruncated" fits in 12 bytes; min "Value to be truncated" does not.
        // Zone 1: max "wait a minute" (13 bytes) is truncated; min "Another" is not.
        let mut first = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first.append_value("Value to be truncated");
        first.append_value("untruncated");
        let mut second = VarBinViewBuilder::with_capacity(dtype, 2);
        second.append_value("Another");
        second.append_value("wait a minute");

        let mut accumulator =
            StatsAccumulator::new(first.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        for chunk in [first.finish(), second.finish()] {
            accumulator
                .push_chunk(&chunk)
                .vortex_expect("push_chunk should succeed for test data");
        }
        let stats_table = accumulator
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // Sum has no dtype for strings/binary, so it is absent.
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[Stat::Max.name(), MAX_IS_TRUNCATED, Stat::Min.name(), MIN_IS_TRUNCATED]
        );
        assert_bool_field(&stats_table, 1, &[false, true]);
        assert_bool_field(&stats_table, 3, &[true, false]);
    }

    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let mut accumulator =
            StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        accumulator
            .push_chunk(&array)
            .vortex_expect("push_chunk should succeed for test array");
        let stats_table = accumulator
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // Primitive values are never truncated, yet the flag columns are still emitted.
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_bool_field(&stats_table, 1, &[false]);
        assert_bool_field(&stats_table, 3, &[false]);
    }

    #[rstest]
    fn test_zone_map_prunes() {
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones with ranges [1, 5], [2, 6] and [3, 7].
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // Each case pairs a filter with the zones it lets us skip (`true` = prunable).
        let cases = [
            (gt_eq(root(), lit(6i32)), [true, false, false]),
            (gt(root(), lit(5i32)), [true, false, false]),
            (lt(root(), lit(2i32)), [false, true, true]),
        ];
        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
            let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
            assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected));
        }
    }
}