Skip to main content

dbx_core/sql/optimizer/
distributed_pushdown.rs

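//! Rule: Distributed Pushdown
//!
//! Produces an execution plan optimized for a distributed (Grid) environment.
//! - Partial Aggregate Pushdown: splits an `Aggregate` node into a shard-level `LocalAggregate` and a coordinator-level `GlobalAggregate` (expressed here as `AggregateMode::Partial` and `AggregateMode::Final`).
//! - Limit Pushdown: partial limits reduce unnecessary network transfer in distributed execution. This rule currently only traverses `Limit` nodes and works in conjunction with the basic LimitPushdown rule.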
6
7use super::OptimizationRule;
8use crate::error::DbxResult;
9use crate::sql::planner::{AggregateMode, LogicalPlan};
10
/// Optimizer rule that rewrites a logical plan for distributed execution.
///
/// The rule carries no state; all behavior lives in its `OptimizationRule`
/// implementation. The common traits are derived so the rule can be
/// debug-printed, copied freely, and built via `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct DistributedPushdownRule;
12
13impl OptimizationRule for DistributedPushdownRule {
14    fn name(&self) -> &str {
15        "DistributedPushdown"
16    }
17
18    fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
19        self.push_down(plan)
20    }
21}
22
23impl DistributedPushdownRule {
24    fn push_down(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
25        match plan {
26            // Traverse down
27            LogicalPlan::Project { input, projections } => Ok(LogicalPlan::Project {
28                input: Box::new(self.push_down(*input)?),
29                projections,
30            }),
31            LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
32                input: Box::new(self.push_down(*input)?),
33                predicate,
34            }),
35            LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
36                input: Box::new(self.push_down(*input)?),
37                order_by,
38            }),
39            LogicalPlan::Limit {
40                input,
41                count,
42                offset,
43            } => Ok(LogicalPlan::Limit {
44                input: Box::new(self.push_down(*input)?),
45                count,
46                offset,
47            }),
48            LogicalPlan::Aggregate {
49                input,
50                group_by,
51                aggregates,
52                mode,
53            } => {
54                let pushed_input = self.push_down(*input)?;
55
56                if mode == AggregateMode::Simple {
57                    // Create Partial Aggregate for Shard level
58                    let partial_agg = LogicalPlan::Aggregate {
59                        input: Box::new(pushed_input),
60                        group_by: group_by.clone(),
61                        aggregates: aggregates.clone(),
62                        mode: AggregateMode::Partial,
63                    };
64
65                    // Create Final Aggregate for Coordinator level
66                    // For now, Final uses the same expressions, but HashAggregateOperator
67                    // will handle the "Merge" logic based on its mode.
68                    Ok(LogicalPlan::Aggregate {
69                        input: Box::new(partial_agg),
70                        group_by,
71                        aggregates,
72                        mode: AggregateMode::Final,
73                    })
74                } else {
75                    Ok(LogicalPlan::Aggregate {
76                        input: Box::new(pushed_input),
77                        group_by,
78                        aggregates,
79                        mode,
80                    })
81                }
82            }
83            // Join도 양측 traverse
84            LogicalPlan::Join {
85                left,
86                right,
87                join_type,
88                on,
89            } => Ok(LogicalPlan::Join {
90                left: Box::new(self.push_down(*left)?),
91                right: Box::new(self.push_down(*right)?),
92                join_type,
93                on,
94            }),
95            other => Ok(other),
96        }
97    }
98}