vortex_layout/layouts/zoned/
zone_map.rs1use std::sync::Arc;
7
8use vortex_array::ArrayRef;
9use vortex_array::IntoArray;
10use vortex_array::VortexSessionExecute;
11use vortex_array::arrays::ConstantArray;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::StructArray;
14use vortex_array::dtype::DType;
15use vortex_array::expr::Expression;
16use vortex_array::expr::stats::Stat;
17use vortex_array::scalar_fn::internal::row_count::contains_row_count;
18use vortex_array::scalar_fn::internal::row_count::substitute_row_count;
19use vortex_array::validity::Validity;
20use vortex_buffer::buffer;
21use vortex_error::VortexResult;
22use vortex_error::vortex_bail;
23use vortex_mask::Mask;
24use vortex_runend::RunEnd;
25use vortex_session::VortexSession;
26
27use crate::layouts::zoned::schema::stats_table_dtype;
28
29#[derive(Clone)]
34pub struct ZoneMap {
35 array: StructArray,
37 zone_len: u64,
39 row_count: u64,
41}
42
43impl ZoneMap {
44 pub fn try_new(
47 column_dtype: DType,
48 array: StructArray,
49 stats: Arc<[Stat]>,
50 zone_len: u64,
51 row_count: u64,
52 ) -> VortexResult<Self> {
53 let expected_dtype = stats_table_dtype(&column_dtype, &stats);
54 if &expected_dtype != array.dtype() {
55 vortex_bail!("Array dtype does not match expected zone map dtype: {expected_dtype}");
56 }
57
58 Ok(unsafe { Self::new_unchecked(array, zone_len, row_count) })
60 }
61
62 pub unsafe fn new_unchecked(array: StructArray, zone_len: u64, row_count: u64) -> Self {
68 Self {
69 array,
70 zone_len,
71 row_count,
72 }
73 }
74
75 #[deprecated(note = "use `stats_table_dtype` from `crate::layouts::zoned::schema` instead")]
79 pub fn dtype_for_stats_table(column_dtype: &DType, present_stats: &[Stat]) -> DType {
80 stats_table_dtype(column_dtype, present_stats)
81 }
82
83 pub fn prune(&self, predicate: &Expression, session: &VortexSession) -> VortexResult<Mask> {
96 let mut ctx = session.create_execution_ctx();
97 let num_zones = self.array.len();
98
99 let applied = self.array.clone().into_array().apply(predicate)?;
100
101 if num_zones == 0 || !contains_row_count(&applied) {
102 return applied.execute::<Mask>(&mut ctx);
103 }
104
105 let row_count_array = row_count_array(self.zone_len, self.row_count, num_zones)?;
106 let substituted = substitute_row_count(applied, &row_count_array)?;
107 substituted.execute::<Mask>(&mut ctx)
108 }
109}
110
111fn row_count_array(zone_len: u64, row_count: u64, num_zones: usize) -> VortexResult<ArrayRef> {
117 let last_zone_len = row_count - zone_len.saturating_mul((num_zones as u64) - 1);
118 if num_zones == 1 || last_zone_len == zone_len {
119 return Ok(ConstantArray::new(last_zone_len, num_zones).into_array());
120 }
121
122 let ends = unsafe {
123 PrimitiveArray::new_unchecked(
124 buffer![num_zones as u64 - 1, num_zones as u64],
125 Validity::NonNullable,
126 )
127 }
128 .into_array();
129 let values = unsafe {
130 PrimitiveArray::new_unchecked(buffer![zone_len, last_zone_len], Validity::NonNullable)
131 }
132 .into_array();
133
134 Ok(unsafe { RunEnd::new_unchecked(ends, values, 0, num_zones) }.into_array())
137}
138
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use vortex_array::IntoArray;
    use vortex_array::arrays::BoolArray;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::StructArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::FieldPathSet;
    use vortex_array::dtype::PType;
    use vortex_array::expr::Expression;
    use vortex_array::expr::gt;
    use vortex_array::expr::gt_eq;
    use vortex_array::expr::is_not_null;
    use vortex_array::expr::lit;
    use vortex_array::expr::lt;
    use vortex_array::expr::pruning::checked_pruning_expr;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Stat;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::layouts::zoned::zone_map::ZoneMap;
    use crate::test::SESSION;

    /// Rewrites `predicate` into a pruning expression over `stats`, runs it against
    /// `zone_map`, and checks the resulting per-zone mask against `expected`.
    fn assert_pruned(
        zone_map: &ZoneMap,
        stats: &FieldPathSet,
        predicate: Expression,
        expected: [bool; 3],
    ) {
        let (pruning_expr, _) = checked_pruning_expr(&predicate, stats).unwrap();
        let mask = zone_map.prune(&pruning_expr, &SESSION).unwrap();
        assert_arrays_eq!(mask.into_array(), BoolArray::from_iter(expected));
    }

    /// Builds a `u64` zone map whose only statistic is the per-zone null count.
    fn null_count_zone_map(null_counts: PrimitiveArray, zone_len: u64, row_count: u64) -> ZoneMap {
        ZoneMap::try_new(
            PType::U64.into(),
            StructArray::from_fields(&[("null_count", null_counts.into_array())]).unwrap(),
            Arc::new([Stat::NullCount]),
            zone_len,
            row_count,
        )
        .unwrap()
    }

    /// Stats paths containing only the null count.
    fn null_count_stats() -> FieldPathSet {
        FieldPathSet::from_iter([FieldPath::from_iter([Stat::NullCount.name().into()])])
    }

    #[test]
    fn test_zone_map_prunes() {
        let stats = FieldPathSet::from_iter([
            FieldPath::from_iter([Stat::Min.name().into()]),
            FieldPath::from_iter([Stat::Max.name().into()]),
        ]);

        // Three zones with (min, max) of (1, 5), (2, 6) and (3, 7).
        let zone_map = ZoneMap::try_new(
            PType::I32.into(),
            StructArray::from_fields(&[
                (
                    "max",
                    PrimitiveArray::new(buffer![5i32, 6i32, 7i32], Validity::AllValid).into_array(),
                ),
                (
                    "max_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
                (
                    "min",
                    PrimitiveArray::new(buffer![1i32, 2i32, 3i32], Validity::AllValid).into_array(),
                ),
                (
                    "min_is_truncated",
                    BoolArray::from_iter([false, false, false]).into_array(),
                ),
            ])
            .unwrap(),
            Arc::new([Stat::Max, Stat::Min]),
            3,
            10,
        )
        .unwrap();

        // `x >= 6`: only the first zone (max 5) can never match.
        assert_pruned(
            &zone_map,
            &stats,
            gt_eq(root(), lit(6i32)),
            [true, false, false],
        );

        // `x > 5`: again only the first zone (max 5) can never match.
        assert_pruned(
            &zone_map,
            &stats,
            gt(root(), lit(5i32)),
            [true, false, false],
        );

        // `x < 2`: zones whose min is 2 or 3 can never match.
        assert_pruned(
            &zone_map,
            &stats,
            lt(root(), lit(2i32)),
            [false, true, true],
        );
    }

    #[test]
    fn row_count_prunes_short_trailing_zone() {
        // 10 rows in zones of 4 leave a trailing zone of 2 rows, both of which are null.
        let zone_map = null_count_zone_map(
            PrimitiveArray::new(buffer![0u64, 0, 2], Validity::AllValid),
            4,
            10,
        );

        assert_pruned(
            &zone_map,
            &null_count_stats(),
            is_not_null(root()),
            [false, false, true],
        );
    }

    #[test]
    fn row_count_prunes_all_null_uniform_zones() {
        // 12 rows in zones of 4: every zone is full and the middle one is entirely null.
        let zone_map = null_count_zone_map(
            PrimitiveArray::new(buffer![0u64, 4, 0], Validity::AllValid),
            4,
            12,
        );

        assert_pruned(
            &zone_map,
            &null_count_stats(),
            is_not_null(root()),
            [false, true, false],
        );
    }
}