vortex_layout/layouts/zoned/
zone_map.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::arrays::StructArray;
8use vortex_array::compute::sum;
9use vortex_array::stats::{Precision, Stat, StatsSet};
10use vortex_array::validity::Validity;
11use vortex_array::{Array, ArrayRef};
12use vortex_dtype::{DType, Nullability, PType, StructFields};
13use vortex_error::{VortexExpect, VortexResult, vortex_bail};
14use vortex_expr::{ExprRef, Scope};
15use vortex_mask::Mask;
16
17use crate::layouts::zoned::builder::{
18    MAX_IS_TRUNCATED, MIN_IS_TRUNCATED, StatsArrayBuilder, stats_builder_with_capacity,
19};
20
/// A zone map containing statistics for a column.
/// Each row of the zone map corresponds to a chunk of the column.
///
/// Note that it's possible for the zone map to have no statistics.
#[derive(Clone)]
pub struct ZoneMap {
    // The struct array backing the zone map. When built via `ZoneMap::try_new` or
    // `StatsAccumulator::as_stats_table`, it has one field per present stat, with `min`/`max`
    // each followed by a non-nullable boolean `*_is_truncated` flag field.
    array: StructArray,
    // The statistics that are included in the table. Expected to be sorted, matching the order
    // of the fields in `array` (not checked by `ZoneMap::new_unchecked`).
    stats: Arc<[Stat]>,
}
32
33impl ZoneMap {
34    /// Create [`ZoneMap`] of given column_dtype from given array. Validates that the array matches expected
35    /// structure for given list of stats.
36    pub fn try_new(
37        column_dtype: DType,
38        array: StructArray,
39        stats: Arc<[Stat]>,
40    ) -> VortexResult<Self> {
41        let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
42        if &expected_dtype != array.dtype() {
43            vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
44        }
45        Ok(Self::new_unchecked(array, stats))
46    }
47
48    /// Creates [`ZoneMap`] without validating return array against expected stats.
49    pub fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
50        Self { array, stats }
51    }
52
53    /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`].
54    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
55        assert!(present_stats.is_sorted(), "Stats must be sorted");
56        DType::Struct(
57            StructFields::from_iter(
58                present_stats
59                    .iter()
60                    .filter_map(|stat| {
61                        stat.dtype(column_dtype)
62                            .map(|dtype| (stat, dtype.as_nullable()))
63                    })
64                    .flat_map(|(s, dt)| match s {
65                        Stat::Max => vec![
66                            (s.name(), dt),
67                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
68                        ],
69                        Stat::Min => vec![
70                            (s.name(), dt),
71                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
72                        ],
73                        _ => vec![(s.name(), dt)],
74                    }),
75            ),
76            Nullability::NonNullable,
77        )
78    }
79
80    /// Returns the underlying [`StructArray`] backing the zone map
81    pub fn array(&self) -> &StructArray {
82        &self.array
83    }
84
85    /// Returns the list of stats included in the zone map.
86    pub fn present_stats(&self) -> &Arc<[Stat]> {
87        &self.stats
88    }
89
90    /// Returns an aggregated stats set for the table.
91    pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
92        let mut stats_set = StatsSet::default();
93        for &stat in stats {
94            let Some(array) = self.get_stat(stat)? else {
95                continue;
96            };
97
98            // Different stats need different aggregations
99            match stat {
100                // For stats that are associative, we can just compute them over the stat column
101                Stat::Min | Stat::Max | Stat::Sum => {
102                    if let Some(s) = array.statistics().compute_stat(stat)? {
103                        stats_set.set(stat, Precision::exact(s))
104                    }
105                }
106                // These stats sum up
107                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
108                    let sum = sum(&array)?
109                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
110                        .into_value();
111                    stats_set.set(stat, Precision::exact(sum));
112                }
113                // We could implement these aggregations in the future, but for now they're unused
114                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
115            }
116        }
117        Ok(stats_set)
118    }
119
120    /// Returns the array for a given stat.
121    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
122        Ok(self.array.field_by_name_opt(stat.name()).cloned())
123    }
124
125    /// Apply a pruning predicate against the ZoneMap, yielding a mask indicating which zones can
126    /// be pruned.
127    ///
128    /// The expression provided should be the result of converting an existing `VortexExpr` via
129    /// [`checked_pruning_expr`][vortex_expr::pruning::checked_pruning_expr] into a prunable
130    /// expression that can be evaluated on a zone map.
131    ///
132    /// All zones where the predicate evaluates to `true` can be skipped entirely.
133    pub fn prune(&self, predicate: &ExprRef) -> VortexResult<Mask> {
134        Mask::try_from(
135            predicate
136                .evaluate(&Scope::new(self.array.to_array()))?
137                .as_ref(),
138        )
139    }
140}
141
// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
//  underneath each stats column. For example, `min: i32` for an `i32` array.
//  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
//  See: <https://github.com/vortex-data/vortex/issues/1835>
/// Accumulates statistics for a column, one zone per chunk passed to
/// [`StatsAccumulator::push_chunk`], and finishes them into a [`ZoneMap`].
pub struct StatsAccumulator {
    // One builder per requested stat that is defined for the column's dtype.
    builders: Vec<Box<dyn StatsArrayBuilder>>,
    // Number of chunks (zones) pushed so far; becomes the row count of the zone map.
    length: usize,
}
151
152impl StatsAccumulator {
153    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
154        let builders = stats
155            .iter()
156            .filter_map(|&s| {
157                s.dtype(dtype).map(|stat_dtype| {
158                    stats_builder_with_capacity(
159                        s,
160                        &stat_dtype.as_nullable(),
161                        1024,
162                        max_variable_length_statistics_size,
163                    )
164                })
165            })
166            .collect::<Vec<_>>();
167
168        Self {
169            builders,
170            length: 0,
171        }
172    }
173
174    pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
175        for builder in self.builders.iter_mut() {
176            if let Some(v) = array.statistics().compute_stat(builder.stat())? {
177                builder.append_scalar_value(v)?;
178            } else {
179                builder.append_null();
180            }
181        }
182        self.length += 1;
183        Ok(())
184    }
185
186    /// Finishes the accumulator into a [`ZoneMap`].
187    ///
188    /// Returns `None` if none of the requested statistics can be computed, for example they are
189    /// not applicable to the column's data type.
190    pub fn as_stats_table(&mut self) -> Option<ZoneMap> {
191        let mut names = Vec::new();
192        let mut fields = Vec::new();
193        let mut stats = Vec::new();
194
195        for builder in self
196            .builders
197            .iter_mut()
198            // We sort the stats so the DType is deterministic based on which stats are present.
199            .sorted_unstable_by_key(|b| b.stat())
200        {
201            let values = builder.finish();
202
203            // We drop any all-null stats columns
204            if values
205                .all_invalid()
206                .vortex_expect("failed to get invalid count")
207            {
208                continue;
209            }
210
211            stats.push(builder.stat());
212            names.extend(values.names);
213            fields.extend(values.arrays);
214        }
215
216        if names.is_empty() {
217            return None;
218        }
219
220        Some(ZoneMap {
221            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
222                .vortex_expect("Failed to create zone map"),
223            stats: stats.into(),
224        })
225    }
226}
227
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_buffer::BooleanBuffer;
    use itertools::Itertools;
    use rstest::rstest;
    use vortex_array::arrays::{BoolArray, PrimitiveArray, StructArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, FieldPath, FieldPathSet, Nullability, PType};
    use vortex_error::{VortexExpect, VortexUnwrap};
    use vortex_expr::pruning::checked_pruning_expr;
    use vortex_expr::{gt, gt_eq, lit, lt, root};

    use crate::layouts::zoned::zone_map::{StatsAccumulator, ZoneMap};
    use crate::layouts::zoned::{MAX_IS_TRUNCATED, MIN_IS_TRUNCATED};

    /// Min/max values longer than the configured limit (12 here) set the matching
    /// `*_is_truncated` flag for their zone.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        // Zone 0: the min ("Value to be truncated") is over the limit, the max is not.
        let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        builder.append_value("Value to be truncated");
        builder.append_value("untruncated");
        // Zone 1: the max ("wait a minute", 13 long) is over the limit, the min is not.
        let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2);
        builder2.append_value("Another");
        builder2.append_value("wait a minute");
        // `Sum` is requested too, but no `sum` column is expected for string/binary columns.
        let mut acc =
            StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&builder.finish()).vortex_unwrap();
        acc.push_chunk(&builder2.finish()).vortex_unwrap();
        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
            ]
        );
        // max_is_truncated, per zone
        assert_eq!(
            stats_table.array.fields()[1]
                .to_bool()
                .vortex_unwrap()
                .boolean_buffer(),
            &BooleanBuffer::from(vec![false, true])
        );
        // min_is_truncated, per zone
        assert_eq!(
            stats_table.array.fields()[3]
                .to_bool()
                .vortex_unwrap()
                .boolean_buffer(),
            &BooleanBuffer::from(vec![true, false])
        );
    }

    /// Fixed-width columns are never truncated but still get the truncation flag columns.
    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&array).vortex_unwrap();
        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
                Stat::Sum.name().into(),
            ]
        );
        assert_eq!(
            stats_table.array.fields()[1]
                .to_bool()
                .vortex_unwrap()
                .boolean_buffer(),
            &BooleanBuffer::from(vec![false])
        );
        assert_eq!(
            stats_table.array.fields()[3]
                .to_bool()
                .vortex_unwrap()
                .boolean_buffer(),
            &BooleanBuffer::from(vec![false])
        );
    }

    /// Pruning expressions evaluate to `true` exactly for the zones that can be skipped.
    #[test]
    fn test_zone_map_prunes() {
        // All stats that are known at pruning time.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Construct a zone map with 3 zones (no value is truncated, so both
        // `*_is_truncated` columns are all `false`):
        //
        // +----------+----------+
        // |  min     |  max     |
        // +----------+----------+
        // |  1       |  5       |
        // +----------+----------+
        // |  2       |  6       |
        // +----------+----------+
        // |  3       |  7       |
        // +----------+----------+
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // A >= 6
        // => A.max < 6
        let expr = gt_eq(root(), lit(6i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr).unwrap();
        assert_eq!(
            mask.to_boolean_buffer().into_iter().collect_vec(),
            vec![true, false, false]
        );

        // A > 5
        // => A.max <= 5
        let expr = gt(root(), lit(5i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr).unwrap();
        assert_eq!(
            mask.to_boolean_buffer().into_iter().collect_vec(),
            vec![true, false, false]
        );

        // A < 2
        // => A.min >= 2
        let expr = lt(root(), lit(2i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr).unwrap();
        assert_eq!(
            mask.to_boolean_buffer().into_iter().collect_vec(),
            vec![false, true, true]
        );
    }
}