Skip to main content

vortex_layout/layouts/zoned/
zone_map.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::ArrayRef;
8use vortex_array::DynArray;
9use vortex_array::IntoArray;
10use vortex_array::VortexSessionExecute;
11use vortex_array::arrays::StructArray;
12use vortex_array::compute::sum;
13use vortex_array::dtype::DType;
14use vortex_array::dtype::Nullability;
15use vortex_array::dtype::PType;
16use vortex_array::dtype::StructFields;
17use vortex_array::expr::Expression;
18use vortex_array::expr::stats::Precision;
19use vortex_array::expr::stats::Stat;
20use vortex_array::expr::stats::StatsProvider;
21use vortex_array::stats::StatsSet;
22use vortex_array::validity::Validity;
23use vortex_error::VortexExpect;
24use vortex_error::VortexResult;
25use vortex_error::vortex_bail;
26use vortex_mask::Mask;
27use vortex_session::VortexSession;
28
29use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
30use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
31use crate::layouts::zoned::builder::StatsArrayBuilder;
32use crate::layouts::zoned::builder::stats_builder_with_capacity;
33
34/// A zone map containing statistics for a column.
35/// Each row of the zone map corresponds to a chunk of the column.
36///
37/// Note that it's possible for the zone map to have no statistics.
38#[derive(Clone)]
39pub struct ZoneMap {
40    // The struct array backing the zone map
41    array: StructArray,
42    // The statistics that are included in the table.
43    stats: Arc<[Stat]>,
44}
45
46impl ZoneMap {
47    /// Create [`ZoneMap`] of given column_dtype from given array. Validates that the array matches expected
48    /// structure for given list of stats.
49    pub fn try_new(
50        column_dtype: DType,
51        array: StructArray,
52        stats: Arc<[Stat]>,
53    ) -> VortexResult<Self> {
54        let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
55        if &expected_dtype != array.dtype() {
56            vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
57        }
58
59        // SAFETY: We checked that the
60        Ok(unsafe { Self::new_unchecked(array, stats) })
61    }
62
63    /// Creates [`ZoneMap`] without validating return array against expected stats.
64    ///
65    /// # Safety
66    ///
67    /// Assumes that the input struct array has the correct statistics as fields. Or in other words,
68    /// the [`DType`] of the input array is equal to the result of [`Self::dtype_for_stats_table`].
69    pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
70        Self { array, stats }
71    }
72
73    /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`].
74    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
75        assert!(present_stats.is_sorted(), "Stats must be sorted");
76        DType::Struct(
77            StructFields::from_iter(
78                present_stats
79                    .iter()
80                    .filter_map(|stat| {
81                        stat.dtype(column_dtype)
82                            .map(|dtype| (stat, dtype.as_nullable()))
83                    })
84                    .flat_map(|(s, dt)| match s {
85                        Stat::Max => vec![
86                            (s.name(), dt),
87                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
88                        ],
89                        Stat::Min => vec![
90                            (s.name(), dt),
91                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
92                        ],
93                        _ => vec![(s.name(), dt)],
94                    }),
95            ),
96            Nullability::NonNullable,
97        )
98    }
99
100    /// Returns the underlying [`StructArray`] backing the zone map
101    pub fn array(&self) -> &StructArray {
102        &self.array
103    }
104
105    /// Returns the list of stats included in the zone map.
106    pub fn present_stats(&self) -> &Arc<[Stat]> {
107        &self.stats
108    }
109
110    /// Returns an aggregated stats set for the table.
111    pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
112        let mut stats_set = StatsSet::default();
113        for &stat in stats {
114            let Some(array) = self.get_stat(stat)? else {
115                continue;
116            };
117
118            // Different stats need different aggregations
119            match stat {
120                // For stats that are associative, we can just compute them over the stat column
121                Stat::Min | Stat::Max | Stat::Sum => {
122                    if let Some(s) = array.statistics().compute_stat(stat)?
123                        && let Some(v) = s.into_value()
124                    {
125                        stats_set.set(stat, Precision::exact(v))
126                    }
127                }
128                // These stats sum up
129                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
130                    if let Some(sum_value) = sum(&array)?
131                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
132                        .into_value()
133                    {
134                        stats_set.set(stat, Precision::exact(sum_value));
135                    }
136                }
137                // We could implement these aggregations in the future, but for now they're unused
138                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
139            }
140        }
141        Ok(stats_set)
142    }
143
144    /// Returns the array for a given stat.
145    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
146        Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
147    }
148
149    /// Apply a pruning predicate against the ZoneMap, yielding a mask indicating which zones can
150    /// be pruned.
151    ///
152    /// The expression provided should be the result of converting an existing `VortexExpr` via
153    /// [`checked_pruning_expr`][vortex_array::expr::pruning::checked_pruning_expr] into a prunable
154    /// expression that can be evaluated on a zone map.
155    ///
156    /// All zones where the predicate evaluates to `true` can be skipped entirely.
157    pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
158        let mut ctx = session.create_execution_ctx();
159        self.array
160            .clone()
161            .into_array()
162            .apply(predicate)?
163            .execute::<Mask>(&mut ctx)
164    }
165}
166
167// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
168//  underneath each stats column. For example, `min: i32` for an `i32` array.
169//  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
170//  See: <https://github.com/vortex-data/vortex/issues/1835>
171/// Accumulates statistics for a column.
172pub struct StatsAccumulator {
173    builders: Vec<Box<dyn StatsArrayBuilder>>,
174    length: usize,
175}
176
177impl StatsAccumulator {
178    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
179        let builders = stats
180            .iter()
181            .filter_map(|&s| {
182                s.dtype(dtype).map(|stat_dtype| {
183                    stats_builder_with_capacity(
184                        s,
185                        &stat_dtype.as_nullable(),
186                        1024,
187                        max_variable_length_statistics_size,
188                    )
189                })
190            })
191            .collect::<Vec<_>>();
192
193        Self {
194            builders,
195            length: 0,
196        }
197    }
198
199    pub fn push_chunk_without_compute(&mut self, array: &ArrayRef) -> VortexResult<()> {
200        for builder in self.builders.iter_mut() {
201            if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
202                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
203            } else {
204                builder.append_null();
205            }
206        }
207        self.length += 1;
208        Ok(())
209    }
210
211    pub fn push_chunk(&mut self, array: &ArrayRef) -> VortexResult<()> {
212        for builder in self.builders.iter_mut() {
213            if let Some(v) = array.statistics().compute_stat(builder.stat())? {
214                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
215            } else {
216                builder.append_null();
217            }
218        }
219        self.length += 1;
220        Ok(())
221    }
222
223    /// Finishes the accumulator into a [`ZoneMap`].
224    ///
225    /// Returns `None` if none of the requested statistics can be computed, for example they are
226    /// not applicable to the column's data type.
227    pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
228        let mut names = Vec::new();
229        let mut fields = Vec::new();
230        let mut stats = Vec::new();
231
232        for builder in self
233            .builders
234            .iter_mut()
235            // We sort the stats so the DType is deterministic based on which stats are present.
236            .sorted_unstable_by_key(|b| b.stat())
237        {
238            let values = builder.finish();
239
240            // We drop any all-null stats columns
241            if values.all_invalid()? {
242                continue;
243            }
244
245            stats.push(builder.stat());
246            names.extend(values.names);
247            fields.extend(values.arrays);
248        }
249
250        if names.is_empty() {
251            return Ok(None);
252        }
253
254        Ok(Some(ZoneMap {
255            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
256                .vortex_expect("Failed to create zone map"),
257            stats: stats.into(),
258        }))
259    }
260}
261
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Asserts that the boolean field at position `field_idx` of the zone map's struct array
    /// holds exactly the `expected` values.
    #[track_caller]
    fn assert_bool_field(zone_map: &ZoneMap, field_idx: usize, expected: Vec<bool>) {
        assert_eq!(
            zone_map.array.unmasked_fields()[field_idx]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(expected)
        );
    }

    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");

        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let mut acc =
            StatsAccumulator::new(first_chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        for chunk in [first_chunk.finish(), second_chunk.finish()] {
            acc.push_chunk(&chunk)
                .vortex_expect("push_chunk should succeed for test data");
        }

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // No sum column is expected for these dtypes.
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Field 1 is the max truncation flag, field 3 the min truncation flag.
        assert_bool_field(&zone_map, 1, vec![false, true]);
        assert_bool_field(&zone_map, 3, vec![true, false]);
    }

    #[test]
    fn always_adds_is_truncated_column() {
        let chunk = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&chunk)
            .vortex_expect("push_chunk should succeed for test array");

        let zone_map = acc
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        // Field 1 is the max truncation flag, field 3 the min truncation flag.
        assert_bool_field(&zone_map, 1, vec![false]);
        assert_bool_field(&zone_map, 3, vec![false]);
    }

    #[rstest]
    fn test_zone_map_prunes() {
        // All stats that are known at pruning time.
        let available_stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Construct a zone map with 3 zones:
        //
        // +----------+----------+
        // |  min     |  max     |
        // +----------+----------+
        // |  1       |  5       |
        // +----------+----------+
        // |  2       |  6       |
        // +----------+----------+
        // |  3       |  7       |
        // +----------+----------+
        let backing_array = StructArray::from_fields(&[
            (
                "max",
                PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
            ),
            (
                "max_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
            (
                "min",
                PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
            ),
            (
                "min_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
        ])
        .unwrap();
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            backing_array,
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // Each case pairs a filter on the column with the zones expected to be pruned.
        let cases = [
            // A >= 6
            // => A.max < 6
            (gt_eq(root(), lit(6i32)), [true, false, false]),
            // A > 5
            // => A.max <= 5
            (gt(root(), lit(5i32)), [true, false, false]),
            // A < 2
            // => A.min >= 2
            (lt(root(), lit(2i32)), [false, true, true]),
        ];

        for (filter, expected_pruned) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&filter, &available_stats).unwrap();
            let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
            assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected_pruned));
        }
    }
}