datafusion_physical_optimizer/
pruning.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27    array::{new_null_array, ArrayRef, BooleanArray},
28    datatypes::{DataType, Field, Schema, SchemaRef},
29    record_batch::{RecordBatch, RecordBatchOptions},
30};
31// pub use for backwards compatibility
32pub use datafusion_common::pruning::PruningStatistics;
33use log::{debug, trace};
34
35use datafusion_common::error::{DataFusionError, Result};
36use datafusion_common::tree_node::TransformedResult;
37use datafusion_common::{
38    internal_err, plan_datafusion_err, plan_err,
39    tree_node::{Transformed, TreeNode},
40    ScalarValue,
41};
42use datafusion_common::{Column, DFSchema};
43use datafusion_expr_common::operator::Operator;
44use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
45use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
46use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
47use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
48
49/// Used to prove that arbitrary predicates (boolean expression) can not
50/// possibly evaluate to `true` given information about a column provided by
51/// [`PruningStatistics`].
52///
53/// # Introduction
54///
55/// `PruningPredicate` analyzes filter expressions using statistics such as
56/// min/max values and null counts, attempting to prove a "container" (e.g.
57/// Parquet Row Group) can be skipped without reading the actual data,
58/// potentially leading to significant performance improvements.
59///
60/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
61/// on the min/max values found in the Parquet metadata. If the
62/// `PruningPredicate` can prove that the filter can never evaluate to `true`
63/// for any row in the Row Group, the entire Row Group is skipped during query
64/// execution.
65///
66/// The `PruningPredicate` API is general, and can be used for pruning other
67/// types of containers (e.g. files) based on statistics that may be known from
68/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
69/// subtle topic.  See the Background and Implementation section for details.
70///
71/// `PruningPredicate` supports:
72///
73/// 1. Arbitrary expressions (including user defined functions)
74///
75/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
76///    so it is suitable for pruning 1000s of containers.
77///
78/// 3. Any source of information that implements the [`PruningStatistics`] trait
79///    (not just Parquet metadata).
80///
81/// # Example
82///
83/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
84/// example of how to use `PruningPredicate` to prune files based on min/max
85/// values.
86///
87/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/pruning.rs
88///
89/// Given an expression like `x = 5` and statistics for 3 containers (Row
90/// Groups, files, etc) `A`, `B`, and `C`:
91///
92/// ```text
93///   A: {x_min = 0, x_max = 4}
94///   B: {x_min = 2, x_max = 10}
95///   C: {x_min = 5, x_max = 8}
96/// ```
97///
98/// `PruningPredicate` will conclude that the rows in container `A` can never
99/// be true (as the maximum value is only `4`), so it can be pruned:
100///
101/// ```text
102/// A: false (no rows could possibly match x = 5)
103/// B: true  (rows might match x = 5)
104/// C: true  (rows might match x = 5)
105/// ```
106///
107/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
108///
109/// # Background
110///
111/// ## Boolean Tri-state logic
112///
113/// To understand the details of the rest of this documentation, it is important
114/// to understand how the tri-state boolean logic in SQL works. As this is
115/// somewhat esoteric, we review it here.
116///
117/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
118/// uncertainty propagates through expressions. SQL `NULL` behaves very
119/// differently than the `NULL` in most other languages where it is a special,
120/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
121/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
122/// first encountered as they behave differently than most programmers may
123/// expect.
124///
125/// In most other programming languages,
126/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
127/// * `a == NULL` evaluates to `false` if `a` has any other value
128///
129/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
130/// `false`):
131///
132/// Expression    | Result
133/// ------------- | ---------
134/// `1 = NULL`    | `NULL`
135/// `NULL = NULL` | `NULL`
136///
137/// Also important is how `AND` and `OR` works with tri-state boolean logic as
138/// (perhaps counterintuitively) the result is **not** always NULL. While
139/// consistent with the notion of `NULL` representing “unknown”, this is again,
140/// often deeply confusing 🤯 when first encountered.
141///
142/// Expression       | Result    | Intuition
143/// ---------------  | --------- | -----------
144/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
145/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
146/// `NULL AND NULL`  | `NULL`    |
147///
148/// Expression      | Result    | Intuition
149/// --------------- | --------- | ----------
150/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
151/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
152/// `NULL OR NULL`  |  `NULL`   |
153///
154/// ## SQL Filter Semantics
155///
156/// The SQL `WHERE` clause has a boolean expression, often called a filter or
157/// predicate. The semantics of this predicate are that the query evaluates the
158/// predicate for each row in the input tables and:
159///
160/// * Rows that evaluate to `true` are returned in the query results
161///
162/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
163///
164/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
165///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
166///   in the rewritten predicate described below.*
167///
168/// # `PruningPredicate` Implementation
169///
170/// Armed with the information in the Background section, we can now understand
171/// how the `PruningPredicate` logic works.
172///
173/// ## Interface
174///
175/// **Inputs**
176/// 1. An input schema describing what columns exist
177///
178/// 2. A predicate (expression that evaluates to a boolean)
179///
180/// 3. [`PruningStatistics`] that provides information about columns in that
181///    schema, for multiple “containers”. For each column in each container, it
182///    provides optional information on contained values, min_values, max_values,
183///    null_counts counts, and row_counts counts.
184///
185/// **Outputs**:
186/// A (non null) boolean value for each container:
187/// * `true`: There MAY be rows that match the predicate
188///
189/// * `false`: There are no rows that could possibly match the predicate (the
190///   predicate can never possibly be true). The container can be pruned (skipped)
191///   entirely.
192///
193/// While `PruningPredicate` will never return a `NULL` value, the
194/// rewritten predicate (as returned by `build_predicate_expression` and used internally
195/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
196/// or null / row counts are not known.
197///
198/// In order to be correct, `PruningPredicate` must return false
199/// **only** if it can determine that for all rows in the container, the
200/// predicate could never evaluate to `true` (always evaluates to either `NULL`
201/// or `false`).
202///
203/// ## Contains Analysis and Min/Max Rewrite
204///
205/// `PruningPredicate` works by first analyzing the predicate to see what
206/// [`LiteralGuarantee`] must hold for the predicate to be true.
207///
208/// Then, the `PruningPredicate` rewrites the original predicate into an
209/// expression that references the min/max values of each column in the original
210/// predicate.
211///
212/// When the min/max values are actually substituted in to this expression and
213/// evaluated, the result means
214///
215/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
216///
217/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
218///   Note that rewritten predicate can evaluate to NULL when some of
219///   the min/max values are not known. *Note that this is different than
220///   the SQL filter semantics where `NULL` means the row is filtered
221///   out.*
222///
223/// * `false`: there are no rows that could possibly match the predicate,
224///   **PRUNES** the container
225///
226/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
227/// `x_row_count` represent the minimum and maximum values, the null count of
228/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
229/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
230/// is known to be all `NULL`s. Note this is different from knowing nothing about
231/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
232/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
233///
234/// Here are some examples of the rewritten predicates:
235///
236/// Original Predicate | Rewritten Predicate
237/// ------------------ | --------------------
238/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
239/// `x < 5` | `x_null_count != x_row_count THEN false (x_max < 5)`
240/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count (y_min <= 10 AND 10 <= y_max)`
241/// `x IS NULL`  | `x_null_count > 0`
242/// `x IS NOT NULL`  | `x_null_count != row_count`
243/// `CAST(x as int) = 5` | `x_null_count != x_row_count (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
244///
245/// ## Predicate Evaluation
246/// The PruningPredicate works in two passes
247///
248/// **First pass**:  For each `LiteralGuarantee` calls
249/// [`PruningStatistics::contained`] and rules out containers where the
250/// LiteralGuarantees are not satisfied
251///
252/// **Second Pass**: Evaluates the rewritten expression using the
253/// min/max/null_counts/row_counts values for each column for each container. For any
254/// container that this expression evaluates to `false`, it rules out those
255/// containers.
256///
257///
258/// ### Example 1
259///
260/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
261///
262/// ```sql
263/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
264/// AND
265/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
266/// ```
267///
268/// If we know that for a given container, `x` is between `1 and 100` and we know that
269/// `y` is between `4` and `7`, we know nothing about the null count and row count of
270/// `x` and `y`, the input statistics might look like:
271///
272/// Column   | Value
273/// -------- | -----
274/// `x_min`  | `1`
275/// `x_max`  | `100`
276/// `x_null_count` | `null`
277/// `x_row_count`  | `null`
278/// `y_min`  | `4`
279/// `y_max`  | `7`
280/// `y_null_count` | `null`
281/// `y_row_count`  | `null`
282///
283/// When these statistics values are substituted in to the rewritten predicate and
284/// simplified, the result is `false`:
285///
286/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
287/// * `null = null` is `null` which is not true, so the AND moves on to the next clause
288/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
289/// * evaluating the clauses further we get:
290/// * `null and true and null and false`
291/// * `null and false`
292/// * `false`
293///
294/// Returning `false` means the container can be pruned, which matches the
295/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
296/// are `7` or less.
297///
298/// Note that if we had ended up with `null AND true AND null AND true` the result
299/// would have been `null`.
300/// `null` is treated the same as`true`, because we can't prove that the predicate is `false.`
301///
302/// If, for some other container, we knew `y` was between the values `4` and
303/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
304/// left as an exercise to the reader -- are you still here?), and the container
305/// **could not** be pruned. The intuition is that there may be rows where the
306/// predicate *might* evaluate to `true`, and the only way to find out is to do
307/// more analysis, for example by actually reading the data and evaluating the
308/// predicate row by row.
309///
310/// ### Example 2
311///
312/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
313/// look like the same as example 1:
314///
315/// ```sql
316/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
317/// AND
318/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
319/// ```
320///
321/// If we know that for another given container, `x_min` is NULL and `x_max` is
322/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
323///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
324/// the null count and row count of `y`. The input statistics might look like:
325///
326/// Column   | Value
327/// -------- | -----
328/// `x_min`  | `null`
329/// `x_max`  | `null`
330/// `x_null_count` | `100`
331/// `x_row_count`  | `100`
332/// `y_min`  | `4`
333/// `y_max`  | `7`
334/// `y_null_count` | `null`
335/// `y_row_count`  | `null`
336///
337/// When these statistics values are substituted in to the rewritten predicate and
338/// simplified, the result is `false`:
339///
340/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null = null AND (4 <= 10 AND 10 <= 7)`
341/// * `false AND null AND null AND false`
342/// * `false AND false`
343/// * `false`
344///
345/// Returning `false` means the container can be pruned, which matches the
346/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
347/// are known to be NULL.
348///
349/// # Related Work
350///
351/// [`PruningPredicate`] implements the type of min/max pruning described in
352/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
353/// described by various research such as [small materialized aggregates], [zone
354/// maps], and [data skipping].
355///
356/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
357/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
358/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
359/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
360#[derive(Debug, Clone)]
361pub struct PruningPredicate {
362    /// The input schema against which the predicate will be evaluated
363    schema: SchemaRef,
364    /// A min/max pruning predicate (rewritten in terms of column min/max
365    /// values, which are supplied by statistics)
366    predicate_expr: Arc<dyn PhysicalExpr>,
367    /// Description of which statistics are required to evaluate `predicate_expr`
368    required_columns: RequiredColumns,
369    /// Original physical predicate from which this predicate expr is derived
370    /// (required for serialization)
371    orig_expr: Arc<dyn PhysicalExpr>,
372    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
373    /// evaluate to `true`.
374    ///
375    /// See [`PruningPredicate::literal_guarantees`] for more details.
376    literal_guarantees: Vec<LiteralGuarantee>,
377}
378
379/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
380/// complex expressions or predicates that reference columns that are not in the
381/// schema.
382pub trait UnhandledPredicateHook {
383    /// Called when a predicate can not be rewritten in terms of statistics or
384    /// references a column that is not in the schema.
385    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
386}
387
388/// The default handling for unhandled predicates is to return a constant `true`
389/// (meaning don't prune the container)
390#[derive(Debug, Clone)]
391struct ConstantUnhandledPredicateHook {
392    default: Arc<dyn PhysicalExpr>,
393}
394
395impl Default for ConstantUnhandledPredicateHook {
396    fn default() -> Self {
397        Self {
398            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
399        }
400    }
401}
402
403impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
404    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
405        Arc::clone(&self.default)
406    }
407}
408
409impl PruningPredicate {
410    /// Try to create a new instance of [`PruningPredicate`]
411    ///
412    /// This will translate the provided `expr` filter expression into
413    /// a *pruning predicate*.
414    ///
415    /// A pruning predicate is one that has been rewritten in terms of
416    /// the min and max values of column references and that evaluates
417    /// to FALSE if the filter predicate would evaluate FALSE *for
418    /// every row* whose values fell within the min / max ranges (aka
419    /// could be pruned).
420    ///
421    /// The pruning predicate evaluates to TRUE or NULL
422    /// if the filter predicate *might* evaluate to TRUE for at least
423    /// one row whose values fell within the min/max ranges (in other
424    /// words they might pass the predicate)
425    ///
426    /// For example, the filter expression `(column / 2) = 4` becomes
427    /// the pruning predicate
428    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
429    ///
430    /// See the struct level documentation on [`PruningPredicate`] for more
431    /// details.
432    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
433        // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate`
434        // which does not handle dynamic exprs  in general
435        let expr = snapshot_physical_expr(expr)?;
436        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
437
438        // build predicate expression once
439        let mut required_columns = RequiredColumns::new();
440        let predicate_expr = build_predicate_expression(
441            &expr,
442            schema.as_ref(),
443            &mut required_columns,
444            &unhandled_hook,
445        );
446
447        let literal_guarantees = LiteralGuarantee::analyze(&expr);
448
449        Ok(Self {
450            schema,
451            predicate_expr,
452            required_columns,
453            orig_expr: expr,
454            literal_guarantees,
455        })
456    }
457
458    /// For each set of statistics, evaluates the pruning predicate
459    /// and returns a `bool` with the following meaning for a
460    /// all rows whose values match the statistics:
461    ///
462    /// `true`: There MAY be rows that match the predicate
463    ///
464    /// `false`: There are no rows that could possibly match the predicate
465    ///
466    /// Note: the predicate passed to `prune` should already be simplified as
467    /// much as possible (e.g. this pass doesn't handle some
468    /// expressions like `b = false`, but it does handle the
469    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
470    ///
471    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
472    pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
473        let mut builder = BoolVecBuilder::new(statistics.num_containers());
474
475        // Try to prove the predicate can't be true for the containers based on
476        // literal guarantees
477        for literal_guarantee in &self.literal_guarantees {
478            let LiteralGuarantee {
479                column,
480                guarantee,
481                literals,
482            } = literal_guarantee;
483            if let Some(results) = statistics.contained(column, literals) {
484                match guarantee {
485                    // `In` means the values in the column must be one of the
486                    // values in the set for the predicate to evaluate to true.
487                    // If `contained` returns false, that means the column is
488                    // not any of the values so we can prune the container
489                    Guarantee::In => builder.combine_array(&results),
490                    // `NotIn` means the values in the column must must not be
491                    // any of the values in the set for the predicate to
492                    // evaluate to true. If contained returns true, it means the
493                    // column is only in the set of values so we can prune the
494                    // container
495                    Guarantee::NotIn => {
496                        builder.combine_array(&arrow::compute::not(&results)?)
497                    }
498                }
499                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
500                // can return early without evaluating the rest of predicates.
501                if builder.check_all_pruned() {
502                    return Ok(builder.build());
503                }
504            }
505        }
506
507        // Next, try to prove the predicate can't be true for the containers based
508        // on min/max values
509
510        // build a RecordBatch that contains the min/max values in the
511        // appropriate statistics columns for the min/max predicate
512        let statistics_batch =
513            build_statistics_record_batch(statistics, &self.required_columns)?;
514
515        // Evaluate the pruning predicate on that record batch and append any results to the builder
516        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
517
518        Ok(builder.build())
519    }
520
521    /// Return a reference to the input schema
522    pub fn schema(&self) -> &SchemaRef {
523        &self.schema
524    }
525
526    /// Returns a reference to the physical expr used to construct this pruning predicate
527    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
528        &self.orig_expr
529    }
530
531    /// Returns a reference to the predicate expr
532    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
533        &self.predicate_expr
534    }
535
536    /// Returns a reference to the literal guarantees
537    ///
538    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
539    /// expression to possibly be `true`. If any is not satisfied, the
540    /// expression is guaranteed to be `null` or `false`.
541    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
542        &self.literal_guarantees
543    }
544
545    /// Returns true if this pruning predicate can not prune anything.
546    ///
547    /// This happens if the predicate is a literal `true`  and
548    /// literal_guarantees is empty.
549    ///
550    /// This can happen when a predicate is simplified to a constant `true`
551    pub fn always_true(&self) -> bool {
552        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
553    }
554
555    // this is only used by `parquet` feature right now
556    #[allow(dead_code)]
557    pub fn required_columns(&self) -> &RequiredColumns {
558        &self.required_columns
559    }
560
561    /// Names of the columns that are known to be / not be in a set
562    /// of literals (constants). These are the columns the that may be passed to
563    /// [`PruningStatistics::contained`] during pruning.
564    ///
565    /// This is useful to avoid fetching statistics for columns that will not be
566    /// used in the predicate. For example, it can be used to avoid reading
567    /// unneeded bloom filters (a non trivial operation).
568    pub fn literal_columns(&self) -> Vec<String> {
569        let mut seen = HashSet::new();
570        self.literal_guarantees
571            .iter()
572            .map(|e| &e.column.name)
573            // avoid duplicates
574            .filter(|name| seen.insert(*name))
575            .map(|s| s.to_string())
576            .collect()
577    }
578}
579
580/// Builds the return `Vec` for [`PruningPredicate::prune`].
581#[derive(Debug)]
582struct BoolVecBuilder {
583    /// One element per container. Each element is
584    /// * `true`: if the container has row that may pass the predicate
585    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
586    inner: Vec<bool>,
587}
588
589impl BoolVecBuilder {
590    /// Create a new `BoolVecBuilder` with `num_containers` elements
591    fn new(num_containers: usize) -> Self {
592        Self {
593            // assume by default all containers may pass the predicate
594            inner: vec![true; num_containers],
595        }
596    }
597
598    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
599    /// predicate into the currently in progress array.
600    ///
601    /// Each `array` element is:
602    /// * `true`: container has row that may pass the predicate
603    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
604    /// * `null`: container may or may not have rows that pass the predicate
605    fn combine_array(&mut self, array: &BooleanArray) {
606        assert_eq!(array.len(), self.inner.len());
607        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
608            // `false` for this conjunct means we know for sure no rows could
609            // pass the predicate and thus we set the corresponding container
610            // location to false.
611            if let Some(false) = new {
612                *cur = false;
613            }
614        }
615    }
616
617    /// Combines the results in the [`ColumnarValue`] to the currently in
618    /// progress array, following the same rules as [`Self::combine_array`].
619    ///
620    /// # Panics
621    /// If `value` is not boolean
622    fn combine_value(&mut self, value: ColumnarValue) {
623        match value {
624            ColumnarValue::Array(array) => {
625                self.combine_array(array.as_boolean());
626            }
627            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
628                // False means all containers can not pass the predicate
629                self.inner = vec![false; self.inner.len()];
630            }
631            _ => {
632                // Null or true means the rows in container may pass this
633                // conjunct so we can't prune any containers based on that
634            }
635        }
636    }
637
638    /// Convert this builder into a Vec of bools
639    fn build(self) -> Vec<bool> {
640        self.inner
641    }
642
643    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
644    fn check_all_pruned(&self) -> bool {
645        self.inner.iter().all(|&x| !x)
646    }
647}
648
649fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
650    expr.as_any()
651        .downcast_ref::<phys_expr::Literal>()
652        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
653        .unwrap_or_default()
654}
655
656fn is_always_false(expr: &Arc<dyn PhysicalExpr>) -> bool {
657    expr.as_any()
658        .downcast_ref::<phys_expr::Literal>()
659        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(false))))
660        .unwrap_or_default()
661}
662
663/// Describes which columns statistics are necessary to evaluate a
664/// [`PruningPredicate`].
665///
666/// This structure permits reading and creating the minimum number statistics,
667/// which is important since statistics may be non trivial to read (e.g. large
668/// strings or when there are 1000s of columns).
669///
670/// Handles creating references to the min/max statistics
671/// for columns as well as recording which statistics are needed
672#[derive(Debug, Default, Clone)]
673pub struct RequiredColumns {
674    /// The statistics required to evaluate this predicate:
675    /// * The unqualified column in the input schema
676    /// * Statistics type (e.g. Min or Max or Null_Count)
677    /// * The field the statistics value should be placed in for
678    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
679    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
680}
681
682impl RequiredColumns {
683    fn new() -> Self {
684        Self::default()
685    }
686
687    /// Returns Some(column) if this is a single column predicate.
688    ///
689    /// Returns None if this is a multi-column predicate.
690    ///
691    /// Examples:
692    /// * `a > 5 OR a < 10` returns `Some(a)`
693    /// * `a > 5 OR b < 10` returns `None`
694    /// * `true` returns None
695    #[allow(dead_code)]
696    // this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
697    pub fn single_column(&self) -> Option<&phys_expr::Column> {
698        if self.columns.windows(2).all(|w| {
699            // check if all columns are the same (ignoring statistics and field)
700            let c1 = &w[0].0;
701            let c2 = &w[1].0;
702            c1 == c2
703        }) {
704            self.columns.first().map(|r| &r.0)
705        } else {
706            None
707        }
708    }
709
710    /// Returns an iterator over items in columns (see doc on
711    /// `self.columns` for details)
712    pub(crate) fn iter(
713        &self,
714    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
715        self.columns.iter()
716    }
717
718    fn find_stat_column(
719        &self,
720        column: &phys_expr::Column,
721        statistics_type: StatisticsType,
722    ) -> Option<usize> {
723        match statistics_type {
724            StatisticsType::RowCount => {
725                // Use the first row count we find, if any
726                self.columns
727                    .iter()
728                    .enumerate()
729                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
730                    .map(|(i, (_c, _t, _f))| i)
731            }
732            _ => self
733                .columns
734                .iter()
735                .enumerate()
736                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
737                .map(|(i, (_c, _t, _f))| i),
738        }
739    }
740
741    /// Rewrites column_expr so that all appearances of column
742    /// are replaced with a reference to either the min or max
743    /// statistics column, while keeping track that a reference to the statistics
744    /// column is required
745    ///
746    /// for example, an expression like `col("foo") > 5`, when called
747    /// with Max would result in an expression like `col("foo_max") >
748    /// 5` with the appropriate entry noted in self.columns
749    fn stat_column_expr(
750        &mut self,
751        column: &phys_expr::Column,
752        column_expr: &Arc<dyn PhysicalExpr>,
753        field: &Field,
754        stat_type: StatisticsType,
755    ) -> Result<Arc<dyn PhysicalExpr>> {
756        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
757            Some(idx) => (idx, false),
758            None => (self.columns.len(), true),
759        };
760
761        let column_name = column.name();
762        let stat_column_name = match stat_type {
763            StatisticsType::Min => format!("{column_name}_min"),
764            StatisticsType::Max => format!("{column_name}_max"),
765            StatisticsType::NullCount => format!("{column_name}_null_count"),
766            StatisticsType::RowCount => "row_count".to_string(),
767        };
768
769        let stat_column = phys_expr::Column::new(&stat_column_name, idx);
770
771        // only add statistics column if not previously added
772        if need_to_insert {
773            // may be null if statistics are not present
774            let nullable = true;
775            let stat_field =
776                Field::new(stat_column.name(), field.data_type().clone(), nullable);
777            self.columns.push((column.clone(), stat_type, stat_field));
778        }
779        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
780    }
781
782    /// rewrite col --> col_min
783    fn min_column_expr(
784        &mut self,
785        column: &phys_expr::Column,
786        column_expr: &Arc<dyn PhysicalExpr>,
787        field: &Field,
788    ) -> Result<Arc<dyn PhysicalExpr>> {
789        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
790    }
791
792    /// rewrite col --> col_max
793    fn max_column_expr(
794        &mut self,
795        column: &phys_expr::Column,
796        column_expr: &Arc<dyn PhysicalExpr>,
797        field: &Field,
798    ) -> Result<Arc<dyn PhysicalExpr>> {
799        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
800    }
801
802    /// rewrite col --> col_null_count
803    fn null_count_column_expr(
804        &mut self,
805        column: &phys_expr::Column,
806        column_expr: &Arc<dyn PhysicalExpr>,
807        field: &Field,
808    ) -> Result<Arc<dyn PhysicalExpr>> {
809        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
810    }
811
812    /// rewrite col --> col_row_count
813    fn row_count_column_expr(
814        &mut self,
815        column: &phys_expr::Column,
816        column_expr: &Arc<dyn PhysicalExpr>,
817        field: &Field,
818    ) -> Result<Arc<dyn PhysicalExpr>> {
819        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
820    }
821}
822
823impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
824    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
825        Self { columns }
826    }
827}
828
829/// Build a RecordBatch from a list of statistics, creating arrays,
830/// with one row for each PruningStatistics and columns specified in
831/// in the required_columns parameter.
832///
833/// For example, if the requested columns are
834/// ```text
835/// ("s1", Min, Field:s1_min)
836/// ("s2", Max, field:s2_max)
837///```
838///
839/// And the input statistics had
840/// ```text
841/// S1(Min: 5, Max: 10)
842/// S2(Min: 99, Max: 1000)
843/// S3(Min: 1, Max: 2)
844/// ```
845///
846/// Then this function would build a record batch with 2 columns and
847/// one row s1_min and s2_max as follows (s3 is not requested):
848///
849/// ```text
850/// s1_min | s2_max
851/// -------+--------
852///   5    | 1000
853/// ```
854fn build_statistics_record_batch<S: PruningStatistics>(
855    statistics: &S,
856    required_columns: &RequiredColumns,
857) -> Result<RecordBatch> {
858    let mut fields = Vec::<Field>::new();
859    let mut arrays = Vec::<ArrayRef>::new();
860    // For each needed statistics column:
861    for (column, statistics_type, stat_field) in required_columns.iter() {
862        let column = Column::from_name(column.name());
863        let data_type = stat_field.data_type();
864
865        let num_containers = statistics.num_containers();
866
867        let array = match statistics_type {
868            StatisticsType::Min => statistics.min_values(&column),
869            StatisticsType::Max => statistics.max_values(&column),
870            StatisticsType::NullCount => statistics.null_counts(&column),
871            StatisticsType::RowCount => statistics.row_counts(&column),
872        };
873        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
874
875        if num_containers != array.len() {
876            return internal_err!(
877                "mismatched statistics length. Expected {}, got {}",
878                num_containers,
879                array.len()
880            );
881        }
882
883        // cast statistics array to required data type (e.g. parquet
884        // provides timestamp statistics as "Int64")
885        let array = arrow::compute::cast(&array, data_type)?;
886
887        fields.push(stat_field.clone());
888        arrays.push(array);
889    }
890
891    let schema = Arc::new(Schema::new(fields));
892    // provide the count in case there were no needed statistics
893    let mut options = RecordBatchOptions::default();
894    options.row_count = Some(statistics.num_containers());
895
896    trace!("Creating statistics batch for {required_columns:#?} with {arrays:#?}");
897
898    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
899        plan_datafusion_err!("Can not create statistics record batch: {err}")
900    })
901}
902
903struct PruningExpressionBuilder<'a> {
904    column: phys_expr::Column,
905    column_expr: Arc<dyn PhysicalExpr>,
906    op: Operator,
907    scalar_expr: Arc<dyn PhysicalExpr>,
908    field: &'a Field,
909    required_columns: &'a mut RequiredColumns,
910}
911
912impl<'a> PruningExpressionBuilder<'a> {
913    fn try_new(
914        left: &'a Arc<dyn PhysicalExpr>,
915        right: &'a Arc<dyn PhysicalExpr>,
916        op: Operator,
917        schema: &'a Schema,
918        required_columns: &'a mut RequiredColumns,
919    ) -> Result<Self> {
920        // find column name; input could be a more complicated expression
921        let left_columns = collect_columns(left);
922        let right_columns = collect_columns(right);
923        let (column_expr, scalar_expr, columns, correct_operator) =
924            match (left_columns.len(), right_columns.len()) {
925                (1, 0) => (left, right, left_columns, op),
926                (0, 1) => (right, left, right_columns, reverse_operator(op)?),
927                _ => {
928                    // if more than one column used in expression - not supported
929                    return plan_err!(
930                        "Multi-column expressions are not currently supported"
931                    );
932                }
933            };
934
935        let df_schema = DFSchema::try_from(schema.clone())?;
936        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
937            column_expr,
938            correct_operator,
939            scalar_expr,
940            df_schema,
941        )?;
942        let column = columns.iter().next().unwrap().clone();
943        let field = match schema.column_with_name(column.name()) {
944            Some((_, f)) => f,
945            _ => {
946                return plan_err!("Field not found in schema");
947            }
948        };
949
950        Ok(Self {
951            column,
952            column_expr,
953            op: correct_operator,
954            scalar_expr,
955            field,
956            required_columns,
957        })
958    }
959
960    fn op(&self) -> Operator {
961        self.op
962    }
963
964    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
965        &self.scalar_expr
966    }
967
968    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
969        self.required_columns
970            .min_column_expr(&self.column, &self.column_expr, self.field)
971    }
972
973    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
974        self.required_columns
975            .max_column_expr(&self.column, &self.column_expr, self.field)
976    }
977
978    /// This function is to simply retune the `null_count` physical expression no matter what the
979    /// predicate expression is
980    ///
981    /// i.e., x > 5 => x_null_count,
982    ///       cast(x as int) < 10 => x_null_count,
983    ///       try_cast(x as float) < 10.0 => x_null_count
984    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
985        // Retune to [`phys_expr::Column`]
986        let column_expr = Arc::new(self.column.clone()) as _;
987
988        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
989        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
990
991        self.required_columns.null_count_column_expr(
992            &self.column,
993            &column_expr,
994            null_count_field,
995        )
996    }
997
998    /// This function is to simply retune the `row_count` physical expression no matter what the
999    /// predicate expression is
1000    ///
1001    /// i.e., x > 5 => x_row_count,
1002    ///       cast(x as int) < 10 => x_row_count,
1003    ///       try_cast(x as float) < 10.0 => x_row_count
1004    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1005        // Retune to [`phys_expr::Column`]
1006        let column_expr = Arc::new(self.column.clone()) as _;
1007
1008        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1009        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1010
1011        self.required_columns.row_count_column_expr(
1012            &self.column,
1013            &column_expr,
1014            row_count_field,
1015        )
1016    }
1017}
1018
1019/// This function is designed to rewrite the column_expr to
1020/// ensure the column_expr is monotonically increasing.
1021///
1022/// For example,
1023/// 1. `col > 10`
1024/// 2. `-col > 10` should be rewritten to `col < -10`
1025/// 3. `!col = true` would be rewritten to `col = !true`
1026/// 4. `abs(a - 10) > 0` not supported
1027/// 5. `cast(can_prunable_expr) > 10`
1028/// 6. `try_cast(can_prunable_expr) > 10`
1029///
1030/// More rewrite rules are still in progress.
1031fn rewrite_expr_to_prunable(
1032    column_expr: &PhysicalExprRef,
1033    op: Operator,
1034    scalar_expr: &PhysicalExprRef,
1035    schema: DFSchema,
1036) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1037    if !is_compare_op(op) {
1038        return plan_err!("rewrite_expr_to_prunable only support compare expression");
1039    }
1040
1041    let column_expr_any = column_expr.as_any();
1042
1043    if column_expr_any
1044        .downcast_ref::<phys_expr::Column>()
1045        .is_some()
1046    {
1047        // `col op lit()`
1048        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1049    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1050        // `cast(col) op lit()`
1051        let arrow_schema: SchemaRef = schema.clone().into();
1052        let from_type = cast.expr().data_type(&arrow_schema)?;
1053        verify_support_type_for_prune(&from_type, cast.cast_type())?;
1054        let (left, op, right) =
1055            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1056        let left = Arc::new(phys_expr::CastExpr::new(
1057            left,
1058            cast.cast_type().clone(),
1059            None,
1060        ));
1061        Ok((left, op, right))
1062    } else if let Some(try_cast) =
1063        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1064    {
1065        // `try_cast(col) op lit()`
1066        let arrow_schema: SchemaRef = schema.clone().into();
1067        let from_type = try_cast.expr().data_type(&arrow_schema)?;
1068        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1069        let (left, op, right) =
1070            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1071        let left = Arc::new(phys_expr::TryCastExpr::new(
1072            left,
1073            try_cast.cast_type().clone(),
1074        ));
1075        Ok((left, op, right))
1076    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1077        // `-col > lit()`  --> `col < -lit()`
1078        let (left, op, right) =
1079            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1080        let right = Arc::new(phys_expr::NegativeExpr::new(right));
1081        Ok((left, reverse_operator(op)?, right))
1082    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1083        // `!col = true` --> `col = !true`
1084        if op != Operator::Eq && op != Operator::NotEq {
1085            return plan_err!("Not with operator other than Eq / NotEq is not supported");
1086        }
1087        if not
1088            .arg()
1089            .as_any()
1090            .downcast_ref::<phys_expr::Column>()
1091            .is_some()
1092        {
1093            let left = Arc::clone(not.arg());
1094            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1095            Ok((left, reverse_operator(op)?, right))
1096        } else {
1097            plan_err!("Not with complex expression {column_expr:?} is not supported")
1098        }
1099    } else {
1100        plan_err!("column expression {column_expr:?} is not supported")
1101    }
1102}
1103
1104fn is_compare_op(op: Operator) -> bool {
1105    matches!(
1106        op,
1107        Operator::Eq
1108            | Operator::NotEq
1109            | Operator::Lt
1110            | Operator::LtEq
1111            | Operator::Gt
1112            | Operator::GtEq
1113            | Operator::LikeMatch
1114            | Operator::NotLikeMatch
1115    )
1116}
1117
1118fn is_string_type(data_type: &DataType) -> bool {
1119    matches!(
1120        data_type,
1121        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
1122    )
1123}
1124
1125// The pruning logic is based on the comparing the min/max bounds.
1126// Must make sure the two type has order.
1127// For example, casts from string to numbers is not correct.
1128// Because the "13" is less than "3" with UTF8 comparison order.
1129fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1130    // Dictionary casts are always supported as long as the value types are supported
1131    let from_type = match from_type {
1132        DataType::Dictionary(_, t) => {
1133            return verify_support_type_for_prune(t.as_ref(), to_type)
1134        }
1135        _ => from_type,
1136    };
1137    let to_type = match to_type {
1138        DataType::Dictionary(_, t) => {
1139            return verify_support_type_for_prune(from_type, t.as_ref())
1140        }
1141        _ => to_type,
1142    };
1143    // If both types are strings or both are not strings (number, timestamp, etc)
1144    // then we can compare them.
1145    // PruningPredicate does not support casting of strings to numbers and such.
1146    if is_string_type(from_type) == is_string_type(to_type) {
1147        Ok(())
1148    } else {
1149        plan_err!(
1150            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1151        )
1152    }
1153}
1154
1155/// replaces a column with an old name with a new name in an expression
1156fn rewrite_column_expr(
1157    e: Arc<dyn PhysicalExpr>,
1158    column_old: &phys_expr::Column,
1159    column_new: &phys_expr::Column,
1160) -> Result<Arc<dyn PhysicalExpr>> {
1161    e.transform(|expr| {
1162        if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1163            if column == column_old {
1164                return Ok(Transformed::yes(Arc::new(column_new.clone())));
1165            }
1166        }
1167
1168        Ok(Transformed::no(expr))
1169    })
1170    .data()
1171}
1172
1173fn reverse_operator(op: Operator) -> Result<Operator> {
1174    op.swap().ok_or_else(|| {
1175        DataFusionError::Internal(format!(
1176            "Could not reverse operator {op} while building pruning predicate"
1177        ))
1178    })
1179}
1180
1181/// Given a column reference to `column`, returns a pruning
1182/// expression in terms of the min and max that will evaluate to true
1183/// if the column may contain values, and false if definitely does not
1184/// contain values
1185fn build_single_column_expr(
1186    column: &phys_expr::Column,
1187    schema: &Schema,
1188    required_columns: &mut RequiredColumns,
1189    is_not: bool, // if true, treat as !col
1190) -> Option<Arc<dyn PhysicalExpr>> {
1191    let field = schema.field_with_name(column.name()).ok()?;
1192
1193    if matches!(field.data_type(), &DataType::Boolean) {
1194        let col_ref = Arc::new(column.clone()) as _;
1195
1196        let min = required_columns
1197            .min_column_expr(column, &col_ref, field)
1198            .ok()?;
1199        let max = required_columns
1200            .max_column_expr(column, &col_ref, field)
1201            .ok()?;
1202
1203        // remember -- we want an expression that is:
1204        // TRUE: if there may be rows that match
1205        // FALSE: if there are no rows that match
1206        if is_not {
1207            // The only way we know a column couldn't match is if both the min and max are true
1208            // !(min && max)
1209            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1210                phys_expr::BinaryExpr::new(min, Operator::And, max),
1211            ))))
1212        } else {
1213            // the only way we know a column couldn't match is if both the min and max are false
1214            // !(!min && !max) --> min || max
1215            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1216        }
1217    } else {
1218        None
1219    }
1220}
1221
1222/// Given an expression reference to `expr`, if `expr` is a column expression,
1223/// returns a pruning expression in terms of IsNull that will evaluate to true
1224/// if the column may contain null, and false if definitely does not
1225/// contain null.
1226/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1227/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1228/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1229/// thus can prune the row group / value
1230fn build_is_null_column_expr(
1231    expr: &Arc<dyn PhysicalExpr>,
1232    schema: &Schema,
1233    required_columns: &mut RequiredColumns,
1234    with_not: bool,
1235) -> Option<Arc<dyn PhysicalExpr>> {
1236    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1237        let field = schema.field_with_name(col.name()).ok()?;
1238
1239        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1240        if with_not {
1241            if let Ok(row_count_expr) =
1242                required_columns.row_count_column_expr(col, expr, null_count_field)
1243            {
1244                required_columns
1245                    .null_count_column_expr(col, expr, null_count_field)
1246                    .map(|null_count_column_expr| {
1247                        // IsNotNull(column) => null_count != row_count
1248                        Arc::new(phys_expr::BinaryExpr::new(
1249                            null_count_column_expr,
1250                            Operator::NotEq,
1251                            row_count_expr,
1252                        )) as _
1253                    })
1254                    .ok()
1255            } else {
1256                None
1257            }
1258        } else {
1259            required_columns
1260                .null_count_column_expr(col, expr, null_count_field)
1261                .map(|null_count_column_expr| {
1262                    // IsNull(column) => null_count > 0
1263                    Arc::new(phys_expr::BinaryExpr::new(
1264                        null_count_column_expr,
1265                        Operator::Gt,
1266                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1267                    )) as _
1268                })
1269                .ok()
1270        }
1271    } else {
1272        None
1273    }
1274}
1275
1276/// The maximum number of entries in an `InList` that might be rewritten into
1277/// an OR chain
1278const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1279
1280/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
1281/// for use as a [`PruningPredicate`].
1282pub struct PredicateRewriter {
1283    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1284}
1285
1286impl Default for PredicateRewriter {
1287    fn default() -> Self {
1288        Self {
1289            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1290        }
1291    }
1292}
1293
1294impl PredicateRewriter {
1295    /// Create a new `PredicateRewriter`
1296    pub fn new() -> Self {
1297        Self::default()
1298    }
1299
1300    /// Set the unhandled hook to be used when a predicate can not be rewritten
1301    pub fn with_unhandled_hook(
1302        self,
1303        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1304    ) -> Self {
1305        Self { unhandled_hook }
1306    }
1307
1308    /// Translate logical filter expression into pruning predicate
1309    /// expression that will evaluate to FALSE if it can be determined no
1310    /// rows between the min/max values could pass the predicates.
1311    ///
1312    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1313    ///
1314    /// Returns the pruning predicate as an [`PhysicalExpr`]
1315    ///
1316    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1317    pub fn rewrite_predicate_to_statistics_predicate(
1318        &self,
1319        expr: &Arc<dyn PhysicalExpr>,
1320        schema: &Schema,
1321    ) -> Arc<dyn PhysicalExpr> {
1322        let mut required_columns = RequiredColumns::new();
1323        build_predicate_expression(
1324            expr,
1325            schema,
1326            &mut required_columns,
1327            &self.unhandled_hook,
1328        )
1329    }
1330}
1331
1332/// Translate logical filter expression into pruning predicate
1333/// expression that will evaluate to FALSE if it can be determined no
1334/// rows between the min/max values could pass the predicates.
1335///
1336/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1337///
1338/// Returns the pruning predicate as an [`PhysicalExpr`]
1339///
1340/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1341fn build_predicate_expression(
1342    expr: &Arc<dyn PhysicalExpr>,
1343    schema: &Schema,
1344    required_columns: &mut RequiredColumns,
1345    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1346) -> Arc<dyn PhysicalExpr> {
1347    if is_always_false(expr) {
1348        // Shouldn't return `unhandled_hook.handle(expr)`
1349        // Because it will transfer false to true.
1350        return Arc::clone(expr);
1351    }
1352    // predicate expression can only be a binary expression
1353    let expr_any = expr.as_any();
1354    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1355        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1356            .unwrap_or_else(|| unhandled_hook.handle(expr));
1357    }
1358    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1359        return build_is_null_column_expr(
1360            is_not_null.arg(),
1361            schema,
1362            required_columns,
1363            true,
1364        )
1365        .unwrap_or_else(|| unhandled_hook.handle(expr));
1366    }
1367    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1368        return build_single_column_expr(col, schema, required_columns, false)
1369            .unwrap_or_else(|| unhandled_hook.handle(expr));
1370    }
1371    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1372        // match !col (don't do so recursively)
1373        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1374            return build_single_column_expr(col, schema, required_columns, true)
1375                .unwrap_or_else(|| unhandled_hook.handle(expr));
1376        } else {
1377            return unhandled_hook.handle(expr);
1378        }
1379    }
1380    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1381        if !in_list.list().is_empty()
1382            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1383        {
1384            let eq_op = if in_list.negated() {
1385                Operator::NotEq
1386            } else {
1387                Operator::Eq
1388            };
1389            let re_op = if in_list.negated() {
1390                Operator::And
1391            } else {
1392                Operator::Or
1393            };
1394            let change_expr = in_list
1395                .list()
1396                .iter()
1397                .map(|e| {
1398                    Arc::new(phys_expr::BinaryExpr::new(
1399                        Arc::clone(in_list.expr()),
1400                        eq_op,
1401                        Arc::clone(e),
1402                    )) as _
1403                })
1404                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1405                .unwrap();
1406            return build_predicate_expression(
1407                &change_expr,
1408                schema,
1409                required_columns,
1410                unhandled_hook,
1411            );
1412        } else {
1413            return unhandled_hook.handle(expr);
1414        }
1415    }
1416
1417    let (left, op, right) = {
1418        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1419            (
1420                Arc::clone(bin_expr.left()),
1421                *bin_expr.op(),
1422                Arc::clone(bin_expr.right()),
1423            )
1424        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1425            if like_expr.case_insensitive() {
1426                return unhandled_hook.handle(expr);
1427            }
1428            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1429                (false, false) => Operator::LikeMatch,
1430                (true, false) => Operator::NotLikeMatch,
1431                (false, true) => Operator::ILikeMatch,
1432                (true, true) => Operator::NotILikeMatch,
1433            };
1434            (
1435                Arc::clone(like_expr.expr()),
1436                op,
1437                Arc::clone(like_expr.pattern()),
1438            )
1439        } else {
1440            return unhandled_hook.handle(expr);
1441        }
1442    };
1443
1444    if op == Operator::And || op == Operator::Or {
1445        let left_expr =
1446            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1447        let right_expr =
1448            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1449        // simplify boolean expression if applicable
1450        let expr = match (&left_expr, op, &right_expr) {
1451            (left, Operator::And, right)
1452                if is_always_false(left) || is_always_false(right) =>
1453            {
1454                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(false))))
1455            }
1456            (left, Operator::And, _) if is_always_true(left) => right_expr,
1457            (_, Operator::And, right) if is_always_true(right) => left_expr,
1458            (left, Operator::Or, right)
1459                if is_always_true(left) || is_always_true(right) =>
1460            {
1461                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1462            }
1463            (left, Operator::Or, _) if is_always_false(left) => right_expr,
1464            (_, Operator::Or, right) if is_always_false(right) => left_expr,
1465
1466            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1467        };
1468        return expr;
1469    }
1470
1471    let expr_builder =
1472        PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1473    let mut expr_builder = match expr_builder {
1474        Ok(builder) => builder,
1475        // allow partial failure in predicate expression generation
1476        // this can still produce a useful predicate when multiple conditions are joined using AND
1477        Err(e) => {
1478            debug!("Error building pruning expression: {e}");
1479            return unhandled_hook.handle(expr);
1480        }
1481    };
1482
1483    build_statistics_expr(&mut expr_builder)
1484        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1485}
1486
1487fn build_statistics_expr(
1488    expr_builder: &mut PruningExpressionBuilder,
1489) -> Result<Arc<dyn PhysicalExpr>> {
1490    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1491        Operator::NotEq => {
1492            // column != literal => (min, max) = literal =>
1493            // !(min != literal && max != literal) ==>
1494            // min != literal || literal != max
1495            let min_column_expr = expr_builder.min_column_expr()?;
1496            let max_column_expr = expr_builder.max_column_expr()?;
1497            Arc::new(phys_expr::BinaryExpr::new(
1498                Arc::new(phys_expr::BinaryExpr::new(
1499                    min_column_expr,
1500                    Operator::NotEq,
1501                    Arc::clone(expr_builder.scalar_expr()),
1502                )),
1503                Operator::Or,
1504                Arc::new(phys_expr::BinaryExpr::new(
1505                    Arc::clone(expr_builder.scalar_expr()),
1506                    Operator::NotEq,
1507                    max_column_expr,
1508                )),
1509            ))
1510        }
1511        Operator::Eq => {
1512            // column = literal => (min, max) = literal => min <= literal && literal <= max
1513            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1514            let min_column_expr = expr_builder.min_column_expr()?;
1515            let max_column_expr = expr_builder.max_column_expr()?;
1516            Arc::new(phys_expr::BinaryExpr::new(
1517                Arc::new(phys_expr::BinaryExpr::new(
1518                    min_column_expr,
1519                    Operator::LtEq,
1520                    Arc::clone(expr_builder.scalar_expr()),
1521                )),
1522                Operator::And,
1523                Arc::new(phys_expr::BinaryExpr::new(
1524                    Arc::clone(expr_builder.scalar_expr()),
1525                    Operator::LtEq,
1526                    max_column_expr,
1527                )),
1528            ))
1529        }
1530        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1531        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1532            plan_datafusion_err!(
1533                "LIKE expression with wildcard at the beginning is not supported"
1534            )
1535        })?,
1536        Operator::Gt => {
1537            // column > literal => (min, max) > literal => max > literal
1538            Arc::new(phys_expr::BinaryExpr::new(
1539                expr_builder.max_column_expr()?,
1540                Operator::Gt,
1541                Arc::clone(expr_builder.scalar_expr()),
1542            ))
1543        }
1544        Operator::GtEq => {
1545            // column >= literal => (min, max) >= literal => max >= literal
1546            Arc::new(phys_expr::BinaryExpr::new(
1547                expr_builder.max_column_expr()?,
1548                Operator::GtEq,
1549                Arc::clone(expr_builder.scalar_expr()),
1550            ))
1551        }
1552        Operator::Lt => {
1553            // column < literal => (min, max) < literal => min < literal
1554            Arc::new(phys_expr::BinaryExpr::new(
1555                expr_builder.min_column_expr()?,
1556                Operator::Lt,
1557                Arc::clone(expr_builder.scalar_expr()),
1558            ))
1559        }
1560        Operator::LtEq => {
1561            // column <= literal => (min, max) <= literal => min <= literal
1562            Arc::new(phys_expr::BinaryExpr::new(
1563                expr_builder.min_column_expr()?,
1564                Operator::LtEq,
1565                Arc::clone(expr_builder.scalar_expr()),
1566            ))
1567        }
1568        // other expressions are not supported
1569        _ => {
1570            return plan_err!(
1571                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1572            );
1573        }
1574    };
1575    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1576    Ok(statistics_expr)
1577}
1578
1579/// returns the string literal of the scalar value if it is a string
1580fn unpack_string(s: &ScalarValue) -> Option<&str> {
1581    s.try_as_str().flatten()
1582}
1583
1584fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1585    if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1586        let s = unpack_string(lit.value())?;
1587        return Some(s);
1588    }
1589    None
1590}
1591
1592/// Convert `column LIKE literal` where P is a constant prefix of the literal
1593/// to a range check on the column: `P <= column && column < P'`, where P' is the
1594/// lowest string after all P* strings.
1595fn build_like_match(
1596    expr_builder: &mut PruningExpressionBuilder,
1597) -> Option<Arc<dyn PhysicalExpr>> {
1598    // column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
1599    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1600    // column LIKE '%foo' => min <= '' && '' <= max => true
1601    // column LIKE '%foo%' => min <= '' && '' <= max => true
1602    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1603
1604    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1605    //  this may involve building the physical expressions that call lower() and upper()
1606    let min_column_expr = expr_builder.min_column_expr().ok()?;
1607    let max_column_expr = expr_builder.max_column_expr().ok()?;
1608    let scalar_expr = expr_builder.scalar_expr();
1609    // check that the scalar is a string literal
1610    let s = extract_string_literal(scalar_expr)?;
1611    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1612    let first_wildcard_index = s.find(['%', '_']);
1613    if first_wildcard_index == Some(0) {
1614        // there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
1615        return None;
1616    }
1617    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1618        let prefix = &s[..wildcard_index];
1619        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1620            prefix.to_string(),
1621        ))));
1622        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1623            increment_utf8(prefix)?,
1624        ))));
1625        (lower_bound_lit, upper_bound_lit)
1626    } else {
1627        // the like expression is a literal and can be converted into a comparison
1628        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1629            s.to_string(),
1630        ))));
1631        (Arc::clone(&bound), bound)
1632    };
1633    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1634        lower_bound,
1635        Operator::LtEq,
1636        Arc::clone(&max_column_expr),
1637    ));
1638    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1639        Arc::clone(&min_column_expr),
1640        Operator::LtEq,
1641        upper_bound,
1642    ));
1643    let combined = Arc::new(phys_expr::BinaryExpr::new(
1644        upper_bound_expr,
1645        Operator::And,
1646        lower_bound_expr,
1647    ));
1648    Some(combined)
1649}
1650
1651// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1652//
1653// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1654// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1655// looking for rows that don't begin with `const_prefix` can never be true)
1656fn build_not_like_match(
1657    expr_builder: &mut PruningExpressionBuilder<'_>,
1658) -> Result<Arc<dyn PhysicalExpr>> {
1659    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1660
1661    let min_column_expr = expr_builder.min_column_expr()?;
1662    let max_column_expr = expr_builder.max_column_expr()?;
1663
1664    let scalar_expr = expr_builder.scalar_expr();
1665
1666    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1667        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1668    })?;
1669
1670    let (const_prefix, remaining) = split_constant_prefix(pattern);
1671    if const_prefix.is_empty() || remaining != "%" {
1672        // we can not handle `%` at the beginning or in the middle of the pattern
1673        // Example: For pattern "foo%bar", the row group might include values like
1674        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1675        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1676        // match the pattern, intermediate values like "food" may not
1677        // match the full pattern "foo%bar", making pruning unsafe.
1678        // (truncate foo%bar to foo% have same problem)
1679
1680        // we can not handle pattern containing `_`
1681        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1682        // which means not every row is guaranteed to match the pattern.
1683        return Err(plan_datafusion_err!(
1684            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1685        ));
1686    }
1687
1688    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1689        true,
1690        false,
1691        Arc::clone(&min_column_expr),
1692        Arc::clone(scalar_expr),
1693    ));
1694
1695    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1696        true,
1697        false,
1698        Arc::clone(&max_column_expr),
1699        Arc::clone(scalar_expr),
1700    ));
1701
1702    Ok(Arc::new(phys_expr::BinaryExpr::new(
1703        min_col_not_like_epxr,
1704        Operator::Or,
1705        max_col_not_like_expr,
1706    )))
1707}
1708
1709/// Returns unescaped constant prefix of a LIKE pattern (possibly empty) and the remaining pattern (possibly empty)
1710fn split_constant_prefix(pattern: &str) -> (&str, &str) {
1711    let char_indices = pattern.char_indices().collect::<Vec<_>>();
1712    for i in 0..char_indices.len() {
1713        let (idx, char) = char_indices[i];
1714        if char == '%' || char == '_' {
1715            if i != 0 && char_indices[i - 1].1 == '\\' {
1716                // ecsaped by `\`
1717                continue;
1718            }
1719            return (&pattern[..idx], &pattern[idx..]);
1720        }
1721    }
1722    (pattern, "")
1723}
1724
1725/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
1726/// This makes it so that the returned string will always compare greater than the input string
1727/// or any other string with the same prefix.
1728/// This is necessary since the statistics may have been truncated: if we have a min statistic
1729/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
1730/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`
1731/// In this example `increment_utf8("foo") == "fop"
1732fn increment_utf8(data: &str) -> Option<String> {
1733    // Helper function to check if a character is valid to use
1734    fn is_valid_unicode(c: char) -> bool {
1735        let cp = c as u32;
1736
1737        // Filter out non-characters (https://www.unicode.org/versions/corrigendum9.html)
1738        if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
1739            return false;
1740        }
1741
1742        // Filter out private use area
1743        if cp >= 0x110000 {
1744            return false;
1745        }
1746
1747        true
1748    }
1749
1750    // Convert string to vector of code points
1751    let mut code_points: Vec<char> = data.chars().collect();
1752
1753    // Work backwards through code points
1754    for idx in (0..code_points.len()).rev() {
1755        let original = code_points[idx] as u32;
1756
1757        // Try incrementing the code point
1758        if let Some(next_char) = char::from_u32(original + 1) {
1759            if is_valid_unicode(next_char) {
1760                code_points[idx] = next_char;
1761                // truncate the string to the current index
1762                code_points.truncate(idx + 1);
1763                return Some(code_points.into_iter().collect());
1764            }
1765        }
1766    }
1767
1768    None
1769}
1770
1771/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1772///
1773/// This is important not only as an optimization but also because statistics may not be
1774/// accurate for columns that are all nulls.
1775/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1776/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1777///
1778/// For example:
1779///
1780/// `x_min <= 10 AND 10 <= x_max`
1781///
1782/// will become
1783///
1784/// ```sql
1785/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
1786/// ````
1787///
1788/// If the column is known to be all nulls, then the expression
1789/// `x_null_count = x_row_count` will be true, which will cause the
1790/// boolean expression to return false. Therefore, prune out the container.
1791fn wrap_null_count_check_expr(
1792    statistics_expr: Arc<dyn PhysicalExpr>,
1793    expr_builder: &mut PruningExpressionBuilder,
1794) -> Result<Arc<dyn PhysicalExpr>> {
1795    // x_null_count != x_row_count
1796    let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1797        expr_builder.null_count_column_expr()?,
1798        Operator::NotEq,
1799        expr_builder.row_count_column_expr()?,
1800    ));
1801
1802    // (x_null_count != x_row_count) AND (<statistics_expr>)
1803    Ok(Arc::new(phys_expr::BinaryExpr::new(
1804        not_when_null_count_eq_row_count,
1805        Operator::And,
1806        statistics_expr,
1807    )))
1808}
1809
1810#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1811pub(crate) enum StatisticsType {
1812    Min,
1813    Max,
1814    NullCount,
1815    RowCount,
1816}
1817
1818#[cfg(test)]
1819mod tests {
1820    use std::collections::HashMap;
1821    use std::ops::{Not, Rem};
1822
1823    use super::*;
1824    use datafusion_common::test_util::batches_to_string;
1825    use datafusion_expr::{and, col, lit, or};
1826    use insta::assert_snapshot;
1827
1828    use arrow::array::Decimal128Array;
1829    use arrow::{
1830        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1831        datatypes::TimeUnit,
1832    };
1833    use datafusion_expr::expr::InList;
1834    use datafusion_expr::{cast, is_null, try_cast, Expr};
1835    use datafusion_functions_nested::expr_fn::{array_has, make_array};
1836    use datafusion_physical_expr::expressions as phys_expr;
1837    use datafusion_physical_expr::planner::logical2physical;
1838
1839    #[derive(Debug, Default)]
1840    /// Mock statistic provider for tests
1841    ///
1842    /// Each row represents the statistics for a "container" (which
1843    /// might represent an entire parquet file, or directory of files,
1844    /// or some other collection of data for which we had statistics)
1845    ///
1846    /// Note All `ArrayRefs` must be the same size.
1847    struct ContainerStats {
1848        min: Option<ArrayRef>,
1849        max: Option<ArrayRef>,
1850        /// Optional values
1851        null_counts: Option<ArrayRef>,
1852        row_counts: Option<ArrayRef>,
1853        /// Optional known values (e.g. mimic a bloom filter)
1854        /// (value, contained)
1855        /// If present, all BooleanArrays must be the same size as min/max
1856        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
1857    }
1858
1859    impl ContainerStats {
1860        fn new() -> Self {
1861            Default::default()
1862        }
1863        fn new_decimal128(
1864            min: impl IntoIterator<Item = Option<i128>>,
1865            max: impl IntoIterator<Item = Option<i128>>,
1866            precision: u8,
1867            scale: i8,
1868        ) -> Self {
1869            Self::new()
1870                .with_min(Arc::new(
1871                    min.into_iter()
1872                        .collect::<Decimal128Array>()
1873                        .with_precision_and_scale(precision, scale)
1874                        .unwrap(),
1875                ))
1876                .with_max(Arc::new(
1877                    max.into_iter()
1878                        .collect::<Decimal128Array>()
1879                        .with_precision_and_scale(precision, scale)
1880                        .unwrap(),
1881                ))
1882        }
1883
1884        fn new_i64(
1885            min: impl IntoIterator<Item = Option<i64>>,
1886            max: impl IntoIterator<Item = Option<i64>>,
1887        ) -> Self {
1888            Self::new()
1889                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1890                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1891        }
1892
1893        fn new_i32(
1894            min: impl IntoIterator<Item = Option<i32>>,
1895            max: impl IntoIterator<Item = Option<i32>>,
1896        ) -> Self {
1897            Self::new()
1898                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1899                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1900        }
1901
1902        fn new_utf8<'a>(
1903            min: impl IntoIterator<Item = Option<&'a str>>,
1904            max: impl IntoIterator<Item = Option<&'a str>>,
1905        ) -> Self {
1906            Self::new()
1907                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1908                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1909        }
1910
1911        fn new_bool(
1912            min: impl IntoIterator<Item = Option<bool>>,
1913            max: impl IntoIterator<Item = Option<bool>>,
1914        ) -> Self {
1915            Self::new()
1916                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1917                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1918        }
1919
1920        fn min(&self) -> Option<ArrayRef> {
1921            self.min.clone()
1922        }
1923
1924        fn max(&self) -> Option<ArrayRef> {
1925            self.max.clone()
1926        }
1927
1928        fn null_counts(&self) -> Option<ArrayRef> {
1929            self.null_counts.clone()
1930        }
1931
1932        fn row_counts(&self) -> Option<ArrayRef> {
1933            self.row_counts.clone()
1934        }
1935
1936        /// return an iterator over all arrays in this statistics
1937        fn arrays(&self) -> Vec<ArrayRef> {
1938            let contained_arrays = self
1939                .contained
1940                .iter()
1941                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
1942
1943            [
1944                self.min.as_ref().cloned(),
1945                self.max.as_ref().cloned(),
1946                self.null_counts.as_ref().cloned(),
1947                self.row_counts.as_ref().cloned(),
1948            ]
1949            .into_iter()
1950            .flatten()
1951            .chain(contained_arrays)
1952            .collect()
1953        }
1954
1955        /// Returns the number of containers represented by this statistics This
1956        /// picks the length of the first array as all arrays must have the same
1957        /// length (which is verified by `assert_invariants`).
1958        fn len(&self) -> usize {
1959            // pick the first non zero length
1960            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
1961        }
1962
1963        /// Ensure that the lengths of all arrays are consistent
1964        fn assert_invariants(&self) {
1965            let mut prev_len = None;
1966
1967            for len in self.arrays().iter().map(|a| a.len()) {
1968                // Get a length, if we don't already have one
1969                match prev_len {
1970                    None => {
1971                        prev_len = Some(len);
1972                    }
1973                    Some(prev_len) => {
1974                        assert_eq!(prev_len, len);
1975                    }
1976                }
1977            }
1978        }
1979
1980        /// Add min values
1981        fn with_min(mut self, min: ArrayRef) -> Self {
1982            self.min = Some(min);
1983            self
1984        }
1985
1986        /// Add max values
1987        fn with_max(mut self, max: ArrayRef) -> Self {
1988            self.max = Some(max);
1989            self
1990        }
1991
1992        /// Add null counts. There must be the same number of null counts as
1993        /// there are containers
1994        fn with_null_counts(
1995            mut self,
1996            counts: impl IntoIterator<Item = Option<u64>>,
1997        ) -> Self {
1998            let null_counts: ArrayRef =
1999                Arc::new(counts.into_iter().collect::<UInt64Array>());
2000
2001            self.assert_invariants();
2002            self.null_counts = Some(null_counts);
2003            self
2004        }
2005
2006        /// Add row counts. There must be the same number of row counts as
2007        /// there are containers
2008        fn with_row_counts(
2009            mut self,
2010            counts: impl IntoIterator<Item = Option<u64>>,
2011        ) -> Self {
2012            let row_counts: ArrayRef =
2013                Arc::new(counts.into_iter().collect::<UInt64Array>());
2014
2015            self.assert_invariants();
2016            self.row_counts = Some(row_counts);
2017            self
2018        }
2019
2020        /// Add contained information.
2021        pub fn with_contained(
2022            mut self,
2023            values: impl IntoIterator<Item = ScalarValue>,
2024            contained: impl IntoIterator<Item = Option<bool>>,
2025        ) -> Self {
2026            let contained: BooleanArray = contained.into_iter().collect();
2027            let values: HashSet<_> = values.into_iter().collect();
2028
2029            self.contained.push((values, contained));
2030            self.assert_invariants();
2031            self
2032        }
2033
2034        /// get any contained information for the specified values
2035        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2036            // find the one with the matching values
2037            self.contained
2038                .iter()
2039                .find(|(values, _contained)| values == find_values)
2040                .map(|(_values, contained)| contained.clone())
2041        }
2042    }
2043
2044    #[derive(Debug, Default)]
2045    struct TestStatistics {
2046        // key: column name
2047        stats: HashMap<Column, ContainerStats>,
2048    }
2049
2050    impl TestStatistics {
2051        fn new() -> Self {
2052            Self::default()
2053        }
2054
2055        fn with(
2056            mut self,
2057            name: impl Into<String>,
2058            container_stats: ContainerStats,
2059        ) -> Self {
2060            let col = Column::from_name(name.into());
2061            self.stats.insert(col, container_stats);
2062            self
2063        }
2064
2065        /// Add null counts for the specified column.
2066        /// There must be the same number of null counts as
2067        /// there are containers
2068        fn with_null_counts(
2069            mut self,
2070            name: impl Into<String>,
2071            counts: impl IntoIterator<Item = Option<u64>>,
2072        ) -> Self {
2073            let col = Column::from_name(name.into());
2074
2075            // take stats out and update them
2076            let container_stats = self
2077                .stats
2078                .remove(&col)
2079                .unwrap_or_default()
2080                .with_null_counts(counts);
2081
2082            // put stats back in
2083            self.stats.insert(col, container_stats);
2084            self
2085        }
2086
2087        /// Add row counts for the specified column.
2088        /// There must be the same number of row counts as
2089        /// there are containers
2090        fn with_row_counts(
2091            mut self,
2092            name: impl Into<String>,
2093            counts: impl IntoIterator<Item = Option<u64>>,
2094        ) -> Self {
2095            let col = Column::from_name(name.into());
2096
2097            // take stats out and update them
2098            let container_stats = self
2099                .stats
2100                .remove(&col)
2101                .unwrap_or_default()
2102                .with_row_counts(counts);
2103
2104            // put stats back in
2105            self.stats.insert(col, container_stats);
2106            self
2107        }
2108
2109        /// Add contained information for the specified column.
2110        fn with_contained(
2111            mut self,
2112            name: impl Into<String>,
2113            values: impl IntoIterator<Item = ScalarValue>,
2114            contained: impl IntoIterator<Item = Option<bool>>,
2115        ) -> Self {
2116            let col = Column::from_name(name.into());
2117
2118            // take stats out and update them
2119            let container_stats = self
2120                .stats
2121                .remove(&col)
2122                .unwrap_or_default()
2123                .with_contained(values, contained);
2124
2125            // put stats back in
2126            self.stats.insert(col, container_stats);
2127            self
2128        }
2129    }
2130
2131    impl PruningStatistics for TestStatistics {
2132        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2133            self.stats
2134                .get(column)
2135                .map(|container_stats| container_stats.min())
2136                .unwrap_or(None)
2137        }
2138
2139        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2140            self.stats
2141                .get(column)
2142                .map(|container_stats| container_stats.max())
2143                .unwrap_or(None)
2144        }
2145
2146        fn num_containers(&self) -> usize {
2147            self.stats
2148                .values()
2149                .next()
2150                .map(|container_stats| container_stats.len())
2151                .unwrap_or(0)
2152        }
2153
2154        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2155            self.stats
2156                .get(column)
2157                .map(|container_stats| container_stats.null_counts())
2158                .unwrap_or(None)
2159        }
2160
2161        fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2162            self.stats
2163                .get(column)
2164                .map(|container_stats| container_stats.row_counts())
2165                .unwrap_or(None)
2166        }
2167
2168        fn contained(
2169            &self,
2170            column: &Column,
2171            values: &HashSet<ScalarValue>,
2172        ) -> Option<BooleanArray> {
2173            self.stats
2174                .get(column)
2175                .and_then(|container_stats| container_stats.contained(values))
2176        }
2177    }
2178
2179    /// Returns the specified min/max container values
2180    struct OneContainerStats {
2181        min_values: Option<ArrayRef>,
2182        max_values: Option<ArrayRef>,
2183        num_containers: usize,
2184    }
2185
2186    impl PruningStatistics for OneContainerStats {
2187        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2188            self.min_values.clone()
2189        }
2190
2191        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2192            self.max_values.clone()
2193        }
2194
2195        fn num_containers(&self) -> usize {
2196            self.num_containers
2197        }
2198
2199        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2200            None
2201        }
2202
2203        fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2204            None
2205        }
2206
2207        fn contained(
2208            &self,
2209            _column: &Column,
2210            _values: &HashSet<ScalarValue>,
2211        ) -> Option<BooleanArray> {
2212            None
2213        }
2214    }
2215
2216    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2217    /// for multiple columns.
2218    #[test]
2219    fn test_unique_row_count_field_and_column() {
2220        // c1 = 100 AND c2 = 200
2221        let schema: SchemaRef = Arc::new(Schema::new(vec![
2222            Field::new("c1", DataType::Int32, true),
2223            Field::new("c2", DataType::Int32, true),
2224        ]));
2225        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2226        let expr = logical2physical(&expr, &schema);
2227        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2228        // note pruning expression refers to row_count twice
2229        assert_eq!(
2230            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2231            p.predicate_expr.to_string()
2232        );
2233
2234        // Fields in required schema should be unique, otherwise when creating batches
2235        // it will fail because of duplicate field names
2236        let mut fields = HashSet::new();
2237        for (_col, _ty, field) in p.required_columns().iter() {
2238            let was_new = fields.insert(field);
2239            if !was_new {
2240                panic!(
2241                    "Duplicate field in required schema: {field:?}. Previous fields:\n{fields:#?}"
2242                );
2243            }
2244        }
2245    }
2246
2247    #[test]
2248    fn prune_all_rows_null_counts() {
2249        // if null_count = row_count then we should prune the container for i = 0
2250        // regardless of the statistics
2251        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2252        let statistics = TestStatistics::new().with(
2253            "i",
2254            ContainerStats::new_i32(
2255                vec![Some(0)], // min
2256                vec![Some(0)], // max
2257            )
2258            .with_null_counts(vec![Some(1)])
2259            .with_row_counts(vec![Some(1)]),
2260        );
2261        let expected_ret = &[false];
2262        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2263
2264        // this should be true even if the container stats are missing
2265        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2266        let container_stats = ContainerStats {
2267            min: Some(Arc::new(Int32Array::from(vec![None]))),
2268            max: Some(Arc::new(Int32Array::from(vec![None]))),
2269            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2270            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2271            ..ContainerStats::default()
2272        };
2273        let statistics = TestStatistics::new().with("i", container_stats);
2274        let expected_ret = &[false];
2275        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2276
2277        // If the null counts themselves are missing we should be able to fall back to the stats
2278        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2279        let container_stats = ContainerStats {
2280            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2281            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2282            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2283            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2284            ..ContainerStats::default()
2285        };
2286        let statistics = TestStatistics::new().with("i", container_stats);
2287        let expected_ret = &[true];
2288        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2289        let expected_ret = &[false];
2290        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2291
2292        // Same for the row counts
2293        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2294        let container_stats = ContainerStats {
2295            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2296            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2297            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2298            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2299            ..ContainerStats::default()
2300        };
2301        let statistics = TestStatistics::new().with("i", container_stats);
2302        let expected_ret = &[true];
2303        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2304        let expected_ret = &[false];
2305        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2306    }
2307
2308    #[test]
2309    fn prune_missing_statistics() {
2310        // If the min or max stats are missing we should not prune
2311        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2312        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2313        let container_stats = ContainerStats {
2314            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2315            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2316            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2317            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2318            ..ContainerStats::default()
2319        };
2320        let statistics = TestStatistics::new().with("i", container_stats);
2321        let expected_ret = &[true, true];
2322        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2323        let expected_ret = &[false, true];
2324        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2325        let expected_ret = &[true, false];
2326        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2327    }
2328
2329    #[test]
2330    fn prune_null_stats() {
2331        // if null_count = row_count then we should prune the container for i = 0
2332        // regardless of the statistics
2333        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2334
2335        let statistics = TestStatistics::new().with(
2336            "i",
2337            ContainerStats::new_i32(
2338                vec![Some(0)], // min
2339                vec![Some(0)], // max
2340            )
2341            .with_null_counts(vec![Some(1)])
2342            .with_row_counts(vec![Some(1)]),
2343        );
2344
2345        let expected_ret = &[false];
2346
2347        // i = 0
2348        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2349    }
2350
2351    #[test]
2352    fn test_build_statistics_record_batch() {
2353        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
2354        let required_columns = RequiredColumns::from(vec![
2355            // min of original column s1, named s1_min
2356            (
2357                phys_expr::Column::new("s1", 1),
2358                StatisticsType::Min,
2359                Field::new("s1_min", DataType::Int32, true),
2360            ),
2361            // max of original column s2, named s2_max
2362            (
2363                phys_expr::Column::new("s2", 2),
2364                StatisticsType::Max,
2365                Field::new("s2_max", DataType::Int32, true),
2366            ),
2367            // max of original column s3, named s3_max
2368            (
2369                phys_expr::Column::new("s3", 3),
2370                StatisticsType::Max,
2371                Field::new("s3_max", DataType::Utf8, true),
2372            ),
2373            // min of original column s3, named s3_min
2374            (
2375                phys_expr::Column::new("s3", 3),
2376                StatisticsType::Min,
2377                Field::new("s3_min", DataType::Utf8, true),
2378            ),
2379        ]);
2380
2381        let statistics = TestStatistics::new()
2382            .with(
2383                "s1",
2384                ContainerStats::new_i32(
2385                    vec![None, None, Some(9), None],  // min
2386                    vec![Some(10), None, None, None], // max
2387                ),
2388            )
2389            .with(
2390                "s2",
2391                ContainerStats::new_i32(
2392                    vec![Some(2), None, None, None],  // min
2393                    vec![Some(20), None, None, None], // max
2394                ),
2395            )
2396            .with(
2397                "s3",
2398                ContainerStats::new_utf8(
2399                    vec![Some("a"), None, None, None],      // min
2400                    vec![Some("q"), None, Some("r"), None], // max
2401                ),
2402            );
2403
2404        let batch =
2405            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2406        assert_snapshot!(batches_to_string(&[batch]), @r"
2407        +--------+--------+--------+--------+
2408        | s1_min | s2_max | s3_max | s3_min |
2409        +--------+--------+--------+--------+
2410        |        | 20     | q      | a      |
2411        |        |        |        |        |
2412        | 9      |        | r      |        |
2413        |        |        |        |        |
2414        +--------+--------+--------+--------+
2415        ");
2416    }
2417
2418    #[test]
2419    fn test_build_statistics_casting() {
2420        // Test requesting a Timestamp column, but getting statistics as Int64
2421        // which is what Parquet does
2422
2423        // Request a record batch with of s1_min as a timestamp
2424        let required_columns = RequiredColumns::from(vec![(
2425            phys_expr::Column::new("s3", 3),
2426            StatisticsType::Min,
2427            Field::new(
2428                "s1_min",
2429                DataType::Timestamp(TimeUnit::Nanosecond, None),
2430                true,
2431            ),
2432        )]);
2433
2434        // Note the statistics pass back i64 (not timestamp)
2435        let statistics = OneContainerStats {
2436            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2437            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2438            num_containers: 1,
2439        };
2440
2441        let batch =
2442            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2443
2444        assert_snapshot!(batches_to_string(&[batch]), @r"
2445        +-------------------------------+
2446        | s1_min                        |
2447        +-------------------------------+
2448        | 1970-01-01T00:00:00.000000010 |
2449        +-------------------------------+
2450        ");
2451    }
2452
2453    #[test]
2454    fn test_build_statistics_no_required_stats() {
2455        let required_columns = RequiredColumns::new();
2456
2457        let statistics = OneContainerStats {
2458            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2459            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2460            num_containers: 1,
2461        };
2462
2463        let batch =
2464            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2465        assert_eq!(batch.num_rows(), 1); // had 1 container
2466    }
2467
2468    #[test]
2469    fn test_build_statistics_inconsistent_types() {
2470        // Test requesting a Utf8 column when the stats return some other type
2471
2472        // Request a record batch with of s1_min as a timestamp
2473        let required_columns = RequiredColumns::from(vec![(
2474            phys_expr::Column::new("s3", 3),
2475            StatisticsType::Min,
2476            Field::new("s1_min", DataType::Utf8, true),
2477        )]);
2478
2479        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2480        let statistics = OneContainerStats {
2481            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2482            max_values: None,
2483            num_containers: 1,
2484        };
2485
2486        let batch =
2487            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2488        assert_snapshot!(batches_to_string(&[batch]), @r"
2489        +--------+
2490        | s1_min |
2491        +--------+
2492        |        |
2493        +--------+
2494        ");
2495    }
2496
2497    #[test]
2498    fn test_build_statistics_inconsistent_length() {
2499        // return an inconsistent length to the actual statistics arrays
2500        let required_columns = RequiredColumns::from(vec![(
2501            phys_expr::Column::new("s1", 3),
2502            StatisticsType::Min,
2503            Field::new("s1_min", DataType::Int64, true),
2504        )]);
2505
2506        // Note the statistics pass back i64 (not timestamp)
2507        let statistics = OneContainerStats {
2508            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2509            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2510            num_containers: 3,
2511        };
2512
2513        let result =
2514            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2515        assert!(
2516            result
2517                .to_string()
2518                .contains("mismatched statistics length. Expected 3, got 1"),
2519            "{}",
2520            result
2521        );
2522    }
2523
2524    #[test]
2525    fn row_group_predicate_eq() -> Result<()> {
2526        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2527        let expected_expr =
2528            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2529
2530        // test column on the left
2531        let expr = col("c1").eq(lit(1));
2532        let predicate_expr =
2533            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2534        assert_eq!(predicate_expr.to_string(), expected_expr);
2535
2536        // test column on the right
2537        let expr = lit(1).eq(col("c1"));
2538        let predicate_expr =
2539            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2540        assert_eq!(predicate_expr.to_string(), expected_expr);
2541
2542        Ok(())
2543    }
2544
2545    #[test]
2546    fn row_group_predicate_not_eq() -> Result<()> {
2547        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2548        let expected_expr =
2549            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2550
2551        // test column on the left
2552        let expr = col("c1").not_eq(lit(1));
2553        let predicate_expr =
2554            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2555        assert_eq!(predicate_expr.to_string(), expected_expr);
2556
2557        // test column on the right
2558        let expr = lit(1).not_eq(col("c1"));
2559        let predicate_expr =
2560            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2561        assert_eq!(predicate_expr.to_string(), expected_expr);
2562
2563        Ok(())
2564    }
2565
2566    #[test]
2567    fn row_group_predicate_gt() -> Result<()> {
2568        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2569        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2570
2571        // test column on the left
2572        let expr = col("c1").gt(lit(1));
2573        let predicate_expr =
2574            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2575        assert_eq!(predicate_expr.to_string(), expected_expr);
2576
2577        // test column on the right
2578        let expr = lit(1).lt(col("c1"));
2579        let predicate_expr =
2580            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2581        assert_eq!(predicate_expr.to_string(), expected_expr);
2582
2583        Ok(())
2584    }
2585
2586    #[test]
2587    fn row_group_predicate_gt_eq() -> Result<()> {
2588        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2589        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2590
2591        // test column on the left
2592        let expr = col("c1").gt_eq(lit(1));
2593        let predicate_expr =
2594            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2595        assert_eq!(predicate_expr.to_string(), expected_expr);
2596        // test column on the right
2597        let expr = lit(1).lt_eq(col("c1"));
2598        let predicate_expr =
2599            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2600        assert_eq!(predicate_expr.to_string(), expected_expr);
2601
2602        Ok(())
2603    }
2604
2605    #[test]
2606    fn row_group_predicate_lt() -> Result<()> {
2607        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2608        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2609
2610        // test column on the left
2611        let expr = col("c1").lt(lit(1));
2612        let predicate_expr =
2613            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2614        assert_eq!(predicate_expr.to_string(), expected_expr);
2615
2616        // test column on the right
2617        let expr = lit(1).gt(col("c1"));
2618        let predicate_expr =
2619            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2620        assert_eq!(predicate_expr.to_string(), expected_expr);
2621
2622        Ok(())
2623    }
2624
2625    #[test]
2626    fn row_group_predicate_lt_eq() -> Result<()> {
2627        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2628        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2629
2630        // test column on the left
2631        let expr = col("c1").lt_eq(lit(1));
2632        let predicate_expr =
2633            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2634        assert_eq!(predicate_expr.to_string(), expected_expr);
2635        // test column on the right
2636        let expr = lit(1).gt_eq(col("c1"));
2637        let predicate_expr =
2638            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2639        assert_eq!(predicate_expr.to_string(), expected_expr);
2640
2641        Ok(())
2642    }
2643
2644    #[test]
2645    fn row_group_predicate_and() -> Result<()> {
2646        let schema = Schema::new(vec![
2647            Field::new("c1", DataType::Int32, false),
2648            Field::new("c2", DataType::Int32, false),
2649            Field::new("c3", DataType::Int32, false),
2650        ]);
2651        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2652        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2653        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2654        let predicate_expr =
2655            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2656        assert_eq!(predicate_expr.to_string(), expected_expr);
2657
2658        Ok(())
2659    }
2660
2661    #[test]
2662    fn row_group_predicate_or() -> Result<()> {
2663        let schema = Schema::new(vec![
2664            Field::new("c1", DataType::Int32, false),
2665            Field::new("c2", DataType::Int32, false),
2666        ]);
2667        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2668        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2669        let expected_expr = "true";
2670        let predicate_expr =
2671            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2672        assert_eq!(predicate_expr.to_string(), expected_expr);
2673
2674        Ok(())
2675    }
2676
2677    #[test]
2678    fn row_group_predicate_not() -> Result<()> {
2679        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2680        let expected_expr = "true";
2681
2682        let expr = col("c1").not();
2683        let predicate_expr =
2684            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2685        assert_eq!(predicate_expr.to_string(), expected_expr);
2686
2687        Ok(())
2688    }
2689
2690    #[test]
2691    fn row_group_predicate_not_bool() -> Result<()> {
2692        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2693        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2694
2695        let expr = col("c1").not();
2696        let predicate_expr =
2697            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2698        assert_eq!(predicate_expr.to_string(), expected_expr);
2699
2700        Ok(())
2701    }
2702
2703    #[test]
2704    fn row_group_predicate_bool() -> Result<()> {
2705        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2706        let expected_expr = "c1_min@0 OR c1_max@1";
2707
2708        let expr = col("c1");
2709        let predicate_expr =
2710            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2711        assert_eq!(predicate_expr.to_string(), expected_expr);
2712
2713        Ok(())
2714    }
2715
2716    #[test]
2717    fn row_group_predicate_lt_bool() -> Result<()> {
2718        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2719        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2720
2721        // DF doesn't support arithmetic on boolean columns so
2722        // this predicate will error when evaluated
2723        let expr = col("c1").lt(lit(true));
2724        let predicate_expr =
2725            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2726        assert_eq!(predicate_expr.to_string(), expected_expr);
2727
2728        Ok(())
2729    }
2730
2731    #[test]
2732    fn row_group_predicate_required_columns() -> Result<()> {
2733        let schema = Schema::new(vec![
2734            Field::new("c1", DataType::Int32, false),
2735            Field::new("c2", DataType::Int32, false),
2736        ]);
2737        let mut required_columns = RequiredColumns::new();
2738        // c1 < 1 and (c2 = 2 or c2 = 3)
2739        let expr = col("c1")
2740            .lt(lit(1))
2741            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2742        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2743        let predicate_expr =
2744            test_build_predicate_expression(&expr, &schema, &mut required_columns);
2745        assert_eq!(predicate_expr.to_string(), expected_expr);
2746        println!("required_columns: {required_columns:#?}"); // for debugging assertions below
2747                                                             // c1 < 1 should add c1_min
2748        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2749        assert_eq!(
2750            required_columns.columns[0],
2751            (
2752                phys_expr::Column::new("c1", 0),
2753                StatisticsType::Min,
2754                c1_min_field.with_nullable(true) // could be nullable if stats are not present
2755            )
2756        );
2757        // c1 < 1 should add c1_null_count
2758        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2759        assert_eq!(
2760            required_columns.columns[1],
2761            (
2762                phys_expr::Column::new("c1", 0),
2763                StatisticsType::NullCount,
2764                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
2765            )
2766        );
2767        // c1 < 1 should add row_count
2768        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2769        assert_eq!(
2770            required_columns.columns[2],
2771            (
2772                phys_expr::Column::new("c1", 0),
2773                StatisticsType::RowCount,
2774                row_count_field.with_nullable(true) // could be nullable if stats are not present
2775            )
2776        );
2777        // c2 = 2 should add c2_min and c2_max
2778        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2779        assert_eq!(
2780            required_columns.columns[3],
2781            (
2782                phys_expr::Column::new("c2", 1),
2783                StatisticsType::Min,
2784                c2_min_field.with_nullable(true) // could be nullable if stats are not present
2785            )
2786        );
2787        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2788        assert_eq!(
2789            required_columns.columns[4],
2790            (
2791                phys_expr::Column::new("c2", 1),
2792                StatisticsType::Max,
2793                c2_max_field.with_nullable(true) // could be nullable if stats are not present
2794            )
2795        );
2796        // c2 = 2 should add c2_null_count
2797        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2798        assert_eq!(
2799            required_columns.columns[5],
2800            (
2801                phys_expr::Column::new("c2", 1),
2802                StatisticsType::NullCount,
2803                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
2804            )
2805        );
2806        // c2 = 1 should add row_count
2807        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2808        assert_eq!(
2809            required_columns.columns[2],
2810            (
2811                phys_expr::Column::new("c1", 0),
2812                StatisticsType::RowCount,
2813                row_count_field.with_nullable(true) // could be nullable if stats are not present
2814            )
2815        );
2816        // c2 = 3 shouldn't add any new statistics fields
2817        assert_eq!(required_columns.columns.len(), 6);
2818
2819        Ok(())
2820    }
2821
2822    #[test]
2823    fn row_group_predicate_in_list() -> Result<()> {
2824        let schema = Schema::new(vec![
2825            Field::new("c1", DataType::Int32, false),
2826            Field::new("c2", DataType::Int32, false),
2827        ]);
2828        // test c1 in(1, 2, 3)
2829        let expr = Expr::InList(InList::new(
2830            Box::new(col("c1")),
2831            vec![lit(1), lit(2), lit(3)],
2832            false,
2833        ));
2834        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2835        let predicate_expr =
2836            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2837        assert_eq!(predicate_expr.to_string(), expected_expr);
2838
2839        Ok(())
2840    }
2841
2842    #[test]
2843    fn row_group_predicate_in_list_empty() -> Result<()> {
2844        let schema = Schema::new(vec![
2845            Field::new("c1", DataType::Int32, false),
2846            Field::new("c2", DataType::Int32, false),
2847        ]);
2848        // test c1 in()
2849        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2850        let expected_expr = "true";
2851        let predicate_expr =
2852            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2853        assert_eq!(predicate_expr.to_string(), expected_expr);
2854
2855        Ok(())
2856    }
2857
2858    #[test]
2859    fn row_group_predicate_in_list_negated() -> Result<()> {
2860        let schema = Schema::new(vec![
2861            Field::new("c1", DataType::Int32, false),
2862            Field::new("c2", DataType::Int32, false),
2863        ]);
2864        // test c1 not in(1, 2, 3)
2865        let expr = Expr::InList(InList::new(
2866            Box::new(col("c1")),
2867            vec![lit(1), lit(2), lit(3)],
2868            true,
2869        ));
2870        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2871        let predicate_expr =
2872            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2873        assert_eq!(predicate_expr.to_string(), expected_expr);
2874
2875        Ok(())
2876    }
2877
2878    #[test]
2879    fn row_group_predicate_between() -> Result<()> {
2880        let schema = Schema::new(vec![
2881            Field::new("c1", DataType::Int32, false),
2882            Field::new("c2", DataType::Int32, false),
2883        ]);
2884
2885        // test c1 BETWEEN 1 AND 5
2886        let expr1 = col("c1").between(lit(1), lit(5));
2887
2888        // test 1 <= c1 <= 5
2889        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
2890
2891        let predicate_expr1 =
2892            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
2893
2894        let predicate_expr2 =
2895            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
2896        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
2897
2898        Ok(())
2899    }
2900
2901    #[test]
2902    fn row_group_predicate_between_with_in_list() -> Result<()> {
2903        let schema = Schema::new(vec![
2904            Field::new("c1", DataType::Int32, false),
2905            Field::new("c2", DataType::Int32, false),
2906        ]);
2907        // test c1 in(1, 2)
2908        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
2909
2910        // test c2 BETWEEN 4 AND 5
2911        let expr2 = col("c2").between(lit(4), lit(5));
2912
2913        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
2914        let expr3 = expr1.and(expr2);
2915
2916        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
2917        let predicate_expr =
2918            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
2919        assert_eq!(predicate_expr.to_string(), expected_expr);
2920
2921        Ok(())
2922    }
2923
2924    #[test]
2925    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
2926        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2927        // test c1 in(1..21)
2928        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
2929        // always true
2930        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
2931
2932        let expected_expr = "true";
2933        let predicate_expr =
2934            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2935        assert_eq!(predicate_expr.to_string(), expected_expr);
2936
2937        Ok(())
2938    }
2939
2940    #[test]
2941    fn row_group_predicate_cast_int_int() -> Result<()> {
2942        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2943        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
2944
2945        // test cast(c1 as int64) = 1
2946        // test column on the left
2947        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
2948        let predicate_expr =
2949            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2950        assert_eq!(predicate_expr.to_string(), expected_expr);
2951
2952        // test column on the right
2953        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
2954        let predicate_expr =
2955            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2956        assert_eq!(predicate_expr.to_string(), expected_expr);
2957
2958        let expected_expr =
2959            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
2960
2961        // test column on the left
2962        let expr =
2963            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
2964        let predicate_expr =
2965            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2966        assert_eq!(predicate_expr.to_string(), expected_expr);
2967
2968        // test column on the right
2969        let expr =
2970            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
2971        let predicate_expr =
2972            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2973        assert_eq!(predicate_expr.to_string(), expected_expr);
2974
2975        Ok(())
2976    }
2977
2978    #[test]
2979    fn row_group_predicate_cast_string_string() -> Result<()> {
2980        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
2981        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Utf8) <= 1 AND 1 <= CAST(c1_max@1 AS Utf8)";
2982
2983        // test column on the left
2984        let expr = cast(col("c1"), DataType::Utf8)
2985            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
2986        let predicate_expr =
2987            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2988        assert_eq!(predicate_expr.to_string(), expected_expr);
2989
2990        // test column on the right
2991        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
2992            .eq(cast(col("c1"), DataType::Utf8));
2993        let predicate_expr =
2994            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2995        assert_eq!(predicate_expr.to_string(), expected_expr);
2996
2997        Ok(())
2998    }
2999
3000    #[test]
3001    fn row_group_predicate_cast_string_int() -> Result<()> {
3002        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
3003        let expected_expr = "true";
3004
3005        // test column on the left
3006        let expr = cast(col("c1"), DataType::Int32).eq(lit(ScalarValue::Int32(Some(1))));
3007        let predicate_expr =
3008            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3009        assert_eq!(predicate_expr.to_string(), expected_expr);
3010
3011        // test column on the right
3012        let expr = lit(ScalarValue::Int32(Some(1))).eq(cast(col("c1"), DataType::Int32));
3013        let predicate_expr =
3014            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3015        assert_eq!(predicate_expr.to_string(), expected_expr);
3016
3017        Ok(())
3018    }
3019
3020    #[test]
3021    fn row_group_predicate_cast_int_string() -> Result<()> {
3022        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3023        let expected_expr = "true";
3024
3025        // test column on the left
3026        let expr = cast(col("c1"), DataType::Utf8)
3027            .eq(lit(ScalarValue::Utf8(Some("1".to_string()))));
3028        let predicate_expr =
3029            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3030        assert_eq!(predicate_expr.to_string(), expected_expr);
3031
3032        // test column on the right
3033        let expr = lit(ScalarValue::Utf8(Some("1".to_string())))
3034            .eq(cast(col("c1"), DataType::Utf8));
3035        let predicate_expr =
3036            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3037        assert_eq!(predicate_expr.to_string(), expected_expr);
3038
3039        Ok(())
3040    }
3041
3042    #[test]
3043    fn row_group_predicate_date_date() -> Result<()> {
3044        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3045        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Date64) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Date64)";
3046
3047        // test column on the left
3048        let expr =
3049            cast(col("c1"), DataType::Date64).eq(lit(ScalarValue::Date64(Some(123))));
3050        let predicate_expr =
3051            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3052        assert_eq!(predicate_expr.to_string(), expected_expr);
3053
3054        // test column on the right
3055        let expr =
3056            lit(ScalarValue::Date64(Some(123))).eq(cast(col("c1"), DataType::Date64));
3057        let predicate_expr =
3058            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3059        assert_eq!(predicate_expr.to_string(), expected_expr);
3060
3061        Ok(())
3062    }
3063
3064    #[test]
3065    fn row_group_predicate_dict_string_date() -> Result<()> {
3066        // Test with Dictionary<UInt8, Utf8> for the literal
3067        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3068        let expected_expr = "true";
3069
3070        // test column on the left
3071        let expr = cast(
3072            col("c1"),
3073            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3074        )
3075        .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3076        let predicate_expr =
3077            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3078        assert_eq!(predicate_expr.to_string(), expected_expr);
3079
3080        // test column on the right
3081        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))).eq(cast(
3082            col("c1"),
3083            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3084        ));
3085        let predicate_expr =
3086            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3087        assert_eq!(predicate_expr.to_string(), expected_expr);
3088
3089        Ok(())
3090    }
3091
3092    #[test]
3093    fn row_group_predicate_date_dict_string() -> Result<()> {
3094        // Test with Dictionary<UInt8, Utf8> for the column
3095        let schema = Schema::new(vec![Field::new(
3096            "c1",
3097            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3098            false,
3099        )]);
3100        let expected_expr = "true";
3101
3102        // test column on the left
3103        let expr =
3104            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3105        let predicate_expr =
3106            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3107        assert_eq!(predicate_expr.to_string(), expected_expr);
3108
3109        // test column on the right
3110        let expr =
3111            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3112        let predicate_expr =
3113            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3114        assert_eq!(predicate_expr.to_string(), expected_expr);
3115
3116        Ok(())
3117    }
3118
3119    #[test]
3120    fn row_group_predicate_dict_dict_same_value_type() -> Result<()> {
3121        // Test with Dictionary types that have the same value type but different key types
3122        let schema = Schema::new(vec![Field::new(
3123            "c1",
3124            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
3125            false,
3126        )]);
3127
3128        // Direct comparison with no cast
3129        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3130        let predicate_expr =
3131            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3132        let expected_expr =
3133            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3134        assert_eq!(predicate_expr.to_string(), expected_expr);
3135
3136        // Test with column cast to a dictionary with different key type
3137        let expr = cast(
3138            col("c1"),
3139            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
3140        )
3141        .eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3142        let predicate_expr =
3143            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3144        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Utf8)) <= test AND test <= CAST(c1_max@1 AS Dictionary(UInt16, Utf8))";
3145        assert_eq!(predicate_expr.to_string(), expected_expr);
3146
3147        Ok(())
3148    }
3149
3150    #[test]
3151    fn row_group_predicate_dict_dict_different_value_type() -> Result<()> {
3152        // Test with Dictionary types that have different value types
3153        let schema = Schema::new(vec![Field::new(
3154            "c1",
3155            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Int32)),
3156            false,
3157        )]);
3158        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 123 AND 123 <= CAST(c1_max@1 AS Int64)";
3159
3160        // Test with literal of a different type
3161        let expr =
3162            cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(123))));
3163        let predicate_expr =
3164            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3165        assert_eq!(predicate_expr.to_string(), expected_expr);
3166
3167        Ok(())
3168    }
3169
3170    #[test]
3171    fn row_group_predicate_nested_dict() -> Result<()> {
3172        // Test with nested Dictionary types
3173        let schema = Schema::new(vec![Field::new(
3174            "c1",
3175            DataType::Dictionary(
3176                Box::new(DataType::UInt8),
3177                Box::new(DataType::Dictionary(
3178                    Box::new(DataType::UInt16),
3179                    Box::new(DataType::Utf8),
3180                )),
3181            ),
3182            false,
3183        )]);
3184        let expected_expr =
3185            "c1_null_count@2 != row_count@3 AND c1_min@0 <= test AND test <= c1_max@1";
3186
3187        // Test with a simple literal
3188        let expr = col("c1").eq(lit(ScalarValue::Utf8(Some("test".to_string()))));
3189        let predicate_expr =
3190            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3191        assert_eq!(predicate_expr.to_string(), expected_expr);
3192
3193        Ok(())
3194    }
3195
3196    #[test]
3197    fn row_group_predicate_dict_date_dict_date() -> Result<()> {
3198        // Test with dictionary-wrapped date types for both sides
3199        let schema = Schema::new(vec![Field::new(
3200            "c1",
3201            DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Date32)),
3202            false,
3203        )]);
3204        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Dictionary(UInt16, Date64)) <= 1970-01-01 AND 1970-01-01 <= CAST(c1_max@1 AS Dictionary(UInt16, Date64))";
3205
3206        // Test with a cast to a different date type
3207        let expr = cast(
3208            col("c1"),
3209            DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Date64)),
3210        )
3211        .eq(lit(ScalarValue::Date64(Some(123))));
3212        let predicate_expr =
3213            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3214        assert_eq!(predicate_expr.to_string(), expected_expr);
3215
3216        Ok(())
3217    }
3218
3219    #[test]
3220    fn row_group_predicate_date_string() -> Result<()> {
3221        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8, false)]);
3222        let expected_expr = "true";
3223
3224        // test column on the left
3225        let expr =
3226            cast(col("c1"), DataType::Date32).eq(lit(ScalarValue::Date32(Some(123))));
3227        let predicate_expr =
3228            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3229        assert_eq!(predicate_expr.to_string(), expected_expr);
3230
3231        // test column on the right
3232        let expr =
3233            lit(ScalarValue::Date32(Some(123))).eq(cast(col("c1"), DataType::Date32));
3234        let predicate_expr =
3235            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3236        assert_eq!(predicate_expr.to_string(), expected_expr);
3237
3238        Ok(())
3239    }
3240
3241    #[test]
3242    fn row_group_predicate_string_date() -> Result<()> {
3243        let schema = Schema::new(vec![Field::new("c1", DataType::Date32, false)]);
3244        let expected_expr = "true";
3245
3246        // test column on the left
3247        let expr = cast(col("c1"), DataType::Utf8)
3248            .eq(lit(ScalarValue::Utf8(Some("2024-01-01".to_string()))));
3249        let predicate_expr =
3250            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3251        assert_eq!(predicate_expr.to_string(), expected_expr);
3252
3253        // test column on the right
3254        let expr = lit(ScalarValue::Utf8(Some("2024-01-01".to_string())))
3255            .eq(cast(col("c1"), DataType::Utf8));
3256        let predicate_expr =
3257            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3258        assert_eq!(predicate_expr.to_string(), expected_expr);
3259
3260        Ok(())
3261    }
3262
3263    #[test]
3264    fn row_group_predicate_cast_list() -> Result<()> {
3265        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3266        // test cast(c1 as int64) in int64(1, 2, 3)
3267        let expr = Expr::InList(InList::new(
3268            Box::new(cast(col("c1"), DataType::Int64)),
3269            vec![
3270                lit(ScalarValue::Int64(Some(1))),
3271                lit(ScalarValue::Int64(Some(2))),
3272                lit(ScalarValue::Int64(Some(3))),
3273            ],
3274            false,
3275        ));
3276        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3277        let predicate_expr =
3278            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3279        assert_eq!(predicate_expr.to_string(), expected_expr);
3280
3281        let expr = Expr::InList(InList::new(
3282            Box::new(cast(col("c1"), DataType::Int64)),
3283            vec![
3284                lit(ScalarValue::Int64(Some(1))),
3285                lit(ScalarValue::Int64(Some(2))),
3286                lit(ScalarValue::Int64(Some(3))),
3287            ],
3288            true,
3289        ));
3290        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3291        let predicate_expr =
3292            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3293        assert_eq!(predicate_expr.to_string(), expected_expr);
3294
3295        Ok(())
3296    }
3297
3298    #[test]
3299    fn prune_decimal_data() {
3300        // decimal(9,2)
3301        let schema = Arc::new(Schema::new(vec![Field::new(
3302            "s1",
3303            DataType::Decimal128(9, 2),
3304            true,
3305        )]));
3306
3307        prune_with_expr(
3308            // s1 > 5
3309            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3310            &schema,
3311            // If the data is written by spark, the physical data type is INT32 in the parquet
3312            // So we use the INT32 type of statistic.
3313            &TestStatistics::new().with(
3314                "s1",
3315                ContainerStats::new_i32(
3316                    vec![Some(0), Some(4), None, Some(3)], // min
3317                    vec![Some(5), Some(6), Some(4), None], // max
3318                ),
3319            ),
3320            &[false, true, false, true],
3321        );
3322
3323        prune_with_expr(
3324            // with cast column to other type
3325            cast(col("s1"), DataType::Decimal128(14, 3))
3326                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3327            &schema,
3328            &TestStatistics::new().with(
3329                "s1",
3330                ContainerStats::new_i32(
3331                    vec![Some(0), Some(4), None, Some(3)], // min
3332                    vec![Some(5), Some(6), Some(4), None], // max
3333                ),
3334            ),
3335            &[false, true, false, true],
3336        );
3337
3338        prune_with_expr(
3339            // with try cast column to other type
3340            try_cast(col("s1"), DataType::Decimal128(14, 3))
3341                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3342            &schema,
3343            &TestStatistics::new().with(
3344                "s1",
3345                ContainerStats::new_i32(
3346                    vec![Some(0), Some(4), None, Some(3)], // min
3347                    vec![Some(5), Some(6), Some(4), None], // max
3348                ),
3349            ),
3350            &[false, true, false, true],
3351        );
3352
3353        // decimal(18,2)
3354        let schema = Arc::new(Schema::new(vec![Field::new(
3355            "s1",
3356            DataType::Decimal128(18, 2),
3357            true,
3358        )]));
3359        prune_with_expr(
3360            // s1 > 5
3361            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3362            &schema,
3363            // If the data is written by spark, the physical data type is INT64 in the parquet
3364            // So we use the INT32 type of statistic.
3365            &TestStatistics::new().with(
3366                "s1",
3367                ContainerStats::new_i64(
3368                    vec![Some(0), Some(4), None, Some(3)], // min
3369                    vec![Some(5), Some(6), Some(4), None], // max
3370                ),
3371            ),
3372            &[false, true, false, true],
3373        );
3374
3375        // decimal(23,2)
3376        let schema = Arc::new(Schema::new(vec![Field::new(
3377            "s1",
3378            DataType::Decimal128(23, 2),
3379            true,
3380        )]));
3381
3382        prune_with_expr(
3383            // s1 > 5
3384            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3385            &schema,
3386            &TestStatistics::new().with(
3387                "s1",
3388                ContainerStats::new_decimal128(
3389                    vec![Some(0), Some(400), None, Some(300)], // min
3390                    vec![Some(500), Some(600), Some(400), None], // max
3391                    23,
3392                    2,
3393                ),
3394            ),
3395            &[false, true, false, true],
3396        );
3397    }
3398
3399    #[test]
3400    fn prune_api() {
3401        let schema = Arc::new(Schema::new(vec![
3402            Field::new("s1", DataType::Utf8, true),
3403            Field::new("s2", DataType::Int32, true),
3404        ]));
3405
3406        let statistics = TestStatistics::new().with(
3407            "s2",
3408            ContainerStats::new_i32(
3409                vec![Some(0), Some(4), None, Some(3)], // min
3410                vec![Some(5), Some(6), None, None],    // max
3411            ),
3412        );
3413        prune_with_expr(
3414            // Prune using s2 > 5
3415            col("s2").gt(lit(5)),
3416            &schema,
3417            &statistics,
3418            // s2 [0, 5] ==> no rows should pass
3419            // s2 [4, 6] ==> some rows could pass
3420            // No stats for s2 ==> some rows could pass
3421            // s2 [3, None] (null max) ==> some rows could pass
3422            &[false, true, true, true],
3423        );
3424
3425        prune_with_expr(
3426            // filter with cast
3427            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3428            &schema,
3429            &statistics,
3430            &[false, true, true, true],
3431        );
3432    }
3433
3434    #[test]
3435    fn prune_not_eq_data() {
3436        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3437
3438        prune_with_expr(
3439            // Prune using s2 != 'M'
3440            col("s1").not_eq(lit("M")),
3441            &schema,
3442            &TestStatistics::new().with(
3443                "s1",
3444                ContainerStats::new_utf8(
3445                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
3446                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
3447                ),
3448            ),
3449            // s1 [A, Z] ==> might have values that pass predicate
3450            // s1 [A, L] ==> all rows pass the predicate
3451            // s1 [N, Z] ==> all rows pass the predicate
3452            // s1 [M, M] ==> all rows do not pass the predicate
3453            // No stats for s2 ==> some rows could pass
3454            // s2 [3, None] (null max) ==> some rows could pass
3455            &[true, true, true, false, true, true],
3456        );
3457    }
3458
3459    /// Creates setup for boolean chunk pruning
3460    ///
3461    /// For predicate "b1" (boolean expr)
3462    /// b1 [false, false] ==> no rows can pass (not keep)
3463    /// b1 [false, true] ==> some rows could pass (must keep)
3464    /// b1 [true, true] ==> all rows must pass (must keep)
3465    /// b1 [NULL, NULL]  ==> unknown (must keep)
3466    /// b1 [false, NULL]  ==> unknown (must keep)
3467    ///
3468    /// For predicate "!b1" (boolean expr)
3469    /// b1 [false, false] ==> all rows pass (must keep)
3470    /// b1 [false, true] ==> some rows could pass (must keep)
3471    /// b1 [true, true] ==> no rows can pass (not keep)
3472    /// b1 [NULL, NULL]  ==> unknown (must keep)
3473    /// b1 [false, NULL]  ==> unknown (must keep)
3474    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3475        let schema =
3476            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3477
3478        let statistics = TestStatistics::new().with(
3479            "b1",
3480            ContainerStats::new_bool(
3481                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3482                vec![Some(false), Some(true), Some(true), None, None],         // max
3483            ),
3484        );
3485        let expected_true = vec![false, true, true, true, true];
3486        let expected_false = vec![true, true, false, true, true];
3487
3488        (schema, statistics, expected_true, expected_false)
3489    }
3490
3491    #[test]
3492    fn prune_bool_const_expr() {
3493        let (schema, statistics, _, _) = bool_setup();
3494
3495        prune_with_expr(
3496            // true
3497            lit(true),
3498            &schema,
3499            &statistics,
3500            &[true, true, true, true, true],
3501        );
3502
3503        prune_with_expr(
3504            // false
3505            lit(false),
3506            &schema,
3507            &statistics,
3508            &[false, false, false, false, false],
3509        );
3510    }
3511
3512    #[test]
3513    fn prune_bool_column() {
3514        let (schema, statistics, expected_true, _) = bool_setup();
3515
3516        prune_with_expr(
3517            // b1
3518            col("b1"),
3519            &schema,
3520            &statistics,
3521            &expected_true,
3522        );
3523    }
3524
3525    #[test]
3526    fn prune_bool_not_column() {
3527        let (schema, statistics, _, expected_false) = bool_setup();
3528
3529        prune_with_expr(
3530            // !b1
3531            col("b1").not(),
3532            &schema,
3533            &statistics,
3534            &expected_false,
3535        );
3536    }
3537
3538    #[test]
3539    fn prune_bool_column_eq_true() {
3540        let (schema, statistics, expected_true, _) = bool_setup();
3541
3542        prune_with_expr(
3543            // b1 = true
3544            col("b1").eq(lit(true)),
3545            &schema,
3546            &statistics,
3547            &expected_true,
3548        );
3549    }
3550
3551    #[test]
3552    fn prune_bool_not_column_eq_true() {
3553        let (schema, statistics, _, expected_false) = bool_setup();
3554
3555        prune_with_expr(
3556            // !b1 = true
3557            col("b1").not().eq(lit(true)),
3558            &schema,
3559            &statistics,
3560            &expected_false,
3561        );
3562    }
3563
3564    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3565    /// with 5 different containers (e.g. RowGroups). They have [min,
3566    /// max]:
3567    ///
3568    /// i [-5, 5]
3569    /// i [1, 11]
3570    /// i [-11, -1]
3571    /// i [NULL, NULL]
3572    /// i [1, NULL]
3573    fn int32_setup() -> (SchemaRef, TestStatistics) {
3574        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3575
3576        let statistics = TestStatistics::new().with(
3577            "i",
3578            ContainerStats::new_i32(
3579                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3580                vec![Some(5), Some(11), Some(-1), None, None],     // max
3581            ),
3582        );
3583        (schema, statistics)
3584    }
3585
3586    #[test]
3587    fn prune_int32_col_gt_zero() {
3588        let (schema, statistics) = int32_setup();
3589
3590        // Expression "i > 0" and "-i < 0"
3591        // i [-5, 5] ==> some rows could pass (must keep)
3592        // i [1, 11] ==> all rows must pass (must keep)
3593        // i [-11, -1] ==>  no rows can pass (not keep)
3594        // i [NULL, NULL]  ==> unknown (must keep)
3595        // i [1, NULL]  ==> unknown (must keep)
3596        let expected_ret = &[true, true, false, true, true];
3597
3598        // i > 0
3599        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3600
3601        // -i < 0
3602        prune_with_expr(
3603            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3604            &schema,
3605            &statistics,
3606            expected_ret,
3607        );
3608    }
3609
3610    #[test]
3611    fn prune_int32_col_lte_zero() {
3612        let (schema, statistics) = int32_setup();
3613
3614        // Expression "i <= 0" and "-i >= 0"
3615        // i [-5, 5] ==> some rows could pass (must keep)
3616        // i [1, 11] ==> no rows can pass (not keep)
3617        // i [-11, -1] ==>  all rows must pass (must keep)
3618        // i [NULL, NULL]  ==> unknown (must keep)
3619        // i [1, NULL]  ==> no rows can pass (not keep)
3620        let expected_ret = &[true, false, true, true, false];
3621
3622        prune_with_expr(
3623            // i <= 0
3624            col("i").lt_eq(lit(0)),
3625            &schema,
3626            &statistics,
3627            expected_ret,
3628        );
3629
3630        prune_with_expr(
3631            // -i >= 0
3632            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3633            &schema,
3634            &statistics,
3635            expected_ret,
3636        );
3637    }
3638
3639    #[test]
3640    fn prune_int32_col_lte_zero_cast() {
3641        let (schema, statistics) = int32_setup();
3642
3643        // Expression "cast(i as utf8) <= '0'"
3644        // i [-5, 5] ==> some rows could pass (must keep)
3645        // i [1, 11] ==> no rows can pass in theory, -0.22 (conservatively keep)
3646        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
3647        // i [NULL, NULL]  ==> unknown (must keep)
3648        // i [1, NULL]  ==> no rows can pass (conservatively keep)
3649        let expected_ret = &[true, true, true, true, true];
3650
3651        prune_with_expr(
3652            // cast(i as utf8) <= 0
3653            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3654            &schema,
3655            &statistics,
3656            expected_ret,
3657        );
3658
3659        prune_with_expr(
3660            // try_cast(i as utf8) <= 0
3661            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3662            &schema,
3663            &statistics,
3664            expected_ret,
3665        );
3666
3667        prune_with_expr(
3668            // cast(-i as utf8) >= 0
3669            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3670            &schema,
3671            &statistics,
3672            expected_ret,
3673        );
3674
3675        prune_with_expr(
3676            // try_cast(-i as utf8) >= 0
3677            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3678            &schema,
3679            &statistics,
3680            expected_ret,
3681        );
3682    }
3683
3684    #[test]
3685    fn prune_int32_col_eq_zero() {
3686        let (schema, statistics) = int32_setup();
3687
3688        // Expression "i = 0"
3689        // i [-5, 5] ==> some rows could pass (must keep)
3690        // i [1, 11] ==> no rows can pass (not keep)
3691        // i [-11, -1] ==>  no rows can pass (not keep)
3692        // i [NULL, NULL]  ==> unknown (must keep)
3693        // i [1, NULL]  ==> no rows can pass (not keep)
3694        let expected_ret = &[true, false, false, true, false];
3695
3696        prune_with_expr(
3697            // i = 0
3698            col("i").eq(lit(0)),
3699            &schema,
3700            &statistics,
3701            expected_ret,
3702        );
3703    }
3704
3705    #[test]
3706    fn prune_int32_col_eq_zero_cast() {
3707        let (schema, statistics) = int32_setup();
3708
3709        // Expression "cast(i as int64) = 0"
3710        // i [-5, 5] ==> some rows could pass (must keep)
3711        // i [1, 11] ==> no rows can pass (not keep)
3712        // i [-11, -1] ==>  no rows can pass (not keep)
3713        // i [NULL, NULL]  ==> unknown (must keep)
3714        // i [1, NULL]  ==> no rows can pass (not keep)
3715        let expected_ret = &[true, false, false, true, false];
3716
3717        prune_with_expr(
3718            cast(col("i"), DataType::Int64).eq(lit(0i64)),
3719            &schema,
3720            &statistics,
3721            expected_ret,
3722        );
3723
3724        prune_with_expr(
3725            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
3726            &schema,
3727            &statistics,
3728            expected_ret,
3729        );
3730    }
3731
3732    #[test]
3733    fn prune_int32_col_eq_zero_cast_as_str() {
3734        let (schema, statistics) = int32_setup();
3735
3736        // Note the cast is to a string where sorting properties are
3737        // not the same as integers
3738        //
3739        // Expression "cast(i as utf8) = '0'"
3740        // i [-5, 5] ==> some rows could pass (keep)
3741        // i [1, 11] ==> no rows can pass  (could keep)
3742        // i [-11, -1] ==>  no rows can pass (could keep)
3743        // i [NULL, NULL]  ==> unknown (keep)
3744        // i [1, NULL]  ==> no rows can pass (could keep)
3745        let expected_ret = &[true, true, true, true, true];
3746
3747        prune_with_expr(
3748            cast(col("i"), DataType::Utf8).eq(lit("0")),
3749            &schema,
3750            &statistics,
3751            expected_ret,
3752        );
3753    }
3754
3755    #[test]
3756    fn prune_int32_col_lt_neg_one() {
3757        let (schema, statistics) = int32_setup();
3758
3759        // Expression "i > -1" and "-i < 1"
3760        // i [-5, 5] ==> some rows could pass (must keep)
3761        // i [1, 11] ==> all rows must pass (must keep)
3762        // i [-11, -1] ==>  no rows can pass (not keep)
3763        // i [NULL, NULL]  ==> unknown (must keep)
3764        // i [1, NULL]  ==> all rows must pass (must keep)
3765        let expected_ret = &[true, true, false, true, true];
3766
3767        prune_with_expr(
3768            // i > -1
3769            col("i").gt(lit(-1)),
3770            &schema,
3771            &statistics,
3772            expected_ret,
3773        );
3774
3775        prune_with_expr(
3776            // -i < 1
3777            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
3778            &schema,
3779            &statistics,
3780            expected_ret,
3781        );
3782    }
3783
3784    #[test]
3785    fn prune_int32_is_null() {
3786        let (schema, statistics) = int32_setup();
3787
3788        // Expression "i IS NULL" when there are no null statistics,
3789        // should all be kept
3790        let expected_ret = &[true, true, true, true, true];
3791
3792        prune_with_expr(
3793            // i IS NULL, no null statistics
3794            col("i").is_null(),
3795            &schema,
3796            &statistics,
3797            expected_ret,
3798        );
3799
3800        // provide null counts for each column
3801        let statistics = statistics.with_null_counts(
3802            "i",
3803            vec![
3804                Some(0), // no nulls (don't keep)
3805                Some(1), // 1 null
3806                None,    // unknown nulls
3807                None, // unknown nulls (min/max are both null too, like no stats at all)
3808                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
3809            ],
3810        );
3811
3812        let expected_ret = &[false, true, true, true, false];
3813
3814        prune_with_expr(
3815            // i IS NULL, with actual null statistics
3816            col("i").is_null(),
3817            &schema,
3818            &statistics,
3819            expected_ret,
3820        );
3821    }
3822
3823    #[test]
3824    fn prune_int32_column_is_known_all_null() {
3825        let (schema, statistics) = int32_setup();
3826
3827        // Expression "i < 0"
3828        // i [-5, 5] ==> some rows could pass (must keep)
3829        // i [1, 11] ==> no rows can pass (not keep)
3830        // i [-11, -1] ==>  all rows must pass (must keep)
3831        // i [NULL, NULL]  ==> unknown (must keep)
3832        // i [1, NULL]  ==> no rows can pass (not keep)
3833        let expected_ret = &[true, false, true, true, false];
3834
3835        prune_with_expr(
3836            // i < 0
3837            col("i").lt(lit(0)),
3838            &schema,
3839            &statistics,
3840            expected_ret,
3841        );
3842
3843        // provide row counts for each column
3844        let statistics = statistics.with_row_counts(
3845            "i",
3846            vec![
3847                Some(10), // 10 rows of data
3848                Some(9),  // 9 rows of data
3849                None,     // unknown row counts
3850                Some(4),
3851                Some(10),
3852            ],
3853        );
3854
3855        // pruning result is still the same if we only know row counts
3856        prune_with_expr(
3857            // i < 0, with only row counts statistics
3858            col("i").lt(lit(0)),
3859            &schema,
3860            &statistics,
3861            expected_ret,
3862        );
3863
3864        // provide null counts for each column
3865        let statistics = statistics.with_null_counts(
3866            "i",
3867            vec![
3868                Some(0), // no nulls
3869                Some(1), // 1 null
3870                None,    // unknown nulls
3871                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
3872                Some(0), // 0 nulls (max=null too which means no known max)
3873            ],
3874        );
3875
3876        // Expression "i < 0" with actual null and row counts statistics
3877        // col | min, max     | row counts | null counts |
3878        // ----+--------------+------------+-------------+
3879        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
3880        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
3881        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
3882        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
3883        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
3884        let expected_ret = &[true, false, true, false, false];
3885
3886        prune_with_expr(
3887            // i < 0, with actual null and row counts statistics
3888            col("i").lt(lit(0)),
3889            &schema,
3890            &statistics,
3891            expected_ret,
3892        );
3893    }
3894
3895    #[test]
3896    fn prune_cast_column_scalar() {
3897        // The data type of column i is INT32
3898        let (schema, statistics) = int32_setup();
3899        let expected_ret = &[true, true, false, true, true];
3900
3901        prune_with_expr(
3902            // i > int64(0)
3903            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
3904            &schema,
3905            &statistics,
3906            expected_ret,
3907        );
3908
3909        prune_with_expr(
3910            // cast(i as int64) > int64(0)
3911            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3912            &schema,
3913            &statistics,
3914            expected_ret,
3915        );
3916
3917        prune_with_expr(
3918            // try_cast(i as int64) > int64(0)
3919            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3920            &schema,
3921            &statistics,
3922            expected_ret,
3923        );
3924
3925        prune_with_expr(
3926            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
3927            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
3928                .lt(lit(ScalarValue::Int64(Some(0)))),
3929            &schema,
3930            &statistics,
3931            expected_ret,
3932        );
3933    }
3934
3935    #[test]
3936    fn test_increment_utf8() {
3937        // Basic ASCII
3938        assert_eq!(increment_utf8("abc").unwrap(), "abd");
3939        assert_eq!(increment_utf8("abz").unwrap(), "ab{");
3940
3941        // Test around ASCII 127 (DEL)
3942        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
3943        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128
3944
3945        // Test 2-byte UTF-8 sequences
3946        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0
3947
3948        // Test 3-byte UTF-8 sequences
3949        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124
3950
3951        // Test at UTF-8 boundaries
3952        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
3953        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary
3954
3955        // Test that if we can't increment we return None
3956        assert!(increment_utf8("").is_none());
3957        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point
3958
3959        // Test that if we can't increment the last character we do the previous one and truncate
3960        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
3961
3962        // Test surrogate pair range (0xD800..=0xDFFF)
3963        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
3964        assert!(increment_utf8("\u{D7FF}").is_none());
3965
3966        // Test non-characters range (0xFDD0..=0xFDEF)
3967        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
3968        assert!(increment_utf8("\u{FDCF}").is_none());
3969
3970        // Test private use area limit (>= 0x110000)
3971        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
3972        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
3973    }
3974
3975    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
3976    /// with 5 different containers (e.g. RowGroups). They have [min,
3977    /// max]:
3978    /// s1 ["A", "Z"]
3979    /// s1 ["A", "L"]
3980    /// s1 ["N", "Z"]
3981    /// s1 [NULL, NULL]
3982    /// s1 ["A", NULL]
3983    /// s1 ["", "A"]
3984    /// s1 ["", ""]
3985    /// s1 ["AB", "A\u{10ffff}"]
3986    /// s1 ["A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
3987    fn utf8_setup() -> (SchemaRef, TestStatistics) {
3988        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3989
3990        let statistics = TestStatistics::new().with(
3991            "s1",
3992            ContainerStats::new_utf8(
3993                vec![
3994                    Some("A"),
3995                    Some("A"),
3996                    Some("N"),
3997                    Some("M"),
3998                    None,
3999                    Some("A"),
4000                    Some(""),
4001                    Some(""),
4002                    Some("AB"),
4003                    Some("A\u{10ffff}\u{10ffff}"),
4004                ], // min
4005                vec![
4006                    Some("Z"),
4007                    Some("L"),
4008                    Some("Z"),
4009                    Some("M"),
4010                    None,
4011                    None,
4012                    Some("A"),
4013                    Some(""),
4014                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
4015                    Some("A\u{10ffff}\u{10ffff}"),
4016                ], // max
4017            ),
4018        );
4019        (schema, statistics)
4020    }
4021
4022    #[test]
4023    fn prune_utf8_eq() {
4024        let (schema, statistics) = utf8_setup();
4025
4026        let expr = col("s1").eq(lit("A"));
4027        #[rustfmt::skip]
4028        let expected_ret = &[
4029            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4030            true,
4031            // s1 ["A", "L"] ==> some rows could pass (must keep)
4032            true,
4033            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4034            false,
4035            // s1 ["M", "M"] ==> no rows can pass (not keep)
4036            false,
4037            // s1 [NULL, NULL]  ==> unknown (must keep)
4038            true,
4039            // s1 ["A", NULL]  ==> unknown (must keep)
4040            true,
4041            // s1 ["", "A"]  ==> some rows could pass (must keep)
4042            true,
4043            // s1 ["", ""]  ==> no rows can pass (not keep)
4044            false,
4045            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4046            false,
4047            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4048            false,
4049        ];
4050        prune_with_expr(expr, &schema, &statistics, expected_ret);
4051
4052        let expr = col("s1").eq(lit(""));
4053        #[rustfmt::skip]
4054        let expected_ret = &[
4055            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4056            false,
4057            // s1 ["A", "L"] ==> no rows can pass (not keep)
4058            false,
4059            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4060            false,
4061            // s1 ["M", "M"] ==> no rows can pass (not keep)
4062            false,
4063            // s1 [NULL, NULL]  ==> unknown (must keep)
4064            true,
4065            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4066            false,
4067            // s1 ["", "A"]  ==> some rows could pass (must keep)
4068            true,
4069            // s1 ["", ""]  ==> all rows must pass (must keep)
4070            true,
4071            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4072            false,
4073            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4074            false,
4075        ];
4076        prune_with_expr(expr, &schema, &statistics, expected_ret);
4077    }
4078
4079    #[test]
4080    fn prune_utf8_not_eq() {
4081        let (schema, statistics) = utf8_setup();
4082
4083        let expr = col("s1").not_eq(lit("A"));
4084        #[rustfmt::skip]
4085        let expected_ret = &[
4086            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4087            true,
4088            // s1 ["A", "L"] ==> some rows could pass (must keep)
4089            true,
4090            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4091            true,
4092            // s1 ["M", "M"] ==> all rows must pass (must keep)
4093            true,
4094            // s1 [NULL, NULL]  ==> unknown (must keep)
4095            true,
4096            // s1 ["A", NULL]  ==> unknown (must keep)
4097            true,
4098            // s1 ["", "A"]  ==> some rows could pass (must keep)
4099            true,
4100            // s1 ["", ""]  ==> all rows must pass (must keep)
4101            true,
4102            // s1 ["AB", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4103            true,
4104            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4105            true,
4106        ];
4107        prune_with_expr(expr, &schema, &statistics, expected_ret);
4108
4109        let expr = col("s1").not_eq(lit(""));
4110        #[rustfmt::skip]
4111        let expected_ret = &[
4112            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4113            true,
4114            // s1 ["A", "L"] ==> all rows must pass (must keep)
4115            true,
4116            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4117            true,
4118            // s1 ["M", "M"] ==> all rows must pass (must keep)
4119            true,
4120            // s1 [NULL, NULL]  ==> unknown (must keep)
4121            true,
4122            // s1 ["A", NULL]  ==> unknown (must keep)
4123            true,
4124            // s1 ["", "A"]  ==> some rows could pass (must keep)
4125            true,
4126            // s1 ["", ""]  ==> no rows can pass (not keep)
4127            false,
4128            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4129            true,
4130            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4131            true,
4132        ];
4133        prune_with_expr(expr, &schema, &statistics, expected_ret);
4134    }
4135
4136    #[test]
4137    fn prune_utf8_like_one() {
4138        let (schema, statistics) = utf8_setup();
4139
4140        let expr = col("s1").like(lit("A_"));
4141        #[rustfmt::skip]
4142        let expected_ret = &[
4143            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4144            true,
4145            // s1 ["A", "L"] ==> some rows could pass (must keep)
4146            true,
4147            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4148            false,
4149            // s1 ["M", "M"] ==> no rows can pass (not keep)
4150            false,
4151            // s1 [NULL, NULL]  ==> unknown (must keep)
4152            true,
4153            // s1 ["A", NULL]  ==> unknown (must keep)
4154            true,
4155            // s1 ["", "A"]  ==> some rows could pass (must keep)
4156            true,
4157            // s1 ["", ""]  ==> no rows can pass (not keep)
4158            false,
4159            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4160            true,
4161            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4162            true,
4163        ];
4164        prune_with_expr(expr, &schema, &statistics, expected_ret);
4165
4166        let expr = col("s1").like(lit("_A_"));
4167        #[rustfmt::skip]
4168        let expected_ret = &[
4169            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4170            true,
4171            // s1 ["A", "L"] ==> some rows could pass (must keep)
4172            true,
4173            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4174            true,
4175            // s1 ["M", "M"] ==> some rows could pass (must keep)
4176            true,
4177            // s1 [NULL, NULL]  ==> unknown (must keep)
4178            true,
4179            // s1 ["A", NULL]  ==> unknown (must keep)
4180            true,
4181            // s1 ["", "A"]  ==> some rows could pass (must keep)
4182            true,
4183            // s1 ["", ""]  ==> some rows could pass (must keep)
4184            true,
4185            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4186            true,
4187            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4188            true,
4189        ];
4190        prune_with_expr(expr, &schema, &statistics, expected_ret);
4191
4192        let expr = col("s1").like(lit("_"));
4193        #[rustfmt::skip]
4194        let expected_ret = &[
4195            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4196            true,
4197            // s1 ["A", "L"] ==> all rows must pass (must keep)
4198            true,
4199            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4200            true,
4201            // s1 ["M", "M"] ==> all rows must pass (must keep)
4202            true,
4203            // s1 [NULL, NULL]  ==> unknown (must keep)
4204            true,
4205            // s1 ["A", NULL]  ==> unknown (must keep)
4206            true,
4207            // s1 ["", "A"]  ==> all rows must pass (must keep)
4208            true,
4209            // s1 ["", ""]  ==> all rows must pass (must keep)
4210            true,
4211            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4212            true,
4213            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4214            true,
4215        ];
4216        prune_with_expr(expr, &schema, &statistics, expected_ret);
4217
4218        let expr = col("s1").like(lit(""));
4219        #[rustfmt::skip]
4220        let expected_ret = &[
4221            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4222            false,
4223            // s1 ["A", "L"] ==> no rows can pass (not keep)
4224            false,
4225            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4226            false,
4227            // s1 ["M", "M"] ==> no rows can pass (not keep)
4228            false,
4229            // s1 [NULL, NULL]  ==> unknown (must keep)
4230            true,
4231            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4232            false,
4233            // s1 ["", "A"]  ==> some rows could pass (must keep)
4234            true,
4235            // s1 ["", ""]  ==> all rows must pass (must keep)
4236            true,
4237            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4238            false,
4239            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4240            false,
4241        ];
4242        prune_with_expr(expr, &schema, &statistics, expected_ret);
4243    }
4244
4245    #[test]
4246    fn prune_utf8_like_many() {
4247        let (schema, statistics) = utf8_setup();
4248
4249        let expr = col("s1").like(lit("A%"));
4250        #[rustfmt::skip]
4251        let expected_ret = &[
4252            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4253            true,
4254            // s1 ["A", "L"] ==> some rows could pass (must keep)
4255            true,
4256            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4257            false,
4258            // s1 ["M", "M"] ==> no rows can pass (not keep)
4259            false,
4260            // s1 [NULL, NULL]  ==> unknown (must keep)
4261            true,
4262            // s1 ["A", NULL]  ==> unknown (must keep)
4263            true,
4264            // s1 ["", "A"]  ==> some rows could pass (must keep)
4265            true,
4266            // s1 ["", ""]  ==> no rows can pass (not keep)
4267            false,
4268            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4269            true,
4270            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4271            true,
4272        ];
4273        prune_with_expr(expr, &schema, &statistics, expected_ret);
4274
4275        let expr = col("s1").like(lit("%A%"));
4276        #[rustfmt::skip]
4277        let expected_ret = &[
4278            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4279            true,
4280            // s1 ["A", "L"] ==> some rows could pass (must keep)
4281            true,
4282            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4283            true,
4284            // s1 ["M", "M"] ==> some rows could pass (must keep)
4285            true,
4286            // s1 [NULL, NULL]  ==> unknown (must keep)
4287            true,
4288            // s1 ["A", NULL]  ==> unknown (must keep)
4289            true,
4290            // s1 ["", "A"]  ==> some rows could pass (must keep)
4291            true,
4292            // s1 ["", ""]  ==> some rows could pass (must keep)
4293            true,
4294            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4295            true,
4296            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4297            true,
4298        ];
4299        prune_with_expr(expr, &schema, &statistics, expected_ret);
4300
4301        let expr = col("s1").like(lit("%"));
4302        #[rustfmt::skip]
4303        let expected_ret = &[
4304            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4305            true,
4306            // s1 ["A", "L"] ==> all rows must pass (must keep)
4307            true,
4308            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4309            true,
4310            // s1 ["M", "M"] ==> all rows must pass (must keep)
4311            true,
4312            // s1 [NULL, NULL]  ==> unknown (must keep)
4313            true,
4314            // s1 ["A", NULL]  ==> unknown (must keep)
4315            true,
4316            // s1 ["", "A"]  ==> all rows must pass (must keep)
4317            true,
4318            // s1 ["", ""]  ==> all rows must pass (must keep)
4319            true,
4320            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4321            true,
4322            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4323            true,
4324        ];
4325        prune_with_expr(expr, &schema, &statistics, expected_ret);
4326
4327        let expr = col("s1").like(lit(""));
4328        #[rustfmt::skip]
4329        let expected_ret = &[
4330            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4331            false,
4332            // s1 ["A", "L"] ==> no rows can pass (not keep)
4333            false,
4334            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4335            false,
4336            // s1 ["M", "M"] ==> no rows can pass (not keep)
4337            false,
4338            // s1 [NULL, NULL]  ==> unknown (must keep)
4339            true,
4340            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4341            false,
4342            // s1 ["", "A"]  ==> some rows could pass (must keep)
4343            true,
4344            // s1 ["", ""]  ==> all rows must pass (must keep)
4345            true,
4346            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4347            false,
4348            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4349            false,
4350        ];
4351        prune_with_expr(expr, &schema, &statistics, expected_ret);
4352    }
4353
4354    #[test]
4355    fn prune_utf8_not_like_one() {
4356        let (schema, statistics) = utf8_setup();
4357
4358        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4359        #[rustfmt::skip]
4360        let expected_ret = &[
4361            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4362            true,
4363            // s1 ["A", "L"] ==> some rows could pass (must keep)
4364            true,
4365            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4366            true,
4367            // s1 ["M", "M"] ==> some rows could pass (must keep)
4368            true,
4369            // s1 [NULL, NULL]  ==> unknown (must keep)
4370            true,
4371            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4372            true,
4373            // s1 ["", "A"]  ==> some rows could pass (must keep)
4374            true,
4375            // s1 ["", ""]  ==> some rows could pass (must keep)
4376            true,
4377            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4378            true,
4379            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match. (min, max) maybe truncate 
4380            // orignal (min, max) maybe ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}")
4381            true,
4382        ];
4383        prune_with_expr(expr, &schema, &statistics, expected_ret);
4384    }
4385
4386    #[test]
4387    fn prune_utf8_not_like_many() {
4388        let (schema, statistics) = utf8_setup();
4389
4390        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
4391        #[rustfmt::skip]
4392        let expected_ret = &[
4393            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4394            true,
4395            // s1 ["A", "L"] ==> some rows could pass (must keep)
4396            true,
4397            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4398            true,
4399            // s1 ["M", "M"] ==> some rows could pass (must keep)
4400            true,
4401            // s1 [NULL, NULL]  ==> unknown (must keep)
4402            true,
4403            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4404            true,
4405            // s1 ["", "A"]  ==> some rows could pass (must keep)
4406            true,
4407            // s1 ["", ""]  ==> some rows could pass (must keep)
4408            true,
4409            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4410            true,
4411            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
4412            false,
4413        ];
4414        prune_with_expr(expr, &schema, &statistics, expected_ret);
4415
4416        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
4417        #[rustfmt::skip]
4418        let expected_ret = &[
4419            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4420            true,
4421            // s1 ["A", "L"] ==> some rows could pass (must keep)
4422            true,
4423            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4424            true,
4425            // s1 ["M", "M"] ==> some rows could pass (must keep)
4426            true,
4427            // s1 [NULL, NULL]  ==> unknown (must keep)
4428            true,
4429            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4430            true,
4431            // s1 ["", "A"]  ==> some rows could pass (must keep)
4432            true,
4433            // s1 ["", ""]  ==> some rows could pass (must keep)
4434            true,
4435            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4436            true,
4437            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4438            true,
4439        ];
4440        prune_with_expr(expr, &schema, &statistics, expected_ret);
4441
4442        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
4443        #[rustfmt::skip]
4444        let expected_ret = &[
4445            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4446            true,
4447            // s1 ["A", "L"] ==> some rows could pass (must keep)
4448            true,
4449            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4450            true,
4451            // s1 ["M", "M"] ==> some rows could pass (must keep)
4452            true,
4453            // s1 [NULL, NULL]  ==> unknown (must keep)
4454            true,
4455            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4456            true,
4457            // s1 ["", "A"]  ==> some rows could pass (must keep)
4458            true,
4459            // s1 ["", ""]  ==> some rows could pass (must keep)
4460            true,
4461            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4462            true,
4463            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4464            true,
4465        ];
4466        prune_with_expr(expr, &schema, &statistics, expected_ret);
4467
4468        let expr = col("s1").not_like(lit("A\\%%"));
4469        let statistics = TestStatistics::new().with(
4470            "s1",
4471            ContainerStats::new_utf8(
4472                vec![Some("A%a"), Some("A")],
4473                vec![Some("A%c"), Some("A")],
4474            ),
4475        );
4476        let expected_ret = &[false, true];
4477        prune_with_expr(expr, &schema, &statistics, expected_ret);
4478    }
4479
4480    #[test]
4481    fn test_rewrite_expr_to_prunable() {
4482        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4483        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4484
4485        // column op lit
4486        let left_input = col("a");
4487        let left_input = logical2physical(&left_input, &schema);
4488        let right_input = lit(ScalarValue::Int32(Some(12)));
4489        let right_input = logical2physical(&right_input, &schema);
4490        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4491            &left_input,
4492            Operator::Eq,
4493            &right_input,
4494            df_schema.clone(),
4495        )
4496        .unwrap();
4497        assert_eq!(result_left.to_string(), left_input.to_string());
4498        assert_eq!(result_right.to_string(), right_input.to_string());
4499
4500        // cast op lit
4501        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4502        let left_input = logical2physical(&left_input, &schema);
4503        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4504        let right_input = logical2physical(&right_input, &schema);
4505        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4506            &left_input,
4507            Operator::Gt,
4508            &right_input,
4509            df_schema.clone(),
4510        )
4511        .unwrap();
4512        assert_eq!(result_left.to_string(), left_input.to_string());
4513        assert_eq!(result_right.to_string(), right_input.to_string());
4514
4515        // try_cast op lit
4516        let left_input = try_cast(col("a"), DataType::Int64);
4517        let left_input = logical2physical(&left_input, &schema);
4518        let right_input = lit(ScalarValue::Int64(Some(12)));
4519        let right_input = logical2physical(&right_input, &schema);
4520        let (result_left, _, result_right) =
4521            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4522                .unwrap();
4523        assert_eq!(result_left.to_string(), left_input.to_string());
4524        assert_eq!(result_right.to_string(), right_input.to_string());
4525
4526        // TODO: add test for other case and op
4527    }
4528
4529    #[test]
4530    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4531        struct CustomUnhandledHook;
4532
4533        impl UnhandledPredicateHook for CustomUnhandledHook {
4534            /// This handles an arbitrary case of a column that doesn't exist in the schema
4535            /// by renaming it to yet another column that doesn't exist in the schema
4536            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
4537            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4538                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4539            }
4540        }
4541
4542        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4543        let schema_with_b = Schema::new(vec![
4544            Field::new("a", DataType::Int32, true),
4545            Field::new("b", DataType::Int32, true),
4546        ]);
4547
4548        let rewriter = PredicateRewriter::new()
4549            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4550
4551        let transform_expr = |expr| {
4552            let expr = logical2physical(&expr, &schema_with_b);
4553            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4554        };
4555
4556        // transform an arbitrary valid expression that we know is handled
4557        let known_expression = col("a").eq(lit(12));
4558        let known_expression_transformed = PredicateRewriter::new()
4559            .rewrite_predicate_to_statistics_predicate(
4560                &logical2physical(&known_expression, &schema),
4561                &schema,
4562            );
4563
4564        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
4565        let input = col("b").eq(lit(12));
4566        let expected = logical2physical(&lit(42), &schema);
4567        let transformed = transform_expr(input.clone());
4568        assert_eq!(transformed.to_string(), expected.to_string());
4569
4570        // more complex case with unknown column
4571        let input = known_expression.clone().and(input.clone());
4572        let expected = phys_expr::BinaryExpr::new(
4573            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4574            Operator::And,
4575            logical2physical(&lit(42), &schema),
4576        );
4577        let transformed = transform_expr(input.clone());
4578        assert_eq!(transformed.to_string(), expected.to_string());
4579
4580        // an unknown expression gets passed to the hook
4581        let input = array_has(make_array(vec![lit(1)]), col("a"));
4582        let expected = logical2physical(&lit(42), &schema);
4583        let transformed = transform_expr(input.clone());
4584        assert_eq!(transformed.to_string(), expected.to_string());
4585
4586        // more complex case with unknown expression
4587        let input = known_expression.and(input);
4588        let expected = phys_expr::BinaryExpr::new(
4589            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4590            Operator::And,
4591            logical2physical(&lit(42), &schema),
4592        );
4593        let transformed = transform_expr(input.clone());
4594        assert_eq!(transformed.to_string(), expected.to_string());
4595    }
4596
4597    #[test]
4598    fn test_rewrite_expr_to_prunable_error() {
4599        // cast string value to numeric value
4600        // this cast is not supported
4601        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4602        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4603        let left_input = cast(col("a"), DataType::Int64);
4604        let left_input = logical2physical(&left_input, &schema);
4605        let right_input = lit(ScalarValue::Int64(Some(12)));
4606        let right_input = logical2physical(&right_input, &schema);
4607        let result = rewrite_expr_to_prunable(
4608            &left_input,
4609            Operator::Gt,
4610            &right_input,
4611            df_schema.clone(),
4612        );
4613        assert!(result.is_err());
4614
4615        // other expr
4616        let left_input = is_null(col("a"));
4617        let left_input = logical2physical(&left_input, &schema);
4618        let right_input = lit(ScalarValue::Int64(Some(12)));
4619        let right_input = logical2physical(&right_input, &schema);
4620        let result =
4621            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4622        assert!(result.is_err());
4623        // TODO: add other negative test for other case and op
4624    }
4625
4626    #[test]
4627    fn prune_with_contained_one_column() {
4628        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4629
4630        // Model having information like a bloom filter for s1
4631        let statistics = TestStatistics::new()
4632            .with_contained(
4633                "s1",
4634                [ScalarValue::from("foo")],
4635                [
4636                    // container 0 known to only contain "foo"",
4637                    Some(true),
4638                    // container 1 known to not contain "foo"
4639                    Some(false),
4640                    // container 2 unknown about "foo"
4641                    None,
4642                    // container 3 known to only contain "foo"
4643                    Some(true),
4644                    // container 4 known to not contain "foo"
4645                    Some(false),
4646                    // container 5 unknown about "foo"
4647                    None,
4648                    // container 6 known to only contain "foo"
4649                    Some(true),
4650                    // container 7 known to not contain "foo"
4651                    Some(false),
4652                    // container 8 unknown about "foo"
4653                    None,
4654                ],
4655            )
4656            .with_contained(
4657                "s1",
4658                [ScalarValue::from("bar")],
4659                [
4660                    // containers 0,1,2 known to only contain "bar"
4661                    Some(true),
4662                    Some(true),
4663                    Some(true),
4664                    // container 3,4,5 known to not contain "bar"
4665                    Some(false),
4666                    Some(false),
4667                    Some(false),
4668                    // container 6,7,8 unknown about "bar"
4669                    None,
4670                    None,
4671                    None,
4672                ],
4673            )
4674            .with_contained(
4675                // the way the tests are setup, this data is
4676                // consulted if the "foo" and "bar" are being checked at the same time
4677                "s1",
4678                [ScalarValue::from("foo"), ScalarValue::from("bar")],
4679                [
4680                    // container 0,1,2 unknown about ("foo, "bar")
4681                    None,
4682                    None,
4683                    None,
4684                    // container 3,4,5 known to contain only either "foo" and "bar"
4685                    Some(true),
4686                    Some(true),
4687                    Some(true),
4688                    // container 6,7,8  known to contain  neither "foo" and "bar"
4689                    Some(false),
4690                    Some(false),
4691                    Some(false),
4692                ],
4693            );
4694
4695        // s1 = 'foo'
4696        prune_with_expr(
4697            col("s1").eq(lit("foo")),
4698            &schema,
4699            &statistics,
4700            // rule out containers ('false) where we know foo is not present
4701            &[true, false, true, true, false, true, true, false, true],
4702        );
4703
4704        // s1 = 'bar'
4705        prune_with_expr(
4706            col("s1").eq(lit("bar")),
4707            &schema,
4708            &statistics,
4709            // rule out containers where we know bar is not present
4710            &[true, true, true, false, false, false, true, true, true],
4711        );
4712
4713        // s1 = 'baz' (unknown value)
4714        prune_with_expr(
4715            col("s1").eq(lit("baz")),
4716            &schema,
4717            &statistics,
4718            // can't rule out anything
4719            &[true, true, true, true, true, true, true, true, true],
4720        );
4721
4722        // s1 = 'foo' AND s1 = 'bar'
4723        prune_with_expr(
4724            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
4725            &schema,
4726            &statistics,
4727            // logically this predicate can't possibly be true (the column can't
4728            // take on both values) but we could rule it out if the stats tell
4729            // us that both values are not present
4730            &[true, true, true, true, true, true, true, true, true],
4731        );
4732
4733        // s1 = 'foo' OR s1 = 'bar'
4734        prune_with_expr(
4735            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
4736            &schema,
4737            &statistics,
4738            // can rule out containers that we know contain neither foo nor bar
4739            &[true, true, true, true, true, true, false, false, false],
4740        );
4741
4742        // s1 = 'foo' OR s1 = 'baz'
4743        prune_with_expr(
4744            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
4745            &schema,
4746            &statistics,
4747            // can't rule out anything container
4748            &[true, true, true, true, true, true, true, true, true],
4749        );
4750
4751        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
4752        prune_with_expr(
4753            col("s1")
4754                .eq(lit("foo"))
4755                .or(col("s1").eq(lit("bar")))
4756                .or(col("s1").eq(lit("baz"))),
4757            &schema,
4758            &statistics,
4759            // can rule out any containers based on knowledge of s1 and `foo`,
4760            // `bar` and (`foo`, `bar`)
4761            &[true, true, true, true, true, true, true, true, true],
4762        );
4763
4764        // s1 != foo
4765        prune_with_expr(
4766            col("s1").not_eq(lit("foo")),
4767            &schema,
4768            &statistics,
4769            // rule out containers we know for sure only contain foo
4770            &[false, true, true, false, true, true, false, true, true],
4771        );
4772
4773        // s1 != bar
4774        prune_with_expr(
4775            col("s1").not_eq(lit("bar")),
4776            &schema,
4777            &statistics,
4778            // rule out when we know for sure s1 has the value bar
4779            &[false, false, false, true, true, true, true, true, true],
4780        );
4781
4782        // s1 != foo AND s1 != bar
4783        prune_with_expr(
4784            col("s1")
4785                .not_eq(lit("foo"))
4786                .and(col("s1").not_eq(lit("bar"))),
4787            &schema,
4788            &statistics,
4789            // can rule out any container where we know s1 does not have either 'foo' or 'bar'
4790            &[true, true, true, false, false, false, true, true, true],
4791        );
4792
4793        // s1 != foo AND s1 != bar AND s1 != baz
4794        prune_with_expr(
4795            col("s1")
4796                .not_eq(lit("foo"))
4797                .and(col("s1").not_eq(lit("bar")))
4798                .and(col("s1").not_eq(lit("baz"))),
4799            &schema,
4800            &statistics,
4801            // can't rule out any container based on  knowledge of s1,s2
4802            &[true, true, true, true, true, true, true, true, true],
4803        );
4804
4805        // s1 != foo OR s1 != bar
4806        prune_with_expr(
4807            col("s1")
4808                .not_eq(lit("foo"))
4809                .or(col("s1").not_eq(lit("bar"))),
4810            &schema,
4811            &statistics,
4812            // cant' rule out anything based on contains information
4813            &[true, true, true, true, true, true, true, true, true],
4814        );
4815
4816        // s1 != foo OR s1 != bar OR s1 != baz
4817        prune_with_expr(
4818            col("s1")
4819                .not_eq(lit("foo"))
4820                .or(col("s1").not_eq(lit("bar")))
4821                .or(col("s1").not_eq(lit("baz"))),
4822            &schema,
4823            &statistics,
4824            // cant' rule out anything based on contains information
4825            &[true, true, true, true, true, true, true, true, true],
4826        );
4827    }
4828
4829    #[test]
4830    fn prune_with_contained_two_columns() {
4831        let schema = Arc::new(Schema::new(vec![
4832            Field::new("s1", DataType::Utf8, true),
4833            Field::new("s2", DataType::Utf8, true),
4834        ]));
4835
4836        // Model having information like bloom filters for s1 and s2
4837        let statistics = TestStatistics::new()
4838            .with_contained(
4839                "s1",
4840                [ScalarValue::from("foo")],
4841                [
4842                    // container 0, s1 known to only contain "foo"",
4843                    Some(true),
4844                    // container 1, s1 known to not contain "foo"
4845                    Some(false),
4846                    // container 2, s1 unknown about "foo"
4847                    None,
4848                    // container 3, s1 known to only contain "foo"
4849                    Some(true),
4850                    // container 4, s1 known to not contain "foo"
4851                    Some(false),
4852                    // container 5, s1 unknown about "foo"
4853                    None,
4854                    // container 6, s1 known to only contain "foo"
4855                    Some(true),
4856                    // container 7, s1 known to not contain "foo"
4857                    Some(false),
4858                    // container 8, s1 unknown about "foo"
4859                    None,
4860                ],
4861            )
4862            .with_contained(
4863                "s2", // for column s2
4864                [ScalarValue::from("bar")],
4865                [
4866                    // containers 0,1,2 s2 known to only contain "bar"
4867                    Some(true),
4868                    Some(true),
4869                    Some(true),
4870                    // container 3,4,5 s2 known to not contain "bar"
4871                    Some(false),
4872                    Some(false),
4873                    Some(false),
4874                    // container 6,7,8 s2 unknown about "bar"
4875                    None,
4876                    None,
4877                    None,
4878                ],
4879            );
4880
4881        // s1 = 'foo'
4882        prune_with_expr(
4883            col("s1").eq(lit("foo")),
4884            &schema,
4885            &statistics,
4886            // rule out containers where we know s1 is not present
4887            &[true, false, true, true, false, true, true, false, true],
4888        );
4889
4890        // s1 = 'foo' OR s2 = 'bar'
4891        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
4892        prune_with_expr(
4893            expr,
4894            &schema,
4895            &statistics,
4896            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
4897            &[true, true, true, true, true, true, true, true, true],
4898        );
4899
4900        // s1 = 'foo' AND s2 != 'bar'
4901        prune_with_expr(
4902            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
4903            &schema,
4904            &statistics,
4905            // can only rule out container where we know either:
4906            // 1. s1 doesn't have the value 'foo` or
4907            // 2. s2 has only the value of 'bar'
4908            &[false, false, false, true, false, true, true, false, true],
4909        );
4910
4911        // s1 != 'foo' AND s2 != 'bar'
4912        prune_with_expr(
4913            col("s1")
4914                .not_eq(lit("foo"))
4915                .and(col("s2").not_eq(lit("bar"))),
4916            &schema,
4917            &statistics,
4918            // Can  rule out any container where we know either
4919            // 1. s1 has only the value 'foo'
4920            // 2. s2 has only the value 'bar'
4921            &[false, false, false, false, true, true, false, true, true],
4922        );
4923
4924        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
4925        prune_with_expr(
4926            col("s1")
4927                .not_eq(lit("foo"))
4928                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
4929            &schema,
4930            &statistics,
4931            // Can rule out any container where we know s1 has only the value
4932            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
4933            &[false, true, true, false, true, true, false, true, true],
4934        );
4935
4936        // s1 like '%foo%bar%'
4937        prune_with_expr(
4938            col("s1").like(lit("foo%bar%")),
4939            &schema,
4940            &statistics,
4941            // cant rule out anything with information we know
4942            &[true, true, true, true, true, true, true, true, true],
4943        );
4944
4945        // s1 like '%foo%bar%' AND s2 = 'bar'
4946        prune_with_expr(
4947            col("s1")
4948                .like(lit("foo%bar%"))
4949                .and(col("s2").eq(lit("bar"))),
4950            &schema,
4951            &statistics,
4952            // can rule out any container where we know s2 does not have the value 'bar'
4953            &[true, true, true, false, false, false, true, true, true],
4954        );
4955
4956        // s1 like '%foo%bar%' OR s2 = 'bar'
4957        prune_with_expr(
4958            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
4959            &schema,
4960            &statistics,
4961            // can't rule out anything (we would have to prove that both the
4962            // like and the equality must be false)
4963            &[true, true, true, true, true, true, true, true, true],
4964        );
4965    }
4966
4967    #[test]
4968    fn prune_with_range_and_contained() {
4969        // Setup mimics range information for i, a bloom filter for s
4970        let schema = Arc::new(Schema::new(vec![
4971            Field::new("i", DataType::Int32, true),
4972            Field::new("s", DataType::Utf8, true),
4973        ]));
4974
4975        let statistics = TestStatistics::new()
4976            .with(
4977                "i",
4978                ContainerStats::new_i32(
4979                    // Container 0, 3, 6: [-5 to 5]
4980                    // Container 1, 4, 7: [10 to 20]
4981                    // Container 2, 5, 9: unknown
4982                    vec![
4983                        Some(-5),
4984                        Some(10),
4985                        None,
4986                        Some(-5),
4987                        Some(10),
4988                        None,
4989                        Some(-5),
4990                        Some(10),
4991                        None,
4992                    ], // min
4993                    vec![
4994                        Some(5),
4995                        Some(20),
4996                        None,
4997                        Some(5),
4998                        Some(20),
4999                        None,
5000                        Some(5),
5001                        Some(20),
5002                        None,
5003                    ], // max
5004                ),
5005            )
5006            // Add contained  information about the s and "foo"
5007            .with_contained(
5008                "s",
5009                [ScalarValue::from("foo")],
5010                [
5011                    // container 0,1,2 known to only contain "foo"
5012                    Some(true),
5013                    Some(true),
5014                    Some(true),
5015                    // container 3,4,5 known to not contain "foo"
5016                    Some(false),
5017                    Some(false),
5018                    Some(false),
5019                    // container 6,7,8 unknown about "foo"
5020                    None,
5021                    None,
5022                    None,
5023                ],
5024            );
5025
5026        // i = 0 and s = 'foo'
5027        prune_with_expr(
5028            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
5029            &schema,
5030            &statistics,
5031            // Can rule out container where we know that either:
5032            // 1. 0 is outside the min/max range of i
5033            // 1. s does not contain foo
5034            // (range is false, and contained  is false)
5035            &[true, false, true, false, false, false, true, false, true],
5036        );
5037
5038        // i = 0 and s != 'foo'
5039        prune_with_expr(
5040            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
5041            &schema,
5042            &statistics,
5043            // Can rule out containers where either:
5044            // 1. 0 is outside the min/max range of i
5045            // 2. s only contains foo
5046            &[false, false, false, true, false, true, true, false, true],
5047        );
5048
5049        // i = 0 OR s = 'foo'
5050        prune_with_expr(
5051            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
5052            &schema,
5053            &statistics,
5054            // in theory could rule out containers if we had min/max values for
5055            // s as well. But in this case we don't so we can't rule out anything
5056            &[true, true, true, true, true, true, true, true, true],
5057        );
5058    }
5059
5060    /// prunes the specified expr with the specified schema and statistics, and
5061    /// ensures it returns expected.
5062    ///
5063    /// `expected` is a vector of bools, where true means the row group should
5064    /// be kept, and false means it should be pruned.
5065    ///
5066    // TODO refactor other tests to use this to reduce boiler plate
5067    fn prune_with_expr(
5068        expr: Expr,
5069        schema: &SchemaRef,
5070        statistics: &TestStatistics,
5071        expected: &[bool],
5072    ) {
5073        println!("Pruning with expr: {expr}");
5074        let expr = logical2physical(&expr, schema);
5075        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
5076        let result = p.prune(statistics).unwrap();
5077        assert_eq!(result, expected);
5078    }
5079
5080    fn test_build_predicate_expression(
5081        expr: &Expr,
5082        schema: &Schema,
5083        required_columns: &mut RequiredColumns,
5084    ) -> Arc<dyn PhysicalExpr> {
5085        let expr = logical2physical(expr, schema);
5086        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
5087        build_predicate_expression(&expr, schema, required_columns, &unhandled_hook)
5088    }
5089
5090    #[test]
5091    fn test_build_predicate_expression_with_false() {
5092        let expr = lit(ScalarValue::Boolean(Some(false)));
5093        let schema = Schema::empty();
5094        let res =
5095            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5096        let expected = logical2physical(&expr, &schema);
5097        assert_eq!(&res, &expected);
5098    }
5099
5100    #[test]
5101    fn test_build_predicate_expression_with_and_false() {
5102        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5103        let expr = and(
5104            col("c1").eq(lit("a")),
5105            lit(ScalarValue::Boolean(Some(false))),
5106        );
5107        let res =
5108            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
5109        let expected = logical2physical(&lit(ScalarValue::Boolean(Some(false))), &schema);
5110        assert_eq!(&res, &expected);
5111    }
5112
5113    #[test]
5114    fn test_build_predicate_expression_with_or_false() {
5115        let schema = Schema::new(vec![Field::new("c1", DataType::Utf8View, false)]);
5116        let left_expr = col("c1").eq(lit("a"));
5117        let right_expr = lit(ScalarValue::Boolean(Some(false)));
5118        let res = test_build_predicate_expression(
5119            &or(left_expr.clone(), right_expr.clone()),
5120            &schema,
5121            &mut RequiredColumns::new(),
5122        );
5123        let expected =
5124            "c1_null_count@2 != row_count@3 AND c1_min@0 <= a AND a <= c1_max@1";
5125        assert_eq!(res.to_string(), expected);
5126    }
5127}