datafusion_physical_optimizer/
pruning.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27    array::{new_null_array, ArrayRef, BooleanArray},
28    datatypes::{DataType, Field, Schema, SchemaRef},
29    record_batch::{RecordBatch, RecordBatchOptions},
30};
31use log::trace;
32
33use datafusion_common::error::{DataFusionError, Result};
34use datafusion_common::tree_node::TransformedResult;
35use datafusion_common::{
36    internal_err, plan_datafusion_err, plan_err,
37    tree_node::{Transformed, TreeNode},
38    ScalarValue,
39};
40use datafusion_common::{Column, DFSchema};
41use datafusion_expr_common::operator::Operator;
42use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
43use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
44use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
45
46/// A source of runtime statistical information to [`PruningPredicate`]s.
47///
48/// # Supported Information
49///
50/// 1. Minimum and maximum values for columns
51///
52/// 2. Null counts and row counts for columns
53///
54/// 3. Whether the values in a column are contained in a set of literals
55///
56/// # Vectorized Interface
57///
58/// Information for containers / files are returned as Arrow [`ArrayRef`], so
59/// the evaluation happens once on a single `RecordBatch`, which amortizes the
60/// overhead of evaluating the predicate. This is important when pruning 1000s
61/// of containers which often happens in analytic systems that have 1000s of
62/// potential files to consider.
63///
64/// For example, for the following three files with a single column `a`:
65/// ```text
66/// file1: column a: min=5, max=10
67/// file2: column a: No stats
68/// file2: column a: min=20, max=30
69/// ```
70///
71/// PruningStatistics would return:
72///
73/// ```text
74/// min_values("a") -> Some([5, Null, 20])
75/// max_values("a") -> Some([10, Null, 30])
76/// min_values("X") -> None
77/// ```
78pub trait PruningStatistics {
79    /// Return the minimum values for the named column, if known.
80    ///
81    /// If the minimum value for a particular container is not known, the
82    /// returned array should have `null` in that row. If the minimum value is
83    /// not known for any row, return `None`.
84    ///
85    /// Note: the returned array must contain [`Self::num_containers`] rows
86    fn min_values(&self, column: &Column) -> Option<ArrayRef>;
87
88    /// Return the maximum values for the named column, if known.
89    ///
90    /// See [`Self::min_values`] for when to return `None` and null values.
91    ///
92    /// Note: the returned array must contain [`Self::num_containers`] rows
93    fn max_values(&self, column: &Column) -> Option<ArrayRef>;
94
95    /// Return the number of containers (e.g. Row Groups) being pruned with
96    /// these statistics.
97    ///
98    /// This value corresponds to the size of the [`ArrayRef`] returned by
99    /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
100    /// and [`Self::row_counts`].
101    fn num_containers(&self) -> usize;
102
103    /// Return the number of null values for the named column as an
104    /// [`UInt64Array`]
105    ///
106    /// See [`Self::min_values`] for when to return `None` and null values.
107    ///
108    /// Note: the returned array must contain [`Self::num_containers`] rows
109    ///
110    /// [`UInt64Array`]: arrow::array::UInt64Array
111    fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
112
113    /// Return the number of rows for the named column in each container
114    /// as an [`UInt64Array`].
115    ///
116    /// See [`Self::min_values`] for when to return `None` and null values.
117    ///
118    /// Note: the returned array must contain [`Self::num_containers`] rows
119    ///
120    /// [`UInt64Array`]: arrow::array::UInt64Array
121    fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
122
123    /// Returns [`BooleanArray`] where each row represents information known
124    /// about specific literal `values` in a column.
125    ///
126    /// For example, Parquet Bloom Filters implement this API to communicate
127    /// that `values` are known not to be present in a Row Group.
128    ///
129    /// The returned array has one row for each container, with the following
130    /// meanings:
131    /// * `true` if the values in `column`  ONLY contain values from `values`
132    /// * `false` if the values in `column` are NOT ANY of `values`
133    /// * `null` if the neither of the above holds or is unknown.
134    ///
135    /// If these statistics can not determine column membership for any
136    /// container, return `None` (the default).
137    ///
138    /// Note: the returned array must contain [`Self::num_containers`] rows
139    fn contained(
140        &self,
141        column: &Column,
142        values: &HashSet<ScalarValue>,
143    ) -> Option<BooleanArray>;
144}
145
146/// Used to prove that arbitrary predicates (boolean expression) can not
147/// possibly evaluate to `true` given information about a column provided by
148/// [`PruningStatistics`].
149///
150/// # Introduction
151///
152/// `PruningPredicate` analyzes filter expressions using statistics such as
153/// min/max values and null counts, attempting to prove a "container" (e.g.
154/// Parquet Row Group) can be skipped without reading the actual data,
155/// potentially leading to significant performance improvements.
156///
157/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
158/// on the min/max values found in the Parquet metadata. If the
159/// `PruningPredicate` can prove that the filter can never evaluate to `true`
160/// for any row in the Row Group, the entire Row Group is skipped during query
161/// execution.
162///
163/// The `PruningPredicate` API is general, and can be used for pruning other
164/// types of containers (e.g. files) based on statistics that may be known from
165/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
166/// subtle topic.  See the Background and Implementation section for details.
167///
168/// `PruningPredicate` supports:
169///
170/// 1. Arbitrary expressions (including user defined functions)
171///
172/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
173///    so it is suitable for pruning 1000s of containers.
174///
175/// 3. Any source of information that implements the [`PruningStatistics`] trait
176///    (not just Parquet metadata).
177///
178/// # Example
179///
180/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
181/// example of how to use `PruningPredicate` to prune files based on min/max
182/// values.
183///
184/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/pruning.rs
185///
186/// Given an expression like `x = 5` and statistics for 3 containers (Row
187/// Groups, files, etc) `A`, `B`, and `C`:
188///
189/// ```text
190///   A: {x_min = 0, x_max = 4}
191///   B: {x_min = 2, x_max = 10}
192///   C: {x_min = 5, x_max = 8}
193/// ```
194///
195/// `PruningPredicate` will conclude that the rows in container `A` can never
196/// be true (as the maximum value is only `4`), so it can be pruned:
197///
198/// ```text
199/// A: false (no rows could possibly match x = 5)
200/// B: true  (rows might match x = 5)
201/// C: true  (rows might match x = 5)
202/// ```
203///
204/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
205///
206/// # Background
207///
208/// ## Boolean Tri-state logic
209///
210/// To understand the details of the rest of this documentation, it is important
211/// to understand how the tri-state boolean logic in SQL works. As this is
212/// somewhat esoteric, we review it here.
213///
214/// SQL has a notion of `NULL` that represents the value is `“unknown”` and this
215/// uncertainty propagates through expressions. SQL `NULL` behaves very
216/// differently than the `NULL` in most other languages where it is a special,
217/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
218/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
219/// first encountered as they behave differently than most programmers may
220/// expect.
221///
222/// In most other programming languages,
223/// * `a == NULL` evaluates to `true` if `a` also had the value `NULL`
224/// * `a == NULL` evaluates to `false` if `a` has any other value
225///
226/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
227/// `false`):
228///
229/// Expression    | Result
230/// ------------- | ---------
231/// `1 = NULL`    | `NULL`
232/// `NULL = NULL` | `NULL`
233///
234/// Also important is how `AND` and `OR` works with tri-state boolean logic as
235/// (perhaps counterintuitively) the result is **not** always NULL. While
236/// consistent with the notion of `NULL` representing “unknown”, this is again,
237/// often deeply confusing 🤯 when first encountered.
238///
239/// Expression       | Result    | Intuition
240/// ---------------  | --------- | -----------
241/// `NULL AND true`  |   `NULL`  | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
242/// `NULL AND false` |  `false`  | If the `NULL` was either `true` or `false` the overall expression is still `false`
243/// `NULL AND NULL`  | `NULL`    |
244///
245/// Expression      | Result    | Intuition
246/// --------------- | --------- | ----------
247/// `NULL OR true`  | `true`    |  If the `NULL` was either `true` or `false` the overall expression is still `true`
248/// `NULL OR false` | `NULL`    |  The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
249/// `NULL OR NULL`  |  `NULL`   |
250///
251/// ## SQL Filter Semantics
252///
253/// The SQL `WHERE` clause has a boolean expression, often called a filter or
254/// predicate. The semantics of this predicate are that the query evaluates the
255/// predicate for each row in the input tables and:
256///
257/// * Rows that evaluate to `true` are returned in the query results
258///
259/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
260///
261/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
262///   Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
263///   in the rewritten predicate described below.*
264///
265/// # `PruningPredicate` Implementation
266///
267/// Armed with the information in the Background section, we can now understand
268/// how the `PruningPredicate` logic works.
269///
270/// ## Interface
271///
272/// **Inputs**
273/// 1. An input schema describing what columns exist
274///
275/// 2. A predicate (expression that evaluates to a boolean)
276///
277/// 3. [`PruningStatistics`] that provides information about columns in that
278///    schema, for multiple “containers”. For each column in each container, it
279///    provides optional information on contained values, min_values, max_values,
280///    null_counts counts, and row_counts counts.
281///
282/// **Outputs**:
283/// A (non null) boolean value for each container:
284/// * `true`: There MAY be rows that match the predicate
285///
286/// * `false`: There are no rows that could possibly match the predicate (the
287///   predicate can never possibly be true). The container can be pruned (skipped)
288///   entirely.
289///
290/// While `PruningPredicate` will never return a `NULL` value, the
291/// rewritten predicate (as returned by `build_predicate_expression` and used internally
292/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
293/// or null / row counts are not known.
294///
295/// In order to be correct, `PruningPredicate` must return false
296/// **only** if it can determine that for all rows in the container, the
297/// predicate could never evaluate to `true` (always evaluates to either `NULL`
298/// or `false`).
299///
300/// ## Contains Analysis and Min/Max Rewrite
301///
302/// `PruningPredicate` works by first analyzing the predicate to see what
303/// [`LiteralGuarantee`] must hold for the predicate to be true.
304///
305/// Then, the `PruningPredicate` rewrites the original predicate into an
306/// expression that references the min/max values of each column in the original
307/// predicate.
308///
309/// When the min/max values are actually substituted in to this expression and
310/// evaluated, the result means
311///
312/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
313///
314/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
315///           Note that rewritten predicate can evaluate to NULL when some of
316///           the min/max values are not known. *Note that this is different than
317///           the SQL filter semantics where `NULL` means the row is filtered
318///           out.*
319///
320/// * `false`: there are no rows that could possibly match the predicate,
321///            **PRUNES** the container
322///
323/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
324/// `x_row_count` represent the minimum and maximum values, the null count of
325/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
326/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
327/// is known to be all `NULL`s. Note this is different from knowing nothing about
328/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
329/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
330///
331/// Here are some examples of the rewritten predicates:
332///
333/// Original Predicate | Rewritten Predicate
334/// ------------------ | --------------------
335/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
336/// `x < 5` | `x_null_count != x_row_count THEN false (x_max < 5)`
337/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count (y_min <= 10 AND 10 <= y_max)`
338/// `x IS NULL`  | `x_null_count > 0`
339/// `x IS NOT NULL`  | `x_null_count != row_count`
340/// `CAST(x as int) = 5` | `x_null_count != x_row_count (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
341///
342/// ## Predicate Evaluation
343/// The PruningPredicate works in two passes
344///
345/// **First pass**:  For each `LiteralGuarantee` calls
346/// [`PruningStatistics::contained`] and rules out containers where the
347/// LiteralGuarantees are not satisfied
348///
349/// **Second Pass**: Evaluates the rewritten expression using the
350/// min/max/null_counts/row_counts values for each column for each container. For any
351/// container that this expression evaluates to `false`, it rules out those
352/// containers.
353///
354///
355/// ### Example 1
356///
357/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
358///
359/// ```sql
360/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
361/// AND
362/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
363/// ```
364///
365/// If we know that for a given container, `x` is between `1 and 100` and we know that
366/// `y` is between `4` and `7`, we know nothing about the null count and row count of
367/// `x` and `y`, the input statistics might look like:
368///
369/// Column   | Value
370/// -------- | -----
371/// `x_min`  | `1`
372/// `x_max`  | `100`
373/// `x_null_count` | `null`
374/// `x_row_count`  | `null`
375/// `y_min`  | `4`
376/// `y_max`  | `7`
377/// `y_null_count` | `null`
378/// `y_row_count`  | `null`
379///
380/// When these statistics values are substituted in to the rewritten predicate and
381/// simplified, the result is `false`:
382///
383/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
384/// * `null = null` is `null` which is not true, so the AND moves on to the next clause
385/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
386/// * evaluating the clauses further we get:
387/// * `null and true and null and false`
388/// * `null and false`
389/// * `false`
390///
391/// Returning `false` means the container can be pruned, which matches the
392/// intuition that  `x = 5 AND y = 10` can’t be true for any row if all values of `y`
393/// are `7` or less.
394///
395/// Note that if we had ended up with `null AND true AND null AND true` the result
396/// would have been `null`.
397/// `null` is treated the same as`true`, because we can't prove that the predicate is `false.`
398///
399/// If, for some other container, we knew `y` was between the values `4` and
400/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
401/// left as an exercise to the reader -- are you still here?), and the container
402/// **could not** be pruned. The intuition is that there may be rows where the
403/// predicate *might* evaluate to `true`, and the only way to find out is to do
404/// more analysis, for example by actually reading the data and evaluating the
405/// predicate row by row.
406///
407/// ### Example 2
408///
409/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
410/// look like the same as example 1:
411///
412/// ```sql
413/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
414/// AND
415/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
416/// ```
417///
418/// If we know that for another given container, `x_min` is NULL and `x_max` is
419/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
420///  is `100`; we know that `y` is between `4` and `7`, but we know nothing about
421/// the null count and row count of `y`. The input statistics might look like:
422///
423/// Column   | Value
424/// -------- | -----
425/// `x_min`  | `null`
426/// `x_max`  | `null`
427/// `x_null_count` | `100`
428/// `x_row_count`  | `100`
429/// `y_min`  | `4`
430/// `y_max`  | `7`
431/// `y_null_count` | `null`
432/// `y_row_count`  | `null`
433///
434/// When these statistics values are substituted in to the rewritten predicate and
435/// simplified, the result is `false`:
436///
437/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null = null AND (4 <= 10 AND 10 <= 7)`
438/// * `false AND null AND null AND false`
439/// * `false AND false`
440/// * `false`
441///
442/// Returning `false` means the container can be pruned, which matches the
443/// intuition that  `x = 5 AND y = 10` can’t be true because all values in `x`
444/// are known to be NULL.
445///
446/// # Related Work
447///
448/// [`PruningPredicate`] implements the type of min/max pruning described in
449/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
450/// described by various research such as [small materialized aggregates], [zone
451/// maps], and [data skipping].
452///
453/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
454/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
455/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
456/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
457#[derive(Debug, Clone)]
458pub struct PruningPredicate {
459    /// The input schema against which the predicate will be evaluated
460    schema: SchemaRef,
461    /// A min/max pruning predicate (rewritten in terms of column min/max
462    /// values, which are supplied by statistics)
463    predicate_expr: Arc<dyn PhysicalExpr>,
464    /// Description of which statistics are required to evaluate `predicate_expr`
465    required_columns: RequiredColumns,
466    /// Original physical predicate from which this predicate expr is derived
467    /// (required for serialization)
468    orig_expr: Arc<dyn PhysicalExpr>,
469    /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
470    /// evaluate to `true`.
471    ///
472    /// See [`PruningPredicate::literal_guarantees`] for more details.
473    literal_guarantees: Vec<LiteralGuarantee>,
474}
475
476/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
477/// complex expressions or predicates that reference columns that are not in the
478/// schema.
479pub trait UnhandledPredicateHook {
480    /// Called when a predicate can not be rewritten in terms of statistics or
481    /// references a column that is not in the schema.
482    fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
483}
484
485/// The default handling for unhandled predicates is to return a constant `true`
486/// (meaning don't prune the container)
487#[derive(Debug, Clone)]
488struct ConstantUnhandledPredicateHook {
489    default: Arc<dyn PhysicalExpr>,
490}
491
492impl Default for ConstantUnhandledPredicateHook {
493    fn default() -> Self {
494        Self {
495            default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
496        }
497    }
498}
499
500impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
501    fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
502        Arc::clone(&self.default)
503    }
504}
505
506impl PruningPredicate {
507    /// Try to create a new instance of [`PruningPredicate`]
508    ///
509    /// This will translate the provided `expr` filter expression into
510    /// a *pruning predicate*.
511    ///
512    /// A pruning predicate is one that has been rewritten in terms of
513    /// the min and max values of column references and that evaluates
514    /// to FALSE if the filter predicate would evaluate FALSE *for
515    /// every row* whose values fell within the min / max ranges (aka
516    /// could be pruned).
517    ///
518    /// The pruning predicate evaluates to TRUE or NULL
519    /// if the filter predicate *might* evaluate to TRUE for at least
520    /// one row whose values fell within the min/max ranges (in other
521    /// words they might pass the predicate)
522    ///
523    /// For example, the filter expression `(column / 2) = 4` becomes
524    /// the pruning predicate
525    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2))`
526    ///
527    /// See the struct level documentation on [`PruningPredicate`] for more
528    /// details.
529    pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
530        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
531
532        // build predicate expression once
533        let mut required_columns = RequiredColumns::new();
534        let predicate_expr = build_predicate_expression(
535            &expr,
536            schema.as_ref(),
537            &mut required_columns,
538            &unhandled_hook,
539        );
540
541        let literal_guarantees = LiteralGuarantee::analyze(&expr);
542
543        Ok(Self {
544            schema,
545            predicate_expr,
546            required_columns,
547            orig_expr: expr,
548            literal_guarantees,
549        })
550    }
551
552    /// For each set of statistics, evaluates the pruning predicate
553    /// and returns a `bool` with the following meaning for a
554    /// all rows whose values match the statistics:
555    ///
556    /// `true`: There MAY be rows that match the predicate
557    ///
558    /// `false`: There are no rows that could possibly match the predicate
559    ///
560    /// Note: the predicate passed to `prune` should already be simplified as
561    /// much as possible (e.g. this pass doesn't handle some
562    /// expressions like `b = false`, but it does handle the
563    /// simplified version `b`. See [`ExprSimplifier`] to simplify expressions.
564    ///
565    /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
566    pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
567        let mut builder = BoolVecBuilder::new(statistics.num_containers());
568
569        // Try to prove the predicate can't be true for the containers based on
570        // literal guarantees
571        for literal_guarantee in &self.literal_guarantees {
572            let LiteralGuarantee {
573                column,
574                guarantee,
575                literals,
576            } = literal_guarantee;
577            if let Some(results) = statistics.contained(column, literals) {
578                match guarantee {
579                    // `In` means the values in the column must be one of the
580                    // values in the set for the predicate to evaluate to true.
581                    // If `contained` returns false, that means the column is
582                    // not any of the values so we can prune the container
583                    Guarantee::In => builder.combine_array(&results),
584                    // `NotIn` means the values in the column must must not be
585                    // any of the values in the set for the predicate to
586                    // evaluate to true. If contained returns true, it means the
587                    // column is only in the set of values so we can prune the
588                    // container
589                    Guarantee::NotIn => {
590                        builder.combine_array(&arrow::compute::not(&results)?)
591                    }
592                }
593                // if all containers are pruned (has rows that DEFINITELY DO NOT pass the predicate)
594                // can return early without evaluating the rest of predicates.
595                if builder.check_all_pruned() {
596                    return Ok(builder.build());
597                }
598            }
599        }
600
601        // Next, try to prove the predicate can't be true for the containers based
602        // on min/max values
603
604        // build a RecordBatch that contains the min/max values in the
605        // appropriate statistics columns for the min/max predicate
606        let statistics_batch =
607            build_statistics_record_batch(statistics, &self.required_columns)?;
608
609        // Evaluate the pruning predicate on that record batch and append any results to the builder
610        builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
611
612        Ok(builder.build())
613    }
614
615    /// Return a reference to the input schema
616    pub fn schema(&self) -> &SchemaRef {
617        &self.schema
618    }
619
620    /// Returns a reference to the physical expr used to construct this pruning predicate
621    pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
622        &self.orig_expr
623    }
624
625    /// Returns a reference to the predicate expr
626    pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
627        &self.predicate_expr
628    }
629
630    /// Returns a reference to the literal guarantees
631    ///
632    /// Note that **All** `LiteralGuarantee`s must be satisfied for the
633    /// expression to possibly be `true`. If any is not satisfied, the
634    /// expression is guaranteed to be `null` or `false`.
635    pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
636        &self.literal_guarantees
637    }
638
639    /// Returns true if this pruning predicate can not prune anything.
640    ///
641    /// This happens if the predicate is a literal `true`  and
642    /// literal_guarantees is empty.
643    ///
644    /// This can happen when a predicate is simplified to a constant `true`
645    pub fn always_true(&self) -> bool {
646        is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
647    }
648
649    // this is only used by `parquet` feature right now
650    #[allow(dead_code)]
651    pub fn required_columns(&self) -> &RequiredColumns {
652        &self.required_columns
653    }
654
655    /// Names of the columns that are known to be / not be in a set
656    /// of literals (constants). These are the columns the that may be passed to
657    /// [`PruningStatistics::contained`] during pruning.
658    ///
659    /// This is useful to avoid fetching statistics for columns that will not be
660    /// used in the predicate. For example, it can be used to avoid reading
661    /// unneeded bloom filters (a non trivial operation).
662    pub fn literal_columns(&self) -> Vec<String> {
663        let mut seen = HashSet::new();
664        self.literal_guarantees
665            .iter()
666            .map(|e| &e.column.name)
667            // avoid duplicates
668            .filter(|name| seen.insert(*name))
669            .map(|s| s.to_string())
670            .collect()
671    }
672}
673
674/// Builds the return `Vec` for [`PruningPredicate::prune`].
675#[derive(Debug)]
676struct BoolVecBuilder {
677    /// One element per container. Each element is
678    /// * `true`: if the container has row that may pass the predicate
679    /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
680    inner: Vec<bool>,
681}
682
683impl BoolVecBuilder {
684    /// Create a new `BoolVecBuilder` with `num_containers` elements
685    fn new(num_containers: usize) -> Self {
686        Self {
687            // assume by default all containers may pass the predicate
688            inner: vec![true; num_containers],
689        }
690    }
691
692    /// Combines result `array` for a conjunct (e.g. `AND` clause) of a
693    /// predicate into the currently in progress array.
694    ///
695    /// Each `array` element is:
696    /// * `true`: container has row that may pass the predicate
697    /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
698    /// * `null`: container may or may not have rows that pass the predicate
699    fn combine_array(&mut self, array: &BooleanArray) {
700        assert_eq!(array.len(), self.inner.len());
701        for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
702            // `false` for this conjunct means we know for sure no rows could
703            // pass the predicate and thus we set the corresponding container
704            // location to false.
705            if let Some(false) = new {
706                *cur = false;
707            }
708        }
709    }
710
711    /// Combines the results in the [`ColumnarValue`] to the currently in
712    /// progress array, following the same rules as [`Self::combine_array`].
713    ///
714    /// # Panics
715    /// If `value` is not boolean
716    fn combine_value(&mut self, value: ColumnarValue) {
717        match value {
718            ColumnarValue::Array(array) => {
719                self.combine_array(array.as_boolean());
720            }
721            ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
722                // False means all containers can not pass the predicate
723                self.inner = vec![false; self.inner.len()];
724            }
725            _ => {
726                // Null or true means the rows in container may pass this
727                // conjunct so we can't prune any containers based on that
728            }
729        }
730    }
731
732    /// Convert this builder into a Vec of bools
733    fn build(self) -> Vec<bool> {
734        self.inner
735    }
736
737    /// Check all containers has rows that DEFINITELY DO NOT pass the predicate
738    fn check_all_pruned(&self) -> bool {
739        self.inner.iter().all(|&x| !x)
740    }
741}
742
743fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
744    expr.as_any()
745        .downcast_ref::<phys_expr::Literal>()
746        .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
747        .unwrap_or_default()
748}
749
750/// Describes which columns statistics are necessary to evaluate a
751/// [`PruningPredicate`].
752///
753/// This structure permits reading and creating the minimum number statistics,
754/// which is important since statistics may be non trivial to read (e.g. large
755/// strings or when there are 1000s of columns).
756///
757/// Handles creating references to the min/max statistics
758/// for columns as well as recording which statistics are needed
759#[derive(Debug, Default, Clone)]
760pub struct RequiredColumns {
761    /// The statistics required to evaluate this predicate:
762    /// * The unqualified column in the input schema
763    /// * Statistics type (e.g. Min or Max or Null_Count)
764    /// * The field the statistics value should be placed in for
765    ///   pruning predicate evaluation (e.g. `min_value` or `max_value`)
766    columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
767}
768
769impl RequiredColumns {
770    fn new() -> Self {
771        Self::default()
772    }
773
774    /// Returns Some(column) if this is a single column predicate.
775    ///
776    /// Returns None if this is a multi-column predicate.
777    ///
778    /// Examples:
779    /// * `a > 5 OR a < 10` returns `Some(a)`
780    /// * `a > 5 OR b < 10` returns `None`
781    /// * `true` returns None
782    #[allow(dead_code)]
783    // this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
784    pub fn single_column(&self) -> Option<&phys_expr::Column> {
785        if self.columns.windows(2).all(|w| {
786            // check if all columns are the same (ignoring statistics and field)
787            let c1 = &w[0].0;
788            let c2 = &w[1].0;
789            c1 == c2
790        }) {
791            self.columns.first().map(|r| &r.0)
792        } else {
793            None
794        }
795    }
796
797    /// Returns an iterator over items in columns (see doc on
798    /// `self.columns` for details)
799    pub(crate) fn iter(
800        &self,
801    ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
802        self.columns.iter()
803    }
804
805    fn find_stat_column(
806        &self,
807        column: &phys_expr::Column,
808        statistics_type: StatisticsType,
809    ) -> Option<usize> {
810        match statistics_type {
811            StatisticsType::RowCount => {
812                // Use the first row count we find, if any
813                self.columns
814                    .iter()
815                    .enumerate()
816                    .find(|(_i, (_c, t, _f))| t == &statistics_type)
817                    .map(|(i, (_c, _t, _f))| i)
818            }
819            _ => self
820                .columns
821                .iter()
822                .enumerate()
823                .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
824                .map(|(i, (_c, _t, _f))| i),
825        }
826    }
827
828    /// Rewrites column_expr so that all appearances of column
829    /// are replaced with a reference to either the min or max
830    /// statistics column, while keeping track that a reference to the statistics
831    /// column is required
832    ///
833    /// for example, an expression like `col("foo") > 5`, when called
834    /// with Max would result in an expression like `col("foo_max") >
835    /// 5` with the appropriate entry noted in self.columns
836    fn stat_column_expr(
837        &mut self,
838        column: &phys_expr::Column,
839        column_expr: &Arc<dyn PhysicalExpr>,
840        field: &Field,
841        stat_type: StatisticsType,
842    ) -> Result<Arc<dyn PhysicalExpr>> {
843        let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
844            Some(idx) => (idx, false),
845            None => (self.columns.len(), true),
846        };
847
848        let column_name = column.name();
849        let stat_column_name = match stat_type {
850            StatisticsType::Min => format!("{column_name}_min"),
851            StatisticsType::Max => format!("{column_name}_max"),
852            StatisticsType::NullCount => format!("{column_name}_null_count"),
853            StatisticsType::RowCount => "row_count".to_string(),
854        };
855
856        let stat_column = phys_expr::Column::new(&stat_column_name, idx);
857
858        // only add statistics column if not previously added
859        if need_to_insert {
860            // may be null if statistics are not present
861            let nullable = true;
862            let stat_field =
863                Field::new(stat_column.name(), field.data_type().clone(), nullable);
864            self.columns.push((column.clone(), stat_type, stat_field));
865        }
866        rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
867    }
868
869    /// rewrite col --> col_min
870    fn min_column_expr(
871        &mut self,
872        column: &phys_expr::Column,
873        column_expr: &Arc<dyn PhysicalExpr>,
874        field: &Field,
875    ) -> Result<Arc<dyn PhysicalExpr>> {
876        self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
877    }
878
879    /// rewrite col --> col_max
880    fn max_column_expr(
881        &mut self,
882        column: &phys_expr::Column,
883        column_expr: &Arc<dyn PhysicalExpr>,
884        field: &Field,
885    ) -> Result<Arc<dyn PhysicalExpr>> {
886        self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
887    }
888
889    /// rewrite col --> col_null_count
890    fn null_count_column_expr(
891        &mut self,
892        column: &phys_expr::Column,
893        column_expr: &Arc<dyn PhysicalExpr>,
894        field: &Field,
895    ) -> Result<Arc<dyn PhysicalExpr>> {
896        self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
897    }
898
899    /// rewrite col --> col_row_count
900    fn row_count_column_expr(
901        &mut self,
902        column: &phys_expr::Column,
903        column_expr: &Arc<dyn PhysicalExpr>,
904        field: &Field,
905    ) -> Result<Arc<dyn PhysicalExpr>> {
906        self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
907    }
908}
909
910impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
911    fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
912        Self { columns }
913    }
914}
915
916/// Build a RecordBatch from a list of statistics, creating arrays,
917/// with one row for each PruningStatistics and columns specified in
918/// in the required_columns parameter.
919///
920/// For example, if the requested columns are
921/// ```text
922/// ("s1", Min, Field:s1_min)
923/// ("s2", Max, field:s2_max)
924///```
925///
926/// And the input statistics had
927/// ```text
928/// S1(Min: 5, Max: 10)
929/// S2(Min: 99, Max: 1000)
930/// S3(Min: 1, Max: 2)
931/// ```
932///
933/// Then this function would build a record batch with 2 columns and
934/// one row s1_min and s2_max as follows (s3 is not requested):
935///
936/// ```text
937/// s1_min | s2_max
938/// -------+--------
939///   5    | 1000
940/// ```
941fn build_statistics_record_batch<S: PruningStatistics>(
942    statistics: &S,
943    required_columns: &RequiredColumns,
944) -> Result<RecordBatch> {
945    let mut fields = Vec::<Field>::new();
946    let mut arrays = Vec::<ArrayRef>::new();
947    // For each needed statistics column:
948    for (column, statistics_type, stat_field) in required_columns.iter() {
949        let column = Column::from_name(column.name());
950        let data_type = stat_field.data_type();
951
952        let num_containers = statistics.num_containers();
953
954        let array = match statistics_type {
955            StatisticsType::Min => statistics.min_values(&column),
956            StatisticsType::Max => statistics.max_values(&column),
957            StatisticsType::NullCount => statistics.null_counts(&column),
958            StatisticsType::RowCount => statistics.row_counts(&column),
959        };
960        let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
961
962        if num_containers != array.len() {
963            return internal_err!(
964                "mismatched statistics length. Expected {}, got {}",
965                num_containers,
966                array.len()
967            );
968        }
969
970        // cast statistics array to required data type (e.g. parquet
971        // provides timestamp statistics as "Int64")
972        let array = arrow::compute::cast(&array, data_type)?;
973
974        fields.push(stat_field.clone());
975        arrays.push(array);
976    }
977
978    let schema = Arc::new(Schema::new(fields));
979    // provide the count in case there were no needed statistics
980    let mut options = RecordBatchOptions::default();
981    options.row_count = Some(statistics.num_containers());
982
983    trace!(
984        "Creating statistics batch for {:#?} with {:#?}",
985        required_columns,
986        arrays
987    );
988
989    RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
990        plan_datafusion_err!("Can not create statistics record batch: {err}")
991    })
992}
993
994struct PruningExpressionBuilder<'a> {
995    column: phys_expr::Column,
996    column_expr: Arc<dyn PhysicalExpr>,
997    op: Operator,
998    scalar_expr: Arc<dyn PhysicalExpr>,
999    field: &'a Field,
1000    required_columns: &'a mut RequiredColumns,
1001}
1002
1003impl<'a> PruningExpressionBuilder<'a> {
1004    fn try_new(
1005        left: &'a Arc<dyn PhysicalExpr>,
1006        right: &'a Arc<dyn PhysicalExpr>,
1007        op: Operator,
1008        schema: &'a Schema,
1009        required_columns: &'a mut RequiredColumns,
1010    ) -> Result<Self> {
1011        // find column name; input could be a more complicated expression
1012        let left_columns = collect_columns(left);
1013        let right_columns = collect_columns(right);
1014        let (column_expr, scalar_expr, columns, correct_operator) =
1015            match (left_columns.len(), right_columns.len()) {
1016                (1, 0) => (left, right, left_columns, op),
1017                (0, 1) => (right, left, right_columns, reverse_operator(op)?),
1018                _ => {
1019                    // if more than one column used in expression - not supported
1020                    return plan_err!(
1021                        "Multi-column expressions are not currently supported"
1022                    );
1023                }
1024            };
1025
1026        let df_schema = DFSchema::try_from(schema.clone())?;
1027        let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1028            column_expr,
1029            correct_operator,
1030            scalar_expr,
1031            df_schema,
1032        )?;
1033        let column = columns.iter().next().unwrap().clone();
1034        let field = match schema.column_with_name(column.name()) {
1035            Some((_, f)) => f,
1036            _ => {
1037                return plan_err!("Field not found in schema");
1038            }
1039        };
1040
1041        Ok(Self {
1042            column,
1043            column_expr,
1044            op: correct_operator,
1045            scalar_expr,
1046            field,
1047            required_columns,
1048        })
1049    }
1050
1051    fn op(&self) -> Operator {
1052        self.op
1053    }
1054
1055    fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1056        &self.scalar_expr
1057    }
1058
1059    fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1060        self.required_columns
1061            .min_column_expr(&self.column, &self.column_expr, self.field)
1062    }
1063
1064    fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1065        self.required_columns
1066            .max_column_expr(&self.column, &self.column_expr, self.field)
1067    }
1068
1069    /// This function is to simply retune the `null_count` physical expression no matter what the
1070    /// predicate expression is
1071    ///
1072    /// i.e., x > 5 => x_null_count,
1073    ///       cast(x as int) < 10 => x_null_count,
1074    ///       try_cast(x as float) < 10.0 => x_null_count
1075    fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1076        // Retune to [`phys_expr::Column`]
1077        let column_expr = Arc::new(self.column.clone()) as _;
1078
1079        // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1080        let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1081
1082        self.required_columns.null_count_column_expr(
1083            &self.column,
1084            &column_expr,
1085            null_count_field,
1086        )
1087    }
1088
1089    /// This function is to simply retune the `row_count` physical expression no matter what the
1090    /// predicate expression is
1091    ///
1092    /// i.e., x > 5 => x_row_count,
1093    ///       cast(x as int) < 10 => x_row_count,
1094    ///       try_cast(x as float) < 10.0 => x_row_count
1095    fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1096        // Retune to [`phys_expr::Column`]
1097        let column_expr = Arc::new(self.column.clone()) as _;
1098
1099        // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1100        let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1101
1102        self.required_columns.row_count_column_expr(
1103            &self.column,
1104            &column_expr,
1105            row_count_field,
1106        )
1107    }
1108}
1109
1110/// This function is designed to rewrite the column_expr to
1111/// ensure the column_expr is monotonically increasing.
1112///
1113/// For example,
1114/// 1. `col > 10`
1115/// 2. `-col > 10` should be rewritten to `col < -10`
1116/// 3. `!col = true` would be rewritten to `col = !true`
1117/// 4. `abs(a - 10) > 0` not supported
1118/// 5. `cast(can_prunable_expr) > 10`
1119/// 6. `try_cast(can_prunable_expr) > 10`
1120///
1121/// More rewrite rules are still in progress.
1122fn rewrite_expr_to_prunable(
1123    column_expr: &PhysicalExprRef,
1124    op: Operator,
1125    scalar_expr: &PhysicalExprRef,
1126    schema: DFSchema,
1127) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1128    if !is_compare_op(op) {
1129        return plan_err!("rewrite_expr_to_prunable only support compare expression");
1130    }
1131
1132    let column_expr_any = column_expr.as_any();
1133
1134    if column_expr_any
1135        .downcast_ref::<phys_expr::Column>()
1136        .is_some()
1137    {
1138        // `col op lit()`
1139        Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1140    } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1141        // `cast(col) op lit()`
1142        let arrow_schema: SchemaRef = schema.clone().into();
1143        let from_type = cast.expr().data_type(&arrow_schema)?;
1144        verify_support_type_for_prune(&from_type, cast.cast_type())?;
1145        let (left, op, right) =
1146            rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1147        let left = Arc::new(phys_expr::CastExpr::new(
1148            left,
1149            cast.cast_type().clone(),
1150            None,
1151        ));
1152        Ok((left, op, right))
1153    } else if let Some(try_cast) =
1154        column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1155    {
1156        // `try_cast(col) op lit()`
1157        let arrow_schema: SchemaRef = schema.clone().into();
1158        let from_type = try_cast.expr().data_type(&arrow_schema)?;
1159        verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1160        let (left, op, right) =
1161            rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1162        let left = Arc::new(phys_expr::TryCastExpr::new(
1163            left,
1164            try_cast.cast_type().clone(),
1165        ));
1166        Ok((left, op, right))
1167    } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1168        // `-col > lit()`  --> `col < -lit()`
1169        let (left, op, right) =
1170            rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1171        let right = Arc::new(phys_expr::NegativeExpr::new(right));
1172        Ok((left, reverse_operator(op)?, right))
1173    } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1174        // `!col = true` --> `col = !true`
1175        if op != Operator::Eq && op != Operator::NotEq {
1176            return plan_err!("Not with operator other than Eq / NotEq is not supported");
1177        }
1178        if not
1179            .arg()
1180            .as_any()
1181            .downcast_ref::<phys_expr::Column>()
1182            .is_some()
1183        {
1184            let left = Arc::clone(not.arg());
1185            let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1186            Ok((left, reverse_operator(op)?, right))
1187        } else {
1188            plan_err!("Not with complex expression {column_expr:?} is not supported")
1189        }
1190    } else {
1191        plan_err!("column expression {column_expr:?} is not supported")
1192    }
1193}
1194
1195fn is_compare_op(op: Operator) -> bool {
1196    matches!(
1197        op,
1198        Operator::Eq
1199            | Operator::NotEq
1200            | Operator::Lt
1201            | Operator::LtEq
1202            | Operator::Gt
1203            | Operator::GtEq
1204            | Operator::LikeMatch
1205            | Operator::NotLikeMatch
1206    )
1207}
1208
1209// The pruning logic is based on the comparing the min/max bounds.
1210// Must make sure the two type has order.
1211// For example, casts from string to numbers is not correct.
1212// Because the "13" is less than "3" with UTF8 comparison order.
1213fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1214    // TODO: support other data type for prunable cast or try cast
1215    if matches!(
1216        from_type,
1217        DataType::Int8
1218            | DataType::Int16
1219            | DataType::Int32
1220            | DataType::Int64
1221            | DataType::Decimal128(_, _)
1222    ) && matches!(
1223        to_type,
1224        DataType::Int8 | DataType::Int32 | DataType::Int64 | DataType::Decimal128(_, _)
1225    ) {
1226        Ok(())
1227    } else {
1228        plan_err!(
1229            "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1230        )
1231    }
1232}
1233
1234/// replaces a column with an old name with a new name in an expression
1235fn rewrite_column_expr(
1236    e: Arc<dyn PhysicalExpr>,
1237    column_old: &phys_expr::Column,
1238    column_new: &phys_expr::Column,
1239) -> Result<Arc<dyn PhysicalExpr>> {
1240    e.transform(|expr| {
1241        if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1242            if column == column_old {
1243                return Ok(Transformed::yes(Arc::new(column_new.clone())));
1244            }
1245        }
1246
1247        Ok(Transformed::no(expr))
1248    })
1249    .data()
1250}
1251
1252fn reverse_operator(op: Operator) -> Result<Operator> {
1253    op.swap().ok_or_else(|| {
1254        DataFusionError::Internal(format!(
1255            "Could not reverse operator {op} while building pruning predicate"
1256        ))
1257    })
1258}
1259
1260/// Given a column reference to `column`, returns a pruning
1261/// expression in terms of the min and max that will evaluate to true
1262/// if the column may contain values, and false if definitely does not
1263/// contain values
1264fn build_single_column_expr(
1265    column: &phys_expr::Column,
1266    schema: &Schema,
1267    required_columns: &mut RequiredColumns,
1268    is_not: bool, // if true, treat as !col
1269) -> Option<Arc<dyn PhysicalExpr>> {
1270    let field = schema.field_with_name(column.name()).ok()?;
1271
1272    if matches!(field.data_type(), &DataType::Boolean) {
1273        let col_ref = Arc::new(column.clone()) as _;
1274
1275        let min = required_columns
1276            .min_column_expr(column, &col_ref, field)
1277            .ok()?;
1278        let max = required_columns
1279            .max_column_expr(column, &col_ref, field)
1280            .ok()?;
1281
1282        // remember -- we want an expression that is:
1283        // TRUE: if there may be rows that match
1284        // FALSE: if there are no rows that match
1285        if is_not {
1286            // The only way we know a column couldn't match is if both the min and max are true
1287            // !(min && max)
1288            Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1289                phys_expr::BinaryExpr::new(min, Operator::And, max),
1290            ))))
1291        } else {
1292            // the only way we know a column couldn't match is if both the min and max are false
1293            // !(!min && !max) --> min || max
1294            Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1295        }
1296    } else {
1297        None
1298    }
1299}
1300
1301/// Given an expression reference to `expr`, if `expr` is a column expression,
1302/// returns a pruning expression in terms of IsNull that will evaluate to true
1303/// if the column may contain null, and false if definitely does not
1304/// contain null.
1305/// If `with_not` is true, build a pruning expression for `col IS NOT NULL`: `col_count != col_null_count`
1306/// The pruning expression evaluates to true ONLY if the column definitely CONTAINS
1307/// at least one NULL value.  In this case we can know that `IS NOT NULL` can not be true and
1308/// thus can prune the row group / value
1309fn build_is_null_column_expr(
1310    expr: &Arc<dyn PhysicalExpr>,
1311    schema: &Schema,
1312    required_columns: &mut RequiredColumns,
1313    with_not: bool,
1314) -> Option<Arc<dyn PhysicalExpr>> {
1315    if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1316        let field = schema.field_with_name(col.name()).ok()?;
1317
1318        let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1319        if with_not {
1320            if let Ok(row_count_expr) =
1321                required_columns.row_count_column_expr(col, expr, null_count_field)
1322            {
1323                required_columns
1324                    .null_count_column_expr(col, expr, null_count_field)
1325                    .map(|null_count_column_expr| {
1326                        // IsNotNull(column) => null_count != row_count
1327                        Arc::new(phys_expr::BinaryExpr::new(
1328                            null_count_column_expr,
1329                            Operator::NotEq,
1330                            row_count_expr,
1331                        )) as _
1332                    })
1333                    .ok()
1334            } else {
1335                None
1336            }
1337        } else {
1338            required_columns
1339                .null_count_column_expr(col, expr, null_count_field)
1340                .map(|null_count_column_expr| {
1341                    // IsNull(column) => null_count > 0
1342                    Arc::new(phys_expr::BinaryExpr::new(
1343                        null_count_column_expr,
1344                        Operator::Gt,
1345                        Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1346                    )) as _
1347                })
1348                .ok()
1349        }
1350    } else {
1351        None
1352    }
1353}
1354
1355/// The maximum number of entries in an `InList` that might be rewritten into
1356/// an OR chain
1357const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1358
1359/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
1360/// for use as a [`PruningPredicate`].
1361pub struct PredicateRewriter {
1362    unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1363}
1364
1365impl Default for PredicateRewriter {
1366    fn default() -> Self {
1367        Self {
1368            unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1369        }
1370    }
1371}
1372
1373impl PredicateRewriter {
1374    /// Create a new `PredicateRewriter`
1375    pub fn new() -> Self {
1376        Self::default()
1377    }
1378
1379    /// Set the unhandled hook to be used when a predicate can not be rewritten
1380    pub fn with_unhandled_hook(
1381        self,
1382        unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1383    ) -> Self {
1384        Self { unhandled_hook }
1385    }
1386
1387    /// Translate logical filter expression into pruning predicate
1388    /// expression that will evaluate to FALSE if it can be determined no
1389    /// rows between the min/max values could pass the predicates.
1390    ///
1391    /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1392    ///
1393    /// Returns the pruning predicate as an [`PhysicalExpr`]
1394    ///
1395    /// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1396    pub fn rewrite_predicate_to_statistics_predicate(
1397        &self,
1398        expr: &Arc<dyn PhysicalExpr>,
1399        schema: &Schema,
1400    ) -> Arc<dyn PhysicalExpr> {
1401        let mut required_columns = RequiredColumns::new();
1402        build_predicate_expression(
1403            expr,
1404            schema,
1405            &mut required_columns,
1406            &self.unhandled_hook,
1407        )
1408    }
1409}
1410
1411/// Translate logical filter expression into pruning predicate
1412/// expression that will evaluate to FALSE if it can be determined no
1413/// rows between the min/max values could pass the predicates.
1414///
1415/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1416///
1417/// Returns the pruning predicate as an [`PhysicalExpr`]
1418///
1419/// Notice: Does not handle [`phys_expr::InListExpr`] greater than 20, which will fall back to calling `unhandled_hook`
1420fn build_predicate_expression(
1421    expr: &Arc<dyn PhysicalExpr>,
1422    schema: &Schema,
1423    required_columns: &mut RequiredColumns,
1424    unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1425) -> Arc<dyn PhysicalExpr> {
1426    // predicate expression can only be a binary expression
1427    let expr_any = expr.as_any();
1428    if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1429        return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1430            .unwrap_or_else(|| unhandled_hook.handle(expr));
1431    }
1432    if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1433        return build_is_null_column_expr(
1434            is_not_null.arg(),
1435            schema,
1436            required_columns,
1437            true,
1438        )
1439        .unwrap_or_else(|| unhandled_hook.handle(expr));
1440    }
1441    if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1442        return build_single_column_expr(col, schema, required_columns, false)
1443            .unwrap_or_else(|| unhandled_hook.handle(expr));
1444    }
1445    if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1446        // match !col (don't do so recursively)
1447        if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1448            return build_single_column_expr(col, schema, required_columns, true)
1449                .unwrap_or_else(|| unhandled_hook.handle(expr));
1450        } else {
1451            return unhandled_hook.handle(expr);
1452        }
1453    }
1454    if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1455        if !in_list.list().is_empty()
1456            && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1457        {
1458            let eq_op = if in_list.negated() {
1459                Operator::NotEq
1460            } else {
1461                Operator::Eq
1462            };
1463            let re_op = if in_list.negated() {
1464                Operator::And
1465            } else {
1466                Operator::Or
1467            };
1468            let change_expr = in_list
1469                .list()
1470                .iter()
1471                .map(|e| {
1472                    Arc::new(phys_expr::BinaryExpr::new(
1473                        Arc::clone(in_list.expr()),
1474                        eq_op,
1475                        Arc::clone(e),
1476                    )) as _
1477                })
1478                .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1479                .unwrap();
1480            return build_predicate_expression(
1481                &change_expr,
1482                schema,
1483                required_columns,
1484                unhandled_hook,
1485            );
1486        } else {
1487            return unhandled_hook.handle(expr);
1488        }
1489    }
1490
1491    let (left, op, right) = {
1492        if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1493            (
1494                Arc::clone(bin_expr.left()),
1495                *bin_expr.op(),
1496                Arc::clone(bin_expr.right()),
1497            )
1498        } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1499            if like_expr.case_insensitive() {
1500                return unhandled_hook.handle(expr);
1501            }
1502            let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1503                (false, false) => Operator::LikeMatch,
1504                (true, false) => Operator::NotLikeMatch,
1505                (false, true) => Operator::ILikeMatch,
1506                (true, true) => Operator::NotILikeMatch,
1507            };
1508            (
1509                Arc::clone(like_expr.expr()),
1510                op,
1511                Arc::clone(like_expr.pattern()),
1512            )
1513        } else {
1514            return unhandled_hook.handle(expr);
1515        }
1516    };
1517
1518    if op == Operator::And || op == Operator::Or {
1519        let left_expr =
1520            build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1521        let right_expr =
1522            build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1523        // simplify boolean expression if applicable
1524        let expr = match (&left_expr, op, &right_expr) {
1525            (left, Operator::And, _) if is_always_true(left) => right_expr,
1526            (_, Operator::And, right) if is_always_true(right) => left_expr,
1527            (left, Operator::Or, right)
1528                if is_always_true(left) || is_always_true(right) =>
1529            {
1530                Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1531            }
1532            _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1533        };
1534        return expr;
1535    }
1536
1537    let expr_builder =
1538        PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1539    let mut expr_builder = match expr_builder {
1540        Ok(builder) => builder,
1541        // allow partial failure in predicate expression generation
1542        // this can still produce a useful predicate when multiple conditions are joined using AND
1543        Err(_) => return unhandled_hook.handle(expr),
1544    };
1545
1546    build_statistics_expr(&mut expr_builder)
1547        .unwrap_or_else(|_| unhandled_hook.handle(expr))
1548}
1549
1550fn build_statistics_expr(
1551    expr_builder: &mut PruningExpressionBuilder,
1552) -> Result<Arc<dyn PhysicalExpr>> {
1553    let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1554        Operator::NotEq => {
1555            // column != literal => (min, max) = literal =>
1556            // !(min != literal && max != literal) ==>
1557            // min != literal || literal != max
1558            let min_column_expr = expr_builder.min_column_expr()?;
1559            let max_column_expr = expr_builder.max_column_expr()?;
1560            Arc::new(phys_expr::BinaryExpr::new(
1561                Arc::new(phys_expr::BinaryExpr::new(
1562                    min_column_expr,
1563                    Operator::NotEq,
1564                    Arc::clone(expr_builder.scalar_expr()),
1565                )),
1566                Operator::Or,
1567                Arc::new(phys_expr::BinaryExpr::new(
1568                    Arc::clone(expr_builder.scalar_expr()),
1569                    Operator::NotEq,
1570                    max_column_expr,
1571                )),
1572            ))
1573        }
1574        Operator::Eq => {
1575            // column = literal => (min, max) = literal => min <= literal && literal <= max
1576            // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1577            let min_column_expr = expr_builder.min_column_expr()?;
1578            let max_column_expr = expr_builder.max_column_expr()?;
1579            Arc::new(phys_expr::BinaryExpr::new(
1580                Arc::new(phys_expr::BinaryExpr::new(
1581                    min_column_expr,
1582                    Operator::LtEq,
1583                    Arc::clone(expr_builder.scalar_expr()),
1584                )),
1585                Operator::And,
1586                Arc::new(phys_expr::BinaryExpr::new(
1587                    Arc::clone(expr_builder.scalar_expr()),
1588                    Operator::LtEq,
1589                    max_column_expr,
1590                )),
1591            ))
1592        }
1593        Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1594        Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1595            plan_datafusion_err!(
1596                "LIKE expression with wildcard at the beginning is not supported"
1597            )
1598        })?,
1599        Operator::Gt => {
1600            // column > literal => (min, max) > literal => max > literal
1601            Arc::new(phys_expr::BinaryExpr::new(
1602                expr_builder.max_column_expr()?,
1603                Operator::Gt,
1604                Arc::clone(expr_builder.scalar_expr()),
1605            ))
1606        }
1607        Operator::GtEq => {
1608            // column >= literal => (min, max) >= literal => max >= literal
1609            Arc::new(phys_expr::BinaryExpr::new(
1610                expr_builder.max_column_expr()?,
1611                Operator::GtEq,
1612                Arc::clone(expr_builder.scalar_expr()),
1613            ))
1614        }
1615        Operator::Lt => {
1616            // column < literal => (min, max) < literal => min < literal
1617            Arc::new(phys_expr::BinaryExpr::new(
1618                expr_builder.min_column_expr()?,
1619                Operator::Lt,
1620                Arc::clone(expr_builder.scalar_expr()),
1621            ))
1622        }
1623        Operator::LtEq => {
1624            // column <= literal => (min, max) <= literal => min <= literal
1625            Arc::new(phys_expr::BinaryExpr::new(
1626                expr_builder.min_column_expr()?,
1627                Operator::LtEq,
1628                Arc::clone(expr_builder.scalar_expr()),
1629            ))
1630        }
1631        // other expressions are not supported
1632        _ => {
1633            return plan_err!(
1634                "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1635            );
1636        }
1637    };
1638    let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1639    Ok(statistics_expr)
1640}
1641
1642/// returns the string literal of the scalar value if it is a string
1643fn unpack_string(s: &ScalarValue) -> Option<&str> {
1644    s.try_as_str().flatten()
1645}
1646
1647fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1648    if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1649        let s = unpack_string(lit.value())?;
1650        return Some(s);
1651    }
1652    None
1653}
1654
1655/// Convert `column LIKE literal` where P is a constant prefix of the literal
1656/// to a range check on the column: `P <= column && column < P'`, where P' is the
1657/// lowest string after all P* strings.
1658fn build_like_match(
1659    expr_builder: &mut PruningExpressionBuilder,
1660) -> Option<Arc<dyn PhysicalExpr>> {
1661    // column LIKE literal => (min, max) LIKE literal split at % => min <= split literal && split literal <= max
1662    // column LIKE 'foo%' => min <= 'foo' && 'foo' <= max
1663    // column LIKE '%foo' => min <= '' && '' <= max => true
1664    // column LIKE '%foo%' => min <= '' && '' <= max => true
1665    // column LIKE 'foo' => min <= 'foo' && 'foo' <= max
1666
1667    // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1668    //  this may involve building the physical expressions that call lower() and upper()
1669    let min_column_expr = expr_builder.min_column_expr().ok()?;
1670    let max_column_expr = expr_builder.max_column_expr().ok()?;
1671    let scalar_expr = expr_builder.scalar_expr();
1672    // check that the scalar is a string literal
1673    let s = extract_string_literal(scalar_expr)?;
1674    // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1675    let first_wildcard_index = s.find(['%', '_']);
1676    if first_wildcard_index == Some(0) {
1677        // there's no filtering we could possibly do, return an error and have this be handled by the unhandled hook
1678        return None;
1679    }
1680    let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1681        let prefix = &s[..wildcard_index];
1682        let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1683            prefix.to_string(),
1684        ))));
1685        let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1686            increment_utf8(prefix)?,
1687        ))));
1688        (lower_bound_lit, upper_bound_lit)
1689    } else {
1690        // the like expression is a literal and can be converted into a comparison
1691        let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1692            s.to_string(),
1693        ))));
1694        (Arc::clone(&bound), bound)
1695    };
1696    let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1697        lower_bound,
1698        Operator::LtEq,
1699        Arc::clone(&max_column_expr),
1700    ));
1701    let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1702        Arc::clone(&min_column_expr),
1703        Operator::LtEq,
1704        upper_bound,
1705    ));
1706    let combined = Arc::new(phys_expr::BinaryExpr::new(
1707        upper_bound_expr,
1708        Operator::And,
1709        lower_bound_expr,
1710    ));
1711    Some(combined)
1712}
1713
1714// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
1715//
1716// The intuition is that if both `col_min` and `col_max` begin with `const_prefix` that means
1717// **all** data in this row group begins with `const_prefix` as well (and therefore the predicate
1718// looking for rows that don't begin with `const_prefix` can never be true)
1719fn build_not_like_match(
1720    expr_builder: &mut PruningExpressionBuilder<'_>,
1721) -> Result<Arc<dyn PhysicalExpr>> {
1722    // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1723
1724    let min_column_expr = expr_builder.min_column_expr()?;
1725    let max_column_expr = expr_builder.max_column_expr()?;
1726
1727    let scalar_expr = expr_builder.scalar_expr();
1728
1729    let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1730        plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1731    })?;
1732
1733    let (const_prefix, remaining) = split_constant_prefix(pattern);
1734    if const_prefix.is_empty() || remaining != "%" {
1735        // we can not handle `%` at the beginning or in the middle of the pattern
1736        // Example: For pattern "foo%bar", the row group might include values like
1737        // ["foobar", "food", "foodbar"], making it unsafe to prune.
1738        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
1739        // match the pattern, intermediate values like "food" may not
1740        // match the full pattern "foo%bar", making pruning unsafe.
1741        // (truncate foo%bar to foo% have same problem)
1742
1743        // we can not handle pattern containing `_`
1744        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
1745        // which means not every row is guaranteed to match the pattern.
1746        return Err(plan_datafusion_err!(
1747            "NOT LIKE expressions only support constant_prefix+wildcard`%`"
1748        ));
1749    }
1750
1751    let min_col_not_like_epxr = Arc::new(phys_expr::LikeExpr::new(
1752        true,
1753        false,
1754        Arc::clone(&min_column_expr),
1755        Arc::clone(scalar_expr),
1756    ));
1757
1758    let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1759        true,
1760        false,
1761        Arc::clone(&max_column_expr),
1762        Arc::clone(scalar_expr),
1763    ));
1764
1765    Ok(Arc::new(phys_expr::BinaryExpr::new(
1766        min_col_not_like_epxr,
1767        Operator::Or,
1768        max_col_not_like_expr,
1769    )))
1770}
1771
1772/// Returns unescaped constant prefix of a LIKE pattern (possibly empty) and the remaining pattern (possibly empty)
1773fn split_constant_prefix(pattern: &str) -> (&str, &str) {
1774    let char_indices = pattern.char_indices().collect::<Vec<_>>();
1775    for i in 0..char_indices.len() {
1776        let (idx, char) = char_indices[i];
1777        if char == '%' || char == '_' {
1778            if i != 0 && char_indices[i - 1].1 == '\\' {
1779                // ecsaped by `\`
1780                continue;
1781            }
1782            return (&pattern[..idx], &pattern[idx..]);
1783        }
1784    }
1785    (pattern, "")
1786}
1787
1788/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
1789/// This makes it so that the returned string will always compare greater than the input string
1790/// or any other string with the same prefix.
1791/// This is necessary since the statistics may have been truncated: if we have a min statistic
1792/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
1793/// E.g. `increment_utf8("foo") >= "foo"` and `increment_utf8("foo") >= "fooz"`
1794/// In this example `increment_utf8("foo") == "fop"
1795fn increment_utf8(data: &str) -> Option<String> {
1796    // Helper function to check if a character is valid to use
1797    fn is_valid_unicode(c: char) -> bool {
1798        let cp = c as u32;
1799
1800        // Filter out non-characters (https://www.unicode.org/versions/corrigendum9.html)
1801        if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
1802            return false;
1803        }
1804
1805        // Filter out private use area
1806        if cp >= 0x110000 {
1807            return false;
1808        }
1809
1810        true
1811    }
1812
1813    // Convert string to vector of code points
1814    let mut code_points: Vec<char> = data.chars().collect();
1815
1816    // Work backwards through code points
1817    for idx in (0..code_points.len()).rev() {
1818        let original = code_points[idx] as u32;
1819
1820        // Try incrementing the code point
1821        if let Some(next_char) = char::from_u32(original + 1) {
1822            if is_valid_unicode(next_char) {
1823                code_points[idx] = next_char;
1824                // truncate the string to the current index
1825                code_points.truncate(idx + 1);
1826                return Some(code_points.into_iter().collect());
1827            }
1828        }
1829    }
1830
1831    None
1832}
1833
1834/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1835///
1836/// This is important not only as an optimization but also because statistics may not be
1837/// accurate for columns that are all nulls.
1838/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1839/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1840///
1841/// For example:
1842///
1843/// `x_min <= 10 AND 10 <= x_max`
1844///
1845/// will become
1846///
1847/// ```sql
1848/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
1849/// ````
1850///
1851/// If the column is known to be all nulls, then the expression
1852/// `x_null_count = x_row_count` will be true, which will cause the
1853/// boolean expression to return false. Therefore, prune out the container.
1854fn wrap_null_count_check_expr(
1855    statistics_expr: Arc<dyn PhysicalExpr>,
1856    expr_builder: &mut PruningExpressionBuilder,
1857) -> Result<Arc<dyn PhysicalExpr>> {
1858    // x_null_count != x_row_count
1859    let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1860        expr_builder.null_count_column_expr()?,
1861        Operator::NotEq,
1862        expr_builder.row_count_column_expr()?,
1863    ));
1864
1865    // (x_null_count != x_row_count) AND (<statistics_expr>)
1866    Ok(Arc::new(phys_expr::BinaryExpr::new(
1867        not_when_null_count_eq_row_count,
1868        Operator::And,
1869        statistics_expr,
1870    )))
1871}
1872
1873#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1874pub(crate) enum StatisticsType {
1875    Min,
1876    Max,
1877    NullCount,
1878    RowCount,
1879}
1880
1881#[cfg(test)]
1882mod tests {
1883    use std::collections::HashMap;
1884    use std::ops::{Not, Rem};
1885
1886    use super::*;
1887    use datafusion_common::assert_batches_eq;
1888    use datafusion_expr::{col, lit};
1889
1890    use arrow::array::Decimal128Array;
1891    use arrow::{
1892        array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1893        datatypes::TimeUnit,
1894    };
1895    use datafusion_expr::expr::InList;
1896    use datafusion_expr::{cast, is_null, try_cast, Expr};
1897    use datafusion_functions_nested::expr_fn::{array_has, make_array};
1898    use datafusion_physical_expr::expressions as phys_expr;
1899    use datafusion_physical_expr::planner::logical2physical;
1900
1901    #[derive(Debug, Default)]
1902    /// Mock statistic provider for tests
1903    ///
1904    /// Each row represents the statistics for a "container" (which
1905    /// might represent an entire parquet file, or directory of files,
1906    /// or some other collection of data for which we had statistics)
1907    ///
1908    /// Note All `ArrayRefs` must be the same size.
1909    struct ContainerStats {
1910        min: Option<ArrayRef>,
1911        max: Option<ArrayRef>,
1912        /// Optional values
1913        null_counts: Option<ArrayRef>,
1914        row_counts: Option<ArrayRef>,
1915        /// Optional known values (e.g. mimic a bloom filter)
1916        /// (value, contained)
1917        /// If present, all BooleanArrays must be the same size as min/max
1918        contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
1919    }
1920
1921    impl ContainerStats {
1922        fn new() -> Self {
1923            Default::default()
1924        }
1925        fn new_decimal128(
1926            min: impl IntoIterator<Item = Option<i128>>,
1927            max: impl IntoIterator<Item = Option<i128>>,
1928            precision: u8,
1929            scale: i8,
1930        ) -> Self {
1931            Self::new()
1932                .with_min(Arc::new(
1933                    min.into_iter()
1934                        .collect::<Decimal128Array>()
1935                        .with_precision_and_scale(precision, scale)
1936                        .unwrap(),
1937                ))
1938                .with_max(Arc::new(
1939                    max.into_iter()
1940                        .collect::<Decimal128Array>()
1941                        .with_precision_and_scale(precision, scale)
1942                        .unwrap(),
1943                ))
1944        }
1945
1946        fn new_i64(
1947            min: impl IntoIterator<Item = Option<i64>>,
1948            max: impl IntoIterator<Item = Option<i64>>,
1949        ) -> Self {
1950            Self::new()
1951                .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1952                .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1953        }
1954
1955        fn new_i32(
1956            min: impl IntoIterator<Item = Option<i32>>,
1957            max: impl IntoIterator<Item = Option<i32>>,
1958        ) -> Self {
1959            Self::new()
1960                .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1961                .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1962        }
1963
1964        fn new_utf8<'a>(
1965            min: impl IntoIterator<Item = Option<&'a str>>,
1966            max: impl IntoIterator<Item = Option<&'a str>>,
1967        ) -> Self {
1968            Self::new()
1969                .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1970                .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1971        }
1972
1973        fn new_bool(
1974            min: impl IntoIterator<Item = Option<bool>>,
1975            max: impl IntoIterator<Item = Option<bool>>,
1976        ) -> Self {
1977            Self::new()
1978                .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1979                .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1980        }
1981
1982        fn min(&self) -> Option<ArrayRef> {
1983            self.min.clone()
1984        }
1985
1986        fn max(&self) -> Option<ArrayRef> {
1987            self.max.clone()
1988        }
1989
1990        fn null_counts(&self) -> Option<ArrayRef> {
1991            self.null_counts.clone()
1992        }
1993
1994        fn row_counts(&self) -> Option<ArrayRef> {
1995            self.row_counts.clone()
1996        }
1997
1998        /// return an iterator over all arrays in this statistics
1999        fn arrays(&self) -> Vec<ArrayRef> {
2000            let contained_arrays = self
2001                .contained
2002                .iter()
2003                .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2004
2005            [
2006                self.min.as_ref().cloned(),
2007                self.max.as_ref().cloned(),
2008                self.null_counts.as_ref().cloned(),
2009                self.row_counts.as_ref().cloned(),
2010            ]
2011            .into_iter()
2012            .flatten()
2013            .chain(contained_arrays)
2014            .collect()
2015        }
2016
2017        /// Returns the number of containers represented by this statistics This
2018        /// picks the length of the first array as all arrays must have the same
2019        /// length (which is verified by `assert_invariants`).
2020        fn len(&self) -> usize {
2021            // pick the first non zero length
2022            self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2023        }
2024
2025        /// Ensure that the lengths of all arrays are consistent
2026        fn assert_invariants(&self) {
2027            let mut prev_len = None;
2028
2029            for len in self.arrays().iter().map(|a| a.len()) {
2030                // Get a length, if we don't already have one
2031                match prev_len {
2032                    None => {
2033                        prev_len = Some(len);
2034                    }
2035                    Some(prev_len) => {
2036                        assert_eq!(prev_len, len);
2037                    }
2038                }
2039            }
2040        }
2041
2042        /// Add min values
2043        fn with_min(mut self, min: ArrayRef) -> Self {
2044            self.min = Some(min);
2045            self
2046        }
2047
2048        /// Add max values
2049        fn with_max(mut self, max: ArrayRef) -> Self {
2050            self.max = Some(max);
2051            self
2052        }
2053
2054        /// Add null counts. There must be the same number of null counts as
2055        /// there are containers
2056        fn with_null_counts(
2057            mut self,
2058            counts: impl IntoIterator<Item = Option<u64>>,
2059        ) -> Self {
2060            let null_counts: ArrayRef =
2061                Arc::new(counts.into_iter().collect::<UInt64Array>());
2062
2063            self.assert_invariants();
2064            self.null_counts = Some(null_counts);
2065            self
2066        }
2067
2068        /// Add row counts. There must be the same number of row counts as
2069        /// there are containers
2070        fn with_row_counts(
2071            mut self,
2072            counts: impl IntoIterator<Item = Option<u64>>,
2073        ) -> Self {
2074            let row_counts: ArrayRef =
2075                Arc::new(counts.into_iter().collect::<UInt64Array>());
2076
2077            self.assert_invariants();
2078            self.row_counts = Some(row_counts);
2079            self
2080        }
2081
2082        /// Add contained information.
2083        pub fn with_contained(
2084            mut self,
2085            values: impl IntoIterator<Item = ScalarValue>,
2086            contained: impl IntoIterator<Item = Option<bool>>,
2087        ) -> Self {
2088            let contained: BooleanArray = contained.into_iter().collect();
2089            let values: HashSet<_> = values.into_iter().collect();
2090
2091            self.contained.push((values, contained));
2092            self.assert_invariants();
2093            self
2094        }
2095
2096        /// get any contained information for the specified values
2097        fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2098            // find the one with the matching values
2099            self.contained
2100                .iter()
2101                .find(|(values, _contained)| values == find_values)
2102                .map(|(_values, contained)| contained.clone())
2103        }
2104    }
2105
2106    #[derive(Debug, Default)]
2107    struct TestStatistics {
2108        // key: column name
2109        stats: HashMap<Column, ContainerStats>,
2110    }
2111
2112    impl TestStatistics {
2113        fn new() -> Self {
2114            Self::default()
2115        }
2116
2117        fn with(
2118            mut self,
2119            name: impl Into<String>,
2120            container_stats: ContainerStats,
2121        ) -> Self {
2122            let col = Column::from_name(name.into());
2123            self.stats.insert(col, container_stats);
2124            self
2125        }
2126
2127        /// Add null counts for the specified column.
2128        /// There must be the same number of null counts as
2129        /// there are containers
2130        fn with_null_counts(
2131            mut self,
2132            name: impl Into<String>,
2133            counts: impl IntoIterator<Item = Option<u64>>,
2134        ) -> Self {
2135            let col = Column::from_name(name.into());
2136
2137            // take stats out and update them
2138            let container_stats = self
2139                .stats
2140                .remove(&col)
2141                .unwrap_or_default()
2142                .with_null_counts(counts);
2143
2144            // put stats back in
2145            self.stats.insert(col, container_stats);
2146            self
2147        }
2148
2149        /// Add row counts for the specified column.
2150        /// There must be the same number of row counts as
2151        /// there are containers
2152        fn with_row_counts(
2153            mut self,
2154            name: impl Into<String>,
2155            counts: impl IntoIterator<Item = Option<u64>>,
2156        ) -> Self {
2157            let col = Column::from_name(name.into());
2158
2159            // take stats out and update them
2160            let container_stats = self
2161                .stats
2162                .remove(&col)
2163                .unwrap_or_default()
2164                .with_row_counts(counts);
2165
2166            // put stats back in
2167            self.stats.insert(col, container_stats);
2168            self
2169        }
2170
2171        /// Add contained information for the specified column.
2172        fn with_contained(
2173            mut self,
2174            name: impl Into<String>,
2175            values: impl IntoIterator<Item = ScalarValue>,
2176            contained: impl IntoIterator<Item = Option<bool>>,
2177        ) -> Self {
2178            let col = Column::from_name(name.into());
2179
2180            // take stats out and update them
2181            let container_stats = self
2182                .stats
2183                .remove(&col)
2184                .unwrap_or_default()
2185                .with_contained(values, contained);
2186
2187            // put stats back in
2188            self.stats.insert(col, container_stats);
2189            self
2190        }
2191    }
2192
2193    impl PruningStatistics for TestStatistics {
2194        fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2195            self.stats
2196                .get(column)
2197                .map(|container_stats| container_stats.min())
2198                .unwrap_or(None)
2199        }
2200
2201        fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2202            self.stats
2203                .get(column)
2204                .map(|container_stats| container_stats.max())
2205                .unwrap_or(None)
2206        }
2207
2208        fn num_containers(&self) -> usize {
2209            self.stats
2210                .values()
2211                .next()
2212                .map(|container_stats| container_stats.len())
2213                .unwrap_or(0)
2214        }
2215
2216        fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2217            self.stats
2218                .get(column)
2219                .map(|container_stats| container_stats.null_counts())
2220                .unwrap_or(None)
2221        }
2222
2223        fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2224            self.stats
2225                .get(column)
2226                .map(|container_stats| container_stats.row_counts())
2227                .unwrap_or(None)
2228        }
2229
2230        fn contained(
2231            &self,
2232            column: &Column,
2233            values: &HashSet<ScalarValue>,
2234        ) -> Option<BooleanArray> {
2235            self.stats
2236                .get(column)
2237                .and_then(|container_stats| container_stats.contained(values))
2238        }
2239    }
2240
2241    /// Returns the specified min/max container values
2242    struct OneContainerStats {
2243        min_values: Option<ArrayRef>,
2244        max_values: Option<ArrayRef>,
2245        num_containers: usize,
2246    }
2247
2248    impl PruningStatistics for OneContainerStats {
2249        fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2250            self.min_values.clone()
2251        }
2252
2253        fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2254            self.max_values.clone()
2255        }
2256
2257        fn num_containers(&self) -> usize {
2258            self.num_containers
2259        }
2260
2261        fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2262            None
2263        }
2264
2265        fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2266            None
2267        }
2268
2269        fn contained(
2270            &self,
2271            _column: &Column,
2272            _values: &HashSet<ScalarValue>,
2273        ) -> Option<BooleanArray> {
2274            None
2275        }
2276    }
2277
2278    /// Row count should only be referenced once in the pruning expression, even if we need the row count
2279    /// for multiple columns.
2280    #[test]
2281    fn test_unique_row_count_field_and_column() {
2282        // c1 = 100 AND c2 = 200
2283        let schema: SchemaRef = Arc::new(Schema::new(vec![
2284            Field::new("c1", DataType::Int32, true),
2285            Field::new("c2", DataType::Int32, true),
2286        ]));
2287        let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2288        let expr = logical2physical(&expr, &schema);
2289        let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
2290        // note pruning expression refers to row_count twice
2291        assert_eq!(
2292            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2293            p.predicate_expr.to_string()
2294        );
2295
2296        // Fields in required schema should be unique, otherwise when creating batches
2297        // it will fail because of duplicate field names
2298        let mut fields = HashSet::new();
2299        for (_col, _ty, field) in p.required_columns().iter() {
2300            let was_new = fields.insert(field);
2301            if !was_new {
2302                panic!(
2303                    "Duplicate field in required schema: {:?}. Previous fields:\n{:#?}",
2304                    field, fields
2305                );
2306            }
2307        }
2308    }
2309
2310    #[test]
2311    fn prune_all_rows_null_counts() {
2312        // if null_count = row_count then we should prune the container for i = 0
2313        // regardless of the statistics
2314        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2315        let statistics = TestStatistics::new().with(
2316            "i",
2317            ContainerStats::new_i32(
2318                vec![Some(0)], // min
2319                vec![Some(0)], // max
2320            )
2321            .with_null_counts(vec![Some(1)])
2322            .with_row_counts(vec![Some(1)]),
2323        );
2324        let expected_ret = &[false];
2325        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2326
2327        // this should be true even if the container stats are missing
2328        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2329        let container_stats = ContainerStats {
2330            min: Some(Arc::new(Int32Array::from(vec![None]))),
2331            max: Some(Arc::new(Int32Array::from(vec![None]))),
2332            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2333            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2334            ..ContainerStats::default()
2335        };
2336        let statistics = TestStatistics::new().with("i", container_stats);
2337        let expected_ret = &[false];
2338        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2339
2340        // If the null counts themselves are missing we should be able to fall back to the stats
2341        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2342        let container_stats = ContainerStats {
2343            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2344            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2345            null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2346            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2347            ..ContainerStats::default()
2348        };
2349        let statistics = TestStatistics::new().with("i", container_stats);
2350        let expected_ret = &[true];
2351        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2352        let expected_ret = &[false];
2353        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2354
2355        // Same for the row counts
2356        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2357        let container_stats = ContainerStats {
2358            min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2359            max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2360            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2361            row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2362            ..ContainerStats::default()
2363        };
2364        let statistics = TestStatistics::new().with("i", container_stats);
2365        let expected_ret = &[true];
2366        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2367        let expected_ret = &[false];
2368        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2369    }
2370
2371    #[test]
2372    fn prune_missing_statistics() {
2373        // If the min or max stats are missing we should not prune
2374        // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2375        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2376        let container_stats = ContainerStats {
2377            min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2378            max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2379            null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2380            row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2381            ..ContainerStats::default()
2382        };
2383        let statistics = TestStatistics::new().with("i", container_stats);
2384        let expected_ret = &[true, true];
2385        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2386        let expected_ret = &[false, true];
2387        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2388        let expected_ret = &[true, false];
2389        prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2390    }
2391
2392    #[test]
2393    fn prune_null_stats() {
2394        // if null_count = row_count then we should prune the container for i = 0
2395        // regardless of the statistics
2396        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2397
2398        let statistics = TestStatistics::new().with(
2399            "i",
2400            ContainerStats::new_i32(
2401                vec![Some(0)], // min
2402                vec![Some(0)], // max
2403            )
2404            .with_null_counts(vec![Some(1)])
2405            .with_row_counts(vec![Some(1)]),
2406        );
2407
2408        let expected_ret = &[false];
2409
2410        // i = 0
2411        prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2412    }
2413
2414    #[test]
2415    fn test_build_statistics_record_batch() {
2416        // Request a record batch with of s1_min, s2_max, s3_max, s3_min
2417        let required_columns = RequiredColumns::from(vec![
2418            // min of original column s1, named s1_min
2419            (
2420                phys_expr::Column::new("s1", 1),
2421                StatisticsType::Min,
2422                Field::new("s1_min", DataType::Int32, true),
2423            ),
2424            // max of original column s2, named s2_max
2425            (
2426                phys_expr::Column::new("s2", 2),
2427                StatisticsType::Max,
2428                Field::new("s2_max", DataType::Int32, true),
2429            ),
2430            // max of original column s3, named s3_max
2431            (
2432                phys_expr::Column::new("s3", 3),
2433                StatisticsType::Max,
2434                Field::new("s3_max", DataType::Utf8, true),
2435            ),
2436            // min of original column s3, named s3_min
2437            (
2438                phys_expr::Column::new("s3", 3),
2439                StatisticsType::Min,
2440                Field::new("s3_min", DataType::Utf8, true),
2441            ),
2442        ]);
2443
2444        let statistics = TestStatistics::new()
2445            .with(
2446                "s1",
2447                ContainerStats::new_i32(
2448                    vec![None, None, Some(9), None],  // min
2449                    vec![Some(10), None, None, None], // max
2450                ),
2451            )
2452            .with(
2453                "s2",
2454                ContainerStats::new_i32(
2455                    vec![Some(2), None, None, None],  // min
2456                    vec![Some(20), None, None, None], // max
2457                ),
2458            )
2459            .with(
2460                "s3",
2461                ContainerStats::new_utf8(
2462                    vec![Some("a"), None, None, None],      // min
2463                    vec![Some("q"), None, Some("r"), None], // max
2464                ),
2465            );
2466
2467        let batch =
2468            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2469        let expected = [
2470            "+--------+--------+--------+--------+",
2471            "| s1_min | s2_max | s3_max | s3_min |",
2472            "+--------+--------+--------+--------+",
2473            "|        | 20     | q      | a      |",
2474            "|        |        |        |        |",
2475            "| 9      |        | r      |        |",
2476            "|        |        |        |        |",
2477            "+--------+--------+--------+--------+",
2478        ];
2479
2480        assert_batches_eq!(expected, &[batch]);
2481    }
2482
2483    #[test]
2484    fn test_build_statistics_casting() {
2485        // Test requesting a Timestamp column, but getting statistics as Int64
2486        // which is what Parquet does
2487
2488        // Request a record batch with of s1_min as a timestamp
2489        let required_columns = RequiredColumns::from(vec![(
2490            phys_expr::Column::new("s3", 3),
2491            StatisticsType::Min,
2492            Field::new(
2493                "s1_min",
2494                DataType::Timestamp(TimeUnit::Nanosecond, None),
2495                true,
2496            ),
2497        )]);
2498
2499        // Note the statistics pass back i64 (not timestamp)
2500        let statistics = OneContainerStats {
2501            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2502            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2503            num_containers: 1,
2504        };
2505
2506        let batch =
2507            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2508        let expected = [
2509            "+-------------------------------+",
2510            "| s1_min                        |",
2511            "+-------------------------------+",
2512            "| 1970-01-01T00:00:00.000000010 |",
2513            "+-------------------------------+",
2514        ];
2515
2516        assert_batches_eq!(expected, &[batch]);
2517    }
2518
2519    #[test]
2520    fn test_build_statistics_no_required_stats() {
2521        let required_columns = RequiredColumns::new();
2522
2523        let statistics = OneContainerStats {
2524            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2525            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2526            num_containers: 1,
2527        };
2528
2529        let batch =
2530            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2531        assert_eq!(batch.num_rows(), 1); // had 1 container
2532    }
2533
2534    #[test]
2535    fn test_build_statistics_inconsistent_types() {
2536        // Test requesting a Utf8 column when the stats return some other type
2537
2538        // Request a record batch with of s1_min as a timestamp
2539        let required_columns = RequiredColumns::from(vec![(
2540            phys_expr::Column::new("s3", 3),
2541            StatisticsType::Min,
2542            Field::new("s1_min", DataType::Utf8, true),
2543        )]);
2544
2545        // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2546        let statistics = OneContainerStats {
2547            min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2548            max_values: None,
2549            num_containers: 1,
2550        };
2551
2552        let batch =
2553            build_statistics_record_batch(&statistics, &required_columns).unwrap();
2554        let expected = [
2555            "+--------+",
2556            "| s1_min |",
2557            "+--------+",
2558            "|        |",
2559            "+--------+",
2560        ];
2561
2562        assert_batches_eq!(expected, &[batch]);
2563    }
2564
2565    #[test]
2566    fn test_build_statistics_inconsistent_length() {
2567        // return an inconsistent length to the actual statistics arrays
2568        let required_columns = RequiredColumns::from(vec![(
2569            phys_expr::Column::new("s1", 3),
2570            StatisticsType::Min,
2571            Field::new("s1_min", DataType::Int64, true),
2572        )]);
2573
2574        // Note the statistics pass back i64 (not timestamp)
2575        let statistics = OneContainerStats {
2576            min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2577            max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2578            num_containers: 3,
2579        };
2580
2581        let result =
2582            build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2583        assert!(
2584            result
2585                .to_string()
2586                .contains("mismatched statistics length. Expected 3, got 1"),
2587            "{}",
2588            result
2589        );
2590    }
2591
2592    #[test]
2593    fn row_group_predicate_eq() -> Result<()> {
2594        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2595        let expected_expr =
2596            "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2597
2598        // test column on the left
2599        let expr = col("c1").eq(lit(1));
2600        let predicate_expr =
2601            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2602        assert_eq!(predicate_expr.to_string(), expected_expr);
2603
2604        // test column on the right
2605        let expr = lit(1).eq(col("c1"));
2606        let predicate_expr =
2607            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2608        assert_eq!(predicate_expr.to_string(), expected_expr);
2609
2610        Ok(())
2611    }
2612
2613    #[test]
2614    fn row_group_predicate_not_eq() -> Result<()> {
2615        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2616        let expected_expr =
2617            "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2618
2619        // test column on the left
2620        let expr = col("c1").not_eq(lit(1));
2621        let predicate_expr =
2622            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2623        assert_eq!(predicate_expr.to_string(), expected_expr);
2624
2625        // test column on the right
2626        let expr = lit(1).not_eq(col("c1"));
2627        let predicate_expr =
2628            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2629        assert_eq!(predicate_expr.to_string(), expected_expr);
2630
2631        Ok(())
2632    }
2633
2634    #[test]
2635    fn row_group_predicate_gt() -> Result<()> {
2636        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2637        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2638
2639        // test column on the left
2640        let expr = col("c1").gt(lit(1));
2641        let predicate_expr =
2642            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2643        assert_eq!(predicate_expr.to_string(), expected_expr);
2644
2645        // test column on the right
2646        let expr = lit(1).lt(col("c1"));
2647        let predicate_expr =
2648            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2649        assert_eq!(predicate_expr.to_string(), expected_expr);
2650
2651        Ok(())
2652    }
2653
2654    #[test]
2655    fn row_group_predicate_gt_eq() -> Result<()> {
2656        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2657        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2658
2659        // test column on the left
2660        let expr = col("c1").gt_eq(lit(1));
2661        let predicate_expr =
2662            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2663        assert_eq!(predicate_expr.to_string(), expected_expr);
2664        // test column on the right
2665        let expr = lit(1).lt_eq(col("c1"));
2666        let predicate_expr =
2667            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2668        assert_eq!(predicate_expr.to_string(), expected_expr);
2669
2670        Ok(())
2671    }
2672
2673    #[test]
2674    fn row_group_predicate_lt() -> Result<()> {
2675        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2676        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2677
2678        // test column on the left
2679        let expr = col("c1").lt(lit(1));
2680        let predicate_expr =
2681            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2682        assert_eq!(predicate_expr.to_string(), expected_expr);
2683
2684        // test column on the right
2685        let expr = lit(1).gt(col("c1"));
2686        let predicate_expr =
2687            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2688        assert_eq!(predicate_expr.to_string(), expected_expr);
2689
2690        Ok(())
2691    }
2692
2693    #[test]
2694    fn row_group_predicate_lt_eq() -> Result<()> {
2695        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2696        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2697
2698        // test column on the left
2699        let expr = col("c1").lt_eq(lit(1));
2700        let predicate_expr =
2701            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2702        assert_eq!(predicate_expr.to_string(), expected_expr);
2703        // test column on the right
2704        let expr = lit(1).gt_eq(col("c1"));
2705        let predicate_expr =
2706            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2707        assert_eq!(predicate_expr.to_string(), expected_expr);
2708
2709        Ok(())
2710    }
2711
2712    #[test]
2713    fn row_group_predicate_and() -> Result<()> {
2714        let schema = Schema::new(vec![
2715            Field::new("c1", DataType::Int32, false),
2716            Field::new("c2", DataType::Int32, false),
2717            Field::new("c3", DataType::Int32, false),
2718        ]);
2719        // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2720        let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2721        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2722        let predicate_expr =
2723            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2724        assert_eq!(predicate_expr.to_string(), expected_expr);
2725
2726        Ok(())
2727    }
2728
2729    #[test]
2730    fn row_group_predicate_or() -> Result<()> {
2731        let schema = Schema::new(vec![
2732            Field::new("c1", DataType::Int32, false),
2733            Field::new("c2", DataType::Int32, false),
2734        ]);
2735        // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2736        let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2737        let expected_expr = "true";
2738        let predicate_expr =
2739            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2740        assert_eq!(predicate_expr.to_string(), expected_expr);
2741
2742        Ok(())
2743    }
2744
2745    #[test]
2746    fn row_group_predicate_not() -> Result<()> {
2747        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2748        let expected_expr = "true";
2749
2750        let expr = col("c1").not();
2751        let predicate_expr =
2752            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2753        assert_eq!(predicate_expr.to_string(), expected_expr);
2754
2755        Ok(())
2756    }
2757
2758    #[test]
2759    fn row_group_predicate_not_bool() -> Result<()> {
2760        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2761        let expected_expr = "NOT c1_min@0 AND c1_max@1";
2762
2763        let expr = col("c1").not();
2764        let predicate_expr =
2765            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2766        assert_eq!(predicate_expr.to_string(), expected_expr);
2767
2768        Ok(())
2769    }
2770
2771    #[test]
2772    fn row_group_predicate_bool() -> Result<()> {
2773        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2774        let expected_expr = "c1_min@0 OR c1_max@1";
2775
2776        let expr = col("c1");
2777        let predicate_expr =
2778            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2779        assert_eq!(predicate_expr.to_string(), expected_expr);
2780
2781        Ok(())
2782    }
2783
2784    #[test]
2785    fn row_group_predicate_lt_bool() -> Result<()> {
2786        let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2787        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2788
2789        // DF doesn't support arithmetic on boolean columns so
2790        // this predicate will error when evaluated
2791        let expr = col("c1").lt(lit(true));
2792        let predicate_expr =
2793            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2794        assert_eq!(predicate_expr.to_string(), expected_expr);
2795
2796        Ok(())
2797    }
2798
2799    #[test]
2800    fn row_group_predicate_required_columns() -> Result<()> {
2801        let schema = Schema::new(vec![
2802            Field::new("c1", DataType::Int32, false),
2803            Field::new("c2", DataType::Int32, false),
2804        ]);
2805        let mut required_columns = RequiredColumns::new();
2806        // c1 < 1 and (c2 = 2 or c2 = 3)
2807        let expr = col("c1")
2808            .lt(lit(1))
2809            .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2810        let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2811        let predicate_expr =
2812            test_build_predicate_expression(&expr, &schema, &mut required_columns);
2813        assert_eq!(predicate_expr.to_string(), expected_expr);
2814        println!("required_columns: {:#?}", required_columns); // for debugging assertions below
2815                                                               // c1 < 1 should add c1_min
2816        let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2817        assert_eq!(
2818            required_columns.columns[0],
2819            (
2820                phys_expr::Column::new("c1", 0),
2821                StatisticsType::Min,
2822                c1_min_field.with_nullable(true) // could be nullable if stats are not present
2823            )
2824        );
2825        // c1 < 1 should add c1_null_count
2826        let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2827        assert_eq!(
2828            required_columns.columns[1],
2829            (
2830                phys_expr::Column::new("c1", 0),
2831                StatisticsType::NullCount,
2832                c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
2833            )
2834        );
2835        // c1 < 1 should add row_count
2836        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2837        assert_eq!(
2838            required_columns.columns[2],
2839            (
2840                phys_expr::Column::new("c1", 0),
2841                StatisticsType::RowCount,
2842                row_count_field.with_nullable(true) // could be nullable if stats are not present
2843            )
2844        );
2845        // c2 = 2 should add c2_min and c2_max
2846        let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2847        assert_eq!(
2848            required_columns.columns[3],
2849            (
2850                phys_expr::Column::new("c2", 1),
2851                StatisticsType::Min,
2852                c2_min_field.with_nullable(true) // could be nullable if stats are not present
2853            )
2854        );
2855        let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2856        assert_eq!(
2857            required_columns.columns[4],
2858            (
2859                phys_expr::Column::new("c2", 1),
2860                StatisticsType::Max,
2861                c2_max_field.with_nullable(true) // could be nullable if stats are not present
2862            )
2863        );
2864        // c2 = 2 should add c2_null_count
2865        let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2866        assert_eq!(
2867            required_columns.columns[5],
2868            (
2869                phys_expr::Column::new("c2", 1),
2870                StatisticsType::NullCount,
2871                c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
2872            )
2873        );
2874        // c2 = 1 should add row_count
2875        let row_count_field = Field::new("row_count", DataType::UInt64, false);
2876        assert_eq!(
2877            required_columns.columns[2],
2878            (
2879                phys_expr::Column::new("c1", 0),
2880                StatisticsType::RowCount,
2881                row_count_field.with_nullable(true) // could be nullable if stats are not present
2882            )
2883        );
2884        // c2 = 3 shouldn't add any new statistics fields
2885        assert_eq!(required_columns.columns.len(), 6);
2886
2887        Ok(())
2888    }
2889
2890    #[test]
2891    fn row_group_predicate_in_list() -> Result<()> {
2892        let schema = Schema::new(vec![
2893            Field::new("c1", DataType::Int32, false),
2894            Field::new("c2", DataType::Int32, false),
2895        ]);
2896        // test c1 in(1, 2, 3)
2897        let expr = Expr::InList(InList::new(
2898            Box::new(col("c1")),
2899            vec![lit(1), lit(2), lit(3)],
2900            false,
2901        ));
2902        let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2903        let predicate_expr =
2904            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2905        assert_eq!(predicate_expr.to_string(), expected_expr);
2906
2907        Ok(())
2908    }
2909
2910    #[test]
2911    fn row_group_predicate_in_list_empty() -> Result<()> {
2912        let schema = Schema::new(vec![
2913            Field::new("c1", DataType::Int32, false),
2914            Field::new("c2", DataType::Int32, false),
2915        ]);
2916        // test c1 in()
2917        let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2918        let expected_expr = "true";
2919        let predicate_expr =
2920            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2921        assert_eq!(predicate_expr.to_string(), expected_expr);
2922
2923        Ok(())
2924    }
2925
2926    #[test]
2927    fn row_group_predicate_in_list_negated() -> Result<()> {
2928        let schema = Schema::new(vec![
2929            Field::new("c1", DataType::Int32, false),
2930            Field::new("c2", DataType::Int32, false),
2931        ]);
2932        // test c1 not in(1, 2, 3)
2933        let expr = Expr::InList(InList::new(
2934            Box::new(col("c1")),
2935            vec![lit(1), lit(2), lit(3)],
2936            true,
2937        ));
2938        let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2939        let predicate_expr =
2940            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2941        assert_eq!(predicate_expr.to_string(), expected_expr);
2942
2943        Ok(())
2944    }
2945
2946    #[test]
2947    fn row_group_predicate_between() -> Result<()> {
2948        let schema = Schema::new(vec![
2949            Field::new("c1", DataType::Int32, false),
2950            Field::new("c2", DataType::Int32, false),
2951        ]);
2952
2953        // test c1 BETWEEN 1 AND 5
2954        let expr1 = col("c1").between(lit(1), lit(5));
2955
2956        // test 1 <= c1 <= 5
2957        let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
2958
2959        let predicate_expr1 =
2960            test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
2961
2962        let predicate_expr2 =
2963            test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
2964        assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
2965
2966        Ok(())
2967    }
2968
2969    #[test]
2970    fn row_group_predicate_between_with_in_list() -> Result<()> {
2971        let schema = Schema::new(vec![
2972            Field::new("c1", DataType::Int32, false),
2973            Field::new("c2", DataType::Int32, false),
2974        ]);
2975        // test c1 in(1, 2)
2976        let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
2977
2978        // test c2 BETWEEN 4 AND 5
2979        let expr2 = col("c2").between(lit(4), lit(5));
2980
2981        // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
2982        let expr3 = expr1.and(expr2);
2983
2984        let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
2985        let predicate_expr =
2986            test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
2987        assert_eq!(predicate_expr.to_string(), expected_expr);
2988
2989        Ok(())
2990    }
2991
2992    #[test]
2993    fn row_group_predicate_in_list_to_many_values() -> Result<()> {
2994        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2995        // test c1 in(1..21)
2996        // in pruning.rs has MAX_LIST_VALUE_SIZE_REWRITE = 20, more than this value will be rewrite
2997        // always true
2998        let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
2999
3000        let expected_expr = "true";
3001        let predicate_expr =
3002            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3003        assert_eq!(predicate_expr.to_string(), expected_expr);
3004
3005        Ok(())
3006    }
3007
3008    #[test]
3009    fn row_group_predicate_cast() -> Result<()> {
3010        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3011        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
3012
3013        // test cast(c1 as int64) = 1
3014        // test column on the left
3015        let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
3016        let predicate_expr =
3017            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3018        assert_eq!(predicate_expr.to_string(), expected_expr);
3019
3020        // test column on the right
3021        let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3022        let predicate_expr =
3023            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3024        assert_eq!(predicate_expr.to_string(), expected_expr);
3025
3026        let expected_expr =
3027            "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3028
3029        // test column on the left
3030        let expr =
3031            try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3032        let predicate_expr =
3033            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3034        assert_eq!(predicate_expr.to_string(), expected_expr);
3035
3036        // test column on the right
3037        let expr =
3038            lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3039        let predicate_expr =
3040            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3041        assert_eq!(predicate_expr.to_string(), expected_expr);
3042
3043        Ok(())
3044    }
3045
3046    #[test]
3047    fn row_group_predicate_cast_list() -> Result<()> {
3048        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3049        // test cast(c1 as int64) in int64(1, 2, 3)
3050        let expr = Expr::InList(InList::new(
3051            Box::new(cast(col("c1"), DataType::Int64)),
3052            vec![
3053                lit(ScalarValue::Int64(Some(1))),
3054                lit(ScalarValue::Int64(Some(2))),
3055                lit(ScalarValue::Int64(Some(3))),
3056            ],
3057            false,
3058        ));
3059        let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3060        let predicate_expr =
3061            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3062        assert_eq!(predicate_expr.to_string(), expected_expr);
3063
3064        let expr = Expr::InList(InList::new(
3065            Box::new(cast(col("c1"), DataType::Int64)),
3066            vec![
3067                lit(ScalarValue::Int64(Some(1))),
3068                lit(ScalarValue::Int64(Some(2))),
3069                lit(ScalarValue::Int64(Some(3))),
3070            ],
3071            true,
3072        ));
3073        let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3074        let predicate_expr =
3075            test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3076        assert_eq!(predicate_expr.to_string(), expected_expr);
3077
3078        Ok(())
3079    }
3080
3081    #[test]
3082    fn prune_decimal_data() {
3083        // decimal(9,2)
3084        let schema = Arc::new(Schema::new(vec![Field::new(
3085            "s1",
3086            DataType::Decimal128(9, 2),
3087            true,
3088        )]));
3089
3090        prune_with_expr(
3091            // s1 > 5
3092            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3093            &schema,
3094            // If the data is written by spark, the physical data type is INT32 in the parquet
3095            // So we use the INT32 type of statistic.
3096            &TestStatistics::new().with(
3097                "s1",
3098                ContainerStats::new_i32(
3099                    vec![Some(0), Some(4), None, Some(3)], // min
3100                    vec![Some(5), Some(6), Some(4), None], // max
3101                ),
3102            ),
3103            &[false, true, false, true],
3104        );
3105
3106        prune_with_expr(
3107            // with cast column to other type
3108            cast(col("s1"), DataType::Decimal128(14, 3))
3109                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3110            &schema,
3111            &TestStatistics::new().with(
3112                "s1",
3113                ContainerStats::new_i32(
3114                    vec![Some(0), Some(4), None, Some(3)], // min
3115                    vec![Some(5), Some(6), Some(4), None], // max
3116                ),
3117            ),
3118            &[false, true, false, true],
3119        );
3120
3121        prune_with_expr(
3122            // with try cast column to other type
3123            try_cast(col("s1"), DataType::Decimal128(14, 3))
3124                .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3125            &schema,
3126            &TestStatistics::new().with(
3127                "s1",
3128                ContainerStats::new_i32(
3129                    vec![Some(0), Some(4), None, Some(3)], // min
3130                    vec![Some(5), Some(6), Some(4), None], // max
3131                ),
3132            ),
3133            &[false, true, false, true],
3134        );
3135
3136        // decimal(18,2)
3137        let schema = Arc::new(Schema::new(vec![Field::new(
3138            "s1",
3139            DataType::Decimal128(18, 2),
3140            true,
3141        )]));
3142        prune_with_expr(
3143            // s1 > 5
3144            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3145            &schema,
3146            // If the data is written by spark, the physical data type is INT64 in the parquet
3147            // So we use the INT32 type of statistic.
3148            &TestStatistics::new().with(
3149                "s1",
3150                ContainerStats::new_i64(
3151                    vec![Some(0), Some(4), None, Some(3)], // min
3152                    vec![Some(5), Some(6), Some(4), None], // max
3153                ),
3154            ),
3155            &[false, true, false, true],
3156        );
3157
3158        // decimal(23,2)
3159        let schema = Arc::new(Schema::new(vec![Field::new(
3160            "s1",
3161            DataType::Decimal128(23, 2),
3162            true,
3163        )]));
3164
3165        prune_with_expr(
3166            // s1 > 5
3167            col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3168            &schema,
3169            &TestStatistics::new().with(
3170                "s1",
3171                ContainerStats::new_decimal128(
3172                    vec![Some(0), Some(400), None, Some(300)], // min
3173                    vec![Some(500), Some(600), Some(400), None], // max
3174                    23,
3175                    2,
3176                ),
3177            ),
3178            &[false, true, false, true],
3179        );
3180    }
3181
3182    #[test]
3183    fn prune_api() {
3184        let schema = Arc::new(Schema::new(vec![
3185            Field::new("s1", DataType::Utf8, true),
3186            Field::new("s2", DataType::Int32, true),
3187        ]));
3188
3189        let statistics = TestStatistics::new().with(
3190            "s2",
3191            ContainerStats::new_i32(
3192                vec![Some(0), Some(4), None, Some(3)], // min
3193                vec![Some(5), Some(6), None, None],    // max
3194            ),
3195        );
3196        prune_with_expr(
3197            // Prune using s2 > 5
3198            col("s2").gt(lit(5)),
3199            &schema,
3200            &statistics,
3201            // s2 [0, 5] ==> no rows should pass
3202            // s2 [4, 6] ==> some rows could pass
3203            // No stats for s2 ==> some rows could pass
3204            // s2 [3, None] (null max) ==> some rows could pass
3205            &[false, true, true, true],
3206        );
3207
3208        prune_with_expr(
3209            // filter with cast
3210            cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3211            &schema,
3212            &statistics,
3213            &[false, true, true, true],
3214        );
3215    }
3216
3217    #[test]
3218    fn prune_not_eq_data() {
3219        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3220
3221        prune_with_expr(
3222            // Prune using s2 != 'M'
3223            col("s1").not_eq(lit("M")),
3224            &schema,
3225            &TestStatistics::new().with(
3226                "s1",
3227                ContainerStats::new_utf8(
3228                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
3229                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
3230                ),
3231            ),
3232            // s1 [A, Z] ==> might have values that pass predicate
3233            // s1 [A, L] ==> all rows pass the predicate
3234            // s1 [N, Z] ==> all rows pass the predicate
3235            // s1 [M, M] ==> all rows do not pass the predicate
3236            // No stats for s2 ==> some rows could pass
3237            // s2 [3, None] (null max) ==> some rows could pass
3238            &[true, true, true, false, true, true],
3239        );
3240    }
3241
3242    /// Creates setup for boolean chunk pruning
3243    ///
3244    /// For predicate "b1" (boolean expr)
3245    /// b1 [false, false] ==> no rows can pass (not keep)
3246    /// b1 [false, true] ==> some rows could pass (must keep)
3247    /// b1 [true, true] ==> all rows must pass (must keep)
3248    /// b1 [NULL, NULL]  ==> unknown (must keep)
3249    /// b1 [false, NULL]  ==> unknown (must keep)
3250    ///
3251    /// For predicate "!b1" (boolean expr)
3252    /// b1 [false, false] ==> all rows pass (must keep)
3253    /// b1 [false, true] ==> some rows could pass (must keep)
3254    /// b1 [true, true] ==> no rows can pass (not keep)
3255    /// b1 [NULL, NULL]  ==> unknown (must keep)
3256    /// b1 [false, NULL]  ==> unknown (must keep)
3257    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
3258        let schema =
3259            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));
3260
3261        let statistics = TestStatistics::new().with(
3262            "b1",
3263            ContainerStats::new_bool(
3264                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
3265                vec![Some(false), Some(true), Some(true), None, None],         // max
3266            ),
3267        );
3268        let expected_true = vec![false, true, true, true, true];
3269        let expected_false = vec![true, true, false, true, true];
3270
3271        (schema, statistics, expected_true, expected_false)
3272    }
3273
3274    #[test]
3275    fn prune_bool_const_expr() {
3276        let (schema, statistics, _, _) = bool_setup();
3277
3278        prune_with_expr(
3279            // true
3280            lit(true),
3281            &schema,
3282            &statistics,
3283            &[true, true, true, true, true],
3284        );
3285
3286        prune_with_expr(
3287            // false
3288            // constant literals that do NOT refer to any columns are currently not evaluated at all, hence the result is
3289            // "all true"
3290            lit(false),
3291            &schema,
3292            &statistics,
3293            &[true, true, true, true, true],
3294        );
3295    }
3296
3297    #[test]
3298    fn prune_bool_column() {
3299        let (schema, statistics, expected_true, _) = bool_setup();
3300
3301        prune_with_expr(
3302            // b1
3303            col("b1"),
3304            &schema,
3305            &statistics,
3306            &expected_true,
3307        );
3308    }
3309
3310    #[test]
3311    fn prune_bool_not_column() {
3312        let (schema, statistics, _, expected_false) = bool_setup();
3313
3314        prune_with_expr(
3315            // !b1
3316            col("b1").not(),
3317            &schema,
3318            &statistics,
3319            &expected_false,
3320        );
3321    }
3322
3323    #[test]
3324    fn prune_bool_column_eq_true() {
3325        let (schema, statistics, expected_true, _) = bool_setup();
3326
3327        prune_with_expr(
3328            // b1 = true
3329            col("b1").eq(lit(true)),
3330            &schema,
3331            &statistics,
3332            &expected_true,
3333        );
3334    }
3335
3336    #[test]
3337    fn prune_bool_not_column_eq_true() {
3338        let (schema, statistics, _, expected_false) = bool_setup();
3339
3340        prune_with_expr(
3341            // !b1 = true
3342            col("b1").not().eq(lit(true)),
3343            &schema,
3344            &statistics,
3345            &expected_false,
3346        );
3347    }
3348
3349    /// Creates a setup for chunk pruning, modeling a int32 column "i"
3350    /// with 5 different containers (e.g. RowGroups). They have [min,
3351    /// max]:
3352    ///
3353    /// i [-5, 5]
3354    /// i [1, 11]
3355    /// i [-11, -1]
3356    /// i [NULL, NULL]
3357    /// i [1, NULL]
3358    fn int32_setup() -> (SchemaRef, TestStatistics) {
3359        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
3360
3361        let statistics = TestStatistics::new().with(
3362            "i",
3363            ContainerStats::new_i32(
3364                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
3365                vec![Some(5), Some(11), Some(-1), None, None],     // max
3366            ),
3367        );
3368        (schema, statistics)
3369    }
3370
3371    #[test]
3372    fn prune_int32_col_gt_zero() {
3373        let (schema, statistics) = int32_setup();
3374
3375        // Expression "i > 0" and "-i < 0"
3376        // i [-5, 5] ==> some rows could pass (must keep)
3377        // i [1, 11] ==> all rows must pass (must keep)
3378        // i [-11, -1] ==>  no rows can pass (not keep)
3379        // i [NULL, NULL]  ==> unknown (must keep)
3380        // i [1, NULL]  ==> unknown (must keep)
3381        let expected_ret = &[true, true, false, true, true];
3382
3383        // i > 0
3384        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
3385
3386        // -i < 0
3387        prune_with_expr(
3388            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
3389            &schema,
3390            &statistics,
3391            expected_ret,
3392        );
3393    }
3394
3395    #[test]
3396    fn prune_int32_col_lte_zero() {
3397        let (schema, statistics) = int32_setup();
3398
3399        // Expression "i <= 0" and "-i >= 0"
3400        // i [-5, 5] ==> some rows could pass (must keep)
3401        // i [1, 11] ==> no rows can pass (not keep)
3402        // i [-11, -1] ==>  all rows must pass (must keep)
3403        // i [NULL, NULL]  ==> unknown (must keep)
3404        // i [1, NULL]  ==> no rows can pass (not keep)
3405        let expected_ret = &[true, false, true, true, false];
3406
3407        prune_with_expr(
3408            // i <= 0
3409            col("i").lt_eq(lit(0)),
3410            &schema,
3411            &statistics,
3412            expected_ret,
3413        );
3414
3415        prune_with_expr(
3416            // -i >= 0
3417            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
3418            &schema,
3419            &statistics,
3420            expected_ret,
3421        );
3422    }
3423
3424    #[test]
3425    fn prune_int32_col_lte_zero_cast() {
3426        let (schema, statistics) = int32_setup();
3427
3428        // Expression "cast(i as utf8) <= '0'"
3429        // i [-5, 5] ==> some rows could pass (must keep)
3430        // i [1, 11] ==> no rows can pass in theory, -0.22 (conservatively keep)
3431        // i [-11, -1] ==>  no rows could pass in theory (conservatively keep)
3432        // i [NULL, NULL]  ==> unknown (must keep)
3433        // i [1, NULL]  ==> no rows can pass (conservatively keep)
3434        let expected_ret = &[true, true, true, true, true];
3435
3436        prune_with_expr(
3437            // cast(i as utf8) <= 0
3438            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3439            &schema,
3440            &statistics,
3441            expected_ret,
3442        );
3443
3444        prune_with_expr(
3445            // try_cast(i as utf8) <= 0
3446            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
3447            &schema,
3448            &statistics,
3449            expected_ret,
3450        );
3451
3452        prune_with_expr(
3453            // cast(-i as utf8) >= 0
3454            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3455            &schema,
3456            &statistics,
3457            expected_ret,
3458        );
3459
3460        prune_with_expr(
3461            // try_cast(-i as utf8) >= 0
3462            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
3463            &schema,
3464            &statistics,
3465            expected_ret,
3466        );
3467    }
3468
3469    #[test]
3470    fn prune_int32_col_eq_zero() {
3471        let (schema, statistics) = int32_setup();
3472
3473        // Expression "i = 0"
3474        // i [-5, 5] ==> some rows could pass (must keep)
3475        // i [1, 11] ==> no rows can pass (not keep)
3476        // i [-11, -1] ==>  no rows can pass (not keep)
3477        // i [NULL, NULL]  ==> unknown (must keep)
3478        // i [1, NULL]  ==> no rows can pass (not keep)
3479        let expected_ret = &[true, false, false, true, false];
3480
3481        prune_with_expr(
3482            // i = 0
3483            col("i").eq(lit(0)),
3484            &schema,
3485            &statistics,
3486            expected_ret,
3487        );
3488    }
3489
3490    #[test]
3491    fn prune_int32_col_eq_zero_cast() {
3492        let (schema, statistics) = int32_setup();
3493
3494        // Expression "cast(i as int64) = 0"
3495        // i [-5, 5] ==> some rows could pass (must keep)
3496        // i [1, 11] ==> no rows can pass (not keep)
3497        // i [-11, -1] ==>  no rows can pass (not keep)
3498        // i [NULL, NULL]  ==> unknown (must keep)
3499        // i [1, NULL]  ==> no rows can pass (not keep)
3500        let expected_ret = &[true, false, false, true, false];
3501
3502        prune_with_expr(
3503            cast(col("i"), DataType::Int64).eq(lit(0i64)),
3504            &schema,
3505            &statistics,
3506            expected_ret,
3507        );
3508
3509        prune_with_expr(
3510            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
3511            &schema,
3512            &statistics,
3513            expected_ret,
3514        );
3515    }
3516
3517    #[test]
3518    fn prune_int32_col_eq_zero_cast_as_str() {
3519        let (schema, statistics) = int32_setup();
3520
3521        // Note the cast is to a string where sorting properties are
3522        // not the same as integers
3523        //
3524        // Expression "cast(i as utf8) = '0'"
3525        // i [-5, 5] ==> some rows could pass (keep)
3526        // i [1, 11] ==> no rows can pass  (could keep)
3527        // i [-11, -1] ==>  no rows can pass (could keep)
3528        // i [NULL, NULL]  ==> unknown (keep)
3529        // i [1, NULL]  ==> no rows can pass (could keep)
3530        let expected_ret = &[true, true, true, true, true];
3531
3532        prune_with_expr(
3533            cast(col("i"), DataType::Utf8).eq(lit("0")),
3534            &schema,
3535            &statistics,
3536            expected_ret,
3537        );
3538    }
3539
3540    #[test]
3541    fn prune_int32_col_lt_neg_one() {
3542        let (schema, statistics) = int32_setup();
3543
3544        // Expression "i > -1" and "-i < 1"
3545        // i [-5, 5] ==> some rows could pass (must keep)
3546        // i [1, 11] ==> all rows must pass (must keep)
3547        // i [-11, -1] ==>  no rows can pass (not keep)
3548        // i [NULL, NULL]  ==> unknown (must keep)
3549        // i [1, NULL]  ==> all rows must pass (must keep)
3550        let expected_ret = &[true, true, false, true, true];
3551
3552        prune_with_expr(
3553            // i > -1
3554            col("i").gt(lit(-1)),
3555            &schema,
3556            &statistics,
3557            expected_ret,
3558        );
3559
3560        prune_with_expr(
3561            // -i < 1
3562            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
3563            &schema,
3564            &statistics,
3565            expected_ret,
3566        );
3567    }
3568
3569    #[test]
3570    fn prune_int32_is_null() {
3571        let (schema, statistics) = int32_setup();
3572
3573        // Expression "i IS NULL" when there are no null statistics,
3574        // should all be kept
3575        let expected_ret = &[true, true, true, true, true];
3576
3577        prune_with_expr(
3578            // i IS NULL, no null statistics
3579            col("i").is_null(),
3580            &schema,
3581            &statistics,
3582            expected_ret,
3583        );
3584
3585        // provide null counts for each column
3586        let statistics = statistics.with_null_counts(
3587            "i",
3588            vec![
3589                Some(0), // no nulls (don't keep)
3590                Some(1), // 1 null
3591                None,    // unknown nulls
3592                None, // unknown nulls (min/max are both null too, like no stats at all)
3593                Some(0), // 0 nulls (max=null too which means no known max) (don't keep)
3594            ],
3595        );
3596
3597        let expected_ret = &[false, true, true, true, false];
3598
3599        prune_with_expr(
3600            // i IS NULL, with actual null statistics
3601            col("i").is_null(),
3602            &schema,
3603            &statistics,
3604            expected_ret,
3605        );
3606    }
3607
3608    #[test]
3609    fn prune_int32_column_is_known_all_null() {
3610        let (schema, statistics) = int32_setup();
3611
3612        // Expression "i < 0"
3613        // i [-5, 5] ==> some rows could pass (must keep)
3614        // i [1, 11] ==> no rows can pass (not keep)
3615        // i [-11, -1] ==>  all rows must pass (must keep)
3616        // i [NULL, NULL]  ==> unknown (must keep)
3617        // i [1, NULL]  ==> no rows can pass (not keep)
3618        let expected_ret = &[true, false, true, true, false];
3619
3620        prune_with_expr(
3621            // i < 0
3622            col("i").lt(lit(0)),
3623            &schema,
3624            &statistics,
3625            expected_ret,
3626        );
3627
3628        // provide row counts for each column
3629        let statistics = statistics.with_row_counts(
3630            "i",
3631            vec![
3632                Some(10), // 10 rows of data
3633                Some(9),  // 9 rows of data
3634                None,     // unknown row counts
3635                Some(4),
3636                Some(10),
3637            ],
3638        );
3639
3640        // pruning result is still the same if we only know row counts
3641        prune_with_expr(
3642            // i < 0, with only row counts statistics
3643            col("i").lt(lit(0)),
3644            &schema,
3645            &statistics,
3646            expected_ret,
3647        );
3648
3649        // provide null counts for each column
3650        let statistics = statistics.with_null_counts(
3651            "i",
3652            vec![
3653                Some(0), // no nulls
3654                Some(1), // 1 null
3655                None,    // unknown nulls
3656                Some(4), // 4 nulls, which is the same as the row counts, i.e. this column is all null (don't keep)
3657                Some(0), // 0 nulls (max=null too which means no known max)
3658            ],
3659        );
3660
3661        // Expression "i < 0" with actual null and row counts statistics
3662        // col | min, max     | row counts | null counts |
3663        // ----+--------------+------------+-------------+
3664        //  i  | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
3665        //  i  | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
3666        //  i  | [-11,-1]     | Unknown    | Unknown     | ==> All rows must pass (must keep)
3667        //  i  | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
3668        //  i  | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
3669        let expected_ret = &[true, false, true, false, false];
3670
3671        prune_with_expr(
3672            // i < 0, with actual null and row counts statistics
3673            col("i").lt(lit(0)),
3674            &schema,
3675            &statistics,
3676            expected_ret,
3677        );
3678    }
3679
3680    #[test]
3681    fn prune_cast_column_scalar() {
3682        // The data type of column i is INT32
3683        let (schema, statistics) = int32_setup();
3684        let expected_ret = &[true, true, false, true, true];
3685
3686        prune_with_expr(
3687            // i > int64(0)
3688            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
3689            &schema,
3690            &statistics,
3691            expected_ret,
3692        );
3693
3694        prune_with_expr(
3695            // cast(i as int64) > int64(0)
3696            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3697            &schema,
3698            &statistics,
3699            expected_ret,
3700        );
3701
3702        prune_with_expr(
3703            // try_cast(i as int64) > int64(0)
3704            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
3705            &schema,
3706            &statistics,
3707            expected_ret,
3708        );
3709
3710        prune_with_expr(
3711            // `-cast(i as int64) < 0` convert to `cast(i as int64) > -0`
3712            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
3713                .lt(lit(ScalarValue::Int64(Some(0)))),
3714            &schema,
3715            &statistics,
3716            expected_ret,
3717        );
3718    }
3719
3720    #[test]
3721    fn test_increment_utf8() {
3722        // Basic ASCII
3723        assert_eq!(increment_utf8("abc").unwrap(), "abd");
3724        assert_eq!(increment_utf8("abz").unwrap(), "ab{");
3725
3726        // Test around ASCII 127 (DEL)
3727        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
3728        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128
3729
3730        // Test 2-byte UTF-8 sequences
3731        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0
3732
3733        // Test 3-byte UTF-8 sequences
3734        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124
3735
3736        // Test at UTF-8 boundaries
3737        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
3738        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary
3739
3740        // Test that if we can't increment we return None
3741        assert!(increment_utf8("").is_none());
3742        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point
3743
3744        // Test that if we can't increment the last character we do the previous one and truncate
3745        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
3746
3747        // Test surrogate pair range (0xD800..=0xDFFF)
3748        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
3749        assert!(increment_utf8("\u{D7FF}").is_none());
3750
3751        // Test non-characters range (0xFDD0..=0xFDEF)
3752        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
3753        assert!(increment_utf8("\u{FDCF}").is_none());
3754
3755        // Test private use area limit (>= 0x110000)
3756        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
3757        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past max valid codepoint
3758    }
3759
3760    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
3761    /// with 5 different containers (e.g. RowGroups). They have [min,
3762    /// max]:
3763    /// s1 ["A", "Z"]
3764    /// s1 ["A", "L"]
3765    /// s1 ["N", "Z"]
3766    /// s1 [NULL, NULL]
3767    /// s1 ["A", NULL]
3768    /// s1 ["", "A"]
3769    /// s1 ["", ""]
3770    /// s1 ["AB", "A\u{10ffff}"]
3771    /// s1 ["A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
3772    fn utf8_setup() -> (SchemaRef, TestStatistics) {
3773        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
3774
3775        let statistics = TestStatistics::new().with(
3776            "s1",
3777            ContainerStats::new_utf8(
3778                vec![
3779                    Some("A"),
3780                    Some("A"),
3781                    Some("N"),
3782                    Some("M"),
3783                    None,
3784                    Some("A"),
3785                    Some(""),
3786                    Some(""),
3787                    Some("AB"),
3788                    Some("A\u{10ffff}\u{10ffff}"),
3789                ], // min
3790                vec![
3791                    Some("Z"),
3792                    Some("L"),
3793                    Some("Z"),
3794                    Some("M"),
3795                    None,
3796                    None,
3797                    Some("A"),
3798                    Some(""),
3799                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
3800                    Some("A\u{10ffff}\u{10ffff}"),
3801                ], // max
3802            ),
3803        );
3804        (schema, statistics)
3805    }
3806
3807    #[test]
3808    fn prune_utf8_eq() {
3809        let (schema, statistics) = utf8_setup();
3810
3811        let expr = col("s1").eq(lit("A"));
3812        #[rustfmt::skip]
3813        let expected_ret = &[
3814            // s1 ["A", "Z"] ==> some rows could pass (must keep)
3815            true,
3816            // s1 ["A", "L"] ==> some rows could pass (must keep)
3817            true,
3818            // s1 ["N", "Z"] ==> no rows can pass (not keep)
3819            false,
3820            // s1 ["M", "M"] ==> no rows can pass (not keep)
3821            false,
3822            // s1 [NULL, NULL]  ==> unknown (must keep)
3823            true,
3824            // s1 ["A", NULL]  ==> unknown (must keep)
3825            true,
3826            // s1 ["", "A"]  ==> some rows could pass (must keep)
3827            true,
3828            // s1 ["", ""]  ==> no rows can pass (not keep)
3829            false,
3830            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
3831            false,
3832            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
3833            false,
3834        ];
3835        prune_with_expr(expr, &schema, &statistics, expected_ret);
3836
3837        let expr = col("s1").eq(lit(""));
3838        #[rustfmt::skip]
3839        let expected_ret = &[
3840            // s1 ["A", "Z"] ==> no rows can pass (not keep)
3841            false,
3842            // s1 ["A", "L"] ==> no rows can pass (not keep)
3843            false,
3844            // s1 ["N", "Z"] ==> no rows can pass (not keep)
3845            false,
3846            // s1 ["M", "M"] ==> no rows can pass (not keep)
3847            false,
3848            // s1 [NULL, NULL]  ==> unknown (must keep)
3849            true,
3850            // s1 ["A", NULL]  ==> no rows can pass (not keep)
3851            false,
3852            // s1 ["", "A"]  ==> some rows could pass (must keep)
3853            true,
3854            // s1 ["", ""]  ==> all rows must pass (must keep)
3855            true,
3856            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
3857            false,
3858            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
3859            false,
3860        ];
3861        prune_with_expr(expr, &schema, &statistics, expected_ret);
3862    }
3863
3864    #[test]
3865    fn prune_utf8_not_eq() {
3866        let (schema, statistics) = utf8_setup();
3867
3868        let expr = col("s1").not_eq(lit("A"));
3869        #[rustfmt::skip]
3870        let expected_ret = &[
3871            // s1 ["A", "Z"] ==> some rows could pass (must keep)
3872            true,
3873            // s1 ["A", "L"] ==> some rows could pass (must keep)
3874            true,
3875            // s1 ["N", "Z"] ==> all rows must pass (must keep)
3876            true,
3877            // s1 ["M", "M"] ==> all rows must pass (must keep)
3878            true,
3879            // s1 [NULL, NULL]  ==> unknown (must keep)
3880            true,
3881            // s1 ["A", NULL]  ==> unknown (must keep)
3882            true,
3883            // s1 ["", "A"]  ==> some rows could pass (must keep)
3884            true,
3885            // s1 ["", ""]  ==> all rows must pass (must keep)
3886            true,
3887            // s1 ["AB", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
3888            true,
3889            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
3890            true,
3891        ];
3892        prune_with_expr(expr, &schema, &statistics, expected_ret);
3893
3894        let expr = col("s1").not_eq(lit(""));
3895        #[rustfmt::skip]
3896        let expected_ret = &[
3897            // s1 ["A", "Z"] ==> all rows must pass (must keep)
3898            true,
3899            // s1 ["A", "L"] ==> all rows must pass (must keep)
3900            true,
3901            // s1 ["N", "Z"] ==> all rows must pass (must keep)
3902            true,
3903            // s1 ["M", "M"] ==> all rows must pass (must keep)
3904            true,
3905            // s1 [NULL, NULL]  ==> unknown (must keep)
3906            true,
3907            // s1 ["A", NULL]  ==> unknown (must keep)
3908            true,
3909            // s1 ["", "A"]  ==> some rows could pass (must keep)
3910            true,
3911            // s1 ["", ""]  ==> no rows can pass (not keep)
3912            false,
3913            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
3914            true,
3915            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
3916            true,
3917        ];
3918        prune_with_expr(expr, &schema, &statistics, expected_ret);
3919    }
3920
3921    #[test]
3922    fn prune_utf8_like_one() {
3923        let (schema, statistics) = utf8_setup();
3924
3925        let expr = col("s1").like(lit("A_"));
3926        #[rustfmt::skip]
3927        let expected_ret = &[
3928            // s1 ["A", "Z"] ==> some rows could pass (must keep)
3929            true,
3930            // s1 ["A", "L"] ==> some rows could pass (must keep)
3931            true,
3932            // s1 ["N", "Z"] ==> no rows can pass (not keep)
3933            false,
3934            // s1 ["M", "M"] ==> no rows can pass (not keep)
3935            false,
3936            // s1 [NULL, NULL]  ==> unknown (must keep)
3937            true,
3938            // s1 ["A", NULL]  ==> unknown (must keep)
3939            true,
3940            // s1 ["", "A"]  ==> some rows could pass (must keep)
3941            true,
3942            // s1 ["", ""]  ==> no rows can pass (not keep)
3943            false,
3944            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
3945            true,
3946            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
3947            true,
3948        ];
3949        prune_with_expr(expr, &schema, &statistics, expected_ret);
3950
3951        let expr = col("s1").like(lit("_A_"));
3952        #[rustfmt::skip]
3953        let expected_ret = &[
3954            // s1 ["A", "Z"] ==> some rows could pass (must keep)
3955            true,
3956            // s1 ["A", "L"] ==> some rows could pass (must keep)
3957            true,
3958            // s1 ["N", "Z"] ==> some rows could pass (must keep)
3959            true,
3960            // s1 ["M", "M"] ==> some rows could pass (must keep)
3961            true,
3962            // s1 [NULL, NULL]  ==> unknown (must keep)
3963            true,
3964            // s1 ["A", NULL]  ==> unknown (must keep)
3965            true,
3966            // s1 ["", "A"]  ==> some rows could pass (must keep)
3967            true,
3968            // s1 ["", ""]  ==> some rows could pass (must keep)
3969            true,
3970            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
3971            true,
3972            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
3973            true,
3974        ];
3975        prune_with_expr(expr, &schema, &statistics, expected_ret);
3976
3977        let expr = col("s1").like(lit("_"));
3978        #[rustfmt::skip]
3979        let expected_ret = &[
3980            // s1 ["A", "Z"] ==> all rows must pass (must keep)
3981            true,
3982            // s1 ["A", "L"] ==> all rows must pass (must keep)
3983            true,
3984            // s1 ["N", "Z"] ==> all rows must pass (must keep)
3985            true,
3986            // s1 ["M", "M"] ==> all rows must pass (must keep)
3987            true,
3988            // s1 [NULL, NULL]  ==> unknown (must keep)
3989            true,
3990            // s1 ["A", NULL]  ==> unknown (must keep)
3991            true,
3992            // s1 ["", "A"]  ==> all rows must pass (must keep)
3993            true,
3994            // s1 ["", ""]  ==> all rows must pass (must keep)
3995            true,
3996            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
3997            true,
3998            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
3999            true,
4000        ];
4001        prune_with_expr(expr, &schema, &statistics, expected_ret);
4002
4003        let expr = col("s1").like(lit(""));
4004        #[rustfmt::skip]
4005        let expected_ret = &[
4006            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4007            false,
4008            // s1 ["A", "L"] ==> no rows can pass (not keep)
4009            false,
4010            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4011            false,
4012            // s1 ["M", "M"] ==> no rows can pass (not keep)
4013            false,
4014            // s1 [NULL, NULL]  ==> unknown (must keep)
4015            true,
4016            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4017            false,
4018            // s1 ["", "A"]  ==> some rows could pass (must keep)
4019            true,
4020            // s1 ["", ""]  ==> all rows must pass (must keep)
4021            true,
4022            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4023            false,
4024            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4025            false,
4026        ];
4027        prune_with_expr(expr, &schema, &statistics, expected_ret);
4028    }
4029
4030    #[test]
4031    fn prune_utf8_like_many() {
4032        let (schema, statistics) = utf8_setup();
4033
4034        let expr = col("s1").like(lit("A%"));
4035        #[rustfmt::skip]
4036        let expected_ret = &[
4037            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4038            true,
4039            // s1 ["A", "L"] ==> some rows could pass (must keep)
4040            true,
4041            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4042            false,
4043            // s1 ["M", "M"] ==> no rows can pass (not keep)
4044            false,
4045            // s1 [NULL, NULL]  ==> unknown (must keep)
4046            true,
4047            // s1 ["A", NULL]  ==> unknown (must keep)
4048            true,
4049            // s1 ["", "A"]  ==> some rows could pass (must keep)
4050            true,
4051            // s1 ["", ""]  ==> no rows can pass (not keep)
4052            false,
4053            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4054            true,
4055            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4056            true,
4057        ];
4058        prune_with_expr(expr, &schema, &statistics, expected_ret);
4059
4060        let expr = col("s1").like(lit("%A%"));
4061        #[rustfmt::skip]
4062        let expected_ret = &[
4063            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4064            true,
4065            // s1 ["A", "L"] ==> some rows could pass (must keep)
4066            true,
4067            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4068            true,
4069            // s1 ["M", "M"] ==> some rows could pass (must keep)
4070            true,
4071            // s1 [NULL, NULL]  ==> unknown (must keep)
4072            true,
4073            // s1 ["A", NULL]  ==> unknown (must keep)
4074            true,
4075            // s1 ["", "A"]  ==> some rows could pass (must keep)
4076            true,
4077            // s1 ["", ""]  ==> some rows could pass (must keep)
4078            true,
4079            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4080            true,
4081            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4082            true,
4083        ];
4084        prune_with_expr(expr, &schema, &statistics, expected_ret);
4085
4086        let expr = col("s1").like(lit("%"));
4087        #[rustfmt::skip]
4088        let expected_ret = &[
4089            // s1 ["A", "Z"] ==> all rows must pass (must keep)
4090            true,
4091            // s1 ["A", "L"] ==> all rows must pass (must keep)
4092            true,
4093            // s1 ["N", "Z"] ==> all rows must pass (must keep)
4094            true,
4095            // s1 ["M", "M"] ==> all rows must pass (must keep)
4096            true,
4097            // s1 [NULL, NULL]  ==> unknown (must keep)
4098            true,
4099            // s1 ["A", NULL]  ==> unknown (must keep)
4100            true,
4101            // s1 ["", "A"]  ==> all rows must pass (must keep)
4102            true,
4103            // s1 ["", ""]  ==> all rows must pass (must keep)
4104            true,
4105            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4106            true,
4107            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> all rows must pass (must keep)
4108            true,
4109        ];
4110        prune_with_expr(expr, &schema, &statistics, expected_ret);
4111
4112        let expr = col("s1").like(lit(""));
4113        #[rustfmt::skip]
4114        let expected_ret = &[
4115            // s1 ["A", "Z"] ==> no rows can pass (not keep)
4116            false,
4117            // s1 ["A", "L"] ==> no rows can pass (not keep)
4118            false,
4119            // s1 ["N", "Z"] ==> no rows can pass (not keep)
4120            false,
4121            // s1 ["M", "M"] ==> no rows can pass (not keep)
4122            false,
4123            // s1 [NULL, NULL]  ==> unknown (must keep)
4124            true,
4125            // s1 ["A", NULL]  ==> no rows can pass (not keep)
4126            false,
4127            // s1 ["", "A"]  ==> some rows could pass (must keep)
4128            true,
4129            // s1 ["", ""]  ==> all rows must pass (must keep)
4130            true,
4131            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4132            false,
4133            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no rows can pass (not keep)
4134            false,
4135        ];
4136        prune_with_expr(expr, &schema, &statistics, expected_ret);
4137    }
4138
4139    #[test]
4140    fn prune_utf8_not_like_one() {
4141        let (schema, statistics) = utf8_setup();
4142
4143        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
4144        #[rustfmt::skip]
4145        let expected_ret = &[
4146            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4147            true,
4148            // s1 ["A", "L"] ==> some rows could pass (must keep)
4149            true,
4150            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4151            true,
4152            // s1 ["M", "M"] ==> some rows could pass (must keep)
4153            true,
4154            // s1 [NULL, NULL]  ==> unknown (must keep)
4155            true,
4156            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4157            true,
4158            // s1 ["", "A"]  ==> some rows could pass (must keep)
4159            true,
4160            // s1 ["", ""]  ==> some rows could pass (must keep)
4161            true,
4162            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4163            true,
4164            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match. (min, max) maybe truncate 
4165            // orignal (min, max) maybe ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}")
4166            true,
4167        ];
4168        prune_with_expr(expr, &schema, &statistics, expected_ret);
4169    }
4170
4171    #[test]
4172    fn prune_utf8_not_like_many() {
4173        let (schema, statistics) = utf8_setup();
4174
4175        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
4176        #[rustfmt::skip]
4177        let expected_ret = &[
4178            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4179            true,
4180            // s1 ["A", "L"] ==> some rows could pass (must keep)
4181            true,
4182            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4183            true,
4184            // s1 ["M", "M"] ==> some rows could pass (must keep)
4185            true,
4186            // s1 [NULL, NULL]  ==> unknown (must keep)
4187            true,
4188            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4189            true,
4190            // s1 ["", "A"]  ==> some rows could pass (must keep)
4191            true,
4192            // s1 ["", ""]  ==> some rows could pass (must keep)
4193            true,
4194            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4195            true,
4196            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> no row match
4197            false,
4198        ];
4199        prune_with_expr(expr, &schema, &statistics, expected_ret);
4200
4201        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
4202        #[rustfmt::skip]
4203        let expected_ret = &[
4204            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4205            true,
4206            // s1 ["A", "L"] ==> some rows could pass (must keep)
4207            true,
4208            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4209            true,
4210            // s1 ["M", "M"] ==> some rows could pass (must keep)
4211            true,
4212            // s1 [NULL, NULL]  ==> unknown (must keep)
4213            true,
4214            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4215            true,
4216            // s1 ["", "A"]  ==> some rows could pass (must keep)
4217            true,
4218            // s1 ["", ""]  ==> some rows could pass (must keep)
4219            true,
4220            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4221            true,
4222            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4223            true,
4224        ];
4225        prune_with_expr(expr, &schema, &statistics, expected_ret);
4226
4227        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
4228        #[rustfmt::skip]
4229        let expected_ret = &[
4230            // s1 ["A", "Z"] ==> some rows could pass (must keep)
4231            true,
4232            // s1 ["A", "L"] ==> some rows could pass (must keep)
4233            true,
4234            // s1 ["N", "Z"] ==> some rows could pass (must keep)
4235            true,
4236            // s1 ["M", "M"] ==> some rows could pass (must keep)
4237            true,
4238            // s1 [NULL, NULL]  ==> unknown (must keep)
4239            true,
4240            // s1 ["A", NULL]  ==> some rows could pass (must keep)
4241            true,
4242            // s1 ["", "A"]  ==> some rows could pass (must keep)
4243            true,
4244            // s1 ["", ""]  ==> some rows could pass (must keep)
4245            true,
4246            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4247            true,
4248            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]  ==> some rows could pass (must keep)
4249            true,
4250        ];
4251        prune_with_expr(expr, &schema, &statistics, expected_ret);
4252
4253        let expr = col("s1").not_like(lit("A\\%%"));
4254        let statistics = TestStatistics::new().with(
4255            "s1",
4256            ContainerStats::new_utf8(
4257                vec![Some("A%a"), Some("A")],
4258                vec![Some("A%c"), Some("A")],
4259            ),
4260        );
4261        let expected_ret = &[false, true];
4262        prune_with_expr(expr, &schema, &statistics, expected_ret);
4263    }
4264
4265    #[test]
4266    fn test_rewrite_expr_to_prunable() {
4267        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4268        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4269
4270        // column op lit
4271        let left_input = col("a");
4272        let left_input = logical2physical(&left_input, &schema);
4273        let right_input = lit(ScalarValue::Int32(Some(12)));
4274        let right_input = logical2physical(&right_input, &schema);
4275        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4276            &left_input,
4277            Operator::Eq,
4278            &right_input,
4279            df_schema.clone(),
4280        )
4281        .unwrap();
4282        assert_eq!(result_left.to_string(), left_input.to_string());
4283        assert_eq!(result_right.to_string(), right_input.to_string());
4284
4285        // cast op lit
4286        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
4287        let left_input = logical2physical(&left_input, &schema);
4288        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
4289        let right_input = logical2physical(&right_input, &schema);
4290        let (result_left, _, result_right) = rewrite_expr_to_prunable(
4291            &left_input,
4292            Operator::Gt,
4293            &right_input,
4294            df_schema.clone(),
4295        )
4296        .unwrap();
4297        assert_eq!(result_left.to_string(), left_input.to_string());
4298        assert_eq!(result_right.to_string(), right_input.to_string());
4299
4300        // try_cast op lit
4301        let left_input = try_cast(col("a"), DataType::Int64);
4302        let left_input = logical2physical(&left_input, &schema);
4303        let right_input = lit(ScalarValue::Int64(Some(12)));
4304        let right_input = logical2physical(&right_input, &schema);
4305        let (result_left, _, result_right) =
4306            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
4307                .unwrap();
4308        assert_eq!(result_left.to_string(), left_input.to_string());
4309        assert_eq!(result_right.to_string(), right_input.to_string());
4310
4311        // TODO: add test for other case and op
4312    }
4313
4314    #[test]
4315    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
4316        struct CustomUnhandledHook;
4317
4318        impl UnhandledPredicateHook for CustomUnhandledHook {
4319            /// This handles an arbitrary case of a column that doesn't exist in the schema
4320            /// by renaming it to yet another column that doesn't exist in the schema
4321            /// (the transformation is arbitrary, the point is that it can do whatever it wants)
4322            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
4323                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
4324            }
4325        }
4326
4327        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
4328        let schema_with_b = Schema::new(vec![
4329            Field::new("a", DataType::Int32, true),
4330            Field::new("b", DataType::Int32, true),
4331        ]);
4332
4333        let rewriter = PredicateRewriter::new()
4334            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));
4335
4336        let transform_expr = |expr| {
4337            let expr = logical2physical(&expr, &schema_with_b);
4338            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
4339        };
4340
4341        // transform an arbitrary valid expression that we know is handled
4342        let known_expression = col("a").eq(lit(12));
4343        let known_expression_transformed = PredicateRewriter::new()
4344            .rewrite_predicate_to_statistics_predicate(
4345                &logical2physical(&known_expression, &schema),
4346                &schema,
4347            );
4348
4349        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
4350        let input = col("b").eq(lit(12));
4351        let expected = logical2physical(&lit(42), &schema);
4352        let transformed = transform_expr(input.clone());
4353        assert_eq!(transformed.to_string(), expected.to_string());
4354
4355        // more complex case with unknown column
4356        let input = known_expression.clone().and(input.clone());
4357        let expected = phys_expr::BinaryExpr::new(
4358            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4359            Operator::And,
4360            logical2physical(&lit(42), &schema),
4361        );
4362        let transformed = transform_expr(input.clone());
4363        assert_eq!(transformed.to_string(), expected.to_string());
4364
4365        // an unknown expression gets passed to the hook
4366        let input = array_has(make_array(vec![lit(1)]), col("a"));
4367        let expected = logical2physical(&lit(42), &schema);
4368        let transformed = transform_expr(input.clone());
4369        assert_eq!(transformed.to_string(), expected.to_string());
4370
4371        // more complex case with unknown expression
4372        let input = known_expression.and(input);
4373        let expected = phys_expr::BinaryExpr::new(
4374            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
4375            Operator::And,
4376            logical2physical(&lit(42), &schema),
4377        );
4378        let transformed = transform_expr(input.clone());
4379        assert_eq!(transformed.to_string(), expected.to_string());
4380    }
4381
4382    #[test]
4383    fn test_rewrite_expr_to_prunable_error() {
4384        // cast string value to numeric value
4385        // this cast is not supported
4386        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
4387        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
4388        let left_input = cast(col("a"), DataType::Int64);
4389        let left_input = logical2physical(&left_input, &schema);
4390        let right_input = lit(ScalarValue::Int64(Some(12)));
4391        let right_input = logical2physical(&right_input, &schema);
4392        let result = rewrite_expr_to_prunable(
4393            &left_input,
4394            Operator::Gt,
4395            &right_input,
4396            df_schema.clone(),
4397        );
4398        assert!(result.is_err());
4399
4400        // other expr
4401        let left_input = is_null(col("a"));
4402        let left_input = logical2physical(&left_input, &schema);
4403        let right_input = lit(ScalarValue::Int64(Some(12)));
4404        let right_input = logical2physical(&right_input, &schema);
4405        let result =
4406            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
4407        assert!(result.is_err());
4408        // TODO: add other negative test for other case and op
4409    }
4410
4411    #[test]
4412    fn prune_with_contained_one_column() {
4413        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));
4414
4415        // Model having information like a bloom filter for s1
4416        let statistics = TestStatistics::new()
4417            .with_contained(
4418                "s1",
4419                [ScalarValue::from("foo")],
4420                [
4421                    // container 0 known to only contain "foo"",
4422                    Some(true),
4423                    // container 1 known to not contain "foo"
4424                    Some(false),
4425                    // container 2 unknown about "foo"
4426                    None,
4427                    // container 3 known to only contain "foo"
4428                    Some(true),
4429                    // container 4 known to not contain "foo"
4430                    Some(false),
4431                    // container 5 unknown about "foo"
4432                    None,
4433                    // container 6 known to only contain "foo"
4434                    Some(true),
4435                    // container 7 known to not contain "foo"
4436                    Some(false),
4437                    // container 8 unknown about "foo"
4438                    None,
4439                ],
4440            )
4441            .with_contained(
4442                "s1",
4443                [ScalarValue::from("bar")],
4444                [
4445                    // containers 0,1,2 known to only contain "bar"
4446                    Some(true),
4447                    Some(true),
4448                    Some(true),
4449                    // container 3,4,5 known to not contain "bar"
4450                    Some(false),
4451                    Some(false),
4452                    Some(false),
4453                    // container 6,7,8 unknown about "bar"
4454                    None,
4455                    None,
4456                    None,
4457                ],
4458            )
4459            .with_contained(
4460                // the way the tests are setup, this data is
4461                // consulted if the "foo" and "bar" are being checked at the same time
4462                "s1",
4463                [ScalarValue::from("foo"), ScalarValue::from("bar")],
4464                [
4465                    // container 0,1,2 unknown about ("foo, "bar")
4466                    None,
4467                    None,
4468                    None,
4469                    // container 3,4,5 known to contain only either "foo" and "bar"
4470                    Some(true),
4471                    Some(true),
4472                    Some(true),
4473                    // container 6,7,8  known to contain  neither "foo" and "bar"
4474                    Some(false),
4475                    Some(false),
4476                    Some(false),
4477                ],
4478            );
4479
4480        // s1 = 'foo'
4481        prune_with_expr(
4482            col("s1").eq(lit("foo")),
4483            &schema,
4484            &statistics,
4485            // rule out containers ('false) where we know foo is not present
4486            &[true, false, true, true, false, true, true, false, true],
4487        );
4488
4489        // s1 = 'bar'
4490        prune_with_expr(
4491            col("s1").eq(lit("bar")),
4492            &schema,
4493            &statistics,
4494            // rule out containers where we know bar is not present
4495            &[true, true, true, false, false, false, true, true, true],
4496        );
4497
4498        // s1 = 'baz' (unknown value)
4499        prune_with_expr(
4500            col("s1").eq(lit("baz")),
4501            &schema,
4502            &statistics,
4503            // can't rule out anything
4504            &[true, true, true, true, true, true, true, true, true],
4505        );
4506
4507        // s1 = 'foo' AND s1 = 'bar'
4508        prune_with_expr(
4509            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
4510            &schema,
4511            &statistics,
4512            // logically this predicate can't possibly be true (the column can't
4513            // take on both values) but we could rule it out if the stats tell
4514            // us that both values are not present
4515            &[true, true, true, true, true, true, true, true, true],
4516        );
4517
4518        // s1 = 'foo' OR s1 = 'bar'
4519        prune_with_expr(
4520            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
4521            &schema,
4522            &statistics,
4523            // can rule out containers that we know contain neither foo nor bar
4524            &[true, true, true, true, true, true, false, false, false],
4525        );
4526
4527        // s1 = 'foo' OR s1 = 'baz'
4528        prune_with_expr(
4529            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
4530            &schema,
4531            &statistics,
4532            // can't rule out anything container
4533            &[true, true, true, true, true, true, true, true, true],
4534        );
4535
4536        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
4537        prune_with_expr(
4538            col("s1")
4539                .eq(lit("foo"))
4540                .or(col("s1").eq(lit("bar")))
4541                .or(col("s1").eq(lit("baz"))),
4542            &schema,
4543            &statistics,
4544            // can rule out any containers based on knowledge of s1 and `foo`,
4545            // `bar` and (`foo`, `bar`)
4546            &[true, true, true, true, true, true, true, true, true],
4547        );
4548
4549        // s1 != foo
4550        prune_with_expr(
4551            col("s1").not_eq(lit("foo")),
4552            &schema,
4553            &statistics,
4554            // rule out containers we know for sure only contain foo
4555            &[false, true, true, false, true, true, false, true, true],
4556        );
4557
4558        // s1 != bar
4559        prune_with_expr(
4560            col("s1").not_eq(lit("bar")),
4561            &schema,
4562            &statistics,
4563            // rule out when we know for sure s1 has the value bar
4564            &[false, false, false, true, true, true, true, true, true],
4565        );
4566
4567        // s1 != foo AND s1 != bar
4568        prune_with_expr(
4569            col("s1")
4570                .not_eq(lit("foo"))
4571                .and(col("s1").not_eq(lit("bar"))),
4572            &schema,
4573            &statistics,
4574            // can rule out any container where we know s1 does not have either 'foo' or 'bar'
4575            &[true, true, true, false, false, false, true, true, true],
4576        );
4577
4578        // s1 != foo AND s1 != bar AND s1 != baz
4579        prune_with_expr(
4580            col("s1")
4581                .not_eq(lit("foo"))
4582                .and(col("s1").not_eq(lit("bar")))
4583                .and(col("s1").not_eq(lit("baz"))),
4584            &schema,
4585            &statistics,
4586            // can't rule out any container based on  knowledge of s1,s2
4587            &[true, true, true, true, true, true, true, true, true],
4588        );
4589
4590        // s1 != foo OR s1 != bar
4591        prune_with_expr(
4592            col("s1")
4593                .not_eq(lit("foo"))
4594                .or(col("s1").not_eq(lit("bar"))),
4595            &schema,
4596            &statistics,
4597            // cant' rule out anything based on contains information
4598            &[true, true, true, true, true, true, true, true, true],
4599        );
4600
4601        // s1 != foo OR s1 != bar OR s1 != baz
4602        prune_with_expr(
4603            col("s1")
4604                .not_eq(lit("foo"))
4605                .or(col("s1").not_eq(lit("bar")))
4606                .or(col("s1").not_eq(lit("baz"))),
4607            &schema,
4608            &statistics,
4609            // cant' rule out anything based on contains information
4610            &[true, true, true, true, true, true, true, true, true],
4611        );
4612    }
4613
4614    #[test]
4615    fn prune_with_contained_two_columns() {
4616        let schema = Arc::new(Schema::new(vec![
4617            Field::new("s1", DataType::Utf8, true),
4618            Field::new("s2", DataType::Utf8, true),
4619        ]));
4620
4621        // Model having information like bloom filters for s1 and s2
4622        let statistics = TestStatistics::new()
4623            .with_contained(
4624                "s1",
4625                [ScalarValue::from("foo")],
4626                [
4627                    // container 0, s1 known to only contain "foo"",
4628                    Some(true),
4629                    // container 1, s1 known to not contain "foo"
4630                    Some(false),
4631                    // container 2, s1 unknown about "foo"
4632                    None,
4633                    // container 3, s1 known to only contain "foo"
4634                    Some(true),
4635                    // container 4, s1 known to not contain "foo"
4636                    Some(false),
4637                    // container 5, s1 unknown about "foo"
4638                    None,
4639                    // container 6, s1 known to only contain "foo"
4640                    Some(true),
4641                    // container 7, s1 known to not contain "foo"
4642                    Some(false),
4643                    // container 8, s1 unknown about "foo"
4644                    None,
4645                ],
4646            )
4647            .with_contained(
4648                "s2", // for column s2
4649                [ScalarValue::from("bar")],
4650                [
4651                    // containers 0,1,2 s2 known to only contain "bar"
4652                    Some(true),
4653                    Some(true),
4654                    Some(true),
4655                    // container 3,4,5 s2 known to not contain "bar"
4656                    Some(false),
4657                    Some(false),
4658                    Some(false),
4659                    // container 6,7,8 s2 unknown about "bar"
4660                    None,
4661                    None,
4662                    None,
4663                ],
4664            );
4665
4666        // s1 = 'foo'
4667        prune_with_expr(
4668            col("s1").eq(lit("foo")),
4669            &schema,
4670            &statistics,
4671            // rule out containers where we know s1 is not present
4672            &[true, false, true, true, false, true, true, false, true],
4673        );
4674
4675        // s1 = 'foo' OR s2 = 'bar'
4676        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
4677        prune_with_expr(
4678            expr,
4679            &schema,
4680            &statistics,
4681            //  can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
4682            &[true, true, true, true, true, true, true, true, true],
4683        );
4684
4685        // s1 = 'foo' AND s2 != 'bar'
4686        prune_with_expr(
4687            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
4688            &schema,
4689            &statistics,
4690            // can only rule out container where we know either:
4691            // 1. s1 doesn't have the value 'foo` or
4692            // 2. s2 has only the value of 'bar'
4693            &[false, false, false, true, false, true, true, false, true],
4694        );
4695
4696        // s1 != 'foo' AND s2 != 'bar'
4697        prune_with_expr(
4698            col("s1")
4699                .not_eq(lit("foo"))
4700                .and(col("s2").not_eq(lit("bar"))),
4701            &schema,
4702            &statistics,
4703            // Can  rule out any container where we know either
4704            // 1. s1 has only the value 'foo'
4705            // 2. s2 has only the value 'bar'
4706            &[false, false, false, false, true, true, false, true, true],
4707        );
4708
4709        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
4710        prune_with_expr(
4711            col("s1")
4712                .not_eq(lit("foo"))
4713                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
4714            &schema,
4715            &statistics,
4716            // Can rule out any container where we know s1 has only the value
4717            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
4718            &[false, true, true, false, true, true, false, true, true],
4719        );
4720
4721        // s1 like '%foo%bar%'
4722        prune_with_expr(
4723            col("s1").like(lit("foo%bar%")),
4724            &schema,
4725            &statistics,
4726            // cant rule out anything with information we know
4727            &[true, true, true, true, true, true, true, true, true],
4728        );
4729
4730        // s1 like '%foo%bar%' AND s2 = 'bar'
4731        prune_with_expr(
4732            col("s1")
4733                .like(lit("foo%bar%"))
4734                .and(col("s2").eq(lit("bar"))),
4735            &schema,
4736            &statistics,
4737            // can rule out any container where we know s2 does not have the value 'bar'
4738            &[true, true, true, false, false, false, true, true, true],
4739        );
4740
4741        // s1 like '%foo%bar%' OR s2 = 'bar'
4742        prune_with_expr(
4743            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
4744            &schema,
4745            &statistics,
4746            // can't rule out anything (we would have to prove that both the
4747            // like and the equality must be false)
4748            &[true, true, true, true, true, true, true, true, true],
4749        );
4750    }
4751
4752    #[test]
4753    fn prune_with_range_and_contained() {
4754        // Setup mimics range information for i, a bloom filter for s
4755        let schema = Arc::new(Schema::new(vec![
4756            Field::new("i", DataType::Int32, true),
4757            Field::new("s", DataType::Utf8, true),
4758        ]));
4759
4760        let statistics = TestStatistics::new()
4761            .with(
4762                "i",
4763                ContainerStats::new_i32(
4764                    // Container 0, 3, 6: [-5 to 5]
4765                    // Container 1, 4, 7: [10 to 20]
4766                    // Container 2, 5, 9: unknown
4767                    vec![
4768                        Some(-5),
4769                        Some(10),
4770                        None,
4771                        Some(-5),
4772                        Some(10),
4773                        None,
4774                        Some(-5),
4775                        Some(10),
4776                        None,
4777                    ], // min
4778                    vec![
4779                        Some(5),
4780                        Some(20),
4781                        None,
4782                        Some(5),
4783                        Some(20),
4784                        None,
4785                        Some(5),
4786                        Some(20),
4787                        None,
4788                    ], // max
4789                ),
4790            )
4791            // Add contained  information about the s and "foo"
4792            .with_contained(
4793                "s",
4794                [ScalarValue::from("foo")],
4795                [
4796                    // container 0,1,2 known to only contain "foo"
4797                    Some(true),
4798                    Some(true),
4799                    Some(true),
4800                    // container 3,4,5 known to not contain "foo"
4801                    Some(false),
4802                    Some(false),
4803                    Some(false),
4804                    // container 6,7,8 unknown about "foo"
4805                    None,
4806                    None,
4807                    None,
4808                ],
4809            );
4810
4811        // i = 0 and s = 'foo'
4812        prune_with_expr(
4813            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
4814            &schema,
4815            &statistics,
4816            // Can rule out container where we know that either:
4817            // 1. 0 is outside the min/max range of i
4818            // 1. s does not contain foo
4819            // (range is false, and contained  is false)
4820            &[true, false, true, false, false, false, true, false, true],
4821        );
4822
4823        // i = 0 and s != 'foo'
4824        prune_with_expr(
4825            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
4826            &schema,
4827            &statistics,
4828            // Can rule out containers where either:
4829            // 1. 0 is outside the min/max range of i
4830            // 2. s only contains foo
4831            &[false, false, false, true, false, true, true, false, true],
4832        );
4833
4834        // i = 0 OR s = 'foo'
4835        prune_with_expr(
4836            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
4837            &schema,
4838            &statistics,
4839            // in theory could rule out containers if we had min/max values for
4840            // s as well. But in this case we don't so we can't rule out anything
4841            &[true, true, true, true, true, true, true, true, true],
4842        );
4843    }
4844
4845    /// prunes the specified expr with the specified schema and statistics, and
4846    /// ensures it returns expected.
4847    ///
4848    /// `expected` is a vector of bools, where true means the row group should
4849    /// be kept, and false means it should be pruned.
4850    ///
4851    // TODO refactor other tests to use this to reduce boiler plate
4852    fn prune_with_expr(
4853        expr: Expr,
4854        schema: &SchemaRef,
4855        statistics: &TestStatistics,
4856        expected: &[bool],
4857    ) {
4858        println!("Pruning with expr: {}", expr);
4859        let expr = logical2physical(&expr, schema);
4860        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
4861        let result = p.prune(statistics).unwrap();
4862        assert_eq!(result, expected);
4863    }
4864
4865    fn test_build_predicate_expression(
4866        expr: &Expr,
4867        schema: &Schema,
4868        required_columns: &mut RequiredColumns,
4869    ) -> Arc<dyn PhysicalExpr> {
4870        let expr = logical2physical(expr, schema);
4871        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
4872        build_predicate_expression(&expr, schema, required_columns, &unhandled_hook)
4873    }
4874}