1use super::ast::*;
5use std::fmt;
6
/// A fully planned query: a tree of [`PhysicalOperator`]s reachable from `root`.
///
/// Produced by `Planner::plan`; its `Display` output is that of the root operator.
#[derive(Debug, Clone)]
pub struct PhysicalPlan {
    /// Outermost operator of the plan; every other operator is nested inside it.
    pub root: PhysicalOperator,
}
12
/// One node of a physical plan tree.
///
/// Operators that consume rows own their child operator(s) through a `Box`,
/// so a plan is a single recursively owned value.
#[derive(Debug, Clone)]
pub enum PhysicalOperator {
    /// Reads `table` without going through an index. This is the access path
    /// the planner emits for the `FROM` table and for every joined table.
    TableScan { table: String },
    /// Looks up a single encoded `key` in `index` on `table`. The planner
    /// emits this for equality comparisons between a column and a literal.
    IndexScan {
        table: String,
        index: String,
        key: Vec<u8>,
    },
    /// Scans `index` on `table` between two encoded keys. A `None` bound
    /// leaves that side of the range open.
    // NOTE(review): whether the bounds are inclusive is not encoded here —
    // confirm against the executor.
    IndexRangeScan {
        table: String,
        index: String,
        start: Option<Vec<u8>>,
        end: Option<Vec<u8>>,
    },
    /// Applies `condition` to the rows produced by `input`.
    Filter {
        input: Box<PhysicalOperator>,
        condition: Expression,
    },
    /// Orders the rows of `input` by the given `ORDER BY` columns.
    Sort {
        input: Box<PhysicalOperator>,
        columns: Vec<OrderByColumn>,
    },
    /// Restricts the rows of `input` using `count` and `offset`. The planner
    /// stores an `offset` of 0 when the query does not specify one.
    Limit {
        input: Box<PhysicalOperator>,
        count: usize,
        offset: usize,
    },
    /// Evaluates the `SELECT` column list over `input`.
    Project {
        input: Box<PhysicalOperator>,
        columns: Vec<SelectColumn>,
    },
    /// Joins `left` and `right` on `condition` using the given `join_type`.
    HashJoin {
        left: Box<PhysicalOperator>,
        right: Box<PhysicalOperator>,
        join_type: JoinType,
        condition: Expression,
    },
    /// Aggregation over `input`. The planner stores the complete `SELECT`
    /// column list in `aggregates`, not only the aggregate columns.
    Aggregate {
        input: Box<PhysicalOperator>,
        aggregates: Vec<SelectColumn>,
    },
}
65
/// Turns a parsed query into a [`PhysicalPlan`].
pub struct Planner {
    /// Indexes the planner may use in place of a filter; consulted in order,
    /// and the first usable one wins.
    available_indexes: Vec<IndexMetadata>,
}
71
/// Description of an index that the planner is allowed to use.
#[derive(Debug, Clone)]
pub struct IndexMetadata {
    /// Index name. The planner treats an index as covering a column when this
    /// name contains the column name as a substring.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Kind of index. The planner recognises the exact strings `"Hash"`
    /// (equality lookups) and `"BTree"` (equality and range lookups); any
    /// other value is ignored.
    pub index_type: String,
}
79
80impl Planner {
81 pub fn new() -> Self {
83 Self {
84 available_indexes: Vec::new(),
85 }
86 }
87
88 pub fn with_indexes(indexes: Vec<IndexMetadata>) -> Self {
90 Self {
91 available_indexes: indexes,
92 }
93 }
94
95 pub fn plan(&self, query: &Query) -> Result<PhysicalPlan, PlanError> {
97 let mut plan = self.plan_table_access(&query.from)?;
99
100 if let Some(ref where_clause) = query.where_clause {
102 plan = self.apply_filter(plan, &where_clause.condition)?;
103 }
104
105 plan = PhysicalOperator::Project {
107 input: Box::new(plan),
108 columns: query.select.columns.clone(),
109 };
110
111 let has_aggregates = query
113 .select
114 .columns
115 .iter()
116 .any(|col| matches!(col, SelectColumn::Aggregate { .. }));
117 if has_aggregates {
118 plan = PhysicalOperator::Aggregate {
119 input: Box::new(plan),
120 aggregates: query.select.columns.clone(),
121 };
122 }
123
124 if let Some(ref order_by) = query.order_by {
126 plan = PhysicalOperator::Sort {
127 input: Box::new(plan),
128 columns: order_by.columns.clone(),
129 };
130 }
131
132 if let Some(ref limit) = query.limit {
134 plan = PhysicalOperator::Limit {
135 input: Box::new(plan),
136 count: limit.count,
137 offset: limit.offset.unwrap_or(0),
138 };
139 }
140
141 Ok(PhysicalPlan { root: plan })
142 }
143
144 fn plan_table_access(&self, from: &FromClause) -> Result<PhysicalOperator, PlanError> {
145 let mut plan = PhysicalOperator::TableScan {
146 table: from.table.clone(),
147 };
148
149 for join in &from.joins {
151 let right = PhysicalOperator::TableScan {
152 table: join.table.clone(),
153 };
154
155 plan = PhysicalOperator::HashJoin {
156 left: Box::new(plan),
157 right: Box::new(right),
158 join_type: join.join_type.clone(),
159 condition: join.condition.clone(),
160 };
161 }
162
163 Ok(plan)
164 }
165
166 fn apply_filter(
167 &self,
168 input: PhysicalOperator,
169 condition: &Expression,
170 ) -> Result<PhysicalOperator, PlanError> {
171 if let Some(index_scan) = self.try_index_scan(condition) {
173 return Ok(index_scan);
174 }
175
176 Ok(PhysicalOperator::Filter {
178 input: Box::new(input),
179 condition: condition.clone(),
180 })
181 }
182
183 fn try_index_scan(&self, condition: &Expression) -> Option<PhysicalOperator> {
184 match condition {
186 Expression::BinaryOp { left, op, right } => {
187 let (column, value) = match (left.as_ref(), right.as_ref()) {
189 (Expression::Column(col), Expression::Literal(lit)) => (col, lit),
190 _ => return None,
191 };
192
193 for index in &self.available_indexes {
195 if index.name.contains(column) {
197 match index.index_type.as_str() {
198 "Hash" if *op == BinaryOperator::Eq => {
199 return Some(PhysicalOperator::IndexScan {
201 table: index.table.clone(),
202 index: index.name.clone(),
203 key: literal_to_bytes(value),
204 });
205 }
206 "BTree" => {
207 match op {
209 BinaryOperator::Eq => {
210 return Some(PhysicalOperator::IndexScan {
211 table: index.table.clone(),
212 index: index.name.clone(),
213 key: literal_to_bytes(value),
214 });
215 }
216 BinaryOperator::Lt | BinaryOperator::Le => {
217 return Some(PhysicalOperator::IndexRangeScan {
218 table: index.table.clone(),
219 index: index.name.clone(),
220 start: None,
221 end: Some(literal_to_bytes(value)),
222 });
223 }
224 BinaryOperator::Gt | BinaryOperator::Ge => {
225 return Some(PhysicalOperator::IndexRangeScan {
226 table: index.table.clone(),
227 index: index.name.clone(),
228 start: Some(literal_to_bytes(value)),
229 end: None,
230 });
231 }
232 _ => {}
233 }
234 }
235 _ => {}
236 }
237 }
238 }
239 }
240 Expression::Between { expr, min, max } => {
241 let column = match expr.as_ref() {
243 Expression::Column(col) => col,
244 _ => return None,
245 };
246
247 for index in &self.available_indexes {
249 if index.index_type == "BTree" && index.name.contains(column) {
250 let start = match min.as_ref() {
251 Expression::Literal(lit) => Some(literal_to_bytes(lit)),
252 _ => None,
253 };
254 let end = match max.as_ref() {
255 Expression::Literal(lit) => Some(literal_to_bytes(lit)),
256 _ => None,
257 };
258
259 return Some(PhysicalOperator::IndexRangeScan {
260 table: index.table.clone(),
261 index: index.name.clone(),
262 start,
263 end,
264 });
265 }
266 }
267 }
268 _ => {}
269 }
270
271 None
272 }
273}
274
275impl Default for Planner {
276 fn default() -> Self {
277 Self::new()
278 }
279}
280
281fn literal_to_bytes(literal: &Literal) -> Vec<u8> {
283 match literal {
284 Literal::Integer(i) => i.to_le_bytes().to_vec(),
285 Literal::Float(f) => f.to_le_bytes().to_vec(),
286 Literal::String(s) => s.as_bytes().to_vec(),
287 Literal::Boolean(b) => vec![if *b { 1 } else { 0 }],
288 Literal::Null => vec![],
289 }
290}
291
/// Errors that query planning can report.
///
/// Each variant carries a free-form description that is appended to the
/// variant's message when the error is displayed.
#[derive(Debug, Clone)]
pub enum PlanError {
    /// Displayed as `Unsupported operation: <description>`.
    UnsupportedOperation(String),
    /// Displayed as `Invalid expression: <description>`.
    InvalidExpression(String),
}
298
299impl fmt::Display for PlanError {
300 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
301 match self {
302 PlanError::UnsupportedOperation(op) => write!(f, "Unsupported operation: {}", op),
303 PlanError::InvalidExpression(expr) => write!(f, "Invalid expression: {}", expr),
304 }
305 }
306}
307
// Marker impl: `PlanError` relies on the trait's default methods, so it can be
// used wherever a `std::error::Error` is expected (e.g. `Box<dyn Error>`).
impl std::error::Error for PlanError {}
309
310impl fmt::Display for PhysicalPlan {
311 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
312 write!(f, "{}", self.root)
313 }
314}
315
316impl fmt::Display for PhysicalOperator {
317 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
318 match self {
319 PhysicalOperator::TableScan { table } => write!(f, "TableScan({})", table),
320 PhysicalOperator::IndexScan { table, index, .. } => {
321 write!(f, "IndexScan({}.{})", table, index)
322 }
323 PhysicalOperator::IndexRangeScan { table, index, .. } => {
324 write!(f, "IndexRangeScan({}.{})", table, index)
325 }
326 PhysicalOperator::Filter { input, condition } => {
327 write!(f, "Filter({}) -> {}", condition, input)
328 }
329 PhysicalOperator::Sort { input, columns } => {
330 write!(f, "Sort(")?;
331 for (i, col) in columns.iter().enumerate() {
332 if i > 0 {
333 write!(f, ", ")?;
334 }
335 write!(f, "{}", col)?;
336 }
337 write!(f, ") -> {}", input)
338 }
339 PhysicalOperator::Limit {
340 input,
341 count,
342 offset,
343 } => {
344 write!(f, "Limit({}, {}) -> {}", count, offset, input)
345 }
346 PhysicalOperator::Project { input, columns } => {
347 write!(f, "Project(")?;
348 for (i, col) in columns.iter().enumerate() {
349 if i > 0 {
350 write!(f, ", ")?;
351 }
352 write!(f, "{}", col)?;
353 }
354 write!(f, ") -> {}", input)
355 }
356 PhysicalOperator::HashJoin {
357 left,
358 right,
359 join_type,
360 ..
361 } => {
362 write!(f, "{}Join({} x {})", join_type, left, right)
363 }
364 PhysicalOperator::Aggregate { input, aggregates } => {
365 write!(f, "Aggregate(")?;
366 for (i, agg) in aggregates.iter().enumerate() {
367 if i > 0 {
368 write!(f, ", ")?;
369 }
370 write!(f, "{}", agg)?;
371 }
372 write!(f, ") -> {}", input)
373 }
374 }
375 }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::parser::Parser;

    /// Parses `sql` and plans it with a planner that has no indexes.
    fn plan_without_indexes(sql: &'static str) -> PhysicalPlan {
        let mut parser = Parser::new(sql).unwrap();
        let query = parser.parse().unwrap();
        Planner::new().plan(&query).unwrap()
    }

    /// A plain `SELECT` is a projection directly over a table scan.
    #[test]
    fn test_simple_plan() {
        let plan = plan_without_indexes("SELECT * FROM users");

        if let PhysicalOperator::Project { input, .. } = plan.root {
            if !matches!(*input, PhysicalOperator::TableScan { .. }) {
                panic!("Expected TableScan");
            }
        } else {
            panic!("Expected Project");
        }
    }

    /// Without indexes, a `WHERE` clause puts a filter under the projection.
    #[test]
    fn test_filter_plan() {
        let plan = plan_without_indexes("SELECT * FROM users WHERE age > 18");

        if let PhysicalOperator::Project { input, .. } = plan.root {
            if !matches!(*input, PhysicalOperator::Filter { .. }) {
                panic!("Expected Filter");
            }
        } else {
            panic!("Expected Project");
        }
    }

    /// `ORDER BY` adds a sort operator somewhere in the rendered plan.
    #[test]
    fn test_order_by_plan() {
        let rendered = plan_without_indexes("SELECT * FROM users ORDER BY name").to_string();
        assert!(rendered.contains("Sort"));
    }

    /// `LIMIT` adds a limit operator somewhere in the rendered plan.
    #[test]
    fn test_limit_plan() {
        let rendered = plan_without_indexes("SELECT * FROM users LIMIT 10").to_string();
        assert!(rendered.contains("Limit"));
    }
}