Skip to main content

dbx_core/sql/planner/
physical.rs

1//! PhysicalPlanner 구현
2//!
3//! LogicalPlan → PhysicalPlan 변환
4
5use super::types::*;
6use crate::error::{DbxError, DbxResult};
7use arrow::datatypes::Schema;
8use std::collections::HashMap;
9use std::sync::{Arc, RwLock};
10
11/// 물리 플랜 빌더 — LogicalPlan → PhysicalPlan 변환
12pub struct PhysicalPlanner {
13    table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>,
14}
15
16impl PhysicalPlanner {
17    pub fn new(table_schemas: Arc<RwLock<HashMap<String, Arc<Schema>>>>) -> Self {
18        Self { table_schemas }
19    }
20
21    /// Convert LogicalPlan → PhysicalPlan
22    pub fn plan(&self, logical_plan: &LogicalPlan) -> DbxResult<PhysicalPlan> {
23        match logical_plan {
24            LogicalPlan::Scan {
25                table,
26                columns: _,
27                filter,
28                ros_files,
29            } => {
30                let schemas = self.table_schemas.read().unwrap();
31                let schema = schemas
32                    .get(table)
33                    .or_else(|| {
34                        let table_lower = table.to_lowercase();
35                        schemas
36                            .iter()
37                            .find(|(k, _)| k.to_lowercase() == table_lower)
38                            .map(|(_, v)| v)
39                    })
40                    .cloned()
41                    .ok_or_else(|| DbxError::TableNotFound(table.clone()))?;
42
43                let column_names: Vec<String> =
44                    schema.fields().iter().map(|f| f.name().clone()).collect();
45                drop(schemas);
46
47                let physical_filter = filter
48                    .as_ref()
49                    .map(|f| self.plan_physical_expr(f, &column_names))
50                    .transpose()?;
51
52                Ok(PhysicalPlan::TableScan {
53                    table: table.clone(),
54                    projection: vec![],
55                    filter: physical_filter,
56                    ros_files: ros_files.clone(),
57                })
58            }
59            LogicalPlan::Project { input, projections } => {
60                let input_plan = self.plan(input)?;
61                let input_schema = self.extract_schema(&input_plan);
62                let mut physical_exprs = Vec::new();
63                let mut aliases = Vec::new();
64
65                for (expr, alias) in projections {
66                    physical_exprs.push(self.plan_physical_expr(expr, &input_schema)?);
67                    aliases.push(alias.clone());
68                }
69
70                Ok(PhysicalPlan::Projection {
71                    input: Box::new(input_plan),
72                    exprs: physical_exprs,
73                    aliases,
74                })
75            }
76            LogicalPlan::Filter { input, predicate } => {
77                let mut input_plan = self.plan(input)?;
78                let input_schema = self.extract_schema(&input_plan);
79                let physical_pred = self.plan_physical_expr(predicate, &input_schema)?;
80
81                match &mut input_plan {
82                    PhysicalPlan::TableScan { filter, .. } if filter.is_none() => {
83                        *filter = Some(physical_pred);
84                        Ok(input_plan)
85                    }
86                    _ => Ok(PhysicalPlan::Projection {
87                        input: Box::new(input_plan),
88                        exprs: vec![physical_pred],
89                        aliases: vec![None], // Filter result column placeholder
90                    }),
91                }
92            }
93            LogicalPlan::Aggregate {
94                input,
95                group_by,
96                aggregates,
97                mode,
98            } => {
99                let input_plan = self.plan(input)?;
100                let input_schema = self.extract_schema(&input_plan);
101
102                let group_by_indices: Vec<usize> = group_by
103                    .iter()
104                    .map(|e| match e {
105                        Expr::Column(name) => {
106                            input_schema.iter().position(|s| s == name).unwrap_or(0)
107                        }
108                        _ => 0,
109                    })
110                    .collect();
111                let physical_aggs = aggregates
112                    .iter()
113                    .map(|agg| PhysicalAggExpr {
114                        function: agg.function,
115                        input: match &agg.expr {
116                            Expr::Column(name) => {
117                                input_schema.iter().position(|s| s == name).unwrap_or(0)
118                            }
119                            _ => 0,
120                        },
121                        alias: agg.alias.clone(),
122                    })
123                    .collect();
124                Ok(PhysicalPlan::HashAggregate {
125                    input: Box::new(input_plan),
126                    group_by: group_by_indices,
127                    aggregates: physical_aggs,
128                    mode: *mode,
129                })
130            }
131            LogicalPlan::Sort { input, order_by } => {
132                let input_plan = self.plan(input)?;
133                let input_schema = self.extract_schema(&input_plan);
134
135                let order_by_physical: Vec<(usize, bool)> = order_by
136                    .iter()
137                    .map(|s| {
138                        let idx = match &s.expr {
139                            Expr::Column(name) => {
140                                input_schema.iter().position(|n| n == name).unwrap_or(0)
141                            }
142                            _ => 0,
143                        };
144                        (idx, s.asc)
145                    })
146                    .collect();
147                Ok(PhysicalPlan::SortMerge {
148                    input: Box::new(input_plan),
149                    order_by: order_by_physical,
150                })
151            }
152            LogicalPlan::Limit {
153                input,
154                count,
155                offset,
156            } => {
157                let input_plan = self.plan(input)?;
158                Ok(PhysicalPlan::Limit {
159                    input: Box::new(input_plan),
160                    count: *count,
161                    offset: *offset,
162                })
163            }
164            LogicalPlan::Join {
165                left,
166                right,
167                join_type,
168                on,
169            } => {
170                let left_plan = self.plan(left)?;
171                let right_plan = self.plan(right)?;
172
173                let left_schema = self.extract_schema(&left_plan);
174                let right_schema = self.extract_schema(&right_plan);
175
176                let on_pairs = self.parse_join_condition(on, &left_schema, &right_schema)?;
177
178                Ok(PhysicalPlan::HashJoin {
179                    left: Box::new(left_plan),
180                    right: Box::new(right_plan),
181                    on: on_pairs,
182                    join_type: *join_type,
183                })
184            }
185            LogicalPlan::Insert {
186                table,
187                columns,
188                values,
189            } => {
190                // Convert logical expressions to physical expressions
191                let physical_values: Vec<Vec<PhysicalExpr>> = values
192                    .iter()
193                    .map(|row| {
194                        row.iter()
195                            .map(|expr| match expr {
196                                Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
197                                Expr::Column(_name) => {
198                                    // For INSERT, columns are literal values, not references
199                                    Err(DbxError::SqlNotSupported {
200                                        feature: "Column references in INSERT VALUES".to_string(),
201                                        hint: "Use literal values only".to_string(),
202                                    })
203                                }
204                                _ => Err(DbxError::SqlNotSupported {
205                                    feature: format!("Expression in INSERT VALUES: {:?}", expr),
206                                    hint: "Use literal values only".to_string(),
207                                }),
208                            })
209                            .collect::<DbxResult<Vec<_>>>()
210                    })
211                    .collect::<DbxResult<Vec<_>>>()?;
212
213                Ok(PhysicalPlan::Insert {
214                    table: table.clone(),
215                    columns: columns.clone(),
216                    values: physical_values,
217                })
218            }
219            LogicalPlan::Update {
220                table,
221                assignments,
222                filter,
223            } => {
224                // Convert assignments to physical expressions
225                let physical_assignments: Vec<(String, PhysicalExpr)> = assignments
226                    .iter()
227                    .map(|(col, expr)| {
228                        let physical_expr = match expr {
229                            Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
230                            _ => Err(DbxError::NotImplemented(
231                                "Non-literal UPDATE values not yet supported".to_string(),
232                            )),
233                        }?;
234                        Ok((col.clone(), physical_expr))
235                    })
236                    .collect::<DbxResult<Vec<_>>>()?;
237
238                // Convert filter using full expression planner (same as SELECT)
239                let physical_filter = if let Some(f) = filter.as_ref() {
240                    let schemas = self.table_schemas.read().unwrap();
241                    let column_names: Vec<String> = schemas
242                        .get(table)
243                        .map(|schema| {
244                            schema
245                                .fields()
246                                .iter()
247                                .map(|field| field.name().clone())
248                                .collect()
249                        })
250                        .unwrap_or_default();
251                    drop(schemas);
252                    Some(self.plan_physical_expr(f, &column_names)?)
253                } else {
254                    None
255                };
256
257                Ok(PhysicalPlan::Update {
258                    table: table.clone(),
259                    assignments: physical_assignments,
260                    filter: physical_filter,
261                })
262            }
263            LogicalPlan::Delete { table, filter } => {
264                // Convert filter using full expression planner (same as SELECT)
265                let physical_filter = if let Some(f) = filter.as_ref() {
266                    let schemas = self.table_schemas.read().unwrap();
267                    let column_names: Vec<String> = schemas
268                        .get(table)
269                        .map(|schema| {
270                            schema
271                                .fields()
272                                .iter()
273                                .map(|field| field.name().clone())
274                                .collect()
275                        })
276                        .unwrap_or_default();
277                    drop(schemas);
278                    Some(self.plan_physical_expr(f, &column_names)?)
279                } else {
280                    None
281                };
282
283                Ok(PhysicalPlan::Delete {
284                    table: table.clone(),
285                    filter: physical_filter,
286                })
287            }
288            LogicalPlan::DropTable { table, if_exists } => Ok(PhysicalPlan::DropTable {
289                table: table.clone(),
290                if_exists: *if_exists,
291            }),
292            LogicalPlan::CreateTable {
293                table,
294                columns,
295                if_not_exists,
296                policy,
297            } => Ok(PhysicalPlan::CreateTable {
298                table: table.clone(),
299                columns: columns.clone(),
300                if_not_exists: *if_not_exists,
301                policy: policy.clone(),
302            }),
303            LogicalPlan::CreateIndex {
304                table,
305                index_name,
306                columns,
307                if_not_exists,
308            } => Ok(PhysicalPlan::CreateIndex {
309                table: table.clone(),
310                index_name: index_name.clone(),
311                columns: columns.clone(),
312                if_not_exists: *if_not_exists,
313            }),
314            LogicalPlan::DropIndex {
315                table,
316                index_name,
317                if_exists,
318            } => Ok(PhysicalPlan::DropIndex {
319                table: table.clone(),
320                index_name: index_name.clone(),
321                if_exists: *if_exists,
322            }),
323            LogicalPlan::AlterTable { table, operation } => Ok(PhysicalPlan::AlterTable {
324                table: table.clone(),
325                operation: operation.clone(),
326            }),
327            LogicalPlan::CreateFunction {
328                name,
329                params,
330                return_type,
331                language,
332                body,
333            } => Ok(PhysicalPlan::CreateFunction {
334                name: name.clone(),
335                params: params.clone(),
336                return_type: return_type.clone(),
337                language: language.clone(),
338                body: body.clone(),
339            }),
340            LogicalPlan::CreateTrigger {
341                name,
342                timing,
343                event,
344                table,
345                for_each,
346                function,
347            } => Ok(PhysicalPlan::CreateTrigger {
348                name: name.clone(),
349                timing: *timing,
350                event: *event,
351                table: table.clone(),
352                for_each: *for_each,
353                function: function.clone(),
354            }),
355            LogicalPlan::CreateJob {
356                name,
357                schedule,
358                function,
359            } => Ok(PhysicalPlan::CreateJob {
360                name: name.clone(),
361                schedule: schedule.clone(),
362                function: function.clone(),
363            }),
364            LogicalPlan::DropFunction { name, if_exists } => Ok(PhysicalPlan::DropFunction {
365                name: name.clone(),
366                if_exists: *if_exists,
367            }),
368            LogicalPlan::DropTrigger { name, if_exists } => Ok(PhysicalPlan::DropTrigger {
369                name: name.clone(),
370                if_exists: *if_exists,
371            }),
372            LogicalPlan::DropJob { name, if_exists } => Ok(PhysicalPlan::DropJob {
373                name: name.clone(),
374                if_exists: *if_exists,
375            }),
376        }
377    }
378
379    fn plan_physical_expr(&self, expr: &Expr, schema: &[String]) -> DbxResult<PhysicalExpr> {
380        match expr {
381            Expr::Column(name) => {
382                if let Some(idx) = schema
383                    .iter()
384                    .position(|s| s.to_lowercase() == name.to_lowercase())
385                {
386                    Ok(PhysicalExpr::Column(idx))
387                } else {
388                    Err(DbxError::Schema(format!(
389                        "Column '{}' not found in schema: {:?}",
390                        name, schema
391                    )))
392                }
393            }
394            Expr::Literal(scalar) => Ok(PhysicalExpr::Literal(scalar.clone())),
395            Expr::BinaryOp { left, op, right } => Ok(PhysicalExpr::BinaryOp {
396                left: Box::new(self.plan_physical_expr(left, schema)?),
397                op: *op,
398                right: Box::new(self.plan_physical_expr(right, schema)?),
399            }),
400            Expr::IsNull(expr) => Ok(PhysicalExpr::IsNull(Box::new(
401                self.plan_physical_expr(expr, schema)?,
402            ))),
403            Expr::IsNotNull(expr) => Ok(PhysicalExpr::IsNotNull(Box::new(
404                self.plan_physical_expr(expr, schema)?,
405            ))),
406            Expr::ScalarFunc { func, args } => {
407                let physical_args = args
408                    .iter()
409                    .map(|arg| self.plan_physical_expr(arg, schema))
410                    .collect::<DbxResult<Vec<_>>>()?;
411                Ok(PhysicalExpr::ScalarFunc {
412                    func: *func,
413                    args: physical_args,
414                })
415            }
416            _ => Err(DbxError::NotImplemented(format!(
417                "Physical expression not supported: {:?}",
418                expr
419            ))),
420        }
421    }
422
423    /// Extract schema field names from PhysicalPlan.
424    fn extract_schema(&self, plan: &PhysicalPlan) -> Vec<String> {
425        match plan {
426            PhysicalPlan::TableScan { table, .. } => {
427                // Return actual table column names from stored schemas
428                let schemas = self.table_schemas.read().unwrap();
429                // Try case-insensitive lookup
430                let schema = schemas.get(table).or_else(|| {
431                    let table_lower = table.to_lowercase();
432                    schemas
433                        .iter()
434                        .find(|(k, _)| k.to_lowercase() == table_lower)
435                        .map(|(_, v)| v)
436                });
437
438                if let Some(schema) = schema {
439                    schema.fields().iter().map(|f| f.name().clone()).collect()
440                } else {
441                    vec![]
442                }
443            }
444            PhysicalPlan::Projection { exprs, aliases, .. } => exprs
445                .iter()
446                .enumerate()
447                .map(|(i, _)| {
448                    if let Some(alias) = aliases.get(i) {
449                        alias.clone().unwrap_or_else(|| format!("col_{}", i))
450                    } else {
451                        format!("col_{}", i)
452                    }
453                })
454                .collect(),
455            PhysicalPlan::HashAggregate {
456                input,
457                group_by,
458                aggregates,
459                ..
460            } => {
461                let input_schema = self.extract_schema(input);
462                let mut fields = Vec::new();
463                for &idx in group_by {
464                    fields.push(
465                        input_schema
466                            .get(idx)
467                            .cloned()
468                            .unwrap_or_else(|| format!("col_{}", idx)),
469                    );
470                }
471                for agg in aggregates {
472                    fields.push(
473                        agg.alias
474                            .clone()
475                            .unwrap_or_else(|| format!("agg_{:?}", agg.function)),
476                    );
477                }
478                fields
479            }
480            PhysicalPlan::SortMerge { input, .. } => self.extract_schema(input),
481            PhysicalPlan::Limit { input, .. } => self.extract_schema(input),
482            PhysicalPlan::HashJoin { left, right, .. } => {
483                let mut fields = self.extract_schema(left);
484                fields.extend(self.extract_schema(right));
485                fields
486            }
487            PhysicalPlan::Insert { columns, .. } => columns.clone(),
488            PhysicalPlan::Update { .. } => vec![],
489            PhysicalPlan::Delete { .. } => vec![],
490            PhysicalPlan::DropTable { .. } => vec![],
491            PhysicalPlan::CreateTable { .. } => vec![],
492            PhysicalPlan::CreateIndex { .. } => vec![],
493            PhysicalPlan::DropIndex { .. } => vec![],
494            PhysicalPlan::AlterTable { .. } => vec![],
495            PhysicalPlan::CreateFunction { .. } => vec![],
496            PhysicalPlan::CreateTrigger { .. } => vec![],
497            PhysicalPlan::CreateJob { .. } => vec![],
498            PhysicalPlan::DropFunction { .. } => vec![],
499            PhysicalPlan::DropTrigger { .. } => vec![],
500            PhysicalPlan::DropJob { .. } => vec![],
501            PhysicalPlan::GridExchange { .. } => vec![], // 플레이스홀더
502            PhysicalPlan::ShuffleWriter { .. } => vec![], // 테이블 리스트는 불필요
503        }
504    }
505
506    /// Parse JOIN ON condition to extract (left_col_idx, right_col_idx) pairs.
507    /// Supports: col1 = col2 AND col3 = col4 ...
508    fn parse_join_condition(
509        &self,
510        on: &Expr,
511        left_schema: &[String],
512        right_schema: &[String],
513    ) -> DbxResult<Vec<(usize, usize)>> {
514        let mut pairs = Vec::new();
515        self.extract_join_pairs(on, left_schema, right_schema, &mut pairs)?;
516
517        if pairs.is_empty() {
518            // Fallback: use (0, 1) if no pairs extracted
519            pairs.push((0, 1));
520        }
521
522        Ok(pairs)
523    }
524
525    /// Recursively extract join column pairs from ON expression.
526    fn extract_join_pairs(
527        &self,
528        expr: &Expr,
529        left_schema: &[String],
530        right_schema: &[String],
531        pairs: &mut Vec<(usize, usize)>,
532    ) -> DbxResult<()> {
533        match expr {
534            Expr::BinaryOp { left, op, right } => {
535                match op {
536                    BinaryOperator::Eq => {
537                        // Extract column names from left = right
538                        let left_col = self.extract_column_name(left)?;
539                        let right_col = self.extract_column_name(right)?;
540
541                        // Resolve column indices
542                        // Try to find in left schema first, then right
543                        let left_idx =
544                            self.resolve_column_index(&left_col, left_schema, right_schema, true)?;
545                        let right_idx = self.resolve_column_index(
546                            &right_col,
547                            left_schema,
548                            right_schema,
549                            false,
550                        )?;
551
552                        pairs.push((left_idx, right_idx));
553                    }
554                    BinaryOperator::And => {
555                        // Recursively process AND conditions
556                        self.extract_join_pairs(left, left_schema, right_schema, pairs)?;
557                        self.extract_join_pairs(right, left_schema, right_schema, pairs)?;
558                    }
559                    _ => {
560                        return Err(DbxError::NotImplemented(format!(
561                            "JOIN condition operator not supported: {:?}",
562                            op
563                        )));
564                    }
565                }
566            }
567            _ => {
568                return Err(DbxError::NotImplemented(format!(
569                    "JOIN condition expression not supported: {:?}",
570                    expr
571                )));
572            }
573        }
574        Ok(())
575    }
576
577    /// Extract column name from expression (handles table.column format).
578    fn extract_column_name(&self, expr: &Expr) -> DbxResult<String> {
579        match expr {
580            Expr::Column(name) => {
581                // Handle "table.column" or just "column"
582                if let Some(dot_pos) = name.rfind('.') {
583                    Ok(name[dot_pos + 1..].to_string())
584                } else {
585                    Ok(name.clone())
586                }
587            }
588            _ => Err(DbxError::NotImplemented(format!(
589                "Expected column reference, got: {:?}",
590                expr
591            ))),
592        }
593    }
594
595    /// Resolve column name to index in schema.
596    fn resolve_column_index(
597        &self,
598        col_name: &str,
599        left_schema: &[String],
600        right_schema: &[String],
601        prefer_left: bool,
602    ) -> DbxResult<usize> {
603        // Try preferred schema first
604        if prefer_left {
605            if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
606                return Ok(idx);
607            }
608            if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
609                return Ok(idx);
610            }
611        } else {
612            if let Some(idx) = right_schema.iter().position(|f| f == col_name) {
613                return Ok(idx);
614            }
615            if let Some(idx) = left_schema.iter().position(|f| f == col_name) {
616                return Ok(idx);
617            }
618        }
619
620        // Fallback: use hardcoded indices based on column name
621        // This is a temporary workaround until we have proper schema binding
622        match col_name {
623            "id" => Ok(0),
624            "user_id" => Ok(1),
625            "name" => Ok(1),
626            _ => Ok(0),
627        }
628    }
629}
630
631impl Default for PhysicalPlanner {
632    fn default() -> Self {
633        Self::new(Arc::new(RwLock::new(HashMap::new())))
634    }
635}
636
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sql::{LogicalPlanner, SqlParser};

    /// `SELECT * FROM users` must plan to a scan of `users`, either bare or
    /// wrapped in a single projection.
    #[test]
    fn test_physical_plan_simple_select() {
        // SQL -> logical plan.
        let parser = SqlParser::new();
        let statements = parser.parse("SELECT * FROM users").unwrap();
        let planner = LogicalPlanner::new();
        let logical_plan = planner.plan(&statements[0]).unwrap();

        // Register the `users` schema so the scan can be resolved.
        let table_schemas = Arc::new(RwLock::new(HashMap::new()));
        let schema = Arc::new(Schema::new(vec![
            arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int32, false),
            arrow::datatypes::Field::new("name", arrow::datatypes::DataType::Utf8, false),
        ]));
        table_schemas
            .write()
            .unwrap()
            .insert("users".to_string(), schema);

        let physical_planner = PhysicalPlanner::new(table_schemas);
        let physical_plan = physical_planner.plan(&logical_plan).unwrap();

        match physical_plan {
            PhysicalPlan::Projection { input, .. } => match input.as_ref() {
                PhysicalPlan::TableScan { table, .. } => {
                    assert_eq!(table, "users");
                }
                _ => panic!("Expected TableScan inside Projection"),
            },
            PhysicalPlan::TableScan { table, .. } => {
                assert_eq!(table, "users");
            }
            _ => panic!("Expected Projection or TableScan"),
        }
    }

    /// `is_analytical` / `tables` classification on hand-built physical plans.
    #[test]
    fn test_physical_plan_analytical_detection() {
        // 1. Plain TableScan: not analytical.
        let plan1 = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: None,
            ros_files: vec![],
        };
        assert!(!plan1.is_analytical());
        assert_eq!(plan1.tables(), vec!["users"]);

        // 2. TableScan with a filter: analytical.
        let plan2 = PhysicalPlan::TableScan {
            table: "users".to_string(),
            projection: vec![0, 1],
            filter: Some(PhysicalExpr::Column(0)),
            ros_files: vec![],
        };
        assert!(plan2.is_analytical());

        // 3. HashJoin: analytical, and reports both joined tables.
        let plan3 = PhysicalPlan::HashJoin {
            left: Box::new(PhysicalPlan::TableScan {
                table: "users".to_string(),
                projection: vec![0],
                filter: None,
                ros_files: vec![],
            }),
            right: Box::new(PhysicalPlan::TableScan {
                table: "orders".to_string(),
                projection: vec![0],
                filter: None,
                ros_files: vec![],
            }),
            on: vec![(0, 0)],
            join_type: JoinType::Inner,
        };
        assert!(plan3.is_analytical());
        let tables = plan3.tables();
        assert!(tables.contains(&"users".to_string()));
        assert!(tables.contains(&"orders".to_string()));
    }
}