datafusion_pruning/
pruning_predicate.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27    array::{new_null_array, ArrayRef, BooleanArray},
28    datatypes::{DataType, Field, Schema, SchemaRef},
29    record_batch::{RecordBatch, RecordBatchOptions},
30};
31// pub use for backwards compatibility
32pub use datafusion_common::pruning::PruningStatistics;
33use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
34use datafusion_physical_plan::metrics::Count;
35use log::{debug, trace};
36
37use datafusion_common::error::{DataFusionError, Result};
38use datafusion_common::tree_node::TransformedResult;
39use datafusion_common::{
40    internal_err, plan_datafusion_err, plan_err,
41    tree_node::{Transformed, TreeNode},
42    ScalarValue,
43};
44use datafusion_common::{Column, DFSchema};
45use datafusion_expr_common::operator::Operator;
46use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
47use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
48use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
49use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
50
51/// Used to prove that arbitrary predicates (boolean expression) can not
52/// possibly evaluate to `true` given information about a column provided by
53/// [`PruningStatistics`].
54///
55/// # Introduction
56///
57/// `PruningPredicate` analyzes filter expressions using statistics such as
58/// min/max values and null counts, attempting to prove a "container" (e.g.
59/// Parquet Row Group) can be skipped without reading the actual data,
60/// potentially leading to significant performance improvements.
61///
62/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
63/// on the min/max values found in the Parquet metadata. If the
64/// `PruningPredicate` can prove that the filter can never evaluate to `true`
65/// for any row in the Row Group, the entire Row Group is skipped during query
66/// execution.
67///
68/// The `PruningPredicate` API is general, and can be used for pruning other
69/// types of containers (e.g. files) based on statistics that may be known from
70/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
71/// subtle topic.  See the Background and Implementation section for details.
72///
73/// `PruningPredicate` supports:
74///
75/// 1. Arbitrary expressions (including user defined functions)
76///
77/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
78///    so it is suitable for pruning 1000s of containers.
79///
80/// 3. Any source of information that implements the [`PruningStatistics`] trait
81///    (not just Parquet metadata).
82///
83/// # Example
84///
85/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
86/// example of how to use `PruningPredicate` to prune files based on min/max
87/// values.
88///
89/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/pruning.rs
90///
91/// Given an expression like `x = 5` and statistics for 3 containers (Row
92/// Groups, files, etc) `A`, `B`, and `C`:
93///
94/// ```text
95///   A: {x_min = 0, x_max = 4}
96///   B: {x_min = 2, x_max = 10}
97///   C: {x_min = 5, x_max = 8}
98/// ```
99///
100/// `PruningPredicate` will conclude that the rows in container `A` can never
101/// be true (as the maximum value is only `4`), so it can be pruned:
102///
103/// ```text
104/// A: false (no rows could possibly match x = 5)
105/// B: true  (rows might match x = 5)
106/// C: true  (rows might match x = 5)
107/// ```
108///
109/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
110///
111/// # Background
112///
113/// ## Boolean Tri-state logic
114///
115/// To understand the details of the rest of this documentation, it is important
116/// to understand how the tri-state boolean logic in SQL works. As this is
117/// somewhat esoteric, we review it here.
118///
119/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
120/// uncertainty propagates through expressions. SQL `NULL` behaves very
121/// differently than the `NULL` in most other languages where it is a special,
122/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
123/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
124/// first encountered as they behave differently than most programmers may
125/// expect.
126///
127/// In most other programming languages,
128/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
129/// * `a == NULL` evaluates to `false` if `a` has any other value
130///
131/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
132/// `false`):
133///
134/// Expression    | Result
135/// ------------- | ---------
136/// `1 = NULL`    | `NULL`
137/// `NULL = NULL` | `NULL`
138///
139/// Also important is how `AND` and `OR` works with tri-state boolean logic as
140/// (perhaps counterintuitively) the result is **not** always NULL. While
141/// consistent with the notion of `NULL` representing “unknown”, this is again,
142/// often deeply confusing 🤯 when first encountered.
143///
144/// Expression       | Result    | Intuition
145/// ---------------  | --------- | -----------
146/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
147/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
148/// `NULL AND NULL`  | `NULL`    |
149///
150/// Expression      | Result    | Intuition
151/// --------------- | --------- | ----------
152/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
153/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
154/// `NULL OR NULL`  |  `NULL`   |
155///
156/// ## SQL Filter Semantics
157///
158/// The SQL `WHERE` clause has a boolean expression, often called a filter or
159/// predicate. The semantics of this predicate are that the query evaluates the
160/// predicate for each row in the input tables and:
161///
162/// * Rows that evaluate to `true` are returned in the query results
163///
164/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
165///
166/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
167///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
168///   in the rewritten predicate described below.*
169///
170/// # `PruningPredicate` Implementation
171///
172/// Armed with the information in the Background section, we can now understand
173/// how the `PruningPredicate` logic works.
174///
175/// ## Interface
176///
177/// **Inputs**
178/// 1. An input schema describing what columns exist
179///
180/// 2. A predicate (expression that evaluates to a boolean)
181///
182/// 3. [`PruningStatistics`] that provides information about columns in that
183///    schema, for multiple “containers”. For each column in each container, it
184///    provides optional information on contained values, min_values, max_values,
185///    null_counts counts, and row_counts counts.
186///
187/// **Outputs**:
188/// A (non null) boolean value for each container:
189/// * `true`: There MAY be rows that match the predicate
190///
191/// * `false`: There are no rows that could possibly match the predicate (the
192///   predicate can never possibly be true). The container can be pruned (skipped)
193///   entirely.
194///
195/// While `PruningPredicate` will never return a `NULL` value, the
196/// rewritten predicate (as returned by `build_predicate_expression` and used internally
197/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
198/// or null / row counts are not known.
199///
200/// In order to be correct, `PruningPredicate` must return false
201/// **only** if it can determine that for all rows in the container, the
202/// predicate could never evaluate to `true` (always evaluates to either `NULL`
203/// or `false`).
204///
205/// ## Contains Analysis and Min/Max Rewrite
206///
207/// `PruningPredicate` works by first analyzing the predicate to see what
208/// [`LiteralGuarantee`] must hold for the predicate to be true.
209///
210/// Then, the `PruningPredicate` rewrites the original predicate into an
211/// expression that references the min/max values of each column in the original
212/// predicate.
213///
214/// When the min/max values are actually substituted in to this expression and
215/// evaluated, the result means
216///
217/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
218///
219/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
220///   Note that rewritten predicate can evaluate to NULL when some of
221///   the min/max values are not known. *Note that this is different than
222///   the SQL filter semantics where `NULL` means the row is filtered
223///   out.*
224///
225/// * `false`: there are no rows that could possibly match the predicate,
226///   **PRUNES** the container
227///
228/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
229/// `x_row_count` represent the minimum and maximum values, the null count of
230/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
231/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
232/// is known to be all `NULL`s. Note this is different from knowing nothing about
233/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
234/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
235///
236/// Here are some examples of the rewritten predicates:
237///
238/// Original Predicate | Rewritten Predicate
239/// ------------------ | --------------------
240/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
241/// `x < 5` | `x_null_count != x_row_count THEN false (x_max < 5)`
242/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count (y_min <= 10 AND 10 <= y_max)`
243/// `x IS NULL`  | `x_null_count > 0`
244/// `x IS NOT NULL`  | `x_null_count != row_count`
245/// `CAST(x as int) = 5` | `x_null_count != x_row_count (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
246///
247/// ## Predicate Evaluation
248/// The PruningPredicate works in two passes
249///
250/// **First pass**:  For each `LiteralGuarantee` calls
251/// [`PruningStatistics::contained`] and rules out containers where the
252/// LiteralGuarantees are not satisfied
253///
254/// **Second Pass**: Evaluates the rewritten expression using the
255/// min/max/null_counts/row_counts values for each column for each container. For any
256/// container that this expression evaluates to `false`, it rules out those
257/// containers.
258///
259///
260/// ### Example 1
261///
262/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
263///
264/// ```sql
265/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
266/// AND
267/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
268/// ```
269///
270/// If we know that for a given container, `x` is between `1 and 100` and we know that
271/// `y` is between `4` and `7`, we know nothing about the null count and row count of
272/// `x` and `y`, the input statistics might look like:
273///
274/// Column   | Value
275/// -------- | -----
276/// `x_min`  | `1`
277/// `x_max`  | `100`
278/// `x_null_count` | `null`
279/// `x_row_count`  | `null`
280/// `y_min`  | `4`
281/// `y_max`  | `7`
282/// `y_null_count` | `null`
283/// `y_row_count`  | `null`
284///
285/// When these statistics values are substituted in to the rewritten predicate and
286/// simplified, the result is `false`:
287///
288/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
289/// * `null = null` is `null` which is not true, so the AND moves on to the next clause
290/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
291/// * evaluating the clauses further we get:
292/// * `null and true and null and false`
293/// * `null and false`
294/// * `false`
295///
296/// Returning `false` means the container can be pruned, which matches the
297/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
298/// are `7` or less.
299///
300/// Note that if we had ended up with `null AND true AND null AND true` the result
301/// would have been `null`.
302/// `null` is treated the same as`true`, because we can't prove that the predicate is `false.`
303///
304/// If, for some other container, we knew `y` was between the values `4` and
305/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
306/// left as an exercise to the reader -- are you still here?), and the container
307/// **could not** be pruned. The intuition is that there may be rows where the
308/// predicate *might* evaluate to `true`, and the only way to find out is to do
309/// more analysis, for example by actually reading the data and evaluating the
310/// predicate row by row.
311///
312/// ### Example 2
313///
314/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
315/// look like the same as example 1:
316///
317/// ```sql
318/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
319/// AND
320/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
321/// ```
322///
323/// If we know that for another given container, `x_min` is NULL and `x_max` is
324/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
325///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
326/// the null count and row count of `y`. The input statistics might look like:
327///
328/// Column   | Value
329/// -------- | -----
330/// `x_min`  | `null`
331/// `x_max`  | `null`
332/// `x_null_count` | `100`
333/// `x_row_count`  | `100`
334/// `y_min`  | `4`
335/// `y_max`  | `7`
336/// `y_null_count` | `null`
337/// `y_row_count`  | `null`
338///
339/// When these statistics values are substituted in to the rewritten predicate and
340/// simplified, the result is `false`:
341///
342/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null = null AND (4 <= 10 AND 10 <= 7)`
343/// * `false AND null AND null AND false`
344/// * `false AND false`
345/// * `false`
346///
347/// Returning `false` means the container can be pruned, which matches the
348/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
349/// are known to be NULL.
350///
351/// # Related Work
352///
353/// [`PruningPredicate`] implements the type of min/max pruning described in
354/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
355/// described by various research such as [small materialized aggregates], [zone
356/// maps], and [data skipping].
357///
358/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
359/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
360/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
361/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
362#[derive(Debug, Clone)]
363pub struct PruningPredicate {
364    /// The input schema against which the predicate will be evaluated
365    schema: SchemaRef,
366    /// A min/max pruning predicate (rewritten in terms of column min/max
367    /// values, which are supplied by statistics)
368    predicate_expr: Arc<dyn PhysicalExpr>,
369    /// Description of which statistics are required to evaluate `predicate_expr`
370    required_columns: RequiredColumns,
371    /// Original physical predicate from which this predicate expr is derived
372    /// (required for serialization)
373    orig_expr: Arc<dyn PhysicalExpr>,
374    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
375    /// evaluate to `true`.
376    ///
377    /// See [`PruningPredicate::literal_guarantees`] for more details.
378    literal_guarantees: Vec<LiteralGuarantee>,
379}
380
381/// Build a pruning predicate from an optional predicate expression.
382/// If the predicate is None or the predicate cannot be converted to a pruning
383/// predicate, return None.
384/// If there is an error creating the pruning predicate it is recorded by incrementing
385/// the `predicate_creation_errors` counter.
386pub fn build_pruning_predicate(
387    predicate: Arc<dyn PhysicalExpr>,
388    file_schema: &SchemaRef,
389    predicate_creation_errors: &Count,
390) -> Option<Arc<PruningPredicate>> {
391    match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) {
392        Ok(pruning_predicate) => {
393            if !pruning_predicate.always_true() {
394                return Some(Arc::new(pruning_predicate));
395            }
396        }
397        Err(e) => {
398            debug!("Could not create pruning predicate for: {e}");
399            predicate_creation_errors.add(1);
400        }
401    }
402    None
403}
404
405/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
406/// complex expressions or predicates that reference columns that are not in the
407/// schema.
408pub trait UnhandledPredicateHook {
409    /// Called when a predicate can not be rewritten in terms of statistics or
410    /// references a column that is not in the schema.
411    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
412}
413
414/// The default handling for unhandled predicates is to return a constant `true`
415/// (meaning don't prune the container)
416#[derive(Debug, Clone)]
417struct ConstantUnhandledPredicateHook {
418    default: Arc<dyn PhysicalExpr>,
419}
420
421impl Default for ConstantUnhandledPredicateHook {
422    fn default() -> Self {
423        Self {
424            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
425        }
426    }
427}
428
429impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
430    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
431        Arc::clone(&self.default)
432    }
433}
434
435impl PruningPredicate {
436    /// Try to create a new instance of [`PruningPredicate`]
437    ///
438    /// This will translate the provided `expr` filter expression into
439    /// a *pruning predicate*.
440    ///
441    /// A pruning predicate is one that has been rewritten in terms of
442    /// the min and max values of column references and that evaluates
443    /// to FALSE if the filter predicate would evaluate FALSE *for
444    /// every row* whose values fell within the min / max ranges (aka
445    /// could be pruned).
446    ///
447    /// The pruning predicate evaluates to TRUE or NULL
448    /// if the filter predicate *might* evaluate to TRUE for at least
449    /// one row whose values fell within the min/max ranges (in other
450    /// words they might pass the predicate)
451    ///
452    /// For example, the filter expression `(column / 2) = 4` becomes
453    /// the pruning predicate
454    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
455    ///
456    /// See the struct level documentation on [`PruningPredicate`] for more
457    /// details.
458    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
459        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`
460        // which does not handle dynamic exprs in general
461        let expr = snapshot_physical_expr(expr)?;
462        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
463
464        // build predicate expression once
465        let mut required_columns = RequiredColumns::new();
466        let predicate_expr = build_predicate_expression(
467            &expr,
468            &schema,
469            &mut required_columns,
470            &unhandled_hook,
471        );
472        let predicate_schema = required_columns.schema();
473        // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc.
474        let predicate_expr =
475            PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?;
476
477        let literal_guarantees = LiteralGuarantee::analyze(&expr);
478
479        Ok(Self {
480            schema,
481            predicate_expr,
482            required_columns,
483            orig_expr: expr,
484            literal_guarantees,
485        })
486    }
487
488    /// For each set of statistics, evaluates the pruning predicate
489    /// and returns a `bool` with the following meaning for a
490    /// all rows whose values match the statistics:
491    ///
492    /// `true`: There MAY be rows that match the predicate
493    ///
494    /// `false`: There are no rows that could possibly match the predicate
495    ///
496    /// Note: the predicate passed to `prune` should already be simplified as
497    /// much as possible (e.g. this pass doesn't handle some
498    /// expressions like `b = false`, but it does handle the
499    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
500    ///
501    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
502    pub fn prune<S: PruningStatistics + ?Sized>(
503        &self,
504        statistics: &S,
505    ) -> Result<Vec<bool>> {
506        let mut builder = BoolVecBuilder::new(statistics.num_containers());
507
508        // Try to prove the predicate can't be true for the containers based on
509        // literal guarantees
510        for literal_guarantee in &self.literal_guarantees {
511            let LiteralGuarantee {
512                column,
513                guarantee,
514                literals,
515            } = literal_guarantee;
516            if let Some(results) = statistics.contained(column, literals) {
517                match guarantee {
518                    // `In` means the values in the column must be one of the
519                    // values in the set for the predicate to evaluate to true.
520                    // If `contained` returns false, that means the column is
521                    // not any of the values so we can prune the container
522                    Guarantee::In => builder.combine_array(&results),
523                    // `NotIn` means the values in the column must not be
524                    // any of the values in the set for the predicate to
525                    // evaluate to true. If `contained` returns true, it means the
526                    // column is only in the set of values so we can prune the
527                    // container
528                    Guarantee::NotIn => {
529                        builder.combine_array(&arrow::compute::not(&results)?)
530                    }
531                }
532                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
533                // can return early without evaluating the rest of predicates.
534                if builder.check_all_pruned() {
535                    return Ok(builder.build());
536                }
537            }
538        }
539
540        // Next, try to prove the predicate can't be true for the containers based
541        // on min/max values
542
543        // build a RecordBatch that contains the min/max values in the
544        // appropriate statistics columns for the min/max predicate
545        let statistics_batch =
546            build_statistics_record_batch(statistics, &self.required_columns)?;
547
548        // Evaluate the pruning predicate on that record batch and append any results to the builder
549        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
550
551        Ok(builder.build())
552    }
553
554    /// Return a reference to the input schema
555    pub fn schema(&self) -> &SchemaRef {
556        &self.schema
557    }
558
559    /// Returns a reference to the physical expr used to construct this pruning predicate
560    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
561        &self.orig_expr
562    }
563
564    /// Returns a reference to the predicate expr
565    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
566        &self.predicate_expr
567    }
568
569    /// Returns a reference to the literal guarantees
570    ///
571    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
572    /// expression to possibly be `true`. If any is not satisfied, the
573    /// expression is guaranteed to be `null` or `false`.
574    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
575        &self.literal_guarantees
576    }
577
578    /// Returns true if this pruning predicate can not prune anything.
579    ///
580    /// This happens if the predicate is a literal `true`  and
581    /// literal_guarantees is empty.
582    ///
583    /// This can happen when a predicate is simplified to a constant `true`
584    pub fn always_true(&self) -> bool {
585        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
586    }
587
588    // this is only used by `parquet` feature right now
589    #[allow(dead_code)]
590    pub fn required_columns(&self) -> &RequiredColumns {
591        &self.required_columns
592    }
593
594    /// Names of the columns that are known to be / not be in a set
595    /// of literals (constants). These are the columns the that may be passed to
596    /// [`PruningStatistics::contained`] during pruning.
597    ///
598    /// This is useful to avoid fetching statistics for columns that will not be
599    /// used in the predicate. For example, it can be used to avoid reading
600    /// unneeded bloom filters (a non trivial operation).
601    pub fn literal_columns(&self) -> Vec<String> {
602        let mut seen = HashSet::new();
603        self.literal_guarantees
604            .iter()
605            .map(|e| &e.column.name)
606            // avoid duplicates
607            .filter(|name| seen.insert(*name))
608            .map(|s| s.to_string())
609            .collect()
610    }
611}
612
613/// Builds the return `Vec` for [`PruningPredicate::prune`].
614#[derive(Debug)]
615struct BoolVecBuilder {
616    /// One element per container. Each element is
617    /// * `true`: if the container has row that may pass the predicate
618    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
619    inner: Vec<bool>,
620}
621
622impl BoolVecBuilder {
623    /// Create a new `BoolVecBuilder` with `num_containers` elements
624    fn new(num_containers: usize) -> Self {
625        Self {
626            // assume by default all containers may pass the predicate
627            inner: vec![true; num_containers],
628        }
629    }
630
631    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
632    /// predicate into the currently in progress array.
633    ///
634    /// Each `array` element is:
635    /// * `true`: container has row that may pass the predicate
636    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
637    /// * `null`: container may or may not have rows that pass the predicate
638    fn combine_array(&mut self, array: &BooleanArray) {
639        assert_eq!(array.len(), self.inner.len());
640        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
641            // `false` for this conjunct means we know for sure no rows could
642            // pass the predicate and thus we set the corresponding container
643            // location to false.
644            if let Some(false) = new {
645                *cur = false;
646            }
647        }
648    }
649
650    /// Combines the results in the [`ColumnarValue`] to the currently in
651    /// progress array, following the same rules as [`Self::combine_array`].
652    ///
653    /// # Panics
654    /// If `value` is not boolean
655    fn combine_value(&mut self, value: ColumnarValue) {
656        match value {
657            ColumnarValue::Array(array) => {
658                self.combine_array(array.as_boolean());
659            }
660            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
661                // False means all containers can not pass the predicate
662                self.inner = vec![false; self.inner.len()];
663            }
664            _ => {
665                // Null or true means the rows in container may pass this
666                // conjunct so we can't prune any containers based on that
667            }
668        }
669    }
670
671    /// Convert this builder into a Vec of bools
672    fn build(self) -> Vec<bool> {
673        self.inner
674    }
675
676    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
677    fn check_all_pruned(&self) -> bool {
678        self.inner.iter().all(|&x| !x)
679    }
680}
681
682fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
683    expr.as_any()
684        .downcast_ref::<phys_expr::Literal>()
685        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
686        .unwrap_or_default()
687}
688
689fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
690    expr.as_any()
691        .downcast_ref::<phys_expr::Literal>()
692        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
693        .unwrap_or_default()
694}
695
696/// Describes which columns statistics are necessary to evaluate a
697/// [`PruningPredicate`].
698///
699/// This structure permits reading and creating the minimum number statistics,
700/// which is important since statistics may be non trivial to read (e.g. large
701/// strings or when there are 1000s of columns).
702///
703/// Handles creating references to the min/max statistics
704/// for columns as well as recording which statistics are needed
705#[derive(Debug, Default, Clone)]
706pub struct RequiredColumns {
707    /// The statistics required to evaluate this predicate:
708    /// * The unqualified column in the input schema
709    /// * Statistics type (e.g. Min or Max or Null_Count)
710    /// * The field the statistics value should be placed in for
711    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
712    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
713}
714
715impl RequiredColumns {
716    fn new() -> Self {
717        Self::default()
718    }
719
720    /// Returns Some(column) if this is a single column predicate.
721    ///
722    /// Returns None if this is a multi-column predicate.
723    ///
724    /// Examples:
725    /// * `a > 5 OR a < 10` returns `Some(a)`
726    /// * `a > 5 OR b < 10` returns `None`
727    /// * `true` returns None
728    #[allow(dead_code)]
729    // this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
730    pub fn single_column(&self) -> Option<&phys_expr::Column> {
731        if self.columns.windows(2).all(|w| {
732            // check if all columns are the same (ignoring statistics and field)
733            let c1 = &w[0].0;
734            let c2 = &w[1].0;
735            c1 == c2
736        }) {
737            self.columns.first().map(|r| &r.0)
738        } else {
739            None
740        }
741    }
742
743    /// Returns a schema that describes the columns required to evaluate this
744    /// pruning predicate.
745    /// The schema contains the fields for each column in `self.columns` with
746    /// the appropriate data type for the statistics.
747    /// Order matters, this same order is used to evaluate the
748    /// pruning predicate.
749    fn schema(&self) -> Schema {
750        let fields = self
751            .columns
752            .iter()
753            .map(|(_c, _t, f)| f.clone())
754            .collect::<Vec<_>>();
755        Schema::new(fields)
756    }
757
758    /// Returns an iterator over items in columns (see doc on
759    /// `self.columns` for details)
760    pub(crate) fn iter(
761        &self,
762    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
763        self.columns.iter()
764    }
765
766    fn find_stat_column(
767        &self,
768        column: &phys_expr::Column,
769        statistics_type: StatisticsType,
770    ) -> Option<usize> {
771        match statistics_type {
772            StatisticsType::RowCount => {
773                // Use the first row count we find, if any
774                self.columns
775                    .iter()
776                    .enumerate()
777                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
778                    .map(|(i, (_c, _t, _f))| i)
779            }
780            _ => self
781                .columns
782                .iter()
783                .enumerate()
784                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
785                .map(|(i, (_c, _t, _f))| i),
786        }
787    }
788
789    /// Rewrites column_expr so that all appearances of column
790    /// are replaced with a reference to either the min or max
791    /// statistics column, while keeping track that a reference to the statistics
792    /// column is required
793    ///
794    /// for example, an expression like `col("foo") > 5`, when called
795    /// with Max would result in an expression like `col("foo_max") >
796    /// 5` with the appropriate entry noted in self.columns
797    fn stat_column_expr(
798        &mut self,
799        column: &phys_expr::Column,
800        column_expr: &Arc<dyn PhysicalExpr>,
801        field: &Field,
802        stat_type: StatisticsType,
803    ) -> Result<Arc<dyn PhysicalExpr>> {
804        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
805            Some(idx) => (idx, false),
806            None => (self.columns.len(), true),
807        };
808
809        let column_name = column.name();
810        let stat_column_name = match stat_type {
811            StatisticsType::Min => format!("{column_name}_min"),
812            StatisticsType::Max => format!("{column_name}_max"),
813            StatisticsType::NullCount => format!("{column_name}_null_count"),
814            StatisticsType::RowCount => "row_count".to_string(),
815        };
816
817        let stat_column = phys_expr::Column::new(&stat_column_name, idx);
818
819        // only add statistics column if not previously added
820        if need_to_insert {
821            // may be null if statistics are not present
822            let nullable = true;
823            let stat_field =
824                Field::new(stat_column.name(), field.data_type().clone(), nullable);
825            self.columns.push((column.clone(), stat_type, stat_field));
826        }
827        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
828    }
829
830    /// rewrite col --> col_min
831    fn min_column_expr(
832        &mut self,
833        column: &phys_expr::Column,
834        column_expr: &Arc<dyn PhysicalExpr>,
835        field: &Field,
836    ) -> Result<Arc<dyn PhysicalExpr>> {
837        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
838    }
839
840    /// rewrite col --> col_max
841    fn max_column_expr(
842        &mut self,
843        column: &phys_expr::Column,
844        column_expr: &Arc<dyn PhysicalExpr>,
845        field: &Field,
846    ) -> Result<Arc<dyn PhysicalExpr>> {
847        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
848    }
849
850    /// rewrite col --> col_null_count
851    fn null_count_column_expr(
852        &mut self,
853        column: &phys_expr::Column,
854        column_expr: &Arc<dyn PhysicalExpr>,
855        field: &Field,
856    ) -> Result<Arc<dyn PhysicalExpr>> {
857        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
858    }
859
860    /// rewrite col --> col_row_count
861    fn row_count_column_expr(
862        &mut self,
863        column: &phys_expr::Column,
864        column_expr: &Arc<dyn PhysicalExpr>,
865        field: &Field,
866    ) -> Result<Arc<dyn PhysicalExpr>> {
867        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
868    }
869}
870
871impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
872    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
873        Self { columns }
874    }
875}
876
877/// Build a RecordBatch from a list of statistics, creating arrays,
878/// with one row for each PruningStatistics and columns specified in
879/// the required_columns parameter.
880///
881/// For example, if the requested columns are
882/// ```text
883/// ("s1", Min, Field:s1_min)
884/// ("s2", Max, field:s2_max)
885///```
886///
887/// And the input statistics had
888/// ```text
889/// S1(Min: 5, Max: 10)
890/// S2(Min: 99, Max: 1000)
891/// S3(Min: 1, Max: 2)
892/// ```
893///
894/// Then this function would build a record batch with 2 columns and
895/// one row s1_min and s2_max as follows (s3 is not requested):
896///
897/// ```text
898/// s1_min | s2_max
899/// -------+--------
900///   5    | 1000
901/// ```
902fn build_statistics_record_batch<S: PruningStatistics + ?Sized>(
903    statistics: &S,
904    required_columns: &RequiredColumns,
905) -> Result<RecordBatch> {
906    let mut arrays = Vec::<ArrayRef>::new();
907    // For each needed statistics column:
908    for (column, statistics_type, stat_field) in required_columns.iter() {
909        let column = Column::from_name(column.name());
910        let data_type = stat_field.data_type();
911
912        let num_containers = statistics.num_containers();
913
914        let array = match statistics_type {
915            StatisticsType::Min => statistics.min_values(&column),
916            StatisticsType::Max => statistics.max_values(&column),
917            StatisticsType::NullCount => statistics.null_counts(&column),
918            StatisticsType::RowCount => statistics.row_counts(&column),
919        };
920        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
921
922        if num_containers != array.len() {
923            return internal_err!(
924                "mismatched statistics length. Expected {}, got {}",
925                num_containers,
926                array.len()
927            );
928        }
929
930        // cast statistics array to required data type (e.g. parquet
931        // provides timestamp statistics as "Int64")
932        let array = arrow::compute::cast(&array, data_type)?;
933
934        arrays.push(array);
935    }
936
937    let schema = Arc::new(required_columns.schema());
938    // provide the count in case there were no needed statistics
939    let mut options = RecordBatchOptions::default();
940    options.row_count = Some(statistics.num_containers());
941
942    trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
943
944    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
945        plan_datafusion_err!("Can not create statistics record batch: {err}")
946    })
947}
948
949struct PruningExpressionBuilder<'a> {
950    column: phys_expr::Column,
951    column_expr: Arc<dyn PhysicalExpr>,
952    op: Operator,
953    scalar_expr: Arc<dyn PhysicalExpr>,
954    field: &'a Field,
955    required_columns: &'a mut RequiredColumns,
956}
957
958impl<'a> PruningExpressionBuilder<'a> {
959    fn try_new(
960        left: &'a Arc<dyn PhysicalExpr>,
961        right: &'a Arc<dyn PhysicalExpr>,
962        op: Operator,
963        schema: &'a SchemaRef,
964        required_columns: &'a mut RequiredColumns,
965    ) -> Result<Self> {
966        // find column name; input could be a more complicated expression
967        let left_columns = collect_columns(left);
968        let right_columns = collect_columns(right);
969        let (column_expr, scalar_expr, columns, correct_operator) =
970            match (left_columns.len(), right_columns.len()) {
971                (1, 0) => (left, right, left_columns, op),
972                (0, 1) => (right, left, right_columns, reverse_operator(op)?),
973                _ => {
974                    // if more than one column used in expression - not supported
975                    return plan_err!(
976                        "Multi-column expressions are not currently supported"
977                    );
978                }
979            };
980
981        let df_schema = DFSchema::try_from(Arc::clone(schema))?;
982        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
983            column_expr,
984            correct_operator,
985            scalar_expr,
986            df_schema,
987        )?;
988        let column = columns.iter().next().unwrap().clone();
989        let field = match schema.column_with_name(column.name()) {
990            Some((_, f)) => f,
991            _ => {
992                return plan_err!("Field not found in schema");
993            }
994        };
995
996        Ok(Self {
997            column,
998            column_expr,
999            op: correct_operator,
1000            scalar_expr,
1001            field,
1002            required_columns,
1003        })
1004    }
1005
1006    fn op(&self) -> Operator {
1007        self.op
1008    }
1009
1010    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1011        &self.scalar_expr
1012    }
1013
1014    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1015        self.required_columns
1016            .min_column_expr(&self.column, &self.column_expr, self.field)
1017    }
1018
1019    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1020        self.required_columns
1021            .max_column_expr(&self.column, &self.column_expr, self.field)
1022    }
1023
1024    /// This function is to simply retune the `null_count` physical expression no matter what the
1025    /// predicate expression is
1026    ///
1027    /// i.e., x > 5 => x_null_count,
1028    ///       cast(x as int) < 10 => x_null_count,
1029    ///       try_cast(x as float) < 10.0 => x_null_count
1030    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1031        // Retune to [`phys_expr::Column`]
1032        let column_expr = Arc::new(self.column.clone()) as _;
1033
1034        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1035        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1036
1037        self.required_columns.null_count_column_expr(
1038            &self.column,
1039            &column_expr,
1040            null_count_field,
1041        )
1042    }
1043
1044    /// This function is to simply retune the `row_count` physical expression no matter what the
1045    /// predicate expression is
1046    ///
1047    /// i.e., x > 5 => x_row_count,
1048    ///       cast(x as int) < 10 => x_row_count,
1049    ///       try_cast(x as float) < 10.0 => x_row_count
1050    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1051        // Retune to [`phys_expr::Column`]
1052        let column_expr = Arc::new(self.column.clone()) as _;
1053
1054        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1055        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1056
1057        self.required_columns.row_count_column_expr(
1058            &self.column,
1059            &column_expr,
1060            row_count_field,
1061        )
1062    }
1063}
1064
1065/// This function is designed to rewrite the column_expr to
1066/// ensure the column_expr is monotonically increasing.
1067///
1068/// For example,
1069/// 1. `col > 10`
1070/// 2. `-col > 10` should be rewritten to `col < -10`
1071/// 3. `!col = true` would be rewritten to `col = !true`
1072/// 4. `abs(a - 10) > 0` not supported
1073/// 5. `cast(can_prunable_expr) > 10`
1074/// 6. `try_cast(can_prunable_expr) > 10`
1075///
1076/// More rewrite rules are still in progress.
1077fn rewrite_expr_to_prunable(
1078    column_expr: &PhysicalExprRef,
1079    op: Operator,
1080    scalar_expr: &PhysicalExprRef,
1081    schema: DFSchema,
1082) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1083    if !is_compare_op(op) {
1084        return plan_err!("rewrite_expr_to_prunable only support compare expression");
1085    }
1086
1087    let column_expr_any = column_expr.as_any();
1088
1089    if column_expr_any
1090        .downcast_ref::<phys_expr::Column>()
1091        .is_some()
1092    {
1093        // `col op lit()`
1094        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1095    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1096        // `cast(col) op lit()`
1097        let arrow_schema: SchemaRef = schema.clone().into();
1098        let from_type = cast.expr().data_type(&arrow_schema)?;
1099        verify_support_type_for_prune(&from_type, cast.cast_type())?;
1100        let (left, op, right) =
1101            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1102        let left = Arc::new(phys_expr::CastExpr::new(
1103            left,
1104            cast.cast_type().clone(),
1105            None,
1106        ));
1107        Ok((left, op, right))
1108    } else if let Some(try_cast) =
1109        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1110    {
1111        // `try_cast(col) op lit()`
1112        let arrow_schema: SchemaRef = schema.clone().into();
1113        let from_type = try_cast.expr().data_type(&arrow_schema)?;
1114        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1115        let (left, op, right) =
1116            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1117        let left = Arc::new(phys_expr::TryCastExpr::new(
1118            left,
1119            try_cast.cast_type().clone(),
1120        ));
1121        Ok((left, op, right))
1122    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1123        // `-col > lit()`  --> `col < -lit()`
1124        let (left, op, right) =
1125            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1126        let right = Arc::new(phys_expr::NegativeExpr::new(right));
1127        Ok((left, reverse_operator(op)?, right))
1128    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1129        // `!col = true` --> `col = !true`
1130        if op != Operator::Eq && op != Operator::NotEq {
1131            return plan_err!("Not with operator other than Eq / NotEq is not supported");
1132        }
1133        if not
1134            .arg()
1135            .as_any()
1136            .downcast_ref::<phys_expr::Column>()
1137            .is_some()
1138        {
1139            let left = Arc::clone(not.arg());
1140            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1141            Ok((left, reverse_operator(op)?, right))
1142        } else {
1143            plan_err!("Not with complex expression {column_expr:?} is not supported")
1144        }
1145    } else {
1146        plan_err!("column expression {column_expr:?} is not supported")
1147    }
1148}
1149
1150fn is_compare_op(op: Operator) -> bool {
1151    matches!(
1152        op,
1153        Operator::Eq
1154            | Operator::NotEq
1155            | Operator::Lt
1156            | Operator::LtEq
1157            | Operator::Gt
1158            | Operator::GtEq
1159            | Operator::LikeMatch
1160            | Operator::NotLikeMatch
1161    )
1162}
1163
1164fn is_string_type(data_type: &DataType) -> bool {
1165    matches!(
1166        data_type,
1167        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1168    )
1169}
1170
1171// The pruning logic is based on the comparing the min/max bounds.
1172// Must make sure the two type has order.
1173// For example, casts from string to numbers is not correct.
1174// Because the "13" is less than "3" with UTF8 comparison order.
1175fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1176    // Dictionary casts are always supported as long as the value types are supported
1177    let from_type = match from_type {
1178        DataType::Dictionary(_, t) => {
1179            return verify_support_type_for_prune(t.as_ref(), to_type)
1180        }
1181        _ => from_type,
1182    };
1183    let to_type = match to_type {
1184        DataType::Dictionary(_, t) => {
1185            return verify_support_type_for_prune(from_type, t.as_ref())
1186        }
1187        _ => to_type,
1188    };
1189    // If both types are strings or both are not strings (number, timestamp, etc)
1190    // then we can compare them.
1191    // PruningPredicate does not support casting of strings to numbers and such.
1192    if is_string_type(from_type) == is_string_type(to_type) {
1193        Ok(())
1194    } else {
1195        plan_err!(
1196            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1197        )
1198    }
1199}
1200
1201/// replaces a column with an old name with a new name in an expression
1202fn rewrite_column_expr(
1203    e: Arc<dyn PhysicalExpr>,
1204    column_old: &phys_expr::Column,
1205    column_new: &phys_expr::Column,
1206) -> Result<Arc<dyn PhysicalExpr>> {
1207    e.transform(|expr| {
1208        if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1209            if column == column_old {
1210                return Ok(Transformed::yes(Arc::new(column_new.clone())));
1211            }
1212        }
1213
1214        Ok(Transformed::no(expr))
1215    })
1216    .data()
1217}
1218
1219fn reverse_operator(op: Operator) -> Result<Operator> {
1220    op.swap().ok_or_else(|| {
1221        DataFusionError::Internal(format!(
1222            "Could not reverse operator {op} while building pruning predicate"
1223        ))
1224    })
1225}
1226
1227/// Given a column reference to `column`, returns a pruning
1228/// expression in terms of the min and max that will evaluate to true
1229/// if the column may contain values, and false if definitely does not
1230/// contain values
1231fn build_single_column_expr(
1232    column: &phys_expr::Column,
1233    schema: &Schema,
1234    required_columns: &mut RequiredColumns,
1235    is_not: bool, // if true, treat as !col
1236) -> Option<Arc<dyn PhysicalExpr>> {
1237    let field = schema.field_with_name(column.name()).ok()?;
1238
1239    if matches!(field.data_type(), &DataType::Boolean) {
1240        let col_ref = Arc::new(column.clone()) as _;
1241
1242        let min = required_columns
1243            .min_column_expr(column, &col_ref, field)
1244            .ok()?;
1245        let max = required_columns
1246            .max_column_expr(column, &col_ref, field)
1247            .ok()?;
1248
1249        // remember -- we want an expression that is:
1250        // TRUE: if there may be rows that match
1251        // FALSE: if there are no rows that match
1252        if is_not {
1253            // The only way we know a column couldn't match is if both the min and max are true
1254            // !(min && max)
1255            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1256                phys_expr::BinaryExpr::new(min, Operator::And, max),
1257            ))))
1258        } else {
1259            // the only way we know a column couldn't match is if both the min and max are false
1260            // !(!min && !max) --> min || max
1261            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1262        }
1263    } else {
1264        None
1265    }
1266}
1267
1268/// Given an expression reference to `expr`, if `expr` is a column expression,
1269/// returns a pruning expression in terms of IsNull that will evaluate to true
1270/// if the column may contain null, and false if definitely does not
1271/// contain null.
1272/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1273/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1274/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1275/// thus can prune the row group / value
1276fn build_is_null_column_expr(
1277    expr: &Arc<dyn PhysicalExpr>,
1278    schema: &Schema,
1279    required_columns: &mut RequiredColumns,
1280    with_not: bool,
1281) -> Option<Arc<dyn PhysicalExpr>> {
1282    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1283        let field = schema.field_with_name(col.name()).ok()?;
1284
1285        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1286        if with_not {
1287            if let Ok(row_count_expr) =
1288                required_columns.row_count_column_expr(col, expr, null_count_field)
1289            {
1290                required_columns
1291                    .null_count_column_expr(col, expr, null_count_field)
1292                    .map(|null_count_column_expr| {
1293                        // IsNotNull(column) => null_count != row_count
1294                        Arc::new(phys_expr::BinaryExpr::new(
1295                            null_count_column_expr,
1296                            Operator::NotEq,
1297                            row_count_expr,
1298                        )) as _
1299                    })
1300                    .ok()
1301            } else {
1302                None
1303            }
1304        } else {
1305            required_columns
1306                .null_count_column_expr(col, expr, null_count_field)
1307                .map(|null_count_column_expr| {
1308                    // IsNull(column) => null_count > 0
1309                    Arc::new(phys_expr::BinaryExpr::new(
1310                        null_count_column_expr,
1311                        Operator::Gt,
1312                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1313                    )) as _
1314                })
1315                .ok()
1316        }
1317    } else {
1318        None
1319    }
1320}
1321
1322/// The maximum number of entries in an `InList` that might be rewritten into
1323/// an OR chain
1324const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1325
1326/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
1327/// for use as a [`PruningPredicate`].
1328pub struct PredicateRewriter {
1329    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1330}
1331
1332impl Default for PredicateRewriter {
1333    fn default() -> Self {
1334        Self {
1335            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1336        }
1337    }
1338}
1339
1340impl PredicateRewriter {
1341    /// Create a new `PredicateRewriter`
1342    pub fn new() -> Self {
1343        Self::default()
1344    }
1345
1346    /// Set the unhandled hook to be used when a predicate can not be rewritten
1347    pub fn with_unhandled_hook(
1348        self,
1349        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1350    ) -> Self {
1351        Self { unhandled_hook }
1352    }
1353
1354    /// Translate logical filter expression into pruning predicate
1355    /// expression that will evaluate to FALSE if it can be determined no
1356    /// rows between the min/max values could pass the predicates.
1357    ///
1358    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1359    ///
1360    /// Returns the pruning predicate as an [`PhysicalExpr`]
1361    ///
1362    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1363    pub fn rewrite_predicate_to_statistics_predicate(
1364        &self,
1365        expr: &Arc<dyn PhysicalExpr>,
1366        schema: &Schema,
1367    ) -> Arc<dyn PhysicalExpr> {
1368        let mut required_columns = RequiredColumns::new();
1369        build_predicate_expression(
1370            expr,
1371            &Arc::new(schema.clone()),
1372            &mut required_columns,
1373            &self.unhandled_hook,
1374        )
1375    }
1376}
1377
1378/// Translate logical filter expression into pruning predicate
1379/// expression that will evaluate to FALSE if it can be determined no
1380/// rows between the min/max values could pass the predicates.
1381///
1382/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1383///
1384/// Returns the pruning predicate as an [`PhysicalExpr`]
1385///
1386/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1387fn build_predicate_expression(
1388    expr: &Arc<dyn PhysicalExpr>,
1389    schema: &SchemaRef,
1390    required_columns: &mut RequiredColumns,
1391    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1392) -> Arc<dyn PhysicalExpr> {
1393    if is_always_false(expr) {
1394        // Shouldn't return `unhandled_hook.handle(expr)`
1395        // Because it will transfer false to true.
1396        return Arc::clone(expr);
1397    }
1398    // predicate expression can only be a binary expression
1399    let expr_any = expr.as_any();
1400    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1401        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1402            .unwrap_or_else(|| unhandled_hook.handle(expr));
1403    }
1404    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1405        return build_is_null_column_expr(
1406            is_not_null.arg(),
1407            schema,
1408            required_columns,
1409            true,
1410        )
1411        .unwrap_or_else(|| unhandled_hook.handle(expr));
1412    }
1413    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1414        return build_single_column_expr(col, schema, required_columns, false)
1415            .unwrap_or_else(|| unhandled_hook.handle(expr));
1416    }
1417    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1418        // match !col (don't do so recursively)
1419        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1420            return build_single_column_expr(col, schema, required_columns, true)
1421                .unwrap_or_else(|| unhandled_hook.handle(expr));
1422        } else {
1423            return unhandled_hook.handle(expr);
1424        }
1425    }
1426    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1427        if !in_list.list().is_empty()
1428            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1429        {
1430            let eq_op = if in_list.negated() {
1431                Operator::NotEq
1432            } else {
1433                Operator::Eq
1434            };
1435            let re_op = if in_list.negated() {
1436                Operator::And
1437            } else {
1438                Operator::Or
1439            };
1440            let change_expr = in_list
1441                .list()
1442                .iter()
1443                .map(|e| {
1444                    Arc::new(phys_expr::BinaryExpr::new(
1445                        Arc::clone(in_list.expr()),
1446                        eq_op,
1447                        Arc::clone(e),
1448                    )) as _
1449                })
1450                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1451                .unwrap();
1452            return build_predicate_expression(
1453                &change_expr,
1454                schema,
1455                required_columns,
1456                unhandled_hook,
1457            );
1458        } else {
1459            return unhandled_hook.handle(expr);
1460        }
1461    }
1462
1463    let (left, op, right) = {
1464        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1465            (
1466                Arc::clone(bin_expr.left()),
1467                *bin_expr.op(),
1468                Arc::clone(bin_expr.right()),
1469            )
1470        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1471            if like_expr.case_insensitive() {
1472                return unhandled_hook.handle(expr);
1473            }
1474            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1475                (false, false) => Operator::LikeMatch,
1476                (true, false) => Operator::NotLikeMatch,
1477                (false, true) => Operator::ILikeMatch,
1478                (true, true) => Operator::NotILikeMatch,
1479            };
1480            (
1481                Arc::clone(like_expr.expr()),
1482                op,
1483                Arc::clone(like_expr.pattern()),
1484            )
1485        } else {
1486            return unhandled_hook.handle(expr);
1487        }
1488    };
1489
1490    if op == Operator::And || op == Operator::Or {
1491        let left_expr =
1492            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1493        let right_expr =
1494            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1495        // simplify boolean expression if applicable
1496        let expr = match (&left_expr, op, &right_expr) {
1497            (left, Operator::And, right)
1498                if is_always_false(left) || is_always_false(right) =>
1499            {
1500                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1501            }
1502            (left, Operator::And, _) if is_always_true(left) => right_expr,
1503            (_, Operator::And, right) if is_always_true(right) => left_expr,
1504            (left, Operator::Or, right)
1505                if is_always_true(left) || is_always_true(right) =>
1506            {
1507                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1508            }
1509            (left, Operator::Or, _) if is_always_false(left) => right_expr,
1510            (_, Operator::Or, right) if is_always_false(right) => left_expr,
1511
1512            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1513        };
1514        return expr;
1515    }
1516
1517    let expr_builder =
1518        PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1519    let mut expr_builder = match expr_builder {
1520        Ok(builder) => builder,
1521        // allow partial failure in predicate expression generation
1522        // this can still produce a useful predicate when multiple conditions are joined using AND
1523        Err(e) => {
1524            debug!("Error building pruning expression: {e}");
1525            return unhandled_hook.handle(expr);
1526        }
1527    };
1528
1529    build_statistics_expr(&mut expr_builder)
1530        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1531}
1532
1533fn build_statistics_expr(
1534    expr_builder: &mut PruningExpressionBuilder,
1535) -> Result<Arc<dyn PhysicalExpr>> {
1536    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1537        Operator::NotEq => {
1538            // column != literal => (min, max) = literal =>
1539            // !(min != literal && max != literal) ==>
1540            // min != literal || literal != max
1541            let min_column_expr = expr_builder.min_column_expr()?;
1542            let max_column_expr = expr_builder.max_column_expr()?;
1543            Arc::new(phys_expr::BinaryExpr::new(
1544                Arc::new(phys_expr::BinaryExpr::new(
1545                    min_column_expr,
1546                    Operator::NotEq,
1547                    Arc::clone(expr_builder.scalar_expr()),
1548                )),
1549                Operator::Or,
1550                Arc::new(phys_expr::BinaryExpr::new(
1551                    Arc::clone(expr_builder.scalar_expr()),
1552                    Operator::NotEq,
1553                    max_column_expr,
1554                )),
1555            ))
1556        }
1557        Operator::Eq => {
1558            // column = literal => (min, max) = literal => min <= literal && literal <= max
1559            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1560            let min_column_expr = expr_builder.min_column_expr()?;
1561            let max_column_expr = expr_builder.max_column_expr()?;
1562            Arc::new(phys_expr::BinaryExpr::new(
1563                Arc::new(phys_expr::BinaryExpr::new(
1564                    min_column_expr,
1565                    Operator::LtEq,
1566                    Arc::clone(expr_builder.scalar_expr()),
1567                )),
1568                Operator::And,
1569                Arc::new(phys_expr::BinaryExpr::new(
1570                    Arc::clone(expr_builder.scalar_expr()),
1571                    Operator::LtEq,
1572                    max_column_expr,
1573                )),
1574            ))
1575        }
1576        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1577        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1578            plan_datafusion_err!(
1579                "LIKE expression with wildcard at the beginning is not supported"
1580            )
1581        })?,
1582        Operator::Gt => {
1583            // column > literal => (min, max) > literal => max > literal
1584            Arc::new(phys_expr::BinaryExpr::new(
1585                expr_builder.max_column_expr()?,
1586                Operator::Gt,
1587                Arc::clone(expr_builder.scalar_expr()),
1588            ))
1589        }
1590        Operator::GtEq => {
1591            // column >= literal => (min, max) >= literal => max >= literal
1592            Arc::new(phys_expr::BinaryExpr::new(
1593                expr_builder.max_column_expr()?,
1594                Operator::GtEq,
1595                Arc::clone(expr_builder.scalar_expr()),
1596            ))
1597        }
1598        Operator::Lt => {
1599            // column < literal => (min, max) < literal => min < literal
1600            Arc::new(phys_expr::BinaryExpr::new(
1601                expr_builder.min_column_expr()?,
1602                Operator::Lt,
1603                Arc::clone(expr_builder.scalar_expr()),
1604            ))
1605        }
1606        Operator::LtEq => {
1607            // column <= literal => (min, max) <= literal => min <= literal
1608            Arc::new(phys_expr::BinaryExpr::new(
1609                expr_builder.min_column_expr()?,
1610                Operator::LtEq,
1611                Arc::clone(expr_builder.scalar_expr()),
1612            ))
1613        }
1614        // other expressions are not supported
1615        _ => {
1616            return plan_err!(
1617                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1618            );
1619        }
1620    };
1621    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1622    Ok(statistics_expr)
1623}
1624
1625/// returns the string literal of the scalar value if it is a string
1626fn unpack_string(s: &ScalarValue) -> Option<&str> {
1627    s.try_as_str().flatten()
1628}
1629
1630fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1631    if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1632        let s = unpack_string(lit.value())?;
1633        return Some(s);
1634    }
1635    None
1636}
1637
1638/// Convert `column LIKE literal` where P is a constant prefix of the literal
1639/// to a range check on the column: `P <= column && column < P'`, where P' is the
1640/// lowest string after all P* strings.
1641fn build_like_match(
1642    expr_builder: &mut PruningExpressionBuilder,
1643) -> Option<Arc<dyn PhysicalExpr>> {
1644    // column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
1645    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1646    // column LIKE '%foo' => min <= '' && '' <= max => true
1647    // column LIKE '%foo%' => min <= '' && '' <= max => true
1648    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1649
1650    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1651    //  this may involve building the physical expressions that call lower() and upper()
1652    let min_column_expr = expr_builder.min_column_expr().ok()?;
1653    let max_column_expr = expr_builder.max_column_expr().ok()?;
1654    let scalar_expr = expr_builder.scalar_expr();
1655    // check that the scalar is a string literal
1656    let s = extract_string_literal(scalar_expr)?;
1657    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1658    let first_wildcard_index = s.find(['%', '_']);
1659    if first_wildcard_index == Some(0) {
1660        // there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
1661        return None;
1662    }
1663    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1664        let prefix = &s[..wildcard_index];
1665        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1666            prefix.to_string(),
1667        ))));
1668        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1669            increment_utf8(prefix)?,
1670        ))));
1671        (lower_bound_lit, upper_bound_lit)
1672    } else {
1673        // the like expression is a literal and can be converted into a comparison
1674        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1675            s.to_string(),
1676        ))));
1677        (Arc::clone(&bound), bound)
1678    };
1679    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1680        lower_bound,
1681        Operator::LtEq,
1682        Arc::clone(&max_column_expr),
1683    ));
1684    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1685        Arc::clone(&min_column_expr),
1686        Operator::LtEq,
1687        upper_bound,
1688    ));
1689    let combined = Arc::new(phys_expr::BinaryExpr::new(
1690        upper_bound_expr,
1691        Operator::And,
1692        lower_bound_expr,
1693    ));
1694    Some(combined)
1695}
1696
1697// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1698//
1699// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1700// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1701// looking for rows that don't begin with `const_prefix` can never be true)
1702fn build_not_like_match(
1703    expr_builder: &mut PruningExpressionBuilder<'_>,
1704) -> Result<Arc<dyn PhysicalExpr>> {
1705    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1706
1707    let min_column_expr = expr_builder.min_column_expr()?;
1708    let max_column_expr = expr_builder.max_column_expr()?;
1709
1710    let scalar_expr = expr_builder.scalar_expr();
1711
1712    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1713        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1714    })?;
1715
1716    let (const_prefix, remaining) = split_constant_prefix(pattern);
1717    if const_prefix.is_empty() || remaining != "%" {
1718        // we can not handle `%` at the beginning or in the middle of the pattern
1719        // Example: For pattern "foo%bar", the row group might include values like
1720        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1721        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1722        // match the pattern, intermediate values like "food" may not
1723        // match the full pattern "foo%bar", making pruning unsafe.
1724        // (truncate foo%bar to foo% have same problem)
1725
1726        // we can not handle pattern containing `_`
1727        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1728        // which means not every row is guaranteed to match the pattern.
1729        return Err(plan_datafusion_err!(
1730            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1731        ));
1732    }
1733
1734    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1735        true,
1736        false,
1737        Arc::clone(&min_column_expr),
1738        Arc::clone(scalar_expr),
1739    ));
1740
1741    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1742        true,
1743        false,
1744        Arc::clone(&max_column_expr),
1745        Arc::clone(scalar_expr),
1746    ));
1747
1748    Ok(Arc::new(phys_expr::BinaryExpr::new(
1749        min_col_not_like_epxr,
1750        Operator::Or,
1751        max_col_not_like_expr,
1752    )))
1753}
1754
1755/// Returns unescaped constant prefix of a LIKE pattern (possibly empty) and the remaining pattern (possibly empty)
1756fn split_constant_prefix(pattern: &str) -> (&str, &str) {
1757    let char_indices = pattern.char_indices().collect::<Vec<_>>();
1758    for i in 0..char_indices.len() {
1759        let (idx, char) = char_indices[i];
1760        if char == '%' || char == '_' {
1761            if i != 0 && char_indices[i - 1].1 == '\\' {
1762                // ecsaped by `\`
1763                continue;
1764            }
1765            return (&pattern[..idx], &pattern[idx..]);
1766        }
1767    }
1768    (pattern, "")
1769}
1770
1771/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
1772/// This makes it so that the returned string will always compare greater than the input string
1773/// or any other string with the same prefix.
1774/// This is necessary since the statistics may have been truncated: if we have a min statistic
1775/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
1776/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`
1777/// In this example `increment_utf8("foo") == "fop"
1778fn increment_utf8(data: &str) -> Option<String> {
1779    // Helper function to check if a character is valid to use
1780    fn is_valid_unicode(c: char) -> bool {
1781        let cp = c as u32;
1782
1783        // Filter out non-characters (https://www.unicode.org/versions/corrigendum9.html)
1784        if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
1785            return false;
1786        }
1787
1788        // Filter out private use area
1789        if cp >= 0x110000 {
1790            return false;
1791        }
1792
1793        true
1794    }
1795
1796    // Convert string to vector of code points
1797    let mut code_points: Vec<char> = data.chars().collect();
1798
1799    // Work backwards through code points
1800    for idx in (0..code_points.len()).rev() {
1801        let original = code_points[idx] as u32;
1802
1803        // Try incrementing the code point
1804        if let Some(next_char) = char::from_u32(original + 1) {
1805            if is_valid_unicode(next_char) {
1806                code_points[idx] = next_char;
1807                // truncate the string to the current index
1808                code_points.truncate(idx + 1);
1809                return Some(code_points.into_iter().collect());
1810            }
1811        }
1812    }
1813
1814    None
1815}
1816
1817/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1818///
1819/// This is important not only as an optimization but also because statistics may not be
1820/// accurate for columns that are all nulls.
1821/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1822/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1823///
1824/// For example:
1825///
1826/// `x_min <= 10 AND 10 <= x_max`
1827///
1828/// will become
1829///
1830/// ```sql
1831/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
1832/// ````
1833///
1834/// If the column is known to be all nulls, then the expression
1835/// `x_null_count = x_row_count` will be true, which will cause the
1836/// boolean expression to return false. Therefore, prune out the container.
1837fn wrap_null_count_check_expr(
1838    statistics_expr: Arc<dyn PhysicalExpr>,
1839    expr_builder: &mut PruningExpressionBuilder,
1840) -> Result<Arc<dyn PhysicalExpr>> {
1841    // x_null_count != x_row_count
1842    let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1843        expr_builder.null_count_column_expr()?,
1844        Operator::NotEq,
1845        expr_builder.row_count_column_expr()?,
1846    ));
1847
1848    // (x_null_count != x_row_count) AND (<statistics_expr>)
1849    Ok(Arc::new(phys_expr::BinaryExpr::new(
1850        not_when_null_count_eq_row_count,
1851        Operator::And,
1852        statistics_expr,
1853    )))
1854}
1855
1856#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1857pub(crate) enum StatisticsType {
1858    Min,
1859    Max,
1860    NullCount,
1861    RowCount,
1862}
1863
1864#[cfg(test)]
1865mod tests {
1866    use std::collections::HashMap;
1867    use std::ops::{Not, Rem};
1868
1869    use super::*;
1870    use datafusion_common::test_util::batches_to_string;
1871    use datafusion_expr::{and, col, lit, or};
1872    use insta::assert_snapshot;
1873
1874    use arrow::array::Decimal128Array;
1875    use arrow::{
1876        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1877        datatypes::TimeUnit,
1878    };
1879    use datafusion_expr::expr::InList;
1880    use datafusion_expr::{cast, is_null, try_cast, Expr};
1881    use datafusion_functions_nested::expr_fn::{array_has, make_array};
1882    use datafusion_physical_expr::expressions as phys_expr;
1883    use datafusion_physical_expr::planner::logical2physical;
1884
1885    #[derive(Debug, Default)]
1886    /// Mock statistic provider for tests
1887    ///
1888    /// Each row represents the statistics for a "container" (which
1889    /// might represent an entire parquet file, or directory of files,
1890    /// or some other collection of data for which we had statistics)
1891    ///
1892    /// Note All `ArrayRefs` must be the same size.
1893    struct ContainerStats {
1894        min: Option<ArrayRef>,
1895        max: Option<ArrayRef>,
1896        /// Optional values
1897        null_counts: Option<ArrayRef>,
1898        row_counts: Option<ArrayRef>,
1899        /// Optional known values (e.g. mimic a bloom filter)
1900        /// (value, contained)
1901        /// If present, all BooleanArrays must be the same size as min/max
1902        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
1903    }
1904
1905    impl ContainerStats {
1906        fn new() -> Self {
1907            Default::default()
1908        }
1909        fn new_decimal128(
1910            min: impl IntoIterator<Item = Option<i128>>,
1911            max: impl IntoIterator<Item = Option<i128>>,
1912            precision: u8,
1913            scale: i8,
1914        ) -> Self {
1915            Self::new()
1916                .with_min(Arc::new(
1917                    min.into_iter()
1918                        .collect::<Decimal128Array>()
1919                        .with_precision_and_scale(precision, scale)
1920                        .unwrap(),
1921                ))
1922                .with_max(Arc::new(
1923                    max.into_iter()
1924                        .collect::<Decimal128Array>()
1925                        .with_precision_and_scale(precision, scale)
1926                        .unwrap(),
1927                ))
1928        }
1929
1930        fn new_i64(
1931            min: impl IntoIterator<Item = Option<i64>>,
1932            max: impl IntoIterator<Item = Option<i64>>,
1933        ) -> Self {
1934            Self::new()
1935                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1936                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1937        }
1938
1939        fn new_i32(
1940            min: impl IntoIterator<Item = Option<i32>>,
1941            max: impl IntoIterator<Item = Option<i32>>,
1942        ) -> Self {
1943            Self::new()
1944                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1945                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1946        }
1947
1948        fn new_utf8<'a>(
1949            min: impl IntoIterator<Item = Option<&'a str>>,
1950            max: impl IntoIterator<Item = Option<&'a str>>,
1951        ) -> Self {
1952            Self::new()
1953                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1954                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1955        }
1956
1957        fn new_bool(
1958            min: impl IntoIterator<Item = Option<bool>>,
1959            max: impl IntoIterator<Item = Option<bool>>,
1960        ) -> Self {
1961            Self::new()
1962                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1963                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1964        }
1965
1966        fn min(&self) -> Option<ArrayRef> {
1967            self.min.clone()
1968        }
1969
1970        fn max(&self) -> Option<ArrayRef> {
1971            self.max.clone()
1972        }
1973
1974        fn null_counts(&self) -> Option<ArrayRef> {
1975            self.null_counts.clone()
1976        }
1977
1978        fn row_counts(&self) -> Option<ArrayRef> {
1979            self.row_counts.clone()
1980        }
1981
1982        /// return an iterator over all arrays in this statistics
1983        fn arrays(&self) -> Vec<ArrayRef> {
1984            let contained_arrays = self
1985                .contained
1986                .iter()
1987                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
1988
1989            [
1990                self.min.as_ref().cloned(),
1991                self.max.as_ref().cloned(),
1992                self.null_counts.as_ref().cloned(),
1993                self.row_counts.as_ref().cloned(),
1994            ]
1995            .into_iter()
1996            .flatten()
1997            .chain(contained_arrays)
1998            .collect()
1999        }
2000
2001        /// Returns the number of containers represented by this statistics This
2002        /// picks the length of the first array as all arrays must have the same
2003        /// length (which is verified by `assert_invariants`).
2004        fn len(&self) -> usize {
2005            // pick the first non zero length
2006            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2007        }
2008
2009        /// Ensure that the lengths of all arrays are consistent
2010        fn assert_invariants(&self) {
2011            let mut prev_len = None;
2012
2013            for len in self.arrays().iter().map(|a| a.len()) {
2014                // Get a length, if we don't already have one
2015                match prev_len {
2016                    None => {
2017                        prev_len = Some(len);
2018                    }
2019                    Some(prev_len) => {
2020                        assert_eq!(prev_len, len);
2021                    }
2022                }
2023            }
2024        }
2025
2026        /// Add min values
2027        fn with_min(mut self, min: ArrayRef) -> Self {
2028            self.min = Some(min);
2029            self
2030        }
2031
2032        /// Add max values
2033        fn with_max(mut self, max: ArrayRef) -> Self {
2034            self.max = Some(max);
2035            self
2036        }
2037
2038        /// Add null counts. There must be the same number of null counts as
2039        /// there are containers
2040        fn with_null_counts(
2041            mut self,
2042            counts: impl IntoIterator<Item = Option<u64>>,
2043        ) -> Self {
2044            let null_counts: ArrayRef =
2045                Arc::new(counts.into_iter().collect::<UInt64Array>());
2046
2047            self.assert_invariants();
2048            self.null_counts = Some(null_counts);
2049            self
2050        }
2051
2052        /// Add row counts. There must be the same number of row counts as
2053        /// there are containers
2054        fn with_row_counts(
2055            mut self,
2056            counts: impl IntoIterator<Item = Option<u64>>,
2057        ) -> Self {
2058            let row_counts: ArrayRef =
2059                Arc::new(counts.into_iter().collect::<UInt64Array>());
2060
2061            self.assert_invariants();
2062            self.row_counts = Some(row_counts);
2063            self
2064        }
2065
2066        /// Add contained information.
2067        pub fn with_contained(
2068            mut self,
2069            values: impl IntoIterator<Item = ScalarValue>,
2070            contained: impl IntoIterator<Item = Option<bool>>,
2071        ) -> Self {
2072            let contained: BooleanArray = contained.into_iter().collect();
2073            let values: HashSet<_> = values.into_iter().collect();
2074
2075            self.contained.push((values, contained));
2076            self.assert_invariants();
2077            self
2078        }
2079
2080        /// get any contained information for the specified values
2081        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2082            // find the one with the matching values
2083            self.contained
2084                .iter()
2085                .find(|(values, _contained)| values == find_values)
2086                .map(|(_values, contained)| contained.clone())
2087        }
2088    }
2089
2090    #[derive(Debug, Default)]
2091    struct TestStatistics {
2092        // key: column name
2093        stats: HashMap<Column, ContainerStats>,
2094    }
2095
2096    impl TestStatistics {
2097        fn new() -> Self {
2098            Self::default()
2099        }
2100
2101        fn with(
2102            mut self,
2103            name: impl Into<String>,
2104            container_stats: ContainerStats,
2105        ) -> Self {
2106            let col = Column::from_name(name.into());
2107            self.stats.insert(col, container_stats);
2108            self
2109        }
2110
2111        /// Add null counts for the specified column.
2112        /// There must be the same number of null counts as
2113        /// there are containers
2114        fn with_null_counts(
2115            mut self,
2116            name: impl Into<String>,
2117            counts: impl IntoIterator<Item = Option<u64>>,
2118        ) -> Self {
2119            let col = Column::from_name(name.into());
2120
2121            // take stats out and update them
2122            let container_stats = self
2123                .stats
2124                .remove(&col)
2125                .unwrap_or_default()
2126                .with_null_counts(counts);
2127
2128            // put stats back in
2129            self.stats.insert(col, container_stats);
2130            self
2131        }
2132
2133        /// Add row counts for the specified column.
2134        /// There must be the same number of row counts as
2135        /// there are containers
2136        fn with_row_counts(
2137            mut self,
2138            name: impl Into<String>,
2139            counts: impl IntoIterator<Item = Option<u64>>,
2140        ) -> Self {
2141            let col = Column::from_name(name.into());
2142
2143            // take stats out and update them
2144            let container_stats = self
2145                .stats
2146                .remove(&col)
2147                .unwrap_or_default()
2148                .with_row_counts(counts);
2149
2150            // put stats back in
2151            self.stats.insert(col, container_stats);
2152            self
2153        }
2154
2155        /// Add contained information for the specified column.
2156        fn with_contained(
2157            mut self,
2158            name: impl Into<String>,
2159            values: impl IntoIterator<Item = ScalarValue>,
2160            contained: impl IntoIterator<Item = Option<bool>>,
2161        ) -> Self {
2162            let col = Column::from_name(name.into());
2163
2164            // take stats out and update them
2165            let container_stats = self
2166                .stats
2167                .remove(&col)
2168                .unwrap_or_default()
2169                .with_contained(values, contained);
2170
2171            // put stats back in
2172            self.stats.insert(col, container_stats);
2173            self
2174        }
2175    }
2176
2177    impl PruningStatistics for TestStatistics {
2178        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2179            self.stats
2180                .get(column)
2181                .map(|container_stats| container_stats.min())
2182                .unwrap_or(None)
2183        }
2184
2185        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2186            self.stats
2187                .get(column)
2188                .map(|container_stats| container_stats.max())
2189                .unwrap_or(None)
2190        }
2191
2192        fn num_containers(&self) -> usize {
2193            self.stats
2194                .values()
2195                .next()
2196                .map(|container_stats| container_stats.len())
2197                .unwrap_or(0)
2198        }
2199
2200        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2201            self.stats
2202                .get(column)
2203                .map(|container_stats| container_stats.null_counts())
2204                .unwrap_or(None)
2205        }
2206
2207        fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2208            self.stats
2209                .get(column)
2210                .map(|container_stats| container_stats.row_counts())
2211                .unwrap_or(None)
2212        }
2213
2214        fn contained(
2215            &self,
2216            column: &Column,
2217            values: &HashSet<ScalarValue>,
2218        ) -> Option<BooleanArray> {
2219            self.stats
2220                .get(column)
2221                .and_then(|container_stats| container_stats.contained(values))
2222        }
2223    }
2224
2225    /// Returns the specified min/max container values
2226    struct OneContainerStats {
2227        min_values: Option<ArrayRef>,
2228        max_values: Option<ArrayRef>,
2229        num_containers: usize,
2230    }
2231
2232    impl PruningStatistics for OneContainerStats {
2233        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2234            self.min_values.clone()
2235        }
2236
2237        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2238            self.max_values.clone()
2239        }
2240
2241        fn num_containers(&self) -> usize {
2242            self.num_containers
2243        }
2244
2245        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2246            None
2247        }
2248
2249        fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2250            None
2251        }
2252
2253        fn contained(
2254            &self,
2255            _column: &Column,
2256            _values: &HashSet<ScalarValue>,
2257        ) -> Option<BooleanArray> {
2258            None
2259        }
2260    }
2261
2262    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2263    /// for multiple columns.
2264    #[test]
2265    fn test_unique_row_count_field_and_column() {
2266        // c1 = 100 AND c2 = 200
2267        let schema: SchemaRef = Arc::new(Schema::new(vec![
2268            Field::new("c1", DataType::Int32, true),
2269            Field::new("c2", DataType::Int32, true),
2270        ]));
2271        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2272        let expr = logical2physical(&expr, &schema);
2273        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2274        // note pruning expression refers to row_count twice
2275        assert_eq!(
2276            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2277            p.predicate_expr.to_string()
2278        );
2279
2280        // Fields in required schema should be unique, otherwise when creating batches
2281        // it will fail because of duplicate field names
2282        let mut fields = HashSet::new();
2283        for (_col, _ty, field) in p.required_columns().iter() {
2284            let was_new = fields.insert(field);
2285            if !was_new {
2286                panic!(
2287                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2288                );
2289            }
2290        }
2291    }
2292
2293    #[test]
2294    fn prune_all_rows_null_counts() {
2295        // if null_count = row_count then we should prune the container for i = 0
2296        // regardless of the statistics
2297        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2298        let statistics = TestStatistics::new().with(
2299            "i",
2300            ContainerStats::new_i32(
2301                vec![Some(0)], // min
2302                vec![Some(0)], // max
2303            )
2304            .with_null_counts(vec![Some(1)])
2305            .with_row_counts(vec![Some(1)]),
2306        );
2307        let expected_ret = &[false];
2308        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2309
2310        // this should be true even if the container stats are missing
2311        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2312        let container_stats = ContainerStats {
2313            min: Some(Arc::new(Int32Array::from(vec![None]))),
2314            max: Some(Arc::new(Int32Array::from(vec![None]))),
2315            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2316            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2317            ..ContainerStats::default()
2318        };
2319        let statistics = TestStatistics::new().with("i", container_stats);
2320        let expected_ret = &[false];
2321        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2322
2323        // If the null counts themselves are missing we should be able to fall back to the stats
2324        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2325        let container_stats = ContainerStats {
2326            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2327            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2328            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2329            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2330            ..ContainerStats::default()
2331        };
2332        let statistics = TestStatistics::new().with("i", container_stats);
2333        let expected_ret = &[true];
2334        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2335        let expected_ret = &[false];
2336        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2337
2338        // Same for the row counts
2339        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2340        let container_stats = ContainerStats {
2341            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2342            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2343            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2344            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2345            ..ContainerStats::default()
2346        };
2347        let statistics = TestStatistics::new().with("i", container_stats);
2348        let expected_ret = &[true];
2349        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2350        let expected_ret = &[false];
2351        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2352    }
2353
2354    #[test]
2355    fn prune_missing_statistics() {
2356        // If the min or max stats are missing we should not prune
2357        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2358        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2359        let container_stats = ContainerStats {
2360            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2361            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2362            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2363            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2364            ..ContainerStats::default()
2365        };
2366        let statistics = TestStatistics::new().with("i", container_stats);
2367        let expected_ret = &[true, true];
2368        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2369        let expected_ret = &[false, true];
2370        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2371        let expected_ret = &[true, false];
2372        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2373    }
2374
2375    #[test]
2376    fn prune_null_stats() {
2377        // if null_count = row_count then we should prune the container for i = 0
2378        // regardless of the statistics
2379        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2380
2381        let statistics = TestStatistics::new().with(
2382            "i",
2383            ContainerStats::new_i32(
2384                vec![Some(0)], // min
2385                vec![Some(0)], // max
2386            )
2387            .with_null_counts(vec![Some(1)])
2388            .with_row_counts(vec![Some(1)]),
2389        );
2390
2391        let expected_ret = &[false];
2392
2393        // i = 0
2394        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2395    }
2396
2397    #[test]
2398    fn test_build_statistics_record_batch() {
2399        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
2400        let required_columns = RequiredColumns::from(vec![
2401            // min of original column s1, named s1_min
2402            (
2403                phys_expr::Column::new("s1", 1),
2404                StatisticsType::Min,
2405                Field::new("s1_min", DataType::Int32, true),
2406            ),
2407            // max of original column s2, named s2_max
2408            (
2409                phys_expr::Column::new("s2", 2),
2410                StatisticsType::Max,
2411                Field::new("s2_max", DataType::Int32, true),
2412            ),
2413            // max of original column s3, named s3_max
2414            (
2415                phys_expr::Column::new("s3", 3),
2416                StatisticsType::Max,
2417                Field::new("s3_max", DataType::Utf8, true),
2418            ),
2419            // min of original column s3, named s3_min
2420            (
2421                phys_expr::Column::new("s3", 3),
2422                StatisticsType::Min,
2423                Field::new("s3_min", DataType::Utf8, true),
2424            ),
2425        ]);
2426
2427        let statistics = TestStatistics::new()
2428            .with(
2429                "s1",
2430                ContainerStats::new_i32(
2431                    vec![None, None, Some(9), None],  // min
2432                    vec![Some(10), None, None, None], // max
2433                ),
2434            )
2435            .with(
2436                "s2",
2437                ContainerStats::new_i32(
2438                    vec![Some(2), None, None, None],  // min
2439                    vec![Some(20), None, None, None], // max
2440                ),
2441            )
2442            .with(
2443                "s3",
2444                ContainerStats::new_utf8(
2445                    vec![Some("a"), None, None, None],      // min
2446                    vec![Some("q"), None, Some("r"), None], // max
2447                ),
2448            );
2449
2450        let batch =
2451            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2452        assert_snapshot!(batches_to_string(&[batch]), @r"
2453        +--------+--------+--------+--------+
2454        | s1_min | s2_max | s3_max | s3_min |
2455        +--------+--------+--------+--------+
2456        |        | 20     | q      | a      |
2457        |        |        |        |        |
2458        | 9      |        | r      |        |
2459        |        |        |        |        |
2460        +--------+--------+--------+--------+
2461        ");
2462    }
2463
2464    #[test]
2465    fn test_build_statistics_casting() {
2466        // Test requesting a Timestamp column, but getting statistics as Int64
2467        // which is what Parquet does
2468
2469        // Request a record batch with of s1_min as a timestamp
2470        let required_columns = RequiredColumns::from(vec![(
2471            phys_expr::Column::new("s3", 3),
2472            StatisticsType::Min,
2473            Field::new(
2474                "s1_min",
2475                DataType::Timestamp(TimeUnit::Nanosecond, None),
2476                true,
2477            ),
2478        )]);
2479
2480        // Note the statistics pass back i64 (not timestamp)
2481        let statistics = OneContainerStats {
2482            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2483            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2484            num_containers: 1,
2485        };
2486
2487        let batch =
2488            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2489
2490        assert_snapshot!(batches_to_string(&[batch]), @r"
2491        +-------------------------------+
2492        | s1_min                        |
2493        +-------------------------------+
2494        | 1970-01-01T00:00:00.000000010 |
2495        +-------------------------------+
2496        ");
2497    }
2498
2499    #[test]
2500    fn test_build_statistics_no_required_stats() {
2501        let required_columns = RequiredColumns::new();
2502
2503        let statistics = OneContainerStats {
2504            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2505            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2506            num_containers: 1,
2507        };
2508
2509        let batch =
2510            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2511        assert_eq!(batch.num_rows(), 1); // had 1 container
2512    }
2513
2514    #[test]
2515    fn test_build_statistics_inconsistent_types() {
2516        // Test requesting a Utf8 column when the stats return some other type
2517
2518        // Request a record batch with of s1_min as a timestamp
2519        let required_columns = RequiredColumns::from(vec![(
2520            phys_expr::Column::new("s3", 3),
2521            StatisticsType::Min,
2522            Field::new("s1_min", DataType::Utf8, true),
2523        )]);
2524
2525        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2526        let statistics = OneContainerStats {
2527            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2528            max_values: None,
2529            num_containers: 1,
2530        };
2531
2532        let batch =
2533            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2534        assert_snapshot!(batches_to_string(&[batch]), @r"
2535        +--------+
2536        | s1_min |
2537        +--------+
2538        |        |
2539        +--------+
2540        ");
2541    }
2542
2543    #[test]
2544    fn test_build_statistics_inconsistent_length() {
2545        // return an inconsistent length to the actual statistics arrays
2546        let required_columns = RequiredColumns::from(vec![(
2547            phys_expr::Column::new("s1", 3),
2548            StatisticsType::Min,
2549            Field::new("s1_min", DataType::Int64, true),
2550        )]);
2551
2552        // Note the statistics pass back i64 (not timestamp)
2553        let statistics = OneContainerStats {
2554            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2555            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2556            num_containers: 3,
2557        };
2558
2559        let result =
2560            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2561        assert!(
2562            result
2563                .to_string()
2564                .contains("mismatched statistics length. Expected 3, got 1"),
2565            "{}",
2566            result
2567        );
2568    }
2569
2570    #[test]
2571    fn row_group_predicate_eq() -> Result<()> {
2572        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2573        let expected_expr =
2574            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2575
2576        // test column on the left
2577        let expr = col("c1").eq(lit(1));
2578        let predicate_expr =
2579            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2580        assert_eq!(predicate_expr.to_string(), expected_expr);
2581
2582        // test column on the right
2583        let expr = lit(1).eq(col("c1"));
2584        let predicate_expr =
2585            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2586        assert_eq!(predicate_expr.to_string(), expected_expr);
2587
2588        Ok(())
2589    }
2590
2591    #[test]
2592    fn row_group_predicate_not_eq() -> Result<()> {
2593        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2594        let expected_expr =
2595            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2596
2597        // test column on the left
2598        let expr = col("c1").not_eq(lit(1));
2599        let predicate_expr =
2600            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2601        assert_eq!(predicate_expr.to_string(), expected_expr);
2602
2603        // test column on the right
2604        let expr = lit(1).not_eq(col("c1"));
2605        let predicate_expr =
2606            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2607        assert_eq!(predicate_expr.to_string(), expected_expr);
2608
2609        Ok(())
2610    }
2611
2612    #[test]
2613    fn row_group_predicate_gt() -> Result<()> {
2614        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2615        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2616
2617        // test column on the left
2618        let expr = col("c1").gt(lit(1));
2619        let predicate_expr =
2620            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2621        assert_eq!(predicate_expr.to_string(), expected_expr);
2622
2623        // test column on the right
2624        let expr = lit(1).lt(col("c1"));
2625        let predicate_expr =
2626            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2627        assert_eq!(predicate_expr.to_string(), expected_expr);
2628
2629        Ok(())
2630    }
2631
2632    #[test]
2633    fn row_group_predicate_gt_eq() -> Result<()> {
2634        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2635        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2636
2637        // test column on the left
2638        let expr = col("c1").gt_eq(lit(1));
2639        let predicate_expr =
2640            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2641        assert_eq!(predicate_expr.to_string(), expected_expr);
2642        // test column on the right
2643        let expr = lit(1).lt_eq(col("c1"));
2644        let predicate_expr =
2645            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2646        assert_eq!(predicate_expr.to_string(), expected_expr);
2647
2648        Ok(())
2649    }
2650
2651    #[test]
2652    fn row_group_predicate_lt() -> Result<()> {
2653        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2654        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2655
2656        // test column on the left
2657        let expr = col("c1").lt(lit(1));
2658        let predicate_expr =
2659            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2660        assert_eq!(predicate_expr.to_string(), expected_expr);
2661
2662        // test column on the right
2663        let expr = lit(1).gt(col("c1"));
2664        let predicate_expr =
2665            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2666        assert_eq!(predicate_expr.to_string(), expected_expr);
2667
2668        Ok(())
2669    }
2670
2671    #[test]
2672    fn row_group_predicate_lt_eq() -> Result<()> {
2673        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2674        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2675
2676        // test column on the left
2677        let expr = col("c1").lt_eq(lit(1));
2678        let predicate_expr =
2679            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2680        assert_eq!(predicate_expr.to_string(), expected_expr);
2681        // test column on the right
2682        let expr = lit(1).gt_eq(col("c1"));
2683        let predicate_expr =
2684            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2685        assert_eq!(predicate_expr.to_string(), expected_expr);
2686
2687        Ok(())
2688    }
2689
2690    #[test]
2691    fn row_group_predicate_and() -> Result<()> {
2692        let schema = Schema::new(vec![
2693            Field::new("c1", DataType::Int32, false),
2694            Field::new("c2", DataType::Int32, false),
2695            Field::new("c3", DataType::Int32, false),
2696        ]);
2697        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2698        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2699        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2700        let predicate_expr =
2701            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2702        assert_eq!(predicate_expr.to_string(), expected_expr);
2703
2704        Ok(())
2705    }
2706
2707    #[test]
2708    fn row_group_predicate_or() -> Result<()> {
2709        let schema = Schema::new(vec![
2710            Field::new("c1", DataType::Int32, false),
2711            Field::new("c2", DataType::Int32, false),
2712        ]);
2713        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2714        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2715        let expected_expr = "true";
2716        let predicate_expr =
2717            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2718        assert_eq!(predicate_expr.to_string(), expected_expr);
2719
2720        Ok(())
2721    }
2722
2723    #[test]
2724    fn row_group_predicate_not() -> Result<()> {
2725        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2726        let expected_expr = "true";
2727
2728        let expr = col("c1").not();
2729        let predicate_expr =
2730            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2731        assert_eq!(predicate_expr.to_string(), expected_expr);
2732
2733        Ok(())
2734    }
2735
2736    #[test]
2737    fn row_group_predicate_not_bool() -> Result<()> {
2738        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2739        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2740
2741        let expr = col("c1").not();
2742        let predicate_expr =
2743            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2744        assert_eq!(predicate_expr.to_string(), expected_expr);
2745
2746        Ok(())
2747    }
2748
2749    #[test]
2750    fn row_group_predicate_bool() -> Result<()> {
2751        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2752        let expected_expr = "c1_min@0 OR c1_max@1";
2753
2754        let expr = col("c1");
2755        let predicate_expr =
2756            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2757        assert_eq!(predicate_expr.to_string(), expected_expr);
2758
2759        Ok(())
2760    }
2761
2762    #[test]
2763    fn row_group_predicate_lt_bool() -> Result<()> {
2764        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2765        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2766
2767        // DF doesn't support arithmetic on boolean columns so
2768        // this predicate will error when evaluated
2769        let expr = col("c1").lt(lit(true));
2770        let predicate_expr =
2771            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2772        assert_eq!(predicate_expr.to_string(), expected_expr);
2773
2774        Ok(())
2775    }
2776
2777    #[test]
2778    fn row_group_predicate_required_columns() -> Result<()> {
2779        let schema = Schema::new(vec![
2780            Field::new("c1", DataType::Int32, false),
2781            Field::new("c2", DataType::Int32, false),
2782        ]);
2783        let mut required_columns = RequiredColumns::new();
2784        // c1 < 1 and (c2 = 2 or c2 = 3)
2785        let expr = col("c1")
2786            .lt(lit(1))
2787            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2788        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2789        let predicate_expr =
2790            test_build_predicate_expression(&expr, &schema, &mut required_columns);
2791        assert_eq!(predicate_expr.to_string(), expected_expr);
2792        println!("required_columns: {required_columns:#?}"); // for debugging assertions below
2793                                                             // c1 < 1 should add c1_min
2794        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2795        assert_eq!(
2796            required_columns.columns[0],
2797            (
2798                phys_expr::Column::new("c1", 0),
2799                StatisticsType::Min,
2800                c1_min_field.with_nullable(true) // could be nullable if stats are not present
2801            )
2802        );
2803        // c1 < 1 should add c1_null_count
2804        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2805        assert_eq!(
2806            required_columns.columns[1],
2807            (
2808                phys_expr::Column::new("c1", 0),
2809                StatisticsType::NullCount,
2810                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
2811            )
2812        );
2813        // c1 < 1 should add row_count
2814        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2815        assert_eq!(
2816            required_columns.columns[2],
2817            (
2818                phys_expr::Column::new("c1", 0),
2819                StatisticsType::RowCount,
2820                row_count_field.with_nullable(true) // could be nullable if stats are not present
2821            )
2822        );
2823        // c2 = 2 should add c2_min and c2_max
2824        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2825        assert_eq!(
2826            required_columns.columns[3],
2827            (
2828                phys_expr::Column::new("c2", 1),
2829                StatisticsType::Min,
2830                c2_min_field.with_nullable(true) // could be nullable if stats are not present
2831            )
2832        );
2833        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2834        assert_eq!(
2835            required_columns.columns[4],
2836            (
2837                phys_expr::Column::new("c2", 1),
2838                StatisticsType::Max,
2839                c2_max_field.with_nullable(true) // could be nullable if stats are not present
2840            )
2841        );
2842        // c2 = 2 should add c2_null_count
2843        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2844        assert_eq!(
2845            required_columns.columns[5],
2846            (
2847                phys_expr::Column::new("c2", 1),
2848                StatisticsType::NullCount,
2849                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
2850            )
2851        );
2852        // c2 = 1 should add row_count
2853        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2854        assert_eq!(
2855            required_columns.columns[2],
2856            (
2857                phys_expr::Column::new("c1", 0),
2858                StatisticsType::RowCount,
2859                row_count_field.with_nullable(true) // could be nullable if stats are not present
2860            )
2861        );
2862        // c2 = 3 shouldn't add any new statistics fields
2863        assert_eq!(required_columns.columns.len(), 6);
2864
2865        Ok(())
2866    }
2867
2868    #[test]
2869    fn row_group_predicate_in_list() -> Result<()> {
2870        let schema = Schema::new(vec![
2871            Field::new("c1", DataType::Int32, false),
2872            Field::new("c2", DataType::Int32, false),
2873        ]);
2874        // test c1 in(1, 2, 3)
2875        let expr = Expr::InList(InList::new(
2876            Box::new(col("c1")),
2877            vec![lit(1), lit(2), lit(3)],
2878            false,
2879        ));
2880        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2881        let predicate_expr =
2882            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2883        assert_eq!(predicate_expr.to_string(), expected_expr);
2884
2885        Ok(())
2886    }
2887
2888    #[test]
2889    fn row_group_predicate_in_list_empty() -> Result<()> {
2890        let schema = Schema::new(vec![
2891            Field::new("c1", DataType::Int32, false),
2892            Field::new("c2", DataType::Int32, false),
2893        ]);
2894        // test c1 in()
2895        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2896        let expected_expr = "true";
2897        let predicate_expr =
2898            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2899        assert_eq!(predicate_expr.to_string(), expected_expr);
2900
2901        Ok(())
2902    }
2903
2904    #[test]
2905    fn row_group_predicate_in_list_negated() -> Result<()> {
2906        let schema = Schema::new(vec![
2907            Field::new("c1", DataType::Int32, false),
2908            Field::new("c2", DataType::Int32, false),
2909        ]);
2910        // test c1 not in(1, 2, 3)
2911        let expr = Expr::InList(InList::new(
2912            Box::new(col("c1")),
2913            vec![lit(1), lit(2), lit(3)],
2914            true,
2915        ));
2916        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2917        let predicate_expr =
2918            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2919        assert_eq!(predicate_expr.to_string(), expected_expr);
2920
2921        Ok(())
2922    }
2923
2924    #[test]
2925    fn row_group_predicate_between() -> Result<()> {
2926        let schema = Schema::new(vec![
2927            Field::new("c1", DataType::Int32, false),
2928            Field::new("c2", DataType::Int32, false),
2929        ]);
2930
2931        // test c1 BETWEEN 1 AND 5
2932        let expr1 = col("c1").between(lit(1), lit(5));
2933
2934        // test 1 <= c1 <= 5
2935        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
2936
2937        let predicate_expr1 =
2938            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
2939
2940        let predicate_expr2 =
2941            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
2942        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
2943
2944        Ok(())
2945    }
2946
2947    #[test]
2948    fn row_group_predicate_between_with_in_list() -> Result<()> {
2949        let schema = Schema::new(vec![
2950            Field::new("c1", DataType::Int32, false),
2951            Field::new("c2", DataType::Int32, false),
2952        ]);
2953        // test c1 in(1, 2)
2954        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
2955
2956        // test c2 BETWEEN 4 AND 5
2957        let expr2 = col("c2").between(lit(4), lit(5));
2958
2959        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
2960        let expr3 = expr1.and(expr2);
2961
2962        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
2963        let predicate_expr =
2964            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
2965        assert_eq!(predicate_expr.to_string(), expected_expr);
2966
2967        Ok(())
2968    }
2969
2970    #[test]
2971    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
2972        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2973        // test c1 in(1..21)
2974        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
2975        // always true
2976        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
2977
2978        let expected_expr = "true";
2979        let predicate_expr =
2980            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2981        assert_eq!(predicate_expr.to_string(), expected_expr);
2982
2983        Ok(())
2984    }
2985
2986    #[test]
2987    fn row_group_predicate_cast_int_int() -> Result<()> {
2988        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2989        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
2990
2991        // test cast(c1 as int64) = 1
2992        // test column on the left
2993        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
2994        let predicate_expr =
2995            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2996        assert_eq!(predicate_expr.to_string(), expected_expr);
2997
2998        // test column on the right
2999        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3000        let predicate_expr =
3001            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3002        assert_eq!(predicate_expr.to_string(), expected_expr);
3003
3004        let expected_expr =
3005            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3006
3007        // test column on the left
3008        let expr =
3009            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3010        let predicate_expr =
3011            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3012        assert_eq!(predicate_expr.to_string(), expected_expr);
3013
3014        // test column on the right
3015        let expr =
3016            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3017        let predicate_expr =
3018            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3019        assert_eq!(predicate_expr.to_string(), expected_expr);
3020
3021        Ok(())
3022    }
3023
3024    #[test]
3025    fn row_group_predicate_cast_string_string() -> Result<()> {
3026        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3027        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
3028
3029        // test column on the left
3030        let expr = cast(col("c1"), DataType::Utf8)
3031            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3032        let predicate_expr =
3033            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3034        assert_eq!(predicate_expr.to_string(), expected_expr);
3035
3036        // test column on the right
3037        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3038            .eq(cast(col("c1"), DataType::Utf8));
3039        let predicate_expr =
3040            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3041        assert_eq!(predicate_expr.to_string(), expected_expr);
3042
3043        Ok(())
3044    }
3045
3046    #[test]
3047    fn row_group_predicate_cast_string_int() -> Result<()> {
3048        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3049        let expected_expr = "true";
3050
3051        // test column on the left
3052        let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3053        let predicate_expr =
3054            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3055        assert_eq!(predicate_expr.to_string(), expected_expr);
3056
3057        // test column on the right
3058        let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3059        let predicate_expr =
3060            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3061        assert_eq!(predicate_expr.to_string(), expected_expr);
3062
3063        Ok(())
3064    }
3065
3066    #[test]
3067    fn row_group_predicate_cast_int_string() -> Result<()> {
3068        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3069        let expected_expr = "true";
3070
3071        // test column on the left
3072        let expr = cast(col("c1"), DataType::Utf8)
3073            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3074        let predicate_expr =
3075            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3076        assert_eq!(predicate_expr.to_string(), expected_expr);
3077
3078        // test column on the right
3079        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3080            .eq(cast(col("c1"), DataType::Utf8));
3081        let predicate_expr =
3082            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3083        assert_eq!(predicate_expr.to_string(), expected_expr);
3084
3085        Ok(())
3086    }
3087
3088    #[test]
3089    fn row_group_predicate_date_date() -> Result<()> {
3090        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3091        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3092
3093        // test column on the left
3094        let expr =
3095            cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3096        let predicate_expr =
3097            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3098        assert_eq!(predicate_expr.to_string(), expected_expr);
3099
3100        // test column on the right
3101        let expr =
3102            lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3103        let predicate_expr =
3104            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3105        assert_eq!(predicate_expr.to_string(), expected_expr);
3106
3107        Ok(())
3108    }
3109
3110    #[test]
3111    fn row_group_predicate_dict_string_date() -> Result<()> {
3112        // Test with Dictionary<UInt8, Utf8> for the literal
3113        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3114        let expected_expr = "true";
3115
3116        // test column on the left
3117        let expr = cast(
3118            col("c1"),
3119            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3120        )
3121        .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3122        let predicate_expr =
3123            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3124        assert_eq!(predicate_expr.to_string(), expected_expr);
3125
3126        // test column on the right
3127        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3128            col("c1"),
3129            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3130        ));
3131        let predicate_expr =
3132            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3133        assert_eq!(predicate_expr.to_string(), expected_expr);
3134
3135        Ok(())
3136    }
3137
3138    #[test]
3139    fn row_group_predicate_date_dict_string() -> Result<()> {
3140        // Test with Dictionary<UInt8, Utf8> for the column
3141        let schema = Schema::new(vec![Field::new(
3142            "c1",
3143            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3144            false,
3145        )]);
3146        let expected_expr = "true";
3147
3148        // test column on the left
3149        let expr =
3150            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3151        let predicate_expr =
3152            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3153        assert_eq!(predicate_expr.to_string(), expected_expr);
3154
3155        // test column on the right
3156        let expr =
3157            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3158        let predicate_expr =
3159            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3160        assert_eq!(predicate_expr.to_string(), expected_expr);
3161
3162        Ok(())
3163    }
3164
3165    #[test]
3166    fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3167        // Test with Dictionary types that have the same value type but different key types
3168        let schema = Schema::new(vec![Field::new(
3169            "c1",
3170            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3171            false,
3172        )]);
3173
3174        // Direct comparison with no cast
3175        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3176        let predicate_expr =
3177            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3178        let expected_expr =
3179            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3180        assert_eq!(predicate_expr.to_string(), expected_expr);
3181
3182        // Test with column cast to a dictionary with different key type
3183        let expr = cast(
3184            col("c1"),
3185            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3186        )
3187        .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3188        let predicate_expr =
3189            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3190        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3191        assert_eq!(predicate_expr.to_string(), expected_expr);
3192
3193        Ok(())
3194    }
3195
3196    #[test]
3197    fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3198        // Test with Dictionary types that have different value types
3199        let schema = Schema::new(vec![Field::new(
3200            "c1",
3201            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3202            false,
3203        )]);
3204        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3205
3206        // Test with literal of a different type
3207        let expr =
3208            cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3209        let predicate_expr =
3210            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3211        assert_eq!(predicate_expr.to_string(), expected_expr);
3212
3213        Ok(())
3214    }
3215
3216    #[test]
3217    fn row_group_predicate_nested_dict() -> Result<()> {
3218        // Test with nested Dictionary types
3219        let schema = Schema::new(vec![Field::new(
3220            "c1",
3221            DataType::Dictionary(
3222                Box::new(DataType::UInt8),
3223                Box::new(DataType::Dictionary(
3224                    Box::new(DataType::UInt16),
3225                    Box::new(DataType::Utf8),
3226                )),
3227            ),
3228            false,
3229        )]);
3230        let expected_expr =
3231            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3232
3233        // Test with a simple literal
3234        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3235        let predicate_expr =
3236            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3237        assert_eq!(predicate_expr.to_string(), expected_expr);
3238
3239        Ok(())
3240    }
3241
3242    #[test]
3243    fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3244        // Test with dictionary-wrapped date types for both sides
3245        let schema = Schema::new(vec![Field::new(
3246            "c1",
3247            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3248            false,
3249        )]);
3250        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3251
3252        // Test with a cast to a different date type
3253        let expr = cast(
3254            col("c1"),
3255            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3256        )
3257        .eq(lit(ScalarValue::Date64(Some(123))));
3258        let predicate_expr =
3259            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3260        assert_eq!(predicate_expr.to_string(), expected_expr);
3261
3262        Ok(())
3263    }
3264
3265    #[test]
3266    fn row_group_predicate_date_string() -> Result<()> {
3267        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3268        let expected_expr = "true";
3269
3270        // test column on the left
3271        let expr =
3272            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3273        let predicate_expr =
3274            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3275        assert_eq!(predicate_expr.to_string(), expected_expr);
3276
3277        // test column on the right
3278        let expr =
3279            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3280        let predicate_expr =
3281            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3282        assert_eq!(predicate_expr.to_string(), expected_expr);
3283
3284        Ok(())
3285    }
3286
3287    #[test]
3288    fn row_group_predicate_string_date() -> Result<()> {
3289        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3290        let expected_expr = "true";
3291
3292        // test column on the left
3293        let expr = cast(col("c1"), DataType::Utf8)
3294            .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3295        let predicate_expr =
3296            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3297        assert_eq!(predicate_expr.to_string(), expected_expr);
3298
3299        // test column on the right
3300        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3301            .eq(cast(col("c1"), DataType::Utf8));
3302        let predicate_expr =
3303            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3304        assert_eq!(predicate_expr.to_string(), expected_expr);
3305
3306        Ok(())
3307    }
3308
3309    #[test]
3310    fn row_group_predicate_cast_list() -> Result<()> {
3311        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3312        // test cast(c1 as int64) in int64(1, 2, 3)
3313        let expr = Expr::InList(InList::new(
3314            Box::new(cast(col("c1"), DataType::Int64)),
3315            vec![
3316                lit(ScalarValue::Int64(Some(1))),
3317                lit(ScalarValue::Int64(Some(2))),
3318                lit(ScalarValue::Int64(Some(3))),
3319            ],
3320            false,
3321        ));
3322        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3323        let predicate_expr =
3324            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3325        assert_eq!(predicate_expr.to_string(), expected_expr);
3326
3327        let expr = Expr::InList(InList::new(
3328            Box::new(cast(col("c1"), DataType::Int64)),
3329            vec![
3330                lit(ScalarValue::Int64(Some(1))),
3331                lit(ScalarValue::Int64(Some(2))),
3332                lit(ScalarValue::Int64(Some(3))),
3333            ],
3334            true,
3335        ));
3336        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3337        let predicate_expr =
3338            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3339        assert_eq!(predicate_expr.to_string(), expected_expr);
3340
3341        Ok(())
3342    }
3343
3344    #[test]
3345    fn prune_decimal_data() {
3346        // decimal(9,2)
3347        let schema = Arc::new(Schema::new(vec![Field::new(
3348            "s1",
3349            DataType::Decimal128(9, 2),
3350            true,
3351        )]));
3352
3353        prune_with_expr(
3354            // s1 > 5
3355            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3356            &schema,
3357            // If the data is written by spark, the physical data type is INT32 in the parquet
3358            // So we use the INT32 type of statistic.
3359            &TestStatistics::new().with(
3360                "s1",
3361                ContainerStats::new_i32(
3362                    vec![Some(0), Some(4), None, Some(3)], // min
3363                    vec![Some(5), Some(6), Some(4), None], // max
3364                ),
3365            ),
3366            &[false, true, false, true],
3367        );
3368
3369        prune_with_expr(
3370            // with cast column to other type
3371            cast(col("s1"), DataType::Decimal128(14, 3))
3372                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3373            &schema,
3374            &TestStatistics::new().with(
3375                "s1",
3376                ContainerStats::new_i32(
3377                    vec![Some(0), Some(4), None, Some(3)], // min
3378                    vec![Some(5), Some(6), Some(4), None], // max
3379                ),
3380            ),
3381            &[false, true, false, true],
3382        );
3383
3384        prune_with_expr(
3385            // with try cast column to other type
3386            try_cast(col("s1"), DataType::Decimal128(14, 3))
3387                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3388            &schema,
3389            &TestStatistics::new().with(
3390                "s1",
3391                ContainerStats::new_i32(
3392                    vec![Some(0), Some(4), None, Some(3)], // min
3393                    vec![Some(5), Some(6), Some(4), None], // max
3394                ),
3395            ),
3396            &[false, true, false, true],
3397        );
3398
3399        // decimal(18,2)
3400        let schema = Arc::new(Schema::new(vec![Field::new(
3401            "s1",
3402            DataType::Decimal128(18, 2),
3403            true,
3404        )]));
3405        prune_with_expr(
3406            // s1 > 5
3407            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3408            &schema,
3409            // If the data is written by spark, the physical data type is INT64 in the parquet
3410            // So we use the INT32 type of statistic.
3411            &TestStatistics::new().with(
3412                "s1",
3413                ContainerStats::new_i64(
3414                    vec![Some(0), Some(4), None, Some(3)], // min
3415                    vec![Some(5), Some(6), Some(4), None], // max
3416                ),
3417            ),
3418            &[false, true, false, true],
3419        );
3420
3421        // decimal(23,2)
3422        let schema = Arc::new(Schema::new(vec![Field::new(
3423            "s1",
3424            DataType::Decimal128(23, 2),
3425            true,
3426        )]));
3427
3428        prune_with_expr(
3429            // s1 > 5
3430            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3431            &schema,
3432            &TestStatistics::new().with(
3433                "s1",
3434                ContainerStats::new_decimal128(
3435                    vec![Some(0), Some(400), None, Some(300)], // min
3436                    vec![Some(500), Some(600), Some(400), None], // max
3437                    23,
3438                    2,
3439                ),
3440            ),
3441            &[false, true, false, true],
3442        );
3443    }
3444
3445    #[test]
3446    fn prune_api() {
3447        let schema = Arc::new(Schema::new(vec![
3448            Field::new("s1", DataType::Utf8, true),
3449            Field::new("s2", DataType::Int32, true),
3450        ]));
3451
3452        let statistics = TestStatistics::new().with(
3453            "s2",
3454            ContainerStats::new_i32(
3455                vec![Some(0), Some(4), None, Some(3)], // min
3456                vec![Some(5), Some(6), None, None],    // max
3457            ),
3458        );
3459        prune_with_expr(
3460            // Prune using s2 > 5
3461            col("s2").gt(lit(5)),
3462            &schema,
3463            &statistics,
3464            // s2 [0, 5] ==> no rows should pass
3465            // s2 [4, 6] ==> some rows could pass
3466            // No stats for s2 ==> some rows could pass
3467            // s2 [3, None] (null max) ==> some rows could pass
3468            &[false, true, true, true],
3469        );
3470
3471        prune_with_expr(
3472            // filter with cast
3473            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3474            &schema,
3475            &statistics,
3476            &[false, true, true, true],
3477        );
3478    }
3479
3480    #[test]
3481    fn prune_not_eq_data() {
3482        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3483
3484        prune_with_expr(
3485            // Prune using s2 != 'M'
3486            col("s1").not_eq(lit("M")),
3487            &schema,
3488            &TestStatistics::new().with(
3489                "s1",
3490                ContainerStats::new_utf8(
3491                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
3492                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
3493                ),
3494            ),
3495            // s1 [A, Z] ==> might have values that pass predicate
3496            // s1 [A, L] ==> all rows pass the predicate
3497            // s1 [N, Z] ==> all rows pass the predicate
3498            // s1 [M, M] ==> all rows do not pass the predicate
3499            // No stats for s2 ==> some rows could pass
3500            // s2 [3, None] (null max) ==> some rows could pass
3501            &[true, true, true, false, true, true],
3502        );
3503    }
3504
3505    /// Creates setup for boolean chunk pruning
3506    ///
3507    /// For predicate "b1" (boolean expr)
3508    /// b1 [false, false] ==> no rows can pass (not keep)
3509    /// b1 [false, true] ==> some rows could pass (must keep)
3510    /// b1 [true, true] ==> all rows must pass (must keep)
3511    /// b1 [NULL, NULL]  ==> unknown (must keep)
3512    /// b1 [false, NULL]  ==> unknown (must keep)
3513    ///
3514    /// For predicate "!b1" (boolean expr)
3515    /// b1 [false, false] ==> all rows pass (must keep)
3516    /// b1 [false, true] ==> some rows could pass (must keep)
3517    /// b1 [true, true] ==> no rows can pass (not keep)
3518    /// b1 [NULL, NULL]  ==> unknown (must keep)
3519    /// b1 [false, NULL]  ==> unknown (must keep)
3520    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3521        let schema =
3522            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3523
3524        let statistics = TestStatistics::new().with(
3525            "b1",
3526            ContainerStats::new_bool(
3527                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3528                vec![Some(false), Some(true), Some(true), None, None],         // max
3529            ),
3530        );
3531        let expected_true = vec![false, true, true, true, true];
3532        let expected_false = vec![true, true, false, true, true];
3533
3534        (schema, statistics, expected_true, expected_false)
3535    }
3536
3537    #[test]
3538    fn prune_bool_const_expr() {
3539        let (schema, statistics, _, _) = bool_setup();
3540
3541        prune_with_expr(
3542            // true
3543            lit(true),
3544            &schema,
3545            &statistics,
3546            &[true, true, true, true, true],
3547        );
3548
3549        prune_with_expr(
3550            // false
3551            lit(false),
3552            &schema,
3553            &statistics,
3554            &[false, false, false, false, false],
3555        );
3556    }
3557
3558    #[test]
3559    fn prune_bool_column() {
3560        let (schema, statistics, expected_true, _) = bool_setup();
3561
3562        prune_with_expr(
3563            // b1
3564            col("b1"),
3565            &schema,
3566            &statistics,
3567            &expected_true,
3568        );
3569    }
3570
3571    #[test]
3572    fn prune_bool_not_column() {
3573        let (schema, statistics, _, expected_false) = bool_setup();
3574
3575        prune_with_expr(
3576            // !b1
3577            col("b1").not(),
3578            &schema,
3579            &statistics,
3580            &expected_false,
3581        );
3582    }
3583
3584    #[test]
3585    fn prune_bool_column_eq_true() {
3586        let (schema, statistics, expected_true, _) = bool_setup();
3587
3588        prune_with_expr(
3589            // b1 = true
3590            col("b1").eq(lit(true)),
3591            &schema,
3592            &statistics,
3593            &expected_true,
3594        );
3595    }
3596
3597    #[test]
3598    fn prune_bool_not_column_eq_true() {
3599        let (schema, statistics, _, expected_false) = bool_setup();
3600
3601        prune_with_expr(
3602            // !b1 = true
3603            col("b1").not().eq(lit(true)),
3604            &schema,
3605            &statistics,
3606            &expected_false,
3607        );
3608    }
3609
3610    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3611    /// with 5 different containers (e.g. RowGroups). They have [min,
3612    /// max]:
3613    ///
3614    /// i [-5, 5]
3615    /// i [1, 11]
3616    /// i [-11, -1]
3617    /// i [NULL, NULL]
3618    /// i [1, NULL]
3619    fn int32_setup() -> (SchemaRef, TestStatistics) {
3620        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3621
3622        let statistics = TestStatistics::new().with(
3623            "i",
3624            ContainerStats::new_i32(
3625                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3626                vec![Some(5), Some(11), Some(-1), None, None],     // max
3627            ),
3628        );
3629        (schema, statistics)
3630    }
3631
3632    #[test]
3633    fn prune_int32_col_gt_zero() {
3634        let (schema, statistics) = int32_setup();
3635
3636        // Expression "i > 0" and "-i < 0"
3637        // i [-5, 5] ==> some rows could pass (must keep)
3638        // i [1, 11] ==> all rows must pass (must keep)
3639        // i [-11, -1] ==>  no rows can pass (not keep)
3640        // i [NULL, NULL]  ==> unknown (must keep)
3641        // i [1, NULL]  ==> unknown (must keep)
3642        let expected_ret = &[true, true, false, true, true];
3643
3644        // i > 0
3645        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3646
3647        // -i < 0
3648        prune_with_expr(
3649            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3650            &schema,
3651            &statistics,
3652            expected_ret,
3653        );
3654    }
3655
3656    #[test]
3657    fn prune_int32_col_lte_zero() {
3658        let (schema, statistics) = int32_setup();
3659
3660        // Expression "i <= 0" and "-i >= 0"
3661        // i [-5, 5] ==> some rows could pass (must keep)
3662        // i [1, 11] ==> no rows can pass (not keep)
3663        // i [-11, -1] ==>  all rows must pass (must keep)
3664        // i [NULL, NULL]  ==> unknown (must keep)
3665        // i [1, NULL]  ==> no rows can pass (not keep)
3666        let expected_ret = &[true, false, true, true, false];
3667
3668        prune_with_expr(
3669            // i <= 0
3670            col("i").lt_eq(lit(0)),
3671            &schema,
3672            &statistics,
3673            expected_ret,
3674        );
3675
3676        prune_with_expr(
3677            // -i >= 0
3678            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3679            &schema,
3680            &statistics,
3681            expected_ret,
3682        );
3683    }
3684
3685    #[test]
3686    fn prune_int32_col_lte_zero_cast() {
3687        let (schema, statistics) = int32_setup();
3688
3689        // Expression "cast(i as utf8) <= '0'"
3690        // i [-5, 5] ==> some rows could pass (must keep)
3691        // i [1, 11] ==> no rows can pass in theory, -0.22 (conservatively keep)
3692        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
3693        // i [NULL, NULL]  ==> unknown (must keep)
3694        // i [1, NULL]  ==> no rows can pass (conservatively keep)
3695        let expected_ret = &[true, true, true, true, true];
3696
3697        prune_with_expr(
3698            // cast(i as utf8) <= 0
3699            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3700            &schema,
3701            &statistics,
3702            expected_ret,
3703        );
3704
3705        prune_with_expr(
3706            // try_cast(i as utf8) <= 0
3707            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3708            &schema,
3709            &statistics,
3710            expected_ret,
3711        );
3712
3713        prune_with_expr(
3714            // cast(-i as utf8) >= 0
3715            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3716            &schema,
3717            &statistics,
3718            expected_ret,
3719        );
3720
3721        prune_with_expr(
3722            // try_cast(-i as utf8) >= 0
3723            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3724            &schema,
3725            &statistics,
3726            expected_ret,
3727        );
3728    }
3729
3730    #[test]
3731    fn prune_int32_col_eq_zero() {
3732        let (schema, statistics) = int32_setup();
3733
3734        // Expression "i = 0"
3735        // i [-5, 5] ==> some rows could pass (must keep)
3736        // i [1, 11] ==> no rows can pass (not keep)
3737        // i [-11, -1] ==>  no rows can pass (not keep)
3738        // i [NULL, NULL]  ==> unknown (must keep)
3739        // i [1, NULL]  ==> no rows can pass (not keep)
3740        let expected_ret = &[true, false, false, true, false];
3741
3742        prune_with_expr(
3743            // i = 0
3744            col("i").eq(lit(0)),
3745            &schema,
3746            &statistics,
3747            expected_ret,
3748        );
3749    }
3750
3751    #[test]
3752    fn prune_int32_col_eq_zero_cast() {
3753        let (schema, statistics) = int32_setup();
3754
3755        // Expression "cast(i as int64) = 0"
3756        // i [-5, 5] ==> some rows could pass (must keep)
3757        // i [1, 11] ==> no rows can pass (not keep)
3758        // i [-11, -1] ==>  no rows can pass (not keep)
3759        // i [NULL, NULL]  ==> unknown (must keep)
3760        // i [1, NULL]  ==> no rows can pass (not keep)
3761        let expected_ret = &[true, false, false, true, false];
3762
3763        prune_with_expr(
3764            cast(col("i"), DataType::Int64).eq(lit(0i64)),
3765            &schema,
3766            &statistics,
3767            expected_ret,
3768        );
3769
3770        prune_with_expr(
3771            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
3772            &schema,
3773            &statistics,
3774            expected_ret,
3775        );
3776    }
3777
3778    #[test]
3779    fn prune_int32_col_eq_zero_cast_as_str() {
3780        let (schema, statistics) = int32_setup();
3781
3782        // Note the cast is to a string where sorting properties are
3783        // not the same as integers
3784        //
3785        // Expression "cast(i as utf8) = '0'"
3786        // i [-5, 5] ==> some rows could pass (keep)
3787        // i [1, 11] ==> no rows can pass  (could keep)
3788        // i [-11, -1] ==>  no rows can pass (could keep)
3789        // i [NULL, NULL]  ==> unknown (keep)
3790        // i [1, NULL]  ==> no rows can pass (could keep)
3791        let expected_ret = &[true, true, true, true, true];
3792
3793        prune_with_expr(
3794            cast(col("i"), DataType::Utf8).eq(lit("0")),
3795            &schema,
3796            &statistics,
3797            expected_ret,
3798        );
3799    }
3800
3801    #[test]
3802    fn prune_int32_col_lt_neg_one() {
3803        let (schema, statistics) = int32_setup();
3804
3805        // Expression "i > -1" and "-i < 1"
3806        // i [-5, 5] ==> some rows could pass (must keep)
3807        // i [1, 11] ==> all rows must pass (must keep)
3808        // i [-11, -1] ==>  no rows can pass (not keep)
3809        // i [NULL, NULL]  ==> unknown (must keep)
3810        // i [1, NULL]  ==> all rows must pass (must keep)
3811        let expected_ret = &[true, true, false, true, true];
3812
3813        prune_with_expr(
3814            // i > -1
3815            col("i").gt(lit(-1)),
3816            &schema,
3817            &statistics,
3818            expected_ret,
3819        );
3820
3821        prune_with_expr(
3822            // -i < 1
3823            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
3824            &schema,
3825            &statistics,
3826            expected_ret,
3827        );
3828    }
3829
3830    #[test]
3831    fn prune_int32_is_null() {
3832        let (schema, statistics) = int32_setup();
3833
3834        // Expression "i IS NULL" when there are no null statistics,
3835        // should all be kept
3836        let expected_ret = &[true, true, true, true, true];
3837
3838        prune_with_expr(
3839            // i IS NULL, no null statistics
3840            col("i").is_null(),
3841            &schema,
3842            &statistics,
3843            expected_ret,
3844        );
3845
3846        // provide null counts for each column
3847        let statistics = statistics.with_null_counts(
3848            "i",
3849            vec![
3850                Some(0), // no nulls (don't keep)
3851                Some(1), // 1 null
3852                None,    // unknown nulls
3853                None, // unknown nulls (min/max are both null too, like no stats at all)
3854                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
3855            ],
3856        );
3857
3858        let expected_ret = &[false, true, true, true, false];
3859
3860        prune_with_expr(
3861            // i IS NULL, with actual null statistics
3862            col("i").is_null(),
3863            &schema,
3864            &statistics,
3865            expected_ret,
3866        );
3867    }
3868
3869    #[test]
3870    fn prune_int32_column_is_known_all_null() {
3871        let (schema, statistics) = int32_setup();
3872
3873        // Expression "i < 0"
3874        // i [-5, 5] ==> some rows could pass (must keep)
3875        // i [1, 11] ==> no rows can pass (not keep)
3876        // i [-11, -1] ==>  all rows must pass (must keep)
3877        // i [NULL, NULL]  ==> unknown (must keep)
3878        // i [1, NULL]  ==> no rows can pass (not keep)
3879        let expected_ret = &[true, false, true, true, false];
3880
3881        prune_with_expr(
3882            // i < 0
3883            col("i").lt(lit(0)),
3884            &schema,
3885            &statistics,
3886            expected_ret,
3887        );
3888
3889        // provide row counts for each column
3890        let statistics = statistics.with_row_counts(
3891            "i",
3892            vec![
3893                Some(10), // 10 rows of data
3894                Some(9),  // 9 rows of data
3895                None,     // unknown row counts
3896                Some(4),
3897                Some(10),
3898            ],
3899        );
3900
3901        // pruning result is still the same if we only know row counts
3902        prune_with_expr(
3903            // i < 0, with only row counts statistics
3904            col("i").lt(lit(0)),
3905            &schema,
3906            &statistics,
3907            expected_ret,
3908        );
3909
3910        // provide null counts for each column
3911        let statistics = statistics.with_null_counts(
3912            "i",
3913            vec![
3914                Some(0), // no nulls
3915                Some(1), // 1 null
3916                None,    // unknown nulls
3917                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
3918                Some(0), // 0 nulls (max=null too which means no known max)
3919            ],
3920        );
3921
3922        // Expression "i < 0" with actual null and row counts statistics
3923        // col | min, max     | row counts | null counts |
3924        // ----+--------------+------------+-------------+
3925        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
3926        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
3927        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
3928        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
3929        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
3930        let expected_ret = &[true, false, true, false, false];
3931
3932        prune_with_expr(
3933            // i < 0, with actual null and row counts statistics
3934            col("i").lt(lit(0)),
3935            &schema,
3936            &statistics,
3937            expected_ret,
3938        );
3939    }
3940
3941    #[test]
3942    fn prune_cast_column_scalar() {
3943        // The data type of column i is INT32
3944        let (schema, statistics) = int32_setup();
3945        let expected_ret = &[true, true, false, true, true];
3946
3947        prune_with_expr(
3948            // i > int64(0)
3949            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
3950            &schema,
3951            &statistics,
3952            expected_ret,
3953        );
3954
3955        prune_with_expr(
3956            // cast(i as int64) > int64(0)
3957            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3958            &schema,
3959            &statistics,
3960            expected_ret,
3961        );
3962
3963        prune_with_expr(
3964            // try_cast(i as int64) > int64(0)
3965            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3966            &schema,
3967            &statistics,
3968            expected_ret,
3969        );
3970
3971        prune_with_expr(
3972            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
3973            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
3974                .lt(lit(ScalarValue::Int64(Some(0)))),
3975            &schema,
3976            &statistics,
3977            expected_ret,
3978        );
3979    }
3980
3981    #[test]
3982    fn test_increment_utf8() {
3983        // Basic ASCII
3984        assert_eq!(increment_utf8("abc").unwrap(), "abd");
3985        assert_eq!(increment_utf8("abz").unwrap(), "ab{");
3986
3987        // Test around ASCII 127 (DEL)
3988        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
3989        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128
3990
3991        // Test 2-byte UTF-8 sequences
3992        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0
3993
3994        // Test 3-byte UTF-8 sequences
3995        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124
3996
3997        // Test at UTF-8 boundaries
3998        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
3999        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary
4000
4001        // Test that if we can't increment we return None
4002        assert!(increment_utf8("").is_none());
4003        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point
4004
4005        // Test that if we can't increment the last character we do the previous one and truncate
4006        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4007
4008        // Test surrogate pair range (0xD800..=0xDFFF)
4009        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
4010        assert!(increment_utf8("\u{D7FF}").is_none());
4011
4012        // Test non-characters range (0xFDD0..=0xFDEF)
4013        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
4014        assert!(increment_utf8("\u{FDCF}").is_none());
4015
4016        // Test private use area limit (>= 0x110000)
4017        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
4018        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
4019    }
4020
4021    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
4022    /// with 5 different containers (e.g. RowGroups). They have [min,
4023    /// max]:
4024    /// s1 ["A", "Z"]
4025    /// s1 ["A", "L"]
4026    /// s1 ["N", "Z"]
4027    /// s1 [NULL, NULL]
4028    /// s1 ["A", NULL]
4029    /// s1 ["", "A"]
4030    /// s1 ["", ""]
4031    /// s1 ["AB", "A\u{10ffff}"]
4032    /// s1 ["A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
4033    fn utf8_setup() -> (SchemaRef, TestStatistics) {
4034        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4035
4036        let statistics = TestStatistics::new().with(
4037            "s1",
4038            ContainerStats::new_utf8(
4039                vec![
4040                    Some("A"),
4041                    Some("A"),
4042                    Some("N"),
4043                    Some("M"),
4044                    None,
4045                    Some("A"),
4046                    Some(""),
4047                    Some(""),
4048                    Some("AB"),
4049                    Some("A\u{10ffff}\u{10ffff}"),
4050                ], // min
4051                vec![
4052                    Some("Z"),
4053                    Some("L"),
4054                    Some("Z"),
4055                    Some("M"),
4056                    None,
4057                    None,
4058                    Some("A"),
4059                    Some(""),
4060                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4061                    Some("A\u{10ffff}\u{10ffff}"),
4062                ], // max
4063            ),
4064        );
4065        (schema, statistics)
4066    }
4067
4068    #[test]
4069    fn prune_utf8_eq() {
4070        let (schema, statistics) = utf8_setup();
4071
4072        let expr = col("s1").eq(lit("A"));
4073        #[rustfmt::skip]
4074        let expected_ret = &[
4075            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4076            true,
4077            // s1 ["A", "L"] ==> some rows could pass (must keep)
4078            true,
4079            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4080            false,
4081            // s1 ["M", "M"] ==> no rows can pass (not keep)
4082            false,
4083            // s1 [NULL, NULL]  ==> unknown (must keep)
4084            true,
4085            // s1 ["A", NULL]  ==> unknown (must keep)
4086            true,
4087            // s1 ["", "A"]  ==> some rows could pass (must keep)
4088            true,
4089            // s1 ["", ""]  ==> no rows can pass (not keep)
4090            false,
4091            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4092            false,
4093            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4094            false,
4095        ];
4096        prune_with_expr(expr, &schema, &statistics, expected_ret);
4097
4098        let expr = col("s1").eq(lit(""));
4099        #[rustfmt::skip]
4100        let expected_ret = &[
4101            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4102            false,
4103            // s1 ["A", "L"] ==> no rows can pass (not keep)
4104            false,
4105            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4106            false,
4107            // s1 ["M", "M"] ==> no rows can pass (not keep)
4108            false,
4109            // s1 [NULL, NULL]  ==> unknown (must keep)
4110            true,
4111            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4112            false,
4113            // s1 ["", "A"]  ==> some rows could pass (must keep)
4114            true,
4115            // s1 ["", ""]  ==> all rows must pass (must keep)
4116            true,
4117            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4118            false,
4119            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4120            false,
4121        ];
4122        prune_with_expr(expr, &schema, &statistics, expected_ret);
4123    }
4124
4125    #[test]
4126    fn prune_utf8_not_eq() {
4127        let (schema, statistics) = utf8_setup();
4128
4129        let expr = col("s1").not_eq(lit("A"));
4130        #[rustfmt::skip]
4131        let expected_ret = &[
4132            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4133            true,
4134            // s1 ["A", "L"] ==> some rows could pass (must keep)
4135            true,
4136            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4137            true,
4138            // s1 ["M", "M"] ==> all rows must pass (must keep)
4139            true,
4140            // s1 [NULL, NULL]  ==> unknown (must keep)
4141            true,
4142            // s1 ["A", NULL]  ==> unknown (must keep)
4143            true,
4144            // s1 ["", "A"]  ==> some rows could pass (must keep)
4145            true,
4146            // s1 ["", ""]  ==> all rows must pass (must keep)
4147            true,
4148            // s1 ["AB", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4149            true,
4150            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4151            true,
4152        ];
4153        prune_with_expr(expr, &schema, &statistics, expected_ret);
4154
4155        let expr = col("s1").not_eq(lit(""));
4156        #[rustfmt::skip]
4157        let expected_ret = &[
4158            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4159            true,
4160            // s1 ["A", "L"] ==> all rows must pass (must keep)
4161            true,
4162            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4163            true,
4164            // s1 ["M", "M"] ==> all rows must pass (must keep)
4165            true,
4166            // s1 [NULL, NULL]  ==> unknown (must keep)
4167            true,
4168            // s1 ["A", NULL]  ==> unknown (must keep)
4169            true,
4170            // s1 ["", "A"]  ==> some rows could pass (must keep)
4171            true,
4172            // s1 ["", ""]  ==> no rows can pass (not keep)
4173            false,
4174            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4175            true,
4176            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4177            true,
4178        ];
4179        prune_with_expr(expr, &schema, &statistics, expected_ret);
4180    }
4181
4182    #[test]
4183    fn prune_utf8_like_one() {
4184        let (schema, statistics) = utf8_setup();
4185
4186        let expr = col("s1").like(lit("A_"));
4187        #[rustfmt::skip]
4188        let expected_ret = &[
4189            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4190            true,
4191            // s1 ["A", "L"] ==> some rows could pass (must keep)
4192            true,
4193            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4194            false,
4195            // s1 ["M", "M"] ==> no rows can pass (not keep)
4196            false,
4197            // s1 [NULL, NULL]  ==> unknown (must keep)
4198            true,
4199            // s1 ["A", NULL]  ==> unknown (must keep)
4200            true,
4201            // s1 ["", "A"]  ==> some rows could pass (must keep)
4202            true,
4203            // s1 ["", ""]  ==> no rows can pass (not keep)
4204            false,
4205            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4206            true,
4207            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4208            true,
4209        ];
4210        prune_with_expr(expr, &schema, &statistics, expected_ret);
4211
4212        let expr = col("s1").like(lit("_A_"));
4213        #[rustfmt::skip]
4214        let expected_ret = &[
4215            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4216            true,
4217            // s1 ["A", "L"] ==> some rows could pass (must keep)
4218            true,
4219            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4220            true,
4221            // s1 ["M", "M"] ==> some rows could pass (must keep)
4222            true,
4223            // s1 [NULL, NULL]  ==> unknown (must keep)
4224            true,
4225            // s1 ["A", NULL]  ==> unknown (must keep)
4226            true,
4227            // s1 ["", "A"]  ==> some rows could pass (must keep)
4228            true,
4229            // s1 ["", ""]  ==> some rows could pass (must keep)
4230            true,
4231            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4232            true,
4233            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4234            true,
4235        ];
4236        prune_with_expr(expr, &schema, &statistics, expected_ret);
4237
4238        let expr = col("s1").like(lit("_"));
4239        #[rustfmt::skip]
4240        let expected_ret = &[
4241            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4242            true,
4243            // s1 ["A", "L"] ==> all rows must pass (must keep)
4244            true,
4245            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4246            true,
4247            // s1 ["M", "M"] ==> all rows must pass (must keep)
4248            true,
4249            // s1 [NULL, NULL]  ==> unknown (must keep)
4250            true,
4251            // s1 ["A", NULL]  ==> unknown (must keep)
4252            true,
4253            // s1 ["", "A"]  ==> all rows must pass (must keep)
4254            true,
4255            // s1 ["", ""]  ==> all rows must pass (must keep)
4256            true,
4257            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4258            true,
4259            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4260            true,
4261        ];
4262        prune_with_expr(expr, &schema, &statistics, expected_ret);
4263
4264        let expr = col("s1").like(lit(""));
4265        #[rustfmt::skip]
4266        let expected_ret = &[
4267            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4268            false,
4269            // s1 ["A", "L"] ==> no rows can pass (not keep)
4270            false,
4271            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4272            false,
4273            // s1 ["M", "M"] ==> no rows can pass (not keep)
4274            false,
4275            // s1 [NULL, NULL]  ==> unknown (must keep)
4276            true,
4277            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4278            false,
4279            // s1 ["", "A"]  ==> some rows could pass (must keep)
4280            true,
4281            // s1 ["", ""]  ==> all rows must pass (must keep)
4282            true,
4283            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4284            false,
4285            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4286            false,
4287        ];
4288        prune_with_expr(expr, &schema, &statistics, expected_ret);
4289    }
4290
4291    #[test]
4292    fn prune_utf8_like_many() {
4293        let (schema, statistics) = utf8_setup();
4294
4295        let expr = col("s1").like(lit("A%"));
4296        #[rustfmt::skip]
4297        let expected_ret = &[
4298            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4299            true,
4300            // s1 ["A", "L"] ==> some rows could pass (must keep)
4301            true,
4302            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4303            false,
4304            // s1 ["M", "M"] ==> no rows can pass (not keep)
4305            false,
4306            // s1 [NULL, NULL]  ==> unknown (must keep)
4307            true,
4308            // s1 ["A", NULL]  ==> unknown (must keep)
4309            true,
4310            // s1 ["", "A"]  ==> some rows could pass (must keep)
4311            true,
4312            // s1 ["", ""]  ==> no rows can pass (not keep)
4313            false,
4314            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4315            true,
4316            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4317            true,
4318        ];
4319        prune_with_expr(expr, &schema, &statistics, expected_ret);
4320
4321        let expr = col("s1").like(lit("%A%"));
4322        #[rustfmt::skip]
4323        let expected_ret = &[
4324            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4325            true,
4326            // s1 ["A", "L"] ==> some rows could pass (must keep)
4327            true,
4328            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4329            true,
4330            // s1 ["M", "M"] ==> some rows could pass (must keep)
4331            true,
4332            // s1 [NULL, NULL]  ==> unknown (must keep)
4333            true,
4334            // s1 ["A", NULL]  ==> unknown (must keep)
4335            true,
4336            // s1 ["", "A"]  ==> some rows could pass (must keep)
4337            true,
4338            // s1 ["", ""]  ==> some rows could pass (must keep)
4339            true,
4340            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4341            true,
4342            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4343            true,
4344        ];
4345        prune_with_expr(expr, &schema, &statistics, expected_ret);
4346
4347        let expr = col("s1").like(lit("%"));
4348        #[rustfmt::skip]
4349        let expected_ret = &[
4350            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4351            true,
4352            // s1 ["A", "L"] ==> all rows must pass (must keep)
4353            true,
4354            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4355            true,
4356            // s1 ["M", "M"] ==> all rows must pass (must keep)
4357            true,
4358            // s1 [NULL, NULL]  ==> unknown (must keep)
4359            true,
4360            // s1 ["A", NULL]  ==> unknown (must keep)
4361            true,
4362            // s1 ["", "A"]  ==> all rows must pass (must keep)
4363            true,
4364            // s1 ["", ""]  ==> all rows must pass (must keep)
4365            true,
4366            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4367            true,
4368            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4369            true,
4370        ];
4371        prune_with_expr(expr, &schema, &statistics, expected_ret);
4372
4373        let expr = col("s1").like(lit(""));
4374        #[rustfmt::skip]
4375        let expected_ret = &[
4376            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4377            false,
4378            // s1 ["A", "L"] ==> no rows can pass (not keep)
4379            false,
4380            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4381            false,
4382            // s1 ["M", "M"] ==> no rows can pass (not keep)
4383            false,
4384            // s1 [NULL, NULL]  ==> unknown (must keep)
4385            true,
4386            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4387            false,
4388            // s1 ["", "A"]  ==> some rows could pass (must keep)
4389            true,
4390            // s1 ["", ""]  ==> all rows must pass (must keep)
4391            true,
4392            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4393            false,
4394            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4395            false,
4396        ];
4397        prune_with_expr(expr, &schema, &statistics, expected_ret);
4398    }
4399
4400    #[test]
4401    fn prune_utf8_not_like_one() {
4402        let (schema, statistics) = utf8_setup();
4403
4404        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4405        #[rustfmt::skip]
4406        let expected_ret = &[
4407            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4408            true,
4409            // s1 ["A", "L"] ==> some rows could pass (must keep)
4410            true,
4411            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4412            true,
4413            // s1 ["M", "M"] ==> some rows could pass (must keep)
4414            true,
4415            // s1 [NULL, NULL]  ==> unknown (must keep)
4416            true,
4417            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4418            true,
4419            // s1 ["", "A"]  ==> some rows could pass (must keep)
4420            true,
4421            // s1 ["", ""]  ==> some rows could pass (must keep)
4422            true,
4423            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4424            true,
4425            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match. (min, max) maybe truncate 
4426            // original (min, max) maybe ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}")
4427            true,
4428        ];
4429        prune_with_expr(expr, &schema, &statistics, expected_ret);
4430    }
4431
4432    #[test]
4433    fn prune_utf8_not_like_many() {
4434        let (schema, statistics) = utf8_setup();
4435
4436        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
4437        #[rustfmt::skip]
4438        let expected_ret = &[
4439            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4440            true,
4441            // s1 ["A", "L"] ==> some rows could pass (must keep)
4442            true,
4443            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4444            true,
4445            // s1 ["M", "M"] ==> some rows could pass (must keep)
4446            true,
4447            // s1 [NULL, NULL]  ==> unknown (must keep)
4448            true,
4449            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4450            true,
4451            // s1 ["", "A"]  ==> some rows could pass (must keep)
4452            true,
4453            // s1 ["", ""]  ==> some rows could pass (must keep)
4454            true,
4455            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4456            true,
4457            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
4458            false,
4459        ];
4460        prune_with_expr(expr, &schema, &statistics, expected_ret);
4461
4462        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
4463        #[rustfmt::skip]
4464        let expected_ret = &[
4465            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4466            true,
4467            // s1 ["A", "L"] ==> some rows could pass (must keep)
4468            true,
4469            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4470            true,
4471            // s1 ["M", "M"] ==> some rows could pass (must keep)
4472            true,
4473            // s1 [NULL, NULL]  ==> unknown (must keep)
4474            true,
4475            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4476            true,
4477            // s1 ["", "A"]  ==> some rows could pass (must keep)
4478            true,
4479            // s1 ["", ""]  ==> some rows could pass (must keep)
4480            true,
4481            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4482            true,
4483            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4484            true,
4485        ];
4486        prune_with_expr(expr, &schema, &statistics, expected_ret);
4487
4488        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
4489        #[rustfmt::skip]
4490        let expected_ret = &[
4491            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4492            true,
4493            // s1 ["A", "L"] ==> some rows could pass (must keep)
4494            true,
4495            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4496            true,
4497            // s1 ["M", "M"] ==> some rows could pass (must keep)
4498            true,
4499            // s1 [NULL, NULL]  ==> unknown (must keep)
4500            true,
4501            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4502            true,
4503            // s1 ["", "A"]  ==> some rows could pass (must keep)
4504            true,
4505            // s1 ["", ""]  ==> some rows could pass (must keep)
4506            true,
4507            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4508            true,
4509            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4510            true,
4511        ];
4512        prune_with_expr(expr, &schema, &statistics, expected_ret);
4513
4514        let expr = col("s1").not_like(lit("A\\%%"));
4515        let statistics = TestStatistics::new().with(
4516            "s1",
4517            ContainerStats::new_utf8(
4518                vec![Some("A%a"), Some("A")],
4519                vec![Some("A%c"), Some("A")],
4520            ),
4521        );
4522        let expected_ret = &[false, true];
4523        prune_with_expr(expr, &schema, &statistics, expected_ret);
4524    }
4525
4526    #[test]
4527    fn test_rewrite_expr_to_prunable() {
4528        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4529        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4530
4531        // column op lit
4532        let left_input = col("a");
4533        let left_input = logical2physical(&left_input, &schema);
4534        let right_input = lit(ScalarValue::Int32(Some(12)));
4535        let right_input = logical2physical(&right_input, &schema);
4536        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4537            &left_input,
4538            Operator::Eq,
4539            &right_input,
4540            df_schema.clone(),
4541        )
4542        .unwrap();
4543        assert_eq!(result_left.to_string(), left_input.to_string());
4544        assert_eq!(result_right.to_string(), right_input.to_string());
4545
4546        // cast op lit
4547        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4548        let left_input = logical2physical(&left_input, &schema);
4549        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4550        let right_input = logical2physical(&right_input, &schema);
4551        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4552            &left_input,
4553            Operator::Gt,
4554            &right_input,
4555            df_schema.clone(),
4556        )
4557        .unwrap();
4558        assert_eq!(result_left.to_string(), left_input.to_string());
4559        assert_eq!(result_right.to_string(), right_input.to_string());
4560
4561        // try_cast op lit
4562        let left_input = try_cast(col("a"), DataType::Int64);
4563        let left_input = logical2physical(&left_input, &schema);
4564        let right_input = lit(ScalarValue::Int64(Some(12)));
4565        let right_input = logical2physical(&right_input, &schema);
4566        let (result_left, _, result_right) =
4567            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4568                .unwrap();
4569        assert_eq!(result_left.to_string(), left_input.to_string());
4570        assert_eq!(result_right.to_string(), right_input.to_string());
4571
4572        // TODO: add test for other case and op
4573    }
4574
4575    #[test]
4576    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4577        struct CustomUnhandledHook;
4578
4579        impl UnhandledPredicateHook for CustomUnhandledHook {
4580            /// This handles an arbitrary case of a column that doesn't exist in the schema
4581            /// by renaming it to yet another column that doesn't exist in the schema
4582            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
4583            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4584                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4585            }
4586        }
4587
4588        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4589        let schema_with_b = Schema::new(vec![
4590            Field::new("a", DataType::Int32, true),
4591            Field::new("b", DataType::Int32, true),
4592        ]);
4593
4594        let rewriter = PredicateRewriter::new()
4595            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4596
4597        let transform_expr = |expr| {
4598            let expr = logical2physical(&expr, &schema_with_b);
4599            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4600        };
4601
4602        // transform an arbitrary valid expression that we know is handled
4603        let known_expression = col("a").eq(lit(12));
4604        let known_expression_transformed = PredicateRewriter::new()
4605            .rewrite_predicate_to_statistics_predicate(
4606                &logical2physical(&known_expression, &schema),
4607                &schema,
4608            );
4609
4610        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
4611        let input = col("b").eq(lit(12));
4612        let expected = logical2physical(&lit(42), &schema);
4613        let transformed = transform_expr(input.clone());
4614        assert_eq!(transformed.to_string(), expected.to_string());
4615
4616        // more complex case with unknown column
4617        let input = known_expression.clone().and(input.clone());
4618        let expected = phys_expr::BinaryExpr::new(
4619            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4620            Operator::And,
4621            logical2physical(&lit(42), &schema),
4622        );
4623        let transformed = transform_expr(input.clone());
4624        assert_eq!(transformed.to_string(), expected.to_string());
4625
4626        // an unknown expression gets passed to the hook
4627        let input = array_has(make_array(vec![lit(1)]), col("a"));
4628        let expected = logical2physical(&lit(42), &schema);
4629        let transformed = transform_expr(input.clone());
4630        assert_eq!(transformed.to_string(), expected.to_string());
4631
4632        // more complex case with unknown expression
4633        let input = known_expression.and(input);
4634        let expected = phys_expr::BinaryExpr::new(
4635            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4636            Operator::And,
4637            logical2physical(&lit(42), &schema),
4638        );
4639        let transformed = transform_expr(input.clone());
4640        assert_eq!(transformed.to_string(), expected.to_string());
4641    }
4642
4643    #[test]
4644    fn test_rewrite_expr_to_prunable_error() {
4645        // cast string value to numeric value
4646        // this cast is not supported
4647        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4648        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4649        let left_input = cast(col("a"), DataType::Int64);
4650        let left_input = logical2physical(&left_input, &schema);
4651        let right_input = lit(ScalarValue::Int64(Some(12)));
4652        let right_input = logical2physical(&right_input, &schema);
4653        let result = rewrite_expr_to_prunable(
4654            &left_input,
4655            Operator::Gt,
4656            &right_input,
4657            df_schema.clone(),
4658        );
4659        assert!(result.is_err());
4660
4661        // other expr
4662        let left_input = is_null(col("a"));
4663        let left_input = logical2physical(&left_input, &schema);
4664        let right_input = lit(ScalarValue::Int64(Some(12)));
4665        let right_input = logical2physical(&right_input, &schema);
4666        let result =
4667            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4668        assert!(result.is_err());
4669        // TODO: add other negative test for other case and op
4670    }
4671
4672    #[test]
4673    fn prune_with_contained_one_column() {
4674        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4675
4676        // Model having information like a bloom filter for s1
4677        let statistics = TestStatistics::new()
4678            .with_contained(
4679                "s1",
4680                [ScalarValue::from("foo")],
4681                [
4682                    // container 0 known to only contain "foo"",
4683                    Some(true),
4684                    // container 1 known to not contain "foo"
4685                    Some(false),
4686                    // container 2 unknown about "foo"
4687                    None,
4688                    // container 3 known to only contain "foo"
4689                    Some(true),
4690                    // container 4 known to not contain "foo"
4691                    Some(false),
4692                    // container 5 unknown about "foo"
4693                    None,
4694                    // container 6 known to only contain "foo"
4695                    Some(true),
4696                    // container 7 known to not contain "foo"
4697                    Some(false),
4698                    // container 8 unknown about "foo"
4699                    None,
4700                ],
4701            )
4702            .with_contained(
4703                "s1",
4704                [ScalarValue::from("bar")],
4705                [
4706                    // containers 0,1,2 known to only contain "bar"
4707                    Some(true),
4708                    Some(true),
4709                    Some(true),
4710                    // container 3,4,5 known to not contain "bar"
4711                    Some(false),
4712                    Some(false),
4713                    Some(false),
4714                    // container 6,7,8 unknown about "bar"
4715                    None,
4716                    None,
4717                    None,
4718                ],
4719            )
4720            .with_contained(
4721                // the way the tests are setup, this data is
4722                // consulted if the "foo" and "bar" are being checked at the same time
4723                "s1",
4724                [ScalarValue::from("foo"), ScalarValue::from("bar")],
4725                [
4726                    // container 0,1,2 unknown about ("foo, "bar")
4727                    None,
4728                    None,
4729                    None,
4730                    // container 3,4,5 known to contain only either "foo" and "bar"
4731                    Some(true),
4732                    Some(true),
4733                    Some(true),
4734                    // container 6,7,8  known to contain  neither "foo" and "bar"
4735                    Some(false),
4736                    Some(false),
4737                    Some(false),
4738                ],
4739            );
4740
4741        // s1 = 'foo'
4742        prune_with_expr(
4743            col("s1").eq(lit("foo")),
4744            &schema,
4745            &statistics,
4746            // rule out containers ('false) where we know foo is not present
4747            &[true, false, true, true, false, true, true, false, true],
4748        );
4749
4750        // s1 = 'bar'
4751        prune_with_expr(
4752            col("s1").eq(lit("bar")),
4753            &schema,
4754            &statistics,
4755            // rule out containers where we know bar is not present
4756            &[true, true, true, false, false, false, true, true, true],
4757        );
4758
4759        // s1 = 'baz' (unknown value)
4760        prune_with_expr(
4761            col("s1").eq(lit("baz")),
4762            &schema,
4763            &statistics,
4764            // can't rule out anything
4765            &[true, true, true, true, true, true, true, true, true],
4766        );
4767
4768        // s1 = 'foo' AND s1 = 'bar'
4769        prune_with_expr(
4770            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
4771            &schema,
4772            &statistics,
4773            // logically this predicate can't possibly be true (the column can't
4774            // take on both values) but we could rule it out if the stats tell
4775            // us that both values are not present
4776            &[true, true, true, true, true, true, true, true, true],
4777        );
4778
4779        // s1 = 'foo' OR s1 = 'bar'
4780        prune_with_expr(
4781            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
4782            &schema,
4783            &statistics,
4784            // can rule out containers that we know contain neither foo nor bar
4785            &[true, true, true, true, true, true, false, false, false],
4786        );
4787
4788        // s1 = 'foo' OR s1 = 'baz'
4789        prune_with_expr(
4790            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
4791            &schema,
4792            &statistics,
4793            // can't rule out anything container
4794            &[true, true, true, true, true, true, true, true, true],
4795        );
4796
4797        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
4798        prune_with_expr(
4799            col("s1")
4800                .eq(lit("foo"))
4801                .or(col("s1").eq(lit("bar")))
4802                .or(col("s1").eq(lit("baz"))),
4803            &schema,
4804            &statistics,
4805            // can rule out any containers based on knowledge of s1 and `foo`,
4806            // `bar` and (`foo`, `bar`)
4807            &[true, true, true, true, true, true, true, true, true],
4808        );
4809
4810        // s1 != foo
4811        prune_with_expr(
4812            col("s1").not_eq(lit("foo")),
4813            &schema,
4814            &statistics,
4815            // rule out containers we know for sure only contain foo
4816            &[false, true, true, false, true, true, false, true, true],
4817        );
4818
4819        // s1 != bar
4820        prune_with_expr(
4821            col("s1").not_eq(lit("bar")),
4822            &schema,
4823            &statistics,
4824            // rule out when we know for sure s1 has the value bar
4825            &[false, false, false, true, true, true, true, true, true],
4826        );
4827
4828        // s1 != foo AND s1 != bar
4829        prune_with_expr(
4830            col("s1")
4831                .not_eq(lit("foo"))
4832                .and(col("s1").not_eq(lit("bar"))),
4833            &schema,
4834            &statistics,
4835            // can rule out any container where we know s1 does not have either 'foo' or 'bar'
4836            &[true, true, true, false, false, false, true, true, true],
4837        );
4838
4839        // s1 != foo AND s1 != bar AND s1 != baz
4840        prune_with_expr(
4841            col("s1")
4842                .not_eq(lit("foo"))
4843                .and(col("s1").not_eq(lit("bar")))
4844                .and(col("s1").not_eq(lit("baz"))),
4845            &schema,
4846            &statistics,
4847            // can't rule out any container based on  knowledge of s1,s2
4848            &[true, true, true, true, true, true, true, true, true],
4849        );
4850
4851        // s1 != foo OR s1 != bar
4852        prune_with_expr(
4853            col("s1")
4854                .not_eq(lit("foo"))
4855                .or(col("s1").not_eq(lit("bar"))),
4856            &schema,
4857            &statistics,
4858            // cant' rule out anything based on contains information
4859            &[true, true, true, true, true, true, true, true, true],
4860        );
4861
4862        // s1 != foo OR s1 != bar OR s1 != baz
4863        prune_with_expr(
4864            col("s1")
4865                .not_eq(lit("foo"))
4866                .or(col("s1").not_eq(lit("bar")))
4867                .or(col("s1").not_eq(lit("baz"))),
4868            &schema,
4869            &statistics,
4870            // cant' rule out anything based on contains information
4871            &[true, true, true, true, true, true, true, true, true],
4872        );
4873    }
4874
4875    #[test]
4876    fn prune_with_contained_two_columns() {
4877        let schema = Arc::new(Schema::new(vec![
4878            Field::new("s1", DataType::Utf8, true),
4879            Field::new("s2", DataType::Utf8, true),
4880        ]));
4881
4882        // Model having information like bloom filters for s1 and s2
4883        let statistics = TestStatistics::new()
4884            .with_contained(
4885                "s1",
4886                [ScalarValue::from("foo")],
4887                [
4888                    // container 0, s1 known to only contain "foo"",
4889                    Some(true),
4890                    // container 1, s1 known to not contain "foo"
4891                    Some(false),
4892                    // container 2, s1 unknown about "foo"
4893                    None,
4894                    // container 3, s1 known to only contain "foo"
4895                    Some(true),
4896                    // container 4, s1 known to not contain "foo"
4897                    Some(false),
4898                    // container 5, s1 unknown about "foo"
4899                    None,
4900                    // container 6, s1 known to only contain "foo"
4901                    Some(true),
4902                    // container 7, s1 known to not contain "foo"
4903                    Some(false),
4904                    // container 8, s1 unknown about "foo"
4905                    None,
4906                ],
4907            )
4908            .with_contained(
4909                "s2", // for column s2
4910                [ScalarValue::from("bar")],
4911                [
4912                    // containers 0,1,2 s2 known to only contain "bar"
4913                    Some(true),
4914                    Some(true),
4915                    Some(true),
4916                    // container 3,4,5 s2 known to not contain "bar"
4917                    Some(false),
4918                    Some(false),
4919                    Some(false),
4920                    // container 6,7,8 s2 unknown about "bar"
4921                    None,
4922                    None,
4923                    None,
4924                ],
4925            );
4926
4927        // s1 = 'foo'
4928        prune_with_expr(
4929            col("s1").eq(lit("foo")),
4930            &schema,
4931            &statistics,
4932            // rule out containers where we know s1 is not present
4933            &[true, false, true, true, false, true, true, false, true],
4934        );
4935
4936        // s1 = 'foo' OR s2 = 'bar'
4937        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
4938        prune_with_expr(
4939            expr,
4940            &schema,
4941            &statistics,
4942            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
4943            &[true, true, true, true, true, true, true, true, true],
4944        );
4945
4946        // s1 = 'foo' AND s2 != 'bar'
4947        prune_with_expr(
4948            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
4949            &schema,
4950            &statistics,
4951            // can only rule out container where we know either:
4952            // 1. s1 doesn't have the value 'foo` or
4953            // 2. s2 has only the value of 'bar'
4954            &[false, false, false, true, false, true, true, false, true],
4955        );
4956
4957        // s1 != 'foo' AND s2 != 'bar'
4958        prune_with_expr(
4959            col("s1")
4960                .not_eq(lit("foo"))
4961                .and(col("s2").not_eq(lit("bar"))),
4962            &schema,
4963            &statistics,
4964            // Can  rule out any container where we know either
4965            // 1. s1 has only the value 'foo'
4966            // 2. s2 has only the value 'bar'
4967            &[false, false, false, false, true, true, false, true, true],
4968        );
4969
4970        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
4971        prune_with_expr(
4972            col("s1")
4973                .not_eq(lit("foo"))
4974                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
4975            &schema,
4976            &statistics,
4977            // Can rule out any container where we know s1 has only the value
4978            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
4979            &[false, true, true, false, true, true, false, true, true],
4980        );
4981
4982        // s1 like '%foo%bar%'
4983        prune_with_expr(
4984            col("s1").like(lit("foo%bar%")),
4985            &schema,
4986            &statistics,
4987            // cant rule out anything with information we know
4988            &[true, true, true, true, true, true, true, true, true],
4989        );
4990
4991        // s1 like '%foo%bar%' AND s2 = 'bar'
4992        prune_with_expr(
4993            col("s1")
4994                .like(lit("foo%bar%"))
4995                .and(col("s2").eq(lit("bar"))),
4996            &schema,
4997            &statistics,
4998            // can rule out any container where we know s2 does not have the value 'bar'
4999            &[true, true, true, false, false, false, true, true, true],
5000        );
5001
5002        // s1 like '%foo%bar%' OR s2 = 'bar'
5003        prune_with_expr(
5004            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
5005            &schema,
5006            &statistics,
5007            // can't rule out anything (we would have to prove that both the
5008            // like and the equality must be false)
5009            &[true, true, true, true, true, true, true, true, true],
5010        );
5011    }
5012
5013    #[test]
5014    fn prune_with_range_and_contained() {
5015        // Setup mimics range information for i, a bloom filter for s
5016        let schema = Arc::new(Schema::new(vec![
5017            Field::new("i", DataType::Int32, true),
5018            Field::new("s", DataType::Utf8, true),
5019        ]));
5020
5021        let statistics = TestStatistics::new()
5022            .with(
5023                "i",
5024                ContainerStats::new_i32(
5025                    // Container 0, 3, 6: [-5 to 5]
5026                    // Container 1, 4, 7: [10 to 20]
5027                    // Container 2, 5, 9: unknown
5028                    vec![
5029                        Some(-5),
5030                        Some(10),
5031                        None,
5032                        Some(-5),
5033                        Some(10),
5034                        None,
5035                        Some(-5),
5036                        Some(10),
5037                        None,
5038                    ], // min
5039                    vec![
5040                        Some(5),
5041                        Some(20),
5042                        None,
5043                        Some(5),
5044                        Some(20),
5045                        None,
5046                        Some(5),
5047                        Some(20),
5048                        None,
5049                    ], // max
5050                ),
5051            )
5052            // Add contained  information about the s and "foo"
5053            .with_contained(
5054                "s",
5055                [ScalarValue::from("foo")],
5056                [
5057                    // container 0,1,2 known to only contain "foo"
5058                    Some(true),
5059                    Some(true),
5060                    Some(true),
5061                    // container 3,4,5 known to not contain "foo"
5062                    Some(false),
5063                    Some(false),
5064                    Some(false),
5065                    // container 6,7,8 unknown about "foo"
5066                    None,
5067                    None,
5068                    None,
5069                ],
5070            );
5071
5072        // i = 0 and s = 'foo'
5073        prune_with_expr(
5074            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5075            &schema,
5076            &statistics,
5077            // Can rule out container where we know that either:
5078            // 1. 0 is outside the min/max range of i
5079            // 1. s does not contain foo
5080            // (range is false, and contained  is false)
5081            &[true, false, true, false, false, false, true, false, true],
5082        );
5083
5084        // i = 0 and s != 'foo'
5085        prune_with_expr(
5086            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5087            &schema,
5088            &statistics,
5089            // Can rule out containers where either:
5090            // 1. 0 is outside the min/max range of i
5091            // 2. s only contains foo
5092            &[false, false, false, true, false, true, true, false, true],
5093        );
5094
5095        // i = 0 OR s = 'foo'
5096        prune_with_expr(
5097            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5098            &schema,
5099            &statistics,
5100            // in theory could rule out containers if we had min/max values for
5101            // s as well. But in this case we don't so we can't rule out anything
5102            &[true, true, true, true, true, true, true, true, true],
5103        );
5104    }
5105
5106    /// prunes the specified expr with the specified schema and statistics, and
5107    /// ensures it returns expected.
5108    ///
5109    /// `expected` is a vector of bools, where true means the row group should
5110    /// be kept, and false means it should be pruned.
5111    ///
5112    // TODO refactor other tests to use this to reduce boiler plate
5113    fn prune_with_expr(
5114        expr: Expr,
5115        schema: &SchemaRef,
5116        statistics: &TestStatistics,
5117        expected: &[bool],
5118    ) {
5119        println!("Pruning with expr: {expr}");
5120        let expr = logical2physical(&expr, schema);
5121        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5122        let result = p.prune(statistics).unwrap();
5123        assert_eq!(result, expected);
5124    }
5125
5126    fn test_build_predicate_expression(
5127        expr: &Expr,
5128        schema: &Schema,
5129        required_columns: &mut RequiredColumns,
5130    ) -> Arc<dyn PhysicalExpr> {
5131        let expr = logical2physical(expr, schema);
5132        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5133        build_predicate_expression(
5134            &expr,
5135            &Arc::new(schema.clone()),
5136            required_columns,
5137            &unhandled_hook,
5138        )
5139    }
5140
5141    #[test]
5142    fn test_build_predicate_expression_with_false() {
5143        let expr = lit(ScalarValue::Boolean(Some(false)));
5144        let schema = Schema::empty();
5145        let res =
5146            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5147        let expected = logical2physical(&expr, &schema);
5148        assert_eq!(&res, &expected);
5149    }
5150
5151    #[test]
5152    fn test_build_predicate_expression_with_and_false() {
5153        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5154        let expr = and(
5155            col("c1").eq(lit("a")),
5156            lit(ScalarValue::Boolean(Some(false))),
5157        );
5158        let res =
5159            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5160        let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5161        assert_eq!(&res, &expected);
5162    }
5163
5164    #[test]
5165    fn test_build_predicate_expression_with_or_false() {
5166        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5167        let left_expr = col("c1").eq(lit("a"));
5168        let right_expr = lit(ScalarValue::Boolean(Some(false)));
5169        let res = test_build_predicate_expression(
5170            &or(left_expr.clone(), right_expr.clone()),
5171            &schema,
5172            &mut RequiredColumns::new(),
5173        );
5174        let expected =
5175            "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5176        assert_eq!(res.to_string(), expected);
5177    }
5178}