kotoba_execution/planner/
physical.rs1use kotoba_core::ir::*;
4use kotoba_core::types::*;
5
6#[derive(Debug, Clone)]
8pub enum PhysicalOp {
9 NodeScan {
11 label: Label,
12 as_: String,
13 props: Option<Properties>,
14 },
15
16 IndexScan {
18 label: Label,
19 as_: String,
20 index: String,
21 value: Value,
22 },
23
24 Filter {
26 pred: Predicate,
27 input: Box<PhysicalOp>,
28 },
29
30 Expand {
32 edge: EdgePattern,
33 to_as: String,
34 input: Box<PhysicalOp>,
35 },
36
37 NestedLoopJoin {
39 left: Box<PhysicalOp>,
40 right: Box<PhysicalOp>,
41 on: Vec<String>,
42 },
43
44 HashJoin {
46 left: Box<PhysicalOp>,
47 right: Box<PhysicalOp>,
48 on: Vec<String>,
49 },
50
51 Project {
53 cols: Vec<String>,
54 input: Box<PhysicalOp>,
55 },
56
57 Group {
59 keys: Vec<String>,
60 aggregations: Vec<Aggregation>,
61 input: Box<PhysicalOp>,
62 },
63
64 Sort {
66 keys: Vec<SortKey>,
67 input: Box<PhysicalOp>,
68 },
69
70 Limit {
72 count: usize,
73 input: Box<PhysicalOp>,
74 },
75
76 Distinct {
78 input: Box<PhysicalOp>,
79 },
80}
81
82#[derive(Debug, Clone)]
84pub struct PhysicalPlan {
85 pub op: PhysicalOp,
86 pub estimated_cost: f64,
87}
88
/// Converts logical plans into physical plans.
///
/// The planner is a stateless unit type, so it is `Copy` and can be built
/// with either `PhysicalPlanner::new()` or `PhysicalPlanner::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct PhysicalPlanner;
92
93impl PhysicalPlanner {
94 pub fn new() -> Self {
95 Self
96 }
97
98 pub fn plan_to_physical(&self, logical: &PlanIR, catalog: &Catalog) -> Result<PhysicalPlan> {
100 let op = self.convert_logical_op(&logical.plan, catalog)?;
101 let cost = self.estimate_cost(&op, catalog);
102
103 Ok(PhysicalPlan {
104 op,
105 estimated_cost: cost,
106 })
107 }
108
109 fn convert_logical_op(&self, logical: &LogicalOp, catalog: &Catalog) -> Result<PhysicalOp> {
111 match logical {
112 LogicalOp::NodeScan { label, as_, props } => {
113 if self.has_index(catalog, label, props) {
115 Ok(PhysicalOp::IndexScan {
116 label: label.clone(),
117 as_: as_.clone(),
118 index: format!("{}_index", label),
119 value: Value::String("dummy".to_string()), })
121 } else {
122 Ok(PhysicalOp::NodeScan {
123 label: label.clone(),
124 as_: as_.clone(),
125 props: props.clone(),
126 })
127 }
128 }
129
130 LogicalOp::IndexScan { label, as_, index, value } => {
131 Ok(PhysicalOp::IndexScan {
132 label: label.clone(),
133 as_: as_.clone(),
134 index: index.clone(),
135 value: value.clone(),
136 })
137 }
138
139 LogicalOp::Filter { pred, input } => {
140 let input_op = self.convert_logical_op(input, catalog)?;
141 Ok(PhysicalOp::Filter {
142 pred: pred.clone(),
143 input: Box::new(input_op),
144 })
145 }
146
147 LogicalOp::Expand { edge, to_as, from } => {
148 let input_op = self.convert_logical_op(from, catalog)?;
149 Ok(PhysicalOp::Expand {
150 edge: edge.clone(),
151 to_as: to_as.clone(),
152 input: Box::new(input_op),
153 })
154 }
155
156 LogicalOp::Join { left, right, on } => {
157 let left_op = self.convert_logical_op(left, catalog)?;
158 let right_op = self.convert_logical_op(right, catalog)?;
159
160 let left_cost = self.estimate_cost(&left_op, catalog);
162 let right_cost = self.estimate_cost(&right_op, catalog);
163
164 if left_cost < right_cost && left_cost < 1000.0 {
165 Ok(PhysicalOp::NestedLoopJoin {
166 left: Box::new(left_op),
167 right: Box::new(right_op),
168 on: on.clone(),
169 })
170 } else {
171 Ok(PhysicalOp::HashJoin {
172 left: Box::new(left_op),
173 right: Box::new(right_op),
174 on: on.clone(),
175 })
176 }
177 }
178
179 LogicalOp::Project { cols, input } => {
180 let input_op = self.convert_logical_op(input, catalog)?;
181 Ok(PhysicalOp::Project {
182 cols: cols.clone(),
183 input: Box::new(input_op),
184 })
185 }
186
187 LogicalOp::Group { keys, aggregations, input } => {
188 let input_op = self.convert_logical_op(input, catalog)?;
189 Ok(PhysicalOp::Group {
190 keys: keys.clone(),
191 aggregations: aggregations.clone(),
192 input: Box::new(input_op),
193 })
194 }
195
196 LogicalOp::Sort { keys, input } => {
197 let input_op = self.convert_logical_op(input, catalog)?;
198 Ok(PhysicalOp::Sort {
199 keys: keys.clone(),
200 input: Box::new(input_op),
201 })
202 }
203
204 LogicalOp::Limit { count, input } => {
205 let input_op = self.convert_logical_op(input, catalog)?;
206 Ok(PhysicalOp::Limit {
207 count: *count,
208 input: Box::new(input_op),
209 })
210 }
211
212 LogicalOp::Distinct { input } => {
213 let input_op = self.convert_logical_op(input, catalog)?;
214 Ok(PhysicalOp::Distinct {
215 input: Box::new(input_op),
216 })
217 }
218 }
219 }
220
221 fn has_index(&self, catalog: &Catalog, label: &Label, props: &Option<Properties>) -> bool {
223 if let Some(props) = props {
224 catalog.indexes.iter().any(|idx| {
226 idx.label == *label &&
227 props.contains_key(&idx.properties[0])
228 })
229 } else {
230 false
231 }
232 }
233
234 fn estimate_cost(&self, op: &PhysicalOp, catalog: &Catalog) -> f64 {
236 match op {
237 PhysicalOp::NodeScan { .. } => 100.0,
238 PhysicalOp::IndexScan { .. } => 10.0,
239 PhysicalOp::Filter { input, .. } => self.estimate_cost(input, catalog) + 10.0,
240 PhysicalOp::Expand { input, .. } => self.estimate_cost(input, catalog) + 50.0,
241 PhysicalOp::NestedLoopJoin { left, right, .. } => {
242 let left_cost = self.estimate_cost(left, catalog);
243 let right_cost = self.estimate_cost(right, catalog);
244 left_cost * right_cost * 2.0
245 }
246 PhysicalOp::HashJoin { left, right, .. } => {
247 let left_cost = self.estimate_cost(left, catalog);
248 let right_cost = self.estimate_cost(right, catalog);
249 left_cost + right_cost + 100.0
250 }
251 PhysicalOp::Project { input, .. } => self.estimate_cost(input, catalog) + 5.0,
252 PhysicalOp::Group { input, .. } => self.estimate_cost(input, catalog) + 150.0,
253 PhysicalOp::Sort { input, .. } => self.estimate_cost(input, catalog) + 100.0,
254 PhysicalOp::Limit { input, .. } => self.estimate_cost(input, catalog) + 1.0,
255 PhysicalOp::Distinct { input, .. } => self.estimate_cost(input, catalog) + 50.0,
256 }
257 }
258}