Skip to main content

dbx_core/sql/optimizer/
tier_pruning.rs

1use crate::error::DbxResult;
2use crate::sql::optimizer::OptimizationRule;
3use crate::sql::planner::LogicalPlan;
4use crate::storage::metadata::MetadataRegistry;
5use std::sync::Arc;
6
7/// 5-Tier Storage 푸루닝 최적화 규칙 (Phase 6)
8/// MetadataRegistry를 조회하여 쿼리 조건에 부합하는 파티션(Parquet 파일)만
9/// 논리 플랜(TableScan)에 주입합니다.
10pub struct TierPruningRule {
11    registry: Arc<MetadataRegistry>,
12}
13
14impl TierPruningRule {
15    pub fn new(registry: Arc<MetadataRegistry>) -> Self {
16        Self { registry }
17    }
18}
19
20impl OptimizationRule for TierPruningRule {
21    fn name(&self) -> &str {
22        "TierPruning"
23    }
24
25    fn apply(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
26        // 재귀 탐색을 통해 모든 LogicalPlan::Scan 노드 탐색 및 수정
27        self.prune(plan)
28    }
29}
30
31impl TierPruningRule {
32    fn prune(&self, plan: LogicalPlan) -> DbxResult<LogicalPlan> {
33        match plan {
34            LogicalPlan::Scan {
35                table,
36                columns,
37                filter,
38                ros_files: _, // 기존값 무시
39            } => {
40                let mut valid_ros_files = vec![];
41
42                // 1. 레지스트리 조회
43                let partitions = self
44                    .registry
45                    .tables
46                    .get(&table)
47                    .map(|t| {
48                        t.partitions
49                            .iter()
50                            .map(|kv| kv.value().clone())
51                            .collect::<Vec<_>>()
52                    })
53                    .unwrap_or_default();
54
55                // 2. 푸루닝 (단위 MVP: 우선 모든 ROS 파티션 등록, 추후 filter 분석 고도화)
56                // TODO: Predicate Pushdown (filter) 기반으로 PartitionMeta.min_key ~ max_key 범위 필터링 기능 추가
57                for partition in partitions {
58                    if partition.tier == crate::storage::metadata::StorageTier::DiskROS {
59                        valid_ros_files.push(partition.file_path.clone());
60                    }
61                }
62
63                Ok(LogicalPlan::Scan {
64                    table,
65                    columns,
66                    filter,
67                    ros_files: valid_ros_files,
68                })
69            }
70            LogicalPlan::Project { input, projections } => Ok(LogicalPlan::Project {
71                input: Box::new(self.prune(*input)?),
72                projections,
73            }),
74            LogicalPlan::Filter { input, predicate } => Ok(LogicalPlan::Filter {
75                input: Box::new(self.prune(*input)?),
76                predicate,
77            }),
78            LogicalPlan::Sort { input, order_by } => Ok(LogicalPlan::Sort {
79                input: Box::new(self.prune(*input)?),
80                order_by,
81            }),
82            LogicalPlan::Limit {
83                input,
84                count,
85                offset,
86            } => Ok(LogicalPlan::Limit {
87                input: Box::new(self.prune(*input)?),
88                count,
89                offset,
90            }),
91            LogicalPlan::Aggregate {
92                input,
93                group_by,
94                aggregates,
95                mode,
96            } => Ok(LogicalPlan::Aggregate {
97                input: Box::new(self.prune(*input)?),
98                group_by,
99                aggregates,
100                mode,
101            }),
102            LogicalPlan::Join {
103                left,
104                right,
105                join_type,
106                on,
107            } => Ok(LogicalPlan::Join {
108                left: Box::new(self.prune(*left)?),
109                right: Box::new(self.prune(*right)?),
110                join_type,
111                on,
112            }),
113            // 데이터 수정/DDL 등 스캔 노드가 없는 나머지 플랜은 그대로 반환
114            other => Ok(other),
115        }
116    }
117}