dbx_core/sql/optimizer/
projection_pushdown.rs1use crate::error::DbxResult;
6use crate::sql::planner::{Expr, LogicalPlan};
7
8use super::OptimizationRule;
9
/// Optimizer rule that narrows the column list of a table scan sitting
/// directly beneath a projection, so the scan only lists columns the
/// projection expressions reference.
///
/// The rule carries no state; the common derives make it cheap to copy,
/// construct via `Default`, and print in diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProjectionPushdownRule;
12
13impl OptimizationRule for ProjectionPushdownRule {
14 fn name(&self) -> &str {
15 "ProjectionPushdown"
16 }
17
18 fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
19 self.push_down(plan)
20 }
21}
22
23impl ProjectionPushdownRule {
24 fn push_down(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
25 match plan {
26 LogicalPlan::Project {
27 input,
28 projections: columns,
29 } => {
30 let optimized_input = self.push_down(*input)?;
31 match optimized_input {
32 LogicalPlan::Scan {
33 table,
34 columns: scan_cols,
35 filter,
36 } if !columns.is_empty() => {
37 let needed = self.extract_column_names(&columns);
38 let final_cols = if scan_cols.is_empty() {
39 needed
40 } else {
41 scan_cols
42 .into_iter()
43 .filter(|c| needed.contains(c))
44 .collect()
45 };
46 Ok(LogicalPlan::Project {
47 input: Box::new(LogicalPlan::Scan {
48 table,
49 columns: final_cols,
50 filter,
51 }),
52 projections: columns,
53 })
54 }
55 other => Ok(LogicalPlan::Project {
56 input: Box::new(other),
57 projections: columns,
58 }),
59 }
60 }
61 LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
62 input: Box::new(self.push_down(*input)?),
63 predicate,
64 }),
65 LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
66 input: Box::new(self.push_down(*input)?),
67 order_by,
68 }),
69 LogicalPlan::Limit {
70 input,
71 count,
72 offset,
73 } => Ok(LogicalPlan::Limit {
74 input: Box::new(self.push_down(*input)?),
75 count,
76 offset,
77 }),
78 LogicalPlan::Aggregate {
79 input,
80 group_by,
81 aggregates,
82 } => Ok(LogicalPlan::Aggregate {
83 input: Box::new(self.push_down(*input)?),
84 group_by,
85 aggregates,
86 }),
87 other => Ok(other),
88 }
89 }
90
91 fn extract_column_names(&self, projections: &[(Expr, Option<String>)]) -> Vec<String> {
93 let mut names = Vec::new();
94 for (expr, _) in projections {
95 self.collect_columns(expr, &mut names);
96 }
97 names.sort();
98 names.dedup();
99 names
100 }
101
102 fn collect_columns(&self, expr: &Expr, out: &mut Vec<String>) {
103 match expr {
104 Expr::Column(name) => out.push(name.clone()),
105 Expr::BinaryOp { left, right, .. } => {
106 self.collect_columns(left, out);
107 self.collect_columns(right, out);
108 }
109 Expr::Function { args, .. } => {
110 for arg in args {
111 self.collect_columns(arg, out);
112 }
113 }
114 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
115 self.collect_columns(inner, out);
116 }
117 Expr::InList { expr, list, .. } => {
118 self.collect_columns(expr, out);
119 for item in list {
120 self.collect_columns(item, out);
121 }
122 }
123 _ => {}
124 }
125 }
126}