vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
2
3use itertools::Itertools;
4use vortex_array::arrays::StructArray;
5use vortex_array::compute::sum;
6use vortex_array::stats::{Precision, Stat, StatsSet};
7use vortex_array::validity::Validity;
8use vortex_array::{Array, ArrayRef};
9use vortex_dtype::{DType, Nullability, PType, StructDType};
10use vortex_error::{VortexExpect, VortexResult, vortex_bail};
11
12use crate::layouts::zoned::builder::{
13 MAX_IS_TRUNCATED, MIN_IS_TRUNCATED, StatsArrayBuilder, stats_builder_with_capacity,
14};
15
16#[derive(Clone)]
21pub struct ZoneMap {
22 array: StructArray,
24 stats: Arc<[Stat]>,
26}
27
28impl ZoneMap {
29 pub fn try_new(
32 column_dtype: DType,
33 array: StructArray,
34 stats: Arc<[Stat]>,
35 ) -> VortexResult<Self> {
36 if &Self::dtype_for_stats_table(&column_dtype, &stats) != array.dtype() {
37 vortex_bail!("Array dtype does not match expected zone map dtype");
38 }
39 Ok(Self::unchecked_new(array, stats))
40 }
41
42 pub fn unchecked_new(array: StructArray, stats: Arc<[Stat]>) -> Self {
44 Self { array, stats }
45 }
46
47 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
49 assert!(present_stats.is_sorted(), "Stats must be sorted");
50 DType::Struct(
51 Arc::new(StructDType::from_iter(
52 present_stats
53 .iter()
54 .filter_map(|stat| {
55 stat.dtype(column_dtype)
56 .map(|dtype| (stat, dtype.as_nullable()))
57 })
58 .flat_map(|(s, dt)| match s {
59 Stat::Max => vec![
60 (s.name(), dt),
61 (MAX_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
62 ],
63 Stat::Min => vec![
64 (s.name(), dt),
65 (MIN_IS_TRUNCATED, DType::Bool(Nullability::NonNullable)),
66 ],
67 _ => vec![(s.name(), dt)],
68 }),
69 )),
70 Nullability::NonNullable,
71 )
72 }
73
74 pub fn array(&self) -> &StructArray {
76 &self.array
77 }
78
79 pub fn present_stats(&self) -> &Arc<[Stat]> {
81 &self.stats
82 }
83
84 pub fn to_stats_set(&self, stats: &[Stat]) -> VortexResult<StatsSet> {
86 let mut stats_set = StatsSet::default();
87 for &stat in stats {
88 let Some(array) = self.get_stat(stat)? else {
89 continue;
90 };
91
92 match stat {
94 Stat::Min | Stat::Max | Stat::Sum => {
96 if let Some(s) = array.statistics().compute_stat(stat)? {
97 stats_set.set(stat, Precision::exact(s))
98 }
99 }
100 Stat::NullCount | Stat::NaNCount | Stat::UncompressedSizeInBytes => {
102 let sum = sum(&array)?
103 .cast(&DType::Primitive(PType::U64, Nullability::Nullable))?
104 .into_value();
105 stats_set.set(stat, Precision::exact(sum));
106 }
107 Stat::IsConstant | Stat::IsSorted | Stat::IsStrictSorted => {}
109 }
110 }
111 Ok(stats_set)
112 }
113
114 pub fn get_stat(&self, stat: Stat) -> VortexResult<Option<ArrayRef>> {
116 Ok(self.array.field_by_name_opt(stat.name()).cloned())
117 }
118}
119
120pub struct StatsAccumulator {
127 builders: Vec<Box<dyn StatsArrayBuilder>>,
128 length: usize,
129}
130
131impl StatsAccumulator {
132 pub fn new(dtype: &DType, stats: &[Stat], max_variable_length_statistics_size: usize) -> Self {
133 let builders = stats
134 .iter()
135 .filter_map(|&s| {
136 s.dtype(dtype).map(|stat_dtype| {
137 stats_builder_with_capacity(
138 s,
139 &stat_dtype.as_nullable(),
140 1024,
141 max_variable_length_statistics_size,
142 )
143 })
144 })
145 .collect::<Vec<_>>();
146
147 Self {
148 builders,
149 length: 0,
150 }
151 }
152
153 pub fn push_chunk(&mut self, array: &dyn Array) -> VortexResult<()> {
154 for builder in self.builders.iter_mut() {
155 if let Some(v) = array.statistics().compute_stat(builder.stat())? {
156 builder.append_scalar_value(v)?;
157 } else {
158 builder.append_null();
159 }
160 }
161 self.length += 1;
162 Ok(())
163 }
164
165 pub fn as_stats_table(&mut self) -> Option<ZoneMap> {
170 let mut names = Vec::new();
171 let mut fields = Vec::new();
172 let mut stats = Vec::new();
173
174 for builder in self
175 .builders
176 .iter_mut()
177 .sorted_unstable_by_key(|b| b.stat())
179 {
180 let values = builder.finish();
181
182 if values
184 .all_invalid()
185 .vortex_expect("failed to get invalid count")
186 {
187 continue;
188 }
189
190 stats.push(builder.stat());
191 names.extend(values.names);
192 fields.extend(values.arrays);
193 }
194
195 if names.is_empty() {
196 return None;
197 }
198
199 Some(ZoneMap {
200 array: StructArray::try_new(names.into(), fields, self.length, Validity::NonNullable)
201 .vortex_expect("Failed to create zone map"),
202 stats: stats.into(),
203 })
204 }
205}
206
#[cfg(test)]
mod tests {
    use arrow_buffer::BooleanBuffer;
    use rstest::rstest;
    use vortex_array::builders::{ArrayBuilder, VarBinViewBuilder};
    use vortex_array::stats::Stat;
    use vortex_array::{IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability};
    use vortex_error::{VortexExpect, VortexUnwrap};

    use crate::layouts::zoned::zone_map::StatsAccumulator;
    use crate::layouts::zoned::{MAX_IS_TRUNCATED, MIN_IS_TRUNCATED};

    /// Variable-length min/max values over the size limit are flagged as truncated, per chunk.
    #[rstest]
    #[case(DType::Utf8(Nullability::NonNullable))]
    #[case(DType::Binary(Nullability::NonNullable))]
    fn truncates_accumulated_stats(#[case] dtype: DType) {
        let mut first_chunk = VarBinViewBuilder::with_capacity(dtype.clone(), 2);
        first_chunk.append_value("Value to be truncated");
        first_chunk.append_value("untruncated");

        let mut second_chunk = VarBinViewBuilder::with_capacity(dtype, 2);
        second_chunk.append_value("Another");
        second_chunk.append_value("wait a minute");

        let mut accumulator =
            StatsAccumulator::new(first_chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        accumulator
            .push_chunk(&first_chunk.finish())
            .vortex_unwrap();
        accumulator
            .push_chunk(&second_chunk.finish())
            .vortex_unwrap();

        let zone_map = accumulator
            .as_stats_table()
            .vortex_expect("Must have stats table");

        // Only the max/min columns and their truncation flags are expected; no sum column.
        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
            ]
        );

        let max_is_truncated = zone_map.array.fields()[1].to_bool().vortex_unwrap();
        assert_eq!(
            max_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![false, true])
        );

        let min_is_truncated = zone_map.array.fields()[3].to_bool().vortex_unwrap();
        assert_eq!(
            min_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![true, false])
        );
    }

    /// The truncation-flag columns are emitted for a primitive column too, and are all false.
    #[test]
    fn always_adds_is_truncated_column() {
        let chunk = buffer![0, 1, 2].into_array();
        let mut accumulator =
            StatsAccumulator::new(chunk.dtype(), &[Stat::Max, Stat::Min, Stat::Sum], 12);
        accumulator.push_chunk(&chunk).vortex_unwrap();

        let zone_map = accumulator
            .as_stats_table()
            .vortex_expect("Must have stats table");

        assert_eq!(
            zone_map.array.names().as_ref(),
            &[
                Stat::Max.name().into(),
                MAX_IS_TRUNCATED.into(),
                Stat::Min.name().into(),
                MIN_IS_TRUNCATED.into(),
                Stat::Sum.name().into(),
            ]
        );

        let max_is_truncated = zone_map.array.fields()[1].to_bool().vortex_unwrap();
        assert_eq!(
            max_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![false])
        );

        let min_is_truncated = zone_map.array.fields()[3].to_bool().vortex_unwrap();
        assert_eq!(
            min_is_truncated.boolean_buffer(),
            &BooleanBuffer::from(vec![false])
        );
    }
}