vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::ArrayRef;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::VortexSessionExecute;
11use vortex_array::aggregate_fn::fns::sum::sum;
12use vortex_array::arrays::StructArray;
13use vortex_array::arrays::struct_::StructArrayExt;
14use vortex_array::dtype::DType;
15use vortex_array::dtype::Nullability;
16use vortex_array::dtype::PType;
17use vortex_array::dtype::StructFields;
18use vortex_array::expr::Expression;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::expr::stats::StatsProvider;
22use vortex_array::stats::StatsSet;
23use vortex_array::validity::Validity;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_mask::Mask;
28use vortex_session::VortexSession;
29
30use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
31use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
32use crate::layouts::zoned::builder::StatsArrayBuilder;
33use crate::layouts::zoned::builder::stats_builder_with_capacity;
34
35#[derive(Clone)]
40pub struct ZoneMap {
41 array: StructArray,
43 stats: Arc<[Stat]>,
45}
46
47impl ZoneMap {
48 pub fn try_new(
51 column_dtype: DType,
52 array: StructArray,
53 stats: Arc<[Stat]>,
54 ) -> VortexResult<Self> {
55 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
56 if &expected_dtype != array.dtype() {
57 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
58 }
59
60 Ok(unsafe { Self::new_unchecked(array, stats) })
62 }
63
64 pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
71 Self { array, stats }
72 }
73
74 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
76 assert!(present_stats.is_sorted(), "Stats must be sorted");
77 DType::Struct(
78 StructFields::from_iter(
79 present_stats
80 .iter()
81 .filter_map(|stat| {
82 stat.dtype(column_dtype)
83 .or_else(|| {
84 if let DType::Extension(ext) = column_dtype {
87 stat.dtype(ext.storage_dtype())
88 } else {
89 None
90 }
91 })
92 .map(|dtype| (stat, dtype.as_nullable()))
93 })
94 .flat_map(|(s, dt)| match s {
95 Stat::Max => vec![
96 (s.name(), dt),
97 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
98 ],
99 Stat::Min => vec![
100 (s.name(), dt),
101 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
102 ],
103 _ => vec![(s.name(), dt)],
104 }),
105 ),
106 Nullability::NonNullable,
107 )
108 }
109
110 pub fn array(&self) -> &StructArray {
112 &self.array
113 }
114
115 pub fn present_stats(&self) -> &Arc<[Stat]> {
117 &self.stats
118 }
119
120 pub fn to_stats_set(&self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult<StatsSet> {
122 let mut stats_set = StatsSet::default();
123 for &stat in stats {
124 let Some(array) = self.get_stat(stat)? else {
125 continue;
126 };
127
128 match stat {
130 Stat::Min | Stat::Max | Stat::Sum => {
132 if let Some(s) = array.statistics().compute_stat(stat)?
133 && let Some(v) = s.into_value()
134 {
135 stats_set.set(stat, Precision::exact(v))
136 }
137 }
138 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
140 if let Some(sum_value) = sum(&array, ctx)?
141 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
142 .into_value()
143 {
144 stats_set.set(stat, Precision::exact(sum_value));
145 }
146 }
147 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
149 }
150 }
151 Ok(stats_set)
152 }
153
154 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
156 Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
157 }
158
159 pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
168 let mut ctx = session.create_execution_ctx();
169 self.array
170 .clone()
171 .into_array()
172 .apply(predicate)?
173 .execute::<Mask>(&mut ctx)
174 }
175}
176
177pub struct StatsAccumulator {
183 builders: Vec<Box<dyn StatsArrayBuilder>>,
184 length: usize,
185}
186
187impl StatsAccumulator {
188 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
189 let builders = stats
190 .iter()
191 .filter_map(|&s| {
192 s.dtype(dtype).map(|stat_dtype| {
193 stats_builder_with_capacity(
194 s,
195 &stat_dtype.as_nullable(),
196 1024,
197 max_variable_length_statistics_size,
198 )
199 })
200 })
201 .collect::<Vec<_>>();
202
203 Self {
204 builders,
205 length: 0,
206 }
207 }
208
209 pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
210 for builder in self.builders.iter_mut() {
211 if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
212 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
213 } else {
214 builder.append_null();
215 }
216 }
217 self.length += 1;
218 Ok(())
219 }
220
221 pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
222 for builder in self.builders.iter_mut() {
223 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
224 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
225 } else {
226 builder.append_null();
227 }
228 }
229 self.length += 1;
230 Ok(())
231 }
232
233 pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
238 let mut names = Vec::new();
239 let mut fields = Vec::new();
240 let mut stats = Vec::new();
241
242 for builder in self
243 .builders
244 .iter_mut()
245 .sorted_unstable_by_key(|b| b.stat())
247 {
248 let values = builder.finish();
249
250 if values.all_invalid()? {
252 continue;
253 }
254
255 stats.push(builder.stat());
256 names.extend(values.names);
257 fields.extend(values.arrays);
258 }
259
260 if names.is_empty() {
261 return Ok(None);
262 }
263
264 Ok(Some(ZoneMap {
265 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
266 .vortex_expect("Failed to create zone map"),
267 stats: stats.into(),
268 }))
269 }
270}
271
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::arrays::bool::BoolArrayExt;
    use vortex_array::arrays::struct_::StructArrayExt;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Asserts that the boolean field at position `idx` of `zone_map` holds exactly `expected`.
    fn assert_bool_field(zone_map: &ZoneMap, idx: usize, expected: Vec<bool>) {
        assert_eq!(
            zone_map.array.unmasked_field(idx).to_bool().to_bit_buffer(),
            BitBuffer::from(expected)
        );
    }

    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first.append_value("Value to be truncated");
        first.append_value("untruncated");

        let mut second = VarBinViewBuilder::with_capacity(dtype, 2);
        second.append_value("Another");
        second.append_value("wait a minute");

        let mut acc =
            StatsAccumulator::new(first.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        for chunk in [first.finish(), second.finish()] {
            acc.push_chunk(&chunk)
                .vortex_expect("push_chunk should succeed for test data");
        }

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Only the second chunk's max ("wait a minute") exceeds the 12-byte limit.
        assert_bool_field(&zone_map, 1, vec![false, true]);
        // Only the first chunk's min ("Value to be truncated") exceeds the 12-byte limit.
        assert_bool_field(&zone_map, 3, vec![true, false]);
    }

    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&array)
            .vortex_expect("push_chunk should succeed for test array");

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_bool_field(&zone_map, 1, vec![false]);
        assert_bool_field(&zone_map, 3, vec![false]);
    }

    #[rstest]
    fn test_zone_map_prunes() {
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // Each filter paired with the expected per-zone prune mask (true = zone can be skipped).
        let cases = [
            (gt_eq(root(), lit(6i32)), [true, false, false]),
            (gt(root(), lit(5i32)), [true, false, false]),
            (lt(root(), lit(2i32)), [false, true, true]),
        ];
        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
            let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
            assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected));
        }
    }
}