Skip to main content

vortex_layout/layouts/zoned/zone_map.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::Array;
8use vortex_array::ArrayRef;
9use vortex_array::VortexSessionExecute;
10use vortex_array::arrays::StructArray;
11use vortex_array::compute::sum;
12use vortex_array::expr::Expression;
13use vortex_array::expr::stats::Precision;
14use vortex_array::expr::stats::Stat;
15use vortex_array::expr::stats::StatsProvider;
16use vortex_array::stats::StatsSet;
17use vortex_array::validity::Validity;
18use vortex_dtype::DType;
19use vortex_dtype::Nullability;
20use vortex_dtype::PType;
21use vortex_dtype::StructFields;
22use vortex_error::VortexExpect;
23use vortex_error::VortexResult;
24use vortex_error::vortex_bail;
25use vortex_mask::Mask;
26use vortex_session::VortexSession;
27
28use crate::layouts::zoned::builder::MAX_IS_TRUNCATED;
29use crate::layouts::zoned::builder::MIN_IS_TRUNCATED;
30use crate::layouts::zoned::builder::StatsArrayBuilder;
31use crate::layouts::zoned::builder::stats_builder_with_capacity;
32
33/// A zone map containing statistics for a column.
34/// Each row of the zone map corresponds to a chunk of the column.
35///
36/// Note that it's possible for the zone map to have no statistics.
37#[derive(Clone)]
38pub struct ZoneMap {
39    // The struct array backing the zone map
40    array: StructArray,
41    // The statistics that are included in the table.
42    stats: Arc<[Stat]>,
43}
44
45impl ZoneMap {
46    /// Create [`ZoneMap`] of given column_dtype from given array. Validates that the array matches expected
47    /// structure for given list of stats.
48    pub fn try_new(
49        column_dtype: DType,
50        array: StructArray,
51        stats: Arc<[Stat]>,
52    ) -> VortexResult<Self> {
53        let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
54        if &expected_dtype != array.dtype() {
55            vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
56        }
57
58        // SAFETY: We checked that the
59        Ok(unsafe { Self::new_unchecked(array, stats) })
60    }
61
62    /// Creates [`ZoneMap`] without validating return array against expected stats.
63    ///
64    /// # Safety
65    ///
66    /// Assumes that the input struct array has the correct statistics as fields. Or in other words,
67    /// the [`DType`] of the input array is equal to the result of [`Self::dtype_for_stats_table`].
68    pub unsafe fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
69        Self { array, stats }
70    }
71
72    /// Returns the [`DType`] of the statistics table given a set of statistics and column [`DType`].
73    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
74        assert!(present_stats.is_sorted(), "Stats must be sorted");
75        DType::Struct(
76            StructFields::from_iter(
77                present_stats
78                    .iter()
79                    .filter_map(|stat| {
80                        stat.dtype(column_dtype)
81                            .map(|dtype| (stat, dtype.as_nullable()))
82                    })
83                    .flat_map(|(s, dt)| match s {
84                        Stat::Max => vec![
85                            (s.name(), dt),
86                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
87                        ],
88                        Stat::Min => vec![
89                            (s.name(), dt),
90                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
91                        ],
92                        _ => vec![(s.name(), dt)],
93                    }),
94            ),
95            Nullability::NonNullable,
96        )
97    }
98
99    /// Returns the underlying [`StructArray`] backing the zone map
100    pub fn array(&self) -> &StructArray {
101        &self.array
102    }
103
104    /// Returns the list of stats included in the zone map.
105    pub fn present_stats(&self) -> &Arc<[Stat]> {
106        &self.stats
107    }
108
109    /// Returns an aggregated stats set for the table.
110    pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
111        let mut stats_set = StatsSet::default();
112        for &stat in stats {
113            let Some(array) = self.get_stat(stat)? else {
114                continue;
115            };
116
117            // Different stats need different aggregations
118            match stat {
119                // For stats that are associative, we can just compute them over the stat column
120                Stat::Min | Stat::Max | Stat::Sum => {
121                    if let Some(s) = array.statistics().compute_stat(stat)?
122                        && let Some(v) = s.into_value()
123                    {
124                        stats_set.set(stat, Precision::exact(v))
125                    }
126                }
127                // These stats sum up
128                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
129                    if let Some(sum_value) = sum(&array)?
130                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
131                        .into_value()
132                    {
133                        stats_set.set(stat, Precision::exact(sum_value));
134                    }
135                }
136                // We could implement these aggregations in the future, but for now they're unused
137                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
138            }
139        }
140        Ok(stats_set)
141    }
142
143    /// Returns the array for a given stat.
144    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
145        Ok(self.array.unmasked_field_by_name_opt(stat.name()).cloned())
146    }
147
148    /// Apply a pruning predicate against the ZoneMap, yielding a mask indicating which zones can
149    /// be pruned.
150    ///
151    /// The expression provided should be the result of converting an existing `VortexExpr` via
152    /// [`checked_pruning_expr`][vortex_array::expr::pruning::checked_pruning_expr] into a prunable
153    /// expression that can be evaluated on a zone map.
154    ///
155    /// All zones where the predicate evaluates to `true` can be skipped entirely.
156    pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
157        let mut ctx = session.create_execution_ctx();
158        self.array
159            .to_array()
160            .apply(predicate)?
161            .execute::<Mask>(&mut ctx)
162    }
163}
164
165// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
166//  underneath each stats column. For example, `min: i32` for an `i32` array.
167//  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
168//  See: <https://github.com/vortex-data/vortex/issues/1835>
169/// Accumulates statistics for a column.
170pub struct StatsAccumulator {
171    builders: Vec<Box<dyn StatsArrayBuilder>>,
172    length: usize,
173}
174
175impl StatsAccumulator {
176    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
177        let builders = stats
178            .iter()
179            .filter_map(|&s| {
180                s.dtype(dtype).map(|stat_dtype| {
181                    stats_builder_with_capacity(
182                        s,
183                        &stat_dtype.as_nullable(),
184                        1024,
185                        max_variable_length_statistics_size,
186                    )
187                })
188            })
189            .collect::<Vec<_>>();
190
191        Self {
192            builders,
193            length: 0,
194        }
195    }
196
197    pub fn push_chunk_without_compute(&mut self, array: &dyn Array) -> VortexResult<()> {
198        for builder in self.builders.iter_mut() {
199            if let Some(Precision::Exact(v)) = array.statistics().get(builder.stat()) {
200                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
201            } else {
202                builder.append_null();
203            }
204        }
205        self.length += 1;
206        Ok(())
207    }
208
209    pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
210        for builder in self.builders.iter_mut() {
211            if let Some(v) = array.statistics().compute_stat(builder.stat())? {
212                builder.append_scalar(v.cast(&v.dtype().as_nullable())?)?;
213            } else {
214                builder.append_null();
215            }
216        }
217        self.length += 1;
218        Ok(())
219    }
220
221    /// Finishes the accumulator into a [`ZoneMap`].
222    ///
223    /// Returns `None` if none of the requested statistics can be computed, for example they are
224    /// not applicable to the column's data type.
225    pub fn as_stats_table(&mut self) -> VortexResult<Option<ZoneMap>> {
226        let mut names = Vec::new();
227        let mut fields = Vec::new();
228        let mut stats = Vec::new();
229
230        for builder in self
231            .builders
232            .iter_mut()
233            // We sort the stats so the DType is deterministic based on which stats are present.
234            .sorted_unstable_by_key(|b| b.stat())
235        {
236            let values = builder.finish();
237
238            // We drop any all-null stats columns
239            if values.all_invalid()? {
240                continue;
241            }
242
243            stats.push(builder.stat());
244            names.extend(values.names);
245            fields.extend(values.arrays);
246        }
247
248        if names.is_empty() {
249            return Ok(None);
250        }
251
252        Ok(Some(ZoneMap {
253            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
254                .vortex_expect("Failed to create zone map"),
255            stats: stats.into(),
256        }))
257    }
258}
259
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::builders::ArrayBuilder;
    use vortex_array::builders::VarBinViewBuilder;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::FieldPath;
    use vortex_dtype::FieldPathSet;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;
    use vortex_error::VortexExpect;

    use crate::layouts::zoned::MAX_IS_TRUNCATED;
    use crate::layouts::zoned::MIN_IS_TRUNCATED;
    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Min/max of variable-length values are truncated to the configured size, and the
    /// `*_is_truncated` columns report which zones were affected.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");

        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let requested = [Stat::Max, Stat::Min, Stat::Sum];
        let mut accumulator = StatsAccumulator::new(first_chunk.dtype(), &requested, 12);
        accumulator
            .push_chunk(&first_chunk.finish())
            .vortex_expect("push_chunk should succeed for test data");
        accumulator
            .push_chunk(&second_chunk.finish())
            .vortex_expect("push_chunk should succeed for test data");

        let zone_map = accumulator
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        // Sum does not apply to these dtypes, so only the min/max columns remain.
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
            ]
        );
        // Field 1 is the max truncation flag, field 3 the min truncation flag.
        assert_eq!(
            zone_map.array.unmasked_fields()[1]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false, true])
        );
        assert_eq!(
            zone_map.array.unmasked_fields()[3]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![true, false])
        );
    }

    /// The `*_is_truncated` columns are present even for a primitive column, and are all false.
    #[test]
    fn always_adds_is_truncated_column() {
        let chunk = buffer![0, 1, 2].into_array();
        let requested = [Stat::Max, Stat::Min, Stat::Sum];

        let mut accumulator = StatsAccumulator::new(chunk.dtype(), &requested, 12);
        accumulator
            .push_chunk(&chunk)
            .vortex_expect("push_chunk should succeed for test array");

        let zone_map = accumulator
            .as_stats_table()
            .unwrap()
            .expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name(),
                MAX_IS_TRUNCATED,
                Stat::Min.name(),
                MIN_IS_TRUNCATED,
                Stat::Sum.name(),
            ]
        );
        assert_eq!(
            zone_map.array.unmasked_fields()[1]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
        assert_eq!(
            zone_map.array.unmasked_fields()[3]
                .to_bool()
                .to_bit_buffer(),
            BitBuffer::from(vec![false])
        );
    }

    /// Pruning expressions evaluated against a hand-built zone map mark the expected zones.
    #[rstest]
    fn test_zone_map_prunes() {
        // All stats that are known at pruning time.
        let known_stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Construct a zone map with 3 zones:
        //
        // +----------+----------+
        // |  a_min   |  a_max   |
        // +----------+----------+
        // |  1       |  5       |
        // +----------+----------+
        // |  2       |  6       |
        // +----------+----------+
        // |  3       |  7       |
        // +----------+----------+
        let backing = StructArray::from_fields(&[
            (
                "max",
                PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
            ),
            (
                "max_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
            (
                "min",
                PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
            ),
            (
                "min_is_truncated",
                BoolArray::from_iter([false, false, false]).into_array(),
            ),
        ])
        .unwrap();
        let zone_map =
            ZoneMap::try_new(PType::I32.into(), backing, Arc::new([Stat::Max, Stat::Min])).unwrap();

        // Converts a filter into its pruning form and evaluates it against the zone map.
        let prune = |filter| {
            let (pruning_expr, _) = checked_pruning_expr(&filter, &known_stats).unwrap();
            zone_map.prune(&pruning_expr, &SESSION).unwrap()
        };

        // A >= 6
        // => A.max < 6
        let mask = prune(gt_eq(root(), lit(6i32)));
        assert_arrays_eq!(
            mask.into_array(),
            BoolArray::from_iter([true, false, false])
        );

        // A > 5
        // => A.max <= 5
        let mask = prune(gt(root(), lit(5i32)));
        assert_arrays_eq!(
            mask.into_array(),
            BoolArray::from_iter([true, false, false])
        );

        // A < 2
        // => A.min >= 2
        let mask = prune(lt(root(), lit(2i32)));
        assert_arrays_eq!(mask.into_array(), BoolArray::from_iter([false, true, true]));
    }
}