// aegis_query/executor.rs

//! Aegis Query Executor
//!
//! Executes query plans using a Volcano-style iterator model.
//! Implements vectorized execution for improved performance.
//!
//! Key Features:
//! - Pull-based iterator execution
//! - Vectorized batch processing
//! - Memory-efficient result streaming
//! - Support for parallel execution
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
14
15use crate::planner::{
16    AggregateExpr, AggregateFunction, AggregateNode, AlterTableNode, CreateIndexNode,
17    CreateTableConstraint, CreateTableNode, DeleteNode, DropIndexNode, DropTableNode, FilterNode,
18    InsertNode, InsertPlanSource, JoinNode, JoinStrategy, LimitNode, PlanAlterOperation,
19    PlanBinaryOp, PlanExpression, PlanJoinType, PlanLiteral, PlanNode, PlanUnaryOp, ProjectNode,
20    ProjectionExpr, QueryPlan, ScanNode, SortKey, SortNode, UpdateNode,
21};
22use crate::index::{TableIndexManager, IndexType, IndexKey, IndexError};
23use aegis_common::{DataType, Row, Value};
24use std::collections::HashMap;
25use std::sync::{Arc, RwLock};
26use thiserror::Error;
27
28// =============================================================================
29// Error Types
30// =============================================================================
31
/// Errors reported by the executor and by [`ExecutionContext`] operations.
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// No table with the given name is registered in the context.
    #[error("Table not found: {0}")]
    TableNotFound(String),

    /// The named column does not exist in the table being operated on.
    #[error("Column not found: {0}")]
    ColumnNotFound(String),

    /// A value had a different type than the operation required.
    #[error("Type mismatch: expected {expected}, got {actual}")]
    TypeMismatch { expected: String, actual: String },

    /// An arithmetic expression divided by zero.
    #[error("Division by zero")]
    DivisionByZero,

    /// The request is well-formed but not allowed in the current state
    /// (e.g. creating a table or index that already exists).
    #[error("Invalid operation: {0}")]
    InvalidOperation(String),

    /// Internal failure such as a poisoned lock or an index manager that
    /// cannot be mutated because it is shared.
    #[error("Execution error: {0}")]
    Internal(String),

    /// Failure reported by the index layer, carried as its display text.
    #[error("Index error: {0}")]
    IndexError(String),
}
55
56impl From<IndexError> for ExecutorError {
57    fn from(e: IndexError) -> Self {
58        ExecutorError::IndexError(e.to_string())
59    }
60}
61
/// Result alias used throughout the executor; the error is always [`ExecutorError`].
pub type ExecutorResult<T> = Result<T, ExecutorError>;
63
64// =============================================================================
65// Execution Context
66// =============================================================================
67
/// Context for query execution.
///
/// Holds the in-memory tables, their schemas and their indexes.
pub struct ExecutionContext {
    /// Row storage per table name; each table sits behind its own `RwLock`.
    tables: HashMap<String, Arc<RwLock<TableData>>>,
    /// Schema (columns, primary key, constraints) per table name.
    table_schemas: HashMap<String, TableSchema>,
    /// Index metadata (for persistence)
    indexes: HashMap<String, Vec<IndexSchema>>,
    /// Actual index data structures
    table_indexes: HashMap<String, Arc<TableIndexManager>>,
    /// Batch size reported by `batch_size()`; `new()` sets it to 1024.
    batch_size: usize,
}
78
/// In-memory table data for execution.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableData {
    /// Table name; also used as the map key when a snapshot is restored.
    pub name: String,
    /// Column names; a row's values are matched to these by position.
    pub columns: Vec<String>,
    /// Stored rows, in insertion order.
    pub rows: Vec<Row>,
}
86
/// Stored table constraint.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoredConstraint {
    /// Constraint name; `DROP CONSTRAINT` looks constraints up by this name.
    pub name: String,
    /// What the constraint enforces.
    pub constraint_type: StoredConstraintType,
}
93
/// Stored constraint type.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StoredConstraintType {
    /// Primary key over the listed columns.
    PrimaryKey { columns: Vec<String> },
    /// Uniqueness over the listed columns.
    Unique { columns: Vec<String> },
    /// Foreign key from `columns` to `ref_columns` of `ref_table`.
    ForeignKey {
        columns: Vec<String>,
        ref_table: String,
        ref_columns: Vec<String>,
    },
    /// Check constraint, kept only as text (the `Debug` rendering of the
    /// planner expression when added through `alter_table`).
    Check { expression_text: String },
}
106
/// Table schema information.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions, in declaration order.
    pub columns: Vec<ColumnSchema>,
    /// Primary-key column names, if the table has a primary key.
    pub primary_key: Option<Vec<String>>,
    /// Constraints attached to the table.
    pub constraints: Vec<StoredConstraint>,
}
115
/// Column schema information.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Declared data type.
    pub data_type: DataType,
    /// Whether the column is declared nullable.
    pub nullable: bool,
    /// Value used when an insert does not supply this column; `None` means NULL.
    pub default: Option<Value>,
}
124
/// Index schema information.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Indexed columns, in key order.
    pub columns: Vec<String>,
    /// Whether the index was created as unique.
    pub unique: bool,
}
133
/// Serializable snapshot of the execution context for persistence.
///
/// Only index metadata is stored; the index structures themselves are
/// rebuilt from table data when a snapshot is restored.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExecutionContextSnapshot {
    /// Copies of every table's data.
    pub tables: Vec<TableData>,
    /// Copies of every table schema.
    pub schemas: Vec<TableSchema>,
    /// Index metadata keyed by table name.
    pub indexes: HashMap<String, Vec<IndexSchema>>,
}
141
142impl ExecutionContext {
143    pub fn new() -> Self {
144        Self {
145            tables: HashMap::new(),
146            table_schemas: HashMap::new(),
147            indexes: HashMap::new(),
148            table_indexes: HashMap::new(),
149            batch_size: 1024,
150        }
151    }
152
153    pub fn with_batch_size(mut self, size: usize) -> Self {
154        self.batch_size = size;
155        self
156    }
157
158    pub fn add_table(&mut self, table: TableData) {
159        self.tables.insert(table.name.clone(), Arc::new(RwLock::new(table)));
160    }
161
162    pub fn get_table(&self, name: &str) -> Option<Arc<RwLock<TableData>>> {
163        self.tables.get(name).cloned()
164    }
165
    /// Returns the configured batch size (1024 unless changed through
    /// `with_batch_size`).
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
169
170    // ==========================================================================
171    // DDL Operations
172    // ==========================================================================
173
174    /// Create a new table.
175    pub fn create_table(
176        &mut self,
177        name: String,
178        columns: Vec<ColumnSchema>,
179        primary_key: Option<Vec<String>>,
180        constraints: Vec<StoredConstraint>,
181        if_not_exists: bool,
182    ) -> ExecutorResult<()> {
183        if self.tables.contains_key(&name) {
184            if if_not_exists {
185                return Ok(());
186            }
187            return Err(ExecutorError::InvalidOperation(format!(
188                "Table '{}' already exists",
189                name
190            )));
191        }
192
193        let column_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
194
195        // Create table schema
196        let schema = TableSchema {
197            name: name.clone(),
198            columns,
199            primary_key,
200            constraints,
201        };
202        self.table_schemas.insert(name.clone(), schema);
203
204        // Create empty table data
205        let table_data = TableData {
206            name: name.clone(),
207            columns: column_names,
208            rows: Vec::new(),
209        };
210        self.tables.insert(name, Arc::new(RwLock::new(table_data)));
211
212        Ok(())
213    }
214
215    /// Drop a table.
216    pub fn drop_table(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
217        if !self.tables.contains_key(name) {
218            if if_exists {
219                return Ok(());
220            }
221            return Err(ExecutorError::TableNotFound(name.to_string()));
222        }
223
224        self.tables.remove(name);
225        self.table_schemas.remove(name);
226        self.indexes.remove(name);
227
228        Ok(())
229    }
230
231    /// Alter a table.
232    pub fn alter_table(&mut self, name: &str, op: &PlanAlterOperation) -> ExecutorResult<()> {
233        let schema = self.table_schemas.get_mut(name)
234            .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
235
236        match op {
237            PlanAlterOperation::AddColumn(col) => {
238                // Check if column already exists
239                if schema.columns.iter().any(|c| c.name == col.name) {
240                    return Err(ExecutorError::InvalidOperation(format!(
241                        "Column '{}' already exists in table '{}'",
242                        col.name, name
243                    )));
244                }
245
246                // Evaluate default expression if provided
247                let default = col.default.as_ref()
248                    .map(evaluate_default_expression)
249                    .transpose()?;
250
251                schema.columns.push(ColumnSchema {
252                    name: col.name.clone(),
253                    data_type: col.data_type.clone(),
254                    nullable: col.nullable,
255                    default: default.clone(),
256                });
257
258                // Add default (or null) values to existing rows
259                let fill_value = default.unwrap_or(Value::Null);
260                if let Some(table_data) = self.tables.get(name) {
261                    let mut table = table_data.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
262                    for row in table.rows.iter_mut() {
263                        row.values.push(fill_value.clone());
264                    }
265                }
266            }
267            PlanAlterOperation::DropColumn { name: col_name, if_exists } => {
268                let col_idx = schema.columns.iter().position(|c| c.name == *col_name);
269                match col_idx {
270                    Some(idx) => {
271                        schema.columns.remove(idx);
272                        // Remove values from existing rows
273                        if let Some(table_data) = self.tables.get(name) {
274                            let mut table = table_data.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
275                            for row in table.rows.iter_mut() {
276                                if idx < row.values.len() {
277                                    row.values.remove(idx);
278                                }
279                            }
280                        }
281                    }
282                    None if *if_exists => {}
283                    None => {
284                        return Err(ExecutorError::ColumnNotFound(col_name.clone()));
285                    }
286                }
287            }
288            PlanAlterOperation::RenameColumn { old_name, new_name } => {
289                let col = schema.columns.iter_mut().find(|c| c.name == *old_name)
290                    .ok_or_else(|| ExecutorError::ColumnNotFound(old_name.clone()))?;
291                col.name = new_name.clone();
292            }
293            PlanAlterOperation::AlterColumn { name: col_name, data_type, set_not_null, set_default } => {
294                let col = schema.columns.iter_mut().find(|c| c.name == *col_name)
295                    .ok_or_else(|| ExecutorError::ColumnNotFound(col_name.clone()))?;
296
297                if let Some(dt) = data_type {
298                    col.data_type = dt.clone();
299                }
300                if let Some(not_null) = set_not_null {
301                    col.nullable = !not_null;
302                }
303                // Handle set_default: None = no change, Some(None) = drop default, Some(Some(expr)) = set new default
304                if let Some(new_default) = set_default {
305                    col.default = new_default.as_ref()
306                        .map(evaluate_default_expression)
307                        .transpose()?;
308                }
309            }
310            PlanAlterOperation::RenameTable { new_name } => {
311                // Move data to new table name
312                if let Some(rows) = self.tables.remove(name) {
313                    self.tables.insert(new_name.clone(), rows);
314                }
315                if let Some(mut schema) = self.table_schemas.remove(name) {
316                    schema.name = new_name.clone();
317                    self.table_schemas.insert(new_name.clone(), schema);
318                }
319                if let Some(indexes) = self.indexes.remove(name) {
320                    self.indexes.insert(new_name.clone(), indexes);
321                }
322                // Return early since name is now invalid
323                return Ok(());
324            }
325            PlanAlterOperation::AddConstraint(constraint) => {
326                // Pre-validate foreign key reference before getting mutable borrow
327                if let CreateTableConstraint::ForeignKey { ref_table, .. } = constraint {
328                    if !self.table_schemas.contains_key(ref_table) {
329                        return Err(ExecutorError::TableNotFound(ref_table.clone()));
330                    }
331                }
332
333                // Now safe to get mutable reference for the rest of the operation
334                let schema = self.table_schemas.get_mut(name)
335                    .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
336
337                let constraint_count = schema.constraints.len() + 1;
338                let (name_suffix, constraint_type) = match constraint {
339                    CreateTableConstraint::PrimaryKey { columns } => {
340                        // Check if primary key already exists
341                        if schema.primary_key.is_some() {
342                            return Err(ExecutorError::InvalidOperation(
343                                format!("Table '{}' already has a primary key", name)
344                            ));
345                        }
346                        schema.primary_key = Some(columns.clone());
347                        ("pk", StoredConstraintType::PrimaryKey { columns: columns.clone() })
348                    }
349                    CreateTableConstraint::Unique { columns } => {
350                        // Validate columns exist
351                        for col in columns {
352                            if !schema.columns.iter().any(|c| c.name == *col) {
353                                return Err(ExecutorError::ColumnNotFound(col.clone()));
354                            }
355                        }
356                        ("uq", StoredConstraintType::Unique { columns: columns.clone() })
357                    }
358                    CreateTableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
359                        // Validate columns exist
360                        for col in columns {
361                            if !schema.columns.iter().any(|c| c.name == *col) {
362                                return Err(ExecutorError::ColumnNotFound(col.clone()));
363                            }
364                        }
365                        // ref_table validated above
366                        ("fk", StoredConstraintType::ForeignKey {
367                            columns: columns.clone(),
368                            ref_table: ref_table.clone(),
369                            ref_columns: ref_columns.clone(),
370                        })
371                    }
372                    CreateTableConstraint::Check { expression } => {
373                        ("ck", StoredConstraintType::Check {
374                            expression_text: format!("{:?}", expression),
375                        })
376                    }
377                };
378                schema.constraints.push(StoredConstraint {
379                    name: format!("{}_{}{}", name, name_suffix, constraint_count),
380                    constraint_type,
381                });
382                return Ok(());
383            }
384            PlanAlterOperation::DropConstraint { name: constraint_name } => {
385                let pos = schema.constraints.iter().position(|c| c.name == *constraint_name);
386                match pos {
387                    Some(idx) => {
388                        let removed = schema.constraints.remove(idx);
389                        // If dropping primary key constraint, also clear primary_key field
390                        if matches!(removed.constraint_type, StoredConstraintType::PrimaryKey { .. }) {
391                            schema.primary_key = None;
392                        }
393                    }
394                    None => {
395                        return Err(ExecutorError::InvalidOperation(
396                            format!("Constraint '{}' not found on table '{}'", constraint_name, name)
397                        ));
398                    }
399                }
400            }
401        }
402
403        Ok(())
404    }
405
406    /// Create an index.
407    pub fn create_index(
408        &mut self,
409        name: String,
410        table: String,
411        columns: Vec<String>,
412        unique: bool,
413        if_not_exists: bool,
414    ) -> ExecutorResult<()> {
415        let table_data = self.tables.get(&table)
416            .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?
417            .clone();
418
419        let indexes = self.indexes.entry(table.clone()).or_default();
420
421        // Check if index already exists
422        if indexes.iter().any(|idx| idx.name == name) {
423            if if_not_exists {
424                return Ok(());
425            }
426            return Err(ExecutorError::InvalidOperation(format!(
427                "Index '{}' already exists",
428                name
429            )));
430        }
431
432        // Store metadata
433        indexes.push(IndexSchema {
434            name: name.clone(),
435            table: table.clone(),
436            columns: columns.clone(),
437            unique,
438        });
439
440        // Create actual index structure
441        let index_manager = self.table_indexes.entry(table.clone())
442            .or_insert_with(|| Arc::new(TableIndexManager::new(table.clone())));
443
444        // Use B-tree by default (supports range queries)
445        Arc::get_mut(index_manager)
446            .ok_or_else(|| ExecutorError::Internal("Cannot modify shared index manager".to_string()))?
447            .create_index(name.clone(), columns.clone(), unique, IndexType::BTree)?;
448
449        // Populate the index with existing data
450        let table_guard = table_data.read()
451            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
452
453        let index_manager = self.table_indexes.get(&table)
454            .expect("index_manager was just inserted for this table");
455        for (row_id, row) in table_guard.rows.iter().enumerate() {
456            let column_values: HashMap<String, Value> = table_guard.columns.iter()
457                .zip(row.values.iter())
458                .map(|(col, val)| (col.clone(), val.clone()))
459                .collect();
460
461            // Build index key from specified columns
462            let key_values: Vec<crate::index::IndexValue> = columns.iter()
463                .map(|col| {
464                    column_values.get(col)
465                        .map(IndexKey::from_value)
466                        .unwrap_or(crate::index::IndexValue::Null)
467                })
468                .collect();
469            let key = IndexKey::new(key_values);
470
471            // Insert into the index using public API
472            index_manager.insert_into_index(&name, key, row_id)?;
473        }
474
475        Ok(())
476    }
477
478    /// Drop an index.
479    pub fn drop_index(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
480        let mut found = false;
481        let mut table_name = None;
482
483        // Find and remove from metadata
484        for (tbl, indexes) in self.indexes.iter_mut() {
485            if let Some(pos) = indexes.iter().position(|idx| idx.name == name) {
486                indexes.remove(pos);
487                table_name = Some(tbl.clone());
488                found = true;
489                break;
490            }
491        }
492
493        // Remove from actual index structure
494        if let Some(tbl) = table_name {
495            if let Some(manager) = self.table_indexes.get_mut(&tbl) {
496                if let Some(m) = Arc::get_mut(manager) {
497                    let _ = m.drop_index(name);
498                }
499            }
500        }
501
502        if !found && !if_exists {
503            return Err(ExecutorError::InvalidOperation(format!(
504                "Index '{}' not found",
505                name
506            )));
507        }
508
509        Ok(())
510    }
511
512    /// Get the index manager for a table.
513    pub fn get_index_manager(&self, table: &str) -> Option<Arc<TableIndexManager>> {
514        self.table_indexes.get(table).cloned()
515    }
516
    /// Get index metadata for a table.
    ///
    /// Returns `None` when no metadata entry exists for `table`.
    pub fn get_indexes(&self, table: &str) -> Option<&Vec<IndexSchema>> {
        self.indexes.get(table)
    }
521
    /// Get table schema.
    ///
    /// Returns `None` when no schema is registered under `name`.
    pub fn get_table_schema(&self, name: &str) -> Option<&TableSchema> {
        self.table_schemas.get(name)
    }
526
527    /// List all table names.
528    pub fn list_tables(&self) -> Vec<String> {
529        self.tables.keys().cloned().collect()
530    }
531
532    // ==========================================================================
533    // Persistence Operations
534    // ==========================================================================
535
536    /// Create a serializable snapshot of the execution context.
537    pub fn to_snapshot(&self) -> ExecutionContextSnapshot {
538        let tables: Vec<TableData> = self.tables.values()
539            .filter_map(|t| t.read().ok().map(|t| t.clone()))
540            .collect();
541
542        ExecutionContextSnapshot {
543            tables,
544            schemas: self.table_schemas.values().cloned().collect(),
545            indexes: self.indexes.clone(),
546        }
547    }
548
549    /// Restore execution context from a snapshot.
550    pub fn from_snapshot(snapshot: ExecutionContextSnapshot) -> Self {
551        let mut ctx = Self::new();
552
553        // Restore schemas first
554        for schema in snapshot.schemas {
555            ctx.table_schemas.insert(schema.name.clone(), schema);
556        }
557
558        // Restore tables
559        for table in snapshot.tables {
560            ctx.tables.insert(table.name.clone(), Arc::new(RwLock::new(table)));
561        }
562
563        // Store index metadata
564        ctx.indexes = snapshot.indexes.clone();
565
566        // Rebuild actual index structures from metadata
567        for (table_name, index_schemas) in snapshot.indexes {
568            for index_schema in index_schemas {
569                // Rebuild each index
570                let _ = ctx.rebuild_index(
571                    &index_schema.name,
572                    &table_name,
573                    &index_schema.columns,
574                    index_schema.unique,
575                );
576            }
577        }
578
579        ctx
580    }
581
582    /// Rebuild an index from existing table data.
583    fn rebuild_index(
584        &mut self,
585        name: &str,
586        table: &str,
587        columns: &[String],
588        unique: bool,
589    ) -> ExecutorResult<()> {
590        let table_data = self.tables.get(table)
591            .ok_or_else(|| ExecutorError::TableNotFound(table.to_string()))?
592            .clone();
593
594        // Create index manager if needed
595        let index_manager = self.table_indexes.entry(table.to_string())
596            .or_insert_with(|| Arc::new(TableIndexManager::new(table.to_string())));
597
598        // Create the index structure
599        if let Some(m) = Arc::get_mut(index_manager) {
600            m.create_index(name.to_string(), columns.to_vec(), unique, IndexType::BTree)?;
601        } else {
602            return Err(ExecutorError::Internal("Cannot modify shared index manager".to_string()));
603        }
604
605        // Populate with existing data
606        let table_guard = table_data.read()
607            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
608
609        let index_manager = self.table_indexes.get(table)
610            .expect("index_manager was just inserted for this table");
611        for (row_id, row) in table_guard.rows.iter().enumerate() {
612            let column_values: HashMap<String, Value> = table_guard.columns.iter()
613                .zip(row.values.iter())
614                .map(|(col, val)| (col.clone(), val.clone()))
615                .collect();
616
617            let key_values: Vec<crate::index::IndexValue> = columns.iter()
618                .map(|col| {
619                    column_values.get(col)
620                        .map(IndexKey::from_value)
621                        .unwrap_or(crate::index::IndexValue::Null)
622                })
623                .collect();
624            let key = IndexKey::new(key_values);
625
626            index_manager.insert_into_index(name, key, row_id)?;
627        }
628
629        Ok(())
630    }
631
632    /// Save the execution context to a file.
633    pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
634        let snapshot = self.to_snapshot();
635        let json = serde_json::to_string_pretty(&snapshot)
636            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
637        std::fs::write(path, json)
638    }
639
640    /// Load the execution context from a file.
641    pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
642        let json = std::fs::read_to_string(path)?;
643        let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
644            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
645        Ok(Self::from_snapshot(snapshot))
646    }
647
648    /// Merge data from a file into the current context (for loading on startup).
649    pub fn merge_from_file(&mut self, path: &std::path::Path) -> std::io::Result<()> {
650        let json = std::fs::read_to_string(path)?;
651        let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
652            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
653
654        // Merge schemas
655        for schema in snapshot.schemas {
656            self.table_schemas.insert(schema.name.clone(), schema);
657        }
658
659        // Merge tables
660        for table in snapshot.tables {
661            self.tables.insert(table.name.clone(), Arc::new(RwLock::new(table)));
662        }
663
664        // Merge indexes
665        for (table, idxs) in snapshot.indexes {
666            self.indexes.insert(table, idxs);
667        }
668
669        Ok(())
670    }
671
672    // ==========================================================================
673    // DML Operations
674    // ==========================================================================
675
676    /// Insert rows into a table.
677    pub fn insert_rows(
678        &self,
679        table_name: &str,
680        columns: &[String],
681        rows: Vec<Vec<Value>>,
682    ) -> ExecutorResult<u64> {
683        let table = self.get_table(table_name)
684            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
685
686        let mut table_data = table.write()
687            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
688
689        // Get schema to access column defaults
690        let schema = self.table_schemas.get(table_name)
691            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
692
693        // If no columns specified, use all columns in order
694        let target_columns: Vec<String> = if columns.is_empty() {
695            table_data.columns.clone()
696        } else {
697            columns.to_vec()
698        };
699
700        // Build column index mapping
701        let mut column_indices: Vec<usize> = Vec::new();
702        for col in &target_columns {
703            let idx = table_data.columns.iter().position(|c| c == col)
704                .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
705            column_indices.push(idx);
706        }
707
708        let mut inserted = 0u64;
709
710        // Build default values for each column
711        let defaults: Vec<Value> = schema.columns.iter()
712            .map(|col| col.default.clone().unwrap_or(Value::Null))
713            .collect();
714
715        for row_values in rows {
716            // Create a new row initialized with defaults (or Null if no default)
717            let mut new_row: Vec<Value> = defaults.clone();
718
719            // Fill in the provided values
720            for (i, &col_idx) in column_indices.iter().enumerate() {
721                if let Some(value) = row_values.get(i) {
722                    new_row[col_idx] = value.clone();
723                }
724            }
725
726            table_data.rows.push(Row { values: new_row });
727            inserted += 1;
728        }
729
730        Ok(inserted)
731    }
732
733    /// Update rows in a table.
734    pub fn update_rows(
735        &self,
736        table_name: &str,
737        assignments: &[(String, Value)],
738        predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
739    ) -> ExecutorResult<u64> {
740        let table = self.get_table(table_name)
741            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
742
743        let mut table_data = table.write()
744            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
745        let columns = table_data.columns.clone();
746
747        // Build assignment index mapping
748        let mut assignment_indices: Vec<(usize, Value)> = Vec::new();
749        for (col, val) in assignments {
750            let idx = columns.iter().position(|c| c == col)
751                .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
752            assignment_indices.push((idx, val.clone()));
753        }
754
755        let mut updated = 0u64;
756
757        for row in &mut table_data.rows {
758            let should_update = predicate.map(|p| p(row, &columns)).unwrap_or(true);
759            if should_update {
760                for (col_idx, value) in &assignment_indices {
761                    row.values[*col_idx] = value.clone();
762                }
763                updated += 1;
764            }
765        }
766
767        Ok(updated)
768    }
769
770    /// Delete rows from a table.
771    pub fn delete_rows(
772        &self,
773        table_name: &str,
774        predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
775    ) -> ExecutorResult<u64> {
776        let table = self.get_table(table_name)
777            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
778
779        let mut table_data = table.write()
780            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
781        let columns = table_data.columns.clone();
782
783        let original_len = table_data.rows.len();
784
785        if let Some(pred) = predicate {
786            table_data.rows.retain(|row| !pred(row, &columns));
787        } else {
788            table_data.rows.clear();
789        }
790
791        Ok((original_len - table_data.rows.len()) as u64)
792    }
793}
794
795impl Default for ExecutionContext {
796    fn default() -> Self {
797        Self::new()
798    }
799}
800
801// =============================================================================
802// Result Batch
803// =============================================================================
804
/// A batch of result rows.
#[derive(Debug, Clone)]
pub struct ResultBatch {
    /// Column names for the rows in this batch.
    pub columns: Vec<String>,
    /// The rows carried by this batch.
    pub rows: Vec<Row>,
}
811
812impl ResultBatch {
813    pub fn new(columns: Vec<String>) -> Self {
814        Self {
815            columns,
816            rows: Vec::new(),
817        }
818    }
819
820    pub fn with_rows(columns: Vec<String>, rows: Vec<Row>) -> Self {
821        Self { columns, rows }
822    }
823
824    pub fn is_empty(&self) -> bool {
825        self.rows.is_empty()
826    }
827
828    pub fn len(&self) -> usize {
829        self.rows.len()
830    }
831}
832
833// =============================================================================
834// Query Result
835// =============================================================================
836
837/// Complete query result.
838#[derive(Debug, Clone)]
839pub struct QueryResult {
840    pub columns: Vec<String>,
841    pub rows: Vec<Row>,
842    pub rows_affected: u64,
843}
844
845impl QueryResult {
846    pub fn new(columns: Vec<String>, rows: Vec<Row>) -> Self {
847        let rows_affected = rows.len() as u64;
848        Self {
849            columns,
850            rows,
851            rows_affected,
852        }
853    }
854
855    pub fn empty() -> Self {
856        Self {
857            columns: Vec::new(),
858            rows: Vec::new(),
859            rows_affected: 0,
860        }
861    }
862}
863
864// =============================================================================
865// Executor
866// =============================================================================
867
/// Query executor.
///
/// Routes a plan's root node to a DDL, DML, or query handler. Every handler
/// works against the `ExecutionContext` stored here behind a lock.
pub struct Executor {
    /// Execution context, reference-counted and lock-guarded so that it can be
    /// handed in already shared (see `with_shared_context`).
    context: Arc<RwLock<ExecutionContext>>,
}
872
873impl Executor {
874    pub fn new(context: ExecutionContext) -> Self {
875        Self { context: Arc::new(RwLock::new(context)) }
876    }
877
878    /// Create executor with a shared context (for persistent DDL across queries).
879    pub fn with_shared_context(context: Arc<RwLock<ExecutionContext>>) -> Self {
880        Self { context }
881    }
882
883    /// Execute a query plan.
884    pub fn execute(&self, plan: &QueryPlan) -> ExecutorResult<QueryResult> {
885        self.execute_internal(&plan.root)
886    }
887
888    /// Execute a query plan with bound parameters.
889    /// Parameters are indexed starting from 1 (e.g., $1, $2, etc.)
890    pub fn execute_with_params(&self, plan: &QueryPlan, params: &[Value]) -> ExecutorResult<QueryResult> {
891        // Substitute placeholders with actual parameter values
892        let bound_plan = substitute_parameters(&plan.root, params)?;
893        self.execute_internal(&bound_plan)
894    }
895
896    fn execute_internal(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
897        match root {
898            // DDL operations
899            PlanNode::CreateTable(node) => self.execute_create_table(node),
900            PlanNode::DropTable(node) => self.execute_drop_table(node),
901            PlanNode::AlterTable(node) => self.execute_alter_table(node),
902            PlanNode::CreateIndex(node) => self.execute_create_index(node),
903            PlanNode::DropIndex(node) => self.execute_drop_index(node),
904
905            // DML operations
906            PlanNode::Insert(node) => self.execute_insert(node),
907            PlanNode::Update(node) => self.execute_update(node),
908            PlanNode::Delete(node) => self.execute_delete(node),
909
910            // Query operations (SELECT)
911            _ => self.execute_query(root),
912        }
913    }
914
915    /// Execute a SELECT query.
916    fn execute_query(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
917        let context = self.context.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
918        let mut operator = self.create_operator(root, &context)?;
919        let mut all_rows = Vec::new();
920        let mut columns = Vec::new();
921
922        while let Some(batch) = operator.next_batch()? {
923            if columns.is_empty() {
924                columns = batch.columns.clone();
925            }
926            all_rows.extend(batch.rows);
927        }
928
929        Ok(QueryResult::new(columns, all_rows))
930    }
931
932    /// Execute CREATE TABLE.
933    fn execute_create_table(&self, node: &CreateTableNode) -> ExecutorResult<QueryResult> {
934        let columns: Vec<ColumnSchema> = node.columns.iter().map(|col| {
935            let default = col.default.as_ref()
936                .map(evaluate_default_expression)
937                .transpose()?;
938            Ok(ColumnSchema {
939                name: col.name.clone(),
940                data_type: col.data_type.clone(),
941                nullable: col.nullable,
942                default,
943            })
944        }).collect::<ExecutorResult<Vec<_>>>()?;
945
946        // Extract primary key from constraints
947        let primary_key = node.constraints.iter()
948            .find_map(|c| {
949                if let CreateTableConstraint::PrimaryKey { columns } = c {
950                    Some(columns.clone())
951                } else {
952                    None
953                }
954            })
955            .or_else(|| {
956                // Check column-level primary key
957                let pk_cols: Vec<String> = node.columns.iter()
958                    .filter(|c| c.primary_key)
959                    .map(|c| c.name.clone())
960                    .collect();
961                if pk_cols.is_empty() { None } else { Some(pk_cols) }
962            });
963
964        // Convert plan constraints to stored constraints with auto-generated names
965        let mut stored_constraints = Vec::new();
966        let mut constraint_counter = 0;
967        for c in &node.constraints {
968            constraint_counter += 1;
969            let (name_suffix, constraint_type) = match c {
970                CreateTableConstraint::PrimaryKey { columns } => {
971                    ("pk", StoredConstraintType::PrimaryKey { columns: columns.clone() })
972                }
973                CreateTableConstraint::Unique { columns } => {
974                    ("uq", StoredConstraintType::Unique { columns: columns.clone() })
975                }
976                CreateTableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
977                    ("fk", StoredConstraintType::ForeignKey {
978                        columns: columns.clone(),
979                        ref_table: ref_table.clone(),
980                        ref_columns: ref_columns.clone(),
981                    })
982                }
983                CreateTableConstraint::Check { expression } => {
984                    ("ck", StoredConstraintType::Check {
985                        expression_text: format!("{:?}", expression),
986                    })
987                }
988            };
989            stored_constraints.push(StoredConstraint {
990                name: format!("{}_{}{}", node.table_name, name_suffix, constraint_counter),
991                constraint_type,
992            });
993        }
994
995        // Add column-level unique constraints
996        for col in &node.columns {
997            if col.unique {
998                constraint_counter += 1;
999                stored_constraints.push(StoredConstraint {
1000                    name: format!("{}_{}_uq{}", node.table_name, col.name, constraint_counter),
1001                    constraint_type: StoredConstraintType::Unique { columns: vec![col.name.clone()] },
1002                });
1003            }
1004        }
1005
1006        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.create_table(
1007            node.table_name.clone(),
1008            columns,
1009            primary_key,
1010            stored_constraints,
1011            node.if_not_exists,
1012        )?;
1013
1014        Ok(QueryResult {
1015            columns: vec!["result".to_string()],
1016            rows: vec![Row { values: vec![Value::String(format!("Table '{}' created", node.table_name))] }],
1017            rows_affected: 0,
1018        })
1019    }
1020
1021    /// Execute DROP TABLE.
1022    fn execute_drop_table(&self, node: &DropTableNode) -> ExecutorResult<QueryResult> {
1023        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.drop_table(&node.table_name, node.if_exists)?;
1024
1025        Ok(QueryResult {
1026            columns: vec!["result".to_string()],
1027            rows: vec![Row { values: vec![Value::String(format!("Table '{}' dropped", node.table_name))] }],
1028            rows_affected: 0,
1029        })
1030    }
1031
1032    /// Execute ALTER TABLE.
1033    fn execute_alter_table(&self, node: &AlterTableNode) -> ExecutorResult<QueryResult> {
1034        let mut ctx = self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1035
1036        for op in &node.operations {
1037            ctx.alter_table(&node.table_name, op)?;
1038        }
1039
1040        let op_count = node.operations.len();
1041        let msg = if op_count == 1 {
1042            format!("Table '{}' altered", node.table_name)
1043        } else {
1044            format!("Table '{}' altered ({} operations)", node.table_name, op_count)
1045        };
1046
1047        Ok(QueryResult {
1048            columns: vec!["result".to_string()],
1049            rows: vec![Row { values: vec![Value::String(msg)] }],
1050            rows_affected: 0,
1051        })
1052    }
1053
1054    /// Execute CREATE INDEX.
1055    fn execute_create_index(&self, node: &CreateIndexNode) -> ExecutorResult<QueryResult> {
1056        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.create_index(
1057            node.index_name.clone(),
1058            node.table_name.clone(),
1059            node.columns.clone(),
1060            node.unique,
1061            node.if_not_exists,
1062        )?;
1063
1064        Ok(QueryResult {
1065            columns: vec!["result".to_string()],
1066            rows: vec![Row { values: vec![Value::String(format!("Index '{}' created", node.index_name))] }],
1067            rows_affected: 0,
1068        })
1069    }
1070
1071    /// Execute DROP INDEX.
1072    fn execute_drop_index(&self, node: &DropIndexNode) -> ExecutorResult<QueryResult> {
1073        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.drop_index(&node.index_name, node.if_exists)?;
1074
1075        Ok(QueryResult {
1076            columns: vec!["result".to_string()],
1077            rows: vec![Row { values: vec![Value::String(format!("Index '{}' dropped", node.index_name))] }],
1078            rows_affected: 0,
1079        })
1080    }
1081
1082    /// Execute INSERT.
1083    fn execute_insert(&self, node: &InsertNode) -> ExecutorResult<QueryResult> {
1084        let rows: Vec<Vec<Value>> = match &node.source {
1085            InsertPlanSource::Values(values) => {
1086                // Evaluate all value expressions
1087                let empty_row = Row { values: vec![] };
1088                let empty_columns: Vec<String> = vec![];
1089
1090                values.iter()
1091                    .map(|row_exprs| {
1092                        row_exprs.iter()
1093                            .map(|expr| evaluate_expression(expr, &empty_row, &empty_columns))
1094                            .collect::<ExecutorResult<Vec<_>>>()
1095                    })
1096                    .collect::<ExecutorResult<Vec<_>>>()?
1097            }
1098            InsertPlanSource::Query(subquery) => {
1099                // Execute the subquery and use its results as the insert data
1100                let context = self.context.read()
1101                    .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1102                let mut operator = self.create_operator(subquery, &context)?;
1103                let mut all_rows = Vec::new();
1104
1105                while let Some(batch) = operator.next_batch()? {
1106                    for row in batch.rows {
1107                        all_rows.push(row.values);
1108                    }
1109                }
1110
1111                all_rows
1112            }
1113        };
1114
1115        let inserted = self.context.read()
1116            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1117            .insert_rows(&node.table_name, &node.columns, rows)?;
1118
1119        Ok(QueryResult {
1120            columns: vec!["rows_affected".to_string()],
1121            rows: vec![Row { values: vec![Value::Integer(inserted as i64)] }],
1122            rows_affected: inserted,
1123        })
1124    }
1125
1126    /// Execute UPDATE.
1127    fn execute_update(&self, node: &UpdateNode) -> ExecutorResult<QueryResult> {
1128        let context = self.context.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1129
1130        // Get table columns for expression evaluation
1131        let table = context.get_table(&node.table_name)
1132            .ok_or_else(|| ExecutorError::TableNotFound(node.table_name.clone()))?;
1133        let table_data = table.read()
1134            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1135        let _columns = table_data.columns.clone();
1136        drop(table_data);
1137
1138        // Evaluate assignment values
1139        let empty_row = Row { values: vec![] };
1140        let assignments: Vec<(String, Value)> = node.assignments.iter()
1141            .map(|(col, expr)| {
1142                let value = evaluate_expression(expr, &empty_row, &[])?;
1143                Ok((col.clone(), value))
1144            })
1145            .collect::<ExecutorResult<Vec<_>>>()?;
1146
1147        // Create predicate closure if where clause exists
1148        let where_clause = node.where_clause.clone();
1149        let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1150            Box::new(move |row: &Row, cols: &[String]| {
1151                evaluate_expression(&wc, row, cols)
1152                    .map(|v| matches!(v, Value::Boolean(true)))
1153                    .unwrap_or(false)
1154            }) as Box<dyn Fn(&Row, &[String]) -> bool>
1155        });
1156
1157        let updated = context.update_rows(
1158            &node.table_name,
1159            &assignments,
1160            predicate.as_ref().map(|p| p.as_ref()),
1161        )?;
1162
1163        Ok(QueryResult {
1164            columns: vec!["rows_affected".to_string()],
1165            rows: vec![Row { values: vec![Value::Integer(updated as i64)] }],
1166            rows_affected: updated,
1167        })
1168    }
1169
1170    /// Execute DELETE.
1171    fn execute_delete(&self, node: &DeleteNode) -> ExecutorResult<QueryResult> {
1172        let context = self.context.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1173
1174        // Create predicate closure if where clause exists
1175        let where_clause = node.where_clause.clone();
1176        let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1177            Box::new(move |row: &Row, cols: &[String]| {
1178                evaluate_expression(&wc, row, cols)
1179                    .map(|v| matches!(v, Value::Boolean(true)))
1180                    .unwrap_or(false)
1181            }) as Box<dyn Fn(&Row, &[String]) -> bool>
1182        });
1183
1184        let deleted = context.delete_rows(
1185            &node.table_name,
1186            predicate.as_ref().map(|p| p.as_ref()),
1187        )?;
1188
1189        Ok(QueryResult {
1190            columns: vec!["rows_affected".to_string()],
1191            rows: vec![Row { values: vec![Value::Integer(deleted as i64)] }],
1192            rows_affected: deleted,
1193        })
1194    }
1195
1196    /// Create an operator tree from a plan node.
1197    fn create_operator<'a>(&'a self, node: &PlanNode, context: &'a ExecutionContext) -> ExecutorResult<Box<dyn Operator + 'a>> {
1198        match node {
1199            PlanNode::Scan(scan) => Ok(Box::new(ScanOperator::new(scan.clone(), context)?)),
1200
1201            PlanNode::Filter(filter) => {
1202                let input = self.create_operator(&filter.input, context)?;
1203                Ok(Box::new(FilterOperator::new(
1204                    input,
1205                    filter.predicate.clone(),
1206                )))
1207            }
1208
1209            PlanNode::Project(project) => {
1210                let input = self.create_operator(&project.input, context)?;
1211                Ok(Box::new(ProjectOperator::new(
1212                    input,
1213                    project.expressions.clone(),
1214                )))
1215            }
1216
1217            PlanNode::Join(join) => {
1218                let left = self.create_operator(&join.left, context)?;
1219                let right = self.create_operator(&join.right, context)?;
1220                Ok(Box::new(JoinOperator::new(
1221                    left,
1222                    right,
1223                    join.join_type,
1224                    join.condition.clone(),
1225                    join.strategy,
1226                )?))
1227            }
1228
1229            PlanNode::Aggregate(agg) => {
1230                let input = self.create_operator(&agg.input, context)?;
1231                Ok(Box::new(AggregateOperator::new(
1232                    input,
1233                    agg.group_by.clone(),
1234                    agg.aggregates.clone(),
1235                )))
1236            }
1237
1238            PlanNode::Sort(sort) => {
1239                let input = self.create_operator(&sort.input, context)?;
1240                Ok(Box::new(SortOperator::new(input, sort.order_by.clone())))
1241            }
1242
1243            PlanNode::Limit(limit) => {
1244                let input = self.create_operator(&limit.input, context)?;
1245                Ok(Box::new(LimitOperator::new(input, limit.limit, limit.offset)))
1246            }
1247
1248            PlanNode::Empty => Ok(Box::new(EmptyOperator::new())),
1249
1250            // DDL and DML nodes are handled in execute(), not here
1251            PlanNode::CreateTable(_) | PlanNode::DropTable(_) | PlanNode::AlterTable(_) |
1252            PlanNode::CreateIndex(_) | PlanNode::DropIndex(_) |
1253            PlanNode::Insert(_) | PlanNode::Update(_) | PlanNode::Delete(_) => {
1254                Err(ExecutorError::Internal("DDL/DML nodes should not be in operator tree".to_string()))
1255            }
1256        }
1257    }
1258}
1259
1260// =============================================================================
1261// Operator Trait
1262// =============================================================================
1263
/// Operator in the execution pipeline.
///
/// Operators are pulled: the caller asks for batches one at a time until the
/// operator reports that it has nothing left.
pub trait Operator {
    /// Get the next batch of results.
    ///
    /// Returns `Ok(Some(batch))` while rows remain, `Ok(None)` once the
    /// operator is exhausted, and `Err` if producing the batch failed.
    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>>;

    /// Get output column names.
    fn columns(&self) -> &[String];
}
1272
1273// =============================================================================
1274// Scan Operator
1275// =============================================================================
1276
1277struct ScanOperator {
1278    table: Arc<RwLock<TableData>>,
1279    columns: Vec<String>,
1280    position: usize,
1281    batch_size: usize,
1282    // Cache the rows to avoid repeated locking
1283    cached_rows: Option<Vec<Row>>,
1284}
1285
1286impl ScanOperator {
1287    fn new(scan: ScanNode, context: &ExecutionContext) -> ExecutorResult<Self> {
1288        let table = context
1289            .get_table(&scan.table_name)
1290            .ok_or_else(|| ExecutorError::TableNotFound(scan.table_name.clone()))?;
1291
1292        // Read columns under lock, then release lock before moving table into struct
1293        let columns = {
1294            let table_data = table.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1295            if scan.columns.is_empty() {
1296                table_data.columns.clone()
1297            } else {
1298                scan.columns.clone()
1299            }
1300        }; // table_data guard dropped here
1301
1302        Ok(Self {
1303            table,
1304            columns,
1305            position: 0,
1306            batch_size: context.batch_size(),
1307            cached_rows: None,
1308        })
1309    }
1310}
1311
1312impl Operator for ScanOperator {
1313    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1314        // Cache rows on first access
1315        if self.cached_rows.is_none() {
1316            let table_data = self.table.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1317            self.cached_rows = Some(table_data.rows.clone());
1318        }
1319
1320        // Safe to use expect: we just set cached_rows to Some above if it was None
1321        let rows = self.cached_rows.as_ref().expect("cached_rows was just set to Some");
1322
1323        if self.position >= rows.len() {
1324            return Ok(None);
1325        }
1326
1327        let end = (self.position + self.batch_size).min(rows.len());
1328        let batch_rows: Vec<Row> = rows[self.position..end].to_vec();
1329        self.position = end;
1330
1331        Ok(Some(ResultBatch::with_rows(self.columns.clone(), batch_rows)))
1332    }
1333
1334    fn columns(&self) -> &[String] {
1335        &self.columns
1336    }
1337}
1338
1339// =============================================================================
1340// Filter Operator
1341// =============================================================================
1342
1343struct FilterOperator<'a> {
1344    input: Box<dyn Operator + 'a>,
1345    predicate: PlanExpression,
1346    columns: Vec<String>,
1347}
1348
1349impl<'a> FilterOperator<'a> {
1350    fn new(input: Box<dyn Operator + 'a>, predicate: PlanExpression) -> Self {
1351        let columns = input.columns().to_vec();
1352        Self {
1353            input,
1354            predicate,
1355            columns,
1356        }
1357    }
1358
1359    fn evaluate_predicate(&self, row: &Row, columns: &[String]) -> ExecutorResult<bool> {
1360        let value = evaluate_expression(&self.predicate, row, columns)?;
1361        match value {
1362            Value::Boolean(b) => Ok(b),
1363            Value::Null => Ok(false),
1364            _ => Err(ExecutorError::TypeMismatch {
1365                expected: "boolean".to_string(),
1366                actual: format!("{:?}", value),
1367            }),
1368        }
1369    }
1370}
1371
1372impl<'a> Operator for FilterOperator<'a> {
1373    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1374        while let Some(batch) = self.input.next_batch()? {
1375            let filtered: Vec<Row> = batch
1376                .rows
1377                .into_iter()
1378                .filter(|row| self.evaluate_predicate(row, &batch.columns).unwrap_or(false))
1379                .collect();
1380
1381            if !filtered.is_empty() {
1382                return Ok(Some(ResultBatch::with_rows(self.columns.clone(), filtered)));
1383            }
1384        }
1385        Ok(None)
1386    }
1387
1388    fn columns(&self) -> &[String] {
1389        &self.columns
1390    }
1391}
1392
1393// =============================================================================
1394// Project Operator
1395// =============================================================================
1396
1397struct ProjectOperator<'a> {
1398    input: Box<dyn Operator + 'a>,
1399    expressions: Vec<crate::planner::ProjectionExpr>,
1400    columns: Vec<String>,
1401    input_columns: Vec<String>,
1402}
1403
1404impl<'a> ProjectOperator<'a> {
1405    fn new(input: Box<dyn Operator + 'a>, expressions: Vec<crate::planner::ProjectionExpr>) -> Self {
1406        let input_columns = input.columns().to_vec();
1407
1408        // Expand wildcards in expressions and build column names
1409        let mut expanded_expressions = Vec::new();
1410        let mut columns = Vec::new();
1411
1412        for (i, proj_expr) in expressions.iter().enumerate() {
1413            // Check if this is a wildcard expression (SELECT *)
1414            if let PlanExpression::Column { name, table, .. } = &proj_expr.expr {
1415                if name == "*" {
1416                    // Expand to all input columns (optionally filtered by table)
1417                    for input_col in &input_columns {
1418                        // For table-qualified wildcards (table.*), filter by table prefix
1419                        // TODO: When table prefixes are added to column names, filter here
1420                        let _ = table; // Acknowledge unused variable for future implementation
1421                        expanded_expressions.push(crate::planner::ProjectionExpr {
1422                            expr: PlanExpression::Column {
1423                                table: None,
1424                                name: input_col.clone(),
1425                                data_type: DataType::Any,
1426                            },
1427                            alias: None,
1428                        });
1429                        columns.push(input_col.clone());
1430                    }
1431                    continue;
1432                }
1433            }
1434
1435            // Regular expression
1436            expanded_expressions.push(proj_expr.clone());
1437            let col_name = proj_expr.alias.clone().unwrap_or_else(|| {
1438                extract_column_name(&proj_expr.expr)
1439                    .unwrap_or_else(|| format!("column_{}", i))
1440            });
1441            columns.push(col_name);
1442        }
1443
1444        Self {
1445            input,
1446            expressions: expanded_expressions,
1447            columns,
1448            input_columns,
1449        }
1450    }
1451}
1452
1453/// Extract a meaningful column name from a PlanExpression.
1454fn extract_column_name(expr: &PlanExpression) -> Option<String> {
1455    match expr {
1456        PlanExpression::Column { name, .. } => Some(name.clone()),
1457        PlanExpression::Function { name, .. } => Some(name.clone()),
1458        PlanExpression::Cast { expr, .. } => extract_column_name(expr),
1459        PlanExpression::UnaryOp { expr, .. } => extract_column_name(expr),
1460        _ => None,
1461    }
1462}
1463
1464impl<'a> Operator for ProjectOperator<'a> {
1465    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1466        if let Some(batch) = self.input.next_batch()? {
1467            let mut result_rows = Vec::with_capacity(batch.rows.len());
1468
1469            for row in &batch.rows {
1470                let mut projected_values = Vec::with_capacity(self.expressions.len());
1471
1472                for expr in &self.expressions {
1473                    let value = evaluate_expression(&expr.expr, row, &self.input_columns)?;
1474                    projected_values.push(value);
1475                }
1476
1477                result_rows.push(Row {
1478                    values: projected_values,
1479                });
1480            }
1481
1482            Ok(Some(ResultBatch::with_rows(
1483                self.columns.clone(),
1484                result_rows,
1485            )))
1486        } else {
1487            Ok(None)
1488        }
1489    }
1490
1491    fn columns(&self) -> &[String] {
1492        &self.columns
1493    }
1494}
1495
1496// =============================================================================
1497// Join Operator
1498// =============================================================================
1499
1500#[allow(dead_code)]
1501struct JoinOperator<'a> {
1502    left: Box<dyn Operator + 'a>,
1503    right: Box<dyn Operator + 'a>,
1504    join_type: PlanJoinType,
1505    condition: Option<PlanExpression>,
1506    _strategy: JoinStrategy,
1507    columns: Vec<String>,
1508    left_columns: Vec<String>,
1509    right_columns: Vec<String>,
1510    right_data: Option<Vec<Row>>,
1511    left_batch: Option<ResultBatch>,
1512    left_row_idx: usize,
1513    right_row_idx: usize,
1514}
1515
1516impl<'a> JoinOperator<'a> {
1517    fn new(
1518        left: Box<dyn Operator + 'a>,
1519        right: Box<dyn Operator + 'a>,
1520        join_type: PlanJoinType,
1521        condition: Option<PlanExpression>,
1522        strategy: JoinStrategy,
1523    ) -> ExecutorResult<Self> {
1524        let left_columns = left.columns().to_vec();
1525        let right_columns = right.columns().to_vec();
1526
1527        let mut columns = left_columns.clone();
1528        columns.extend(right_columns.clone());
1529
1530        Ok(Self {
1531            left,
1532            right,
1533            join_type,
1534            condition,
1535            _strategy: strategy,
1536            columns,
1537            left_columns,
1538            right_columns,
1539            right_data: None,
1540            left_batch: None,
1541            left_row_idx: 0,
1542            right_row_idx: 0,
1543        })
1544    }
1545
1546    fn materialize_right(&mut self) -> ExecutorResult<()> {
1547        if self.right_data.is_some() {
1548            return Ok(());
1549        }
1550
1551        let mut all_rows = Vec::new();
1552        while let Some(batch) = self.right.next_batch()? {
1553            all_rows.extend(batch.rows);
1554        }
1555        self.right_data = Some(all_rows);
1556        Ok(())
1557    }
1558
1559    fn evaluate_join_condition(&self, left: &Row, right: &Row) -> ExecutorResult<bool> {
1560        match &self.condition {
1561            None => Ok(true),
1562            Some(expr) => {
1563                let mut combined = left.values.clone();
1564                combined.extend(right.values.clone());
1565                let combined_row = Row { values: combined };
1566
1567                let value = evaluate_expression(expr, &combined_row, &self.columns)?;
1568                match value {
1569                    Value::Boolean(b) => Ok(b),
1570                    Value::Null => Ok(false),
1571                    _ => Ok(false),
1572                }
1573            }
1574        }
1575    }
1576}
1577
impl<'a> Operator for JoinOperator<'a> {
    /// Produce the next batch of joined rows.
    ///
    /// The right input is buffered on the first call. Left batches are then
    /// pulled one at a time and each left row is paired with every buffered
    /// right row; pairs that satisfy the join condition are emitted as the
    /// left values followed by the right values.
    ///
    /// A batch is returned as soon as it holds 1024 rows. The loop positions
    /// (`left_batch`, `left_row_idx`, `right_row_idx`) are kept on `self`, so
    /// the next call resumes at the pair after the last one examined.
    /// Returns `Ok(None)` when the left input is exhausted and no rows are
    /// pending.
    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
        self.materialize_right()?;
        // Safe to use expect: materialize_right() sets right_data to Some
        let right_data = self.right_data.as_ref().expect("right_data was set by materialize_right");

        let mut result_rows = Vec::new();

        loop {
            // Fetch a new left batch only when the previous one is finished;
            // a batch interrupted by an early return is still stored here.
            if self.left_batch.is_none() {
                self.left_batch = self.left.next_batch()?;
                self.left_row_idx = 0;
                self.right_row_idx = 0;

                // Left input exhausted: stop and flush what has been gathered.
                if self.left_batch.is_none() {
                    break;
                }
            }

            // Safe to use expect: we checked left_batch.is_some() above
            let left_batch = self.left_batch.as_ref().expect("left_batch verified to be Some");

            while self.left_row_idx < left_batch.rows.len() {
                let left_row = &left_batch.rows[self.left_row_idx];

                while self.right_row_idx < right_data.len() {
                    let right_row = &right_data[self.right_row_idx];
                    // Advance before evaluating so that an early return below
                    // resumes with the following right row, not this one.
                    self.right_row_idx += 1;

                    if self.evaluate_join_condition(left_row, right_row)? {
                        let mut combined = left_row.values.clone();
                        combined.extend(right_row.values.clone());
                        result_rows.push(Row { values: combined });

                        // Output batches are capped at 1024 rows.
                        if result_rows.len() >= 1024 {
                            return Ok(Some(ResultBatch::with_rows(
                                self.columns.clone(),
                                result_rows,
                            )));
                        }
                    }
                }

                // This left row has seen every right row; move to the next
                // left row and restart the right side.
                self.left_row_idx += 1;
                self.right_row_idx = 0;
            }

            // Current left batch fully processed; fetch another on the next
            // loop iteration.
            self.left_batch = None;
        }

        if result_rows.is_empty() {
            Ok(None)
        } else {
            Ok(Some(ResultBatch::with_rows(
                self.columns.clone(),
                result_rows,
            )))
        }
    }

    /// Output columns: left columns followed by right columns.
    fn columns(&self) -> &[String] {
        &self.columns
    }
}
1642
1643// =============================================================================
1644// Aggregate Operator
1645// =============================================================================
1646
1647struct AggregateOperator<'a> {
1648    input: Box<dyn Operator + 'a>,
1649    _group_by: Vec<PlanExpression>,
1650    aggregates: Vec<crate::planner::AggregateExpr>,
1651    columns: Vec<String>,
1652    input_columns: Vec<String>,
1653    done: bool,
1654}
1655
1656impl<'a> AggregateOperator<'a> {
1657    fn new(
1658        input: Box<dyn Operator + 'a>,
1659        group_by: Vec<PlanExpression>,
1660        aggregates: Vec<crate::planner::AggregateExpr>,
1661    ) -> Self {
1662        let input_columns = input.columns().to_vec();
1663
1664        let columns: Vec<String> = aggregates
1665            .iter()
1666            .enumerate()
1667            .map(|(i, agg)| {
1668                agg.alias
1669                    .clone()
1670                    .unwrap_or_else(|| format!("agg_{}", i))
1671            })
1672            .collect();
1673
1674        Self {
1675            input,
1676            _group_by: group_by,
1677            aggregates,
1678            columns,
1679            input_columns,
1680            done: false,
1681        }
1682    }
1683}
1684
1685impl<'a> Operator for AggregateOperator<'a> {
1686    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1687        if self.done {
1688            return Ok(None);
1689        }
1690
1691        let mut accumulators: Vec<Accumulator> = self
1692            .aggregates
1693            .iter()
1694            .map(|agg| Accumulator::new(agg.function))
1695            .collect();
1696
1697        while let Some(batch) = self.input.next_batch()? {
1698            for row in &batch.rows {
1699                for (i, agg) in self.aggregates.iter().enumerate() {
1700                    let value = if let Some(ref arg) = agg.argument {
1701                        evaluate_expression(arg, row, &self.input_columns)?
1702                    } else {
1703                        Value::Integer(1)
1704                    };
1705                    accumulators[i].accumulate(&value)?;
1706                }
1707            }
1708        }
1709
1710        let result_values: Vec<Value> = accumulators.iter().map(|acc| acc.finalize()).collect();
1711
1712        self.done = true;
1713
1714        Ok(Some(ResultBatch::with_rows(
1715            self.columns.clone(),
1716            vec![Row {
1717                values: result_values,
1718            }],
1719        )))
1720    }
1721
1722    fn columns(&self) -> &[String] {
1723        &self.columns
1724    }
1725}
1726
1727/// Accumulator for aggregate functions.
1728struct Accumulator {
1729    function: AggregateFunction,
1730    count: i64,
1731    sum: f64,
1732    min: Option<Value>,
1733    max: Option<Value>,
1734}
1735
1736impl Accumulator {
1737    fn new(function: AggregateFunction) -> Self {
1738        Self {
1739            function,
1740            count: 0,
1741            sum: 0.0,
1742            min: None,
1743            max: None,
1744        }
1745    }
1746
1747    fn accumulate(&mut self, value: &Value) -> ExecutorResult<()> {
1748        if matches!(value, Value::Null) {
1749            return Ok(());
1750        }
1751
1752        self.count += 1;
1753
1754        match self.function {
1755            AggregateFunction::Count => {}
1756            AggregateFunction::Sum | AggregateFunction::Avg => {
1757                self.sum += value_to_f64(value)?;
1758            }
1759            AggregateFunction::Min => {
1760                // Safe to use expect: we check is_none() first, so in the || branch it's Some
1761                if self.min.is_none() || compare_values(value, self.min.as_ref().expect("min verified to be Some in || branch"))? == std::cmp::Ordering::Less {
1762                    self.min = Some(value.clone());
1763                }
1764            }
1765            AggregateFunction::Max => {
1766                // Safe to use expect: we check is_none() first, so in the || branch it's Some
1767                if self.max.is_none() || compare_values(value, self.max.as_ref().expect("max verified to be Some in || branch"))? == std::cmp::Ordering::Greater {
1768                    self.max = Some(value.clone());
1769                }
1770            }
1771        }
1772
1773        Ok(())
1774    }
1775
1776    fn finalize(&self) -> Value {
1777        match self.function {
1778            AggregateFunction::Count => Value::Integer(self.count),
1779            AggregateFunction::Sum => Value::Float(self.sum),
1780            AggregateFunction::Avg => {
1781                if self.count == 0 {
1782                    Value::Null
1783                } else {
1784                    Value::Float(self.sum / self.count as f64)
1785                }
1786            }
1787            AggregateFunction::Min => self.min.clone().unwrap_or(Value::Null),
1788            AggregateFunction::Max => self.max.clone().unwrap_or(Value::Null),
1789        }
1790    }
1791}
1792
1793// =============================================================================
1794// Sort Operator
1795// =============================================================================
1796
1797struct SortOperator<'a> {
1798    input: Box<dyn Operator + 'a>,
1799    order_by: Vec<crate::planner::SortKey>,
1800    columns: Vec<String>,
1801    sorted_data: Option<Vec<Row>>,
1802    position: usize,
1803}
1804
1805impl<'a> SortOperator<'a> {
1806    fn new(input: Box<dyn Operator + 'a>, order_by: Vec<crate::planner::SortKey>) -> Self {
1807        let columns = input.columns().to_vec();
1808        Self {
1809            input,
1810            order_by,
1811            columns,
1812            sorted_data: None,
1813            position: 0,
1814        }
1815    }
1816}
1817
1818impl<'a> Operator for SortOperator<'a> {
1819    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1820        if self.sorted_data.is_none() {
1821            let mut all_rows = Vec::new();
1822            while let Some(batch) = self.input.next_batch()? {
1823                all_rows.extend(batch.rows);
1824            }
1825
1826            let columns = self.columns.clone();
1827            let order_by = self.order_by.clone();
1828
1829            all_rows.sort_by(|a, b| {
1830                for key in &order_by {
1831                    let a_val = evaluate_expression(&key.expr, a, &columns).unwrap_or(Value::Null);
1832                    let b_val = evaluate_expression(&key.expr, b, &columns).unwrap_or(Value::Null);
1833
1834                    let cmp = compare_values(&a_val, &b_val).unwrap_or(std::cmp::Ordering::Equal);
1835
1836                    if cmp != std::cmp::Ordering::Equal {
1837                        return if key.ascending {
1838                            cmp
1839                        } else {
1840                            cmp.reverse()
1841                        };
1842                    }
1843                }
1844                std::cmp::Ordering::Equal
1845            });
1846
1847            self.sorted_data = Some(all_rows);
1848        }
1849
1850        // Safe to use expect: sorted_data was just set to Some above if it was None
1851        let data = self.sorted_data.as_ref().expect("sorted_data was just set to Some");
1852
1853        if self.position >= data.len() {
1854            return Ok(None);
1855        }
1856
1857        let end = (self.position + 1024).min(data.len());
1858        let rows = data[self.position..end].to_vec();
1859        self.position = end;
1860
1861        Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)))
1862    }
1863
1864    fn columns(&self) -> &[String] {
1865        &self.columns
1866    }
1867}
1868
1869// =============================================================================
1870// Limit Operator
1871// =============================================================================
1872
1873struct LimitOperator<'a> {
1874    input: Box<dyn Operator + 'a>,
1875    limit: Option<u64>,
1876    offset: Option<u64>,
1877    columns: Vec<String>,
1878    rows_skipped: u64,
1879    rows_returned: u64,
1880}
1881
1882impl<'a> LimitOperator<'a> {
1883    fn new(input: Box<dyn Operator + 'a>, limit: Option<u64>, offset: Option<u64>) -> Self {
1884        let columns = input.columns().to_vec();
1885        Self {
1886            input,
1887            limit,
1888            offset,
1889            columns,
1890            rows_skipped: 0,
1891            rows_returned: 0,
1892        }
1893    }
1894}
1895
1896impl<'a> Operator for LimitOperator<'a> {
1897    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1898        if let Some(limit) = self.limit {
1899            if self.rows_returned >= limit {
1900                return Ok(None);
1901            }
1902        }
1903
1904        while let Some(batch) = self.input.next_batch()? {
1905            let mut rows = batch.rows;
1906
1907            let offset = self.offset.unwrap_or(0);
1908            if self.rows_skipped < offset {
1909                let skip = (offset - self.rows_skipped) as usize;
1910                if skip >= rows.len() {
1911                    self.rows_skipped += rows.len() as u64;
1912                    continue;
1913                }
1914                rows = rows[skip..].to_vec();
1915                self.rows_skipped = offset;
1916            }
1917
1918            if let Some(limit) = self.limit {
1919                let remaining = limit - self.rows_returned;
1920                if rows.len() as u64 > remaining {
1921                    rows.truncate(remaining as usize);
1922                }
1923            }
1924
1925            if rows.is_empty() {
1926                continue;
1927            }
1928
1929            self.rows_returned += rows.len() as u64;
1930
1931            return Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)));
1932        }
1933
1934        Ok(None)
1935    }
1936
1937    fn columns(&self) -> &[String] {
1938        &self.columns
1939    }
1940}
1941
1942// =============================================================================
1943// Empty Operator
1944// =============================================================================
1945
/// Operator with no input; its `Operator` implementation yields a single
/// batch containing one row without columns.
struct EmptyOperator {
    /// `true` once that single batch has been handed out.
    done: bool,
}

impl EmptyOperator {
    /// Creates an operator that has not produced its batch yet.
    fn new() -> Self {
        let done = false;
        EmptyOperator { done }
    }
}
1955
1956impl Operator for EmptyOperator {
1957    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1958        if self.done {
1959            return Ok(None);
1960        }
1961        self.done = true;
1962        Ok(Some(ResultBatch::with_rows(vec![], vec![Row { values: vec![] }])))
1963    }
1964
1965    fn columns(&self) -> &[String] {
1966        &[]
1967    }
1968}
1969
1970// =============================================================================
1971// Expression Evaluation
1972// =============================================================================
1973
1974/// Context for expression evaluation, optionally holding executor for subqueries.
1975pub struct EvalContext<'a> {
1976    executor: Option<&'a Executor>,
1977}
1978
1979impl<'a> EvalContext<'a> {
1980    pub fn new() -> Self {
1981        Self { executor: None }
1982    }
1983
1984    pub fn with_executor(executor: &'a Executor) -> Self {
1985        Self { executor: Some(executor) }
1986    }
1987
1988    /// Execute a subquery and return all result rows.
1989    fn execute_subquery(&self, subquery: &PlanNode) -> ExecutorResult<Vec<Row>> {
1990        let executor = self.executor.ok_or_else(|| {
1991            ExecutorError::Internal("Subquery requires execution context".to_string())
1992        })?;
1993        let plan = QueryPlan {
1994            root: subquery.clone(),
1995            estimated_cost: 0.0,
1996            estimated_rows: 0,
1997        };
1998        let result = executor.execute(&plan)?;
1999        Ok(result.rows)
2000    }
2001}
2002
2003impl Default for EvalContext<'_> {
2004    fn default() -> Self {
2005        Self::new()
2006    }
2007}
2008
2009/// Evaluate a default expression (constant expression that doesn't need row context).
2010fn evaluate_default_expression(expr: &PlanExpression) -> ExecutorResult<Value> {
2011    let empty_row = Row { values: vec![] };
2012    let empty_columns: Vec<String> = vec![];
2013    evaluate_expression(expr, &empty_row, &empty_columns)
2014}
2015
2016/// Substitute parameter placeholders in a query plan with actual values.
2017fn substitute_parameters(node: &PlanNode, params: &[Value]) -> ExecutorResult<PlanNode> {
2018    match node {
2019        PlanNode::Scan(scan) => {
2020            // ScanNode doesn't have predicates - predicates are in Filter nodes
2021            // Just need to handle index_scan key_range expressions if present
2022            let index_scan = if let Some(idx) = &scan.index_scan {
2023                let start = idx.key_range.start.as_ref()
2024                    .map(|e| substitute_expr(e, params))
2025                    .transpose()?;
2026                let end = idx.key_range.end.as_ref()
2027                    .map(|e| substitute_expr(e, params))
2028                    .transpose()?;
2029                Some(crate::planner::IndexScan {
2030                    index_name: idx.index_name.clone(),
2031                    key_range: crate::planner::KeyRange {
2032                        start,
2033                        start_inclusive: idx.key_range.start_inclusive,
2034                        end,
2035                        end_inclusive: idx.key_range.end_inclusive,
2036                    },
2037                })
2038            } else {
2039                None
2040            };
2041            Ok(PlanNode::Scan(ScanNode {
2042                table_name: scan.table_name.clone(),
2043                alias: scan.alias.clone(),
2044                columns: scan.columns.clone(),
2045                index_scan,
2046            }))
2047        }
2048        PlanNode::Filter(filter) => {
2049            let input = Box::new(substitute_parameters(&filter.input, params)?);
2050            let predicate = substitute_expr(&filter.predicate, params)?;
2051            Ok(PlanNode::Filter(FilterNode { input, predicate }))
2052        }
2053        PlanNode::Project(proj) => {
2054            let input = Box::new(substitute_parameters(&proj.input, params)?);
2055            let expressions = proj.expressions.iter()
2056                .map(|pe| Ok(ProjectionExpr {
2057                    expr: substitute_expr(&pe.expr, params)?,
2058                    alias: pe.alias.clone(),
2059                }))
2060                .collect::<ExecutorResult<Vec<_>>>()?;
2061            Ok(PlanNode::Project(ProjectNode { input, expressions }))
2062        }
2063        PlanNode::Sort(sort) => {
2064            let input = Box::new(substitute_parameters(&sort.input, params)?);
2065            let order_by = sort.order_by.iter()
2066                .map(|sk| Ok(SortKey {
2067                    expr: substitute_expr(&sk.expr, params)?,
2068                    ascending: sk.ascending,
2069                    nulls_first: sk.nulls_first,
2070                }))
2071                .collect::<ExecutorResult<Vec<_>>>()?;
2072            Ok(PlanNode::Sort(SortNode { input, order_by }))
2073        }
2074        PlanNode::Limit(limit) => {
2075            let input = Box::new(substitute_parameters(&limit.input, params)?);
2076            Ok(PlanNode::Limit(LimitNode {
2077                input,
2078                limit: limit.limit,
2079                offset: limit.offset,
2080            }))
2081        }
2082        PlanNode::Aggregate(agg) => {
2083            let input = Box::new(substitute_parameters(&agg.input, params)?);
2084            let group_by = agg.group_by.iter()
2085                .map(|e| substitute_expr(e, params))
2086                .collect::<ExecutorResult<Vec<_>>>()?;
2087            let aggregates = agg.aggregates.iter()
2088                .map(|ae| Ok(AggregateExpr {
2089                    function: ae.function,
2090                    argument: ae.argument.as_ref().map(|a| substitute_expr(a, params)).transpose()?,
2091                    distinct: ae.distinct,
2092                    alias: ae.alias.clone(),
2093                }))
2094                .collect::<ExecutorResult<Vec<_>>>()?;
2095            Ok(PlanNode::Aggregate(AggregateNode { input, group_by, aggregates }))
2096        }
2097        PlanNode::Join(join) => {
2098            let left = Box::new(substitute_parameters(&join.left, params)?);
2099            let right = Box::new(substitute_parameters(&join.right, params)?);
2100            let condition = join.condition.as_ref()
2101                .map(|c| substitute_expr(c, params))
2102                .transpose()?;
2103            Ok(PlanNode::Join(JoinNode {
2104                left,
2105                right,
2106                join_type: join.join_type,
2107                condition,
2108                strategy: join.strategy,
2109            }))
2110        }
2111        PlanNode::Insert(insert) => {
2112            let source = match &insert.source {
2113                InsertPlanSource::Values(rows) => {
2114                    let substituted = rows.iter()
2115                        .map(|row| row.iter().map(|e| substitute_expr(e, params)).collect::<ExecutorResult<Vec<_>>>())
2116                        .collect::<ExecutorResult<Vec<_>>>()?;
2117                    InsertPlanSource::Values(substituted)
2118                }
2119                InsertPlanSource::Query(q) => {
2120                    InsertPlanSource::Query(Box::new(substitute_parameters(q, params)?))
2121                }
2122            };
2123            Ok(PlanNode::Insert(InsertNode {
2124                table_name: insert.table_name.clone(),
2125                columns: insert.columns.clone(),
2126                source,
2127            }))
2128        }
2129        PlanNode::Update(update) => {
2130            let assignments = update.assignments.iter()
2131                .map(|(col, expr)| Ok((col.clone(), substitute_expr(expr, params)?)))
2132                .collect::<ExecutorResult<Vec<_>>>()?;
2133            let where_clause = update.where_clause.as_ref()
2134                .map(|p| substitute_expr(p, params))
2135                .transpose()?;
2136            Ok(PlanNode::Update(UpdateNode {
2137                table_name: update.table_name.clone(),
2138                assignments,
2139                where_clause,
2140            }))
2141        }
2142        PlanNode::Delete(delete) => {
2143            let where_clause = delete.where_clause.as_ref()
2144                .map(|p| substitute_expr(p, params))
2145                .transpose()?;
2146            Ok(PlanNode::Delete(DeleteNode {
2147                table_name: delete.table_name.clone(),
2148                where_clause,
2149            }))
2150        }
2151        // DDL operations don't have parameters, return as-is
2152        PlanNode::CreateTable(_) |
2153        PlanNode::DropTable(_) |
2154        PlanNode::AlterTable(_) |
2155        PlanNode::CreateIndex(_) |
2156        PlanNode::DropIndex(_) |
2157        PlanNode::Empty => Ok(node.clone()),
2158    }
2159}
2160
2161/// Substitute parameter placeholders in an expression with actual values.
2162fn substitute_expr(expr: &PlanExpression, params: &[Value]) -> ExecutorResult<PlanExpression> {
2163    match expr {
2164        PlanExpression::Placeholder(idx) => {
2165            // Parameters are 1-indexed ($1, $2, etc.)
2166            let param_idx = *idx - 1;
2167            params.get(param_idx)
2168                .map(value_to_literal)
2169                .ok_or_else(|| ExecutorError::Internal(format!(
2170                    "Missing parameter ${}: expected {} parameters but got {}",
2171                    idx, idx, params.len()
2172                )))
2173        }
2174        PlanExpression::BinaryOp { op, left, right } => {
2175            Ok(PlanExpression::BinaryOp {
2176                op: *op,
2177                left: Box::new(substitute_expr(left, params)?),
2178                right: Box::new(substitute_expr(right, params)?),
2179            })
2180        }
2181        PlanExpression::UnaryOp { op, expr: inner } => {
2182            Ok(PlanExpression::UnaryOp {
2183                op: *op,
2184                expr: Box::new(substitute_expr(inner, params)?),
2185            })
2186        }
2187        PlanExpression::Function { name, args, return_type } => {
2188            let new_args = args.iter()
2189                .map(|a| substitute_expr(a, params))
2190                .collect::<ExecutorResult<Vec<_>>>()?;
2191            Ok(PlanExpression::Function { name: name.clone(), args: new_args, return_type: return_type.clone() })
2192        }
2193        PlanExpression::Case { operand, conditions, else_result } => {
2194            let new_operand = operand.as_ref()
2195                .map(|o| substitute_expr(o, params))
2196                .transpose()?
2197                .map(Box::new);
2198            let new_conditions = conditions.iter()
2199                .map(|(cond, result)| Ok((substitute_expr(cond, params)?, substitute_expr(result, params)?)))
2200                .collect::<ExecutorResult<Vec<_>>>()?;
2201            let new_else = else_result.as_ref()
2202                .map(|e| substitute_expr(e, params))
2203                .transpose()?
2204                .map(Box::new);
2205            Ok(PlanExpression::Case {
2206                operand: new_operand,
2207                conditions: new_conditions,
2208                else_result: new_else,
2209            })
2210        }
2211        PlanExpression::InList { expr: inner, list, negated } => {
2212            let new_inner = Box::new(substitute_expr(inner, params)?);
2213            let new_list = list.iter()
2214                .map(|e| substitute_expr(e, params))
2215                .collect::<ExecutorResult<Vec<_>>>()?;
2216            Ok(PlanExpression::InList { expr: new_inner, list: new_list, negated: *negated })
2217        }
2218        PlanExpression::InSubquery { expr: inner, subquery, negated } => {
2219            let new_inner = Box::new(substitute_expr(inner, params)?);
2220            let new_subquery = Box::new(substitute_parameters(subquery, params)?);
2221            Ok(PlanExpression::InSubquery { expr: new_inner, subquery: new_subquery, negated: *negated })
2222        }
2223        PlanExpression::Exists { subquery, negated } => {
2224            let new_subquery = Box::new(substitute_parameters(subquery, params)?);
2225            Ok(PlanExpression::Exists { subquery: new_subquery, negated: *negated })
2226        }
2227        PlanExpression::ScalarSubquery(subquery) => {
2228            let new_subquery = Box::new(substitute_parameters(subquery, params)?);
2229            Ok(PlanExpression::ScalarSubquery(new_subquery))
2230        }
2231        PlanExpression::Between { expr: inner, low, high, negated } => {
2232            Ok(PlanExpression::Between {
2233                expr: Box::new(substitute_expr(inner, params)?),
2234                low: Box::new(substitute_expr(low, params)?),
2235                high: Box::new(substitute_expr(high, params)?),
2236                negated: *negated,
2237            })
2238        }
2239        PlanExpression::Like { expr: inner, pattern, negated } => {
2240            Ok(PlanExpression::Like {
2241                expr: Box::new(substitute_expr(inner, params)?),
2242                pattern: Box::new(substitute_expr(pattern, params)?),
2243                negated: *negated,
2244            })
2245        }
2246        PlanExpression::IsNull { expr: inner, negated } => {
2247            Ok(PlanExpression::IsNull {
2248                expr: Box::new(substitute_expr(inner, params)?),
2249                negated: *negated,
2250            })
2251        }
2252        PlanExpression::Cast { expr: inner, target_type } => {
2253            Ok(PlanExpression::Cast {
2254                expr: Box::new(substitute_expr(inner, params)?),
2255                target_type: target_type.clone(),
2256            })
2257        }
2258        // Literals and columns don't have parameters
2259        PlanExpression::Literal(_) | PlanExpression::Column { .. } => Ok(expr.clone()),
2260    }
2261}
2262
2263/// Convert a Value back to a PlanExpression::Literal.
2264fn value_to_literal(value: &Value) -> PlanExpression {
2265    let lit = match value {
2266        Value::Null => PlanLiteral::Null,
2267        Value::Boolean(b) => PlanLiteral::Boolean(*b),
2268        Value::Integer(i) => PlanLiteral::Integer(*i),
2269        Value::Float(f) => PlanLiteral::Float(*f),
2270        Value::String(s) => PlanLiteral::String(s.clone()),
2271        Value::Bytes(b) => PlanLiteral::String(String::from_utf8_lossy(b).to_string()),
2272        Value::Timestamp(t) => PlanLiteral::Integer(t.timestamp_millis()),
2273        // Arrays and objects can be serialized to string representation for parameters
2274        Value::Array(arr) => PlanLiteral::String(format!("{:?}", arr)),
2275        Value::Object(obj) => PlanLiteral::String(format!("{:?}", obj)),
2276    };
2277    PlanExpression::Literal(lit)
2278}
2279
2280/// Evaluate expression without subquery support (backwards compatible).
2281fn evaluate_expression(
2282    expr: &PlanExpression,
2283    row: &Row,
2284    columns: &[String],
2285) -> ExecutorResult<Value> {
2286    evaluate_expression_with_context(expr, row, columns, &EvalContext::new())
2287}
2288
2289/// Evaluate expression with optional subquery support.
2290fn evaluate_expression_with_context(
2291    expr: &PlanExpression,
2292    row: &Row,
2293    columns: &[String],
2294    ctx: &EvalContext,
2295) -> ExecutorResult<Value> {
2296    match expr {
2297        PlanExpression::Literal(lit) => Ok(match lit {
2298            PlanLiteral::Null => Value::Null,
2299            PlanLiteral::Boolean(b) => Value::Boolean(*b),
2300            PlanLiteral::Integer(i) => Value::Integer(*i),
2301            PlanLiteral::Float(f) => Value::Float(*f),
2302            PlanLiteral::String(s) => Value::String(s.clone()),
2303        }),
2304
2305        PlanExpression::Column { table: _, name, .. } => {
2306            if name == "*" {
2307                return Ok(Value::Null);
2308            }
2309
2310            let idx = columns
2311                .iter()
2312                .position(|c| c == name)
2313                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
2314
2315            Ok(row.values.get(idx).cloned().unwrap_or(Value::Null))
2316        }
2317
2318        PlanExpression::BinaryOp { left, op, right } => {
2319            let left_val = evaluate_expression_with_context(left, row, columns, ctx)?;
2320            let right_val = evaluate_expression_with_context(right, row, columns, ctx)?;
2321            evaluate_binary_op(*op, &left_val, &right_val)
2322        }
2323
2324        PlanExpression::UnaryOp { op, expr } => {
2325            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2326            evaluate_unary_op(*op, &val)
2327        }
2328
2329        PlanExpression::IsNull { expr, negated } => {
2330            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2331            let is_null = matches!(val, Value::Null);
2332            Ok(Value::Boolean(if *negated { !is_null } else { is_null }))
2333        }
2334
2335        PlanExpression::Cast { expr, target_type } => {
2336            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2337            cast_value(&val, target_type)
2338        }
2339
2340        PlanExpression::Function { name, args, .. } => {
2341            let arg_values: Vec<Value> = args
2342                .iter()
2343                .map(|a| evaluate_expression_with_context(a, row, columns, ctx))
2344                .collect::<ExecutorResult<Vec<_>>>()?;
2345            evaluate_function(name, &arg_values)
2346        }
2347
2348        PlanExpression::Case { operand, conditions, else_result } => {
2349            match operand {
2350                Some(operand_expr) => {
2351                    // CASE operand WHEN value THEN result ... END
2352                    let operand_val = evaluate_expression_with_context(operand_expr, row, columns, ctx)?;
2353                    for (when_expr, then_expr) in conditions {
2354                        let when_val = evaluate_expression_with_context(when_expr, row, columns, ctx)?;
2355                        if compare_values(&operand_val, &when_val)? == std::cmp::Ordering::Equal {
2356                            return evaluate_expression_with_context(then_expr, row, columns, ctx);
2357                        }
2358                    }
2359                }
2360                None => {
2361                    // CASE WHEN condition THEN result ... END
2362                    for (when_expr, then_expr) in conditions {
2363                        let when_val = evaluate_expression_with_context(when_expr, row, columns, ctx)?;
2364                        if matches!(when_val, Value::Boolean(true)) {
2365                            return evaluate_expression_with_context(then_expr, row, columns, ctx);
2366                        }
2367                    }
2368                }
2369            }
2370            // Return ELSE result or NULL
2371            match else_result {
2372                Some(else_expr) => evaluate_expression_with_context(else_expr, row, columns, ctx),
2373                None => Ok(Value::Null),
2374            }
2375        }
2376
2377        PlanExpression::InList { expr, list, negated } => {
2378            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2379            let mut found = false;
2380            for item in list {
2381                let item_val = evaluate_expression_with_context(item, row, columns, ctx)?;
2382                if compare_values(&val, &item_val)? == std::cmp::Ordering::Equal {
2383                    found = true;
2384                    break;
2385                }
2386            }
2387            Ok(Value::Boolean(if *negated { !found } else { found }))
2388        }
2389
2390        PlanExpression::Between { expr, low, high, negated } => {
2391            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2392            let low_val = evaluate_expression_with_context(low, row, columns, ctx)?;
2393            let high_val = evaluate_expression_with_context(high, row, columns, ctx)?;
2394
2395            let ge_low = compare_values(&val, &low_val)? != std::cmp::Ordering::Less;
2396            let le_high = compare_values(&val, &high_val)? != std::cmp::Ordering::Greater;
2397            let in_range = ge_low && le_high;
2398
2399            Ok(Value::Boolean(if *negated { !in_range } else { in_range }))
2400        }
2401
2402        PlanExpression::Like { expr, pattern, negated } => {
2403            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2404            let pattern_val = evaluate_expression_with_context(pattern, row, columns, ctx)?;
2405
2406            let val_str = match val {
2407                Value::String(s) => s,
2408                Value::Null => return Ok(Value::Null),
2409                Value::Integer(i) => i.to_string(),
2410                Value::Float(f) => f.to_string(),
2411                Value::Boolean(b) => b.to_string(),
2412                Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
2413                Value::Timestamp(t) => t.to_rfc3339(),
2414                Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
2415            };
2416
2417            let pattern_str = match pattern_val {
2418                Value::String(s) => s,
2419                Value::Null => return Ok(Value::Null),
2420                Value::Integer(i) => i.to_string(),
2421                Value::Float(f) => f.to_string(),
2422                Value::Boolean(b) => b.to_string(),
2423                Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
2424                Value::Timestamp(t) => t.to_rfc3339(),
2425                Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
2426            };
2427
2428            // Convert SQL LIKE pattern to regex
2429            let regex_pattern = pattern_str
2430                .replace('%', ".*")
2431                .replace('_', ".");
2432            let regex_pattern = format!("^{}$", regex_pattern);
2433
2434            // Simple pattern matching (could use regex crate for full support)
2435            let matches = if regex_pattern == "^.*$" {
2436                true
2437            } else if regex_pattern.starts_with("^") && regex_pattern.ends_with("$") {
2438                let inner = &regex_pattern[1..regex_pattern.len()-1];
2439                if inner.contains(".*") || inner.contains('.') {
2440                    // For patterns with wildcards, do simple matching
2441                    let parts: Vec<&str> = inner.split(".*").collect();
2442                    if parts.len() == 1 {
2443                        val_str == parts[0]
2444                    } else {
2445                        let mut pos = 0;
2446                        let mut matched = true;
2447                        for (i, part) in parts.iter().enumerate() {
2448                            if part.is_empty() { continue; }
2449                            if let Some(found_pos) = val_str[pos..].find(part) {
2450                                if i == 0 && found_pos != 0 {
2451                                    matched = false;
2452                                    break;
2453                                }
2454                                pos += found_pos + part.len();
2455                            } else {
2456                                matched = false;
2457                                break;
2458                            }
2459                        }
2460                        matched
2461                    }
2462                } else {
2463                    val_str == inner
2464                }
2465            } else {
2466                val_str.contains(&pattern_str)
2467            };
2468
2469            Ok(Value::Boolean(if *negated { !matches } else { matches }))
2470        }
2471
2472        PlanExpression::InSubquery { expr, subquery, negated } => {
2473            // Evaluate the expression to check
2474            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2475
2476            // Execute the subquery
2477            let subquery_rows = ctx.execute_subquery(subquery)?;
2478
2479            // Check if val is in any of the subquery results (first column)
2480            let mut found = false;
2481            for subquery_row in &subquery_rows {
2482                if let Some(subquery_val) = subquery_row.values.first() {
2483                    if compare_values(&val, subquery_val)? == std::cmp::Ordering::Equal {
2484                        found = true;
2485                        break;
2486                    }
2487                }
2488            }
2489
2490            Ok(Value::Boolean(if *negated { !found } else { found }))
2491        }
2492
2493        PlanExpression::Exists { subquery, negated } => {
2494            // Execute the subquery and check if it returns any rows
2495            let subquery_rows = ctx.execute_subquery(subquery)?;
2496            let exists = !subquery_rows.is_empty();
2497
2498            Ok(Value::Boolean(if *negated { !exists } else { exists }))
2499        }
2500
2501        PlanExpression::ScalarSubquery(subquery) => {
2502            // Execute the subquery and return the single value
2503            let subquery_rows = ctx.execute_subquery(subquery)?;
2504
2505            if subquery_rows.is_empty() {
2506                return Ok(Value::Null);
2507            }
2508
2509            if subquery_rows.len() > 1 {
2510                return Err(ExecutorError::Internal(
2511                    "Scalar subquery returned more than one row".to_string()
2512                ));
2513            }
2514
2515            // Return the first column of the first row
2516            subquery_rows[0]
2517                .values
2518                .first()
2519                .cloned()
2520                .ok_or_else(|| ExecutorError::Internal("Scalar subquery returned no columns".to_string()))
2521        }
2522
2523        PlanExpression::Placeholder(idx) => {
2524            // Placeholders should be replaced before execution
2525            Err(ExecutorError::Internal(format!("Unresolved placeholder ${}", idx)))
2526        }
2527    }
2528}
2529
2530fn evaluate_binary_op(op: PlanBinaryOp, left: &Value, right: &Value) -> ExecutorResult<Value> {
2531    if matches!(left, Value::Null) || matches!(right, Value::Null) {
2532        if matches!(op, PlanBinaryOp::Equal | PlanBinaryOp::NotEqual) {
2533            return Ok(Value::Null);
2534        }
2535        return Ok(Value::Null);
2536    }
2537
2538    match op {
2539        PlanBinaryOp::Add => {
2540            let l = value_to_f64(left)?;
2541            let r = value_to_f64(right)?;
2542            Ok(Value::Float(l + r))
2543        }
2544        PlanBinaryOp::Subtract => {
2545            let l = value_to_f64(left)?;
2546            let r = value_to_f64(right)?;
2547            Ok(Value::Float(l - r))
2548        }
2549        PlanBinaryOp::Multiply => {
2550            let l = value_to_f64(left)?;
2551            let r = value_to_f64(right)?;
2552            Ok(Value::Float(l * r))
2553        }
2554        PlanBinaryOp::Divide => {
2555            let l = value_to_f64(left)?;
2556            let r = value_to_f64(right)?;
2557            if r == 0.0 {
2558                return Err(ExecutorError::DivisionByZero);
2559            }
2560            Ok(Value::Float(l / r))
2561        }
2562        PlanBinaryOp::Modulo => {
2563            let l = value_to_i64(left)?;
2564            let r = value_to_i64(right)?;
2565            if r == 0 {
2566                return Err(ExecutorError::DivisionByZero);
2567            }
2568            Ok(Value::Integer(l % r))
2569        }
2570        PlanBinaryOp::Equal => Ok(Value::Boolean(compare_values(left, right)? == std::cmp::Ordering::Equal)),
2571        PlanBinaryOp::NotEqual => Ok(Value::Boolean(compare_values(left, right)? != std::cmp::Ordering::Equal)),
2572        PlanBinaryOp::LessThan => Ok(Value::Boolean(compare_values(left, right)? == std::cmp::Ordering::Less)),
2573        PlanBinaryOp::LessThanOrEqual => Ok(Value::Boolean(compare_values(left, right)? != std::cmp::Ordering::Greater)),
2574        PlanBinaryOp::GreaterThan => Ok(Value::Boolean(compare_values(left, right)? == std::cmp::Ordering::Greater)),
2575        PlanBinaryOp::GreaterThanOrEqual => Ok(Value::Boolean(compare_values(left, right)? != std::cmp::Ordering::Less)),
2576        PlanBinaryOp::And => {
2577            let l = value_to_bool(left)?;
2578            let r = value_to_bool(right)?;
2579            Ok(Value::Boolean(l && r))
2580        }
2581        PlanBinaryOp::Or => {
2582            let l = value_to_bool(left)?;
2583            let r = value_to_bool(right)?;
2584            Ok(Value::Boolean(l || r))
2585        }
2586        PlanBinaryOp::Concat => {
2587            let l = value_to_string(left);
2588            let r = value_to_string(right);
2589            Ok(Value::String(format!("{}{}", l, r)))
2590        }
2591    }
2592}
2593
2594fn evaluate_unary_op(op: PlanUnaryOp, value: &Value) -> ExecutorResult<Value> {
2595    match op {
2596        PlanUnaryOp::Not => {
2597            let b = value_to_bool(value)?;
2598            Ok(Value::Boolean(!b))
2599        }
2600        PlanUnaryOp::Negative => {
2601            let f = value_to_f64(value)?;
2602            Ok(Value::Float(-f))
2603        }
2604    }
2605}
2606
2607fn evaluate_function(name: &str, args: &[Value]) -> ExecutorResult<Value> {
2608    match name.to_uppercase().as_str() {
2609        "UPPER" => {
2610            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
2611            Ok(Value::String(s.to_uppercase()))
2612        }
2613        "LOWER" => {
2614            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
2615            Ok(Value::String(s.to_lowercase()))
2616        }
2617        "LENGTH" => {
2618            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
2619            Ok(Value::Integer(s.len() as i64))
2620        }
2621        "ABS" => {
2622            let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
2623            Ok(Value::Float(f.abs()))
2624        }
2625        "COALESCE" => {
2626            for arg in args {
2627                if !matches!(arg, Value::Null) {
2628                    return Ok(arg.clone());
2629                }
2630            }
2631            Ok(Value::Null)
2632        }
2633        _ => Err(ExecutorError::InvalidOperation(format!(
2634            "Unknown function: {}",
2635            name
2636        ))),
2637    }
2638}
2639
2640// =============================================================================
2641// Value Conversion Utilities
2642// =============================================================================
2643
2644fn value_to_f64(value: &Value) -> ExecutorResult<f64> {
2645    match value {
2646        Value::Integer(i) => Ok(*i as f64),
2647        Value::Float(f) => Ok(*f),
2648        Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
2649            expected: "number".to_string(),
2650            actual: "string".to_string(),
2651        }),
2652        _ => Err(ExecutorError::TypeMismatch {
2653            expected: "number".to_string(),
2654            actual: format!("{:?}", value),
2655        }),
2656    }
2657}
2658
2659fn value_to_i64(value: &Value) -> ExecutorResult<i64> {
2660    match value {
2661        Value::Integer(i) => Ok(*i),
2662        Value::Float(f) => Ok(*f as i64),
2663        Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
2664            expected: "integer".to_string(),
2665            actual: "string".to_string(),
2666        }),
2667        _ => Err(ExecutorError::TypeMismatch {
2668            expected: "integer".to_string(),
2669            actual: format!("{:?}", value),
2670        }),
2671    }
2672}
2673
2674fn value_to_bool(value: &Value) -> ExecutorResult<bool> {
2675    match value {
2676        Value::Boolean(b) => Ok(*b),
2677        Value::Integer(i) => Ok(*i != 0),
2678        Value::Null => Ok(false),
2679        _ => Err(ExecutorError::TypeMismatch {
2680            expected: "boolean".to_string(),
2681            actual: format!("{:?}", value),
2682        }),
2683    }
2684}
2685
2686fn value_to_string(value: &Value) -> String {
2687    match value {
2688        Value::String(s) => s.clone(),
2689        Value::Integer(i) => i.to_string(),
2690        Value::Float(f) => f.to_string(),
2691        Value::Boolean(b) => b.to_string(),
2692        Value::Null => String::new(),
2693        _ => format!("{:?}", value),
2694    }
2695}
2696
2697fn compare_values(left: &Value, right: &Value) -> ExecutorResult<std::cmp::Ordering> {
2698    match (left, right) {
2699        (Value::Integer(l), Value::Integer(r)) => Ok(l.cmp(r)),
2700        (Value::Float(l), Value::Float(r)) => {
2701            Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
2702        }
2703        (Value::Integer(l), Value::Float(r)) => {
2704            let l = *l as f64;
2705            Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
2706        }
2707        (Value::Float(l), Value::Integer(r)) => {
2708            let r = *r as f64;
2709            Ok(l.partial_cmp(&r).unwrap_or(std::cmp::Ordering::Equal))
2710        }
2711        (Value::String(l), Value::String(r)) => Ok(l.cmp(r)),
2712        (Value::Boolean(l), Value::Boolean(r)) => Ok(l.cmp(r)),
2713        _ => Ok(std::cmp::Ordering::Equal),
2714    }
2715}
2716
2717fn cast_value(value: &Value, target_type: &DataType) -> ExecutorResult<Value> {
2718    match target_type {
2719        DataType::Integer => Ok(Value::Integer(value_to_i64(value)?)),
2720        DataType::Float => Ok(Value::Float(value_to_f64(value)?)),
2721        DataType::Text => Ok(Value::String(value_to_string(value))),
2722        DataType::Boolean => Ok(Value::Boolean(value_to_bool(value)?)),
2723        _ => Ok(value.clone()),
2724    }
2725}
2726
2727// =============================================================================
2728// Tests
2729// =============================================================================
2730
2731#[cfg(test)]
2732mod tests {
2733    use super::*;
2734    use crate::planner::{LimitNode, PlanNode, ProjectNode, ProjectionExpr, QueryPlan, ScanNode};
2735
2736    fn create_test_context() -> ExecutionContext {
2737        let mut context = ExecutionContext::new();
2738
2739        context.add_table(TableData {
2740            name: "users".to_string(),
2741            columns: vec![
2742                "id".to_string(),
2743                "name".to_string(),
2744                "age".to_string(),
2745            ],
2746            rows: vec![
2747                Row {
2748                    values: vec![
2749                        Value::Integer(1),
2750                        Value::String("Alice".to_string()),
2751                        Value::Integer(30),
2752                    ],
2753                },
2754                Row {
2755                    values: vec![
2756                        Value::Integer(2),
2757                        Value::String("Bob".to_string()),
2758                        Value::Integer(25),
2759                    ],
2760                },
2761                Row {
2762                    values: vec![
2763                        Value::Integer(3),
2764                        Value::String("Charlie".to_string()),
2765                        Value::Integer(35),
2766                    ],
2767                },
2768            ],
2769        });
2770
2771        context
2772    }
2773
2774    #[test]
2775    fn test_scan_operator() {
2776        let context = create_test_context();
2777        let executor = Executor::new(context);
2778
2779        let plan = QueryPlan {
2780            root: PlanNode::Project(ProjectNode {
2781                input: Box::new(PlanNode::Scan(ScanNode {
2782                    table_name: "users".to_string(),
2783                    alias: None,
2784                    columns: vec![
2785                        "id".to_string(),
2786                        "name".to_string(),
2787                        "age".to_string(),
2788                    ],
2789                    index_scan: None,
2790                })),
2791                expressions: vec![
2792                    ProjectionExpr {
2793                        expr: PlanExpression::Column {
2794                            table: None,
2795                            name: "id".to_string(),
2796                            data_type: DataType::Integer,
2797                        },
2798                        alias: Some("id".to_string()),
2799                    },
2800                    ProjectionExpr {
2801                        expr: PlanExpression::Column {
2802                            table: None,
2803                            name: "name".to_string(),
2804                            data_type: DataType::Text,
2805                        },
2806                        alias: Some("name".to_string()),
2807                    },
2808                ],
2809            }),
2810            estimated_cost: 100.0,
2811            estimated_rows: 3,
2812        };
2813
2814        let result = executor.execute(&plan).unwrap();
2815
2816        assert_eq!(result.rows.len(), 3);
2817        assert_eq!(result.columns.len(), 2);
2818    }
2819
2820    #[test]
2821    fn test_filter_operator() {
2822        let context = create_test_context();
2823        let executor = Executor::new(context);
2824
2825        let plan = QueryPlan {
2826            root: PlanNode::Project(ProjectNode {
2827                input: Box::new(PlanNode::Filter(crate::planner::FilterNode {
2828                    input: Box::new(PlanNode::Scan(ScanNode {
2829                        table_name: "users".to_string(),
2830                        alias: None,
2831                        columns: vec![
2832                            "id".to_string(),
2833                            "name".to_string(),
2834                            "age".to_string(),
2835                        ],
2836                        index_scan: None,
2837                    })),
2838                    predicate: PlanExpression::BinaryOp {
2839                        left: Box::new(PlanExpression::Column {
2840                            table: None,
2841                            name: "age".to_string(),
2842                            data_type: DataType::Integer,
2843                        }),
2844                        op: PlanBinaryOp::GreaterThan,
2845                        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(28))),
2846                    },
2847                })),
2848                expressions: vec![ProjectionExpr {
2849                    expr: PlanExpression::Column {
2850                        table: None,
2851                        name: "name".to_string(),
2852                        data_type: DataType::Text,
2853                    },
2854                    alias: Some("name".to_string()),
2855                }],
2856            }),
2857            estimated_cost: 100.0,
2858            estimated_rows: 2,
2859        };
2860
2861        let result = executor.execute(&plan).unwrap();
2862
2863        assert_eq!(result.rows.len(), 2);
2864    }
2865
2866    #[test]
2867    fn test_limit_operator() {
2868        let context = create_test_context();
2869        let executor = Executor::new(context);
2870
2871        let plan = QueryPlan {
2872            root: PlanNode::Limit(LimitNode {
2873                input: Box::new(PlanNode::Project(ProjectNode {
2874                    input: Box::new(PlanNode::Scan(ScanNode {
2875                        table_name: "users".to_string(),
2876                        alias: None,
2877                        columns: vec!["id".to_string()],
2878                        index_scan: None,
2879                    })),
2880                    expressions: vec![ProjectionExpr {
2881                        expr: PlanExpression::Column {
2882                            table: None,
2883                            name: "id".to_string(),
2884                            data_type: DataType::Integer,
2885                        },
2886                        alias: Some("id".to_string()),
2887                    }],
2888                })),
2889                limit: Some(2),
2890                offset: None,
2891            }),
2892            estimated_cost: 100.0,
2893            estimated_rows: 2,
2894        };
2895
2896        let result = executor.execute(&plan).unwrap();
2897
2898        assert_eq!(result.rows.len(), 2);
2899    }
2900
2901    #[test]
2902    fn test_create_table() {
2903        use crate::planner::{CreateTableNode, CreateColumnDef};
2904
2905        let executor = Executor::new(ExecutionContext::new());
2906
2907        let plan = QueryPlan {
2908            root: PlanNode::CreateTable(CreateTableNode {
2909                table_name: "test_table".to_string(),
2910                columns: vec![
2911                    CreateColumnDef {
2912                        name: "id".to_string(),
2913                        data_type: DataType::Integer,
2914                        nullable: false,
2915                        default: None,
2916                        primary_key: true,
2917                        unique: false,
2918                    },
2919                    CreateColumnDef {
2920                        name: "name".to_string(),
2921                        data_type: DataType::Text,
2922                        nullable: true,
2923                        default: None,
2924                        primary_key: false,
2925                        unique: false,
2926                    },
2927                ],
2928                constraints: vec![],
2929                if_not_exists: false,
2930            }),
2931            estimated_cost: 1.0,
2932            estimated_rows: 0,
2933        };
2934
2935        let result = executor.execute(&plan).unwrap();
2936        match &result.rows[0].values[0] {
2937            Value::String(s) => assert!(s.contains("created")),
2938            _ => panic!("Expected string result"),
2939        }
2940
2941        // Verify table exists by listing tables
2942        let tables = executor.context.read().unwrap().list_tables();
2943        assert!(tables.contains(&"test_table".to_string()));
2944    }
2945
2946    #[test]
2947    fn test_insert_into_table() {
2948        use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource};
2949
2950        let executor = Executor::new(ExecutionContext::new());
2951
2952        // First create a table
2953        let create_plan = QueryPlan {
2954            root: PlanNode::CreateTable(CreateTableNode {
2955                table_name: "test_insert".to_string(),
2956                columns: vec![
2957                    CreateColumnDef {
2958                        name: "id".to_string(),
2959                        data_type: DataType::Integer,
2960                        nullable: false,
2961                        default: None,
2962                        primary_key: true,
2963                        unique: false,
2964                    },
2965                    CreateColumnDef {
2966                        name: "value".to_string(),
2967                        data_type: DataType::Text,
2968                        nullable: true,
2969                        default: None,
2970                        primary_key: false,
2971                        unique: false,
2972                    },
2973                ],
2974                constraints: vec![],
2975                if_not_exists: false,
2976            }),
2977            estimated_cost: 1.0,
2978            estimated_rows: 0,
2979        };
2980        executor.execute(&create_plan).unwrap();
2981
2982        // Now insert data
2983        let insert_plan = QueryPlan {
2984            root: PlanNode::Insert(InsertNode {
2985                table_name: "test_insert".to_string(),
2986                columns: vec!["id".to_string(), "value".to_string()],
2987                source: InsertPlanSource::Values(vec![
2988                    vec![
2989                        PlanExpression::Literal(PlanLiteral::Integer(1)),
2990                        PlanExpression::Literal(PlanLiteral::String("hello".to_string())),
2991                    ],
2992                    vec![
2993                        PlanExpression::Literal(PlanLiteral::Integer(2)),
2994                        PlanExpression::Literal(PlanLiteral::String("world".to_string())),
2995                    ],
2996                ]),
2997            }),
2998            estimated_cost: 2.0,
2999            estimated_rows: 2,
3000        };
3001
3002        let result = executor.execute(&insert_plan).unwrap();
3003        assert_eq!(result.rows_affected, 2);
3004
3005        // Query the data
3006        let query_plan = QueryPlan {
3007            root: PlanNode::Project(ProjectNode {
3008                input: Box::new(PlanNode::Scan(ScanNode {
3009                    table_name: "test_insert".to_string(),
3010                    alias: None,
3011                    columns: vec!["id".to_string(), "value".to_string()],
3012                    index_scan: None,
3013                })),
3014                expressions: vec![
3015                    ProjectionExpr {
3016                        expr: PlanExpression::Column {
3017                            table: None,
3018                            name: "id".to_string(),
3019                            data_type: DataType::Integer,
3020                        },
3021                        alias: Some("id".to_string()),
3022                    },
3023                ],
3024            }),
3025            estimated_cost: 100.0,
3026            estimated_rows: 2,
3027        };
3028
3029        let query_result = executor.execute(&query_plan).unwrap();
3030        assert_eq!(query_result.rows.len(), 2);
3031    }
3032
3033    #[test]
3034    fn test_drop_table() {
3035        use crate::planner::{CreateTableNode, CreateColumnDef, DropTableNode};
3036
3037        let executor = Executor::new(ExecutionContext::new());
3038
3039        // Create a table
3040        let create_plan = QueryPlan {
3041            root: PlanNode::CreateTable(CreateTableNode {
3042                table_name: "to_drop".to_string(),
3043                columns: vec![CreateColumnDef {
3044                    name: "id".to_string(),
3045                    data_type: DataType::Integer,
3046                    nullable: false,
3047                    default: None,
3048                    primary_key: true,
3049                    unique: false,
3050                }],
3051                constraints: vec![],
3052                if_not_exists: false,
3053            }),
3054            estimated_cost: 1.0,
3055            estimated_rows: 0,
3056        };
3057        executor.execute(&create_plan).unwrap();
3058
3059        // Verify it exists
3060        let tables = executor.context.read().unwrap().list_tables();
3061        assert!(tables.contains(&"to_drop".to_string()));
3062
3063        // Drop the table
3064        let drop_plan = QueryPlan {
3065            root: PlanNode::DropTable(DropTableNode {
3066                table_name: "to_drop".to_string(),
3067                if_exists: false,
3068            }),
3069            estimated_cost: 1.0,
3070            estimated_rows: 0,
3071        };
3072        executor.execute(&drop_plan).unwrap();
3073
3074        // Verify it's gone
3075        let tables = executor.context.read().unwrap().list_tables();
3076        assert!(!tables.contains(&"to_drop".to_string()));
3077    }
3078
3079    #[test]
3080    fn test_shared_context_persistence() {
3081        use crate::planner::{CreateTableNode, CreateColumnDef};
3082        use std::sync::{Arc, RwLock};
3083
3084        // Create a shared context
3085        let shared_context = Arc::new(RwLock::new(ExecutionContext::new()));
3086
3087        // Create executor with shared context
3088        let executor = Executor::with_shared_context(shared_context.clone());
3089
3090        // Create a table
3091        let create_plan = QueryPlan {
3092            root: PlanNode::CreateTable(CreateTableNode {
3093                table_name: "shared_table".to_string(),
3094                columns: vec![CreateColumnDef {
3095                    name: "id".to_string(),
3096                    data_type: DataType::Integer,
3097                    nullable: false,
3098                    default: None,
3099                    primary_key: true,
3100                    unique: false,
3101                }],
3102                constraints: vec![],
3103                if_not_exists: false,
3104            }),
3105            estimated_cost: 1.0,
3106            estimated_rows: 0,
3107        };
3108        executor.execute(&create_plan).unwrap();
3109
3110        // Create a new executor with the SAME shared context
3111        let executor2 = Executor::with_shared_context(shared_context.clone());
3112
3113        // Verify the table exists from the second executor's perspective
3114        let tables = executor2.context.read().unwrap().list_tables();
3115        assert!(tables.contains(&"shared_table".to_string()));
3116    }
3117
3118    #[test]
3119    fn test_insert_select() {
3120        use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource, FilterNode};
3121
3122        let executor = Executor::new(ExecutionContext::new());
3123
3124        // Create source table
3125        let create_source = QueryPlan {
3126            root: PlanNode::CreateTable(CreateTableNode {
3127                table_name: "source_table".to_string(),
3128                columns: vec![
3129                    CreateColumnDef {
3130                        name: "id".to_string(),
3131                        data_type: DataType::Integer,
3132                        nullable: false,
3133                        default: None,
3134                        primary_key: true,
3135                        unique: false,
3136                    },
3137                    CreateColumnDef {
3138                        name: "value".to_string(),
3139                        data_type: DataType::Text,
3140                        nullable: true,
3141                        default: None,
3142                        primary_key: false,
3143                        unique: false,
3144                    },
3145                ],
3146                constraints: vec![],
3147                if_not_exists: false,
3148            }),
3149            estimated_cost: 1.0,
3150            estimated_rows: 0,
3151        };
3152        executor.execute(&create_source).unwrap();
3153
3154        // Create destination table
3155        let create_dest = QueryPlan {
3156            root: PlanNode::CreateTable(CreateTableNode {
3157                table_name: "dest_table".to_string(),
3158                columns: vec![
3159                    CreateColumnDef {
3160                        name: "id".to_string(),
3161                        data_type: DataType::Integer,
3162                        nullable: false,
3163                        default: None,
3164                        primary_key: true,
3165                        unique: false,
3166                    },
3167                    CreateColumnDef {
3168                        name: "value".to_string(),
3169                        data_type: DataType::Text,
3170                        nullable: true,
3171                        default: None,
3172                        primary_key: false,
3173                        unique: false,
3174                    },
3175                ],
3176                constraints: vec![],
3177                if_not_exists: false,
3178            }),
3179            estimated_cost: 1.0,
3180            estimated_rows: 0,
3181        };
3182        executor.execute(&create_dest).unwrap();
3183
3184        // Insert data into source table
3185        let insert_source = QueryPlan {
3186            root: PlanNode::Insert(InsertNode {
3187                table_name: "source_table".to_string(),
3188                columns: vec!["id".to_string(), "value".to_string()],
3189                source: InsertPlanSource::Values(vec![
3190                    vec![
3191                        PlanExpression::Literal(PlanLiteral::Integer(1)),
3192                        PlanExpression::Literal(PlanLiteral::String("one".to_string())),
3193                    ],
3194                    vec![
3195                        PlanExpression::Literal(PlanLiteral::Integer(2)),
3196                        PlanExpression::Literal(PlanLiteral::String("two".to_string())),
3197                    ],
3198                    vec![
3199                        PlanExpression::Literal(PlanLiteral::Integer(3)),
3200                        PlanExpression::Literal(PlanLiteral::String("three".to_string())),
3201                    ],
3202                ]),
3203            }),
3204            estimated_cost: 3.0,
3205            estimated_rows: 3,
3206        };
3207        executor.execute(&insert_source).unwrap();
3208
3209        // INSERT INTO dest_table SELECT * FROM source_table WHERE id > 1
3210        let insert_select = QueryPlan {
3211            root: PlanNode::Insert(InsertNode {
3212                table_name: "dest_table".to_string(),
3213                columns: vec!["id".to_string(), "value".to_string()],
3214                source: InsertPlanSource::Query(Box::new(
3215                    PlanNode::Project(ProjectNode {
3216                        input: Box::new(PlanNode::Filter(FilterNode {
3217                            input: Box::new(PlanNode::Scan(ScanNode {
3218                                table_name: "source_table".to_string(),
3219                                alias: None,
3220                                columns: vec!["id".to_string(), "value".to_string()],
3221                                index_scan: None,
3222                            })),
3223                            predicate: PlanExpression::BinaryOp {
3224                                left: Box::new(PlanExpression::Column {
3225                                    table: None,
3226                                    name: "id".to_string(),
3227                                    data_type: DataType::Integer,
3228                                }),
3229                                op: PlanBinaryOp::GreaterThan,
3230                                right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
3231                            },
3232                        })),
3233                        expressions: vec![
3234                            ProjectionExpr {
3235                                expr: PlanExpression::Column {
3236                                    table: None,
3237                                    name: "id".to_string(),
3238                                    data_type: DataType::Integer,
3239                                },
3240                                alias: Some("id".to_string()),
3241                            },
3242                            ProjectionExpr {
3243                                expr: PlanExpression::Column {
3244                                    table: None,
3245                                    name: "value".to_string(),
3246                                    data_type: DataType::Text,
3247                                },
3248                                alias: Some("value".to_string()),
3249                            },
3250                        ],
3251                    })
3252                )),
3253            }),
3254            estimated_cost: 2.0,
3255            estimated_rows: 2,
3256        };
3257
3258        let result = executor.execute(&insert_select).unwrap();
3259        assert_eq!(result.rows_affected, 2); // Only rows with id > 1 (id=2 and id=3)
3260
3261        // Query dest_table to verify
3262        let query_plan = QueryPlan {
3263            root: PlanNode::Project(ProjectNode {
3264                input: Box::new(PlanNode::Scan(ScanNode {
3265                    table_name: "dest_table".to_string(),
3266                    alias: None,
3267                    columns: vec!["id".to_string(), "value".to_string()],
3268                    index_scan: None,
3269                })),
3270                expressions: vec![
3271                    ProjectionExpr {
3272                        expr: PlanExpression::Column {
3273                            table: None,
3274                            name: "id".to_string(),
3275                            data_type: DataType::Integer,
3276                        },
3277                        alias: Some("id".to_string()),
3278                    },
3279                ],
3280            }),
3281            estimated_cost: 100.0,
3282            estimated_rows: 2,
3283        };
3284
3285        let query_result = executor.execute(&query_plan).unwrap();
3286        assert_eq!(query_result.rows.len(), 2);
3287    }
3288
3289    #[test]
3290    fn test_case_expression() {
3291        let row = Row { values: vec![Value::Integer(2)] };
3292        let columns = vec!["status".to_string()];
3293
3294        // CASE WHEN status = 1 THEN 'one' WHEN status = 2 THEN 'two' ELSE 'other' END
3295        let case_expr = PlanExpression::Case {
3296            operand: None,
3297            conditions: vec![
3298                (
3299                    PlanExpression::BinaryOp {
3300                        left: Box::new(PlanExpression::Column {
3301                            table: None,
3302                            name: "status".to_string(),
3303                            data_type: DataType::Integer,
3304                        }),
3305                        op: PlanBinaryOp::Equal,
3306                        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
3307                    },
3308                    PlanExpression::Literal(PlanLiteral::String("one".to_string())),
3309                ),
3310                (
3311                    PlanExpression::BinaryOp {
3312                        left: Box::new(PlanExpression::Column {
3313                            table: None,
3314                            name: "status".to_string(),
3315                            data_type: DataType::Integer,
3316                        }),
3317                        op: PlanBinaryOp::Equal,
3318                        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(2))),
3319                    },
3320                    PlanExpression::Literal(PlanLiteral::String("two".to_string())),
3321                ),
3322            ],
3323            else_result: Some(Box::new(PlanExpression::Literal(PlanLiteral::String("other".to_string())))),
3324        };
3325
3326        let result = evaluate_expression(&case_expr, &row, &columns).unwrap();
3327        assert_eq!(result, Value::String("two".to_string()));
3328    }
3329
3330    #[test]
3331    fn test_in_list_expression() {
3332        let row = Row { values: vec![Value::Integer(3)] };
3333        let columns = vec!["id".to_string()];
3334
3335        // id IN (1, 2, 3, 4, 5)
3336        let in_expr = PlanExpression::InList {
3337            expr: Box::new(PlanExpression::Column {
3338                table: None,
3339                name: "id".to_string(),
3340                data_type: DataType::Integer,
3341            }),
3342            list: vec![
3343                PlanExpression::Literal(PlanLiteral::Integer(1)),
3344                PlanExpression::Literal(PlanLiteral::Integer(2)),
3345                PlanExpression::Literal(PlanLiteral::Integer(3)),
3346                PlanExpression::Literal(PlanLiteral::Integer(4)),
3347                PlanExpression::Literal(PlanLiteral::Integer(5)),
3348            ],
3349            negated: false,
3350        };
3351
3352        let result = evaluate_expression(&in_expr, &row, &columns).unwrap();
3353        assert_eq!(result, Value::Boolean(true));
3354
3355        // id NOT IN (1, 2, 3, 4, 5) - should be false
3356        let not_in_expr = PlanExpression::InList {
3357            expr: Box::new(PlanExpression::Column {
3358                table: None,
3359                name: "id".to_string(),
3360                data_type: DataType::Integer,
3361            }),
3362            list: vec![
3363                PlanExpression::Literal(PlanLiteral::Integer(1)),
3364                PlanExpression::Literal(PlanLiteral::Integer(2)),
3365                PlanExpression::Literal(PlanLiteral::Integer(3)),
3366            ],
3367            negated: true,
3368        };
3369
3370        let result = evaluate_expression(&not_in_expr, &row, &columns).unwrap();
3371        assert_eq!(result, Value::Boolean(false));
3372    }
3373
3374    #[test]
3375    fn test_between_expression() {
3376        let row = Row { values: vec![Value::Integer(50)] };
3377        let columns = vec!["value".to_string()];
3378
3379        // value BETWEEN 10 AND 100
3380        let between_expr = PlanExpression::Between {
3381            expr: Box::new(PlanExpression::Column {
3382                table: None,
3383                name: "value".to_string(),
3384                data_type: DataType::Integer,
3385            }),
3386            low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
3387            high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(100))),
3388            negated: false,
3389        };
3390
3391        let result = evaluate_expression(&between_expr, &row, &columns).unwrap();
3392        assert_eq!(result, Value::Boolean(true));
3393
3394        // value NOT BETWEEN 10 AND 40 (50 is outside, should be true)
3395        let not_between_expr = PlanExpression::Between {
3396            expr: Box::new(PlanExpression::Column {
3397                table: None,
3398                name: "value".to_string(),
3399                data_type: DataType::Integer,
3400            }),
3401            low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
3402            high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(40))),
3403            negated: true,
3404        };
3405
3406        let result = evaluate_expression(&not_between_expr, &row, &columns).unwrap();
3407        assert_eq!(result, Value::Boolean(true));
3408    }
3409
3410    #[test]
3411    fn test_like_expression() {
3412        let row = Row { values: vec![Value::String("hello world".to_string())] };
3413        let columns = vec!["text".to_string()];
3414
3415        // text LIKE 'hello%'
3416        let like_expr = PlanExpression::Like {
3417            expr: Box::new(PlanExpression::Column {
3418                table: None,
3419                name: "text".to_string(),
3420                data_type: DataType::Text,
3421            }),
3422            pattern: Box::new(PlanExpression::Literal(PlanLiteral::String("hello%".to_string()))),
3423            negated: false,
3424        };
3425
3426        let result = evaluate_expression(&like_expr, &row, &columns).unwrap();
3427        assert_eq!(result, Value::Boolean(true));
3428
3429        // text LIKE '%world'
3430        let like_expr2 = PlanExpression::Like {
3431            expr: Box::new(PlanExpression::Column {
3432                table: None,
3433                name: "text".to_string(),
3434                data_type: DataType::Text,
3435            }),
3436            pattern: Box::new(PlanExpression::Literal(PlanLiteral::String("%world".to_string()))),
3437            negated: false,
3438        };
3439
3440        let result = evaluate_expression(&like_expr2, &row, &columns).unwrap();
3441        assert_eq!(result, Value::Boolean(true));
3442
3443        // text NOT LIKE '%foo%' (should be true since 'foo' is not in 'hello world')
3444        let not_like_expr = PlanExpression::Like {
3445            expr: Box::new(PlanExpression::Column {
3446                table: None,
3447                name: "text".to_string(),
3448                data_type: DataType::Text,
3449            }),
3450            pattern: Box::new(PlanExpression::Literal(PlanLiteral::String("%foo%".to_string()))),
3451            negated: true,
3452        };
3453
3454        let result = evaluate_expression(&not_like_expr, &row, &columns).unwrap();
3455        assert_eq!(result, Value::Boolean(true));
3456    }
3457
3458    #[test]
3459    fn test_alter_table() {
3460        use crate::planner::{CreateTableNode, CreateColumnDef, AlterTableNode, PlanAlterOperation};
3461
3462        let executor = Executor::new(ExecutionContext::new());
3463
3464        // Create a table
3465        let create_plan = QueryPlan {
3466            root: PlanNode::CreateTable(CreateTableNode {
3467                table_name: "alter_test".to_string(),
3468                columns: vec![
3469                    CreateColumnDef {
3470                        name: "id".to_string(),
3471                        data_type: DataType::Integer,
3472                        nullable: false,
3473                        default: None,
3474                        primary_key: true,
3475                        unique: false,
3476                    },
3477                ],
3478                constraints: vec![],
3479                if_not_exists: false,
3480            }),
3481            estimated_cost: 1.0,
3482            estimated_rows: 0,
3483        };
3484        executor.execute(&create_plan).unwrap();
3485
3486        // Add a column
3487        let add_column_plan = QueryPlan {
3488            root: PlanNode::AlterTable(AlterTableNode {
3489                table_name: "alter_test".to_string(),
3490                operations: vec![
3491                    PlanAlterOperation::AddColumn(CreateColumnDef {
3492                        name: "name".to_string(),
3493                        data_type: DataType::Text,
3494                        nullable: true,
3495                        default: None,
3496                        primary_key: false,
3497                        unique: false,
3498                    }),
3499                ],
3500            }),
3501            estimated_cost: 1.0,
3502            estimated_rows: 0,
3503        };
3504        let result = executor.execute(&add_column_plan).unwrap();
3505        match &result.rows[0].values[0] {
3506            Value::String(s) => assert!(s.contains("altered")),
3507            _ => panic!("Expected string result"),
3508        }
3509
3510        // Verify the column was added
3511        let schema = executor.context.read().unwrap().get_table_schema("alter_test").unwrap().clone();
3512        assert_eq!(schema.columns.len(), 2);
3513        assert_eq!(schema.columns[1].name, "name");
3514
3515        // Rename the column
3516        let rename_column_plan = QueryPlan {
3517            root: PlanNode::AlterTable(AlterTableNode {
3518                table_name: "alter_test".to_string(),
3519                operations: vec![
3520                    PlanAlterOperation::RenameColumn {
3521                        old_name: "name".to_string(),
3522                        new_name: "full_name".to_string(),
3523                    },
3524                ],
3525            }),
3526            estimated_cost: 1.0,
3527            estimated_rows: 0,
3528        };
3529        executor.execute(&rename_column_plan).unwrap();
3530
3531        // Verify the column was renamed
3532        let schema = executor.context.read().unwrap().get_table_schema("alter_test").unwrap().clone();
3533        assert_eq!(schema.columns[1].name, "full_name");
3534
3535        // Drop the column
3536        let drop_column_plan = QueryPlan {
3537            root: PlanNode::AlterTable(AlterTableNode {
3538                table_name: "alter_test".to_string(),
3539                operations: vec![
3540                    PlanAlterOperation::DropColumn {
3541                        name: "full_name".to_string(),
3542                        if_exists: false,
3543                    },
3544                ],
3545            }),
3546            estimated_cost: 1.0,
3547            estimated_rows: 0,
3548        };
3549        executor.execute(&drop_column_plan).unwrap();
3550
3551        // Verify the column was dropped
3552        let schema = executor.context.read().unwrap().get_table_schema("alter_test").unwrap().clone();
3553        assert_eq!(schema.columns.len(), 1);
3554    }
3555
3556    #[test]
3557    fn test_alter_table_constraints() {
3558        use crate::planner::{CreateTableNode, CreateColumnDef, AlterTableNode, PlanAlterOperation, CreateTableConstraint};
3559
3560        let executor = Executor::new(ExecutionContext::new());
3561
3562        // Create a table with two columns
3563        let create_plan = QueryPlan {
3564            root: PlanNode::CreateTable(CreateTableNode {
3565                table_name: "constraint_test".to_string(),
3566                columns: vec![
3567                    CreateColumnDef {
3568                        name: "id".to_string(),
3569                        data_type: DataType::Integer,
3570                        nullable: false,
3571                        default: None,
3572                        primary_key: false,
3573                        unique: false,
3574                    },
3575                    CreateColumnDef {
3576                        name: "email".to_string(),
3577                        data_type: DataType::Text,
3578                        nullable: true,
3579                        default: None,
3580                        primary_key: false,
3581                        unique: false,
3582                    },
3583                ],
3584                constraints: vec![],
3585                if_not_exists: false,
3586            }),
3587            estimated_cost: 1.0,
3588            estimated_rows: 0,
3589        };
3590        executor.execute(&create_plan).unwrap();
3591
3592        // Add a UNIQUE constraint
3593        let add_unique_plan = QueryPlan {
3594            root: PlanNode::AlterTable(AlterTableNode {
3595                table_name: "constraint_test".to_string(),
3596                operations: vec![
3597                    PlanAlterOperation::AddConstraint(CreateTableConstraint::Unique {
3598                        columns: vec!["email".to_string()],
3599                    }),
3600                ],
3601            }),
3602            estimated_cost: 1.0,
3603            estimated_rows: 0,
3604        };
3605        executor.execute(&add_unique_plan).unwrap();
3606
3607        // Verify the constraint was added
3608        let schema = executor.context.read().unwrap().get_table_schema("constraint_test").unwrap().clone();
3609        assert_eq!(schema.constraints.len(), 1);
3610        assert!(schema.constraints[0].name.contains("uq"));
3611
3612        // Add a PRIMARY KEY constraint
3613        let add_pk_plan = QueryPlan {
3614            root: PlanNode::AlterTable(AlterTableNode {
3615                table_name: "constraint_test".to_string(),
3616                operations: vec![
3617                    PlanAlterOperation::AddConstraint(CreateTableConstraint::PrimaryKey {
3618                        columns: vec!["id".to_string()],
3619                    }),
3620                ],
3621            }),
3622            estimated_cost: 1.0,
3623            estimated_rows: 0,
3624        };
3625        executor.execute(&add_pk_plan).unwrap();
3626
3627        // Verify the primary key was set
3628        let schema = executor.context.read().unwrap().get_table_schema("constraint_test").unwrap().clone();
3629        assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
3630        assert_eq!(schema.constraints.len(), 2);
3631
3632        // Try adding another PRIMARY KEY (should fail)
3633        let add_dup_pk_plan = QueryPlan {
3634            root: PlanNode::AlterTable(AlterTableNode {
3635                table_name: "constraint_test".to_string(),
3636                operations: vec![
3637                    PlanAlterOperation::AddConstraint(CreateTableConstraint::PrimaryKey {
3638                        columns: vec!["email".to_string()],
3639                    }),
3640                ],
3641            }),
3642            estimated_cost: 1.0,
3643            estimated_rows: 0,
3644        };
3645        let result = executor.execute(&add_dup_pk_plan);
3646        assert!(result.is_err());
3647
3648        // Drop the UNIQUE constraint by name
3649        let pk_constraint_name = {
3650            let schema = executor.context.read().unwrap().get_table_schema("constraint_test").unwrap().clone();
3651            schema.constraints.iter()
3652                .find(|c| matches!(c.constraint_type, StoredConstraintType::Unique { .. }))
3653                .map(|c| c.name.clone())
3654                .unwrap()
3655        };
3656
3657        let drop_constraint_plan = QueryPlan {
3658            root: PlanNode::AlterTable(AlterTableNode {
3659                table_name: "constraint_test".to_string(),
3660                operations: vec![
3661                    PlanAlterOperation::DropConstraint {
3662                        name: pk_constraint_name,
3663                    },
3664                ],
3665            }),
3666            estimated_cost: 1.0,
3667            estimated_rows: 0,
3668        };
3669        executor.execute(&drop_constraint_plan).unwrap();
3670
3671        // Verify the constraint was dropped
3672        let schema = executor.context.read().unwrap().get_table_schema("constraint_test").unwrap().clone();
3673        assert_eq!(schema.constraints.len(), 1);
3674        // Only the PK constraint should remain
3675        assert!(matches!(schema.constraints[0].constraint_type, StoredConstraintType::PrimaryKey { .. }));
3676    }
3677
3678    #[test]
3679    fn test_create_table_with_constraints() {
3680        use crate::planner::{CreateTableNode, CreateColumnDef, CreateTableConstraint};
3681
3682        let executor = Executor::new(ExecutionContext::new());
3683
3684        // Create a table with constraints
3685        let create_plan = QueryPlan {
3686            root: PlanNode::CreateTable(CreateTableNode {
3687                table_name: "users".to_string(),
3688                columns: vec![
3689                    CreateColumnDef {
3690                        name: "id".to_string(),
3691                        data_type: DataType::Integer,
3692                        nullable: false,
3693                        default: None,
3694                        primary_key: true,
3695                        unique: false,
3696                    },
3697                    CreateColumnDef {
3698                        name: "email".to_string(),
3699                        data_type: DataType::Text,
3700                        nullable: false,
3701                        default: None,
3702                        primary_key: false,
3703                        unique: true, // Column-level unique
3704                    },
3705                    CreateColumnDef {
3706                        name: "name".to_string(),
3707                        data_type: DataType::Text,
3708                        nullable: true,
3709                        default: None,
3710                        primary_key: false,
3711                        unique: false,
3712                    },
3713                ],
3714                constraints: vec![
3715                    CreateTableConstraint::Unique {
3716                        columns: vec!["name".to_string()],
3717                    },
3718                ],
3719                if_not_exists: false,
3720            }),
3721            estimated_cost: 1.0,
3722            estimated_rows: 0,
3723        };
3724        executor.execute(&create_plan).unwrap();
3725
3726        // Verify schema has primary key and constraints
3727        let schema = executor.context.read().unwrap().get_table_schema("users").unwrap().clone();
3728        assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
3729        // Should have: pk, uq for name, uq for email (column-level)
3730        assert!(schema.constraints.len() >= 2);
3731    }
3732
3733    #[test]
3734    fn test_column_default_values() {
3735        use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource, PlanExpression, PlanLiteral};
3736
3737        let executor = Executor::new(ExecutionContext::new());
3738
3739        // Create a table with a default value
3740        let create_plan = QueryPlan {
3741            root: PlanNode::CreateTable(CreateTableNode {
3742                table_name: "defaults_test".to_string(),
3743                columns: vec![
3744                    CreateColumnDef {
3745                        name: "id".to_string(),
3746                        data_type: DataType::Integer,
3747                        nullable: false,
3748                        default: None,
3749                        primary_key: true,
3750                        unique: false,
3751                    },
3752                    CreateColumnDef {
3753                        name: "status".to_string(),
3754                        data_type: DataType::Text,
3755                        nullable: false,
3756                        default: Some(PlanExpression::Literal(PlanLiteral::String("active".to_string()))),
3757                        primary_key: false,
3758                        unique: false,
3759                    },
3760                    CreateColumnDef {
3761                        name: "count".to_string(),
3762                        data_type: DataType::Integer,
3763                        nullable: true,
3764                        default: Some(PlanExpression::Literal(PlanLiteral::Integer(0))),
3765                        primary_key: false,
3766                        unique: false,
3767                    },
3768                ],
3769                constraints: vec![],
3770                if_not_exists: false,
3771            }),
3772            estimated_cost: 1.0,
3773            estimated_rows: 0,
3774        };
3775        executor.execute(&create_plan).unwrap();
3776
3777        // Verify default values are stored in schema
3778        let schema = executor.context.read().unwrap().get_table_schema("defaults_test").unwrap().clone();
3779        assert_eq!(schema.columns[1].default, Some(Value::String("active".to_string())));
3780        assert_eq!(schema.columns[2].default, Some(Value::Integer(0)));
3781
3782        // Insert a row specifying only the id column
3783        let insert_plan = QueryPlan {
3784            root: PlanNode::Insert(InsertNode {
3785                table_name: "defaults_test".to_string(),
3786                columns: vec!["id".to_string()],
3787                source: InsertPlanSource::Values(vec![
3788                    vec![PlanExpression::Literal(PlanLiteral::Integer(1))],
3789                ]),
3790            }),
3791            estimated_cost: 1.0,
3792            estimated_rows: 1,
3793        };
3794        executor.execute(&insert_plan).unwrap();
3795
3796        // Verify the default values were used
3797        let context = executor.context.read().unwrap();
3798        let table = context.get_table("defaults_test").unwrap();
3799        let table_data = table.read().unwrap();
3800
3801        assert_eq!(table_data.rows.len(), 1);
3802        assert_eq!(table_data.rows[0].values[0], Value::Integer(1)); // id
3803        assert_eq!(table_data.rows[0].values[1], Value::String("active".to_string())); // status default
3804        assert_eq!(table_data.rows[0].values[2], Value::Integer(0)); // count default
3805    }
3806
3807    #[test]
3808    fn test_alter_column_default() {
3809        use crate::planner::{CreateTableNode, CreateColumnDef, AlterTableNode, PlanAlterOperation, PlanExpression, PlanLiteral};
3810
3811        let executor = Executor::new(ExecutionContext::new());
3812
3813        // Create a table
3814        let create_plan = QueryPlan {
3815            root: PlanNode::CreateTable(CreateTableNode {
3816                table_name: "alter_default_test".to_string(),
3817                columns: vec![
3818                    CreateColumnDef {
3819                        name: "id".to_string(),
3820                        data_type: DataType::Integer,
3821                        nullable: false,
3822                        default: None,
3823                        primary_key: true,
3824                        unique: false,
3825                    },
3826                    CreateColumnDef {
3827                        name: "value".to_string(),
3828                        data_type: DataType::Integer,
3829                        nullable: true,
3830                        default: None,
3831                        primary_key: false,
3832                        unique: false,
3833                    },
3834                ],
3835                constraints: vec![],
3836                if_not_exists: false,
3837            }),
3838            estimated_cost: 1.0,
3839            estimated_rows: 0,
3840        };
3841        executor.execute(&create_plan).unwrap();
3842
3843        // Alter column to add a default
3844        let alter_plan = QueryPlan {
3845            root: PlanNode::AlterTable(AlterTableNode {
3846                table_name: "alter_default_test".to_string(),
3847                operations: vec![
3848                    PlanAlterOperation::AlterColumn {
3849                        name: "value".to_string(),
3850                        data_type: None,
3851                        set_not_null: None,
3852                        set_default: Some(Some(PlanExpression::Literal(PlanLiteral::Integer(42)))),
3853                    },
3854                ],
3855            }),
3856            estimated_cost: 1.0,
3857            estimated_rows: 0,
3858        };
3859        executor.execute(&alter_plan).unwrap();
3860
3861        // Verify the default was set
3862        let schema = executor.context.read().unwrap().get_table_schema("alter_default_test").unwrap().clone();
3863        assert_eq!(schema.columns[1].default, Some(Value::Integer(42)));
3864
3865        // Alter column to drop the default
3866        let drop_default_plan = QueryPlan {
3867            root: PlanNode::AlterTable(AlterTableNode {
3868                table_name: "alter_default_test".to_string(),
3869                operations: vec![
3870                    PlanAlterOperation::AlterColumn {
3871                        name: "value".to_string(),
3872                        data_type: None,
3873                        set_not_null: None,
3874                        set_default: Some(None), // Drop default
3875                    },
3876                ],
3877            }),
3878            estimated_cost: 1.0,
3879            estimated_rows: 0,
3880        };
3881        executor.execute(&drop_default_plan).unwrap();
3882
3883        // Verify the default was removed
3884        let schema = executor.context.read().unwrap().get_table_schema("alter_default_test").unwrap().clone();
3885        assert_eq!(schema.columns[1].default, None);
3886    }
3887
3888    #[test]
3889    fn test_parameterized_query() {
3890        use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource, FilterNode};
3891
3892        let executor = Executor::new(ExecutionContext::new());
3893
3894        // Create a test table
3895        let create_plan = QueryPlan {
3896            root: PlanNode::CreateTable(CreateTableNode {
3897                table_name: "params_test".to_string(),
3898                columns: vec![
3899                    CreateColumnDef {
3900                        name: "id".to_string(),
3901                        data_type: DataType::Integer,
3902                        nullable: false,
3903                        default: None,
3904                        primary_key: true,
3905                        unique: false,
3906                    },
3907                    CreateColumnDef {
3908                        name: "name".to_string(),
3909                        data_type: DataType::Text,
3910                        nullable: true,
3911                        default: None,
3912                        primary_key: false,
3913                        unique: false,
3914                    },
3915                ],
3916                constraints: vec![],
3917                if_not_exists: false,
3918            }),
3919            estimated_cost: 1.0,
3920            estimated_rows: 0,
3921        };
3922        executor.execute(&create_plan).unwrap();
3923
3924        // Insert some data
3925        let insert_plan = QueryPlan {
3926            root: PlanNode::Insert(InsertNode {
3927                table_name: "params_test".to_string(),
3928                columns: vec!["id".to_string(), "name".to_string()],
3929                source: InsertPlanSource::Values(vec![
3930                    vec![
3931                        PlanExpression::Literal(PlanLiteral::Integer(1)),
3932                        PlanExpression::Literal(PlanLiteral::String("Alice".to_string())),
3933                    ],
3934                    vec![
3935                        PlanExpression::Literal(PlanLiteral::Integer(2)),
3936                        PlanExpression::Literal(PlanLiteral::String("Bob".to_string())),
3937                    ],
3938                    vec![
3939                        PlanExpression::Literal(PlanLiteral::Integer(3)),
3940                        PlanExpression::Literal(PlanLiteral::String("Charlie".to_string())),
3941                    ],
3942                ]),
3943            }),
3944            estimated_cost: 1.0,
3945            estimated_rows: 3,
3946        };
3947        executor.execute(&insert_plan).unwrap();
3948
3949        // Create a query with a placeholder: SELECT * FROM params_test WHERE id = $1
3950        let query_plan = QueryPlan {
3951            root: PlanNode::Filter(FilterNode {
3952                input: Box::new(PlanNode::Scan(ScanNode {
3953                    table_name: "params_test".to_string(),
3954                    alias: None,
3955                    columns: vec!["id".to_string(), "name".to_string()],
3956                    index_scan: None,
3957                })),
3958                predicate: PlanExpression::BinaryOp {
3959                    left: Box::new(PlanExpression::Column {
3960                        table: None,
3961                        name: "id".to_string(),
3962                        data_type: DataType::Integer,
3963                    }),
3964                    op: PlanBinaryOp::Equal,
3965                    right: Box::new(PlanExpression::Placeholder(1)),
3966                },
3967            }),
3968            estimated_cost: 10.0,
3969            estimated_rows: 1,
3970        };
3971
3972        // Execute with parameter $1 = 2 (should find Bob)
3973        let result = executor.execute_with_params(&query_plan, &[Value::Integer(2)]).unwrap();
3974        assert_eq!(result.rows.len(), 1);
3975        assert_eq!(result.rows[0].values[0], Value::Integer(2));
3976        assert_eq!(result.rows[0].values[1], Value::String("Bob".to_string()));
3977
3978        // Execute with parameter $1 = 1 (should find Alice)
3979        let result = executor.execute_with_params(&query_plan, &[Value::Integer(1)]).unwrap();
3980        assert_eq!(result.rows.len(), 1);
3981        assert_eq!(result.rows[0].values[1], Value::String("Alice".to_string()));
3982
3983        // Execute with parameter $1 = 99 (should find nothing)
3984        let result = executor.execute_with_params(&query_plan, &[Value::Integer(99)]).unwrap();
3985        assert_eq!(result.rows.len(), 0);
3986    }
3987
3988    #[test]
3989    fn test_parameterized_insert() {
3990        use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource};
3991
3992        let executor = Executor::new(ExecutionContext::new());
3993
3994        // Create a test table
3995        let create_plan = QueryPlan {
3996            root: PlanNode::CreateTable(CreateTableNode {
3997                table_name: "param_insert_test".to_string(),
3998                columns: vec![
3999                    CreateColumnDef {
4000                        name: "id".to_string(),
4001                        data_type: DataType::Integer,
4002                        nullable: false,
4003                        default: None,
4004                        primary_key: true,
4005                        unique: false,
4006                    },
4007                    CreateColumnDef {
4008                        name: "value".to_string(),
4009                        data_type: DataType::Text,
4010                        nullable: true,
4011                        default: None,
4012                        primary_key: false,
4013                        unique: false,
4014                    },
4015                ],
4016                constraints: vec![],
4017                if_not_exists: false,
4018            }),
4019            estimated_cost: 1.0,
4020            estimated_rows: 0,
4021        };
4022        executor.execute(&create_plan).unwrap();
4023
4024        // Create an INSERT with placeholders: INSERT INTO param_insert_test (id, value) VALUES ($1, $2)
4025        let insert_plan = QueryPlan {
4026            root: PlanNode::Insert(InsertNode {
4027                table_name: "param_insert_test".to_string(),
4028                columns: vec!["id".to_string(), "value".to_string()],
4029                source: InsertPlanSource::Values(vec![
4030                    vec![
4031                        PlanExpression::Placeholder(1),
4032                        PlanExpression::Placeholder(2),
4033                    ],
4034                ]),
4035            }),
4036            estimated_cost: 1.0,
4037            estimated_rows: 1,
4038        };
4039
4040        // Execute insert with parameters
4041        executor.execute_with_params(&insert_plan, &[
4042            Value::Integer(42),
4043            Value::String("test value".to_string()),
4044        ]).unwrap();
4045
4046        // Verify the data was inserted
4047        let context = executor.context.read().unwrap();
4048        let table = context.get_table("param_insert_test").unwrap();
4049        let table_data = table.read().unwrap();
4050
4051        assert_eq!(table_data.rows.len(), 1);
4052        assert_eq!(table_data.rows[0].values[0], Value::Integer(42));
4053        assert_eq!(table_data.rows[0].values[1], Value::String("test value".to_string()));
4054    }
4055}