Skip to main content

datafusion_physical_expr/
analysis.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Interval and selectivity in [`AnalysisContext`]

20use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::Column;
25use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult};
26use crate::utils::collect_columns;
27
28use arrow::datatypes::Schema;
29use datafusion_common::stats::Precision;
30use datafusion_common::{
31    ColumnStatistics, Result, ScalarValue, assert_or_internal_err,
32    internal_datafusion_err, internal_err,
33};
34use datafusion_expr::interval_arithmetic::{Interval, cardinality_ratio};
35
36/// The shared context used during the analysis of an expression. Includes
37/// the boundaries for all known columns.
38#[derive(Clone, Debug, PartialEq)]
39pub struct AnalysisContext {
40    // A list of known column boundaries, ordered by the index
41    // of the column in the current schema.
42    pub boundaries: Vec<ExprBoundaries>,
43    /// The estimated percentage of rows that this expression would select, if
44    /// it were to be used as a boolean predicate on a filter. The value will be
45    /// between 0.0 (selects nothing) and 1.0 (selects everything).
46    pub selectivity: Option<f64>,
47}
48
49impl AnalysisContext {
50    pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
51        Self {
52            boundaries,
53            selectivity: None,
54        }
55    }
56
57    pub fn with_selectivity(mut self, selectivity: f64) -> Self {
58        self.selectivity = Some(selectivity);
59        self
60    }
61
62    /// Create a new analysis context from column statistics.
63    pub fn try_from_statistics(
64        input_schema: &Schema,
65        statistics: &[ColumnStatistics],
66    ) -> Result<Self> {
67        statistics
68            .iter()
69            .enumerate()
70            .map(|(idx, stats)| ExprBoundaries::try_from_column(input_schema, stats, idx))
71            .collect::<Result<Vec<_>>>()
72            .map(Self::new)
73    }
74}
75
76/// Represents the boundaries (e.g. min and max values) of a particular column
77///
78/// This is used range analysis of expressions, to determine if the expression
79/// limits the value of particular columns (e.g. analyzing an expression such as
80/// `time < 50` would result in a boundary interval for `time` having a max
81/// value of `50`).
82#[derive(Clone, Debug, PartialEq)]
83pub struct ExprBoundaries {
84    pub column: Column,
85    /// Minimum and maximum values this expression can have. A `None` value
86    /// indicates that evaluating the given column results in an empty set.
87    /// For example, if the column `a` has values in the range [10, 20],
88    /// and there is a filter asserting that `a > 50`, then the resulting interval
89    /// range of `a` will be `None`.
90    pub interval: Option<Interval>,
91    /// Maximum number of distinct values this expression can produce, if known.
92    pub distinct_count: Precision<usize>,
93}
94
95impl ExprBoundaries {
96    /// Create a new `ExprBoundaries` object from column level statistics.
97    pub fn try_from_column(
98        schema: &Schema,
99        col_stats: &ColumnStatistics,
100        col_index: usize,
101    ) -> Result<Self> {
102        let field = schema.fields().get(col_index).ok_or_else(|| {
103            internal_datafusion_err!(
104                "Could not create `ExprBoundaries`: in `try_from_column` `col_index`
105                has gone out of bounds with a value of {col_index}, the schema has {} columns.",
106                schema.fields.len()
107            )
108        })?;
109        let empty_field =
110            ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null);
111        let interval = Interval::try_new(
112            col_stats
113                .min_value
114                .get_value()
115                .cloned()
116                .unwrap_or_else(|| empty_field.clone()),
117            col_stats
118                .max_value
119                .get_value()
120                .cloned()
121                .unwrap_or(empty_field),
122        )?;
123        let column = Column::new(field.name(), col_index);
124        Ok(ExprBoundaries {
125            column,
126            interval: Some(interval),
127            distinct_count: col_stats.distinct_count,
128        })
129    }
130
131    /// Create `ExprBoundaries` that represent no known bounds for all the
132    /// columns in `schema`
133    pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
134        schema
135            .fields()
136            .iter()
137            .enumerate()
138            .map(|(i, field)| {
139                Ok(Self {
140                    column: Column::new(field.name(), i),
141                    interval: Some(Interval::make_unbounded(field.data_type())?),
142                    distinct_count: Precision::Absent,
143                })
144            })
145            .collect()
146    }
147}
148
149/// Attempts to refine column boundaries and compute a selectivity value.
150///
151/// The function accepts boundaries of the input columns in the `context` parameter.
152/// It then tries to tighten these boundaries based on the provided `expr`.
153/// The resulting selectivity value is calculated by comparing the initial and final boundaries.
154/// The computation assumes that the data within the column is uniformly distributed and not sorted.
155///
156/// # Arguments
157///
158/// * `context` - The context holding input column boundaries.
159/// * `expr` - The expression used to shrink the column boundaries.
160///
161/// # Returns
162///
163/// * `AnalysisContext` constructed by pruned boundaries and a selectivity value.
164pub fn analyze(
165    expr: &Arc<dyn PhysicalExpr>,
166    context: AnalysisContext,
167    schema: &Schema,
168) -> Result<AnalysisContext> {
169    let initial_boundaries = &context.boundaries;
170
171    if initial_boundaries
172        .iter()
173        .all(|bound| bound.interval.is_none())
174    {
175        assert_or_internal_err!(
176            !initial_boundaries
177                .iter()
178                .any(|bound| bound.distinct_count != Precision::Exact(0)),
179            "ExprBoundaries has a non-zero distinct count although it represents an empty table"
180        );
181        assert_or_internal_err!(
182            context.selectivity.unwrap_or(0.0) == 0.0,
183            "AnalysisContext has a non-zero selectivity although it represents an empty table"
184        );
185        Ok(context)
186    } else if initial_boundaries
187        .iter()
188        .any(|bound| bound.interval.is_none())
189    {
190        internal_err!(
191            "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't"
192        )
193    } else {
194        let mut target_boundaries = context.boundaries;
195        let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
196        let columns = collect_columns(expr)
197            .into_iter()
198            .map(|c| Arc::new(c) as _)
199            .collect::<Vec<_>>();
200
201        let mut target_indices_and_boundaries = vec![];
202        let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());
203
204        for (expr, index) in &target_expr_and_indices {
205            if let Some(column) = expr.downcast_ref::<Column>()
206                && let Some(bound) =
207                    target_boundaries.iter().find(|b| b.column == *column)
208            {
209                // Now, it's safe to unwrap
210                target_indices_and_boundaries
211                    .push((*index, bound.interval.as_ref().unwrap().clone()));
212            }
213        }
214
215        match graph.update_ranges(&mut target_indices_and_boundaries, Interval::TRUE)? {
216            PropagationResult::Success => {
217                shrink_boundaries(&graph, target_boundaries, &target_expr_and_indices)
218            }
219            PropagationResult::Infeasible => {
220                // If the propagation result is infeasible, set intervals to None
221                target_boundaries
222                    .iter_mut()
223                    .for_each(|bound| bound.interval = None);
224                Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
225            }
226            PropagationResult::CannotPropagate => {
227                Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
228            }
229        }
230    }
231}
232
233/// If the `PropagationResult` indicates success, this function calculates the
234/// selectivity value by comparing the initial and final column boundaries.
235/// Following this, it constructs and returns a new `AnalysisContext` with the
236/// updated parameters.
237fn shrink_boundaries(
238    graph: &ExprIntervalGraph,
239    mut target_boundaries: Vec<ExprBoundaries>,
240    target_expr_and_indices: &[(Arc<dyn PhysicalExpr>, usize)],
241) -> Result<AnalysisContext> {
242    let initial_boundaries = target_boundaries.clone();
243    target_expr_and_indices.iter().for_each(|(expr, i)| {
244        if let Some(column) = expr.downcast_ref::<Column>()
245            && let Some(bound) = target_boundaries
246                .iter_mut()
247                .find(|bound| bound.column.eq(column))
248        {
249            bound.interval = Some(graph.get_interval(*i));
250        };
251    });
252
253    let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;
254
255    assert_or_internal_err!(
256        (0.0..=1.0).contains(&selectivity),
257        "Selectivity is out of limit: {selectivity}",
258    );
259
260    Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
261}
262
263/// Returns `Some(1.0 / distinct_count)` when the filter demonstrably collapsed
264/// a non-singleton interval down to a single point, i.e. an equality predicate
265/// was applied.  Returns `None` in all other cases, signalling that the caller
266/// should fall back to [`cardinality_ratio`].
267///
268/// The `initial_interval` guard prevents double-counting selectivity when the
269/// column statistics already described a singleton before any filter was
270/// applied: if the initial interval was already the same single point, no
271/// additional selectivity has been gained and the `1 / NDV` shortcut must not
272/// fire.
273fn singleton_selectivity(
274    initial_interval: &Interval,
275    target_interval: &Interval,
276    distinct_count: usize,
277) -> Option<f64> {
278    // The target must have collapsed to a single non-null value.
279    if distinct_count == 0
280        || target_interval.lower().is_null()
281        || target_interval.lower() != target_interval.upper()
282    {
283        return None;
284    }
285
286    // Only treat this as a newly-applied equality filter when the initial
287    // interval was not already that same singleton.  If it was, the stats
288    // already encoded this restriction and applying 1/NDV again would
289    // under-estimate the row count.
290    let initial_is_same_singleton = !initial_interval.lower().is_null()
291        && initial_interval.lower() == initial_interval.upper()
292        && initial_interval.lower() == target_interval.lower();
293
294    if initial_is_same_singleton {
295        return None;
296    }
297
298    Some(1.0 / distinct_count as f64)
299}
300
301/// This function calculates the filter predicate's selectivity by comparing
302/// the initial and pruned column boundaries. Selectivity is defined as the
303/// ratio of rows in a table that satisfy the filter's predicate.
304fn calculate_selectivity(
305    target_boundaries: &[ExprBoundaries],
306    initial_boundaries: &[ExprBoundaries],
307) -> Result<f64> {
308    // Since the intervals are assumed uniform and the values
309    // are not correlated, we need to multiply the selectivities
310    // of multiple columns to get the overall selectivity.
311    if target_boundaries.len() != initial_boundaries.len() {
312        return Err(internal_datafusion_err!(
313            "The number of columns in the initial and target boundaries should be the same"
314        ));
315    }
316    let mut acc: f64 = 1.0;
317    for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
318        match (initial.interval.as_ref(), target.interval.as_ref()) {
319            (Some(initial_interval), Some(target_interval)) => {
320                if let Precision::Exact(distinct_count)
321                | Precision::Inexact(distinct_count) = target.distinct_count
322                    && let Some(s) = singleton_selectivity(
323                        initial_interval,
324                        target_interval,
325                        distinct_count,
326                    )
327                {
328                    acc *= s;
329                    continue;
330                }
331                acc *= cardinality_ratio(initial_interval, target_interval);
332            }
333            (None, Some(_)) => {
334                return internal_err!(
335                    "Initial boundary cannot be None while having a Some() target boundary"
336                );
337            }
338            _ => return Ok(0.0),
339        }
340    }
341
342    Ok(acc)
343}
344
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, ScalarValue, assert_contains, stats::Precision};
    use datafusion_expr::{
        Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit,
    };

    use crate::{AnalysisContext, create_physical_expr, expressions::Column};

    use super::{ExprBoundaries, analyze, calculate_selectivity, singleton_selectivity};

    /// Builds a non-nullable field named `name` of type `data_type`.
    fn make_field(name: &str, data_type: DataType) -> Field {
        let nullable = false;
        Field::new(name, data_type, nullable)
    }

    /// Predicates over an unbounded `Int32` column must tighten the column's
    /// interval to the expected lower/upper bounds.
    #[test]
    fn test_analyze_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        /// Test case containing (expression tree, lower bound, upper bound)
        type TestCase = (Expr, Option<i32>, Option<i32>);

        let test_cases: Vec<TestCase> = vec![
            // a > 10
            (col("a").gt(lit(10)), Some(11), None),
            // a < 20
            (col("a").lt(lit(20)), None, Some(19)),
            // a > 10 AND a < 20
            (
                col("a").gt(lit(10)).and(col("a").lt(lit(20))),
                Some(11),
                Some(19),
            ),
            // a >= 10
            (col("a").gt_eq(lit(10)), Some(10), None),
            // a <= 20
            (col("a").lt_eq(lit(20)), None, Some(20)),
            // a >= 10 AND a <= 20
            (
                col("a").gt_eq(lit(10)).and(col("a").lt_eq(lit(20))),
                Some(10),
                Some(20),
            ),
            // a > 10 AND a < 20 AND a < 15
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").lt(lit(15))),
                Some(11),
                Some(14),
            ),
            // (a > 10 AND a < 20) AND (a > 15 AND a < 25)
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").gt(lit(15)))
                    .and(col("a").lt(lit(25))),
                Some(16),
                Some(19),
            ),
        ];
        for (expr, lower, upper) in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();
            let Some(actual) = &analysis_result.boundaries[0].interval else {
                panic!(
                    "The analysis result should contain non-empty intervals for all columns"
                );
            };
            let expected = Interval::make(lower, upper).unwrap();
            assert_eq!(
                &expected, actual,
                "did not get correct interval for SQL expression: {expr:?}"
            );
        }
    }

    /// Contradictory predicates must leave every column without an interval.
    #[test]
    fn test_analyze_empty_set_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        let test_cases: Vec<Expr> = vec![
            // a > 10 AND a < 10
            col("a").gt(lit(10)).and(col("a").lt(lit(10))),
            // (a > 10 AND a < 20) AND (a > 20 AND a < 30)
            col("a")
                .gt(lit(10))
                .and(col("a").lt(lit(20)))
                .and(col("a").gt(lit(20)))
                .and(col("a").lt(lit(30))),
        ];

        for expr in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();

            for boundary in analysis_result.boundaries {
                assert!(boundary.interval.is_none());
            }
        }
    }

    /// An `OR` predicate is expected to surface the propagation error instead
    /// of producing boundaries.
    #[test]
    fn test_analyze_invalid_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
        let expr = col("a").lt(lit(10)).or(col("a").gt(lit(20)));
        let expected_error = "OR operator cannot yet propagate true intervals";
        let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
        let physical_expr =
            create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
        let analysis_error = analyze(
            &physical_expr,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )
        .unwrap_err();
        assert_contains!(analysis_error.to_string(), expected_error);
    }

    // ---------------------------------------------------------------------------
    // Unit tests for singleton_selectivity and calculate_selectivity
    // ---------------------------------------------------------------------------

    /// Builds boundaries for column `a` (index 0) covering `[lower, upper]`
    /// with an exact distinct count.
    fn make_boundary(lower: i32, upper: i32, distinct_count: usize) -> ExprBoundaries {
        ExprBoundaries {
            column: Column::new("a", 0),
            interval: Some(
                Interval::try_new(
                    ScalarValue::Int32(Some(lower)),
                    ScalarValue::Int32(Some(upper)),
                )
                .unwrap(),
            ),
            distinct_count: Precision::Exact(distinct_count),
        }
    }

    /// When the initial interval is already the same singleton as the target,
    /// `singleton_selectivity` must return `None` so we do not double-apply
    /// 1/NDV selectivity.
    #[test]
    fn test_singleton_selectivity_skipped_when_initial_is_same_singleton() {
        let singleton =
            Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5)))
                .unwrap();
        // Both initial and target are [5, 5] — no new equality filter was applied.
        assert_eq!(
            singleton_selectivity(&singleton, &singleton, 10),
            None,
            "shortcut must not fire when initial interval was already the same singleton"
        );
    }

    /// When the initial interval is a broader range and the target collapses to
    /// a singleton, `singleton_selectivity` must return `Some(1/NDV)`.
    #[test]
    fn test_singleton_selectivity_applied_when_range_collapses() {
        let initial =
            Interval::try_new(ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100)))
                .unwrap();
        let target =
            Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5)))
                .unwrap();
        let result = singleton_selectivity(&initial, &target, 10);
        assert_eq!(
            result,
            Some(0.1),
            "shortcut must return 1/NDV when a range collapses to a singleton"
        );
    }

    /// Regression test: `calculate_selectivity` must not apply the `1/NDV`
    /// shortcut when the column statistics already describe a singleton interval
    /// (i.e. before the filter, the column only ever held one value).  In that
    /// case the target and initial intervals are the same singleton, so the
    /// cardinality ratio is 1.0 and the overall selectivity should remain 1.0.
    #[test]
    fn test_calculate_selectivity_already_singleton_initial_interval() {
        // Scenario 1: statistics already pin the column to [7, 7] with NDV = 1.
        let already_singleton = make_boundary(7, 7, 1);
        let selectivity = calculate_selectivity(
            std::slice::from_ref(&already_singleton),
            std::slice::from_ref(&already_singleton),
        )
        .unwrap();
        assert!(
            (selectivity - 1.0).abs() < 1e-10,
            "expected selectivity 1.0 when the statistics already describe the singleton, got {selectivity}"
        );

        // Scenario 2: a wide range collapses to a singleton, so the shortcut
        // applies and yields 1/NDV.
        let wide_initial = make_boundary(1, 100, 50);
        let same_singleton_target = make_boundary(7, 7, 50);
        let selectivity_new =
            calculate_selectivity(&[same_singleton_target], &[wide_initial]).unwrap();
        assert!(
            (selectivity_new - 0.02).abs() < 1e-10,
            "expected selectivity 1/NDV = 0.02, got {selectivity_new}"
        );

        // Scenario 3: same singleton before and after, but with a larger NDV;
        // the shortcut must still be skipped.
        let singleton_initial = make_boundary(7, 7, 50);
        let singleton_target = make_boundary(7, 7, 50);
        let selectivity_no_new_filter =
            calculate_selectivity(&[singleton_target], &[singleton_initial]).unwrap();
        assert!(
            (selectivity_no_new_filter - 1.0).abs() < 1e-10,
            "expected selectivity 1.0 when initial was already the same singleton, got {selectivity_no_new_filter}"
        );
    }
}