vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
5
6use itertools::Itertools;
7use vortex_array::arrays::StructArray;
8use vortex_array::compute::sum;
9use vortex_array::stats::{Precision, Stat, StatsSet};
10use vortex_array::validity::Validity;
11use vortex_array::{Array, ArrayRef};
12use vortex_dtype::{DType, Nullability, PType, StructFields};
13use vortex_error::{VortexExpect, VortexResult, vortex_bail};
14use vortex_expr::{ExprRef, Scope};
15use vortex_mask::Mask;
16
17use crate::layouts::zoned::builder::{
18 MAX_IS_TRUNCATED, MIN_IS_TRUNCATED, StatsArrayBuilder, stats_builder_with_capacity,
19};
20
/// A table of statistics for a single column, backed by a struct array.
///
/// When produced by `StatsAccumulator::as_stats_table`, the table has one row per pushed
/// chunk and one column per statistic that had at least one non-null value.
#[derive(Clone)]
pub struct ZoneMap {
    // Struct array holding the statistic columns; individual columns are looked up by the
    // statistic's name.
    array: StructArray,
    // The statistics that accompany `array`.
    stats: Arc<[Stat]>,
}
32
33impl ZoneMap {
34 pub fn try_new(
37 column_dtype: DType,
38 array: StructArray,
39 stats: Arc<[Stat]>,
40 ) -> VortexResult<Self> {
41 let expected_dtype = Self::dtype_for_stats_table(&column_dtype, &stats);
42 if &expected_dtype != array.dtype() {
43 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
44 }
45 Ok(Self::new_unchecked(array, stats))
46 }
47
48 pub fn new_unchecked(array: StructArray, stats: Arc<[Stat]>) -> Self {
50 Self { array, stats }
51 }
52
53 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
55 assert!(present_stats.is_sorted(), "Stats must be sorted");
56 DType::Struct(
57 StructFields::from_iter(
58 present_stats
59 .iter()
60 .filter_map(|stat| {
61 stat.dtype(column_dtype)
62 .map(|dtype| (stat, dtype.as_nullable()))
63 })
64 .flat_map(|(s, dt)| match s {
65 Stat::Max => vec![
66 (s.name(), dt),
67 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
68 ],
69 Stat::Min => vec![
70 (s.name(), dt),
71 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
72 ],
73 _ => vec![(s.name(), dt)],
74 }),
75 ),
76 Nullability::NonNullable,
77 )
78 }
79
80 pub fn array(&self) -> &StructArray {
82 &self.array
83 }
84
85 pub fn present_stats(&self) -> &Arc<[Stat]> {
87 &self.stats
88 }
89
90 pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
92 let mut stats_set = StatsSet::default();
93 for &stat in stats {
94 let Some(array) = self.get_stat(stat)? else {
95 continue;
96 };
97
98 match stat {
100 Stat::Min | Stat::Max | Stat::Sum => {
102 if let Some(s) = array.statistics().compute_stat(stat)? {
103 stats_set.set(stat, Precision::exact(s))
104 }
105 }
106 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
108 let sum = sum(&array)?
109 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
110 .into_value();
111 stats_set.set(stat, Precision::exact(sum));
112 }
113 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
115 }
116 }
117 Ok(stats_set)
118 }
119
120 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
122 Ok(self.array.field_by_name_opt(stat.name()).cloned())
123 }
124
125 pub fn prune(&self, predicate: &ExprRef) -> VortexResult<Mask> {
134 Mask::try_from(
135 predicate
136 .evaluate(&Scope::new(self.array.to_array()))?
137 .as_ref(),
138 )
139 }
140}
141
/// Accumulates per-chunk statistics for a column and turns them into a [`ZoneMap`].
pub struct StatsAccumulator {
    // One builder per requested statistic that has a dtype for the column.
    builders: Vec<Box<dyn StatsArrayBuilder>>,
    // Number of chunks pushed so far; used as the row count of the resulting table.
    length: usize,
}
151
152impl StatsAccumulator {
153 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
154 let builders = stats
155 .iter()
156 .filter_map(|&s| {
157 s.dtype(dtype).map(|stat_dtype| {
158 stats_builder_with_capacity(
159 s,
160 &stat_dtype.as_nullable(),
161 1024,
162 max_variable_length_statistics_size,
163 )
164 })
165 })
166 .collect::<Vec<_>>();
167
168 Self {
169 builders,
170 length: 0,
171 }
172 }
173
174 pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
175 for builder in self.builders.iter_mut() {
176 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
177 builder.append_scalar_value(v)?;
178 } else {
179 builder.append_null();
180 }
181 }
182 self.length += 1;
183 Ok(())
184 }
185
186 pub fn as_stats_table(&mut self) -> Option<ZoneMap> {
191 let mut names = Vec::new();
192 let mut fields = Vec::new();
193 let mut stats = Vec::new();
194
195 for builder in self
196 .builders
197 .iter_mut()
198 .sorted_unstable_by_key(|b| b.stat())
200 {
201 let values = builder.finish();
202
203 if values
205 .all_invalid()
206 .vortex_expect("failed to get invalid count")
207 {
208 continue;
209 }
210
211 stats.push(builder.stat());
212 names.extend(values.names);
213 fields.extend(values.arrays);
214 }
215
216 if names.is_empty() {
217 return None;
218 }
219
220 Some(ZoneMap {
221 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
222 .vortex_expect("Failed to create zone map"),
223 stats: stats.into(),
224 })
225 }
226}
227
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_buffer::BooleanBuffer;
    use itertools::Itertools;
    use rstest::rstest;
    use vortex_array::arrays::{BoolArray, PrimitiveArray, StructArray};
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, FieldPath, FieldPathSet, Nullability, PType};
    use vortex_error::{VortexExpect, VortexUnwrap};
    use vortex_expr::pruning::checked_pruning_expr;
    use vortex_expr::{gt, gt_eq, lit, lt, root};

    use crate::layouts::zoned::zone_map::{StatsAccumulator, ZoneMap};
    use crate::layouts::zoned::{MAX_IS_TRUNCATED, MIN_IS_TRUNCATED};

    /// Min/max values longer than the configured limit (12 here) are flagged as truncated,
    /// and `Sum` produces no column for string/binary data.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        // Chunk 1: the max ("untruncated") fits the limit, the min does not.
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");
        // Chunk 2: the max ("wait a minute") exceeds the limit, the min fits.
        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let mut acc =
            StatsAccumulator::new(first_chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&first_chunk.finish()).vortex_unwrap();
        acc.push_chunk(&second_chunk.finish()).vortex_unwrap();
        let zone_map = acc.as_stats_table().vortex_expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
            ]
        );

        let max_is_truncated = zone_map.array.fields()[1].to_bool().vortex_unwrap();
        assert_eq!(
            max_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![false, true])
        );

        let min_is_truncated = zone_map.array.fields()[3].to_bool().vortex_unwrap();
        assert_eq!(
            min_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![true, false])
        );
    }

    /// The truncation flag columns are emitted for fixed-width data too, with every flag
    /// unset.
    #[test]
    fn always_adds_is_truncated_column() {
        let chunk = buffer![0, 1, 2].into_array();
        let mut acc = StatsAccumulator::new(chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        acc.push_chunk(&chunk).vortex_unwrap();
        let zone_map = acc.as_stats_table().vortex_expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
                Stat::Sum.name().into(),
            ]
        );

        let max_is_truncated = zone_map.array.fields()[1].to_bool().vortex_unwrap();
        assert_eq!(
            max_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![false])
        );

        let min_is_truncated = zone_map.array.fields()[3].to_bool().vortex_unwrap();
        assert_eq!(
            min_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![false])
        );
    }

    /// Pruning expressions evaluated on the zone map flag the zones that cannot match.
    #[test]
    fn test_zone_map_prunes() {
        // Statistics the pruning expressions are allowed to reference.
        let available_stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones with (min, max) of (1, 5), (2, 6) and (3, 7).
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
        )
        .unwrap();

        // Each case pairs a filter with the mask expected from pruning with it.
        let cases = [
            // `>= 6` cannot match the first zone, whose max is 5.
            (gt_eq(root(), lit(6i32)), vec![true, false, false]),
            // `> 5` cannot match the first zone either.
            (gt(root(), lit(5i32)), vec![true, false, false]),
            // `< 2` cannot match the last two zones, whose mins are 2 and 3.
            (lt(root(), lit(2i32)), vec![false, true, true]),
        ];

        for (expr, expected) in cases {
            let (pruning_expr, _) = checked_pruning_expr(&expr, &available_stats).unwrap();
            let mask = zone_map.prune(&pruning_expr).unwrap();
            assert_eq!(mask.to_boolean_buffer().into_iter().collect_vec(), expected);
        }
    }
}