dbx_core/sql/optimizer/
distributed_pushdown.rs1use super::OptimizationRule;
8use crate::error::DbxResult;
9use crate::sql::planner::{AggregateMode, LogicalPlan};
10
/// Optimizer rule that rewrites a logical plan for distributed execution.
///
/// The type carries no state; all of its behavior lives in its
/// `OptimizationRule` implementation and the private `push_down` traversal.
/// The derives let callers log, copy, and default-construct the rule.
#[derive(Debug, Clone, Copy, Default)]
pub struct DistributedPushdownRule;
12
13impl OptimizationRule for DistributedPushdownRule {
14 fn name(&self) -> &str {
15 "DistributedPushdown"
16 }
17
18 fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
19 self.push_down(plan)
20 }
21}
22
23impl DistributedPushdownRule {
24 fn push_down(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
25 match plan {
26 LogicalPlan::Project { input, projections } => Ok(LogicalPlan::Project {
28 input: Box::new(self.push_down(*input)?),
29 projections,
30 }),
31 LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
32 input: Box::new(self.push_down(*input)?),
33 predicate,
34 }),
35 LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
36 input: Box::new(self.push_down(*input)?),
37 order_by,
38 }),
39 LogicalPlan::Limit {
40 input,
41 count,
42 offset,
43 } => Ok(LogicalPlan::Limit {
44 input: Box::new(self.push_down(*input)?),
45 count,
46 offset,
47 }),
48 LogicalPlan::Aggregate {
49 input,
50 group_by,
51 aggregates,
52 mode,
53 } => {
54 let pushed_input = self.push_down(*input)?;
55
56 if mode == AggregateMode::Simple {
57 let partial_agg = LogicalPlan::Aggregate {
59 input: Box::new(pushed_input),
60 group_by: group_by.clone(),
61 aggregates: aggregates.clone(),
62 mode: AggregateMode::Partial,
63 };
64
65 Ok(LogicalPlan::Aggregate {
69 input: Box::new(partial_agg),
70 group_by,
71 aggregates,
72 mode: AggregateMode::Final,
73 })
74 } else {
75 Ok(LogicalPlan::Aggregate {
76 input: Box::new(pushed_input),
77 group_by,
78 aggregates,
79 mode,
80 })
81 }
82 }
83 LogicalPlan::Join {
85 left,
86 right,
87 join_type,
88 on,
89 } => Ok(LogicalPlan::Join {
90 left: Box::new(self.push_down(*left)?),
91 right: Box::new(self.push_down(*right)?),
92 join_type,
93 on,
94 }),
95 other => Ok(other),
96 }
97 }
98}