vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::ArrayRef;
8use vortex_array::DynArray;
9use vortex_array::ExecutionCtx;
10use vortex_array::IntoArray;
11use vortex_array::VortexSessionExecute;
12use vortex_array::aggregate_fn::fns::sum::sum;
13use vortex_array::arrays::StructArray;
14use vortex_array::dtype::DType;
15use vortex_array::dtype::Nullability;
16use vortex_array::dtype::PType;
17use vortex_array::dtype::StructFields;
18use vortex_array::expr::Expression;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::expr::stats::StatsProvider;
22use vortex_array::stats::StatsSet;
23use vortex_array::validity::Validity;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_mask::Mask;
28use vortex_session::VortexSession;
29
30use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
31use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
32use crate::layouts::zoned::builder::StatsArrayBuilder;
33use crate::layouts::zoned::builder::stats_builder_with_capacity;
34
35#[derive(Clone)]
40pub struct ZoneMap {
41 array: StructArray,
43 stats: Arc<[Stat]>,
45}
46
47impl ZoneMap {
48 pub fn try_new(
51 column_dtype: DType,
52 array: StructArray,
53 stats: Arc<[Stat]>,
54 ) -> VortexResult<Self> {
55 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
56 if &expected_dtype != array.dtype() {
57 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
58 }
59
60 Ok(unsafe { Self::new_unchecked(array, stats) })
62 }
63
64 pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
71 Self { array, stats }
72 }
73
74 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
76 assert!(present_stats.is_sorted(), "Stats must be sorted");
77 DType::Struct(
78 StructFields::from_iter(
79 present_stats
80 .iter()
81 .filter_map(|stat| {
82 stat.dtype(column_dtype)
83 .or_else(|| {
84 if let DType::Extension(ext) = column_dtype {
87 stat.dtype(ext.storage_dtype())
88 } else {
89 None
90 }
91 })
92 .map(|dtype| (stat, dtype.as_nullable()))
93 })
94 .flat_map(|(s, dt)| match s {
95 Stat::Max => vec![
96 (s.name(), dt),
97 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
98 ],
99 Stat::Min => vec![
100 (s.name(), dt),
101 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
102 ],
103 _ => vec![(s.name(), dt)],
104 }),
105 ),
106 Nullability::NonNullable,
107 )
108 }
109
110 pub fn array(&self) -> &StructArray {
112 &self.array
113 }
114
115 pub fn present_stats(&self) -> &Arc<[Stat]> {
117 &self.stats
118 }
119
120 pub fn to_stats_set(&self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult<StatsSet> {
122 let mut stats_set = StatsSet::default();
123 for &stat in stats {
124 let Some(array) = self.get_stat(stat)? else {
125 continue;
126 };
127
128 match stat {
130 Stat::Min | Stat::Max | Stat::Sum => {
132 if let Some(s) = array.statistics().compute_stat(stat)?
133 && let Some(v) = s.into_value()
134 {
135 stats_set.set(stat, Precision::exact(v))
136 }
137 }
138 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
140 if let Some(sum_value) = sum(&array, ctx)?
141 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
142 .into_value()
143 {
144 stats_set.set(stat, Precision::exact(sum_value));
145 }
146 }
147 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
149 }
150 }
151 Ok(stats_set)
152 }
153
154 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
156 Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
157 }
158
159 pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
168 let mut ctx = session.create_execution_ctx();
169 self.array
170 .clone()
171 .into_array()
172 .apply(predicate)?
173 .execute::<Mask>(&mut ctx)
174 }
175}
176
177pub struct StatsAccumulator {
183 builders: Vec<Box<dyn StatsArrayBuilder>>,
184 length: usize,
185}
186
187impl StatsAccumulator {
188 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
189 let builders = stats
190 .iter()
191 .filter_map(|&s| {
192 s.dtype(dtype).map(|stat_dtype| {
193 stats_builder_with_capacity(
194 s,
195 &stat_dtype.as_nullable(),
196 1024,
197 max_variable_length_statistics_size,
198 )
199 })
200 })
201 .collect::<Vec<_>>();
202
203 Self {
204 builders,
205 length: 0,
206 }
207 }
208
209 pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
210 for builder in self.builders.iter_mut() {
211 if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
212 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
213 } else {
214 builder.append_null();
215 }
216 }
217 self.length += 1;
218 Ok(())
219 }
220
221 pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
222 for builder in self.builders.iter_mut() {
223 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
224 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
225 } else {
226 builder.append_null();
227 }
228 }
229 self.length += 1;
230 Ok(())
231 }
232
233 pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
238 let mut names = Vec::new();
239 let mut fields = Vec::new();
240 let mut stats = Vec::new();
241
242 for builder in self
243 .builders
244 .iter_mut()
245 .sorted_unstable_by_key(|b| b.stat())
247 {
248 let values = builder.finish();
249
250 if values.all_invalid()? {
252 continue;
253 }
254
255 stats.push(builder.stat());
256 names.extend(values.names);
257 fields.extend(values.arrays);
258 }
259
260 if names.is_empty() {
261 return Ok(None);
262 }
263
264 Ok(Some(ZoneMap {
265 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
266 .vortex_expect("Failed to create zone map"),
267 stats: stats.into(),
268 }))
269 }
270}
271
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Variable-length min/max values longer than the configured limit are
    /// flagged as truncated, per chunk.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");
        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let mut acc =
            StatsAccumulator::new(first_chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        for chunk in [first_chunk.finish(), second_chunk.finish()] {
            acc.push_chunk(&chunk)
                .vortex_expect("push_chunk should succeed for test data");
        }

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // No `Sum` column is expected for these string/binary inputs.
        assert_eq!(
            zone_map.array().names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Column 1 is the max truncation flag: only the second chunk's max is truncated.
        assert_eq!(
            zone_map.array().unmasked_fields()[1]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false, true])
        );
        // Column 3 is the min truncation flag: only the first chunk's min is truncated.
        assert_eq!(
            zone_map.array().unmasked_fields()[3]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![true, false])
        );
    }

    /// The truncation flag columns are emitted for primitive input as well and
    /// are `false` there.
    #[test]
    fn always_adds_is_truncated_column() {
        let chunk = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&chunk)
            .vortex_expect("push_chunk should succeed for test array");

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        assert_eq!(
            zone_map.array().names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_eq!(
            zone_map.array().unmasked_fields()[1]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
        assert_eq!(
            zone_map.array().unmasked_fields()[3]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
    }

    /// Pruning expressions evaluated against a hand-built zone map flag exactly
    /// the zones that cannot match.
    #[rstest]
    fn test_zone_map_prunes() {
        // Stats the pruning expressions are allowed to reference.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones with (min, max) = (1, 5), (2, 6), (3, 7).
        let column_dtype: DType = PType::I32.into();
        let table = StructArray::from_fields(&[
            (
                "max",
                PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
            ),
            (
                "max_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
            (
                "min",
                PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
            ),
            (
                "min_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
        ])
        .unwrap();
        let present: Arc<[Stat]> = Arc::new([Stat::Max, Stat::Min]);
        let zone_map = ZoneMap::try_new(column_dtype, table, present).unwrap();

        // Each predicate is paired with the zones it is expected to rule out.
        let cases = [
            (gt_eq(root(), lit(6i32)), [true, false, false]),
            (gt(root(), lit(5i32)), [true, false, false]),
            (lt(root(), lit(2i32)), [false, true, true]),
        ];
        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
            let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
            assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected));
        }
    }
}