Skip to main content

dbx_core/sql/planner/
physical.rs

//! PhysicalPlanner implementation
//!
//! Converts a LogicalPlan into a PhysicalPlan.
4
5use super::types::*;
6use crate::error::{DbxError, DbxResult};
7use arrow::datatypes::Schema;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10
/// Physical plan builder: converts a `LogicalPlan` into a `PhysicalPlan`.
pub struct PhysicalPlanner {
    // Shared, lock-protected map from table name to that table's Arrow schema.
    table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,
}
15
16impl PhysicalPlanner {
17    pub fn new(table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>) -> Self {
18        Self { table_schemas }
19    }
20
21    /// Convert LogicalPlan → PhysicalPlan
22    pub fn plan(&self, logical_plan: &LogicalPlan) -> DbxResult<PhysicalPlan> {
23        match logical_plan {
24            LogicalPlan::Scan {
25                table,
26                columns: _,
27                filter,
28            } => {
29                let schemas = self.table_schemas.read().unwrap();
30                let schema = schemas
31                    .get(table)
32                    .or_else(|| {
33                        let table_lower = table.to_lowercase();
34                        schemas
35                            .iter()
36                            .find(|(k, _)| k.to_lowercase() == table_lower)
37                            .map(|(_, v)| v)
38                    })
39                    .cloned()
40                    .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
41
42                let column_names: Vec<String> =
43                    schema.fields().iter().map(|f| f.name().clone()).collect();
44                drop(schemas);
45
46                let physical_filter = filter
47                    .as_ref()
48                    .map(|f| self.plan_physical_expr(f, &column_names))
49                    .transpose()?;
50
51                Ok(PhysicalPlan::TableScan {
52                    table: table.clone(),
53                    projection: vec![],
54                    filter: physical_filter,
55                })
56            }
57            LogicalPlan::Project { input, projections } => {
58                let input_plan = self.plan(input)?;
59                let input_schema = self.extract_schema(&input_plan);
60                let mut physical_exprs = Vec::new();
61                let mut aliases = Vec::new();
62
63                for (expr, alias) in projections {
64                    physical_exprs.push(self.plan_physical_expr(expr, &input_schema)?);
65                    aliases.push(alias.clone());
66                }
67
68                Ok(PhysicalPlan::Projection {
69                    input: Box::new(input_plan),
70                    exprs: physical_exprs,
71                    aliases,
72                })
73            }
74            LogicalPlan::Filter { input, predicate } => {
75                let mut input_plan = self.plan(input)?;
76                let input_schema = self.extract_schema(&input_plan);
77                let physical_pred = self.plan_physical_expr(predicate, &input_schema)?;
78
79                match &mut input_plan {
80                    PhysicalPlan::TableScan { filter, .. } if filter.is_none() => {
81                        *filter = Some(physical_pred);
82                        Ok(input_plan)
83                    }
84                    _ => Ok(PhysicalPlan::Projection {
85                        input: Box::new(input_plan),
86                        exprs: vec![physical_pred],
87                        aliases: vec![None], // Filter result column placeholder
88                    }),
89                }
90            }
91            LogicalPlan::Aggregate {
92                input,
93                group_by,
94                aggregates,
95            } => {
96                let input_plan = self.plan(input)?;
97                let input_schema = self.extract_schema(&input_plan);
98
99                let group_by_indices: Vec<usize> = group_by
100                    .iter()
101                    .map(|e| match e {
102                        Expr::Column(name) => {
103                            input_schema.iter().position(|s| s == name).unwrap_or(0)
104                        }
105                        _ => 0,
106                    })
107                    .collect();
108                let physical_aggs = aggregates
109                    .iter()
110                    .map(|agg| PhysicalAggExpr {
111                        function: agg.function,
112                        input: match &agg.expr {
113                            Expr::Column(name) => {
114                                input_schema.iter().position(|s| s == name).unwrap_or(0)
115                            }
116                            _ => 0,
117                        },
118                        alias: agg.alias.clone(),
119                    })
120                    .collect();
121                Ok(PhysicalPlan::HashAggregate {
122                    input: Box::new(input_plan),
123                    group_by: group_by_indices,
124                    aggregates: physical_aggs,
125                })
126            }
127            LogicalPlan::Sort { input, order_by } => {
128                let input_plan = self.plan(input)?;
129                let input_schema = self.extract_schema(&input_plan);
130
131                let order_by_physical: Vec<(usize, bool)> = order_by
132                    .iter()
133                    .map(|s| {
134                        let idx = match &s.expr {
135                            Expr::Column(name) => {
136                                input_schema.iter().position(|n| n == name).unwrap_or(0)
137                            }
138                            _ => 0,
139                        };
140                        (idx, s.asc)
141                    })
142                    .collect();
143                Ok(PhysicalPlan::SortMerge {
144                    input: Box::new(input_plan),
145                    order_by: order_by_physical,
146                })
147            }
148            LogicalPlan::Limit {
149                input,
150                count,
151                offset,
152            } => {
153                let input_plan = self.plan(input)?;
154                Ok(PhysicalPlan::Limit {
155                    input: Box::new(input_plan),
156                    count: *count,
157                    offset: *offset,
158                })
159            }
160            LogicalPlan::Join {
161                left,
162                right,
163                join_type,
164                on,
165            } => {
166                let left_plan = self.plan(left)?;
167                let right_plan = self.plan(right)?;
168
169                let left_schema = self.extract_schema(&left_plan);
170                let right_schema = self.extract_schema(&right_plan);
171
172                let on_pairs = self.parse_join_condition(on, &left_schema, &right_schema)?;
173
174                Ok(PhysicalPlan::HashJoin {
175                    left: Box::new(left_plan),
176                    right: Box::new(right_plan),
177                    on: on_pairs,
178                    join_type: *join_type,
179                })
180            }
181            LogicalPlan::Insert {
182                table,
183                columns,
184                values,
185            } => {
186                // Convert logical expressions to physical expressions
187                let physical_values: Vec<Vec<PhysicalExpr>> = values
188                    .iter()
189                    .map(|row| {
190                        row.iter()
191                            .map(|expr| match expr {
192                                Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
193                                Expr::Column(_name) => {
194                                    // For INSERT, columns are literal values, not references
195                                    Err(DbxError::SqlNotSupported {
196                                        feature: "Column references in INSERT VALUES".to_string(),
197                                        hint: "Use literal values only".to_string(),
198                                    })
199                                }
200                                _ => Err(DbxError::SqlNotSupported {
201                                    feature: format!("Expression in INSERT VALUES: {:?}", expr),
202                                    hint: "Use literal values only".to_string(),
203                                }),
204                            })
205                            .collect::<DbxResult<Vec<_>>>()
206                    })
207                    .collect::<DbxResult<Vec<_>>>()?;
208
209                Ok(PhysicalPlan::Insert {
210                    table: table.clone(),
211                    columns: columns.clone(),
212                    values: physical_values,
213                })
214            }
215            LogicalPlan::Update {
216                table,
217                assignments,
218                filter,
219            } => {
220                // Convert assignments to physical expressions
221                let physical_assignments: Vec<(String, PhysicalExpr)> = assignments
222                    .iter()
223                    .map(|(col, expr)| {
224                        let physical_expr = match expr {
225                            Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
226                            _ => Err(DbxError::NotImplemented(
227                                "Non-literal UPDATE values not yet supported".to_string(),
228                            )),
229                        }?;
230                        Ok((col.clone(), physical_expr))
231                    })
232                    .collect::<DbxResult<Vec<_>>>()?;
233
234                // Convert filter using full expression planner (same as SELECT)
235                let physical_filter = if let Some(f) = filter.as_ref() {
236                    let schemas = self.table_schemas.read().unwrap();
237                    let column_names: Vec<String> = schemas
238                        .get(table)
239                        .map(|schema| {
240                            schema
241                                .fields()
242                                .iter()
243                                .map(|field| field.name().clone())
244                                .collect()
245                        })
246                        .unwrap_or_default();
247                    drop(schemas);
248                    Some(self.plan_physical_expr(f, &column_names)?)
249                } else {
250                    None
251                };
252
253                Ok(PhysicalPlan::Update {
254                    table: table.clone(),
255                    assignments: physical_assignments,
256                    filter: physical_filter,
257                })
258            }
259            LogicalPlan::Delete { table, filter } => {
260                // Convert filter using full expression planner (same as SELECT)
261                let physical_filter = if let Some(f) = filter.as_ref() {
262                    let schemas = self.table_schemas.read().unwrap();
263                    let column_names: Vec<String> = schemas
264                        .get(table)
265                        .map(|schema| {
266                            schema
267                                .fields()
268                                .iter()
269                                .map(|field| field.name().clone())
270                                .collect()
271                        })
272                        .unwrap_or_default();
273                    drop(schemas);
274                    Some(self.plan_physical_expr(f, &column_names)?)
275                } else {
276                    None
277                };
278
279                Ok(PhysicalPlan::Delete {
280                    table: table.clone(),
281                    filter: physical_filter,
282                })
283            }
284            LogicalPlan::DropTable { table, if_exists } => Ok(PhysicalPlan::DropTable {
285                table: table.clone(),
286                if_exists: *if_exists,
287            }),
288            LogicalPlan::CreateTable {
289                table,
290                columns,
291                if_not_exists,
292            } => Ok(PhysicalPlan::CreateTable {
293                table: table.clone(),
294                columns: columns.clone(),
295                if_not_exists: *if_not_exists,
296            }),
297            LogicalPlan::CreateIndex {
298                table,
299                index_name,
300                columns,
301                if_not_exists,
302            } => Ok(PhysicalPlan::CreateIndex {
303                table: table.clone(),
304                index_name: index_name.clone(),
305                columns: columns.clone(),
306                if_not_exists: *if_not_exists,
307            }),
308            LogicalPlan::DropIndex {
309                table,
310                index_name,
311                if_exists,
312            } => Ok(PhysicalPlan::DropIndex {
313                table: table.clone(),
314                index_name: index_name.clone(),
315                if_exists: *if_exists,
316            }),
317            LogicalPlan::AlterTable { table, operation } => Ok(PhysicalPlan::AlterTable {
318                table: table.clone(),
319                operation: operation.clone(),
320            }),
321            LogicalPlan::CreateFunction {
322                name,
323                params,
324                return_type,
325                language,
326                body,
327            } => Ok(PhysicalPlan::CreateFunction {
328                name: name.clone(),
329                params: params.clone(),
330                return_type: return_type.clone(),
331                language: language.clone(),
332                body: body.clone(),
333            }),
334            LogicalPlan::CreateTrigger {
335                name,
336                timing,
337                event,
338                table,
339                for_each,
340                function,
341            } => Ok(PhysicalPlan::CreateTrigger {
342                name: name.clone(),
343                timing: *timing,
344                event: *event,
345                table: table.clone(),
346                for_each: *for_each,
347                function: function.clone(),
348            }),
349            LogicalPlan::CreateJob {
350                name,
351                schedule,
352                function,
353            } => Ok(PhysicalPlan::CreateJob {
354                name: name.clone(),
355                schedule: schedule.clone(),
356                function: function.clone(),
357            }),
358            LogicalPlan::DropFunction { name, if_exists } => Ok(PhysicalPlan::DropFunction {
359                name: name.clone(),
360                if_exists: *if_exists,
361            }),
362            LogicalPlan::DropTrigger { name, if_exists } => Ok(PhysicalPlan::DropTrigger {
363                name: name.clone(),
364                if_exists: *if_exists,
365            }),
366            LogicalPlan::DropJob { name, if_exists } => Ok(PhysicalPlan::DropJob {
367                name: name.clone(),
368                if_exists: *if_exists,
369            }),
370        }
371    }
372
373    fn plan_physical_expr(&self, expr: &Expr, schema: &[String]) -> DbxResult<PhysicalExpr> {
374        match expr {
375            Expr::Column(name) => {
376                if let Some(idx) = schema
377                    .iter()
378                    .position(|s| s.to_lowercase() == name.to_lowercase())
379                {
380                    Ok(PhysicalExpr::Column(idx))
381                } else {
382                    Err(DbxError::Schema(format!(
383                        "Column '{}' not found in schema: {:?}",
384                        name, schema
385                    )))
386                }
387            }
388            Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
389            Expr::BinaryOp { left, op, right } => Ok(PhysicalExpr::BinaryOp {
390                left: Box::new(self.plan_physical_expr(left, schema)?),
391                op: *op,
392                right: Box::new(self.plan_physical_expr(right, schema)?),
393            }),
394            Expr::IsNull(expr) => Ok(PhysicalExpr::IsNull(Box::new(
395                self.plan_physical_expr(expr, schema)?,
396            ))),
397            Expr::IsNotNull(expr) => Ok(PhysicalExpr::IsNotNull(Box::new(
398                self.plan_physical_expr(expr, schema)?,
399            ))),
400            Expr::ScalarFunc { func, args } => {
401                let physical_args = args
402                    .iter()
403                    .map(|arg| self.plan_physical_expr(arg, schema))
404                    .collect::<DbxResult<Vec<_>>>()?;
405                Ok(PhysicalExpr::ScalarFunc {
406                    func: *func,
407                    args: physical_args,
408                })
409            }
410            _ => Err(DbxError::NotImplemented(format!(
411                "Physical expression not supported: {:?}",
412                expr
413            ))),
414        }
415    }
416
417    /// Extract schema field names from PhysicalPlan.
418    fn extract_schema(&self, plan: &PhysicalPlan) -> Vec<String> {
419        match plan {
420            PhysicalPlan::TableScan { table, .. } => {
421                // Return actual table column names from stored schemas
422                let schemas = self.table_schemas.read().unwrap();
423                // Try case-insensitive lookup
424                let schema = schemas.get(table).or_else(|| {
425                    let table_lower = table.to_lowercase();
426                    schemas
427                        .iter()
428                        .find(|(k, _)| k.to_lowercase() == table_lower)
429                        .map(|(_, v)| v)
430                });
431
432                if let Some(schema) = schema {
433                    schema.fields().iter().map(|f| f.name().clone()).collect()
434                } else {
435                    vec![]
436                }
437            }
438            PhysicalPlan::Projection { exprs, aliases, .. } => exprs
439                .iter()
440                .enumerate()
441                .map(|(i, _)| {
442                    if let Some(alias) = aliases.get(i) {
443                        alias.clone().unwrap_or_else(|| format!("col_{}", i))
444                    } else {
445                        format!("col_{}", i)
446                    }
447                })
448                .collect(),
449            PhysicalPlan::HashAggregate {
450                input,
451                group_by,
452                aggregates,
453            } => {
454                let input_schema = self.extract_schema(input);
455                let mut fields = Vec::new();
456                for &idx in group_by {
457                    fields.push(
458                        input_schema
459                            .get(idx)
460                            .cloned()
461                            .unwrap_or_else(|| format!("col_{}", idx)),
462                    );
463                }
464                for agg in aggregates {
465                    fields.push(
466                        agg.alias
467                            .clone()
468                            .unwrap_or_else(|| format!("agg_{:?}", agg.function)),
469                    );
470                }
471                fields
472            }
473            PhysicalPlan::SortMerge { input, .. } => self.extract_schema(input),
474            PhysicalPlan::Limit { input, .. } => self.extract_schema(input),
475            PhysicalPlan::HashJoin { left, right, .. } => {
476                let mut fields = self.extract_schema(left);
477                fields.extend(self.extract_schema(right));
478                fields
479            }
480            PhysicalPlan::Insert { columns, .. } => columns.clone(),
481            PhysicalPlan::Update { .. } => vec![],
482            PhysicalPlan::Delete { .. } => vec![],
483            PhysicalPlan::DropTable { .. } => vec![],
484            PhysicalPlan::CreateTable { .. } => vec![],
485            PhysicalPlan::CreateIndex { .. } => vec![],
486            PhysicalPlan::DropIndex { .. } => vec![],
487            PhysicalPlan::AlterTable { .. } => vec![],
488            PhysicalPlan::CreateFunction { .. } => vec![],
489            PhysicalPlan::CreateTrigger { .. } => vec![],
490            PhysicalPlan::CreateJob { .. } => vec![],
491            PhysicalPlan::DropFunction { .. } => vec![],
492            PhysicalPlan::DropTrigger { .. } => vec![],
493            PhysicalPlan::DropJob { .. } => vec![],
494        }
495    }
496
497    /// Parse JOIN ON condition to extract (left_col_idx, right_col_idx) pairs.
498    /// Supports: col1 = col2 AND col3 = col4 ...
499    fn parse_join_condition(
500        &self,
501        on: &Expr,
502        left_schema: &[String],
503        right_schema: &[String],
504    ) -> DbxResult<Vec<(usize, usize)>> {
505        let mut pairs = Vec::new();
506        self.extract_join_pairs(on, left_schema, right_schema, &mut pairs)?;
507
508        if pairs.is_empty() {
509            // Fallback: use (0, 1) if no pairs extracted
510            pairs.push((0, 1));
511        }
512
513        Ok(pairs)
514    }
515
516    /// Recursively extract join column pairs from ON expression.
517    fn extract_join_pairs(
518        &self,
519        expr: &Expr,
520        left_schema: &[String],
521        right_schema: &[String],
522        pairs: &mut Vec<(usize, usize)>,
523    ) -> DbxResult<()> {
524        match expr {
525            Expr::BinaryOp { left, op, right } => {
526                match op {
527                    BinaryOperator::Eq => {
528                        // Extract column names from left = right
529                        let left_col = self.extract_column_name(left)?;
530                        let right_col = self.extract_column_name(right)?;
531
532                        // Resolve column indices
533                        // Try to find in left schema first, then right
534                        let left_idx =
535                            self.resolve_column_index(&left_col, left_schema, right_schema, true)?;
536                        let right_idx = self.resolve_column_index(
537                            &right_col,
538                            left_schema,
539                            right_schema,
540                            false,
541                        )?;
542
543                        pairs.push((left_idx, right_idx));
544                    }
545                    BinaryOperator::And => {
546                        // Recursively process AND conditions
547                        self.extract_join_pairs(left, left_schema, right_schema, pairs)?;
548                        self.extract_join_pairs(right, left_schema, right_schema, pairs)?;
549                    }
550                    _ => {
551                        return Err(DbxError::NotImplemented(format!(
552                            "JOIN condition operator not supported: {:?}",
553                            op
554                        )));
555                    }
556                }
557            }
558            _ => {
559                return Err(DbxError::NotImplemented(format!(
560                    "JOIN condition expression not supported: {:?}",
561                    expr
562                )));
563            }
564        }
565        Ok(())
566    }
567
568    /// Extract column name from expression (handles table.column format).
569    fn extract_column_name(&self, expr: &Expr) -> DbxResult<String> {
570        match expr {
571            Expr::Column(name) => {
572                // Handle "table.column" or just "column"
573                if let Some(dot_pos) = name.rfind('.') {
574                    Ok(name[dot_pos + 1..].to_string())
575                } else {
576                    Ok(name.clone())
577                }
578            }
579            _ => Err(DbxError::NotImplemented(format!(
580                "Expected column reference, got: {:?}",
581                expr
582            ))),
583        }
584    }
585
586    /// Resolve column name to index in schema.
587    fn resolve_column_index(
588        &self,
589        col_name: &str,
590        left_schema: &[String],
591        right_schema: &[String],
592        prefer_left: bool,
593    ) -> DbxResult<usize> {
594        // Try preferred schema first
595        if prefer_left {
596            if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
597                return Ok(idx);
598            }
599            if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
600                return Ok(idx);
601            }
602        } else {
603            if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
604                return Ok(idx);
605            }
606            if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
607                return Ok(idx);
608            }
609        }
610
611        // Fallback: use hardcoded indices based on column name
612        // This is a temporary workaround until we have proper schema binding
613        match col_name {
614            "id" => Ok(0),
615            "user_id" => Ok(1),
616            "name" => Ok(1),
617            _ => Ok(0),
618        }
619    }
620}
621
622impl Default for PhysicalPlanner {
623    fn default() -> Self {
624        Self::new(Arc::new(RwLock::new(HashMap::new())))
625    }
626}
627
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::{LogicalPlanner, SqlParser};

    /// Parses and plans `SELECT * FROM users` end to end and checks that the
    /// resulting physical plan scans `users`, either directly or underneath a
    /// projection.
    #[test]
    fn test_physical_plan_simple_select() {
        use arrow::datatypes::{DataType, Field};

        let statements = SqlParser::new().parse("SELECT * FROM users").unwrap();

        // Build the logical plan for the single parsed statement.
        let logical_plan = LogicalPlanner::new().plan(&statements[0]).unwrap();
        println!("🔍 execute_sql: Logical Plan created: {:?}", logical_plan);

        // Register a two-column schema for `users` so the scan can resolve it.
        let users_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]));
        let table_schemas = Arc::new(RwLock::new(HashMap::new()));
        table_schemas
            .write()
            .unwrap()
            .insert("users".to_string(), users_schema);

        let physical_plan = PhysicalPlanner::new(table_schemas)
            .plan(&logical_plan)
            .unwrap();

        let scanned_table = match &physical_plan {
            PhysicalPlan::TableScan { table, .. } => table,
            PhysicalPlan::Projection { input, .. } => match input.as_ref() {
                PhysicalPlan::TableScan { table, .. } => table,
                _ => panic!("Expected TableScan inside Projection"),
            },
            _ => panic!("Expected Projection or TableScan"),
        };
        assert_eq!(scanned_table, "users");
    }

    /// Checks `is_analytical()` and `tables()` on hand-built physical plans.
    #[test]
    fn test_physical_plan_analytical_detection() {
        // A bare table scan is not analytical.
        let bare_scan = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: None,
        };
        assert!(!bare_scan.is_analytical());
        assert_eq!(bare_scan.tables(), vec!["users"]);

        // A table scan carrying a filter is analytical.
        let filtered_scan = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: Some(PhysicalExpr::Column(0)),
        };
        assert!(filtered_scan.is_analytical());

        // A hash join is analytical and reports both input tables.
        let users_side = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0],
            filter: None,
        };
        let orders_side = PhysicalPlan::TableScan {
            table: "orders".to_string(),
            projection: vec![0],
            filter: None,
        };
        let join = PhysicalPlan::HashJoin {
            left: Box::new(users_side),
            right: Box::new(orders_side),
            on: vec![(0, 0)],
            join_type: JoinType::Inner,
        };
        assert!(join.is_analytical());

        let joined_tables = join.tables();
        assert!(joined_tables.contains(&"users".to_string()));
        assert!(joined_tables.contains(&"orders".to_string()));
    }
}