vortex_layout/layouts/zoned/
zone_map.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::arrays::StructArray;
8use vortex_array::compute::sum;
9use vortex_array::expr::Expression;
10use vortex_array::stats::{Precision, Stat, StatsProvider, StatsSet};
11use vortex_array::validity::Validity;
12use vortex_array::{Array, ArrayRef};
13use vortex_dtype::{DType, Nullability, PType, StructFields};
14use vortex_error::{VortexExpect, VortexResult, vortex_bail};
15use vortex_mask::Mask;
16
17use crate::layouts::zoned::builder::{
18    MAX_IS_TRUNCATED, MIN_IS_TRUNCATED, StatsArrayBuilder, stats_builder_with_capacity,
19};
20
/// A zone map containing statistics for a column.
/// Each row of the zone map corresponds to a chunk of the column.
///
/// Note that it's possible for the zone map to have no statistics.
#[derive(Clone)]
pub struct ZoneMap {
    // The struct array backing the zone map: one field per stored statistic (min and max are
    // each followed by an `*_is_truncated` boolean field), one row per chunk.
    array: StructArray,
    // The statistics that are included in the table. `dtype_for_stats_table` requires this
    // list to be sorted.
    stats: Arc<[Stat]>,
}
32
33impl ZoneMap {
34    /// Create [`ZoneMap`] of given column_dtype from given array. Validates that the array matches expected
35    /// structure for given list of stats.
36    pub fn try_new(
37        column_dtype: DType,
38        array: StructArray,
39        stats: Arc<[Stat]>,
40    ) -> VortexResult<Self> {
41        let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
42        if &expected_dtype != array.dtype() {
43            vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
44        }
45
46        // SAFETY: We checked that the
47        Ok(unsafe { Self::new_unchecked(array, stats) })
48    }
49
50    /// Creates [`ZoneMap`] without validating return array against expected stats.
51    ///
52    /// # Safety
53    ///
54    /// Assumes that the input struct array has the correct statistics as fields. Or in other words,
55    /// the [`DType`] of the input array is equal to the result of [`Self::dtype_for_stats_table`].
56    pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
57        Self { array, stats }
58    }
59
60    /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`].
61    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
62        assert!(present_stats.is_sorted(), "Stats must be sorted");
63        DType::Struct(
64            StructFields::from_iter(
65                present_stats
66                    .iter()
67                    .filter_map(|stat| {
68                        stat.dtype(column_dtype)
69                            .map(|dtype| (stat, dtype.as_nullable()))
70                    })
71                    .flat_map(|(s, dt)| match s {
72                        Stat::Max => vec![
73                            (s.name(), dt),
74                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
75                        ],
76                        Stat::Min => vec![
77                            (s.name(), dt),
78                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
79                        ],
80                        _ => vec![(s.name(), dt)],
81                    }),
82            ),
83            Nullability::NonNullable,
84        )
85    }
86
87    /// Returns the underlying [`StructArray`] backing the zone map
88    pub fn array(&self) -> &StructArray {
89        &self.array
90    }
91
92    /// Returns the list of stats included in the zone map.
93    pub fn present_stats(&self) -> &Arc<[Stat]> {
94        &self.stats
95    }
96
97    /// Returns an aggregated stats set for the table.
98    pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
99        let mut stats_set = StatsSet::default();
100        for &stat in stats {
101            let Some(array) = self.get_stat(stat)? else {
102                continue;
103            };
104
105            // Different stats need different aggregations
106            match stat {
107                // For stats that are associative, we can just compute them over the stat column
108                Stat::Min | Stat::Max | Stat::Sum => {
109                    if let Some(s) = array.statistics().compute_stat(stat)? {
110                        stats_set.set(stat, Precision::exact(s.into_value()))
111                    }
112                }
113                // These stats sum up
114                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
115                    let sum = sum(&array)?
116                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
117                        .into_value();
118                    stats_set.set(stat, Precision::exact(sum));
119                }
120                // We could implement these aggregations in the future, but for now they're unused
121                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
122            }
123        }
124        Ok(stats_set)
125    }
126
127    /// Returns the array for a given stat.
128    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
129        Ok(self.array.field_by_name_opt(stat.name()).cloned())
130    }
131
132    /// Apply a pruning predicate against the ZoneMap, yielding a mask indicating which zones can
133    /// be pruned.
134    ///
135    /// The expression provided should be the result of converting an existing `VortexExpr` via
136    /// [`checked_pruning_expr`][vortex_array::expr::pruning::checked_pruning_expr] into a prunable
137    /// expression that can be evaluated on a zone map.
138    ///
139    /// All zones where the predicate evaluates to `true` can be skipped entirely.
140    pub fn prune(&self, predicate: &Expression) -> VortexResult<Mask> {
141        predicate
142            .evaluate(&self.array.to_array())?
143            .try_to_mask_fill_null_false()
144    }
145}
146
147// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
148//  underneath each stats column. For example, `min: i32` for an `i32` array.
149//  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
150//  See: <https://github.com/vortex-data/vortex/issues/1835>
/// Accumulates statistics for a column.
///
/// One row is recorded per pushed chunk; [`Self::as_stats_table`] turns the accumulated rows
/// into a [`ZoneMap`].
pub struct StatsAccumulator {
    // One builder per requested statistic that has a dtype for the column.
    builders: Vec<Box<dyn StatsArrayBuilder>>,
    // Number of chunks pushed so far; used as the row count of the resulting zone map.
    length: usize,
}
156
157impl StatsAccumulator {
158    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
159        let builders = stats
160            .iter()
161            .filter_map(|&s| {
162                s.dtype(dtype).map(|stat_dtype| {
163                    stats_builder_with_capacity(
164                        s,
165                        &stat_dtype.as_nullable(),
166                        1024,
167                        max_variable_length_statistics_size,
168                    )
169                })
170            })
171            .collect::<Vec<_>>();
172
173        Self {
174            builders,
175            length: 0,
176        }
177    }
178
179    pub fn push_chunk_without_compute(&mut self, array: &dyn Array) -> VortexResult<()> {
180        for builder in self.builders.iter_mut() {
181            if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
182                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
183            } else {
184                builder.append_null();
185            }
186        }
187        self.length += 1;
188        Ok(())
189    }
190
191    pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
192        for builder in self.builders.iter_mut() {
193            if let Some(v) = array.statistics().compute_stat(builder.stat())? {
194                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
195            } else {
196                builder.append_null();
197            }
198        }
199        self.length += 1;
200        Ok(())
201    }
202
203    /// Finishes the accumulator into a [`ZoneMap`].
204    ///
205    /// Returns `None` if none of the requested statistics can be computed, for example they are
206    /// not applicable to the column's data type.
207    pub fn as_stats_table(&mut self) -> Option<ZoneMap> {
208        let mut names = Vec::new();
209        let mut fields = Vec::new();
210        let mut stats = Vec::new();
211
212        for builder in self
213            .builders
214            .iter_mut()
215            // We sort the stats so the DType is deterministic based on which stats are present.
216            .sorted_unstable_by_key(|b| b.stat())
217        {
218            let values = builder.finish();
219
220            // We drop any all-null stats columns
221            if values.all_invalid() {
222                continue;
223            }
224
225            stats.push(builder.stat());
226            names.extend(values.names);
227            fields.extend(values.arrays);
228        }
229
230        if names.is_empty() {
231            return None;
232        }
233
234        Some(ZoneMap {
235            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
236                .vortex_expect("Failed to create zone map"),
237            stats: stats.into(),
238        })
239    }
240}
241
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use itertools::Itertools;
    use rstest::rstest;
    use vortex_array::arrays::{BoolArray, PrimitiveArray, StructArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::{gt, gt_eq, lit, lt, root};
    use vortex_array::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::{BitBuffer, buffer};
    use vortex_dtype::{DType, FieldPath, FieldPathSet, Nullability, PType};
    use vortex_error::{VortexExpect, VortexUnwrap};

    use crate::layouts::zoned::zone_map::{StatsAccumulator, ZoneMap};
    use crate::layouts::zoned::{MAX_IS_TRUNCATED, MIN_IS_TRUNCATED};

    /// Min/max values longer than the configured limit are flagged in the `*_is_truncated`
    /// columns, and the sum column is absent for string/binary columns.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        builder.append_value("Value to be truncated");
        builder.append_value("untruncated");
        let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2);
        builder2.append_value("Another");
        builder2.append_value("wait a minute");
        let mut acc =
            StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&builder.finish()).vortex_unwrap();
        acc.push_chunk(&builder2.finish()).vortex_unwrap();
        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        assert_eq!(
            stats_table.array.fields()[1].to_bool().bit_buffer(),
            &BitBuffer::from(vec![false, true])
        );
        assert_eq!(
            stats_table.array.fields()[3].to_bool().bit_buffer(),
            &BitBuffer::from(vec![true, false])
        );
    }

    /// The `*_is_truncated` columns are emitted even for a primitive column, where they are
    /// all `false`.
    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&array).vortex_unwrap();
        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_eq!(
            stats_table.array.fields()[1].to_bool().bit_buffer(),
            &BitBuffer::from(vec![false])
        );
        assert_eq!(
            stats_table.array.fields()[3].to_bool().bit_buffer(),
            &BitBuffer::from(vec![false])
        );
    }

    /// Pruning expressions evaluated against a hand-built zone map select the expected zones.
    // This test has no cases or fixtures, so it uses the plain `#[test]` attribute.
    #[test]
    fn test_zone_map_prunes() {
        // All stats that are known at pruning time.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Construct a zone map with 3 zones (the `*_is_truncated` columns are all false):
        //
        // +----------+----------+
        // |  min     |  max     |
        // +----------+----------+
        // |  1       |  5       |
        // +----------+----------+
        // |  2       |  6       |
        // +----------+----------+
        // |  3       |  7       |
        // +----------+----------+
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // A >= 6
        // => A.max < 6
        let expr = gt_eq(root(), lit(6i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr).unwrap();
        assert_eq!(
            mask.to_bit_buffer().into_iter().collect_vec(),
            vec![true, false, false]
        );

        // A > 5
        // => A.max <= 5
        let expr = gt(root(), lit(5i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr).unwrap();
        assert_eq!(
            mask.to_bit_buffer().into_iter().collect_vec(),
            vec![true, false, false]
        );

        // A < 2
        // => A.min >= 2
        let expr = lt(root(), lit(2i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr).unwrap();
        assert_eq!(
            mask.to_bit_buffer().into_iter().collect_vec(),
            vec![false, true, true]
        );
    }
}