1use super::types::*;
6use crate::error::{DbxError, DbxResult};
7use arrow::datatypes::Schema;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10
/// Translates a [`LogicalPlan`] into a [`PhysicalPlan`], turning column names
/// into positional column indices.
///
/// Table schemas are not owned by the planner; they are read on demand from a
/// registry shared behind `Arc<RwLock<..>>`.
pub struct PhysicalPlanner {
    /// Registered Arrow schemas, keyed by table name.
    table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,
}
15
16impl PhysicalPlanner {
17 pub fn new(table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>) -> Self {
18 Self { table_schemas }
19 }
20
21 pub fn plan(&self, logical_plan: &LogicalPlan) -> DbxResult<PhysicalPlan> {
23 match logical_plan {
24 LogicalPlan::Scan {
25 table,
26 columns: _,
27 filter,
28 } => {
29 let schemas = self.table_schemas.read().unwrap();
30 let schema = schemas
31 .get(table)
32 .or_else(|| {
33 let table_lower = table.to_lowercase();
34 schemas
35 .iter()
36 .find(|(k, _)| k.to_lowercase() == table_lower)
37 .map(|(_, v)| v)
38 })
39 .cloned()
40 .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
41
42 let column_names: Vec<String> =
43 schema.fields().iter().map(|f| f.name().clone()).collect();
44 drop(schemas);
45
46 let physical_filter = filter
47 .as_ref()
48 .map(|f| self.plan_physical_expr(f, &column_names))
49 .transpose()?;
50
51 Ok(PhysicalPlan::TableScan {
52 table: table.clone(),
53 projection: vec![],
54 filter: physical_filter,
55 })
56 }
57 LogicalPlan::Project { input, projections } => {
58 let input_plan = self.plan(input)?;
59 let input_schema = self.extract_schema(&input_plan);
60 let mut physical_exprs = Vec::new();
61 let mut aliases = Vec::new();
62
63 for (expr, alias) in projections {
64 physical_exprs.push(self.plan_physical_expr(expr, &input_schema)?);
65 aliases.push(alias.clone());
66 }
67
68 Ok(PhysicalPlan::Projection {
69 input: Box::new(input_plan),
70 exprs: physical_exprs,
71 aliases,
72 })
73 }
74 LogicalPlan::Filter { input, predicate } => {
75 let mut input_plan = self.plan(input)?;
76 let input_schema = self.extract_schema(&input_plan);
77 let physical_pred = self.plan_physical_expr(predicate, &input_schema)?;
78
79 match &mut input_plan {
80 PhysicalPlan::TableScan { filter, .. } if filter.is_none() => {
81 *filter = Some(physical_pred);
82 Ok(input_plan)
83 }
84 _ => Ok(PhysicalPlan::Projection {
85 input: Box::new(input_plan),
86 exprs: vec![physical_pred],
87 aliases: vec![None], }),
89 }
90 }
91 LogicalPlan::Aggregate {
92 input,
93 group_by,
94 aggregates,
95 } => {
96 let input_plan = self.plan(input)?;
97 let input_schema = self.extract_schema(&input_plan);
98
99 let group_by_indices: Vec<usize> = group_by
100 .iter()
101 .map(|e| match e {
102 Expr::Column(name) => {
103 input_schema.iter().position(|s| s == name).unwrap_or(0)
104 }
105 _ => 0,
106 })
107 .collect();
108 let physical_aggs = aggregates
109 .iter()
110 .map(|agg| PhysicalAggExpr {
111 function: agg.function,
112 input: match &agg.expr {
113 Expr::Column(name) => {
114 input_schema.iter().position(|s| s == name).unwrap_or(0)
115 }
116 _ => 0,
117 },
118 alias: agg.alias.clone(),
119 })
120 .collect();
121 Ok(PhysicalPlan::HashAggregate {
122 input: Box::new(input_plan),
123 group_by: group_by_indices,
124 aggregates: physical_aggs,
125 })
126 }
127 LogicalPlan::Sort { input, order_by } => {
128 let input_plan = self.plan(input)?;
129 let input_schema = self.extract_schema(&input_plan);
130
131 let order_by_physical: Vec<(usize, bool)> = order_by
132 .iter()
133 .map(|s| {
134 let idx = match &s.expr {
135 Expr::Column(name) => {
136 input_schema.iter().position(|n| n == name).unwrap_or(0)
137 }
138 _ => 0,
139 };
140 (idx, s.asc)
141 })
142 .collect();
143 Ok(PhysicalPlan::SortMerge {
144 input: Box::new(input_plan),
145 order_by: order_by_physical,
146 })
147 }
148 LogicalPlan::Limit {
149 input,
150 count,
151 offset,
152 } => {
153 let input_plan = self.plan(input)?;
154 Ok(PhysicalPlan::Limit {
155 input: Box::new(input_plan),
156 count: *count,
157 offset: *offset,
158 })
159 }
160 LogicalPlan::Join {
161 left,
162 right,
163 join_type,
164 on,
165 } => {
166 let left_plan = self.plan(left)?;
167 let right_plan = self.plan(right)?;
168
169 let left_schema = self.extract_schema(&left_plan);
170 let right_schema = self.extract_schema(&right_plan);
171
172 let on_pairs = self.parse_join_condition(on, &left_schema, &right_schema)?;
173
174 Ok(PhysicalPlan::HashJoin {
175 left: Box::new(left_plan),
176 right: Box::new(right_plan),
177 on: on_pairs,
178 join_type: *join_type,
179 })
180 }
181 LogicalPlan::Insert {
182 table,
183 columns,
184 values,
185 } => {
186 let physical_values: Vec<Vec<PhysicalExpr>> = values
188 .iter()
189 .map(|row| {
190 row.iter()
191 .map(|expr| match expr {
192 Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
193 Expr::Column(_name) => {
194 Err(DbxError::SqlNotSupported {
196 feature: "Column references in INSERT VALUES".to_string(),
197 hint: "Use literal values only".to_string(),
198 })
199 }
200 _ => Err(DbxError::SqlNotSupported {
201 feature: format!("Expression in INSERT VALUES: {:?}", expr),
202 hint: "Use literal values only".to_string(),
203 }),
204 })
205 .collect::<DbxResult<Vec<_>>>()
206 })
207 .collect::<DbxResult<Vec<_>>>()?;
208
209 Ok(PhysicalPlan::Insert {
210 table: table.clone(),
211 columns: columns.clone(),
212 values: physical_values,
213 })
214 }
215 LogicalPlan::Update {
216 table,
217 assignments,
218 filter,
219 } => {
220 let physical_assignments: Vec<(String, PhysicalExpr)> = assignments
222 .iter()
223 .map(|(col, expr)| {
224 let physical_expr = match expr {
225 Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
226 _ => Err(DbxError::NotImplemented(
227 "Non-literal UPDATE values not yet supported".to_string(),
228 )),
229 }?;
230 Ok((col.clone(), physical_expr))
231 })
232 .collect::<DbxResult<Vec<_>>>()?;
233
234 let physical_filter = if let Some(f) = filter.as_ref() {
236 let schemas = self.table_schemas.read().unwrap();
237 let column_names: Vec<String> = schemas
238 .get(table)
239 .map(|schema| {
240 schema
241 .fields()
242 .iter()
243 .map(|field| field.name().clone())
244 .collect()
245 })
246 .unwrap_or_default();
247 drop(schemas);
248 Some(self.plan_physical_expr(f, &column_names)?)
249 } else {
250 None
251 };
252
253 Ok(PhysicalPlan::Update {
254 table: table.clone(),
255 assignments: physical_assignments,
256 filter: physical_filter,
257 })
258 }
259 LogicalPlan::Delete { table, filter } => {
260 let physical_filter = if let Some(f) = filter.as_ref() {
262 let schemas = self.table_schemas.read().unwrap();
263 let column_names: Vec<String> = schemas
264 .get(table)
265 .map(|schema| {
266 schema
267 .fields()
268 .iter()
269 .map(|field| field.name().clone())
270 .collect()
271 })
272 .unwrap_or_default();
273 drop(schemas);
274 Some(self.plan_physical_expr(f, &column_names)?)
275 } else {
276 None
277 };
278
279 Ok(PhysicalPlan::Delete {
280 table: table.clone(),
281 filter: physical_filter,
282 })
283 }
284 LogicalPlan::DropTable { table, if_exists } => Ok(PhysicalPlan::DropTable {
285 table: table.clone(),
286 if_exists: *if_exists,
287 }),
288 LogicalPlan::CreateTable {
289 table,
290 columns,
291 if_not_exists,
292 } => Ok(PhysicalPlan::CreateTable {
293 table: table.clone(),
294 columns: columns.clone(),
295 if_not_exists: *if_not_exists,
296 }),
297 LogicalPlan::CreateIndex {
298 table,
299 index_name,
300 columns,
301 if_not_exists,
302 } => Ok(PhysicalPlan::CreateIndex {
303 table: table.clone(),
304 index_name: index_name.clone(),
305 columns: columns.clone(),
306 if_not_exists: *if_not_exists,
307 }),
308 LogicalPlan::DropIndex {
309 table,
310 index_name,
311 if_exists,
312 } => Ok(PhysicalPlan::DropIndex {
313 table: table.clone(),
314 index_name: index_name.clone(),
315 if_exists: *if_exists,
316 }),
317 LogicalPlan::AlterTable { table, operation } => Ok(PhysicalPlan::AlterTable {
318 table: table.clone(),
319 operation: operation.clone(),
320 }),
321 LogicalPlan::CreateFunction {
322 name,
323 params,
324 return_type,
325 language,
326 body,
327 } => Ok(PhysicalPlan::CreateFunction {
328 name: name.clone(),
329 params: params.clone(),
330 return_type: return_type.clone(),
331 language: language.clone(),
332 body: body.clone(),
333 }),
334 LogicalPlan::CreateTrigger {
335 name,
336 timing,
337 event,
338 table,
339 for_each,
340 function,
341 } => Ok(PhysicalPlan::CreateTrigger {
342 name: name.clone(),
343 timing: *timing,
344 event: *event,
345 table: table.clone(),
346 for_each: *for_each,
347 function: function.clone(),
348 }),
349 LogicalPlan::CreateJob {
350 name,
351 schedule,
352 function,
353 } => Ok(PhysicalPlan::CreateJob {
354 name: name.clone(),
355 schedule: schedule.clone(),
356 function: function.clone(),
357 }),
358 LogicalPlan::DropFunction { name, if_exists } => Ok(PhysicalPlan::DropFunction {
359 name: name.clone(),
360 if_exists: *if_exists,
361 }),
362 LogicalPlan::DropTrigger { name, if_exists } => Ok(PhysicalPlan::DropTrigger {
363 name: name.clone(),
364 if_exists: *if_exists,
365 }),
366 LogicalPlan::DropJob { name, if_exists } => Ok(PhysicalPlan::DropJob {
367 name: name.clone(),
368 if_exists: *if_exists,
369 }),
370 }
371 }
372
373 fn plan_physical_expr(&self, expr: &Expr, schema: &[String]) -> DbxResult<PhysicalExpr> {
374 match expr {
375 Expr::Column(name) => {
376 if let Some(idx) = schema
377 .iter()
378 .position(|s| s.to_lowercase() == name.to_lowercase())
379 {
380 Ok(PhysicalExpr::Column(idx))
381 } else {
382 Err(DbxError::Schema(format!(
383 "Column '{}' not found in schema: {:?}",
384 name, schema
385 )))
386 }
387 }
388 Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
389 Expr::BinaryOp { left, op, right } => Ok(PhysicalExpr::BinaryOp {
390 left: Box::new(self.plan_physical_expr(left, schema)?),
391 op: *op,
392 right: Box::new(self.plan_physical_expr(right, schema)?),
393 }),
394 Expr::IsNull(expr) => Ok(PhysicalExpr::IsNull(Box::new(
395 self.plan_physical_expr(expr, schema)?,
396 ))),
397 Expr::IsNotNull(expr) => Ok(PhysicalExpr::IsNotNull(Box::new(
398 self.plan_physical_expr(expr, schema)?,
399 ))),
400 Expr::ScalarFunc { func, args } => {
401 let physical_args = args
402 .iter()
403 .map(|arg| self.plan_physical_expr(arg, schema))
404 .collect::<DbxResult<Vec<_>>>()?;
405 Ok(PhysicalExpr::ScalarFunc {
406 func: *func,
407 args: physical_args,
408 })
409 }
410 _ => Err(DbxError::NotImplemented(format!(
411 "Physical expression not supported: {:?}",
412 expr
413 ))),
414 }
415 }
416
417 fn extract_schema(&self, plan: &PhysicalPlan) -> Vec<String> {
419 match plan {
420 PhysicalPlan::TableScan { table, .. } => {
421 let schemas = self.table_schemas.read().unwrap();
423 let schema = schemas.get(table).or_else(|| {
425 let table_lower = table.to_lowercase();
426 schemas
427 .iter()
428 .find(|(k, _)| k.to_lowercase() == table_lower)
429 .map(|(_, v)| v)
430 });
431
432 if let Some(schema) = schema {
433 schema.fields().iter().map(|f| f.name().clone()).collect()
434 } else {
435 vec![]
436 }
437 }
438 PhysicalPlan::Projection { exprs, aliases, .. } => exprs
439 .iter()
440 .enumerate()
441 .map(|(i, _)| {
442 if let Some(alias) = aliases.get(i) {
443 alias.clone().unwrap_or_else(|| format!("col_{}", i))
444 } else {
445 format!("col_{}", i)
446 }
447 })
448 .collect(),
449 PhysicalPlan::HashAggregate {
450 input,
451 group_by,
452 aggregates,
453 } => {
454 let input_schema = self.extract_schema(input);
455 let mut fields = Vec::new();
456 for &idx in group_by {
457 fields.push(
458 input_schema
459 .get(idx)
460 .cloned()
461 .unwrap_or_else(|| format!("col_{}", idx)),
462 );
463 }
464 for agg in aggregates {
465 fields.push(
466 agg.alias
467 .clone()
468 .unwrap_or_else(|| format!("agg_{:?}", agg.function)),
469 );
470 }
471 fields
472 }
473 PhysicalPlan::SortMerge { input, .. } => self.extract_schema(input),
474 PhysicalPlan::Limit { input, .. } => self.extract_schema(input),
475 PhysicalPlan::HashJoin { left, right, .. } => {
476 let mut fields = self.extract_schema(left);
477 fields.extend(self.extract_schema(right));
478 fields
479 }
480 PhysicalPlan::Insert { columns, .. } => columns.clone(),
481 PhysicalPlan::Update { .. } => vec![],
482 PhysicalPlan::Delete { .. } => vec![],
483 PhysicalPlan::DropTable { .. } => vec![],
484 PhysicalPlan::CreateTable { .. } => vec![],
485 PhysicalPlan::CreateIndex { .. } => vec![],
486 PhysicalPlan::DropIndex { .. } => vec![],
487 PhysicalPlan::AlterTable { .. } => vec![],
488 PhysicalPlan::CreateFunction { .. } => vec![],
489 PhysicalPlan::CreateTrigger { .. } => vec![],
490 PhysicalPlan::CreateJob { .. } => vec![],
491 PhysicalPlan::DropFunction { .. } => vec![],
492 PhysicalPlan::DropTrigger { .. } => vec![],
493 PhysicalPlan::DropJob { .. } => vec![],
494 }
495 }
496
497 fn parse_join_condition(
500 &self,
501 on: &Expr,
502 left_schema: &[String],
503 right_schema: &[String],
504 ) -> DbxResult<Vec<(usize, usize)>> {
505 let mut pairs = Vec::new();
506 self.extract_join_pairs(on, left_schema, right_schema, &mut pairs)?;
507
508 if pairs.is_empty() {
509 pairs.push((0, 1));
511 }
512
513 Ok(pairs)
514 }
515
516 fn extract_join_pairs(
518 &self,
519 expr: &Expr,
520 left_schema: &[String],
521 right_schema: &[String],
522 pairs: &mut Vec<(usize, usize)>,
523 ) -> DbxResult<()> {
524 match expr {
525 Expr::BinaryOp { left, op, right } => {
526 match op {
527 BinaryOperator::Eq => {
528 let left_col = self.extract_column_name(left)?;
530 let right_col = self.extract_column_name(right)?;
531
532 let left_idx =
535 self.resolve_column_index(&left_col, left_schema, right_schema, true)?;
536 let right_idx = self.resolve_column_index(
537 &right_col,
538 left_schema,
539 right_schema,
540 false,
541 )?;
542
543 pairs.push((left_idx, right_idx));
544 }
545 BinaryOperator::And => {
546 self.extract_join_pairs(left, left_schema, right_schema, pairs)?;
548 self.extract_join_pairs(right, left_schema, right_schema, pairs)?;
549 }
550 _ => {
551 return Err(DbxError::NotImplemented(format!(
552 "JOIN condition operator not supported: {:?}",
553 op
554 )));
555 }
556 }
557 }
558 _ => {
559 return Err(DbxError::NotImplemented(format!(
560 "JOIN condition expression not supported: {:?}",
561 expr
562 )));
563 }
564 }
565 Ok(())
566 }
567
568 fn extract_column_name(&self, expr: &Expr) -> DbxResult<String> {
570 match expr {
571 Expr::Column(name) => {
572 if let Some(dot_pos) = name.rfind('.') {
574 Ok(name[dot_pos + 1..].to_string())
575 } else {
576 Ok(name.clone())
577 }
578 }
579 _ => Err(DbxError::NotImplemented(format!(
580 "Expected column reference, got: {:?}",
581 expr
582 ))),
583 }
584 }
585
586 fn resolve_column_index(
588 &self,
589 col_name: &str,
590 left_schema: &[String],
591 right_schema: &[String],
592 prefer_left: bool,
593 ) -> DbxResult<usize> {
594 if prefer_left {
596 if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
597 return Ok(idx);
598 }
599 if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
600 return Ok(idx);
601 }
602 } else {
603 if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
604 return Ok(idx);
605 }
606 if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
607 return Ok(idx);
608 }
609 }
610
611 match col_name {
614 "id" => Ok(0),
615 "user_id" => Ok(1),
616 "name" => Ok(1),
617 _ => Ok(0),
618 }
619 }
620}
621
622impl Default for PhysicalPlanner {
623 fn default() -> Self {
624 Self::new(Arc::new(RwLock::new(HashMap::new())))
625 }
626}
627
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::{LogicalPlanner, SqlParser};

    /// `SELECT *` over a registered table plans to a scan of that table,
    /// possibly wrapped in a projection.
    #[test]
    fn test_physical_plan_simple_select() {
        let parser = SqlParser::new();
        let statements = parser
            .parse("SELECT * FROM users")
            .expect("SELECT * FROM users should parse");

        let planner = LogicalPlanner::new();
        let logical_plan = planner
            .plan(&statements[0])
            .expect("logical planning should succeed");

        let schema = Arc::new(Schema::new(vec![
            arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int32, false),
            arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false),
        ]));
        let table_schemas = Arc::new(RwLock::new(HashMap::new()));
        table_schemas
            .write()
            .unwrap()
            .insert("users".to_string(), schema);

        let physical_planner = PhysicalPlanner::new(table_schemas);
        let physical_plan = physical_planner
            .plan(&logical_plan)
            .expect("physical planning should succeed");

        match physical_plan {
            PhysicalPlan::Projection { input, .. } => match input.as_ref() {
                PhysicalPlan::TableScan { table, .. } => {
                    assert_eq!(table, "users");
                }
                _ => panic!("Expected TableScan inside Projection"),
            },
            PhysicalPlan::TableScan { table, .. } => {
                assert_eq!(table, "users");
            }
            _ => panic!("Expected Projection or TableScan"),
        }
    }

    /// Scanning a table that is not in the registry is reported as
    /// `TableNotFound` rather than planned.
    #[test]
    fn test_physical_plan_unknown_table_errors() {
        let parser = SqlParser::new();
        let statements = parser
            .parse("SELECT * FROM missing_table")
            .expect("query should parse");

        let planner = LogicalPlanner::new();
        let logical_plan = planner
            .plan(&statements[0])
            .expect("logical planning should succeed");

        let result = PhysicalPlanner::default().plan(&logical_plan);
        assert!(matches!(result, Err(DbxError::TableNotFound(_))));
    }

    /// Analytical detection and table listing on hand-built physical plans.
    #[test]
    fn test_physical_plan_analytical_detection() {
        let plan1 = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: None,
        };
        assert!(!plan1.is_analytical());
        assert_eq!(plan1.tables(), vec!["users"]);

        let plan2 = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: Some(PhysicalExpr::Column(0)),
        };
        assert!(plan2.is_analytical());

        let plan3 = PhysicalPlan::HashJoin {
            left: Box::new(PhysicalPlan::TableScan {
                table: "users".to_string(),
                projection: vec![0],
                filter: None,
            }),
            right: Box::new(PhysicalPlan::TableScan {
                table: "orders".to_string(),
                projection: vec![0],
                filter: None,
            }),
            on: vec![(0, 0)],
            join_type: JoinType::Inner,
        };
        assert!(plan3.is_analytical());
        let tables = plan3.tables();
        assert!(tables.contains(&"users".to_string()));
        assert!(tables.contains(&"orders".to_string()));
    }
}