1use super::types::*;
6use crate::error::{DbxError, DbxResult};
7use arrow::datatypes::Schema;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10
11pub struct PhysicalPlanner {
13 table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,
14}
15
16impl PhysicalPlanner {
17 pub fn new(table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>) -> Self {
18 Self { table_schemas }
19 }
20
21 pub fn plan(&self, logical_plan: &LogicalPlan) -> DbxResult<PhysicalPlan> {
23 match logical_plan {
24 LogicalPlan::Scan {
25 table,
26 columns: _,
27 filter,
28 ros_files,
29 } => {
30 let schemas = self.table_schemas.read().unwrap();
31 let schema = schemas
32 .get(table)
33 .or_else(|| {
34 let table_lower = table.to_lowercase();
35 schemas
36 .iter()
37 .find(|(k, _)| k.to_lowercase() == table_lower)
38 .map(|(_, v)| v)
39 })
40 .cloned()
41 .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
42
43 let column_names: Vec<String> =
44 schema.fields().iter().map(|f| f.name().clone()).collect();
45 drop(schemas);
46
47 let physical_filter = filter
48 .as_ref()
49 .map(|f| self.plan_physical_expr(f, &column_names))
50 .transpose()?;
51
52 Ok(PhysicalPlan::TableScan {
53 table: table.clone(),
54 projection: vec![],
55 filter: physical_filter,
56 ros_files: ros_files.clone(),
57 })
58 }
59 LogicalPlan::Project { input, projections } => {
60 let input_plan = self.plan(input)?;
61 let input_schema = self.extract_schema(&input_plan);
62 let mut physical_exprs = Vec::new();
63 let mut aliases = Vec::new();
64
65 for (expr, alias) in projections {
66 physical_exprs.push(self.plan_physical_expr(expr, &input_schema)?);
67 aliases.push(alias.clone());
68 }
69
70 Ok(PhysicalPlan::Projection {
71 input: Box::new(input_plan),
72 exprs: physical_exprs,
73 aliases,
74 })
75 }
76 LogicalPlan::Filter { input, predicate } => {
77 let mut input_plan = self.plan(input)?;
78 let input_schema = self.extract_schema(&input_plan);
79 let physical_pred = self.plan_physical_expr(predicate, &input_schema)?;
80
81 match &mut input_plan {
82 PhysicalPlan::TableScan { filter, .. } if filter.is_none() => {
83 *filter = Some(physical_pred);
84 Ok(input_plan)
85 }
86 _ => Ok(PhysicalPlan::Projection {
87 input: Box::new(input_plan),
88 exprs: vec![physical_pred],
89 aliases: vec![None], }),
91 }
92 }
93 LogicalPlan::Aggregate {
94 input,
95 group_by,
96 aggregates,
97 mode,
98 } => {
99 let input_plan = self.plan(input)?;
100 let input_schema = self.extract_schema(&input_plan);
101
102 let group_by_indices: Vec<usize> = group_by
103 .iter()
104 .map(|e| match e {
105 Expr::Column(name) => {
106 input_schema.iter().position(|s| s == name).unwrap_or(0)
107 }
108 _ => 0,
109 })
110 .collect();
111 let physical_aggs = aggregates
112 .iter()
113 .map(|agg| PhysicalAggExpr {
114 function: agg.function,
115 input: match &agg.expr {
116 Expr::Column(name) => {
117 input_schema.iter().position(|s| s == name).unwrap_or(0)
118 }
119 _ => 0,
120 },
121 alias: agg.alias.clone(),
122 })
123 .collect();
124 Ok(PhysicalPlan::HashAggregate {
125 input: Box::new(input_plan),
126 group_by: group_by_indices,
127 aggregates: physical_aggs,
128 mode: *mode,
129 })
130 }
131 LogicalPlan::Sort { input, order_by } => {
132 let input_plan = self.plan(input)?;
133 let input_schema = self.extract_schema(&input_plan);
134
135 let order_by_physical: Vec<(usize, bool)> = order_by
136 .iter()
137 .map(|s| {
138 let idx = match &s.expr {
139 Expr::Column(name) => {
140 input_schema.iter().position(|n| n == name).unwrap_or(0)
141 }
142 _ => 0,
143 };
144 (idx, s.asc)
145 })
146 .collect();
147 Ok(PhysicalPlan::SortMerge {
148 input: Box::new(input_plan),
149 order_by: order_by_physical,
150 })
151 }
152 LogicalPlan::Limit {
153 input,
154 count,
155 offset,
156 } => {
157 let input_plan = self.plan(input)?;
158 Ok(PhysicalPlan::Limit {
159 input: Box::new(input_plan),
160 count: *count,
161 offset: *offset,
162 })
163 }
164 LogicalPlan::Join {
165 left,
166 right,
167 join_type,
168 on,
169 } => {
170 let left_plan = self.plan(left)?;
171 let right_plan = self.plan(right)?;
172
173 let left_schema = self.extract_schema(&left_plan);
174 let right_schema = self.extract_schema(&right_plan);
175
176 let on_pairs = self.parse_join_condition(on, &left_schema, &right_schema)?;
177
178 Ok(PhysicalPlan::HashJoin {
179 left: Box::new(left_plan),
180 right: Box::new(right_plan),
181 on: on_pairs,
182 join_type: *join_type,
183 })
184 }
185 LogicalPlan::Insert {
186 table,
187 columns,
188 values,
189 } => {
190 let physical_values: Vec<Vec<PhysicalExpr>> = values
192 .iter()
193 .map(|row| {
194 row.iter()
195 .map(|expr| match expr {
196 Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
197 Expr::Column(_name) => {
198 Err(DbxError::SqlNotSupported {
200 feature: "Column references in INSERT VALUES".to_string(),
201 hint: "Use literal values only".to_string(),
202 })
203 }
204 _ => Err(DbxError::SqlNotSupported {
205 feature: format!("Expression in INSERT VALUES: {:?}", expr),
206 hint: "Use literal values only".to_string(),
207 }),
208 })
209 .collect::<DbxResult<Vec<_>>>()
210 })
211 .collect::<DbxResult<Vec<_>>>()?;
212
213 Ok(PhysicalPlan::Insert {
214 table: table.clone(),
215 columns: columns.clone(),
216 values: physical_values,
217 })
218 }
219 LogicalPlan::Update {
220 table,
221 assignments,
222 filter,
223 } => {
224 let physical_assignments: Vec<(String, PhysicalExpr)> = assignments
226 .iter()
227 .map(|(col, expr)| {
228 let physical_expr = match expr {
229 Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
230 _ => Err(DbxError::NotImplemented(
231 "Non-literal UPDATE values not yet supported".to_string(),
232 )),
233 }?;
234 Ok((col.clone(), physical_expr))
235 })
236 .collect::<DbxResult<Vec<_>>>()?;
237
238 let physical_filter = if let Some(f) = filter.as_ref() {
240 let schemas = self.table_schemas.read().unwrap();
241 let column_names: Vec<String> = schemas
242 .get(table)
243 .map(|schema| {
244 schema
245 .fields()
246 .iter()
247 .map(|field| field.name().clone())
248 .collect()
249 })
250 .unwrap_or_default();
251 drop(schemas);
252 Some(self.plan_physical_expr(f, &column_names)?)
253 } else {
254 None
255 };
256
257 Ok(PhysicalPlan::Update {
258 table: table.clone(),
259 assignments: physical_assignments,
260 filter: physical_filter,
261 })
262 }
263 LogicalPlan::Delete { table, filter } => {
264 let physical_filter = if let Some(f) = filter.as_ref() {
266 let schemas = self.table_schemas.read().unwrap();
267 let column_names: Vec<String> = schemas
268 .get(table)
269 .map(|schema| {
270 schema
271 .fields()
272 .iter()
273 .map(|field| field.name().clone())
274 .collect()
275 })
276 .unwrap_or_default();
277 drop(schemas);
278 Some(self.plan_physical_expr(f, &column_names)?)
279 } else {
280 None
281 };
282
283 Ok(PhysicalPlan::Delete {
284 table: table.clone(),
285 filter: physical_filter,
286 })
287 }
288 LogicalPlan::DropTable { table, if_exists } => Ok(PhysicalPlan::DropTable {
289 table: table.clone(),
290 if_exists: *if_exists,
291 }),
292 LogicalPlan::CreateTable {
293 table,
294 columns,
295 if_not_exists,
296 policy,
297 } => Ok(PhysicalPlan::CreateTable {
298 table: table.clone(),
299 columns: columns.clone(),
300 if_not_exists: *if_not_exists,
301 policy: policy.clone(),
302 }),
303 LogicalPlan::CreateIndex {
304 table,
305 index_name,
306 columns,
307 if_not_exists,
308 } => Ok(PhysicalPlan::CreateIndex {
309 table: table.clone(),
310 index_name: index_name.clone(),
311 columns: columns.clone(),
312 if_not_exists: *if_not_exists,
313 }),
314 LogicalPlan::DropIndex {
315 table,
316 index_name,
317 if_exists,
318 } => Ok(PhysicalPlan::DropIndex {
319 table: table.clone(),
320 index_name: index_name.clone(),
321 if_exists: *if_exists,
322 }),
323 LogicalPlan::AlterTable { table, operation } => Ok(PhysicalPlan::AlterTable {
324 table: table.clone(),
325 operation: operation.clone(),
326 }),
327 LogicalPlan::CreateFunction {
328 name,
329 params,
330 return_type,
331 language,
332 body,
333 } => Ok(PhysicalPlan::CreateFunction {
334 name: name.clone(),
335 params: params.clone(),
336 return_type: return_type.clone(),
337 language: language.clone(),
338 body: body.clone(),
339 }),
340 LogicalPlan::CreateTrigger {
341 name,
342 timing,
343 event,
344 table,
345 for_each,
346 function,
347 } => Ok(PhysicalPlan::CreateTrigger {
348 name: name.clone(),
349 timing: *timing,
350 event: *event,
351 table: table.clone(),
352 for_each: *for_each,
353 function: function.clone(),
354 }),
355 LogicalPlan::CreateJob {
356 name,
357 schedule,
358 function,
359 } => Ok(PhysicalPlan::CreateJob {
360 name: name.clone(),
361 schedule: schedule.clone(),
362 function: function.clone(),
363 }),
364 LogicalPlan::DropFunction { name, if_exists } => Ok(PhysicalPlan::DropFunction {
365 name: name.clone(),
366 if_exists: *if_exists,
367 }),
368 LogicalPlan::DropTrigger { name, if_exists } => Ok(PhysicalPlan::DropTrigger {
369 name: name.clone(),
370 if_exists: *if_exists,
371 }),
372 LogicalPlan::DropJob { name, if_exists } => Ok(PhysicalPlan::DropJob {
373 name: name.clone(),
374 if_exists: *if_exists,
375 }),
376 }
377 }
378
379 fn plan_physical_expr(&self, expr: &Expr, schema: &[String]) -> DbxResult<PhysicalExpr> {
380 match expr {
381 Expr::Column(name) => {
382 if let Some(idx) = schema
383 .iter()
384 .position(|s| s.to_lowercase() == name.to_lowercase())
385 {
386 Ok(PhysicalExpr::Column(idx))
387 } else {
388 Err(DbxError::Schema(format!(
389 "Column '{}' not found in schema: {:?}",
390 name, schema
391 )))
392 }
393 }
394 Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
395 Expr::BinaryOp { left, op, right } => Ok(PhysicalExpr::BinaryOp {
396 left: Box::new(self.plan_physical_expr(left, schema)?),
397 op: *op,
398 right: Box::new(self.plan_physical_expr(right, schema)?),
399 }),
400 Expr::IsNull(expr) => Ok(PhysicalExpr::IsNull(Box::new(
401 self.plan_physical_expr(expr, schema)?,
402 ))),
403 Expr::IsNotNull(expr) => Ok(PhysicalExpr::IsNotNull(Box::new(
404 self.plan_physical_expr(expr, schema)?,
405 ))),
406 Expr::ScalarFunc { func, args } => {
407 let physical_args = args
408 .iter()
409 .map(|arg| self.plan_physical_expr(arg, schema))
410 .collect::<DbxResult<Vec<_>>>()?;
411 Ok(PhysicalExpr::ScalarFunc {
412 func: *func,
413 args: physical_args,
414 })
415 }
416 _ => Err(DbxError::NotImplemented(format!(
417 "Physical expression not supported: {:?}",
418 expr
419 ))),
420 }
421 }
422
423 fn extract_schema(&self, plan: &PhysicalPlan) -> Vec<String> {
425 match plan {
426 PhysicalPlan::TableScan { table, .. } => {
427 let schemas = self.table_schemas.read().unwrap();
429 let schema = schemas.get(table).or_else(|| {
431 let table_lower = table.to_lowercase();
432 schemas
433 .iter()
434 .find(|(k, _)| k.to_lowercase() == table_lower)
435 .map(|(_, v)| v)
436 });
437
438 if let Some(schema) = schema {
439 schema.fields().iter().map(|f| f.name().clone()).collect()
440 } else {
441 vec![]
442 }
443 }
444 PhysicalPlan::Projection { exprs, aliases, .. } => exprs
445 .iter()
446 .enumerate()
447 .map(|(i, _)| {
448 if let Some(alias) = aliases.get(i) {
449 alias.clone().unwrap_or_else(|| format!("col_{}", i))
450 } else {
451 format!("col_{}", i)
452 }
453 })
454 .collect(),
455 PhysicalPlan::HashAggregate {
456 input,
457 group_by,
458 aggregates,
459 ..
460 } => {
461 let input_schema = self.extract_schema(input);
462 let mut fields = Vec::new();
463 for &idx in group_by {
464 fields.push(
465 input_schema
466 .get(idx)
467 .cloned()
468 .unwrap_or_else(|| format!("col_{}", idx)),
469 );
470 }
471 for agg in aggregates {
472 fields.push(
473 agg.alias
474 .clone()
475 .unwrap_or_else(|| format!("agg_{:?}", agg.function)),
476 );
477 }
478 fields
479 }
480 PhysicalPlan::SortMerge { input, .. } => self.extract_schema(input),
481 PhysicalPlan::Limit { input, .. } => self.extract_schema(input),
482 PhysicalPlan::HashJoin { left, right, .. } => {
483 let mut fields = self.extract_schema(left);
484 fields.extend(self.extract_schema(right));
485 fields
486 }
487 PhysicalPlan::Insert { columns, .. } => columns.clone(),
488 PhysicalPlan::Update { .. } => vec![],
489 PhysicalPlan::Delete { .. } => vec![],
490 PhysicalPlan::DropTable { .. } => vec![],
491 PhysicalPlan::CreateTable { .. } => vec![],
492 PhysicalPlan::CreateIndex { .. } => vec![],
493 PhysicalPlan::DropIndex { .. } => vec![],
494 PhysicalPlan::AlterTable { .. } => vec![],
495 PhysicalPlan::CreateFunction { .. } => vec![],
496 PhysicalPlan::CreateTrigger { .. } => vec![],
497 PhysicalPlan::CreateJob { .. } => vec![],
498 PhysicalPlan::DropFunction { .. } => vec![],
499 PhysicalPlan::DropTrigger { .. } => vec![],
500 PhysicalPlan::DropJob { .. } => vec![],
501 PhysicalPlan::GridExchange { .. } => vec![], PhysicalPlan::ShuffleWriter { .. } => vec![], }
504 }
505
506 fn parse_join_condition(
509 &self,
510 on: &Expr,
511 left_schema: &[String],
512 right_schema: &[String],
513 ) -> DbxResult<Vec<(usize, usize)>> {
514 let mut pairs = Vec::new();
515 self.extract_join_pairs(on, left_schema, right_schema, &mut pairs)?;
516
517 if pairs.is_empty() {
518 pairs.push((0, 1));
520 }
521
522 Ok(pairs)
523 }
524
525 fn extract_join_pairs(
527 &self,
528 expr: &Expr,
529 left_schema: &[String],
530 right_schema: &[String],
531 pairs: &mut Vec<(usize, usize)>,
532 ) -> DbxResult<()> {
533 match expr {
534 Expr::BinaryOp { left, op, right } => {
535 match op {
536 BinaryOperator::Eq => {
537 let left_col = self.extract_column_name(left)?;
539 let right_col = self.extract_column_name(right)?;
540
541 let left_idx =
544 self.resolve_column_index(&left_col, left_schema, right_schema, true)?;
545 let right_idx = self.resolve_column_index(
546 &right_col,
547 left_schema,
548 right_schema,
549 false,
550 )?;
551
552 pairs.push((left_idx, right_idx));
553 }
554 BinaryOperator::And => {
555 self.extract_join_pairs(left, left_schema, right_schema, pairs)?;
557 self.extract_join_pairs(right, left_schema, right_schema, pairs)?;
558 }
559 _ => {
560 return Err(DbxError::NotImplemented(format!(
561 "JOIN condition operator not supported: {:?}",
562 op
563 )));
564 }
565 }
566 }
567 _ => {
568 return Err(DbxError::NotImplemented(format!(
569 "JOIN condition expression not supported: {:?}",
570 expr
571 )));
572 }
573 }
574 Ok(())
575 }
576
577 fn extract_column_name(&self, expr: &Expr) -> DbxResult<String> {
579 match expr {
580 Expr::Column(name) => {
581 if let Some(dot_pos) = name.rfind('.') {
583 Ok(name[dot_pos + 1..].to_string())
584 } else {
585 Ok(name.clone())
586 }
587 }
588 _ => Err(DbxError::NotImplemented(format!(
589 "Expected column reference, got: {:?}",
590 expr
591 ))),
592 }
593 }
594
595 fn resolve_column_index(
597 &self,
598 col_name: &str,
599 left_schema: &[String],
600 right_schema: &[String],
601 prefer_left: bool,
602 ) -> DbxResult<usize> {
603 if prefer_left {
605 if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
606 return Ok(idx);
607 }
608 if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
609 return Ok(idx);
610 }
611 } else {
612 if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
613 return Ok(idx);
614 }
615 if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
616 return Ok(idx);
617 }
618 }
619
620 match col_name {
623 "id" => Ok(0),
624 "user_id" => Ok(1),
625 "name" => Ok(1),
626 _ => Ok(0),
627 }
628 }
629}
630
631impl Default for PhysicalPlanner {
632 fn default() -> Self {
633 Self::new(Arc::new(RwLock::new(HashMap::new())))
634 }
635}
636
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::{LogicalPlanner, SqlParser};
    use arrow::datatypes::{DataType, Field};

    /// Plans `SELECT * FROM users` end to end and checks that the resulting
    /// plan scans `users`, either directly or underneath a projection.
    #[test]
    fn test_physical_plan_simple_select() {
        let parser = SqlParser::new();
        let statements = parser.parse("SELECT * FROM users").unwrap();

        let planner = LogicalPlanner::new();
        let logical_plan = planner.plan(&statements[0]).unwrap();
        println!("🔍 execute_sql: Logical Plan created: {:?}", logical_plan);

        // Register a two-column schema for `users` before planning.
        let users_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let mut registry = HashMap::new();
        registry.insert("users".to_string(), users_schema);
        let table_schemas = Arc::new(RwLock::new(registry));

        let physical_planner = PhysicalPlanner::new(table_schemas);
        let physical_plan = physical_planner.plan(&logical_plan).unwrap();

        let scanned_table = match &physical_plan {
            PhysicalPlan::TableScan { table, .. } => table,
            PhysicalPlan::Projection { input, .. } => match &**input {
                PhysicalPlan::TableScan { table, .. } => table,
                _ => panic!("Expected TableScan inside Projection"),
            },
            _ => panic!("Expected Projection or TableScan"),
        };
        assert_eq!(scanned_table, "users");
    }

    /// Checks `is_analytical` and `tables` on hand-built plans: a bare scan,
    /// a filtered scan, and a join of two scans.
    #[test]
    fn test_physical_plan_analytical_detection() {
        let bare_scan = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: None,
            ros_files: vec![],
        };
        assert!(!bare_scan.is_analytical());
        assert_eq!(bare_scan.tables(), vec!["users"]);

        let filtered_scan = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: Some(PhysicalExpr::Column(0)),
            ros_files: vec![],
        };
        assert!(filtered_scan.is_analytical());

        let users_side = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0],
            filter: None,
            ros_files: vec![],
        };
        let orders_side = PhysicalPlan::TableScan {
            table: "orders".to_string(),
            projection: vec![0],
            filter: None,
            ros_files: vec![],
        };
        let join = PhysicalPlan::HashJoin {
            left: Box::new(users_side),
            right: Box::new(orders_side),
            on: vec![(0, 0)],
            join_type: JoinType::Inner,
        };
        assert!(join.is_analytical());

        let joined_tables = join.tables();
        assert!(joined_tables.contains(&"users".to_string()));
        assert!(joined_tables.contains(&"orders".to_string()));
    }
}