1use super::ast::*;
5use std::fmt;
6
7#[derive(Debug, Clone)]
9pub struct PhysicalPlan {
10 pub root: PhysicalOperator,
11}
12
13#[derive(Debug, Clone)]
15pub enum PhysicalOperator {
16 TableScan { table: String },
18 IndexScan {
20 table: String,
21 index: String,
22 key: Vec<u8>,
23 },
24 IndexRangeScan {
26 table: String,
27 index: String,
28 start: Option<Vec<u8>>,
29 end: Option<Vec<u8>>,
30 },
31 Filter {
33 input: Box<PhysicalOperator>,
34 condition: Expression,
35 },
36 Sort {
38 input: Box<PhysicalOperator>,
39 columns: Vec<OrderByColumn>,
40 },
41 Limit {
43 input: Box<PhysicalOperator>,
44 count: usize,
45 offset: usize,
46 },
47 Project {
49 input: Box<PhysicalOperator>,
50 columns: Vec<SelectColumn>,
51 },
52 HashJoin {
54 left: Box<PhysicalOperator>,
55 right: Box<PhysicalOperator>,
56 join_type: JoinType,
57 condition: Expression,
58 },
59 GroupBy {
61 input: Box<PhysicalOperator>,
62 group_columns: Vec<String>,
63 aggregates: Vec<SelectColumn>,
64 having: Option<Expression>,
65 },
66 Aggregate {
68 input: Box<PhysicalOperator>,
69 aggregates: Vec<SelectColumn>,
70 },
71}
72
73pub struct Planner {
75 available_indexes: Vec<IndexMetadata>,
77}
78
/// Description of an index the planner may use.
///
/// `PartialEq`/`Eq` are derived so index descriptions can be compared
/// directly (for example in tests or when de-duplicating a catalog).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IndexMetadata {
    /// Name of the index. The planner decides whether an index applies to a
    /// column by checking that this name contains the column name.
    pub name: String,
    /// Table the index was built on.
    pub table: String,
    /// Kind of index. The planner recognises the exact strings `"Hash"`
    /// (equality lookups) and `"BTree"` (equality and range lookups); any
    /// other value is never selected.
    pub index_type: String,
}
86
87impl Planner {
88 pub fn new() -> Self {
90 Self {
91 available_indexes: Vec::new(),
92 }
93 }
94
95 pub fn with_indexes(indexes: Vec<IndexMetadata>) -> Self {
97 Self {
98 available_indexes: indexes,
99 }
100 }
101
102 pub fn plan(&self, query: &Query) -> Result<PhysicalPlan, PlanError> {
104 let mut plan = self.plan_table_access(&query.from)?;
106
107 if let Some(ref where_clause) = query.where_clause {
109 plan = self.apply_filter(plan, &where_clause.condition)?;
110 }
111
112 let has_aggregates = query
114 .select
115 .columns
116 .iter()
117 .any(|col| matches!(col, SelectColumn::Aggregate { .. }));
118
119 if let Some(ref group_by) = query.group_by {
121 plan = PhysicalOperator::GroupBy {
128 input: Box::new(plan),
129 group_columns: group_by.columns.clone(),
130 aggregates: query.select.columns.clone(),
131 having: query.having.as_ref().map(|h| h.condition.clone()),
132 };
133 } else if has_aggregates {
134 plan = PhysicalOperator::Aggregate {
139 input: Box::new(plan),
140 aggregates: query.select.columns.clone(),
141 };
142 } else {
143 plan = PhysicalOperator::Project {
145 input: Box::new(plan),
146 columns: query.select.columns.clone(),
147 };
148 }
149
150 if let Some(ref order_by) = query.order_by {
152 plan = PhysicalOperator::Sort {
153 input: Box::new(plan),
154 columns: order_by.columns.clone(),
155 };
156 }
157
158 if let Some(ref limit) = query.limit {
160 plan = PhysicalOperator::Limit {
161 input: Box::new(plan),
162 count: limit.count,
163 offset: limit.offset.unwrap_or(0),
164 };
165 }
166
167 Ok(PhysicalPlan { root: plan })
168 }
169
170 fn plan_table_access(&self, from: &FromClause) -> Result<PhysicalOperator, PlanError> {
171 let mut plan = PhysicalOperator::TableScan {
172 table: from.table.clone(),
173 };
174
175 for join in &from.joins {
177 let right = PhysicalOperator::TableScan {
178 table: join.table.clone(),
179 };
180
181 plan = PhysicalOperator::HashJoin {
182 left: Box::new(plan),
183 right: Box::new(right),
184 join_type: join.join_type.clone(),
185 condition: join.condition.clone(),
186 };
187 }
188
189 Ok(plan)
190 }
191
192 fn apply_filter(
193 &self,
194 input: PhysicalOperator,
195 condition: &Expression,
196 ) -> Result<PhysicalOperator, PlanError> {
197 if let Some(index_scan) = self.try_index_scan(condition) {
199 return Ok(index_scan);
200 }
201
202 Ok(PhysicalOperator::Filter {
204 input: Box::new(input),
205 condition: condition.clone(),
206 })
207 }
208
209 fn try_index_scan(&self, condition: &Expression) -> Option<PhysicalOperator> {
210 match condition {
212 Expression::BinaryOp { left, op, right } => {
213 let (column, value) = match (left.as_ref(), right.as_ref()) {
215 (Expression::Column(col), Expression::Literal(lit)) => (col, lit),
216 _ => return None,
217 };
218
219 for index in &self.available_indexes {
221 if index.name.contains(column) {
223 match index.index_type.as_str() {
224 "Hash" if *op == BinaryOperator::Eq => {
225 return Some(PhysicalOperator::IndexScan {
227 table: index.table.clone(),
228 index: index.name.clone(),
229 key: literal_to_bytes(value),
230 });
231 }
232 "BTree" => {
233 match op {
235 BinaryOperator::Eq => {
236 return Some(PhysicalOperator::IndexScan {
237 table: index.table.clone(),
238 index: index.name.clone(),
239 key: literal_to_bytes(value),
240 });
241 }
242 BinaryOperator::Lt | BinaryOperator::Le => {
243 return Some(PhysicalOperator::IndexRangeScan {
244 table: index.table.clone(),
245 index: index.name.clone(),
246 start: None,
247 end: Some(literal_to_bytes(value)),
248 });
249 }
250 BinaryOperator::Gt | BinaryOperator::Ge => {
251 return Some(PhysicalOperator::IndexRangeScan {
252 table: index.table.clone(),
253 index: index.name.clone(),
254 start: Some(literal_to_bytes(value)),
255 end: None,
256 });
257 }
258 _ => {}
259 }
260 }
261 _ => {}
262 }
263 }
264 }
265 }
266 Expression::Between { expr, min, max } => {
267 let column = match expr.as_ref() {
269 Expression::Column(col) => col,
270 _ => return None,
271 };
272
273 for index in &self.available_indexes {
275 if index.index_type == "BTree" && index.name.contains(column) {
276 let start = match min.as_ref() {
277 Expression::Literal(lit) => Some(literal_to_bytes(lit)),
278 _ => None,
279 };
280 let end = match max.as_ref() {
281 Expression::Literal(lit) => Some(literal_to_bytes(lit)),
282 _ => None,
283 };
284
285 return Some(PhysicalOperator::IndexRangeScan {
286 table: index.table.clone(),
287 index: index.name.clone(),
288 start,
289 end,
290 });
291 }
292 }
293 }
294 _ => {}
295 }
296
297 None
298 }
299}
300
301impl Default for Planner {
302 fn default() -> Self {
303 Self::new()
304 }
305}
306
307fn literal_to_bytes(literal: &Literal) -> Vec<u8> {
309 match literal {
310 Literal::Integer(i) => i.to_le_bytes().to_vec(),
311 Literal::Float(f) => f.to_le_bytes().to_vec(),
312 Literal::String(s) => s.as_bytes().to_vec(),
313 Literal::Boolean(b) => vec![if *b { 1 } else { 0 }],
314 Literal::Null => vec![],
315 }
316}
317
/// Errors reported while building a physical plan.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare errors
/// directly instead of matching on their rendered text.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanError {
    /// The query uses a construct that cannot be planned; the payload names
    /// it.
    UnsupportedOperation(String),
    /// An expression in the query cannot be planned; the payload describes
    /// it.
    InvalidExpression(String),
}

impl fmt::Display for PlanError {
    /// Renders the error as `"<kind>: <detail>"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnsupportedOperation(op) => write!(f, "Unsupported operation: {}", op),
            Self::InvalidExpression(expr) => write!(f, "Invalid expression: {}", expr),
        }
    }
}

// No underlying source error is carried, so the default trait methods suffice.
impl std::error::Error for PlanError {}
335
336impl fmt::Display for PhysicalPlan {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 write!(f, "{}", self.root)
339 }
340}
341
342impl fmt::Display for PhysicalOperator {
343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344 match self {
345 PhysicalOperator::TableScan { table } => write!(f, "TableScan({})", table),
346 PhysicalOperator::IndexScan { table, index, .. } => {
347 write!(f, "IndexScan({}.{})", table, index)
348 }
349 PhysicalOperator::IndexRangeScan { table, index, .. } => {
350 write!(f, "IndexRangeScan({}.{})", table, index)
351 }
352 PhysicalOperator::Filter { input, condition } => {
353 write!(f, "Filter({}) -> {}", condition, input)
354 }
355 PhysicalOperator::Sort { input, columns } => {
356 write!(f, "Sort(")?;
357 for (i, col) in columns.iter().enumerate() {
358 if i > 0 {
359 write!(f, ", ")?;
360 }
361 write!(f, "{}", col)?;
362 }
363 write!(f, ") -> {}", input)
364 }
365 PhysicalOperator::Limit {
366 input,
367 count,
368 offset,
369 } => {
370 write!(f, "Limit({}, {}) -> {}", count, offset, input)
371 }
372 PhysicalOperator::Project { input, columns } => {
373 write!(f, "Project(")?;
374 for (i, col) in columns.iter().enumerate() {
375 if i > 0 {
376 write!(f, ", ")?;
377 }
378 write!(f, "{}", col)?;
379 }
380 write!(f, ") -> {}", input)
381 }
382 PhysicalOperator::HashJoin {
383 left,
384 right,
385 join_type,
386 ..
387 } => {
388 write!(f, "{}Join({} x {})", join_type, left, right)
389 }
390 PhysicalOperator::GroupBy {
391 input,
392 group_columns,
393 aggregates,
394 having,
395 } => {
396 write!(f, "GroupBy(")?;
397 for (i, col) in group_columns.iter().enumerate() {
398 if i > 0 {
399 write!(f, ", ")?;
400 }
401 write!(f, "{}", col)?;
402 }
403 if !aggregates.is_empty() {
404 write!(f, " | ")?;
405 for (i, agg) in aggregates.iter().enumerate() {
406 if i > 0 {
407 write!(f, ", ")?;
408 }
409 write!(f, "{}", agg)?;
410 }
411 }
412 if let Some(h) = having {
413 write!(f, " HAVING {}", h)?;
414 }
415 write!(f, ") -> {}", input)
416 }
417 PhysicalOperator::Aggregate { input, aggregates } => {
418 write!(f, "Aggregate(")?;
419 for (i, agg) in aggregates.iter().enumerate() {
420 if i > 0 {
421 write!(f, ", ")?;
422 }
423 write!(f, "{}", agg)?;
424 }
425 write!(f, ") -> {}", input)
426 }
427 }
428 }
429}
430
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::parser::Parser;

    /// Parses `sql` and plans it with a planner that knows no indexes.
    fn plan_sql(sql: &str) -> PhysicalPlan {
        let mut parser = Parser::new(sql).unwrap();
        let query = parser.parse().unwrap();
        Planner::new().plan(&query).unwrap()
    }

    /// Returns the input of the root operator, which must be a projection.
    fn projection_input(plan: PhysicalPlan) -> PhysicalOperator {
        match plan.root {
            PhysicalOperator::Project { input, .. } => *input,
            _ => panic!("Expected Project"),
        }
    }

    #[test]
    fn test_simple_plan() {
        // A plain SELECT projects directly over a table scan.
        match projection_input(plan_sql("SELECT * FROM users")) {
            PhysicalOperator::TableScan { .. } => {}
            _ => panic!("Expected TableScan"),
        }
    }

    #[test]
    fn test_filter_plan() {
        // Without indexes the WHERE clause becomes a filter under the projection.
        match projection_input(plan_sql("SELECT * FROM users WHERE age > 18")) {
            PhysicalOperator::Filter { .. } => {}
            _ => panic!("Expected Filter"),
        }
    }

    #[test]
    fn test_order_by_plan() {
        let rendered = plan_sql("SELECT * FROM users ORDER BY name").to_string();
        assert!(rendered.contains("Sort"));
    }

    #[test]
    fn test_limit_plan() {
        let rendered = plan_sql("SELECT * FROM users LIMIT 10").to_string();
        assert!(rendered.contains("Limit"));
    }
}