dbx_core/sql/optimizer/
predicate_pushdown.rs1use crate::error::DbxResult;
6use crate::sql::planner::{BinaryOperator, Expr, LogicalPlan};
7
8use super::OptimizationRule;
9
10pub struct PredicatePushdownRule;
12
13impl OptimizationRule for PredicatePushdownRule {
14 fn name(&self) -> &str {
15 "PredicatePushdown"
16 }
17
18 fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
19 self.push_down(plan)
20 }
21}
22
23impl PredicatePushdownRule {
24 fn push_down(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
25 match plan {
26 LogicalPlan::Filter { input, predicate } => {
27 let optimized_input = self.push_down(*input)?;
28 match optimized_input {
29 LogicalPlan::Project {
30 input: project_input,
31 projections: columns,
32 } if self.can_push_through_project(&predicate, &columns) => {
33 let pushed = self.push_down(LogicalPlan::Filter {
34 input: project_input,
35 predicate,
36 })?;
37 Ok(LogicalPlan::Project {
38 input: Box::new(pushed),
39 projections: columns,
40 })
41 }
42 LogicalPlan::Scan {
43 table,
44 columns,
45 filter: None,
46 ros_files,
47 } => Ok(LogicalPlan::Scan {
48 table,
49 columns,
50 filter: Some(predicate),
51 ros_files,
52 }),
53 LogicalPlan::Scan {
54 table,
55 columns,
56 filter: Some(existing),
57 ros_files,
58 } => Ok(LogicalPlan::Scan {
59 table,
60 columns,
61 filter: Some(Expr::BinaryOp {
62 left: Box::new(existing),
63 op: BinaryOperator::And,
64 right: Box::new(predicate),
65 }),
66 ros_files,
67 }),
68 other => Ok(LogicalPlan::Filter {
69 input: Box::new(other),
70 predicate,
71 }),
72 }
73 }
74 LogicalPlan::Project { input, projections } => Ok(LogicalPlan::Project {
75 input: Box::new(self.push_down(*input)?),
76 projections,
77 }),
78 LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
79 input: Box::new(self.push_down(*input)?),
80 order_by,
81 }),
82 LogicalPlan::Limit {
83 input,
84 count,
85 offset,
86 } => Ok(LogicalPlan::Limit {
87 input: Box::new(self.push_down(*input)?),
88 count,
89 offset,
90 }),
91 LogicalPlan::Aggregate {
92 input,
93 group_by,
94 aggregates,
95 mode,
96 } => Ok(LogicalPlan::Aggregate {
97 input: Box::new(self.push_down(*input)?),
98 group_by,
99 aggregates,
100 mode,
101 }),
102 other => Ok(other),
103 }
104 }
105
106 fn can_push_through_project(
108 &self,
109 predicate: &Expr,
110 _projections: &[(Expr, Option<String>)],
111 ) -> bool {
112 self.is_simple_column_predicate(predicate)
113 }
114
115 fn is_simple_column_predicate(&self, expr: &Expr) -> bool {
116 match expr {
117 Expr::Column(_) | Expr::Literal(_) => true,
118 Expr::BinaryOp { left, right, .. } => {
119 self.is_simple_column_predicate(left) && self.is_simple_column_predicate(right)
120 }
121 Expr::IsNull(inner) | Expr::IsNotNull(inner) => self.is_simple_column_predicate(inner),
122 _ => false,
123 }
124 }
125}