dbx_core/sql/optimizer/
projection_pushdown.rs1use crate::error::DbxResult;
6use crate::sql::planner::{Expr, LogicalPlan};
7
8use super::OptimizationRule;
9
/// Optimizer rule that narrows the column list of a `Scan` sitting directly
/// beneath a `Project`, so the scan only lists columns the projection references.
///
/// The rule carries no state; the derives make it printable in diagnostics and
/// trivially constructible/copyable.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProjectionPushdownRule;
12
13impl OptimizationRule for ProjectionPushdownRule {
14 fn name(&self) -> &str {
15 "ProjectionPushdown"
16 }
17
18 fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
19 self.push_down(plan)
20 }
21}
22
23impl ProjectionPushdownRule {
24 fn push_down(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
25 match plan {
26 LogicalPlan::Project {
27 input,
28 projections: columns,
29 } => {
30 let optimized_input = self.push_down(*input)?;
31 match optimized_input {
32 LogicalPlan::Scan {
33 table,
34 columns: scan_cols,
35 filter,
36 ros_files,
37 } if !columns.is_empty() => {
38 let needed = self.extract_column_names(&columns);
39 let final_cols = if scan_cols.is_empty() {
40 needed
41 } else {
42 scan_cols
43 .into_iter()
44 .filter(|c| needed.contains(c))
45 .collect()
46 };
47 Ok(LogicalPlan::Project {
48 input: Box::new(LogicalPlan::Scan {
49 table,
50 columns: final_cols,
51 filter,
52 ros_files,
53 }),
54 projections: columns,
55 })
56 }
57 other => Ok(LogicalPlan::Project {
58 input: Box::new(other),
59 projections: columns,
60 }),
61 }
62 }
63 LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
64 input: Box::new(self.push_down(*input)?),
65 predicate,
66 }),
67 LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
68 input: Box::new(self.push_down(*input)?),
69 order_by,
70 }),
71 LogicalPlan::Limit {
72 input,
73 count,
74 offset,
75 } => Ok(LogicalPlan::Limit {
76 input: Box::new(self.push_down(*input)?),
77 count,
78 offset,
79 }),
80 LogicalPlan::Aggregate {
81 input,
82 group_by,
83 aggregates,
84 mode,
85 } => Ok(LogicalPlan::Aggregate {
86 input: Box::new(self.push_down(*input)?),
87 group_by,
88 aggregates,
89 mode,
90 }),
91 other => Ok(other),
92 }
93 }
94
95 fn extract_column_names(&self, projections: &[(Expr, Option<String>)]) -> Vec<String> {
97 let mut names = Vec::new();
98 for (expr, _) in projections {
99 self.collect_columns(expr, &mut names);
100 }
101 names.sort();
102 names.dedup();
103 names
104 }
105
106 fn collect_columns(&self, expr: &Expr, out: &mut Vec<String>) {
107 match expr {
108 Expr::Column(name) => out.push(name.clone()),
109 Expr::BinaryOp { left, right, .. } => {
110 self.collect_columns(left, out);
111 self.collect_columns(right, out);
112 }
113 Expr::Function { args, .. } => {
114 for arg in args {
115 self.collect_columns(arg, out);
116 }
117 }
118 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
119 self.collect_columns(inner, out);
120 }
121 Expr::InList { expr, list, .. } => {
122 self.collect_columns(expr, out);
123 for item in list {
124 self.collect_columns(item, out);
125 }
126 }
127 _ => {}
128 }
129 }
130}