1use std::collections::BTreeSet;
11use std::ops::Range;
12use std::sync::Arc;
13
14use vortex_array::Canonical;
15use vortex_array::IntoArray;
16use vortex_array::MaskFuture;
17use vortex_array::VortexSessionExecute;
18use vortex_array::arrays::NullArray;
19use vortex_array::dtype::DType;
20use vortex_array::dtype::FieldMask;
21use vortex_array::dtype::FieldPath;
22use vortex_array::dtype::StructFields;
23use vortex_array::expr::Expression;
24use vortex_array::expr::StatsCatalog;
25use vortex_array::expr::lit;
26use vortex_array::expr::stats::Stat;
27use vortex_array::scalar::Scalar;
28use vortex_array::scalar_fn::fns::literal::Literal;
29use vortex_error::VortexResult;
30use vortex_layout::ArrayFuture;
31use vortex_layout::LayoutReader;
32use vortex_layout::LayoutReaderRef;
33use vortex_mask::Mask;
34use vortex_session::VortexSession;
35use vortex_utils::aliases::dash_map::DashMap;
36
37use crate::FileStatistics;
38
39pub struct FileStatsLayoutReader {
48 child: LayoutReaderRef,
49 file_stats: FileStatistics,
50 struct_fields: StructFields,
51 session: VortexSession,
52 prune_cache: DashMap<Expression, bool>,
53}
54
55impl FileStatsLayoutReader {
56 pub fn new(child: LayoutReaderRef, file_stats: FileStatistics, session: VortexSession) -> Self {
63 let struct_fields = child
64 .dtype()
65 .as_struct_fields_opt()
66 .cloned()
67 .unwrap_or_default();
68
69 Self {
70 child,
71 file_stats,
72 struct_fields,
73 session,
74 prune_cache: Default::default(),
75 }
76 }
77
78 fn evaluate_file_stats(&self, expr: &Expression) -> VortexResult<bool> {
82 let Some(pruning_expr) = expr.stat_falsification(self) else {
83 return Ok(false);
85 };
86
87 let simplified = pruning_expr.optimize_recursive(&DType::Null)?;
91 if let Some(result) = simplified.as_opt::<Literal>() {
92 return Ok(result.as_bool().value() == Some(true));
94 }
95
96 let pruning = NullArray::new(1).into_array().apply(&pruning_expr)?;
99
100 let mut ctx = self.session.create_execution_ctx();
101 let result = pruning
102 .execute::<Canonical>(&mut ctx)?
103 .into_bool()
104 .scalar_at(0)?;
105
106 Ok(result.as_bool().value() == Some(true))
107 }
108}
109
110impl StatsCatalog for FileStatsLayoutReader {
112 fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
113 if field_path.parts().len() != 1 {
115 return None;
116 }
117
118 let field_name = field_path.parts()[0].as_name()?;
119 let field_idx = self.struct_fields.find(field_name)?;
120 let field_stats = self.file_stats.stats_sets().get(field_idx)?;
121
122 let stat_value = field_stats.get(stat)?.as_exact()?;
123 let field_dtype = self.struct_fields.field_by_index(field_idx)?;
124 let stat_scalar = Scalar::try_new(field_dtype, Some(stat_value)).ok()?;
125
126 Some(lit(stat_scalar))
127 }
128}
129
130impl LayoutReader for FileStatsLayoutReader {
131 fn name(&self) -> &Arc<str> {
132 self.child.name()
133 }
134
135 fn dtype(&self) -> &DType {
136 self.child.dtype()
137 }
138
139 fn row_count(&self) -> u64 {
140 self.child.row_count()
141 }
142
143 fn register_splits(
144 &self,
145 field_mask: &[FieldMask],
146 row_range: &Range<u64>,
147 splits: &mut BTreeSet<u64>,
148 ) -> VortexResult<()> {
149 self.child.register_splits(field_mask, row_range, splits)
150 }
151
152 fn pruning_evaluation(
153 &self,
154 row_range: &Range<u64>,
155 expr: &Expression,
156 mask: Mask,
157 ) -> VortexResult<MaskFuture> {
158 if let Some(pruned) = self.prune_cache.get(expr) {
160 if *pruned {
161 return Ok(MaskFuture::ready(Mask::new_false(mask.len())));
162 }
163 return self.child.pruning_evaluation(row_range, expr, mask);
164 }
165
166 let pruned = self.evaluate_file_stats(expr)?;
168 self.prune_cache.insert(expr.clone(), pruned);
169
170 if pruned {
171 Ok(MaskFuture::ready(Mask::new_false(mask.len())))
172 } else {
173 self.child.pruning_evaluation(row_range, expr, mask)
174 }
175 }
176
177 fn filter_evaluation(
178 &self,
179 row_range: &Range<u64>,
180 expr: &Expression,
181 mask: MaskFuture,
182 ) -> VortexResult<MaskFuture> {
183 self.child.filter_evaluation(row_range, expr, mask)
184 }
185
186 fn projection_evaluation(
187 &self,
188 row_range: &Range<u64>,
189 expr: &Expression,
190 mask: MaskFuture,
191 ) -> VortexResult<ArrayFuture> {
192 self.child.projection_evaluation(row_range, expr, mask)
193 }
194}
195
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::arrays::StructArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::get_item;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::scalar::ScalarValue;
    use vortex_array::scalar_fn::session::ScalarFnSession;
    use vortex_array::session::ArraySession;
    use vortex_array::stats::StatsSet;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSession;
    use vortex_layout::LayoutReader;
    use vortex_layout::LayoutStrategy;
    use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
    use vortex_layout::layouts::table::TableStrategy;
    use vortex_layout::segments::TestSegments;
    use vortex_layout::sequence::SequenceId;
    use vortex_layout::sequence::SequentialArrayStreamExt;
    use vortex_layout::session::LayoutSession;
    use vortex_mask::Mask;
    use vortex_session::VortexSession;

    use super::*;

    /// Session shared by the tests, with the array, layout, scalar-function and runtime
    /// session components registered.
    static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
        VortexSession::empty()
            .with::<ArraySession>()
            .with::<LayoutSession>()
            .with::<ScalarFnSession>()
            .with::<RuntimeSession>()
    });

    /// Builds file statistics for a single non-nullable `i32` column with exact `min` and
    /// `max` values.
    fn test_file_stats(min: i32, max: i32) -> FileStatistics {
        let mut bounds = StatsSet::default();
        for (stat, value) in [(Stat::Min, min), (Stat::Max, max)] {
            bounds.set(stat, Precision::exact(ScalarValue::from(value)));
        }

        let column_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        FileStatistics::new(Arc::from([bounds]), Arc::from([column_dtype]))
    }

    /// A filter that lies entirely above the recorded maximum prunes every row.
    #[test]
    fn pruning_when_filter_out_of_range() -> VortexResult<()> {
        block_on(|handle| async {
            let array_ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();

            let column = buffer![1i32, 2, 3, 4, 5].into_array();
            let table = StructArray::from_fields([("col", column)].as_slice())?;
            let stream = table.into_array().to_array_stream().sequenced(ptr);

            let strategy = TableStrategy::new(
                Arc::new(FlatLayoutStrategy::default()),
                Arc::new(FlatLayoutStrategy::default()),
            );
            let layout = strategy
                .write_stream(array_ctx, segments.clone(), stream, eof, handle)
                .await?;

            let child = layout.new_reader("".into(), segments, &SESSION)?;
            let reader =
                FileStatsLayoutReader::new(child, test_file_stats(0, 100), SESSION.clone());

            // `col > 200` cannot match when the file-level max is 100.
            let filter = gt(get_item("col", root()), lit(200i32));
            let rows = 0..5;
            let pruned = reader
                .pruning_evaluation(&rows, &filter, Mask::new_true(5))?
                .await?;
            assert_eq!(pruned, Mask::new_false(5));

            Ok(())
        })
    }

    /// A filter that overlaps the recorded `[min, max]` range leaves the mask untouched.
    #[test]
    fn no_pruning_when_filter_in_range() -> VortexResult<()> {
        block_on(|handle| async {
            let array_ctx = ArrayContext::empty();
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();

            let column = buffer![1i32, 2, 3, 4, 5].into_array();
            let table = StructArray::from_fields([("col", column)].as_slice())?;
            let stream = table.into_array().to_array_stream().sequenced(ptr);

            let strategy = TableStrategy::new(
                Arc::new(FlatLayoutStrategy::default()),
                Arc::new(FlatLayoutStrategy::default()),
            );
            let layout = strategy
                .write_stream(array_ctx, segments.clone(), stream, eof, handle)
                .await?;

            let child = layout.new_reader("".into(), segments, &SESSION)?;
            let reader =
                FileStatsLayoutReader::new(child, test_file_stats(0, 100), SESSION.clone());

            // `col > 50` may match when the file-level range is [0, 100].
            let filter = gt(get_item("col", root()), lit(50i32));
            let rows = 0..5;
            let kept = reader
                .pruning_evaluation(&rows, &filter, Mask::new_true(5))?
                .await?;
            assert_eq!(kept, Mask::new_true(5));

            Ok(())
        })
    }
}