vortex_layout/layouts/zoned/
zone_map.rs

1use std::sync::Arc;
2
3use itertools::Itertools;
4use vortex_array::arrays::StructArray;
5use vortex_array::compute::sum;
6use vortex_array::stats::{Precision, Stat, StatsSet};
7use vortex_array::validity::Validity;
8use vortex_array::{Array, ArrayRef};
9use vortex_dtype::{DType, Nullability, PType, StructDType};
10use vortex_error::{VortexExpect, VortexResult, vortex_bail};
11
12use crate::layouts::zoned::builder::{
13    MAX_IS_TRUNCATED, MIN_IS_TRUNCATED, StatsArrayBuilder, stats_builder_with_capacity,
14};
15
16/// A zone map containing statistics for a column.
17/// Each row of the zone map corresponds to a chunk of the column.
18///
19/// Note that it's possible for the zone map to have no statistics.
20#[derive(Clone)]
21pub struct ZoneMap {
22    // The struct array backing the zone map
23    array: StructArray,
24    // The statistics that are included in the table.
25    stats: Arc<[Stat]>,
26}
27
28impl ZoneMap {
29    /// Create StatsTable of given column_dtype from given array. Validates that the array matches expected
30    /// structure for given list of stats
31    pub fn try_new(
32        column_dtype: DType,
33        array: StructArray,
34        stats: Arc<[Stat]>,
35    ) -> VortexResult<Self> {
36        if &Self::dtype_for_stats_table(&column_dtype, &stats) != array.dtype() {
37            vortex_bail!("Array dtype does not match expected zone map dtype");
38        }
39        Ok(Self::unchecked_new(array, stats))
40    }
41
42    /// Create StatsTable without validating return array against expected stats
43    pub fn unchecked_new(array: StructArray, stats: Arc<[Stat]>) -> Self {
44        Self { array, stats }
45    }
46
47    /// Returns the DType of the statistics table given a set of statistics and column [`DType`].
48    pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
49        assert!(present_stats.is_sorted(), "Stats must be sorted");
50        DType::Struct(
51            Arc::new(StructDType::from_iter(
52                present_stats
53                    .iter()
54                    .filter_map(|stat| {
55                        stat.dtype(column_dtype)
56                            .map(|dtype| (stat, dtype.as_nullable()))
57                    })
58                    .flat_map(|(s, dt)| match s {
59                        Stat::Max => vec![
60                            (s.name(), dt),
61                            (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
62                        ],
63                        Stat::Min => vec![
64                            (s.name(), dt),
65                            (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
66                        ],
67                        _ => vec![(s.name(), dt)],
68                    }),
69            )),
70            Nullability::NonNullable,
71        )
72    }
73
74    /// The struct array backing the zone map
75    pub fn array(&self) -> &StructArray {
76        &self.array
77    }
78
79    /// The statistics that are included in the table.
80    pub fn present_stats(&self) -> &Arc<[Stat]> {
81        &self.stats
82    }
83
84    /// Return an aggregated stats set for the table.
85    pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
86        let mut stats_set = StatsSet::default();
87        for &stat in stats {
88            let Some(array) = self.get_stat(stat)? else {
89                continue;
90            };
91
92            // Different stats need different aggregations
93            match stat {
94                // For stats that are associative, we can just compute them over the stat column
95                Stat::Min | Stat::Max | Stat::Sum => {
96                    if let Some(s) = array.statistics().compute_stat(stat)? {
97                        stats_set.set(stat, Precision::exact(s))
98                    }
99                }
100                // These stats sum up
101                Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
102                    let sum = sum(&array)?
103                        .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
104                        .into_value();
105                    stats_set.set(stat, Precision::exact(sum));
106                }
107                // We could implement these aggregations in the future, but for now they're unused
108                Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
109            }
110        }
111        Ok(stats_set)
112    }
113
114    /// Return the array for a given stat.
115    pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
116        Ok(self.array.field_by_name_opt(stat.name()).cloned())
117    }
118}
119
120/// Accumulates statistics for a column.
121///
122/// TODO(ngates): we should make it such that the zone map stores a mirror of the DType
123///  underneath each stats column. For example, `min: i32` for an `i32` array.
124///  Or `min: {a: i32, b: i32}` for a struct array of type `{a: i32, b: i32}`.
125///  See: <https://github.com/vortex-data/vortex/issues/1835>
126pub struct StatsAccumulator {
127    builders: Vec<Box<dyn StatsArrayBuilder>>,
128    length: usize,
129}
130
131impl StatsAccumulator {
132    pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
133        let builders = stats
134            .iter()
135            .filter_map(|&s| {
136                s.dtype(dtype).map(|stat_dtype| {
137                    stats_builder_with_capacity(
138                        s,
139                        &stat_dtype.as_nullable(),
140                        1024,
141                        max_variable_length_statistics_size,
142                    )
143                })
144            })
145            .collect::<Vec<_>>();
146
147        Self {
148            builders,
149            length: 0,
150        }
151    }
152
153    pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
154        for builder in self.builders.iter_mut() {
155            if let Some(v) = array.statistics().compute_stat(builder.stat())? {
156                builder.append_scalar_value(v)?;
157            } else {
158                builder.append_null();
159            }
160        }
161        self.length += 1;
162        Ok(())
163    }
164
165    /// Finishes the accumulator into a [`ZoneMap`].
166    ///
167    /// Returns `None` if none of the requested statistics can be computed, for example they are
168    /// not applicable to the column's data type.
169    pub fn as_stats_table(&mut self) -> Option<ZoneMap> {
170        let mut names = Vec::new();
171        let mut fields = Vec::new();
172        let mut stats = Vec::new();
173
174        for builder in self
175            .builders
176            .iter_mut()
177            // We sort the stats so the DType is deterministic based on which stats are present.
178            .sorted_unstable_by_key(|b| b.stat())
179        {
180            let values = builder.finish();
181
182            // We drop any all-null stats columns
183            if values
184                .all_invalid()
185                .vortex_expect("failed to get invalid count")
186            {
187                continue;
188            }
189
190            stats.push(builder.stat());
191            names.extend(values.names);
192            fields.extend(values.arrays);
193        }
194
195        if names.is_empty() {
196            return None;
197        }
198
199        Some(ZoneMap {
200            array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
201                .vortex_expect("Failed to create zone map"),
202            stats: stats.into(),
203        })
204    }
205}
206
#[cfg(test)]
mod tests {
    use arrow_buffer::BooleanBuffer;
    use rstest::rstest;
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::stats::Stat;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability};
    use vortex_error::{VortexExpect, VortexUnwrap};

    use crate::layouts::zoned::zone_map::{StatsAccumulator, ZoneMap};
    use crate::layouts::zoned::{MAX_IS_TRUNCATED, MIN_IS_TRUNCATED};

    /// Reads the field at `index` of the zone map's backing struct array as a boolean buffer.
    fn bool_column(zone_map: &ZoneMap, index: usize) -> BooleanBuffer {
        zone_map.array.fields()[index]
            .to_bool()
            .vortex_unwrap()
            .boolean_buffer()
            .clone()
    }

    /// Min/max values longer than the configured size are flagged as truncated per chunk.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");

        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let requested = [Stat::Max, Stat::Min, Stat::Sum];
        let mut acc = StatsAccumulator::new(first_chunk.dtype(), &requested, 12);
        acc.push_chunk(&first_chunk.finish()).vortex_unwrap();
        acc.push_chunk(&second_chunk.finish()).vortex_unwrap();

        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
            ]
        );
        // Field 1 is the max truncation flag, field 3 the min truncation flag.
        assert_eq!(
            &bool_column(&stats_table, 1),
            &BooleanBuffer::from(vec![false, true])
        );
        assert_eq!(
            &bool_column(&stats_table, 3),
            &BooleanBuffer::from(vec![true, false])
        );
    }

    /// The truncation flag columns are present even for a primitive column.
    #[test]
    fn always_adds_is_truncated_column() {
        let array = buffer![0, 1, 2].into_array();
        let requested = [Stat::Max, Stat::Min, Stat::Sum];
        let mut acc = StatsAccumulator::new(array.dtype(), &requested, 12);
        acc.push_chunk(&array).vortex_unwrap();

        let stats_table = acc.as_stats_table().vortex_expect("Must have stats table");
        assert_eq!(
            stats_table.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
                Stat::Sum.name().into(),
            ]
        );
        // Field 1 is the max truncation flag, field 3 the min truncation flag.
        assert_eq!(
            &bool_column(&stats_table, 1),
            &BooleanBuffer::from(vec![false])
        );
        assert_eq!(
            &bool_column(&stats_table, 3),
            &BooleanBuffer::from(vec![false])
        );
    }
}