// vortex_layout/layouts/zoned/zone_map.rs
use std::sync::Arc;

use itertools::Itertools;
use vortex_array::ArrayRef;
use vortex_array::DynArray;
use vortex_array::IntoArray;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::StructArray;
use vortex_array::compute::sum;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_array::dtype::StructFields;
use vortex_array::expr::Expression;
use vortex_array::expr::stats::Precision;
use vortex_array::expr::stats::Stat;
use vortex_array::expr::stats::StatsProvider;
use vortex_array::stats::StatsSet;
use vortex_array::validity::Validity;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_mask::Mask;
use vortex_session::VortexSession;

use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
use crate::layouts::zoned::builder::StatsArrayBuilder;
use crate::layouts::zoned::builder::stats_builder_with_capacity;
33
34#[derive(Clone)]
39pub struct ZoneMap {
40 array: StructArray,
42 stats: Arc<[Stat]>,
44}
45
46impl ZoneMap {
47 pub fn try_new(
50 column_dtype: DType,
51 array: StructArray,
52 stats: Arc<[Stat]>,
53 ) -> VortexResult<Self> {
54 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
55 if &expected_dtype != array.dtype() {
56 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
57 }
58
59 Ok(unsafe { Self::new_unchecked(array, stats) })
61 }
62
63 pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
70 Self { array, stats }
71 }
72
73 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
75 assert!(present_stats.is_sorted(), "Stats must be sorted");
76 DType::Struct(
77 StructFields::from_iter(
78 present_stats
79 .iter()
80 .filter_map(|stat| {
81 stat.dtype(column_dtype)
82 .map(|dtype| (stat, dtype.as_nullable()))
83 })
84 .flat_map(|(s, dt)| match s {
85 Stat::Max => vec![
86 (s.name(), dt),
87 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
88 ],
89 Stat::Min => vec![
90 (s.name(), dt),
91 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
92 ],
93 _ => vec![(s.name(), dt)],
94 }),
95 ),
96 Nullability::NonNullable,
97 )
98 }
99
100 pub fn array(&self) -> &StructArray {
102 &self.array
103 }
104
105 pub fn present_stats(&self) -> &Arc<[Stat]> {
107 &self.stats
108 }
109
110 pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
112 let mut stats_set = StatsSet::default();
113 for &stat in stats {
114 let Some(array) = self.get_stat(stat)? else {
115 continue;
116 };
117
118 match stat {
120 Stat::Min | Stat::Max | Stat::Sum => {
122 if let Some(s) = array.statistics().compute_stat(stat)?
123 && let Some(v) = s.into_value()
124 {
125 stats_set.set(stat, Precision::exact(v))
126 }
127 }
128 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
130 if let Some(sum_value) = sum(&array)?
131 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
132 .into_value()
133 {
134 stats_set.set(stat, Precision::exact(sum_value));
135 }
136 }
137 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
139 }
140 }
141 Ok(stats_set)
142 }
143
144 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
146 Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
147 }
148
149 pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
158 let mut ctx = session.create_execution_ctx();
159 self.array
160 .clone()
161 .into_array()
162 .apply(predicate)?
163 .execute::<Mask>(&mut ctx)
164 }
165}
166
167pub struct StatsAccumulator {
173 builders: Vec<Box<dyn StatsArrayBuilder>>,
174 length: usize,
175}
176
177impl StatsAccumulator {
178 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
179 let builders = stats
180 .iter()
181 .filter_map(|&s| {
182 s.dtype(dtype).map(|stat_dtype| {
183 stats_builder_with_capacity(
184 s,
185 &stat_dtype.as_nullable(),
186 1024,
187 max_variable_length_statistics_size,
188 )
189 })
190 })
191 .collect::<Vec<_>>();
192
193 Self {
194 builders,
195 length: 0,
196 }
197 }
198
199 pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
200 for builder in self.builders.iter_mut() {
201 if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
202 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
203 } else {
204 builder.append_null();
205 }
206 }
207 self.length += 1;
208 Ok(())
209 }
210
211 pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
212 for builder in self.builders.iter_mut() {
213 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
214 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
215 } else {
216 builder.append_null();
217 }
218 }
219 self.length += 1;
220 Ok(())
221 }
222
223 pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
228 let mut names = Vec::new();
229 let mut fields = Vec::new();
230 let mut stats = Vec::new();
231
232 for builder in self
233 .builders
234 .iter_mut()
235 .sorted_unstable_by_key(|b| b.stat())
237 {
238 let values = builder.finish();
239
240 if values.all_invalid()? {
242 continue;
243 }
244
245 stats.push(builder.stat());
246 names.extend(values.names);
247 fields.extend(values.arrays);
248 }
249
250 if names.is_empty() {
251 return Ok(None);
252 }
253
254 Ok(Some(ZoneMap {
255 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
256 .vortex_expect("Failed to create zone map"),
257 stats: stats.into(),
258 }))
259 }
260}
261
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Reads field `idx` of the zone map table as a bit buffer (used for truncation flags).
    fn bool_field(zone_map: &ZoneMap, idx: usize) -> BitBuffer {
        zone_map.array.unmasked_fields()[idx]
            .to_bool()
            .to_bit_buffer()
    }

    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first.append_value("Value to be truncated");
        first.append_value("untruncated");
        let mut second = VarBinViewBuilder::with_capacity(dtype, 2);
        second.append_value("Another");
        second.append_value("wait a minute");

        let mut acc =
            StatsAccumulator::new(first.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        for chunk in [first.finish(), second.finish()] {
            acc.push_chunk(&chunk)
                .vortex_expect("push_chunk should succeed for test data");
        }

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // The Sum stat contributes no field for these dtypes.
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Only values longer than the 12-byte limit are flagged: the second zone's max
        // ("wait a minute") and the first zone's min ("Value to be truncated").
        assert_eq!(bool_field(&zone_map, 1), BitBuffer::from(vec![false, true]));
        assert_eq!(bool_field(&zone_map, 3), BitBuffer::from(vec![true, false]));
    }

    #[test]
    fn always_adds_is_truncated_column() {
        let chunk = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&chunk)
            .vortex_expect("push_chunk should succeed for test array");

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        // Fixed-width values are never truncated, but the flag columns are still present.
        assert_eq!(bool_field(&zone_map, 1), BitBuffer::from(vec![false]));
        assert_eq!(bool_field(&zone_map, 3), BitBuffer::from(vec![false]));
    }

    #[test]
    fn test_zone_map_prunes() {
        // The pruning expressions may only reference the min and max stat columns.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones spanning [1, 5], [2, 6] and [3, 7].
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        let cases = [
            (gt_eq(root(), lit(6i32)), [true, false, false]),
            (gt(root(), lit(5i32)), [true, false, false]),
            (lt(root(), lit(2i32)), [false, true, true]),
        ];
        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
            let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
            assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected));
        }
    }
}