Skip to main content

datafusion_pruning/
pruning_predicate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27    array::{ArrayRef, BooleanArray, new_null_array},
28    datatypes::{DataType, Field, Schema, SchemaRef},
29    record_batch::{RecordBatch, RecordBatchOptions},
30};
31// pub use for backwards compatibility
32pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::Result;
38use datafusion_common::tree_node::{TransformedResult, TreeNodeRecursion};
39use datafusion_common::{Column, DFSchema, assert_eq_or_internal_err};
40use datafusion_common::{
41    ScalarValue, internal_datafusion_err, plan_datafusion_err, plan_err,
42    tree_node::{Transformed, TreeNode},
43};
44use datafusion_expr_common::operator::Operator;
45use datafusion_physical_expr::utils::{Guarantee, LiteralGuarantee};
46use datafusion_physical_expr::{PhysicalExprRef, expressions as phys_expr};
47use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr_opt;
48use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
49
50/// Used to prove that arbitrary predicates (boolean expression) can not
51/// possibly evaluate to `true` given information about a column provided by
52/// [`PruningStatistics`].
53///
54/// # Introduction
55///
56/// `PruningPredicate` analyzes filter expressions using statistics such as
57/// min/max values and null counts, attempting to prove a "container" (e.g.
58/// Parquet Row Group) can be skipped without reading the actual data,
59/// potentially leading to significant performance improvements.
60///
61/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
62/// on the min/max values found in the Parquet metadata. If the
63/// `PruningPredicate` can prove that the filter can never evaluate to `true`
64/// for any row in the Row Group, the entire Row Group is skipped during query
65/// execution.
66///
67/// The `PruningPredicate` API is general, and can be used for pruning other
68/// types of containers (e.g. files) based on statistics that may be known from
69/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
70/// subtle topic.  See the Background and Implementation section for details.
71///
72/// `PruningPredicate` supports:
73///
74/// 1. Arbitrary expressions (including user defined functions)
75///
76/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
77///    so it is suitable for pruning 1000s of containers.
78///
79/// 3. Any source of information that implements the [`PruningStatistics`] trait
80///    (not just Parquet metadata).
81///
82/// # Example
83///
84/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
85/// example of how to use `PruningPredicate` to prune files based on min/max
86/// values.
87///
88/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/query_planning/pruning.rs
89///
90/// Given an expression like `x = 5` and statistics for 3 containers (Row
91/// Groups, files, etc) `A`, `B`, and `C`:
92///
93/// ```text
94///   A: {x_min = 0, x_max = 4}
95///   B: {x_min = 2, x_max = 10}
96///   C: {x_min = 5, x_max = 8}
97/// ```
98///
99/// `PruningPredicate` will conclude that the rows in container `A` can never
100/// be true (as the maximum value is only `4`), so it can be pruned:
101///
102/// ```text
103/// A: false (no rows could possibly match x = 5)
104/// B: true  (rows might match x = 5)
105/// C: true  (rows might match x = 5)
106/// ```
107///
108/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
109///
110/// # Background
111///
112/// ## Boolean Tri-state logic
113///
114/// To understand the details of the rest of this documentation, it is important
115/// to understand how the tri-state boolean logic in SQL works. As this is
116/// somewhat esoteric, we review it here.
117///
118/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
119/// uncertainty propagates through expressions. SQL `NULL` behaves very
120/// differently than the `NULL` in most other languages where it is a special,
121/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
122/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
123/// first encountered as they behave differently than most programmers may
124/// expect.
125///
126/// In most other programming languages,
127/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
128/// * `a == NULL` evaluates to `false` if `a` has any other value
129///
130/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
131/// `false`):
132///
133/// Expression    | Result
134/// ------------- | ---------
135/// `1 = NULL`    | `NULL`
136/// `NULL = NULL` | `NULL`
137///
138/// Also important is how `AND` and `OR` works with tri-state boolean logic as
139/// (perhaps counterintuitively) the result is **not** always NULL. While
140/// consistent with the notion of `NULL` representing “unknown”, this is again,
141/// often deeply confusing 🤯 when first encountered.
142///
143/// Expression       | Result    | Intuition
144/// ---------------  | --------- | -----------
145/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
146/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
147/// `NULL AND NULL`  | `NULL`    |
148///
149/// Expression      | Result    | Intuition
150/// --------------- | --------- | ----------
151/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
152/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
153/// `NULL OR NULL`  |  `NULL`   |
154///
155/// ## SQL Filter Semantics
156///
157/// The SQL `WHERE` clause has a boolean expression, often called a filter or
158/// predicate. The semantics of this predicate are that the query evaluates the
159/// predicate for each row in the input tables and:
160///
161/// * Rows that evaluate to `true` are returned in the query results
162///
163/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
164///
165/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
166///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
167///   in the rewritten predicate described below.*
168///
169/// # `PruningPredicate` Implementation
170///
171/// Armed with the information in the Background section, we can now understand
172/// how the `PruningPredicate` logic works.
173///
174/// ## Interface
175///
176/// **Inputs**
177/// 1. An input schema describing what columns exist
178///
179/// 2. A predicate (expression that evaluates to a boolean)
180///
181/// 3. [`PruningStatistics`] that provides information about columns in that
182///    schema, for multiple “containers”. For each column in each container, it
183///    provides optional information on contained values, min_values, max_values,
184///    null_counts counts, and row_counts counts.
185///
186/// **Outputs**:
187/// A (non null) boolean value for each container:
188/// * `true`: There MAY be rows that match the predicate
189///
190/// * `false`: There are no rows that could possibly match the predicate (the
191///   predicate can never possibly be true). The container can be pruned (skipped)
192///   entirely.
193///
194/// While `PruningPredicate` will never return a `NULL` value, the
195/// rewritten predicate (as returned by `build_predicate_expression` and used internally
196/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
197/// or null / row counts are not known.
198///
199/// In order to be correct, `PruningPredicate` must return false
200/// **only** if it can determine that for all rows in the container, the
201/// predicate could never evaluate to `true` (always evaluates to either `NULL`
202/// or `false`).
203///
204/// ## Contains Analysis and Min/Max Rewrite
205///
206/// `PruningPredicate` works by first analyzing the predicate to see what
207/// [`LiteralGuarantee`] must hold for the predicate to be true.
208///
209/// Then, the `PruningPredicate` rewrites the original predicate into an
210/// expression that references the min/max values of each column in the original
211/// predicate.
212///
213/// When the min/max values are actually substituted in to this expression and
214/// evaluated, the result means
215///
216/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
217///
218/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
219///   Note that rewritten predicate can evaluate to NULL when some of
220///   the min/max values are not known. *Note that this is different than
221///   the SQL filter semantics where `NULL` means the row is filtered
222///   out.*
223///
224/// * `false`: there are no rows that could possibly match the predicate,
225///   **PRUNES** the container
226///
227/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
228/// `x_row_count` represent the minimum and maximum values, the null count of
229/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
230/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
231/// is known to be all `NULL`s. Note this is different from knowing nothing about
232/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
233/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
234///
235/// Here are some examples of the rewritten predicates:
236///
237/// Original Predicate | Rewritten Predicate
238/// ------------------ | --------------------
239/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
240/// `x < 5` | `x_null_count != x_row_count AND (x_min < 5)`
241/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count (y_min <= 10 AND 10 <= y_max)`
242/// `x IS NULL`  | `x_null_count > 0`
243/// `x IS NOT NULL`  | `x_null_count != row_count`
244/// `CAST(x as int) = 5` | `x_null_count != x_row_count (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
245///
246/// ## Predicate Evaluation
247/// The PruningPredicate works in two passes
248///
249/// **First pass**:  For each `LiteralGuarantee` calls
250/// [`PruningStatistics::contained`] and rules out containers where the
251/// LiteralGuarantees are not satisfied
252///
253/// **Second Pass**: Evaluates the rewritten expression using the
254/// min/max/null_counts/row_counts values for each column for each container. For any
255/// container that this expression evaluates to `false`, it rules out those
256/// containers.
257///
258///
259/// ### Example 1
260///
261/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
262///
263/// ```sql
264/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
265/// AND
266/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
267/// ```
268///
269/// If we know that for a given container, `x` is between `1 and 100` and we know that
270/// `y` is between `4` and `7`, we know nothing about the null count and row count of
271/// `x` and `y`, the input statistics might look like:
272///
273/// Column   | Value
274/// -------- | -----
275/// `x_min`  | `1`
276/// `x_max`  | `100`
277/// `x_null_count` | `null`
278/// `x_row_count`  | `null`
279/// `y_min`  | `4`
280/// `y_max`  | `7`
281/// `y_null_count` | `null`
282/// `y_row_count`  | `null`
283///
284/// When these statistics values are substituted in to the rewritten predicate and
285/// simplified, the result is `false`:
286///
287/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
288/// * `null = null` is `null` which is not true, so the AND moves on to the next clause
289/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
290/// * evaluating the clauses further we get:
291/// * `null and true and null and false`
292/// * `null and false`
293/// * `false`
294///
295/// Returning `false` means the container can be pruned, which matches the
296/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
297/// are `7` or less.
298///
299/// Note that if we had ended up with `null AND true AND null AND true` the result
300/// would have been `null`.
301/// `null` is treated the same as`true`, because we can't prove that the predicate is `false.`
302///
303/// If, for some other container, we knew `y` was between the values `4` and
304/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
305/// left as an exercise to the reader -- are you still here?), and the container
306/// **could not** be pruned. The intuition is that there may be rows where the
307/// predicate *might* evaluate to `true`, and the only way to find out is to do
308/// more analysis, for example by actually reading the data and evaluating the
309/// predicate row by row.
310///
311/// ### Example 2
312///
313/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
314/// look like the same as example 1:
315///
316/// ```sql
317/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
318/// AND
319/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
320/// ```
321///
322/// If we know that for another given container, `x_min` is NULL and `x_max` is
323/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
324///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
325/// the null count and row count of `y`. The input statistics might look like:
326///
327/// Column   | Value
328/// -------- | -----
329/// `x_min`  | `null`
330/// `x_max`  | `null`
331/// `x_null_count` | `100`
332/// `x_row_count`  | `100`
333/// `y_min`  | `4`
334/// `y_max`  | `7`
335/// `y_null_count` | `null`
336/// `y_row_count`  | `null`
337///
338/// When these statistics values are substituted in to the rewritten predicate and
339/// simplified, the result is `false`:
340///
341/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null = null AND (4 <= 10 AND 10 <= 7)`
342/// * `false AND null AND null AND false`
343/// * `false AND false`
344/// * `false`
345///
346/// Returning `false` means the container can be pruned, which matches the
347/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
348/// are known to be NULL.
349///
350/// # Related Work
351///
352/// [`PruningPredicate`] implements the type of min/max pruning described in
353/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
354/// described by various research such as [small materialized aggregates], [zone
355/// maps], and [data skipping].
356///
357/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
358/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
359/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
360/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
361#[derive(Debug, Clone)]
362pub struct PruningPredicate {
363    /// The input schema against which the predicate will be evaluated
364    schema: SchemaRef,
365    /// A min/max pruning predicate (rewritten in terms of column min/max
366    /// values, which are supplied by statistics)
367    predicate_expr: Arc<dyn PhysicalExpr>,
368    /// Description of which statistics are required to evaluate `predicate_expr`
369    required_columns: RequiredColumns,
370    /// Original physical predicate from which this predicate expr is derived
371    /// (required for serialization)
372    orig_expr: Arc<dyn PhysicalExpr>,
373    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
374    /// evaluate to `true`.
375    ///
376    /// See [`PruningPredicate::literal_guarantees`] for more details.
377    literal_guarantees: Vec<LiteralGuarantee>,
378}
379
380/// Build a pruning predicate from an optional predicate expression.
381/// If the predicate is None or the predicate cannot be converted to a pruning
382/// predicate, return None.
383/// If there is an error creating the pruning predicate it is recorded by incrementing
384/// the `predicate_creation_errors` counter.
385pub fn build_pruning_predicate(
386    predicate: Arc<dyn PhysicalExpr>,
387    file_schema: &SchemaRef,
388    predicate_creation_errors: &Count,
389) -> Option<Arc<PruningPredicate>> {
390    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
391        Ok(pruning_predicate) => {
392            if !pruning_predicate.always_true() {
393                return Some(Arc::new(pruning_predicate));
394            }
395        }
396        Err(e) => {
397            debug!("Could not create pruning predicate for: {e}");
398            predicate_creation_errors.add(1);
399        }
400    }
401    None
402}
403
404/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
405/// complex expressions or predicates that reference columns that are not in the
406/// schema.
407pub trait UnhandledPredicateHook {
408    /// Called when a predicate can not be rewritten in terms of statistics or
409    /// references a column that is not in the schema.
410    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
411}
412
413/// The default handling for unhandled predicates is to return a constant `true`
414/// (meaning don't prune the container)
415#[derive(Debug, Clone)]
416struct ConstantUnhandledPredicateHook {
417    default: Arc<dyn PhysicalExpr>,
418}
419
420impl Default for ConstantUnhandledPredicateHook {
421    fn default() -> Self {
422        Self {
423            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
424        }
425    }
426}
427
428impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
429    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
430        Arc::clone(&self.default)
431    }
432}
433
434impl PruningPredicate {
435    /// Try to create a new instance of [`PruningPredicate`]
436    ///
437    /// This will translate the provided `expr` filter expression into
438    /// a *pruning predicate*.
439    ///
440    /// A pruning predicate is one that has been rewritten in terms of
441    /// the min and max values of column references and that evaluates
442    /// to FALSE if the filter predicate would evaluate FALSE *for
443    /// every row* whose values fell within the min / max ranges (aka
444    /// could be pruned).
445    ///
446    /// The pruning predicate evaluates to TRUE or NULL
447    /// if the filter predicate *might* evaluate to TRUE for at least
448    /// one row whose values fell within the min/max ranges (in other
449    /// words they might pass the predicate)
450    ///
451    /// For example, the filter expression `(column / 2) = 4` becomes
452    /// the pruning predicate
453    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
454    ///
455    /// See the struct level documentation on [`PruningPredicate`] for more
456    /// details.
457    ///
458    /// Note that `PruningPredicate` does not attempt to normalize or simplify
459    /// the input expression unless calling [`snapshot_physical_expr_opt`]
460    /// returns a new expression.
461    /// It is recommended that you pass the expressions through [`PhysicalExprSimplifier`]
462    /// before calling this method to make sure the expressions can be used for pruning.
463    pub fn try_new(mut expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
464        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`.
465        // In particular this unravels any `DynamicFilterPhysicalExpr`s by snapshotting them
466        // so that PruningPredicate can work with a static expression.
467        let tf = snapshot_physical_expr_opt(expr)?;
468        if tf.transformed {
469            // If we had an expression such as Dynamic(part_col < 5 and col < 10)
470            // (this could come from something like `select * from t order by part_col, col, limit 10`)
471            // after snapshotting and because `DynamicFilterPhysicalExpr` applies child replacements to its
472            // children after snapshotting and previously `replace_columns_with_literals` may have been called with partition values
473            // the expression we have now is `8 < 5 and col < 10`.
474            // Thus we need as simplifier pass to get `false and col < 10` => `false` here.
475            let simplifier = PhysicalExprSimplifier::new(&schema);
476            expr = simplifier.simplify(tf.data)?;
477        } else {
478            expr = tf.data;
479        }
480        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
481
482        // build predicate expression once
483        let mut required_columns = RequiredColumns::new();
484        let predicate_expr = build_predicate_expression(
485            &expr,
486            &schema,
487            &mut required_columns,
488            &unhandled_hook,
489        );
490        let predicate_schema = required_columns.schema();
491        // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc.
492        let predicate_expr =
493            PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
494        let literal_guarantees = LiteralGuarantee::analyze(&expr);
495
496        Ok(Self {
497            schema,
498            predicate_expr,
499            required_columns,
500            orig_expr: expr,
501            literal_guarantees,
502        })
503    }
504
505    /// For each set of statistics, evaluates the pruning predicate
506    /// and returns a `bool` with the following meaning for a
507    /// all rows whose values match the statistics:
508    ///
509    /// `true`: There MAY be rows that match the predicate
510    ///
511    /// `false`: There are no rows that could possibly match the predicate
512    ///
513    /// Note: the predicate passed to `prune` should already be simplified as
514    /// much as possible (e.g. this pass doesn't handle some
515    /// expressions like `b = false`, but it does handle the
516    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
517    ///
518    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
519    pub fn prune<S: PruningStatistics + ?Sized>(
520        &self,
521        statistics: &S,
522    ) -> Result<Vec<bool>> {
523        let mut builder = BoolVecBuilder::new(statistics.num_containers());
524
525        // Try to prove the predicate can't be true for the containers based on
526        // literal guarantees
527        for literal_guarantee in &self.literal_guarantees {
528            let LiteralGuarantee {
529                column,
530                guarantee,
531                literals,
532            } = literal_guarantee;
533            if let Some(results) = statistics.contained(column, literals) {
534                match guarantee {
535                    // `In` means the values in the column must be one of the
536                    // values in the set for the predicate to evaluate to true.
537                    // If `contained` returns false, that means the column is
538                    // not any of the values so we can prune the container
539                    Guarantee::In => builder.combine_array(&results),
540                    // `NotIn` means the values in the column must not be
541                    // any of the values in the set for the predicate to
542                    // evaluate to true. If `contained` returns true, it means the
543                    // column is only in the set of values so we can prune the
544                    // container
545                    Guarantee::NotIn => {
546                        builder.combine_array(&arrow::compute::not(&results)?)
547                    }
548                }
549                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
550                // can return early without evaluating the rest of predicates.
551                if builder.check_all_pruned() {
552                    return Ok(builder.build());
553                }
554            }
555        }
556
557        // Next, try to prove the predicate can't be true for the containers based
558        // on min/max values
559
560        // build a RecordBatch that contains the min/max values in the
561        // appropriate statistics columns for the min/max predicate
562        let statistics_batch =
563            build_statistics_record_batch(statistics, &self.required_columns)?;
564
565        // Evaluate the pruning predicate on that record batch and append any results to the builder
566        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
567
568        Ok(builder.build())
569    }
570
571    /// Return a reference to the input schema
572    pub fn schema(&self) -> &SchemaRef {
573        &self.schema
574    }
575
576    /// Returns a reference to the physical expr used to construct this pruning predicate
577    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
578        &self.orig_expr
579    }
580
581    /// Returns a reference to the predicate expr
582    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
583        &self.predicate_expr
584    }
585
586    /// Returns a reference to the literal guarantees
587    ///
588    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
589    /// expression to possibly be `true`. If any is not satisfied, the
590    /// expression is guaranteed to be `null` or `false`.
591    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
592        &self.literal_guarantees
593    }
594
595    /// Returns true if this pruning predicate can not prune anything.
596    ///
597    /// This happens if the predicate is a literal `true`  and
598    /// literal_guarantees is empty.
599    ///
600    /// This can happen when a predicate is simplified to a constant `true`
601    pub fn always_true(&self) -> bool {
602        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
603    }
604
605    pub fn required_columns(&self) -> &RequiredColumns {
606        &self.required_columns
607    }
608
609    /// Names of the columns that are known to be / not be in a set
610    /// of literals (constants). These are the columns the that may be passed to
611    /// [`PruningStatistics::contained`] during pruning.
612    ///
613    /// This is useful to avoid fetching statistics for columns that will not be
614    /// used in the predicate. For example, it can be used to avoid reading
615    /// unneeded bloom filters (a non trivial operation).
616    pub fn literal_columns(&self) -> Vec<String> {
617        let mut seen = HashSet::new();
618        self.literal_guarantees
619            .iter()
620            .map(|e| &e.column.name)
621            // avoid duplicates
622            .filter(|name| seen.insert(*name))
623            .map(|s| s.to_string())
624            .collect()
625    }
626}
627
628/// Builds the return `Vec` for [`PruningPredicate::prune`].
629#[derive(Debug)]
630struct BoolVecBuilder {
631    /// One element per container. Each element is
632    /// * `true`: if the container has row that may pass the predicate
633    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
634    inner: Vec<bool>,
635}
636
637impl BoolVecBuilder {
638    /// Create a new `BoolVecBuilder` with `num_containers` elements
639    fn new(num_containers: usize) -> Self {
640        Self {
641            // assume by default all containers may pass the predicate
642            inner: vec![true; num_containers],
643        }
644    }
645
646    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
647    /// predicate into the currently in progress array.
648    ///
649    /// Each `array` element is:
650    /// * `true`: container has row that may pass the predicate
651    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
652    /// * `null`: container may or may not have rows that pass the predicate
653    fn combine_array(&mut self, array: &BooleanArray) {
654        assert_eq!(array.len(), self.inner.len());
655        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
656            // `false` for this conjunct means we know for sure no rows could
657            // pass the predicate and thus we set the corresponding container
658            // location to false.
659            if let Some(false) = new {
660                *cur = false;
661            }
662        }
663    }
664
665    /// Combines the results in the [`ColumnarValue`] to the currently in
666    /// progress array, following the same rules as [`Self::combine_array`].
667    ///
668    /// # Panics
669    /// If `value` is not boolean
670    fn combine_value(&mut self, value: ColumnarValue) {
671        match value {
672            ColumnarValue::Array(array) => {
673                self.combine_array(array.as_boolean());
674            }
675            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
676                // False means all containers can not pass the predicate
677                self.inner = vec![false; self.inner.len()];
678            }
679            _ => {
680                // Null or true means the rows in container may pass this
681                // conjunct so we can't prune any containers based on that
682            }
683        }
684    }
685
686    /// Convert this builder into a Vec of bools
687    fn build(self) -> Vec<bool> {
688        self.inner
689    }
690
691    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
692    fn check_all_pruned(&self) -> bool {
693        self.inner.iter().all(|&x| !x)
694    }
695}
696
697fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
698    expr.downcast_ref::<phys_expr::Literal>()
699        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
700        .unwrap_or_default()
701}
702
703fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
704    expr.downcast_ref::<phys_expr::Literal>()
705        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
706        .unwrap_or_default()
707}
708
709/// Describes which columns statistics are necessary to evaluate a
710/// [`PruningPredicate`].
711///
712/// This structure permits reading and creating the minimum number statistics,
713/// which is important since statistics may be non trivial to read (e.g. large
714/// strings or when there are 1000s of columns).
715///
716/// Handles creating references to the min/max statistics
717/// for columns as well as recording which statistics are needed
718#[derive(Debug, Default, Clone)]
719pub struct RequiredColumns {
720    /// The statistics required to evaluate this predicate:
721    /// * The unqualified column in the input schema
722    /// * Statistics type (e.g. Min or Max or Null_Count)
723    /// * The field the statistics value should be placed in for
724    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
725    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
726}
727
728impl RequiredColumns {
729    fn new() -> Self {
730        Self::default()
731    }
732
733    /// Returns Some(column) if this is a single column predicate.
734    ///
735    /// Returns None if this is a multi-column predicate.
736    ///
737    /// Examples:
738    /// * `a > 5 OR a < 10` returns `Some(a)`
739    /// * `a > 5 OR b < 10` returns `None`
740    /// * `true` returns None
741    pub fn single_column(&self) -> Option<&phys_expr::Column> {
742        if self.columns.windows(2).all(|w| {
743            // check if all columns are the same (ignoring statistics and field)
744            let c1 = &w[0].0;
745            let c2 = &w[1].0;
746            c1 == c2
747        }) {
748            self.columns.first().map(|r| &r.0)
749        } else {
750            None
751        }
752    }
753
754    /// Returns a schema that describes the columns required to evaluate this
755    /// pruning predicate.
756    /// The schema contains the fields for each column in `self.columns` with
757    /// the appropriate data type for the statistics.
758    /// Order matters, this same order is used to evaluate the
759    /// pruning predicate.
760    fn schema(&self) -> Schema {
761        let fields = self
762            .columns
763            .iter()
764            .map(|(_c, _t, f)| f.clone())
765            .collect::<Vec<_>>();
766        Schema::new(fields)
767    }
768
769    /// Returns an iterator over items in columns (see doc on
770    /// `self.columns` for details)
771    pub(crate) fn iter(
772        &self,
773    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
774        self.columns.iter()
775    }
776
777    fn find_stat_column(
778        &self,
779        column: &phys_expr::Column,
780        statistics_type: StatisticsType,
781    ) -> Option<usize> {
782        match statistics_type {
783            StatisticsType::RowCount => {
784                // Use the first row count we find, if any
785                self.columns
786                    .iter()
787                    .enumerate()
788                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
789                    .map(|(i, (_c, _t, _f))| i)
790            }
791            _ => self
792                .columns
793                .iter()
794                .enumerate()
795                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
796                .map(|(i, (_c, _t, _f))| i),
797        }
798    }
799
800    /// Rewrites column_expr so that all appearances of column
801    /// are replaced with a reference to either the min or max
802    /// statistics column, while keeping track that a reference to the statistics
803    /// column is required
804    ///
805    /// for example, an expression like `col("foo") > 5`, when called
806    /// with Max would result in an expression like `col("foo_max") >
807    /// 5` with the appropriate entry noted in self.columns
808    fn stat_column_expr(
809        &mut self,
810        column: &phys_expr::Column,
811        column_expr: &Arc<dyn PhysicalExpr>,
812        field: &Field,
813        stat_type: StatisticsType,
814    ) -> Result<Arc<dyn PhysicalExpr>> {
815        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
816            Some(idx) => (idx, false),
817            None => (self.columns.len(), true),
818        };
819
820        let column_name = column.name();
821        let stat_column_name = match stat_type {
822            StatisticsType::Min => format!("{column_name}_min"),
823            StatisticsType::Max => format!("{column_name}_max"),
824            StatisticsType::NullCount => format!("{column_name}_null_count"),
825            StatisticsType::RowCount => "row_count".to_string(),
826        };
827
828        let stat_column = phys_expr::Column::new(&stat_column_name, idx);
829
830        // only add statistics column if not previously added
831        if need_to_insert {
832            // may be null if statistics are not present
833            let nullable = true;
834            let stat_field =
835                Field::new(stat_column.name(), field.data_type().clone(), nullable);
836            self.columns.push((column.clone(), stat_type, stat_field));
837        }
838        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
839    }
840
841    /// rewrite col --> col_min
842    fn min_column_expr(
843        &mut self,
844        column: &phys_expr::Column,
845        column_expr: &Arc<dyn PhysicalExpr>,
846        field: &Field,
847    ) -> Result<Arc<dyn PhysicalExpr>> {
848        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
849    }
850
851    /// rewrite col --> col_max
852    fn max_column_expr(
853        &mut self,
854        column: &phys_expr::Column,
855        column_expr: &Arc<dyn PhysicalExpr>,
856        field: &Field,
857    ) -> Result<Arc<dyn PhysicalExpr>> {
858        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
859    }
860
861    /// rewrite col --> col_null_count
862    fn null_count_column_expr(
863        &mut self,
864        column: &phys_expr::Column,
865        column_expr: &Arc<dyn PhysicalExpr>,
866        field: &Field,
867    ) -> Result<Arc<dyn PhysicalExpr>> {
868        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
869    }
870
871    /// rewrite col --> col_row_count
872    fn row_count_column_expr(
873        &mut self,
874        column: &phys_expr::Column,
875        column_expr: &Arc<dyn PhysicalExpr>,
876        field: &Field,
877    ) -> Result<Arc<dyn PhysicalExpr>> {
878        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
879    }
880}
881
882impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
883    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
884        Self { columns }
885    }
886}
887
888/// Build a RecordBatch from a list of statistics, creating arrays,
889/// with one row for each PruningStatistics and columns specified in
890/// the required_columns parameter.
891///
892/// For example, if the requested columns are
893/// ```text
894/// ("s1", Min, Field:s1_min)
895/// ("s2", Max, field:s2_max)
896/// ```
897///
898/// And the input statistics had
899/// ```text
900/// S1(Min: 5, Max: 10)
901/// S2(Min: 99, Max: 1000)
902/// S3(Min: 1, Max: 2)
903/// ```
904///
905/// Then this function would build a record batch with 2 columns and
906/// one row s1_min and s2_max as follows (s3 is not requested):
907///
908/// ```text
909/// s1_min | s2_max
910/// -------+--------
911///   5    | 1000
912/// ```
913fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
914    statistics: &S,
915    required_columns: &RequiredColumns,
916) -> Result<RecordBatch> {
917    let mut arrays = Vec::<ArrayRef>::new();
918    // For each needed statistics column:
919    for (column, statistics_type, stat_field) in required_columns.iter() {
920        let column = Column::from_name(column.name());
921        let data_type = stat_field.data_type();
922
923        let num_containers = statistics.num_containers();
924
925        let array = match statistics_type {
926            StatisticsType::Min => statistics.min_values(&column),
927            StatisticsType::Max => statistics.max_values(&column),
928            StatisticsType::NullCount => statistics.null_counts(&column),
929            StatisticsType::RowCount => statistics.row_counts(),
930        };
931        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
932
933        assert_eq_or_internal_err!(
934            num_containers,
935            array.len(),
936            "mismatched statistics length. Expected {}, got {}",
937            num_containers,
938            array.len()
939        );
940
941        // cast statistics array to required data type (e.g. parquet
942        // provides timestamp statistics as "Int64")
943        let array = arrow::compute::cast(&array, data_type)?;
944
945        arrays.push(array);
946    }
947
948    let schema = Arc::new(required_columns.schema());
949    // provide the count in case there were no needed statistics
950    let mut options = RecordBatchOptions::default();
951    options.row_count = Some(statistics.num_containers());
952
953    trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
954
955    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
956        plan_datafusion_err!("Can not create statistics record batch: {err}")
957    })
958}
959
960struct PruningExpressionBuilder<'a> {
961    column: phys_expr::Column,
962    column_expr: Arc<dyn PhysicalExpr>,
963    op: Operator,
964    scalar_expr: Arc<dyn PhysicalExpr>,
965    field: &'a Field,
966    required_columns: &'a mut RequiredColumns,
967}
968
969impl<'a> PruningExpressionBuilder<'a> {
970    fn try_new(
971        left: &'a Arc<dyn PhysicalExpr>,
972        right: &'a Arc<dyn PhysicalExpr>,
973        left_columns: ColumnReferenceCount,
974        right_columns: ColumnReferenceCount,
975        op: Operator,
976        schema: &'a SchemaRef,
977        required_columns: &'a mut RequiredColumns,
978    ) -> Result<Self> {
979        // find column name; input could be a more complicated expression
980        let (column_expr, scalar_expr, column, correct_operator) = match (
981            left_columns,
982            right_columns,
983        ) {
984            (ColumnReferenceCount::One(column), ColumnReferenceCount::Zero) => {
985                (left, right, column, op)
986            }
987            (ColumnReferenceCount::Zero, ColumnReferenceCount::One(column)) => {
988                (right, left, column, reverse_operator(op)?)
989            }
990            (ColumnReferenceCount::One(_), ColumnReferenceCount::One(_)) => {
991                // both sides have one column - not supported
992                return plan_err!(
993                    "Expression not supported for pruning: left has 1 column, right has 1 column"
994                );
995            }
996            (ColumnReferenceCount::Zero, ColumnReferenceCount::Zero) => {
997                // both sides are literals - should be handled before calling try_new
998                return plan_err!(
999                    "Pruning literal expressions is not supported, please call PhysicalExprSimplifier first"
1000                );
1001            }
1002            (ColumnReferenceCount::Many, _) | (_, ColumnReferenceCount::Many) => {
1003                return plan_err!(
1004                    "Expression not supported for pruning: left or right has multiple columns"
1005                );
1006            }
1007        };
1008
1009        let df_schema = DFSchema::try_from(Arc::clone(schema))?;
1010        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1011            column_expr,
1012            correct_operator,
1013            scalar_expr,
1014            df_schema,
1015        )?;
1016        let field = match schema.column_with_name(column.name()) {
1017            Some((_, f)) => f,
1018            _ => {
1019                return plan_err!("Field not found in schema");
1020            }
1021        };
1022
1023        Ok(Self {
1024            column,
1025            column_expr,
1026            op: correct_operator,
1027            scalar_expr,
1028            field,
1029            required_columns,
1030        })
1031    }
1032
1033    fn op(&self) -> Operator {
1034        self.op
1035    }
1036
1037    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1038        &self.scalar_expr
1039    }
1040
1041    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1042        self.required_columns
1043            .min_column_expr(&self.column, &self.column_expr, self.field)
1044    }
1045
1046    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1047        self.required_columns
1048            .max_column_expr(&self.column, &self.column_expr, self.field)
1049    }
1050
1051    /// This function is to simply retune the `null_count` physical expression no matter what the
1052    /// predicate expression is
1053    ///
1054    /// i.e., x > 5 => x_null_count,
1055    ///       cast(x as int) < 10 => x_null_count,
1056    ///       try_cast(x as float) < 10.0 => x_null_count
1057    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1058        // Retune to [`phys_expr::Column`]
1059        let column_expr = Arc::new(self.column.clone()) as _;
1060
1061        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1062        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1063
1064        self.required_columns.null_count_column_expr(
1065            &self.column,
1066            &column_expr,
1067            null_count_field,
1068        )
1069    }
1070
1071    /// This function is to simply retune the `row_count` physical expression no matter what the
1072    /// predicate expression is
1073    ///
1074    /// i.e., x > 5 => x_row_count,
1075    ///       cast(x as int) < 10 => x_row_count,
1076    ///       try_cast(x as float) < 10.0 => x_row_count
1077    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1078        // Retune to [`phys_expr::Column`]
1079        let column_expr = Arc::new(self.column.clone()) as _;
1080
1081        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1082        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1083
1084        self.required_columns.row_count_column_expr(
1085            &self.column,
1086            &column_expr,
1087            row_count_field,
1088        )
1089    }
1090}
1091
1092/// This function is designed to rewrite the column_expr to
1093/// ensure the column_expr is monotonically increasing.
1094///
1095/// For example,
1096/// 1. `col > 10`
1097/// 2. `-col > 10` should be rewritten to `col < -10`
1098/// 3. `!col = true` would be rewritten to `col = !true`
1099/// 4. `abs(a - 10) > 0` not supported
1100/// 5. `cast(can_prunable_expr) > 10`
1101/// 6. `try_cast(can_prunable_expr) > 10`
1102///
1103/// More rewrite rules are still in progress.
1104fn rewrite_expr_to_prunable(
1105    column_expr: &PhysicalExprRef,
1106    op: Operator,
1107    scalar_expr: &PhysicalExprRef,
1108    schema: DFSchema,
1109) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1110    if !is_compare_op(op) {
1111        return plan_err!("rewrite_expr_to_prunable only support compare expression");
1112    }
1113
1114    if column_expr.downcast_ref::<phys_expr::Column>().is_some() {
1115        // `col op lit()`
1116        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1117    } else if let Some(cast) = column_expr.downcast_ref::<phys_expr::CastExpr>() {
1118        // `cast(col) op lit()`
1119        let (left, op, right) = rewrite_cast_child_to_prunable(
1120            cast.expr(),
1121            cast.cast_type(),
1122            op,
1123            scalar_expr,
1124            schema,
1125        )?;
1126        let left = Arc::new(phys_expr::CastExpr::new_with_target_field(
1127            left,
1128            Arc::clone(cast.target_field()),
1129            None,
1130        ));
1131        // PruningPredicate does not support pruning on nested fields yet.
1132        // End-to-end nested-field pruning also requires Parquet statistics
1133        // extraction to agree with PruningPredicate on a stats representation
1134        // for nested field expressions.
1135        Ok((left, op, right))
1136    } else if let Some(try_cast) = column_expr.downcast_ref::<phys_expr::TryCastExpr>() {
1137        // `try_cast(col) op lit()`
1138        let (left, op, right) = rewrite_cast_child_to_prunable(
1139            try_cast.expr(),
1140            try_cast.cast_type(),
1141            op,
1142            scalar_expr,
1143            schema,
1144        )?;
1145        let left = Arc::new(phys_expr::TryCastExpr::new(
1146            left,
1147            try_cast.cast_type().clone(),
1148        ));
1149        Ok((left, op, right))
1150    } else if let Some(neg) = column_expr.downcast_ref::<phys_expr::NegativeExpr>() {
1151        // `-col > lit()`  --> `col < -lit()`
1152        let (left, op, right) =
1153            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1154        let right = Arc::new(phys_expr::NegativeExpr::new(right));
1155        Ok((left, reverse_operator(op)?, right))
1156    } else if let Some(not) = column_expr.downcast_ref::<phys_expr::NotExpr>() {
1157        // `!col = true` --> `col = !true`
1158        if !matches!(
1159            op,
1160            Operator::Eq
1161                | Operator::NotEq
1162                | Operator::IsDistinctFrom
1163                | Operator::IsNotDistinctFrom
1164        ) {
1165            return plan_err!(
1166                "Not with operator other than Eq / NotEq / IsDistinctFrom / IsNotDistinctFrom is not supported"
1167            );
1168        }
1169        if not.arg().downcast_ref::<phys_expr::Column>().is_some() {
1170            let left = Arc::clone(not.arg());
1171            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1172            Ok((left, reverse_operator(op)?, right))
1173        } else {
1174            plan_err!("Not with complex expression {column_expr:?} is not supported")
1175        }
1176    } else {
1177        plan_err!("column expression {column_expr:?} is not supported")
1178    }
1179}
1180
1181fn rewrite_cast_child_to_prunable(
1182    cast_child_expr: &PhysicalExprRef,
1183    cast_type: &DataType,
1184    op: Operator,
1185    scalar_expr: &PhysicalExprRef,
1186    schema: DFSchema,
1187) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1188    verify_support_type_for_prune(
1189        &cast_child_expr.data_type(schema.as_arrow())?,
1190        cast_type,
1191    )?;
1192    rewrite_expr_to_prunable(cast_child_expr, op, scalar_expr, schema)
1193}
1194
1195fn is_compare_op(op: Operator) -> bool {
1196    matches!(
1197        op,
1198        Operator::Eq
1199            | Operator::NotEq
1200            | Operator::Lt
1201            | Operator::LtEq
1202            | Operator::Gt
1203            | Operator::GtEq
1204            | Operator::IsDistinctFrom
1205            | Operator::IsNotDistinctFrom
1206            | Operator::LikeMatch
1207            | Operator::NotLikeMatch
1208    )
1209}
1210
1211// The pruning logic is based on the comparing the min/max bounds.
1212// Must make sure the two type has order.
1213// For example, casts from string to numbers is not correct.
1214// Because the "13" is less than "3" with UTF8 comparison order.
1215fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1216    // Dictionary casts are always supported as long as the value types are supported
1217    let from_type = match from_type {
1218        DataType::Dictionary(_, t) => {
1219            return verify_support_type_for_prune(t.as_ref(), to_type);
1220        }
1221        _ => from_type,
1222    };
1223    let to_type = match to_type {
1224        DataType::Dictionary(_, t) => {
1225            return verify_support_type_for_prune(from_type, t.as_ref());
1226        }
1227        _ => to_type,
1228    };
1229    // If both types are strings or both are not strings (number, timestamp, etc)
1230    // then we can compare them.
1231    // PruningPredicate does not support casting of strings to numbers and such.
1232    if from_type.is_string() == to_type.is_string() {
1233        Ok(())
1234    } else {
1235        plan_err!(
1236            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1237        )
1238    }
1239}
1240
1241/// replaces a column with an old name with a new name in an expression
1242fn rewrite_column_expr(
1243    e: Arc<dyn PhysicalExpr>,
1244    column_old: &phys_expr::Column,
1245    column_new: &phys_expr::Column,
1246) -> Result<Arc<dyn PhysicalExpr>> {
1247    e.transform(|expr| {
1248        if let Some(column) = expr.downcast_ref::<phys_expr::Column>()
1249            && column == column_old
1250        {
1251            return Ok(Transformed::yes(Arc::new(column_new.clone())));
1252        }
1253
1254        Ok(Transformed::no(expr))
1255    })
1256    .data()
1257}
1258
1259fn reverse_operator(op: Operator) -> Result<Operator> {
1260    op.swap().ok_or_else(|| {
1261        internal_datafusion_err!(
1262            "Could not reverse operator {op} while building pruning predicate"
1263        )
1264    })
1265}
1266
1267/// Given a column reference to `column`, returns a pruning
1268/// expression in terms of the min and max that will evaluate to true
1269/// if the column may contain values, and false if definitely does not
1270/// contain values
1271fn build_single_column_expr(
1272    column: &phys_expr::Column,
1273    schema: &Schema,
1274    required_columns: &mut RequiredColumns,
1275    is_not: bool, // if true, treat as !col
1276) -> Option<Arc<dyn PhysicalExpr>> {
1277    let field = schema.field_with_name(column.name()).ok()?;
1278
1279    if *field.data_type() == DataType::Boolean {
1280        let col_ref = Arc::new(column.clone()) as _;
1281
1282        let min = required_columns
1283            .min_column_expr(column, &col_ref, field)
1284            .ok()?;
1285        let max = required_columns
1286            .max_column_expr(column, &col_ref, field)
1287            .ok()?;
1288
1289        // remember -- we want an expression that is:
1290        // TRUE: if there may be rows that match
1291        // FALSE: if there are no rows that match
1292        if is_not {
1293            // The only way we know a column couldn't match is if both the min and max are true
1294            // !(min && max)
1295            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1296                phys_expr::BinaryExpr::new(min, Operator::And, max),
1297            ))))
1298        } else {
1299            // the only way we know a column couldn't match is if both the min and max are false
1300            // !(!min && !max) --> min || max
1301            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1302        }
1303    } else {
1304        None
1305    }
1306}
1307
1308/// Given an expression reference to `expr`, if `expr` is a column expression,
1309/// returns a pruning expression in terms of IsNull that will evaluate to true
1310/// if the column may contain null, and false if definitely does not
1311/// contain null.
1312/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1313/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1314/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1315/// thus can prune the row group / value
1316fn build_is_null_column_expr(
1317    expr: &Arc<dyn PhysicalExpr>,
1318    schema: &Schema,
1319    required_columns: &mut RequiredColumns,
1320    with_not: bool,
1321) -> Option<Arc<dyn PhysicalExpr>> {
1322    if let Some(col) = expr.downcast_ref::<phys_expr::Column>() {
1323        let field = schema.field_with_name(col.name()).ok()?;
1324
1325        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1326        if with_not {
1327            if let Ok(row_count_expr) =
1328                required_columns.row_count_column_expr(col, expr, null_count_field)
1329            {
1330                required_columns
1331                    .null_count_column_expr(col, expr, null_count_field)
1332                    .map(|null_count_column_expr| {
1333                        // IsNotNull(column) => null_count != row_count
1334                        Arc::new(phys_expr::BinaryExpr::new(
1335                            null_count_column_expr,
1336                            Operator::NotEq,
1337                            row_count_expr,
1338                        )) as _
1339                    })
1340                    .ok()
1341            } else {
1342                None
1343            }
1344        } else {
1345            required_columns
1346                .null_count_column_expr(col, expr, null_count_field)
1347                .map(|null_count_column_expr| {
1348                    // IsNull(column) => null_count > 0
1349                    Arc::new(phys_expr::BinaryExpr::new(
1350                        null_count_column_expr,
1351                        Operator::Gt,
1352                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1353                    )) as _
1354                })
1355                .ok()
1356        }
1357    } else {
1358        None
1359    }
1360}
1361
1362/// The maximum number of entries in an `InList` that might be rewritten into
1363/// an OR chain
1364const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1365
1366/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
1367/// for use as a [`PruningPredicate`].
1368pub struct PredicateRewriter {
1369    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1370}
1371
1372impl Default for PredicateRewriter {
1373    fn default() -> Self {
1374        Self {
1375            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1376        }
1377    }
1378}
1379
1380impl PredicateRewriter {
1381    /// Create a new `PredicateRewriter`
1382    pub fn new() -> Self {
1383        Self::default()
1384    }
1385
1386    /// Set the unhandled hook to be used when a predicate can not be rewritten
1387    pub fn with_unhandled_hook(
1388        self,
1389        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1390    ) -> Self {
1391        Self { unhandled_hook }
1392    }
1393
1394    /// Translate logical filter expression into pruning predicate
1395    /// expression that will evaluate to FALSE if it can be determined no
1396    /// rows between the min/max values could pass the predicates.
1397    ///
1398    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1399    ///
1400    /// Returns the pruning predicate as an [`PhysicalExpr`]
1401    ///
1402    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1403    pub fn rewrite_predicate_to_statistics_predicate(
1404        &self,
1405        expr: &Arc<dyn PhysicalExpr>,
1406        schema: &Schema,
1407    ) -> Arc<dyn PhysicalExpr> {
1408        let mut required_columns = RequiredColumns::new();
1409        build_predicate_expression(
1410            expr,
1411            &Arc::new(schema.clone()),
1412            &mut required_columns,
1413            &self.unhandled_hook,
1414        )
1415    }
1416}
1417
1418/// Translate logical filter expression into pruning predicate
1419/// expression that will evaluate to FALSE if it can be determined no
1420/// rows between the min/max values could pass the predicates.
1421///
1422/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1423///
1424/// Returns the pruning predicate as an [`PhysicalExpr`]
1425///
1426/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1427fn build_predicate_expression(
1428    expr: &Arc<dyn PhysicalExpr>,
1429    schema: &SchemaRef,
1430    required_columns: &mut RequiredColumns,
1431    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1432) -> Arc<dyn PhysicalExpr> {
1433    if is_always_false(expr) {
1434        // Shouldn't return `unhandled_hook.handle(expr)`
1435        // Because it will transfer false to true.
1436        return Arc::clone(expr);
1437    }
1438    // predicate expression can only be a binary expression
1439    if let Some(is_null) = expr.downcast_ref::<phys_expr::IsNullExpr>() {
1440        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1441            .unwrap_or_else(|| unhandled_hook.handle(expr));
1442    }
1443    if let Some(is_not_null) = expr.downcast_ref::<phys_expr::IsNotNullExpr>() {
1444        return build_is_null_column_expr(
1445            is_not_null.arg(),
1446            schema,
1447            required_columns,
1448            true,
1449        )
1450        .unwrap_or_else(|| unhandled_hook.handle(expr));
1451    }
1452    if let Some(col) = expr.downcast_ref::<phys_expr::Column>() {
1453        return build_single_column_expr(col, schema, required_columns, false)
1454            .unwrap_or_else(|| unhandled_hook.handle(expr));
1455    }
1456    if let Some(not) = expr.downcast_ref::<phys_expr::NotExpr>() {
1457        // match !col (don't do so recursively)
1458        if let Some(col) = not.arg().downcast_ref::<phys_expr::Column>() {
1459            return build_single_column_expr(col, schema, required_columns, true)
1460                .unwrap_or_else(|| unhandled_hook.handle(expr));
1461        } else {
1462            return unhandled_hook.handle(expr);
1463        }
1464    }
1465    if let Some(in_list) = expr.downcast_ref::<phys_expr::InListExpr>() {
1466        if !in_list.list().is_empty()
1467            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1468        {
1469            let eq_op = if in_list.negated() {
1470                Operator::NotEq
1471            } else {
1472                Operator::Eq
1473            };
1474            let re_op = if in_list.negated() {
1475                Operator::And
1476            } else {
1477                Operator::Or
1478            };
1479            let change_expr = in_list
1480                .list()
1481                .iter()
1482                .map(|e| {
1483                    Arc::new(phys_expr::BinaryExpr::new(
1484                        Arc::clone(in_list.expr()),
1485                        eq_op,
1486                        Arc::clone(e),
1487                    )) as _
1488                })
1489                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1490                .unwrap();
1491            return build_predicate_expression(
1492                &change_expr,
1493                schema,
1494                required_columns,
1495                unhandled_hook,
1496            );
1497        } else {
1498            return unhandled_hook.handle(expr);
1499        }
1500    }
1501
1502    let (left, op, right) = {
1503        if let Some(bin_expr) = expr.downcast_ref::<phys_expr::BinaryExpr>() {
1504            (
1505                Arc::clone(bin_expr.left()),
1506                *bin_expr.op(),
1507                Arc::clone(bin_expr.right()),
1508            )
1509        } else if let Some(like_expr) = expr.downcast_ref::<phys_expr::LikeExpr>() {
1510            if like_expr.case_insensitive() {
1511                return unhandled_hook.handle(expr);
1512            }
1513            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1514                (false, false) => Operator::LikeMatch,
1515                (true, false) => Operator::NotLikeMatch,
1516                (false, true) => Operator::ILikeMatch,
1517                (true, true) => Operator::NotILikeMatch,
1518            };
1519            (
1520                Arc::clone(like_expr.expr()),
1521                op,
1522                Arc::clone(like_expr.pattern()),
1523            )
1524        } else {
1525            return unhandled_hook.handle(expr);
1526        }
1527    };
1528
1529    if op == Operator::And || op == Operator::Or {
1530        let left_expr =
1531            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1532        let right_expr =
1533            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1534        // simplify boolean expression if applicable
1535        let expr = match (&left_expr, op, &right_expr) {
1536            (left, Operator::And, right)
1537                if is_always_false(left) || is_always_false(right) =>
1538            {
1539                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1540            }
1541            (left, Operator::And, _) if is_always_true(left) => right_expr,
1542            (_, Operator::And, right) if is_always_true(right) => left_expr,
1543            (left, Operator::Or, right)
1544                if is_always_true(left) || is_always_true(right) =>
1545            {
1546                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1547            }
1548            (left, Operator::Or, _) if is_always_false(left) => right_expr,
1549            (_, Operator::Or, right) if is_always_false(right) => left_expr,
1550
1551            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1552        };
1553        return expr;
1554    }
1555
1556    let left_columns = ColumnReferenceCount::from_expression(&left);
1557    let right_columns = ColumnReferenceCount::from_expression(&right);
1558    let expr_builder = PruningExpressionBuilder::try_new(
1559        &left,
1560        &right,
1561        left_columns,
1562        right_columns,
1563        op,
1564        schema,
1565        required_columns,
1566    );
1567    let mut expr_builder = match expr_builder {
1568        Ok(builder) => builder,
1569        // allow partial failure in predicate expression generation
1570        // this can still produce a useful predicate when multiple conditions are joined using AND
1571        Err(e) => {
1572            debug!("Error building pruning expression: {e}");
1573            return unhandled_hook.handle(expr);
1574        }
1575    };
1576
1577    build_statistics_expr(&mut expr_builder)
1578        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1579}
1580
1581/// Count of distinct column references in an expression.
1582/// This is the same as [`collect_columns`] but optimized to stop counting
1583/// once more than one distinct column is found.
1584///
1585/// For example, in expression `col1 + col2`, the count is `Many`.
1586/// In expression `col1 + 5`, the count is `One`.
1587/// In expression `5 + 10`, the count is `Zero`.
1588///
1589/// [`collect_columns`]: datafusion_physical_expr::utils::collect_columns
1590#[derive(Debug, PartialEq, Eq)]
1591enum ColumnReferenceCount {
1592    /// no column references
1593    Zero,
1594    /// Only one column reference
1595    One(phys_expr::Column),
1596    /// More than one column reference
1597    Many,
1598}
1599
1600impl ColumnReferenceCount {
1601    /// Count the number of distinct column references in an expression
1602    fn from_expression(expr: &Arc<dyn PhysicalExpr>) -> Self {
1603        let mut seen = HashSet::<phys_expr::Column>::new();
1604        expr.apply(|expr| {
1605            if let Some(column) = expr.downcast_ref::<phys_expr::Column>() {
1606                seen.insert(column.clone());
1607                if seen.len() > 1 {
1608                    return Ok(TreeNodeRecursion::Stop);
1609                }
1610            }
1611            Ok(TreeNodeRecursion::Continue)
1612        })
1613        // pre_visit always returns OK, so this will always too
1614        .expect("no way to return error during recursion");
1615        match seen.len() {
1616            0 => ColumnReferenceCount::Zero,
1617            1 => ColumnReferenceCount::One(
1618                seen.into_iter().next().expect("just checked len==1"),
1619            ),
1620            _ => ColumnReferenceCount::Many,
1621        }
1622    }
1623}
1624
1625fn build_statistics_expr(
1626    expr_builder: &mut PruningExpressionBuilder,
1627) -> Result<Arc<dyn PhysicalExpr>> {
1628    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1629        Operator::NotEq => build_ne_statistics_expr(expr_builder)?,
1630        Operator::Eq => {
1631            // column = literal => (min, max) = literal => min <= literal && literal <= max
1632            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1633            build_eq_statistics_expr(expr_builder)?
1634        }
1635        Operator::IsDistinctFrom => return build_is_distinct_from(expr_builder),
1636        Operator::IsNotDistinctFrom => return build_is_not_distinct_from(expr_builder),
1637        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1638        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1639            plan_datafusion_err!(
1640                "LIKE expression with wildcard at the beginning is not supported"
1641            )
1642        })?,
1643        Operator::Gt => {
1644            // column > literal => (min, max) > literal => max > literal
1645            Arc::new(phys_expr::BinaryExpr::new(
1646                expr_builder.max_column_expr()?,
1647                Operator::Gt,
1648                Arc::clone(expr_builder.scalar_expr()),
1649            ))
1650        }
1651        Operator::GtEq => {
1652            // column >= literal => (min, max) >= literal => max >= literal
1653            Arc::new(phys_expr::BinaryExpr::new(
1654                expr_builder.max_column_expr()?,
1655                Operator::GtEq,
1656                Arc::clone(expr_builder.scalar_expr()),
1657            ))
1658        }
1659        Operator::Lt => {
1660            // column < literal => (min, max) < literal => min < literal
1661            Arc::new(phys_expr::BinaryExpr::new(
1662                expr_builder.min_column_expr()?,
1663                Operator::Lt,
1664                Arc::clone(expr_builder.scalar_expr()),
1665            ))
1666        }
1667        Operator::LtEq => {
1668            // column <= literal => (min, max) <= literal => min <= literal
1669            Arc::new(phys_expr::BinaryExpr::new(
1670                expr_builder.min_column_expr()?,
1671                Operator::LtEq,
1672                Arc::clone(expr_builder.scalar_expr()),
1673            ))
1674        }
1675        // other expressions are not supported
1676        _ => {
1677            return plan_err!(
1678                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1679            );
1680        }
1681    };
1682    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1683    Ok(statistics_expr)
1684}
1685
1686fn binary_expr(
1687    left: Arc<dyn PhysicalExpr>,
1688    op: Operator,
1689    right: Arc<dyn PhysicalExpr>,
1690) -> Arc<dyn PhysicalExpr> {
1691    Arc::new(phys_expr::BinaryExpr::new(left, op, right))
1692}
1693
1694fn and_expr(
1695    left: Arc<dyn PhysicalExpr>,
1696    right: Arc<dyn PhysicalExpr>,
1697) -> Arc<dyn PhysicalExpr> {
1698    binary_expr(left, Operator::And, right)
1699}
1700
1701fn or_expr(
1702    left: Arc<dyn PhysicalExpr>,
1703    right: Arc<dyn PhysicalExpr>,
1704) -> Arc<dyn PhysicalExpr> {
1705    binary_expr(left, Operator::Or, right)
1706}
1707
1708fn build_eq_statistics_expr(
1709    expr_builder: &mut PruningExpressionBuilder,
1710) -> Result<Arc<dyn PhysicalExpr>> {
1711    let min_column_expr = expr_builder.min_column_expr()?;
1712    let max_column_expr = expr_builder.max_column_expr()?;
1713    Ok(and_expr(
1714        binary_expr(
1715            min_column_expr,
1716            Operator::LtEq,
1717            Arc::clone(expr_builder.scalar_expr()),
1718        ),
1719        binary_expr(
1720            Arc::clone(expr_builder.scalar_expr()),
1721            Operator::LtEq,
1722            max_column_expr,
1723        ),
1724    ))
1725}
1726
1727fn build_ne_statistics_expr(
1728    expr_builder: &mut PruningExpressionBuilder,
1729) -> Result<Arc<dyn PhysicalExpr>> {
1730    let min_column_expr = expr_builder.min_column_expr()?;
1731    let max_column_expr = expr_builder.max_column_expr()?;
1732    Ok(or_expr(
1733        binary_expr(
1734            min_column_expr,
1735            Operator::NotEq,
1736            Arc::clone(expr_builder.scalar_expr()),
1737        ),
1738        binary_expr(
1739            Arc::clone(expr_builder.scalar_expr()),
1740            Operator::NotEq,
1741            max_column_expr,
1742        ),
1743    ))
1744}
1745
1746fn column_has_nulls_expr(
1747    expr_builder: &mut PruningExpressionBuilder,
1748) -> Result<Arc<dyn PhysicalExpr>> {
1749    Ok(binary_expr(
1750        expr_builder.null_count_column_expr()?,
1751        Operator::Gt,
1752        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1753    ))
1754}
1755
1756fn column_has_non_nulls_expr(
1757    expr_builder: &mut PruningExpressionBuilder,
1758) -> Result<Arc<dyn PhysicalExpr>> {
1759    Ok(binary_expr(
1760        expr_builder.null_count_column_expr()?,
1761        Operator::NotEq,
1762        expr_builder.row_count_column_expr()?,
1763    ))
1764}
1765
1766fn build_is_distinct_from(
1767    expr_builder: &mut PruningExpressionBuilder,
1768) -> Result<Arc<dyn PhysicalExpr>> {
1769    let scalar_expr = Arc::clone(expr_builder.scalar_expr());
1770
1771    Ok(or_expr(
1772        and_expr(
1773            Arc::new(phys_expr::IsNullExpr::new(Arc::clone(&scalar_expr))),
1774            column_has_non_nulls_expr(expr_builder)?,
1775        ),
1776        and_expr(
1777            Arc::new(phys_expr::IsNotNullExpr::new(scalar_expr)),
1778            or_expr(
1779                column_has_nulls_expr(expr_builder)?,
1780                build_ne_statistics_expr(expr_builder)?,
1781            ),
1782        ),
1783    ))
1784}
1785
1786fn build_is_not_distinct_from(
1787    expr_builder: &mut PruningExpressionBuilder,
1788) -> Result<Arc<dyn PhysicalExpr>> {
1789    let scalar_expr = Arc::clone(expr_builder.scalar_expr());
1790
1791    Ok(or_expr(
1792        and_expr(
1793            Arc::new(phys_expr::IsNullExpr::new(Arc::clone(&scalar_expr))),
1794            column_has_nulls_expr(expr_builder)?,
1795        ),
1796        and_expr(
1797            Arc::new(phys_expr::IsNotNullExpr::new(scalar_expr)),
1798            and_expr(
1799                column_has_non_nulls_expr(expr_builder)?,
1800                build_eq_statistics_expr(expr_builder)?,
1801            ),
1802        ),
1803    ))
1804}
1805
1806/// returns the string literal of the scalar value if it is a string
1807fn unpack_string(s: &ScalarValue) -> Option<&str> {
1808    s.try_as_str().flatten()
1809}
1810
1811fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1812    if let Some(lit) = expr.downcast_ref::<phys_expr::Literal>() {
1813        let s = unpack_string(lit.value())?;
1814        return Some(s);
1815    }
1816    None
1817}
1818
1819/// Convert `column LIKE literal` where P is a constant prefix of the literal
1820/// to a range check on the column: `P <= column && column < P'`, where P' is the
1821/// lowest string after all P* strings.
1822fn build_like_match(
1823    expr_builder: &mut PruningExpressionBuilder,
1824) -> Option<Arc<dyn PhysicalExpr>> {
1825    // column LIKE literal => (min, max) LIKE literal split at unescaped % => min <= split literal && split literal <= max
1826    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1827    // column LIKE 'foo\_%' => min <= 'foo_' && 'foo_' <= max (the _ is escaped)
1828    // column LIKE 'foo\%%' => min <= 'foo%' && 'foo%' <= max (the % is escaped)
1829    // column LIKE '%foo' => min <= '' && '' <= max => true
1830    // column LIKE '%foo%' => min <= '' && '' <= max => true
1831    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1832
1833    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1834    //  this may involve building the physical expressions that call lower() and upper()
1835    let min_column_expr = expr_builder.min_column_expr().ok()?;
1836    let max_column_expr = expr_builder.max_column_expr().ok()?;
1837    let scalar_expr = expr_builder.scalar_expr();
1838    // check that the scalar is a string literal
1839    let s = extract_string_literal(scalar_expr)?;
1840    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1841    let (decoded_prefix, rest) = split_constant_prefix(s);
1842    let has_wildcard = !rest.is_empty();
1843    if has_wildcard && decoded_prefix.is_empty() {
1844        // there's no filtering we could possibly do, return None and have this be handled by the unhandled hook
1845        return None;
1846    }
1847    let (lower_bound, upper_bound) = if has_wildcard {
1848        let incremented_prefix = increment_utf8(&decoded_prefix)?;
1849        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1850            decoded_prefix,
1851        ))));
1852        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1853            incremented_prefix,
1854        ))));
1855        (lower_bound_lit, upper_bound_lit)
1856    } else {
1857        // the like expression is a literal and can be converted into a comparison
1858        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1859            decoded_prefix,
1860        ))));
1861        (Arc::clone(&bound), bound)
1862    };
1863    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1864        lower_bound,
1865        Operator::LtEq,
1866        Arc::clone(&max_column_expr),
1867    ));
1868    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1869        Arc::clone(&min_column_expr),
1870        Operator::LtEq,
1871        upper_bound,
1872    ));
1873    let combined = Arc::new(phys_expr::BinaryExpr::new(
1874        upper_bound_expr,
1875        Operator::And,
1876        lower_bound_expr,
1877    ));
1878    Some(combined)
1879}
1880
1881// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1882//
1883// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1884// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1885// looking for rows that don't begin with `const_prefix` can never be true)
1886fn build_not_like_match(
1887    expr_builder: &mut PruningExpressionBuilder<'_>,
1888) -> Result<Arc<dyn PhysicalExpr>> {
1889    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1890
1891    let min_column_expr = expr_builder.min_column_expr()?;
1892    let max_column_expr = expr_builder.max_column_expr()?;
1893
1894    let scalar_expr = expr_builder.scalar_expr();
1895
1896    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1897        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1898    })?;
1899
1900    let (const_prefix, remaining) = split_constant_prefix(pattern);
1901    if const_prefix.is_empty() || remaining != "%" {
1902        // we can not handle `%` at the beginning or in the middle of the pattern
1903        // Example: For pattern "foo%bar", the row group might include values like
1904        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1905        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1906        // match the pattern, intermediate values like "food" may not
1907        // match the full pattern "foo%bar", making pruning unsafe.
1908        // (truncate foo%bar to foo% have same problem)
1909
1910        // we can not handle pattern containing `_`
1911        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1912        // which means not every row is guaranteed to match the pattern.
1913        return Err(plan_datafusion_err!(
1914            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1915        ));
1916    }
1917
1918    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1919        true,
1920        false,
1921        Arc::clone(&min_column_expr),
1922        Arc::clone(scalar_expr),
1923    ));
1924
1925    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1926        true,
1927        false,
1928        Arc::clone(&max_column_expr),
1929        Arc::clone(scalar_expr),
1930    ));
1931
1932    Ok(Arc::new(phys_expr::BinaryExpr::new(
1933        min_col_not_like_epxr,
1934        Operator::Or,
1935        max_col_not_like_expr,
1936    )))
1937}
1938
1939/// Returns unescaped constant prefix of a LIKE pattern (possibly empty) and the remaining pattern (possibly empty)
1940fn split_constant_prefix(pattern: &str) -> (String, &str) {
1941    let mut prefix = String::with_capacity(pattern.len());
1942    let mut iter = pattern.char_indices();
1943    while let Some((idx, c)) = iter.next() {
1944        match c {
1945            '%' | '_' => return (prefix, &pattern[idx..]),
1946            '\\' => match iter.next() {
1947                Some((_, escaped)) => prefix.push(escaped),
1948                None => prefix.push('\\'),
1949            },
1950            _ => prefix.push(c),
1951        }
1952    }
1953    (prefix, "")
1954}
1955
1956/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
1957/// This makes it so that the returned string will always compare greater than the input string
1958/// or any other string with the same prefix.
1959/// This is necessary since the statistics may have been truncated: if we have a min statistic
1960/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
1961/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`
1962/// In this example `increment_utf8("foo") == "fop"
1963fn increment_utf8(data: &str) -> Option<String> {
1964    // Helper function to check if a character is valid to use
1965    fn is_valid_unicode(c: char) -> bool {
1966        let cp = c as u32;
1967
1968        // Filter out non-characters (https://www.unicode.org/versions/corrigendum9.html)
1969        if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
1970            return false;
1971        }
1972
1973        // Filter out private use area
1974        if cp >= 0x110000 {
1975            return false;
1976        }
1977
1978        true
1979    }
1980
1981    // Convert string to vector of code points
1982    let mut code_points: Vec<char> = data.chars().collect();
1983
1984    // Work backwards through code points
1985    for idx in (0..code_points.len()).rev() {
1986        let original = code_points[idx] as u32;
1987
1988        // Try incrementing the code point
1989        if let Some(next_char) = char::from_u32(original + 1)
1990            && is_valid_unicode(next_char)
1991        {
1992            code_points[idx] = next_char;
1993            // truncate the string to the current index
1994            code_points.truncate(idx + 1);
1995            return Some(code_points.into_iter().collect());
1996        }
1997    }
1998
1999    None
2000}
2001
2002/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
2003///
2004/// This is important not only as an optimization but also because statistics may not be
2005/// accurate for columns that are all nulls.
2006/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
2007/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
2008///
2009/// For example:
2010///
2011/// `x_min <= 10 AND 10 <= x_max`
2012///
2013/// will become
2014///
2015/// ```sql
2016/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
2017/// ````
2018///
2019/// If the column is known to be all nulls, then the expression
2020/// `x_null_count = x_row_count` will be true, which will cause the
2021/// boolean expression to return false. Therefore, prune out the container.
2022fn wrap_null_count_check_expr(
2023    statistics_expr: Arc<dyn PhysicalExpr>,
2024    expr_builder: &mut PruningExpressionBuilder,
2025) -> Result<Arc<dyn PhysicalExpr>> {
2026    // (x_null_count != x_row_count) AND (<statistics_expr>)
2027    Ok(and_expr(
2028        column_has_non_nulls_expr(expr_builder)?,
2029        statistics_expr,
2030    ))
2031}
2032
2033#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2034pub(crate) enum StatisticsType {
2035    Min,
2036    Max,
2037    NullCount,
2038    RowCount,
2039}
2040
2041#[cfg(test)]
2042mod tests {
2043    use std::collections::HashMap;
2044    use std::ops::{Not, Rem};
2045
2046    use super::*;
2047    use datafusion_common::test_util::batches_to_string;
2048    use datafusion_expr::{and, col, lit, or};
2049    use datafusion_physical_expr::utils::collect_columns;
2050    use insta::assert_snapshot;
2051
2052    use arrow::array::Decimal128Array;
2053    use arrow::{
2054        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
2055        datatypes::TimeUnit,
2056    };
2057    use datafusion_expr::expr::InList;
2058    use datafusion_expr::{BinaryExpr, Expr, cast, is_null, try_cast};
2059    use datafusion_functions_nested::expr_fn::{array_has, make_array};
2060    use datafusion_physical_expr::expressions::{
2061        self as phys_expr, DynamicFilterPhysicalExpr,
2062    };
2063    use datafusion_physical_expr::planner::logical2physical;
2064    use itertools::Itertools;
2065
2066    #[derive(Debug, Default)]
2067    /// Mock statistic provider for tests
2068    ///
2069    /// Each row represents the statistics for a "container" (which
2070    /// might represent an entire parquet file, or directory of files,
2071    /// or some other collection of data for which we had statistics)
2072    ///
2073    /// Note All `ArrayRefs` must be the same size.
2074    struct ContainerStats {
2075        min: Option<ArrayRef>,
2076        max: Option<ArrayRef>,
2077        /// Optional values
2078        null_counts: Option<ArrayRef>,
2079        row_counts: Option<ArrayRef>,
2080        /// Optional known values (e.g. mimic a bloom filter)
2081        /// (value, contained)
2082        /// If present, all BooleanArrays must be the same size as min/max
2083        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
2084    }
2085
2086    impl ContainerStats {
2087        fn new() -> Self {
2088            Default::default()
2089        }
2090        fn new_decimal128(
2091            min: impl IntoIterator<Item = Option<i128>>,
2092            max: impl IntoIterator<Item = Option<i128>>,
2093            precision: u8,
2094            scale: i8,
2095        ) -> Self {
2096            Self::new()
2097                .with_min(Arc::new(
2098                    min.into_iter()
2099                        .collect::<Decimal128Array>()
2100                        .with_precision_and_scale(precision, scale)
2101                        .unwrap(),
2102                ))
2103                .with_max(Arc::new(
2104                    max.into_iter()
2105                        .collect::<Decimal128Array>()
2106                        .with_precision_and_scale(precision, scale)
2107                        .unwrap(),
2108                ))
2109        }
2110
2111        fn new_i64(
2112            min: impl IntoIterator<Item = Option<i64>>,
2113            max: impl IntoIterator<Item = Option<i64>>,
2114        ) -> Self {
2115            Self::new()
2116                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
2117                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
2118        }
2119
2120        fn new_i32(
2121            min: impl IntoIterator<Item = Option<i32>>,
2122            max: impl IntoIterator<Item = Option<i32>>,
2123        ) -> Self {
2124            Self::new()
2125                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
2126                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
2127        }
2128
2129        fn new_utf8<'a>(
2130            min: impl IntoIterator<Item = Option<&'a str>>,
2131            max: impl IntoIterator<Item = Option<&'a str>>,
2132        ) -> Self {
2133            Self::new()
2134                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
2135                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
2136        }
2137
2138        fn new_bool(
2139            min: impl IntoIterator<Item = Option<bool>>,
2140            max: impl IntoIterator<Item = Option<bool>>,
2141        ) -> Self {
2142            Self::new()
2143                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
2144                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
2145        }
2146
2147        fn min(&self) -> Option<ArrayRef> {
2148            self.min.clone()
2149        }
2150
2151        fn max(&self) -> Option<ArrayRef> {
2152            self.max.clone()
2153        }
2154
2155        fn null_counts(&self) -> Option<ArrayRef> {
2156            self.null_counts.clone()
2157        }
2158
2159        fn row_counts(&self) -> Option<ArrayRef> {
2160            self.row_counts.clone()
2161        }
2162
2163        /// return an iterator over all arrays in this statistics
2164        fn arrays(&self) -> Vec<ArrayRef> {
2165            let contained_arrays = self
2166                .contained
2167                .iter()
2168                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2169
2170            [
2171                self.min.as_ref().cloned(),
2172                self.max.as_ref().cloned(),
2173                self.null_counts.as_ref().cloned(),
2174                self.row_counts.as_ref().cloned(),
2175            ]
2176            .into_iter()
2177            .flatten()
2178            .chain(contained_arrays)
2179            .collect()
2180        }
2181
2182        /// Returns the number of containers represented by this statistics This
2183        /// picks the length of the first array as all arrays must have the same
2184        /// length (which is verified by `assert_invariants`).
2185        fn len(&self) -> usize {
2186            // pick the first non zero length
2187            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2188        }
2189
2190        /// Ensure that the lengths of all arrays are consistent
2191        fn assert_invariants(&self) {
2192            let mut prev_len = None;
2193
2194            for len in self.arrays().iter().map(|a| a.len()) {
2195                // Get a length, if we don't already have one
2196                match prev_len {
2197                    None => {
2198                        prev_len = Some(len);
2199                    }
2200                    Some(prev_len) => {
2201                        assert_eq!(prev_len, len);
2202                    }
2203                }
2204            }
2205        }
2206
2207        /// Add min values
2208        fn with_min(mut self, min: ArrayRef) -> Self {
2209            self.min = Some(min);
2210            self
2211        }
2212
2213        /// Add max values
2214        fn with_max(mut self, max: ArrayRef) -> Self {
2215            self.max = Some(max);
2216            self
2217        }
2218
2219        /// Add null counts. There must be the same number of null counts as
2220        /// there are containers
2221        fn with_null_counts(
2222            mut self,
2223            counts: impl IntoIterator<Item = Option<u64>>,
2224        ) -> Self {
2225            let null_counts: ArrayRef =
2226                Arc::new(counts.into_iter().collect::<UInt64Array>());
2227
2228            self.assert_invariants();
2229            self.null_counts = Some(null_counts);
2230            self
2231        }
2232
2233        /// Add row counts. There must be the same number of row counts as
2234        /// there are containers
2235        fn with_row_counts(
2236            mut self,
2237            counts: impl IntoIterator<Item = Option<u64>>,
2238        ) -> Self {
2239            let row_counts: ArrayRef =
2240                Arc::new(counts.into_iter().collect::<UInt64Array>());
2241
2242            self.assert_invariants();
2243            self.row_counts = Some(row_counts);
2244            self
2245        }
2246
2247        /// Add contained information.
2248        #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // ScalarValue has interior mutability but is intentionally used as hash key
2249        pub fn with_contained(
2250            mut self,
2251            values: impl IntoIterator<Item = ScalarValue>,
2252            contained: impl IntoIterator<Item = Option<bool>>,
2253        ) -> Self {
2254            let contained: BooleanArray = contained.into_iter().collect();
2255            let values: HashSet<_> = values.into_iter().collect();
2256
2257            self.contained.push((values, contained));
2258            self.assert_invariants();
2259            self
2260        }
2261
2262        /// get any contained information for the specified values
2263        #[allow(clippy::allow_attributes, clippy::mutable_key_type)] // ScalarValue has interior mutability but is intentionally used as hash key
2264        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2265            // find the one with the matching values
2266            self.contained
2267                .iter()
2268                .find(|(values, _contained)| values == find_values)
2269                .map(|(_values, contained)| contained.clone())
2270        }
2271    }
2272
2273    #[derive(Debug, Default)]
2274    struct TestStatistics {
2275        // key: column name
2276        stats: HashMap<Column, ContainerStats>,
2277    }
2278
2279    impl TestStatistics {
2280        fn new() -> Self {
2281            Self::default()
2282        }
2283
2284        fn with(
2285            mut self,
2286            name: impl Into<String>,
2287            container_stats: ContainerStats,
2288        ) -> Self {
2289            let col = Column::from_name(name.into());
2290            self.stats.insert(col, container_stats);
2291            self
2292        }
2293
2294        /// Add null counts for the specified column.
2295        /// There must be the same number of null counts as
2296        /// there are containers
2297        fn with_null_counts(
2298            mut self,
2299            name: impl Into<String>,
2300            counts: impl IntoIterator<Item = Option<u64>>,
2301        ) -> Self {
2302            let col = Column::from_name(name.into());
2303
2304            // take stats out and update them
2305            let container_stats = self
2306                .stats
2307                .remove(&col)
2308                .unwrap_or_default()
2309                .with_null_counts(counts);
2310
2311            // put stats back in
2312            self.stats.insert(col, container_stats);
2313            self
2314        }
2315
2316        /// Add row counts for the specified column.
2317        /// There must be the same number of row counts as
2318        /// there are containers
2319        fn with_row_counts(
2320            mut self,
2321            name: impl Into<String>,
2322            counts: impl IntoIterator<Item = Option<u64>>,
2323        ) -> Self {
2324            let col = Column::from_name(name.into());
2325
2326            // take stats out and update them
2327            let container_stats = self
2328                .stats
2329                .remove(&col)
2330                .unwrap_or_default()
2331                .with_row_counts(counts);
2332
2333            // put stats back in
2334            self.stats.insert(col, container_stats);
2335            self
2336        }
2337
2338        /// Add contained information for the specified column.
2339        fn with_contained(
2340            mut self,
2341            name: impl Into<String>,
2342            values: impl IntoIterator<Item = ScalarValue>,
2343            contained: impl IntoIterator<Item = Option<bool>>,
2344        ) -> Self {
2345            let col = Column::from_name(name.into());
2346
2347            // take stats out and update them
2348            let container_stats = self
2349                .stats
2350                .remove(&col)
2351                .unwrap_or_default()
2352                .with_contained(values, contained);
2353
2354            // put stats back in
2355            self.stats.insert(col, container_stats);
2356            self
2357        }
2358    }
2359
2360    impl PruningStatistics for TestStatistics {
2361        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2362            self.stats
2363                .get(column)
2364                .map(|container_stats| container_stats.min())
2365                .unwrap_or(None)
2366        }
2367
2368        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2369            self.stats
2370                .get(column)
2371                .map(|container_stats| container_stats.max())
2372                .unwrap_or(None)
2373        }
2374
2375        fn num_containers(&self) -> usize {
2376            self.stats
2377                .values()
2378                .next()
2379                .map(|container_stats| container_stats.len())
2380                .unwrap_or(0)
2381        }
2382
2383        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2384            self.stats
2385                .get(column)
2386                .map(|container_stats| container_stats.null_counts())
2387                .unwrap_or(None)
2388        }
2389
2390        fn row_counts(&self) -> Option<ArrayRef> {
2391            self.stats
2392                .values()
2393                .find_map(|container_stats| container_stats.row_counts())
2394        }
2395
2396        fn contained(
2397            &self,
2398            column: &Column,
2399            values: &HashSet<ScalarValue>,
2400        ) -> Option<BooleanArray> {
2401            self.stats
2402                .get(column)
2403                .and_then(|container_stats| container_stats.contained(values))
2404        }
2405    }
2406
2407    /// Returns the specified min/max container values
2408    struct OneContainerStats {
2409        min_values: Option<ArrayRef>,
2410        max_values: Option<ArrayRef>,
2411        num_containers: usize,
2412    }
2413
2414    impl PruningStatistics for OneContainerStats {
2415        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2416            self.min_values.clone()
2417        }
2418
2419        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2420            self.max_values.clone()
2421        }
2422
2423        fn num_containers(&self) -> usize {
2424            self.num_containers
2425        }
2426
2427        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2428            None
2429        }
2430
2431        fn row_counts(&self) -> Option<ArrayRef> {
2432            None
2433        }
2434
2435        fn contained(
2436            &self,
2437            _column: &Column,
2438            _values: &HashSet<ScalarValue>,
2439        ) -> Option<BooleanArray> {
2440            None
2441        }
2442    }
2443
2444    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2445    /// for multiple columns.
2446    #[test]
2447    fn test_unique_row_count_field_and_column() {
2448        // c1 = 100 AND c2 = 200
2449        let schema: SchemaRef = Arc::new(Schema::new(vec![
2450            Field::new("c1", DataType::Int32, true),
2451            Field::new("c2", DataType::Int32, true),
2452        ]));
2453        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2454        let expr = logical2physical(&expr, &schema);
2455        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2456        // note pruning expression refers to row_count twice
2457        assert_eq!(
2458            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2459            p.predicate_expr.to_string()
2460        );
2461
2462        // Fields in required schema should be unique, otherwise when creating batches
2463        // it will fail because of duplicate field names
2464        let mut fields = HashSet::new();
2465        for (_col, _ty, field) in p.required_columns().iter() {
2466            let was_new = fields.insert(field);
2467            if !was_new {
2468                panic!(
2469                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2470                );
2471            }
2472        }
2473    }
2474
2475    #[test]
2476    fn prune_all_rows_null_counts() {
2477        // if null_count = row_count then we should prune the container for i = 0
2478        // regardless of the statistics
2479        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2480        let statistics = TestStatistics::new().with(
2481            "i",
2482            ContainerStats::new_i32(
2483                vec![Some(0)], // min
2484                vec![Some(0)], // max
2485            )
2486            .with_null_counts(vec![Some(1)])
2487            .with_row_counts(vec![Some(1)]),
2488        );
2489        let expected_ret = &[false];
2490        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2491
2492        // this should be true even if the container stats are missing
2493        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2494        let container_stats = ContainerStats {
2495            min: Some(Arc::new(Int32Array::from(vec![None]))),
2496            max: Some(Arc::new(Int32Array::from(vec![None]))),
2497            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2498            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2499            ..ContainerStats::default()
2500        };
2501        let statistics = TestStatistics::new().with("i", container_stats);
2502        let expected_ret = &[false];
2503        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2504
2505        // If the null counts themselves are missing we should be able to fall back to the stats
2506        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2507        let container_stats = ContainerStats {
2508            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2509            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2510            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2511            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2512            ..ContainerStats::default()
2513        };
2514        let statistics = TestStatistics::new().with("i", container_stats);
2515        let expected_ret = &[true];
2516        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2517        let expected_ret = &[false];
2518        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2519
2520        // Same for the row counts
2521        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2522        let container_stats = ContainerStats {
2523            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2524            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2525            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2526            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2527            ..ContainerStats::default()
2528        };
2529        let statistics = TestStatistics::new().with("i", container_stats);
2530        let expected_ret = &[true];
2531        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2532        let expected_ret = &[false];
2533        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2534    }
2535
2536    #[test]
2537    fn prune_missing_statistics() {
2538        // If the min or max stats are missing we should not prune
2539        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2540        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2541        let container_stats = ContainerStats {
2542            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2543            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2544            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2545            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2546            ..ContainerStats::default()
2547        };
2548        let statistics = TestStatistics::new().with("i", container_stats);
2549        let expected_ret = &[true, true];
2550        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2551        let expected_ret = &[false, true];
2552        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2553        let expected_ret = &[true, false];
2554        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2555    }
2556
2557    #[test]
2558    fn prune_null_stats() {
2559        // if null_count = row_count then we should prune the container for i = 0
2560        // regardless of the statistics
2561        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2562
2563        let statistics = TestStatistics::new().with(
2564            "i",
2565            ContainerStats::new_i32(
2566                vec![Some(0)], // min
2567                vec![Some(0)], // max
2568            )
2569            .with_null_counts(vec![Some(1)])
2570            .with_row_counts(vec![Some(1)]),
2571        );
2572
2573        let expected_ret = &[false];
2574
2575        // i = 0
2576        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2577    }
2578
2579    #[test]
2580    fn test_build_statistics_record_batch() {
2581        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
2582        let required_columns = RequiredColumns::from(vec![
2583            // min of original column s1, named s1_min
2584            (
2585                phys_expr::Column::new("s1", 1),
2586                StatisticsType::Min,
2587                Field::new("s1_min", DataType::Int32, true),
2588            ),
2589            // max of original column s2, named s2_max
2590            (
2591                phys_expr::Column::new("s2", 2),
2592                StatisticsType::Max,
2593                Field::new("s2_max", DataType::Int32, true),
2594            ),
2595            // max of original column s3, named s3_max
2596            (
2597                phys_expr::Column::new("s3", 3),
2598                StatisticsType::Max,
2599                Field::new("s3_max", DataType::Utf8, true),
2600            ),
2601            // min of original column s3, named s3_min
2602            (
2603                phys_expr::Column::new("s3", 3),
2604                StatisticsType::Min,
2605                Field::new("s3_min", DataType::Utf8, true),
2606            ),
2607        ]);
2608
2609        let statistics = TestStatistics::new()
2610            .with(
2611                "s1",
2612                ContainerStats::new_i32(
2613                    vec![None, None, Some(9), None],  // min
2614                    vec![Some(10), None, None, None], // max
2615                ),
2616            )
2617            .with(
2618                "s2",
2619                ContainerStats::new_i32(
2620                    vec![Some(2), None, None, None],  // min
2621                    vec![Some(20), None, None, None], // max
2622                ),
2623            )
2624            .with(
2625                "s3",
2626                ContainerStats::new_utf8(
2627                    vec![Some("a"), None, None, None],      // min
2628                    vec![Some("q"), None, Some("r"), None], // max
2629                ),
2630            );
2631
2632        let batch =
2633            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2634        assert_snapshot!(batches_to_string(&[batch]), @r"
2635        +--------+--------+--------+--------+
2636        | s1_min | s2_max | s3_max | s3_min |
2637        +--------+--------+--------+--------+
2638        |        | 20     | q      | a      |
2639        |        |        |        |        |
2640        | 9      |        | r      |        |
2641        |        |        |        |        |
2642        +--------+--------+--------+--------+
2643        ");
2644    }
2645
2646    #[test]
2647    fn test_build_statistics_casting() {
2648        // Test requesting a Timestamp column, but getting statistics as Int64
2649        // which is what Parquet does
2650
2651        // Request a record batch with of s1_min as a timestamp
2652        let required_columns = RequiredColumns::from(vec![(
2653            phys_expr::Column::new("s3", 3),
2654            StatisticsType::Min,
2655            Field::new(
2656                "s1_min",
2657                DataType::Timestamp(TimeUnit::Nanosecond, None),
2658                true,
2659            ),
2660        )]);
2661
2662        // Note the statistics pass back i64 (not timestamp)
2663        let statistics = OneContainerStats {
2664            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2665            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2666            num_containers: 1,
2667        };
2668
2669        let batch =
2670            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2671
2672        assert_snapshot!(batches_to_string(&[batch]), @r"
2673        +-------------------------------+
2674        | s1_min                        |
2675        +-------------------------------+
2676        | 1970-01-01T00:00:00.000000010 |
2677        +-------------------------------+
2678        ");
2679    }
2680
2681    #[test]
2682    fn test_build_statistics_no_required_stats() {
2683        let required_columns = RequiredColumns::new();
2684
2685        let statistics = OneContainerStats {
2686            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2687            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2688            num_containers: 1,
2689        };
2690
2691        let batch =
2692            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2693        assert_eq!(batch.num_rows(), 1); // had 1 container
2694    }
2695
2696    #[test]
2697    fn test_build_statistics_inconsistent_types() {
2698        // Test requesting a Utf8 column when the stats return some other type
2699
2700        // Request a record batch with of s1_min as a timestamp
2701        let required_columns = RequiredColumns::from(vec![(
2702            phys_expr::Column::new("s3", 3),
2703            StatisticsType::Min,
2704            Field::new("s1_min", DataType::Utf8, true),
2705        )]);
2706
2707        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2708        let statistics = OneContainerStats {
2709            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2710            max_values: None,
2711            num_containers: 1,
2712        };
2713
2714        let batch =
2715            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2716        assert_snapshot!(batches_to_string(&[batch]), @r"
2717        +--------+
2718        | s1_min |
2719        +--------+
2720        |        |
2721        +--------+
2722        ");
2723    }
2724
2725    #[test]
2726    fn test_build_statistics_inconsistent_length() {
2727        // return an inconsistent length to the actual statistics arrays
2728        let required_columns = RequiredColumns::from(vec![(
2729            phys_expr::Column::new("s1", 3),
2730            StatisticsType::Min,
2731            Field::new("s1_min", DataType::Int64, true),
2732        )]);
2733
2734        // Note the statistics pass back i64 (not timestamp)
2735        let statistics = OneContainerStats {
2736            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2737            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2738            num_containers: 3,
2739        };
2740
2741        let result =
2742            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2743        assert!(
2744            result
2745                .to_string()
2746                .contains("mismatched statistics length. Expected 3, got 1"),
2747            "{}",
2748            result
2749        );
2750    }
2751
2752    #[test]
2753    fn row_group_predicate_eq() -> Result<()> {
2754        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2755        let expected_expr =
2756            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2757
2758        // test column on the left
2759        let expr = col("c1").eq(lit(1));
2760        let predicate_expr =
2761            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2762        assert_eq!(predicate_expr.to_string(), expected_expr);
2763
2764        // test column on the right
2765        let expr = lit(1).eq(col("c1"));
2766        let predicate_expr =
2767            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2768        assert_eq!(predicate_expr.to_string(), expected_expr);
2769
2770        Ok(())
2771    }
2772
2773    #[test]
2774    fn row_group_predicate_not_eq() -> Result<()> {
2775        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2776        let expected_expr =
2777            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2778
2779        // test column on the left
2780        let expr = col("c1").not_eq(lit(1));
2781        let predicate_expr =
2782            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2783        assert_eq!(predicate_expr.to_string(), expected_expr);
2784
2785        // test column on the right
2786        let expr = lit(1).not_eq(col("c1"));
2787        let predicate_expr =
2788            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2789        assert_eq!(predicate_expr.to_string(), expected_expr);
2790
2791        Ok(())
2792    }
2793
2794    #[test]
2795    fn row_group_predicate_gt() -> Result<()> {
2796        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2797        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2798
2799        // test column on the left
2800        let expr = col("c1").gt(lit(1));
2801        let predicate_expr =
2802            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2803        assert_eq!(predicate_expr.to_string(), expected_expr);
2804
2805        // test column on the right
2806        let expr = lit(1).lt(col("c1"));
2807        let predicate_expr =
2808            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2809        assert_eq!(predicate_expr.to_string(), expected_expr);
2810
2811        Ok(())
2812    }
2813
2814    #[test]
2815    fn row_group_predicate_gt_eq() -> Result<()> {
2816        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2817        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2818
2819        // test column on the left
2820        let expr = col("c1").gt_eq(lit(1));
2821        let predicate_expr =
2822            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2823        assert_eq!(predicate_expr.to_string(), expected_expr);
2824        // test column on the right
2825        let expr = lit(1).lt_eq(col("c1"));
2826        let predicate_expr =
2827            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2828        assert_eq!(predicate_expr.to_string(), expected_expr);
2829
2830        Ok(())
2831    }
2832
2833    #[test]
2834    fn row_group_predicate_lt() -> Result<()> {
2835        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2836        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2837
2838        // test column on the left
2839        let expr = col("c1").lt(lit(1));
2840        let predicate_expr =
2841            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2842        assert_eq!(predicate_expr.to_string(), expected_expr);
2843
2844        // test column on the right
2845        let expr = lit(1).gt(col("c1"));
2846        let predicate_expr =
2847            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2848        assert_eq!(predicate_expr.to_string(), expected_expr);
2849
2850        Ok(())
2851    }
2852
2853    #[test]
2854    fn row_group_predicate_lt_eq() -> Result<()> {
2855        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2856        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2857
2858        // test column on the left
2859        let expr = col("c1").lt_eq(lit(1));
2860        let predicate_expr =
2861            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2862        assert_eq!(predicate_expr.to_string(), expected_expr);
2863        // test column on the right
2864        let expr = lit(1).gt_eq(col("c1"));
2865        let predicate_expr =
2866            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2867        assert_eq!(predicate_expr.to_string(), expected_expr);
2868
2869        Ok(())
2870    }
2871
2872    #[test]
2873    fn row_group_predicate_and() -> Result<()> {
2874        let schema = Schema::new(vec![
2875            Field::new("c1", DataType::Int32, false),
2876            Field::new("c2", DataType::Int32, false),
2877            Field::new("c3", DataType::Int32, false),
2878        ]);
2879        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2880        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2881        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2882        let predicate_expr =
2883            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2884        assert_eq!(predicate_expr.to_string(), expected_expr);
2885
2886        Ok(())
2887    }
2888
2889    #[test]
2890    fn row_group_predicate_or() -> Result<()> {
2891        let schema = Schema::new(vec![
2892            Field::new("c1", DataType::Int32, false),
2893            Field::new("c2", DataType::Int32, false),
2894        ]);
2895        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2896        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2897        let expected_expr = "true";
2898        let predicate_expr =
2899            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2900        assert_eq!(predicate_expr.to_string(), expected_expr);
2901
2902        Ok(())
2903    }
2904
2905    #[test]
2906    fn row_group_predicate_not() -> Result<()> {
2907        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2908        let expected_expr = "true";
2909
2910        let expr = col("c1").not();
2911        let predicate_expr =
2912            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2913        assert_eq!(predicate_expr.to_string(), expected_expr);
2914
2915        Ok(())
2916    }
2917
2918    #[test]
2919    fn row_group_predicate_not_bool() -> Result<()> {
2920        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2921        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2922
2923        let expr = col("c1").not();
2924        let predicate_expr =
2925            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2926        assert_eq!(predicate_expr.to_string(), expected_expr);
2927
2928        Ok(())
2929    }
2930
2931    #[test]
2932    fn row_group_predicate_bool() -> Result<()> {
2933        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2934        let expected_expr = "c1_min@0 OR c1_max@1";
2935
2936        let expr = col("c1");
2937        let predicate_expr =
2938            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2939        assert_eq!(predicate_expr.to_string(), expected_expr);
2940
2941        Ok(())
2942    }
2943
2944    /// Test that non-boolean literal expressions don't prune any containers and error gracefully by not pruning anything instead of e.g. panicking
2945    #[test]
2946    fn row_group_predicate_non_boolean() {
2947        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2948        let statistics = TestStatistics::new()
2949            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2950        let expected_ret = &[true];
2951        prune_with_expr(lit(1), &schema, &statistics, expected_ret);
2952    }
2953
2954    // Test that literal-to-literal comparisons are correctly evaluated.
2955    // When both sides are constants, the expression should be evaluated directly
2956    // and if it's false, all containers should be pruned.
2957    #[test]
2958    fn row_group_predicate_literal_false() {
2959        // lit(1) = lit(2) is always false, so all containers should be pruned
2960        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2961        let statistics = TestStatistics::new()
2962            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2963        let expected_ret = &[false];
2964        prune_with_simplified_expr(lit(1).eq(lit(2)), &schema, &statistics, expected_ret);
2965    }
2966
2967    /// Test nested/complex literal expression trees.
2968    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2969    #[test]
2970    fn row_group_predicate_literal_true() {
2971        // lit(1) = lit(1) is always true, so no containers should be pruned
2972        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2973        let statistics = TestStatistics::new()
2974            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2975        let expected_ret = &[true];
2976        prune_with_simplified_expr(lit(1).eq(lit(1)), &schema, &statistics, expected_ret);
2977    }
2978
2979    /// Test nested/complex literal expression trees.
2980    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2981    #[test]
2982    fn row_group_predicate_literal_null() {
2983        // lit(1) = null is always null, so no containers should be pruned
2984        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
2985        let statistics = TestStatistics::new()
2986            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
2987        let expected_ret = &[true];
2988        prune_with_simplified_expr(
2989            lit(1).eq(lit(ScalarValue::Null)),
2990            &schema,
2991            &statistics,
2992            expected_ret,
2993        );
2994    }
2995
2996    /// Test nested/complex literal expression trees.
2997    /// This is an integration test that PhysicalExprSimplifier + PruningPredicate work together as expected.
2998    #[test]
2999    fn row_group_predicate_complex_literals() {
3000        let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
3001        let statistics = TestStatistics::new()
3002            .with("c1", ContainerStats::new_i32(vec![Some(0)], vec![Some(10)]));
3003
3004        // (1 + 2) > 0 is always true
3005        prune_with_simplified_expr(
3006            (lit(1) + lit(2)).gt(lit(0)),
3007            &schema,
3008            &statistics,
3009            &[true],
3010        );
3011
3012        // (1 + 2) < 0 is always false
3013        prune_with_simplified_expr(
3014            (lit(1) + lit(2)).lt(lit(0)),
3015            &schema,
3016            &statistics,
3017            &[false],
3018        );
3019
3020        // Nested AND of literals: true AND false = false
3021        prune_with_simplified_expr(
3022            lit(true).and(lit(false)),
3023            &schema,
3024            &statistics,
3025            &[false],
3026        );
3027
3028        // Nested OR of literals: true OR false = true
3029        prune_with_simplified_expr(
3030            lit(true).or(lit(false)),
3031            &schema,
3032            &statistics,
3033            &[true],
3034        );
3035
3036        // Complex nested: (1 < 2) AND (3 > 1) = true AND true = true
3037        prune_with_simplified_expr(
3038            lit(1).lt(lit(2)).and(lit(3).gt(lit(1))),
3039            &schema,
3040            &statistics,
3041            &[true],
3042        );
3043
3044        // Complex nested: (1 > 2) OR (3 < 1) = false OR false = false
3045        prune_with_simplified_expr(
3046            lit(1).gt(lit(2)).or(lit(3).lt(lit(1))),
3047            &schema,
3048            &statistics,
3049            &[false],
3050        );
3051    }
3052
3053    /// Integration test demonstrating that a dynamic filter with replaced children as literals will be snapshotted, simplified and then pruned correctly.
3054    #[test]
3055    fn row_group_predicate_dynamic_filter_with_literals() {
3056        let schema = Arc::new(Schema::new(vec![
3057            Field::new("c1", DataType::Int32, true),
3058            Field::new("part", DataType::Utf8, true),
3059        ]));
3060        let statistics = TestStatistics::new()
3061            // Note that we have no stats, pruning can only happen via partition value pruning from the dynamic filter
3062            .with_row_counts("c1", vec![Some(10)]);
3063        let dynamic_filter_expr = col("c1").gt(lit(5)).and(col("part").eq(lit("B")));
3064        let phys_expr = logical2physical(&dynamic_filter_expr, &schema);
3065        let children = collect_columns(&phys_expr)
3066            .iter()
3067            .map(|c| Arc::new(c.clone()) as Arc<dyn PhysicalExpr>)
3068            .collect_vec();
3069        let dynamic_phys_expr =
3070            Arc::new(DynamicFilterPhysicalExpr::new(children, phys_expr))
3071                as Arc<dyn PhysicalExpr>;
3072        // Simulate the partition value substitution that would happen in ParquetOpener
3073        let remapped_expr = dynamic_phys_expr
3074            .children()
3075            .into_iter()
3076            .map(|child_expr| {
3077                let Some(col_expr) = child_expr.downcast_ref::<phys_expr::Column>()
3078                else {
3079                    return Arc::clone(child_expr);
3080                };
3081                if col_expr.name() == "part" {
3082                    // simulate dynamic filter replacement with literal "A"
3083                    Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
3084                        "A".to_string(),
3085                    )))) as Arc<dyn PhysicalExpr>
3086                } else {
3087                    Arc::clone(child_expr)
3088                }
3089            })
3090            .collect_vec();
3091        let dynamic_filter_expr =
3092            dynamic_phys_expr.with_new_children(remapped_expr).unwrap();
3093        // After substitution the expression is c1 > 5 AND part = "B" which should prune the file since the partition value is "A"
3094        let expected = &[false];
3095        let p =
3096            PruningPredicate::try_new(dynamic_filter_expr, Arc::clone(&schema)).unwrap();
3097        let result = p.prune(&statistics).unwrap();
3098        assert_eq!(result, expected);
3099    }
3100
3101    #[test]
3102    fn row_group_predicate_lt_bool() -> Result<()> {
3103        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
3104        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
3105
3106        // DF doesn't support arithmetic on boolean columns so
3107        // this predicate will error when evaluated
3108        let expr = col("c1").lt(lit(true));
3109        let predicate_expr =
3110            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3111        assert_eq!(predicate_expr.to_string(), expected_expr);
3112
3113        Ok(())
3114    }
3115
3116    #[test]
3117    fn row_group_predicate_required_columns() -> Result<()> {
3118        let schema = Schema::new(vec![
3119            Field::new("c1", DataType::Int32, false),
3120            Field::new("c2", DataType::Int32, false),
3121        ]);
3122        let mut required_columns = RequiredColumns::new();
3123        // c1 < 1 and (c2 = 2 or c2 = 3)
3124        let expr = col("c1")
3125            .lt(lit(1))
3126            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
3127        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
3128        let predicate_expr =
3129            test_build_predicate_expression(&expr, &schema, &mut required_columns);
3130        assert_eq!(predicate_expr.to_string(), expected_expr);
3131        println!("required_columns: {required_columns:#?}"); // for debugging assertions below
3132        // c1 < 1 should add c1_min
3133        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
3134        assert_eq!(
3135            required_columns.columns[0],
3136            (
3137                phys_expr::Column::new("c1", 0),
3138                StatisticsType::Min,
3139                c1_min_field.with_nullable(true) // could be nullable if stats are not present
3140            )
3141        );
3142        // c1 < 1 should add c1_null_count
3143        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
3144        assert_eq!(
3145            required_columns.columns[1],
3146            (
3147                phys_expr::Column::new("c1", 0),
3148                StatisticsType::NullCount,
3149                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
3150            )
3151        );
3152        // c1 < 1 should add row_count
3153        let row_count_field = Field::new("row_count", DataType::UInt64, false);
3154        assert_eq!(
3155            required_columns.columns[2],
3156            (
3157                phys_expr::Column::new("c1", 0),
3158                StatisticsType::RowCount,
3159                row_count_field.with_nullable(true) // could be nullable if stats are not present
3160            )
3161        );
3162        // c2 = 2 should add c2_min and c2_max
3163        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
3164        assert_eq!(
3165            required_columns.columns[3],
3166            (
3167                phys_expr::Column::new("c2", 1),
3168                StatisticsType::Min,
3169                c2_min_field.with_nullable(true) // could be nullable if stats are not present
3170            )
3171        );
3172        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
3173        assert_eq!(
3174            required_columns.columns[4],
3175            (
3176                phys_expr::Column::new("c2", 1),
3177                StatisticsType::Max,
3178                c2_max_field.with_nullable(true) // could be nullable if stats are not present
3179            )
3180        );
3181        // c2 = 2 should add c2_null_count
3182        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
3183        assert_eq!(
3184            required_columns.columns[5],
3185            (
3186                phys_expr::Column::new("c2", 1),
3187                StatisticsType::NullCount,
3188                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
3189            )
3190        );
3191        // c2 = 1 should add row_count
3192        let row_count_field = Field::new("row_count", DataType::UInt64, false);
3193        assert_eq!(
3194            required_columns.columns[2],
3195            (
3196                phys_expr::Column::new("c1", 0),
3197                StatisticsType::RowCount,
3198                row_count_field.with_nullable(true) // could be nullable if stats are not present
3199            )
3200        );
3201        // c2 = 3 shouldn't add any new statistics fields
3202        assert_eq!(required_columns.columns.len(), 6);
3203
3204        Ok(())
3205    }
3206
3207    #[test]
3208    fn row_group_predicate_in_list() -> Result<()> {
3209        let schema = Schema::new(vec![
3210            Field::new("c1", DataType::Int32, false),
3211            Field::new("c2", DataType::Int32, false),
3212        ]);
3213        // test c1 in(1, 2, 3)
3214        let expr = Expr::InList(InList::new(
3215            Box::new(col("c1")),
3216            vec![lit(1), lit(2), lit(3)],
3217            false,
3218        ));
3219        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
3220        let predicate_expr =
3221            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3222        assert_eq!(predicate_expr.to_string(), expected_expr);
3223
3224        Ok(())
3225    }
3226
3227    #[test]
3228    fn row_group_predicate_in_list_empty() -> Result<()> {
3229        let schema = Schema::new(vec![
3230            Field::new("c1", DataType::Int32, false),
3231            Field::new("c2", DataType::Int32, false),
3232        ]);
3233        // test c1 in()
3234        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
3235        let expected_expr = "true";
3236        let predicate_expr =
3237            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3238        assert_eq!(predicate_expr.to_string(), expected_expr);
3239
3240        Ok(())
3241    }
3242
3243    #[test]
3244    fn row_group_predicate_in_list_negated() -> Result<()> {
3245        let schema = Schema::new(vec![
3246            Field::new("c1", DataType::Int32, false),
3247            Field::new("c2", DataType::Int32, false),
3248        ]);
3249        // test c1 not in(1, 2, 3)
3250        let expr = Expr::InList(InList::new(
3251            Box::new(col("c1")),
3252            vec![lit(1), lit(2), lit(3)],
3253            true,
3254        ));
3255        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
3256        let predicate_expr =
3257            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3258        assert_eq!(predicate_expr.to_string(), expected_expr);
3259
3260        Ok(())
3261    }
3262
3263    #[test]
3264    fn row_group_predicate_between() -> Result<()> {
3265        let schema = Schema::new(vec![
3266            Field::new("c1", DataType::Int32, false),
3267            Field::new("c2", DataType::Int32, false),
3268        ]);
3269
3270        // test c1 BETWEEN 1 AND 5
3271        let expr1 = col("c1").between(lit(1), lit(5));
3272
3273        // test 1 <= c1 <= 5
3274        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
3275
3276        let predicate_expr1 =
3277            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
3278
3279        let predicate_expr2 =
3280            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
3281        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
3282
3283        Ok(())
3284    }
3285
3286    #[test]
3287    fn row_group_predicate_between_with_in_list() -> Result<()> {
3288        let schema = Schema::new(vec![
3289            Field::new("c1", DataType::Int32, false),
3290            Field::new("c2", DataType::Int32, false),
3291        ]);
3292        // test c1 in(1, 2)
3293        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
3294
3295        // test c2 BETWEEN 4 AND 5
3296        let expr2 = col("c2").between(lit(4), lit(5));
3297
3298        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
3299        let expr3 = expr1.and(expr2);
3300
3301        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
3302        let predicate_expr =
3303            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
3304        assert_eq!(predicate_expr.to_string(), expected_expr);
3305
3306        Ok(())
3307    }
3308
3309    #[test]
3310    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
3311        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3312        // test c1 in(1..21)
3313        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
3314        // always true
3315        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
3316
3317        let expected_expr = "true";
3318        let predicate_expr =
3319            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3320        assert_eq!(predicate_expr.to_string(), expected_expr);
3321
3322        Ok(())
3323    }
3324
3325    #[test]
3326    fn row_group_predicate_cast_int_int() -> Result<()> {
3327        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3328        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
3329
3330        // test cast(c1 as int64) = 1
3331        // test column on the left
3332        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
3333        let predicate_expr =
3334            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3335        assert_eq!(predicate_expr.to_string(), expected_expr);
3336
3337        // test column on the right
3338        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3339        let predicate_expr =
3340            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3341        assert_eq!(predicate_expr.to_string(), expected_expr);
3342
3343        let expected_expr =
3344            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3345
3346        // test column on the left
3347        let expr =
3348            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3349        let predicate_expr =
3350            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3351        assert_eq!(predicate_expr.to_string(), expected_expr);
3352
3353        // test column on the right
3354        let expr =
3355            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3356        let predicate_expr =
3357            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3358        assert_eq!(predicate_expr.to_string(), expected_expr);
3359
3360        Ok(())
3361    }
3362
3363    #[test]
3364    fn row_group_predicate_cast_string_string() -> Result<()> {
3365        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3366        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
3367
3368        // test column on the left
3369        let expr = cast(col("c1"), DataType::Utf8)
3370            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3371        let predicate_expr =
3372            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3373        assert_eq!(predicate_expr.to_string(), expected_expr);
3374
3375        // test column on the right
3376        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3377            .eq(cast(col("c1"), DataType::Utf8));
3378        let predicate_expr =
3379            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3380        assert_eq!(predicate_expr.to_string(), expected_expr);
3381
3382        Ok(())
3383    }
3384
3385    #[test]
3386    fn row_group_predicate_cast_string_int() -> Result<()> {
3387        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3388        let expected_expr = "true";
3389
3390        // test column on the left
3391        let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3392        let predicate_expr =
3393            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3394        assert_eq!(predicate_expr.to_string(), expected_expr);
3395
3396        // test column on the right
3397        let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3398        let predicate_expr =
3399            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3400        assert_eq!(predicate_expr.to_string(), expected_expr);
3401
3402        Ok(())
3403    }
3404
3405    #[test]
3406    fn row_group_predicate_cast_int_string() -> Result<()> {
3407        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3408        let expected_expr = "true";
3409
3410        // test column on the left
3411        let expr = cast(col("c1"), DataType::Utf8)
3412            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3413        let predicate_expr =
3414            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3415        assert_eq!(predicate_expr.to_string(), expected_expr);
3416
3417        // test column on the right
3418        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3419            .eq(cast(col("c1"), DataType::Utf8));
3420        let predicate_expr =
3421            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3422        assert_eq!(predicate_expr.to_string(), expected_expr);
3423
3424        Ok(())
3425    }
3426
3427    #[test]
3428    fn row_group_predicate_date_date() -> Result<()> {
3429        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3430        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3431
3432        // test column on the left
3433        let expr =
3434            cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3435        let predicate_expr =
3436            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3437        assert_eq!(predicate_expr.to_string(), expected_expr);
3438
3439        // test column on the right
3440        let expr =
3441            lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3442        let predicate_expr =
3443            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3444        assert_eq!(predicate_expr.to_string(), expected_expr);
3445
3446        Ok(())
3447    }
3448
3449    #[test]
3450    fn row_group_predicate_dict_string_date() -> Result<()> {
3451        // Test with Dictionary<UInt8, Utf8> for the literal
3452        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3453        let expected_expr = "true";
3454
3455        // test column on the left
3456        let expr = cast(
3457            col("c1"),
3458            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3459        )
3460        .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3461        let predicate_expr =
3462            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3463        assert_eq!(predicate_expr.to_string(), expected_expr);
3464
3465        // test column on the right
3466        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3467            col("c1"),
3468            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3469        ));
3470        let predicate_expr =
3471            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3472        assert_eq!(predicate_expr.to_string(), expected_expr);
3473
3474        Ok(())
3475    }
3476
3477    #[test]
3478    fn row_group_predicate_date_dict_string() -> Result<()> {
3479        // Test with Dictionary<UInt8, Utf8> for the column
3480        let schema = Schema::new(vec![Field::new(
3481            "c1",
3482            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3483            false,
3484        )]);
3485        let expected_expr = "true";
3486
3487        // test column on the left
3488        let expr =
3489            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3490        let predicate_expr =
3491            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3492        assert_eq!(predicate_expr.to_string(), expected_expr);
3493
3494        // test column on the right
3495        let expr =
3496            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3497        let predicate_expr =
3498            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3499        assert_eq!(predicate_expr.to_string(), expected_expr);
3500
3501        Ok(())
3502    }
3503
3504    #[test]
3505    fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3506        // Test with Dictionary types that have the same value type but different key types
3507        let schema = Schema::new(vec![Field::new(
3508            "c1",
3509            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3510            false,
3511        )]);
3512
3513        // Direct comparison with no cast
3514        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3515        let predicate_expr =
3516            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3517        let expected_expr =
3518            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3519        assert_eq!(predicate_expr.to_string(), expected_expr);
3520
3521        // Test with column cast to a dictionary with different key type
3522        let expr = cast(
3523            col("c1"),
3524            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3525        )
3526        .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3527        let predicate_expr =
3528            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3529        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3530        assert_eq!(predicate_expr.to_string(), expected_expr);
3531
3532        Ok(())
3533    }
3534
3535    #[test]
3536    fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3537        // Test with Dictionary types that have different value types
3538        let schema = Schema::new(vec![Field::new(
3539            "c1",
3540            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3541            false,
3542        )]);
3543        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3544
3545        // Test with literal of a different type
3546        let expr =
3547            cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3548        let predicate_expr =
3549            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3550        assert_eq!(predicate_expr.to_string(), expected_expr);
3551
3552        Ok(())
3553    }
3554
3555    #[test]
3556    fn row_group_predicate_nested_dict() -> Result<()> {
3557        // Test with nested Dictionary types
3558        let schema = Schema::new(vec![Field::new(
3559            "c1",
3560            DataType::Dictionary(
3561                Box::new(DataType::UInt8),
3562                Box::new(DataType::Dictionary(
3563                    Box::new(DataType::UInt16),
3564                    Box::new(DataType::Utf8),
3565                )),
3566            ),
3567            false,
3568        )]);
3569        let expected_expr =
3570            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3571
3572        // Test with a simple literal
3573        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3574        let predicate_expr =
3575            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3576        assert_eq!(predicate_expr.to_string(), expected_expr);
3577
3578        Ok(())
3579    }
3580
3581    #[test]
3582    fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3583        // Test with dictionary-wrapped date types for both sides
3584        let schema = Schema::new(vec![Field::new(
3585            "c1",
3586            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3587            false,
3588        )]);
3589        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3590
3591        // Test with a cast to a different date type
3592        let expr = cast(
3593            col("c1"),
3594            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3595        )
3596        .eq(lit(ScalarValue::Date64(Some(123))));
3597        let predicate_expr =
3598            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3599        assert_eq!(predicate_expr.to_string(), expected_expr);
3600
3601        Ok(())
3602    }
3603
3604    #[test]
3605    fn row_group_predicate_date_string() -> Result<()> {
3606        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3607        let expected_expr = "true";
3608
3609        // test column on the left
3610        let expr =
3611            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3612        let predicate_expr =
3613            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3614        assert_eq!(predicate_expr.to_string(), expected_expr);
3615
3616        // test column on the right
3617        let expr =
3618            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3619        let predicate_expr =
3620            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3621        assert_eq!(predicate_expr.to_string(), expected_expr);
3622
3623        Ok(())
3624    }
3625
3626    #[test]
3627    fn row_group_predicate_string_date() -> Result<()> {
3628        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3629        let expected_expr = "true";
3630
3631        // test column on the left
3632        let expr = cast(col("c1"), DataType::Utf8)
3633            .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3634        let predicate_expr =
3635            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3636        assert_eq!(predicate_expr.to_string(), expected_expr);
3637
3638        // test column on the right
3639        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3640            .eq(cast(col("c1"), DataType::Utf8));
3641        let predicate_expr =
3642            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3643        assert_eq!(predicate_expr.to_string(), expected_expr);
3644
3645        Ok(())
3646    }
3647
3648    #[test]
3649    fn row_group_predicate_cast_list() -> Result<()> {
3650        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3651        // test cast(c1 as int64) in int64(1, 2, 3)
3652        let expr = Expr::InList(InList::new(
3653            Box::new(cast(col("c1"), DataType::Int64)),
3654            vec![
3655                lit(ScalarValue::Int64(Some(1))),
3656                lit(ScalarValue::Int64(Some(2))),
3657                lit(ScalarValue::Int64(Some(3))),
3658            ],
3659            false,
3660        ));
3661        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3662        let predicate_expr =
3663            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3664        assert_eq!(predicate_expr.to_string(), expected_expr);
3665
3666        let expr = Expr::InList(InList::new(
3667            Box::new(cast(col("c1"), DataType::Int64)),
3668            vec![
3669                lit(ScalarValue::Int64(Some(1))),
3670                lit(ScalarValue::Int64(Some(2))),
3671                lit(ScalarValue::Int64(Some(3))),
3672            ],
3673            true,
3674        ));
3675        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3676        let predicate_expr =
3677            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3678        assert_eq!(predicate_expr.to_string(), expected_expr);
3679
3680        Ok(())
3681    }
3682
3683    #[test]
3684    fn prune_decimal_data() {
3685        // decimal(9,2)
3686        let schema = Arc::new(Schema::new(vec![Field::new(
3687            "s1",
3688            DataType::Decimal128(9, 2),
3689            true,
3690        )]));
3691
3692        prune_with_expr(
3693            // s1 > 5
3694            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3695            &schema,
3696            // If the data is written by spark, the physical data type is INT32 in the parquet
3697            // So we use the INT32 type of statistic.
3698            &TestStatistics::new().with(
3699                "s1",
3700                ContainerStats::new_i32(
3701                    vec![Some(0), Some(4), None, Some(3)], // min
3702                    vec![Some(5), Some(6), Some(4), None], // max
3703                ),
3704            ),
3705            &[false, true, false, true],
3706        );
3707
3708        prune_with_expr(
3709            // with cast column to other type
3710            cast(col("s1"), DataType::Decimal128(14, 3))
3711                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3712            &schema,
3713            &TestStatistics::new().with(
3714                "s1",
3715                ContainerStats::new_i32(
3716                    vec![Some(0), Some(4), None, Some(3)], // min
3717                    vec![Some(5), Some(6), Some(4), None], // max
3718                ),
3719            ),
3720            &[false, true, false, true],
3721        );
3722
3723        prune_with_expr(
3724            // with try cast column to other type
3725            try_cast(col("s1"), DataType::Decimal128(14, 3))
3726                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3727            &schema,
3728            &TestStatistics::new().with(
3729                "s1",
3730                ContainerStats::new_i32(
3731                    vec![Some(0), Some(4), None, Some(3)], // min
3732                    vec![Some(5), Some(6), Some(4), None], // max
3733                ),
3734            ),
3735            &[false, true, false, true],
3736        );
3737
3738        // decimal(18,2)
3739        let schema = Arc::new(Schema::new(vec![Field::new(
3740            "s1",
3741            DataType::Decimal128(18, 2),
3742            true,
3743        )]));
3744        prune_with_expr(
3745            // s1 > 5
3746            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3747            &schema,
3748            // If the data is written by spark, the physical data type is INT64 in the parquet
3749            // So we use the INT32 type of statistic.
3750            &TestStatistics::new().with(
3751                "s1",
3752                ContainerStats::new_i64(
3753                    vec![Some(0), Some(4), None, Some(3)], // min
3754                    vec![Some(5), Some(6), Some(4), None], // max
3755                ),
3756            ),
3757            &[false, true, false, true],
3758        );
3759
3760        // decimal(23,2)
3761        let schema = Arc::new(Schema::new(vec![Field::new(
3762            "s1",
3763            DataType::Decimal128(23, 2),
3764            true,
3765        )]));
3766
3767        prune_with_expr(
3768            // s1 > 5
3769            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3770            &schema,
3771            &TestStatistics::new().with(
3772                "s1",
3773                ContainerStats::new_decimal128(
3774                    vec![Some(0), Some(400), None, Some(300)], // min
3775                    vec![Some(500), Some(600), Some(400), None], // max
3776                    23,
3777                    2,
3778                ),
3779            ),
3780            &[false, true, false, true],
3781        );
3782    }
3783
3784    #[test]
3785    fn prune_api() {
3786        let schema = Arc::new(Schema::new(vec![
3787            Field::new("s1", DataType::Utf8, true),
3788            Field::new("s2", DataType::Int32, true),
3789        ]));
3790
3791        let statistics = TestStatistics::new().with(
3792            "s2",
3793            ContainerStats::new_i32(
3794                vec![Some(0), Some(4), None, Some(3)], // min
3795                vec![Some(5), Some(6), None, None],    // max
3796            ),
3797        );
3798        prune_with_expr(
3799            // Prune using s2 > 5
3800            col("s2").gt(lit(5)),
3801            &schema,
3802            &statistics,
3803            // s2 [0, 5] ==> no rows should pass
3804            // s2 [4, 6] ==> some rows could pass
3805            // No stats for s2 ==> some rows could pass
3806            // s2 [3, None] (null max) ==> some rows could pass
3807            &[false, true, true, true],
3808        );
3809
3810        prune_with_expr(
3811            // filter with cast
3812            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3813            &schema,
3814            &statistics,
3815            &[false, true, true, true],
3816        );
3817    }
3818
3819    #[test]
3820    fn prune_not_eq_data() {
3821        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3822
3823        prune_with_expr(
3824            // Prune using s2 != 'M'
3825            col("s1").not_eq(lit("M")),
3826            &schema,
3827            &TestStatistics::new().with(
3828                "s1",
3829                ContainerStats::new_utf8(
3830                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
3831                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
3832                ),
3833            ),
3834            // s1 [A, Z] ==> might have values that pass predicate
3835            // s1 [A, L] ==> all rows pass the predicate
3836            // s1 [N, Z] ==> all rows pass the predicate
3837            // s1 [M, M] ==> all rows do not pass the predicate
3838            // No stats for s2 ==> some rows could pass
3839            // s2 [3, None] (null max) ==> some rows could pass
3840            &[true, true, true, false, true, true],
3841        );
3842    }
3843
3844    /// Creates setup for boolean chunk pruning
3845    ///
3846    /// For predicate "b1" (boolean expr)
3847    /// b1 [false, false] ==> no rows can pass (not keep)
3848    /// b1 [false, true] ==> some rows could pass (must keep)
3849    /// b1 [true, true] ==> all rows must pass (must keep)
3850    /// b1 [NULL, NULL]  ==> unknown (must keep)
3851    /// b1 [false, NULL]  ==> unknown (must keep)
3852    ///
3853    /// For predicate "!b1" (boolean expr)
3854    /// b1 [false, false] ==> all rows pass (must keep)
3855    /// b1 [false, true] ==> some rows could pass (must keep)
3856    /// b1 [true, true] ==> no rows can pass (not keep)
3857    /// b1 [NULL, NULL]  ==> unknown (must keep)
3858    /// b1 [false, NULL]  ==> unknown (must keep)
3859    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3860        let schema =
3861            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3862
3863        let statistics = TestStatistics::new().with(
3864            "b1",
3865            ContainerStats::new_bool(
3866                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3867                vec![Some(false), Some(true), Some(true), None, None],         // max
3868            ),
3869        );
3870        let expected_true = vec![false, true, true, true, true];
3871        let expected_false = vec![true, true, false, true, true];
3872
3873        (schema, statistics, expected_true, expected_false)
3874    }
3875
3876    #[test]
3877    fn prune_bool_const_expr() {
3878        let (schema, statistics, _, _) = bool_setup();
3879
3880        prune_with_expr(
3881            // true
3882            lit(true),
3883            &schema,
3884            &statistics,
3885            &[true, true, true, true, true],
3886        );
3887
3888        prune_with_expr(
3889            // false
3890            lit(false),
3891            &schema,
3892            &statistics,
3893            &[false, false, false, false, false],
3894        );
3895    }
3896
3897    #[test]
3898    fn prune_bool_column() {
3899        let (schema, statistics, expected_true, _) = bool_setup();
3900
3901        prune_with_expr(
3902            // b1
3903            col("b1"),
3904            &schema,
3905            &statistics,
3906            &expected_true,
3907        );
3908    }
3909
3910    #[test]
3911    fn prune_bool_not_column() {
3912        let (schema, statistics, _, expected_false) = bool_setup();
3913
3914        prune_with_expr(
3915            // !b1
3916            col("b1").not(),
3917            &schema,
3918            &statistics,
3919            &expected_false,
3920        );
3921    }
3922
3923    #[test]
3924    fn prune_bool_column_eq_true() {
3925        let (schema, statistics, expected_true, _) = bool_setup();
3926
3927        prune_with_expr(
3928            // b1 = true
3929            col("b1").eq(lit(true)),
3930            &schema,
3931            &statistics,
3932            &expected_true,
3933        );
3934    }
3935
3936    #[test]
3937    fn prune_bool_not_column_eq_true() {
3938        let (schema, statistics, _, expected_false) = bool_setup();
3939
3940        prune_with_expr(
3941            // !b1 = true
3942            col("b1").not().eq(lit(true)),
3943            &schema,
3944            &statistics,
3945            &expected_false,
3946        );
3947    }
3948
3949    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3950    /// with 5 different containers (e.g. RowGroups). They have [min,
3951    /// max]:
3952    ///
3953    /// i [-5, 5]
3954    /// i [1, 11]
3955    /// i [-11, -1]
3956    /// i [NULL, NULL]
3957    /// i [1, NULL]
3958    fn int32_setup() -> (SchemaRef, TestStatistics) {
3959        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3960
3961        let statistics = TestStatistics::new().with(
3962            "i",
3963            ContainerStats::new_i32(
3964                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3965                vec![Some(5), Some(11), Some(-1), None, None],     // max
3966            ),
3967        );
3968        (schema, statistics)
3969    }
3970
3971    #[test]
3972    fn prune_int32_col_gt_zero() {
3973        let (schema, statistics) = int32_setup();
3974
3975        // Expression "i > 0" and "-i < 0"
3976        // i [-5, 5] ==> some rows could pass (must keep)
3977        // i [1, 11] ==> all rows must pass (must keep)
3978        // i [-11, -1] ==>  no rows can pass (not keep)
3979        // i [NULL, NULL]  ==> unknown (must keep)
3980        // i [1, NULL]  ==> unknown (must keep)
3981        let expected_ret = &[true, true, false, true, true];
3982
3983        // i > 0
3984        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3985
3986        // -i < 0
3987        prune_with_expr(
3988            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3989            &schema,
3990            &statistics,
3991            expected_ret,
3992        );
3993    }
3994
3995    #[test]
3996    fn prune_int32_col_lte_zero() {
3997        let (schema, statistics) = int32_setup();
3998
3999        // Expression "i <= 0" and "-i >= 0"
4000        // i [-5, 5] ==> some rows could pass (must keep)
4001        // i [1, 11] ==> no rows can pass (not keep)
4002        // i [-11, -1] ==>  all rows must pass (must keep)
4003        // i [NULL, NULL]  ==> unknown (must keep)
4004        // i [1, NULL]  ==> no rows can pass (not keep)
4005        let expected_ret = &[true, false, true, true, false];
4006
4007        prune_with_expr(
4008            // i <= 0
4009            col("i").lt_eq(lit(0)),
4010            &schema,
4011            &statistics,
4012            expected_ret,
4013        );
4014
4015        prune_with_expr(
4016            // -i >= 0
4017            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
4018            &schema,
4019            &statistics,
4020            expected_ret,
4021        );
4022    }
4023
4024    #[test]
4025    fn prune_int32_col_lte_zero_cast() {
4026        let (schema, statistics) = int32_setup();
4027
4028        // Expression "cast(i as utf8) <= '0'"
4029        // i [-5, 5] ==> some rows could pass (must keep)
4030        // i [1, 11] ==> no rows can pass in theory, -0.22 (conservatively keep)
4031        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
4032        // i [NULL, NULL]  ==> unknown (must keep)
4033        // i [1, NULL]  ==> no rows can pass (conservatively keep)
4034        let expected_ret = &[true, true, true, true, true];
4035
4036        prune_with_expr(
4037            // cast(i as utf8) <= 0
4038            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
4039            &schema,
4040            &statistics,
4041            expected_ret,
4042        );
4043
4044        prune_with_expr(
4045            // try_cast(i as utf8) <= 0
4046            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
4047            &schema,
4048            &statistics,
4049            expected_ret,
4050        );
4051
4052        prune_with_expr(
4053            // cast(-i as utf8) >= 0
4054            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
4055            &schema,
4056            &statistics,
4057            expected_ret,
4058        );
4059
4060        prune_with_expr(
4061            // try_cast(-i as utf8) >= 0
4062            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
4063            &schema,
4064            &statistics,
4065            expected_ret,
4066        );
4067    }
4068
4069    #[test]
4070    fn prune_int32_col_eq_zero() {
4071        let (schema, statistics) = int32_setup();
4072
4073        // Expression "i = 0"
4074        // i [-5, 5] ==> some rows could pass (must keep)
4075        // i [1, 11] ==> no rows can pass (not keep)
4076        // i [-11, -1] ==>  no rows can pass (not keep)
4077        // i [NULL, NULL]  ==> unknown (must keep)
4078        // i [1, NULL]  ==> no rows can pass (not keep)
4079        let expected_ret = &[true, false, false, true, false];
4080
4081        prune_with_expr(
4082            // i = 0
4083            col("i").eq(lit(0)),
4084            &schema,
4085            &statistics,
4086            expected_ret,
4087        );
4088    }
4089
4090    #[test]
4091    fn prune_int32_col_is_not_distinct_from() {
4092        let (schema, statistics) = int32_setup();
4093
4094        // Without null counts, IS NOT DISTINCT FROM a non-null literal can
4095        // still use min/max ranges, but unknown all-null containers must be kept.
4096        let expected_ret = &[true, false, false, true, false];
4097
4098        prune_with_expr(
4099            is_not_distinct_from(col("i"), lit(0)),
4100            &schema,
4101            &statistics,
4102            expected_ret,
4103        );
4104
4105        // The operator is symmetric, so the scalar-left form should prune the
4106        // same row groups.
4107        prune_with_expr(
4108            is_not_distinct_from(lit(0), col("i")),
4109            &schema,
4110            &statistics,
4111            expected_ret,
4112        );
4113
4114        let statistics = statistics
4115            .with_row_counts("i", vec![Some(10), Some(9), None, Some(4), Some(10)])
4116            .with_null_counts("i", vec![Some(0), Some(1), None, Some(4), Some(0)]);
4117
4118        let expected_ret = &[true, false, false, false, false];
4119        prune_with_expr(
4120            is_not_distinct_from(col("i"), lit(0)),
4121            &schema,
4122            &statistics,
4123            expected_ret,
4124        );
4125
4126        let expected_ret = &[false, true, true, true, false];
4127        prune_with_expr(
4128            is_not_distinct_from(col("i"), lit(ScalarValue::Int32(None))),
4129            &schema,
4130            &statistics,
4131            expected_ret,
4132        );
4133    }
4134
4135    #[test]
4136    fn prune_int32_col_is_distinct_from() {
4137        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
4138        let statistics = TestStatistics::new().with(
4139            "i",
4140            ContainerStats::new_i32(
4141                vec![Some(0), Some(0), Some(5), None],
4142                vec![Some(0), Some(2), Some(5), None],
4143            )
4144            .with_row_counts(vec![Some(2), Some(2), Some(2), Some(2)])
4145            .with_null_counts(vec![Some(0), Some(0), Some(0), Some(2)]),
4146        );
4147
4148        let expected_ret = &[false, true, true, true];
4149        prune_with_expr(
4150            is_distinct_from(col("i"), lit(0)),
4151            &schema,
4152            &statistics,
4153            expected_ret,
4154        );
4155
4156        // The operator is symmetric, so the scalar-left form should prune the
4157        // same row groups.
4158        prune_with_expr(
4159            is_distinct_from(lit(0), col("i")),
4160            &schema,
4161            &statistics,
4162            expected_ret,
4163        );
4164
4165        let expected_ret = &[true, true, true, false];
4166        prune_with_expr(
4167            is_distinct_from(col("i"), lit(ScalarValue::Int32(None))),
4168            &schema,
4169            &statistics,
4170            expected_ret,
4171        );
4172    }
4173
4174    #[test]
4175    fn prune_int32_col_eq_zero_cast() {
4176        let (schema, statistics) = int32_setup();
4177
4178        // Expression "cast(i as int64) = 0"
4179        // i [-5, 5] ==> some rows could pass (must keep)
4180        // i [1, 11] ==> no rows can pass (not keep)
4181        // i [-11, -1] ==>  no rows can pass (not keep)
4182        // i [NULL, NULL]  ==> unknown (must keep)
4183        // i [1, NULL]  ==> no rows can pass (not keep)
4184        let expected_ret = &[true, false, false, true, false];
4185
4186        prune_with_expr(
4187            cast(col("i"), DataType::Int64).eq(lit(0i64)),
4188            &schema,
4189            &statistics,
4190            expected_ret,
4191        );
4192
4193        prune_with_expr(
4194            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
4195            &schema,
4196            &statistics,
4197            expected_ret,
4198        );
4199    }
4200
4201    #[test]
4202    fn prune_int32_col_eq_zero_cast_as_str() {
4203        let (schema, statistics) = int32_setup();
4204
4205        // Note the cast is to a string where sorting properties are
4206        // not the same as integers
4207        //
4208        // Expression "cast(i as utf8) = '0'"
4209        // i [-5, 5] ==> some rows could pass (keep)
4210        // i [1, 11] ==> no rows can pass  (could keep)
4211        // i [-11, -1] ==>  no rows can pass (could keep)
4212        // i [NULL, NULL]  ==> unknown (keep)
4213        // i [1, NULL]  ==> no rows can pass (could keep)
4214        let expected_ret = &[true, true, true, true, true];
4215
4216        prune_with_expr(
4217            cast(col("i"), DataType::Utf8).eq(lit("0")),
4218            &schema,
4219            &statistics,
4220            expected_ret,
4221        );
4222    }
4223
4224    #[test]
4225    fn prune_int32_col_lt_neg_one() {
4226        let (schema, statistics) = int32_setup();
4227
4228        // Expression "i > -1" and "-i < 1"
4229        // i [-5, 5] ==> some rows could pass (must keep)
4230        // i [1, 11] ==> all rows must pass (must keep)
4231        // i [-11, -1] ==>  no rows can pass (not keep)
4232        // i [NULL, NULL]  ==> unknown (must keep)
4233        // i [1, NULL]  ==> all rows must pass (must keep)
4234        let expected_ret = &[true, true, false, true, true];
4235
4236        prune_with_expr(
4237            // i > -1
4238            col("i").gt(lit(-1)),
4239            &schema,
4240            &statistics,
4241            expected_ret,
4242        );
4243
4244        prune_with_expr(
4245            // -i < 1
4246            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
4247            &schema,
4248            &statistics,
4249            expected_ret,
4250        );
4251    }
4252
4253    #[test]
4254    fn prune_int32_is_null() {
4255        let (schema, statistics) = int32_setup();
4256
4257        // Expression "i IS NULL" when there are no null statistics,
4258        // should all be kept
4259        let expected_ret = &[true, true, true, true, true];
4260
4261        prune_with_expr(
4262            // i IS NULL, no null statistics
4263            col("i").is_null(),
4264            &schema,
4265            &statistics,
4266            expected_ret,
4267        );
4268
4269        // provide null counts for each column
4270        let statistics = statistics.with_null_counts(
4271            "i",
4272            vec![
4273                Some(0), // no nulls (don't keep)
4274                Some(1), // 1 null
4275                None,    // unknown nulls
4276                None, // unknown nulls (min/max are both null too, like no stats at all)
4277                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
4278            ],
4279        );
4280
4281        let expected_ret = &[false, true, true, true, false];
4282
4283        prune_with_expr(
4284            // i IS NULL, with actual null statistics
4285            col("i").is_null(),
4286            &schema,
4287            &statistics,
4288            expected_ret,
4289        );
4290    }
4291
4292    #[test]
4293    fn prune_int32_column_is_known_all_null() {
4294        let (schema, statistics) = int32_setup();
4295
4296        // Expression "i < 0"
4297        // i [-5, 5] ==> some rows could pass (must keep)
4298        // i [1, 11] ==> no rows can pass (not keep)
4299        // i [-11, -1] ==>  all rows must pass (must keep)
4300        // i [NULL, NULL]  ==> unknown (must keep)
4301        // i [1, NULL]  ==> no rows can pass (not keep)
4302        let expected_ret = &[true, false, true, true, false];
4303
4304        prune_with_expr(
4305            // i < 0
4306            col("i").lt(lit(0)),
4307            &schema,
4308            &statistics,
4309            expected_ret,
4310        );
4311
4312        // provide row counts for each column
4313        let statistics = statistics.with_row_counts(
4314            "i",
4315            vec![
4316                Some(10), // 10 rows of data
4317                Some(9),  // 9 rows of data
4318                None,     // unknown row counts
4319                Some(4),
4320                Some(10),
4321            ],
4322        );
4323
4324        // pruning result is still the same if we only know row counts
4325        prune_with_expr(
4326            // i < 0, with only row counts statistics
4327            col("i").lt(lit(0)),
4328            &schema,
4329            &statistics,
4330            expected_ret,
4331        );
4332
4333        // provide null counts for each column
4334        let statistics = statistics.with_null_counts(
4335            "i",
4336            vec![
4337                Some(0), // no nulls
4338                Some(1), // 1 null
4339                None,    // unknown nulls
4340                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
4341                Some(0), // 0 nulls (max=null too which means no known max)
4342            ],
4343        );
4344
4345        // Expression "i < 0" with actual null and row counts statistics
4346        // col | min, max     | row counts | null counts |
4347        // ----+--------------+------------+-------------+
4348        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
4349        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
4350        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
4351        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
4352        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
4353        let expected_ret = &[true, false, true, false, false];
4354
4355        prune_with_expr(
4356            // i < 0, with actual null and row counts statistics
4357            col("i").lt(lit(0)),
4358            &schema,
4359            &statistics,
4360            expected_ret,
4361        );
4362    }
4363
4364    #[test]
4365    fn prune_cast_scalar() {
4366        // The data type of column i is INT32
4367        let (schema, statistics) = int32_setup();
4368        let expected_ret = &[true, true, false, true, true];
4369
4370        prune_with_expr(
4371            // i > int64(0)
4372            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
4373            &schema,
4374            &statistics,
4375            expected_ret,
4376        );
4377
4378        prune_with_expr(
4379            // cast(i as int64) > int64(0)
4380            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4381            &schema,
4382            &statistics,
4383            expected_ret,
4384        );
4385
4386        prune_with_expr(
4387            // try_cast(i as int64) > int64(0)
4388            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
4389            &schema,
4390            &statistics,
4391            expected_ret,
4392        );
4393
4394        prune_with_expr(
4395            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
4396            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
4397                .lt(lit(ScalarValue::Int64(Some(0)))),
4398            &schema,
4399            &statistics,
4400            expected_ret,
4401        );
4402    }
4403
4404    #[test]
4405    fn test_increment_utf8() {
4406        // Basic ASCII
4407        assert_eq!(increment_utf8("abc").unwrap(), "abd");
4408        assert_eq!(increment_utf8("abz").unwrap(), "ab{");
4409
4410        // Test around ASCII 127 (DEL)
4411        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
4412        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128
4413
4414        // Test 2-byte UTF-8 sequences
4415        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0
4416
4417        // Test 3-byte UTF-8 sequences
4418        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124
4419
4420        // Test at UTF-8 boundaries
4421        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
4422        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary
4423
4424        // Test that if we can't increment we return None
4425        assert!(increment_utf8("").is_none());
4426        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point
4427
4428        // Test that if we can't increment the last character we do the previous one and truncate
4429        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4430
4431        // Test surrogate pair range (0xD800..=0xDFFF)
4432        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
4433        assert!(increment_utf8("\u{D7FF}").is_none());
4434
4435        // Test non-characters range (0xFDD0..=0xFDEF)
4436        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
4437        assert!(increment_utf8("\u{FDCF}").is_none());
4438
4439        // Test private use area limit (>= 0x110000)
4440        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4441        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
4442    }
4443
4444    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
4445    /// with 5 different containers (e.g. RowGroups). They have [min,
4446    /// max]:
4447    /// s1 ["A", "Z"]
4448    /// s1 ["A", "L"]
4449    /// s1 ["N", "Z"]
4450    /// s1 [NULL, NULL]
4451    /// s1 ["A", NULL]
4452    /// s1 ["", "A"]
4453    /// s1 ["", ""]
4454    /// s1 ["AB", "A\u{10ffff}"]
4455    /// s1 ["A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
4456    fn utf8_setup() -> (SchemaRef, TestStatistics) {
4457        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4458
4459        let statistics = TestStatistics::new().with(
4460            "s1",
4461            ContainerStats::new_utf8(
4462                vec![
4463                    Some("A"),
4464                    Some("A"),
4465                    Some("N"),
4466                    Some("M"),
4467                    None,
4468                    Some("A"),
4469                    Some(""),
4470                    Some(""),
4471                    Some("AB"),
4472                    Some("A\u{10ffff}\u{10ffff}"),
4473                ], // min
4474                vec![
4475                    Some("Z"),
4476                    Some("L"),
4477                    Some("Z"),
4478                    Some("M"),
4479                    None,
4480                    None,
4481                    Some("A"),
4482                    Some(""),
4483                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4484                    Some("A\u{10ffff}\u{10ffff}"),
4485                ], // max
4486            ),
4487        );
4488        (schema, statistics)
4489    }
4490
4491    #[test]
4492    fn prune_utf8_eq() {
4493        let (schema, statistics) = utf8_setup();
4494
4495        let expr = col("s1").eq(lit("A"));
4496        #[rustfmt::skip]
4497        let expected_ret = &[
4498            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4499            true,
4500            // s1 ["A", "L"] ==> some rows could pass (must keep)
4501            true,
4502            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4503            false,
4504            // s1 ["M", "M"] ==> no rows can pass (not keep)
4505            false,
4506            // s1 [NULL, NULL]  ==> unknown (must keep)
4507            true,
4508            // s1 ["A", NULL]  ==> unknown (must keep)
4509            true,
4510            // s1 ["", "A"]  ==> some rows could pass (must keep)
4511            true,
4512            // s1 ["", ""]  ==> no rows can pass (not keep)
4513            false,
4514            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4515            false,
4516            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4517            false,
4518        ];
4519        prune_with_expr(expr, &schema, &statistics, expected_ret);
4520
4521        let expr = col("s1").eq(lit(""));
4522        #[rustfmt::skip]
4523        let expected_ret = &[
4524            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4525            false,
4526            // s1 ["A", "L"] ==> no rows can pass (not keep)
4527            false,
4528            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4529            false,
4530            // s1 ["M", "M"] ==> no rows can pass (not keep)
4531            false,
4532            // s1 [NULL, NULL]  ==> unknown (must keep)
4533            true,
4534            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4535            false,
4536            // s1 ["", "A"]  ==> some rows could pass (must keep)
4537            true,
4538            // s1 ["", ""]  ==> all rows must pass (must keep)
4539            true,
4540            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4541            false,
4542            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4543            false,
4544        ];
4545        prune_with_expr(expr, &schema, &statistics, expected_ret);
4546    }
4547
4548    #[test]
4549    fn prune_utf8_not_eq() {
4550        let (schema, statistics) = utf8_setup();
4551
4552        let expr = col("s1").not_eq(lit("A"));
4553        #[rustfmt::skip]
4554        let expected_ret = &[
4555            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4556            true,
4557            // s1 ["A", "L"] ==> some rows could pass (must keep)
4558            true,
4559            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4560            true,
4561            // s1 ["M", "M"] ==> all rows must pass (must keep)
4562            true,
4563            // s1 [NULL, NULL]  ==> unknown (must keep)
4564            true,
4565            // s1 ["A", NULL]  ==> unknown (must keep)
4566            true,
4567            // s1 ["", "A"]  ==> some rows could pass (must keep)
4568            true,
4569            // s1 ["", ""]  ==> all rows must pass (must keep)
4570            true,
4571            // s1 ["AB", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4572            true,
4573            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4574            true,
4575        ];
4576        prune_with_expr(expr, &schema, &statistics, expected_ret);
4577
4578        let expr = col("s1").not_eq(lit(""));
4579        #[rustfmt::skip]
4580        let expected_ret = &[
4581            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4582            true,
4583            // s1 ["A", "L"] ==> all rows must pass (must keep)
4584            true,
4585            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4586            true,
4587            // s1 ["M", "M"] ==> all rows must pass (must keep)
4588            true,
4589            // s1 [NULL, NULL]  ==> unknown (must keep)
4590            true,
4591            // s1 ["A", NULL]  ==> unknown (must keep)
4592            true,
4593            // s1 ["", "A"]  ==> some rows could pass (must keep)
4594            true,
4595            // s1 ["", ""]  ==> no rows can pass (not keep)
4596            false,
4597            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4598            true,
4599            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4600            true,
4601        ];
4602        prune_with_expr(expr, &schema, &statistics, expected_ret);
4603    }
4604
4605    #[test]
4606    fn prune_utf8_like_one() {
4607        let (schema, statistics) = utf8_setup();
4608
4609        let expr = col("s1").like(lit("A_"));
4610        #[rustfmt::skip]
4611        let expected_ret = &[
4612            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4613            true,
4614            // s1 ["A", "L"] ==> some rows could pass (must keep)
4615            true,
4616            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4617            false,
4618            // s1 ["M", "M"] ==> no rows can pass (not keep)
4619            false,
4620            // s1 [NULL, NULL]  ==> unknown (must keep)
4621            true,
4622            // s1 ["A", NULL]  ==> unknown (must keep)
4623            true,
4624            // s1 ["", "A"]  ==> some rows could pass (must keep)
4625            true,
4626            // s1 ["", ""]  ==> no rows can pass (not keep)
4627            false,
4628            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4629            true,
4630            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4631            true,
4632        ];
4633        prune_with_expr(expr, &schema, &statistics, expected_ret);
4634
4635        let expr = col("s1").like(lit("_A_"));
4636        #[rustfmt::skip]
4637        let expected_ret = &[
4638            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4639            true,
4640            // s1 ["A", "L"] ==> some rows could pass (must keep)
4641            true,
4642            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4643            true,
4644            // s1 ["M", "M"] ==> some rows could pass (must keep)
4645            true,
4646            // s1 [NULL, NULL]  ==> unknown (must keep)
4647            true,
4648            // s1 ["A", NULL]  ==> unknown (must keep)
4649            true,
4650            // s1 ["", "A"]  ==> some rows could pass (must keep)
4651            true,
4652            // s1 ["", ""]  ==> some rows could pass (must keep)
4653            true,
4654            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4655            true,
4656            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4657            true,
4658        ];
4659        prune_with_expr(expr, &schema, &statistics, expected_ret);
4660
4661        let expr = col("s1").like(lit("_"));
4662        #[rustfmt::skip]
4663        let expected_ret = &[
4664            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4665            true,
4666            // s1 ["A", "L"] ==> all rows must pass (must keep)
4667            true,
4668            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4669            true,
4670            // s1 ["M", "M"] ==> all rows must pass (must keep)
4671            true,
4672            // s1 [NULL, NULL]  ==> unknown (must keep)
4673            true,
4674            // s1 ["A", NULL]  ==> unknown (must keep)
4675            true,
4676            // s1 ["", "A"]  ==> all rows must pass (must keep)
4677            true,
4678            // s1 ["", ""]  ==> all rows must pass (must keep)
4679            true,
4680            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4681            true,
4682            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4683            true,
4684        ];
4685        prune_with_expr(expr, &schema, &statistics, expected_ret);
4686
4687        let expr = col("s1").like(lit(""));
4688        #[rustfmt::skip]
4689        let expected_ret = &[
4690            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4691            false,
4692            // s1 ["A", "L"] ==> no rows can pass (not keep)
4693            false,
4694            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4695            false,
4696            // s1 ["M", "M"] ==> no rows can pass (not keep)
4697            false,
4698            // s1 [NULL, NULL]  ==> unknown (must keep)
4699            true,
4700            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4701            false,
4702            // s1 ["", "A"]  ==> some rows could pass (must keep)
4703            true,
4704            // s1 ["", ""]  ==> all rows must pass (must keep)
4705            true,
4706            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4707            false,
4708            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4709            false,
4710        ];
4711        prune_with_expr(expr, &schema, &statistics, expected_ret);
4712    }
4713
4714    #[test]
4715    fn prune_utf8_like_many() {
4716        let (schema, statistics) = utf8_setup();
4717
4718        let expr = col("s1").like(lit("A%"));
4719        #[rustfmt::skip]
4720        let expected_ret = &[
4721            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4722            true,
4723            // s1 ["A", "L"] ==> some rows could pass (must keep)
4724            true,
4725            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4726            false,
4727            // s1 ["M", "M"] ==> no rows can pass (not keep)
4728            false,
4729            // s1 [NULL, NULL]  ==> unknown (must keep)
4730            true,
4731            // s1 ["A", NULL]  ==> unknown (must keep)
4732            true,
4733            // s1 ["", "A"]  ==> some rows could pass (must keep)
4734            true,
4735            // s1 ["", ""]  ==> no rows can pass (not keep)
4736            false,
4737            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4738            true,
4739            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4740            true,
4741        ];
4742        prune_with_expr(expr, &schema, &statistics, expected_ret);
4743
4744        let expr = col("s1").like(lit("%A%"));
4745        #[rustfmt::skip]
4746        let expected_ret = &[
4747            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4748            true,
4749            // s1 ["A", "L"] ==> some rows could pass (must keep)
4750            true,
4751            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4752            true,
4753            // s1 ["M", "M"] ==> some rows could pass (must keep)
4754            true,
4755            // s1 [NULL, NULL]  ==> unknown (must keep)
4756            true,
4757            // s1 ["A", NULL]  ==> unknown (must keep)
4758            true,
4759            // s1 ["", "A"]  ==> some rows could pass (must keep)
4760            true,
4761            // s1 ["", ""]  ==> some rows could pass (must keep)
4762            true,
4763            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4764            true,
4765            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4766            true,
4767        ];
4768        prune_with_expr(expr, &schema, &statistics, expected_ret);
4769
4770        let expr = col("s1").like(lit("%"));
4771        #[rustfmt::skip]
4772        let expected_ret = &[
4773            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4774            true,
4775            // s1 ["A", "L"] ==> all rows must pass (must keep)
4776            true,
4777            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4778            true,
4779            // s1 ["M", "M"] ==> all rows must pass (must keep)
4780            true,
4781            // s1 [NULL, NULL]  ==> unknown (must keep)
4782            true,
4783            // s1 ["A", NULL]  ==> unknown (must keep)
4784            true,
4785            // s1 ["", "A"]  ==> all rows must pass (must keep)
4786            true,
4787            // s1 ["", ""]  ==> all rows must pass (must keep)
4788            true,
4789            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4790            true,
4791            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4792            true,
4793        ];
4794        prune_with_expr(expr, &schema, &statistics, expected_ret);
4795
4796        let expr = col("s1").like(lit(""));
4797        #[rustfmt::skip]
4798        let expected_ret = &[
4799            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4800            false,
4801            // s1 ["A", "L"] ==> no rows can pass (not keep)
4802            false,
4803            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4804            false,
4805            // s1 ["M", "M"] ==> no rows can pass (not keep)
4806            false,
4807            // s1 [NULL, NULL]  ==> unknown (must keep)
4808            true,
4809            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4810            false,
4811            // s1 ["", "A"]  ==> some rows could pass (must keep)
4812            true,
4813            // s1 ["", ""]  ==> all rows must pass (must keep)
4814            true,
4815            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4816            false,
4817            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4818            false,
4819        ];
4820        prune_with_expr(expr, &schema, &statistics, expected_ret);
4821    }
4822
4823    // `build_like_match()` must honor `\` escapes when scanning the pattern for
4824    // wildcards.
4825    #[test]
4826    fn prune_utf8_like_escaped_chars() {
4827        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4828        let statistics = TestStatistics::new().with(
4829            "s1",
4830            ContainerStats::new_utf8(
4831                vec![
4832                    Some("foo_aaa"),
4833                    Some(r#"foo\aaa"#),
4834                    Some("foo"),
4835                    Some("bar"),
4836                    Some("foo%aaa"),
4837                    Some("%foo_aaa"),
4838                ], // min
4839                vec![
4840                    Some("foo_zzz"),
4841                    Some(r#"foo\zzz"#),
4842                    Some("foozzz"),
4843                    Some("baz"),
4844                    Some("foo%zzz"),
4845                    Some("%foo_zzz"),
4846                ], // max
4847            ),
4848        );
4849
4850        let expr = col("s1").like(lit(r#"foo\_%"#));
4851        #[rustfmt::skip]
4852        let expected_ret = &[
4853            // s1 ["foo_aaa", "foo_zzz"] => every value starts with literal
4854            // "foo_" and matches the pattern; must keep.
4855            true,
4856            // s1 ["foo\aaa", "foo\zzz"] => no rows can pass (not keep)
4857            false,
4858            // s1 ["foo", "foozzz"] => stats don't prove "foo_" is or isn't in
4859            // range; must conservatively keep.
4860            true,
4861            // s1 ["bar", "baz"] => no rows can pass (not keep)
4862            false,
4863            // s1 ["foo%aaa", "foo%zzz"] => no rows can pass (not keep)
4864            false,
4865            // s1 ["%foo_aaa", "%foo_zzz"] => no rows can pass (not keep)
4866            false,
4867        ];
4868        prune_with_expr(expr, &schema, &statistics, expected_ret);
4869
4870        let expr = col("s1").like(lit(r#"foo\\%"#));
4871        #[rustfmt::skip]
4872        let expected_ret = &[
4873            // s1 ["foo_aaa", "foo_zzz"] => no rows can pass (not keep)
4874            false,
4875            // s1 ["foo\aaa", "foo\zzz"] => every value starts with literal
4876            // "foo\" and matches the pattern; must keep.
4877            true,
4878            // s1 ["foo", "foozzz"] => stats don't prove "foo\" is or isn't in
4879            // range; must conservatively keep.
4880            true,
4881            // s1 ["bar", "baz"] => no rows can pass (not keep)
4882            false,
4883            // s1 ["foo%aaa", "foo%zzz"] => no rows can pass (not keep)
4884            false,
4885            // s1 ["%foo_aaa", "%foo_zzz"] => no rows can pass (not keep)
4886            false,
4887        ];
4888        prune_with_expr(expr, &schema, &statistics, expected_ret);
4889
4890        let expr = col("s1").like(lit(r#"foo\%%"#));
4891        #[rustfmt::skip]
4892        let expected_ret = &[
4893            // s1 ["foo_aaa", "foo_zzz"] => no rows can pass (not keep)
4894            false,
4895            // s1 ["foo\aaa", "foo\zzz"] => no rows can pass (not keep)
4896            false,
4897            // s1 ["foo", "foozzz"] => range straddles "foo%"; must keep.
4898            true,
4899            // s1 ["bar", "baz"] => no rows can pass (not keep)
4900            false,
4901            // s1 ["foo%aaa", "foo%zzz"] => every value starts with literal
4902            // "foo%" and matches the pattern; must keep.
4903            true,
4904            // s1 ["%foo_aaa", "%foo_zzz"] => no rows can pass (not keep)
4905            false,
4906        ];
4907        prune_with_expr(expr, &schema, &statistics, expected_ret);
4908
4909        // No wildcard after escapes: pattern reduces to an equality check on
4910        // the literal "foo_".
4911        let expr = col("s1").like(lit(r#"foo\_"#));
4912        #[rustfmt::skip]
4913        let expected_ret = &[
4914            // s1 ["foo_aaa", "foo_zzz"] => no rows can pass (not keep)
4915            false,
4916            // s1 ["foo\aaa", "foo\zzz"] => no rows can pass (not keep)
4917            false,
4918            // s1 ["foo", "foozzz"] => "foo_" is within the range; must keep.
4919            true,
4920            // s1 ["bar", "baz"] => no rows can pass (not keep)
4921            false,
4922            // s1 ["foo%aaa", "foo%zzz"] => no rows can pass (not keep)
4923            false,
4924            // s1 ["%foo_aaa", "%foo_zzz"] => no rows can pass (not keep)
4925            false,
4926        ];
4927        prune_with_expr(expr, &schema, &statistics, expected_ret);
4928
4929        // Leading escaped `%`: prefix is "%foo" (non-empty), so the guard
4930        // for "all wildcards" must NOT bail out here.
4931        let expr = col("s1").like(lit(r#"\%foo%"#));
4932        #[rustfmt::skip]
4933        let expected_ret = &[
4934            // s1 ["foo_aaa", "foo_zzz"] => no rows can pass (not keep)
4935            false,
4936            // s1 ["foo\aaa", "foo\zzz"] => no rows can pass (not keep)
4937            false,
4938            // s1 ["foo", "foozzz"] => no rows can pass (not keep)
4939            false,
4940            // s1 ["bar", "baz"] => no rows can pass (not keep)
4941            false,
4942            // s1 ["foo%aaa", "foo%zzz"] => no rows can pass (not keep)
4943            false,
4944            // s1 ["%foo_aaa", "%foo_zzz"] => every value starts with literal
4945            // "%foo" and matches the pattern; must keep.
4946            true,
4947        ];
4948        prune_with_expr(expr, &schema, &statistics, expected_ret);
4949
4950        // Two escaped wildcards, no real wildcard: equality on "foo%_".
4951        let expr = col("s1").like(lit(r#"foo\%\_"#));
4952        #[rustfmt::skip]
4953        let expected_ret = &[
4954            // s1 ["foo_aaa", "foo_zzz"] => no rows can pass (not keep)
4955            false,
4956            // s1 ["foo\aaa", "foo\zzz"] => no rows can pass (not keep)
4957            false,
4958            // s1 ["foo", "foozzz"] => "foo%_" is within the range; must keep.
4959            true,
4960            // s1 ["bar", "baz"] => no rows can pass (not keep)
4961            false,
4962            // s1 ["foo%aaa", "foo%zzz"] => no rows can pass (not keep)
4963            false,
4964            // s1 ["%foo_aaa", "%foo_zzz"] => no rows can pass (not keep)
4965            false,
4966        ];
4967        prune_with_expr(expr, &schema, &statistics, expected_ret);
4968
4969        // Escaped backslash followed by more literal chars before the
4970        // wildcard: prefix is "foo\bar".
4971        let expr = col("s1").like(lit(r#"foo\\bar%"#));
4972        #[rustfmt::skip]
4973        let expected_ret = &[
4974            // s1 ["foo_aaa", "foo_zzz"] => no rows can pass (not keep)
4975            false,
4976            // s1 ["foo\aaa", "foo\zzz"] => range straddles "foo\bar"; must
4977            // keep.
4978            true,
4979            // s1 ["foo", "foozzz"] => range straddles "foo\bar"; must keep.
4980            true,
4981            // s1 ["bar", "baz"] => no rows can pass (not keep)
4982            false,
4983            // s1 ["foo%aaa", "foo%zzz"] => no rows can pass (not keep)
4984            false,
4985            // s1 ["%foo_aaa", "%foo_zzz"] => no rows can pass (not keep)
4986            false,
4987        ];
4988        prune_with_expr(expr, &schema, &statistics, expected_ret);
4989    }
4990
4991    #[test]
4992    fn prune_utf8_not_like_one() {
4993        let (schema, statistics) = utf8_setup();
4994
4995        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4996        #[rustfmt::skip]
4997        let expected_ret = &[
4998            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4999            true,
5000            // s1 ["A", "L"] ==> some rows could pass (must keep)
5001            true,
5002            // s1 ["N", "Z"] ==> some rows could pass (must keep)
5003            true,
5004            // s1 ["M", "M"] ==> some rows could pass (must keep)
5005            true,
5006            // s1 [NULL, NULL]  ==> unknown (must keep)
5007            true,
5008            // s1 ["A", NULL]  ==> some rows could pass (must keep)
5009            true,
5010            // s1 ["", "A"]  ==> some rows could pass (must keep)
5011            true,
5012            // s1 ["", ""]  ==> some rows could pass (must keep)
5013            true,
5014            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
5015            true,
5016            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match. (min, max) maybe truncate
5017            // original (min, max) maybe ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}")
5018            true,
5019        ];
5020        prune_with_expr(expr, &schema, &statistics, expected_ret);
5021    }
5022
5023    #[test]
5024    fn prune_utf8_not_like_many() {
5025        let (schema, statistics) = utf8_setup();
5026
5027        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
5028        #[rustfmt::skip]
5029        let expected_ret = &[
5030            // s1 ["A", "Z"] ==> some rows could pass (must keep)
5031            true,
5032            // s1 ["A", "L"] ==> some rows could pass (must keep)
5033            true,
5034            // s1 ["N", "Z"] ==> some rows could pass (must keep)
5035            true,
5036            // s1 ["M", "M"] ==> some rows could pass (must keep)
5037            true,
5038            // s1 [NULL, NULL]  ==> unknown (must keep)
5039            true,
5040            // s1 ["A", NULL]  ==> some rows could pass (must keep)
5041            true,
5042            // s1 ["", "A"]  ==> some rows could pass (must keep)
5043            true,
5044            // s1 ["", ""]  ==> some rows could pass (must keep)
5045            true,
5046            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
5047            true,
5048            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
5049            false,
5050        ];
5051        prune_with_expr(expr, &schema, &statistics, expected_ret);
5052
5053        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
5054        #[rustfmt::skip]
5055        let expected_ret = &[
5056            // s1 ["A", "Z"] ==> some rows could pass (must keep)
5057            true,
5058            // s1 ["A", "L"] ==> some rows could pass (must keep)
5059            true,
5060            // s1 ["N", "Z"] ==> some rows could pass (must keep)
5061            true,
5062            // s1 ["M", "M"] ==> some rows could pass (must keep)
5063            true,
5064            // s1 [NULL, NULL]  ==> unknown (must keep)
5065            true,
5066            // s1 ["A", NULL]  ==> some rows could pass (must keep)
5067            true,
5068            // s1 ["", "A"]  ==> some rows could pass (must keep)
5069            true,
5070            // s1 ["", ""]  ==> some rows could pass (must keep)
5071            true,
5072            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
5073            true,
5074            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
5075            true,
5076        ];
5077        prune_with_expr(expr, &schema, &statistics, expected_ret);
5078
5079        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
5080        #[rustfmt::skip]
5081        let expected_ret = &[
5082            // s1 ["A", "Z"] ==> some rows could pass (must keep)
5083            true,
5084            // s1 ["A", "L"] ==> some rows could pass (must keep)
5085            true,
5086            // s1 ["N", "Z"] ==> some rows could pass (must keep)
5087            true,
5088            // s1 ["M", "M"] ==> some rows could pass (must keep)
5089            true,
5090            // s1 [NULL, NULL]  ==> unknown (must keep)
5091            true,
5092            // s1 ["A", NULL]  ==> some rows could pass (must keep)
5093            true,
5094            // s1 ["", "A"]  ==> some rows could pass (must keep)
5095            true,
5096            // s1 ["", ""]  ==> some rows could pass (must keep)
5097            true,
5098            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
5099            true,
5100            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
5101            true,
5102        ];
5103        prune_with_expr(expr, &schema, &statistics, expected_ret);
5104
5105        let expr = col("s1").not_like(lit("A\\%%"));
5106        let statistics = TestStatistics::new().with(
5107            "s1",
5108            ContainerStats::new_utf8(
5109                vec![Some("A%a"), Some("A")],
5110                vec![Some("A%c"), Some("A")],
5111            ),
5112        );
5113        let expected_ret = &[false, true];
5114        prune_with_expr(expr, &schema, &statistics, expected_ret);
5115    }
5116
5117    #[test]
5118    fn test_rewrite_expr_to_prunable() {
5119        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
5120        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
5121
5122        // column op lit
5123        let left_input = col("a");
5124        let left_input = logical2physical(&left_input, &schema);
5125        let right_input = lit(ScalarValue::Int32(Some(12)));
5126        let right_input = logical2physical(&right_input, &schema);
5127        let (result_left, _, result_right) = rewrite_expr_to_prunable(
5128            &left_input,
5129            Operator::Eq,
5130            &right_input,
5131            df_schema.clone(),
5132        )
5133        .unwrap();
5134        assert_eq!(result_left.to_string(), left_input.to_string());
5135        assert_eq!(result_right.to_string(), right_input.to_string());
5136
5137        // cast op lit
5138        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
5139        let left_input = logical2physical(&left_input, &schema);
5140        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
5141        let right_input = logical2physical(&right_input, &schema);
5142        let (result_left, _, result_right) = rewrite_expr_to_prunable(
5143            &left_input,
5144            Operator::Gt,
5145            &right_input,
5146            df_schema.clone(),
5147        )
5148        .unwrap();
5149        assert_eq!(result_left.to_string(), left_input.to_string());
5150        assert_eq!(result_right.to_string(), right_input.to_string());
5151
5152        // try_cast op lit
5153        let left_input = try_cast(col("a"), DataType::Int64);
5154        let left_input = logical2physical(&left_input, &schema);
5155        let right_input = lit(ScalarValue::Int64(Some(12)));
5156        let right_input = logical2physical(&right_input, &schema);
5157        let (result_left, _, result_right) =
5158            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
5159                .unwrap();
5160        assert_eq!(result_left.to_string(), left_input.to_string());
5161        assert_eq!(result_right.to_string(), right_input.to_string());
5162
5163        // TODO: add test for other case and op
5164    }
5165
5166    #[test]
5167    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
5168        struct CustomUnhandledHook;
5169
5170        impl UnhandledPredicateHook for CustomUnhandledHook {
5171            /// This handles an arbitrary case of a column that doesn't exist in the schema
5172            /// by renaming it to yet another column that doesn't exist in the schema
5173            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
5174            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
5175                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
5176            }
5177        }
5178
5179        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
5180        let schema_with_b = Schema::new(vec![
5181            Field::new("a", DataType::Int32, true),
5182            Field::new("b", DataType::Int32, true),
5183        ]);
5184
5185        let rewriter = PredicateRewriter::new()
5186            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
5187
5188        let transform_expr = |expr| {
5189            let expr = logical2physical(&expr, &schema_with_b);
5190            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
5191        };
5192
5193        // transform an arbitrary valid expression that we know is handled
5194        let known_expression = col("a").eq(lit(12));
5195        let known_expression_transformed = PredicateRewriter::new()
5196            .rewrite_predicate_to_statistics_predicate(
5197                &logical2physical(&known_expression, &schema),
5198                &schema,
5199            );
5200
5201        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
5202        let input = col("b").eq(lit(12));
5203        let expected = logical2physical(&lit(42), &schema);
5204        let transformed = transform_expr(input.clone());
5205        assert_eq!(transformed.to_string(), expected.to_string());
5206
5207        // more complex case with unknown column
5208        let input = known_expression.clone().and(input.clone());
5209        let expected = phys_expr::BinaryExpr::new(
5210            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
5211            Operator::And,
5212            logical2physical(&lit(42), &schema),
5213        );
5214        let transformed = transform_expr(input.clone());
5215        assert_eq!(transformed.to_string(), expected.to_string());
5216
5217        // an unknown expression gets passed to the hook
5218        let input = array_has(make_array(vec![lit(1)]), col("a"));
5219        let expected = logical2physical(&lit(42), &schema);
5220        let transformed = transform_expr(input.clone());
5221        assert_eq!(transformed.to_string(), expected.to_string());
5222
5223        // more complex case with unknown expression
5224        let input = known_expression.and(input);
5225        let expected = phys_expr::BinaryExpr::new(
5226            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
5227            Operator::And,
5228            logical2physical(&lit(42), &schema),
5229        );
5230        let transformed = transform_expr(input.clone());
5231        assert_eq!(transformed.to_string(), expected.to_string());
5232    }
5233
5234    #[test]
5235    fn test_rewrite_expr_to_prunable_error() {
5236        // cast string value to numeric value
5237        // this cast is not supported
5238        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
5239        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
5240        let left_input = cast(col("a"), DataType::Int64);
5241        let left_input = logical2physical(&left_input, &schema);
5242        let right_input = lit(ScalarValue::Int64(Some(12)));
5243        let right_input = logical2physical(&right_input, &schema);
5244        let result = rewrite_expr_to_prunable(
5245            &left_input,
5246            Operator::Gt,
5247            &right_input,
5248            df_schema.clone(),
5249        );
5250        assert!(result.is_err());
5251
5252        // other expr
5253        let left_input = is_null(col("a"));
5254        let left_input = logical2physical(&left_input, &schema);
5255        let right_input = lit(ScalarValue::Int64(Some(12)));
5256        let right_input = logical2physical(&right_input, &schema);
5257        let result =
5258            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
5259        assert!(result.is_err());
5260        // TODO: add other negative test for other case and op
5261    }
5262
5263    #[test]
5264    fn prune_with_contained_one_column() {
5265        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
5266
5267        // Model having information like a bloom filter for s1
5268        let statistics = TestStatistics::new()
5269            .with_contained(
5270                "s1",
5271                [ScalarValue::from("foo")],
5272                [
5273                    // container 0 known to only contain "foo"",
5274                    Some(true),
5275                    // container 1 known to not contain "foo"
5276                    Some(false),
5277                    // container 2 unknown about "foo"
5278                    None,
5279                    // container 3 known to only contain "foo"
5280                    Some(true),
5281                    // container 4 known to not contain "foo"
5282                    Some(false),
5283                    // container 5 unknown about "foo"
5284                    None,
5285                    // container 6 known to only contain "foo"
5286                    Some(true),
5287                    // container 7 known to not contain "foo"
5288                    Some(false),
5289                    // container 8 unknown about "foo"
5290                    None,
5291                ],
5292            )
5293            .with_contained(
5294                "s1",
5295                [ScalarValue::from("bar")],
5296                [
5297                    // containers 0,1,2 known to only contain "bar"
5298                    Some(true),
5299                    Some(true),
5300                    Some(true),
5301                    // container 3,4,5 known to not contain "bar"
5302                    Some(false),
5303                    Some(false),
5304                    Some(false),
5305                    // container 6,7,8 unknown about "bar"
5306                    None,
5307                    None,
5308                    None,
5309                ],
5310            )
5311            .with_contained(
5312                // the way the tests are setup, this data is
5313                // consulted if the "foo" and "bar" are being checked at the same time
5314                "s1",
5315                [ScalarValue::from("foo"), ScalarValue::from("bar")],
5316                [
5317                    // container 0,1,2 unknown about ("foo, "bar")
5318                    None,
5319                    None,
5320                    None,
5321                    // container 3,4,5 known to contain only either "foo" and "bar"
5322                    Some(true),
5323                    Some(true),
5324                    Some(true),
5325                    // container 6,7,8  known to contain  neither "foo" and "bar"
5326                    Some(false),
5327                    Some(false),
5328                    Some(false),
5329                ],
5330            );
5331
5332        // s1 = 'foo'
5333        prune_with_expr(
5334            col("s1").eq(lit("foo")),
5335            &schema,
5336            &statistics,
5337            // rule out containers ('false) where we know foo is not present
5338            &[true, false, true, true, false, true, true, false, true],
5339        );
5340
5341        // s1 = 'bar'
5342        prune_with_expr(
5343            col("s1").eq(lit("bar")),
5344            &schema,
5345            &statistics,
5346            // rule out containers where we know bar is not present
5347            &[true, true, true, false, false, false, true, true, true],
5348        );
5349
5350        // s1 = 'baz' (unknown value)
5351        prune_with_expr(
5352            col("s1").eq(lit("baz")),
5353            &schema,
5354            &statistics,
5355            // can't rule out anything
5356            &[true, true, true, true, true, true, true, true, true],
5357        );
5358
5359        // s1 = 'foo' AND s1 = 'bar'
5360        prune_with_expr(
5361            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
5362            &schema,
5363            &statistics,
5364            // logically this predicate can't possibly be true (the column can't
5365            // take on both values) but we could rule it out if the stats tell
5366            // us that both values are not present
5367            &[true, true, true, true, true, true, true, true, true],
5368        );
5369
5370        // s1 = 'foo' OR s1 = 'bar'
5371        prune_with_expr(
5372            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
5373            &schema,
5374            &statistics,
5375            // can rule out containers that we know contain neither foo nor bar
5376            &[true, true, true, true, true, true, false, false, false],
5377        );
5378
5379        // s1 = 'foo' OR s1 = 'baz'
5380        prune_with_expr(
5381            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
5382            &schema,
5383            &statistics,
5384            // can't rule out anything container
5385            &[true, true, true, true, true, true, true, true, true],
5386        );
5387
5388        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
5389        prune_with_expr(
5390            col("s1")
5391                .eq(lit("foo"))
5392                .or(col("s1").eq(lit("bar")))
5393                .or(col("s1").eq(lit("baz"))),
5394            &schema,
5395            &statistics,
5396            // can rule out any containers based on knowledge of s1 and `foo`,
5397            // `bar` and (`foo`, `bar`)
5398            &[true, true, true, true, true, true, true, true, true],
5399        );
5400
5401        // s1 != foo
5402        prune_with_expr(
5403            col("s1").not_eq(lit("foo")),
5404            &schema,
5405            &statistics,
5406            // rule out containers we know for sure only contain foo
5407            &[false, true, true, false, true, true, false, true, true],
5408        );
5409
5410        // s1 != bar
5411        prune_with_expr(
5412            col("s1").not_eq(lit("bar")),
5413            &schema,
5414            &statistics,
5415            // rule out when we know for sure s1 has the value bar
5416            &[false, false, false, true, true, true, true, true, true],
5417        );
5418
5419        // s1 != foo AND s1 != bar
5420        prune_with_expr(
5421            col("s1")
5422                .not_eq(lit("foo"))
5423                .and(col("s1").not_eq(lit("bar"))),
5424            &schema,
5425            &statistics,
5426            // can rule out any container where we know s1 does not have either 'foo' or 'bar'
5427            &[true, true, true, false, false, false, true, true, true],
5428        );
5429
5430        // s1 != foo AND s1 != bar AND s1 != baz
5431        prune_with_expr(
5432            col("s1")
5433                .not_eq(lit("foo"))
5434                .and(col("s1").not_eq(lit("bar")))
5435                .and(col("s1").not_eq(lit("baz"))),
5436            &schema,
5437            &statistics,
5438            // can't rule out any container based on  knowledge of s1,s2
5439            &[true, true, true, true, true, true, true, true, true],
5440        );
5441
5442        // s1 != foo OR s1 != bar
5443        prune_with_expr(
5444            col("s1")
5445                .not_eq(lit("foo"))
5446                .or(col("s1").not_eq(lit("bar"))),
5447            &schema,
5448            &statistics,
5449            // cant' rule out anything based on contains information
5450            &[true, true, true, true, true, true, true, true, true],
5451        );
5452
5453        // s1 != foo OR s1 != bar OR s1 != baz
5454        prune_with_expr(
5455            col("s1")
5456                .not_eq(lit("foo"))
5457                .or(col("s1").not_eq(lit("bar")))
5458                .or(col("s1").not_eq(lit("baz"))),
5459            &schema,
5460            &statistics,
5461            // cant' rule out anything based on contains information
5462            &[true, true, true, true, true, true, true, true, true],
5463        );
5464    }
5465
5466    #[test]
5467    fn prune_with_contained_two_columns() {
5468        let schema = Arc::new(Schema::new(vec![
5469            Field::new("s1", DataType::Utf8, true),
5470            Field::new("s2", DataType::Utf8, true),
5471        ]));
5472
5473        // Model having information like bloom filters for s1 and s2
5474        let statistics = TestStatistics::new()
5475            .with_contained(
5476                "s1",
5477                [ScalarValue::from("foo")],
5478                [
5479                    // container 0, s1 known to only contain "foo"",
5480                    Some(true),
5481                    // container 1, s1 known to not contain "foo"
5482                    Some(false),
5483                    // container 2, s1 unknown about "foo"
5484                    None,
5485                    // container 3, s1 known to only contain "foo"
5486                    Some(true),
5487                    // container 4, s1 known to not contain "foo"
5488                    Some(false),
5489                    // container 5, s1 unknown about "foo"
5490                    None,
5491                    // container 6, s1 known to only contain "foo"
5492                    Some(true),
5493                    // container 7, s1 known to not contain "foo"
5494                    Some(false),
5495                    // container 8, s1 unknown about "foo"
5496                    None,
5497                ],
5498            )
5499            .with_contained(
5500                "s2", // for column s2
5501                [ScalarValue::from("bar")],
5502                [
5503                    // containers 0,1,2 s2 known to only contain "bar"
5504                    Some(true),
5505                    Some(true),
5506                    Some(true),
5507                    // container 3,4,5 s2 known to not contain "bar"
5508                    Some(false),
5509                    Some(false),
5510                    Some(false),
5511                    // container 6,7,8 s2 unknown about "bar"
5512                    None,
5513                    None,
5514                    None,
5515                ],
5516            );
5517
5518        // s1 = 'foo'
5519        prune_with_expr(
5520            col("s1").eq(lit("foo")),
5521            &schema,
5522            &statistics,
5523            // rule out containers where we know s1 is not present
5524            &[true, false, true, true, false, true, true, false, true],
5525        );
5526
5527        // s1 = 'foo' OR s2 = 'bar'
5528        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
5529        prune_with_expr(
5530            expr,
5531            &schema,
5532            &statistics,
5533            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
5534            &[true, true, true, true, true, true, true, true, true],
5535        );
5536
5537        // s1 = 'foo' AND s2 != 'bar'
5538        prune_with_expr(
5539            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
5540            &schema,
5541            &statistics,
5542            // can only rule out container where we know either:
5543            // 1. s1 doesn't have the value 'foo` or
5544            // 2. s2 has only the value of 'bar'
5545            &[false, false, false, true, false, true, true, false, true],
5546        );
5547
5548        // s1 != 'foo' AND s2 != 'bar'
5549        prune_with_expr(
5550            col("s1")
5551                .not_eq(lit("foo"))
5552                .and(col("s2").not_eq(lit("bar"))),
5553            &schema,
5554            &statistics,
5555            // Can  rule out any container where we know either
5556            // 1. s1 has only the value 'foo'
5557            // 2. s2 has only the value 'bar'
5558            &[false, false, false, false, true, true, false, true, true],
5559        );
5560
5561        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
5562        prune_with_expr(
5563            col("s1")
5564                .not_eq(lit("foo"))
5565                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
5566            &schema,
5567            &statistics,
5568            // Can rule out any container where we know s1 has only the value
5569            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
5570            &[false, true, true, false, true, true, false, true, true],
5571        );
5572
5573        // s1 like '%foo%bar%'
5574        prune_with_expr(
5575            col("s1").like(lit("foo%bar%")),
5576            &schema,
5577            &statistics,
5578            // cant rule out anything with information we know
5579            &[true, true, true, true, true, true, true, true, true],
5580        );
5581
5582        // s1 like '%foo%bar%' AND s2 = 'bar'
5583        prune_with_expr(
5584            col("s1")
5585                .like(lit("foo%bar%"))
5586                .and(col("s2").eq(lit("bar"))),
5587            &schema,
5588            &statistics,
5589            // can rule out any container where we know s2 does not have the value 'bar'
5590            &[true, true, true, false, false, false, true, true, true],
5591        );
5592
5593        // s1 like '%foo%bar%' OR s2 = 'bar'
5594        prune_with_expr(
5595            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
5596            &schema,
5597            &statistics,
5598            // can't rule out anything (we would have to prove that both the
5599            // like and the equality must be false)
5600            &[true, true, true, true, true, true, true, true, true],
5601        );
5602    }
5603
5604    #[test]
5605    fn prune_with_range_and_contained() {
5606        // Setup mimics range information for i, a bloom filter for s
5607        let schema = Arc::new(Schema::new(vec![
5608            Field::new("i", DataType::Int32, true),
5609            Field::new("s", DataType::Utf8, true),
5610        ]));
5611
5612        let statistics = TestStatistics::new()
5613            .with(
5614                "i",
5615                ContainerStats::new_i32(
5616                    // Container 0, 3, 6: [-5 to 5]
5617                    // Container 1, 4, 7: [10 to 20]
5618                    // Container 2, 5, 9: unknown
5619                    vec![
5620                        Some(-5),
5621                        Some(10),
5622                        None,
5623                        Some(-5),
5624                        Some(10),
5625                        None,
5626                        Some(-5),
5627                        Some(10),
5628                        None,
5629                    ], // min
5630                    vec![
5631                        Some(5),
5632                        Some(20),
5633                        None,
5634                        Some(5),
5635                        Some(20),
5636                        None,
5637                        Some(5),
5638                        Some(20),
5639                        None,
5640                    ], // max
5641                ),
5642            )
5643            // Add contained  information about the s and "foo"
5644            .with_contained(
5645                "s",
5646                [ScalarValue::from("foo")],
5647                [
5648                    // container 0,1,2 known to only contain "foo"
5649                    Some(true),
5650                    Some(true),
5651                    Some(true),
5652                    // container 3,4,5 known to not contain "foo"
5653                    Some(false),
5654                    Some(false),
5655                    Some(false),
5656                    // container 6,7,8 unknown about "foo"
5657                    None,
5658                    None,
5659                    None,
5660                ],
5661            );
5662
5663        // i = 0 and s = 'foo'
5664        prune_with_expr(
5665            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5666            &schema,
5667            &statistics,
5668            // Can rule out container where we know that either:
5669            // 1. 0 is outside the min/max range of i
5670            // 1. s does not contain foo
5671            // (range is false, and contained  is false)
5672            &[true, false, true, false, false, false, true, false, true],
5673        );
5674
5675        // i = 0 and s != 'foo'
5676        prune_with_expr(
5677            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5678            &schema,
5679            &statistics,
5680            // Can rule out containers where either:
5681            // 1. 0 is outside the min/max range of i
5682            // 2. s only contains foo
5683            &[false, false, false, true, false, true, true, false, true],
5684        );
5685
5686        // i = 0 OR s = 'foo'
5687        prune_with_expr(
5688            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5689            &schema,
5690            &statistics,
5691            // in theory could rule out containers if we had min/max values for
5692            // s as well. But in this case we don't so we can't rule out anything
5693            &[true, true, true, true, true, true, true, true, true],
5694        );
5695    }
5696
5697    /// prunes the specified expr with the specified schema and statistics, and
5698    /// ensures it returns expected.
5699    ///
5700    /// `expected` is a vector of bools, where true means the row group should
5701    /// be kept, and false means it should be pruned.
5702    // TODO refactor other tests to use this to reduce boiler plate
5703    fn prune_with_expr(
5704        expr: Expr,
5705        schema: &SchemaRef,
5706        statistics: &TestStatistics,
5707        expected: &[bool],
5708    ) {
5709        println!("Pruning with expr: {expr}");
5710        let expr = logical2physical(&expr, schema);
5711        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5712        let result = p.prune(statistics).unwrap();
5713        assert_eq!(result, expected);
5714    }
5715
5716    fn prune_with_simplified_expr(
5717        expr: Expr,
5718        schema: &SchemaRef,
5719        statistics: &TestStatistics,
5720        expected: &[bool],
5721    ) {
5722        println!("Pruning with expr: {expr}");
5723        let expr = logical2physical(&expr, schema);
5724        let simplifier = PhysicalExprSimplifier::new(schema);
5725        let expr = simplifier.simplify(expr).unwrap();
5726        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5727        let result = p.prune(statistics).unwrap();
5728        assert_eq!(result, expected);
5729    }
5730
5731    fn is_not_distinct_from(left: Expr, right: Expr) -> Expr {
5732        Expr::BinaryExpr(BinaryExpr::new(
5733            Box::new(left),
5734            Operator::IsNotDistinctFrom,
5735            Box::new(right),
5736        ))
5737    }
5738
5739    fn is_distinct_from(left: Expr, right: Expr) -> Expr {
5740        Expr::BinaryExpr(BinaryExpr::new(
5741            Box::new(left),
5742            Operator::IsDistinctFrom,
5743            Box::new(right),
5744        ))
5745    }
5746
5747    fn test_build_predicate_expression(
5748        expr: &Expr,
5749        schema: &Schema,
5750        required_columns: &mut RequiredColumns,
5751    ) -> Arc<dyn PhysicalExpr> {
5752        let expr = logical2physical(expr, schema);
5753        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5754        build_predicate_expression(
5755            &expr,
5756            &Arc::new(schema.clone()),
5757            required_columns,
5758            &unhandled_hook,
5759        )
5760    }
5761
5762    #[test]
5763    fn test_build_predicate_expression_with_false() {
5764        let expr = lit(ScalarValue::Boolean(Some(false)));
5765        let schema = Schema::empty();
5766        let res =
5767            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5768        let expected = logical2physical(&expr, &schema);
5769        assert_eq!(&res, &expected);
5770    }
5771
5772    #[test]
5773    fn test_build_predicate_expression_with_and_false() {
5774        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5775        let expr = and(
5776            col("c1").eq(lit("a")),
5777            lit(ScalarValue::Boolean(Some(false))),
5778        );
5779        let res =
5780            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5781        let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5782        assert_eq!(&res, &expected);
5783    }
5784
5785    #[test]
5786    fn test_build_predicate_expression_with_or_false() {
5787        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5788        let left_expr = col("c1").eq(lit("a"));
5789        let right_expr = lit(ScalarValue::Boolean(Some(false)));
5790        let res = test_build_predicate_expression(
5791            &or(left_expr.clone(), right_expr.clone()),
5792            &schema,
5793            &mut RequiredColumns::new(),
5794        );
5795        let expected =
5796            "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5797        assert_eq!(res.to_string(), expected);
5798    }
5799}