// datafusion_pruning/pruning_predicate.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27    array::{ArrayRef, BooleanArray, new_null_array},
28    datatypes::{DataType, Field, Schema, SchemaRef},
29    record_batch::{RecordBatch, RecordBatchOptions},
30};
31// pub use for backwards compatibility
32pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::{TransformedResult, TreeNodeRecursion};
39use datafusion_common::{Column, DFSchema, assert_eq_or_internal_err};
40use datafusion_common::{
41    ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err,
42    tree_node::{Transformed, TreeNode},
43};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::expressions::CastColumnExpr;
46use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
47use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
48use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
49use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
50
51/// Used to prove that arbitrary predicates (boolean expression) can not
52/// possibly evaluate to `true` given information about a column provided by
53/// [`PruningStatistics`].
54///
55/// # Introduction
56///
57/// `PruningPredicate` analyzes filter expressions using statistics such as
58/// min/max values and null counts, attempting to prove a "container" (e.g.
59/// Parquet Row Group) can be skipped without reading the actual data,
60/// potentially leading to significant performance improvements.
61///
62/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
63/// on the min/max values found in the Parquet metadata. If the
64/// `PruningPredicate` can prove that the filter can never evaluate to `true`
65/// for any row in the Row Group, the entire Row Group is skipped during query
66/// execution.
67///
68/// The `PruningPredicate` API is general, and can be used for pruning other
69/// types of containers (e.g. files) based on statistics that may be known from
70/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
71/// subtle topic.  See the Background and Implementation section for details.
72///
73/// `PruningPredicate` supports:
74///
75/// 1. Arbitrary expressions (including user defined functions)
76///
77/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
78///    so it is suitable for pruning 1000s of containers.
79///
80/// 3. Any source of information that implements the [`PruningStatistics`] trait
81///    (not just Parquet metadata).
82///
83/// # Example
84///
85/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
86/// example of how to use `PruningPredicate` to prune files based on min/max
87/// values.
88///
89/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/query_planning/pruning.rs
90///
91/// Given an expression like `x = 5` and statistics for 3 containers (Row
92/// Groups, files, etc) `A`, `B`, and `C`:
93///
94/// ```text
95///   A: {x_min = 0, x_max = 4}
96///   B: {x_min = 2, x_max = 10}
97///   C: {x_min = 5, x_max = 8}
98/// ```
99///
100/// `PruningPredicate` will conclude that the rows in container `A` can never
101/// be true (as the maximum value is only `4`), so it can be pruned:
102///
103/// ```text
104/// A: false (no rows could possibly match x = 5)
105/// B: true  (rows might match x = 5)
106/// C: true  (rows might match x = 5)
107/// ```
108///
109/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
110///
111/// # Background
112///
113/// ## Boolean Tri-state logic
114///
115/// To understand the details of the rest of this documentation, it is important
116/// to understand how the tri-state boolean logic in SQL works. As this is
117/// somewhat esoteric, we review it here.
118///
119/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
120/// uncertainty propagates through expressions. SQL `NULL` behaves very
121/// differently than the `NULL` in most other languages where it is a special,
122/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
123/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
124/// first encountered as they behave differently than most programmers may
125/// expect.
126///
127/// In most other programming languages,
128/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
129/// * `a == NULL` evaluates to `false` if `a` has any other value
130///
131/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
132/// `false`):
133///
134/// Expression    | Result
135/// ------------- | ---------
136/// `1 = NULL`    | `NULL`
137/// `NULL = NULL` | `NULL`
138///
139/// Also important is how `AND` and `OR` works with tri-state boolean logic as
140/// (perhaps counterintuitively) the result is **not** always NULL. While
141/// consistent with the notion of `NULL` representing “unknown”, this is again,
142/// often deeply confusing 🤯 when first encountered.
143///
144/// Expression       | Result    | Intuition
145/// ---------------  | --------- | -----------
146/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
147/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
148/// `NULL AND NULL`  | `NULL`    |
149///
150/// Expression      | Result    | Intuition
151/// --------------- | --------- | ----------
152/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
153/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
154/// `NULL OR NULL`  |  `NULL`   |
155///
156/// ## SQL Filter Semantics
157///
158/// The SQL `WHERE` clause has a boolean expression, often called a filter or
159/// predicate. The semantics of this predicate are that the query evaluates the
160/// predicate for each row in the input tables and:
161///
162/// * Rows that evaluate to `true` are returned in the query results
163///
164/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
165///
166/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
167///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
168///   in the rewritten predicate described below.*
169///
170/// # `PruningPredicate` Implementation
171///
172/// Armed with the information in the Background section, we can now understand
173/// how the `PruningPredicate` logic works.
174///
175/// ## Interface
176///
177/// **Inputs**
178/// 1. An input schema describing what columns exist
179///
180/// 2. A predicate (expression that evaluates to a boolean)
181///
182/// 3. [`PruningStatistics`] that provides information about columns in that
183///    schema, for multiple “containers”. For each column in each container, it
184///    provides optional information on contained values, min_values, max_values,
185///    null_counts counts, and row_counts counts.
186///
187/// **Outputs**:
188/// A (non null) boolean value for each container:
189/// * `true`: There MAY be rows that match the predicate
190///
191/// * `false`: There are no rows that could possibly match the predicate (the
192///   predicate can never possibly be true). The container can be pruned (skipped)
193///   entirely.
194///
195/// While `PruningPredicate` will never return a `NULL` value, the
196/// rewritten predicate (as returned by `build_predicate_expression` and used internally
197/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
198/// or null / row counts are not known.
199///
200/// In order to be correct, `PruningPredicate` must return false
201/// **only** if it can determine that for all rows in the container, the
202/// predicate could never evaluate to `true` (always evaluates to either `NULL`
203/// or `false`).
204///
205/// ## Contains Analysis and Min/Max Rewrite
206///
207/// `PruningPredicate` works by first analyzing the predicate to see what
208/// [`LiteralGuarantee`] must hold for the predicate to be true.
209///
210/// Then, the `PruningPredicate` rewrites the original predicate into an
211/// expression that references the min/max values of each column in the original
212/// predicate.
213///
214/// When the min/max values are actually substituted in to this expression and
215/// evaluated, the result means
216///
217/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
218///
219/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
220///   Note that rewritten predicate can evaluate to NULL when some of
221///   the min/max values are not known. *Note that this is different than
222///   the SQL filter semantics where `NULL` means the row is filtered
223///   out.*
224///
225/// * `false`: there are no rows that could possibly match the predicate,
226///   **PRUNES** the container
227///
228/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
229/// `x_row_count` represent the minimum and maximum values, the null count of
230/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
231/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
232/// is known to be all `NULL`s. Note this is different from knowing nothing about
233/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
234/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
235///
236/// Here are some examples of the rewritten predicates:
237///
238/// Original Predicate | Rewritten Predicate
239/// ------------------ | --------------------
240/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
241/// `x < 5` | `x_null_count != x_row_count AND (x_min < 5)`
/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)`
/// `x IS NULL`  | `x_null_count > 0`
/// `x IS NOT NULL`  | `x_null_count != x_row_count`
/// `CAST(x as int) = 5` | `x_null_count != x_row_count AND (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
246///
247/// ## Predicate Evaluation
248/// The PruningPredicate works in two passes
249///
250/// **First pass**:  For each `LiteralGuarantee` calls
251/// [`PruningStatistics::contained`] and rules out containers where the
252/// LiteralGuarantees are not satisfied
253///
254/// **Second Pass**: Evaluates the rewritten expression using the
255/// min/max/null_counts/row_counts values for each column for each container. For any
256/// container that this expression evaluates to `false`, it rules out those
257/// containers.
258///
259///
260/// ### Example 1
261///
262/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
263///
264/// ```sql
265/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
266/// AND
267/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
268/// ```
269///
270/// If we know that for a given container, `x` is between `1 and 100` and we know that
271/// `y` is between `4` and `7`, we know nothing about the null count and row count of
272/// `x` and `y`, the input statistics might look like:
273///
274/// Column   | Value
275/// -------- | -----
276/// `x_min`  | `1`
277/// `x_max`  | `100`
278/// `x_null_count` | `null`
279/// `x_row_count`  | `null`
280/// `y_min`  | `4`
281/// `y_max`  | `7`
282/// `y_null_count` | `null`
283/// `y_row_count`  | `null`
284///
285/// When these statistics values are substituted in to the rewritten predicate and
286/// simplified, the result is `false`:
287///
288/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
/// * `null != null` is `null` which is not true, so the AND moves on to the next clause
290/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
291/// * evaluating the clauses further we get:
292/// * `null and true and null and false`
293/// * `null and false`
294/// * `false`
295///
296/// Returning `false` means the container can be pruned, which matches the
297/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
298/// are `7` or less.
299///
300/// Note that if we had ended up with `null AND true AND null AND true` the result
301/// would have been `null`.
302/// `null` is treated the same as`true`, because we can't prove that the predicate is `false.`
303///
304/// If, for some other container, we knew `y` was between the values `4` and
305/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
306/// left as an exercise to the reader -- are you still here?), and the container
307/// **could not** be pruned. The intuition is that there may be rows where the
308/// predicate *might* evaluate to `true`, and the only way to find out is to do
309/// more analysis, for example by actually reading the data and evaluating the
310/// predicate row by row.
311///
312/// ### Example 2
313///
314/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
315/// look like the same as example 1:
316///
317/// ```sql
318/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
319/// AND
320/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
321/// ```
322///
323/// If we know that for another given container, `x_min` is NULL and `x_max` is
324/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
325///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
326/// the null count and row count of `y`. The input statistics might look like:
327///
328/// Column   | Value
329/// -------- | -----
330/// `x_min`  | `null`
331/// `x_max`  | `null`
332/// `x_null_count` | `100`
333/// `x_row_count`  | `100`
334/// `y_min`  | `4`
335/// `y_max`  | `7`
336/// `y_null_count` | `null`
337/// `y_row_count`  | `null`
338///
339/// When these statistics values are substituted in to the rewritten predicate and
340/// simplified, the result is `false`:
341///
/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null != null AND (4 <= 10 AND 10 <= 7)`
343/// * `false AND null AND null AND false`
344/// * `false AND false`
345/// * `false`
346///
347/// Returning `false` means the container can be pruned, which matches the
348/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
349/// are known to be NULL.
350///
351/// # Related Work
352///
353/// [`PruningPredicate`] implements the type of min/max pruning described in
354/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
355/// described by various research such as [small materialized aggregates], [zone
356/// maps], and [data skipping].
357///
358/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
359/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
360/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
361/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// The input schema against which the predicate will be evaluated
    schema: SchemaRef,
    /// A min/max pruning predicate (rewritten in terms of column min/max
    /// values, which are supplied by statistics).
    ///
    /// This is what is evaluated against the statistics record batch in
    /// [`PruningPredicate::prune`].
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// Description of which statistics are required to evaluate `predicate_expr`
    required_columns: RequiredColumns,
    /// Original physical predicate from which this predicate expr is derived
    /// (required for serialization)
    orig_expr: Arc<dyn PhysicalExpr>,
    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
    /// evaluate to `true`.
    ///
    /// See [`PruningPredicate::literal_guarantees`] for more details.
    literal_guarantees: Vec<LiteralGuarantee>,
}
380
381/// Build a pruning predicate from an optional predicate expression.
382/// If the predicate is None or the predicate cannot be converted to a pruning
383/// predicate, return None.
384/// If there is an error creating the pruning predicate it is recorded by incrementing
385/// the `predicate_creation_errors` counter.
386pub fn build_pruning_predicate(
387    predicate: Arc<dyn PhysicalExpr>,
388    file_schema: &SchemaRef,
389    predicate_creation_errors: &Count,
390) -> Option<Arc<PruningPredicate>> {
391    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
392        Ok(pruning_predicate) => {
393            if !pruning_predicate.always_true() {
394                return Some(Arc::new(pruning_predicate));
395            }
396        }
397        Err(e) => {
398            debug!("Could not create pruning predicate for: {e}");
399            predicate_creation_errors.add(1);
400        }
401    }
402    None
403}
404
/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
/// complex expressions or predicates that reference columns that are not in the
/// schema.
pub trait UnhandledPredicateHook {
    /// Called when a predicate can not be rewritten in terms of statistics or
    /// references a column that is not in the schema.
    ///
    /// Returns the expression to use in place of the unhandled predicate;
    /// a typical implementation returns the literal `true` (meaning "do not
    /// prune based on this sub-expression").
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}
413
/// The default handling for unhandled predicates is to return a constant `true`
/// (meaning don't prune the container)
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    /// The constant expression (the literal `true`) returned for every
    /// unhandled predicate
    default: Arc<dyn PhysicalExpr>,
}
420
421impl Default for ConstantUnhandledPredicateHook {
422    fn default() -> Self {
423        Self {
424            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
425        }
426    }
427}
428
429impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
430    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
431        Arc::clone(&self.default)
432    }
433}
434
impl PruningPredicate {
    /// Try to create a new instance of [`PruningPredicate`]
    ///
    /// This will translate the provided `expr` filter expression into
    /// a *pruning predicate*.
    ///
    /// A pruning predicate is one that has been rewritten in terms of
    /// the min and max values of column references and that evaluates
    /// to FALSE if the filter predicate would evaluate FALSE *for
    /// every row* whose values fell within the min / max ranges (aka
    /// could be pruned).
    ///
    /// The pruning predicate evaluates to TRUE or NULL
    /// if the filter predicate *might* evaluate to TRUE for at least
    /// one row whose values fell within the min/max ranges (in other
    /// words they might pass the predicate)
    ///
    /// For example, the filter expression `(column / 2) = 4` becomes
    /// the pruning predicate
    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
    ///
    /// See the struct level documentation on [`PruningPredicate`] for more
    /// details.
    ///
    /// Note that `PruningPredicate` does not attempt to normalize or simplify
    /// the input expression unless calling [`snapshot_physical_expr_opt`]
    /// returns a new expression.
    /// It is recommended that you pass the expressions through [`PhysicalExprSimplifier`]
    /// before calling this method to make sure the expressions can be used for pruning.
    pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`.
        // In particular this unravels any `DynamicFilterPhysicalExpr`s by snapshotting them
        // so that PruningPredicate can work with a static expression.
        let tf = snapshot_physical_expr_opt(expr)?;
        if tf.transformed {
            // If we had an expression such as Dynamic(part_col < 5 and col < 10)
            // (this could come from something like `select * from t order by part_col, col, limit 10`)
            // after snapshotting and because `DynamicFilterPhysicalExpr` applies child replacements to its
            // children after snapshotting and previously `replace_columns_with_literals` may have been called with partition values
            // the expression we have now is `8 < 5 and col < 10`.
            // Thus we need a simplifier pass to get `false and col < 10` => `false` here.
            let simplifier = PhysicalExprSimplifier::new(&schema);
            expr = simplifier.simplify(tf.data)?;
        } else {
            expr = tf.data;
        }
        // Unhandled sub-expressions are replaced with the literal `true`
        // ("can not prune"), which is always conservative/correct.
        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;

        // build predicate expression once
        let mut required_columns = RequiredColumns::new();
        let predicate_expr = build_predicate_expression(
            &expr,
            &schema,
            &mut required_columns,
            &unhandled_hook,
        );
        let predicate_schema = required_columns.schema();
        // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc.
        let predicate_expr =
            PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
        let literal_guarantees = LiteralGuarantee::analyze(&expr);

        Ok(Self {
            schema,
            predicate_expr,
            required_columns,
            orig_expr: expr,
            literal_guarantees,
        })
    }

    /// For each set of statistics, evaluates the pruning predicate
    /// and returns a `bool` with the following meaning for a
    /// all rows whose values match the statistics:
    ///
    /// `true`: There MAY be rows that match the predicate
    ///
    /// `false`: There are no rows that could possibly match the predicate
    ///
    /// Note: the predicate passed to `prune` should already be simplified as
    /// much as possible (e.g. this pass doesn't handle some
    /// expressions like `b = false`, but it does handle the
    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
    ///
    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
    pub fn prune<S: PruningStatistics + ?Sized>(
        &self,
        statistics: &S,
    ) -> Result<Vec<bool>> {
        let mut builder = BoolVecBuilder::new(statistics.num_containers());

        // First pass: try to prove the predicate can't be true for the
        // containers based on literal guarantees
        for literal_guarantee in &self.literal_guarantees {
            let LiteralGuarantee {
                column,
                guarantee,
                literals,
            } = literal_guarantee;
            if let Some(results) = statistics.contained(column, literals) {
                match guarantee {
                    // `In` means the values in the column must be one of the
                    // values in the set for the predicate to evaluate to true.
                    // If `contained` returns false, that means the column is
                    // not any of the values so we can prune the container
                    Guarantee::In => builder.combine_array(&results),
                    // `NotIn` means the values in the column must not be
                    // any of the values in the set for the predicate to
                    // evaluate to true. If `contained` returns true, it means the
                    // column is only in the set of values so we can prune the
                    // container
                    Guarantee::NotIn => {
                        builder.combine_array(&arrow::compute::not(&results)?)
                    }
                }
                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
                // can return early without evaluating the rest of predicates.
                if builder.check_all_pruned() {
                    return Ok(builder.build());
                }
            }
        }

        // Second pass: try to prove the predicate can't be true for the
        // containers based on min/max values

        // build a RecordBatch that contains the min/max values in the
        // appropriate statistics columns for the min/max predicate
        let statistics_batch =
            build_statistics_record_batch(statistics, &self.required_columns)?;

        // Evaluate the pruning predicate on that record batch and append any results to the builder
        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);

        Ok(builder.build())
    }

    /// Return a reference to the input schema
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Returns a reference to the physical expr used to construct this pruning predicate
    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.orig_expr
    }

    /// Returns a reference to the predicate expr
    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
        &self.predicate_expr
    }

    /// Returns a reference to the literal guarantees
    ///
    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
    /// expression to possibly be `true`. If any is not satisfied, the
    /// expression is guaranteed to be `null` or `false`.
    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
        &self.literal_guarantees
    }

    /// Returns true if this pruning predicate can not prune anything.
    ///
    /// This happens if the predicate is a literal `true`  and
    /// literal_guarantees is empty.
    ///
    /// This can happen when a predicate is simplified to a constant `true`
    pub fn always_true(&self) -> bool {
        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
    }

    /// Returns a description of which statistics columns
    /// (min/max/null_count/row_count) are required to evaluate this
    /// pruning predicate.
    pub fn required_columns(&self) -> &RequiredColumns {
        &self.required_columns
    }

    /// Names of the columns that are known to be / not be in a set
    /// of literals (constants). These are the columns the that may be passed to
    /// [`PruningStatistics::contained`] during pruning.
    ///
    /// This is useful to avoid fetching statistics for columns that will not be
    /// used in the predicate. For example, it can be used to avoid reading
    /// unneeded bloom filters (a non trivial operation).
    pub fn literal_columns(&self) -> Vec<String> {
        let mut seen = HashSet::new();
        self.literal_guarantees
            .iter()
            .map(|e| &e.column.name)
            // avoid duplicates
            .filter(|name| seen.insert(*name))
            .map(|s| s.to_string())
            .collect()
    }
}
628
/// Builds the return `Vec` for [`PruningPredicate::prune`].
///
/// Starts with all containers assumed "may pass" and only ever flips entries
/// to `false` as conjuncts prove containers can be pruned.
#[derive(Debug)]
struct BoolVecBuilder {
    /// One element per container. Each element is
    /// * `true`: if the container has row that may pass the predicate
    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
    inner: Vec<bool>,
}
637
638impl BoolVecBuilder {
639    /// Create a new `BoolVecBuilder` with `num_containers` elements
640    fn new(num_containers: usize) -> Self {
641        Self {
642            // assume by default all containers may pass the predicate
643            inner: vec![true; num_containers],
644        }
645    }
646
647    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
648    /// predicate into the currently in progress array.
649    ///
650    /// Each `array` element is:
651    /// * `true`: container has row that may pass the predicate
652    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
653    /// * `null`: container may or may not have rows that pass the predicate
654    fn combine_array(&mut self, array: &BooleanArray) {
655        assert_eq!(array.len(), self.inner.len());
656        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
657            // `false` for this conjunct means we know for sure no rows could
658            // pass the predicate and thus we set the corresponding container
659            // location to false.
660            if let Some(false) = new {
661                *cur = false;
662            }
663        }
664    }
665
666    /// Combines the results in the [`ColumnarValue`] to the currently in
667    /// progress array, following the same rules as [`Self::combine_array`].
668    ///
669    /// # Panics
670    /// If `value` is not boolean
671    fn combine_value(&mut self, value: ColumnarValue) {
672        match value {
673            ColumnarValue::Array(array) => {
674                self.combine_array(array.as_boolean());
675            }
676            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
677                // False means all containers can not pass the predicate
678                self.inner = vec![false; self.inner.len()];
679            }
680            _ => {
681                // Null or true means the rows in container may pass this
682                // conjunct so we can't prune any containers based on that
683            }
684        }
685    }
686
687    /// Convert this builder into a Vec of bools
688    fn build(self) -> Vec<bool> {
689        self.inner
690    }
691
692    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
693    fn check_all_pruned(&self) -> bool {
694        self.inner.iter().all(|&x| !x)
695    }
696}
697
698fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
699    expr.as_any()
700        .downcast_ref::<phys_expr::Literal>()
701        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
702        .unwrap_or_default()
703}
704
705fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
706    expr.as_any()
707        .downcast_ref::<phys_expr::Literal>()
708        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
709        .unwrap_or_default()
710}
711
/// Describes which columns statistics are necessary to evaluate a
/// [`PruningPredicate`].
///
/// This structure permits reading and creating the minimum number statistics,
/// which is important since statistics may be non trivial to read (e.g. large
/// strings or when there are 1000s of columns).
///
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
///
/// Note: the order of entries in `columns` determines the column order of the
/// statistics schema (see `Self::schema`), and the index of each entry is
/// used as the column index in the rewritten pruning predicate.
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    /// The statistics required to evaluate this predicate:
    /// * The unqualified column in the input schema
    /// * Statistics type (e.g. Min or Max or Null_Count)
    /// * The field the statistics value should be placed in for
    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
730
impl RequiredColumns {
    /// Creates an empty set of required statistics columns
    fn new() -> Self {
        Self::default()
    }

    /// Returns Some(column) if this is a single column predicate.
    ///
    /// Returns None if this is a multi-column predicate.
    ///
    /// Examples:
    /// * `a > 5 OR a < 10` returns `Some(a)`
    /// * `a > 5 OR b < 10` returns `None`
    /// * `true` returns None
    pub fn single_column(&self) -> Option<&phys_expr::Column> {
        // all adjacent pairs equal <=> every entry references the same column
        if self.columns.windows(2).all(|w| {
            // check if all columns are the same (ignoring statistics and field)
            let c1 = &w[0].0;
            let c2 = &w[1].0;
            c1 == c2
        }) {
            // `first()` is None when no columns are required (e.g. `true`)
            self.columns.first().map(|r| &r.0)
        } else {
            None
        }
    }

    /// Returns a schema that describes the columns required to evaluate this
    /// pruning predicate.
    /// The schema contains the fields for each column in `self.columns` with
    /// the appropriate data type for the statistics.
    /// Order matters, this same order is used to evaluate the
    /// pruning predicate.
    fn schema(&self) -> Schema {
        let fields = self
            .columns
            .iter()
            .map(|(_c, _t, f)| f.clone())
            .collect::<Vec<_>>();
        Schema::new(fields)
    }

    /// Returns an iterator over items in columns (see doc on
    /// `self.columns` for details)
    pub(crate) fn iter(
        &self,
    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
        self.columns.iter()
    }

    /// Returns the index of an existing entry for (`column`,
    /// `statistics_type`), if any, so the same statistics column is only
    /// recorded (and therefore read) once
    fn find_stat_column(
        &self,
        column: &phys_expr::Column,
        statistics_type: StatisticsType,
    ) -> Option<usize> {
        match statistics_type {
            StatisticsType::RowCount => {
                // Use the first row count we find, if any
                // (this match ignores `column`: a single "row_count" entry is
                // shared by all columns, see `stat_column_expr` naming)
                self.columns
                    .iter()
                    .enumerate()
                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
                    .map(|(i, (_c, _t, _f))| i)
            }
            _ => self
                .columns
                .iter()
                .enumerate()
                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
                .map(|(i, (_c, _t, _f))| i),
        }
    }

    /// Rewrites column_expr so that all appearances of column
    /// are replaced with a reference to either the min or max
    /// statistics column, while keeping track that a reference to the statistics
    /// column is required
    ///
    /// for example, an expression like `col("foo") > 5`, when called
    /// with Max would result in an expression like `col("foo_max") >
    /// 5` with the appropriate entry noted in self.columns
    fn stat_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
        stat_type: StatisticsType,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        // Reuse an existing entry when possible; otherwise the new entry is
        // appended at the end, so `idx` is also its position in the
        // statistics schema built by `Self::schema`
        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
            Some(idx) => (idx, false),
            None => (self.columns.len(), true),
        };

        let column_name = column.name();
        let stat_column_name = match stat_type {
            StatisticsType::Min => format!("{column_name}_min"),
            StatisticsType::Max => format!("{column_name}_max"),
            StatisticsType::NullCount => format!("{column_name}_null_count"),
            StatisticsType::RowCount => "row_count".to_string(),
        };

        let stat_column = phys_expr::Column::new(&stat_column_name, idx);

        // only add statistics column if not previously added
        if need_to_insert {
            // may be null if statistics are not present
            let nullable = true;
            let stat_field =
                Field::new(stat_column.name(), field.data_type().clone(), nullable);
            self.columns.push((column.clone(), stat_type, stat_field));
        }
        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
    }

    /// rewrite col --> col_min
    fn min_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
    }

    /// rewrite col --> col_max
    fn max_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
    }

    /// rewrite col --> col_null_count
    fn null_count_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
    }

    /// rewrite col --> col_row_count
    fn row_count_column_expr(
        &mut self,
        column: &phys_expr::Column,
        column_expr: &Arc<dyn PhysicalExpr>,
        field: &Field,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
    }
}
884
885impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
886    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
887        Self { columns }
888    }
889}
890
891/// Build a RecordBatch from a list of statistics, creating arrays,
892/// with one row for each PruningStatistics and columns specified in
893/// the required_columns parameter.
894///
895/// For example, if the requested columns are
896/// ```text
897/// ("s1", Min, Field:s1_min)
898/// ("s2", Max, field:s2_max)
899/// ```
900///
901/// And the input statistics had
902/// ```text
903/// S1(Min: 5, Max: 10)
904/// S2(Min: 99, Max: 1000)
905/// S3(Min: 1, Max: 2)
906/// ```
907///
908/// Then this function would build a record batch with 2 columns and
909/// one row s1_min and s2_max as follows (s3 is not requested):
910///
911/// ```text
912/// s1_min | s2_max
913/// -------+--------
914///   5    | 1000
915/// ```
916fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
917    statistics: &S,
918    required_columns: &RequiredColumns,
919) -> Result<RecordBatch> {
920    let mut arrays = Vec::<ArrayRef>::new();
921    // For each needed statistics column:
922    for (column, statistics_type, stat_field) in required_columns.iter() {
923        let column = Column::from_name(column.name());
924        let data_type = stat_field.data_type();
925
926        let num_containers = statistics.num_containers();
927
928        let array = match statistics_type {
929            StatisticsType::Min => statistics.min_values(&column),
930            StatisticsType::Max => statistics.max_values(&column),
931            StatisticsType::NullCount => statistics.null_counts(&column),
932            StatisticsType::RowCount => statistics.row_counts(&column),
933        };
934        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
935
936        assert_eq_or_internal_err!(
937            num_containers,
938            array.len(),
939            "mismatched statistics length. Expected {}, got {}",
940            num_containers,
941            array.len()
942        );
943
944        // cast statistics array to required data type (e.g. parquet
945        // provides timestamp statistics as "Int64")
946        let array = arrow::compute::cast(&array, data_type)?;
947
948        arrays.push(array);
949    }
950
951    let schema = Arc::new(required_columns.schema());
952    // provide the count in case there were no needed statistics
953    let mut options = RecordBatchOptions::default();
954    options.row_count = Some(statistics.num_containers());
955
956    trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
957
958    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
959        plan_datafusion_err!("Can not create statistics record batch: {err}")
960    })
961}
962
/// Helper for rewriting a single binary comparison (`column op scalar`) into
/// a pruning expression over statistics columns.
///
/// Built by [`PruningExpressionBuilder::try_new`], which requires that exactly
/// one side of the comparison references exactly one column and the other
/// side is column-free.
struct PruningExpressionBuilder<'a> {
    /// The single column referenced by the comparison
    column: phys_expr::Column,
    /// The (possibly rewritten, see `rewrite_expr_to_prunable`) column side
    column_expr: Arc<dyn PhysicalExpr>,
    /// The comparison operator, normalized so the column is on the left
    op: Operator,
    /// The column-free (scalar) side of the comparison
    scalar_expr: Arc<dyn PhysicalExpr>,
    /// The schema field for `column`, used to type the statistics columns
    field: &'a Field,
    /// Records which statistics columns the rewrite requires
    required_columns: &'a mut RequiredColumns,
}
971
972impl<'a> PruningExpressionBuilder<'a> {
973    fn try_new(
974        left: &'a Arc<dyn PhysicalExpr>,
975        right: &'a Arc<dyn PhysicalExpr>,
976        left_columns: ColumnReferenceCount,
977        right_columns: ColumnReferenceCount,
978        op: Operator,
979        schema: &'a SchemaRef,
980        required_columns: &'a mut RequiredColumns,
981    ) -> Result<Self> {
982        // find column name; input could be a more complicated expression
983        let (column_expr, scalar_expr, column, correct_operator) = match (
984            left_columns,
985            right_columns,
986        ) {
987            (ColumnReferenceCount::One(column), ColumnReferenceCount::Zero) => {
988                (left, right, column, op)
989            }
990            (ColumnReferenceCount::Zero, ColumnReferenceCount::One(column)) => {
991                (right, left, column, reverse_operator(op)?)
992            }
993            (ColumnReferenceCount::One(_), ColumnReferenceCount::One(_)) => {
994                // both sides have one column - not supported
995                return plan_err!(
996                    "Expression not supported for pruning: left has 1 column, right has 1 column"
997                );
998            }
999            (ColumnReferenceCount::Zero, ColumnReferenceCount::Zero) => {
1000                // both sides are literals - should be handled before calling try_new
1001                return plan_err!(
1002                    "Pruning literal expressions is not supported, please call PhysicalExprSimplifier first"
1003                );
1004            }
1005            (ColumnReferenceCount::Many, _) | (_, ColumnReferenceCount::Many) => {
1006                return plan_err!(
1007                    "Expression not supported for pruning: left or right has multiple columns"
1008                );
1009            }
1010        };
1011
1012        let df_schema = DFSchema::try_from(Arc::clone(schema))?;
1013        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1014            column_expr,
1015            correct_operator,
1016            scalar_expr,
1017            df_schema,
1018        )?;
1019        let field = match schema.column_with_name(column.name()) {
1020            Some((_, f)) => f,
1021            _ => {
1022                return plan_err!("Field not found in schema");
1023            }
1024        };
1025
1026        Ok(Self {
1027            column,
1028            column_expr,
1029            op: correct_operator,
1030            scalar_expr,
1031            field,
1032            required_columns,
1033        })
1034    }
1035
1036    fn op(&self) -> Operator {
1037        self.op
1038    }
1039
1040    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1041        &self.scalar_expr
1042    }
1043
1044    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1045        self.required_columns
1046            .min_column_expr(&self.column, &self.column_expr, self.field)
1047    }
1048
1049    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1050        self.required_columns
1051            .max_column_expr(&self.column, &self.column_expr, self.field)
1052    }
1053
1054    /// This function is to simply retune the `null_count` physical expression no matter what the
1055    /// predicate expression is
1056    ///
1057    /// i.e., x > 5 => x_null_count,
1058    ///       cast(x as int) < 10 => x_null_count,
1059    ///       try_cast(x as float) < 10.0 => x_null_count
1060    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1061        // Retune to [`phys_expr::Column`]
1062        let column_expr = Arc::new(self.column.clone()) as _;
1063
1064        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1065        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1066
1067        self.required_columns.null_count_column_expr(
1068            &self.column,
1069            &column_expr,
1070            null_count_field,
1071        )
1072    }
1073
1074    /// This function is to simply retune the `row_count` physical expression no matter what the
1075    /// predicate expression is
1076    ///
1077    /// i.e., x > 5 => x_row_count,
1078    ///       cast(x as int) < 10 => x_row_count,
1079    ///       try_cast(x as float) < 10.0 => x_row_count
1080    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1081        // Retune to [`phys_expr::Column`]
1082        let column_expr = Arc::new(self.column.clone()) as _;
1083
1084        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1085        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1086
1087        self.required_columns.row_count_column_expr(
1088            &self.column,
1089            &column_expr,
1090            row_count_field,
1091        )
1092    }
1093}
1094
/// This function is designed to rewrite the column_expr to
/// ensure the column_expr is monotonically increasing.
///
/// For example,
/// 1. `col > 10`
/// 2. `-col > 10` should be rewritten to `col < -10`
/// 3. `!col = true` would be rewritten to `col = !true`
/// 4. `abs(a - 10) > 0` not supported
/// 5. `cast(can_prunable_expr) > 10`
/// 6. `try_cast(can_prunable_expr) > 10`
///
/// More rewrite rules are still in progress.
///
/// Returns the rewritten `(column side, operator, scalar side)` triple, or a
/// plan error if the expression shape is not supported for pruning.
fn rewrite_expr_to_prunable(
    column_expr: &PhysicalExprRef,
    op: Operator,
    scalar_expr: &PhysicalExprRef,
    schema: DFSchema,
) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
    // Min/max based pruning is only meaningful for comparison operators
    if !is_compare_op(op) {
        return plan_err!("rewrite_expr_to_prunable only support compare expression");
    }

    let column_expr_any = column_expr.as_any();

    if column_expr_any
        .downcast_ref::<phys_expr::Column>()
        .is_some()
    {
        // `col op lit()` -- already in prunable form
        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
        // `cast(col) op lit()`
        // Only allowed when the cast preserves ordering (checked by
        // `verify_support_type_for_prune`); the inner expression is rewritten
        // recursively and the cast re-applied on top of the result
        let arrow_schema = schema.as_arrow();
        let from_type = cast.expr().data_type(arrow_schema)?;
        verify_support_type_for_prune(&from_type, cast.cast_type())?;
        let (left, op, right) =
            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
        let left = Arc::new(phys_expr::CastExpr::new(
            left,
            cast.cast_type().clone(),
            None,
        ));
        Ok((left, op, right))
    } else if let Some(cast_col) = column_expr_any.downcast_ref::<CastColumnExpr>() {
        // `cast_column(col) op lit()` - same as CastExpr but uses CastColumnExpr
        let arrow_schema = schema.as_arrow();
        let from_type = cast_col.expr().data_type(arrow_schema)?;
        let to_type = cast_col.target_field().data_type();
        verify_support_type_for_prune(&from_type, to_type)?;
        let (left, op, right) =
            rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?;
        // Predicate pruning / statistics generally don't support struct columns yet.
        // In the future we may want to support pruning on nested fields, in which case we probably need to
        // do something more sophisticated here.
        // But for now since we don't support pruning on nested fields, we can just cast to the target type directly.
        let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None));
        Ok((left, op, right))
    } else if let Some(try_cast) =
        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
    {
        // `try_cast(col) op lit()` -- handled the same way as `cast`
        let arrow_schema = schema.as_arrow();
        let from_type = try_cast.expr().data_type(arrow_schema)?;
        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
        let (left, op, right) =
            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
        let left = Arc::new(phys_expr::TryCastExpr::new(
            left,
            try_cast.cast_type().clone(),
        ));
        Ok((left, op, right))
    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
        // `-col > lit()`  --> `col < -lit()`
        // Negation reverses ordering, so negate the scalar side and reverse
        // the operator returned by the recursive rewrite
        let (left, op, right) =
            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
        let right = Arc::new(phys_expr::NegativeExpr::new(right));
        Ok((left, reverse_operator(op)?, right))
    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
        // `!col = true` --> `col = !true`
        // NOT is only rewritten for Eq / NotEq comparisons
        if op != Operator::Eq && op != Operator::NotEq {
            return plan_err!("Not with operator other than Eq / NotEq is not supported");
        }
        // Only `!col` (a NOT directly over a column) is supported
        if not
            .arg()
            .as_any()
            .downcast_ref::<phys_expr::Column>()
            .is_some()
        {
            let left = Arc::clone(not.arg());
            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
            Ok((left, reverse_operator(op)?, right))
        } else {
            plan_err!("Not with complex expression {column_expr:?} is not supported")
        }
    } else {
        plan_err!("column expression {column_expr:?} is not supported")
    }
}
1193
1194fn is_compare_op(op: Operator) -> bool {
1195    matches!(
1196        op,
1197        Operator::Eq
1198            | Operator::NotEq
1199            | Operator::Lt
1200            | Operator::LtEq
1201            | Operator::Gt
1202            | Operator::GtEq
1203            | Operator::LikeMatch
1204            | Operator::NotLikeMatch
1205    )
1206}
1207
1208// The pruning logic is based on the comparing the min/max bounds.
1209// Must make sure the two type has order.
1210// For example, casts from string to numbers is not correct.
1211// Because the "13" is less than "3" with UTF8 comparison order.
1212fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1213    // Dictionary casts are always supported as long as the value types are supported
1214    let from_type = match from_type {
1215        DataType::Dictionary(_, t) => {
1216            return verify_support_type_for_prune(t.as_ref(), to_type);
1217        }
1218        _ => from_type,
1219    };
1220    let to_type = match to_type {
1221        DataType::Dictionary(_, t) => {
1222            return verify_support_type_for_prune(from_type, t.as_ref());
1223        }
1224        _ => to_type,
1225    };
1226    // If both types are strings or both are not strings (number, timestamp, etc)
1227    // then we can compare them.
1228    // PruningPredicate does not support casting of strings to numbers and such.
1229    if from_type.is_string() == to_type.is_string() {
1230        Ok(())
1231    } else {
1232        plan_err!(
1233            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1234        )
1235    }
1236}
1237
1238/// replaces a column with an old name with a new name in an expression
1239fn rewrite_column_expr(
1240    e: Arc<dyn PhysicalExpr>,
1241    column_old: &phys_expr::Column,
1242    column_new: &phys_expr::Column,
1243) -> Result<Arc<dyn PhysicalExpr>> {
1244    e.transform(|expr| {
1245        if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>()
1246            && column == column_old
1247        {
1248            return Ok(Transformed::yes(Arc::new(column_new.clone())));
1249        }
1250
1251        Ok(Transformed::no(expr))
1252    })
1253    .data()
1254}
1255
1256fn reverse_operator(op: Operator) -> Result<Operator> {
1257    op.swap().ok_or_else(|| {
1258        internal_datafusion_err!(
1259            "Could not reverse operator {op} while building pruning predicate"
1260        )
1261    })
1262}
1263
1264/// Given a column reference to `column`, returns a pruning
1265/// expression in terms of the min and max that will evaluate to true
1266/// if the column may contain values, and false if definitely does not
1267/// contain values
1268fn build_single_column_expr(
1269    column: &phys_expr::Column,
1270    schema: &Schema,
1271    required_columns: &mut RequiredColumns,
1272    is_not: bool, // if true, treat as !col
1273) -> Option<Arc<dyn PhysicalExpr>> {
1274    let field = schema.field_with_name(column.name()).ok()?;
1275
1276    if *field.data_type() == DataType::Boolean {
1277        let col_ref = Arc::new(column.clone()) as _;
1278
1279        let min = required_columns
1280            .min_column_expr(column, &col_ref, field)
1281            .ok()?;
1282        let max = required_columns
1283            .max_column_expr(column, &col_ref, field)
1284            .ok()?;
1285
1286        // remember -- we want an expression that is:
1287        // TRUE: if there may be rows that match
1288        // FALSE: if there are no rows that match
1289        if is_not {
1290            // The only way we know a column couldn't match is if both the min and max are true
1291            // !(min && max)
1292            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1293                phys_expr::BinaryExpr::new(min, Operator::And, max),
1294            ))))
1295        } else {
1296            // the only way we know a column couldn't match is if both the min and max are false
1297            // !(!min && !max) --> min || max
1298            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1299        }
1300    } else {
1301        None
1302    }
1303}
1304
1305/// Given an expression reference to `expr`, if `expr` is a column expression,
1306/// returns a pruning expression in terms of IsNull that will evaluate to true
1307/// if the column may contain null, and false if definitely does not
1308/// contain null.
1309/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1310/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1311/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1312/// thus can prune the row group / value
1313fn build_is_null_column_expr(
1314    expr: &Arc<dyn PhysicalExpr>,
1315    schema: &Schema,
1316    required_columns: &mut RequiredColumns,
1317    with_not: bool,
1318) -> Option<Arc<dyn PhysicalExpr>> {
1319    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1320        let field = schema.field_with_name(col.name()).ok()?;
1321
1322        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1323        if with_not {
1324            if let Ok(row_count_expr) =
1325                required_columns.row_count_column_expr(col, expr, null_count_field)
1326            {
1327                required_columns
1328                    .null_count_column_expr(col, expr, null_count_field)
1329                    .map(|null_count_column_expr| {
1330                        // IsNotNull(column) => null_count != row_count
1331                        Arc::new(phys_expr::BinaryExpr::new(
1332                            null_count_column_expr,
1333                            Operator::NotEq,
1334                            row_count_expr,
1335                        )) as _
1336                    })
1337                    .ok()
1338            } else {
1339                None
1340            }
1341        } else {
1342            required_columns
1343                .null_count_column_expr(col, expr, null_count_field)
1344                .map(|null_count_column_expr| {
1345                    // IsNull(column) => null_count > 0
1346                    Arc::new(phys_expr::BinaryExpr::new(
1347                        null_count_column_expr,
1348                        Operator::Gt,
1349                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1350                    )) as _
1351                })
1352                .ok()
1353        }
1354    } else {
1355        None
1356    }
1357}
1358
/// The maximum number of entries in an `InList` that might be rewritten into
/// an OR chain of equality comparisons; longer lists are passed to the
/// unhandled-predicate hook instead (see `build_predicate_expression`),
/// bounding the size of the generated pruning expression
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1362
/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
/// for use as a [`PruningPredicate`].
pub struct PredicateRewriter {
    /// Invoked for any sub-expression that can not be rewritten in terms of
    /// statistics; configurable via [`Self::with_unhandled_hook`]
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1368
1369impl Default for PredicateRewriter {
1370    fn default() -> Self {
1371        Self {
1372            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1373        }
1374    }
1375}
1376
1377impl PredicateRewriter {
1378    /// Create a new `PredicateRewriter`
1379    pub fn new() -> Self {
1380        Self::default()
1381    }
1382
1383    /// Set the unhandled hook to be used when a predicate can not be rewritten
1384    pub fn with_unhandled_hook(
1385        self,
1386        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1387    ) -> Self {
1388        Self { unhandled_hook }
1389    }
1390
1391    /// Translate logical filter expression into pruning predicate
1392    /// expression that will evaluate to FALSE if it can be determined no
1393    /// rows between the min/max values could pass the predicates.
1394    ///
1395    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1396    ///
1397    /// Returns the pruning predicate as an [`PhysicalExpr`]
1398    ///
1399    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1400    pub fn rewrite_predicate_to_statistics_predicate(
1401        &self,
1402        expr: &Arc<dyn PhysicalExpr>,
1403        schema: &Schema,
1404    ) -> Arc<dyn PhysicalExpr> {
1405        let mut required_columns = RequiredColumns::new();
1406        build_predicate_expression(
1407            expr,
1408            &Arc::new(schema.clone()),
1409            &mut required_columns,
1410            &self.unhandled_hook,
1411        )
1412    }
1413}
1414
1415/// Translate logical filter expression into pruning predicate
1416/// expression that will evaluate to FALSE if it can be determined no
1417/// rows between the min/max values could pass the predicates.
1418///
1419/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1420///
1421/// Returns the pruning predicate as an [`PhysicalExpr`]
1422///
1423/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1424fn build_predicate_expression(
1425    expr: &Arc<dyn PhysicalExpr>,
1426    schema: &SchemaRef,
1427    required_columns: &mut RequiredColumns,
1428    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1429) -> Arc<dyn PhysicalExpr> {
1430    if is_always_false(expr) {
1431        // Shouldn't return `unhandled_hook.handle(expr)`
1432        // Because it will transfer false to true.
1433        return Arc::clone(expr);
1434    }
1435    // predicate expression can only be a binary expression
1436    let expr_any = expr.as_any();
1437    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1438        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1439            .unwrap_or_else(|| unhandled_hook.handle(expr));
1440    }
1441    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1442        return build_is_null_column_expr(
1443            is_not_null.arg(),
1444            schema,
1445            required_columns,
1446            true,
1447        )
1448        .unwrap_or_else(|| unhandled_hook.handle(expr));
1449    }
1450    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1451        return build_single_column_expr(col, schema, required_columns, false)
1452            .unwrap_or_else(|| unhandled_hook.handle(expr));
1453    }
1454    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1455        // match !col (don't do so recursively)
1456        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1457            return build_single_column_expr(col, schema, required_columns, true)
1458                .unwrap_or_else(|| unhandled_hook.handle(expr));
1459        } else {
1460            return unhandled_hook.handle(expr);
1461        }
1462    }
1463    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1464        if !in_list.list().is_empty()
1465            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1466        {
1467            let eq_op = if in_list.negated() {
1468                Operator::NotEq
1469            } else {
1470                Operator::Eq
1471            };
1472            let re_op = if in_list.negated() {
1473                Operator::And
1474            } else {
1475                Operator::Or
1476            };
1477            let change_expr = in_list
1478                .list()
1479                .iter()
1480                .map(|e| {
1481                    Arc::new(phys_expr::BinaryExpr::new(
1482                        Arc::clone(in_list.expr()),
1483                        eq_op,
1484                        Arc::clone(e),
1485                    )) as _
1486                })
1487                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1488                .unwrap();
1489            return build_predicate_expression(
1490                &change_expr,
1491                schema,
1492                required_columns,
1493                unhandled_hook,
1494            );
1495        } else {
1496            return unhandled_hook.handle(expr);
1497        }
1498    }
1499
1500    let (left, op, right) = {
1501        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1502            (
1503                Arc::clone(bin_expr.left()),
1504                *bin_expr.op(),
1505                Arc::clone(bin_expr.right()),
1506            )
1507        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1508            if like_expr.case_insensitive() {
1509                return unhandled_hook.handle(expr);
1510            }
1511            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1512                (false, false) => Operator::LikeMatch,
1513                (true, false) => Operator::NotLikeMatch,
1514                (false, true) => Operator::ILikeMatch,
1515                (true, true) => Operator::NotILikeMatch,
1516            };
1517            (
1518                Arc::clone(like_expr.expr()),
1519                op,
1520                Arc::clone(like_expr.pattern()),
1521            )
1522        } else {
1523            return unhandled_hook.handle(expr);
1524        }
1525    };
1526
1527    if op == Operator::And || op == Operator::Or {
1528        let left_expr =
1529            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1530        let right_expr =
1531            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1532        // simplify boolean expression if applicable
1533        let expr = match (&left_expr, op, &right_expr) {
1534            (left, Operator::And, right)
1535                if is_always_false(left) || is_always_false(right) =>
1536            {
1537                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1538            }
1539            (left, Operator::And, _) if is_always_true(left) => right_expr,
1540            (_, Operator::And, right) if is_always_true(right) => left_expr,
1541            (left, Operator::Or, right)
1542                if is_always_true(left) || is_always_true(right) =>
1543            {
1544                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1545            }
1546            (left, Operator::Or, _) if is_always_false(left) => right_expr,
1547            (_, Operator::Or, right) if is_always_false(right) => left_expr,
1548
1549            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1550        };
1551        return expr;
1552    }
1553
1554    let left_columns = ColumnReferenceCount::from_expression(&left);
1555    let right_columns = ColumnReferenceCount::from_expression(&right);
1556    let expr_builder = PruningExpressionBuilder::try_new(
1557        &left,
1558        &right,
1559        left_columns,
1560        right_columns,
1561        op,
1562        schema,
1563        required_columns,
1564    );
1565    let mut expr_builder = match expr_builder {
1566        Ok(builder) => builder,
1567        // allow partial failure in predicate expression generation
1568        // this can still produce a useful predicate when multiple conditions are joined using AND
1569        Err(e) => {
1570            debug!("Error building pruning expression: {e}");
1571            return unhandled_hook.handle(expr);
1572        }
1573    };
1574
1575    build_statistics_expr(&mut expr_builder)
1576        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1577}
1578
/// Count of distinct column references in an expression.
/// This is the same as [`collect_columns`] but optimized to stop counting
/// once more than one distinct column is found.
///
/// For example, in expression `col1 + col2`, the count is `Many`.
/// In expression `col1 + 5`, the count is `One`.
/// In expression `5 + 10`, the count is `Zero`.
///
/// [`collect_columns`]: datafusion_physical_expr::utils::collect_columns
#[derive(Debug, PartialEq, Eq)]
enum ColumnReferenceCount {
    /// No column references (the expression only contains literals/functions of literals)
    Zero,
    /// Exactly one distinct column reference, carrying that column
    One(phys_expr::Column),
    /// More than one distinct column reference
    Many,
}
1597
1598impl ColumnReferenceCount {
1599    /// Count the number of distinct column references in an expression
1600    fn from_expression(expr: &Arc<dyn PhysicalExpr>) -> Self {
1601        let mut seen = HashSet::<phys_expr::Column>::new();
1602        expr.apply(|expr| {
1603            if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1604                seen.insert(column.clone());
1605                if seen.len() > 1 {
1606                    return Ok(TreeNodeRecursion::Stop);
1607                }
1608            }
1609            Ok(TreeNodeRecursion::Continue)
1610        })
1611        // pre_visit always returns OK, so this will always too
1612        .expect("no way to return error during recursion");
1613        match seen.len() {
1614            0 => ColumnReferenceCount::Zero,
1615            1 => ColumnReferenceCount::One(
1616                seen.into_iter().next().expect("just checked len==1"),
1617            ),
1618            _ => ColumnReferenceCount::Many,
1619        }
1620    }
1621}
1622
1623fn build_statistics_expr(
1624    expr_builder: &mut PruningExpressionBuilder,
1625) -> Result<Arc<dyn PhysicalExpr>> {
1626    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1627        Operator::NotEq => {
1628            // column != literal => (min, max) = literal =>
1629            // !(min != literal && max != literal) ==>
1630            // min != literal || literal != max
1631            let min_column_expr = expr_builder.min_column_expr()?;
1632            let max_column_expr = expr_builder.max_column_expr()?;
1633            Arc::new(phys_expr::BinaryExpr::new(
1634                Arc::new(phys_expr::BinaryExpr::new(
1635                    min_column_expr,
1636                    Operator::NotEq,
1637                    Arc::clone(expr_builder.scalar_expr()),
1638                )),
1639                Operator::Or,
1640                Arc::new(phys_expr::BinaryExpr::new(
1641                    Arc::clone(expr_builder.scalar_expr()),
1642                    Operator::NotEq,
1643                    max_column_expr,
1644                )),
1645            ))
1646        }
1647        Operator::Eq => {
1648            // column = literal => (min, max) = literal => min <= literal && literal <= max
1649            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1650            let min_column_expr = expr_builder.min_column_expr()?;
1651            let max_column_expr = expr_builder.max_column_expr()?;
1652            Arc::new(phys_expr::BinaryExpr::new(
1653                Arc::new(phys_expr::BinaryExpr::new(
1654                    min_column_expr,
1655                    Operator::LtEq,
1656                    Arc::clone(expr_builder.scalar_expr()),
1657                )),
1658                Operator::And,
1659                Arc::new(phys_expr::BinaryExpr::new(
1660                    Arc::clone(expr_builder.scalar_expr()),
1661                    Operator::LtEq,
1662                    max_column_expr,
1663                )),
1664            ))
1665        }
1666        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1667        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1668            plan_datafusion_err!(
1669                "LIKE expression with wildcard at the beginning is not supported"
1670            )
1671        })?,
1672        Operator::Gt => {
1673            // column > literal => (min, max) > literal => max > literal
1674            Arc::new(phys_expr::BinaryExpr::new(
1675                expr_builder.max_column_expr()?,
1676                Operator::Gt,
1677                Arc::clone(expr_builder.scalar_expr()),
1678            ))
1679        }
1680        Operator::GtEq => {
1681            // column >= literal => (min, max) >= literal => max >= literal
1682            Arc::new(phys_expr::BinaryExpr::new(
1683                expr_builder.max_column_expr()?,
1684                Operator::GtEq,
1685                Arc::clone(expr_builder.scalar_expr()),
1686            ))
1687        }
1688        Operator::Lt => {
1689            // column < literal => (min, max) < literal => min < literal
1690            Arc::new(phys_expr::BinaryExpr::new(
1691                expr_builder.min_column_expr()?,
1692                Operator::Lt,
1693                Arc::clone(expr_builder.scalar_expr()),
1694            ))
1695        }
1696        Operator::LtEq => {
1697            // column <= literal => (min, max) <= literal => min <= literal
1698            Arc::new(phys_expr::BinaryExpr::new(
1699                expr_builder.min_column_expr()?,
1700                Operator::LtEq,
1701                Arc::clone(expr_builder.scalar_expr()),
1702            ))
1703        }
1704        // other expressions are not supported
1705        _ => {
1706            return plan_err!(
1707                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1708            );
1709        }
1710    };
1711    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1712    Ok(statistics_expr)
1713}
1714
1715/// returns the string literal of the scalar value if it is a string
1716fn unpack_string(s: &ScalarValue) -> Option<&str> {
1717    s.try_as_str().flatten()
1718}
1719
1720fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1721    if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1722        let s = unpack_string(lit.value())?;
1723        return Some(s);
1724    }
1725    None
1726}
1727
1728/// Convert `column LIKE literal` where P is a constant prefix of the literal
1729/// to a range check on the column: `P <= column && column < P'`, where P' is the
1730/// lowest string after all P* strings.
1731fn build_like_match(
1732    expr_builder: &mut PruningExpressionBuilder,
1733) -> Option<Arc<dyn PhysicalExpr>> {
1734    // column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
1735    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1736    // column LIKE '%foo' => min <= '' && '' <= max => true
1737    // column LIKE '%foo%' => min <= '' && '' <= max => true
1738    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1739
1740    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1741    //  this may involve building the physical expressions that call lower() and upper()
1742    let min_column_expr = expr_builder.min_column_expr().ok()?;
1743    let max_column_expr = expr_builder.max_column_expr().ok()?;
1744    let scalar_expr = expr_builder.scalar_expr();
1745    // check that the scalar is a string literal
1746    let s = extract_string_literal(scalar_expr)?;
1747    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1748    let first_wildcard_index = s.find(['%', '_']);
1749    if first_wildcard_index == Some(0) {
1750        // there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
1751        return None;
1752    }
1753    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1754        let prefix = &s[..wildcard_index];
1755        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1756            prefix.to_string(),
1757        ))));
1758        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1759            increment_utf8(prefix)?,
1760        ))));
1761        (lower_bound_lit, upper_bound_lit)
1762    } else {
1763        // the like expression is a literal and can be converted into a comparison
1764        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1765            s.to_string(),
1766        ))));
1767        (Arc::clone(&bound), bound)
1768    };
1769    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1770        lower_bound,
1771        Operator::LtEq,
1772        Arc::clone(&max_column_expr),
1773    ));
1774    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1775        Arc::clone(&min_column_expr),
1776        Operator::LtEq,
1777        upper_bound,
1778    ));
1779    let combined = Arc::new(phys_expr::BinaryExpr::new(
1780        upper_bound_expr,
1781        Operator::And,
1782        lower_bound_expr,
1783    ));
1784    Some(combined)
1785}
1786
1787// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1788//
1789// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1790// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1791// looking for rows that don't begin with `const_prefix` can never be true)
1792fn build_not_like_match(
1793    expr_builder: &mut PruningExpressionBuilder<'_>,
1794) -> Result<Arc<dyn PhysicalExpr>> {
1795    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1796
1797    let min_column_expr = expr_builder.min_column_expr()?;
1798    let max_column_expr = expr_builder.max_column_expr()?;
1799
1800    let scalar_expr = expr_builder.scalar_expr();
1801
1802    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1803        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1804    })?;
1805
1806    let (const_prefix, remaining) = split_constant_prefix(pattern);
1807    if const_prefix.is_empty() || remaining != "%" {
1808        // we can not handle `%` at the beginning or in the middle of the pattern
1809        // Example: For pattern "foo%bar", the row group might include values like
1810        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1811        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1812        // match the pattern, intermediate values like "food" may not
1813        // match the full pattern "foo%bar", making pruning unsafe.
1814        // (truncate foo%bar to foo% have same problem)
1815
1816        // we can not handle pattern containing `_`
1817        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1818        // which means not every row is guaranteed to match the pattern.
1819        return Err(plan_datafusion_err!(
1820            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1821        ));
1822    }
1823
1824    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1825        true,
1826        false,
1827        Arc::clone(&min_column_expr),
1828        Arc::clone(scalar_expr),
1829    ));
1830
1831    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1832        true,
1833        false,
1834        Arc::clone(&max_column_expr),
1835        Arc::clone(scalar_expr),
1836    ));
1837
1838    Ok(Arc::new(phys_expr::BinaryExpr::new(
1839        min_col_not_like_epxr,
1840        Operator::Or,
1841        max_col_not_like_expr,
1842    )))
1843}
1844
/// Returns the constant prefix of a LIKE `pattern` (possibly empty) and the
/// remaining pattern starting at the first unescaped wildcard (possibly empty).
///
/// A wildcard (`%` or `_`) preceded by the escape character `\` is treated as
/// a literal character and does not terminate the prefix. Note the returned
/// prefix is NOT unescaped: any `\` escape bytes are still present in it.
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    // Track escape state explicitly rather than peeking at the previous
    // character: an escaped backslash followed by a wildcard (`\\%`) contains
    // a REAL wildcard, which a naive "previous char is `\`" check would miss.
    let mut escaped = false;
    for (idx, c) in pattern.char_indices() {
        if escaped {
            // this character is escaped, whatever it is; consume the escape
            escaped = false;
        } else if c == '\\' {
            escaped = true;
        } else if c == '%' || c == '_' {
            // first unescaped wildcard: everything before it is the prefix
            return (&pattern[..idx], &pattern[idx..]);
        }
    }
    // no unescaped wildcard: the entire pattern is a constant
    (pattern, "")
}
1860
/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
/// This makes it so that the returned string will always compare greater than the input string
/// or any other string with the same prefix.
/// This is necessary since the statistics may have been truncated: if we have a min statistic
/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`
/// In this example `increment_utf8("foo") == "fop"`
fn increment_utf8(data: &str) -> Option<String> {
    // Reject code points that should not be placed into a bound string:
    // non-characters (https://www.unicode.org/versions/corrigendum9.html) are
    // not meant for interchange and may not compare consistently elsewhere.
    fn is_valid_unicode(c: char) -> bool {
        let cp = c as u32;
        // U+FFFE / U+FFFF and the U+FDD0..=U+FDEF block are non-characters
        !([0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp))
    }

    // Convert string to vector of code points
    let mut code_points: Vec<char> = data.chars().collect();

    // Work backwards through code points: increment the last code point that
    // can be incremented and drop everything after it.
    for idx in (0..code_points.len()).rev() {
        let original = code_points[idx] as u32;

        // Try incrementing the code point.
        // `char::from_u32` already returns `None` for surrogates and for
        // values beyond U+10FFFF, so no separate range check is needed.
        if let Some(next_char) =
            char::from_u32(original + 1).filter(|c| is_valid_unicode(*c))
        {
            code_points[idx] = next_char;
            // truncate the string to the current index; the shorter, larger
            // prefix still compares greater than any string it replaced
            code_points.truncate(idx + 1);
            return Some(code_points.into_iter().collect());
        }
    }

    None
}
1906
1907/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1908///
1909/// This is important not only as an optimization but also because statistics may not be
1910/// accurate for columns that are all nulls.
1911/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1912/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1913///
1914/// For example:
1915///
1916/// `x_min <= 10 AND 10 <= x_max`
1917///
1918/// will become
1919///
1920/// ```sql
1921/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
1922/// ````
1923///
1924/// If the column is known to be all nulls, then the expression
1925/// `x_null_count = x_row_count` will be true, which will cause the
1926/// boolean expression to return false. Therefore, prune out the container.
1927fn wrap_null_count_check_expr(
1928    statistics_expr: Arc<dyn PhysicalExpr>,
1929    expr_builder: &mut PruningExpressionBuilder,
1930) -> Result<Arc<dyn PhysicalExpr>> {
1931    // x_null_count != x_row_count
1932    let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1933        expr_builder.null_count_column_expr()?,
1934        Operator::NotEq,
1935        expr_builder.row_count_column_expr()?,
1936    ));
1937
1938    // (x_null_count != x_row_count) AND (<statistics_expr>)
1939    Ok(Arc::new(phys_expr::BinaryExpr::new(
1940        not_when_null_count_eq_row_count,
1941        Operator::And,
1942        statistics_expr,
1943    )))
1944}
1945
/// The kind of per-column, per-container statistic a pruning predicate
/// references (e.g. `x_min`, `x_max`, `x_null_count`, `x_row_count`).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Minimum value of the column in each container
    Min,
    /// Maximum value of the column in each container
    Max,
    /// Number of null values of the column in each container
    NullCount,
    /// Total number of rows in each container
    RowCount,
}
1953
1954#[cfg(test)]
1955mod tests {
1956    use std::collections::HashMap;
1957    use std::ops::{Not, Rem};
1958
1959    use super::*;
1960    use datafusion_common::test_util::batches_to_string;
1961    use datafusion_expr::{and, col, lit, or};
1962    use datafusion_physical_expr::utils::collect_columns;
1963    use insta::assert_snapshot;
1964
1965    use arrow::array::Decimal128Array;
1966    use arrow::{
1967        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1968        datatypes::TimeUnit,
1969    };
1970    use datafusion_expr::expr::InList;
1971    use datafusion_expr::{Expr, cast, is_null, try_cast};
1972    use datafusion_functions_nested::expr_fn::{array_has, make_array};
1973    use datafusion_physical_expr::expressions::{
1974        self as phys_expr, DynamicFilterPhysicalExpr,
1975    };
1976    use datafusion_physical_expr::planner::logical2physical;
1977    use itertools::Itertools;
1978
    #[derive(Debug, Default)]
    /// Mock statistic provider for tests
    ///
    /// Each row represents the statistics for a "container" (which
    /// might represent an entire parquet file, or directory of files,
    /// or some other collection of data for which we had statistics)
    ///
    /// Note All `ArrayRefs` must be the same size.
    struct ContainerStats {
        /// Minimum value per container, if known
        min: Option<ArrayRef>,
        /// Maximum value per container, if known
        max: Option<ArrayRef>,
        /// Null count per container, if known
        null_counts: Option<ArrayRef>,
        /// Row count per container, if known
        row_counts: Option<ArrayRef>,
        /// Optional known values (e.g. mimic a bloom filter)
        /// (value, contained)
        /// If present, all BooleanArrays must be the same size as min/max
        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
    }
1998
    impl ContainerStats {
        /// Create empty statistics (no min/max/null counts/row counts)
        fn new() -> Self {
            Default::default()
        }
        /// Create statistics with Decimal128 min/max values using the
        /// given precision and scale
        fn new_decimal128(
            min: impl IntoIterator<Item = Option<i128>>,
            max: impl IntoIterator<Item = Option<i128>>,
            precision: u8,
            scale: i8,
        ) -> Self {
            Self::new()
                .with_min(Arc::new(
                    min.into_iter()
                        .collect::<Decimal128Array>()
                        .with_precision_and_scale(precision, scale)
                        .unwrap(),
                ))
                .with_max(Arc::new(
                    max.into_iter()
                        .collect::<Decimal128Array>()
                        .with_precision_and_scale(precision, scale)
                        .unwrap(),
                ))
        }

        /// Create statistics with Int64 min/max values
        fn new_i64(
            min: impl IntoIterator<Item = Option<i64>>,
            max: impl IntoIterator<Item = Option<i64>>,
        ) -> Self {
            Self::new()
                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
        }

        /// Create statistics with Int32 min/max values
        fn new_i32(
            min: impl IntoIterator<Item = Option<i32>>,
            max: impl IntoIterator<Item = Option<i32>>,
        ) -> Self {
            Self::new()
                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
        }

        /// Create statistics with Utf8 min/max values
        fn new_utf8<'a>(
            min: impl IntoIterator<Item = Option<&'a str>>,
            max: impl IntoIterator<Item = Option<&'a str>>,
        ) -> Self {
            Self::new()
                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
        }

        /// Create statistics with Boolean min/max values
        fn new_bool(
            min: impl IntoIterator<Item = Option<bool>>,
            max: impl IntoIterator<Item = Option<bool>>,
        ) -> Self {
            Self::new()
                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
        }

        /// Return the per-container min values, if set
        fn min(&self) -> Option<ArrayRef> {
            self.min.clone()
        }

        /// Return the per-container max values, if set
        fn max(&self) -> Option<ArrayRef> {
            self.max.clone()
        }

        /// Return the per-container null counts, if set
        fn null_counts(&self) -> Option<ArrayRef> {
            self.null_counts.clone()
        }

        /// Return the per-container row counts, if set
        fn row_counts(&self) -> Option<ArrayRef> {
            self.row_counts.clone()
        }

        /// return an iterator over all arrays in this statistics
        fn arrays(&self) -> Vec<ArrayRef> {
            let contained_arrays = self
                .contained
                .iter()
                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);

            [
                self.min.as_ref().cloned(),
                self.max.as_ref().cloned(),
                self.null_counts.as_ref().cloned(),
                self.row_counts.as_ref().cloned(),
            ]
            .into_iter()
            .flatten()
            .chain(contained_arrays)
            .collect()
        }

        /// Returns the number of containers represented by this statistics This
        /// picks the length of the first array as all arrays must have the same
        /// length (which is verified by `assert_invariants`).
        fn len(&self) -> usize {
            // take the first array's length; all arrays share it (see assert_invariants)
            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
        }

        /// Ensure that the lengths of all arrays are consistent
        fn assert_invariants(&self) {
            let mut prev_len = None;

            for len in self.arrays().iter().map(|a| a.len()) {
                // Get a length, if we don't already have one
                match prev_len {
                    None => {
                        prev_len = Some(len);
                    }
                    Some(prev_len) => {
                        assert_eq!(prev_len, len);
                    }
                }
            }
        }

        /// Add min values
        fn with_min(mut self, min: ArrayRef) -> Self {
            self.min = Some(min);
            self
        }

        /// Add max values
        fn with_max(mut self, max: ArrayRef) -> Self {
            self.max = Some(max);
            self
        }

        /// Add null counts. There must be the same number of null counts as
        /// there are containers
        fn with_null_counts(
            mut self,
            counts: impl IntoIterator<Item = Option<u64>>,
        ) -> Self {
            let null_counts: ArrayRef =
                Arc::new(counts.into_iter().collect::<UInt64Array>());

            self.assert_invariants();
            self.null_counts = Some(null_counts);
            self
        }

        /// Add row counts. There must be the same number of row counts as
        /// there are containers
        fn with_row_counts(
            mut self,
            counts: impl IntoIterator<Item = Option<u64>>,
        ) -> Self {
            let row_counts: ArrayRef =
                Arc::new(counts.into_iter().collect::<UInt64Array>());

            self.assert_invariants();
            self.row_counts = Some(row_counts);
            self
        }

        /// Add contained information.
        pub fn with_contained(
            mut self,
            values: impl IntoIterator<Item = ScalarValue>,
            contained: impl IntoIterator<Item = Option<bool>>,
        ) -> Self {
            let contained: BooleanArray = contained.into_iter().collect();
            let values: HashSet<_> = values.into_iter().collect();

            self.contained.push((values, contained));
            self.assert_invariants();
            self
        }

        /// get any contained information for the specified values
        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
            // find the one with the matching values
            self.contained
                .iter()
                .find(|(values, _contained)| values == find_values)
                .map(|(_values, contained)| contained.clone())
        }
    }
2183
    #[derive(Debug, Default)]
    /// Mock [`PruningStatistics`] for tests: per-column [`ContainerStats`]
    struct TestStatistics {
        // key: column name
        stats: HashMap<Column, ContainerStats>,
    }
2189
2190    impl TestStatistics {
2191        fn new() -> Self {
2192            Self::default()
2193        }
2194
2195        fn with(
2196            mut self,
2197            name: impl Into<String>,
2198            container_stats: ContainerStats,
2199        ) -> Self {
2200            let col = Column::from_name(name.into());
2201            self.stats.insert(col, container_stats);
2202            self
2203        }
2204
2205        /// Add null counts for the specified column.
2206        /// There must be the same number of null counts as
2207        /// there are containers
2208        fn with_null_counts(
2209            mut self,
2210            name: impl Into<String>,
2211            counts: impl IntoIterator<Item = Option<u64>>,
2212        ) -> Self {
2213            let col = Column::from_name(name.into());
2214
2215            // take stats out and update them
2216            let container_stats = self
2217                .stats
2218                .remove(&col)
2219                .unwrap_or_default()
2220                .with_null_counts(counts);
2221
2222            // put stats back in
2223            self.stats.insert(col, container_stats);
2224            self
2225        }
2226
2227        /// Add row counts for the specified column.
2228        /// There must be the same number of row counts as
2229        /// there are containers
2230        fn with_row_counts(
2231            mut self,
2232            name: impl Into<String>,
2233            counts: impl IntoIterator<Item = Option<u64>>,
2234        ) -> Self {
2235            let col = Column::from_name(name.into());
2236
2237            // take stats out and update them
2238            let container_stats = self
2239                .stats
2240                .remove(&col)
2241                .unwrap_or_default()
2242                .with_row_counts(counts);
2243
2244            // put stats back in
2245            self.stats.insert(col, container_stats);
2246            self
2247        }
2248
2249        /// Add contained information for the specified column.
2250        fn with_contained(
2251            mut self,
2252            name: impl Into<String>,
2253            values: impl IntoIterator<Item = ScalarValue>,
2254            contained: impl IntoIterator<Item = Option<bool>>,
2255        ) -> Self {
2256            let col = Column::from_name(name.into());
2257
2258            // take stats out and update them
2259            let container_stats = self
2260                .stats
2261                .remove(&col)
2262                .unwrap_or_default()
2263                .with_contained(values, contained);
2264
2265            // put stats back in
2266            self.stats.insert(col, container_stats);
2267            self
2268        }
2269    }
2270
2271    impl PruningStatistics for TestStatistics {
2272        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2273            self.stats
2274                .get(column)
2275                .map(|container_stats| container_stats.min())
2276                .unwrap_or(None)
2277        }
2278
2279        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2280            self.stats
2281                .get(column)
2282                .map(|container_stats| container_stats.max())
2283                .unwrap_or(None)
2284        }
2285
2286        fn num_containers(&self) -> usize {
2287            self.stats
2288                .values()
2289                .next()
2290                .map(|container_stats| container_stats.len())
2291                .unwrap_or(0)
2292        }
2293
2294        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2295            self.stats
2296                .get(column)
2297                .map(|container_stats| container_stats.null_counts())
2298                .unwrap_or(None)
2299        }
2300
2301        fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2302            self.stats
2303                .get(column)
2304                .map(|container_stats| container_stats.row_counts())
2305                .unwrap_or(None)
2306        }
2307
2308        fn contained(
2309            &self,
2310            column: &Column,
2311            values: &HashSet<ScalarValue>,
2312        ) -> Option<BooleanArray> {
2313            self.stats
2314                .get(column)
2315                .and_then(|container_stats| container_stats.contained(values))
2316        }
2317    }
2318
    /// Returns the specified min/max container values
    ///
    /// Unlike [`TestStatistics`], this implementation ignores which column is
    /// asked for and always returns the same arrays.
    struct OneContainerStats {
        // min values returned for every column
        min_values: Option<ArrayRef>,
        // max values returned for every column
        max_values: Option<ArrayRef>,
        // number of containers reported to the pruning logic
        num_containers: usize,
    }
2325
2326    impl PruningStatistics for OneContainerStats {
2327        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2328            self.min_values.clone()
2329        }
2330
2331        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2332            self.max_values.clone()
2333        }
2334
2335        fn num_containers(&self) -> usize {
2336            self.num_containers
2337        }
2338
2339        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2340            None
2341        }
2342
2343        fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2344            None
2345        }
2346
2347        fn contained(
2348            &self,
2349            _column: &Column,
2350            _values: &HashSet<ScalarValue>,
2351        ) -> Option<BooleanArray> {
2352            None
2353        }
2354    }
2355
2356    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2357    /// for multiple columns.
2358    #[test]
2359    fn test_unique_row_count_field_and_column() {
2360        // c1 = 100 AND c2 = 200
2361        let schema: SchemaRef = Arc::new(Schema::new(vec![
2362            Field::new("c1", DataType::Int32, true),
2363            Field::new("c2", DataType::Int32, true),
2364        ]));
2365        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2366        let expr = logical2physical(&expr, &schema);
2367        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2368        // note pruning expression refers to row_count twice
2369        assert_eq!(
2370            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2371            p.predicate_expr.to_string()
2372        );
2373
2374        // Fields in required schema should be unique, otherwise when creating batches
2375        // it will fail because of duplicate field names
2376        let mut fields = HashSet::new();
2377        for (_col, _ty, field) in p.required_columns().iter() {
2378            let was_new = fields.insert(field);
2379            if !was_new {
2380                panic!(
2381                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2382                );
2383            }
2384        }
2385    }
2386
    #[test]
    fn prune_all_rows_null_counts() {
        // Case 1: full stats present.
        // if null_count = row_count then we should prune the container for i = 0
        // regardless of the statistics
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let statistics = TestStatistics::new().with(
            "i",
            ContainerStats::new_i32(
                vec![Some(0)], // min
                vec![Some(0)], // max
            )
            .with_null_counts(vec![Some(1)])
            .with_row_counts(vec![Some(1)]),
        );
        let expected_ret = &[false];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);

        // Case 2: min/max values are null.
        // this should be true even if the container stats are missing
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![None]))),
            max: Some(Arc::new(Int32Array::from(vec![None]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[false];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);

        // Case 3: null count is unknown (null).
        // If the null counts themselves are missing we should be able to fall back to the stats
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        // i = 0 is possible given min = max = 0, so the container is kept
        let expected_ret = &[true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        // i > 0 is impossible given max = 0, so the container is pruned
        let expected_ret = &[false];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);

        // Case 4: row count is unknown (null).
        // Same for the row counts
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        let expected_ret = &[false];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
    }
2447
2448    #[test]
2449    fn prune_missing_statistics() {
2450        // If the min or max stats are missing we should not prune
2451        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2452        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2453        let container_stats = ContainerStats {
2454            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2455            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2456            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2457            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2458            ..ContainerStats::default()
2459        };
2460        let statistics = TestStatistics::new().with("i", container_stats);
2461        let expected_ret = &[true, true];
2462        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2463        let expected_ret = &[false, true];
2464        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2465        let expected_ret = &[true, false];
2466        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2467    }
2468
2469    #[test]
2470    fn prune_null_stats() {
2471        // if null_count = row_count then we should prune the container for i = 0
2472        // regardless of the statistics
2473        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2474
2475        let statistics = TestStatistics::new().with(
2476            "i",
2477            ContainerStats::new_i32(
2478                vec![Some(0)], // min
2479                vec![Some(0)], // max
2480            )
2481            .with_null_counts(vec![Some(1)])
2482            .with_row_counts(vec![Some(1)]),
2483        );
2484
2485        let expected_ret = &[false];
2486
2487        // i = 0
2488        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2489    }
2490
    #[test]
    fn test_build_statistics_record_batch() {
        // Request a record batch with s1_min, s2_max, s3_max, s3_min
        let required_columns = RequiredColumns::from(vec![
            // min of original column s1, named s1_min
            (
                phys_expr::Column::new("s1", 1),
                StatisticsType::Min,
                Field::new("s1_min", DataType::Int32, true),
            ),
            // max of original column s2, named s2_max
            (
                phys_expr::Column::new("s2", 2),
                StatisticsType::Max,
                Field::new("s2_max", DataType::Int32, true),
            ),
            // max of original column s3, named s3_max
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Max,
                Field::new("s3_max", DataType::Utf8, true),
            ),
            // min of original column s3, named s3_min
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Min,
                Field::new("s3_min", DataType::Utf8, true),
            ),
        ]);

        // Statistics for 4 containers; missing values (None) should surface
        // as nulls in the resulting record batch
        let statistics = TestStatistics::new()
            .with(
                "s1",
                ContainerStats::new_i32(
                    vec![None, None, Some(9), None],  // min
                    vec![Some(10), None, None, None], // max
                ),
            )
            .with(
                "s2",
                ContainerStats::new_i32(
                    vec![Some(2), None, None, None],  // min
                    vec![Some(20), None, None, None], // max
                ),
            )
            .with(
                "s3",
                ContainerStats::new_utf8(
                    vec![Some("a"), None, None, None],      // min
                    vec![Some("q"), None, Some("r"), None], // max
                ),
            );

        // One output row per container, one column per required statistic
        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        assert_snapshot!(batches_to_string(&[batch]), @r"
        +--------+--------+--------+--------+
        | s1_min | s2_max | s3_max | s3_min |
        +--------+--------+--------+--------+
        |        | 20     | q      | a      |
        |        |        |        |        |
        | 9      |        | r      |        |
        |        |        |        |        |
        +--------+--------+--------+--------+
        ");
    }
2557
2558    #[test]
2559    fn test_build_statistics_casting() {
2560        // Test requesting a Timestamp column, but getting statistics as Int64
2561        // which is what Parquet does
2562
2563        // Request a record batch with of s1_min as a timestamp
2564        let required_columns = RequiredColumns::from(vec![(
2565            phys_expr::Column::new("s3", 3),
2566            StatisticsType::Min,
2567            Field::new(
2568                "s1_min",
2569                DataType::Timestamp(TimeUnit::Nanosecond, None),
2570                true,
2571            ),
2572        )]);
2573
2574        // Note the statistics pass back i64 (not timestamp)
2575        let statistics = OneContainerStats {
2576            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2577            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2578            num_containers: 1,
2579        };
2580
2581        let batch =
2582            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2583
2584        assert_snapshot!(batches_to_string(&[batch]), @r"
2585        +-------------------------------+
2586        | s1_min                        |
2587        +-------------------------------+
2588        | 1970-01-01T00:00:00.000000010 |
2589        +-------------------------------+
2590        ");
2591    }
2592
2593    #[test]
2594    fn test_build_statistics_no_required_stats() {
2595        let required_columns = RequiredColumns::new();
2596
2597        let statistics = OneContainerStats {
2598            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2599            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2600            num_containers: 1,
2601        };
2602
2603        let batch =
2604            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2605        assert_eq!(batch.num_rows(), 1); // had 1 container
2606    }
2607
2608    #[test]
2609    fn test_build_statistics_inconsistent_types() {
2610        // Test requesting a Utf8 column when the stats return some other type
2611
2612        // Request a record batch with of s1_min as a timestamp
2613        let required_columns = RequiredColumns::from(vec![(
2614            phys_expr::Column::new("s3", 3),
2615            StatisticsType::Min,
2616            Field::new("s1_min", DataType::Utf8, true),
2617        )]);
2618
2619        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2620        let statistics = OneContainerStats {
2621            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2622            max_values: None,
2623            num_containers: 1,
2624        };
2625
2626        let batch =
2627            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2628        assert_snapshot!(batches_to_string(&[batch]), @r"
2629        +--------+
2630        | s1_min |
2631        +--------+
2632        |        |
2633        +--------+
2634        ");
2635    }
2636
2637    #[test]
2638    fn test_build_statistics_inconsistent_length() {
2639        // return an inconsistent length to the actual statistics arrays
2640        let required_columns = RequiredColumns::from(vec![(
2641            phys_expr::Column::new("s1", 3),
2642            StatisticsType::Min,
2643            Field::new("s1_min", DataType::Int64, true),
2644        )]);
2645
2646        // Note the statistics pass back i64 (not timestamp)
2647        let statistics = OneContainerStats {
2648            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2649            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2650            num_containers: 3,
2651        };
2652
2653        let result =
2654            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2655        assert!(
2656            result
2657                .to_string()
2658                .contains("mismatched statistics length. Expected 3, got 1"),
2659            "{}",
2660            result
2661        );
2662    }
2663
2664    #[test]
2665    fn row_group_predicate_eq() -> Result<()> {
2666        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2667        let expected_expr =
2668            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2669
2670        // test column on the left
2671        let expr = col("c1").eq(lit(1));
2672        let predicate_expr =
2673            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2674        assert_eq!(predicate_expr.to_string(), expected_expr);
2675
2676        // test column on the right
2677        let expr = lit(1).eq(col("c1"));
2678        let predicate_expr =
2679            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2680        assert_eq!(predicate_expr.to_string(), expected_expr);
2681
2682        Ok(())
2683    }
2684
2685    #[test]
2686    fn row_group_predicate_not_eq() -> Result<()> {
2687        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2688        let expected_expr =
2689            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2690
2691        // test column on the left
2692        let expr = col("c1").not_eq(lit(1));
2693        let predicate_expr =
2694            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2695        assert_eq!(predicate_expr.to_string(), expected_expr);
2696
2697        // test column on the right
2698        let expr = lit(1).not_eq(col("c1"));
2699        let predicate_expr =
2700            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2701        assert_eq!(predicate_expr.to_string(), expected_expr);
2702
2703        Ok(())
2704    }
2705
2706    #[test]
2707    fn row_group_predicate_gt() -> Result<()> {
2708        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2709        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2710
2711        // test column on the left
2712        let expr = col("c1").gt(lit(1));
2713        let predicate_expr =
2714            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2715        assert_eq!(predicate_expr.to_string(), expected_expr);
2716
2717        // test column on the right
2718        let expr = lit(1).lt(col("c1"));
2719        let predicate_expr =
2720            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2721        assert_eq!(predicate_expr.to_string(), expected_expr);
2722
2723        Ok(())
2724    }
2725
2726    #[test]
2727    fn row_group_predicate_gt_eq() -> Result<()> {
2728        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2729        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2730
2731        // test column on the left
2732        let expr = col("c1").gt_eq(lit(1));
2733        let predicate_expr =
2734            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2735        assert_eq!(predicate_expr.to_string(), expected_expr);
2736        // test column on the right
2737        let expr = lit(1).lt_eq(col("c1"));
2738        let predicate_expr =
2739            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2740        assert_eq!(predicate_expr.to_string(), expected_expr);
2741
2742        Ok(())
2743    }
2744
2745    #[test]
2746    fn row_group_predicate_lt() -> Result<()> {
2747        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2748        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2749
2750        // test column on the left
2751        let expr = col("c1").lt(lit(1));
2752        let predicate_expr =
2753            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2754        assert_eq!(predicate_expr.to_string(), expected_expr);
2755
2756        // test column on the right
2757        let expr = lit(1).gt(col("c1"));
2758        let predicate_expr =
2759            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2760        assert_eq!(predicate_expr.to_string(), expected_expr);
2761
2762        Ok(())
2763    }
2764
2765    #[test]
2766    fn row_group_predicate_lt_eq() -> Result<()> {
2767        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2768        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2769
2770        // test column on the left
2771        let expr = col("c1").lt_eq(lit(1));
2772        let predicate_expr =
2773            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2774        assert_eq!(predicate_expr.to_string(), expected_expr);
2775        // test column on the right
2776        let expr = lit(1).gt_eq(col("c1"));
2777        let predicate_expr =
2778            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2779        assert_eq!(predicate_expr.to_string(), expected_expr);
2780
2781        Ok(())
2782    }
2783
2784    #[test]
2785    fn row_group_predicate_and() -> Result<()> {
2786        let schema = Schema::new(vec![
2787            Field::new("c1", DataType::Int32, false),
2788            Field::new("c2", DataType::Int32, false),
2789            Field::new("c3", DataType::Int32, false),
2790        ]);
2791        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2792        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2793        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2794        let predicate_expr =
2795            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2796        assert_eq!(predicate_expr.to_string(), expected_expr);
2797
2798        Ok(())
2799    }
2800
2801    #[test]
2802    fn row_group_predicate_or() -> Result<()> {
2803        let schema = Schema::new(vec![
2804            Field::new("c1", DataType::Int32, false),
2805            Field::new("c2", DataType::Int32, false),
2806        ]);
2807        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2808        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2809        let expected_expr = "true";
2810        let predicate_expr =
2811            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2812        assert_eq!(predicate_expr.to_string(), expected_expr);
2813
2814        Ok(())
2815    }
2816
2817    #[test]
2818    fn row_group_predicate_not() -> Result<()> {
2819        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2820        let expected_expr = "true";
2821
2822        let expr = col("c1").not();
2823        let predicate_expr =
2824            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2825        assert_eq!(predicate_expr.to_string(), expected_expr);
2826
2827        Ok(())
2828    }
2829
2830    #[test]
2831    fn row_group_predicate_not_bool() -> Result<()> {
2832        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2833        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2834
2835        let expr = col("c1").not();
2836        let predicate_expr =
2837            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2838        assert_eq!(predicate_expr.to_string(), expected_expr);
2839
2840        Ok(())
2841    }
2842
2843    #[test]
2844    fn row_group_predicate_bool() -> Result<()> {
2845        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2846        let expected_expr = "c1_min@0 OR c1_max@1";
2847
2848        let expr = col("c1");
2849        let predicate_expr =
2850            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2851        assert_eq!(predicate_expr.to_string(), expected_expr);
2852
2853        Ok(())
2854    }
2855
2856    /// Test that non-boolean literal expressions don't prune any containers and error gracefully by not pruning anything instead of e.g. panicking
2857    #[test]
2858    fn row_group_predicate_non_boolean() {
2859        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2860        let statistics = TestStatistics::new()
2861            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2862        let expected_ret = &[true];
2863        prune_with_expr(lit(1), &schema, &statistics, expected_ret);
2864    }
2865
2866    // Test that literal-to-literal comparisons are correctly evaluated.
2867    // When both sides are constants, the expression should be evaluated directly
2868    // and if it's false, all containers should be pruned.
2869    #[test]
2870    fn row_group_predicate_literal_false() {
2871        // lit(1) = lit(2) is always false, so all containers should be pruned
2872        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2873        let statistics = TestStatistics::new()
2874            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2875        let expected_ret = &[false];
2876        prune_with_simplified_expr(lit(1).eq(lit(2)), &schema, &statistics, expected_ret);
2877    }
2878
    /// Test that a trivially-true literal comparison does not prune anything.
    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
    #[test]
    fn row_group_predicate_literal_true() {
        // lit(1) = lit(1) is always true, so no containers should be pruned
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
        let statistics = TestStatistics::new()
            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
        let expected_ret = &[true];
        prune_with_simplified_expr(lit(1).eq(lit(1)), &schema, &statistics, expected_ret);
    }
2890
    /// Test that a comparison against NULL (which evaluates to NULL) does not prune anything.
    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
    #[test]
    fn row_group_predicate_literal_null() {
        // lit(1) = null is always null, so no containers should be pruned
        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
        let statistics = TestStatistics::new()
            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
        let expected_ret = &[true];
        prune_with_simplified_expr(
            lit(1).eq(lit(ScalarValue::Null)),
            &schema,
            &statistics,
            expected_ret,
        );
    }
2907
2908    /// Test nested/complex literal expression trees.
2909    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2910    #[test]
2911    fn row_group_predicate_complex_literals() {
2912        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2913        let statistics = TestStatistics::new()
2914            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2915
2916        // (1 + 2) > 0 is always true
2917        prune_with_simplified_expr(
2918            (lit(1) + lit(2)).gt(lit(0)),
2919            &schema,
2920            &statistics,
2921            &[true],
2922        );
2923
2924        // (1 + 2) < 0 is always false
2925        prune_with_simplified_expr(
2926            (lit(1) + lit(2)).lt(lit(0)),
2927            &schema,
2928            &statistics,
2929            &[false],
2930        );
2931
2932        // Nested AND of literals: true AND false = false
2933        prune_with_simplified_expr(
2934            lit(true).and(lit(false)),
2935            &schema,
2936            &statistics,
2937            &[false],
2938        );
2939
2940        // Nested OR of literals: true OR false = true
2941        prune_with_simplified_expr(
2942            lit(true).or(lit(false)),
2943            &schema,
2944            &statistics,
2945            &[true],
2946        );
2947
2948        // Complex nested: (1 < 2) AND (3 > 1) = true AND true = true
2949        prune_with_simplified_expr(
2950            lit(1).lt(lit(2)).and(lit(3).gt(lit(1))),
2951            &schema,
2952            &statistics,
2953            &[true],
2954        );
2955
2956        // Complex nested: (1 > 2) OR (3 < 1) = false OR false = false
2957        prune_with_simplified_expr(
2958            lit(1).gt(lit(2)).or(lit(3).lt(lit(1))),
2959            &schema,
2960            &statistics,
2961            &[false],
2962        );
2963    }
2964
    /// Integration test demonstrating that a dynamic filter with replaced children as literals will be snapshotted, simplified and then pruned correctly.
    #[test]
    fn row_group_predicate_dynamic_filter_with_literals() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("part", DataType::Utf8, true),
        ]));
        let statistics = TestStatistics::new()
            // Note that we have no stats, pruning can only happen via partition value pruning from the dynamic filter
            .with_row_counts("c1", vec![Some(10)]);
        // Inner predicate: c1 > 5 AND part = "B"
        let dynamic_filter_expr = col("c1").gt(lit(5)).and(col("part").eq(lit("B")));
        let phys_expr = logical2physical(&dynamic_filter_expr, &schema);
        // The dynamic filter's children are the column references of the
        // inner predicate (c1 and part)
        let children = collect_columns(&phys_expr)
            .iter()
            .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
            .collect_vec();
        let dynamic_phys_expr =
            Arc::new(DynamicFilterPhysicalExpr::new(children, phys_expr))
                as Arc<dyn PhysicalExpr>;
        // Simulate the partition value substitution that would happen in ParquetOpener
        let remapped_expr = dynamic_phys_expr
            .children()
            .into_iter()
            .map(|child_expr| {
                // Non-column children pass through unchanged
                let Some(col_expr) =
                    child_expr.as_any().downcast_ref::<phys_expr::Column>()
                else {
                    return Arc::clone(child_expr);
                };
                if col_expr.name() == "part" {
                    // simulate dynamic filter replacement with literal "A"
                    Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
                        "A".to_string(),
                    )))) as Arc<dyn PhysicalExpr>
                } else {
                    Arc::clone(child_expr)
                }
            })
            .collect_vec();
        let dynamic_filter_expr =
            dynamic_phys_expr.with_new_children(remapped_expr).unwrap();
        // After substituting `part` with the literal "A", the `part = "B"`
        // conjunct can never be satisfied, so the container should be pruned
        let expected = &[false];
        let p =
            PruningPredicate::try_new(dynamic_filter_expr, Arc::clone(&schema)).unwrap();
        let result = p.prune(&statistics).unwrap();
        assert_eq!(result, expected);
    }
3013
3014    #[test]
3015    fn row_group_predicate_lt_bool() -> Result<()> {
3016        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
3017        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
3018
3019        // DF doesn't support arithmetic on boolean columns so
3020        // this predicate will error when evaluated
3021        let expr = col("c1").lt(lit(true));
3022        let predicate_expr =
3023            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3024        assert_eq!(predicate_expr.to_string(), expected_expr);
3025
3026        Ok(())
3027    }
3028
    #[test]
    fn row_group_predicate_required_columns() -> Result<()> {
        // Verifies which statistics columns (min/max/null_count/row_count)
        // the predicate rewrite records in `RequiredColumns`, and that shared
        // entries (row_count) are not duplicated across columns.
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Int32, false),
        ]);
        let mut required_columns = RequiredColumns::new();
        // c1 < 1 and (c2 = 2 or c2 = 3)
        let expr = col("c1")
            .lt(lit(1))
            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut required_columns);
        assert_eq!(predicate_expr.to_string(), expected_expr);
        println!("required_columns: {required_columns:#?}"); // for debugging assertions below
        // c1 < 1 should add c1_min
        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
        assert_eq!(
            required_columns.columns[0],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::Min,
                c1_min_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c1 < 1 should add c1_null_count
        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[1],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::NullCount,
                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c1 < 1 should add row_count
        let row_count_field = Field::new("row_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[2],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::RowCount,
                row_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 2 should add c2_min and c2_max
        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
        assert_eq!(
            required_columns.columns[3],
            (
                phys_expr::Column::new("c2", 1),
                StatisticsType::Min,
                c2_min_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
        assert_eq!(
            required_columns.columns[4],
            (
                phys_expr::Column::new("c2", 1),
                StatisticsType::Max,
                c2_max_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 2 should add c2_null_count
        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[5],
            (
                phys_expr::Column::new("c2", 1),
                StatisticsType::NullCount,
                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // the c2 predicates reuse the row_count entry already added for c1
        // (index 2) rather than adding a new one
        let row_count_field = Field::new("row_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[2],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::RowCount,
                row_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 3 shouldn't add any new statistics fields
        assert_eq!(required_columns.columns.len(), 6);

        Ok(())
    }
3119
3120    #[test]
3121    fn row_group_predicate_in_list() -> Result<()> {
3122        let schema = Schema::new(vec![
3123            Field::new("c1", DataType::Int32, false),
3124            Field::new("c2", DataType::Int32, false),
3125        ]);
3126        // test c1 in(1, 2, 3)
3127        let expr = Expr::InList(InList::new(
3128            Box::new(col("c1")),
3129            vec![lit(1), lit(2), lit(3)],
3130            false,
3131        ));
3132        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
3133        let predicate_expr =
3134            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3135        assert_eq!(predicate_expr.to_string(), expected_expr);
3136
3137        Ok(())
3138    }
3139
3140    #[test]
3141    fn row_group_predicate_in_list_empty() -> Result<()> {
3142        let schema = Schema::new(vec![
3143            Field::new("c1", DataType::Int32, false),
3144            Field::new("c2", DataType::Int32, false),
3145        ]);
3146        // test c1 in()
3147        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
3148        let expected_expr = "true";
3149        let predicate_expr =
3150            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3151        assert_eq!(predicate_expr.to_string(), expected_expr);
3152
3153        Ok(())
3154    }
3155
3156    #[test]
3157    fn row_group_predicate_in_list_negated() -> Result<()> {
3158        let schema = Schema::new(vec![
3159            Field::new("c1", DataType::Int32, false),
3160            Field::new("c2", DataType::Int32, false),
3161        ]);
3162        // test c1 not in(1, 2, 3)
3163        let expr = Expr::InList(InList::new(
3164            Box::new(col("c1")),
3165            vec![lit(1), lit(2), lit(3)],
3166            true,
3167        ));
3168        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
3169        let predicate_expr =
3170            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3171        assert_eq!(predicate_expr.to_string(), expected_expr);
3172
3173        Ok(())
3174    }
3175
3176    #[test]
3177    fn row_group_predicate_between() -> Result<()> {
3178        let schema = Schema::new(vec![
3179            Field::new("c1", DataType::Int32, false),
3180            Field::new("c2", DataType::Int32, false),
3181        ]);
3182
3183        // test c1 BETWEEN 1 AND 5
3184        let expr1 = col("c1").between(lit(1), lit(5));
3185
3186        // test 1 <= c1 <= 5
3187        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
3188
3189        let predicate_expr1 =
3190            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
3191
3192        let predicate_expr2 =
3193            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
3194        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
3195
3196        Ok(())
3197    }
3198
3199    #[test]
3200    fn row_group_predicate_between_with_in_list() -> Result<()> {
3201        let schema = Schema::new(vec![
3202            Field::new("c1", DataType::Int32, false),
3203            Field::new("c2", DataType::Int32, false),
3204        ]);
3205        // test c1 in(1, 2)
3206        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
3207
3208        // test c2 BETWEEN 4 AND 5
3209        let expr2 = col("c2").between(lit(4), lit(5));
3210
3211        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
3212        let expr3 = expr1.and(expr2);
3213
3214        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
3215        let predicate_expr =
3216            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
3217        assert_eq!(predicate_expr.to_string(), expected_expr);
3218
3219        Ok(())
3220    }
3221
3222    #[test]
3223    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
3224        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3225        // test c1 in(1..21)
3226        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
3227        // always true
3228        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
3229
3230        let expected_expr = "true";
3231        let predicate_expr =
3232            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3233        assert_eq!(predicate_expr.to_string(), expected_expr);
3234
3235        Ok(())
3236    }
3237
3238    #[test]
3239    fn row_group_predicate_cast_int_int() -> Result<()> {
3240        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3241        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
3242
3243        // test cast(c1 as int64) = 1
3244        // test column on the left
3245        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
3246        let predicate_expr =
3247            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3248        assert_eq!(predicate_expr.to_string(), expected_expr);
3249
3250        // test column on the right
3251        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3252        let predicate_expr =
3253            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3254        assert_eq!(predicate_expr.to_string(), expected_expr);
3255
3256        let expected_expr =
3257            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3258
3259        // test column on the left
3260        let expr =
3261            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3262        let predicate_expr =
3263            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3264        assert_eq!(predicate_expr.to_string(), expected_expr);
3265
3266        // test column on the right
3267        let expr =
3268            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3269        let predicate_expr =
3270            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3271        assert_eq!(predicate_expr.to_string(), expected_expr);
3272
3273        Ok(())
3274    }
3275
3276    #[test]
3277    fn row_group_predicate_cast_string_string() -> Result<()> {
3278        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3279        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
3280
3281        // test column on the left
3282        let expr = cast(col("c1"), DataType::Utf8)
3283            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3284        let predicate_expr =
3285            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3286        assert_eq!(predicate_expr.to_string(), expected_expr);
3287
3288        // test column on the right
3289        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3290            .eq(cast(col("c1"), DataType::Utf8));
3291        let predicate_expr =
3292            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3293        assert_eq!(predicate_expr.to_string(), expected_expr);
3294
3295        Ok(())
3296    }
3297
3298    #[test]
3299    fn row_group_predicate_cast_string_int() -> Result<()> {
3300        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3301        let expected_expr = "true";
3302
3303        // test column on the left
3304        let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3305        let predicate_expr =
3306            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3307        assert_eq!(predicate_expr.to_string(), expected_expr);
3308
3309        // test column on the right
3310        let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3311        let predicate_expr =
3312            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3313        assert_eq!(predicate_expr.to_string(), expected_expr);
3314
3315        Ok(())
3316    }
3317
3318    #[test]
3319    fn row_group_predicate_cast_int_string() -> Result<()> {
3320        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3321        let expected_expr = "true";
3322
3323        // test column on the left
3324        let expr = cast(col("c1"), DataType::Utf8)
3325            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3326        let predicate_expr =
3327            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3328        assert_eq!(predicate_expr.to_string(), expected_expr);
3329
3330        // test column on the right
3331        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3332            .eq(cast(col("c1"), DataType::Utf8));
3333        let predicate_expr =
3334            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3335        assert_eq!(predicate_expr.to_string(), expected_expr);
3336
3337        Ok(())
3338    }
3339
3340    #[test]
3341    fn row_group_predicate_date_date() -> Result<()> {
3342        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3343        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3344
3345        // test column on the left
3346        let expr =
3347            cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3348        let predicate_expr =
3349            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3350        assert_eq!(predicate_expr.to_string(), expected_expr);
3351
3352        // test column on the right
3353        let expr =
3354            lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3355        let predicate_expr =
3356            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3357        assert_eq!(predicate_expr.to_string(), expected_expr);
3358
3359        Ok(())
3360    }
3361
3362    #[test]
3363    fn row_group_predicate_dict_string_date() -> Result<()> {
3364        // Test with Dictionary<UInt8, Utf8> for the literal
3365        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3366        let expected_expr = "true";
3367
3368        // test column on the left
3369        let expr = cast(
3370            col("c1"),
3371            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3372        )
3373        .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3374        let predicate_expr =
3375            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3376        assert_eq!(predicate_expr.to_string(), expected_expr);
3377
3378        // test column on the right
3379        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3380            col("c1"),
3381            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3382        ));
3383        let predicate_expr =
3384            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3385        assert_eq!(predicate_expr.to_string(), expected_expr);
3386
3387        Ok(())
3388    }
3389
3390    #[test]
3391    fn row_group_predicate_date_dict_string() -> Result<()> {
3392        // Test with Dictionary<UInt8, Utf8> for the column
3393        let schema = Schema::new(vec![Field::new(
3394            "c1",
3395            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3396            false,
3397        )]);
3398        let expected_expr = "true";
3399
3400        // test column on the left
3401        let expr =
3402            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3403        let predicate_expr =
3404            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3405        assert_eq!(predicate_expr.to_string(), expected_expr);
3406
3407        // test column on the right
3408        let expr =
3409            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3410        let predicate_expr =
3411            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3412        assert_eq!(predicate_expr.to_string(), expected_expr);
3413
3414        Ok(())
3415    }
3416
3417    #[test]
3418    fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3419        // Test with Dictionary types that have the same value type but different key types
3420        let schema = Schema::new(vec![Field::new(
3421            "c1",
3422            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3423            false,
3424        )]);
3425
3426        // Direct comparison with no cast
3427        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3428        let predicate_expr =
3429            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3430        let expected_expr =
3431            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3432        assert_eq!(predicate_expr.to_string(), expected_expr);
3433
3434        // Test with column cast to a dictionary with different key type
3435        let expr = cast(
3436            col("c1"),
3437            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3438        )
3439        .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3440        let predicate_expr =
3441            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3442        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3443        assert_eq!(predicate_expr.to_string(), expected_expr);
3444
3445        Ok(())
3446    }
3447
3448    #[test]
3449    fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3450        // Test with Dictionary types that have different value types
3451        let schema = Schema::new(vec![Field::new(
3452            "c1",
3453            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3454            false,
3455        )]);
3456        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3457
3458        // Test with literal of a different type
3459        let expr =
3460            cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3461        let predicate_expr =
3462            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3463        assert_eq!(predicate_expr.to_string(), expected_expr);
3464
3465        Ok(())
3466    }
3467
3468    #[test]
3469    fn row_group_predicate_nested_dict() -> Result<()> {
3470        // Test with nested Dictionary types
3471        let schema = Schema::new(vec![Field::new(
3472            "c1",
3473            DataType::Dictionary(
3474                Box::new(DataType::UInt8),
3475                Box::new(DataType::Dictionary(
3476                    Box::new(DataType::UInt16),
3477                    Box::new(DataType::Utf8),
3478                )),
3479            ),
3480            false,
3481        )]);
3482        let expected_expr =
3483            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3484
3485        // Test with a simple literal
3486        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3487        let predicate_expr =
3488            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3489        assert_eq!(predicate_expr.to_string(), expected_expr);
3490
3491        Ok(())
3492    }
3493
3494    #[test]
3495    fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3496        // Test with dictionary-wrapped date types for both sides
3497        let schema = Schema::new(vec![Field::new(
3498            "c1",
3499            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3500            false,
3501        )]);
3502        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3503
3504        // Test with a cast to a different date type
3505        let expr = cast(
3506            col("c1"),
3507            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3508        )
3509        .eq(lit(ScalarValue::Date64(Some(123))));
3510        let predicate_expr =
3511            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3512        assert_eq!(predicate_expr.to_string(), expected_expr);
3513
3514        Ok(())
3515    }
3516
3517    #[test]
3518    fn row_group_predicate_date_string() -> Result<()> {
3519        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3520        let expected_expr = "true";
3521
3522        // test column on the left
3523        let expr =
3524            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3525        let predicate_expr =
3526            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3527        assert_eq!(predicate_expr.to_string(), expected_expr);
3528
3529        // test column on the right
3530        let expr =
3531            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3532        let predicate_expr =
3533            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3534        assert_eq!(predicate_expr.to_string(), expected_expr);
3535
3536        Ok(())
3537    }
3538
3539    #[test]
3540    fn row_group_predicate_string_date() -> Result<()> {
3541        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3542        let expected_expr = "true";
3543
3544        // test column on the left
3545        let expr = cast(col("c1"), DataType::Utf8)
3546            .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3547        let predicate_expr =
3548            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3549        assert_eq!(predicate_expr.to_string(), expected_expr);
3550
3551        // test column on the right
3552        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3553            .eq(cast(col("c1"), DataType::Utf8));
3554        let predicate_expr =
3555            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3556        assert_eq!(predicate_expr.to_string(), expected_expr);
3557
3558        Ok(())
3559    }
3560
3561    #[test]
3562    fn row_group_predicate_cast_list() -> Result<()> {
3563        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3564        // test cast(c1 as int64) in int64(1, 2, 3)
3565        let expr = Expr::InList(InList::new(
3566            Box::new(cast(col("c1"), DataType::Int64)),
3567            vec![
3568                lit(ScalarValue::Int64(Some(1))),
3569                lit(ScalarValue::Int64(Some(2))),
3570                lit(ScalarValue::Int64(Some(3))),
3571            ],
3572            false,
3573        ));
3574        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3575        let predicate_expr =
3576            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3577        assert_eq!(predicate_expr.to_string(), expected_expr);
3578
3579        let expr = Expr::InList(InList::new(
3580            Box::new(cast(col("c1"), DataType::Int64)),
3581            vec![
3582                lit(ScalarValue::Int64(Some(1))),
3583                lit(ScalarValue::Int64(Some(2))),
3584                lit(ScalarValue::Int64(Some(3))),
3585            ],
3586            true,
3587        ));
3588        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3589        let predicate_expr =
3590            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3591        assert_eq!(predicate_expr.to_string(), expected_expr);
3592
3593        Ok(())
3594    }
3595
    #[test]
    fn prune_decimal_data() {
        // Verifies pruning of Decimal128 columns whose statistics may be
        // stored with a narrower physical type (INT32/INT64), as Spark does,
        // or as full Decimal128 values.
        // decimal(9,2)
        let schema = Arc::new(Schema::new(vec![Field::new(
            "s1",
            DataType::Decimal128(9, 2),
            true,
        )]));

        prune_with_expr(
            // s1 > 5
            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
            &schema,
            // If the data is written by spark, the physical data type is INT32 in the parquet
            // So we use the INT32 type of statistic.
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_i32(
                    vec![Some(0), Some(4), None, Some(3)], // min
                    vec![Some(5), Some(6), Some(4), None], // max
                ),
            ),
            &[false, true, false, true],
        );

        prune_with_expr(
            // with cast column to other type
            cast(col("s1"), DataType::Decimal128(14, 3))
                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
            &schema,
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_i32(
                    vec![Some(0), Some(4), None, Some(3)], // min
                    vec![Some(5), Some(6), Some(4), None], // max
                ),
            ),
            &[false, true, false, true],
        );

        prune_with_expr(
            // with try cast column to other type
            try_cast(col("s1"), DataType::Decimal128(14, 3))
                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
            &schema,
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_i32(
                    vec![Some(0), Some(4), None, Some(3)], // min
                    vec![Some(5), Some(6), Some(4), None], // max
                ),
            ),
            &[false, true, false, true],
        );

        // decimal(18,2)
        let schema = Arc::new(Schema::new(vec![Field::new(
            "s1",
            DataType::Decimal128(18, 2),
            true,
        )]));
        prune_with_expr(
            // s1 > 5
            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
            &schema,
            // If the data is written by spark, the physical data type is INT64 in the parquet
            // So we use the INT64 type of statistic.
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_i64(
                    vec![Some(0), Some(4), None, Some(3)], // min
                    vec![Some(5), Some(6), Some(4), None], // max
                ),
            ),
            &[false, true, false, true],
        );

        // decimal(23,2)
        let schema = Arc::new(Schema::new(vec![Field::new(
            "s1",
            DataType::Decimal128(23, 2),
            true,
        )]));

        prune_with_expr(
            // s1 > 5
            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
            &schema,
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_decimal128(
                    vec![Some(0), Some(400), None, Some(300)], // min
                    vec![Some(500), Some(600), Some(400), None], // max
                    23,
                    2,
                ),
            ),
            &[false, true, false, true],
        );
    }
3696
3697    #[test]
3698    fn prune_api() {
3699        let schema = Arc::new(Schema::new(vec![
3700            Field::new("s1", DataType::Utf8, true),
3701            Field::new("s2", DataType::Int32, true),
3702        ]));
3703
3704        let statistics = TestStatistics::new().with(
3705            "s2",
3706            ContainerStats::new_i32(
3707                vec![Some(0), Some(4), None, Some(3)], // min
3708                vec![Some(5), Some(6), None, None],    // max
3709            ),
3710        );
3711        prune_with_expr(
3712            // Prune using s2 > 5
3713            col("s2").gt(lit(5)),
3714            &schema,
3715            &statistics,
3716            // s2 [0, 5] ==> no rows should pass
3717            // s2 [4, 6] ==> some rows could pass
3718            // No stats for s2 ==> some rows could pass
3719            // s2 [3, None] (null max) ==> some rows could pass
3720            &[false, true, true, true],
3721        );
3722
3723        prune_with_expr(
3724            // filter with cast
3725            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3726            &schema,
3727            &statistics,
3728            &[false, true, true, true],
3729        );
3730    }
3731
    #[test]
    fn prune_not_eq_data() {
        // Verifies pruning with a != predicate: only containers whose
        // min == max == literal can be pruned.
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        prune_with_expr(
            // Prune using s1 != 'M'
            col("s1").not_eq(lit("M")),
            &schema,
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_utf8(
                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
                ),
            ),
            // s1 [A, Z] ==> might have values that pass predicate
            // s1 [A, L] ==> all rows pass the predicate
            // s1 [N, Z] ==> all rows pass the predicate
            // s1 [M, M] ==> all rows do not pass the predicate
            // s1 [None, None] (no stats) ==> some rows could pass
            // s1 [A, None] (null max) ==> some rows could pass
            &[true, true, true, false, true, true],
        );
    }
3756
3757    /// Creates setup for boolean chunk pruning
3758    ///
3759    /// For predicate "b1" (boolean expr)
3760    /// b1 [false, false] ==> no rows can pass (not keep)
3761    /// b1 [false, true] ==> some rows could pass (must keep)
3762    /// b1 [true, true] ==> all rows must pass (must keep)
3763    /// b1 [NULL, NULL]  ==> unknown (must keep)
3764    /// b1 [false, NULL]  ==> unknown (must keep)
3765    ///
3766    /// For predicate "!b1" (boolean expr)
3767    /// b1 [false, false] ==> all rows pass (must keep)
3768    /// b1 [false, true] ==> some rows could pass (must keep)
3769    /// b1 [true, true] ==> no rows can pass (not keep)
3770    /// b1 [NULL, NULL]  ==> unknown (must keep)
3771    /// b1 [false, NULL]  ==> unknown (must keep)
3772    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3773        let schema =
3774            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3775
3776        let statistics = TestStatistics::new().with(
3777            "b1",
3778            ContainerStats::new_bool(
3779                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3780                vec![Some(false), Some(true), Some(true), None, None],         // max
3781            ),
3782        );
3783        let expected_true = vec![false, true, true, true, true];
3784        let expected_false = vec![true, true, false, true, true];
3785
3786        (schema, statistics, expected_true, expected_false)
3787    }
3788
3789    #[test]
3790    fn prune_bool_const_expr() {
3791        let (schema, statistics, _, _) = bool_setup();
3792
3793        prune_with_expr(
3794            // true
3795            lit(true),
3796            &schema,
3797            &statistics,
3798            &[true, true, true, true, true],
3799        );
3800
3801        prune_with_expr(
3802            // false
3803            lit(false),
3804            &schema,
3805            &statistics,
3806            &[false, false, false, false, false],
3807        );
3808    }
3809
3810    #[test]
3811    fn prune_bool_column() {
3812        let (schema, statistics, expected_true, _) = bool_setup();
3813
3814        prune_with_expr(
3815            // b1
3816            col("b1"),
3817            &schema,
3818            &statistics,
3819            &expected_true,
3820        );
3821    }
3822
3823    #[test]
3824    fn prune_bool_not_column() {
3825        let (schema, statistics, _, expected_false) = bool_setup();
3826
3827        prune_with_expr(
3828            // !b1
3829            col("b1").not(),
3830            &schema,
3831            &statistics,
3832            &expected_false,
3833        );
3834    }
3835
3836    #[test]
3837    fn prune_bool_column_eq_true() {
3838        let (schema, statistics, expected_true, _) = bool_setup();
3839
3840        prune_with_expr(
3841            // b1 = true
3842            col("b1").eq(lit(true)),
3843            &schema,
3844            &statistics,
3845            &expected_true,
3846        );
3847    }
3848
3849    #[test]
3850    fn prune_bool_not_column_eq_true() {
3851        let (schema, statistics, _, expected_false) = bool_setup();
3852
3853        prune_with_expr(
3854            // !b1 = true
3855            col("b1").not().eq(lit(true)),
3856            &schema,
3857            &statistics,
3858            &expected_false,
3859        );
3860    }
3861
3862    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3863    /// with 5 different containers (e.g. RowGroups). They have [min,
3864    /// max]:
3865    ///
3866    /// i [-5, 5]
3867    /// i [1, 11]
3868    /// i [-11, -1]
3869    /// i [NULL, NULL]
3870    /// i [1, NULL]
3871    fn int32_setup() -> (SchemaRef, TestStatistics) {
3872        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3873
3874        let statistics = TestStatistics::new().with(
3875            "i",
3876            ContainerStats::new_i32(
3877                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3878                vec![Some(5), Some(11), Some(-1), None, None],     // max
3879            ),
3880        );
3881        (schema, statistics)
3882    }
3883
3884    #[test]
3885    fn prune_int32_col_gt_zero() {
3886        let (schema, statistics) = int32_setup();
3887
3888        // Expression "i > 0" and "-i < 0"
3889        // i [-5, 5] ==> some rows could pass (must keep)
3890        // i [1, 11] ==> all rows must pass (must keep)
3891        // i [-11, -1] ==>  no rows can pass (not keep)
3892        // i [NULL, NULL]  ==> unknown (must keep)
3893        // i [1, NULL]  ==> unknown (must keep)
3894        let expected_ret = &[true, true, false, true, true];
3895
3896        // i > 0
3897        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3898
3899        // -i < 0
3900        prune_with_expr(
3901            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3902            &schema,
3903            &statistics,
3904            expected_ret,
3905        );
3906    }
3907
3908    #[test]
3909    fn prune_int32_col_lte_zero() {
3910        let (schema, statistics) = int32_setup();
3911
3912        // Expression "i <= 0" and "-i >= 0"
3913        // i [-5, 5] ==> some rows could pass (must keep)
3914        // i [1, 11] ==> no rows can pass (not keep)
3915        // i [-11, -1] ==>  all rows must pass (must keep)
3916        // i [NULL, NULL]  ==> unknown (must keep)
3917        // i [1, NULL]  ==> no rows can pass (not keep)
3918        let expected_ret = &[true, false, true, true, false];
3919
3920        prune_with_expr(
3921            // i <= 0
3922            col("i").lt_eq(lit(0)),
3923            &schema,
3924            &statistics,
3925            expected_ret,
3926        );
3927
3928        prune_with_expr(
3929            // -i >= 0
3930            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3931            &schema,
3932            &statistics,
3933            expected_ret,
3934        );
3935    }
3936
3937    #[test]
3938    fn prune_int32_col_lte_zero_cast() {
3939        let (schema, statistics) = int32_setup();
3940
3941        // Expression "cast(i as utf8) <= '0'"
3942        // i [-5, 5] ==> some rows could pass (must keep)
3943        // i [1, 11] ==> no rows can pass in theory, -0.22 (conservatively keep)
3944        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
3945        // i [NULL, NULL]  ==> unknown (must keep)
3946        // i [1, NULL]  ==> no rows can pass (conservatively keep)
3947        let expected_ret = &[true, true, true, true, true];
3948
3949        prune_with_expr(
3950            // cast(i as utf8) <= 0
3951            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3952            &schema,
3953            &statistics,
3954            expected_ret,
3955        );
3956
3957        prune_with_expr(
3958            // try_cast(i as utf8) <= 0
3959            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3960            &schema,
3961            &statistics,
3962            expected_ret,
3963        );
3964
3965        prune_with_expr(
3966            // cast(-i as utf8) >= 0
3967            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3968            &schema,
3969            &statistics,
3970            expected_ret,
3971        );
3972
3973        prune_with_expr(
3974            // try_cast(-i as utf8) >= 0
3975            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3976            &schema,
3977            &statistics,
3978            expected_ret,
3979        );
3980    }
3981
3982    #[test]
3983    fn prune_int32_col_eq_zero() {
3984        let (schema, statistics) = int32_setup();
3985
3986        // Expression "i = 0"
3987        // i [-5, 5] ==> some rows could pass (must keep)
3988        // i [1, 11] ==> no rows can pass (not keep)
3989        // i [-11, -1] ==>  no rows can pass (not keep)
3990        // i [NULL, NULL]  ==> unknown (must keep)
3991        // i [1, NULL]  ==> no rows can pass (not keep)
3992        let expected_ret = &[true, false, false, true, false];
3993
3994        prune_with_expr(
3995            // i = 0
3996            col("i").eq(lit(0)),
3997            &schema,
3998            &statistics,
3999            expected_ret,
4000        );
4001    }
4002
4003    #[test]
4004    fn prune_int32_col_eq_zero_cast() {
4005        let (schema, statistics) = int32_setup();
4006
4007        // Expression "cast(i as int64) = 0"
4008        // i [-5, 5] ==> some rows could pass (must keep)
4009        // i [1, 11] ==> no rows can pass (not keep)
4010        // i [-11, -1] ==>  no rows can pass (not keep)
4011        // i [NULL, NULL]  ==> unknown (must keep)
4012        // i [1, NULL]  ==> no rows can pass (not keep)
4013        let expected_ret = &[true, false, false, true, false];
4014
4015        prune_with_expr(
4016            cast(col("i"), DataType::Int64).eq(lit(0i64)),
4017            &schema,
4018            &statistics,
4019            expected_ret,
4020        );
4021
4022        prune_with_expr(
4023            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
4024            &schema,
4025            &statistics,
4026            expected_ret,
4027        );
4028    }
4029
4030    #[test]
4031    fn prune_int32_col_eq_zero_cast_as_str() {
4032        let (schema, statistics) = int32_setup();
4033
4034        // Note the cast is to a string where sorting properties are
4035        // not the same as integers
4036        //
4037        // Expression "cast(i as utf8) = '0'"
4038        // i [-5, 5] ==> some rows could pass (keep)
4039        // i [1, 11] ==> no rows can pass  (could keep)
4040        // i [-11, -1] ==>  no rows can pass (could keep)
4041        // i [NULL, NULL]  ==> unknown (keep)
4042        // i [1, NULL]  ==> no rows can pass (could keep)
4043        let expected_ret = &[true, true, true, true, true];
4044
4045        prune_with_expr(
4046            cast(col("i"), DataType::Utf8).eq(lit("0")),
4047            &schema,
4048            &statistics,
4049            expected_ret,
4050        );
4051    }
4052
4053    #[test]
4054    fn prune_int32_col_lt_neg_one() {
4055        let (schema, statistics) = int32_setup();
4056
4057        // Expression "i > -1" and "-i < 1"
4058        // i [-5, 5] ==> some rows could pass (must keep)
4059        // i [1, 11] ==> all rows must pass (must keep)
4060        // i [-11, -1] ==>  no rows can pass (not keep)
4061        // i [NULL, NULL]  ==> unknown (must keep)
4062        // i [1, NULL]  ==> all rows must pass (must keep)
4063        let expected_ret = &[true, true, false, true, true];
4064
4065        prune_with_expr(
4066            // i > -1
4067            col("i").gt(lit(-1)),
4068            &schema,
4069            &statistics,
4070            expected_ret,
4071        );
4072
4073        prune_with_expr(
4074            // -i < 1
4075            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
4076            &schema,
4077            &statistics,
4078            expected_ret,
4079        );
4080    }
4081
4082    #[test]
4083    fn prune_int32_is_null() {
4084        let (schema, statistics) = int32_setup();
4085
4086        // Expression "i IS NULL" when there are no null statistics,
4087        // should all be kept
4088        let expected_ret = &[true, true, true, true, true];
4089
4090        prune_with_expr(
4091            // i IS NULL, no null statistics
4092            col("i").is_null(),
4093            &schema,
4094            &statistics,
4095            expected_ret,
4096        );
4097
4098        // provide null counts for each column
4099        let statistics = statistics.with_null_counts(
4100            "i",
4101            vec![
4102                Some(0), // no nulls (don't keep)
4103                Some(1), // 1 null
4104                None,    // unknown nulls
4105                None, // unknown nulls (min/max are both null too, like no stats at all)
4106                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
4107            ],
4108        );
4109
4110        let expected_ret = &[false, true, true, true, false];
4111
4112        prune_with_expr(
4113            // i IS NULL, with actual null statistics
4114            col("i").is_null(),
4115            &schema,
4116            &statistics,
4117            expected_ret,
4118        );
4119    }
4120
4121    #[test]
4122    fn prune_int32_column_is_known_all_null() {
4123        let (schema, statistics) = int32_setup();
4124
4125        // Expression "i < 0"
4126        // i [-5, 5] ==> some rows could pass (must keep)
4127        // i [1, 11] ==> no rows can pass (not keep)
4128        // i [-11, -1] ==>  all rows must pass (must keep)
4129        // i [NULL, NULL]  ==> unknown (must keep)
4130        // i [1, NULL]  ==> no rows can pass (not keep)
4131        let expected_ret = &[true, false, true, true, false];
4132
4133        prune_with_expr(
4134            // i < 0
4135            col("i").lt(lit(0)),
4136            &schema,
4137            &statistics,
4138            expected_ret,
4139        );
4140
4141        // provide row counts for each column
4142        let statistics = statistics.with_row_counts(
4143            "i",
4144            vec![
4145                Some(10), // 10 rows of data
4146                Some(9),  // 9 rows of data
4147                None,     // unknown row counts
4148                Some(4),
4149                Some(10),
4150            ],
4151        );
4152
4153        // pruning result is still the same if we only know row counts
4154        prune_with_expr(
4155            // i < 0, with only row counts statistics
4156            col("i").lt(lit(0)),
4157            &schema,
4158            &statistics,
4159            expected_ret,
4160        );
4161
4162        // provide null counts for each column
4163        let statistics = statistics.with_null_counts(
4164            "i",
4165            vec![
4166                Some(0), // no nulls
4167                Some(1), // 1 null
4168                None,    // unknown nulls
4169                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
4170                Some(0), // 0 nulls (max=null too which means no known max)
4171            ],
4172        );
4173
4174        // Expression "i < 0" with actual null and row counts statistics
4175        // col | min, max     | row counts | null counts |
4176        // ----+--------------+------------+-------------+
4177        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
4178        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
4179        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
4180        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
4181        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
4182        let expected_ret = &[true, false, true, false, false];
4183
4184        prune_with_expr(
4185            // i < 0, with actual null and row counts statistics
4186            col("i").lt(lit(0)),
4187            &schema,
4188            &statistics,
4189            expected_ret,
4190        );
4191    }
4192
4193    #[test]
4194    fn prune_cast_column_scalar() {
4195        // The data type of column i is INT32
4196        let (schema, statistics) = int32_setup();
4197        let expected_ret = &[true, true, false, true, true];
4198
4199        prune_with_expr(
4200            // i > int64(0)
4201            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
4202            &schema,
4203            &statistics,
4204            expected_ret,
4205        );
4206
4207        prune_with_expr(
4208            // cast(i as int64) > int64(0)
4209            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4210            &schema,
4211            &statistics,
4212            expected_ret,
4213        );
4214
4215        prune_with_expr(
4216            // try_cast(i as int64) > int64(0)
4217            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4218            &schema,
4219            &statistics,
4220            expected_ret,
4221        );
4222
4223        prune_with_expr(
4224            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
4225            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
4226                .lt(lit(ScalarValue::Int64(Some(0)))),
4227            &schema,
4228            &statistics,
4229            expected_ret,
4230        );
4231    }
4232
4233    #[test]
4234    fn test_increment_utf8() {
4235        // Basic ASCII
4236        assert_eq!(increment_utf8("abc").unwrap(), "abd");
4237        assert_eq!(increment_utf8("abz").unwrap(), "ab{");
4238
4239        // Test around ASCII 127 (DEL)
4240        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
4241        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128
4242
4243        // Test 2-byte UTF-8 sequences
4244        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0
4245
4246        // Test 3-byte UTF-8 sequences
4247        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124
4248
4249        // Test at UTF-8 boundaries
4250        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
4251        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary
4252
4253        // Test that if we can't increment we return None
4254        assert!(increment_utf8("").is_none());
4255        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point
4256
4257        // Test that if we can't increment the last character we do the previous one and truncate
4258        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4259
4260        // Test surrogate pair range (0xD800..=0xDFFF)
4261        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
4262        assert!(increment_utf8("\u{D7FF}").is_none());
4263
4264        // Test non-characters range (0xFDD0..=0xFDEF)
4265        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
4266        assert!(increment_utf8("\u{FDCF}").is_none());
4267
4268        // Test private use area limit (>= 0x110000)
4269        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4270        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
4271    }
4272
4273    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
4274    /// with 5 different containers (e.g. RowGroups). They have [min,
4275    /// max]:
4276    /// s1 ["A", "Z"]
4277    /// s1 ["A", "L"]
4278    /// s1 ["N", "Z"]
4279    /// s1 [NULL, NULL]
4280    /// s1 ["A", NULL]
4281    /// s1 ["", "A"]
4282    /// s1 ["", ""]
4283    /// s1 ["AB", "A\u{10ffff}"]
4284    /// s1 ["A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
4285    fn utf8_setup() -> (SchemaRef, TestStatistics) {
4286        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4287
4288        let statistics = TestStatistics::new().with(
4289            "s1",
4290            ContainerStats::new_utf8(
4291                vec![
4292                    Some("A"),
4293                    Some("A"),
4294                    Some("N"),
4295                    Some("M"),
4296                    None,
4297                    Some("A"),
4298                    Some(""),
4299                    Some(""),
4300                    Some("AB"),
4301                    Some("A\u{10ffff}\u{10ffff}"),
4302                ], // min
4303                vec![
4304                    Some("Z"),
4305                    Some("L"),
4306                    Some("Z"),
4307                    Some("M"),
4308                    None,
4309                    None,
4310                    Some("A"),
4311                    Some(""),
4312                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4313                    Some("A\u{10ffff}\u{10ffff}"),
4314                ], // max
4315            ),
4316        );
4317        (schema, statistics)
4318    }
4319
4320    #[test]
4321    fn prune_utf8_eq() {
4322        let (schema, statistics) = utf8_setup();
4323
4324        let expr = col("s1").eq(lit("A"));
4325        #[rustfmt::skip]
4326        let expected_ret = &[
4327            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4328            true,
4329            // s1 ["A", "L"] ==> some rows could pass (must keep)
4330            true,
4331            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4332            false,
4333            // s1 ["M", "M"] ==> no rows can pass (not keep)
4334            false,
4335            // s1 [NULL, NULL]  ==> unknown (must keep)
4336            true,
4337            // s1 ["A", NULL]  ==> unknown (must keep)
4338            true,
4339            // s1 ["", "A"]  ==> some rows could pass (must keep)
4340            true,
4341            // s1 ["", ""]  ==> no rows can pass (not keep)
4342            false,
4343            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4344            false,
4345            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4346            false,
4347        ];
4348        prune_with_expr(expr, &schema, &statistics, expected_ret);
4349
4350        let expr = col("s1").eq(lit(""));
4351        #[rustfmt::skip]
4352        let expected_ret = &[
4353            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4354            false,
4355            // s1 ["A", "L"] ==> no rows can pass (not keep)
4356            false,
4357            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4358            false,
4359            // s1 ["M", "M"] ==> no rows can pass (not keep)
4360            false,
4361            // s1 [NULL, NULL]  ==> unknown (must keep)
4362            true,
4363            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4364            false,
4365            // s1 ["", "A"]  ==> some rows could pass (must keep)
4366            true,
4367            // s1 ["", ""]  ==> all rows must pass (must keep)
4368            true,
4369            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4370            false,
4371            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4372            false,
4373        ];
4374        prune_with_expr(expr, &schema, &statistics, expected_ret);
4375    }
4376
4377    #[test]
4378    fn prune_utf8_not_eq() {
4379        let (schema, statistics) = utf8_setup();
4380
4381        let expr = col("s1").not_eq(lit("A"));
4382        #[rustfmt::skip]
4383        let expected_ret = &[
4384            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4385            true,
4386            // s1 ["A", "L"] ==> some rows could pass (must keep)
4387            true,
4388            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4389            true,
4390            // s1 ["M", "M"] ==> all rows must pass (must keep)
4391            true,
4392            // s1 [NULL, NULL]  ==> unknown (must keep)
4393            true,
4394            // s1 ["A", NULL]  ==> unknown (must keep)
4395            true,
4396            // s1 ["", "A"]  ==> some rows could pass (must keep)
4397            true,
4398            // s1 ["", ""]  ==> all rows must pass (must keep)
4399            true,
4400            // s1 ["AB", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4401            true,
4402            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4403            true,
4404        ];
4405        prune_with_expr(expr, &schema, &statistics, expected_ret);
4406
4407        let expr = col("s1").not_eq(lit(""));
4408        #[rustfmt::skip]
4409        let expected_ret = &[
4410            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4411            true,
4412            // s1 ["A", "L"] ==> all rows must pass (must keep)
4413            true,
4414            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4415            true,
4416            // s1 ["M", "M"] ==> all rows must pass (must keep)
4417            true,
4418            // s1 [NULL, NULL]  ==> unknown (must keep)
4419            true,
4420            // s1 ["A", NULL]  ==> unknown (must keep)
4421            true,
4422            // s1 ["", "A"]  ==> some rows could pass (must keep)
4423            true,
4424            // s1 ["", ""]  ==> no rows can pass (not keep)
4425            false,
4426            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4427            true,
4428            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4429            true,
4430        ];
4431        prune_with_expr(expr, &schema, &statistics, expected_ret);
4432    }
4433
4434    #[test]
4435    fn prune_utf8_like_one() {
4436        let (schema, statistics) = utf8_setup();
4437
4438        let expr = col("s1").like(lit("A_"));
4439        #[rustfmt::skip]
4440        let expected_ret = &[
4441            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4442            true,
4443            // s1 ["A", "L"] ==> some rows could pass (must keep)
4444            true,
4445            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4446            false,
4447            // s1 ["M", "M"] ==> no rows can pass (not keep)
4448            false,
4449            // s1 [NULL, NULL]  ==> unknown (must keep)
4450            true,
4451            // s1 ["A", NULL]  ==> unknown (must keep)
4452            true,
4453            // s1 ["", "A"]  ==> some rows could pass (must keep)
4454            true,
4455            // s1 ["", ""]  ==> no rows can pass (not keep)
4456            false,
4457            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4458            true,
4459            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4460            true,
4461        ];
4462        prune_with_expr(expr, &schema, &statistics, expected_ret);
4463
4464        let expr = col("s1").like(lit("_A_"));
4465        #[rustfmt::skip]
4466        let expected_ret = &[
4467            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4468            true,
4469            // s1 ["A", "L"] ==> some rows could pass (must keep)
4470            true,
4471            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4472            true,
4473            // s1 ["M", "M"] ==> some rows could pass (must keep)
4474            true,
4475            // s1 [NULL, NULL]  ==> unknown (must keep)
4476            true,
4477            // s1 ["A", NULL]  ==> unknown (must keep)
4478            true,
4479            // s1 ["", "A"]  ==> some rows could pass (must keep)
4480            true,
4481            // s1 ["", ""]  ==> some rows could pass (must keep)
4482            true,
4483            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4484            true,
4485            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4486            true,
4487        ];
4488        prune_with_expr(expr, &schema, &statistics, expected_ret);
4489
4490        let expr = col("s1").like(lit("_"));
4491        #[rustfmt::skip]
4492        let expected_ret = &[
4493            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4494            true,
4495            // s1 ["A", "L"] ==> all rows must pass (must keep)
4496            true,
4497            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4498            true,
4499            // s1 ["M", "M"] ==> all rows must pass (must keep)
4500            true,
4501            // s1 [NULL, NULL]  ==> unknown (must keep)
4502            true,
4503            // s1 ["A", NULL]  ==> unknown (must keep)
4504            true,
4505            // s1 ["", "A"]  ==> all rows must pass (must keep)
4506            true,
4507            // s1 ["", ""]  ==> all rows must pass (must keep)
4508            true,
4509            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4510            true,
4511            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4512            true,
4513        ];
4514        prune_with_expr(expr, &schema, &statistics, expected_ret);
4515
4516        let expr = col("s1").like(lit(""));
4517        #[rustfmt::skip]
4518        let expected_ret = &[
4519            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4520            false,
4521            // s1 ["A", "L"] ==> no rows can pass (not keep)
4522            false,
4523            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4524            false,
4525            // s1 ["M", "M"] ==> no rows can pass (not keep)
4526            false,
4527            // s1 [NULL, NULL]  ==> unknown (must keep)
4528            true,
4529            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4530            false,
4531            // s1 ["", "A"]  ==> some rows could pass (must keep)
4532            true,
4533            // s1 ["", ""]  ==> all rows must pass (must keep)
4534            true,
4535            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4536            false,
4537            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4538            false,
4539        ];
4540        prune_with_expr(expr, &schema, &statistics, expected_ret);
4541    }
4542
    #[test]
    fn prune_utf8_like_many() {
        // Tests pruning `LIKE` predicates whose patterns contain the
        // multi-character wildcard `%`. Each entry of `expected_ret`
        // corresponds, in order, to one container from `utf8_setup()`:
        // `true` means the container must be kept (some rows might match),
        // `false` means it can safely be pruned (no rows can match).
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").like(lit("A%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // A leading `%` makes the prefix unusable: min/max ranges can never
        // prove the absence of an interior match, so nothing is pruned.
        let expr = col("s1").like(lit("%A%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // `LIKE '%'` matches every non-NULL string, so no container can be
        // pruned.
        let expr = col("s1").like(lit("%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> all rows must pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // `LIKE ''` (no wildcards) matches only the empty string, so any
        // container whose min is greater than "" can be pruned.
        let expr = col("s1").like(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4651
4652    #[test]
4653    fn prune_utf8_not_like_one() {
4654        let (schema, statistics) = utf8_setup();
4655
4656        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4657        #[rustfmt::skip]
4658        let expected_ret = &[
4659            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4660            true,
4661            // s1 ["A", "L"] ==> some rows could pass (must keep)
4662            true,
4663            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4664            true,
4665            // s1 ["M", "M"] ==> some rows could pass (must keep)
4666            true,
4667            // s1 [NULL, NULL]  ==> unknown (must keep)
4668            true,
4669            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4670            true,
4671            // s1 ["", "A"]  ==> some rows could pass (must keep)
4672            true,
4673            // s1 ["", ""]  ==> some rows could pass (must keep)
4674            true,
4675            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4676            true,
4677            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match. (min, max) maybe truncate
4678            // original (min, max) maybe ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}")
4679            true,
4680        ];
4681        prune_with_expr(expr, &schema, &statistics, expected_ret);
4682    }
4683
    #[test]
    fn prune_utf8_not_like_many() {
        // Tests pruning `NOT LIKE` predicates with the multi-character
        // wildcard `%`. Each entry of `expected_ret` corresponds, in order,
        // to one container from `utf8_setup()`: `true` means keep, `false`
        // means the container can be pruned (every row matches the pattern,
        // so `NOT LIKE` is false for all rows).
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // A trailing non-wildcard after `%` prevents pruning: min/max cannot
        // prove every row ends with the required suffix.
        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // Escaped wildcard: `A\%%` matches any string beginning with the
        // literal prefix "A%" (the first `%` is escaped, the second is a
        // wildcard).
        let expr = col("s1").not_like(lit("A\\%%"));
        let statistics = TestStatistics::new().with(
            "s1",
            ContainerStats::new_utf8(
                vec![Some("A%a"), Some("A")],
                vec![Some("A%c"), Some("A")],
            ),
        );
        // Container 0 ["A%a", "A%c"]: every value in range starts with "A%",
        // so all rows match the pattern and NOT LIKE can prune it (false).
        // Container 1 ["A", "A"]: "A" does not start with "A%", so the
        // container must be kept (true).
        let expected_ret = &[false, true];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4777
4778    #[test]
4779    fn test_rewrite_expr_to_prunable() {
4780        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4781        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4782
4783        // column op lit
4784        let left_input = col("a");
4785        let left_input = logical2physical(&left_input, &schema);
4786        let right_input = lit(ScalarValue::Int32(Some(12)));
4787        let right_input = logical2physical(&right_input, &schema);
4788        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4789            &left_input,
4790            Operator::Eq,
4791            &right_input,
4792            df_schema.clone(),
4793        )
4794        .unwrap();
4795        assert_eq!(result_left.to_string(), left_input.to_string());
4796        assert_eq!(result_right.to_string(), right_input.to_string());
4797
4798        // cast op lit
4799        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4800        let left_input = logical2physical(&left_input, &schema);
4801        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4802        let right_input = logical2physical(&right_input, &schema);
4803        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4804            &left_input,
4805            Operator::Gt,
4806            &right_input,
4807            df_schema.clone(),
4808        )
4809        .unwrap();
4810        assert_eq!(result_left.to_string(), left_input.to_string());
4811        assert_eq!(result_right.to_string(), right_input.to_string());
4812
4813        // try_cast op lit
4814        let left_input = try_cast(col("a"), DataType::Int64);
4815        let left_input = logical2physical(&left_input, &schema);
4816        let right_input = lit(ScalarValue::Int64(Some(12)));
4817        let right_input = logical2physical(&right_input, &schema);
4818        let (result_left, _, result_right) =
4819            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4820                .unwrap();
4821        assert_eq!(result_left.to_string(), left_input.to_string());
4822        assert_eq!(result_right.to_string(), right_input.to_string());
4823
4824        // TODO: add test for other case and op
4825    }
4826
4827    #[test]
4828    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4829        struct CustomUnhandledHook;
4830
4831        impl UnhandledPredicateHook for CustomUnhandledHook {
4832            /// This handles an arbitrary case of a column that doesn't exist in the schema
4833            /// by renaming it to yet another column that doesn't exist in the schema
4834            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
4835            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4836                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4837            }
4838        }
4839
4840        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4841        let schema_with_b = Schema::new(vec![
4842            Field::new("a", DataType::Int32, true),
4843            Field::new("b", DataType::Int32, true),
4844        ]);
4845
4846        let rewriter = PredicateRewriter::new()
4847            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4848
4849        let transform_expr = |expr| {
4850            let expr = logical2physical(&expr, &schema_with_b);
4851            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4852        };
4853
4854        // transform an arbitrary valid expression that we know is handled
4855        let known_expression = col("a").eq(lit(12));
4856        let known_expression_transformed = PredicateRewriter::new()
4857            .rewrite_predicate_to_statistics_predicate(
4858                &logical2physical(&known_expression, &schema),
4859                &schema,
4860            );
4861
4862        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
4863        let input = col("b").eq(lit(12));
4864        let expected = logical2physical(&lit(42), &schema);
4865        let transformed = transform_expr(input.clone());
4866        assert_eq!(transformed.to_string(), expected.to_string());
4867
4868        // more complex case with unknown column
4869        let input = known_expression.clone().and(input.clone());
4870        let expected = phys_expr::BinaryExpr::new(
4871            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4872            Operator::And,
4873            logical2physical(&lit(42), &schema),
4874        );
4875        let transformed = transform_expr(input.clone());
4876        assert_eq!(transformed.to_string(), expected.to_string());
4877
4878        // an unknown expression gets passed to the hook
4879        let input = array_has(make_array(vec![lit(1)]), col("a"));
4880        let expected = logical2physical(&lit(42), &schema);
4881        let transformed = transform_expr(input.clone());
4882        assert_eq!(transformed.to_string(), expected.to_string());
4883
4884        // more complex case with unknown expression
4885        let input = known_expression.and(input);
4886        let expected = phys_expr::BinaryExpr::new(
4887            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4888            Operator::And,
4889            logical2physical(&lit(42), &schema),
4890        );
4891        let transformed = transform_expr(input.clone());
4892        assert_eq!(transformed.to_string(), expected.to_string());
4893    }
4894
4895    #[test]
4896    fn test_rewrite_expr_to_prunable_error() {
4897        // cast string value to numeric value
4898        // this cast is not supported
4899        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4900        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4901        let left_input = cast(col("a"), DataType::Int64);
4902        let left_input = logical2physical(&left_input, &schema);
4903        let right_input = lit(ScalarValue::Int64(Some(12)));
4904        let right_input = logical2physical(&right_input, &schema);
4905        let result = rewrite_expr_to_prunable(
4906            &left_input,
4907            Operator::Gt,
4908            &right_input,
4909            df_schema.clone(),
4910        );
4911        assert!(result.is_err());
4912
4913        // other expr
4914        let left_input = is_null(col("a"));
4915        let left_input = logical2physical(&left_input, &schema);
4916        let right_input = lit(ScalarValue::Int64(Some(12)));
4917        let right_input = logical2physical(&right_input, &schema);
4918        let result =
4919            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4920        assert!(result.is_err());
4921        // TODO: add other negative test for other case and op
4922    }
4923
    #[test]
    fn prune_with_contained_one_column() {
        // Exercises pruning using `PruningStatistics::contained` information
        // (modeling e.g. a bloom filter) for a single column. Each container
        // index 0..=8 has its own answer for "does s1 contain this value?".
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        // Model having information like a bloom filter for s1
        let statistics = TestStatistics::new()
            .with_contained(
                "s1",
                [ScalarValue::from("foo")],
                [
                    // container 0 known to only contain "foo"
                    Some(true),
                    // container 1 known to not contain "foo"
                    Some(false),
                    // container 2 unknown about "foo"
                    None,
                    // container 3 known to only contain "foo"
                    Some(true),
                    // container 4 known to not contain "foo"
                    Some(false),
                    // container 5 unknown about "foo"
                    None,
                    // container 6 known to only contain "foo"
                    Some(true),
                    // container 7 known to not contain "foo"
                    Some(false),
                    // container 8 unknown about "foo"
                    None,
                ],
            )
            .with_contained(
                "s1",
                [ScalarValue::from("bar")],
                [
                    // containers 0,1,2 known to only contain "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 3,4,5 known to not contain "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                    // container 6,7,8 unknown about "bar"
                    None,
                    None,
                    None,
                ],
            )
            .with_contained(
                // the way the tests are setup, this data is
                // consulted if the "foo" and "bar" are being checked at the same time
                "s1",
                [ScalarValue::from("foo"), ScalarValue::from("bar")],
                [
                    // container 0,1,2 unknown about ("foo", "bar")
                    None,
                    None,
                    None,
                    // container 3,4,5 known to contain only either "foo" or "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 6,7,8 known to contain neither "foo" nor "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                ],
            );

        // s1 = 'foo'
        prune_with_expr(
            col("s1").eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers (false) where we know foo is not present
            &[true, false, true, true, false, true, true, false, true],
        );

        // s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("bar")),
            &schema,
            &statistics,
            // rule out containers where we know bar is not present
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 = 'baz' (unknown value)
        prune_with_expr(
            col("s1").eq(lit("baz")),
            &schema,
            &statistics,
            // can't rule out anything
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' AND s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
            &schema,
            &statistics,
            // logically this predicate can't possibly be true (the column can't
            // take on both values) but we could rule it out if the stats tell
            // us that both values are not present
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' OR s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers that we know contain neither foo nor bar
            &[true, true, true, true, true, true, false, false, false],
        );

        // s1 = 'foo' OR s1 = 'baz'
        prune_with_expr(
            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container ("baz" is unknown to the stats)
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
        prune_with_expr(
            col("s1")
                .eq(lit("foo"))
                .or(col("s1").eq(lit("bar")))
                .or(col("s1").eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container: the stats for `foo`, `bar` and
            // (`foo`, `bar`) say nothing about `baz`
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo
        prune_with_expr(
            col("s1").not_eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers we know for sure only contain foo
            &[false, true, true, false, true, true, false, true, true],
        );

        // s1 != bar
        prune_with_expr(
            col("s1").not_eq(lit("bar")),
            &schema,
            &statistics,
            // rule out when we know for sure s1 has the value bar
            &[false, false, false, true, true, true, true, true, true],
        );

        // s1 != foo AND s1 != bar
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s1").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out any container where we know s1 does not have either 'foo' or 'bar'
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 != foo AND s1 != bar AND s1 != baz
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s1").not_eq(lit("bar")))
                .and(col("s1").not_eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container: no stats cover "baz"
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo OR s1 != bar
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .or(col("s1").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can't rule out anything based on contains information
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo OR s1 != bar OR s1 != baz
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .or(col("s1").not_eq(lit("bar")))
                .or(col("s1").not_eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out anything based on contains information
            &[true, true, true, true, true, true, true, true, true],
        );
    }
5126
    #[test]
    fn prune_with_contained_two_columns() {
        // Exercises pruning with `contained` information for two independent
        // columns, verifying how the knowledge combines across AND / OR.
        let schema = Arc::new(Schema::new(vec![
            Field::new("s1", DataType::Utf8, true),
            Field::new("s2", DataType::Utf8, true),
        ]));

        // Model having information like bloom filters for s1 and s2
        let statistics = TestStatistics::new()
            .with_contained(
                "s1",
                [ScalarValue::from("foo")],
                [
                    // container 0, s1 known to only contain "foo"
                    Some(true),
                    // container 1, s1 known to not contain "foo"
                    Some(false),
                    // container 2, s1 unknown about "foo"
                    None,
                    // container 3, s1 known to only contain "foo"
                    Some(true),
                    // container 4, s1 known to not contain "foo"
                    Some(false),
                    // container 5, s1 unknown about "foo"
                    None,
                    // container 6, s1 known to only contain "foo"
                    Some(true),
                    // container 7, s1 known to not contain "foo"
                    Some(false),
                    // container 8, s1 unknown about "foo"
                    None,
                ],
            )
            .with_contained(
                "s2", // for column s2
                [ScalarValue::from("bar")],
                [
                    // containers 0,1,2 s2 known to only contain "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 3,4,5 s2 known to not contain "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                    // container 6,7,8 s2 unknown about "bar"
                    None,
                    None,
                    None,
                ],
            );

        // s1 = 'foo'
        prune_with_expr(
            col("s1").eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers where we know s1 is not present
            &[true, false, true, true, false, true, true, false, true],
        );

        // s1 = 'foo' OR s2 = 'bar'
        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' AND s2 != 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can only rule out container where we know either:
            // 1. s1 doesn't have the value 'foo` or
            // 2. s2 has only the value of 'bar'
            &[false, false, false, true, false, true, true, false, true],
        );

        // s1 != 'foo' AND s2 != 'bar'
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // Can rule out any container where we know either
            // 1. s1 has only the value 'foo'
            // 2. s2 has only the value 'bar'
            &[false, false, false, false, true, true, false, true, true],
        );

        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
            &schema,
            &statistics,
            // Can rule out any container where we know s1 has only the value
            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
            &[false, true, true, false, true, true, false, true, true],
        );

        // s1 like 'foo%bar%'
        prune_with_expr(
            col("s1").like(lit("foo%bar%")),
            &schema,
            &statistics,
            // can't rule out anything with information we know
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 like 'foo%bar%' AND s2 = 'bar'
        prune_with_expr(
            col("s1")
                .like(lit("foo%bar%"))
                .and(col("s2").eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out any container where we know s2 does not have the value 'bar'
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 like 'foo%bar%' OR s2 = 'bar'
        prune_with_expr(
            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
            &schema,
            &statistics,
            // can't rule out anything (we would have to prove that both the
            // like and the equality must be false)
            &[true, true, true, true, true, true, true, true, true],
        );
    }
5264
5265    #[test]
5266    fn prune_with_range_and_contained() {
5267        // Setup mimics range information for i, a bloom filter for s
5268        let schema = Arc::new(Schema::new(vec![
5269            Field::new("i", DataType::Int32, true),
5270            Field::new("s", DataType::Utf8, true),
5271        ]));
5272
5273        let statistics = TestStatistics::new()
5274            .with(
5275                "i",
5276                ContainerStats::new_i32(
5277                    // Container 0, 3, 6: [-5 to 5]
5278                    // Container 1, 4, 7: [10 to 20]
5279                    // Container 2, 5, 9: unknown
5280                    vec![
5281                        Some(-5),
5282                        Some(10),
5283                        None,
5284                        Some(-5),
5285                        Some(10),
5286                        None,
5287                        Some(-5),
5288                        Some(10),
5289                        None,
5290                    ], // min
5291                    vec![
5292                        Some(5),
5293                        Some(20),
5294                        None,
5295                        Some(5),
5296                        Some(20),
5297                        None,
5298                        Some(5),
5299                        Some(20),
5300                        None,
5301                    ], // max
5302                ),
5303            )
5304            // Add contained  information about the s and "foo"
5305            .with_contained(
5306                "s",
5307                [ScalarValue::from("foo")],
5308                [
5309                    // container 0,1,2 known to only contain "foo"
5310                    Some(true),
5311                    Some(true),
5312                    Some(true),
5313                    // container 3,4,5 known to not contain "foo"
5314                    Some(false),
5315                    Some(false),
5316                    Some(false),
5317                    // container 6,7,8 unknown about "foo"
5318                    None,
5319                    None,
5320                    None,
5321                ],
5322            );
5323
5324        // i = 0 and s = 'foo'
5325        prune_with_expr(
5326            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5327            &schema,
5328            &statistics,
            // Can rule out containers where we know that either:
            // 1. 0 is outside the min/max range of i
            // 2. s does not contain foo
            // (range is false, and contained is false)
5333            &[true, false, true, false, false, false, true, false, true],
5334        );
5335
5336        // i = 0 and s != 'foo'
5337        prune_with_expr(
5338            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5339            &schema,
5340            &statistics,
5341            // Can rule out containers where either:
5342            // 1. 0 is outside the min/max range of i
5343            // 2. s only contains foo
5344            &[false, false, false, true, false, true, true, false, true],
5345        );
5346
5347        // i = 0 OR s = 'foo'
5348        prune_with_expr(
5349            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5350            &schema,
5351            &statistics,
5352            // in theory could rule out containers if we had min/max values for
5353            // s as well. But in this case we don't so we can't rule out anything
5354            &[true, true, true, true, true, true, true, true, true],
5355        );
5356    }
5357
5358    /// prunes the specified expr with the specified schema and statistics, and
5359    /// ensures it returns expected.
5360    ///
5361    /// `expected` is a vector of bools, where true means the row group should
5362    /// be kept, and false means it should be pruned.
5363    // TODO refactor other tests to use this to reduce boiler plate
5364    fn prune_with_expr(
5365        expr: Expr,
5366        schema: &SchemaRef,
5367        statistics: &TestStatistics,
5368        expected: &[bool],
5369    ) {
5370        println!("Pruning with expr: {expr}");
5371        let expr = logical2physical(&expr, schema);
5372        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5373        let result = p.prune(statistics).unwrap();
5374        assert_eq!(result, expected);
5375    }
5376
5377    fn prune_with_simplified_expr(
5378        expr: Expr,
5379        schema: &SchemaRef,
5380        statistics: &TestStatistics,
5381        expected: &[bool],
5382    ) {
5383        println!("Pruning with expr: {expr}");
5384        let expr = logical2physical(&expr, schema);
5385        let simplifier = PhysicalExprSimplifier::new(schema);
5386        let expr = simplifier.simplify(expr).unwrap();
5387        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5388        let result = p.prune(statistics).unwrap();
5389        assert_eq!(result, expected);
5390    }
5391
5392    fn test_build_predicate_expression(
5393        expr: &Expr,
5394        schema: &Schema,
5395        required_columns: &mut RequiredColumns,
5396    ) -> Arc<dyn PhysicalExpr> {
5397        let expr = logical2physical(expr, schema);
5398        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5399        build_predicate_expression(
5400            &expr,
5401            &Arc::new(schema.clone()),
5402            required_columns,
5403            &unhandled_hook,
5404        )
5405    }
5406
5407    #[test]
5408    fn test_build_predicate_expression_with_false() {
5409        let expr = lit(ScalarValue::Boolean(Some(false)));
5410        let schema = Schema::empty();
5411        let res =
5412            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5413        let expected = logical2physical(&expr, &schema);
5414        assert_eq!(&res, &expected);
5415    }
5416
5417    #[test]
5418    fn test_build_predicate_expression_with_and_false() {
5419        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5420        let expr = and(
5421            col("c1").eq(lit("a")),
5422            lit(ScalarValue::Boolean(Some(false))),
5423        );
5424        let res =
5425            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5426        let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5427        assert_eq!(&res, &expected);
5428    }
5429
5430    #[test]
5431    fn test_build_predicate_expression_with_or_false() {
5432        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5433        let left_expr = col("c1").eq(lit("a"));
5434        let right_expr = lit(ScalarValue::Boolean(Some(false)));
5435        let res = test_build_predicate_expression(
5436            &or(left_expr.clone(), right_expr.clone()),
5437            &schema,
5438            &mut RequiredColumns::new(),
5439        );
5440        let expected =
5441            "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5442        assert_eq!(res.to_string(), expected);
5443    }
5444}