// datafusion_pruning/pruning_predicate.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
//! based on statistics (e.g. Parquet Row Groups)
//!
//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
use std::collections::HashSet;
use std::sync::Arc;

use arrow::array::AsArray;
use arrow::{
    array::{ArrayRef, BooleanArray, new_null_array},
    datatypes::{DataType, Field, Schema, SchemaRef},
    record_batch::{RecordBatch, RecordBatchOptions},
};
// pub use for backwards compatibility
pub use datafusion_common::pruning::PruningStatistics;
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_plan::metrics::Count;
use log::{debug, trace};

use datafusion_common::error::Result;
use datafusion_common::tree_node::{TransformedResult, TreeNodeRecursion};
use datafusion_common::{Column, DFSchema, assert_eq_or_internal_err};
use datafusion_common::{
    ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err,
    tree_node::{Transformed, TreeNode},
};
use datafusion_expr_common::operator::Operator;
use datafusion_physical_expr::expressions::CastColumnExpr;
use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};

/// Used to prove that arbitrary predicates (boolean expression) can not
/// possibly evaluate to `true` given information about a column provided by
/// [`PruningStatistics`].
///
/// # Introduction
///
/// `PruningPredicate` analyzes filter expressions using statistics such as
/// min/max values and null counts, attempting to prove a "container" (e.g.
/// Parquet Row Group) can be skipped without reading the actual data,
/// potentially leading to significant performance improvements.
///
/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
/// on the min/max values found in the Parquet metadata. If the
/// `PruningPredicate` can prove that the filter can never evaluate to `true`
/// for any row in the Row Group, the entire Row Group is skipped during query
/// execution.
///
/// The `PruningPredicate` API is general, and can be used for pruning other
/// types of containers (e.g. files) based on statistics that may be known from
/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
/// subtle topic.  See the Background and Implementation section for details.
///
/// `PruningPredicate` supports:
///
/// 1. Arbitrary expressions (including user defined functions)
///
/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
///    so it is suitable for pruning 1000s of containers.
///
/// 3. Any source of information that implements the [`PruningStatistics`] trait
///    (not just Parquet metadata).
///
/// # Example
///
/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
/// example of how to use `PruningPredicate` to prune files based on min/max
/// values.
///
/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/query_planning/pruning.rs
///
/// Given an expression like `x = 5` and statistics for 3 containers (Row
/// Groups, files, etc) `A`, `B`, and `C`:
///
/// ```text
///   A: {x_min = 0, x_max = 4}
///   B: {x_min = 2, x_max = 10}
///   C: {x_min = 5, x_max = 8}
/// ```
///
/// `PruningPredicate` will conclude that the rows in container `A` can never
/// be true (as the maximum value is only `4`), so it can be pruned:
///
/// ```text
/// A: false (no rows could possibly match x = 5)
/// B: true  (rows might match x = 5)
/// C: true  (rows might match x = 5)
/// ```
///
/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
///
/// # Background
///
/// ## Boolean Tri-state logic
///
/// To understand the details of the rest of this documentation, it is important
/// to understand how the tri-state boolean logic in SQL works. As this is
/// somewhat esoteric, we review it here.
///
/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
/// uncertainty propagates through expressions. SQL `NULL` behaves very
/// differently than the `NULL` in most other languages where it is a special,
/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
/// first encountered as they behave differently than most programmers may
/// expect.
///
/// In most other programming languages,
/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
/// * `a == NULL` evaluates to `false` if `a` has any other value
///
/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
/// `false`):
///
/// Expression    | Result
/// ------------- | ---------
/// `1 = NULL`    | `NULL`
/// `NULL = NULL` | `NULL`
///
/// Also important is how `AND` and `OR` works with tri-state boolean logic as
/// (perhaps counterintuitively) the result is **not** always NULL. While
/// consistent with the notion of `NULL` representing “unknown”, this is again,
/// often deeply confusing 🤯 when first encountered.
///
/// Expression       | Result    | Intuition
/// ---------------  | --------- | -----------
/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
/// `NULL AND NULL`  | `NULL`    |
///
/// Expression      | Result    | Intuition
/// --------------- | --------- | ----------
/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
/// `NULL OR NULL`  |  `NULL`   |
///
/// ## SQL Filter Semantics
///
/// The SQL `WHERE` clause has a boolean expression, often called a filter or
/// predicate. The semantics of this predicate are that the query evaluates the
/// predicate for each row in the input tables and:
///
/// * Rows that evaluate to `true` are returned in the query results
///
/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
///
/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
///   in the rewritten predicate described below.*
///
/// # `PruningPredicate` Implementation
///
/// Armed with the information in the Background section, we can now understand
/// how the `PruningPredicate` logic works.
///
/// ## Interface
///
/// **Inputs**
/// 1. An input schema describing what columns exist
///
/// 2. A predicate (expression that evaluates to a boolean)
///
/// 3. [`PruningStatistics`] that provides information about columns in that
///    schema, for multiple “containers”. For each column in each container, it
///    provides optional information on contained values, min_values, max_values,
///    null_counts counts, and row_counts counts.
///
/// **Outputs**:
/// A (non null) boolean value for each container:
/// * `true`: There MAY be rows that match the predicate
///
/// * `false`: There are no rows that could possibly match the predicate (the
///   predicate can never possibly be true). The container can be pruned (skipped)
///   entirely.
///
/// While `PruningPredicate` will never return a `NULL` value, the
/// rewritten predicate (as returned by `build_predicate_expression` and used internally
/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
/// or null / row counts are not known.
///
/// In order to be correct, `PruningPredicate` must return false
/// **only** if it can determine that for all rows in the container, the
/// predicate could never evaluate to `true` (always evaluates to either `NULL`
/// or `false`).
///
/// ## Contains Analysis and Min/Max Rewrite
///
/// `PruningPredicate` works by first analyzing the predicate to see what
/// [`LiteralGuarantee`] must hold for the predicate to be true.
///
/// Then, the `PruningPredicate` rewrites the original predicate into an
/// expression that references the min/max values of each column in the original
/// predicate.
///
/// When the min/max values are actually substituted in to this expression and
/// evaluated, the result means
///
/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
///
/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
///   Note that rewritten predicate can evaluate to NULL when some of
///   the min/max values are not known. *Note that this is different than
///   the SQL filter semantics where `NULL` means the row is filtered
///   out.*
///
/// * `false`: there are no rows that could possibly match the predicate,
///   **PRUNES** the container
///
/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
/// `x_row_count` represent the minimum and maximum values, the null count of
/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
/// is known to be all `NULL`s. Note this is different from knowing nothing about
/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
///
/// Here are some examples of the rewritten predicates:
///
/// Original Predicate | Rewritten Predicate
/// ------------------ | --------------------
/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
/// `x < 5` | `x_null_count != x_row_count AND (x_min < 5)`
/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)`
/// `x IS NULL`  | `x_null_count > 0`
/// `x IS NOT NULL`  | `x_null_count != x_row_count`
/// `CAST(x as int) = 5` | `x_null_count != x_row_count AND (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
///
/// ## Predicate Evaluation
/// The PruningPredicate works in two passes
///
/// **First pass**:  For each `LiteralGuarantee` calls
/// [`PruningStatistics::contained`] and rules out containers where the
/// LiteralGuarantees are not satisfied
///
/// **Second Pass**: Evaluates the rewritten expression using the
/// min/max/null_counts/row_counts values for each column for each container. For any
/// container that this expression evaluates to `false`, it rules out those
/// containers.
///
///
/// ### Example 1
///
/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
///
/// ```sql
/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
/// AND
/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
/// ```
///
/// If we know that for a given container, `x` is between `1 and 100` and we know that
/// `y` is between `4` and `7`, we know nothing about the null count and row count of
/// `x` and `y`, the input statistics might look like:
///
/// Column   | Value
/// -------- | -----
/// `x_min`  | `1`
/// `x_max`  | `100`
/// `x_null_count` | `null`
/// `x_row_count`  | `null`
/// `y_min`  | `4`
/// `y_max`  | `7`
/// `y_null_count` | `null`
/// `y_row_count`  | `null`
///
/// When these statistics values are substituted in to the rewritten predicate and
/// simplified, the result is `false`:
///
/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
/// * `null = null` is `null` which is not true, so the AND moves on to the next clause
/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
/// * evaluating the clauses further we get:
/// * `null and true and null and false`
/// * `null and false`
/// * `false`
///
/// Returning `false` means the container can be pruned, which matches the
/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
/// are `7` or less.
///
/// Note that if we had ended up with `null AND true AND null AND true` the result
/// would have been `null`.
/// `null` is treated the same as`true`, because we can't prove that the predicate is `false.`
///
/// If, for some other container, we knew `y` was between the values `4` and
/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
/// left as an exercise to the reader -- are you still here?), and the container
/// **could not** be pruned. The intuition is that there may be rows where the
/// predicate *might* evaluate to `true`, and the only way to find out is to do
/// more analysis, for example by actually reading the data and evaluating the
/// predicate row by row.
///
/// ### Example 2
///
/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
/// look like the same as example 1:
///
/// ```sql
/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
/// AND
/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
/// ```
///
/// If we know that for another given container, `x_min` is NULL and `x_max` is
/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
/// the null count and row count of `y`. The input statistics might look like:
///
/// Column   | Value
/// -------- | -----
/// `x_min`  | `null`
/// `x_max`  | `null`
/// `x_null_count` | `100`
/// `x_row_count`  | `100`
/// `y_min`  | `4`
/// `y_max`  | `7`
/// `y_null_count` | `null`
/// `y_row_count`  | `null`
///
/// When these statistics values are substituted in to the rewritten predicate and
/// simplified, the result is `false`:
///
/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null = null AND (4 <= 10 AND 10 <= 7)`
/// * `false AND null AND null AND false`
/// * `false AND false`
/// * `false`
///
/// Returning `false` means the container can be pruned, which matches the
/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
/// are known to be NULL.
///
/// # Related Work
///
/// [`PruningPredicate`] implements the type of min/max pruning described in
/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
/// described by various research such as [small materialized aggregates], [zone
/// maps], and [data skipping].
///
/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
#[derive(Debug, Clone)]
pub struct PruningPredicate {
    /// The input schema against which the predicate will be evaluated
    schema: SchemaRef,
    /// A min/max pruning predicate (rewritten in terms of column min/max
    /// values, which are supplied by statistics)
    predicate_expr: Arc<dyn PhysicalExpr>,
    /// Description of which statistics are required to evaluate `predicate_expr`
    required_columns: RequiredColumns,
    /// Original physical predicate from which this predicate expr is derived
    /// (required for serialization)
    orig_expr: Arc<dyn PhysicalExpr>,
    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
    /// evaluate to `true`.
    ///
    /// See [`PruningPredicate::literal_guarantees`] for more details.
    literal_guarantees: Vec<LiteralGuarantee>,
}

381/// Build a pruning predicate from an optional predicate expression.
382/// If the predicate is None or the predicate cannot be converted to a pruning
383/// predicate, return None.
384/// If there is an error creating the pruning predicate it is recorded by incrementing
385/// the `predicate_creation_errors` counter.
386pub fn build_pruning_predicate(
387    predicate: Arc<dyn PhysicalExpr>,
388    file_schema: &SchemaRef,
389    predicate_creation_errors: &Count,
390) -> Option<Arc<PruningPredicate>> {
391    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
392        Ok(pruning_predicate) => {
393            if !pruning_predicate.always_true() {
394                return Some(Arc::new(pruning_predicate));
395            }
396        }
397        Err(e) => {
398            debug!("Could not create pruning predicate for: {e}");
399            predicate_creation_errors.add(1);
400        }
401    }
402    None
403}
404
/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
/// complex expressions or predicates that reference columns that are not in the
/// schema.
pub trait UnhandledPredicateHook {
    /// Called when a predicate can not be rewritten in terms of statistics or
    /// references a column that is not in the schema.
    ///
    /// Returns the expression to substitute for the unhandled predicate in the
    /// rewritten (pruning) expression.
    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
}

/// The default handling for unhandled predicates is to return a constant `true`
/// (meaning don't prune the container)
#[derive(Debug, Clone)]
struct ConstantUnhandledPredicateHook {
    // The constant expression substituted for any unhandled predicate
    default: Arc<dyn PhysicalExpr>,
}

421impl Default for ConstantUnhandledPredicateHook {
422    fn default() -> Self {
423        Self {
424            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
425        }
426    }
427}
428
impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
    /// Ignores the unhandled expression and returns the configured constant
    /// (by default, literal `true` so the container is not pruned).
    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        Arc::clone(&self.default)
    }
}

435impl PruningPredicate {
436    /// Try to create a new instance of [`PruningPredicate`]
437    ///
438    /// This will translate the provided `expr` filter expression into
439    /// a *pruning predicate*.
440    ///
441    /// A pruning predicate is one that has been rewritten in terms of
442    /// the min and max values of column references and that evaluates
443    /// to FALSE if the filter predicate would evaluate FALSE *for
444    /// every row* whose values fell within the min / max ranges (aka
445    /// could be pruned).
446    ///
447    /// The pruning predicate evaluates to TRUE or NULL
448    /// if the filter predicate *might* evaluate to TRUE for at least
449    /// one row whose values fell within the min/max ranges (in other
450    /// words they might pass the predicate)
451    ///
452    /// For example, the filter expression `(column / 2) = 4` becomes
453    /// the pruning predicate
454    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
455    ///
456    /// See the struct level documentation on [`PruningPredicate`] for more
457    /// details.
458    ///
459    /// Note that `PruningPredicate` does not attempt to normalize or simplify
460    /// the input expression unless calling [`snapshot_physical_expr_opt`]
461    /// returns a new expression.
462    /// It is recommended that you pass the expressions through [`PhysicalExprSimplifier`]
463    /// before calling this method to make sure the expressions can be used for pruning.
464    pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
465        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`.
466        // In particular this unravels any `DynamicFilterPhysicalExpr`s by snapshotting them
467        // so that PruningPredicate can work with a static expression.
468        let tf = snapshot_physical_expr_opt(expr)?;
469        if tf.transformed {
470            // If we had an expression such as Dynamic(part_col < 5 and col < 10)
471            // (this could come from something like `select * from t order by part_col, col, limit 10`)
472            // after snapshotting and because `DynamicFilterPhysicalExpr` applies child replacements to its
473            // children after snapshotting and previously `replace_columns_with_literals` may have been called with partition values
474            // the expression we have now is `8 < 5 and col < 10`.
475            // Thus we need as simplifier pass to get `false and col < 10` => `false` here.
476            let simplifier = PhysicalExprSimplifier::new(&schema);
477            expr = simplifier.simplify(tf.data)?;
478        } else {
479            expr = tf.data;
480        }
481        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
482
483        // build predicate expression once
484        let mut required_columns = RequiredColumns::new();
485        let predicate_expr = build_predicate_expression(
486            &expr,
487            &schema,
488            &mut required_columns,
489            &unhandled_hook,
490        );
491        let predicate_schema = required_columns.schema();
492        // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc.
493        let predicate_expr =
494            PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
495
496        let literal_guarantees = LiteralGuarantee::analyze(&expr);
497
498        Ok(Self {
499            schema,
500            predicate_expr,
501            required_columns,
502            orig_expr: expr,
503            literal_guarantees,
504        })
505    }
506
507    /// For each set of statistics, evaluates the pruning predicate
508    /// and returns a `bool` with the following meaning for a
509    /// all rows whose values match the statistics:
510    ///
511    /// `true`: There MAY be rows that match the predicate
512    ///
513    /// `false`: There are no rows that could possibly match the predicate
514    ///
515    /// Note: the predicate passed to `prune` should already be simplified as
516    /// much as possible (e.g. this pass doesn't handle some
517    /// expressions like `b = false`, but it does handle the
518    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
519    ///
520    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
521    pub fn prune<S: PruningStatistics + ?Sized>(
522        &self,
523        statistics: &S,
524    ) -> Result<Vec<bool>> {
525        let mut builder = BoolVecBuilder::new(statistics.num_containers());
526
527        // Try to prove the predicate can't be true for the containers based on
528        // literal guarantees
529        for literal_guarantee in &self.literal_guarantees {
530            let LiteralGuarantee {
531                column,
532                guarantee,
533                literals,
534            } = literal_guarantee;
535            if let Some(results) = statistics.contained(column, literals) {
536                match guarantee {
537                    // `In` means the values in the column must be one of the
538                    // values in the set for the predicate to evaluate to true.
539                    // If `contained` returns false, that means the column is
540                    // not any of the values so we can prune the container
541                    Guarantee::In => builder.combine_array(&results),
542                    // `NotIn` means the values in the column must not be
543                    // any of the values in the set for the predicate to
544                    // evaluate to true. If `contained` returns true, it means the
545                    // column is only in the set of values so we can prune the
546                    // container
547                    Guarantee::NotIn => {
548                        builder.combine_array(&arrow::compute::not(&results)?)
549                    }
550                }
551                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
552                // can return early without evaluating the rest of predicates.
553                if builder.check_all_pruned() {
554                    return Ok(builder.build());
555                }
556            }
557        }
558
559        // Next, try to prove the predicate can't be true for the containers based
560        // on min/max values
561
562        // build a RecordBatch that contains the min/max values in the
563        // appropriate statistics columns for the min/max predicate
564        let statistics_batch =
565            build_statistics_record_batch(statistics, &self.required_columns)?;
566
567        // Evaluate the pruning predicate on that record batch and append any results to the builder
568        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
569
570        Ok(builder.build())
571    }
572
573    /// Return a reference to the input schema
574    pub fn schema(&self) -> &SchemaRef {
575        &self.schema
576    }
577
578    /// Returns a reference to the physical expr used to construct this pruning predicate
579    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
580        &self.orig_expr
581    }
582
583    /// Returns a reference to the predicate expr
584    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
585        &self.predicate_expr
586    }
587
588    /// Returns a reference to the literal guarantees
589    ///
590    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
591    /// expression to possibly be `true`. If any is not satisfied, the
592    /// expression is guaranteed to be `null` or `false`.
593    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
594        &self.literal_guarantees
595    }
596
597    /// Returns true if this pruning predicate can not prune anything.
598    ///
599    /// This happens if the predicate is a literal `true`  and
600    /// literal_guarantees is empty.
601    ///
602    /// This can happen when a predicate is simplified to a constant `true`
603    pub fn always_true(&self) -> bool {
604        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
605    }
606
607    pub fn required_columns(&self) -> &RequiredColumns {
608        &self.required_columns
609    }
610
611    /// Names of the columns that are known to be / not be in a set
612    /// of literals (constants). These are the columns the that may be passed to
613    /// [`PruningStatistics::contained`] during pruning.
614    ///
615    /// This is useful to avoid fetching statistics for columns that will not be
616    /// used in the predicate. For example, it can be used to avoid reading
617    /// unneeded bloom filters (a non trivial operation).
618    pub fn literal_columns(&self) -> Vec<String> {
619        let mut seen = HashSet::new();
620        self.literal_guarantees
621            .iter()
622            .map(|e| &e.column.name)
623            // avoid duplicates
624            .filter(|name| seen.insert(*name))
625            .map(|s| s.to_string())
626            .collect()
627    }
628}
629
/// Builds the return `Vec` for [`PruningPredicate::prune`].
#[derive(Debug)]
struct BoolVecBuilder {
    /// One element per container. Each element is
    /// * `true`: if the container has row that may pass the predicate
    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
    inner: Vec<bool>,
}

639impl BoolVecBuilder {
640    /// Create a new `BoolVecBuilder` with `num_containers` elements
641    fn new(num_containers: usize) -> Self {
642        Self {
643            // assume by default all containers may pass the predicate
644            inner: vec![true; num_containers],
645        }
646    }
647
648    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
649    /// predicate into the currently in progress array.
650    ///
651    /// Each `array` element is:
652    /// * `true`: container has row that may pass the predicate
653    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
654    /// * `null`: container may or may not have rows that pass the predicate
655    fn combine_array(&mut self, array: &BooleanArray) {
656        assert_eq!(array.len(), self.inner.len());
657        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
658            // `false` for this conjunct means we know for sure no rows could
659            // pass the predicate and thus we set the corresponding container
660            // location to false.
661            if let Some(false) = new {
662                *cur = false;
663            }
664        }
665    }
666
667    /// Combines the results in the [`ColumnarValue`] to the currently in
668    /// progress array, following the same rules as [`Self::combine_array`].
669    ///
670    /// # Panics
671    /// If `value` is not boolean
672    fn combine_value(&mut self, value: ColumnarValue) {
673        match value {
674            ColumnarValue::Array(array) => {
675                self.combine_array(array.as_boolean());
676            }
677            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
678                // False means all containers can not pass the predicate
679                self.inner = vec![false; self.inner.len()];
680            }
681            _ => {
682                // Null or true means the rows in container may pass this
683                // conjunct so we can't prune any containers based on that
684            }
685        }
686    }
687
688    /// Convert this builder into a Vec of bools
689    fn build(self) -> Vec<bool> {
690        self.inner
691    }
692
693    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
694    fn check_all_pruned(&self) -> bool {
695        self.inner.iter().all(|&x| !x)
696    }
697}
698
699fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
700    expr.as_any()
701        .downcast_ref::<phys_expr::Literal>()
702        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
703        .unwrap_or_default()
704}
705
706fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
707    expr.as_any()
708        .downcast_ref::<phys_expr::Literal>()
709        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
710        .unwrap_or_default()
711}
712
/// Describes which columns statistics are necessary to evaluate a
/// [`PruningPredicate`].
///
/// This structure permits reading and creating the minimum number statistics,
/// which is important since statistics may be non trivial to read (e.g. large
/// strings or when there are 1000s of columns).
///
/// Handles creating references to the min/max statistics
/// for columns as well as recording which statistics are needed
#[derive(Debug, Default, Clone)]
pub struct RequiredColumns {
    /// The statistics required to evaluate this predicate:
    /// * The unqualified column in the input schema
    /// * Statistics type (e.g. Min or Max or Null_Count)
    /// * The field the statistics value should be placed in for
    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
    ///
    /// The order of entries determines the column order of the schema
    /// used to evaluate the pruning predicate (see `Self::schema`)
    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
}
731
732impl RequiredColumns {
733    fn new() -> Self {
734        Self::default()
735    }
736
737    /// Returns Some(column) if this is a single column predicate.
738    ///
739    /// Returns None if this is a multi-column predicate.
740    ///
741    /// Examples:
742    /// * `a > 5 OR a < 10` returns `Some(a)`
743    /// * `a > 5 OR b < 10` returns `None`
744    /// * `true` returns None
745    pub fn single_column(&self) -> Option<&phys_expr::Column> {
746        if self.columns.windows(2).all(|w| {
747            // check if all columns are the same (ignoring statistics and field)
748            let c1 = &w[0].0;
749            let c2 = &w[1].0;
750            c1 == c2
751        }) {
752            self.columns.first().map(|r| &r.0)
753        } else {
754            None
755        }
756    }
757
758    /// Returns a schema that describes the columns required to evaluate this
759    /// pruning predicate.
760    /// The schema contains the fields for each column in `self.columns` with
761    /// the appropriate data type for the statistics.
762    /// Order matters, this same order is used to evaluate the
763    /// pruning predicate.
764    fn schema(&self) -> Schema {
765        let fields = self
766            .columns
767            .iter()
768            .map(|(_c, _t, f)| f.clone())
769            .collect::<Vec<_>>();
770        Schema::new(fields)
771    }
772
773    /// Returns an iterator over items in columns (see doc on
774    /// `self.columns` for details)
775    pub(crate) fn iter(
776        &self,
777    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
778        self.columns.iter()
779    }
780
781    fn find_stat_column(
782        &self,
783        column: &phys_expr::Column,
784        statistics_type: StatisticsType,
785    ) -> Option<usize> {
786        match statistics_type {
787            StatisticsType::RowCount => {
788                // Use the first row count we find, if any
789                self.columns
790                    .iter()
791                    .enumerate()
792                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
793                    .map(|(i, (_c, _t, _f))| i)
794            }
795            _ => self
796                .columns
797                .iter()
798                .enumerate()
799                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
800                .map(|(i, (_c, _t, _f))| i),
801        }
802    }
803
804    /// Rewrites column_expr so that all appearances of column
805    /// are replaced with a reference to either the min or max
806    /// statistics column, while keeping track that a reference to the statistics
807    /// column is required
808    ///
809    /// for example, an expression like `col("foo") > 5`, when called
810    /// with Max would result in an expression like `col("foo_max") >
811    /// 5` with the appropriate entry noted in self.columns
812    fn stat_column_expr(
813        &mut self,
814        column: &phys_expr::Column,
815        column_expr: &Arc<dyn PhysicalExpr>,
816        field: &Field,
817        stat_type: StatisticsType,
818    ) -> Result<Arc<dyn PhysicalExpr>> {
819        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
820            Some(idx) => (idx, false),
821            None => (self.columns.len(), true),
822        };
823
824        let column_name = column.name();
825        let stat_column_name = match stat_type {
826            StatisticsType::Min => format!("{column_name}_min"),
827            StatisticsType::Max => format!("{column_name}_max"),
828            StatisticsType::NullCount => format!("{column_name}_null_count"),
829            StatisticsType::RowCount => "row_count".to_string(),
830        };
831
832        let stat_column = phys_expr::Column::new(&stat_column_name, idx);
833
834        // only add statistics column if not previously added
835        if need_to_insert {
836            // may be null if statistics are not present
837            let nullable = true;
838            let stat_field =
839                Field::new(stat_column.name(), field.data_type().clone(), nullable);
840            self.columns.push((column.clone(), stat_type, stat_field));
841        }
842        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
843    }
844
845    /// rewrite col --> col_min
846    fn min_column_expr(
847        &mut self,
848        column: &phys_expr::Column,
849        column_expr: &Arc<dyn PhysicalExpr>,
850        field: &Field,
851    ) -> Result<Arc<dyn PhysicalExpr>> {
852        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
853    }
854
855    /// rewrite col --> col_max
856    fn max_column_expr(
857        &mut self,
858        column: &phys_expr::Column,
859        column_expr: &Arc<dyn PhysicalExpr>,
860        field: &Field,
861    ) -> Result<Arc<dyn PhysicalExpr>> {
862        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
863    }
864
865    /// rewrite col --> col_null_count
866    fn null_count_column_expr(
867        &mut self,
868        column: &phys_expr::Column,
869        column_expr: &Arc<dyn PhysicalExpr>,
870        field: &Field,
871    ) -> Result<Arc<dyn PhysicalExpr>> {
872        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
873    }
874
875    /// rewrite col --> col_row_count
876    fn row_count_column_expr(
877        &mut self,
878        column: &phys_expr::Column,
879        column_expr: &Arc<dyn PhysicalExpr>,
880        field: &Field,
881    ) -> Result<Arc<dyn PhysicalExpr>> {
882        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
883    }
884}
885
/// Allows constructing `RequiredColumns` directly from a prepared list of
/// (column, statistics type, statistics field) entries.
impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
        Self { columns }
    }
}
891
892/// Build a RecordBatch from a list of statistics, creating arrays,
893/// with one row for each PruningStatistics and columns specified in
894/// the required_columns parameter.
895///
896/// For example, if the requested columns are
897/// ```text
898/// ("s1", Min, Field:s1_min)
899/// ("s2", Max, field:s2_max)
900/// ```
901///
902/// And the input statistics had
903/// ```text
904/// S1(Min: 5, Max: 10)
905/// S2(Min: 99, Max: 1000)
906/// S3(Min: 1, Max: 2)
907/// ```
908///
909/// Then this function would build a record batch with 2 columns and
910/// one row s1_min and s2_max as follows (s3 is not requested):
911///
912/// ```text
913/// s1_min | s2_max
914/// -------+--------
915///   5    | 1000
916/// ```
917fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
918    statistics: &S,
919    required_columns: &RequiredColumns,
920) -> Result<RecordBatch> {
921    let mut arrays = Vec::<ArrayRef>::new();
922    // For each needed statistics column:
923    for (column, statistics_type, stat_field) in required_columns.iter() {
924        let column = Column::from_name(column.name());
925        let data_type = stat_field.data_type();
926
927        let num_containers = statistics.num_containers();
928
929        let array = match statistics_type {
930            StatisticsType::Min => statistics.min_values(&column),
931            StatisticsType::Max => statistics.max_values(&column),
932            StatisticsType::NullCount => statistics.null_counts(&column),
933            StatisticsType::RowCount => statistics.row_counts(&column),
934        };
935        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
936
937        assert_eq_or_internal_err!(
938            num_containers,
939            array.len(),
940            "mismatched statistics length. Expected {}, got {}",
941            num_containers,
942            array.len()
943        );
944
945        // cast statistics array to required data type (e.g. parquet
946        // provides timestamp statistics as "Int64")
947        let array = arrow::compute::cast(&array, data_type)?;
948
949        arrays.push(array);
950    }
951
952    let schema = Arc::new(required_columns.schema());
953    // provide the count in case there were no needed statistics
954    let mut options = RecordBatchOptions::default();
955    options.row_count = Some(statistics.num_containers());
956
957    trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
958
959    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
960        plan_datafusion_err!("Can not create statistics record batch: {err}")
961    })
962}
963
/// Helper for rewriting a binary comparison between a column-bearing
/// expression and a scalar expression into min/max/count statistics
/// expressions.
struct PruningExpressionBuilder<'a> {
    // The single column referenced by `column_expr`
    column: phys_expr::Column,
    // The column side of the comparison (may wrap the column in casts etc.)
    column_expr: Arc<dyn PhysicalExpr>,
    // Comparison operator, normalized so the column side is on the left
    op: Operator,
    // The scalar (column-free) side of the comparison
    scalar_expr: Arc<dyn PhysicalExpr>,
    // Schema field for `column`
    field: &'a Field,
    // Records which statistics columns the rewritten expression requires
    required_columns: &'a mut RequiredColumns,
}
972
973impl<'a> PruningExpressionBuilder<'a> {
974    fn try_new(
975        left: &'a Arc<dyn PhysicalExpr>,
976        right: &'a Arc<dyn PhysicalExpr>,
977        left_columns: ColumnReferenceCount,
978        right_columns: ColumnReferenceCount,
979        op: Operator,
980        schema: &'a SchemaRef,
981        required_columns: &'a mut RequiredColumns,
982    ) -> Result<Self> {
983        // find column name; input could be a more complicated expression
984        let (column_expr, scalar_expr, column, correct_operator) = match (
985            left_columns,
986            right_columns,
987        ) {
988            (ColumnReferenceCount::One(column), ColumnReferenceCount::Zero) => {
989                (left, right, column, op)
990            }
991            (ColumnReferenceCount::Zero, ColumnReferenceCount::One(column)) => {
992                (right, left, column, reverse_operator(op)?)
993            }
994            (ColumnReferenceCount::One(_), ColumnReferenceCount::One(_)) => {
995                // both sides have one column - not supported
996                return plan_err!(
997                    "Expression not supported for pruning: left has 1 column, right has 1 column"
998                );
999            }
1000            (ColumnReferenceCount::Zero, ColumnReferenceCount::Zero) => {
1001                // both sides are literals - should be handled before calling try_new
1002                return plan_err!(
1003                    "Pruning literal expressions is not supported, please call PhysicalExprSimplifier first"
1004                );
1005            }
1006            (ColumnReferenceCount::Many, _) | (_, ColumnReferenceCount::Many) => {
1007                return plan_err!(
1008                    "Expression not supported for pruning: left or right has multiple columns"
1009                );
1010            }
1011        };
1012
1013        let df_schema = DFSchema::try_from(Arc::clone(schema))?;
1014        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1015            column_expr,
1016            correct_operator,
1017            scalar_expr,
1018            df_schema,
1019        )?;
1020        let field = match schema.column_with_name(column.name()) {
1021            Some((_, f)) => f,
1022            _ => {
1023                return plan_err!("Field not found in schema");
1024            }
1025        };
1026
1027        Ok(Self {
1028            column,
1029            column_expr,
1030            op: correct_operator,
1031            scalar_expr,
1032            field,
1033            required_columns,
1034        })
1035    }
1036
1037    fn op(&self) -> Operator {
1038        self.op
1039    }
1040
1041    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1042        &self.scalar_expr
1043    }
1044
1045    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1046        self.required_columns
1047            .min_column_expr(&self.column, &self.column_expr, self.field)
1048    }
1049
1050    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1051        self.required_columns
1052            .max_column_expr(&self.column, &self.column_expr, self.field)
1053    }
1054
1055    /// This function is to simply retune the `null_count` physical expression no matter what the
1056    /// predicate expression is
1057    ///
1058    /// i.e., x > 5 => x_null_count,
1059    ///       cast(x as int) < 10 => x_null_count,
1060    ///       try_cast(x as float) < 10.0 => x_null_count
1061    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1062        // Retune to [`phys_expr::Column`]
1063        let column_expr = Arc::new(self.column.clone()) as _;
1064
1065        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1066        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1067
1068        self.required_columns.null_count_column_expr(
1069            &self.column,
1070            &column_expr,
1071            null_count_field,
1072        )
1073    }
1074
1075    /// This function is to simply retune the `row_count` physical expression no matter what the
1076    /// predicate expression is
1077    ///
1078    /// i.e., x > 5 => x_row_count,
1079    ///       cast(x as int) < 10 => x_row_count,
1080    ///       try_cast(x as float) < 10.0 => x_row_count
1081    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1082        // Retune to [`phys_expr::Column`]
1083        let column_expr = Arc::new(self.column.clone()) as _;
1084
1085        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1086        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1087
1088        self.required_columns.row_count_column_expr(
1089            &self.column,
1090            &column_expr,
1091            row_count_field,
1092        )
1093    }
1094}
1095
/// This function is designed to rewrite the column_expr to
/// ensure the column_expr is monotonically increasing.
///
/// For example,
/// 1. `col > 10`
/// 2. `-col > 10` should be rewritten to `col < -10`
/// 3. `!col = true` would be rewritten to `col = !true`
/// 4. `abs(a - 10) > 0` not supported
/// 5. `cast(can_prunable_expr) > 10`
/// 6. `try_cast(can_prunable_expr) > 10`
///
/// More rewrite rules are still in progress.
///
/// Returns the rewritten (column side, operator, scalar side) triple, or a
/// plan error if the expression shape is not supported for pruning.
fn rewrite_expr_to_prunable(
    column_expr: &PhysicalExprRef,
    op: Operator,
    scalar_expr: &PhysicalExprRef,
    schema: DFSchema,
) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
    // Only comparison operators can be expressed over min/max bounds
    if !is_compare_op(op) {
        return plan_err!("rewrite_expr_to_prunable only support compare expression");
    }

    let column_expr_any = column_expr.as_any();

    if column_expr_any
        .downcast_ref::<phys_expr::Column>()
        .is_some()
    {
        // `col op lit()` — already in the desired shape
        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
        // `cast(col) op lit()`
        // Verify the cast preserves ordering before recursing into the
        // inner expression (e.g. string <-> number casts do not).
        let arrow_schema = schema.as_arrow();
        let from_type = cast.expr().data_type(arrow_schema)?;
        verify_support_type_for_prune(&from_type, cast.cast_type())?;
        let (left, op, right) =
            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
        let left = Arc::new(phys_expr::CastExpr::new(
            left,
            cast.cast_type().clone(),
            None,
        ));
        Ok((left, op, right))
    } else if let Some(cast_col) = column_expr_any.downcast_ref::<CastColumnExpr>() {
        // `cast_column(col) op lit()` - same as CastExpr but uses CastColumnExpr
        let arrow_schema = schema.as_arrow();
        let from_type = cast_col.expr().data_type(arrow_schema)?;
        let to_type = cast_col.target_field().data_type();
        verify_support_type_for_prune(&from_type, to_type)?;
        let (left, op, right) =
            rewrite_expr_to_prunable(cast_col.expr(), op, scalar_expr, schema)?;
        // Predicate pruning / statistics generally don't support struct columns yet.
        // In the future we may want to support pruning on nested fields, in which case we probably need to
        // do something more sophisticated here.
        // But for now since we don't support pruning on nested fields, we can just cast to the target type directly.
        let left = Arc::new(phys_expr::CastExpr::new(left, to_type.clone(), None));
        Ok((left, op, right))
    } else if let Some(try_cast) =
        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
    {
        // `try_cast(col) op lit()` — handled like CastExpr, rebuilt as TryCastExpr
        let arrow_schema = schema.as_arrow();
        let from_type = try_cast.expr().data_type(arrow_schema)?;
        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
        let (left, op, right) =
            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
        let left = Arc::new(phys_expr::TryCastExpr::new(
            left,
            try_cast.cast_type().clone(),
        ));
        Ok((left, op, right))
    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
        // `-col > lit()`  --> `col < -lit()`
        // Negation flips the ordering, so the operator must be reversed
        let (left, op, right) =
            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
        let right = Arc::new(phys_expr::NegativeExpr::new(right));
        Ok((left, reverse_operator(op)?, right))
    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
        // `!col = true` --> `col = !true`
        // NOT only commutes with (in)equality, not with ordering comparisons
        if op != Operator::Eq && op != Operator::NotEq {
            return plan_err!("Not with operator other than Eq / NotEq is not supported");
        }
        if not
            .arg()
            .as_any()
            .downcast_ref::<phys_expr::Column>()
            .is_some()
        {
            let left = Arc::clone(not.arg());
            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
            Ok((left, reverse_operator(op)?, right))
        } else {
            plan_err!("Not with complex expression {column_expr:?} is not supported")
        }
    } else {
        plan_err!("column expression {column_expr:?} is not supported")
    }
}
1194
1195fn is_compare_op(op: Operator) -> bool {
1196    matches!(
1197        op,
1198        Operator::Eq
1199            | Operator::NotEq
1200            | Operator::Lt
1201            | Operator::LtEq
1202            | Operator::Gt
1203            | Operator::GtEq
1204            | Operator::LikeMatch
1205            | Operator::NotLikeMatch
1206    )
1207}
1208
/// Returns `true` if `data_type` is one of the UTF-8 string types
/// (`Utf8`, `LargeUtf8`, `Utf8View`)
fn is_string_type(data_type: &DataType) -> bool {
    matches!(
        data_type,
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
    )
}
1215
1216// The pruning logic is based on the comparing the min/max bounds.
1217// Must make sure the two type has order.
1218// For example, casts from string to numbers is not correct.
1219// Because the "13" is less than "3" with UTF8 comparison order.
1220fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1221    // Dictionary casts are always supported as long as the value types are supported
1222    let from_type = match from_type {
1223        DataType::Dictionary(_, t) => {
1224            return verify_support_type_for_prune(t.as_ref(), to_type);
1225        }
1226        _ => from_type,
1227    };
1228    let to_type = match to_type {
1229        DataType::Dictionary(_, t) => {
1230            return verify_support_type_for_prune(from_type, t.as_ref());
1231        }
1232        _ => to_type,
1233    };
1234    // If both types are strings or both are not strings (number, timestamp, etc)
1235    // then we can compare them.
1236    // PruningPredicate does not support casting of strings to numbers and such.
1237    if is_string_type(from_type) == is_string_type(to_type) {
1238        Ok(())
1239    } else {
1240        plan_err!(
1241            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1242        )
1243    }
1244}
1245
1246/// replaces a column with an old name with a new name in an expression
1247fn rewrite_column_expr(
1248    e: Arc<dyn PhysicalExpr>,
1249    column_old: &phys_expr::Column,
1250    column_new: &phys_expr::Column,
1251) -> Result<Arc<dyn PhysicalExpr>> {
1252    e.transform(|expr| {
1253        if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>()
1254            && column == column_old
1255        {
1256            return Ok(Transformed::yes(Arc::new(column_new.clone())));
1257        }
1258
1259        Ok(Transformed::no(expr))
1260    })
1261    .data()
1262}
1263
1264fn reverse_operator(op: Operator) -> Result<Operator> {
1265    op.swap().ok_or_else(|| {
1266        internal_datafusion_err!(
1267            "Could not reverse operator {op} while building pruning predicate"
1268        )
1269    })
1270}
1271
1272/// Given a column reference to `column`, returns a pruning
1273/// expression in terms of the min and max that will evaluate to true
1274/// if the column may contain values, and false if definitely does not
1275/// contain values
1276fn build_single_column_expr(
1277    column: &phys_expr::Column,
1278    schema: &Schema,
1279    required_columns: &mut RequiredColumns,
1280    is_not: bool, // if true, treat as !col
1281) -> Option<Arc<dyn PhysicalExpr>> {
1282    let field = schema.field_with_name(column.name()).ok()?;
1283
1284    if matches!(field.data_type(), &DataType::Boolean) {
1285        let col_ref = Arc::new(column.clone()) as _;
1286
1287        let min = required_columns
1288            .min_column_expr(column, &col_ref, field)
1289            .ok()?;
1290        let max = required_columns
1291            .max_column_expr(column, &col_ref, field)
1292            .ok()?;
1293
1294        // remember -- we want an expression that is:
1295        // TRUE: if there may be rows that match
1296        // FALSE: if there are no rows that match
1297        if is_not {
1298            // The only way we know a column couldn't match is if both the min and max are true
1299            // !(min && max)
1300            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1301                phys_expr::BinaryExpr::new(min, Operator::And, max),
1302            ))))
1303        } else {
1304            // the only way we know a column couldn't match is if both the min and max are false
1305            // !(!min && !max) --> min || max
1306            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1307        }
1308    } else {
1309        None
1310    }
1311}
1312
1313/// Given an expression reference to `expr`, if `expr` is a column expression,
1314/// returns a pruning expression in terms of IsNull that will evaluate to true
1315/// if the column may contain null, and false if definitely does not
1316/// contain null.
1317/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1318/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1319/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1320/// thus can prune the row group / value
1321fn build_is_null_column_expr(
1322    expr: &Arc<dyn PhysicalExpr>,
1323    schema: &Schema,
1324    required_columns: &mut RequiredColumns,
1325    with_not: bool,
1326) -> Option<Arc<dyn PhysicalExpr>> {
1327    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1328        let field = schema.field_with_name(col.name()).ok()?;
1329
1330        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1331        if with_not {
1332            if let Ok(row_count_expr) =
1333                required_columns.row_count_column_expr(col, expr, null_count_field)
1334            {
1335                required_columns
1336                    .null_count_column_expr(col, expr, null_count_field)
1337                    .map(|null_count_column_expr| {
1338                        // IsNotNull(column) => null_count != row_count
1339                        Arc::new(phys_expr::BinaryExpr::new(
1340                            null_count_column_expr,
1341                            Operator::NotEq,
1342                            row_count_expr,
1343                        )) as _
1344                    })
1345                    .ok()
1346            } else {
1347                None
1348            }
1349        } else {
1350            required_columns
1351                .null_count_column_expr(col, expr, null_count_field)
1352                .map(|null_count_column_expr| {
1353                    // IsNull(column) => null_count > 0
1354                    Arc::new(phys_expr::BinaryExpr::new(
1355                        null_count_column_expr,
1356                        Operator::Gt,
1357                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1358                    )) as _
1359                })
1360                .ok()
1361        }
1362    } else {
1363        None
1364    }
1365}
1366
/// The maximum number of entries in an `InList` that might be rewritten into
/// an OR/AND chain of comparisons (larger lists fall back to the
/// unhandled-expression hook)
const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1370
/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
/// for use as a [`PruningPredicate`].
pub struct PredicateRewriter {
    /// Invoked for any sub-expression that cannot be rewritten in terms of
    /// statistics; provides the replacement expression
    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
}
1376
impl Default for PredicateRewriter {
    /// Creates a rewriter using the default
    /// [`ConstantUnhandledPredicateHook`] for unhandled expressions.
    fn default() -> Self {
        Self {
            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
        }
    }
}
1384
1385impl PredicateRewriter {
1386    /// Create a new `PredicateRewriter`
1387    pub fn new() -> Self {
1388        Self::default()
1389    }
1390
1391    /// Set the unhandled hook to be used when a predicate can not be rewritten
1392    pub fn with_unhandled_hook(
1393        self,
1394        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1395    ) -> Self {
1396        Self { unhandled_hook }
1397    }
1398
1399    /// Translate logical filter expression into pruning predicate
1400    /// expression that will evaluate to FALSE if it can be determined no
1401    /// rows between the min/max values could pass the predicates.
1402    ///
1403    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1404    ///
1405    /// Returns the pruning predicate as an [`PhysicalExpr`]
1406    ///
1407    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1408    pub fn rewrite_predicate_to_statistics_predicate(
1409        &self,
1410        expr: &Arc<dyn PhysicalExpr>,
1411        schema: &Schema,
1412    ) -> Arc<dyn PhysicalExpr> {
1413        let mut required_columns = RequiredColumns::new();
1414        build_predicate_expression(
1415            expr,
1416            &Arc::new(schema.clone()),
1417            &mut required_columns,
1418            &self.unhandled_hook,
1419        )
1420    }
1421}
1422
1423/// Translate logical filter expression into pruning predicate
1424/// expression that will evaluate to FALSE if it can be determined no
1425/// rows between the min/max values could pass the predicates.
1426///
1427/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1428///
1429/// Returns the pruning predicate as an [`PhysicalExpr`]
1430///
1431/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1432fn build_predicate_expression(
1433    expr: &Arc<dyn PhysicalExpr>,
1434    schema: &SchemaRef,
1435    required_columns: &mut RequiredColumns,
1436    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1437) -> Arc<dyn PhysicalExpr> {
1438    if is_always_false(expr) {
1439        // Shouldn't return `unhandled_hook.handle(expr)`
1440        // Because it will transfer false to true.
1441        return Arc::clone(expr);
1442    }
1443    // predicate expression can only be a binary expression
1444    let expr_any = expr.as_any();
1445    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1446        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1447            .unwrap_or_else(|| unhandled_hook.handle(expr));
1448    }
1449    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1450        return build_is_null_column_expr(
1451            is_not_null.arg(),
1452            schema,
1453            required_columns,
1454            true,
1455        )
1456        .unwrap_or_else(|| unhandled_hook.handle(expr));
1457    }
1458    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1459        return build_single_column_expr(col, schema, required_columns, false)
1460            .unwrap_or_else(|| unhandled_hook.handle(expr));
1461    }
1462    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1463        // match !col (don't do so recursively)
1464        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1465            return build_single_column_expr(col, schema, required_columns, true)
1466                .unwrap_or_else(|| unhandled_hook.handle(expr));
1467        } else {
1468            return unhandled_hook.handle(expr);
1469        }
1470    }
1471    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1472        if !in_list.list().is_empty()
1473            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1474        {
1475            let eq_op = if in_list.negated() {
1476                Operator::NotEq
1477            } else {
1478                Operator::Eq
1479            };
1480            let re_op = if in_list.negated() {
1481                Operator::And
1482            } else {
1483                Operator::Or
1484            };
1485            let change_expr = in_list
1486                .list()
1487                .iter()
1488                .map(|e| {
1489                    Arc::new(phys_expr::BinaryExpr::new(
1490                        Arc::clone(in_list.expr()),
1491                        eq_op,
1492                        Arc::clone(e),
1493                    )) as _
1494                })
1495                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1496                .unwrap();
1497            return build_predicate_expression(
1498                &change_expr,
1499                schema,
1500                required_columns,
1501                unhandled_hook,
1502            );
1503        } else {
1504            return unhandled_hook.handle(expr);
1505        }
1506    }
1507
1508    let (left, op, right) = {
1509        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1510            (
1511                Arc::clone(bin_expr.left()),
1512                *bin_expr.op(),
1513                Arc::clone(bin_expr.right()),
1514            )
1515        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1516            if like_expr.case_insensitive() {
1517                return unhandled_hook.handle(expr);
1518            }
1519            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1520                (false, false) => Operator::LikeMatch,
1521                (true, false) => Operator::NotLikeMatch,
1522                (false, true) => Operator::ILikeMatch,
1523                (true, true) => Operator::NotILikeMatch,
1524            };
1525            (
1526                Arc::clone(like_expr.expr()),
1527                op,
1528                Arc::clone(like_expr.pattern()),
1529            )
1530        } else {
1531            return unhandled_hook.handle(expr);
1532        }
1533    };
1534
1535    if op == Operator::And || op == Operator::Or {
1536        let left_expr =
1537            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1538        let right_expr =
1539            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1540        // simplify boolean expression if applicable
1541        let expr = match (&left_expr, op, &right_expr) {
1542            (left, Operator::And, right)
1543                if is_always_false(left) || is_always_false(right) =>
1544            {
1545                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1546            }
1547            (left, Operator::And, _) if is_always_true(left) => right_expr,
1548            (_, Operator::And, right) if is_always_true(right) => left_expr,
1549            (left, Operator::Or, right)
1550                if is_always_true(left) || is_always_true(right) =>
1551            {
1552                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1553            }
1554            (left, Operator::Or, _) if is_always_false(left) => right_expr,
1555            (_, Operator::Or, right) if is_always_false(right) => left_expr,
1556
1557            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1558        };
1559        return expr;
1560    }
1561
1562    let left_columns = ColumnReferenceCount::from_expression(&left);
1563    let right_columns = ColumnReferenceCount::from_expression(&right);
1564    let expr_builder = PruningExpressionBuilder::try_new(
1565        &left,
1566        &right,
1567        left_columns,
1568        right_columns,
1569        op,
1570        schema,
1571        required_columns,
1572    );
1573    let mut expr_builder = match expr_builder {
1574        Ok(builder) => builder,
1575        // allow partial failure in predicate expression generation
1576        // this can still produce a useful predicate when multiple conditions are joined using AND
1577        Err(e) => {
1578            debug!("Error building pruning expression: {e}");
1579            return unhandled_hook.handle(expr);
1580        }
1581    };
1582
1583    build_statistics_expr(&mut expr_builder)
1584        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1585}
1586
/// Count of distinct column references in an expression.
/// This is the same as [`collect_columns`] but optimized to stop counting
/// once more than one distinct column is found.
///
/// For example, in expression `col1 + col2`, the count is `Many`.
/// In expression `col1 + 5`, the count is `One`.
/// In expression `5 + 10`, the count is `Zero`.
///
/// [`collect_columns`]: datafusion_physical_expr::utils::collect_columns
#[derive(Debug, PartialEq, Eq)]
enum ColumnReferenceCount {
    /// The expression references no columns (e.g. a pure literal expression)
    Zero,
    /// The expression references exactly one distinct column, carried here
    /// so callers can use it without re-walking the expression
    One(phys_expr::Column),
    /// The expression references two or more distinct columns
    Many,
}
1605
1606impl ColumnReferenceCount {
1607    /// Count the number of distinct column references in an expression
1608    fn from_expression(expr: &Arc<dyn PhysicalExpr>) -> Self {
1609        let mut seen = HashSet::<phys_expr::Column>::new();
1610        expr.apply(|expr| {
1611            if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1612                seen.insert(column.clone());
1613                if seen.len() > 1 {
1614                    return Ok(TreeNodeRecursion::Stop);
1615                }
1616            }
1617            Ok(TreeNodeRecursion::Continue)
1618        })
1619        // pre_visit always returns OK, so this will always too
1620        .expect("no way to return error during recursion");
1621        match seen.len() {
1622            0 => ColumnReferenceCount::Zero,
1623            1 => ColumnReferenceCount::One(
1624                seen.into_iter().next().expect("just checked len==1"),
1625            ),
1626            _ => ColumnReferenceCount::Many,
1627        }
1628    }
1629}
1630
1631fn build_statistics_expr(
1632    expr_builder: &mut PruningExpressionBuilder,
1633) -> Result<Arc<dyn PhysicalExpr>> {
1634    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1635        Operator::NotEq => {
1636            // column != literal => (min, max) = literal =>
1637            // !(min != literal && max != literal) ==>
1638            // min != literal || literal != max
1639            let min_column_expr = expr_builder.min_column_expr()?;
1640            let max_column_expr = expr_builder.max_column_expr()?;
1641            Arc::new(phys_expr::BinaryExpr::new(
1642                Arc::new(phys_expr::BinaryExpr::new(
1643                    min_column_expr,
1644                    Operator::NotEq,
1645                    Arc::clone(expr_builder.scalar_expr()),
1646                )),
1647                Operator::Or,
1648                Arc::new(phys_expr::BinaryExpr::new(
1649                    Arc::clone(expr_builder.scalar_expr()),
1650                    Operator::NotEq,
1651                    max_column_expr,
1652                )),
1653            ))
1654        }
1655        Operator::Eq => {
1656            // column = literal => (min, max) = literal => min <= literal && literal <= max
1657            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1658            let min_column_expr = expr_builder.min_column_expr()?;
1659            let max_column_expr = expr_builder.max_column_expr()?;
1660            Arc::new(phys_expr::BinaryExpr::new(
1661                Arc::new(phys_expr::BinaryExpr::new(
1662                    min_column_expr,
1663                    Operator::LtEq,
1664                    Arc::clone(expr_builder.scalar_expr()),
1665                )),
1666                Operator::And,
1667                Arc::new(phys_expr::BinaryExpr::new(
1668                    Arc::clone(expr_builder.scalar_expr()),
1669                    Operator::LtEq,
1670                    max_column_expr,
1671                )),
1672            ))
1673        }
1674        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1675        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1676            plan_datafusion_err!(
1677                "LIKE expression with wildcard at the beginning is not supported"
1678            )
1679        })?,
1680        Operator::Gt => {
1681            // column > literal => (min, max) > literal => max > literal
1682            Arc::new(phys_expr::BinaryExpr::new(
1683                expr_builder.max_column_expr()?,
1684                Operator::Gt,
1685                Arc::clone(expr_builder.scalar_expr()),
1686            ))
1687        }
1688        Operator::GtEq => {
1689            // column >= literal => (min, max) >= literal => max >= literal
1690            Arc::new(phys_expr::BinaryExpr::new(
1691                expr_builder.max_column_expr()?,
1692                Operator::GtEq,
1693                Arc::clone(expr_builder.scalar_expr()),
1694            ))
1695        }
1696        Operator::Lt => {
1697            // column < literal => (min, max) < literal => min < literal
1698            Arc::new(phys_expr::BinaryExpr::new(
1699                expr_builder.min_column_expr()?,
1700                Operator::Lt,
1701                Arc::clone(expr_builder.scalar_expr()),
1702            ))
1703        }
1704        Operator::LtEq => {
1705            // column <= literal => (min, max) <= literal => min <= literal
1706            Arc::new(phys_expr::BinaryExpr::new(
1707                expr_builder.min_column_expr()?,
1708                Operator::LtEq,
1709                Arc::clone(expr_builder.scalar_expr()),
1710            ))
1711        }
1712        // other expressions are not supported
1713        _ => {
1714            return plan_err!(
1715                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1716            );
1717        }
1718    };
1719    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1720    Ok(statistics_expr)
1721}
1722
1723/// returns the string literal of the scalar value if it is a string
1724fn unpack_string(s: &ScalarValue) -> Option<&str> {
1725    s.try_as_str().flatten()
1726}
1727
1728fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1729    if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1730        let s = unpack_string(lit.value())?;
1731        return Some(s);
1732    }
1733    None
1734}
1735
1736/// Convert `column LIKE literal` where P is a constant prefix of the literal
1737/// to a range check on the column: `P <= column && column < P'`, where P' is the
1738/// lowest string after all P* strings.
1739fn build_like_match(
1740    expr_builder: &mut PruningExpressionBuilder,
1741) -> Option<Arc<dyn PhysicalExpr>> {
1742    // column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
1743    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1744    // column LIKE '%foo' => min <= '' && '' <= max => true
1745    // column LIKE '%foo%' => min <= '' && '' <= max => true
1746    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1747
1748    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1749    //  this may involve building the physical expressions that call lower() and upper()
1750    let min_column_expr = expr_builder.min_column_expr().ok()?;
1751    let max_column_expr = expr_builder.max_column_expr().ok()?;
1752    let scalar_expr = expr_builder.scalar_expr();
1753    // check that the scalar is a string literal
1754    let s = extract_string_literal(scalar_expr)?;
1755    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1756    let first_wildcard_index = s.find(['%', '_']);
1757    if first_wildcard_index == Some(0) {
1758        // there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
1759        return None;
1760    }
1761    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1762        let prefix = &s[..wildcard_index];
1763        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1764            prefix.to_string(),
1765        ))));
1766        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1767            increment_utf8(prefix)?,
1768        ))));
1769        (lower_bound_lit, upper_bound_lit)
1770    } else {
1771        // the like expression is a literal and can be converted into a comparison
1772        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1773            s.to_string(),
1774        ))));
1775        (Arc::clone(&bound), bound)
1776    };
1777    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1778        lower_bound,
1779        Operator::LtEq,
1780        Arc::clone(&max_column_expr),
1781    ));
1782    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1783        Arc::clone(&min_column_expr),
1784        Operator::LtEq,
1785        upper_bound,
1786    ));
1787    let combined = Arc::new(phys_expr::BinaryExpr::new(
1788        upper_bound_expr,
1789        Operator::And,
1790        lower_bound_expr,
1791    ));
1792    Some(combined)
1793}
1794
1795// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1796//
1797// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1798// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1799// looking for rows that don't begin with `const_prefix` can never be true)
1800fn build_not_like_match(
1801    expr_builder: &mut PruningExpressionBuilder<'_>,
1802) -> Result<Arc<dyn PhysicalExpr>> {
1803    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1804
1805    let min_column_expr = expr_builder.min_column_expr()?;
1806    let max_column_expr = expr_builder.max_column_expr()?;
1807
1808    let scalar_expr = expr_builder.scalar_expr();
1809
1810    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1811        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1812    })?;
1813
1814    let (const_prefix, remaining) = split_constant_prefix(pattern);
1815    if const_prefix.is_empty() || remaining != "%" {
1816        // we can not handle `%` at the beginning or in the middle of the pattern
1817        // Example: For pattern "foo%bar", the row group might include values like
1818        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1819        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1820        // match the pattern, intermediate values like "food" may not
1821        // match the full pattern "foo%bar", making pruning unsafe.
1822        // (truncate foo%bar to foo% have same problem)
1823
1824        // we can not handle pattern containing `_`
1825        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1826        // which means not every row is guaranteed to match the pattern.
1827        return Err(plan_datafusion_err!(
1828            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1829        ));
1830    }
1831
1832    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1833        true,
1834        false,
1835        Arc::clone(&min_column_expr),
1836        Arc::clone(scalar_expr),
1837    ));
1838
1839    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1840        true,
1841        false,
1842        Arc::clone(&max_column_expr),
1843        Arc::clone(scalar_expr),
1844    ));
1845
1846    Ok(Arc::new(phys_expr::BinaryExpr::new(
1847        min_col_not_like_epxr,
1848        Operator::Or,
1849        max_col_not_like_expr,
1850    )))
1851}
1852
/// Split a LIKE pattern into its leading constant prefix — the part before
/// the first *unescaped* `%` or `_` wildcard, possibly empty — and the
/// remaining pattern starting at that wildcard (possibly empty).
///
/// A backslash escapes the character that follows it, so `\%` is a literal
/// percent sign while `\\%` is a literal backslash followed by a *real*
/// wildcard. Escape sequences in the returned prefix are kept verbatim
/// (not unescaped).
///
/// Fix over the previous version: only the single preceding character used
/// to be inspected, so a wildcard after an escaped backslash (`\\%`) was
/// wrongly treated as escaped; that could let a mid-pattern wildcard slip
/// through the `constant_prefix%` guard in `build_not_like_match`.
fn split_constant_prefix(pattern: &str) -> (&str, &str) {
    // tracks whether the current character is consumed by a preceding `\`
    let mut escaped = false;
    for (idx, ch) in pattern.char_indices() {
        if escaped {
            // this character is escaped, even if it is `%`, `_` or `\`
            escaped = false;
        } else if ch == '\\' {
            escaped = true;
        } else if ch == '%' || ch == '_' {
            // first unescaped wildcard: split here
            return (&pattern[..idx], &pattern[idx..]);
        }
    }
    // no unescaped wildcard found: the whole pattern is constant
    (pattern, "")
}
1868
/// Return a string that compares strictly greater than `data` and greater
/// than any other string sharing `data` as a prefix, or `None` if no such
/// string can be produced.
///
/// This matters because statistics may be truncated: a max statistic of
/// "fo" could stand for "foz" or anything else starting with "fo", so the
/// upper bound must exceed the entire prefix family.
/// E.g. `increment_utf8("foo") == Some("fop")`, and `"fop" > "fooz"`.
fn increment_utf8(data: &str) -> Option<String> {
    // Reject code points we do not want to emit in a bound string:
    // Unicode non-characters (https://www.unicode.org/versions/corrigendum9.html)
    // plus anything beyond the Unicode scalar range — the latter can never
    // come out of `char::from_u32`, so that part is purely defensive.
    fn is_valid_unicode(c: char) -> bool {
        let cp = c as u32;
        if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
            return false;
        }
        cp < 0x110000
    }

    let mut code_points: Vec<char> = data.chars().collect();

    // walk from the last character toward the front, looking for one that
    // can be bumped to the next valid code point
    for pos in (0..code_points.len()).rev() {
        let bumped =
            char::from_u32(code_points[pos] as u32 + 1).filter(|c| is_valid_unicode(*c));
        if let Some(next_char) = bumped {
            code_points[pos] = next_char;
            // everything after `pos` can be dropped: the bumped prefix
            // already exceeds any string that starts with the original one
            code_points.truncate(pos + 1);
            return Some(code_points.into_iter().collect());
        }
    }

    // every character was at the top of its range; no increment possible
    None
}
1914
1915/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1916///
1917/// This is important not only as an optimization but also because statistics may not be
1918/// accurate for columns that are all nulls.
1919/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1920/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1921///
1922/// For example:
1923///
1924/// `x_min <= 10 AND 10 <= x_max`
1925///
1926/// will become
1927///
1928/// ```sql
1929/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
1930/// ````
1931///
1932/// If the column is known to be all nulls, then the expression
1933/// `x_null_count = x_row_count` will be true, which will cause the
1934/// boolean expression to return false. Therefore, prune out the container.
1935fn wrap_null_count_check_expr(
1936    statistics_expr: Arc<dyn PhysicalExpr>,
1937    expr_builder: &mut PruningExpressionBuilder,
1938) -> Result<Arc<dyn PhysicalExpr>> {
1939    // x_null_count != x_row_count
1940    let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1941        expr_builder.null_count_column_expr()?,
1942        Operator::NotEq,
1943        expr_builder.row_count_column_expr()?,
1944    ));
1945
1946    // (x_null_count != x_row_count) AND (<statistics_expr>)
1947    Ok(Arc::new(phys_expr::BinaryExpr::new(
1948        not_when_null_count_eq_row_count,
1949        Operator::And,
1950        statistics_expr,
1951    )))
1952}
1953
/// Which per-container statistic a generated statistics column refers to.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(crate) enum StatisticsType {
    /// Minimum value of the column within each container
    Min,
    /// Maximum value of the column within each container
    Max,
    /// Number of null values of the column within each container
    NullCount,
    /// Total number of rows within each container
    RowCount,
}
1961
1962#[cfg(test)]
1963mod tests {
1964    use std::collections::HashMap;
1965    use std::ops::{Not, Rem};
1966
1967    use super::*;
1968    use datafusion_common::test_util::batches_to_string;
1969    use datafusion_expr::{and, col, lit, or};
1970    use datafusion_physical_expr::utils::collect_columns;
1971    use insta::assert_snapshot;
1972
1973    use arrow::array::Decimal128Array;
1974    use arrow::{
1975        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1976        datatypes::TimeUnit,
1977    };
1978    use datafusion_expr::expr::InList;
1979    use datafusion_expr::{Expr, cast, is_null, try_cast};
1980    use datafusion_functions_nested::expr_fn::{array_has, make_array};
1981    use datafusion_physical_expr::expressions::{
1982        self as phys_expr, DynamicFilterPhysicalExpr,
1983    };
1984    use datafusion_physical_expr::planner::logical2physical;
1985    use itertools::Itertools;
1986
    #[derive(Debug, Default)]
    /// Mock statistic provider for tests
    ///
    /// Each row represents the statistics for a "container" (which
    /// might represent an entire parquet file, or directory of files,
    /// or some other collection of data for which we had statistics)
    ///
    /// Note All `ArrayRefs` must be the same size.
    struct ContainerStats {
        /// Per-container minimum values (one entry per container)
        min: Option<ArrayRef>,
        /// Per-container maximum values (one entry per container)
        max: Option<ArrayRef>,
        /// Optional per-container null counts
        null_counts: Option<ArrayRef>,
        /// Optional per-container row counts
        row_counts: Option<ArrayRef>,
        /// Optional known values (e.g. mimic a bloom filter)
        /// (value, contained)
        /// If present, all BooleanArrays must be the same size as min/max
        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
    }
2006
2007    impl ContainerStats {
2008        fn new() -> Self {
2009            Default::default()
2010        }
2011        fn new_decimal128(
2012            min: impl IntoIterator<Item = Option<i128>>,
2013            max: impl IntoIterator<Item = Option<i128>>,
2014            precision: u8,
2015            scale: i8,
2016        ) -> Self {
2017            Self::new()
2018                .with_min(Arc::new(
2019                    min.into_iter()
2020                        .collect::<Decimal128Array>()
2021                        .with_precision_and_scale(precision, scale)
2022                        .unwrap(),
2023                ))
2024                .with_max(Arc::new(
2025                    max.into_iter()
2026                        .collect::<Decimal128Array>()
2027                        .with_precision_and_scale(precision, scale)
2028                        .unwrap(),
2029                ))
2030        }
2031
2032        fn new_i64(
2033            min: impl IntoIterator<Item = Option<i64>>,
2034            max: impl IntoIterator<Item = Option<i64>>,
2035        ) -> Self {
2036            Self::new()
2037                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
2038                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
2039        }
2040
2041        fn new_i32(
2042            min: impl IntoIterator<Item = Option<i32>>,
2043            max: impl IntoIterator<Item = Option<i32>>,
2044        ) -> Self {
2045            Self::new()
2046                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
2047                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
2048        }
2049
2050        fn new_utf8<'a>(
2051            min: impl IntoIterator<Item = Option<&'a str>>,
2052            max: impl IntoIterator<Item = Option<&'a str>>,
2053        ) -> Self {
2054            Self::new()
2055                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
2056                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
2057        }
2058
2059        fn new_bool(
2060            min: impl IntoIterator<Item = Option<bool>>,
2061            max: impl IntoIterator<Item = Option<bool>>,
2062        ) -> Self {
2063            Self::new()
2064                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
2065                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
2066        }
2067
2068        fn min(&self) -> Option<ArrayRef> {
2069            self.min.clone()
2070        }
2071
2072        fn max(&self) -> Option<ArrayRef> {
2073            self.max.clone()
2074        }
2075
2076        fn null_counts(&self) -> Option<ArrayRef> {
2077            self.null_counts.clone()
2078        }
2079
2080        fn row_counts(&self) -> Option<ArrayRef> {
2081            self.row_counts.clone()
2082        }
2083
2084        /// return an iterator over all arrays in this statistics
2085        fn arrays(&self) -> Vec<ArrayRef> {
2086            let contained_arrays = self
2087                .contained
2088                .iter()
2089                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2090
2091            [
2092                self.min.as_ref().cloned(),
2093                self.max.as_ref().cloned(),
2094                self.null_counts.as_ref().cloned(),
2095                self.row_counts.as_ref().cloned(),
2096            ]
2097            .into_iter()
2098            .flatten()
2099            .chain(contained_arrays)
2100            .collect()
2101        }
2102
2103        /// Returns the number of containers represented by this statistics This
2104        /// picks the length of the first array as all arrays must have the same
2105        /// length (which is verified by `assert_invariants`).
2106        fn len(&self) -> usize {
2107            // pick the first non zero length
2108            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2109        }
2110
2111        /// Ensure that the lengths of all arrays are consistent
2112        fn assert_invariants(&self) {
2113            let mut prev_len = None;
2114
2115            for len in self.arrays().iter().map(|a| a.len()) {
2116                // Get a length, if we don't already have one
2117                match prev_len {
2118                    None => {
2119                        prev_len = Some(len);
2120                    }
2121                    Some(prev_len) => {
2122                        assert_eq!(prev_len, len);
2123                    }
2124                }
2125            }
2126        }
2127
2128        /// Add min values
2129        fn with_min(mut self, min: ArrayRef) -> Self {
2130            self.min = Some(min);
2131            self
2132        }
2133
2134        /// Add max values
2135        fn with_max(mut self, max: ArrayRef) -> Self {
2136            self.max = Some(max);
2137            self
2138        }
2139
2140        /// Add null counts. There must be the same number of null counts as
2141        /// there are containers
2142        fn with_null_counts(
2143            mut self,
2144            counts: impl IntoIterator<Item = Option<u64>>,
2145        ) -> Self {
2146            let null_counts: ArrayRef =
2147                Arc::new(counts.into_iter().collect::<UInt64Array>());
2148
2149            self.assert_invariants();
2150            self.null_counts = Some(null_counts);
2151            self
2152        }
2153
2154        /// Add row counts. There must be the same number of row counts as
2155        /// there are containers
2156        fn with_row_counts(
2157            mut self,
2158            counts: impl IntoIterator<Item = Option<u64>>,
2159        ) -> Self {
2160            let row_counts: ArrayRef =
2161                Arc::new(counts.into_iter().collect::<UInt64Array>());
2162
2163            self.assert_invariants();
2164            self.row_counts = Some(row_counts);
2165            self
2166        }
2167
2168        /// Add contained information.
2169        pub fn with_contained(
2170            mut self,
2171            values: impl IntoIterator<Item = ScalarValue>,
2172            contained: impl IntoIterator<Item = Option<bool>>,
2173        ) -> Self {
2174            let contained: BooleanArray = contained.into_iter().collect();
2175            let values: HashSet<_> = values.into_iter().collect();
2176
2177            self.contained.push((values, contained));
2178            self.assert_invariants();
2179            self
2180        }
2181
2182        /// get any contained information for the specified values
2183        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2184            // find the one with the matching values
2185            self.contained
2186                .iter()
2187                .find(|(values, _contained)| values == find_values)
2188                .map(|(_values, contained)| contained.clone())
2189        }
2190    }
2191
    #[derive(Debug, Default)]
    /// Mock [`PruningStatistics`] backed by per-column [`ContainerStats`]
    struct TestStatistics {
        // key: column name
        stats: HashMap<Column, ContainerStats>,
    }
2197
2198    impl TestStatistics {
2199        fn new() -> Self {
2200            Self::default()
2201        }
2202
2203        fn with(
2204            mut self,
2205            name: impl Into<String>,
2206            container_stats: ContainerStats,
2207        ) -> Self {
2208            let col = Column::from_name(name.into());
2209            self.stats.insert(col, container_stats);
2210            self
2211        }
2212
2213        /// Add null counts for the specified column.
2214        /// There must be the same number of null counts as
2215        /// there are containers
2216        fn with_null_counts(
2217            mut self,
2218            name: impl Into<String>,
2219            counts: impl IntoIterator<Item = Option<u64>>,
2220        ) -> Self {
2221            let col = Column::from_name(name.into());
2222
2223            // take stats out and update them
2224            let container_stats = self
2225                .stats
2226                .remove(&col)
2227                .unwrap_or_default()
2228                .with_null_counts(counts);
2229
2230            // put stats back in
2231            self.stats.insert(col, container_stats);
2232            self
2233        }
2234
2235        /// Add row counts for the specified column.
2236        /// There must be the same number of row counts as
2237        /// there are containers
2238        fn with_row_counts(
2239            mut self,
2240            name: impl Into<String>,
2241            counts: impl IntoIterator<Item = Option<u64>>,
2242        ) -> Self {
2243            let col = Column::from_name(name.into());
2244
2245            // take stats out and update them
2246            let container_stats = self
2247                .stats
2248                .remove(&col)
2249                .unwrap_or_default()
2250                .with_row_counts(counts);
2251
2252            // put stats back in
2253            self.stats.insert(col, container_stats);
2254            self
2255        }
2256
2257        /// Add contained information for the specified column.
2258        fn with_contained(
2259            mut self,
2260            name: impl Into<String>,
2261            values: impl IntoIterator<Item = ScalarValue>,
2262            contained: impl IntoIterator<Item = Option<bool>>,
2263        ) -> Self {
2264            let col = Column::from_name(name.into());
2265
2266            // take stats out and update them
2267            let container_stats = self
2268                .stats
2269                .remove(&col)
2270                .unwrap_or_default()
2271                .with_contained(values, contained);
2272
2273            // put stats back in
2274            self.stats.insert(col, container_stats);
2275            self
2276        }
2277    }
2278
2279    impl PruningStatistics for TestStatistics {
2280        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2281            self.stats
2282                .get(column)
2283                .map(|container_stats| container_stats.min())
2284                .unwrap_or(None)
2285        }
2286
2287        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2288            self.stats
2289                .get(column)
2290                .map(|container_stats| container_stats.max())
2291                .unwrap_or(None)
2292        }
2293
2294        fn num_containers(&self) -> usize {
2295            self.stats
2296                .values()
2297                .next()
2298                .map(|container_stats| container_stats.len())
2299                .unwrap_or(0)
2300        }
2301
2302        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2303            self.stats
2304                .get(column)
2305                .map(|container_stats| container_stats.null_counts())
2306                .unwrap_or(None)
2307        }
2308
2309        fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2310            self.stats
2311                .get(column)
2312                .map(|container_stats| container_stats.row_counts())
2313                .unwrap_or(None)
2314        }
2315
2316        fn contained(
2317            &self,
2318            column: &Column,
2319            values: &HashSet<ScalarValue>,
2320        ) -> Option<BooleanArray> {
2321            self.stats
2322                .get(column)
2323                .and_then(|container_stats| container_stats.contained(values))
2324        }
2325    }
2326
    /// Returns the specified min/max container values
    ///
    /// Test-only statistics source that reports the same configured min/max
    /// arrays for every column and offers no null counts, row counts, or
    /// containment information.
    struct OneContainerStats {
        // min values handed back for any column (`None` = unknown)
        min_values: Option<ArrayRef>,
        // max values handed back for any column (`None` = unknown)
        max_values: Option<ArrayRef>,
        // number of containers reported by `num_containers`
        num_containers: usize,
    }
2333
2334    impl PruningStatistics for OneContainerStats {
2335        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2336            self.min_values.clone()
2337        }
2338
2339        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2340            self.max_values.clone()
2341        }
2342
2343        fn num_containers(&self) -> usize {
2344            self.num_containers
2345        }
2346
2347        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2348            None
2349        }
2350
2351        fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2352            None
2353        }
2354
2355        fn contained(
2356            &self,
2357            _column: &Column,
2358            _values: &HashSet<ScalarValue>,
2359        ) -> Option<BooleanArray> {
2360            None
2361        }
2362    }
2363
2364    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2365    /// for multiple columns.
2366    #[test]
2367    fn test_unique_row_count_field_and_column() {
2368        // c1 = 100 AND c2 = 200
2369        let schema: SchemaRef = Arc::new(Schema::new(vec![
2370            Field::new("c1", DataType::Int32, true),
2371            Field::new("c2", DataType::Int32, true),
2372        ]));
2373        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2374        let expr = logical2physical(&expr, &schema);
2375        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2376        // note pruning expression refers to row_count twice
2377        assert_eq!(
2378            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2379            p.predicate_expr.to_string()
2380        );
2381
2382        // Fields in required schema should be unique, otherwise when creating batches
2383        // it will fail because of duplicate field names
2384        let mut fields = HashSet::new();
2385        for (_col, _ty, field) in p.required_columns().iter() {
2386            let was_new = fields.insert(field);
2387            if !was_new {
2388                panic!(
2389                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2390                );
2391            }
2392        }
2393    }
2394
    #[test]
    fn prune_all_rows_null_counts() {
        // if null_count = row_count then we should prune the container for i = 0
        // regardless of the statistics
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let statistics = TestStatistics::new().with(
            "i",
            ContainerStats::new_i32(
                vec![Some(0)], // min
                vec![Some(0)], // max
            )
            .with_null_counts(vec![Some(1)])
            .with_row_counts(vec![Some(1)]),
        );
        // all-null container is pruned even though min/max [0, 0] would match
        let expected_ret = &[false];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);

        // this should be true even if the container stats are missing
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![None]))),
            max: Some(Arc::new(Int32Array::from(vec![None]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[false];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);

        // If the null counts themselves are missing we should be able to fall back to the stats
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        // i = 0 matches the [0, 0] min/max range, so the container is kept
        let expected_ret = &[true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        // i > 0 contradicts max = 0, so the container is pruned
        let expected_ret = &[false];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);

        // Same for the row counts
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        let expected_ret = &[true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        let expected_ret = &[false];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
    }
2455
    #[test]
    fn prune_missing_statistics() {
        // If the min or max stats are missing we should not prune
        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
        // container 0: unknown min, max = 0; container 1: min = 0, unknown max
        let container_stats = ContainerStats {
            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
            ..ContainerStats::default()
        };
        let statistics = TestStatistics::new().with("i", container_stats);
        // i = 0: neither container can be ruled out via its missing bound
        let expected_ret = &[true, true];
        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
        // i > 0: container 0 has max = 0 so it is pruned; container 1's max
        // is unknown so it must be kept
        let expected_ret = &[false, true];
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
        // i < 0: container 1 has min = 0 so it is pruned; container 0's min
        // is unknown so it must be kept
        let expected_ret = &[true, false];
        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
    }
2476
2477    #[test]
2478    fn prune_null_stats() {
2479        // if null_count = row_count then we should prune the container for i = 0
2480        // regardless of the statistics
2481        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2482
2483        let statistics = TestStatistics::new().with(
2484            "i",
2485            ContainerStats::new_i32(
2486                vec![Some(0)], // min
2487                vec![Some(0)], // max
2488            )
2489            .with_null_counts(vec![Some(1)])
2490            .with_row_counts(vec![Some(1)]),
2491        );
2492
2493        let expected_ret = &[false];
2494
2495        // i = 0
2496        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2497    }
2498
    #[test]
    fn test_build_statistics_record_batch() {
        // Request a record batch with s1_min, s2_max, s3_max, s3_min
        let required_columns = RequiredColumns::from(vec![
            // min of original column s1, named s1_min
            (
                phys_expr::Column::new("s1", 1),
                StatisticsType::Min,
                Field::new("s1_min", DataType::Int32, true),
            ),
            // max of original column s2, named s2_max
            (
                phys_expr::Column::new("s2", 2),
                StatisticsType::Max,
                Field::new("s2_max", DataType::Int32, true),
            ),
            // max of original column s3, named s3_max
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Max,
                Field::new("s3_max", DataType::Utf8, true),
            ),
            // min of original column s3, named s3_min
            (
                phys_expr::Column::new("s3", 3),
                StatisticsType::Min,
                Field::new("s3_min", DataType::Utf8, true),
            ),
        ]);

        // four containers; missing stats become nulls in the output batch
        let statistics = TestStatistics::new()
            .with(
                "s1",
                ContainerStats::new_i32(
                    vec![None, None, Some(9), None],  // min
                    vec![Some(10), None, None, None], // max
                ),
            )
            .with(
                "s2",
                ContainerStats::new_i32(
                    vec![Some(2), None, None, None],  // min
                    vec![Some(20), None, None, None], // max
                ),
            )
            .with(
                "s3",
                ContainerStats::new_utf8(
                    vec![Some("a"), None, None, None],      // min
                    vec![Some("q"), None, Some("r"), None], // max
                ),
            );

        // one output row per container, one column per required statistic
        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        assert_snapshot!(batches_to_string(&[batch]), @r"
        +--------+--------+--------+--------+
        | s1_min | s2_max | s3_max | s3_min |
        +--------+--------+--------+--------+
        |        | 20     | q      | a      |
        |        |        |        |        |
        | 9      |        | r      |        |
        |        |        |        |        |
        +--------+--------+--------+--------+
        ");
    }
2565
    #[test]
    fn test_build_statistics_casting() {
        // Test requesting a Timestamp column, but getting statistics as Int64
        // which is what Parquet does

        // Request a record batch with s1_min as a timestamp
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new(
                "s1_min",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
        )]);

        // Note the statistics pass back i64 (not timestamp)
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
            num_containers: 1,
        };

        // the i64 stat should be cast to the requested timestamp type
        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();

        assert_snapshot!(batches_to_string(&[batch]), @r"
        +-------------------------------+
        | s1_min                        |
        +-------------------------------+
        | 1970-01-01T00:00:00.000000010 |
        +-------------------------------+
        ");
    }
2600
2601    #[test]
2602    fn test_build_statistics_no_required_stats() {
2603        let required_columns = RequiredColumns::new();
2604
2605        let statistics = OneContainerStats {
2606            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2607            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2608            num_containers: 1,
2609        };
2610
2611        let batch =
2612            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2613        assert_eq!(batch.num_rows(), 1); // had 1 container
2614    }
2615
    #[test]
    fn test_build_statistics_inconsistent_types() {
        // Test requesting a Utf8 column when the stats return some other type

        // Request a record batch with s1_min as a Utf8 column
        let required_columns = RequiredColumns::from(vec![(
            phys_expr::Column::new("s3", 3),
            StatisticsType::Min,
            Field::new("s1_min", DataType::Utf8, true),
        )]);

        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
        let statistics = OneContainerStats {
            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
            max_values: None,
            num_containers: 1,
        };

        // the unconvertible value surfaces as a null, not an error
        let batch =
            build_statistics_record_batch(&statistics, &required_columns).unwrap();
        assert_snapshot!(batches_to_string(&[batch]), @r"
        +--------+
        | s1_min |
        +--------+
        |        |
        +--------+
        ");
    }
2644
2645    #[test]
2646    fn test_build_statistics_inconsistent_length() {
2647        // return an inconsistent length to the actual statistics arrays
2648        let required_columns = RequiredColumns::from(vec![(
2649            phys_expr::Column::new("s1", 3),
2650            StatisticsType::Min,
2651            Field::new("s1_min", DataType::Int64, true),
2652        )]);
2653
2654        // Note the statistics pass back i64 (not timestamp)
2655        let statistics = OneContainerStats {
2656            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2657            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2658            num_containers: 3,
2659        };
2660
2661        let result =
2662            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2663        assert!(
2664            result
2665                .to_string()
2666                .contains("mismatched statistics length. Expected 3, got 1"),
2667            "{}",
2668            result
2669        );
2670    }
2671
2672    #[test]
2673    fn row_group_predicate_eq() -> Result<()> {
2674        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2675        let expected_expr =
2676            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2677
2678        // test column on the left
2679        let expr = col("c1").eq(lit(1));
2680        let predicate_expr =
2681            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2682        assert_eq!(predicate_expr.to_string(), expected_expr);
2683
2684        // test column on the right
2685        let expr = lit(1).eq(col("c1"));
2686        let predicate_expr =
2687            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2688        assert_eq!(predicate_expr.to_string(), expected_expr);
2689
2690        Ok(())
2691    }
2692
2693    #[test]
2694    fn row_group_predicate_not_eq() -> Result<()> {
2695        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2696        let expected_expr =
2697            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2698
2699        // test column on the left
2700        let expr = col("c1").not_eq(lit(1));
2701        let predicate_expr =
2702            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2703        assert_eq!(predicate_expr.to_string(), expected_expr);
2704
2705        // test column on the right
2706        let expr = lit(1).not_eq(col("c1"));
2707        let predicate_expr =
2708            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2709        assert_eq!(predicate_expr.to_string(), expected_expr);
2710
2711        Ok(())
2712    }
2713
2714    #[test]
2715    fn row_group_predicate_gt() -> Result<()> {
2716        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2717        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2718
2719        // test column on the left
2720        let expr = col("c1").gt(lit(1));
2721        let predicate_expr =
2722            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2723        assert_eq!(predicate_expr.to_string(), expected_expr);
2724
2725        // test column on the right
2726        let expr = lit(1).lt(col("c1"));
2727        let predicate_expr =
2728            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2729        assert_eq!(predicate_expr.to_string(), expected_expr);
2730
2731        Ok(())
2732    }
2733
2734    #[test]
2735    fn row_group_predicate_gt_eq() -> Result<()> {
2736        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2737        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2738
2739        // test column on the left
2740        let expr = col("c1").gt_eq(lit(1));
2741        let predicate_expr =
2742            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2743        assert_eq!(predicate_expr.to_string(), expected_expr);
2744        // test column on the right
2745        let expr = lit(1).lt_eq(col("c1"));
2746        let predicate_expr =
2747            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2748        assert_eq!(predicate_expr.to_string(), expected_expr);
2749
2750        Ok(())
2751    }
2752
2753    #[test]
2754    fn row_group_predicate_lt() -> Result<()> {
2755        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2756        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2757
2758        // test column on the left
2759        let expr = col("c1").lt(lit(1));
2760        let predicate_expr =
2761            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2762        assert_eq!(predicate_expr.to_string(), expected_expr);
2763
2764        // test column on the right
2765        let expr = lit(1).gt(col("c1"));
2766        let predicate_expr =
2767            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2768        assert_eq!(predicate_expr.to_string(), expected_expr);
2769
2770        Ok(())
2771    }
2772
2773    #[test]
2774    fn row_group_predicate_lt_eq() -> Result<()> {
2775        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2776        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2777
2778        // test column on the left
2779        let expr = col("c1").lt_eq(lit(1));
2780        let predicate_expr =
2781            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2782        assert_eq!(predicate_expr.to_string(), expected_expr);
2783        // test column on the right
2784        let expr = lit(1).gt_eq(col("c1"));
2785        let predicate_expr =
2786            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2787        assert_eq!(predicate_expr.to_string(), expected_expr);
2788
2789        Ok(())
2790    }
2791
2792    #[test]
2793    fn row_group_predicate_and() -> Result<()> {
2794        let schema = Schema::new(vec![
2795            Field::new("c1", DataType::Int32, false),
2796            Field::new("c2", DataType::Int32, false),
2797            Field::new("c3", DataType::Int32, false),
2798        ]);
2799        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2800        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2801        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2802        let predicate_expr =
2803            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2804        assert_eq!(predicate_expr.to_string(), expected_expr);
2805
2806        Ok(())
2807    }
2808
2809    #[test]
2810    fn row_group_predicate_or() -> Result<()> {
2811        let schema = Schema::new(vec![
2812            Field::new("c1", DataType::Int32, false),
2813            Field::new("c2", DataType::Int32, false),
2814        ]);
2815        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2816        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2817        let expected_expr = "true";
2818        let predicate_expr =
2819            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2820        assert_eq!(predicate_expr.to_string(), expected_expr);
2821
2822        Ok(())
2823    }
2824
2825    #[test]
2826    fn row_group_predicate_not() -> Result<()> {
2827        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2828        let expected_expr = "true";
2829
2830        let expr = col("c1").not();
2831        let predicate_expr =
2832            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2833        assert_eq!(predicate_expr.to_string(), expected_expr);
2834
2835        Ok(())
2836    }
2837
2838    #[test]
2839    fn row_group_predicate_not_bool() -> Result<()> {
2840        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2841        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2842
2843        let expr = col("c1").not();
2844        let predicate_expr =
2845            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2846        assert_eq!(predicate_expr.to_string(), expected_expr);
2847
2848        Ok(())
2849    }
2850
2851    #[test]
2852    fn row_group_predicate_bool() -> Result<()> {
2853        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2854        let expected_expr = "c1_min@0 OR c1_max@1";
2855
2856        let expr = col("c1");
2857        let predicate_expr =
2858            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2859        assert_eq!(predicate_expr.to_string(), expected_expr);
2860
2861        Ok(())
2862    }
2863
2864    /// Test that non-boolean literal expressions don't prune any containers and error gracefully by not pruning anything instead of e.g. panicking
2865    #[test]
2866    fn row_group_predicate_non_boolean() {
2867        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2868        let statistics = TestStatistics::new()
2869            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2870        let expected_ret = &[true];
2871        prune_with_expr(lit(1), &schema, &statistics, expected_ret);
2872    }
2873
2874    // Test that literal-to-literal comparisons are correctly evaluated.
2875    // When both sides are constants, the expression should be evaluated directly
2876    // and if it's false, all containers should be pruned.
2877    #[test]
2878    fn row_group_predicate_literal_false() {
2879        // lit(1) = lit(2) is always false, so all containers should be pruned
2880        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2881        let statistics = TestStatistics::new()
2882            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2883        let expected_ret = &[false];
2884        prune_with_simplified_expr(lit(1).eq(lit(2)), &schema, &statistics, expected_ret);
2885    }
2886
2887    /// Test nested/complex literal expression trees.
2888    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2889    #[test]
2890    fn row_group_predicate_literal_true() {
2891        // lit(1) = lit(1) is always true, so no containers should be pruned
2892        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2893        let statistics = TestStatistics::new()
2894            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2895        let expected_ret = &[true];
2896        prune_with_simplified_expr(lit(1).eq(lit(1)), &schema, &statistics, expected_ret);
2897    }
2898
2899    /// Test nested/complex literal expression trees.
2900    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2901    #[test]
2902    fn row_group_predicate_literal_null() {
2903        // lit(1) = null is always null, so no containers should be pruned
2904        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2905        let statistics = TestStatistics::new()
2906            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2907        let expected_ret = &[true];
2908        prune_with_simplified_expr(
2909            lit(1).eq(lit(ScalarValue::Null)),
2910            &schema,
2911            &statistics,
2912            expected_ret,
2913        );
2914    }
2915
2916    /// Test nested/complex literal expression trees.
2917    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2918    #[test]
2919    fn row_group_predicate_complex_literals() {
2920        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2921        let statistics = TestStatistics::new()
2922            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2923
2924        // (1 + 2) > 0 is always true
2925        prune_with_simplified_expr(
2926            (lit(1) + lit(2)).gt(lit(0)),
2927            &schema,
2928            &statistics,
2929            &[true],
2930        );
2931
2932        // (1 + 2) < 0 is always false
2933        prune_with_simplified_expr(
2934            (lit(1) + lit(2)).lt(lit(0)),
2935            &schema,
2936            &statistics,
2937            &[false],
2938        );
2939
2940        // Nested AND of literals: true AND false = false
2941        prune_with_simplified_expr(
2942            lit(true).and(lit(false)),
2943            &schema,
2944            &statistics,
2945            &[false],
2946        );
2947
2948        // Nested OR of literals: true OR false = true
2949        prune_with_simplified_expr(
2950            lit(true).or(lit(false)),
2951            &schema,
2952            &statistics,
2953            &[true],
2954        );
2955
2956        // Complex nested: (1 < 2) AND (3 > 1) = true AND true = true
2957        prune_with_simplified_expr(
2958            lit(1).lt(lit(2)).and(lit(3).gt(lit(1))),
2959            &schema,
2960            &statistics,
2961            &[true],
2962        );
2963
2964        // Complex nested: (1 > 2) OR (3 < 1) = false OR false = false
2965        prune_with_simplified_expr(
2966            lit(1).gt(lit(2)).or(lit(3).lt(lit(1))),
2967            &schema,
2968            &statistics,
2969            &[false],
2970        );
2971    }
2972
    /// Integration test demonstrating that a dynamic filter with replaced children as literals will be snapshotted, simplified and then pruned correctly.
    #[test]
    fn row_group_predicate_dynamic_filter_with_literals() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Int32, true),
            Field::new("part", DataType::Utf8, true),
        ]));
        let statistics = TestStatistics::new()
            // Note that we have no stats, pruning can only happen via partition value pruning from the dynamic filter
            .with_row_counts("c1", vec![Some(10)]);
        // inner predicate: c1 > 5 AND part = "B"
        let dynamic_filter_expr = col("c1").gt(lit(5)).and(col("part").eq(lit("B")));
        let phys_expr = logical2physical(&dynamic_filter_expr, &schema);
        // the dynamic filter's children are the columns the predicate references
        let children = collect_columns(&phys_expr)
            .iter()
            .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
            .collect_vec();
        let dynamic_phys_expr =
            Arc::new(DynamicFilterPhysicalExpr::new(children, phys_expr))
                as Arc<dyn PhysicalExpr>;
        // Simulate the partition value substitution that would happen in ParquetOpener
        let remapped_expr = dynamic_phys_expr
            .children()
            .into_iter()
            .map(|child_expr| {
                // non-column children pass through unchanged
                let Some(col_expr) =
                    child_expr.as_any().downcast_ref::<phys_expr::Column>()
                else {
                    return Arc::clone(child_expr);
                };
                if col_expr.name() == "part" {
                    // simulate dynamic filter replacement with literal "A"
                    Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
                        "A".to_string(),
                    )))) as Arc<dyn PhysicalExpr>
                } else {
                    Arc::clone(child_expr)
                }
            })
            .collect_vec();
        let dynamic_filter_expr =
            dynamic_phys_expr.with_new_children(remapped_expr).unwrap();
        // After substitution the predicate contains "A" = "B" (always false), so
        // the single container should be pruned
        let expected = &[false];
        let p =
            PruningPredicate::try_new(dynamic_filter_expr, Arc::clone(&schema)).unwrap();
        let result = p.prune(&statistics).unwrap();
        assert_eq!(result, expected);
    }
3021
3022    #[test]
3023    fn row_group_predicate_lt_bool() -> Result<()> {
3024        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
3025        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
3026
3027        // DF doesn't support arithmetic on boolean columns so
3028        // this predicate will error when evaluated
3029        let expr = col("c1").lt(lit(true));
3030        let predicate_expr =
3031            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3032        assert_eq!(predicate_expr.to_string(), expected_expr);
3033
3034        Ok(())
3035    }
3036
    #[test]
    fn row_group_predicate_required_columns() -> Result<()> {
        // Verifies that building a pruning predicate registers exactly the
        // statistics columns (min/max/null_count/row_count) each sub-predicate
        // needs, and that duplicate requests reuse existing entries.
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Int32, false),
            Field::new("c2", DataType::Int32, false),
        ]);
        let mut required_columns = RequiredColumns::new();
        // c1 < 1 and (c2 = 2 or c2 = 3)
        let expr = col("c1")
            .lt(lit(1))
            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
        let predicate_expr =
            test_build_predicate_expression(&expr, &schema, &mut required_columns);
        assert_eq!(predicate_expr.to_string(), expected_expr);
        println!("required_columns: {required_columns:#?}"); // for debugging assertions below
        // c1 < 1 should add c1_min
        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
        assert_eq!(
            required_columns.columns[0],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::Min,
                c1_min_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c1 < 1 should add c1_null_count
        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[1],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::NullCount,
                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c1 < 1 should add row_count
        let row_count_field = Field::new("row_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[2],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::RowCount,
                row_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 2 should add c2_min and c2_max
        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
        assert_eq!(
            required_columns.columns[3],
            (
                phys_expr::Column::new("c2", 1),
                StatisticsType::Min,
                c2_min_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
        assert_eq!(
            required_columns.columns[4],
            (
                phys_expr::Column::new("c2", 1),
                StatisticsType::Max,
                c2_max_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 2 should add c2_null_count
        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[5],
            (
                phys_expr::Column::new("c2", 1),
                StatisticsType::NullCount,
                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 2 should NOT add a second row_count entry: the one registered
        // for c1 (columns[2]) is shared, so re-check that it is unchanged
        let row_count_field = Field::new("row_count", DataType::UInt64, false);
        assert_eq!(
            required_columns.columns[2],
            (
                phys_expr::Column::new("c1", 0),
                StatisticsType::RowCount,
                row_count_field.with_nullable(true) // could be nullable if stats are not present
            )
        );
        // c2 = 3 shouldn't add any new statistics fields
        assert_eq!(required_columns.columns.len(), 6);

        Ok(())
    }
3127
3128    #[test]
3129    fn row_group_predicate_in_list() -> Result<()> {
3130        let schema = Schema::new(vec![
3131            Field::new("c1", DataType::Int32, false),
3132            Field::new("c2", DataType::Int32, false),
3133        ]);
3134        // test c1 in(1, 2, 3)
3135        let expr = Expr::InList(InList::new(
3136            Box::new(col("c1")),
3137            vec![lit(1), lit(2), lit(3)],
3138            false,
3139        ));
3140        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
3141        let predicate_expr =
3142            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3143        assert_eq!(predicate_expr.to_string(), expected_expr);
3144
3145        Ok(())
3146    }
3147
3148    #[test]
3149    fn row_group_predicate_in_list_empty() -> Result<()> {
3150        let schema = Schema::new(vec![
3151            Field::new("c1", DataType::Int32, false),
3152            Field::new("c2", DataType::Int32, false),
3153        ]);
3154        // test c1 in()
3155        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
3156        let expected_expr = "true";
3157        let predicate_expr =
3158            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3159        assert_eq!(predicate_expr.to_string(), expected_expr);
3160
3161        Ok(())
3162    }
3163
3164    #[test]
3165    fn row_group_predicate_in_list_negated() -> Result<()> {
3166        let schema = Schema::new(vec![
3167            Field::new("c1", DataType::Int32, false),
3168            Field::new("c2", DataType::Int32, false),
3169        ]);
3170        // test c1 not in(1, 2, 3)
3171        let expr = Expr::InList(InList::new(
3172            Box::new(col("c1")),
3173            vec![lit(1), lit(2), lit(3)],
3174            true,
3175        ));
3176        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
3177        let predicate_expr =
3178            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3179        assert_eq!(predicate_expr.to_string(), expected_expr);
3180
3181        Ok(())
3182    }
3183
3184    #[test]
3185    fn row_group_predicate_between() -> Result<()> {
3186        let schema = Schema::new(vec![
3187            Field::new("c1", DataType::Int32, false),
3188            Field::new("c2", DataType::Int32, false),
3189        ]);
3190
3191        // test c1 BETWEEN 1 AND 5
3192        let expr1 = col("c1").between(lit(1), lit(5));
3193
3194        // test 1 <= c1 <= 5
3195        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
3196
3197        let predicate_expr1 =
3198            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
3199
3200        let predicate_expr2 =
3201            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
3202        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
3203
3204        Ok(())
3205    }
3206
3207    #[test]
3208    fn row_group_predicate_between_with_in_list() -> Result<()> {
3209        let schema = Schema::new(vec![
3210            Field::new("c1", DataType::Int32, false),
3211            Field::new("c2", DataType::Int32, false),
3212        ]);
3213        // test c1 in(1, 2)
3214        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
3215
3216        // test c2 BETWEEN 4 AND 5
3217        let expr2 = col("c2").between(lit(4), lit(5));
3218
3219        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
3220        let expr3 = expr1.and(expr2);
3221
3222        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
3223        let predicate_expr =
3224            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
3225        assert_eq!(predicate_expr.to_string(), expected_expr);
3226
3227        Ok(())
3228    }
3229
3230    #[test]
3231    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
3232        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3233        // test c1 in(1..21)
3234        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
3235        // always true
3236        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
3237
3238        let expected_expr = "true";
3239        let predicate_expr =
3240            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3241        assert_eq!(predicate_expr.to_string(), expected_expr);
3242
3243        Ok(())
3244    }
3245
3246    #[test]
3247    fn row_group_predicate_cast_int_int() -> Result<()> {
3248        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3249        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
3250
3251        // test cast(c1 as int64) = 1
3252        // test column on the left
3253        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
3254        let predicate_expr =
3255            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3256        assert_eq!(predicate_expr.to_string(), expected_expr);
3257
3258        // test column on the right
3259        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3260        let predicate_expr =
3261            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3262        assert_eq!(predicate_expr.to_string(), expected_expr);
3263
3264        let expected_expr =
3265            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3266
3267        // test column on the left
3268        let expr =
3269            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3270        let predicate_expr =
3271            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3272        assert_eq!(predicate_expr.to_string(), expected_expr);
3273
3274        // test column on the right
3275        let expr =
3276            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3277        let predicate_expr =
3278            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3279        assert_eq!(predicate_expr.to_string(), expected_expr);
3280
3281        Ok(())
3282    }
3283
3284    #[test]
3285    fn row_group_predicate_cast_string_string() -> Result<()> {
3286        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3287        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
3288
3289        // test column on the left
3290        let expr = cast(col("c1"), DataType::Utf8)
3291            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3292        let predicate_expr =
3293            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3294        assert_eq!(predicate_expr.to_string(), expected_expr);
3295
3296        // test column on the right
3297        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3298            .eq(cast(col("c1"), DataType::Utf8));
3299        let predicate_expr =
3300            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3301        assert_eq!(predicate_expr.to_string(), expected_expr);
3302
3303        Ok(())
3304    }
3305
3306    #[test]
3307    fn row_group_predicate_cast_string_int() -> Result<()> {
3308        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3309        let expected_expr = "true";
3310
3311        // test column on the left
3312        let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3313        let predicate_expr =
3314            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3315        assert_eq!(predicate_expr.to_string(), expected_expr);
3316
3317        // test column on the right
3318        let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3319        let predicate_expr =
3320            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3321        assert_eq!(predicate_expr.to_string(), expected_expr);
3322
3323        Ok(())
3324    }
3325
3326    #[test]
3327    fn row_group_predicate_cast_int_string() -> Result<()> {
3328        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3329        let expected_expr = "true";
3330
3331        // test column on the left
3332        let expr = cast(col("c1"), DataType::Utf8)
3333            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3334        let predicate_expr =
3335            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3336        assert_eq!(predicate_expr.to_string(), expected_expr);
3337
3338        // test column on the right
3339        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3340            .eq(cast(col("c1"), DataType::Utf8));
3341        let predicate_expr =
3342            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3343        assert_eq!(predicate_expr.to_string(), expected_expr);
3344
3345        Ok(())
3346    }
3347
3348    #[test]
3349    fn row_group_predicate_date_date() -> Result<()> {
3350        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3351        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3352
3353        // test column on the left
3354        let expr =
3355            cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3356        let predicate_expr =
3357            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3358        assert_eq!(predicate_expr.to_string(), expected_expr);
3359
3360        // test column on the right
3361        let expr =
3362            lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3363        let predicate_expr =
3364            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3365        assert_eq!(predicate_expr.to_string(), expected_expr);
3366
3367        Ok(())
3368    }
3369
3370    #[test]
3371    fn row_group_predicate_dict_string_date() -> Result<()> {
3372        // Test with Dictionary<UInt8, Utf8> for the literal
3373        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3374        let expected_expr = "true";
3375
3376        // test column on the left
3377        let expr = cast(
3378            col("c1"),
3379            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3380        )
3381        .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3382        let predicate_expr =
3383            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3384        assert_eq!(predicate_expr.to_string(), expected_expr);
3385
3386        // test column on the right
3387        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3388            col("c1"),
3389            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3390        ));
3391        let predicate_expr =
3392            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3393        assert_eq!(predicate_expr.to_string(), expected_expr);
3394
3395        Ok(())
3396    }
3397
3398    #[test]
3399    fn row_group_predicate_date_dict_string() -> Result<()> {
3400        // Test with Dictionary<UInt8, Utf8> for the column
3401        let schema = Schema::new(vec![Field::new(
3402            "c1",
3403            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3404            false,
3405        )]);
3406        let expected_expr = "true";
3407
3408        // test column on the left
3409        let expr =
3410            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3411        let predicate_expr =
3412            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3413        assert_eq!(predicate_expr.to_string(), expected_expr);
3414
3415        // test column on the right
3416        let expr =
3417            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3418        let predicate_expr =
3419            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3420        assert_eq!(predicate_expr.to_string(), expected_expr);
3421
3422        Ok(())
3423    }
3424
3425    #[test]
3426    fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3427        // Test with Dictionary types that have the same value type but different key types
3428        let schema = Schema::new(vec![Field::new(
3429            "c1",
3430            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3431            false,
3432        )]);
3433
3434        // Direct comparison with no cast
3435        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3436        let predicate_expr =
3437            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3438        let expected_expr =
3439            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3440        assert_eq!(predicate_expr.to_string(), expected_expr);
3441
3442        // Test with column cast to a dictionary with different key type
3443        let expr = cast(
3444            col("c1"),
3445            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3446        )
3447        .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3448        let predicate_expr =
3449            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3450        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3451        assert_eq!(predicate_expr.to_string(), expected_expr);
3452
3453        Ok(())
3454    }
3455
3456    #[test]
3457    fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3458        // Test with Dictionary types that have different value types
3459        let schema = Schema::new(vec![Field::new(
3460            "c1",
3461            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3462            false,
3463        )]);
3464        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3465
3466        // Test with literal of a different type
3467        let expr =
3468            cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3469        let predicate_expr =
3470            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3471        assert_eq!(predicate_expr.to_string(), expected_expr);
3472
3473        Ok(())
3474    }
3475
3476    #[test]
3477    fn row_group_predicate_nested_dict() -> Result<()> {
3478        // Test with nested Dictionary types
3479        let schema = Schema::new(vec![Field::new(
3480            "c1",
3481            DataType::Dictionary(
3482                Box::new(DataType::UInt8),
3483                Box::new(DataType::Dictionary(
3484                    Box::new(DataType::UInt16),
3485                    Box::new(DataType::Utf8),
3486                )),
3487            ),
3488            false,
3489        )]);
3490        let expected_expr =
3491            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3492
3493        // Test with a simple literal
3494        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3495        let predicate_expr =
3496            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3497        assert_eq!(predicate_expr.to_string(), expected_expr);
3498
3499        Ok(())
3500    }
3501
3502    #[test]
3503    fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3504        // Test with dictionary-wrapped date types for both sides
3505        let schema = Schema::new(vec![Field::new(
3506            "c1",
3507            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3508            false,
3509        )]);
3510        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3511
3512        // Test with a cast to a different date type
3513        let expr = cast(
3514            col("c1"),
3515            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3516        )
3517        .eq(lit(ScalarValue::Date64(Some(123))));
3518        let predicate_expr =
3519            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3520        assert_eq!(predicate_expr.to_string(), expected_expr);
3521
3522        Ok(())
3523    }
3524
3525    #[test]
3526    fn row_group_predicate_date_string() -> Result<()> {
3527        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3528        let expected_expr = "true";
3529
3530        // test column on the left
3531        let expr =
3532            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3533        let predicate_expr =
3534            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3535        assert_eq!(predicate_expr.to_string(), expected_expr);
3536
3537        // test column on the right
3538        let expr =
3539            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3540        let predicate_expr =
3541            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3542        assert_eq!(predicate_expr.to_string(), expected_expr);
3543
3544        Ok(())
3545    }
3546
3547    #[test]
3548    fn row_group_predicate_string_date() -> Result<()> {
3549        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3550        let expected_expr = "true";
3551
3552        // test column on the left
3553        let expr = cast(col("c1"), DataType::Utf8)
3554            .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3555        let predicate_expr =
3556            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3557        assert_eq!(predicate_expr.to_string(), expected_expr);
3558
3559        // test column on the right
3560        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3561            .eq(cast(col("c1"), DataType::Utf8));
3562        let predicate_expr =
3563            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3564        assert_eq!(predicate_expr.to_string(), expected_expr);
3565
3566        Ok(())
3567    }
3568
3569    #[test]
3570    fn row_group_predicate_cast_list() -> Result<()> {
3571        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3572        // test cast(c1 as int64) in int64(1, 2, 3)
3573        let expr = Expr::InList(InList::new(
3574            Box::new(cast(col("c1"), DataType::Int64)),
3575            vec![
3576                lit(ScalarValue::Int64(Some(1))),
3577                lit(ScalarValue::Int64(Some(2))),
3578                lit(ScalarValue::Int64(Some(3))),
3579            ],
3580            false,
3581        ));
3582        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3583        let predicate_expr =
3584            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3585        assert_eq!(predicate_expr.to_string(), expected_expr);
3586
3587        let expr = Expr::InList(InList::new(
3588            Box::new(cast(col("c1"), DataType::Int64)),
3589            vec![
3590                lit(ScalarValue::Int64(Some(1))),
3591                lit(ScalarValue::Int64(Some(2))),
3592                lit(ScalarValue::Int64(Some(3))),
3593            ],
3594            true,
3595        ));
3596        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3597        let predicate_expr =
3598            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3599        assert_eq!(predicate_expr.to_string(), expected_expr);
3600
3601        Ok(())
3602    }
3603
3604    #[test]
3605    fn prune_decimal_data() {
3606        // decimal(9,2)
3607        let schema = Arc::new(Schema::new(vec![Field::new(
3608            "s1",
3609            DataType::Decimal128(9, 2),
3610            true,
3611        )]));
3612
3613        prune_with_expr(
3614            // s1 > 5
3615            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3616            &schema,
3617            // If the data is written by spark, the physical data type is INT32 in the parquet
3618            // So we use the INT32 type of statistic.
3619            &TestStatistics::new().with(
3620                "s1",
3621                ContainerStats::new_i32(
3622                    vec![Some(0), Some(4), None, Some(3)], // min
3623                    vec![Some(5), Some(6), Some(4), None], // max
3624                ),
3625            ),
3626            &[false, true, false, true],
3627        );
3628
3629        prune_with_expr(
3630            // with cast column to other type
3631            cast(col("s1"), DataType::Decimal128(14, 3))
3632                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3633            &schema,
3634            &TestStatistics::new().with(
3635                "s1",
3636                ContainerStats::new_i32(
3637                    vec![Some(0), Some(4), None, Some(3)], // min
3638                    vec![Some(5), Some(6), Some(4), None], // max
3639                ),
3640            ),
3641            &[false, true, false, true],
3642        );
3643
3644        prune_with_expr(
3645            // with try cast column to other type
3646            try_cast(col("s1"), DataType::Decimal128(14, 3))
3647                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3648            &schema,
3649            &TestStatistics::new().with(
3650                "s1",
3651                ContainerStats::new_i32(
3652                    vec![Some(0), Some(4), None, Some(3)], // min
3653                    vec![Some(5), Some(6), Some(4), None], // max
3654                ),
3655            ),
3656            &[false, true, false, true],
3657        );
3658
3659        // decimal(18,2)
3660        let schema = Arc::new(Schema::new(vec![Field::new(
3661            "s1",
3662            DataType::Decimal128(18, 2),
3663            true,
3664        )]));
3665        prune_with_expr(
3666            // s1 > 5
3667            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3668            &schema,
3669            // If the data is written by spark, the physical data type is INT64 in the parquet
3670            // So we use the INT32 type of statistic.
3671            &TestStatistics::new().with(
3672                "s1",
3673                ContainerStats::new_i64(
3674                    vec![Some(0), Some(4), None, Some(3)], // min
3675                    vec![Some(5), Some(6), Some(4), None], // max
3676                ),
3677            ),
3678            &[false, true, false, true],
3679        );
3680
3681        // decimal(23,2)
3682        let schema = Arc::new(Schema::new(vec![Field::new(
3683            "s1",
3684            DataType::Decimal128(23, 2),
3685            true,
3686        )]));
3687
3688        prune_with_expr(
3689            // s1 > 5
3690            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3691            &schema,
3692            &TestStatistics::new().with(
3693                "s1",
3694                ContainerStats::new_decimal128(
3695                    vec![Some(0), Some(400), None, Some(300)], // min
3696                    vec![Some(500), Some(600), Some(400), None], // max
3697                    23,
3698                    2,
3699                ),
3700            ),
3701            &[false, true, false, true],
3702        );
3703    }
3704
3705    #[test]
3706    fn prune_api() {
3707        let schema = Arc::new(Schema::new(vec![
3708            Field::new("s1", DataType::Utf8, true),
3709            Field::new("s2", DataType::Int32, true),
3710        ]));
3711
3712        let statistics = TestStatistics::new().with(
3713            "s2",
3714            ContainerStats::new_i32(
3715                vec![Some(0), Some(4), None, Some(3)], // min
3716                vec![Some(5), Some(6), None, None],    // max
3717            ),
3718        );
3719        prune_with_expr(
3720            // Prune using s2 > 5
3721            col("s2").gt(lit(5)),
3722            &schema,
3723            &statistics,
3724            // s2 [0, 5] ==> no rows should pass
3725            // s2 [4, 6] ==> some rows could pass
3726            // No stats for s2 ==> some rows could pass
3727            // s2 [3, None] (null max) ==> some rows could pass
3728            &[false, true, true, true],
3729        );
3730
3731        prune_with_expr(
3732            // filter with cast
3733            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3734            &schema,
3735            &statistics,
3736            &[false, true, true, true],
3737        );
3738    }
3739
    #[test]
    fn prune_not_eq_data() {
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        prune_with_expr(
            // Prune using s1 != 'M'
            col("s1").not_eq(lit("M")),
            &schema,
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_utf8(
                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
                ),
            ),
            // s1 [A, Z] ==> might have values that pass predicate
            // s1 [A, L] ==> all rows pass the predicate
            // s1 [N, Z] ==> all rows pass the predicate
            // s1 [M, M] ==> all rows do not pass the predicate
            // s1 [NULL, NULL] (no stats) ==> some rows could pass
            // s1 [A, NULL] (null max) ==> some rows could pass
            &[true, true, true, false, true, true],
        );
    }
3764
3765    /// Creates setup for boolean chunk pruning
3766    ///
3767    /// For predicate "b1" (boolean expr)
3768    /// b1 [false, false] ==> no rows can pass (not keep)
3769    /// b1 [false, true] ==> some rows could pass (must keep)
3770    /// b1 [true, true] ==> all rows must pass (must keep)
3771    /// b1 [NULL, NULL]  ==> unknown (must keep)
3772    /// b1 [false, NULL]  ==> unknown (must keep)
3773    ///
3774    /// For predicate "!b1" (boolean expr)
3775    /// b1 [false, false] ==> all rows pass (must keep)
3776    /// b1 [false, true] ==> some rows could pass (must keep)
3777    /// b1 [true, true] ==> no rows can pass (not keep)
3778    /// b1 [NULL, NULL]  ==> unknown (must keep)
3779    /// b1 [false, NULL]  ==> unknown (must keep)
3780    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3781        let schema =
3782            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3783
3784        let statistics = TestStatistics::new().with(
3785            "b1",
3786            ContainerStats::new_bool(
3787                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3788                vec![Some(false), Some(true), Some(true), None, None],         // max
3789            ),
3790        );
3791        let expected_true = vec![false, true, true, true, true];
3792        let expected_false = vec![true, true, false, true, true];
3793
3794        (schema, statistics, expected_true, expected_false)
3795    }
3796
3797    #[test]
3798    fn prune_bool_const_expr() {
3799        let (schema, statistics, _, _) = bool_setup();
3800
3801        prune_with_expr(
3802            // true
3803            lit(true),
3804            &schema,
3805            &statistics,
3806            &[true, true, true, true, true],
3807        );
3808
3809        prune_with_expr(
3810            // false
3811            lit(false),
3812            &schema,
3813            &statistics,
3814            &[false, false, false, false, false],
3815        );
3816    }
3817
3818    #[test]
3819    fn prune_bool_column() {
3820        let (schema, statistics, expected_true, _) = bool_setup();
3821
3822        prune_with_expr(
3823            // b1
3824            col("b1"),
3825            &schema,
3826            &statistics,
3827            &expected_true,
3828        );
3829    }
3830
3831    #[test]
3832    fn prune_bool_not_column() {
3833        let (schema, statistics, _, expected_false) = bool_setup();
3834
3835        prune_with_expr(
3836            // !b1
3837            col("b1").not(),
3838            &schema,
3839            &statistics,
3840            &expected_false,
3841        );
3842    }
3843
3844    #[test]
3845    fn prune_bool_column_eq_true() {
3846        let (schema, statistics, expected_true, _) = bool_setup();
3847
3848        prune_with_expr(
3849            // b1 = true
3850            col("b1").eq(lit(true)),
3851            &schema,
3852            &statistics,
3853            &expected_true,
3854        );
3855    }
3856
3857    #[test]
3858    fn prune_bool_not_column_eq_true() {
3859        let (schema, statistics, _, expected_false) = bool_setup();
3860
3861        prune_with_expr(
3862            // !b1 = true
3863            col("b1").not().eq(lit(true)),
3864            &schema,
3865            &statistics,
3866            &expected_false,
3867        );
3868    }
3869
3870    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3871    /// with 5 different containers (e.g. RowGroups). They have [min,
3872    /// max]:
3873    ///
3874    /// i [-5, 5]
3875    /// i [1, 11]
3876    /// i [-11, -1]
3877    /// i [NULL, NULL]
3878    /// i [1, NULL]
3879    fn int32_setup() -> (SchemaRef, TestStatistics) {
3880        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3881
3882        let statistics = TestStatistics::new().with(
3883            "i",
3884            ContainerStats::new_i32(
3885                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3886                vec![Some(5), Some(11), Some(-1), None, None],     // max
3887            ),
3888        );
3889        (schema, statistics)
3890    }
3891
3892    #[test]
3893    fn prune_int32_col_gt_zero() {
3894        let (schema, statistics) = int32_setup();
3895
3896        // Expression "i > 0" and "-i < 0"
3897        // i [-5, 5] ==> some rows could pass (must keep)
3898        // i [1, 11] ==> all rows must pass (must keep)
3899        // i [-11, -1] ==>  no rows can pass (not keep)
3900        // i [NULL, NULL]  ==> unknown (must keep)
3901        // i [1, NULL]  ==> unknown (must keep)
3902        let expected_ret = &[true, true, false, true, true];
3903
3904        // i > 0
3905        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3906
3907        // -i < 0
3908        prune_with_expr(
3909            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3910            &schema,
3911            &statistics,
3912            expected_ret,
3913        );
3914    }
3915
3916    #[test]
3917    fn prune_int32_col_lte_zero() {
3918        let (schema, statistics) = int32_setup();
3919
3920        // Expression "i <= 0" and "-i >= 0"
3921        // i [-5, 5] ==> some rows could pass (must keep)
3922        // i [1, 11] ==> no rows can pass (not keep)
3923        // i [-11, -1] ==>  all rows must pass (must keep)
3924        // i [NULL, NULL]  ==> unknown (must keep)
3925        // i [1, NULL]  ==> no rows can pass (not keep)
3926        let expected_ret = &[true, false, true, true, false];
3927
3928        prune_with_expr(
3929            // i <= 0
3930            col("i").lt_eq(lit(0)),
3931            &schema,
3932            &statistics,
3933            expected_ret,
3934        );
3935
3936        prune_with_expr(
3937            // -i >= 0
3938            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3939            &schema,
3940            &statistics,
3941            expected_ret,
3942        );
3943    }
3944
    #[test]
    fn prune_int32_col_lte_zero_cast() {
        let (schema, statistics) = int32_setup();

        // Casting an int column to Utf8 changes the sort order, so min/max
        // statistics cannot prune safely; all containers are kept.
        //
        // Expression "cast(i as utf8) <= '0'"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass in theory (conservatively keep)
        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
        // i [NULL, NULL]  ==> unknown (must keep)
        // i [1, NULL]  ==> no rows can pass (conservatively keep)
        let expected_ret = &[true, true, true, true, true];

        prune_with_expr(
            // cast(i as utf8) <= 0
            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // try_cast(i as utf8) <= 0
            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // cast(-i as utf8) >= 0
            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // try_cast(-i as utf8) >= 0
            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );
    }
3989
3990    #[test]
3991    fn prune_int32_col_eq_zero() {
3992        let (schema, statistics) = int32_setup();
3993
3994        // Expression "i = 0"
3995        // i [-5, 5] ==> some rows could pass (must keep)
3996        // i [1, 11] ==> no rows can pass (not keep)
3997        // i [-11, -1] ==>  no rows can pass (not keep)
3998        // i [NULL, NULL]  ==> unknown (must keep)
3999        // i [1, NULL]  ==> no rows can pass (not keep)
4000        let expected_ret = &[true, false, false, true, false];
4001
4002        prune_with_expr(
4003            // i = 0
4004            col("i").eq(lit(0)),
4005            &schema,
4006            &statistics,
4007            expected_ret,
4008        );
4009    }
4010
4011    #[test]
4012    fn prune_int32_col_eq_zero_cast() {
4013        let (schema, statistics) = int32_setup();
4014
4015        // Expression "cast(i as int64) = 0"
4016        // i [-5, 5] ==> some rows could pass (must keep)
4017        // i [1, 11] ==> no rows can pass (not keep)
4018        // i [-11, -1] ==>  no rows can pass (not keep)
4019        // i [NULL, NULL]  ==> unknown (must keep)
4020        // i [1, NULL]  ==> no rows can pass (not keep)
4021        let expected_ret = &[true, false, false, true, false];
4022
4023        prune_with_expr(
4024            cast(col("i"), DataType::Int64).eq(lit(0i64)),
4025            &schema,
4026            &statistics,
4027            expected_ret,
4028        );
4029
4030        prune_with_expr(
4031            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
4032            &schema,
4033            &statistics,
4034            expected_ret,
4035        );
4036    }
4037
4038    #[test]
4039    fn prune_int32_col_eq_zero_cast_as_str() {
4040        let (schema, statistics) = int32_setup();
4041
4042        // Note the cast is to a string where sorting properties are
4043        // not the same as integers
4044        //
4045        // Expression "cast(i as utf8) = '0'"
4046        // i [-5, 5] ==> some rows could pass (keep)
4047        // i [1, 11] ==> no rows can pass  (could keep)
4048        // i [-11, -1] ==>  no rows can pass (could keep)
4049        // i [NULL, NULL]  ==> unknown (keep)
4050        // i [1, NULL]  ==> no rows can pass (could keep)
4051        let expected_ret = &[true, true, true, true, true];
4052
4053        prune_with_expr(
4054            cast(col("i"), DataType::Utf8).eq(lit("0")),
4055            &schema,
4056            &statistics,
4057            expected_ret,
4058        );
4059    }
4060
    #[test]
    fn prune_int32_col_lt_neg_one() {
        let (schema, statistics) = int32_setup();

        // NOTE(review): despite the function name ("lt_neg_one"), the
        // predicate under test is "i > -1" — confirm the intended name.
        //
        // Expression "i > -1" and "-i < 1"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> all rows must pass (must keep)
        // i [-11, -1] ==>  no rows can pass (not keep)
        // i [NULL, NULL]  ==> unknown (must keep)
        // i [1, NULL]  ==> all rows must pass (must keep)
        let expected_ret = &[true, true, false, true, true];

        prune_with_expr(
            // i > -1
            col("i").gt(lit(-1)),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // -i < 1
            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
            &schema,
            &statistics,
            expected_ret,
        );
    }
4089
    // Verifies that `IS NULL` pruning is a no-op without null-count
    // statistics, and prunes containers with a known null count of 0.
    #[test]
    fn prune_int32_is_null() {
        let (schema, statistics) = int32_setup();

        // Expression "i IS NULL" when there are no null statistics,
        // should all be kept
        let expected_ret = &[true, true, true, true, true];

        prune_with_expr(
            // i IS NULL, no null statistics
            col("i").is_null(),
            &schema,
            &statistics,
            expected_ret,
        );

        // provide null counts for each column
        let statistics = statistics.with_null_counts(
            "i",
            vec![
                Some(0), // no nulls (don't keep)
                Some(1), // 1 null
                None,    // unknown nulls
                None, // unknown nulls (min/max are both null too, like no stats at all)
                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
            ],
        );

        let expected_ret = &[false, true, true, true, false];

        prune_with_expr(
            // i IS NULL, with actual null statistics
            col("i").is_null(),
            &schema,
            &statistics,
            expected_ret,
        );
    }
4128
    // Verifies that a container whose null count equals its row count
    // (i.e. an all-null column) can be pruned, and that row counts alone
    // do not change the pruning outcome.
    #[test]
    fn prune_int32_column_is_known_all_null() {
        let (schema, statistics) = int32_setup();

        // Expression "i < 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass (not keep)
        // i [-11, -1] ==>  all rows must pass (must keep)
        // i [NULL, NULL]  ==> unknown (must keep)
        // i [1, NULL]  ==> no rows can pass (not keep)
        let expected_ret = &[true, false, true, true, false];

        prune_with_expr(
            // i < 0
            col("i").lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );

        // provide row counts for each column
        let statistics = statistics.with_row_counts(
            "i",
            vec![
                Some(10), // 10 rows of data
                Some(9),  // 9 rows of data
                None,     // unknown row counts
                Some(4),
                Some(10),
            ],
        );

        // pruning result is still the same if we only know row counts
        prune_with_expr(
            // i < 0, with only row counts statistics
            col("i").lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );

        // provide null counts for each column
        let statistics = statistics.with_null_counts(
            "i",
            vec![
                Some(0), // no nulls
                Some(1), // 1 null
                None,    // unknown nulls
                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
                Some(0), // 0 nulls (max=null too which means no known max)
            ],
        );

        // Expression "i < 0" with actual null and row counts statistics
        // col | min, max     | row counts | null counts |
        // ----+--------------+------------+-------------+
        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
        let expected_ret = &[true, false, true, false, false];

        prune_with_expr(
            // i < 0, with actual null and row counts statistics
            col("i").lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );
    }
4200
4201    #[test]
4202    fn prune_cast_column_scalar() {
4203        // The data type of column i is INT32
4204        let (schema, statistics) = int32_setup();
4205        let expected_ret = &[true, true, false, true, true];
4206
4207        prune_with_expr(
4208            // i > int64(0)
4209            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
4210            &schema,
4211            &statistics,
4212            expected_ret,
4213        );
4214
4215        prune_with_expr(
4216            // cast(i as int64) > int64(0)
4217            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4218            &schema,
4219            &statistics,
4220            expected_ret,
4221        );
4222
4223        prune_with_expr(
4224            // try_cast(i as int64) > int64(0)
4225            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4226            &schema,
4227            &statistics,
4228            expected_ret,
4229        );
4230
4231        prune_with_expr(
4232            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
4233            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
4234                .lt(lit(ScalarValue::Int64(Some(0)))),
4235            &schema,
4236            &statistics,
4237            expected_ret,
4238        );
4239    }
4240
    // Exercises `increment_utf8`, the helper used to compute an exclusive
    // upper bound for string-prefix pruning.
    #[test]
    fn test_increment_utf8() {
        // Basic ASCII
        assert_eq!(increment_utf8("abc").unwrap(), "abd");
        assert_eq!(increment_utf8("abz").unwrap(), "ab{");

        // Test around ASCII 127 (DEL)
        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128

        // Test 2-byte UTF-8 sequences
        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0

        // Test 3-byte UTF-8 sequences
        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124

        // Test at UTF-8 boundaries
        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary

        // Test that if we can't increment we return None
        assert!(increment_utf8("").is_none());
        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point

        // Test that if we can't increment the last character we do the previous one and truncate
        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");

        // Test surrogate pair range (0xD800..=0xDFFF)
        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
        assert!(increment_utf8("\u{D7FF}").is_none());

        // Test non-characters range (0xFDD0..=0xFDEF)
        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
        assert!(increment_utf8("\u{FDCF}").is_none());

        // Test values at/beyond the maximum code point (next would be >= 0x110000)
        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
    }
4280
    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
    /// with 10 different containers (e.g. RowGroups). They have [min,
    /// max]:
    /// s1 ["A", "Z"]
    /// s1 ["A", "L"]
    /// s1 ["N", "Z"]
    /// s1 ["M", "M"]
    /// s1 [NULL, NULL]
    /// s1 ["A", NULL]
    /// s1 ["", "A"]
    /// s1 ["", ""]
    /// s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]
    /// s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
    fn utf8_setup() -> (SchemaRef, TestStatistics) {
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        let statistics = TestStatistics::new().with(
            "s1",
            ContainerStats::new_utf8(
                vec![
                    Some("A"),
                    Some("A"),
                    Some("N"),
                    Some("M"),
                    None,
                    Some("A"),
                    Some(""),
                    Some(""),
                    Some("AB"),
                    Some("A\u{10ffff}\u{10ffff}"),
                ], // min
                vec![
                    Some("Z"),
                    Some("L"),
                    Some("Z"),
                    Some("M"),
                    None,
                    None,
                    Some("A"),
                    Some(""),
                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
                    Some("A\u{10ffff}\u{10ffff}"),
                ], // max
            ),
        );
        (schema, statistics)
    }
4327
    // Equality on a utf8 column: a container can be pruned when the
    // literal falls outside its [min, max] range.
    #[test]
    fn prune_utf8_eq() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").eq(lit("A"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").eq(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4384
    // Inequality on a utf8 column: only a container whose min == max ==
    // the literal can be pruned.
    #[test]
    fn prune_utf8_not_eq() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_eq(lit("A"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_eq(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4441
    // LIKE with the single-character wildcard `_`: pruning can only use the
    // literal prefix before the first wildcard (if any).
    #[test]
    fn prune_utf8_like_one() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").like(lit("A_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // Pattern starting with a wildcard has no usable prefix: keep all
        let expr = col("s1").like(lit("_A_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit("_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> all rows must pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // Empty pattern only matches the empty string
        let expr = col("s1").like(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4550
    /// LIKE with the `%` (multi-character) wildcard: a container may be
    /// pruned only when its min/max statistics prove that no string inside
    /// it can match the pattern. Container ranges come from `utf8_setup()`.
    #[test]
    fn prune_utf8_like_many() {
        let (schema, statistics) = utf8_setup();

        // Prefix pattern: only containers whose [min, max] range can hold a
        // string starting with "A" might match.
        let expr = col("s1").like(lit("A%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // Leading-wildcard pattern: min/max statistics cannot bound an infix
        // match, so no container can be pruned.
        let expr = col("s1").like(lit("%A%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // A bare '%' matches every non-NULL string, so every container must
        // be kept.
        let expr = col("s1").like(lit("%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> unknown (must keep)
            true,
            // s1 ["", "A"]  ==> all rows must pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // An empty pattern matches only the empty string, so any container
        // whose range cannot contain "" can be pruned.
        let expr = col("s1").like(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4659
    /// NOT LIKE with the `_` (single-character) wildcard: min/max statistics
    /// can rarely disprove a NOT LIKE, so every container here must be kept.
    #[test]
    fn prune_utf8_not_like_one() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> the stated (min, max)
            // matches the pattern so no row would pass NOT LIKE, but the stored statistics may be
            // truncated — the real values could be longer, e.g.
            // ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}"),
            // which do NOT match the pattern and so would pass — hence the container must be kept
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4691
    /// NOT LIKE with `%` wildcards (and an escaped `\%`): pruning is only
    /// possible when the statistics prove every value matches the pattern
    /// (so NOT LIKE is false for all rows).
    #[test]
    fn prune_utf8_not_like_many() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // Pattern with both a prefix and a required suffix after the wildcard.
        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL]  ==> unknown (must keep)
            true,
            // s1 ["A", NULL]  ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"]  ==> some rows could pass (must keep)
            true,
            // s1 ["", ""]  ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        // `\%` escapes the wildcard, so this pattern means: starts with the
        // literal prefix "A%".
        let expr = col("s1").not_like(lit("A\\%%"));
        // Fresh statistics: container 0 holds only strings in ["A%a", "A%c"]
        // (every value matches the pattern, so NOT LIKE can prune it);
        // container 1 holds only "A", which cannot match, so it must be kept.
        let statistics = TestStatistics::new().with(
            "s1",
            ContainerStats::new_utf8(
                vec![Some("A%a"), Some("A")],
                vec![Some("A%c"), Some("A")],
            ),
        );
        let expected_ret = &[false, true];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }
4785
4786    #[test]
4787    fn test_rewrite_expr_to_prunable() {
4788        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4789        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4790
4791        // column op lit
4792        let left_input = col("a");
4793        let left_input = logical2physical(&left_input, &schema);
4794        let right_input = lit(ScalarValue::Int32(Some(12)));
4795        let right_input = logical2physical(&right_input, &schema);
4796        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4797            &left_input,
4798            Operator::Eq,
4799            &right_input,
4800            df_schema.clone(),
4801        )
4802        .unwrap();
4803        assert_eq!(result_left.to_string(), left_input.to_string());
4804        assert_eq!(result_right.to_string(), right_input.to_string());
4805
4806        // cast op lit
4807        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4808        let left_input = logical2physical(&left_input, &schema);
4809        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4810        let right_input = logical2physical(&right_input, &schema);
4811        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4812            &left_input,
4813            Operator::Gt,
4814            &right_input,
4815            df_schema.clone(),
4816        )
4817        .unwrap();
4818        assert_eq!(result_left.to_string(), left_input.to_string());
4819        assert_eq!(result_right.to_string(), right_input.to_string());
4820
4821        // try_cast op lit
4822        let left_input = try_cast(col("a"), DataType::Int64);
4823        let left_input = logical2physical(&left_input, &schema);
4824        let right_input = lit(ScalarValue::Int64(Some(12)));
4825        let right_input = logical2physical(&right_input, &schema);
4826        let (result_left, _, result_right) =
4827            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4828                .unwrap();
4829        assert_eq!(result_left.to_string(), left_input.to_string());
4830        assert_eq!(result_right.to_string(), right_input.to_string());
4831
4832        // TODO: add test for other case and op
4833    }
4834
    /// A custom [`UnhandledPredicateHook`] must be invoked for every
    /// sub-expression the rewriter cannot handle (references to unknown
    /// columns, unsupported expressions) and its replacement spliced into
    /// the rewritten predicate, while handled sub-expressions are rewritten
    /// normally.
    #[test]
    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
        struct CustomUnhandledHook;

        impl UnhandledPredicateHook for CustomUnhandledHook {
            /// This handles an arbitrary case of a column that doesn't exist in the schema
            /// by renaming it to yet another column that doesn't exist in the schema
            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
            }
        }

        // `schema` is what the rewriter sees; `schema_with_b` lets us build
        // expressions that reference a column ("b") unknown to the rewriter.
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let schema_with_b = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let rewriter = PredicateRewriter::new()
            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));

        let transform_expr = |expr| {
            let expr = logical2physical(&expr, &schema_with_b);
            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
        };

        // transform an arbitrary valid expression that we know is handled
        let known_expression = col("a").eq(lit(12));
        let known_expression_transformed = PredicateRewriter::new()
            .rewrite_predicate_to_statistics_predicate(
                &logical2physical(&known_expression, &schema),
                &schema,
            );

        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
        let input = col("b").eq(lit(12));
        let expected = logical2physical(&lit(42), &schema);
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());

        // more complex case with unknown column: only the unhandled side is
        // replaced by the hook; the handled side is rewritten normally
        let input = known_expression.clone().and(input.clone());
        let expected = phys_expr::BinaryExpr::new(
            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
            Operator::And,
            logical2physical(&lit(42), &schema),
        );
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());

        // an unknown expression gets passed to the hook
        let input = array_has(make_array(vec![lit(1)]), col("a"));
        let expected = logical2physical(&lit(42), &schema);
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());

        // more complex case with unknown expression
        let input = known_expression.and(input);
        let expected = phys_expr::BinaryExpr::new(
            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
            Operator::And,
            logical2physical(&lit(42), &schema),
        );
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());
    }
4902
4903    #[test]
4904    fn test_rewrite_expr_to_prunable_error() {
4905        // cast string value to numeric value
4906        // this cast is not supported
4907        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4908        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4909        let left_input = cast(col("a"), DataType::Int64);
4910        let left_input = logical2physical(&left_input, &schema);
4911        let right_input = lit(ScalarValue::Int64(Some(12)));
4912        let right_input = logical2physical(&right_input, &schema);
4913        let result = rewrite_expr_to_prunable(
4914            &left_input,
4915            Operator::Gt,
4916            &right_input,
4917            df_schema.clone(),
4918        );
4919        assert!(result.is_err());
4920
4921        // other expr
4922        let left_input = is_null(col("a"));
4923        let left_input = logical2physical(&left_input, &schema);
4924        let right_input = lit(ScalarValue::Int64(Some(12)));
4925        let right_input = logical2physical(&right_input, &schema);
4926        let result =
4927            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4928        assert!(result.is_err());
4929        // TODO: add other negative test for other case and op
4930    }
4931
    /// Pruning with "contained" (bloom-filter-like) information for a single
    /// column. In the containment arrays: `Some(true)` models "the container
    /// holds ONLY these values", `Some(false)` models "the container holds
    /// NONE of them", and `None` means unknown.
    #[test]
    fn prune_with_contained_one_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        // Model having information like a bloom filter for s1
        let statistics = TestStatistics::new()
            .with_contained(
                "s1",
                [ScalarValue::from("foo")],
                [
                    // container 0 known to only contain "foo"
                    Some(true),
                    // container 1 known to not contain "foo"
                    Some(false),
                    // container 2 unknown about "foo"
                    None,
                    // container 3 known to only contain "foo"
                    Some(true),
                    // container 4 known to not contain "foo"
                    Some(false),
                    // container 5 unknown about "foo"
                    None,
                    // container 6 known to only contain "foo"
                    Some(true),
                    // container 7 known to not contain "foo"
                    Some(false),
                    // container 8 unknown about "foo"
                    None,
                ],
            )
            .with_contained(
                "s1",
                [ScalarValue::from("bar")],
                [
                    // containers 0,1,2 known to only contain "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 3,4,5 known to not contain "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                    // container 6,7,8 unknown about "bar"
                    None,
                    None,
                    None,
                ],
            )
            .with_contained(
                // the way the tests are set up, this data is
                // consulted if "foo" and "bar" are being checked at the same time
                "s1",
                [ScalarValue::from("foo"), ScalarValue::from("bar")],
                [
                    // container 0,1,2 unknown about ("foo", "bar")
                    None,
                    None,
                    None,
                    // container 3,4,5 known to contain only "foo" and/or "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 6,7,8 known to contain neither "foo" nor "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                ],
            );

        // s1 = 'foo'
        prune_with_expr(
            col("s1").eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers (false) where we know foo is not present
            &[true, false, true, true, false, true, true, false, true],
        );

        // s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("bar")),
            &schema,
            &statistics,
            // rule out containers where we know bar is not present
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 = 'baz' (unknown value)
        prune_with_expr(
            col("s1").eq(lit("baz")),
            &schema,
            &statistics,
            // can't rule out anything
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' AND s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
            &schema,
            &statistics,
            // logically this predicate can't possibly be true (the column can't
            // take on both values) but we could rule it out if the stats tell
            // us that both values are not present
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' OR s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers that we know contain neither foo nor bar
            &[true, true, true, true, true, true, false, false, false],
        );

        // s1 = 'foo' OR s1 = 'baz'
        prune_with_expr(
            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
        prune_with_expr(
            col("s1")
                .eq(lit("foo"))
                .or(col("s1").eq(lit("bar")))
                .or(col("s1").eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container: there is no containment
            // information about 'baz', so it could be present anywhere
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo
        prune_with_expr(
            col("s1").not_eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers we know for sure only contain foo
            &[false, true, true, false, true, true, false, true, true],
        );

        // s1 != bar
        prune_with_expr(
            col("s1").not_eq(lit("bar")),
            &schema,
            &statistics,
            // rule out when we know for sure s1 has the value bar
            &[false, false, false, true, true, true, true, true, true],
        );

        // s1 != foo AND s1 != bar
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s1").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers where we know every value of s1 is
            // either 'foo' or 'bar' (the predicate is false for all rows)
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 != foo AND s1 != bar AND s1 != baz
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s1").not_eq(lit("bar")))
                .and(col("s1").not_eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container based on knowledge of s1
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo OR s1 != bar
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .or(col("s1").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can't rule out anything based on containment information
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo OR s1 != bar OR s1 != baz
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .or(col("s1").not_eq(lit("bar")))
                .or(col("s1").not_eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out anything based on containment information
            &[true, true, true, true, true, true, true, true, true],
        );
    }
5134
    /// Pruning with "contained" (bloom-filter-like) information for two
    /// independent columns: s1 has containment info for "foo", s2 for "bar".
    #[test]
    fn prune_with_contained_two_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("s1", DataType::Utf8, true),
            Field::new("s2", DataType::Utf8, true),
        ]));

        // Model having information like bloom filters for s1 and s2
        let statistics = TestStatistics::new()
            .with_contained(
                "s1",
                [ScalarValue::from("foo")],
                [
                    // container 0, s1 known to only contain "foo"
                    Some(true),
                    // container 1, s1 known to not contain "foo"
                    Some(false),
                    // container 2, s1 unknown about "foo"
                    None,
                    // container 3, s1 known to only contain "foo"
                    Some(true),
                    // container 4, s1 known to not contain "foo"
                    Some(false),
                    // container 5, s1 unknown about "foo"
                    None,
                    // container 6, s1 known to only contain "foo"
                    Some(true),
                    // container 7, s1 known to not contain "foo"
                    Some(false),
                    // container 8, s1 unknown about "foo"
                    None,
                ],
            )
            .with_contained(
                "s2", // for column s2
                [ScalarValue::from("bar")],
                [
                    // containers 0,1,2 s2 known to only contain "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 3,4,5 s2 known to not contain "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                    // container 6,7,8 s2 unknown about "bar"
                    None,
                    None,
                    None,
                ],
            );

        // s1 = 'foo'
        prune_with_expr(
            col("s1").eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers where we know s1 is not present
            &[true, false, true, true, false, true, true, false, true],
        );

        // s1 = 'foo' OR s2 = 'bar'
        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' AND s2 != 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can only rule out container where we know either:
            // 1. s1 doesn't have the value 'foo` or
            // 2. s2 has only the value of 'bar'
            &[false, false, false, true, false, true, true, false, true],
        );

        // s1 != 'foo' AND s2 != 'bar'
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // Can rule out any container where we know either
            // 1. s1 has only the value 'foo'
            // 2. s2 has only the value 'bar'
            &[false, false, false, false, true, true, false, true, true],
        );

        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
            &schema,
            &statistics,
            // Can rule out any container where we know s1 has only the value
            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
            &[false, true, true, false, true, true, false, true, true],
        );

        // s1 LIKE 'foo%bar%'
        prune_with_expr(
            col("s1").like(lit("foo%bar%")),
            &schema,
            &statistics,
            // can't rule out anything with the information we know
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 LIKE 'foo%bar%' AND s2 = 'bar'
        prune_with_expr(
            col("s1")
                .like(lit("foo%bar%"))
                .and(col("s2").eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out any container where we know s2 does not have the value 'bar'
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 LIKE 'foo%bar%' OR s2 = 'bar'
        prune_with_expr(
            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
            &schema,
            &statistics,
            // can't rule out anything (we would have to prove that both the
            // like and the equality must be false)
            &[true, true, true, true, true, true, true, true, true],
        );
    }
5272
    /// Pruning that combines min/max range statistics (column `i`) with
    /// containment (bloom-filter-like) information (column `s`): both kinds
    /// of evidence should be usable within a single predicate.
    #[test]
    fn prune_with_range_and_contained() {
        // Setup mimics range information for i, a bloom filter for s
        let schema = Arc::new(Schema::new(vec![
            Field::new("i", DataType::Int32, true),
            Field::new("s", DataType::Utf8, true),
        ]));

        let statistics = TestStatistics::new()
            .with(
                "i",
                ContainerStats::new_i32(
                    // Container 0, 3, 6: [-5 to 5]
                    // Container 1, 4, 7: [10 to 20]
                    // Container 2, 5, 8: unknown
                    vec![
                        Some(-5),
                        Some(10),
                        None,
                        Some(-5),
                        Some(10),
                        None,
                        Some(-5),
                        Some(10),
                        None,
                    ], // min
                    vec![
                        Some(5),
                        Some(20),
                        None,
                        Some(5),
                        Some(20),
                        None,
                        Some(5),
                        Some(20),
                        None,
                    ], // max
                ),
            )
            // Add containment information about s and "foo"
            .with_contained(
                "s",
                [ScalarValue::from("foo")],
                [
                    // container 0,1,2 known to only contain "foo"
                    Some(true),
                    Some(true),
                    Some(true),
                    // container 3,4,5 known to not contain "foo"
                    Some(false),
                    Some(false),
                    Some(false),
                    // container 6,7,8 unknown about "foo"
                    None,
                    None,
                    None,
                ],
            );

        // i = 0 and s = 'foo'
        prune_with_expr(
            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
            &schema,
            &statistics,
            // Can rule out containers where we know that either:
            // 1. 0 is outside the min/max range of i, or
            // 2. s does not contain foo
            // (range is false, or containment is false)
            &[true, false, true, false, false, false, true, false, true],
        );

        // i = 0 and s != 'foo'
        prune_with_expr(
            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
            &schema,
            &statistics,
            // Can rule out containers where either:
            // 1. 0 is outside the min/max range of i
            // 2. s only contains foo
            &[false, false, false, true, false, true, true, false, true],
        );

        // i = 0 OR s = 'foo'
        prune_with_expr(
            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
            &schema,
            &statistics,
            // in theory could rule out containers if we had min/max values for
            // s as well. But in this case we don't so we can't rule out anything
            &[true, true, true, true, true, true, true, true, true],
        );
    }
5365
5366    /// prunes the specified expr with the specified schema and statistics, and
5367    /// ensures it returns expected.
5368    ///
5369    /// `expected` is a vector of bools, where true means the row group should
5370    /// be kept, and false means it should be pruned.
5371    // TODO refactor other tests to use this to reduce boiler plate
5372    fn prune_with_expr(
5373        expr: Expr,
5374        schema: &SchemaRef,
5375        statistics: &TestStatistics,
5376        expected: &[bool],
5377    ) {
5378        println!("Pruning with expr: {expr}");
5379        let expr = logical2physical(&expr, schema);
5380        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5381        let result = p.prune(statistics).unwrap();
5382        assert_eq!(result, expected);
5383    }
5384
5385    fn prune_with_simplified_expr(
5386        expr: Expr,
5387        schema: &SchemaRef,
5388        statistics: &TestStatistics,
5389        expected: &[bool],
5390    ) {
5391        println!("Pruning with expr: {expr}");
5392        let expr = logical2physical(&expr, schema);
5393        let simplifier = PhysicalExprSimplifier::new(schema);
5394        let expr = simplifier.simplify(expr).unwrap();
5395        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5396        let result = p.prune(statistics).unwrap();
5397        assert_eq!(result, expected);
5398    }
5399
5400    fn test_build_predicate_expression(
5401        expr: &Expr,
5402        schema: &Schema,
5403        required_columns: &mut RequiredColumns,
5404    ) -> Arc<dyn PhysicalExpr> {
5405        let expr = logical2physical(expr, schema);
5406        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5407        build_predicate_expression(
5408            &expr,
5409            &Arc::new(schema.clone()),
5410            required_columns,
5411            &unhandled_hook,
5412        )
5413    }
5414
5415    #[test]
5416    fn test_build_predicate_expression_with_false() {
5417        let expr = lit(ScalarValue::Boolean(Some(false)));
5418        let schema = Schema::empty();
5419        let res =
5420            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5421        let expected = logical2physical(&expr, &schema);
5422        assert_eq!(&res, &expected);
5423    }
5424
5425    #[test]
5426    fn test_build_predicate_expression_with_and_false() {
5427        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5428        let expr = and(
5429            col("c1").eq(lit("a")),
5430            lit(ScalarValue::Boolean(Some(false))),
5431        );
5432        let res =
5433            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5434        let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5435        assert_eq!(&res, &expected);
5436    }
5437
5438    #[test]
5439    fn test_build_predicate_expression_with_or_false() {
5440        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5441        let left_expr = col("c1").eq(lit("a"));
5442        let right_expr = lit(ScalarValue::Boolean(Some(false)));
5443        let res = test_build_predicate_expression(
5444            &or(left_expr.clone(), right_expr.clone()),
5445            &schema,
5446            &mut RequiredColumns::new(),
5447        );
5448        let expected =
5449            "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5450        assert_eq!(res.to_string(), expected);
5451    }
5452}