Skip to main content

vortex_layout/layouts/zoned/
zone_map.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::ArrayRef;
8use vortex_array::ExecutionCtx;
9use vortex_array::IntoArray;
10use vortex_array::LEGACY_SESSION;
11use vortex_array::VortexSessionExecute;
12use vortex_array::aggregate_fn::fns::sum::sum;
13use vortex_array::arrays::StructArray;
14use vortex_array::arrays::struct_::StructArrayExt;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::Nullability;
17use vortex_array::dtype::PType;
18use vortex_array::dtype::StructFields;
19use vortex_array::expr::Expression;
20use vortex_array::expr::stats::Precision;
21use vortex_array::expr::stats::Stat;
22use vortex_array::expr::stats::StatsProvider;
23use vortex_array::stats::StatsSet;
24use vortex_array::validity::Validity;
25use vortex_error::VortexExpect;
26use vortex_error::VortexResult;
27use vortex_error::vortex_bail;
28use vortex_mask::Mask;
29use vortex_session::VortexSession;
30
31use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
32use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
33use crate::layouts::zoned::builder::StatsArrayBuilder;
34use crate::layouts::zoned::builder::stats_builder_with_capacity;
35
36/// A zone map containing statistics for a column.
37/// Each row of the zone map corresponds to a chunk of the column.
38///
39/// Note that it's possible for the zone map to have no statistics.
40#[derive(Clone)]
41pub struct ZoneMap {
42    // The struct array backing the zone map
43    array: StructArray,
44    // The statistics that are included in the table.
45    stats: Arc<[Stat]>,
46}
47
48impl ZoneMap {
49    /// Create [`ZoneMap`] of given column_dtype from given array. Validates that the array matches expected
50    /// structure for given list of stats.
51    pub fn try_new(
52        column_dtype: DType,
53        array: StructArray,
54        stats: Arc<[Stat]>,
55    ) -> VortexResult<Self> {
56        let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
57        if &expected_dtype != array.dtype() {
58            vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
59        }
60
61        // SAFETY: We checked that the
62        Ok(unsafe { Self::new_unchecked(array, stats) })
63    }
64
65    /// Creates [`ZoneMap`] without validating return array against expected stats.
66    ///
67    /// # Safety
68    ///
69    /// Assumes that the input struct array has the correct statistics as fields. Or in other words,
70    /// the [`DType`] of the input array is equal to the result of [`Self::dtype_for_stats_table`].
71    pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
72        Self { array, stats }
73    }
74
75    /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`].
76    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
77        assert!(present_stats.is_sorted(), "Stats must be sorted");
78        DType::Struct(
79            StructFields::from_iter(
80                present_stats
81                    .iter()
82                    .filter_map(|stat| {
83                        stat.dtype(column_dtype)
84                            .or_else(|| {
85                                // Backward compat: older files may have stored stats (e.g. Sum)
86                                // for extension types by resolving through the storage dtype.
87                                if let DType::Extension(ext) = column_dtype {
88                                    stat.dtype(ext.storage_dtype())
89                                } else {
90                                    None
91                                }
92                            })
93                            .map(|dtype| (stat, dtype.as_nullable()))
94                    })
95                    .flat_map(|(s, dt)| match s {
96                        Stat::Max => vec![
97                            (s.name(), dt),
98                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
99                        ],
100                        Stat::Min => vec![
101                            (s.name(), dt),
102                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
103                        ],
104                        _ => vec![(s.name(), dt)],
105                    }),
106            ),
107            Nullability::NonNullable,
108        )
109    }
110
111    /// Returns the underlying [`StructArray`] backing the zone map
112    pub fn array(&self) -> &StructArray {
113        &self.array
114    }
115
116    /// Returns the list of stats included in the zone map.
117    pub fn present_stats(&self) -> &Arc<[Stat]> {
118        &self.stats
119    }
120
121    /// Returns an aggregated stats set for the table.
122    pub fn to_stats_set(&self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult<StatsSet> {
123        let mut stats_set = StatsSet::default();
124        for &stat in stats {
125            let Some(array) = self.get_stat(stat)? else {
126                continue;
127            };
128
129            // Different stats need different aggregations
130            match stat {
131                // For stats that are associative, we can just compute them over the stat column
132                Stat::Min | Stat::Max | Stat::Sum => {
133                    if let Some(s) = array.statistics().compute_stat(stat, ctx)?
134                        && let Some(v) = s.into_value()
135                    {
136                        stats_set.set(stat, Precision::exact(v))
137                    }
138                }
139                // These stats sum up
140                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
141                    if let Some(sum_value) = sum(&array, ctx)?
142                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
143                        .into_value()
144                    {
145                        stats_set.set(stat, Precision::exact(sum_value));
146                    }
147                }
148                // We could implement these aggregations in the future, but for now they're unused
149                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
150            }
151        }
152        Ok(stats_set)
153    }
154
155    /// Returns the array for a given stat.
156    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
157        Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
158    }
159
160    /// Apply a pruning predicate against the ZoneMap, yielding a mask indicating which zones can
161    /// be pruned.
162    ///
163    /// The expression provided should be the result of converting an existing `VortexExpr` via
164    /// [`checked_pruning_expr`][vortex_array::expr::pruning::checked_pruning_expr] into a prunable
165    /// expression that can be evaluated on a zone map.
166    ///
167    /// All zones where the predicate evaluates to `true` can be skipped entirely.
168    pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
169        let mut ctx = session.create_execution_ctx();
170        self.array
171            .clone()
172            .into_array()
173            .apply(predicate)?
174            .execute::<Mask>(&mut ctx)
175    }
176}
177
178// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
179//  underneath each stats column. For example, `min: i32` for an `i32` array.
180//  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
181//  See: <https://github.com/vortex-data/vortex/issues/1835>
182/// Accumulates statistics for a column.
183pub struct StatsAccumulator {
184    builders: Vec<Box<dyn StatsArrayBuilder>>,
185    length: usize,
186}
187
188impl StatsAccumulator {
189    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
190        let builders = stats
191            .iter()
192            .filter_map(|&s| {
193                s.dtype(dtype).map(|stat_dtype| {
194                    stats_builder_with_capacity(
195                        s,
196                        &stat_dtype.as_nullable(),
197                        1024,
198                        max_variable_length_statistics_size,
199                    )
200                })
201            })
202            .collect::<Vec<_>>();
203
204        Self {
205            builders,
206            length: 0,
207        }
208    }
209
210    pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
211        for builder in self.builders.iter_mut() {
212            if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
213                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
214            } else {
215                builder.append_null();
216            }
217        }
218        self.length += 1;
219        Ok(())
220    }
221
222    pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
223        for builder in self.builders.iter_mut() {
224            if let Some(v) = array
225                .statistics()
226                .compute_stat(builder.stat(), &mut LEGACY_SESSION.create_execution_ctx())?
227            {
228                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
229            } else {
230                builder.append_null();
231            }
232        }
233        self.length += 1;
234        Ok(())
235    }
236
237    /// Finishes the accumulator into a [`ZoneMap`].
238    ///
239    /// Returns `None` if none of the requested statistics can be computed, for example they are
240    /// not applicable to the column's data type.
241    pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
242        let mut names = Vec::new();
243        let mut fields = Vec::new();
244        let mut stats = Vec::new();
245
246        for builder in self
247            .builders
248            .iter_mut()
249            // We sort the stats so the DType is deterministic based on which stats are present.
250            .sorted_unstable_by_key(|b| b.stat())
251        {
252            let values = builder.finish();
253
254            // We drop any all-null stats columns
255            if values.all_invalid()? {
256                continue;
257            }
258
259            stats.push(builder.stat());
260            names.extend(values.names);
261            fields.extend(values.arrays);
262        }
263
264        if names.is_empty() {
265            return Ok(None);
266        }
267
268        Ok(Some(ZoneMap {
269            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
270                .vortex_expect("Failed to create zone map"),
271            stats: stats.into(),
272        }))
273    }
274}
275
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::arrays::bool::BoolArrayExt;
    use vortex_array::arrays::struct_::StructArrayExt;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Variable-length min/max values longer than the 12-byte limit are truncated and flagged.
    ///
    /// Chunk 1 has min "Value to be truncated" (21 bytes, truncated) and max "untruncated"
    /// (11 bytes). Chunk 2 has min "Another" (7 bytes) and max "wait a minute" (13 bytes,
    /// truncated). `Sum` does not apply to these dtypes, so it produces no column.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        builder.append_value("Value to be truncated");
        builder.append_value("untruncated");
        let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2);
        builder2.append_value("Another");
        builder2.append_value("wait a minute");
        let mut acc =
            StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&builder.finish())
            .vortex_expect("push_chunk should succeed for test data");
        acc.push_chunk(&builder2.finish())
            .vortex_expect("push_chunk should succeed for test data");
        let stats_table = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Field 1 is `max_is_truncated`: only chunk 2's max was truncated.
        assert_eq!(
            stats_table
                .array
                .unmasked_field(1)
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false, true])
        );
        // Field 3 is `min_is_truncated`: only chunk 1's min was truncated.
        assert_eq!(
            stats_table
                .array
                .unmasked_field(3)
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![true, false])
        );
    }

    /// Truncation flag columns are emitted for min/max even when nothing can be truncated.
    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&array)
            .vortex_expect("push_chunk should succeed for test array");
        let stats_table = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_eq!(
            stats_table
                .array
                .unmasked_field(1)
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
        assert_eq!(
            stats_table
                .array
                .unmasked_field(3)
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
    }

    /// Pruning expressions evaluated against a zone map mark exactly the zones that cannot match.
    #[test]
    fn test_zone_map_prunes() {
        // All stats that are known at pruning time.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Construct a zone map with 3 zones (both truncation flag columns are all `false`):
        //
        // +-------+-------+
        // |  max  |  min  |
        // +-------+-------+
        // |  5    |  1    |
        // +-------+-------+
        // |  6    |  2    |
        // +-------+-------+
        // |  7    |  3    |
        // +-------+-------+
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // A >= 6
        // => prune zones where A.max < 6
        let expr = gt_eq(root(), lit(6i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(
            mask.into_array(),
            BoolArray::from_iter([true, false, false])
        );

        // A > 5
        // => prune zones where A.max <= 5
        let expr = gt(root(), lit(5i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(
            mask.into_array(),
            BoolArray::from_iter([true, false, false])
        );

        // A < 2
        // => prune zones where A.min >= 2
        let expr = lt(root(), lit(2i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, true, true]));
    }
}