Skip to main content

vortex_layout/layouts/zoned/
zone_map.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::ArrayRef;
8use vortex_array::DynArray;
9use vortex_array::ExecutionCtx;
10use vortex_array::IntoArray;
11use vortex_array::VortexSessionExecute;
12use vortex_array::aggregate_fn::fns::sum::sum;
13use vortex_array::arrays::StructArray;
14use vortex_array::dtype::DType;
15use vortex_array::dtype::Nullability;
16use vortex_array::dtype::PType;
17use vortex_array::dtype::StructFields;
18use vortex_array::expr::Expression;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::expr::stats::StatsProvider;
22use vortex_array::stats::StatsSet;
23use vortex_array::validity::Validity;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_mask::Mask;
28use vortex_session::VortexSession;
29
30use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
31use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
32use crate::layouts::zoned::builder::StatsArrayBuilder;
33use crate::layouts::zoned::builder::stats_builder_with_capacity;
34
35/// A zone map containing statistics for a column.
36/// Each row of the zone map corresponds to a chunk of the column.
37///
38/// Note that it's possible for the zone map to have no statistics.
39#[derive(Clone)]
40pub struct ZoneMap {
41    // The struct array backing the zone map
42    array: StructArray,
43    // The statistics that are included in the table.
44    stats: Arc<[Stat]>,
45}
46
47impl ZoneMap {
48    /// Create [`ZoneMap`] of given column_dtype from given array. Validates that the array matches expected
49    /// structure for given list of stats.
50    pub fn try_new(
51        column_dtype: DType,
52        array: StructArray,
53        stats: Arc<[Stat]>,
54    ) -> VortexResult<Self> {
55        let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
56        if &expected_dtype != array.dtype() {
57            vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
58        }
59
60        // SAFETY: We checked that the
61        Ok(unsafe { Self::new_unchecked(array, stats) })
62    }
63
64    /// Creates [`ZoneMap`] without validating return array against expected stats.
65    ///
66    /// # Safety
67    ///
68    /// Assumes that the input struct array has the correct statistics as fields. Or in other words,
69    /// the [`DType`] of the input array is equal to the result of [`Self::dtype_for_stats_table`].
70    pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
71        Self { array, stats }
72    }
73
74    /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`].
75    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
76        assert!(present_stats.is_sorted(), "Stats must be sorted");
77        DType::Struct(
78            StructFields::from_iter(
79                present_stats
80                    .iter()
81                    .filter_map(|stat| {
82                        stat.dtype(column_dtype)
83                            .or_else(|| {
84                                // Backward compat: older files may have stored stats (e.g. Sum)
85                                // for extension types by resolving through the storage dtype.
86                                if let DType::Extension(ext) = column_dtype {
87                                    stat.dtype(ext.storage_dtype())
88                                } else {
89                                    None
90                                }
91                            })
92                            .map(|dtype| (stat, dtype.as_nullable()))
93                    })
94                    .flat_map(|(s, dt)| match s {
95                        Stat::Max => vec![
96                            (s.name(), dt),
97                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
98                        ],
99                        Stat::Min => vec![
100                            (s.name(), dt),
101                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
102                        ],
103                        _ => vec![(s.name(), dt)],
104                    }),
105            ),
106            Nullability::NonNullable,
107        )
108    }
109
110    /// Returns the underlying [`StructArray`] backing the zone map
111    pub fn array(&self) -> &StructArray {
112        &self.array
113    }
114
115    /// Returns the list of stats included in the zone map.
116    pub fn present_stats(&self) -> &Arc<[Stat]> {
117        &self.stats
118    }
119
120    /// Returns an aggregated stats set for the table.
121    pub fn to_stats_set(&self, stats: &[Stat], ctx: &mut ExecutionCtx) -> VortexResult<StatsSet> {
122        let mut stats_set = StatsSet::default();
123        for &stat in stats {
124            let Some(array) = self.get_stat(stat)? else {
125                continue;
126            };
127
128            // Different stats need different aggregations
129            match stat {
130                // For stats that are associative, we can just compute them over the stat column
131                Stat::Min | Stat::Max | Stat::Sum => {
132                    if let Some(s) = array.statistics().compute_stat(stat)?
133                        && let Some(v) = s.into_value()
134                    {
135                        stats_set.set(stat, Precision::exact(v))
136                    }
137                }
138                // These stats sum up
139                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
140                    if let Some(sum_value) = sum(&array, ctx)?
141                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
142                        .into_value()
143                    {
144                        stats_set.set(stat, Precision::exact(sum_value));
145                    }
146                }
147                // We could implement these aggregations in the future, but for now they're unused
148                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
149            }
150        }
151        Ok(stats_set)
152    }
153
154    /// Returns the array for a given stat.
155    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
156        Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
157    }
158
159    /// Apply a pruning predicate against the ZoneMap, yielding a mask indicating which zones can
160    /// be pruned.
161    ///
162    /// The expression provided should be the result of converting an existing `VortexExpr` via
163    /// [`checked_pruning_expr`][vortex_array::expr::pruning::checked_pruning_expr] into a prunable
164    /// expression that can be evaluated on a zone map.
165    ///
166    /// All zones where the predicate evaluates to `true` can be skipped entirely.
167    pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
168        let mut ctx = session.create_execution_ctx();
169        self.array
170            .clone()
171            .into_array()
172            .apply(predicate)?
173            .execute::<Mask>(&mut ctx)
174    }
175}
176
177// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
178//  underneath each stats column. For example, `min: i32` for an `i32` array.
179//  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
180//  See: <https://github.com/vortex-data/vortex/issues/1835>
181/// Accumulates statistics for a column.
182pub struct StatsAccumulator {
183    builders: Vec<Box<dyn StatsArrayBuilder>>,
184    length: usize,
185}
186
187impl StatsAccumulator {
188    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
189        let builders = stats
190            .iter()
191            .filter_map(|&s| {
192                s.dtype(dtype).map(|stat_dtype| {
193                    stats_builder_with_capacity(
194                        s,
195                        &stat_dtype.as_nullable(),
196                        1024,
197                        max_variable_length_statistics_size,
198                    )
199                })
200            })
201            .collect::<Vec<_>>();
202
203        Self {
204            builders,
205            length: 0,
206        }
207    }
208
209    pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
210        for builder in self.builders.iter_mut() {
211            if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
212                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
213            } else {
214                builder.append_null();
215            }
216        }
217        self.length += 1;
218        Ok(())
219    }
220
221    pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
222        for builder in self.builders.iter_mut() {
223            if let Some(v) = array.statistics().compute_stat(builder.stat())? {
224                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
225            } else {
226                builder.append_null();
227            }
228        }
229        self.length += 1;
230        Ok(())
231    }
232
233    /// Finishes the accumulator into a [`ZoneMap`].
234    ///
235    /// Returns `None` if none of the requested statistics can be computed, for example they are
236    /// not applicable to the column's data type.
237    pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
238        let mut names = Vec::new();
239        let mut fields = Vec::new();
240        let mut stats = Vec::new();
241
242        for builder in self
243            .builders
244            .iter_mut()
245            // We sort the stats so the DType is deterministic based on which stats are present.
246            .sorted_unstable_by_key(|b| b.stat())
247        {
248            let values = builder.finish();
249
250            // We drop any all-null stats columns
251            if values.all_invalid()? {
252                continue;
253            }
254
255            stats.push(builder.stat());
256            names.extend(values.names);
257            fields.extend(values.arrays);
258        }
259
260        if names.is_empty() {
261            return Ok(None);
262        }
263
264        Ok(Some(ZoneMap {
265            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
266                .vortex_expect("Failed to create zone map"),
267            stats: stats.into(),
268        }))
269    }
270}
271
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Min/max values longer than the configured limit (12 here) must be flagged as truncated,
    /// zone by zone. `Stat::Sum` is requested but yields no column for string/binary data.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut builder = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        builder.append_value("Value to be truncated");
        builder.append_value("untruncated");
        let mut builder2 = VarBinViewBuilder::with_capacity(dtype, 2);
        builder2.append_value("Another");
        builder2.append_value("wait a minute");
        let mut acc =
            StatsAccumulator::new(builder.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&builder.finish())
            .vortex_expect("push_chunk should succeed for test data");
        acc.push_chunk(&builder2.finish())
            .vortex_expect("push_chunk should succeed for test data");
        let stats_table = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Zone 1 max "untruncated" fits; zone 2 max "wait a minute" exceeds the limit.
        assert_eq!(
            stats_table.array.unmasked_fields()[1]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false, true])
        );
        // Zone 1 min "Value to be truncated" exceeds the limit; zone 2 min "Another" fits.
        assert_eq!(
            stats_table.array.unmasked_fields()[3]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![true, false])
        );
    }

    /// Primitive min/max are never truncated, but the `*_is_truncated` columns are still emitted
    /// (all `false`) alongside the min/max columns.
    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(array.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&array)
            .vortex_expect("push_chunk should succeed for test array");
        let stats_table = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_eq!(
            stats_table.array.unmasked_fields()[1]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
        assert_eq!(
            stats_table.array.unmasked_fields()[3]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
    }

    /// Pruning expressions produced by `checked_pruning_expr` evaluate to `true` exactly for the
    /// zones that cannot contain a matching row.
    #[test]
    fn test_zone_map_prunes() {
        // All stats that are known at pruning time.
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Construct a zone map with 3 zones:
        //
        // +----------+----------+
        // |  min     |  max     |
        // +----------+----------+
        // |  1       |  5       |
        // +----------+----------+
        // |  2       |  6       |
        // +----------+----------+
        // |  3       |  7       |
        // +----------+----------+
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // A >= 6
        // => A.max < 6
        let expr = gt_eq(root(), lit(6i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(
            mask.into_array(),
            BoolArray::from_iter([true, false, false])
        );

        // A > 5
        // => A.max <= 5
        let expr = gt(root(), lit(5i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(
            mask.into_array(),
            BoolArray::from_iter([true, false, false])
        );

        // A < 2
        // => A.min >= 2
        let expr = lt(root(), lit(2i32));
        let (pruning_expr, _) = checked_pruning_expr(&expr, &stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, true, true]));
    }
}