kotoba_execution/planner/
physical.rs

1//! 物理プランナー(論理プラン → 物理プラン)
2
3use kotoba_core::ir::*;
4use kotoba_core::types::*;
5
6/// 物理演算子
7#[derive(Debug, Clone)]
8pub enum PhysicalOp {
9    /// ノードスキャン
10    NodeScan {
11        label: Label,
12        as_: String,
13        props: Option<Properties>,
14    },
15
16    /// インデックススキャン
17    IndexScan {
18        label: Label,
19        as_: String,
20        index: String,
21        value: Value,
22    },
23
24    /// フィルタ
25    Filter {
26        pred: Predicate,
27        input: Box<PhysicalOp>,
28    },
29
30    /// エッジ展開
31    Expand {
32        edge: EdgePattern,
33        to_as: String,
34        input: Box<PhysicalOp>,
35    },
36
37    /// ネステッドループ結合
38    NestedLoopJoin {
39        left: Box<PhysicalOp>,
40        right: Box<PhysicalOp>,
41        on: Vec<String>,
42    },
43
44    /// ハッシュ結合
45    HashJoin {
46        left: Box<PhysicalOp>,
47        right: Box<PhysicalOp>,
48        on: Vec<String>,
49    },
50
51    /// 射影
52    Project {
53        cols: Vec<String>,
54        input: Box<PhysicalOp>,
55    },
56
57    /// グループ化
58    Group {
59        keys: Vec<String>,
60        aggregations: Vec<Aggregation>,
61        input: Box<PhysicalOp>,
62    },
63
64    /// ソート
65    Sort {
66        keys: Vec<SortKey>,
67        input: Box<PhysicalOp>,
68    },
69
70    /// リミット
71    Limit {
72        count: usize,
73        input: Box<PhysicalOp>,
74    },
75
76    /// 重複除去
77    Distinct {
78        input: Box<PhysicalOp>,
79    },
80}
81
82/// 物理プラン
83#[derive(Debug, Clone)]
84pub struct PhysicalPlan {
85    pub op: PhysicalOp,
86    pub estimated_cost: f64,
87}
88
89/// 物理プランナー
90#[derive(Debug)]
91pub struct PhysicalPlanner;
92
93impl PhysicalPlanner {
94    pub fn new() -> Self {
95        Self
96    }
97
98    /// 論理プランを物理プランに変換
99    pub fn plan_to_physical(&self, logical: &PlanIR, catalog: &Catalog) -> Result<PhysicalPlan> {
100        let op = self.convert_logical_op(&logical.plan, catalog)?;
101        let cost = self.estimate_cost(&op, catalog);
102
103        Ok(PhysicalPlan {
104            op,
105            estimated_cost: cost,
106        })
107    }
108
109    /// 論理演算子を物理演算子に変換
110    fn convert_logical_op(&self, logical: &LogicalOp, catalog: &Catalog) -> Result<PhysicalOp> {
111        match logical {
112            LogicalOp::NodeScan { label, as_, props } => {
113                // インデックスが存在する場合はIndexScanを使用
114                if self.has_index(catalog, label, props) {
115                    Ok(PhysicalOp::IndexScan {
116                        label: label.clone(),
117                        as_: as_.clone(),
118                        index: format!("{}_index", label),
119                        value: Value::String("dummy".to_string()), // 仮
120                    })
121                } else {
122                    Ok(PhysicalOp::NodeScan {
123                        label: label.clone(),
124                        as_: as_.clone(),
125                        props: props.clone(),
126                    })
127                }
128            }
129
130            LogicalOp::IndexScan { label, as_, index, value } => {
131                Ok(PhysicalOp::IndexScan {
132                    label: label.clone(),
133                    as_: as_.clone(),
134                    index: index.clone(),
135                    value: value.clone(),
136                })
137            }
138
139            LogicalOp::Filter { pred, input } => {
140                let input_op = self.convert_logical_op(input, catalog)?;
141                Ok(PhysicalOp::Filter {
142                    pred: pred.clone(),
143                    input: Box::new(input_op),
144                })
145            }
146
147            LogicalOp::Expand { edge, to_as, from } => {
148                let input_op = self.convert_logical_op(from, catalog)?;
149                Ok(PhysicalOp::Expand {
150                    edge: edge.clone(),
151                    to_as: to_as.clone(),
152                    input: Box::new(input_op),
153                })
154            }
155
156            LogicalOp::Join { left, right, on } => {
157                let left_op = self.convert_logical_op(left, catalog)?;
158                let right_op = self.convert_logical_op(right, catalog)?;
159
160                // コストに基づいて結合アルゴリズムを選択
161                let left_cost = self.estimate_cost(&left_op, catalog);
162                let right_cost = self.estimate_cost(&right_op, catalog);
163
164                if left_cost < right_cost && left_cost < 1000.0 {
165                    Ok(PhysicalOp::NestedLoopJoin {
166                        left: Box::new(left_op),
167                        right: Box::new(right_op),
168                        on: on.clone(),
169                    })
170                } else {
171                    Ok(PhysicalOp::HashJoin {
172                        left: Box::new(left_op),
173                        right: Box::new(right_op),
174                        on: on.clone(),
175                    })
176                }
177            }
178
179            LogicalOp::Project { cols, input } => {
180                let input_op = self.convert_logical_op(input, catalog)?;
181                Ok(PhysicalOp::Project {
182                    cols: cols.clone(),
183                    input: Box::new(input_op),
184                })
185            }
186
187            LogicalOp::Group { keys, aggregations, input } => {
188                let input_op = self.convert_logical_op(input, catalog)?;
189                Ok(PhysicalOp::Group {
190                    keys: keys.clone(),
191                    aggregations: aggregations.clone(),
192                    input: Box::new(input_op),
193                })
194            }
195
196            LogicalOp::Sort { keys, input } => {
197                let input_op = self.convert_logical_op(input, catalog)?;
198                Ok(PhysicalOp::Sort {
199                    keys: keys.clone(),
200                    input: Box::new(input_op),
201                })
202            }
203
204            LogicalOp::Limit { count, input } => {
205                let input_op = self.convert_logical_op(input, catalog)?;
206                Ok(PhysicalOp::Limit {
207                    count: *count,
208                    input: Box::new(input_op),
209                })
210            }
211
212            LogicalOp::Distinct { input } => {
213                let input_op = self.convert_logical_op(input, catalog)?;
214                Ok(PhysicalOp::Distinct {
215                    input: Box::new(input_op),
216                })
217            }
218        }
219    }
220
221    /// インデックスが存在するかチェック
222    fn has_index(&self, catalog: &Catalog, label: &Label, props: &Option<Properties>) -> bool {
223        if let Some(props) = props {
224            // プロパティベースのインデックスをチェック
225            catalog.indexes.iter().any(|idx| {
226                idx.label == *label &&
227                props.contains_key(&idx.properties[0])
228            })
229        } else {
230            false
231        }
232    }
233
234    /// 物理演算子のコストを推定
235    fn estimate_cost(&self, op: &PhysicalOp, catalog: &Catalog) -> f64 {
236        match op {
237            PhysicalOp::NodeScan { .. } => 100.0,
238            PhysicalOp::IndexScan { .. } => 10.0,
239            PhysicalOp::Filter { input, .. } => self.estimate_cost(input, catalog) + 10.0,
240            PhysicalOp::Expand { input, .. } => self.estimate_cost(input, catalog) + 50.0,
241            PhysicalOp::NestedLoopJoin { left, right, .. } => {
242                let left_cost = self.estimate_cost(left, catalog);
243                let right_cost = self.estimate_cost(right, catalog);
244                left_cost * right_cost * 2.0
245            }
246            PhysicalOp::HashJoin { left, right, .. } => {
247                let left_cost = self.estimate_cost(left, catalog);
248                let right_cost = self.estimate_cost(right, catalog);
249                left_cost + right_cost + 100.0
250            }
251            PhysicalOp::Project { input, .. } => self.estimate_cost(input, catalog) + 5.0,
252            PhysicalOp::Group { input, .. } => self.estimate_cost(input, catalog) + 150.0,
253            PhysicalOp::Sort { input, .. } => self.estimate_cost(input, catalog) + 100.0,
254            PhysicalOp::Limit { input, .. } => self.estimate_cost(input, catalog) + 1.0,
255            PhysicalOp::Distinct { input, .. } => self.estimate_cost(input, catalog) + 50.0,
256        }
257    }
258}