use std::fmt::Debug;
21use std::sync::Arc;
22
23use crate::PhysicalExpr;
24use crate::expressions::Column;
25use crate::intervals::cp_solver::{ExprIntervalGraph, PropagationResult};
26use crate::utils::collect_columns;
27
28use arrow::datatypes::Schema;
29use datafusion_common::stats::Precision;
30use datafusion_common::{
31 ColumnStatistics, Result, ScalarValue, assert_or_internal_err,
32 internal_datafusion_err, internal_err,
33};
34use datafusion_expr::interval_arithmetic::{Interval, cardinality_ratio};
35
/// The shared context used during the analysis of an expression. Includes
/// the boundaries for all known columns and, once analysis completes, an
/// estimate of the fraction of rows the analyzed predicate selects.
#[derive(Clone, Debug, PartialEq)]
pub struct AnalysisContext {
    /// Known boundaries, one entry per column of the input schema.
    pub boundaries: Vec<ExprBoundaries>,
    /// The estimated selectivity of the analyzed predicate, in `[0, 1]`;
    /// `None` until an analysis pass computes it.
    pub selectivity: Option<f64>,
}
48
49impl AnalysisContext {
50 pub fn new(boundaries: Vec<ExprBoundaries>) -> Self {
51 Self {
52 boundaries,
53 selectivity: None,
54 }
55 }
56
57 pub fn with_selectivity(mut self, selectivity: f64) -> Self {
58 self.selectivity = Some(selectivity);
59 self
60 }
61
62 pub fn try_from_statistics(
64 input_schema: &Schema,
65 statistics: &[ColumnStatistics],
66 ) -> Result<Self> {
67 statistics
68 .iter()
69 .enumerate()
70 .map(|(idx, stats)| ExprBoundaries::try_from_column(input_schema, stats, idx))
71 .collect::<Result<Vec<_>>>()
72 .map(Self::new)
73 }
74}
75
/// Represents the boundaries (e.g. min and max values) of a particular column.
///
/// Used to track and propagate the possible value range of a column through
/// expression analysis.
#[derive(Clone, Debug, PartialEq)]
pub struct ExprBoundaries {
    /// The column these boundaries refer to.
    pub column: Column,
    /// The range of possible values. `None` denotes an empty table
    /// (no rows, hence no values at all).
    pub interval: Option<Interval>,
    /// Maximum number of distinct values this column can take, if known.
    pub distinct_count: Precision<usize>,
}
94
impl ExprBoundaries {
    /// Creates an `ExprBoundaries` for the column at `col_index` from its
    /// column-level statistics. Missing min/max statistics fall back to a
    /// null `ScalarValue` of the field's data type (i.e. an unbounded
    /// endpoint).
    ///
    /// # Errors
    /// Returns an internal error if `col_index` is out of bounds for
    /// `schema`, or if the interval cannot be constructed.
    pub fn try_from_column(
        schema: &Schema,
        col_stats: &ColumnStatistics,
        col_index: usize,
    ) -> Result<Self> {
        let field = schema.fields().get(col_index).ok_or_else(|| {
            internal_datafusion_err!(
                "Could not create `ExprBoundaries`: in `try_from_column` `col_index`
                has gone out of bounds with a value of {col_index}, the schema has {} columns.",
                schema.fields.len()
            )
        })?;
        // Fallback endpoint used when a min/max statistic is absent;
        // `ScalarValue::Null` covers types `try_from` cannot handle.
        let empty_field =
            ScalarValue::try_from(field.data_type()).unwrap_or(ScalarValue::Null);
        let interval = Interval::try_new(
            col_stats
                .min_value
                .get_value()
                .cloned()
                .unwrap_or_else(|| empty_field.clone()),
            col_stats
                .max_value
                .get_value()
                .cloned()
                .unwrap_or(empty_field),
        )?;
        let column = Column::new(field.name(), col_index);
        Ok(ExprBoundaries {
            column,
            interval: Some(interval),
            distinct_count: col_stats.distinct_count,
        })
    }

    /// Creates one `ExprBoundaries` per column of `schema`, each with an
    /// unbounded interval and an unknown (`Absent`) distinct count.
    pub fn try_new_unbounded(schema: &Schema) -> Result<Vec<Self>> {
        schema
            .fields()
            .iter()
            .enumerate()
            .map(|(i, field)| {
                Ok(Self {
                    column: Column::new(field.name(), i),
                    interval: Some(Interval::make_unbounded(field.data_type())?),
                    distinct_count: Precision::Absent,
                })
            })
            .collect()
    }
}
148
/// Attempts to refine the column boundaries in `context` under the
/// assumption that the predicate `expr` evaluates to `true`, using interval
/// arithmetic (constraint propagation over an [`ExprIntervalGraph`]).
///
/// Returns a new [`AnalysisContext`] carrying the (possibly shrunk)
/// boundaries and an estimated selectivity in `[0, 1]`. If the predicate is
/// infeasible, all intervals are cleared (`None`) and selectivity is `0.0`;
/// if nothing can be propagated, boundaries are unchanged and selectivity
/// is `1.0`.
pub fn analyze(
    expr: &Arc<dyn PhysicalExpr>,
    context: AnalysisContext,
    schema: &Schema,
) -> Result<AnalysisContext> {
    let initial_boundaries = &context.boundaries;
    if initial_boundaries
        .iter()
        .all(|bound| bound.interval.is_none())
    {
        // Every interval is `None`: the context represents an empty table.
        // Sanity-check that the rest of the context agrees before returning
        // it unchanged.
        assert_or_internal_err!(
            !initial_boundaries
                .iter()
                .any(|bound| bound.distinct_count != Precision::Exact(0)),
            "ExprBoundaries has a non-zero distinct count although it represents an empty table"
        );
        assert_or_internal_err!(
            context.selectivity == Some(0.0),
            "AnalysisContext has a non-zero selectivity although it represents an empty table"
        );
        Ok(context)
    } else if initial_boundaries
        .iter()
        .any(|bound| bound.interval.is_none())
    {
        // Mixing `None` and `Some` intervals is an invalid state: either all
        // columns of an empty table are empty, or none are.
        internal_err!(
            "AnalysisContext is an inconsistent state. Some columns represent empty table while others don't"
        )
    } else {
        let mut target_boundaries = context.boundaries;
        let mut graph = ExprIntervalGraph::try_new(Arc::clone(expr), schema)?;
        let columns = collect_columns(expr)
            .into_iter()
            .map(|c| Arc::new(c) as _)
            .collect::<Vec<_>>();

        // Pair each column's graph node index with its current interval so
        // the solver knows the starting ranges.
        let mut target_indices_and_boundaries = vec![];
        let target_expr_and_indices = graph.gather_node_indices(columns.as_slice());

        for (expr, index) in &target_expr_and_indices {
            if let Some(column) = expr.as_any().downcast_ref::<Column>()
                && let Some(bound) =
                    target_boundaries.iter().find(|b| b.column == *column)
            {
                // `unwrap` is safe: this branch is only reached when every
                // boundary has a `Some` interval (checked above).
                target_indices_and_boundaries
                    .push((*index, bound.interval.as_ref().unwrap().clone()));
            }
        }

        // Propagate the constraint that the whole predicate is `true`.
        match graph.update_ranges(&mut target_indices_and_boundaries, Interval::TRUE)? {
            PropagationResult::Success => {
                shrink_boundaries(&graph, target_boundaries, &target_expr_and_indices)
            }
            PropagationResult::Infeasible => {
                // The predicate can never hold: mark the result as an empty
                // table by clearing every interval.
                target_boundaries
                    .iter_mut()
                    .for_each(|bound| bound.interval = None);
                Ok(AnalysisContext::new(target_boundaries).with_selectivity(0.0))
            }
            PropagationResult::CannotPropagate => {
                // No information could be derived; conservatively assume the
                // predicate selects every row.
                Ok(AnalysisContext::new(target_boundaries).with_selectivity(1.0))
            }
        }
    }
}
231
232fn shrink_boundaries(
237 graph: &ExprIntervalGraph,
238 mut target_boundaries: Vec<ExprBoundaries>,
239 target_expr_and_indices: &[(Arc<dyn PhysicalExpr>, usize)],
240) -> Result<AnalysisContext> {
241 let initial_boundaries = target_boundaries.clone();
242 target_expr_and_indices.iter().for_each(|(expr, i)| {
243 if let Some(column) = expr.as_any().downcast_ref::<Column>()
244 && let Some(bound) = target_boundaries
245 .iter_mut()
246 .find(|bound| bound.column.eq(column))
247 {
248 bound.interval = Some(graph.get_interval(*i));
249 };
250 });
251
252 let selectivity = calculate_selectivity(&target_boundaries, &initial_boundaries)?;
253
254 assert_or_internal_err!(
255 (0.0..=1.0).contains(&selectivity),
256 "Selectivity is out of limit: {selectivity}",
257 );
258
259 Ok(AnalysisContext::new(target_boundaries).with_selectivity(selectivity))
260}
261
262fn calculate_selectivity(
266 target_boundaries: &[ExprBoundaries],
267 initial_boundaries: &[ExprBoundaries],
268) -> Result<f64> {
269 if target_boundaries.len() != initial_boundaries.len() {
273 return Err(internal_datafusion_err!(
274 "The number of columns in the initial and target boundaries should be the same"
275 ));
276 }
277 let mut acc: f64 = 1.0;
278 for (initial, target) in initial_boundaries.iter().zip(target_boundaries) {
279 match (initial.interval.as_ref(), target.interval.as_ref()) {
280 (Some(initial), Some(target)) => {
281 acc *= cardinality_ratio(initial, target);
282 }
283 (None, Some(_)) => {
284 return internal_err!(
285 "Initial boundary cannot be None while having a Some() target boundary"
286 );
287 }
288 _ => return Ok(0.0),
289 }
290 }
291
292 Ok(acc)
293}
294
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::{DFSchema, assert_contains};
    use datafusion_expr::{
        Expr, col, execution_props::ExecutionProps, interval_arithmetic::Interval, lit,
    };

    use crate::{AnalysisContext, create_physical_expr};

    use super::{ExprBoundaries, analyze};

    // Helper: build a non-nullable field for the single-column test schemas.
    fn make_field(name: &str, data_type: DataType) -> Field {
        let nullable = false;
        Field::new(name, data_type, nullable)
    }

    // Verifies that `analyze` tightens an unbounded Int32 column to the
    // interval implied by each comparison predicate.
    #[test]
    fn test_analyze_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        // (predicate, expected lower bound, expected upper bound);
        // `None` means that endpoint is expected to stay unbounded.
        type TestCase = (Expr, Option<i32>, Option<i32>);

        let test_cases: Vec<TestCase> = vec![
            // a > 10: strict bound, integer lower endpoint becomes 11
            (col("a").gt(lit(10)), Some(11), None),
            // a < 20: strict bound, integer upper endpoint becomes 19
            (col("a").lt(lit(20)), None, Some(19)),
            // 10 < a < 20
            (
                col("a").gt(lit(10)).and(col("a").lt(lit(20))),
                Some(11),
                Some(19),
            ),
            // a >= 10: inclusive bound keeps 10
            (col("a").gt_eq(lit(10)), Some(10), None),
            // a <= 20: inclusive bound keeps 20
            (col("a").lt_eq(lit(20)), None, Some(20)),
            // 10 <= a <= 20
            (
                col("a").gt_eq(lit(10)).and(col("a").lt_eq(lit(20))),
                Some(10),
                Some(20),
            ),
            // the tighter a < 15 wins over a < 20
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").lt(lit(15))),
                Some(11),
                Some(14),
            ),
            // overlapping ranges (10, 20) and (15, 25) intersect to (15, 20)
            (
                col("a")
                    .gt(lit(10))
                    .and(col("a").lt(lit(20)))
                    .and(col("a").gt(lit(15)))
                    .and(col("a").lt(lit(25))),
                Some(16),
                Some(19),
            ),
        ];
        for (expr, lower, upper) in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();
            let Some(actual) = &analysis_result.boundaries[0].interval else {
                panic!(
                    "The analysis result should contain non-empty intervals for all columns"
                );
            };
            let expected = Interval::make(lower, upper).unwrap();
            assert_eq!(
                &expected, actual,
                "did not get correct interval for SQL expression: {expr:?}"
            );
        }
    }

    // Verifies that unsatisfiable predicates clear every column's interval
    // (the `PropagationResult::Infeasible` path of `analyze`).
    #[test]
    fn test_analyze_empty_set_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));

        let test_cases: Vec<Expr> = vec![
            // a > 10 AND a < 10 can never hold
            col("a").gt(lit(10)).and(col("a").lt(lit(10))),
            // (10, 20) and (20, 30) do not intersect
            col("a")
                .gt(lit(10))
                .and(col("a").lt(lit(20)))
                .and(col("a").gt(lit(20)))
                .and(col("a").lt(lit(30))),
        ];

        for expr in test_cases {
            let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
            let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
            let analysis_result = analyze(
                &physical_expr,
                AnalysisContext::new(boundaries),
                df_schema.as_ref(),
            )
            .unwrap();

            for boundary in analysis_result.boundaries {
                assert!(boundary.interval.is_none());
            }
        }
    }

    // Verifies that predicates the interval solver cannot handle (OR) are
    // reported as errors rather than silently producing wrong bounds.
    #[test]
    fn test_analyze_invalid_boundary_exprs() {
        let schema = Arc::new(Schema::new(vec![make_field("a", DataType::Int32)]));
        let expr = col("a").lt(lit(10)).or(col("a").gt(lit(20)));
        let expected_error = "OR operator cannot yet propagate true intervals";
        let boundaries = ExprBoundaries::try_new_unbounded(&schema).unwrap();
        let df_schema = DFSchema::try_from(Arc::clone(&schema)).unwrap();
        let physical_expr =
            create_physical_expr(&expr, &df_schema, &ExecutionProps::new()).unwrap();
        let analysis_error = analyze(
            &physical_expr,
            AnalysisContext::new(boundaries),
            df_schema.as_ref(),
        )
        .unwrap_err();
        assert_contains!(analysis_error.to_string(), expected_error);
    }
}