datafusion_physical_optimizer/pruning.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`PruningPredicate`] to apply filter [`Expr`] to prune "containers"
19//! based on statistics (e.g. Parquet Row Groups)
20//!
21//! [`Expr`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/enum.Expr.html
22use std::collections::HashSet;
23use std::sync::Arc;
24
25use arrow::array::AsArray;
26use arrow::{
27 array::{new_null_array, ArrayRef, BooleanArray},
28 datatypes::{DataType, Field, Schema, SchemaRef},
29 record_batch::{RecordBatch, RecordBatchOptions},
30};
31use log::trace;
32
33use datafusion_common::error::{DataFusionError, Result};
34use datafusion_common::tree_node::TransformedResult;
35use datafusion_common::{
36 internal_err, plan_datafusion_err, plan_err,
37 tree_node::{Transformed, TreeNode},
38 ScalarValue,
39};
40use datafusion_common::{Column, DFSchema};
41use datafusion_expr_common::operator::Operator;
42use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee};
43use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef};
44use datafusion_physical_plan::{ColumnarValue, PhysicalExpr};
45
46/// A source of runtime statistical information to [`PruningPredicate`]s.
47///
48/// # Supported Information
49///
50/// 1. Minimum and maximum values for columns
51///
52/// 2. Null counts and row counts for columns
53///
54/// 3. Whether the values in a column are contained in a set of literals
55///
56/// # Vectorized Interface
57///
/// Information for containers / files is returned as Arrow [`ArrayRef`], so
59/// the evaluation happens once on a single `RecordBatch`, which amortizes the
60/// overhead of evaluating the predicate. This is important when pruning 1000s
61/// of containers which often happens in analytic systems that have 1000s of
62/// potential files to consider.
63///
64/// For example, for the following three files with a single column `a`:
65/// ```text
/// file1: column a: min=5, max=10
/// file2: column a: No stats
/// file3: column a: min=20, max=30
69/// ```
70///
71/// PruningStatistics would return:
72///
73/// ```text
74/// min_values("a") -> Some([5, Null, 20])
75/// max_values("a") -> Some([10, Null, 30])
76/// min_values("X") -> None
77/// ```
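///
/// # Example Implementation
///
/// A minimal sketch of an in-memory implementation for a single `Int32`
/// column (`InMemoryStats` and its fields are illustrative names, not part
/// of this API):
///
/// ```ignore
/// use std::collections::HashSet;
/// use std::sync::Arc;
/// use arrow::array::{Array, ArrayRef, BooleanArray, Int32Array};
/// use datafusion_common::{Column, ScalarValue};
///
/// /// Min/max values for one column, with one entry per container
/// struct InMemoryStats {
///     column_name: String,
///     mins: Int32Array,  // a null entry means "min unknown" for that container
///     maxes: Int32Array, // a null entry means "max unknown" for that container
/// }
///
/// impl PruningStatistics for InMemoryStats {
///     fn min_values(&self, column: &Column) -> Option<ArrayRef> {
///         (column.name == self.column_name)
///             .then(|| Arc::new(self.mins.clone()) as ArrayRef)
///     }
///     fn max_values(&self, column: &Column) -> Option<ArrayRef> {
///         (column.name == self.column_name)
///             .then(|| Arc::new(self.maxes.clone()) as ArrayRef)
///     }
///     fn num_containers(&self) -> usize {
///         self.mins.len()
///     }
///     // no null count, row count, or set membership information available
///     fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
///         None
///     }
///     fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
///         None
///     }
///     fn contained(
///         &self,
///         _column: &Column,
///         _values: &HashSet<ScalarValue>,
///     ) -> Option<BooleanArray> {
///         None
///     }
/// }
/// ```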
78pub trait PruningStatistics {
79 /// Return the minimum values for the named column, if known.
80 ///
81 /// If the minimum value for a particular container is not known, the
82 /// returned array should have `null` in that row. If the minimum value is
83 /// not known for any row, return `None`.
84 ///
85 /// Note: the returned array must contain [`Self::num_containers`] rows
86 fn min_values(&self, column: &Column) -> Option<ArrayRef>;
87
88 /// Return the maximum values for the named column, if known.
89 ///
90 /// See [`Self::min_values`] for when to return `None` and null values.
91 ///
92 /// Note: the returned array must contain [`Self::num_containers`] rows
93 fn max_values(&self, column: &Column) -> Option<ArrayRef>;
94
95 /// Return the number of containers (e.g. Row Groups) being pruned with
96 /// these statistics.
97 ///
98 /// This value corresponds to the size of the [`ArrayRef`] returned by
99 /// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
100 /// and [`Self::row_counts`].
101 fn num_containers(&self) -> usize;
102
    /// Return the number of null values for the named column in each
    /// container, as an [`UInt64Array`]
105 ///
106 /// See [`Self::min_values`] for when to return `None` and null values.
107 ///
108 /// Note: the returned array must contain [`Self::num_containers`] rows
109 ///
110 /// [`UInt64Array`]: arrow::array::UInt64Array
111 fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
112
113 /// Return the number of rows for the named column in each container
114 /// as an [`UInt64Array`].
115 ///
116 /// See [`Self::min_values`] for when to return `None` and null values.
117 ///
118 /// Note: the returned array must contain [`Self::num_containers`] rows
119 ///
120 /// [`UInt64Array`]: arrow::array::UInt64Array
121 fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
122
123 /// Returns [`BooleanArray`] where each row represents information known
124 /// about specific literal `values` in a column.
125 ///
126 /// For example, Parquet Bloom Filters implement this API to communicate
127 /// that `values` are known not to be present in a Row Group.
128 ///
129 /// The returned array has one row for each container, with the following
130 /// meanings:
131 /// * `true` if the values in `column` ONLY contain values from `values`
132 /// * `false` if the values in `column` are NOT ANY of `values`
    /// * `null` if neither of the above holds or the answer is unknown.
134 ///
135 /// If these statistics can not determine column membership for any
136 /// container, return `None` (the default).
137 ///
138 /// Note: the returned array must contain [`Self::num_containers`] rows
139 fn contained(
140 &self,
141 column: &Column,
142 values: &HashSet<ScalarValue>,
143 ) -> Option<BooleanArray>;
144}
145
/// Used to prove that arbitrary predicates (boolean expressions) can not
147/// possibly evaluate to `true` given information about a column provided by
148/// [`PruningStatistics`].
149///
150/// # Introduction
151///
152/// `PruningPredicate` analyzes filter expressions using statistics such as
153/// min/max values and null counts, attempting to prove a "container" (e.g.
154/// Parquet Row Group) can be skipped without reading the actual data,
155/// potentially leading to significant performance improvements.
156///
157/// For example, `PruningPredicate`s are used to prune Parquet Row Groups based
158/// on the min/max values found in the Parquet metadata. If the
159/// `PruningPredicate` can prove that the filter can never evaluate to `true`
160/// for any row in the Row Group, the entire Row Group is skipped during query
161/// execution.
162///
163/// The `PruningPredicate` API is general, and can be used for pruning other
164/// types of containers (e.g. files) based on statistics that may be known from
165/// external catalogs (e.g. Delta Lake) or other sources. How this works is a
166/// subtle topic. See the Background and Implementation section for details.
167///
168/// `PruningPredicate` supports:
169///
170/// 1. Arbitrary expressions (including user defined functions)
171///
172/// 2. Vectorized evaluation (provide more than one set of statistics at a time)
173/// so it is suitable for pruning 1000s of containers.
174///
175/// 3. Any source of information that implements the [`PruningStatistics`] trait
176/// (not just Parquet metadata).
177///
178/// # Example
179///
180/// See the [`pruning.rs` example in the `datafusion-examples`] for a complete
181/// example of how to use `PruningPredicate` to prune files based on min/max
182/// values.
183///
184/// [`pruning.rs` example in the `datafusion-examples`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/pruning.rs
185///
186/// Given an expression like `x = 5` and statistics for 3 containers (Row
187/// Groups, files, etc) `A`, `B`, and `C`:
188///
189/// ```text
190/// A: {x_min = 0, x_max = 4}
191/// B: {x_min = 2, x_max = 10}
192/// C: {x_min = 5, x_max = 8}
193/// ```
194///
195/// `PruningPredicate` will conclude that the rows in container `A` can never
196/// be true (as the maximum value is only `4`), so it can be pruned:
197///
198/// ```text
199/// A: false (no rows could possibly match x = 5)
200/// B: true (rows might match x = 5)
201/// C: true (rows might match x = 5)
202/// ```
203///
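/// In code this might look like the following sketch, assuming `stats` is a
/// value implementing [`PruningStatistics`] that describes containers `A`,
/// `B`, and `C`:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_common::ScalarValue;
/// use datafusion_expr_common::operator::Operator;
/// use datafusion_physical_expr::expressions as phys_expr;
/// use datafusion_physical_plan::PhysicalExpr;
///
/// // physical expression for `x = 5`
/// let expr: Arc<dyn PhysicalExpr> = Arc::new(phys_expr::BinaryExpr::new(
///     Arc::new(phys_expr::Column::new("x", 0)),
///     Operator::Eq,
///     Arc::new(phys_expr::Literal::new(ScalarValue::from(5i32))),
/// ));
/// let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, true)]));
///
/// let predicate = PruningPredicate::try_new(expr, schema)?;
/// // one boolean per container: `false` means the container can be pruned
/// assert_eq!(predicate.prune(&stats)?, vec![false, true, true]);
/// ```
///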
204/// See [`PruningPredicate::try_new`] and [`PruningPredicate::prune`] for more information.
205///
206/// # Background
207///
208/// ## Boolean Tri-state logic
209///
210/// To understand the details of the rest of this documentation, it is important
211/// to understand how the tri-state boolean logic in SQL works. As this is
212/// somewhat esoteric, we review it here.
213///
/// SQL has a notion of `NULL` that represents that a value is “unknown”, and this
215/// uncertainty propagates through expressions. SQL `NULL` behaves very
216/// differently than the `NULL` in most other languages where it is a special,
217/// sentinel value (e.g. `0` in `C/C++`). While representing uncertainty with
218/// `NULL` is powerful and elegant, SQL `NULL`s are often deeply confusing when
219/// first encountered as they behave differently than most programmers may
220/// expect.
221///
222/// In most other programming languages,
/// * `a == NULL` evaluates to `true` if `a` also has the value `NULL`
224/// * `a == NULL` evaluates to `false` if `a` has any other value
225///
226/// However, in SQL `a = NULL` **always** evaluates to `NULL` (never `true` or
227/// `false`):
228///
229/// Expression | Result
230/// ------------- | ---------
231/// `1 = NULL` | `NULL`
232/// `NULL = NULL` | `NULL`
233///
/// Also important is how `AND` and `OR` work with tri-state boolean logic, as
235/// (perhaps counterintuitively) the result is **not** always NULL. While
236/// consistent with the notion of `NULL` representing “unknown”, this is again,
237/// often deeply confusing 🤯 when first encountered.
238///
239/// Expression | Result | Intuition
240/// --------------- | --------- | -----------
241/// `NULL AND true` | `NULL` | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
242/// `NULL AND false` | `false` | If the `NULL` was either `true` or `false` the overall expression is still `false`
243/// `NULL AND NULL` | `NULL` |
244///
245/// Expression | Result | Intuition
246/// --------------- | --------- | ----------
247/// `NULL OR true` | `true` | If the `NULL` was either `true` or `false` the overall expression is still `true`
248/// `NULL OR false` | `NULL` | The `NULL` stands for “unknown” and if it were `true` or `false` the overall expression value could change
249/// `NULL OR NULL` | `NULL` |
250///
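/// This tri-state (Kleene) logic is what Arrow's `and_kleene` and
/// `or_kleene` compute kernels implement, so the tables above can be
/// checked directly in code (a small sketch, using `unwrap` for brevity):
///
/// ```ignore
/// use arrow::array::{Array, BooleanArray};
/// use arrow::compute::{and_kleene, or_kleene};
///
/// let null = BooleanArray::from(vec![None::<bool>]);
/// let t = BooleanArray::from(vec![Some(true)]);
/// let f = BooleanArray::from(vec![Some(false)]);
///
/// assert!(and_kleene(&null, &t).unwrap().is_null(0)); // NULL AND true  => NULL
/// assert!(!and_kleene(&null, &f).unwrap().value(0));  // NULL AND false => false
/// assert!(or_kleene(&null, &t).unwrap().value(0));    // NULL OR true   => true
/// assert!(or_kleene(&null, &f).unwrap().is_null(0));  // NULL OR false  => NULL
/// ```
///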
251/// ## SQL Filter Semantics
252///
253/// The SQL `WHERE` clause has a boolean expression, often called a filter or
254/// predicate. The semantics of this predicate are that the query evaluates the
255/// predicate for each row in the input tables and:
256///
257/// * Rows that evaluate to `true` are returned in the query results
258///
259/// * Rows that evaluate to `false` are not returned (“filtered out” or “pruned” or “skipped”).
260///
261/// * Rows that evaluate to `NULL` are **NOT** returned (also “filtered out”).
262/// Note: *this treatment of `NULL` is **DIFFERENT** than how `NULL` is treated
263/// in the rewritten predicate described below.*
264///
265/// # `PruningPredicate` Implementation
266///
267/// Armed with the information in the Background section, we can now understand
268/// how the `PruningPredicate` logic works.
269///
270/// ## Interface
271///
272/// **Inputs**
273/// 1. An input schema describing what columns exist
274///
275/// 2. A predicate (expression that evaluates to a boolean)
276///
277/// 3. [`PruningStatistics`] that provides information about columns in that
278/// schema, for multiple “containers”. For each column in each container, it
279/// provides optional information on contained values, min_values, max_values,
/// null counts, and row counts.
281///
282/// **Outputs**:
283/// A (non null) boolean value for each container:
284/// * `true`: There MAY be rows that match the predicate
285///
286/// * `false`: There are no rows that could possibly match the predicate (the
287/// predicate can never possibly be true). The container can be pruned (skipped)
288/// entirely.
289///
290/// While `PruningPredicate` will never return a `NULL` value, the
291/// rewritten predicate (as returned by `build_predicate_expression` and used internally
292/// by `PruningPredicate`) may evaluate to `NULL` when some of the min/max values
293/// or null / row counts are not known.
294///
295/// In order to be correct, `PruningPredicate` must return false
296/// **only** if it can determine that for all rows in the container, the
297/// predicate could never evaluate to `true` (always evaluates to either `NULL`
298/// or `false`).
299///
300/// ## Contains Analysis and Min/Max Rewrite
301///
302/// `PruningPredicate` works by first analyzing the predicate to see what
303/// [`LiteralGuarantee`] must hold for the predicate to be true.
304///
305/// Then, the `PruningPredicate` rewrites the original predicate into an
306/// expression that references the min/max values of each column in the original
307/// predicate.
308///
/// When the min/max values are actually substituted into this expression and
310/// evaluated, the result means
311///
312/// * `true`: there MAY be rows that pass the predicate, **KEEPS** the container
313///
314/// * `NULL`: there MAY be rows that pass the predicate, **KEEPS** the container
/// Note that the rewritten predicate can evaluate to `NULL` when some of
316/// the min/max values are not known. *Note that this is different than
317/// the SQL filter semantics where `NULL` means the row is filtered
318/// out.*
319///
320/// * `false`: there are no rows that could possibly match the predicate,
321/// **PRUNES** the container
322///
323/// For example, given a column `x`, the `x_min`, `x_max`, `x_null_count`, and
324/// `x_row_count` represent the minimum and maximum values, the null count of
325/// column `x`, and the row count of column `x`, provided by the `PruningStatistics`.
326/// `x_null_count` and `x_row_count` are used to handle the case where the column `x`
327/// is known to be all `NULL`s. Note this is different from knowing nothing about
328/// the column `x`, which confusingly is encoded by returning `NULL` for the min/max
329/// values from [`PruningStatistics::max_values`] and [`PruningStatistics::min_values`].
330///
331/// Here are some examples of the rewritten predicates:
332///
333/// Original Predicate | Rewritten Predicate
334/// ------------------ | --------------------
335/// `x = 5` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)`
/// `x < 5` | `x_null_count != x_row_count AND (x_min < 5)`
/// `x = 5 AND y = 10` | `x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max) AND y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)`
/// `x IS NULL` | `x_null_count > 0`
/// `x IS NOT NULL` | `x_null_count != x_row_count`
/// `CAST(x as int) = 5` | `x_null_count != x_row_count AND (CAST(x_min as int) <= 5 AND 5 <= CAST(x_max as int))`
341///
342/// ## Predicate Evaluation
/// The `PruningPredicate` works in two passes:
344///
345/// **First pass**: For each `LiteralGuarantee` calls
346/// [`PruningStatistics::contained`] and rules out containers where the
347/// LiteralGuarantees are not satisfied
348///
/// **Second Pass**: Evaluates the rewritten expression using the
/// min/max/null_counts/row_counts values for each column in each container,
/// and rules out any container for which the expression evaluates to
/// `false`.
353///
354///
355/// ### Example 1
356///
357/// Given the predicate, `x = 5 AND y = 10`, the rewritten predicate would look like:
358///
359/// ```sql
360/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
361/// AND
362/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
363/// ```
364///
/// If we know that for a given container `x` is between `1` and `100`, that
/// `y` is between `4` and `7`, and we know nothing about the null counts or
/// row counts of `x` and `y`, the input statistics might look like:
368///
369/// Column | Value
370/// -------- | -----
371/// `x_min` | `1`
372/// `x_max` | `100`
373/// `x_null_count` | `null`
374/// `x_row_count` | `null`
375/// `y_min` | `4`
376/// `y_max` | `7`
377/// `y_null_count` | `null`
378/// `y_row_count` | `null`
379///
/// When these statistics values are substituted into the rewritten predicate and
381/// simplified, the result is `false`:
382///
383/// * `null != null AND (1 <= 5 AND 5 <= 100) AND null != null AND (4 <= 10 AND 10 <= 7)`
/// * `null != null` is `null`, which is neither `true` nor `false`, so the `AND` moves on to the next clause
385/// * `null and (1 <= 5 AND 5 <= 100) AND null AND (4 <= 10 AND 10 <= 7)`
386/// * evaluating the clauses further we get:
387/// * `null and true and null and false`
388/// * `null and false`
389/// * `false`
390///
391/// Returning `false` means the container can be pruned, which matches the
392/// intuition that `x = 5 AND y = 10` can’t be true for any row if all values of `y`
393/// are `7` or less.
394///
395/// Note that if we had ended up with `null AND true AND null AND true` the result
396/// would have been `null`.
/// `null` is treated the same as `true`, because we can't prove that the predicate is `false`.
398///
399/// If, for some other container, we knew `y` was between the values `4` and
400/// `15`, then the rewritten predicate evaluates to `true` (verifying this is
401/// left as an exercise to the reader -- are you still here?), and the container
402/// **could not** be pruned. The intuition is that there may be rows where the
403/// predicate *might* evaluate to `true`, and the only way to find out is to do
404/// more analysis, for example by actually reading the data and evaluating the
405/// predicate row by row.
406///
407/// ### Example 2
408///
409/// Given the same predicate, `x = 5 AND y = 10`, the rewritten predicate would
/// look the same as in Example 1:
411///
412/// ```sql
413/// x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
414/// AND
415/// y_null_count != y_row_count AND (y_min <= 10 AND 10 <= y_max)
416/// ```
417///
418/// If we know that for another given container, `x_min` is NULL and `x_max` is
419/// NULL (the min/max values are unknown), `x_null_count` is `100` and `x_row_count`
420/// is `100`; we know that `y` is between `4` and `7`, but we know nothing about
421/// the null count and row count of `y`. The input statistics might look like:
422///
423/// Column | Value
424/// -------- | -----
425/// `x_min` | `null`
426/// `x_max` | `null`
427/// `x_null_count` | `100`
428/// `x_row_count` | `100`
429/// `y_min` | `4`
430/// `y_max` | `7`
431/// `y_null_count` | `null`
432/// `y_row_count` | `null`
433///
/// When these statistics values are substituted into the rewritten predicate and
435/// simplified, the result is `false`:
436///
/// * `100 != 100 AND (null <= 5 AND 5 <= null) AND null != null AND (4 <= 10 AND 10 <= 7)`
438/// * `false AND null AND null AND false`
439/// * `false AND false`
440/// * `false`
441///
442/// Returning `false` means the container can be pruned, which matches the
443/// intuition that `x = 5 AND y = 10` can’t be true because all values in `x`
444/// are known to be NULL.
445///
446/// # Related Work
447///
448/// [`PruningPredicate`] implements the type of min/max pruning described in
449/// Section `3.3.3` of the [`Snowflake SIGMOD Paper`]. The technique is
450/// described by various research such as [small materialized aggregates], [zone
451/// maps], and [data skipping].
452///
453/// [`Snowflake SIGMOD Paper`]: https://dl.acm.org/doi/10.1145/2882903.2903741
454/// [small materialized aggregates]: https://www.vldb.org/conf/1998/p476.pdf
455/// [zone maps]: https://dl.acm.org/doi/10.1007/978-3-642-03730-6_10
456/// [data skipping]: https://dl.acm.org/doi/10.1145/2588555.2610515
457#[derive(Debug, Clone)]
458pub struct PruningPredicate {
459 /// The input schema against which the predicate will be evaluated
460 schema: SchemaRef,
461 /// A min/max pruning predicate (rewritten in terms of column min/max
462 /// values, which are supplied by statistics)
463 predicate_expr: Arc<dyn PhysicalExpr>,
464 /// Description of which statistics are required to evaluate `predicate_expr`
465 required_columns: RequiredColumns,
466 /// Original physical predicate from which this predicate expr is derived
467 /// (required for serialization)
468 orig_expr: Arc<dyn PhysicalExpr>,
469 /// [`LiteralGuarantee`]s used to try and prove a predicate can not possibly
470 /// evaluate to `true`.
471 ///
472 /// See [`PruningPredicate::literal_guarantees`] for more details.
473 literal_guarantees: Vec<LiteralGuarantee>,
474}
475
476/// Rewrites predicates that [`PredicateRewriter`] can not handle, e.g. certain
477/// complex expressions or predicates that reference columns that are not in the
478/// schema.
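///
/// A sketch of a custom hook that logs each expression it cannot handle
/// before falling back to the default `true` (`LoggingHook` is an
/// illustrative name, not part of this API):
///
/// ```ignore
/// #[derive(Debug)]
/// struct LoggingHook;
///
/// impl UnhandledPredicateHook for LoggingHook {
///     fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
///         log::debug!("cannot rewrite {expr} for pruning; keeping containers");
///         // a literal `true` means "this clause cannot prune any container"
///         Arc::new(phys_expr::Literal::new(ScalarValue::from(true)))
///     }
/// }
/// ```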
479pub trait UnhandledPredicateHook {
480 /// Called when a predicate can not be rewritten in terms of statistics or
481 /// references a column that is not in the schema.
482 fn handle(&self, expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr>;
483}
484
485/// The default handling for unhandled predicates is to return a constant `true`
486/// (meaning don't prune the container)
487#[derive(Debug, Clone)]
488struct ConstantUnhandledPredicateHook {
489 default: Arc<dyn PhysicalExpr>,
490}
491
492impl Default for ConstantUnhandledPredicateHook {
493 fn default() -> Self {
494 Self {
495 default: Arc::new(phys_expr::Literal::new(ScalarValue::from(true))),
496 }
497 }
498}
499
500impl UnhandledPredicateHook for ConstantUnhandledPredicateHook {
501 fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
502 Arc::clone(&self.default)
503 }
504}
505
506impl PruningPredicate {
507 /// Try to create a new instance of [`PruningPredicate`]
508 ///
509 /// This will translate the provided `expr` filter expression into
510 /// a *pruning predicate*.
511 ///
512 /// A pruning predicate is one that has been rewritten in terms of
513 /// the min and max values of column references and that evaluates
514 /// to FALSE if the filter predicate would evaluate FALSE *for
515 /// every row* whose values fell within the min / max ranges (aka
516 /// could be pruned).
517 ///
518 /// The pruning predicate evaluates to TRUE or NULL
519 /// if the filter predicate *might* evaluate to TRUE for at least
520 /// one row whose values fell within the min/max ranges (in other
521 /// words they might pass the predicate)
522 ///
523 /// For example, the filter expression `(column / 2) = 4` becomes
524 /// the pruning predicate
    /// `(column_min / 2) <= 4 && 4 <= (column_max / 2)`
526 ///
527 /// See the struct level documentation on [`PruningPredicate`] for more
528 /// details.
529 pub fn try_new(expr: Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Result<Self> {
530 let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
531
532 // build predicate expression once
533 let mut required_columns = RequiredColumns::new();
534 let predicate_expr = build_predicate_expression(
535 &expr,
536 schema.as_ref(),
537 &mut required_columns,
538 &unhandled_hook,
539 );
540
541 let literal_guarantees = LiteralGuarantee::analyze(&expr);
542
543 Ok(Self {
544 schema,
545 predicate_expr,
546 required_columns,
547 orig_expr: expr,
548 literal_guarantees,
549 })
550 }
551
552 /// For each set of statistics, evaluates the pruning predicate
    /// and returns a `bool` with the following meaning for
    /// all rows whose values match the statistics:
555 ///
556 /// `true`: There MAY be rows that match the predicate
557 ///
558 /// `false`: There are no rows that could possibly match the predicate
559 ///
560 /// Note: the predicate passed to `prune` should already be simplified as
561 /// much as possible (e.g. this pass doesn't handle some
562 /// expressions like `b = false`, but it does handle the
    /// simplified version `b`). See [`ExprSimplifier`] to simplify expressions.
564 ///
565 /// [`ExprSimplifier`]: https://docs.rs/datafusion/latest/datafusion/optimizer/simplify_expressions/struct.ExprSimplifier.html
566 pub fn prune<S: PruningStatistics>(&self, statistics: &S) -> Result<Vec<bool>> {
567 let mut builder = BoolVecBuilder::new(statistics.num_containers());
568
569 // Try to prove the predicate can't be true for the containers based on
570 // literal guarantees
571 for literal_guarantee in &self.literal_guarantees {
572 let LiteralGuarantee {
573 column,
574 guarantee,
575 literals,
576 } = literal_guarantee;
577 if let Some(results) = statistics.contained(column, literals) {
578 match guarantee {
579 // `In` means the values in the column must be one of the
580 // values in the set for the predicate to evaluate to true.
581 // If `contained` returns false, that means the column is
582 // not any of the values so we can prune the container
583 Guarantee::In => builder.combine_array(&results),
                    // `NotIn` means the values in the column must not be
585 // any of the values in the set for the predicate to
586 // evaluate to true. If contained returns true, it means the
587 // column is only in the set of values so we can prune the
588 // container
589 Guarantee::NotIn => {
590 builder.combine_array(&arrow::compute::not(&results)?)
591 }
592 }
                // If all containers are pruned (i.e. all have rows that DEFINITELY
                // DO NOT pass the predicate), we can return early without
                // evaluating the remaining predicates.
595 if builder.check_all_pruned() {
596 return Ok(builder.build());
597 }
598 }
599 }
600
601 // Next, try to prove the predicate can't be true for the containers based
602 // on min/max values
603
604 // build a RecordBatch that contains the min/max values in the
605 // appropriate statistics columns for the min/max predicate
606 let statistics_batch =
607 build_statistics_record_batch(statistics, &self.required_columns)?;
608
609 // Evaluate the pruning predicate on that record batch and append any results to the builder
610 builder.combine_value(self.predicate_expr.evaluate(&statistics_batch)?);
611
612 Ok(builder.build())
613 }
614
615 /// Return a reference to the input schema
616 pub fn schema(&self) -> &SchemaRef {
617 &self.schema
618 }
619
620 /// Returns a reference to the physical expr used to construct this pruning predicate
621 pub fn orig_expr(&self) -> &Arc<dyn PhysicalExpr> {
622 &self.orig_expr
623 }
624
625 /// Returns a reference to the predicate expr
626 pub fn predicate_expr(&self) -> &Arc<dyn PhysicalExpr> {
627 &self.predicate_expr
628 }
629
630 /// Returns a reference to the literal guarantees
631 ///
632 /// Note that **All** `LiteralGuarantee`s must be satisfied for the
633 /// expression to possibly be `true`. If any is not satisfied, the
634 /// expression is guaranteed to be `null` or `false`.
635 pub fn literal_guarantees(&self) -> &[LiteralGuarantee] {
636 &self.literal_guarantees
637 }
638
639 /// Returns true if this pruning predicate can not prune anything.
640 ///
641 /// This happens if the predicate is a literal `true` and
642 /// literal_guarantees is empty.
643 ///
644 /// This can happen when a predicate is simplified to a constant `true`
645 pub fn always_true(&self) -> bool {
646 is_always_true(&self.predicate_expr) && self.literal_guarantees.is_empty()
647 }
648
649 // this is only used by `parquet` feature right now
650 #[allow(dead_code)]
651 pub fn required_columns(&self) -> &RequiredColumns {
652 &self.required_columns
653 }
654
655 /// Names of the columns that are known to be / not be in a set
    /// of literals (constants). These are the columns that may be passed to
657 /// [`PruningStatistics::contained`] during pruning.
658 ///
659 /// This is useful to avoid fetching statistics for columns that will not be
660 /// used in the predicate. For example, it can be used to avoid reading
661 /// unneeded bloom filters (a non trivial operation).
662 pub fn literal_columns(&self) -> Vec<String> {
663 let mut seen = HashSet::new();
664 self.literal_guarantees
665 .iter()
666 .map(|e| &e.column.name)
667 // avoid duplicates
668 .filter(|name| seen.insert(*name))
669 .map(|s| s.to_string())
670 .collect()
671 }
672}
673
674/// Builds the return `Vec` for [`PruningPredicate::prune`].
675#[derive(Debug)]
676struct BoolVecBuilder {
677 /// One element per container. Each element is
    /// * `true`: if the container has rows that may pass the predicate
679 /// * `false`: if the container has rows that DEFINITELY DO NOT pass the predicate
680 inner: Vec<bool>,
681}
682
683impl BoolVecBuilder {
684 /// Create a new `BoolVecBuilder` with `num_containers` elements
685 fn new(num_containers: usize) -> Self {
686 Self {
687 // assume by default all containers may pass the predicate
688 inner: vec![true; num_containers],
689 }
690 }
691
    /// Combines the result `array` for a conjunct (e.g. `AND` clause) of a
693 /// predicate into the currently in progress array.
694 ///
695 /// Each `array` element is:
    /// * `true`: container has rows that may pass the predicate
697 /// * `false`: all container rows DEFINITELY DO NOT pass the predicate
698 /// * `null`: container may or may not have rows that pass the predicate
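    ///
    /// For example, combining the array `[true, false, null]` into a current
    /// state of `[true, true, true]` yields `[true, false, true]`: only a
    /// definite `false` marks a container as prunable.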
699 fn combine_array(&mut self, array: &BooleanArray) {
700 assert_eq!(array.len(), self.inner.len());
701 for (cur, new) in self.inner.iter_mut().zip(array.iter()) {
702 // `false` for this conjunct means we know for sure no rows could
703 // pass the predicate and thus we set the corresponding container
704 // location to false.
705 if let Some(false) = new {
706 *cur = false;
707 }
708 }
709 }
710
    /// Combines the results in the [`ColumnarValue`] into the currently in
712 /// progress array, following the same rules as [`Self::combine_array`].
713 ///
714 /// # Panics
715 /// If `value` is not boolean
716 fn combine_value(&mut self, value: ColumnarValue) {
717 match value {
718 ColumnarValue::Array(array) => {
719 self.combine_array(array.as_boolean());
720 }
721 ColumnarValue::Scalar(ScalarValue::Boolean(Some(false))) => {
722 // False means all containers can not pass the predicate
723 self.inner = vec![false; self.inner.len()];
724 }
725 _ => {
726 // Null or true means the rows in container may pass this
727 // conjunct so we can't prune any containers based on that
728 }
729 }
730 }
731
732 /// Convert this builder into a Vec of bools
733 fn build(self) -> Vec<bool> {
734 self.inner
735 }
736
    /// Returns true if all containers have rows that DEFINITELY DO NOT pass the predicate
738 fn check_all_pruned(&self) -> bool {
739 self.inner.iter().all(|&x| !x)
740 }
741}
742
743fn is_always_true(expr: &Arc<dyn PhysicalExpr>) -> bool {
744 expr.as_any()
745 .downcast_ref::<phys_expr::Literal>()
746 .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
747 .unwrap_or_default()
748}
749
750/// Describes which columns statistics are necessary to evaluate a
751/// [`PruningPredicate`].
752///
/// This structure permits reading and creating the minimum number of statistics,
754/// which is important since statistics may be non trivial to read (e.g. large
755/// strings or when there are 1000s of columns).
756///
757/// Handles creating references to the min/max statistics
758/// for columns as well as recording which statistics are needed
759#[derive(Debug, Default, Clone)]
760pub struct RequiredColumns {
761 /// The statistics required to evaluate this predicate:
762 /// * The unqualified column in the input schema
763 /// * Statistics type (e.g. Min or Max or Null_Count)
764 /// * The field the statistics value should be placed in for
765 /// pruning predicate evaluation (e.g. `min_value` or `max_value`)
766 columns: Vec<(phys_expr::Column, StatisticsType, Field)>,
767}
768
769impl RequiredColumns {
770 fn new() -> Self {
771 Self::default()
772 }
773
774 /// Returns Some(column) if this is a single column predicate.
775 ///
776 /// Returns None if this is a multi-column predicate.
777 ///
778 /// Examples:
779 /// * `a > 5 OR a < 10` returns `Some(a)`
780 /// * `a > 5 OR b < 10` returns `None`
    /// * `true` returns `None`
782 #[allow(dead_code)]
783 // this fn is only used by `parquet` feature right now, thus the `allow(dead_code)`
784 pub fn single_column(&self) -> Option<&phys_expr::Column> {
785 if self.columns.windows(2).all(|w| {
786 // check if all columns are the same (ignoring statistics and field)
787 let c1 = &w[0].0;
788 let c2 = &w[1].0;
789 c1 == c2
790 }) {
791 self.columns.first().map(|r| &r.0)
792 } else {
793 None
794 }
795 }
796
797 /// Returns an iterator over items in columns (see doc on
798 /// `self.columns` for details)
799 pub(crate) fn iter(
800 &self,
801 ) -> impl Iterator<Item = &(phys_expr::Column, StatisticsType, Field)> {
802 self.columns.iter()
803 }
804
805 fn find_stat_column(
806 &self,
807 column: &phys_expr::Column,
808 statistics_type: StatisticsType,
809 ) -> Option<usize> {
810 match statistics_type {
811 StatisticsType::RowCount => {
812 // Use the first row count we find, if any
813 self.columns
814 .iter()
815 .enumerate()
816 .find(|(_i, (_c, t, _f))| t == &statistics_type)
817 .map(|(i, (_c, _t, _f))| i)
818 }
819 _ => self
820 .columns
821 .iter()
822 .enumerate()
823 .find(|(_i, (c, t, _f))| c == column && t == &statistics_type)
824 .map(|(i, (_c, _t, _f))| i),
825 }
826 }
827
828 /// Rewrites column_expr so that all appearances of column
829 /// are replaced with a reference to either the min or max
830 /// statistics column, while keeping track that a reference to the statistics
831 /// column is required
832 ///
833 /// for example, an expression like `col("foo") > 5`, when called
834 /// with Max would result in an expression like `col("foo_max") >
835 /// 5` with the appropriate entry noted in self.columns
836 fn stat_column_expr(
837 &mut self,
838 column: &phys_expr::Column,
839 column_expr: &Arc<dyn PhysicalExpr>,
840 field: &Field,
841 stat_type: StatisticsType,
842 ) -> Result<Arc<dyn PhysicalExpr>> {
843 let (idx, need_to_insert) = match self.find_stat_column(column, stat_type) {
844 Some(idx) => (idx, false),
845 None => (self.columns.len(), true),
846 };
847
848 let column_name = column.name();
849 let stat_column_name = match stat_type {
850 StatisticsType::Min => format!("{column_name}_min"),
851 StatisticsType::Max => format!("{column_name}_max"),
852 StatisticsType::NullCount => format!("{column_name}_null_count"),
853 StatisticsType::RowCount => "row_count".to_string(),
854 };
855
856 let stat_column = phys_expr::Column::new(&stat_column_name, idx);
857
858 // only add statistics column if not previously added
859 if need_to_insert {
860 // may be null if statistics are not present
861 let nullable = true;
862 let stat_field =
863 Field::new(stat_column.name(), field.data_type().clone(), nullable);
864 self.columns.push((column.clone(), stat_type, stat_field));
865 }
866 rewrite_column_expr(Arc::clone(column_expr), column, &stat_column)
867 }
868
869 /// rewrite col --> col_min
870 fn min_column_expr(
871 &mut self,
872 column: &phys_expr::Column,
873 column_expr: &Arc<dyn PhysicalExpr>,
874 field: &Field,
875 ) -> Result<Arc<dyn PhysicalExpr>> {
876 self.stat_column_expr(column, column_expr, field, StatisticsType::Min)
877 }
878
879 /// rewrite col --> col_max
880 fn max_column_expr(
881 &mut self,
882 column: &phys_expr::Column,
883 column_expr: &Arc<dyn PhysicalExpr>,
884 field: &Field,
885 ) -> Result<Arc<dyn PhysicalExpr>> {
886 self.stat_column_expr(column, column_expr, field, StatisticsType::Max)
887 }
888
889 /// rewrite col --> col_null_count
890 fn null_count_column_expr(
891 &mut self,
892 column: &phys_expr::Column,
893 column_expr: &Arc<dyn PhysicalExpr>,
894 field: &Field,
895 ) -> Result<Arc<dyn PhysicalExpr>> {
896 self.stat_column_expr(column, column_expr, field, StatisticsType::NullCount)
897 }
898
899 /// rewrite col --> col_row_count
900 fn row_count_column_expr(
901 &mut self,
902 column: &phys_expr::Column,
903 column_expr: &Arc<dyn PhysicalExpr>,
904 field: &Field,
905 ) -> Result<Arc<dyn PhysicalExpr>> {
906 self.stat_column_expr(column, column_expr, field, StatisticsType::RowCount)
907 }
908}
909
910impl From<Vec<(phys_expr::Column, StatisticsType, Field)>> for RequiredColumns {
911 fn from(columns: Vec<(phys_expr::Column, StatisticsType, Field)>) -> Self {
912 Self { columns }
913 }
914}
915
/// Build a RecordBatch from a list of statistics, creating arrays
/// with one row for each container and one column for each entry
/// in the `required_columns` parameter.
919///
920/// For example, if the requested columns are
921/// ```text
922/// ("s1", Min, Field:s1_min)
923/// ("s2", Max, field:s2_max)
924///```
925///
926/// And the input statistics had
927/// ```text
928/// S1(Min: 5, Max: 10)
929/// S2(Min: 99, Max: 1000)
930/// S3(Min: 1, Max: 2)
931/// ```
932///
/// Then this function would build a record batch with two columns,
/// `s1_min` and `s2_max`, and one row, as follows (`s3` is not requested):
935///
936/// ```text
937/// s1_min | s2_max
938/// -------+--------
939/// 5 | 1000
940/// ```
941fn build_statistics_record_batch<S: PruningStatistics>(
942 statistics: &S,
943 required_columns: &RequiredColumns,
944) -> Result<RecordBatch> {
945 let mut fields = Vec::<Field>::new();
946 let mut arrays = Vec::<ArrayRef>::new();
947 // For each needed statistics column:
948 for (column, statistics_type, stat_field) in required_columns.iter() {
949 let column = Column::from_name(column.name());
950 let data_type = stat_field.data_type();
951
952 let num_containers = statistics.num_containers();
953
954 let array = match statistics_type {
955 StatisticsType::Min => statistics.min_values(&column),
956 StatisticsType::Max => statistics.max_values(&column),
957 StatisticsType::NullCount => statistics.null_counts(&column),
958 StatisticsType::RowCount => statistics.row_counts(&column),
959 };
960 let array = array.unwrap_or_else(|| new_null_array(data_type, num_containers));
961
962 if num_containers != array.len() {
963 return internal_err!(
964 "mismatched statistics length. Expected {}, got {}",
965 num_containers,
966 array.len()
967 );
968 }
969
970 // cast statistics array to required data type (e.g. parquet
971 // provides timestamp statistics as "Int64")
972 let array = arrow::compute::cast(&array, data_type)?;
973
974 fields.push(stat_field.clone());
975 arrays.push(array);
976 }
977
978 let schema = Arc::new(Schema::new(fields));
979 // provide the count in case there were no needed statistics
980 let mut options = RecordBatchOptions::default();
981 options.row_count = Some(statistics.num_containers());
982
983 trace!(
984 "Creating statistics batch for {:#?} with {:#?}",
985 required_columns,
986 arrays
987 );
988
989 RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
990 plan_datafusion_err!("Can not create statistics record batch: {err}")
991 })
992}
993
994struct PruningExpressionBuilder<'a> {
995 column: phys_expr::Column,
996 column_expr: Arc<dyn PhysicalExpr>,
997 op: Operator,
998 scalar_expr: Arc<dyn PhysicalExpr>,
999 field: &'a Field,
1000 required_columns: &'a mut RequiredColumns,
1001}
1002
1003impl<'a> PruningExpressionBuilder<'a> {
1004 fn try_new(
1005 left: &'a Arc<dyn PhysicalExpr>,
1006 right: &'a Arc<dyn PhysicalExpr>,
1007 op: Operator,
1008 schema: &'a Schema,
1009 required_columns: &'a mut RequiredColumns,
1010 ) -> Result<Self> {
1011 // find column name; input could be a more complicated expression
1012 let left_columns = collect_columns(left);
1013 let right_columns = collect_columns(right);
1014 let (column_expr, scalar_expr, columns, correct_operator) =
1015 match (left_columns.len(), right_columns.len()) {
1016 (1, 0) => (left, right, left_columns, op),
1017 (0, 1) => (right, left, right_columns, reverse_operator(op)?),
1018 _ => {
1019 // if more than one column used in expression - not supported
1020 return plan_err!(
1021 "Multi-column expressions are not currently supported"
1022 );
1023 }
1024 };
1025
1026 let df_schema = DFSchema::try_from(schema.clone())?;
1027 let (column_expr, correct_operator, scalar_expr) = rewrite_expr_to_prunable(
1028 column_expr,
1029 correct_operator,
1030 scalar_expr,
1031 df_schema,
1032 )?;
1033 let column = columns.iter().next().unwrap().clone();
1034 let field = match schema.column_with_name(column.name()) {
1035 Some((_, f)) => f,
1036 _ => {
1037 return plan_err!("Field not found in schema");
1038 }
1039 };
1040
1041 Ok(Self {
1042 column,
1043 column_expr,
1044 op: correct_operator,
1045 scalar_expr,
1046 field,
1047 required_columns,
1048 })
1049 }
1050
1051 fn op(&self) -> Operator {
1052 self.op
1053 }
1054
1055 fn scalar_expr(&self) -> &Arc<dyn PhysicalExpr> {
1056 &self.scalar_expr
1057 }
1058
1059 fn min_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1060 self.required_columns
1061 .min_column_expr(&self.column, &self.column_expr, self.field)
1062 }
1063
1064 fn max_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
1065 self.required_columns
1066 .max_column_expr(&self.column, &self.column_expr, self.field)
1067 }
1068
    /// Returns a `null_count` physical expression for the underlying column,
    /// no matter what the predicate expression is
1071 ///
1072 /// i.e., x > 5 => x_null_count,
1073 /// cast(x as int) < 10 => x_null_count,
1074 /// try_cast(x as float) < 10.0 => x_null_count
1075 fn null_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
        // use a bare reference to the column, not the original (possibly nested) expression
1077 let column_expr = Arc::new(self.column.clone()) as _;
1078
1079 // null_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1080 let null_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1081
1082 self.required_columns.null_count_column_expr(
1083 &self.column,
1084 &column_expr,
1085 null_count_field,
1086 )
1087 }
1088
    /// Returns a `row_count` physical expression for the underlying column,
    /// no matter what the predicate expression is
1091 ///
1092 /// i.e., x > 5 => x_row_count,
1093 /// cast(x as int) < 10 => x_row_count,
1094 /// try_cast(x as float) < 10.0 => x_row_count
1095 fn row_count_column_expr(&mut self) -> Result<Arc<dyn PhysicalExpr>> {
        // use a bare reference to the column, not the original (possibly nested) expression
1097 let column_expr = Arc::new(self.column.clone()) as _;
1098
1099 // row_count is DataType::UInt64, which is different from the column's data type (i.e. self.field)
1100 let row_count_field = &Field::new(self.field.name(), DataType::UInt64, true);
1101
1102 self.required_columns.row_count_column_expr(
1103 &self.column,
1104 &column_expr,
1105 row_count_field,
1106 )
1107 }
1108}
1109
/// Rewrites `column_expr` so that the resulting expression is
/// monotonically increasing in the underlying column, which is required
/// for min/max based pruning to be correct.
1112///
1113/// For example,
1114/// 1. `col > 10`
1115/// 2. `-col > 10` should be rewritten to `col < -10`
1116/// 3. `!col = true` would be rewritten to `col = !true`
/// 4. `abs(a - 10) > 0` is not supported
1118/// 5. `cast(can_prunable_expr) > 10`
1119/// 6. `try_cast(can_prunable_expr) > 10`
1120///
1121/// More rewrite rules are still in progress.
1122fn rewrite_expr_to_prunable(
1123 column_expr: &PhysicalExprRef,
1124 op: Operator,
1125 scalar_expr: &PhysicalExprRef,
1126 schema: DFSchema,
1127) -> Result<(PhysicalExprRef, Operator, PhysicalExprRef)> {
1128 if !is_compare_op(op) {
        return plan_err!("rewrite_expr_to_prunable only supports comparison expressions");
1130 }
1131
1132 let column_expr_any = column_expr.as_any();
1133
1134 if column_expr_any
1135 .downcast_ref::<phys_expr::Column>()
1136 .is_some()
1137 {
1138 // `col op lit()`
1139 Ok((Arc::clone(column_expr), op, Arc::clone(scalar_expr)))
1140 } else if let Some(cast) = column_expr_any.downcast_ref::<phys_expr::CastExpr>() {
1141 // `cast(col) op lit()`
1142 let arrow_schema: SchemaRef = schema.clone().into();
1143 let from_type = cast.expr().data_type(&arrow_schema)?;
1144 verify_support_type_for_prune(&from_type, cast.cast_type())?;
1145 let (left, op, right) =
1146 rewrite_expr_to_prunable(cast.expr(), op, scalar_expr, schema)?;
1147 let left = Arc::new(phys_expr::CastExpr::new(
1148 left,
1149 cast.cast_type().clone(),
1150 None,
1151 ));
1152 Ok((left, op, right))
1153 } else if let Some(try_cast) =
1154 column_expr_any.downcast_ref::<phys_expr::TryCastExpr>()
1155 {
1156 // `try_cast(col) op lit()`
1157 let arrow_schema: SchemaRef = schema.clone().into();
1158 let from_type = try_cast.expr().data_type(&arrow_schema)?;
1159 verify_support_type_for_prune(&from_type, try_cast.cast_type())?;
1160 let (left, op, right) =
1161 rewrite_expr_to_prunable(try_cast.expr(), op, scalar_expr, schema)?;
1162 let left = Arc::new(phys_expr::TryCastExpr::new(
1163 left,
1164 try_cast.cast_type().clone(),
1165 ));
1166 Ok((left, op, right))
1167 } else if let Some(neg) = column_expr_any.downcast_ref::<phys_expr::NegativeExpr>() {
1168 // `-col > lit()` --> `col < -lit()`
1169 let (left, op, right) =
1170 rewrite_expr_to_prunable(neg.arg(), op, scalar_expr, schema)?;
1171 let right = Arc::new(phys_expr::NegativeExpr::new(right));
1172 Ok((left, reverse_operator(op)?, right))
1173 } else if let Some(not) = column_expr_any.downcast_ref::<phys_expr::NotExpr>() {
1174 // `!col = true` --> `col = !true`
1175 if op != Operator::Eq && op != Operator::NotEq {
1176 return plan_err!("Not with operator other than Eq / NotEq is not supported");
1177 }
1178 if not
1179 .arg()
1180 .as_any()
1181 .downcast_ref::<phys_expr::Column>()
1182 .is_some()
1183 {
1184 let left = Arc::clone(not.arg());
1185 let right = Arc::new(phys_expr::NotExpr::new(Arc::clone(scalar_expr)));
1186 Ok((left, reverse_operator(op)?, right))
1187 } else {
1188 plan_err!("Not with complex expression {column_expr:?} is not supported")
1189 }
1190 } else {
1191 plan_err!("column expression {column_expr:?} is not supported")
1192 }
1193}
1194
1195fn is_compare_op(op: Operator) -> bool {
1196 matches!(
1197 op,
1198 Operator::Eq
1199 | Operator::NotEq
1200 | Operator::Lt
1201 | Operator::LtEq
1202 | Operator::Gt
1203 | Operator::GtEq
1204 | Operator::LikeMatch
1205 | Operator::NotLikeMatch
1206 )
1207}
1208
// The pruning logic is based on comparing min/max bounds, so we must make
// sure that casting between the two types preserves ordering. For example,
// casting from string to number is not valid for pruning, because "13" is
// less than "3" in UTF8 comparison order.
1213fn verify_support_type_for_prune(from_type: &DataType, to_type: &DataType) -> Result<()> {
1214 // TODO: support other data type for prunable cast or try cast
1215 if matches!(
1216 from_type,
1217 DataType::Int8
1218 | DataType::Int16
1219 | DataType::Int32
1220 | DataType::Int64
1221 | DataType::Decimal128(_, _)
1222 ) && matches!(
1223 to_type,
1224 DataType::Int8 | DataType::Int32 | DataType::Int64 | DataType::Decimal128(_, _)
1225 ) {
1226 Ok(())
1227 } else {
1228 plan_err!(
1229 "Try Cast/Cast with from type {from_type} to type {to_type} is not supported"
1230 )
1231 }
1232}
1233
1234/// replaces a column with an old name with a new name in an expression
1235fn rewrite_column_expr(
1236 e: Arc<dyn PhysicalExpr>,
1237 column_old: &phys_expr::Column,
1238 column_new: &phys_expr::Column,
1239) -> Result<Arc<dyn PhysicalExpr>> {
1240 e.transform(|expr| {
1241 if let Some(column) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1242 if column == column_old {
1243 return Ok(Transformed::yes(Arc::new(column_new.clone())));
1244 }
1245 }
1246
1247 Ok(Transformed::no(expr))
1248 })
1249 .data()
1250}
1251
1252fn reverse_operator(op: Operator) -> Result<Operator> {
1253 op.swap().ok_or_else(|| {
1254 DataFusionError::Internal(format!(
1255 "Could not reverse operator {op} while building pruning predicate"
1256 ))
1257 })
1258}
1259
/// Given a column reference to `column`, returns a pruning
/// expression in terms of the min and max that will evaluate to true
/// if the column may contain rows that match, and false if it
/// definitely does not
1264fn build_single_column_expr(
1265 column: &phys_expr::Column,
1266 schema: &Schema,
1267 required_columns: &mut RequiredColumns,
1268 is_not: bool, // if true, treat as !col
1269) -> Option<Arc<dyn PhysicalExpr>> {
1270 let field = schema.field_with_name(column.name()).ok()?;
1271
1272 if matches!(field.data_type(), &DataType::Boolean) {
1273 let col_ref = Arc::new(column.clone()) as _;
1274
1275 let min = required_columns
1276 .min_column_expr(column, &col_ref, field)
1277 .ok()?;
1278 let max = required_columns
1279 .max_column_expr(column, &col_ref, field)
1280 .ok()?;
1281
1282 // remember -- we want an expression that is:
1283 // TRUE: if there may be rows that match
1284 // FALSE: if there are no rows that match
1285 if is_not {
1286 // The only way we know a column couldn't match is if both the min and max are true
1287 // !(min && max)
1288 Some(Arc::new(phys_expr::NotExpr::new(Arc::new(
1289 phys_expr::BinaryExpr::new(min, Operator::And, max),
1290 ))))
1291 } else {
1292 // the only way we know a column couldn't match is if both the min and max are false
1293 // !(!min && !max) --> min || max
1294 Some(Arc::new(phys_expr::BinaryExpr::new(min, Operator::Or, max)))
1295 }
1296 } else {
1297 None
1298 }
1299}
1300
1301/// Given an expression reference to `expr`, if `expr` is a column expression,
1302/// returns a pruning expression in terms of IsNull that will evaluate to true
1303/// if the column may contain null, and false if definitely does not
1304/// contain null.
/// If `with_not` is true, builds a pruning expression for `col IS NOT NULL`:
/// `col_null_count != col_row_count`. This evaluates to false ONLY if the
/// column is known to contain NULLs exclusively (the null count equals the
/// row count), in which case `IS NOT NULL` can never be true and the
/// container can be pruned.
1309fn build_is_null_column_expr(
1310 expr: &Arc<dyn PhysicalExpr>,
1311 schema: &Schema,
1312 required_columns: &mut RequiredColumns,
1313 with_not: bool,
1314) -> Option<Arc<dyn PhysicalExpr>> {
1315 if let Some(col) = expr.as_any().downcast_ref::<phys_expr::Column>() {
1316 let field = schema.field_with_name(col.name()).ok()?;
1317
1318 let null_count_field = &Field::new(field.name(), DataType::UInt64, true);
1319 if with_not {
1320 if let Ok(row_count_expr) =
1321 required_columns.row_count_column_expr(col, expr, null_count_field)
1322 {
1323 required_columns
1324 .null_count_column_expr(col, expr, null_count_field)
1325 .map(|null_count_column_expr| {
1326 // IsNotNull(column) => null_count != row_count
1327 Arc::new(phys_expr::BinaryExpr::new(
1328 null_count_column_expr,
1329 Operator::NotEq,
1330 row_count_expr,
1331 )) as _
1332 })
1333 .ok()
1334 } else {
1335 None
1336 }
1337 } else {
1338 required_columns
1339 .null_count_column_expr(col, expr, null_count_field)
1340 .map(|null_count_column_expr| {
1341 // IsNull(column) => null_count > 0
1342 Arc::new(phys_expr::BinaryExpr::new(
1343 null_count_column_expr,
1344 Operator::Gt,
1345 Arc::new(phys_expr::Literal::new(ScalarValue::UInt64(Some(0)))),
1346 )) as _
1347 })
1348 .ok()
1349 }
1350 } else {
1351 None
1352 }
1353}
1354
1355/// The maximum number of entries in an `InList` that might be rewritten into
1356/// an OR chain
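///
/// For example, `x IN (1, 2)` is rewritten to `x = 1 OR x = 2` (and
/// `x NOT IN (1, 2)` to `x != 1 AND x != 2`) before the min/max rewrite is
/// applied to each clause.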
1357const MAX_LIST_VALUE_SIZE_REWRITE: usize = 20;
1358
1359/// Rewrite a predicate expression in terms of statistics (min/max/null_counts)
1360/// for use as a [`PruningPredicate`].
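///
/// A sketch of standalone use, assuming `expr` is the physical expression
/// `x = 5` and `schema` describes a table with a column `x`:
///
/// ```ignore
/// let rewriter = PredicateRewriter::new();
/// let stats_expr = rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema);
/// // `stats_expr` now references statistics columns, roughly:
/// //   x_null_count != x_row_count AND (x_min <= 5 AND 5 <= x_max)
/// ```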
1361pub struct PredicateRewriter {
1362 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1363}
1364
1365impl Default for PredicateRewriter {
1366 fn default() -> Self {
1367 Self {
1368 unhandled_hook: Arc::new(ConstantUnhandledPredicateHook::default()),
1369 }
1370 }
1371}
1372
1373impl PredicateRewriter {
1374 /// Create a new `PredicateRewriter`
1375 pub fn new() -> Self {
1376 Self::default()
1377 }
1378
1379 /// Set the unhandled hook to be used when a predicate can not be rewritten
1380 pub fn with_unhandled_hook(
1381 self,
1382 unhandled_hook: Arc<dyn UnhandledPredicateHook>,
1383 ) -> Self {
1384 Self { unhandled_hook }
1385 }
1386
1387 /// Translate logical filter expression into pruning predicate
1388 /// expression that will evaluate to FALSE if it can be determined no
1389 /// rows between the min/max values could pass the predicates.
1390 ///
1391 /// Any predicates that can not be translated will be passed to `unhandled_hook`.
1392 ///
1393 /// Returns the pruning predicate as an [`PhysicalExpr`]
1394 ///
/// Note: an [`phys_expr::InListExpr`] with more than 20 values (`MAX_LIST_VALUE_SIZE_REWRITE`) is not rewritten and falls back to `unhandled_hook`
1396 pub fn rewrite_predicate_to_statistics_predicate(
1397 &self,
1398 expr: &Arc<dyn PhysicalExpr>,
1399 schema: &Schema,
1400 ) -> Arc<dyn PhysicalExpr> {
1401 let mut required_columns = RequiredColumns::new();
1402 build_predicate_expression(
1403 expr,
1404 schema,
1405 &mut required_columns,
1406 &self.unhandled_hook,
1407 )
1408 }
1409}
1410
1411/// Translate logical filter expression into pruning predicate
1412/// expression that will evaluate to FALSE if it can be determined no
1413/// rows between the min/max values could pass the predicates.
1414///
1415/// Any predicates that can not be translated will be passed to `unhandled_hook`.
1416///
1417/// Returns the pruning predicate as an [`PhysicalExpr`]
1418///
/// Note: an [`phys_expr::InListExpr`] with more than 20 values (`MAX_LIST_VALUE_SIZE_REWRITE`) is not rewritten and falls back to `unhandled_hook`
1420fn build_predicate_expression(
1421 expr: &Arc<dyn PhysicalExpr>,
1422 schema: &Schema,
1423 required_columns: &mut RequiredColumns,
1424 unhandled_hook: &Arc<dyn UnhandledPredicateHook>,
1425) -> Arc<dyn PhysicalExpr> {
1426 // predicate expression can only be a binary expression
1427 let expr_any = expr.as_any();
1428 if let Some(is_null) = expr_any.downcast_ref::<phys_expr::IsNullExpr>() {
1429 return build_is_null_column_expr(is_null.arg(), schema, required_columns, false)
1430 .unwrap_or_else(|| unhandled_hook.handle(expr));
1431 }
1432 if let Some(is_not_null) = expr_any.downcast_ref::<phys_expr::IsNotNullExpr>() {
1433 return build_is_null_column_expr(
1434 is_not_null.arg(),
1435 schema,
1436 required_columns,
1437 true,
1438 )
1439 .unwrap_or_else(|| unhandled_hook.handle(expr));
1440 }
1441 if let Some(col) = expr_any.downcast_ref::<phys_expr::Column>() {
1442 return build_single_column_expr(col, schema, required_columns, false)
1443 .unwrap_or_else(|| unhandled_hook.handle(expr));
1444 }
1445 if let Some(not) = expr_any.downcast_ref::<phys_expr::NotExpr>() {
1446 // match !col (don't do so recursively)
1447 if let Some(col) = not.arg().as_any().downcast_ref::<phys_expr::Column>() {
1448 return build_single_column_expr(col, schema, required_columns, true)
1449 .unwrap_or_else(|| unhandled_hook.handle(expr));
1450 } else {
1451 return unhandled_hook.handle(expr);
1452 }
1453 }
1454 if let Some(in_list) = expr_any.downcast_ref::<phys_expr::InListExpr>() {
1455 if !in_list.list().is_empty()
1456 && in_list.list().len() <= MAX_LIST_VALUE_SIZE_REWRITE
1457 {
1458 let eq_op = if in_list.negated() {
1459 Operator::NotEq
1460 } else {
1461 Operator::Eq
1462 };
1463 let re_op = if in_list.negated() {
1464 Operator::And
1465 } else {
1466 Operator::Or
1467 };
1468 let change_expr = in_list
1469 .list()
1470 .iter()
1471 .map(|e| {
1472 Arc::new(phys_expr::BinaryExpr::new(
1473 Arc::clone(in_list.expr()),
1474 eq_op,
1475 Arc::clone(e),
1476 )) as _
1477 })
1478 .reduce(|a, b| Arc::new(phys_expr::BinaryExpr::new(a, re_op, b)) as _)
1479 .unwrap();
1480 return build_predicate_expression(
1481 &change_expr,
1482 schema,
1483 required_columns,
1484 unhandled_hook,
1485 );
1486 } else {
1487 return unhandled_hook.handle(expr);
1488 }
1489 }
1490
1491 let (left, op, right) = {
1492 if let Some(bin_expr) = expr_any.downcast_ref::<phys_expr::BinaryExpr>() {
1493 (
1494 Arc::clone(bin_expr.left()),
1495 *bin_expr.op(),
1496 Arc::clone(bin_expr.right()),
1497 )
1498 } else if let Some(like_expr) = expr_any.downcast_ref::<phys_expr::LikeExpr>() {
1499 if like_expr.case_insensitive() {
1500 return unhandled_hook.handle(expr);
1501 }
1502 let op = match (like_expr.negated(), like_expr.case_insensitive()) {
1503 (false, false) => Operator::LikeMatch,
1504 (true, false) => Operator::NotLikeMatch,
1505 (false, true) => Operator::ILikeMatch,
1506 (true, true) => Operator::NotILikeMatch,
1507 };
1508 (
1509 Arc::clone(like_expr.expr()),
1510 op,
1511 Arc::clone(like_expr.pattern()),
1512 )
1513 } else {
1514 return unhandled_hook.handle(expr);
1515 }
1516 };
1517
1518 if op == Operator::And || op == Operator::Or {
1519 let left_expr =
1520 build_predicate_expression(&left, schema, required_columns, unhandled_hook);
1521 let right_expr =
1522 build_predicate_expression(&right, schema, required_columns, unhandled_hook);
1523 // simplify boolean expression if applicable
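        // (with the default unhandled hook an untranslatable child becomes the
        // literal `true`, so `true AND x` => `x` and `x OR true` => `true`)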
1524 let expr = match (&left_expr, op, &right_expr) {
1525 (left, Operator::And, _) if is_always_true(left) => right_expr,
1526 (_, Operator::And, right) if is_always_true(right) => left_expr,
1527 (left, Operator::Or, right)
1528 if is_always_true(left) || is_always_true(right) =>
1529 {
1530 Arc::new(phys_expr::Literal::new(ScalarValue::Boolean(Some(true))))
1531 }
1532 _ => Arc::new(phys_expr::BinaryExpr::new(left_expr, op, right_expr)),
1533 };
1534 return expr;
1535 }
1536
1537 let expr_builder =
1538 PruningExpressionBuilder::try_new(&left, &right, op, schema, required_columns);
1539 let mut expr_builder = match expr_builder {
1540 Ok(builder) => builder,
1541 // allow partial failure in predicate expression generation
1542 // this can still produce a useful predicate when multiple conditions are joined using AND
1543 Err(_) => return unhandled_hook.handle(expr),
1544 };
1545
1546 build_statistics_expr(&mut expr_builder)
1547 .unwrap_or_else(|_| unhandled_hook.handle(expr))
1548}
1549
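/// Builds the statistics expression for a single comparison between a column
/// and a scalar according to the operator-specific rules below, then wraps the
/// result with [`wrap_null_count_check_expr`]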
1550fn build_statistics_expr(
1551 expr_builder: &mut PruningExpressionBuilder,
1552) -> Result<Arc<dyn PhysicalExpr>> {
1553 let statistics_expr: Arc<dyn PhysicalExpr> = match expr_builder.op() {
1554 Operator::NotEq => {
            // column != literal =>
            //   prune only if all rows equal the literal, i.e. min = literal = max =>
            //   keep if !(min = literal && max = literal) =>
            //   min != literal || literal != max
1558 let min_column_expr = expr_builder.min_column_expr()?;
1559 let max_column_expr = expr_builder.max_column_expr()?;
1560 Arc::new(phys_expr::BinaryExpr::new(
1561 Arc::new(phys_expr::BinaryExpr::new(
1562 min_column_expr,
1563 Operator::NotEq,
1564 Arc::clone(expr_builder.scalar_expr()),
1565 )),
1566 Operator::Or,
1567 Arc::new(phys_expr::BinaryExpr::new(
1568 Arc::clone(expr_builder.scalar_expr()),
1569 Operator::NotEq,
1570 max_column_expr,
1571 )),
1572 ))
1573 }
1574 Operator::Eq => {
1575 // column = literal => (min, max) = literal => min <= literal && literal <= max
1576 // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2)
1577 let min_column_expr = expr_builder.min_column_expr()?;
1578 let max_column_expr = expr_builder.max_column_expr()?;
1579 Arc::new(phys_expr::BinaryExpr::new(
1580 Arc::new(phys_expr::BinaryExpr::new(
1581 min_column_expr,
1582 Operator::LtEq,
1583 Arc::clone(expr_builder.scalar_expr()),
1584 )),
1585 Operator::And,
1586 Arc::new(phys_expr::BinaryExpr::new(
1587 Arc::clone(expr_builder.scalar_expr()),
1588 Operator::LtEq,
1589 max_column_expr,
1590 )),
1591 ))
1592 }
1593 Operator::NotLikeMatch => build_not_like_match(expr_builder)?,
1594 Operator::LikeMatch => build_like_match(expr_builder).ok_or_else(|| {
1595 plan_datafusion_err!(
1596 "LIKE expression with wildcard at the beginning is not supported"
1597 )
1598 })?,
1599 Operator::Gt => {
1600 // column > literal => (min, max) > literal => max > literal
1601 Arc::new(phys_expr::BinaryExpr::new(
1602 expr_builder.max_column_expr()?,
1603 Operator::Gt,
1604 Arc::clone(expr_builder.scalar_expr()),
1605 ))
1606 }
1607 Operator::GtEq => {
1608 // column >= literal => (min, max) >= literal => max >= literal
1609 Arc::new(phys_expr::BinaryExpr::new(
1610 expr_builder.max_column_expr()?,
1611 Operator::GtEq,
1612 Arc::clone(expr_builder.scalar_expr()),
1613 ))
1614 }
1615 Operator::Lt => {
1616 // column < literal => (min, max) < literal => min < literal
1617 Arc::new(phys_expr::BinaryExpr::new(
1618 expr_builder.min_column_expr()?,
1619 Operator::Lt,
1620 Arc::clone(expr_builder.scalar_expr()),
1621 ))
1622 }
1623 Operator::LtEq => {
1624 // column <= literal => (min, max) <= literal => min <= literal
1625 Arc::new(phys_expr::BinaryExpr::new(
1626 expr_builder.min_column_expr()?,
1627 Operator::LtEq,
1628 Arc::clone(expr_builder.scalar_expr()),
1629 ))
1630 }
1631 // other expressions are not supported
1632 _ => {
1633 return plan_err!(
1634 "expressions other than (neq, eq, gt, gteq, lt, lteq) are not supported"
1635 );
1636 }
1637 };
1638 let statistics_expr = wrap_null_count_check_expr(statistics_expr, expr_builder)?;
1639 Ok(statistics_expr)
1640}
1641
/// Returns the string literal of the scalar value if it is a string
1643fn unpack_string(s: &ScalarValue) -> Option<&str> {
1644 s.try_as_str().flatten()
1645}
1646
1647fn extract_string_literal(expr: &Arc<dyn PhysicalExpr>) -> Option<&str> {
1648 if let Some(lit) = expr.as_any().downcast_ref::<phys_expr::Literal>() {
1649 let s = unpack_string(lit.value())?;
1650 return Some(s);
1651 }
1652 None
1653}
1654
/// Converts `column LIKE pattern`, where `P` is the constant prefix of the
/// pattern, to a range check on the column: `P <= column && column < P'`, where
/// `P'` is the smallest string greater than all strings with the prefix `P`
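///
/// For example, `column LIKE 'foo%'` keeps a container when
/// `'foo' <= column_max && column_min <= increment_utf8("foo")` (i.e. `'fop'`)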
1658fn build_like_match(
1659 expr_builder: &mut PruningExpressionBuilder,
1660) -> Option<Arc<dyn PhysicalExpr>> {
    // column LIKE literal => prune using the constant prefix of the literal:
    //   keep if prefix <= max && min <= increment(prefix)
    // column LIKE 'foo%'  => 'foo' <= max && min <= increment('foo')
    // column LIKE '%foo'  => no constant prefix => cannot prune (return None)
    // column LIKE 'foo'   => exact match: 'foo' <= max && min <= 'foo'
1666
1667 // TODO Handle ILIKE perhaps by making the min lowercase and max uppercase
1668 // this may involve building the physical expressions that call lower() and upper()
1669 let min_column_expr = expr_builder.min_column_expr().ok()?;
1670 let max_column_expr = expr_builder.max_column_expr().ok()?;
1671 let scalar_expr = expr_builder.scalar_expr();
1672 // check that the scalar is a string literal
1673 let s = extract_string_literal(scalar_expr)?;
1674 // ANSI SQL specifies two wildcards: % and _. % matches zero or more characters, _ matches exactly one character.
1675 let first_wildcard_index = s.find(['%', '_']);
1676 if first_wildcard_index == Some(0) {
        // the pattern starts with a wildcard, so there is no filtering we could
        // possibly do; return None and have this be handled by the unhandled hook
1678 return None;
1679 }
1680 let (lower_bound, upper_bound) = if let Some(wildcard_index) = first_wildcard_index {
1681 let prefix = &s[..wildcard_index];
1682 let lower_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1683 prefix.to_string(),
1684 ))));
1685 let upper_bound_lit = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1686 increment_utf8(prefix)?,
1687 ))));
1688 (lower_bound_lit, upper_bound_lit)
1689 } else {
1690 // the like expression is a literal and can be converted into a comparison
1691 let bound = Arc::new(phys_expr::Literal::new(ScalarValue::Utf8(Some(
1692 s.to_string(),
1693 ))));
1694 (Arc::clone(&bound), bound)
1695 };
1696 let lower_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1697 lower_bound,
1698 Operator::LtEq,
1699 Arc::clone(&max_column_expr),
1700 ));
1701 let upper_bound_expr = Arc::new(phys_expr::BinaryExpr::new(
1702 Arc::clone(&min_column_expr),
1703 Operator::LtEq,
1704 upper_bound,
1705 ));
1706 let combined = Arc::new(phys_expr::BinaryExpr::new(
1707 upper_bound_expr,
1708 Operator::And,
1709 lower_bound_expr,
1710 ));
1711 Some(combined)
1712}
1713
// For predicate `col NOT LIKE 'const_prefix%'`, we rewrite it as `(col_min NOT LIKE 'const_prefix%' OR col_max NOT LIKE 'const_prefix%')`.
//
// The intuition is that if both `col_min` and `col_max` begin with `const_prefix`, then
// **all** data in this row group begins with `const_prefix` as well (and therefore the
// predicate looking for rows that don't begin with `const_prefix` can never be true).
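//
// For example, with min = "apple" and max = "apply", both values match
// LIKE 'appl%', so every row in the group starts with "appl" and
// `col NOT LIKE 'appl%'` selects nothing: the container can be pruned.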
1719fn build_not_like_match(
1720 expr_builder: &mut PruningExpressionBuilder<'_>,
1721) -> Result<Arc<dyn PhysicalExpr>> {
1722 // col NOT LIKE 'const_prefix%' -> !(col_min LIKE 'const_prefix%' && col_max LIKE 'const_prefix%') -> (col_min NOT LIKE 'const_prefix%' || col_max NOT LIKE 'const_prefix%')
1723
1724 let min_column_expr = expr_builder.min_column_expr()?;
1725 let max_column_expr = expr_builder.max_column_expr()?;
1726
1727 let scalar_expr = expr_builder.scalar_expr();
1728
1729 let pattern = extract_string_literal(scalar_expr).ok_or_else(|| {
1730 plan_datafusion_err!("cannot extract literal from NOT LIKE expression")
1731 })?;
1732
1733 let (const_prefix, remaining) = split_constant_prefix(pattern);
1734 if const_prefix.is_empty() || remaining != "%" {
        // We cannot handle `%` at the beginning or in the middle of the pattern.
        // Example: For pattern "foo%bar", the row group might include values like
        // ["foobar", "food", "foodbar"], making it unsafe to prune.
        // Even if the min/max values in the group (e.g., "foobar" and "foodbar")
        // match the pattern, intermediate values like "food" may not
        // match the full pattern "foo%bar", making pruning unsafe.
        // (Truncating "foo%bar" to "foo%" has the same problem.)

        // We also cannot handle patterns containing `_`.
        // Example: For pattern "foo_", row groups might contain ["fooa", "fooaa", "foob"],
        // which means not every row is guaranteed to match the pattern.
        return Err(plan_datafusion_err!(
            "NOT LIKE expressions only support a constant prefix followed by a trailing `%` wildcard"
        ));
1749 }
1750
    let min_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1752 true,
1753 false,
1754 Arc::clone(&min_column_expr),
1755 Arc::clone(scalar_expr),
1756 ));
1757
1758 let max_col_not_like_expr = Arc::new(phys_expr::LikeExpr::new(
1759 true,
1760 false,
1761 Arc::clone(&max_column_expr),
1762 Arc::clone(scalar_expr),
1763 ));
1764
1765 Ok(Arc::new(phys_expr::BinaryExpr::new(
        min_col_not_like_expr,
1767 Operator::Or,
1768 max_col_not_like_expr,
1769 )))
1770}
1771
/// Splits a LIKE pattern into the leading constant prefix, i.e. everything up
/// to the first unescaped wildcard (possibly empty), and the remaining pattern
/// (possibly empty)
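///
/// For example, `"foo%"` splits into `("foo", "%")`, `"fo\%o%"` into
/// `("fo\%o", "%")` (the escaped `%` stays in the prefix), and `"foo"` into
/// `("foo", "")`.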
1773fn split_constant_prefix(pattern: &str) -> (&str, &str) {
1774 let char_indices = pattern.char_indices().collect::<Vec<_>>();
1775 for i in 0..char_indices.len() {
1776 let (idx, char) = char_indices[i];
1777 if char == '%' || char == '_' {
1778 if i != 0 && char_indices[i - 1].1 == '\\' {
                // escaped by `\`
1780 continue;
1781 }
1782 return (&pattern[..idx], &pattern[idx..]);
1783 }
1784 }
1785 (pattern, "")
1786}
1787
1788/// Increment a UTF8 string by one, returning `None` if it can't be incremented.
1789/// This makes it so that the returned string will always compare greater than the input string
1790/// or any other string with the same prefix.
1791/// This is necessary since the statistics may have been truncated: if we have a min statistic
1792/// of "fo" that may have originally been "foz" or anything else with the prefix "fo".
/// E.g. `increment_utf8("foo") > "foo"` and `increment_utf8("foo") > "fooz"`.
/// In this example `increment_utf8("foo") == "fop"`.
1795fn increment_utf8(data: &str) -> Option<String> {
1796 // Helper function to check if a character is valid to use
1797 fn is_valid_unicode(c: char) -> bool {
1798 let cp = c as u32;
1799
1800 // Filter out non-characters (https://www.unicode.org/versions/corrigendum9.html)
1801 if [0xFFFE, 0xFFFF].contains(&cp) || (0xFDD0..=0xFDEF).contains(&cp) {
1802 return false;
1803 }
1804
        // Filter out code points beyond the valid Unicode range (> U+10FFFF)
1806 if cp >= 0x110000 {
1807 return false;
1808 }
1809
1810 true
1811 }
1812
1813 // Convert string to vector of code points
1814 let mut code_points: Vec<char> = data.chars().collect();
1815
1816 // Work backwards through code points
1817 for idx in (0..code_points.len()).rev() {
1818 let original = code_points[idx] as u32;
1819
1820 // Try incrementing the code point
1821 if let Some(next_char) = char::from_u32(original + 1) {
1822 if is_valid_unicode(next_char) {
1823 code_points[idx] = next_char;
1824 // truncate the string to the current index
1825 code_points.truncate(idx + 1);
1826 return Some(code_points.into_iter().collect());
1827 }
1828 }
1829 }
1830
1831 None
1832}
1833
1834/// Wrap the statistics expression in a check that skips the expression if the column is all nulls.
1835///
1836/// This is important not only as an optimization but also because statistics may not be
1837/// accurate for columns that are all nulls.
1838/// For example, for an `int` column `x` with all nulls, the min/max/null_count statistics
1839/// might be set to 0 and evaluating `x = 0` would incorrectly include the column.
1840///
1841/// For example:
1842///
1843/// `x_min <= 10 AND 10 <= x_max`
1844///
1845/// will become
1846///
1847/// ```sql
1848/// x_null_count != x_row_count AND (x_min <= 10 AND 10 <= x_max)
/// ```
1850///
1851/// If the column is known to be all nulls, then the expression
1852/// `x_null_count = x_row_count` will be true, which will cause the
1853/// boolean expression to return false. Therefore, prune out the container.
1854fn wrap_null_count_check_expr(
1855 statistics_expr: Arc<dyn PhysicalExpr>,
1856 expr_builder: &mut PruningExpressionBuilder,
1857) -> Result<Arc<dyn PhysicalExpr>> {
1858 // x_null_count != x_row_count
1859 let not_when_null_count_eq_row_count = Arc::new(phys_expr::BinaryExpr::new(
1860 expr_builder.null_count_column_expr()?,
1861 Operator::NotEq,
1862 expr_builder.row_count_column_expr()?,
1863 ));
1864
1865 // (x_null_count != x_row_count) AND (<statistics_expr>)
1866 Ok(Arc::new(phys_expr::BinaryExpr::new(
1867 not_when_null_count_eq_row_count,
1868 Operator::And,
1869 statistics_expr,
1870 )))
1871}
1872
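/// The type of statistic that a column in the statistics `RecordBatch` refers
/// to (e.g. `x_min` is the `Min` statistic for column `x`)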
1873#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1874pub(crate) enum StatisticsType {
1875 Min,
1876 Max,
1877 NullCount,
1878 RowCount,
1879}
1880
1881#[cfg(test)]
1882mod tests {
1883 use std::collections::HashMap;
1884 use std::ops::{Not, Rem};
1885
1886 use super::*;
1887 use datafusion_common::assert_batches_eq;
1888 use datafusion_expr::{col, lit};
1889
1890 use arrow::array::Decimal128Array;
1891 use arrow::{
1892 array::{BinaryArray, Int32Array, Int64Array, StringArray, UInt64Array},
1893 datatypes::TimeUnit,
1894 };
1895 use datafusion_expr::expr::InList;
1896 use datafusion_expr::{cast, is_null, try_cast, Expr};
1897 use datafusion_functions_nested::expr_fn::{array_has, make_array};
1898 use datafusion_physical_expr::expressions as phys_expr;
1899 use datafusion_physical_expr::planner::logical2physical;
1900
    /// Mock statistics provider for tests
    ///
    /// Each row represents the statistics for a "container" (which
    /// might represent an entire parquet file, or directory of files,
    /// or some other collection of data for which we had statistics)
    ///
    /// Note: all `ArrayRef`s must be the same size.
    #[derive(Debug, Default)]
1909 struct ContainerStats {
1910 min: Option<ArrayRef>,
1911 max: Option<ArrayRef>,
1912 /// Optional values
1913 null_counts: Option<ArrayRef>,
1914 row_counts: Option<ArrayRef>,
1915 /// Optional known values (e.g. mimic a bloom filter)
1916 /// (value, contained)
1917 /// If present, all BooleanArrays must be the same size as min/max
1918 contained: Vec<(HashSet<ScalarValue>, BooleanArray)>,
1919 }
1920
1921 impl ContainerStats {
1922 fn new() -> Self {
1923 Default::default()
1924 }
1925 fn new_decimal128(
1926 min: impl IntoIterator<Item = Option<i128>>,
1927 max: impl IntoIterator<Item = Option<i128>>,
1928 precision: u8,
1929 scale: i8,
1930 ) -> Self {
1931 Self::new()
1932 .with_min(Arc::new(
1933 min.into_iter()
1934 .collect::<Decimal128Array>()
1935 .with_precision_and_scale(precision, scale)
1936 .unwrap(),
1937 ))
1938 .with_max(Arc::new(
1939 max.into_iter()
1940 .collect::<Decimal128Array>()
1941 .with_precision_and_scale(precision, scale)
1942 .unwrap(),
1943 ))
1944 }
1945
1946 fn new_i64(
1947 min: impl IntoIterator<Item = Option<i64>>,
1948 max: impl IntoIterator<Item = Option<i64>>,
1949 ) -> Self {
1950 Self::new()
1951 .with_min(Arc::new(min.into_iter().collect::<Int64Array>()))
1952 .with_max(Arc::new(max.into_iter().collect::<Int64Array>()))
1953 }
1954
1955 fn new_i32(
1956 min: impl IntoIterator<Item = Option<i32>>,
1957 max: impl IntoIterator<Item = Option<i32>>,
1958 ) -> Self {
1959 Self::new()
1960 .with_min(Arc::new(min.into_iter().collect::<Int32Array>()))
1961 .with_max(Arc::new(max.into_iter().collect::<Int32Array>()))
1962 }
1963
1964 fn new_utf8<'a>(
1965 min: impl IntoIterator<Item = Option<&'a str>>,
1966 max: impl IntoIterator<Item = Option<&'a str>>,
1967 ) -> Self {
1968 Self::new()
1969 .with_min(Arc::new(min.into_iter().collect::<StringArray>()))
1970 .with_max(Arc::new(max.into_iter().collect::<StringArray>()))
1971 }
1972
1973 fn new_bool(
1974 min: impl IntoIterator<Item = Option<bool>>,
1975 max: impl IntoIterator<Item = Option<bool>>,
1976 ) -> Self {
1977 Self::new()
1978 .with_min(Arc::new(min.into_iter().collect::<BooleanArray>()))
1979 .with_max(Arc::new(max.into_iter().collect::<BooleanArray>()))
1980 }
1981
1982 fn min(&self) -> Option<ArrayRef> {
1983 self.min.clone()
1984 }
1985
1986 fn max(&self) -> Option<ArrayRef> {
1987 self.max.clone()
1988 }
1989
1990 fn null_counts(&self) -> Option<ArrayRef> {
1991 self.null_counts.clone()
1992 }
1993
1994 fn row_counts(&self) -> Option<ArrayRef> {
1995 self.row_counts.clone()
1996 }
1997
1998 /// return an iterator over all arrays in this statistics
1999 fn arrays(&self) -> Vec<ArrayRef> {
2000 let contained_arrays = self
2001 .contained
2002 .iter()
2003 .map(|(_values, contained)| Arc::new(contained.clone()) as ArrayRef);
2004
2005 [
2006 self.min.as_ref().cloned(),
2007 self.max.as_ref().cloned(),
2008 self.null_counts.as_ref().cloned(),
2009 self.row_counts.as_ref().cloned(),
2010 ]
2011 .into_iter()
2012 .flatten()
2013 .chain(contained_arrays)
2014 .collect()
2015 }
2016
        /// Returns the number of containers represented by these statistics. This
        /// picks the length of the first array, as all arrays must have the same
        /// length (which is verified by `assert_invariants`).
        fn len(&self) -> usize {
            // pick the length of the first array, or 0 if there are none
2022 self.arrays().iter().map(|a| a.len()).next().unwrap_or(0)
2023 }
2024
2025 /// Ensure that the lengths of all arrays are consistent
2026 fn assert_invariants(&self) {
2027 let mut prev_len = None;
2028
2029 for len in self.arrays().iter().map(|a| a.len()) {
2030 // Get a length, if we don't already have one
2031 match prev_len {
2032 None => {
2033 prev_len = Some(len);
2034 }
2035 Some(prev_len) => {
2036 assert_eq!(prev_len, len);
2037 }
2038 }
2039 }
2040 }
2041
2042 /// Add min values
2043 fn with_min(mut self, min: ArrayRef) -> Self {
2044 self.min = Some(min);
2045 self
2046 }
2047
2048 /// Add max values
2049 fn with_max(mut self, max: ArrayRef) -> Self {
2050 self.max = Some(max);
2051 self
2052 }
2053
2054 /// Add null counts. There must be the same number of null counts as
2055 /// there are containers
2056 fn with_null_counts(
2057 mut self,
2058 counts: impl IntoIterator<Item = Option<u64>>,
2059 ) -> Self {
2060 let null_counts: ArrayRef =
2061 Arc::new(counts.into_iter().collect::<UInt64Array>());
2062
2063 self.assert_invariants();
2064 self.null_counts = Some(null_counts);
2065 self
2066 }
2067
2068 /// Add row counts. There must be the same number of row counts as
2069 /// there are containers
2070 fn with_row_counts(
2071 mut self,
2072 counts: impl IntoIterator<Item = Option<u64>>,
2073 ) -> Self {
2074 let row_counts: ArrayRef =
2075 Arc::new(counts.into_iter().collect::<UInt64Array>());
2076
2077 self.assert_invariants();
2078 self.row_counts = Some(row_counts);
2079 self
2080 }
2081
2082 /// Add contained information.
2083 pub fn with_contained(
2084 mut self,
2085 values: impl IntoIterator<Item = ScalarValue>,
2086 contained: impl IntoIterator<Item = Option<bool>>,
2087 ) -> Self {
2088 let contained: BooleanArray = contained.into_iter().collect();
2089 let values: HashSet<_> = values.into_iter().collect();
2090
2091 self.contained.push((values, contained));
2092 self.assert_invariants();
2093 self
2094 }
2095
2096 /// get any contained information for the specified values
2097 fn contained(&self, find_values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
2098 // find the one with the matching values
2099 self.contained
2100 .iter()
2101 .find(|(values, _contained)| values == find_values)
2102 .map(|(_values, contained)| contained.clone())
2103 }
2104 }
2105
2106 #[derive(Debug, Default)]
2107 struct TestStatistics {
2108 // key: column name
2109 stats: HashMap<Column, ContainerStats>,
2110 }
2111
2112 impl TestStatistics {
2113 fn new() -> Self {
2114 Self::default()
2115 }
2116
2117 fn with(
2118 mut self,
2119 name: impl Into<String>,
2120 container_stats: ContainerStats,
2121 ) -> Self {
2122 let col = Column::from_name(name.into());
2123 self.stats.insert(col, container_stats);
2124 self
2125 }
2126
2127 /// Add null counts for the specified column.
2128 /// There must be the same number of null counts as
2129 /// there are containers
2130 fn with_null_counts(
2131 mut self,
2132 name: impl Into<String>,
2133 counts: impl IntoIterator<Item = Option<u64>>,
2134 ) -> Self {
2135 let col = Column::from_name(name.into());
2136
2137 // take stats out and update them
2138 let container_stats = self
2139 .stats
2140 .remove(&col)
2141 .unwrap_or_default()
2142 .with_null_counts(counts);
2143
2144 // put stats back in
2145 self.stats.insert(col, container_stats);
2146 self
2147 }
2148
2149 /// Add row counts for the specified column.
2150 /// There must be the same number of row counts as
2151 /// there are containers
2152 fn with_row_counts(
2153 mut self,
2154 name: impl Into<String>,
2155 counts: impl IntoIterator<Item = Option<u64>>,
2156 ) -> Self {
2157 let col = Column::from_name(name.into());
2158
2159 // take stats out and update them
2160 let container_stats = self
2161 .stats
2162 .remove(&col)
2163 .unwrap_or_default()
2164 .with_row_counts(counts);
2165
2166 // put stats back in
2167 self.stats.insert(col, container_stats);
2168 self
2169 }
2170
2171 /// Add contained information for the specified column.
2172 fn with_contained(
2173 mut self,
2174 name: impl Into<String>,
2175 values: impl IntoIterator<Item = ScalarValue>,
2176 contained: impl IntoIterator<Item = Option<bool>>,
2177 ) -> Self {
2178 let col = Column::from_name(name.into());
2179
2180 // take stats out and update them
2181 let container_stats = self
2182 .stats
2183 .remove(&col)
2184 .unwrap_or_default()
2185 .with_contained(values, contained);
2186
2187 // put stats back in
2188 self.stats.insert(col, container_stats);
2189 self
2190 }
2191 }
2192
2193 impl PruningStatistics for TestStatistics {
2194 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
2195 self.stats
2196 .get(column)
2197 .map(|container_stats| container_stats.min())
2198 .unwrap_or(None)
2199 }
2200
2201 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
2202 self.stats
2203 .get(column)
2204 .map(|container_stats| container_stats.max())
2205 .unwrap_or(None)
2206 }
2207
2208 fn num_containers(&self) -> usize {
2209 self.stats
2210 .values()
2211 .next()
2212 .map(|container_stats| container_stats.len())
2213 .unwrap_or(0)
2214 }
2215
2216 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
2217 self.stats
2218 .get(column)
2219 .map(|container_stats| container_stats.null_counts())
2220 .unwrap_or(None)
2221 }
2222
2223 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
2224 self.stats
2225 .get(column)
2226 .map(|container_stats| container_stats.row_counts())
2227 .unwrap_or(None)
2228 }
2229
2230 fn contained(
2231 &self,
2232 column: &Column,
2233 values: &HashSet<ScalarValue>,
2234 ) -> Option<BooleanArray> {
2235 self.stats
2236 .get(column)
2237 .and_then(|container_stats| container_stats.contained(values))
2238 }
2239 }
2240
2241 /// Returns the specified min/max container values
2242 struct OneContainerStats {
2243 min_values: Option<ArrayRef>,
2244 max_values: Option<ArrayRef>,
2245 num_containers: usize,
2246 }
2247
2248 impl PruningStatistics for OneContainerStats {
2249 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
2250 self.min_values.clone()
2251 }
2252
2253 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
2254 self.max_values.clone()
2255 }
2256
2257 fn num_containers(&self) -> usize {
2258 self.num_containers
2259 }
2260
2261 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
2262 None
2263 }
2264
2265 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
2266 None
2267 }
2268
2269 fn contained(
2270 &self,
2271 _column: &Column,
2272 _values: &HashSet<ScalarValue>,
2273 ) -> Option<BooleanArray> {
2274 None
2275 }
2276 }
2277
    /// The `row_count` field should appear only once in the required statistics
    /// schema, even if the row count is needed for multiple columns.
2280 #[test]
2281 fn test_unique_row_count_field_and_column() {
2282 // c1 = 100 AND c2 = 200
2283 let schema: SchemaRef = Arc::new(Schema::new(vec![
2284 Field::new("c1", DataType::Int32, true),
2285 Field::new("c2", DataType::Int32, true),
2286 ]));
2287 let expr = col("c1").eq(lit(100)).and(col("c2").eq(lit(200)));
2288 let expr = logical2physical(&expr, &schema);
2289 let p = PruningPredicate::try_new(expr, Arc::clone(&schema)).unwrap();
        // note the pruning expression refers to the single row_count field (index 3) twice
2291 assert_eq!(
2292 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 100 AND 100 <= c1_max@1 AND c2_null_count@6 != row_count@3 AND c2_min@4 <= 200 AND 200 <= c2_max@5",
2293 p.predicate_expr.to_string()
2294 );
2295
2296 // Fields in required schema should be unique, otherwise when creating batches
2297 // it will fail because of duplicate field names
2298 let mut fields = HashSet::new();
2299 for (_col, _ty, field) in p.required_columns().iter() {
2300 let was_new = fields.insert(field);
2301 if !was_new {
2302 panic!(
2303 "Duplicate field in required schema: {:?}. Previous fields:\n{:#?}",
2304 field, fields
2305 );
2306 }
2307 }
2308 }
2309
2310 #[test]
2311 fn prune_all_rows_null_counts() {
2312 // if null_count = row_count then we should prune the container for i = 0
2313 // regardless of the statistics
2314 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2315 let statistics = TestStatistics::new().with(
2316 "i",
2317 ContainerStats::new_i32(
2318 vec![Some(0)], // min
2319 vec![Some(0)], // max
2320 )
2321 .with_null_counts(vec![Some(1)])
2322 .with_row_counts(vec![Some(1)]),
2323 );
2324 let expected_ret = &[false];
2325 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2326
        // the container should still be pruned even if the min/max stats are missing (null)
2328 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2329 let container_stats = ContainerStats {
2330 min: Some(Arc::new(Int32Array::from(vec![None]))),
2331 max: Some(Arc::new(Int32Array::from(vec![None]))),
2332 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2333 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2334 ..ContainerStats::default()
2335 };
2336 let statistics = TestStatistics::new().with("i", container_stats);
2337 let expected_ret = &[false];
2338 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2339
2340 // If the null counts themselves are missing we should be able to fall back to the stats
2341 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2342 let container_stats = ContainerStats {
2343 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2344 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2345 null_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2346 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2347 ..ContainerStats::default()
2348 };
2349 let statistics = TestStatistics::new().with("i", container_stats);
2350 let expected_ret = &[true];
2351 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2352 let expected_ret = &[false];
2353 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2354
2355 // Same for the row counts
2356 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2357 let container_stats = ContainerStats {
2358 min: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2359 max: Some(Arc::new(Int32Array::from(vec![Some(0)]))),
2360 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(1)]))),
2361 row_counts: Some(Arc::new(UInt64Array::from(vec![None]))),
2362 ..ContainerStats::default()
2363 };
2364 let statistics = TestStatistics::new().with("i", container_stats);
2365 let expected_ret = &[true];
2366 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2367 let expected_ret = &[false];
2368 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2369 }
2370
2371 #[test]
2372 fn prune_missing_statistics() {
2373 // If the min or max stats are missing we should not prune
2374 // (unless we know all rows are null, see `prune_all_rows_null_counts`)
2375 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2376 let container_stats = ContainerStats {
2377 min: Some(Arc::new(Int32Array::from(vec![None, Some(0)]))),
2378 max: Some(Arc::new(Int32Array::from(vec![Some(0), None]))),
2379 null_counts: Some(Arc::new(UInt64Array::from(vec![Some(0), Some(0)]))),
2380 row_counts: Some(Arc::new(UInt64Array::from(vec![Some(1), Some(1)]))),
2381 ..ContainerStats::default()
2382 };
2383 let statistics = TestStatistics::new().with("i", container_stats);
2384 let expected_ret = &[true, true];
2385 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2386 let expected_ret = &[false, true];
2387 prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);
2388 let expected_ret = &[true, false];
2389 prune_with_expr(col("i").lt(lit(0)), &schema, &statistics, expected_ret);
2390 }
2391
2392 #[test]
2393 fn prune_null_stats() {
2394 // if null_count = row_count then we should prune the container for i = 0
2395 // regardless of the statistics
2396 let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));
2397
2398 let statistics = TestStatistics::new().with(
2399 "i",
2400 ContainerStats::new_i32(
2401 vec![Some(0)], // min
2402 vec![Some(0)], // max
2403 )
2404 .with_null_counts(vec![Some(1)])
2405 .with_row_counts(vec![Some(1)]),
2406 );
2407
2408 let expected_ret = &[false];
2409
2410 // i = 0
2411 prune_with_expr(col("i").eq(lit(0)), &schema, &statistics, expected_ret);
2412 }
2413
2414 #[test]
2415 fn test_build_statistics_record_batch() {
        // Request a record batch with s1_min, s2_max, s3_max, s3_min
2417 let required_columns = RequiredColumns::from(vec![
2418 // min of original column s1, named s1_min
2419 (
2420 phys_expr::Column::new("s1", 1),
2421 StatisticsType::Min,
2422 Field::new("s1_min", DataType::Int32, true),
2423 ),
2424 // max of original column s2, named s2_max
2425 (
2426 phys_expr::Column::new("s2", 2),
2427 StatisticsType::Max,
2428 Field::new("s2_max", DataType::Int32, true),
2429 ),
2430 // max of original column s3, named s3_max
2431 (
2432 phys_expr::Column::new("s3", 3),
2433 StatisticsType::Max,
2434 Field::new("s3_max", DataType::Utf8, true),
2435 ),
2436 // min of original column s3, named s3_min
2437 (
2438 phys_expr::Column::new("s3", 3),
2439 StatisticsType::Min,
2440 Field::new("s3_min", DataType::Utf8, true),
2441 ),
2442 ]);
2443
2444 let statistics = TestStatistics::new()
2445 .with(
2446 "s1",
2447 ContainerStats::new_i32(
2448 vec![None, None, Some(9), None], // min
2449 vec![Some(10), None, None, None], // max
2450 ),
2451 )
2452 .with(
2453 "s2",
2454 ContainerStats::new_i32(
2455 vec![Some(2), None, None, None], // min
2456 vec![Some(20), None, None, None], // max
2457 ),
2458 )
2459 .with(
2460 "s3",
2461 ContainerStats::new_utf8(
2462 vec![Some("a"), None, None, None], // min
2463 vec![Some("q"), None, Some("r"), None], // max
2464 ),
2465 );
2466
2467 let batch =
2468 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2469 let expected = [
2470 "+--------+--------+--------+--------+",
2471 "| s1_min | s2_max | s3_max | s3_min |",
2472 "+--------+--------+--------+--------+",
2473 "| | 20 | q | a |",
2474 "| | | | |",
2475 "| 9 | | r | |",
2476 "| | | | |",
2477 "+--------+--------+--------+--------+",
2478 ];
2479
2480 assert_batches_eq!(expected, &[batch]);
2481 }
2482
2483 #[test]
2484 fn test_build_statistics_casting() {
2485 // Test requesting a Timestamp column, but getting statistics as Int64
2486 // which is what Parquet does
2487
        // Request a record batch with s1_min as a timestamp
2489 let required_columns = RequiredColumns::from(vec![(
2490 phys_expr::Column::new("s3", 3),
2491 StatisticsType::Min,
2492 Field::new(
2493 "s1_min",
2494 DataType::Timestamp(TimeUnit::Nanosecond, None),
2495 true,
2496 ),
2497 )]);
2498
2499 // Note the statistics pass back i64 (not timestamp)
2500 let statistics = OneContainerStats {
2501 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2502 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2503 num_containers: 1,
2504 };
2505
2506 let batch =
2507 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2508 let expected = [
2509 "+-------------------------------+",
2510 "| s1_min |",
2511 "+-------------------------------+",
2512 "| 1970-01-01T00:00:00.000000010 |",
2513 "+-------------------------------+",
2514 ];
2515
2516 assert_batches_eq!(expected, &[batch]);
2517 }
2518
2519 #[test]
2520 fn test_build_statistics_no_required_stats() {
2521 let required_columns = RequiredColumns::new();
2522
2523 let statistics = OneContainerStats {
2524 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2525 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2526 num_containers: 1,
2527 };
2528
2529 let batch =
2530 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2531 assert_eq!(batch.num_rows(), 1); // had 1 container
2532 }
2533
2534 #[test]
2535 fn test_build_statistics_inconsistent_types() {
2536 // Test requesting a Utf8 column when the stats return some other type
2537
        // Request a record batch with s1_min as a Utf8 column
2539 let required_columns = RequiredColumns::from(vec![(
2540 phys_expr::Column::new("s3", 3),
2541 StatisticsType::Min,
2542 Field::new("s1_min", DataType::Utf8, true),
2543 )]);
2544
2545 // Note the statistics return an invalid UTF-8 sequence which will be converted to null
2546 let statistics = OneContainerStats {
2547 min_values: Some(Arc::new(BinaryArray::from(vec![&[255u8] as &[u8]]))),
2548 max_values: None,
2549 num_containers: 1,
2550 };
2551
2552 let batch =
2553 build_statistics_record_batch(&statistics, &required_columns).unwrap();
2554 let expected = [
2555 "+--------+",
2556 "| s1_min |",
2557 "+--------+",
2558 "| |",
2559 "+--------+",
2560 ];
2561
2562 assert_batches_eq!(expected, &[batch]);
2563 }
2564
2565 #[test]
2566 fn test_build_statistics_inconsistent_length() {
2567 // return an inconsistent length to the actual statistics arrays
2568 let required_columns = RequiredColumns::from(vec![(
2569 phys_expr::Column::new("s1", 3),
2570 StatisticsType::Min,
2571 Field::new("s1_min", DataType::Int64, true),
2572 )]);
2573
        // Note the statistics arrays have length 1 while num_containers is 3
2575 let statistics = OneContainerStats {
2576 min_values: Some(Arc::new(Int64Array::from(vec![Some(10)]))),
2577 max_values: Some(Arc::new(Int64Array::from(vec![Some(20)]))),
2578 num_containers: 3,
2579 };
2580
2581 let result =
2582 build_statistics_record_batch(&statistics, &required_columns).unwrap_err();
2583 assert!(
2584 result
2585 .to_string()
2586 .contains("mismatched statistics length. Expected 3, got 1"),
2587 "{}",
2588 result
2589 );
2590 }
2591
2592 #[test]
2593 fn row_group_predicate_eq() -> Result<()> {
2594 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2595 let expected_expr =
2596 "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1";
2597
2598 // test column on the left
2599 let expr = col("c1").eq(lit(1));
2600 let predicate_expr =
2601 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2602 assert_eq!(predicate_expr.to_string(), expected_expr);
2603
2604 // test column on the right
2605 let expr = lit(1).eq(col("c1"));
2606 let predicate_expr =
2607 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2608 assert_eq!(predicate_expr.to_string(), expected_expr);
2609
2610 Ok(())
2611 }
2612
2613 #[test]
2614 fn row_group_predicate_not_eq() -> Result<()> {
2615 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2616 let expected_expr =
2617 "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1)";
2618
2619 // test column on the left
2620 let expr = col("c1").not_eq(lit(1));
2621 let predicate_expr =
2622 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2623 assert_eq!(predicate_expr.to_string(), expected_expr);
2624
2625 // test column on the right
2626 let expr = lit(1).not_eq(col("c1"));
2627 let predicate_expr =
2628 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2629 assert_eq!(predicate_expr.to_string(), expected_expr);
2630
2631 Ok(())
2632 }
2633
2634 #[test]
2635 fn row_group_predicate_gt() -> Result<()> {
2636 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2637 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 > 1";
2638
2639 // test column on the left
2640 let expr = col("c1").gt(lit(1));
2641 let predicate_expr =
2642 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2643 assert_eq!(predicate_expr.to_string(), expected_expr);
2644
2645 // test column on the right
2646 let expr = lit(1).lt(col("c1"));
2647 let predicate_expr =
2648 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2649 assert_eq!(predicate_expr.to_string(), expected_expr);
2650
2651 Ok(())
2652 }
2653
2654 #[test]
2655 fn row_group_predicate_gt_eq() -> Result<()> {
2656 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2657 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_max@0 >= 1";
2658
2659 // test column on the left
2660 let expr = col("c1").gt_eq(lit(1));
2661 let predicate_expr =
2662 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2663 assert_eq!(predicate_expr.to_string(), expected_expr);
2664 // test column on the right
2665 let expr = lit(1).lt_eq(col("c1"));
2666 let predicate_expr =
2667 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2668 assert_eq!(predicate_expr.to_string(), expected_expr);
2669
2670 Ok(())
2671 }
2672
2673 #[test]
2674 fn row_group_predicate_lt() -> Result<()> {
2675 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2676 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2677
2678 // test column on the left
2679 let expr = col("c1").lt(lit(1));
2680 let predicate_expr =
2681 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2682 assert_eq!(predicate_expr.to_string(), expected_expr);
2683
2684 // test column on the right
2685 let expr = lit(1).gt(col("c1"));
2686 let predicate_expr =
2687 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2688 assert_eq!(predicate_expr.to_string(), expected_expr);
2689
2690 Ok(())
2691 }
2692
2693 #[test]
2694 fn row_group_predicate_lt_eq() -> Result<()> {
2695 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2696 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 <= 1";
2697
2698 // test column on the left
2699 let expr = col("c1").lt_eq(lit(1));
2700 let predicate_expr =
2701 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2702 assert_eq!(predicate_expr.to_string(), expected_expr);
2703 // test column on the right
2704 let expr = lit(1).gt_eq(col("c1"));
2705 let predicate_expr =
2706 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2707 assert_eq!(predicate_expr.to_string(), expected_expr);
2708
2709 Ok(())
2710 }
2711
2712 #[test]
2713 fn row_group_predicate_and() -> Result<()> {
2714 let schema = Schema::new(vec![
2715 Field::new("c1", DataType::Int32, false),
2716 Field::new("c2", DataType::Int32, false),
2717 Field::new("c3", DataType::Int32, false),
2718 ]);
2719 // test AND operator joining supported c1 < 1 expression and unsupported c2 > c3 expression
2720 let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3")));
2721 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1";
2722 let predicate_expr =
2723 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2724 assert_eq!(predicate_expr.to_string(), expected_expr);
2725
2726 Ok(())
2727 }
2728
2729 #[test]
2730 fn row_group_predicate_or() -> Result<()> {
2731 let schema = Schema::new(vec![
2732 Field::new("c1", DataType::Int32, false),
2733 Field::new("c2", DataType::Int32, false),
2734 ]);
2735 // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 = 0 expression
2736 let expr = col("c1").lt(lit(1)).or(col("c2").rem(lit(2)).eq(lit(0)));
2737 let expected_expr = "true";
2738 let predicate_expr =
2739 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2740 assert_eq!(predicate_expr.to_string(), expected_expr);
2741
2742 Ok(())
2743 }
2744
2745 #[test]
2746 fn row_group_predicate_not() -> Result<()> {
2747 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
2748 let expected_expr = "true";
2749
2750 let expr = col("c1").not();
2751 let predicate_expr =
2752 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2753 assert_eq!(predicate_expr.to_string(), expected_expr);
2754
2755 Ok(())
2756 }
2757
2758 #[test]
2759 fn row_group_predicate_not_bool() -> Result<()> {
2760 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2761 let expected_expr = "NOT c1_min@0 AND c1_max@1";
2762
2763 let expr = col("c1").not();
2764 let predicate_expr =
2765 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2766 assert_eq!(predicate_expr.to_string(), expected_expr);
2767
2768 Ok(())
2769 }
2770
2771 #[test]
2772 fn row_group_predicate_bool() -> Result<()> {
2773 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2774 let expected_expr = "c1_min@0 OR c1_max@1";
2775
2776 let expr = col("c1");
2777 let predicate_expr =
2778 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2779 assert_eq!(predicate_expr.to_string(), expected_expr);
2780
2781 Ok(())
2782 }
2783
2784 #[test]
2785 fn row_group_predicate_lt_bool() -> Result<()> {
2786 let schema = Schema::new(vec![Field::new("c1", DataType::Boolean, false)]);
2787 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < true";
2788
2789 // DF doesn't support arithmetic on boolean columns so
2790 // this predicate will error when evaluated
2791 let expr = col("c1").lt(lit(true));
2792 let predicate_expr =
2793 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2794 assert_eq!(predicate_expr.to_string(), expected_expr);
2795
2796 Ok(())
2797 }
2798
2799 #[test]
2800 fn row_group_predicate_required_columns() -> Result<()> {
2801 let schema = Schema::new(vec![
2802 Field::new("c1", DataType::Int32, false),
2803 Field::new("c2", DataType::Int32, false),
2804 ]);
2805 let mut required_columns = RequiredColumns::new();
2806 // c1 < 1 and (c2 = 2 or c2 = 3)
2807 let expr = col("c1")
2808 .lt(lit(1))
2809 .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3))));
2810 let expected_expr = "c1_null_count@1 != row_count@2 AND c1_min@0 < 1 AND (c2_null_count@5 != row_count@2 AND c2_min@3 <= 2 AND 2 <= c2_max@4 OR c2_null_count@5 != row_count@2 AND c2_min@3 <= 3 AND 3 <= c2_max@4)";
2811 let predicate_expr =
2812 test_build_predicate_expression(&expr, &schema, &mut required_columns);
2813 assert_eq!(predicate_expr.to_string(), expected_expr);
2814 println!("required_columns: {:#?}", required_columns); // for debugging assertions below
2815 // c1 < 1 should add c1_min
2816 let c1_min_field = Field::new("c1_min", DataType::Int32, false);
2817 assert_eq!(
2818 required_columns.columns[0],
2819 (
2820 phys_expr::Column::new("c1", 0),
2821 StatisticsType::Min,
2822 c1_min_field.with_nullable(true) // could be nullable if stats are not present
2823 )
2824 );
2825 // c1 < 1 should add c1_null_count
2826 let c1_null_count_field = Field::new("c1_null_count", DataType::UInt64, false);
2827 assert_eq!(
2828 required_columns.columns[1],
2829 (
2830 phys_expr::Column::new("c1", 0),
2831 StatisticsType::NullCount,
2832 c1_null_count_field.with_nullable(true) // could be nullable if stats are not present
2833 )
2834 );
2835 // c1 < 1 should add row_count
2836 let row_count_field = Field::new("row_count", DataType::UInt64, false);
2837 assert_eq!(
2838 required_columns.columns[2],
2839 (
2840 phys_expr::Column::new("c1", 0),
2841 StatisticsType::RowCount,
2842 row_count_field.with_nullable(true) // could be nullable if stats are not present
2843 )
2844 );
2845 // c2 = 2 should add c2_min and c2_max
2846 let c2_min_field = Field::new("c2_min", DataType::Int32, false);
2847 assert_eq!(
2848 required_columns.columns[3],
2849 (
2850 phys_expr::Column::new("c2", 1),
2851 StatisticsType::Min,
2852 c2_min_field.with_nullable(true) // could be nullable if stats are not present
2853 )
2854 );
2855 let c2_max_field = Field::new("c2_max", DataType::Int32, false);
2856 assert_eq!(
2857 required_columns.columns[4],
2858 (
2859 phys_expr::Column::new("c2", 1),
2860 StatisticsType::Max,
2861 c2_max_field.with_nullable(true) // could be nullable if stats are not present
2862 )
2863 );
2864 // c2 = 2 should add c2_null_count
2865 let c2_null_count_field = Field::new("c2_null_count", DataType::UInt64, false);
2866 assert_eq!(
2867 required_columns.columns[5],
2868 (
2869 phys_expr::Column::new("c2", 1),
2870 StatisticsType::NullCount,
2871 c2_null_count_field.with_nullable(true) // could be nullable if stats are not present
2872 )
2873 );
        // c2 = 2 should not add a new row_count field (it reuses the existing one)
2875 let row_count_field = Field::new("row_count", DataType::UInt64, false);
2876 assert_eq!(
2877 required_columns.columns[2],
2878 (
2879 phys_expr::Column::new("c1", 0),
2880 StatisticsType::RowCount,
2881 row_count_field.with_nullable(true) // could be nullable if stats are not present
2882 )
2883 );
2884 // c2 = 3 shouldn't add any new statistics fields
2885 assert_eq!(required_columns.columns.len(), 6);
2886
2887 Ok(())
2888 }
2889
2890 #[test]
2891 fn row_group_predicate_in_list() -> Result<()> {
2892 let schema = Schema::new(vec![
2893 Field::new("c1", DataType::Int32, false),
2894 Field::new("c2", DataType::Int32, false),
2895 ]);
2896 // test c1 in(1, 2, 3)
2897 let expr = Expr::InList(InList::new(
2898 Box::new(col("c1")),
2899 vec![lit(1), lit(2), lit(3)],
2900 false,
2901 ));
2902 let expected_expr = "c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 3 AND 3 <= c1_max@1";
2903 let predicate_expr =
2904 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2905 assert_eq!(predicate_expr.to_string(), expected_expr);
2906
2907 Ok(())
2908 }
2909
2910 #[test]
2911 fn row_group_predicate_in_list_empty() -> Result<()> {
2912 let schema = Schema::new(vec![
2913 Field::new("c1", DataType::Int32, false),
2914 Field::new("c2", DataType::Int32, false),
2915 ]);
2916 // test c1 in()
2917 let expr = Expr::InList(InList::new(Box::new(col("c1")), vec![], false));
2918 let expected_expr = "true";
2919 let predicate_expr =
2920 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2921 assert_eq!(predicate_expr.to_string(), expected_expr);
2922
2923 Ok(())
2924 }
2925
2926 #[test]
2927 fn row_group_predicate_in_list_negated() -> Result<()> {
2928 let schema = Schema::new(vec![
2929 Field::new("c1", DataType::Int32, false),
2930 Field::new("c2", DataType::Int32, false),
2931 ]);
2932 // test c1 not in(1, 2, 3)
2933 let expr = Expr::InList(InList::new(
2934 Box::new(col("c1")),
2935 vec![lit(1), lit(2), lit(3)],
2936 true,
2937 ));
2938 let expected_expr = "c1_null_count@2 != row_count@3 AND (c1_min@0 != 1 OR 1 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 2 OR 2 != c1_max@1) AND c1_null_count@2 != row_count@3 AND (c1_min@0 != 3 OR 3 != c1_max@1)";
2939 let predicate_expr =
2940 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
2941 assert_eq!(predicate_expr.to_string(), expected_expr);
2942
2943 Ok(())
2944 }
2945
2946 #[test]
2947 fn row_group_predicate_between() -> Result<()> {
2948 let schema = Schema::new(vec![
2949 Field::new("c1", DataType::Int32, false),
2950 Field::new("c2", DataType::Int32, false),
2951 ]);
2952
2953 // test c1 BETWEEN 1 AND 5
2954 let expr1 = col("c1").between(lit(1), lit(5));
2955
2956 // test 1 <= c1 <= 5
2957 let expr2 = col("c1").gt_eq(lit(1)).and(col("c1").lt_eq(lit(5)));
2958
2959 let predicate_expr1 =
2960 test_build_predicate_expression(&expr1, &schema, &mut RequiredColumns::new());
2961
2962 let predicate_expr2 =
2963 test_build_predicate_expression(&expr2, &schema, &mut RequiredColumns::new());
2964 assert_eq!(predicate_expr1.to_string(), predicate_expr2.to_string());
2965
2966 Ok(())
2967 }
2968
2969 #[test]
2970 fn row_group_predicate_between_with_in_list() -> Result<()> {
2971 let schema = Schema::new(vec![
2972 Field::new("c1", DataType::Int32, false),
2973 Field::new("c2", DataType::Int32, false),
2974 ]);
2975 // test c1 in(1, 2)
2976 let expr1 = col("c1").in_list(vec![lit(1), lit(2)], false);
2977
2978 // test c2 BETWEEN 4 AND 5
2979 let expr2 = col("c2").between(lit(4), lit(5));
2980
2981 // test c1 in(1, 2) and c2 BETWEEN 4 AND 5
2982 let expr3 = expr1.and(expr2);
2983
2984 let expected_expr = "(c1_null_count@2 != row_count@3 AND c1_min@0 <= 1 AND 1 <= c1_max@1 OR c1_null_count@2 != row_count@3 AND c1_min@0 <= 2 AND 2 <= c1_max@1) AND c2_null_count@5 != row_count@3 AND c2_max@4 >= 4 AND c2_null_count@5 != row_count@3 AND c2_min@6 <= 5";
2985 let predicate_expr =
2986 test_build_predicate_expression(&expr3, &schema, &mut RequiredColumns::new());
2987 assert_eq!(predicate_expr.to_string(), expected_expr);
2988
2989 Ok(())
2990 }
2991
2992 #[test]
2993 fn row_group_predicate_in_list_to_many_values() -> Result<()> {
2994 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
        // test c1 IN (1, 2, ..., 21)
        // pruning.rs defines MAX_LIST_VALUE_SIZE_REWRITE = 20; lists with more
        // values than this are not rewritten, so the predicate is always true
2998 let expr = col("c1").in_list((1..=21).map(lit).collect(), false);
2999
3000 let expected_expr = "true";
3001 let predicate_expr =
3002 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3003 assert_eq!(predicate_expr.to_string(), expected_expr);
3004
3005 Ok(())
3006 }
3007
3008 #[test]
3009 fn row_group_predicate_cast() -> Result<()> {
3010 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3011 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64)";
3012
3013 // test cast(c1 as int64) = 1
3014 // test column on the left
3015 let expr = cast(col("c1"), DataType::Int64).eq(lit(ScalarValue::Int64(Some(1))));
3016 let predicate_expr =
3017 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3018 assert_eq!(predicate_expr.to_string(), expected_expr);
3019
3020 // test column on the right
3021 let expr = lit(ScalarValue::Int64(Some(1))).eq(cast(col("c1"), DataType::Int64));
3022 let predicate_expr =
3023 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3024 assert_eq!(predicate_expr.to_string(), expected_expr);
3025
3026 let expected_expr =
3027 "c1_null_count@1 != row_count@2 AND TRY_CAST(c1_max@0 AS Int64) > 1";
3028
3029 // test column on the left
3030 let expr =
3031 try_cast(col("c1"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(1))));
3032 let predicate_expr =
3033 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3034 assert_eq!(predicate_expr.to_string(), expected_expr);
3035
3036 // test column on the right
3037 let expr =
3038 lit(ScalarValue::Int64(Some(1))).lt(try_cast(col("c1"), DataType::Int64));
3039 let predicate_expr =
3040 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3041 assert_eq!(predicate_expr.to_string(), expected_expr);
3042
3043 Ok(())
3044 }
3045
3046 #[test]
3047 fn row_group_predicate_cast_list() -> Result<()> {
3048 let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
3049 // test cast(c1 as int64) in int64(1, 2, 3)
3050 let expr = Expr::InList(InList::new(
3051 Box::new(cast(col("c1"), DataType::Int64)),
3052 vec![
3053 lit(ScalarValue::Int64(Some(1))),
3054 lit(ScalarValue::Int64(Some(2))),
3055 lit(ScalarValue::Int64(Some(3))),
3056 ],
3057 false,
3058 ));
3059 let expected_expr = "c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 1 AND 1 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 2 AND 2 <= CAST(c1_max@1 AS Int64) OR c1_null_count@2 != row_count@3 AND CAST(c1_min@0 AS Int64) <= 3 AND 3 <= CAST(c1_max@1 AS Int64)";
3060 let predicate_expr =
3061 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3062 assert_eq!(predicate_expr.to_string(), expected_expr);
3063
3064 let expr = Expr::InList(InList::new(
3065 Box::new(cast(col("c1"), DataType::Int64)),
3066 vec![
3067 lit(ScalarValue::Int64(Some(1))),
3068 lit(ScalarValue::Int64(Some(2))),
3069 lit(ScalarValue::Int64(Some(3))),
3070 ],
3071 true,
3072 ));
3073 let expected_expr = "c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 1 OR 1 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 2 OR 2 != CAST(c1_max@1 AS Int64)) AND c1_null_count@2 != row_count@3 AND (CAST(c1_min@0 AS Int64) != 3 OR 3 != CAST(c1_max@1 AS Int64))";
3074 let predicate_expr =
3075 test_build_predicate_expression(&expr, &schema, &mut RequiredColumns::new());
3076 assert_eq!(predicate_expr.to_string(), expected_expr);
3077
3078 Ok(())
3079 }
3080
3081 #[test]
3082 fn prune_decimal_data() {
3083 // decimal(9,2)
3084 let schema = Arc::new(Schema::new(vec![Field::new(
3085 "s1",
3086 DataType::Decimal128(9, 2),
3087 true,
3088 )]));
3089
3090 prune_with_expr(
3091 // s1 > 5
3092 col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))),
3093 &schema,
            // If the data is written by Spark, the physical data type in the
            // Parquet file is INT32, so we use INT32 statistics.
3096 &TestStatistics::new().with(
3097 "s1",
3098 ContainerStats::new_i32(
3099 vec![Some(0), Some(4), None, Some(3)], // min
3100 vec![Some(5), Some(6), Some(4), None], // max
3101 ),
3102 ),
3103 &[false, true, false, true],
3104 );
3105
3106 prune_with_expr(
3107 // with cast column to other type
3108 cast(col("s1"), DataType::Decimal128(14, 3))
3109 .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3110 &schema,
3111 &TestStatistics::new().with(
3112 "s1",
3113 ContainerStats::new_i32(
3114 vec![Some(0), Some(4), None, Some(3)], // min
3115 vec![Some(5), Some(6), Some(4), None], // max
3116 ),
3117 ),
3118 &[false, true, false, true],
3119 );
3120
3121 prune_with_expr(
3122 // with try cast column to other type
3123 try_cast(col("s1"), DataType::Decimal128(14, 3))
3124 .gt(lit(ScalarValue::Decimal128(Some(5000), 14, 3))),
3125 &schema,
3126 &TestStatistics::new().with(
3127 "s1",
3128 ContainerStats::new_i32(
3129 vec![Some(0), Some(4), None, Some(3)], // min
3130 vec![Some(5), Some(6), Some(4), None], // max
3131 ),
3132 ),
3133 &[false, true, false, true],
3134 );
3135
3136 // decimal(18,2)
3137 let schema = Arc::new(Schema::new(vec![Field::new(
3138 "s1",
3139 DataType::Decimal128(18, 2),
3140 true,
3141 )]));
3142 prune_with_expr(
3143 // s1 > 5
3144 col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 18, 2))),
3145 &schema,
            // If the data is written by Spark, the physical data type in the
            // Parquet file is INT64, so we use INT64 statistics.
3148 &TestStatistics::new().with(
3149 "s1",
3150 ContainerStats::new_i64(
3151 vec![Some(0), Some(4), None, Some(3)], // min
3152 vec![Some(5), Some(6), Some(4), None], // max
3153 ),
3154 ),
3155 &[false, true, false, true],
3156 );
3157
3158 // decimal(23,2)
3159 let schema = Arc::new(Schema::new(vec![Field::new(
3160 "s1",
3161 DataType::Decimal128(23, 2),
3162 true,
3163 )]));
3164
3165 prune_with_expr(
3166 // s1 > 5
3167 col("s1").gt(lit(ScalarValue::Decimal128(Some(500), 23, 2))),
3168 &schema,
3169 &TestStatistics::new().with(
3170 "s1",
3171 ContainerStats::new_decimal128(
3172 vec![Some(0), Some(400), None, Some(300)], // min
3173 vec![Some(500), Some(600), Some(400), None], // max
3174 23,
3175 2,
3176 ),
3177 ),
3178 &[false, true, false, true],
3179 );
3180 }
3181
3182 #[test]
3183 fn prune_api() {
3184 let schema = Arc::new(Schema::new(vec![
3185 Field::new("s1", DataType::Utf8, true),
3186 Field::new("s2", DataType::Int32, true),
3187 ]));
3188
3189 let statistics = TestStatistics::new().with(
3190 "s2",
3191 ContainerStats::new_i32(
3192 vec![Some(0), Some(4), None, Some(3)], // min
3193 vec![Some(5), Some(6), None, None], // max
3194 ),
3195 );
3196 prune_with_expr(
3197 // Prune using s2 > 5
3198 col("s2").gt(lit(5)),
3199 &schema,
3200 &statistics,
3201 // s2 [0, 5] ==> no rows should pass
3202 // s2 [4, 6] ==> some rows could pass
3203 // No stats for s2 ==> some rows could pass
3204 // s2 [3, None] (null max) ==> some rows could pass
3205 &[false, true, true, true],
3206 );
3207
3208 prune_with_expr(
3209 // filter with cast
3210 cast(col("s2"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(5)))),
3211 &schema,
3212 &statistics,
3213 &[false, true, true, true],
3214 );
3215 }
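
    // Illustrative sketch: an IN list prunes with the same min/max ranges as
    // the equivalent OR of equalities. The container stats below mirror those
    // used for s2 in `prune_api` above; the expected values are hand-derived
    // from them.
    #[test]
    fn prune_in_list_sketch() {
        let schema = Arc::new(Schema::new(vec![Field::new("s2", DataType::Int32, true)]));
        let statistics = TestStatistics::new().with(
            "s2",
            ContainerStats::new_i32(
                vec![Some(0), Some(4), None, Some(3)], // min
                vec![Some(5), Some(6), None, None],    // max
            ),
        );

        prune_with_expr(
            // s2 IN (1, 7)
            Expr::InList(InList::new(Box::new(col("s2")), vec![lit(1), lit(7)], false)),
            &schema,
            &statistics,
            // s2 [0, 5] ==> 1 is inside the range (must keep)
            // s2 [4, 6] ==> neither 1 nor 7 can be present (not keep)
            // No stats for s2 ==> unknown (must keep)
            // s2 [3, None] (null max) ==> 7 might be present (must keep)
            &[true, false, true, true],
        );
    }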

    #[test]
    fn prune_not_eq_data() {
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        prune_with_expr(
            // Prune using s1 != 'M'
            col("s1").not_eq(lit("M")),
            &schema,
            &TestStatistics::new().with(
                "s1",
                ContainerStats::new_utf8(
                    vec![Some("A"), Some("A"), Some("N"), Some("M"), None, Some("A")], // min
                    vec![Some("Z"), Some("L"), Some("Z"), Some("M"), None, None], // max
                ),
            ),
            // s1 [A, Z] ==> might have values that pass predicate
            // s1 [A, L] ==> all rows pass the predicate
            // s1 [N, Z] ==> all rows pass the predicate
            // s1 [M, M] ==> no rows pass the predicate
            // s1 [NULL, NULL] (no stats) ==> some rows could pass
            // s1 [A, NULL] (null max) ==> some rows could pass
            &[true, true, true, false, true, true],
        );
    }

    /// Creates a setup for boolean chunk pruning
    ///
    /// For predicate "b1" (boolean expr)
    /// b1 [false, false] ==> no rows can pass (not keep)
    /// b1 [false, true] ==> some rows could pass (must keep)
    /// b1 [true, true] ==> all rows must pass (must keep)
    /// b1 [NULL, NULL] ==> unknown (must keep)
    /// b1 [false, NULL] ==> unknown (must keep)
    ///
    /// For predicate "!b1" (boolean expr)
    /// b1 [false, false] ==> all rows pass (must keep)
    /// b1 [false, true] ==> some rows could pass (must keep)
    /// b1 [true, true] ==> no rows can pass (not keep)
    /// b1 [NULL, NULL] ==> unknown (must keep)
    /// b1 [false, NULL] ==> unknown (must keep)
    fn bool_setup() -> (SchemaRef, TestStatistics, Vec<bool>, Vec<bool>) {
        let schema =
            Arc::new(Schema::new(vec![Field::new("b1", DataType::Boolean, true)]));

        let statistics = TestStatistics::new().with(
            "b1",
            ContainerStats::new_bool(
                vec![Some(false), Some(false), Some(true), None, Some(false)], // min
                vec![Some(false), Some(true), Some(true), None, None],         // max
            ),
        );
        let expected_true = vec![false, true, true, true, true];
        let expected_false = vec![true, true, false, true, true];

        (schema, statistics, expected_true, expected_false)
    }

    #[test]
    fn prune_bool_const_expr() {
        let (schema, statistics, _, _) = bool_setup();

        prune_with_expr(
            // true
            lit(true),
            &schema,
            &statistics,
            &[true, true, true, true, true],
        );

        prune_with_expr(
            // false
            // constant literals that do NOT refer to any columns are currently
            // not evaluated at all, hence the result is "all true"
            lit(false),
            &schema,
            &statistics,
            &[true, true, true, true, true],
        );
    }

    #[test]
    fn prune_bool_column() {
        let (schema, statistics, expected_true, _) = bool_setup();

        prune_with_expr(
            // b1
            col("b1"),
            &schema,
            &statistics,
            &expected_true,
        );
    }

    #[test]
    fn prune_bool_not_column() {
        let (schema, statistics, _, expected_false) = bool_setup();

        prune_with_expr(
            // !b1
            col("b1").not(),
            &schema,
            &statistics,
            &expected_false,
        );
    }

    #[test]
    fn prune_bool_column_eq_true() {
        let (schema, statistics, expected_true, _) = bool_setup();

        prune_with_expr(
            // b1 = true
            col("b1").eq(lit(true)),
            &schema,
            &statistics,
            &expected_true,
        );
    }

    #[test]
    fn prune_bool_not_column_eq_true() {
        let (schema, statistics, _, expected_false) = bool_setup();

        prune_with_expr(
            // !b1 = true
            col("b1").not().eq(lit(true)),
            &schema,
            &statistics,
            &expected_false,
        );
    }
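
    // Illustrative sketch: comparing against `false` flips which containers
    // can be ruled out. Only the container known to be all `true` can be
    // pruned, which is exactly `expected_false` from `bool_setup`.
    #[test]
    fn prune_bool_column_eq_false() {
        let (schema, statistics, _, expected_false) = bool_setup();

        prune_with_expr(
            // b1 = false
            col("b1").eq(lit(false)),
            &schema,
            &statistics,
            &expected_false,
        );
    }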

    /// Creates a setup for chunk pruning, modeling an int32 column "i"
    /// with 5 different containers (e.g. RowGroups). They have [min,
    /// max]:
    ///
    /// i [-5, 5]
    /// i [1, 11]
    /// i [-11, -1]
    /// i [NULL, NULL]
    /// i [1, NULL]
    fn int32_setup() -> (SchemaRef, TestStatistics) {
        let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, true)]));

        let statistics = TestStatistics::new().with(
            "i",
            ContainerStats::new_i32(
                vec![Some(-5), Some(1), Some(-11), None, Some(1)], // min
                vec![Some(5), Some(11), Some(-1), None, None],     // max
            ),
        );
        (schema, statistics)
    }

    #[test]
    fn prune_int32_col_gt_zero() {
        let (schema, statistics) = int32_setup();

        // Expression "i > 0" and "-i < 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> all rows must pass (must keep)
        // i [-11, -1] ==> no rows can pass (not keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> unknown (must keep)
        let expected_ret = &[true, true, false, true, true];

        // i > 0
        prune_with_expr(col("i").gt(lit(0)), &schema, &statistics, expected_ret);

        // -i < 0
        prune_with_expr(
            Expr::Negative(Box::new(col("i"))).lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_int32_col_lte_zero() {
        let (schema, statistics) = int32_setup();

        // Expression "i <= 0" and "-i >= 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass (not keep)
        // i [-11, -1] ==> all rows must pass (must keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> no rows can pass (not keep)
        let expected_ret = &[true, false, true, true, false];

        prune_with_expr(
            // i <= 0
            col("i").lt_eq(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // -i >= 0
            Expr::Negative(Box::new(col("i"))).gt_eq(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );
    }
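
    // Illustrative sketch: a conjunction of two range predicates prunes a
    // container only when at least one side is provably false from the
    // min/max statistics.
    #[test]
    fn prune_int32_col_range_conjunction() {
        let (schema, statistics) = int32_setup();

        // Expression "i > 5 AND i < 10"
        // i [-5, 5] ==> no rows can pass "i > 5" (not keep)
        // i [1, 11] ==> some rows could pass both sides (must keep)
        // i [-11, -1] ==> no rows can pass "i > 5" (not keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> "i > 5" is unknown (must keep)
        let expected_ret = &[false, true, false, true, true];

        prune_with_expr(
            col("i").gt(lit(5)).and(col("i").lt(lit(10))),
            &schema,
            &statistics,
            expected_ret,
        );
    }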

    #[test]
    fn prune_int32_col_lte_zero_cast() {
        let (schema, statistics) = int32_setup();

        // Expression "cast(i as utf8) <= '0'"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass in theory (conservatively keep)
        // i [-11, -1] ==> no rows can pass in theory (conservatively keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> no rows can pass in theory (conservatively keep)
        let expected_ret = &[true, true, true, true, true];

        prune_with_expr(
            // cast(i as utf8) <= '0'
            cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // try_cast(i as utf8) <= '0'
            try_cast(col("i"), DataType::Utf8).lt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // cast(-i as utf8) >= '0'
            cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // try_cast(-i as utf8) >= '0'
            try_cast(Expr::Negative(Box::new(col("i"))), DataType::Utf8).gt_eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_int32_col_eq_zero() {
        let (schema, statistics) = int32_setup();

        // Expression "i = 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass (not keep)
        // i [-11, -1] ==> no rows can pass (not keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> no rows can pass (not keep)
        let expected_ret = &[true, false, false, true, false];

        prune_with_expr(
            // i = 0
            col("i").eq(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );
    }
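
    // Illustrative sketch: `!=` can rule a container out only when
    // min == max == the literal; that never happens in int32_setup, so every
    // container must be kept.
    #[test]
    fn prune_int32_col_not_eq_zero() {
        let (schema, statistics) = int32_setup();

        // Expression "i != 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> all rows must pass (must keep)
        // i [-11, -1] ==> all rows must pass (must keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> some rows could pass (must keep)
        let expected_ret = &[true, true, true, true, true];

        prune_with_expr(col("i").not_eq(lit(0)), &schema, &statistics, expected_ret);
    }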

    #[test]
    fn prune_int32_col_eq_zero_cast() {
        let (schema, statistics) = int32_setup();

        // Expression "cast(i as int64) = 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass (not keep)
        // i [-11, -1] ==> no rows can pass (not keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> no rows can pass (not keep)
        let expected_ret = &[true, false, false, true, false];

        prune_with_expr(
            cast(col("i"), DataType::Int64).eq(lit(0i64)),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            try_cast(col("i"), DataType::Int64).eq(lit(0i64)),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_int32_col_eq_zero_cast_as_str() {
        let (schema, statistics) = int32_setup();

        // Note the cast is to a string, whose sort order is not the same as
        // the integers'
        //
        // Expression "cast(i as utf8) = '0'"
        // i [-5, 5] ==> some rows could pass (keep)
        // i [1, 11] ==> no rows can pass (could prune, conservatively keep)
        // i [-11, -1] ==> no rows can pass (could prune, conservatively keep)
        // i [NULL, NULL] ==> unknown (keep)
        // i [1, NULL] ==> no rows can pass (could prune, conservatively keep)
        let expected_ret = &[true, true, true, true, true];

        prune_with_expr(
            cast(col("i"), DataType::Utf8).eq(lit("0")),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_int32_col_gt_neg_one() {
        let (schema, statistics) = int32_setup();

        // Expression "i > -1" and "-i < 1"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> all rows must pass (must keep)
        // i [-11, -1] ==> no rows can pass (not keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> all rows must pass (must keep)
        let expected_ret = &[true, true, false, true, true];

        prune_with_expr(
            // i > -1
            col("i").gt(lit(-1)),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // -i < 1
            Expr::Negative(Box::new(col("i"))).lt(lit(1)),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_int32_is_null() {
        let (schema, statistics) = int32_setup();

        // Expression "i IS NULL": with no null statistics,
        // all containers should be kept
        let expected_ret = &[true, true, true, true, true];

        prune_with_expr(
            // i IS NULL, no null statistics
            col("i").is_null(),
            &schema,
            &statistics,
            expected_ret,
        );

        // provide null counts for each column
        let statistics = statistics.with_null_counts(
            "i",
            vec![
                Some(0), // no nulls (don't keep)
                Some(1), // 1 null
                None,    // unknown nulls
                None,    // unknown nulls (min/max are both null too, like no stats at all)
                Some(0), // 0 nulls (max=null too, which means no known max) (don't keep)
            ],
        );

        let expected_ret = &[false, true, true, true, false];

        prune_with_expr(
            // i IS NULL, with actual null statistics
            col("i").is_null(),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_int32_column_is_known_all_null() {
        let (schema, statistics) = int32_setup();

        // Expression "i < 0"
        // i [-5, 5] ==> some rows could pass (must keep)
        // i [1, 11] ==> no rows can pass (not keep)
        // i [-11, -1] ==> all rows must pass (must keep)
        // i [NULL, NULL] ==> unknown (must keep)
        // i [1, NULL] ==> no rows can pass (not keep)
        let expected_ret = &[true, false, true, true, false];

        prune_with_expr(
            // i < 0
            col("i").lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );

        // provide row counts for each column
        let statistics = statistics.with_row_counts(
            "i",
            vec![
                Some(10), // 10 rows of data
                Some(9),  // 9 rows of data
                None,     // unknown row counts
                Some(4),
                Some(10),
            ],
        );

        // pruning result is still the same if we only know row counts
        prune_with_expr(
            // i < 0, with only row counts statistics
            col("i").lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );

        // provide null counts for each column
        let statistics = statistics.with_null_counts(
            "i",
            vec![
                Some(0), // no nulls
                Some(1), // 1 null
                None,    // unknown nulls
                Some(4), // 4 nulls, which is the same as the row count, i.e. this column is all null (don't keep)
                Some(0), // 0 nulls (max=null too, which means no known max)
            ],
        );

        // Expression "i < 0" with actual null and row counts statistics
        // col | min, max     | row counts | null counts |
        // ----+--------------+------------+-------------+
        // i   | [-5, 5]      | 10         | 0           | ==> Some rows could pass (must keep)
        // i   | [1, 11]      | 9          | 1           | ==> No rows can pass (not keep)
        // i   | [-11, -1]    | Unknown    | Unknown     | ==> All rows must pass (must keep)
        // i   | [NULL, NULL] | 4          | 4           | ==> The column is all null (not keep)
        // i   | [1, NULL]    | 10         | 0           | ==> No rows can pass (not keep)
        let expected_ret = &[true, false, true, false, false];

        prune_with_expr(
            // i < 0, with actual null and row counts statistics
            col("i").lt(lit(0)),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn prune_cast_column_scalar() {
        // The data type of column i is INT32
        let (schema, statistics) = int32_setup();
        let expected_ret = &[true, true, false, true, true];

        prune_with_expr(
            // i > int64(0)
            col("i").gt(cast(lit(ScalarValue::Int64(Some(0))), DataType::Int32)),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // cast(i as int64) > int64(0)
            cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // try_cast(i as int64) > int64(0)
            try_cast(col("i"), DataType::Int64).gt(lit(ScalarValue::Int64(Some(0)))),
            &schema,
            &statistics,
            expected_ret,
        );

        prune_with_expr(
            // `-cast(i as int64) < 0` is rewritten to `cast(i as int64) > 0`
            Expr::Negative(Box::new(cast(col("i"), DataType::Int64)))
                .lt(lit(ScalarValue::Int64(Some(0)))),
            &schema,
            &statistics,
            expected_ret,
        );
    }

    #[test]
    fn test_increment_utf8() {
        // Basic ASCII
        assert_eq!(increment_utf8("abc").unwrap(), "abd");
        assert_eq!(increment_utf8("abz").unwrap(), "ab{");

        // Test around ASCII 127 (DEL)
        assert_eq!(increment_utf8("~").unwrap(), "\u{7f}"); // 126 -> 127
        assert_eq!(increment_utf8("\u{7f}").unwrap(), "\u{80}"); // 127 -> 128

        // Test 2-byte UTF-8 sequences
        assert_eq!(increment_utf8("ß").unwrap(), "à"); // U+00DF -> U+00E0

        // Test 3-byte UTF-8 sequences
        assert_eq!(increment_utf8("℣").unwrap(), "ℤ"); // U+2123 -> U+2124

        // Test at UTF-8 boundaries
        assert_eq!(increment_utf8("\u{7FF}").unwrap(), "\u{800}"); // 2-byte to 3-byte boundary
        assert_eq!(increment_utf8("\u{FFFF}").unwrap(), "\u{10000}"); // 3-byte to 4-byte boundary

        // Test that if we can't increment we return None
        assert!(increment_utf8("").is_none());
        assert!(increment_utf8("\u{10FFFF}").is_none()); // U+10FFFF is the max code point

        // Test that if we can't increment the last character we increment the
        // previous one and truncate
        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");

        // Test surrogate pair range (0xD800..=0xDFFF)
        assert_eq!(increment_utf8("a\u{D7FF}").unwrap(), "b");
        assert!(increment_utf8("\u{D7FF}").is_none());

        // Test non-characters range (0xFDD0..=0xFDEF)
        assert_eq!(increment_utf8("a\u{FDCF}").unwrap(), "b");
        assert!(increment_utf8("\u{FDCF}").is_none());

        // Test the maximum code point limit (>= 0x110000)
        assert_eq!(increment_utf8("a\u{10FFFF}").unwrap(), "b");
        assert!(increment_utf8("\u{10FFFF}").is_none()); // Can't increment past the max valid code point
    }
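
    // Illustrative sketch: an incremented prefix is an exclusive upper bound
    // for prefix matching, which is what makes increment_utf8 useful for
    // pruning prefix patterns such as LIKE 'fo%'.
    #[test]
    fn test_increment_utf8_prefix_bound_sketch() {
        let upper = increment_utf8("fo").unwrap();
        assert_eq!(upper, "fp");
        // every string starting with "fo" sorts below the incremented prefix
        assert!("fo" < upper.as_str());
        assert!("fozzz" < upper.as_str());
    }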

    /// Creates a setup for chunk pruning, modeling a utf8 column "s1"
    /// with 10 different containers (e.g. RowGroups). They have [min,
    /// max]:
    /// s1 ["A", "Z"]
    /// s1 ["A", "L"]
    /// s1 ["N", "Z"]
    /// s1 ["M", "M"]
    /// s1 [NULL, NULL]
    /// s1 ["A", NULL]
    /// s1 ["", "A"]
    /// s1 ["", ""]
    /// s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"]
    /// s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"]
    fn utf8_setup() -> (SchemaRef, TestStatistics) {
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        let statistics = TestStatistics::new().with(
            "s1",
            ContainerStats::new_utf8(
                vec![
                    Some("A"),
                    Some("A"),
                    Some("N"),
                    Some("M"),
                    None,
                    Some("A"),
                    Some(""),
                    Some(""),
                    Some("AB"),
                    Some("A\u{10ffff}\u{10ffff}"),
                ], // min
                vec![
                    Some("Z"),
                    Some("L"),
                    Some("Z"),
                    Some("M"),
                    None,
                    None,
                    Some("A"),
                    Some(""),
                    Some("A\u{10ffff}\u{10ffff}\u{10ffff}"),
                    Some("A\u{10ffff}\u{10ffff}"),
                ], // max
            ),
        );
        (schema, statistics)
    }

    #[test]
    fn prune_utf8_eq() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").eq(lit("A"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").eq(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }

    #[test]
    fn prune_utf8_not_eq() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_eq(lit("A"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_eq(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }

    #[test]
    fn prune_utf8_like_one() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").like(lit("A_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit("_A_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit("_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> all rows must pass (must keep)
            true,
            // s1 ["", ""] ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }

    #[test]
    fn prune_utf8_like_many() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").like(lit("A%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> no rows can pass (not keep)
            false,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit("%A%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit("%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["A", "L"] ==> all rows must pass (must keep)
            true,
            // s1 ["N", "Z"] ==> all rows must pass (must keep)
            true,
            // s1 ["M", "M"] ==> all rows must pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> unknown (must keep)
            true,
            // s1 ["", "A"] ==> all rows must pass (must keep)
            true,
            // s1 ["", ""] ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> all rows must pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").like(lit(""));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["A", "L"] ==> no rows can pass (not keep)
            false,
            // s1 ["N", "Z"] ==> no rows can pass (not keep)
            false,
            // s1 ["M", "M"] ==> no rows can pass (not keep)
            false,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> no rows can pass (not keep)
            false,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> all rows must pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> no rows can pass (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }

    #[test]
    fn prune_utf8_not_like_one() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_like(lit("A\u{10ffff}_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> no rows match in theory,
            // but (min, max) may have been truncated: the original (min, max) could have been
            // ("A\u{10ffff}\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}\u{10ffff}\u{10ffff}"),
            // so conservatively keep
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }

    #[test]
    fn prune_utf8_not_like_many() {
        let (schema, statistics) = utf8_setup();

        let expr = col("s1").not_like(lit("A\u{10ffff}%"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> no rows match (not keep)
            false,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_like(lit("A\u{10ffff}%\u{10ffff}_"));
        #[rustfmt::skip]
        let expected_ret = &[
            // s1 ["A", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["A", "L"] ==> some rows could pass (must keep)
            true,
            // s1 ["N", "Z"] ==> some rows could pass (must keep)
            true,
            // s1 ["M", "M"] ==> some rows could pass (must keep)
            true,
            // s1 [NULL, NULL] ==> unknown (must keep)
            true,
            // s1 ["A", NULL] ==> some rows could pass (must keep)
            true,
            // s1 ["", "A"] ==> some rows could pass (must keep)
            true,
            // s1 ["", ""] ==> some rows could pass (must keep)
            true,
            // s1 ["AB", "A\u{10ffff}\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
            // s1 ["A\u{10ffff}\u{10ffff}", "A\u{10ffff}\u{10ffff}"] ==> some rows could pass (must keep)
            true,
        ];
        prune_with_expr(expr, &schema, &statistics, expected_ret);

        let expr = col("s1").not_like(lit("A\\%%"));
        let statistics = TestStatistics::new().with(
            "s1",
            ContainerStats::new_utf8(
                vec![Some("A%a"), Some("A")],
                vec![Some("A%c"), Some("A")],
            ),
        );
        let expected_ret = &[false, true];
        prune_with_expr(expr, &schema, &statistics, expected_ret);
    }

    #[test]
    fn test_rewrite_expr_to_prunable() {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let df_schema = DFSchema::try_from(schema.clone()).unwrap();

        // column op lit
        let left_input = col("a");
        let left_input = logical2physical(&left_input, &schema);
        let right_input = lit(ScalarValue::Int32(Some(12)));
        let right_input = logical2physical(&right_input, &schema);
        let (result_left, _, result_right) = rewrite_expr_to_prunable(
            &left_input,
            Operator::Eq,
            &right_input,
            df_schema.clone(),
        )
        .unwrap();
        assert_eq!(result_left.to_string(), left_input.to_string());
        assert_eq!(result_right.to_string(), right_input.to_string());

        // cast op lit
        let left_input = cast(col("a"), DataType::Decimal128(20, 3));
        let left_input = logical2physical(&left_input, &schema);
        let right_input = lit(ScalarValue::Decimal128(Some(12), 20, 3));
        let right_input = logical2physical(&right_input, &schema);
        let (result_left, _, result_right) = rewrite_expr_to_prunable(
            &left_input,
            Operator::Gt,
            &right_input,
            df_schema.clone(),
        )
        .unwrap();
        assert_eq!(result_left.to_string(), left_input.to_string());
        assert_eq!(result_right.to_string(), right_input.to_string());

        // try_cast op lit
        let left_input = try_cast(col("a"), DataType::Int64);
        let left_input = logical2physical(&left_input, &schema);
        let right_input = lit(ScalarValue::Int64(Some(12)));
        let right_input = logical2physical(&right_input, &schema);
        let (result_left, _, result_right) =
            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema)
                .unwrap();
        assert_eq!(result_left.to_string(), left_input.to_string());
        assert_eq!(result_right.to_string(), right_input.to_string());

        // TODO: add tests for other cases and ops
    }

    #[test]
    fn test_rewrite_expr_to_prunable_custom_unhandled_hook() {
        struct CustomUnhandledHook;

        impl UnhandledPredicateHook for CustomUnhandledHook {
            /// This handles an arbitrary case of an expression that can't be
            /// rewritten (such as a column that doesn't exist in the schema)
            /// by replacing it with a literal (the transformation is
            /// arbitrary; the point is that the hook can do whatever it wants)
            fn handle(&self, _expr: &Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
                Arc::new(phys_expr::Literal::new(ScalarValue::Int32(Some(42))))
            }
        }

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let schema_with_b = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]);

        let rewriter = PredicateRewriter::new()
            .with_unhandled_hook(Arc::new(CustomUnhandledHook {}));

        let transform_expr = |expr| {
            let expr = logical2physical(&expr, &schema_with_b);
            rewriter.rewrite_predicate_to_statistics_predicate(&expr, &schema)
        };

        // transform an arbitrary valid expression that we know is handled
        let known_expression = col("a").eq(lit(12));
        let known_expression_transformed = PredicateRewriter::new()
            .rewrite_predicate_to_statistics_predicate(
                &logical2physical(&known_expression, &schema),
                &schema,
            );

        // an expression referencing an unknown column (that is not in the schema) gets passed to the hook
        let input = col("b").eq(lit(12));
        let expected = logical2physical(&lit(42), &schema);
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());

        // more complex case with an unknown column
        let input = known_expression.clone().and(input.clone());
        let expected = phys_expr::BinaryExpr::new(
            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
            Operator::And,
            logical2physical(&lit(42), &schema),
        );
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());

        // an unknown expression gets passed to the hook
        let input = array_has(make_array(vec![lit(1)]), col("a"));
        let expected = logical2physical(&lit(42), &schema);
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());

        // more complex case with an unknown expression
        let input = known_expression.and(input);
        let expected = phys_expr::BinaryExpr::new(
            Arc::<dyn PhysicalExpr>::clone(&known_expression_transformed),
            Operator::And,
            logical2physical(&lit(42), &schema),
        );
        let transformed = transform_expr(input.clone());
        assert_eq!(transformed.to_string(), expected.to_string());
    }

    #[test]
    fn test_rewrite_expr_to_prunable_error() {
        // cast a string value to a numeric value
        // this cast is not supported
        let schema = Schema::new(vec![Field::new("a", DataType::Utf8, true)]);
        let df_schema = DFSchema::try_from(schema.clone()).unwrap();
        let left_input = cast(col("a"), DataType::Int64);
        let left_input = logical2physical(&left_input, &schema);
        let right_input = lit(ScalarValue::Int64(Some(12)));
        let right_input = logical2physical(&right_input, &schema);
        let result = rewrite_expr_to_prunable(
            &left_input,
            Operator::Gt,
            &right_input,
            df_schema.clone(),
        );
        assert!(result.is_err());

        // other expr
        let left_input = is_null(col("a"));
        let left_input = logical2physical(&left_input, &schema);
        let right_input = lit(ScalarValue::Int64(Some(12)));
        let right_input = logical2physical(&right_input, &schema);
        let result =
            rewrite_expr_to_prunable(&left_input, Operator::Gt, &right_input, df_schema);
        assert!(result.is_err());
        // TODO: add other negative tests for other cases and ops
    }

    #[test]
    fn prune_with_contained_one_column() {
        let schema = Arc::new(Schema::new(vec![Field::new("s1", DataType::Utf8, true)]));

        // Model having information like a bloom filter for s1
        let statistics = TestStatistics::new()
            .with_contained(
                "s1",
                [ScalarValue::from("foo")],
                [
                    // container 0 known to only contain "foo"
                    Some(true),
                    // container 1 known to not contain "foo"
                    Some(false),
                    // container 2 unknown about "foo"
                    None,
                    // container 3 known to only contain "foo"
                    Some(true),
                    // container 4 known to not contain "foo"
                    Some(false),
                    // container 5 unknown about "foo"
                    None,
                    // container 6 known to only contain "foo"
                    Some(true),
                    // container 7 known to not contain "foo"
                    Some(false),
                    // container 8 unknown about "foo"
                    None,
                ],
            )
            .with_contained(
                "s1",
                [ScalarValue::from("bar")],
                [
                    // containers 0,1,2 known to only contain "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // containers 3,4,5 known to not contain "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                    // containers 6,7,8 unknown about "bar"
                    None,
                    None,
                    None,
                ],
            )
            .with_contained(
                // the way the tests are set up, this data is
                // consulted if "foo" and "bar" are checked at the same time
                "s1",
                [ScalarValue::from("foo"), ScalarValue::from("bar")],
                [
                    // containers 0,1,2 unknown about ("foo", "bar")
                    None,
                    None,
                    None,
                    // containers 3,4,5 known to contain only "foo" or "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // containers 6,7,8 known to contain neither "foo" nor "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                ],
            );

        // s1 = 'foo'
        prune_with_expr(
            col("s1").eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers (false) where we know foo is not present
            &[true, false, true, true, false, true, true, false, true],
        );

        // s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("bar")),
            &schema,
            &statistics,
            // rule out containers where we know bar is not present
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 = 'baz' (unknown value)
        prune_with_expr(
            col("s1").eq(lit("baz")),
            &schema,
            &statistics,
            // can't rule out anything
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' AND s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).and(col("s1").eq(lit("bar"))),
            &schema,
            &statistics,
            // logically this predicate can't possibly be true (the column can't
            // take on both values), but we could only rule it out if the stats
            // told us that both values are not present
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' OR s1 = 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).or(col("s1").eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers that we know contain neither foo nor bar
            &[true, true, true, true, true, true, false, false, false],
        );

        // s1 = 'foo' OR s1 = 'baz'
        prune_with_expr(
            col("s1").eq(lit("foo")).or(col("s1").eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' OR s1 = 'bar' OR s1 = 'baz'
        prune_with_expr(
            col("s1")
                .eq(lit("foo"))
                .or(col("s1").eq(lit("bar")))
                .or(col("s1").eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any containers based on the knowledge of s1 with
            // `foo`, `bar` and (`foo`, `bar`)
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo
        prune_with_expr(
            col("s1").not_eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers we know for sure only contain foo
            &[false, true, true, false, true, true, false, true, true],
        );

        // s1 != bar
        prune_with_expr(
            col("s1").not_eq(lit("bar")),
            &schema,
            &statistics,
            // rule out containers where we know for sure s1 only has the value bar
            &[false, false, false, true, true, true, true, true, true],
        );

        // s1 != foo AND s1 != bar
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s1").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers where we know every value of s1 is
            // either 'foo' or 'bar'
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 != foo AND s1 != bar AND s1 != baz
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s1").not_eq(lit("bar")))
                .and(col("s1").not_eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out any container based on the knowledge of s1
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo OR s1 != bar
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .or(col("s1").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can't rule out anything based on contains information
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 != foo OR s1 != bar OR s1 != baz
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .or(col("s1").not_eq(lit("bar")))
                .or(col("s1").not_eq(lit("baz"))),
            &schema,
            &statistics,
            // can't rule out anything based on contains information
            &[true, true, true, true, true, true, true, true, true],
        );
    }

    #[test]
    fn prune_with_contained_two_columns() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("s1", DataType::Utf8, true),
            Field::new("s2", DataType::Utf8, true),
        ]));

        // Model having information like bloom filters for s1 and s2
        let statistics = TestStatistics::new()
            .with_contained(
                "s1",
                [ScalarValue::from("foo")],
                [
                    // container 0, s1 known to only contain "foo"
                    Some(true),
                    // container 1, s1 known to not contain "foo"
                    Some(false),
                    // container 2, s1 unknown about "foo"
                    None,
                    // container 3, s1 known to only contain "foo"
                    Some(true),
                    // container 4, s1 known to not contain "foo"
                    Some(false),
                    // container 5, s1 unknown about "foo"
                    None,
                    // container 6, s1 known to only contain "foo"
                    Some(true),
                    // container 7, s1 known to not contain "foo"
                    Some(false),
                    // container 8, s1 unknown about "foo"
                    None,
                ],
            )
            .with_contained(
                "s2", // for column s2
                [ScalarValue::from("bar")],
                [
                    // containers 0,1,2: s2 known to only contain "bar"
                    Some(true),
                    Some(true),
                    Some(true),
                    // containers 3,4,5: s2 known to not contain "bar"
                    Some(false),
                    Some(false),
                    Some(false),
                    // containers 6,7,8: s2 unknown about "bar"
                    None,
                    None,
                    None,
                ],
            );

        // s1 = 'foo'
        prune_with_expr(
            col("s1").eq(lit("foo")),
            &schema,
            &statistics,
            // rule out containers where we know 'foo' is not present in s1
            &[true, false, true, true, false, true, true, false, true],
        );

        // s1 = 'foo' OR s2 = 'bar'
        let expr = col("s1").eq(lit("foo")).or(col("s2").eq(lit("bar")));
        prune_with_expr(
            expr,
            &schema,
            &statistics,
            // can't rule out any container (would need to prove that s1 != foo AND s2 != bar)
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 = 'foo' AND s2 != 'bar'
        prune_with_expr(
            col("s1").eq(lit("foo")).and(col("s2").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can only rule out containers where we know either:
            // 1. s1 doesn't have the value 'foo', or
            // 2. s2 has only the value 'bar'
            &[false, false, false, true, false, true, true, false, true],
        );

        // s1 != 'foo' AND s2 != 'bar'
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").not_eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers where we know either
            // 1. s1 has only the value 'foo', or
            // 2. s2 has only the value 'bar'
            &[false, false, false, false, true, true, false, true, true],
        );

        // s1 != 'foo' AND (s2 = 'bar' OR s2 = 'baz')
        prune_with_expr(
            col("s1")
                .not_eq(lit("foo"))
                .and(col("s2").eq(lit("bar")).or(col("s2").eq(lit("baz")))),
            &schema,
            &statistics,
            // can rule out containers where we know s1 has only the value
            // 'foo'. Can't use knowledge of s2 and bar to rule out anything
            &[false, true, true, false, true, true, false, true, true],
        );

        // s1 like 'foo%bar%'
        prune_with_expr(
            col("s1").like(lit("foo%bar%")),
            &schema,
            &statistics,
            // can't rule out anything with the information we know
            &[true, true, true, true, true, true, true, true, true],
        );

        // s1 like 'foo%bar%' AND s2 = 'bar'
        prune_with_expr(
            col("s1")
                .like(lit("foo%bar%"))
                .and(col("s2").eq(lit("bar"))),
            &schema,
            &statistics,
            // can rule out containers where we know s2 does not have the value 'bar'
            &[true, true, true, false, false, false, true, true, true],
        );

        // s1 like 'foo%bar%' OR s2 = 'bar'
        prune_with_expr(
            col("s1").like(lit("foo%bar%")).or(col("s2").eq(lit("bar"))),
            &schema,
            &statistics,
            // can't rule out anything (we would have to prove that both the
            // like and the equality must be false)
            &[true, true, true, true, true, true, true, true, true],
        );
    }

    #[test]
    fn prune_with_range_and_contained() {
        // Setup mimics range information for i, a bloom filter for s
        let schema = Arc::new(Schema::new(vec![
            Field::new("i", DataType::Int32, true),
            Field::new("s", DataType::Utf8, true),
        ]));

        let statistics = TestStatistics::new()
            .with(
                "i",
                ContainerStats::new_i32(
                    // Containers 0, 3, 6: [-5 to 5]
                    // Containers 1, 4, 7: [10 to 20]
                    // Containers 2, 5, 8: unknown
                    vec![
                        Some(-5),
                        Some(10),
                        None,
                        Some(-5),
                        Some(10),
                        None,
                        Some(-5),
                        Some(10),
                        None,
                    ], // min
                    vec![
                        Some(5),
                        Some(20),
                        None,
                        Some(5),
                        Some(20),
                        None,
                        Some(5),
                        Some(20),
                        None,
                    ], // max
                ),
            )
            // Add contained information about s and "foo"
            .with_contained(
                "s",
                [ScalarValue::from("foo")],
                [
                    // containers 0,1,2 known to only contain "foo"
                    Some(true),
                    Some(true),
                    Some(true),
                    // containers 3,4,5 known to not contain "foo"
                    Some(false),
                    Some(false),
                    Some(false),
                    // containers 6,7,8 unknown about "foo"
                    None,
                    None,
                    None,
                ],
            );

        // i = 0 AND s = 'foo'
        prune_with_expr(
            col("i").eq(lit(0)).and(col("s").eq(lit("foo"))),
            &schema,
            &statistics,
            // can rule out containers where we know that either:
            // 1. 0 is outside the min/max range of i, or
            // 2. s does not contain foo
            // (range is false, or contained is false)
            &[true, false, true, false, false, false, true, false, true],
        );

        // i = 0 AND s != 'foo'
        prune_with_expr(
            col("i").eq(lit(0)).and(col("s").not_eq(lit("foo"))),
            &schema,
            &statistics,
            // can rule out containers where either:
            // 1. 0 is outside the min/max range of i, or
            // 2. s only contains foo
            &[false, false, false, true, false, true, true, false, true],
        );

        // i = 0 OR s = 'foo'
        prune_with_expr(
            col("i").eq(lit(0)).or(col("s").eq(lit("foo"))),
            &schema,
            &statistics,
            // in theory we could rule out containers if we had min/max values
            // for s as well, but in this case we don't, so we can't rule out anything
            &[true, true, true, true, true, true, true, true, true],
        );
    }

    /// Prunes the specified expr with the specified schema and statistics, and
    /// ensures it returns the expected result.
    ///
    /// `expected` is a vector of bools, where true means the container (e.g.
    /// row group) should be kept, and false means it should be pruned.
    ///
    // TODO refactor other tests to use this to reduce boilerplate
    fn prune_with_expr(
        expr: Expr,
        schema: &SchemaRef,
        statistics: &TestStatistics,
        expected: &[bool],
    ) {
        println!("Pruning with expr: {}", expr);
        let expr = logical2physical(&expr, schema);
        let p = PruningPredicate::try_new(expr, Arc::<Schema>::clone(schema)).unwrap();
        let result = p.prune(statistics).unwrap();
        assert_eq!(result, expected);
    }

    fn test_build_predicate_expression(
        expr: &Expr,
        schema: &Schema,
        required_columns: &mut RequiredColumns,
    ) -> Arc<dyn PhysicalExpr> {
        let expr = logical2physical(expr, schema);
        let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _;
        build_predicate_expression(&expr, schema, required_columns, &unhandled_hook)
    }
}