vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::Array;
8use vortex_array::ArrayRef;
9use vortex_array::VortexSessionExecute;
10use vortex_array::arrays::StructArray;
11use vortex_array::compute::sum;
12use vortex_array::expr::Expression;
13use vortex_array::expr::stats::Precision;
14use vortex_array::expr::stats::Stat;
15use vortex_array::expr::stats::StatsProvider;
16use vortex_array::stats::StatsSet;
17use vortex_array::validity::Validity;
18use vortex_dtype::DType;
19use vortex_dtype::Nullability;
20use vortex_dtype::PType;
21use vortex_dtype::StructFields;
22use vortex_error::VortexExpect;
23use vortex_error::VortexResult;
24use vortex_error::vortex_bail;
25use vortex_mask::Mask;
26use vortex_session::VortexSession;
27
28use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
29use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
30use crate::layouts::zoned::builder::StatsArrayBuilder;
31use crate::layouts::zoned::builder::stats_builder_with_capacity;
32
33#[derive(Clone)]
38pub struct ZoneMap {
39 array: StructArray,
41 stats: Arc<[Stat]>,
43}
44
45impl ZoneMap {
46 pub fn try_new(
49 column_dtype: DType,
50 array: StructArray,
51 stats: Arc<[Stat]>,
52 ) -> VortexResult<Self> {
53 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
54 if &expected_dtype != array.dtype() {
55 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
56 }
57
58 Ok(unsafe { Self::new_unchecked(array, stats) })
60 }
61
62 pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
69 Self { array, stats }
70 }
71
72 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
74 assert!(present_stats.is_sorted(), "Stats must be sorted");
75 DType::Struct(
76 StructFields::from_iter(
77 present_stats
78 .iter()
79 .filter_map(|stat| {
80 stat.dtype(column_dtype)
81 .map(|dtype| (stat, dtype.as_nullable()))
82 })
83 .flat_map(|(s, dt)| match s {
84 Stat::Max => vec![
85 (s.name(), dt),
86 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
87 ],
88 Stat::Min => vec![
89 (s.name(), dt),
90 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
91 ],
92 _ => vec![(s.name(), dt)],
93 }),
94 ),
95 Nullability::NonNullable,
96 )
97 }
98
99 pub fn array(&self) -> &StructArray {
101 &self.array
102 }
103
104 pub fn present_stats(&self) -> &Arc<[Stat]> {
106 &self.stats
107 }
108
109 pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
111 let mut stats_set = StatsSet::default();
112 for &stat in stats {
113 let Some(array) = self.get_stat(stat)? else {
114 continue;
115 };
116
117 match stat {
119 Stat::Min | Stat::Max | Stat::Sum => {
121 if let Some(s) = array.statistics().compute_stat(stat)?
122 && let Some(v) = s.into_value()
123 {
124 stats_set.set(stat, Precision::exact(v))
125 }
126 }
127 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
129 if let Some(sum_value) = sum(&array)?
130 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
131 .into_value()
132 {
133 stats_set.set(stat, Precision::exact(sum_value));
134 }
135 }
136 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
138 }
139 }
140 Ok(stats_set)
141 }
142
143 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
145 Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
146 }
147
148 pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
157 let mut ctx = session.create_execution_ctx();
158 self.array
159 .to_array()
160 .apply(predicate)?
161 .execute::<Mask>(&mut ctx)
162 }
163}
164
165pub struct StatsAccumulator {
171 builders: Vec<Box<dyn StatsArrayBuilder>>,
172 length: usize,
173}
174
175impl StatsAccumulator {
176 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
177 let builders = stats
178 .iter()
179 .filter_map(|&s| {
180 s.dtype(dtype).map(|stat_dtype| {
181 stats_builder_with_capacity(
182 s,
183 &stat_dtype.as_nullable(),
184 1024,
185 max_variable_length_statistics_size,
186 )
187 })
188 })
189 .collect::<Vec<_>>();
190
191 Self {
192 builders,
193 length: 0,
194 }
195 }
196
197 pub fn push_chunk_without_compute(&mut self, array: &dyn Array) -> VortexResult<()> {
198 for builder in self.builders.iter_mut() {
199 if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
200 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
201 } else {
202 builder.append_null();
203 }
204 }
205 self.length += 1;
206 Ok(())
207 }
208
209 pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
210 for builder in self.builders.iter_mut() {
211 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
212 builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
213 } else {
214 builder.append_null();
215 }
216 }
217 self.length += 1;
218 Ok(())
219 }
220
221 pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
226 let mut names = Vec::new();
227 let mut fields = Vec::new();
228 let mut stats = Vec::new();
229
230 for builder in self
231 .builders
232 .iter_mut()
233 .sorted_unstable_by_key(|b| b.stat())
235 {
236 let values = builder.finish();
237
238 if values.all_invalid()? {
240 continue;
241 }
242
243 stats.push(builder.stat());
244 names.extend(values.names);
245 fields.extend(values.arrays);
246 }
247
248 if names.is_empty() {
249 return Ok(None);
250 }
251
252 Ok(Some(ZoneMap {
253 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
254 .vortex_expect("Failed to create zone map"),
255 stats: stats.into(),
256 }))
257 }
258}
259
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::FieldPath;
    use vortex_dtype::FieldPathSet;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Decodes field `idx` of `zone_map`'s table as a bit buffer.
    fn field_bits(zone_map: &ZoneMap, idx: usize) -> BitBuffer {
        zone_map.array.unmasked_fields()[idx]
            .to_bool()
            .to_bit_buffer()
    }

    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        // With a 12-byte limit, chunk 0's min ("Value to be truncated") and chunk 1's max
        // ("wait a minute") exceed the limit; the other min/max values fit.
        let mut first = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first.append_value("Value to be truncated");
        first.append_value("untruncated");

        let mut second = VarBinViewBuilder::with_capacity(dtype, 2);
        second.append_value("Another");
        second.append_value("wait a minute");

        let mut acc =
            StatsAccumulator::new(first.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        for chunk in [first.finish(), second.finish()] {
            acc.push_chunk(&chunk)
                .vortex_expect("push_chunk should succeed for test data");
        }

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // No Sum column appears for string/binary input.
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        assert_eq!(field_bits(&zone_map, 1), BitBuffer::from(vec![false, true]));
        assert_eq!(field_bits(&zone_map, 3), BitBuffer::from(vec![true, false]));
    }

    #[test]
    fn always_adds_is_truncated_column() {
        // Integer stats are never truncated, yet the flag columns are still emitted.
        let values = buffer![0, 1, 2].into_array();
        let mut acc =
            StatsAccumulator::new(values.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&values)
            .vortex_expect("push_chunk should succeed for test array");

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_eq!(field_bits(&zone_map, 1), BitBuffer::from(vec![false]));
        assert_eq!(field_bits(&zone_map, 3), BitBuffer::from(vec![false]));
    }

    #[rstest]
    fn test_zone_map_prunes() {
        // The pruning rewrite may reference only the min and max columns.
        let available = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones with (min, max) = (1, 5), (2, 6), (3, 7); nothing truncated.
        let table = StructArray::from_fields(&[
            (
                "max",
                PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
            ),
            (
                "max_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
            (
                "min",
                PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
            ),
            (
                "min_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
        ])
        .unwrap();
        let zone_map =
            ZoneMap::try_new(PType::I32.into(), table, Arc::new([Stat::Max, Stat::Min])).unwrap();

        // A `true` entry marks a zone that cannot hold any matching row.
        let cases = [
            // Only zone 0 (max 5) has nothing >= 6.
            (gt_eq(root(), lit(6i32)), [true, false, false]),
            // Only zone 0 (max 5) has nothing > 5.
            (gt(root(), lit(5i32)), [true, false, false]),
            // Zones 1 and 2 (min 2 and 3) have nothing < 2.
            (lt(root(), lit(2i32)), [false, true, true]),
        ];
        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &available).unwrap();
            let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
            assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected));
        }
    }
}