Skip to main content

vortex_file/v2/
file_stats_reader.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! A [`LayoutReader`] decorator that performs file-level stats pruning.
5//!
6//! If file-level statistics prove that a filter expression cannot match any rows in the file,
7//! [`FileStatsLayoutReader`] short-circuits [`pruning_evaluation`](LayoutReader::pruning_evaluation)
8//! by returning an all-false mask — avoiding all downstream I/O.
9
10use std::collections::BTreeSet;
11use std::ops::Range;
12use std::sync::Arc;
13
14use vortex_array::Canonical;
15use vortex_array::IntoArray;
16use vortex_array::MaskFuture;
17use vortex_array::VortexSessionExecute;
18use vortex_array::arrays::NullArray;
19use vortex_array::dtype::DType;
20use vortex_array::dtype::FieldMask;
21use vortex_array::dtype::FieldPath;
22use vortex_array::dtype::StructFields;
23use vortex_array::expr::Expression;
24use vortex_array::expr::StatsCatalog;
25use vortex_array::expr::lit;
26use vortex_array::expr::stats::Stat;
27use vortex_array::scalar::Scalar;
28use vortex_array::scalar_fn::fns::literal::Literal;
29use vortex_error::VortexResult;
30use vortex_layout::ArrayFuture;
31use vortex_layout::LayoutReader;
32use vortex_layout::LayoutReaderRef;
33use vortex_mask::Mask;
34use vortex_session::VortexSession;
35use vortex_utils::aliases::dash_map::DashMap;
36
37use crate::FileStatistics;
38
39/// A [`LayoutReader`] decorator that prunes entire files based on file-level statistics.
40///
41/// This reader wraps an inner `LayoutReader` and intercepts `pruning_evaluation` calls.
42/// When file-level stats prove that a filter expression is false for the entire file,
43/// it returns an all-false mask immediately — avoiding all downstream I/O.
44///
45/// Pruning results are cached per-expression since file-level stats are global
46/// (the result is the same regardless of which row range is requested).
47pub struct FileStatsLayoutReader {
48    child: LayoutReaderRef,
49    file_stats: FileStatistics,
50    struct_fields: StructFields,
51    session: VortexSession,
52    prune_cache: DashMap<Expression, bool>,
53}
54
55impl FileStatsLayoutReader {
56    /// Creates a new `FileStatsLayoutReader` wrapping the given child reader.
57    ///
58    /// The `struct_fields` are derived from the child reader's dtype. If the dtype is not a
59    /// struct, the available stats will be empty and no pruning will occur.
60    ///
61    /// Pre-computes the set of available stat field paths from the struct fields and file stats.
62    pub fn new(child: LayoutReaderRef, file_stats: FileStatistics, session: VortexSession) -> Self {
63        let struct_fields = child
64            .dtype()
65            .as_struct_fields_opt()
66            .cloned()
67            .unwrap_or_default();
68
69        Self {
70            child,
71            file_stats,
72            struct_fields,
73            session,
74            prune_cache: Default::default(),
75        }
76    }
77
78    /// Evaluates whether the file can be fully pruned for the given expression.
79    ///
80    /// Returns `true` if file-level stats prove no rows can match, `false` otherwise.
81    fn evaluate_file_stats(&self, expr: &Expression) -> VortexResult<bool> {
82        let Some(pruning_expr) = expr.stat_falsification(self) else {
83            // If there is no pruning expression, we can't prune.
84            return Ok(false);
85        };
86
87        // Given how we implemented the StatsCatalog, we know the expression must be all literals.
88        // We can therefore optimize with a null scope since there are no field references that
89        // need to be resolved.
90        let simplified = pruning_expr.optimize_recursive(&DType::Null)?;
91        if let Some(result) = simplified.as_opt::<Literal>() {
92            // Can prune if the result is non-nullable and true
93            return Ok(result.as_bool().value() == Some(true));
94        }
95
96        // Sometimes expressions don't implement constant folding to literals... In this case,
97        // we just execute the expression over a null array.
98        let pruning = NullArray::new(1).into_array().apply(&pruning_expr)?;
99
100        let mut ctx = self.session.create_execution_ctx();
101        let result = pruning
102            .execute::<Canonical>(&mut ctx)?
103            .into_bool()
104            .scalar_at(0)?;
105
106        Ok(result.as_bool().value() == Some(true))
107    }
108}
109
110/// Implements [`StatsCatalog`] to provide file-level stats to expressions during pruning evaluation.
111impl StatsCatalog for FileStatsLayoutReader {
112    fn stats_ref(&self, field_path: &FieldPath, stat: Stat) -> Option<Expression> {
113        // FileStats currently only holds top-level field statistics.
114        if field_path.parts().len() != 1 {
115            return None;
116        }
117
118        let field_name = field_path.parts()[0].as_name()?;
119        let field_idx = self.struct_fields.find(field_name)?;
120        let field_stats = self.file_stats.stats_sets().get(field_idx)?;
121
122        let stat_value = field_stats.get(stat)?.as_exact()?;
123        let field_dtype = self.struct_fields.field_by_index(field_idx)?;
124        let stat_scalar = Scalar::try_new(field_dtype, Some(stat_value)).ok()?;
125
126        Some(lit(stat_scalar))
127    }
128}
129
130impl LayoutReader for FileStatsLayoutReader {
131    fn name(&self) -> &Arc<str> {
132        self.child.name()
133    }
134
135    fn dtype(&self) -> &DType {
136        self.child.dtype()
137    }
138
139    fn row_count(&self) -> u64 {
140        self.child.row_count()
141    }
142
143    fn register_splits(
144        &self,
145        field_mask: &[FieldMask],
146        row_range: &Range<u64>,
147        splits: &mut BTreeSet<u64>,
148    ) -> VortexResult<()> {
149        self.child.register_splits(field_mask, row_range, splits)
150    }
151
152    fn pruning_evaluation(
153        &self,
154        row_range: &Range<u64>,
155        expr: &Expression,
156        mask: Mask,
157    ) -> VortexResult<MaskFuture> {
158        // Check cache first with read-only lock.
159        if let Some(pruned) = self.prune_cache.get(expr) {
160            if *pruned {
161                return Ok(MaskFuture::ready(Mask::new_false(mask.len())));
162            }
163            return self.child.pruning_evaluation(row_range, expr, mask);
164        }
165
166        // Evaluate and cache.
167        let pruned = self.evaluate_file_stats(expr)?;
168        self.prune_cache.insert(expr.clone(), pruned);
169
170        if pruned {
171            Ok(MaskFuture::ready(Mask::new_false(mask.len())))
172        } else {
173            self.child.pruning_evaluation(row_range, expr, mask)
174        }
175    }
176
177    fn filter_evaluation(
178        &self,
179        row_range: &Range<u64>,
180        expr: &Expression,
181        mask: MaskFuture,
182    ) -> VortexResult<MaskFuture> {
183        self.child.filter_evaluation(row_range, expr, mask)
184    }
185
186    fn projection_evaluation(
187        &self,
188        row_range: &Range<u64>,
189        expr: &Expression,
190        mask: MaskFuture,
191    ) -> VortexResult<ArrayFuture> {
192        self.child.projection_evaluation(row_range, expr, mask)
193    }
194}
195
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::LazyLock;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray as _;
    use vortex_array::arrays::StructArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::get_item;
    use vortex_array::expr::gt;
    use vortex_array::expr::lit;
    use vortex_array::expr::root;
    use vortex_array::expr::stats::Precision;
    use vortex_array::expr::stats::Stat;
    use vortex_array::scalar::ScalarValue;
    use vortex_array::scalar_fn::session::ScalarFnSession;
    use vortex_array::session::ArraySession;
    use vortex_array::stats::StatsSet;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSession;
    use vortex_layout::LayoutReader;
    use vortex_layout::LayoutStrategy;
    use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
    use vortex_layout::layouts::table::TableStrategy;
    use vortex_layout::segments::TestSegments;
    use vortex_layout::sequence::SequenceId;
    use vortex_layout::sequence::SequentialArrayStreamExt;
    use vortex_layout::session::LayoutSession;
    use vortex_mask::Mask;
    use vortex_session::VortexSession;

    use super::*;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> = LazyLock::new(|| {
        VortexSession::empty()
            .with::<ArraySession>()
            .with::<LayoutSession>()
            .with::<ScalarFnSession>()
            .with::<RuntimeSession>()
    });

    /// File statistics for a single non-nullable `i32` column with exact `min` and `max`.
    fn test_file_stats(min: i32, max: i32) -> FileStatistics {
        let mut column_stats = StatsSet::default();
        column_stats.set(Stat::Min, Precision::exact(ScalarValue::from(min)));
        column_stats.set(Stat::Max, Precision::exact(ScalarValue::from(max)));

        let stats_sets = Arc::from([column_stats]);
        let dtypes = Arc::from([DType::Primitive(PType::I32, Nullability::NonNullable)]);
        FileStatistics::new(stats_sets, dtypes)
    }

    /// Writes a five-row table with one `col` column (values 1..=5), wraps its reader in a
    /// `FileStatsLayoutReader` whose file stats claim min = 0 / max = 100, and returns the
    /// pruning mask produced for `expr` over rows `0..5` starting from an all-true mask.
    fn prune_sample_file(expr: Expression) -> VortexResult<Mask> {
        block_on(move |handle| async move {
            let segments = Arc::new(TestSegments::default());
            let (ptr, eof) = SequenceId::root().split();

            let columns = [("col", buffer![1i32, 2, 3, 4, 5].into_array())];
            let table = StructArray::from_fields(columns.as_slice())?;

            let strategy = TableStrategy::new(
                Arc::new(FlatLayoutStrategy::default()),
                Arc::new(FlatLayoutStrategy::default()),
            );
            let layout = strategy
                .write_stream(
                    ArrayContext::empty(),
                    segments.clone(),
                    table.into_array().to_array_stream().sequenced(ptr),
                    eof,
                    handle,
                )
                .await?;

            let child = layout.new_reader("".into(), segments, &SESSION)?;
            let reader =
                FileStatsLayoutReader::new(child, test_file_stats(0, 100), SESSION.clone());

            reader
                .pruning_evaluation(&(0..5), &expr, Mask::new_true(5))?
                .await
        })
    }

    #[test]
    fn pruning_when_filter_out_of_range() -> VortexResult<()> {
        // `col > 200` can never match when the file-wide max is 100, so the file is pruned.
        let filter = gt(get_item("col", root()), lit(200i32));
        let pruned_mask = prune_sample_file(filter)?;
        assert_eq!(pruned_mask, Mask::new_false(5));
        Ok(())
    }

    #[test]
    fn no_pruning_when_filter_in_range() -> VortexResult<()> {
        // `col > 50` may match when the file-wide max is 100, so the file must be kept.
        let filter = gt(get_item("col", root()), lit(50i32));
        let kept_mask = prune_sample_file(filter)?;
        // The call is delegated to the child, which hands the mask back unchanged (the struct
        // reader does not prune).
        assert_eq!(kept_mask, Mask::new_true(5));
        Ok(())
    }
}