1use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::Column;
25use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult};
26use crate::utils::collect_columns;
27
28use arrow::datatypes::Schema;
29use datafusion_common::stats::Precision;
30use datafusion_common::{
31 ColumnStatistics, Result, ScalarValue, assert_or_internal_err,
32 internal_datafusion_err, internal_err,
33};
34use datafusion_expr::interval_arithmetic::{Interval, cardinality_ratio};
35
/// State threaded through expression analysis: the known boundaries of the
/// input columns plus, once computed, the estimated selectivity.
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
    /// Boundaries of the columns known to the analysis. The constructors in
    /// this file produce one entry per column statistic / schema field.
    pub boundaries: Vec<ExprBoundaries>,
    /// Estimated selectivity. `None` until set through
    /// [`AnalysisContext::with_selectivity`]; [`analyze`] sets it to `0.0`,
    /// `1.0`, or a computed value checked to lie within `0.0..=1.0`.
    pub selectivity: Option<f64>,
}
48
49impl AnalysisContext {
50 pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
51 Self {
52 boundaries,
53 selectivity: None,
54 }
55 }
56
57 pub fn with_selectivity(mut self, selectivity: f64) -> Self {
58 self.selectivity = Some(selectivity);
59 self
60 }
61
62 pub fn try_from_statistics(
64 input_schema: &Schema,
65 statistics: &[ColumnStatistics],
66 ) -> Result<Self> {
67 statistics
68 .iter()
69 .enumerate()
70 .map(|(idx, stats)| ExprBoundaries::try_from_column(input_schema, stats, idx))
71 .collect::<Result<Vec<_>>>()
72 .map(Self::new)
73 }
74}
75
/// Boundary information tracked for a single column during analysis.
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
    /// The column these boundaries describe.
    pub column: Column,
    /// Value range of the column. [`analyze`] treats `None` as "no feasible
    /// values": it resets every interval to `None` when propagation is
    /// infeasible, and interprets an all-`None` input as an empty table.
    pub interval: Option<Interval>,
    /// Number of distinct values: taken from the column statistics in
    /// [`ExprBoundaries::try_from_column`], `Precision::Absent` in
    /// [`ExprBoundaries::try_new_unbounded`].
    pub distinct_count: Precision<usize>,
}
94
95impl ExprBoundaries {
96 pub fn try_from_column(
98 schema: &Schema,
99 col_stats: &ColumnStatistics,
100 col_index: usize,
101 ) -> Result<Self> {
102 let field = schema.fields().get(col_index).ok_or_else(|| {
103 internal_datafusion_err!(
104 "Could not create `ExprBoundaries`: in `try_from_column` `col_index`
105 has gone out of bounds with a value of {col_index}, the schema has {} columns.",
106 schema.fields.len()
107 )
108 })?;
109 let empty_field =
110 ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null);
111 let interval = Interval::try_new(
112 col_stats
113 .min_value
114 .get_value()
115 .cloned()
116 .unwrap_or_else(|| empty_field.clone()),
117 col_stats
118 .max_value
119 .get_value()
120 .cloned()
121 .unwrap_or(empty_field),
122 )?;
123 let column = Column::new(field.name(), col_index);
124 Ok(ExprBoundaries {
125 column,
126 interval: Some(interval),
127 distinct_count: col_stats.distinct_count,
128 })
129 }
130
131 pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
134 schema
135 .fields()
136 .iter()
137 .enumerate()
138 .map(|(i, field)| {
139 Ok(Self {
140 column: Column::new(field.name(), i),
141 interval: Some(Interval::make_unbounded(field.data_type())?),
142 distinct_count: Precision::Absent,
143 })
144 })
145 .collect()
146 }
147}
148
149pub fn analyze(
165 expr: &Arc<dyn PhysicalExpr>,
166 context: AnalysisContext,
167 schema: &Schema,
168) -> Result<AnalysisContext> {
169 let initial_boundaries = &context.boundaries;
170
171 if initial_boundaries
172 .iter()
173 .all(|bound| bound.interval.is_none())
174 {
175 assert_or_internal_err!(
176 !initial_boundaries
177 .iter()
178 .any(|bound| bound.distinct_count != Precision::Exact(0)),
179 "ExprBoundaries has a non-zero distinct count although it represents an empty table"
180 );
181 assert_or_internal_err!(
182 context.selectivity.unwrap_or(0.0) == 0.0,
183 "AnalysisContext has a non-zero selectivity although it represents an empty table"
184 );
185 Ok(context)
186 } else if initial_boundaries
187 .iter()
188 .any(|bound| bound.interval.is_none())
189 {
190 internal_err!(
191 "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't"
192 )
193 } else {
194 let mut target_boundaries = context.boundaries;
195 let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
196 let columns = collect_columns(expr)
197 .into_iter()
198 .map(|c| Arc::new(c) as _)
199 .collect::<Vec<_>>();
200
201 let mut target_indices_and_boundaries = vec![];
202 let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());
203
204 for (expr, index) in &target_expr_and_indices {
205 if let Some(column) = expr.downcast_ref::<Column>()
206 && let Some(bound) =
207 target_boundaries.iter().find(|b| b.column == *column)
208 {
209 target_indices_and_boundaries
211 .push((*index, bound.interval.as_ref().unwrap().clone()));
212 }
213 }
214
215 match graph.update_ranges(&mut target_indices_and_boundaries, Interval::TRUE)? {
216 PropagationResult::Success => {
217 shrink_boundaries(&graph, target_boundaries, &target_expr_and_indices)
218 }
219 PropagationResult::Infeasible => {
220 target_boundaries
222 .iter_mut()
223 .for_each(|bound| bound.interval = None);
224 Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
225 }
226 PropagationResult::CannotPropagate => {
227 Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
228 }
229 }
230 }
231}
232
233fn shrink_boundaries(
238 graph: &ExprIntervalGraph,
239 mut target_boundaries: Vec<ExprBoundaries>,
240 target_expr_and_indices: &[(Arc<dyn PhysicalExpr>, usize)],
241) -> Result<AnalysisContext> {
242 let initial_boundaries = target_boundaries.clone();
243 target_expr_and_indices.iter().for_each(|(expr, i)| {
244 if let Some(column) = expr.downcast_ref::<Column>()
245 && let Some(bound) = target_boundaries
246 .iter_mut()
247 .find(|bound| bound.column.eq(column))
248 {
249 bound.interval = Some(graph.get_interval(*i));
250 };
251 });
252
253 let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;
254
255 assert_or_internal_err!(
256 (0.0..=1.0).contains(&selectivity),
257 "Selectivity is out of limit: {selectivity}",
258 );
259
260 Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
261}
262
263fn singleton_selectivity(
274 initial_interval: &Interval,
275 target_interval: &Interval,
276 distinct_count: usize,
277) -> Option<f64> {
278 if distinct_count == 0
280 || target_interval.lower().is_null()
281 || target_interval.lower() != target_interval.upper()
282 {
283 return None;
284 }
285
286 let initial_is_same_singleton = !initial_interval.lower().is_null()
291 && initial_interval.lower() == initial_interval.upper()
292 && initial_interval.lower() == target_interval.lower();
293
294 if initial_is_same_singleton {
295 return None;
296 }
297
298 Some(1.0 / distinct_count as f64)
299}
300
301fn calculate_selectivity(
305 target_boundaries: &[ExprBoundaries],
306 initial_boundaries: &[ExprBoundaries],
307) -> Result<f64> {
308 if target_boundaries.len() != initial_boundaries.len() {
312 return Err(internal_datafusion_err!(
313 "The number of columns in the initial and target boundaries should be the same"
314 ));
315 }
316 let mut acc: f64 = 1.0;
317 for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
318 match (initial.interval.as_ref(), target.interval.as_ref()) {
319 (Some(initial_interval), Some(target_interval)) => {
320 if let Precision::Exact(distinct_count)
321 | Precision::Inexact(distinct_count) = target.distinct_count
322 && let Some(s) = singleton_selectivity(
323 initial_interval,
324 target_interval,
325 distinct_count,
326 )
327 {
328 acc *= s;
329 continue;
330 }
331 acc *= cardinality_ratio(initial_interval, target_interval);
332 }
333 (None, Some(_)) => {
334 return internal_err!(
335 "Initial boundary cannot be None while having a Some() target boundary"
336 );
337 }
338 _ => return Ok(0.0),
339 }
340 }
341
342 Ok(acc)
343}
344
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, ScalarValue, assert_contains, stats::Precision};
    use datafusion_expr::{
        Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit,
    };

    use crate::{AnalysisContext, create_physical_expr, expressions::Column};

    use super::{ExprBoundaries, analyze, calculate_selectivity, singleton_selectivity};

    /// Builds a non-nullable field.
    fn make_field(name: &str, data_type: DataType) -> Field {
        let nullable = false;
        Field::new(name, data_type, nullable)
    }

    /// Conjunctions of comparisons on `a` must shrink an initially unbounded
    /// interval to the expected bounds.
    #[test]
    fn test_analyze_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        /// Test case containing (expression, lower bound, upper bound)
        type TestCase = (Expr, Option<i32>, Option<i32>);

        let test_cases: Vec<TestCase> = vec![
            // a > 10
            (col("a").gt(lit(10)), Some(11), None),
            // a < 20
            (col("a").lt(lit(20)), None, Some(19)),
            // a > 10 AND a < 20
            (
                col("a").gt(lit(10)).and(col("a").lt(lit(20))),
                Some(11),
                Some(19),
            ),
            // a >= 10
            (col("a").gt_eq(lit(10)), Some(10), None),
            // a <= 20
            (col("a").lt_eq(lit(20)), None, Some(20)),
            // a >= 10 AND a <= 20
            (
                col("a").gt_eq(lit(10)).and(col("a").lt_eq(lit(20))),
                Some(10),
                Some(20),
            ),
            // a > 10 AND a < 20 AND a < 15
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").lt(lit(15))),
                Some(11),
                Some(14),
            ),
            // a > 10 AND a < 20 AND a > 15 AND a < 25
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").gt(lit(15)))
                    .and(col("a").lt(lit(25))),
                Some(16),
                Some(19),
            ),
        ];
        for (expr, lower, upper) in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();
            let Some(actual) = &analysis_result.boundaries[0].interval else {
                panic!(
                    "The analysis result should contain non-empty intervals for all columns"
                );
            };
            let expected = Interval::make(lower, upper).unwrap();
            assert_eq!(
                &expected, actual,
                "did not get correct interval for SQL expression: {expr:?}"
            );
        }
    }

    /// Contradictory predicates must leave every boundary without an interval.
    #[test]
    fn test_analyze_empty_set_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        let test_cases: Vec<Expr> = vec![
            // a > 10 AND a < 10
            col("a").gt(lit(10)).and(col("a").lt(lit(10))),
            // a > 10 AND a < 20 AND a > 20 AND a < 30
            col("a")
                .gt(lit(10))
                .and(col("a").lt(lit(20)))
                .and(col("a").gt(lit(20)))
                .and(col("a").lt(lit(30))),
        ];

        for expr in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();

            for boundary in analysis_result.boundaries {
                assert!(boundary.interval.is_none());
            }
        }
    }

    /// A disjunction is expected to make `analyze` fail with the "OR operator"
    /// propagation error.
    #[test]
    fn test_analyze_invalid_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
        let expr = col("a").lt(lit(10)).or(col("a").gt(lit(20)));
        let expected_error = "OR operator cannot yet propagate true intervals";
        let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
        let physical_expr =
            create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
        let analysis_error = analyze(
            &physical_expr,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )
        .unwrap_err();
        assert_contains!(analysis_error.to_string(), expected_error);
    }

    /// Builds boundaries for column `a` (index 0) over the closed `Int32`
    /// range `[lower, upper]` with an exact distinct count.
    fn make_boundary(lower: i32, upper: i32, distinct_count: usize) -> ExprBoundaries {
        ExprBoundaries {
            column: Column::new("a", 0),
            interval: Some(
                Interval::try_new(
                    ScalarValue::Int32(Some(lower)),
                    ScalarValue::Int32(Some(upper)),
                )
                .unwrap(),
            ),
            distinct_count: Precision::Exact(distinct_count),
        }
    }

    /// The `1/NDV` shortcut must not apply when the interval was already the
    /// same singleton before filtering.
    #[test]
    fn test_singleton_selectivity_skipped_when_initial_is_same_singleton() {
        let singleton =
            Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5)))
                .unwrap();
        assert_eq!(
            singleton_selectivity(&singleton, &singleton, 10),
            None,
            "shortcut must not fire when initial interval was already the same singleton"
        );
    }

    /// The `1/NDV` shortcut applies when a wider range collapses to one value.
    #[test]
    fn test_singleton_selectivity_applied_when_range_collapses() {
        let initial =
            Interval::try_new(ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100)))
                .unwrap();
        let target =
            Interval::try_new(ScalarValue::Int32(Some(5)), ScalarValue::Int32(Some(5)))
                .unwrap();
        let result = singleton_selectivity(&initial, &target, 10);
        assert_eq!(
            result,
            Some(0.1),
            "shortcut must return 1/NDV when a range collapses to a singleton"
        );
    }

    /// End-to-end check of `calculate_selectivity` around singleton intervals.
    #[test]
    fn test_calculate_selectivity_already_singleton_initial_interval() {
        // Unchanged singleton with NDV = 1: nothing was filtered out, so the
        // selectivity must stay at 1.0.
        let already_singleton = make_boundary(7, 7, 1);
        let selectivity = calculate_selectivity(
            std::slice::from_ref(&already_singleton),
            std::slice::from_ref(&already_singleton),
        )
        .unwrap();
        assert!(
            (selectivity - 1.0).abs() < 1e-10,
            "expected selectivity 1.0 for an unchanged singleton with NDV 1, got {selectivity}"
        );

        // A wide range collapsing to a singleton uses 1/NDV.
        let wide_initial = make_boundary(1, 100, 50);
        let same_singleton_target = make_boundary(7, 7, 50);
        let selectivity_new =
            calculate_selectivity(&[same_singleton_target], &[wide_initial]).unwrap();
        assert!(
            (selectivity_new - 0.02).abs() < 1e-10,
            "expected selectivity 1/NDV = 0.02, got {selectivity_new}"
        );

        // Unchanged singleton with a larger NDV: still no additional filtering.
        let singleton_initial = make_boundary(7, 7, 50);
        let singleton_target = make_boundary(7, 7, 50);
        let selectivity_no_new_filter =
            calculate_selectivity(&[singleton_target], &[singleton_initial]).unwrap();
        assert!(
            (selectivity_no_new_filter - 1.0).abs() < 1e-10,
            "expected selectivity 1.0 when initial was already the same singleton, got {selectivity_no_new_filter}"
        );
    }
}