Skip to main content

aegis_query/
executor.rs

1//! Aegis Query Executor
2//!
3//! Executes query plans using a Volcano-style iterator model.
4//! Implements vectorized execution for improved performance.
5//!
6//! Key Features:
7//! - Pull-based iterator execution
8//! - Vectorized batch processing
9//! - Memory-efficient result streaming
10//! - Support for parallel execution
11//!
12//! @version 0.1.0
13//! @author AutomataNexus Development Team
14
15use crate::ast::SetOperationType;
16use crate::index::{IndexError, IndexKey, IndexType, TableIndexManager};
17use crate::planner::{
18    AggregateExpr, AggregateFunction, AggregateNode, AlterTableNode, CreateIndexNode,
19    CreateTableConstraint, CreateTableNode, DeleteNode, DropIndexNode, DropTableNode, FilterNode,
20    InsertNode, InsertPlanSource, JoinNode, JoinStrategy, LimitNode, PlanAlterOperation,
21    PlanBinaryOp, PlanExpression, PlanJoinType, PlanLiteral, PlanNode, PlanUnaryOp, ProjectNode,
22    ProjectionExpr, QueryPlan, ScanNode, SetOperationNode, SortKey, SortNode, UpdateNode,
23};
24use aegis_common::{DataType, Row, Value};
25use std::collections::{HashMap, HashSet};
26use std::sync::{Arc, RwLock};
27use thiserror::Error;
28
29// =============================================================================
30// Error Types
31// =============================================================================
32
/// Errors reported by the query executor and by the DDL/DML helpers on
/// `ExecutionContext`.
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// The named table is not registered in the execution context.
    #[error("Table not found: {0}")]
    TableNotFound(String),

    /// The named column does not exist in the table being operated on.
    #[error("Column not found: {0}")]
    ColumnNotFound(String),

    /// Type mismatch; carries the expected and the actual type as strings.
    #[error("Type mismatch: expected {expected}, got {actual}")]
    TypeMismatch { expected: String, actual: String },

    /// Division by zero.
    #[error("Division by zero")]
    DivisionByZero,

    /// The request is not valid in the current state (for example creating a
    /// table or an index that already exists).
    #[error("Invalid operation: {0}")]
    InvalidOperation(String),

    /// Internal failure, such as a poisoned table lock or an index manager
    /// that cannot be modified because it is shared.
    #[error("Execution error: {0}")]
    Internal(String),

    /// Failure reported by the index layer, stored as its display string.
    #[error("Index error: {0}")]
    IndexError(String),
}
56
57impl From<IndexError> for ExecutorError {
58    fn from(e: IndexError) -> Self {
59        ExecutorError::IndexError(e.to_string())
60    }
61}
62
/// Convenience alias for results whose error type is [`ExecutorError`].
pub type ExecutorResult<T> = Result<T, ExecutorError>;
64
65// =============================================================================
66// Execution Context
67// =============================================================================
68
/// Context for query execution.
///
/// Owns the in-memory tables, their schemas, the index metadata and the live
/// index structures, plus the MVCC bookkeeping used for row visibility.
pub struct ExecutionContext {
    /// Table data keyed by table name; each table sits behind its own lock.
    tables: HashMap<String, Arc<RwLock<TableData>>>,
    /// Table schemas keyed by table name.
    table_schemas: HashMap<String, TableSchema>,
    /// Index metadata (for persistence), keyed by table name.
    indexes: HashMap<String, Vec<IndexSchema>>,
    /// Actual index data structures, keyed by table name.
    table_indexes: HashMap<String, Arc<TableIndexManager>>,
    /// Batch size handed out by `batch_size()`; `new()` sets it to 1024.
    batch_size: usize,
    /// Global version clock; starts at 1 and is advanced by `commit_snapshot`.
    /// NOTE(review): the original comment said "incremented on every
    /// mutation" — confirm against the DML paths.
    version_clock: u64,
    /// Snapshot version for the current transaction. `None` means there is no
    /// active snapshot and every row that has not been deleted is visible.
    snapshot_version: Option<u64>,
}
83
/// In-memory table data for execution with row-level versioning.
///
/// `rows`, `row_created_version` and `row_deleted_version` are indexed by the
/// same row position. A missing version entry is read as 0 by
/// `ExecutionContext::is_row_visible`.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableData {
    /// Table name; also the key under which the table is registered.
    pub name: String,
    /// Column names, in the same order as the values inside each row.
    pub columns: Vec<String>,
    /// Row storage; a row's position is the row id handed to the indexes.
    pub rows: Vec<Row>,
    /// Version at which each row was created (parallel to `rows`).
    /// Readers with a snapshot version < created_version won't see the row.
    /// Defaults to an empty vector when absent from serialized input.
    #[serde(default)]
    pub row_created_version: Vec<u64>,
    /// Version at which each row was deleted (0 = alive).
    /// Defaults to an empty vector when absent from serialized input.
    #[serde(default)]
    pub row_deleted_version: Vec<u64>,
}
98
/// Stored table constraint: a name plus the constraint definition.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoredConstraint {
    /// Constraint name; `DROP CONSTRAINT` looks constraints up by this name.
    pub name: String,
    /// What the constraint enforces.
    pub constraint_type: StoredConstraintType,
}
105
/// Stored constraint type.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StoredConstraintType {
    /// Primary key over the listed columns.
    PrimaryKey {
        columns: Vec<String>,
    },
    /// Uniqueness constraint over the listed columns.
    Unique {
        columns: Vec<String>,
    },
    /// Foreign key: `columns` of this table reference `ref_columns` of
    /// `ref_table`.
    ForeignKey {
        columns: Vec<String>,
        ref_table: String,
        ref_columns: Vec<String>,
    },
    /// Check constraint. `alter_table` stores the `Debug` rendering of the
    /// planned expression here, so the text is descriptive only.
    Check {
        expression_text: String,
    },
}
124
/// Table schema information.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableSchema {
    /// Table name; also the key under which the schema is registered.
    pub name: String,
    /// Column definitions in declaration order.
    pub columns: Vec<ColumnSchema>,
    /// Primary-key column names, if the table has a primary key.
    pub primary_key: Option<Vec<String>>,
    /// Constraints attached to the table.
    pub constraints: Vec<StoredConstraint>,
}
133
/// Column schema information.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Declared data type.
    pub data_type: DataType,
    /// Whether the column is declared nullable.
    pub nullable: bool,
    /// Already-evaluated default value, if the column declares one.
    pub default: Option<Value>,
}
142
/// Index schema information (the persisted description of an index).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Name of the table the index belongs to.
    pub table: String,
    /// Indexed column names, in key order.
    pub columns: Vec<String>,
    /// Whether the index was created as unique.
    pub unique: bool,
}
151
/// Serializable snapshot of the execution context for persistence.
///
/// Only index metadata is captured; the index structures themselves are
/// rebuilt from table data when a snapshot is restored.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExecutionContextSnapshot {
    /// Copies of every table's data.
    pub tables: Vec<TableData>,
    /// Copies of every table schema.
    pub schemas: Vec<TableSchema>,
    /// Index metadata keyed by table name.
    pub indexes: HashMap<String, Vec<IndexSchema>>,
}
159
160impl ExecutionContext {
161    pub fn new() -> Self {
162        Self {
163            tables: HashMap::new(),
164            table_schemas: HashMap::new(),
165            indexes: HashMap::new(),
166            table_indexes: HashMap::new(),
167            batch_size: 1024,
168            version_clock: 1,
169            snapshot_version: None,
170        }
171    }
172
173    /// Take an MVCC snapshot — subsequent reads will only see rows visible at this version.
174    pub fn begin_snapshot(&mut self) {
175        self.snapshot_version = Some(self.version_clock);
176    }
177
178    /// Release the MVCC snapshot and advance the clock.
179    pub fn commit_snapshot(&mut self) {
180        self.version_clock += 1;
181        self.snapshot_version = None;
182    }
183
184    /// Release the MVCC snapshot without advancing.
185    pub fn rollback_snapshot(&mut self) {
186        self.snapshot_version = None;
187    }
188
189    /// Current version clock value.
190    pub fn current_version(&self) -> u64 {
191        self.version_clock
192    }
193
194    /// Check if a row at index `i` is visible given the current snapshot.
195    fn is_row_visible(&self, table: &TableData, i: usize) -> bool {
196        let created = table.row_created_version.get(i).copied().unwrap_or(0);
197        let deleted = table.row_deleted_version.get(i).copied().unwrap_or(0);
198
199        if let Some(snap) = self.snapshot_version {
200            // Snapshot isolation: row must have been created before snapshot and not deleted before snapshot
201            created <= snap && (deleted == 0 || deleted > snap)
202        } else {
203            // No active snapshot: row is visible if alive
204            deleted == 0
205        }
206    }
207
208    pub fn with_batch_size(mut self, size: usize) -> Self {
209        self.batch_size = size;
210        self
211    }
212
213    pub fn add_table(&mut self, table: TableData) {
214        self.tables
215            .insert(table.name.clone(), Arc::new(RwLock::new(table)));
216    }
217
218    pub fn get_table(&self, name: &str) -> Option<Arc<RwLock<TableData>>> {
219        self.tables.get(name).cloned()
220    }
221
222    pub fn batch_size(&self) -> usize {
223        self.batch_size
224    }
225
226    // ==========================================================================
227    // DDL Operations
228    // ==========================================================================
229
230    /// Create a new table.
231    pub fn create_table(
232        &mut self,
233        name: String,
234        columns: Vec<ColumnSchema>,
235        primary_key: Option<Vec<String>>,
236        constraints: Vec<StoredConstraint>,
237        if_not_exists: bool,
238    ) -> ExecutorResult<()> {
239        if self.tables.contains_key(&name) {
240            if if_not_exists {
241                return Ok(());
242            }
243            return Err(ExecutorError::InvalidOperation(format!(
244                "Table '{}' already exists",
245                name
246            )));
247        }
248
249        let column_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
250
251        // Create table schema
252        let schema = TableSchema {
253            name: name.clone(),
254            columns,
255            primary_key,
256            constraints,
257        };
258        self.table_schemas.insert(name.clone(), schema);
259
260        // Create empty table data
261        let table_data = TableData {
262            name: name.clone(),
263            columns: column_names,
264            rows: Vec::new(),
265            row_created_version: Vec::new(),
266            row_deleted_version: Vec::new(),
267        };
268        self.tables.insert(name, Arc::new(RwLock::new(table_data)));
269
270        Ok(())
271    }
272
273    /// Drop a table.
274    pub fn drop_table(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
275        if !self.tables.contains_key(name) {
276            if if_exists {
277                return Ok(());
278            }
279            return Err(ExecutorError::TableNotFound(name.to_string()));
280        }
281
282        self.tables.remove(name);
283        self.table_schemas.remove(name);
284        self.indexes.remove(name);
285
286        Ok(())
287    }
288
289    /// Alter a table.
290    pub fn alter_table(&mut self, name: &str, op: &PlanAlterOperation) -> ExecutorResult<()> {
291        let schema = self
292            .table_schemas
293            .get_mut(name)
294            .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
295
296        match op {
297            PlanAlterOperation::AddColumn(col) => {
298                // Check if column already exists
299                if schema.columns.iter().any(|c| c.name == col.name) {
300                    return Err(ExecutorError::InvalidOperation(format!(
301                        "Column '{}' already exists in table '{}'",
302                        col.name, name
303                    )));
304                }
305
306                // Evaluate default expression if provided
307                let default = col
308                    .default
309                    .as_ref()
310                    .map(evaluate_default_expression)
311                    .transpose()?;
312
313                schema.columns.push(ColumnSchema {
314                    name: col.name.clone(),
315                    data_type: col.data_type.clone(),
316                    nullable: col.nullable,
317                    default: default.clone(),
318                });
319
320                // Add default (or null) values to existing rows
321                let fill_value = default.unwrap_or(Value::Null);
322                if let Some(table_data) = self.tables.get(name) {
323                    let mut table = table_data
324                        .write()
325                        .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
326                    for row in table.rows.iter_mut() {
327                        row.values.push(fill_value.clone());
328                    }
329                }
330            }
331            PlanAlterOperation::DropColumn {
332                name: col_name,
333                if_exists,
334            } => {
335                let col_idx = schema.columns.iter().position(|c| c.name == *col_name);
336                match col_idx {
337                    Some(idx) => {
338                        schema.columns.remove(idx);
339                        // Remove values from existing rows
340                        if let Some(table_data) = self.tables.get(name) {
341                            let mut table = table_data.write().map_err(|_| {
342                                ExecutorError::Internal("Lock poisoned".to_string())
343                            })?;
344                            for row in table.rows.iter_mut() {
345                                if idx < row.values.len() {
346                                    row.values.remove(idx);
347                                }
348                            }
349                        }
350                    }
351                    None if *if_exists => {}
352                    None => {
353                        return Err(ExecutorError::ColumnNotFound(col_name.clone()));
354                    }
355                }
356            }
357            PlanAlterOperation::RenameColumn { old_name, new_name } => {
358                let col = schema
359                    .columns
360                    .iter_mut()
361                    .find(|c| c.name == *old_name)
362                    .ok_or_else(|| ExecutorError::ColumnNotFound(old_name.clone()))?;
363                col.name = new_name.clone();
364            }
365            PlanAlterOperation::AlterColumn {
366                name: col_name,
367                data_type,
368                set_not_null,
369                set_default,
370            } => {
371                let col = schema
372                    .columns
373                    .iter_mut()
374                    .find(|c| c.name == *col_name)
375                    .ok_or_else(|| ExecutorError::ColumnNotFound(col_name.clone()))?;
376
377                if let Some(dt) = data_type {
378                    col.data_type = dt.clone();
379                }
380                if let Some(not_null) = set_not_null {
381                    col.nullable = !not_null;
382                }
383                // Handle set_default: None = no change, Some(None) = drop default, Some(Some(expr)) = set new default
384                if let Some(new_default) = set_default {
385                    col.default = new_default
386                        .as_ref()
387                        .map(evaluate_default_expression)
388                        .transpose()?;
389                }
390            }
391            PlanAlterOperation::RenameTable { new_name } => {
392                // Move data to new table name
393                if let Some(rows) = self.tables.remove(name) {
394                    self.tables.insert(new_name.clone(), rows);
395                }
396                if let Some(mut schema) = self.table_schemas.remove(name) {
397                    schema.name = new_name.clone();
398                    self.table_schemas.insert(new_name.clone(), schema);
399                }
400                if let Some(indexes) = self.indexes.remove(name) {
401                    self.indexes.insert(new_name.clone(), indexes);
402                }
403                // Return early since name is now invalid
404                return Ok(());
405            }
406            PlanAlterOperation::AddConstraint(constraint) => {
407                // Pre-validate foreign key reference before getting mutable borrow
408                if let CreateTableConstraint::ForeignKey { ref_table, .. } = constraint {
409                    if !self.table_schemas.contains_key(ref_table) {
410                        return Err(ExecutorError::TableNotFound(ref_table.clone()));
411                    }
412                }
413
414                // Now safe to get mutable reference for the rest of the operation
415                let schema = self
416                    .table_schemas
417                    .get_mut(name)
418                    .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
419
420                let constraint_count = schema.constraints.len() + 1;
421                let (name_suffix, constraint_type) = match constraint {
422                    CreateTableConstraint::PrimaryKey { columns } => {
423                        // Check if primary key already exists
424                        if schema.primary_key.is_some() {
425                            return Err(ExecutorError::InvalidOperation(format!(
426                                "Table '{}' already has a primary key",
427                                name
428                            )));
429                        }
430                        schema.primary_key = Some(columns.clone());
431                        (
432                            "pk",
433                            StoredConstraintType::PrimaryKey {
434                                columns: columns.clone(),
435                            },
436                        )
437                    }
438                    CreateTableConstraint::Unique { columns } => {
439                        // Validate columns exist
440                        for col in columns {
441                            if !schema.columns.iter().any(|c| c.name == *col) {
442                                return Err(ExecutorError::ColumnNotFound(col.clone()));
443                            }
444                        }
445                        (
446                            "uq",
447                            StoredConstraintType::Unique {
448                                columns: columns.clone(),
449                            },
450                        )
451                    }
452                    CreateTableConstraint::ForeignKey {
453                        columns,
454                        ref_table,
455                        ref_columns,
456                    } => {
457                        // Validate columns exist
458                        for col in columns {
459                            if !schema.columns.iter().any(|c| c.name == *col) {
460                                return Err(ExecutorError::ColumnNotFound(col.clone()));
461                            }
462                        }
463                        // ref_table validated above
464                        (
465                            "fk",
466                            StoredConstraintType::ForeignKey {
467                                columns: columns.clone(),
468                                ref_table: ref_table.clone(),
469                                ref_columns: ref_columns.clone(),
470                            },
471                        )
472                    }
473                    CreateTableConstraint::Check { expression } => (
474                        "ck",
475                        StoredConstraintType::Check {
476                            expression_text: format!("{:?}", expression),
477                        },
478                    ),
479                };
480                schema.constraints.push(StoredConstraint {
481                    name: format!("{}_{}{}", name, name_suffix, constraint_count),
482                    constraint_type,
483                });
484                return Ok(());
485            }
486            PlanAlterOperation::DropConstraint {
487                name: constraint_name,
488            } => {
489                let pos = schema
490                    .constraints
491                    .iter()
492                    .position(|c| c.name == *constraint_name);
493                match pos {
494                    Some(idx) => {
495                        let removed = schema.constraints.remove(idx);
496                        // If dropping primary key constraint, also clear primary_key field
497                        if matches!(
498                            removed.constraint_type,
499                            StoredConstraintType::PrimaryKey { .. }
500                        ) {
501                            schema.primary_key = None;
502                        }
503                    }
504                    None => {
505                        return Err(ExecutorError::InvalidOperation(format!(
506                            "Constraint '{}' not found on table '{}'",
507                            constraint_name, name
508                        )));
509                    }
510                }
511            }
512        }
513
514        Ok(())
515    }
516
517    /// Create an index.
518    pub fn create_index(
519        &mut self,
520        name: String,
521        table: String,
522        columns: Vec<String>,
523        unique: bool,
524        if_not_exists: bool,
525    ) -> ExecutorResult<()> {
526        let table_data = self
527            .tables
528            .get(&table)
529            .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?
530            .clone();
531
532        let indexes = self.indexes.entry(table.clone()).or_default();
533
534        // Check if index already exists
535        if indexes.iter().any(|idx| idx.name == name) {
536            if if_not_exists {
537                return Ok(());
538            }
539            return Err(ExecutorError::InvalidOperation(format!(
540                "Index '{}' already exists",
541                name
542            )));
543        }
544
545        // Store metadata
546        indexes.push(IndexSchema {
547            name: name.clone(),
548            table: table.clone(),
549            columns: columns.clone(),
550            unique,
551        });
552
553        // Create actual index structure
554        let index_manager = self
555            .table_indexes
556            .entry(table.clone())
557            .or_insert_with(|| Arc::new(TableIndexManager::new(table.clone())));
558
559        // Use B-tree by default (supports range queries)
560        Arc::get_mut(index_manager)
561            .ok_or_else(|| {
562                ExecutorError::Internal("Cannot modify shared index manager".to_string())
563            })?
564            .create_index(name.clone(), columns.clone(), unique, IndexType::BTree)?;
565
566        // Populate the index with existing data
567        let table_guard = table_data
568            .read()
569            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
570
571        let index_manager = self
572            .table_indexes
573            .get(&table)
574            .expect("index_manager was just inserted for this table");
575        for (row_id, row) in table_guard.rows.iter().enumerate() {
576            let column_values: HashMap<String, Value> = table_guard
577                .columns
578                .iter()
579                .zip(row.values.iter())
580                .map(|(col, val)| (col.clone(), val.clone()))
581                .collect();
582
583            // Build index key from specified columns
584            let key_values: Vec<crate::index::IndexValue> = columns
585                .iter()
586                .map(|col| {
587                    column_values
588                        .get(col)
589                        .map(IndexKey::from_value)
590                        .unwrap_or(crate::index::IndexValue::Null)
591                })
592                .collect();
593            let key = IndexKey::new(key_values);
594
595            // Insert into the index using public API
596            index_manager.insert_into_index(&name, key, row_id)?;
597        }
598
599        Ok(())
600    }
601
602    /// Drop an index.
603    pub fn drop_index(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
604        let mut found = false;
605        let mut table_name = None;
606
607        // Find and remove from metadata
608        for (tbl, indexes) in self.indexes.iter_mut() {
609            if let Some(pos) = indexes.iter().position(|idx| idx.name == name) {
610                indexes.remove(pos);
611                table_name = Some(tbl.clone());
612                found = true;
613                break;
614            }
615        }
616
617        // Remove from actual index structure
618        if let Some(tbl) = table_name {
619            if let Some(manager) = self.table_indexes.get_mut(&tbl) {
620                if let Some(m) = Arc::get_mut(manager) {
621                    let _ = m.drop_index(name);
622                }
623            }
624        }
625
626        if !found && !if_exists {
627            return Err(ExecutorError::InvalidOperation(format!(
628                "Index '{}' not found",
629                name
630            )));
631        }
632
633        Ok(())
634    }
635
636    /// Get the index manager for a table.
637    pub fn get_index_manager(&self, table: &str) -> Option<Arc<TableIndexManager>> {
638        self.table_indexes.get(table).cloned()
639    }
640
641    /// Get index metadata for a table.
642    pub fn get_indexes(&self, table: &str) -> Option<&Vec<IndexSchema>> {
643        self.indexes.get(table)
644    }
645
646    /// Get table schema.
647    pub fn get_table_schema(&self, name: &str) -> Option<&TableSchema> {
648        self.table_schemas.get(name)
649    }
650
651    /// List all table names.
652    pub fn list_tables(&self) -> Vec<String> {
653        self.tables.keys().cloned().collect()
654    }
655
656    // ==========================================================================
657    // Persistence Operations
658    // ==========================================================================
659
660    /// Create a serializable snapshot of the execution context.
661    pub fn to_snapshot(&self) -> ExecutionContextSnapshot {
662        let tables: Vec<TableData> = self
663            .tables
664            .values()
665            .filter_map(|t| t.read().ok().map(|t| t.clone()))
666            .collect();
667
668        ExecutionContextSnapshot {
669            tables,
670            schemas: self.table_schemas.values().cloned().collect(),
671            indexes: self.indexes.clone(),
672        }
673    }
674
675    /// Restore execution context from a snapshot.
676    pub fn from_snapshot(snapshot: ExecutionContextSnapshot) -> Self {
677        let mut ctx = Self::new();
678
679        // Restore schemas first
680        for schema in snapshot.schemas {
681            ctx.table_schemas.insert(schema.name.clone(), schema);
682        }
683
684        // Restore tables
685        for table in snapshot.tables {
686            ctx.tables
687                .insert(table.name.clone(), Arc::new(RwLock::new(table)));
688        }
689
690        // Store index metadata
691        ctx.indexes = snapshot.indexes.clone();
692
693        // Rebuild actual index structures from metadata
694        for (table_name, index_schemas) in snapshot.indexes {
695            for index_schema in index_schemas {
696                // Rebuild each index
697                let _ = ctx.rebuild_index(
698                    &index_schema.name,
699                    &table_name,
700                    &index_schema.columns,
701                    index_schema.unique,
702                );
703            }
704        }
705
706        ctx
707    }
708
709    /// Restore this context in-place from a snapshot (used for ROLLBACK).
710    /// Resets MVCC version_clock and snapshot_version to ensure consistency.
711    pub fn restore_from_snapshot(&mut self, snapshot: ExecutionContextSnapshot) {
712        self.tables.clear();
713        self.table_schemas.clear();
714        self.indexes.clear();
715        self.table_indexes.clear();
716        // Reset MVCC state to pre-transaction baseline
717        self.snapshot_version = None;
718
719        for schema in snapshot.schemas {
720            self.table_schemas.insert(schema.name.clone(), schema);
721        }
722        for table in snapshot.tables {
723            self.tables
724                .insert(table.name.clone(), Arc::new(RwLock::new(table)));
725        }
726        self.indexes = snapshot.indexes.clone();
727        for (table_name, index_schemas) in snapshot.indexes {
728            for index_schema in index_schemas {
729                let _ = self.rebuild_index(
730                    &index_schema.name,
731                    &table_name,
732                    &index_schema.columns,
733                    index_schema.unique,
734                );
735            }
736        }
737    }
738
739    /// Rebuild an index from existing table data.
740    fn rebuild_index(
741        &mut self,
742        name: &str,
743        table: &str,
744        columns: &[String],
745        unique: bool,
746    ) -> ExecutorResult<()> {
747        let table_data = self
748            .tables
749            .get(table)
750            .ok_or_else(|| ExecutorError::TableNotFound(table.to_string()))?
751            .clone();
752
753        // Create index manager if needed
754        let index_manager = self
755            .table_indexes
756            .entry(table.to_string())
757            .or_insert_with(|| Arc::new(TableIndexManager::new(table.to_string())));
758
759        // Create the index structure
760        if let Some(m) = Arc::get_mut(index_manager) {
761            m.create_index(name.to_string(), columns.to_vec(), unique, IndexType::BTree)?;
762        } else {
763            return Err(ExecutorError::Internal(
764                "Cannot modify shared index manager".to_string(),
765            ));
766        }
767
768        // Populate with existing data
769        let table_guard = table_data
770            .read()
771            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
772
773        let index_manager = self
774            .table_indexes
775            .get(table)
776            .expect("index_manager was just inserted for this table");
777        for (row_id, row) in table_guard.rows.iter().enumerate() {
778            let column_values: HashMap<String, Value> = table_guard
779                .columns
780                .iter()
781                .zip(row.values.iter())
782                .map(|(col, val)| (col.clone(), val.clone()))
783                .collect();
784
785            let key_values: Vec<crate::index::IndexValue> = columns
786                .iter()
787                .map(|col| {
788                    column_values
789                        .get(col)
790                        .map(IndexKey::from_value)
791                        .unwrap_or(crate::index::IndexValue::Null)
792                })
793                .collect();
794            let key = IndexKey::new(key_values);
795
796            index_manager.insert_into_index(name, key, row_id)?;
797        }
798
799        Ok(())
800    }
801
802    /// Save the execution context to a file.
803    pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
804        let snapshot = self.to_snapshot();
805        let json = serde_json::to_string_pretty(&snapshot)
806            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
807        std::fs::write(path, json)
808    }
809
810    /// Load the execution context from a file.
811    pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
812        let json = std::fs::read_to_string(path)?;
813        let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
814            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
815        Ok(Self::from_snapshot(snapshot))
816    }
817
818    /// Merge data from a file into the current context (for loading on startup).
819    pub fn merge_from_file(&mut self, path: &std::path::Path) -> std::io::Result<()> {
820        let json = std::fs::read_to_string(path)?;
821        let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
822            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
823
824        // Merge schemas
825        for schema in snapshot.schemas {
826            self.table_schemas.insert(schema.name.clone(), schema);
827        }
828
829        // Merge tables
830        for table in snapshot.tables {
831            self.tables
832                .insert(table.name.clone(), Arc::new(RwLock::new(table)));
833        }
834
835        // Merge indexes
836        for (table, idxs) in snapshot.indexes {
837            self.indexes.insert(table, idxs);
838        }
839
840        Ok(())
841    }
842
843    // ==========================================================================
844    // DML Operations
845    // ==========================================================================
846
847    /// Insert rows into a table.
848    pub fn insert_rows(
849        &self,
850        table_name: &str,
851        columns: &[String],
852        rows: Vec<Vec<Value>>,
853    ) -> ExecutorResult<u64> {
854        let table = self
855            .get_table(table_name)
856            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
857
858        let mut table_data = table
859            .write()
860            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
861
862        // Get schema to access column defaults
863        let schema = self
864            .table_schemas
865            .get(table_name)
866            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
867
868        // If no columns specified, use all columns in order
869        let target_columns: Vec<String> = if columns.is_empty() {
870            table_data.columns.clone()
871        } else {
872            columns.to_vec()
873        };
874
875        // Build column index mapping
876        let mut column_indices: Vec<usize> = Vec::new();
877        for col in &target_columns {
878            let idx = table_data
879                .columns
880                .iter()
881                .position(|c| c == col)
882                .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
883            column_indices.push(idx);
884        }
885
886        let mut inserted = 0u64;
887
888        // Build default values for each column
889        let defaults: Vec<Value> = schema
890            .columns
891            .iter()
892            .map(|col| col.default.clone().unwrap_or(Value::Null))
893            .collect();
894
895        // Identify constraint columns for validation
896        let pk_columns: Vec<usize> = schema
897            .primary_key
898            .as_ref()
899            .map(|pk| {
900                pk.iter()
901                    .filter_map(|c| table_data.columns.iter().position(|tc| tc == c))
902                    .collect()
903            })
904            .unwrap_or_default();
905        let unique_columns: Vec<Vec<usize>> = schema
906            .constraints
907            .iter()
908            .filter_map(|c| match &c.constraint_type {
909                StoredConstraintType::Unique { columns: cols } => Some(
910                    cols.iter()
911                        .filter_map(|c| table_data.columns.iter().position(|tc| tc == c))
912                        .collect(),
913                ),
914                _ => None,
915            })
916            .collect();
917        let not_null_columns: Vec<usize> = schema
918            .columns
919            .iter()
920            .enumerate()
921            .filter(|(_, c)| !c.nullable)
922            .map(|(i, _)| i)
923            .collect();
924
925        for row_values in rows {
926            // Create a new row initialized with defaults (or Null if no default)
927            let mut new_row: Vec<Value> = defaults.clone();
928
929            // Fill in the provided values
930            for (i, &col_idx) in column_indices.iter().enumerate() {
931                if let Some(value) = row_values.get(i) {
932                    new_row[col_idx] = value.clone();
933                }
934            }
935
936            // Enforce NOT NULL constraints
937            for &col_idx in &not_null_columns {
938                if matches!(new_row.get(col_idx), Some(Value::Null) | None) {
939                    let col_name = table_data.columns.get(col_idx).cloned().unwrap_or_default();
940                    return Err(ExecutorError::InvalidOperation(format!(
941                        "NOT NULL constraint violated for column '{}'",
942                        col_name
943                    )));
944                }
945            }
946
947            // Enforce PRIMARY KEY uniqueness
948            if !pk_columns.is_empty() {
949                let pk_vals: Vec<&Value> = pk_columns.iter().map(|&i| &new_row[i]).collect();
950                for (ri, existing) in table_data.rows.iter().enumerate() {
951                    if !self.is_row_visible(&table_data, ri) {
952                        continue;
953                    }
954                    let existing_pk: Vec<&Value> =
955                        pk_columns.iter().map(|&i| &existing.values[i]).collect();
956                    if pk_vals == existing_pk {
957                        return Err(ExecutorError::InvalidOperation(format!(
958                            "PRIMARY KEY constraint violated: duplicate key in table '{}'",
959                            table_name
960                        )));
961                    }
962                }
963            }
964
965            // Enforce UNIQUE constraints
966            for unique_cols in &unique_columns {
967                let vals: Vec<&Value> = unique_cols.iter().map(|&i| &new_row[i]).collect();
968                // Skip uniqueness check if any value is NULL (SQL standard)
969                if vals.iter().any(|v| matches!(v, Value::Null)) {
970                    continue;
971                }
972                for (ri, existing) in table_data.rows.iter().enumerate() {
973                    if !self.is_row_visible(&table_data, ri) {
974                        continue;
975                    }
976                    let existing_vals: Vec<&Value> =
977                        unique_cols.iter().map(|&i| &existing.values[i]).collect();
978                    if vals == existing_vals {
979                        return Err(ExecutorError::InvalidOperation(
980                            "UNIQUE constraint violated".to_string(),
981                        ));
982                    }
983                }
984            }
985
986            table_data.rows.push(Row { values: new_row });
987            table_data.row_created_version.push(self.version_clock);
988            table_data.row_deleted_version.push(0);
989            inserted += 1;
990        }
991
992        Ok(inserted)
993    }
994
995    /// Update rows in a table.
996    pub fn update_rows(
997        &self,
998        table_name: &str,
999        assignments: &[(String, Value)],
1000        predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
1001    ) -> ExecutorResult<u64> {
1002        let table = self
1003            .get_table(table_name)
1004            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
1005
1006        let mut table_data = table
1007            .write()
1008            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1009        let columns = table_data.columns.clone();
1010
1011        // Build assignment index mapping
1012        let mut assignment_indices: Vec<(usize, Value)> = Vec::new();
1013        for (col, val) in assignments {
1014            let idx = columns
1015                .iter()
1016                .position(|c| c == col)
1017                .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
1018            assignment_indices.push((idx, val.clone()));
1019        }
1020
1021        let mut updated = 0u64;
1022
1023        for i in 0..table_data.rows.len() {
1024            if !self.is_row_visible(&table_data, i) {
1025                continue;
1026            }
1027            let should_update = predicate
1028                .map(|p| p(&table_data.rows[i], &columns))
1029                .unwrap_or(true);
1030            if should_update {
1031                for (col_idx, value) in &assignment_indices {
1032                    table_data.rows[i].values[*col_idx] = value.clone();
1033                }
1034                updated += 1;
1035            }
1036        }
1037
1038        Ok(updated)
1039    }
1040
1041    /// Delete rows from a table.
1042    pub fn delete_rows(
1043        &self,
1044        table_name: &str,
1045        predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
1046    ) -> ExecutorResult<u64> {
1047        let table = self
1048            .get_table(table_name)
1049            .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
1050
1051        let mut table_data = table
1052            .write()
1053            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1054        let columns = table_data.columns.clone();
1055        let ver = self.version_clock;
1056
1057        let mut deleted = 0u64;
1058
1059        if let Some(pred) = predicate {
1060            for i in 0..table_data.rows.len() {
1061                // Only consider visible rows
1062                if !self.is_row_visible(&table_data, i) {
1063                    continue;
1064                }
1065                if pred(&table_data.rows[i], &columns) {
1066                    // Mark as deleted at current version
1067                    if i < table_data.row_deleted_version.len() {
1068                        table_data.row_deleted_version[i] = ver;
1069                    }
1070                    deleted += 1;
1071                }
1072            }
1073        } else {
1074            for i in 0..table_data.rows.len() {
1075                if self.is_row_visible(&table_data, i) {
1076                    if i < table_data.row_deleted_version.len() {
1077                        table_data.row_deleted_version[i] = ver;
1078                    }
1079                    deleted += 1;
1080                }
1081            }
1082        }
1083
1084        // If no active snapshot, physically remove deleted rows (compaction)
1085        if self.snapshot_version.is_none() {
1086            let mut i = 0;
1087            while i < table_data.rows.len() {
1088                let del_ver = table_data.row_deleted_version.get(i).copied().unwrap_or(0);
1089                if del_ver > 0 {
1090                    table_data.rows.remove(i);
1091                    if i < table_data.row_created_version.len() {
1092                        table_data.row_created_version.remove(i);
1093                    }
1094                    if i < table_data.row_deleted_version.len() {
1095                        table_data.row_deleted_version.remove(i);
1096                    }
1097                } else {
1098                    i += 1;
1099                }
1100            }
1101        }
1102
1103        Ok(deleted)
1104    }
1105}
1106
1107impl Default for ExecutionContext {
1108    fn default() -> Self {
1109        Self::new()
1110    }
1111}
1112
1113// =============================================================================
1114// Result Batch
1115// =============================================================================
1116
1117/// A batch of result rows.
1118#[derive(Debug, Clone)]
1119pub struct ResultBatch {
1120    pub columns: Vec<String>,
1121    pub rows: Vec<Row>,
1122}
1123
1124impl ResultBatch {
1125    pub fn new(columns: Vec<String>) -> Self {
1126        Self {
1127            columns,
1128            rows: Vec::new(),
1129        }
1130    }
1131
1132    pub fn with_rows(columns: Vec<String>, rows: Vec<Row>) -> Self {
1133        Self { columns, rows }
1134    }
1135
1136    pub fn is_empty(&self) -> bool {
1137        self.rows.is_empty()
1138    }
1139
1140    pub fn len(&self) -> usize {
1141        self.rows.len()
1142    }
1143}
1144
1145// =============================================================================
1146// Query Result
1147// =============================================================================
1148
1149/// Complete query result.
1150#[derive(Debug, Clone)]
1151pub struct QueryResult {
1152    pub columns: Vec<String>,
1153    pub rows: Vec<Row>,
1154    pub rows_affected: u64,
1155}
1156
1157impl QueryResult {
1158    pub fn new(columns: Vec<String>, rows: Vec<Row>) -> Self {
1159        let rows_affected = rows.len() as u64;
1160        Self {
1161            columns,
1162            rows,
1163            rows_affected,
1164        }
1165    }
1166
1167    pub fn empty() -> Self {
1168        Self {
1169            columns: Vec::new(),
1170            rows: Vec::new(),
1171            rows_affected: 0,
1172        }
1173    }
1174}
1175
1176// =============================================================================
1177// Executor
1178// =============================================================================
1179
1180/// Query executor.
1181pub struct Executor {
1182    context: Arc<RwLock<ExecutionContext>>,
1183}
1184
1185impl Executor {
1186    pub fn new(context: ExecutionContext) -> Self {
1187        Self {
1188            context: Arc::new(RwLock::new(context)),
1189        }
1190    }
1191
1192    /// Create executor with a shared context (for persistent DDL across queries).
1193    pub fn with_shared_context(context: Arc<RwLock<ExecutionContext>>) -> Self {
1194        Self { context }
1195    }
1196
1197    /// Execute a query plan.
1198    pub fn execute(&self, plan: &QueryPlan) -> ExecutorResult<QueryResult> {
1199        self.execute_internal(&plan.root)
1200    }
1201
1202    /// Execute a query plan with bound parameters.
1203    /// Parameters are indexed starting from 1 (e.g., $1, $2, etc.)
1204    pub fn execute_with_params(
1205        &self,
1206        plan: &QueryPlan,
1207        params: &[Value],
1208    ) -> ExecutorResult<QueryResult> {
1209        // Substitute placeholders with actual parameter values
1210        let bound_plan = substitute_parameters(&plan.root, params)?;
1211        self.execute_internal(&bound_plan)
1212    }
1213
1214    fn execute_internal(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
1215        match root {
1216            // DDL operations
1217            PlanNode::CreateTable(node) => self.execute_create_table(node),
1218            PlanNode::DropTable(node) => self.execute_drop_table(node),
1219            PlanNode::AlterTable(node) => self.execute_alter_table(node),
1220            PlanNode::CreateIndex(node) => self.execute_create_index(node),
1221            PlanNode::DropIndex(node) => self.execute_drop_index(node),
1222
1223            // DML operations
1224            PlanNode::Insert(node) => self.execute_insert(node),
1225            PlanNode::Update(node) => self.execute_update(node),
1226            PlanNode::Delete(node) => self.execute_delete(node),
1227
1228            // Transaction control — return status message, actual logic in QueryEngine
1229            PlanNode::BeginTransaction => Ok(QueryResult {
1230                columns: vec!["status".to_string()],
1231                rows: vec![Row {
1232                    values: vec![Value::String("BEGIN".to_string())],
1233                }],
1234                rows_affected: 0,
1235            }),
1236            PlanNode::CommitTransaction => Ok(QueryResult {
1237                columns: vec!["status".to_string()],
1238                rows: vec![Row {
1239                    values: vec![Value::String("COMMIT".to_string())],
1240                }],
1241                rows_affected: 0,
1242            }),
1243            PlanNode::RollbackTransaction => Ok(QueryResult {
1244                columns: vec!["status".to_string()],
1245                rows: vec![Row {
1246                    values: vec![Value::String("ROLLBACK".to_string())],
1247                }],
1248                rows_affected: 0,
1249            }),
1250
1251            // Query operations (SELECT)
1252            _ => self.execute_query(root),
1253        }
1254    }
1255
1256    /// Execute a SELECT query.
1257    /// Maximum rows returned by a single SELECT query (safety limit).
1258    const MAX_RESULT_ROWS: usize = 100_000;
1259
1260    fn execute_query(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
1261        let context = self
1262            .context
1263            .read()
1264            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1265        let mut operator = self.create_operator(root, &context)?;
1266        let mut all_rows = Vec::new();
1267        let mut columns = Vec::new();
1268
1269        while let Some(batch) = operator.next_batch()? {
1270            if columns.is_empty() {
1271                columns = batch.columns.clone();
1272            }
1273            all_rows.extend(batch.rows);
1274            if all_rows.len() > Self::MAX_RESULT_ROWS {
1275                all_rows.truncate(Self::MAX_RESULT_ROWS);
1276                break;
1277            }
1278        }
1279
1280        Ok(QueryResult::new(columns, all_rows))
1281    }
1282
1283    /// Execute CREATE TABLE.
1284    fn execute_create_table(&self, node: &CreateTableNode) -> ExecutorResult<QueryResult> {
1285        let columns: Vec<ColumnSchema> = node
1286            .columns
1287            .iter()
1288            .map(|col| {
1289                let default = col
1290                    .default
1291                    .as_ref()
1292                    .map(evaluate_default_expression)
1293                    .transpose()?;
1294                Ok(ColumnSchema {
1295                    name: col.name.clone(),
1296                    data_type: col.data_type.clone(),
1297                    nullable: col.nullable,
1298                    default,
1299                })
1300            })
1301            .collect::<ExecutorResult<Vec<_>>>()?;
1302
1303        // Extract primary key from constraints
1304        let primary_key = node
1305            .constraints
1306            .iter()
1307            .find_map(|c| {
1308                if let CreateTableConstraint::PrimaryKey { columns } = c {
1309                    Some(columns.clone())
1310                } else {
1311                    None
1312                }
1313            })
1314            .or_else(|| {
1315                // Check column-level primary key
1316                let pk_cols: Vec<String> = node
1317                    .columns
1318                    .iter()
1319                    .filter(|c| c.primary_key)
1320                    .map(|c| c.name.clone())
1321                    .collect();
1322                if pk_cols.is_empty() {
1323                    None
1324                } else {
1325                    Some(pk_cols)
1326                }
1327            });
1328
1329        // Convert plan constraints to stored constraints with auto-generated names
1330        let mut stored_constraints = Vec::new();
1331        let mut constraint_counter = 0;
1332        for c in &node.constraints {
1333            constraint_counter += 1;
1334            let (name_suffix, constraint_type) = match c {
1335                CreateTableConstraint::PrimaryKey { columns } => (
1336                    "pk",
1337                    StoredConstraintType::PrimaryKey {
1338                        columns: columns.clone(),
1339                    },
1340                ),
1341                CreateTableConstraint::Unique { columns } => (
1342                    "uq",
1343                    StoredConstraintType::Unique {
1344                        columns: columns.clone(),
1345                    },
1346                ),
1347                CreateTableConstraint::ForeignKey {
1348                    columns,
1349                    ref_table,
1350                    ref_columns,
1351                } => (
1352                    "fk",
1353                    StoredConstraintType::ForeignKey {
1354                        columns: columns.clone(),
1355                        ref_table: ref_table.clone(),
1356                        ref_columns: ref_columns.clone(),
1357                    },
1358                ),
1359                CreateTableConstraint::Check { expression } => (
1360                    "ck",
1361                    StoredConstraintType::Check {
1362                        expression_text: format!("{:?}", expression),
1363                    },
1364                ),
1365            };
1366            stored_constraints.push(StoredConstraint {
1367                name: format!("{}_{}{}", node.table_name, name_suffix, constraint_counter),
1368                constraint_type,
1369            });
1370        }
1371
1372        // Add column-level unique constraints
1373        for col in &node.columns {
1374            if col.unique {
1375                constraint_counter += 1;
1376                stored_constraints.push(StoredConstraint {
1377                    name: format!("{}_{}_uq{}", node.table_name, col.name, constraint_counter),
1378                    constraint_type: StoredConstraintType::Unique {
1379                        columns: vec![col.name.clone()],
1380                    },
1381                });
1382            }
1383        }
1384
1385        self.context
1386            .write()
1387            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1388            .create_table(
1389                node.table_name.clone(),
1390                columns,
1391                primary_key,
1392                stored_constraints,
1393                node.if_not_exists,
1394            )?;
1395
1396        Ok(QueryResult {
1397            columns: vec!["result".to_string()],
1398            rows: vec![Row {
1399                values: vec![Value::String(format!(
1400                    "Table '{}' created",
1401                    node.table_name
1402                ))],
1403            }],
1404            rows_affected: 0,
1405        })
1406    }
1407
1408    /// Execute DROP TABLE.
1409    fn execute_drop_table(&self, node: &DropTableNode) -> ExecutorResult<QueryResult> {
1410        self.context
1411            .write()
1412            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1413            .drop_table(&node.table_name, node.if_exists)?;
1414
1415        Ok(QueryResult {
1416            columns: vec!["result".to_string()],
1417            rows: vec![Row {
1418                values: vec![Value::String(format!(
1419                    "Table '{}' dropped",
1420                    node.table_name
1421                ))],
1422            }],
1423            rows_affected: 0,
1424        })
1425    }
1426
1427    /// Execute ALTER TABLE.
1428    fn execute_alter_table(&self, node: &AlterTableNode) -> ExecutorResult<QueryResult> {
1429        let mut ctx = self
1430            .context
1431            .write()
1432            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1433
1434        for op in &node.operations {
1435            ctx.alter_table(&node.table_name, op)?;
1436        }
1437
1438        let op_count = node.operations.len();
1439        let msg = if op_count == 1 {
1440            format!("Table '{}' altered", node.table_name)
1441        } else {
1442            format!(
1443                "Table '{}' altered ({} operations)",
1444                node.table_name, op_count
1445            )
1446        };
1447
1448        Ok(QueryResult {
1449            columns: vec!["result".to_string()],
1450            rows: vec![Row {
1451                values: vec![Value::String(msg)],
1452            }],
1453            rows_affected: 0,
1454        })
1455    }
1456
1457    /// Execute CREATE INDEX.
1458    fn execute_create_index(&self, node: &CreateIndexNode) -> ExecutorResult<QueryResult> {
1459        self.context
1460            .write()
1461            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1462            .create_index(
1463                node.index_name.clone(),
1464                node.table_name.clone(),
1465                node.columns.clone(),
1466                node.unique,
1467                node.if_not_exists,
1468            )?;
1469
1470        Ok(QueryResult {
1471            columns: vec!["result".to_string()],
1472            rows: vec![Row {
1473                values: vec![Value::String(format!(
1474                    "Index '{}' created",
1475                    node.index_name
1476                ))],
1477            }],
1478            rows_affected: 0,
1479        })
1480    }
1481
1482    /// Execute DROP INDEX.
1483    fn execute_drop_index(&self, node: &DropIndexNode) -> ExecutorResult<QueryResult> {
1484        self.context
1485            .write()
1486            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1487            .drop_index(&node.index_name, node.if_exists)?;
1488
1489        Ok(QueryResult {
1490            columns: vec!["result".to_string()],
1491            rows: vec![Row {
1492                values: vec![Value::String(format!(
1493                    "Index '{}' dropped",
1494                    node.index_name
1495                ))],
1496            }],
1497            rows_affected: 0,
1498        })
1499    }
1500
1501    /// Execute INSERT.
1502    fn execute_insert(&self, node: &InsertNode) -> ExecutorResult<QueryResult> {
1503        let rows: Vec<Vec<Value>> = match &node.source {
1504            InsertPlanSource::Values(values) => {
1505                // Evaluate all value expressions
1506                let empty_row = Row { values: vec![] };
1507                let empty_columns: Vec<String> = vec![];
1508
1509                values
1510                    .iter()
1511                    .map(|row_exprs| {
1512                        row_exprs
1513                            .iter()
1514                            .map(|expr| evaluate_expression(expr, &empty_row, &empty_columns))
1515                            .collect::<ExecutorResult<Vec<_>>>()
1516                    })
1517                    .collect::<ExecutorResult<Vec<_>>>()?
1518            }
1519            InsertPlanSource::Query(subquery) => {
1520                // Execute the subquery and use its results as the insert data
1521                let context = self
1522                    .context
1523                    .read()
1524                    .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1525                let mut operator = self.create_operator(subquery, &context)?;
1526                let mut all_rows = Vec::new();
1527
1528                while let Some(batch) = operator.next_batch()? {
1529                    for row in batch.rows {
1530                        all_rows.push(row.values);
1531                    }
1532                }
1533
1534                all_rows
1535            }
1536        };
1537
1538        let inserted = self
1539            .context
1540            .read()
1541            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1542            .insert_rows(&node.table_name, &node.columns, rows)?;
1543
1544        Ok(QueryResult {
1545            columns: vec!["rows_affected".to_string()],
1546            rows: vec![Row {
1547                values: vec![Value::Integer(inserted as i64)],
1548            }],
1549            rows_affected: inserted,
1550        })
1551    }
1552
1553    /// Execute UPDATE.
1554    fn execute_update(&self, node: &UpdateNode) -> ExecutorResult<QueryResult> {
1555        let context = self
1556            .context
1557            .read()
1558            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1559
1560        // Get table columns for expression evaluation
1561        let table = context
1562            .get_table(&node.table_name)
1563            .ok_or_else(|| ExecutorError::TableNotFound(node.table_name.clone()))?;
1564        let table_data = table
1565            .read()
1566            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1567        let _columns = table_data.columns.clone();
1568        drop(table_data);
1569
1570        // Evaluate assignment values
1571        let empty_row = Row { values: vec![] };
1572        let assignments: Vec<(String, Value)> = node
1573            .assignments
1574            .iter()
1575            .map(|(col, expr)| {
1576                let value = evaluate_expression(expr, &empty_row, &[])?;
1577                Ok((col.clone(), value))
1578            })
1579            .collect::<ExecutorResult<Vec<_>>>()?;
1580
1581        // Create predicate closure if where clause exists
1582        let where_clause = node.where_clause.clone();
1583        let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1584            Box::new(move |row: &Row, cols: &[String]| {
1585                evaluate_expression(&wc, row, cols)
1586                    .map(|v| matches!(v, Value::Boolean(true)))
1587                    .unwrap_or(false)
1588            }) as Box<dyn Fn(&Row, &[String]) -> bool>
1589        });
1590
1591        let updated = context.update_rows(
1592            &node.table_name,
1593            &assignments,
1594            predicate.as_ref().map(|p| p.as_ref()),
1595        )?;
1596
1597        Ok(QueryResult {
1598            columns: vec!["rows_affected".to_string()],
1599            rows: vec![Row {
1600                values: vec![Value::Integer(updated as i64)],
1601            }],
1602            rows_affected: updated,
1603        })
1604    }
1605
1606    /// Execute DELETE.
1607    fn execute_delete(&self, node: &DeleteNode) -> ExecutorResult<QueryResult> {
1608        let context = self
1609            .context
1610            .read()
1611            .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1612
1613        // Create predicate closure if where clause exists
1614        let where_clause = node.where_clause.clone();
1615        let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1616            Box::new(move |row: &Row, cols: &[String]| {
1617                evaluate_expression(&wc, row, cols)
1618                    .map(|v| matches!(v, Value::Boolean(true)))
1619                    .unwrap_or(false)
1620            }) as Box<dyn Fn(&Row, &[String]) -> bool>
1621        });
1622
1623        let deleted =
1624            context.delete_rows(&node.table_name, predicate.as_ref().map(|p| p.as_ref()))?;
1625
1626        Ok(QueryResult {
1627            columns: vec!["rows_affected".to_string()],
1628            rows: vec![Row {
1629                values: vec![Value::Integer(deleted as i64)],
1630            }],
1631            rows_affected: deleted,
1632        })
1633    }
1634
1635    /// Create an operator tree from a plan node.
1636    fn create_operator<'a>(
1637        &'a self,
1638        node: &PlanNode,
1639        context: &'a ExecutionContext,
1640    ) -> ExecutorResult<Box<dyn Operator + 'a>> {
1641        match node {
1642            PlanNode::Scan(scan) => Ok(Box::new(ScanOperator::new(scan.clone(), context)?)),
1643
1644            PlanNode::Filter(filter) => {
1645                let input = self.create_operator(&filter.input, context)?;
1646                Ok(Box::new(FilterOperator::new(
1647                    input,
1648                    filter.predicate.clone(),
1649                )))
1650            }
1651
1652            PlanNode::Project(project) => {
1653                let input = self.create_operator(&project.input, context)?;
1654                Ok(Box::new(ProjectOperator::new(
1655                    input,
1656                    project.expressions.clone(),
1657                )))
1658            }
1659
1660            PlanNode::Join(join) => {
1661                let left = self.create_operator(&join.left, context)?;
1662                let right = self.create_operator(&join.right, context)?;
1663                match join.strategy {
1664                    JoinStrategy::HashJoin => Ok(Box::new(HashJoinOperator::new(
1665                        left,
1666                        right,
1667                        join.join_type,
1668                        join.condition.clone(),
1669                    )?)),
1670                    _ => Ok(Box::new(JoinOperator::new(
1671                        left,
1672                        right,
1673                        join.join_type,
1674                        join.condition.clone(),
1675                        join.strategy,
1676                    )?)),
1677                }
1678            }
1679
1680            PlanNode::Aggregate(agg) => {
1681                let input = self.create_operator(&agg.input, context)?;
1682                Ok(Box::new(AggregateOperator::new(
1683                    input,
1684                    agg.group_by.clone(),
1685                    agg.aggregates.clone(),
1686                )))
1687            }
1688
1689            PlanNode::Sort(sort) => {
1690                let input = self.create_operator(&sort.input, context)?;
1691                Ok(Box::new(SortOperator::new(input, sort.order_by.clone())))
1692            }
1693
1694            PlanNode::Limit(limit) => {
1695                let input = self.create_operator(&limit.input, context)?;
1696                Ok(Box::new(LimitOperator::new(
1697                    input,
1698                    limit.limit,
1699                    limit.offset,
1700                )))
1701            }
1702
1703            PlanNode::Empty => Ok(Box::new(EmptyOperator::new())),
1704
1705            PlanNode::SetOperation(set_op) => {
1706                let left = self.create_operator(&set_op.left, context)?;
1707                let right = self.create_operator(&set_op.right, context)?;
1708                Ok(Box::new(SetOperationOperator::new(left, right, set_op.op)))
1709            }
1710
1711            // DDL and DML nodes are handled in execute(), not here
1712            PlanNode::CreateTable(_)
1713            | PlanNode::DropTable(_)
1714            | PlanNode::AlterTable(_)
1715            | PlanNode::CreateIndex(_)
1716            | PlanNode::DropIndex(_)
1717            | PlanNode::Insert(_)
1718            | PlanNode::Update(_)
1719            | PlanNode::Delete(_)
1720            | PlanNode::BeginTransaction
1721            | PlanNode::CommitTransaction
1722            | PlanNode::RollbackTransaction => Err(ExecutorError::Internal(
1723                "DDL/DML/transaction nodes should not be in operator tree".to_string(),
1724            )),
1725        }
1726    }
1727}
1728
1729// =============================================================================
1730// Operator Trait
1731// =============================================================================
1732
1733/// Operator in the execution pipeline.
1734pub trait Operator {
1735    /// Get the next batch of results.
1736    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>>;
1737
1738    /// Get output column names.
1739    fn columns(&self) -> &[String];
1740}
1741
1742// =============================================================================
1743// Scan Operator
1744// =============================================================================
1745
1746struct ScanOperator {
1747    table: Arc<RwLock<TableData>>,
1748    columns: Vec<String>,
1749    position: usize,
1750    batch_size: usize,
1751    // Cache the visible rows to avoid repeated locking
1752    cached_rows: Option<Vec<Row>>,
1753    /// MVCC snapshot version (None = see all live rows)
1754    snapshot_version: Option<u64>,
1755}
1756
1757impl ScanOperator {
1758    fn new(scan: ScanNode, context: &ExecutionContext) -> ExecutorResult<Self> {
1759        let table = context
1760            .get_table(&scan.table_name)
1761            .ok_or_else(|| ExecutorError::TableNotFound(scan.table_name.clone()))?;
1762
1763        // Read columns under lock, then release lock before moving table into struct
1764        let columns = {
1765            let table_data = table
1766                .read()
1767                .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1768            if scan.columns.is_empty() {
1769                table_data.columns.clone()
1770            } else {
1771                scan.columns.clone()
1772            }
1773        }; // table_data guard dropped here
1774
1775        Ok(Self {
1776            table,
1777            columns,
1778            position: 0,
1779            batch_size: context.batch_size(),
1780            cached_rows: None,
1781            snapshot_version: context.snapshot_version,
1782        })
1783    }
1784}
1785
1786impl Operator for ScanOperator {
1787    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1788        // Cache visible rows on first access
1789        if self.cached_rows.is_none() {
1790            let table_data = self
1791                .table
1792                .read()
1793                .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1794            let mut visible = Vec::new();
1795            for i in 0..table_data.rows.len() {
1796                let created = table_data.row_created_version.get(i).copied().unwrap_or(0);
1797                let deleted = table_data.row_deleted_version.get(i).copied().unwrap_or(0);
1798                let vis = if let Some(snap) = self.snapshot_version {
1799                    created <= snap && (deleted == 0 || deleted > snap)
1800                } else {
1801                    deleted == 0
1802                };
1803                if vis {
1804                    visible.push(table_data.rows[i].clone());
1805                }
1806            }
1807            self.cached_rows = Some(visible);
1808        }
1809
1810        let rows = self
1811            .cached_rows
1812            .as_ref()
1813            .expect("cached_rows was just set to Some");
1814
1815        if self.position >= rows.len() {
1816            return Ok(None);
1817        }
1818
1819        let end = (self.position + self.batch_size).min(rows.len());
1820        let batch_rows: Vec<Row> = rows[self.position..end].to_vec();
1821        self.position = end;
1822
1823        Ok(Some(ResultBatch::with_rows(
1824            self.columns.clone(),
1825            batch_rows,
1826        )))
1827    }
1828
1829    fn columns(&self) -> &[String] {
1830        &self.columns
1831    }
1832}
1833
1834// =============================================================================
1835// Filter Operator
1836// =============================================================================
1837
1838struct FilterOperator<'a> {
1839    input: Box<dyn Operator + 'a>,
1840    predicate: PlanExpression,
1841    columns: Vec<String>,
1842}
1843
1844impl<'a> FilterOperator<'a> {
1845    fn new(input: Box<dyn Operator + 'a>, predicate: PlanExpression) -> Self {
1846        let columns = input.columns().to_vec();
1847        Self {
1848            input,
1849            predicate,
1850            columns,
1851        }
1852    }
1853
1854    fn evaluate_predicate(&self, row: &Row, columns: &[String]) -> ExecutorResult<bool> {
1855        let value = evaluate_expression(&self.predicate, row, columns)?;
1856        match value {
1857            Value::Boolean(b) => Ok(b),
1858            Value::Null => Ok(false),
1859            _ => Err(ExecutorError::TypeMismatch {
1860                expected: "boolean".to_string(),
1861                actual: format!("{:?}", value),
1862            }),
1863        }
1864    }
1865}
1866
1867impl<'a> Operator for FilterOperator<'a> {
1868    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1869        while let Some(batch) = self.input.next_batch()? {
1870            let filtered: Vec<Row> = batch
1871                .rows
1872                .into_iter()
1873                .filter(|row| {
1874                    self.evaluate_predicate(row, &batch.columns)
1875                        .unwrap_or(false)
1876                })
1877                .collect();
1878
1879            if !filtered.is_empty() {
1880                return Ok(Some(ResultBatch::with_rows(self.columns.clone(), filtered)));
1881            }
1882        }
1883        Ok(None)
1884    }
1885
1886    fn columns(&self) -> &[String] {
1887        &self.columns
1888    }
1889}
1890
1891// =============================================================================
1892// Project Operator
1893// =============================================================================
1894
1895struct ProjectOperator<'a> {
1896    input: Box<dyn Operator + 'a>,
1897    expressions: Vec<crate::planner::ProjectionExpr>,
1898    columns: Vec<String>,
1899    input_columns: Vec<String>,
1900}
1901
1902impl<'a> ProjectOperator<'a> {
1903    fn new(
1904        input: Box<dyn Operator + 'a>,
1905        expressions: Vec<crate::planner::ProjectionExpr>,
1906    ) -> Self {
1907        let input_columns = input.columns().to_vec();
1908
1909        // Expand wildcards in expressions and build column names
1910        let mut expanded_expressions = Vec::new();
1911        let mut columns = Vec::new();
1912
1913        for (i, proj_expr) in expressions.iter().enumerate() {
1914            // Check if this is a wildcard expression (SELECT *)
1915            if let PlanExpression::Column { name, table, .. } = &proj_expr.expr {
1916                if name == "*" {
1917                    // Expand to all input columns (optionally filtered by table prefix)
1918                    for input_col in &input_columns {
1919                        // For table-qualified wildcards (e.g., SELECT t.*), only include
1920                        // columns belonging to that table
1921                        if let Some(tbl) = table {
1922                            let prefix = format!("{}.", tbl);
1923                            if !input_col.starts_with(&prefix) && input_col != tbl {
1924                                // Also match unqualified names if the column count is unambiguous
1925                                // Skip columns that clearly belong to a different table
1926                                if input_col.contains('.') {
1927                                    continue;
1928                                }
1929                            }
1930                        }
1931                        expanded_expressions.push(crate::planner::ProjectionExpr {
1932                            expr: PlanExpression::Column {
1933                                table: None,
1934                                name: input_col.clone(),
1935                                data_type: DataType::Any,
1936                            },
1937                            alias: None,
1938                        });
1939                        columns.push(input_col.clone());
1940                    }
1941                    continue;
1942                }
1943            }
1944
1945            // Regular expression
1946            expanded_expressions.push(proj_expr.clone());
1947            let col_name = proj_expr.alias.clone().unwrap_or_else(|| {
1948                extract_column_name(&proj_expr.expr).unwrap_or_else(|| format!("column_{}", i))
1949            });
1950            columns.push(col_name);
1951        }
1952
1953        Self {
1954            input,
1955            expressions: expanded_expressions,
1956            columns,
1957            input_columns,
1958        }
1959    }
1960}
1961
1962/// Extract a meaningful column name from a PlanExpression.
1963fn extract_column_name(expr: &PlanExpression) -> Option<String> {
1964    match expr {
1965        PlanExpression::Column { name, .. } => Some(name.clone()),
1966        PlanExpression::Function { name, .. } => Some(name.clone()),
1967        PlanExpression::Cast { expr, .. } => extract_column_name(expr),
1968        PlanExpression::UnaryOp { expr, .. } => extract_column_name(expr),
1969        _ => None,
1970    }
1971}
1972
1973impl<'a> Operator for ProjectOperator<'a> {
1974    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1975        if let Some(batch) = self.input.next_batch()? {
1976            let mut result_rows = Vec::with_capacity(batch.rows.len());
1977
1978            for row in &batch.rows {
1979                let mut projected_values = Vec::with_capacity(self.expressions.len());
1980
1981                for expr in &self.expressions {
1982                    let value = evaluate_expression(&expr.expr, row, &self.input_columns)?;
1983                    projected_values.push(value);
1984                }
1985
1986                result_rows.push(Row {
1987                    values: projected_values,
1988                });
1989            }
1990
1991            Ok(Some(ResultBatch::with_rows(
1992                self.columns.clone(),
1993                result_rows,
1994            )))
1995        } else {
1996            Ok(None)
1997        }
1998    }
1999
2000    fn columns(&self) -> &[String] {
2001        &self.columns
2002    }
2003}
2004
2005// =============================================================================
2006// Join Operator
2007// =============================================================================
2008
2009struct JoinOperator<'a> {
2010    left: Box<dyn Operator + 'a>,
2011    right: Box<dyn Operator + 'a>,
2012    join_type: PlanJoinType,
2013    condition: Option<PlanExpression>,
2014    _strategy: JoinStrategy,
2015    columns: Vec<String>,
2016    right_col_count: usize,
2017    right_data: Option<Vec<Row>>,
2018    left_batch: Option<ResultBatch>,
2019    left_row_idx: usize,
2020    right_row_idx: usize,
2021}
2022
2023impl<'a> JoinOperator<'a> {
2024    fn new(
2025        left: Box<dyn Operator + 'a>,
2026        right: Box<dyn Operator + 'a>,
2027        join_type: PlanJoinType,
2028        condition: Option<PlanExpression>,
2029        strategy: JoinStrategy,
2030    ) -> ExecutorResult<Self> {
2031        let mut columns = left.columns().to_vec();
2032        let right_col_count = right.columns().len();
2033        columns.extend(right.columns().to_vec());
2034
2035        Ok(Self {
2036            left,
2037            right,
2038            join_type,
2039            condition,
2040            _strategy: strategy,
2041            columns,
2042            right_col_count,
2043            right_data: None,
2044            left_batch: None,
2045            left_row_idx: 0,
2046            right_row_idx: 0,
2047        })
2048    }
2049
2050    fn materialize_right(&mut self) -> ExecutorResult<()> {
2051        if self.right_data.is_some() {
2052            return Ok(());
2053        }
2054
2055        let mut all_rows = Vec::new();
2056        while let Some(batch) = self.right.next_batch()? {
2057            all_rows.extend(batch.rows);
2058        }
2059        self.right_data = Some(all_rows);
2060        Ok(())
2061    }
2062
2063    fn evaluate_join_condition(&self, left: &Row, right: &Row) -> ExecutorResult<bool> {
2064        match &self.condition {
2065            None => Ok(true),
2066            Some(expr) => {
2067                let mut combined = left.values.clone();
2068                combined.extend(right.values.clone());
2069                let combined_row = Row { values: combined };
2070
2071                let value = evaluate_expression(expr, &combined_row, &self.columns)?;
2072                match value {
2073                    Value::Boolean(b) => Ok(b),
2074                    Value::Null => Ok(false),
2075                    _ => Ok(false),
2076                }
2077            }
2078        }
2079    }
2080}
2081
2082impl<'a> Operator for JoinOperator<'a> {
2083    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2084        self.materialize_right()?;
2085        // Safe to use expect: materialize_right() sets right_data to Some
2086        let right_data = self
2087            .right_data
2088            .as_ref()
2089            .expect("right_data was set by materialize_right");
2090
2091        let mut result_rows = Vec::new();
2092
2093        loop {
2094            if self.left_batch.is_none() {
2095                self.left_batch = self.left.next_batch()?;
2096                self.left_row_idx = 0;
2097                self.right_row_idx = 0;
2098
2099                if self.left_batch.is_none() {
2100                    break;
2101                }
2102            }
2103
2104            // Safe to use expect: we checked left_batch.is_some() above
2105            let left_batch = self
2106                .left_batch
2107                .as_ref()
2108                .expect("left_batch verified to be Some");
2109
2110            while self.left_row_idx < left_batch.rows.len() {
2111                let left_row = &left_batch.rows[self.left_row_idx];
2112                let mut matched = false;
2113
2114                while self.right_row_idx < right_data.len() {
2115                    let right_row = &right_data[self.right_row_idx];
2116                    self.right_row_idx += 1;
2117
2118                    if self.evaluate_join_condition(left_row, right_row)? {
2119                        let mut combined = left_row.values.clone();
2120                        combined.extend(right_row.values.clone());
2121                        result_rows.push(Row { values: combined });
2122                        matched = true;
2123
2124                        if result_rows.len() >= 1024 {
2125                            return Ok(Some(ResultBatch::with_rows(
2126                                self.columns.clone(),
2127                                result_rows,
2128                            )));
2129                        }
2130                    }
2131                }
2132
2133                // For LEFT joins, emit left + NULLs if no match found
2134                if !matched && self.join_type == PlanJoinType::Left {
2135                    let mut combined = left_row.values.clone();
2136                    combined.extend(vec![Value::Null; self.right_col_count]);
2137                    result_rows.push(Row { values: combined });
2138                }
2139
2140                self.left_row_idx += 1;
2141                self.right_row_idx = 0;
2142            }
2143
2144            self.left_batch = None;
2145        }
2146
2147        if result_rows.is_empty() {
2148            Ok(None)
2149        } else {
2150            Ok(Some(ResultBatch::with_rows(
2151                self.columns.clone(),
2152                result_rows,
2153            )))
2154        }
2155    }
2156
2157    fn columns(&self) -> &[String] {
2158        &self.columns
2159    }
2160}
2161
2162// =============================================================================
2163// Hash Join Operator
2164// =============================================================================
2165
2166struct HashJoinOperator<'a> {
2167    left: Box<dyn Operator + 'a>,
2168    join_type: PlanJoinType,
2169    condition: Option<PlanExpression>,
2170    columns: Vec<String>,
2171    right_columns: Vec<String>,
2172    /// Hash table: key is the stringified join-key value, value is matching rows
2173    hash_table: Option<HashMap<String, Vec<Row>>>,
2174    /// Index of the left-side column used as the probe key (within left_columns)
2175    left_key_idx: Option<usize>,
2176    done: bool,
2177}
2178
2179impl<'a> HashJoinOperator<'a> {
2180    fn new(
2181        left: Box<dyn Operator + 'a>,
2182        mut right: Box<dyn Operator + 'a>,
2183        join_type: PlanJoinType,
2184        condition: Option<PlanExpression>,
2185    ) -> ExecutorResult<Self> {
2186        let left_columns = left.columns().to_vec();
2187        let right_columns = right.columns().to_vec();
2188
2189        let mut columns = left_columns.clone();
2190        columns.extend(right_columns.clone());
2191
2192        // Extract equality key columns from condition for hash partitioning
2193        let (left_key_idx, right_key_idx) =
2194            Self::extract_key_indices(&condition, &left_columns, &right_columns);
2195
2196        // Build phase: materialize right side into hash table
2197        let mut hash_table: HashMap<String, Vec<Row>> = HashMap::new();
2198        while let Some(batch) = right.next_batch()? {
2199            for row in batch.rows {
2200                let key = if let Some(idx) = right_key_idx {
2201                    format!("{:?}", row.values.get(idx).unwrap_or(&Value::Null))
2202                } else {
2203                    String::new() // fallback: single bucket (degrades to nested loop)
2204                };
2205                hash_table.entry(key).or_default().push(row);
2206            }
2207        }
2208
2209        Ok(Self {
2210            left,
2211            join_type,
2212            condition,
2213            columns,
2214            right_columns,
2215            hash_table: Some(hash_table),
2216            left_key_idx,
2217            done: false,
2218        })
2219    }
2220
2221    /// Try to extract (left_col_idx, right_col_idx) from an equality condition
2222    fn extract_key_indices(
2223        condition: &Option<PlanExpression>,
2224        left_columns: &[String],
2225        right_columns: &[String],
2226    ) -> (Option<usize>, Option<usize>) {
2227        if let Some(PlanExpression::BinaryOp {
2228            left,
2229            op: PlanBinaryOp::Equal,
2230            right,
2231        }) = condition
2232        {
2233            let left_name = Self::extract_column_name(left);
2234            let right_name = Self::extract_column_name(right);
2235
2236            if let (Some(ln), Some(rn)) = (left_name, right_name) {
2237                // Try both orderings: condition might be left_col = right_col or right_col = left_col
2238                let li = left_columns
2239                    .iter()
2240                    .position(|c| c == &ln || c.ends_with(&format!(".{}", ln)));
2241                let ri = right_columns
2242                    .iter()
2243                    .position(|c| c == &rn || c.ends_with(&format!(".{}", rn)));
2244                if li.is_some() && ri.is_some() {
2245                    return (li, ri);
2246                }
2247                // Try swapped
2248                let li = left_columns
2249                    .iter()
2250                    .position(|c| c == &rn || c.ends_with(&format!(".{}", rn)));
2251                let ri = right_columns
2252                    .iter()
2253                    .position(|c| c == &ln || c.ends_with(&format!(".{}", ln)));
2254                if li.is_some() && ri.is_some() {
2255                    return (li, ri);
2256                }
2257            }
2258        }
2259        (None, None)
2260    }
2261
2262    fn extract_column_name(expr: &PlanExpression) -> Option<String> {
2263        match expr {
2264            PlanExpression::Column { name, .. } => Some(name.clone()),
2265            _ => None,
2266        }
2267    }
2268
2269    fn evaluate_join_condition(&self, left: &Row, right: &Row) -> ExecutorResult<bool> {
2270        match &self.condition {
2271            None => Ok(true),
2272            Some(expr) => {
2273                let mut combined = left.values.clone();
2274                combined.extend(right.values.clone());
2275                let combined_row = Row { values: combined };
2276                let value = evaluate_expression(expr, &combined_row, &self.columns)?;
2277                match value {
2278                    Value::Boolean(b) => Ok(b),
2279                    _ => Ok(false),
2280                }
2281            }
2282        }
2283    }
2284}
2285
2286impl<'a> Operator for HashJoinOperator<'a> {
2287    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2288        if self.done {
2289            return Ok(None);
2290        }
2291
2292        let hash_table = self.hash_table.as_ref().unwrap();
2293        let mut result_rows = Vec::new();
2294
2295        while let Some(left_batch) = self.left.next_batch()? {
2296            for left_row in &left_batch.rows {
2297                // Probe phase: look up matching right rows by hash key
2298                let probe_key = if let Some(idx) = self.left_key_idx {
2299                    format!("{:?}", left_row.values.get(idx).unwrap_or(&Value::Null))
2300                } else {
2301                    String::new()
2302                };
2303
2304                let candidates = if self.left_key_idx.is_some() {
2305                    hash_table
2306                        .get(&probe_key)
2307                        .map(|v| v.as_slice())
2308                        .unwrap_or(&[])
2309                } else {
2310                    // No key extraction — fall back to checking all right rows
2311                    // This shouldn't happen for HashJoin but handles edge cases
2312                    &[]
2313                };
2314
2315                let mut matched = false;
2316                for right_row in candidates {
2317                    if self.evaluate_join_condition(left_row, right_row)? {
2318                        let mut combined = left_row.values.clone();
2319                        combined.extend(right_row.values.clone());
2320                        result_rows.push(Row { values: combined });
2321                        matched = true;
2322
2323                        if result_rows.len() >= 1024 {
2324                            return Ok(Some(ResultBatch::with_rows(
2325                                self.columns.clone(),
2326                                result_rows,
2327                            )));
2328                        }
2329                    }
2330                }
2331
2332                // For LEFT joins, emit left + NULLs if no match found
2333                if !matched && self.join_type == PlanJoinType::Left {
2334                    let mut combined = left_row.values.clone();
2335                    combined.extend(vec![Value::Null; self.right_columns.len()]);
2336                    result_rows.push(Row { values: combined });
2337                }
2338            }
2339        }
2340
2341        self.done = true;
2342
2343        if result_rows.is_empty() {
2344            Ok(None)
2345        } else {
2346            Ok(Some(ResultBatch::with_rows(
2347                self.columns.clone(),
2348                result_rows,
2349            )))
2350        }
2351    }
2352
2353    fn columns(&self) -> &[String] {
2354        &self.columns
2355    }
2356}
2357
2358// =============================================================================
2359// Aggregate Operator
2360// =============================================================================
2361
2362struct AggregateOperator<'a> {
2363    input: Box<dyn Operator + 'a>,
2364    _group_by: Vec<PlanExpression>,
2365    aggregates: Vec<crate::planner::AggregateExpr>,
2366    columns: Vec<String>,
2367    input_columns: Vec<String>,
2368    done: bool,
2369}
2370
2371impl<'a> AggregateOperator<'a> {
2372    fn new(
2373        input: Box<dyn Operator + 'a>,
2374        group_by: Vec<PlanExpression>,
2375        aggregates: Vec<crate::planner::AggregateExpr>,
2376    ) -> Self {
2377        let input_columns = input.columns().to_vec();
2378
2379        let columns: Vec<String> = aggregates
2380            .iter()
2381            .enumerate()
2382            .map(|(i, agg)| agg.alias.clone().unwrap_or_else(|| format!("agg_{}", i)))
2383            .collect();
2384
2385        Self {
2386            input,
2387            _group_by: group_by,
2388            aggregates,
2389            columns,
2390            input_columns,
2391            done: false,
2392        }
2393    }
2394}
2395
2396impl<'a> Operator for AggregateOperator<'a> {
2397    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2398        if self.done {
2399            return Ok(None);
2400        }
2401
2402        let mut accumulators: Vec<Accumulator> = self
2403            .aggregates
2404            .iter()
2405            .map(|agg| Accumulator::new(agg.function))
2406            .collect();
2407
2408        while let Some(batch) = self.input.next_batch()? {
2409            for row in &batch.rows {
2410                for (i, agg) in self.aggregates.iter().enumerate() {
2411                    let value = if let Some(ref arg) = agg.argument {
2412                        evaluate_expression(arg, row, &self.input_columns)?
2413                    } else {
2414                        Value::Integer(1)
2415                    };
2416                    accumulators[i].accumulate(&value)?;
2417                }
2418            }
2419        }
2420
2421        let result_values: Vec<Value> = accumulators.iter().map(|acc| acc.finalize()).collect();
2422
2423        self.done = true;
2424
2425        Ok(Some(ResultBatch::with_rows(
2426            self.columns.clone(),
2427            vec![Row {
2428                values: result_values,
2429            }],
2430        )))
2431    }
2432
2433    fn columns(&self) -> &[String] {
2434        &self.columns
2435    }
2436}
2437
2438/// Accumulator for aggregate functions.
2439struct Accumulator {
2440    function: AggregateFunction,
2441    count: i64,
2442    sum: f64,
2443    min: Option<Value>,
2444    max: Option<Value>,
2445}
2446
2447impl Accumulator {
2448    fn new(function: AggregateFunction) -> Self {
2449        Self {
2450            function,
2451            count: 0,
2452            sum: 0.0,
2453            min: None,
2454            max: None,
2455        }
2456    }
2457
2458    fn accumulate(&mut self, value: &Value) -> ExecutorResult<()> {
2459        if matches!(value, Value::Null) {
2460            return Ok(());
2461        }
2462
2463        self.count += 1;
2464
2465        match self.function {
2466            AggregateFunction::Count => {}
2467            AggregateFunction::Sum | AggregateFunction::Avg => {
2468                self.sum += value_to_f64(value)?;
2469            }
2470            AggregateFunction::Min => {
2471                // Safe to use expect: we check is_none() first, so in the || branch it's Some
2472                if self.min.is_none()
2473                    || compare_values(
2474                        value,
2475                        self.min
2476                            .as_ref()
2477                            .expect("min verified to be Some in || branch"),
2478                    )? == std::cmp::Ordering::Less
2479                {
2480                    self.min = Some(value.clone());
2481                }
2482            }
2483            AggregateFunction::Max => {
2484                // Safe to use expect: we check is_none() first, so in the || branch it's Some
2485                if self.max.is_none()
2486                    || compare_values(
2487                        value,
2488                        self.max
2489                            .as_ref()
2490                            .expect("max verified to be Some in || branch"),
2491                    )? == std::cmp::Ordering::Greater
2492                {
2493                    self.max = Some(value.clone());
2494                }
2495            }
2496        }
2497
2498        Ok(())
2499    }
2500
2501    fn finalize(&self) -> Value {
2502        match self.function {
2503            AggregateFunction::Count => Value::Integer(self.count),
2504            AggregateFunction::Sum => Value::Float(self.sum),
2505            AggregateFunction::Avg => {
2506                if self.count == 0 {
2507                    Value::Null
2508                } else {
2509                    Value::Float(self.sum / self.count as f64)
2510                }
2511            }
2512            AggregateFunction::Min => self.min.clone().unwrap_or(Value::Null),
2513            AggregateFunction::Max => self.max.clone().unwrap_or(Value::Null),
2514        }
2515    }
2516}
2517
2518// =============================================================================
2519// Sort Operator
2520// =============================================================================
2521
2522struct SortOperator<'a> {
2523    input: Box<dyn Operator + 'a>,
2524    order_by: Vec<crate::planner::SortKey>,
2525    columns: Vec<String>,
2526    sorted_data: Option<Vec<Row>>,
2527    position: usize,
2528}
2529
2530impl<'a> SortOperator<'a> {
2531    fn new(input: Box<dyn Operator + 'a>, order_by: Vec<crate::planner::SortKey>) -> Self {
2532        let columns = input.columns().to_vec();
2533        Self {
2534            input,
2535            order_by,
2536            columns,
2537            sorted_data: None,
2538            position: 0,
2539        }
2540    }
2541}
2542
2543impl<'a> Operator for SortOperator<'a> {
2544    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2545        if self.sorted_data.is_none() {
2546            let mut all_rows = Vec::new();
2547            while let Some(batch) = self.input.next_batch()? {
2548                all_rows.extend(batch.rows);
2549            }
2550
2551            let columns = self.columns.clone();
2552            let order_by = self.order_by.clone();
2553
2554            all_rows.sort_by(|a, b| {
2555                for key in &order_by {
2556                    let a_val = evaluate_expression(&key.expr, a, &columns).unwrap_or(Value::Null);
2557                    let b_val = evaluate_expression(&key.expr, b, &columns).unwrap_or(Value::Null);
2558
2559                    let cmp = compare_values(&a_val, &b_val).unwrap_or(std::cmp::Ordering::Equal);
2560
2561                    if cmp != std::cmp::Ordering::Equal {
2562                        return if key.ascending { cmp } else { cmp.reverse() };
2563                    }
2564                }
2565                std::cmp::Ordering::Equal
2566            });
2567
2568            self.sorted_data = Some(all_rows);
2569        }
2570
2571        // Safe to use expect: sorted_data was just set to Some above if it was None
2572        let data = self
2573            .sorted_data
2574            .as_ref()
2575            .expect("sorted_data was just set to Some");
2576
2577        if self.position >= data.len() {
2578            return Ok(None);
2579        }
2580
2581        let end = (self.position + 1024).min(data.len());
2582        let rows = data[self.position..end].to_vec();
2583        self.position = end;
2584
2585        Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)))
2586    }
2587
2588    fn columns(&self) -> &[String] {
2589        &self.columns
2590    }
2591}
2592
2593// =============================================================================
2594// Limit Operator
2595// =============================================================================
2596
2597struct LimitOperator<'a> {
2598    input: Box<dyn Operator + 'a>,
2599    limit: Option<u64>,
2600    offset: Option<u64>,
2601    columns: Vec<String>,
2602    rows_skipped: u64,
2603    rows_returned: u64,
2604}
2605
2606impl<'a> LimitOperator<'a> {
2607    fn new(input: Box<dyn Operator + 'a>, limit: Option<u64>, offset: Option<u64>) -> Self {
2608        let columns = input.columns().to_vec();
2609        Self {
2610            input,
2611            limit,
2612            offset,
2613            columns,
2614            rows_skipped: 0,
2615            rows_returned: 0,
2616        }
2617    }
2618}
2619
2620impl<'a> Operator for LimitOperator<'a> {
2621    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2622        if let Some(limit) = self.limit {
2623            if self.rows_returned >= limit {
2624                return Ok(None);
2625            }
2626        }
2627
2628        while let Some(batch) = self.input.next_batch()? {
2629            let mut rows = batch.rows;
2630
2631            let offset = self.offset.unwrap_or(0);
2632            if self.rows_skipped < offset {
2633                let skip = (offset - self.rows_skipped) as usize;
2634                if skip >= rows.len() {
2635                    self.rows_skipped += rows.len() as u64;
2636                    continue;
2637                }
2638                rows = rows[skip..].to_vec();
2639                self.rows_skipped = offset;
2640            }
2641
2642            if let Some(limit) = self.limit {
2643                let remaining = limit - self.rows_returned;
2644                if rows.len() as u64 > remaining {
2645                    rows.truncate(remaining as usize);
2646                }
2647            }
2648
2649            if rows.is_empty() {
2650                continue;
2651            }
2652
2653            self.rows_returned += rows.len() as u64;
2654
2655            return Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)));
2656        }
2657
2658        Ok(None)
2659    }
2660
2661    fn columns(&self) -> &[String] {
2662        &self.columns
2663    }
2664}
2665
2666// =============================================================================
2667// Empty Operator
2668// =============================================================================
2669
/// Operator with no input that emits exactly one row with no columns.
struct EmptyOperator {
    /// Set once the single row has been emitted.
    done: bool,
}

impl EmptyOperator {
    /// Create an operator that has not yet emitted its row.
    fn new() -> Self {
        EmptyOperator { done: false }
    }
}
2679
2680impl Operator for EmptyOperator {
2681    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2682        if self.done {
2683            return Ok(None);
2684        }
2685        self.done = true;
2686        Ok(Some(ResultBatch::with_rows(
2687            vec![],
2688            vec![Row { values: vec![] }],
2689        )))
2690    }
2691
2692    fn columns(&self) -> &[String] {
2693        &[]
2694    }
2695}
2696
2697// =============================================================================
2698// Set Operation Operator
2699// =============================================================================
2700
2701struct SetOperationOperator<'a> {
2702    left: Box<dyn Operator + 'a>,
2703    right: Box<dyn Operator + 'a>,
2704    op: SetOperationType,
2705    result_rows: Option<Vec<Row>>,
2706    result_columns: Vec<String>,
2707    position: usize,
2708}
2709
2710impl<'a> SetOperationOperator<'a> {
2711    fn new(
2712        left: Box<dyn Operator + 'a>,
2713        right: Box<dyn Operator + 'a>,
2714        op: SetOperationType,
2715    ) -> Self {
2716        Self {
2717            left,
2718            right,
2719            op,
2720            result_rows: None,
2721            result_columns: Vec::new(),
2722            position: 0,
2723        }
2724    }
2725
2726    /// Drain all rows from an operator.
2727    fn drain_operator(op: &mut Box<dyn Operator + 'a>) -> ExecutorResult<(Vec<String>, Vec<Row>)> {
2728        let mut all_rows = Vec::new();
2729        let mut columns = Vec::new();
2730        while let Some(batch) = op.next_batch()? {
2731            if columns.is_empty() {
2732                columns = batch.columns;
2733            }
2734            all_rows.extend(batch.rows);
2735        }
2736        Ok((columns, all_rows))
2737    }
2738
2739    /// Compute the set operation result lazily on first call.
2740    fn compute(&mut self) -> ExecutorResult<()> {
2741        if self.result_rows.is_some() {
2742            return Ok(());
2743        }
2744
2745        let (left_cols, left_rows) = Self::drain_operator(&mut self.left)?;
2746        let (right_cols, right_rows) = Self::drain_operator(&mut self.right)?;
2747
2748        self.result_columns = if !left_cols.is_empty() {
2749            left_cols
2750        } else {
2751            right_cols
2752        };
2753
2754        let result = match self.op {
2755            SetOperationType::UnionAll => {
2756                let mut combined = left_rows;
2757                combined.extend(right_rows);
2758                combined
2759            }
2760            SetOperationType::Union => {
2761                let mut combined = left_rows;
2762                combined.extend(right_rows);
2763                deduplicate_rows(combined)
2764            }
2765            SetOperationType::Intersect => {
2766                // Keep rows present in both sides
2767                let right_set: HashSet<Vec<ValueKey>> = right_rows
2768                    .iter()
2769                    .map(|r| r.values.iter().map(value_to_key).collect())
2770                    .collect();
2771                let mut result = Vec::new();
2772                let mut seen = HashSet::new();
2773                for row in left_rows {
2774                    let key: Vec<ValueKey> = row.values.iter().map(value_to_key).collect();
2775                    if right_set.contains(&key) && seen.insert(key) {
2776                        result.push(row);
2777                    }
2778                }
2779                result
2780            }
2781            SetOperationType::Except => {
2782                // Keep rows in left that are not in right
2783                let right_set: HashSet<Vec<ValueKey>> = right_rows
2784                    .iter()
2785                    .map(|r| r.values.iter().map(value_to_key).collect())
2786                    .collect();
2787                let mut result = Vec::new();
2788                let mut seen = HashSet::new();
2789                for row in left_rows {
2790                    let key: Vec<ValueKey> = row.values.iter().map(value_to_key).collect();
2791                    if !right_set.contains(&key) && seen.insert(key) {
2792                        result.push(row);
2793                    }
2794                }
2795                result
2796            }
2797        };
2798
2799        self.result_rows = Some(result);
2800        Ok(())
2801    }
2802}
2803
2804impl<'a> Operator for SetOperationOperator<'a> {
2805    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2806        self.compute()?;
2807
2808        let rows = self.result_rows.as_ref().unwrap();
2809        if self.position >= rows.len() {
2810            return Ok(None);
2811        }
2812
2813        let end = (self.position + 1024).min(rows.len());
2814        let batch_rows = rows[self.position..end].to_vec();
2815        self.position = end;
2816
2817        Ok(Some(ResultBatch::with_rows(
2818            self.result_columns.clone(),
2819            batch_rows,
2820        )))
2821    }
2822
2823    fn columns(&self) -> &[String] {
2824        &self.result_columns
2825    }
2826}
2827
/// Hashable, equality-comparable stand-in for a `Value`.
///
/// Used to detect duplicate rows in set operations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum ValueKey {
    /// Key for a null value.
    Null,
    /// Key for a boolean.
    Boolean(bool),
    /// Key for an integer.
    Integer(i64),
    /// Key for a float, stored as its bit representation so it can be hashed.
    Float(u64),
    /// Key for a string (also used for textual renderings of other values).
    String(String),
}
2837
2838fn value_to_key(v: &Value) -> ValueKey {
2839    match v {
2840        Value::Null => ValueKey::Null,
2841        Value::Boolean(b) => ValueKey::Boolean(*b),
2842        Value::Integer(i) => ValueKey::Integer(*i),
2843        Value::Float(f) => ValueKey::Float(f.to_bits()),
2844        Value::String(s) => ValueKey::String(s.clone()),
2845        // For any other types, convert to string representation
2846        other => ValueKey::String(format!("{:?}", other)),
2847    }
2848}
2849
2850fn deduplicate_rows(rows: Vec<Row>) -> Vec<Row> {
2851    let mut seen = HashSet::new();
2852    let mut result = Vec::new();
2853    for row in rows {
2854        let key: Vec<ValueKey> = row.values.iter().map(value_to_key).collect();
2855        if seen.insert(key) {
2856            result.push(row);
2857        }
2858    }
2859    result
2860}
2861
2862// =============================================================================
2863// Expression Evaluation
2864// =============================================================================
2865
2866/// Context for expression evaluation, optionally holding executor for subqueries.
2867pub struct EvalContext<'a> {
2868    executor: Option<&'a Executor>,
2869}
2870
2871impl<'a> EvalContext<'a> {
2872    pub fn new() -> Self {
2873        Self { executor: None }
2874    }
2875
2876    pub fn with_executor(executor: &'a Executor) -> Self {
2877        Self {
2878            executor: Some(executor),
2879        }
2880    }
2881
2882    /// Execute a subquery and return all result rows.
2883    fn execute_subquery(&self, subquery: &PlanNode) -> ExecutorResult<Vec<Row>> {
2884        let executor = self.executor.ok_or_else(|| {
2885            ExecutorError::Internal("Subquery requires execution context".to_string())
2886        })?;
2887        let plan = QueryPlan {
2888            root: subquery.clone(),
2889            estimated_cost: 0.0,
2890            estimated_rows: 0,
2891        };
2892        let result = executor.execute(&plan)?;
2893        Ok(result.rows)
2894    }
2895}
2896
2897impl Default for EvalContext<'_> {
2898    fn default() -> Self {
2899        Self::new()
2900    }
2901}
2902
2903/// Evaluate a default expression (constant expression that doesn't need row context).
2904fn evaluate_default_expression(expr: &PlanExpression) -> ExecutorResult<Value> {
2905    let empty_row = Row { values: vec![] };
2906    let empty_columns: Vec<String> = vec![];
2907    evaluate_expression(expr, &empty_row, &empty_columns)
2908}
2909
2910/// Substitute parameter placeholders in a query plan with actual values.
2911fn substitute_parameters(node: &PlanNode, params: &[Value]) -> ExecutorResult<PlanNode> {
2912    match node {
2913        PlanNode::Scan(scan) => {
2914            // ScanNode doesn't have predicates - predicates are in Filter nodes
2915            // Just need to handle index_scan key_range expressions if present
2916            let index_scan = if let Some(idx) = &scan.index_scan {
2917                let start = idx
2918                    .key_range
2919                    .start
2920                    .as_ref()
2921                    .map(|e| substitute_expr(e, params))
2922                    .transpose()?;
2923                let end = idx
2924                    .key_range
2925                    .end
2926                    .as_ref()
2927                    .map(|e| substitute_expr(e, params))
2928                    .transpose()?;
2929                Some(crate::planner::IndexScan {
2930                    index_name: idx.index_name.clone(),
2931                    key_range: crate::planner::KeyRange {
2932                        start,
2933                        start_inclusive: idx.key_range.start_inclusive,
2934                        end,
2935                        end_inclusive: idx.key_range.end_inclusive,
2936                    },
2937                })
2938            } else {
2939                None
2940            };
2941            Ok(PlanNode::Scan(ScanNode {
2942                table_name: scan.table_name.clone(),
2943                alias: scan.alias.clone(),
2944                columns: scan.columns.clone(),
2945                index_scan,
2946            }))
2947        }
2948        PlanNode::Filter(filter) => {
2949            let input = Box::new(substitute_parameters(&filter.input, params)?);
2950            let predicate = substitute_expr(&filter.predicate, params)?;
2951            Ok(PlanNode::Filter(FilterNode { input, predicate }))
2952        }
2953        PlanNode::Project(proj) => {
2954            let input = Box::new(substitute_parameters(&proj.input, params)?);
2955            let expressions = proj
2956                .expressions
2957                .iter()
2958                .map(|pe| {
2959                    Ok(ProjectionExpr {
2960                        expr: substitute_expr(&pe.expr, params)?,
2961                        alias: pe.alias.clone(),
2962                    })
2963                })
2964                .collect::<ExecutorResult<Vec<_>>>()?;
2965            Ok(PlanNode::Project(ProjectNode { input, expressions }))
2966        }
2967        PlanNode::Sort(sort) => {
2968            let input = Box::new(substitute_parameters(&sort.input, params)?);
2969            let order_by = sort
2970                .order_by
2971                .iter()
2972                .map(|sk| {
2973                    Ok(SortKey {
2974                        expr: substitute_expr(&sk.expr, params)?,
2975                        ascending: sk.ascending,
2976                        nulls_first: sk.nulls_first,
2977                    })
2978                })
2979                .collect::<ExecutorResult<Vec<_>>>()?;
2980            Ok(PlanNode::Sort(SortNode { input, order_by }))
2981        }
2982        PlanNode::Limit(limit) => {
2983            let input = Box::new(substitute_parameters(&limit.input, params)?);
2984            Ok(PlanNode::Limit(LimitNode {
2985                input,
2986                limit: limit.limit,
2987                offset: limit.offset,
2988            }))
2989        }
2990        PlanNode::Aggregate(agg) => {
2991            let input = Box::new(substitute_parameters(&agg.input, params)?);
2992            let group_by = agg
2993                .group_by
2994                .iter()
2995                .map(|e| substitute_expr(e, params))
2996                .collect::<ExecutorResult<Vec<_>>>()?;
2997            let aggregates = agg
2998                .aggregates
2999                .iter()
3000                .map(|ae| {
3001                    Ok(AggregateExpr {
3002                        function: ae.function,
3003                        argument: ae
3004                            .argument
3005                            .as_ref()
3006                            .map(|a| substitute_expr(a, params))
3007                            .transpose()?,
3008                        distinct: ae.distinct,
3009                        alias: ae.alias.clone(),
3010                    })
3011                })
3012                .collect::<ExecutorResult<Vec<_>>>()?;
3013            Ok(PlanNode::Aggregate(AggregateNode {
3014                input,
3015                group_by,
3016                aggregates,
3017            }))
3018        }
3019        PlanNode::Join(join) => {
3020            let left = Box::new(substitute_parameters(&join.left, params)?);
3021            let right = Box::new(substitute_parameters(&join.right, params)?);
3022            let condition = join
3023                .condition
3024                .as_ref()
3025                .map(|c| substitute_expr(c, params))
3026                .transpose()?;
3027            Ok(PlanNode::Join(JoinNode {
3028                left,
3029                right,
3030                join_type: join.join_type,
3031                condition,
3032                strategy: join.strategy,
3033            }))
3034        }
3035        PlanNode::Insert(insert) => {
3036            let source = match &insert.source {
3037                InsertPlanSource::Values(rows) => {
3038                    let substituted = rows
3039                        .iter()
3040                        .map(|row| {
3041                            row.iter()
3042                                .map(|e| substitute_expr(e, params))
3043                                .collect::<ExecutorResult<Vec<_>>>()
3044                        })
3045                        .collect::<ExecutorResult<Vec<_>>>()?;
3046                    InsertPlanSource::Values(substituted)
3047                }
3048                InsertPlanSource::Query(q) => {
3049                    InsertPlanSource::Query(Box::new(substitute_parameters(q, params)?))
3050                }
3051            };
3052            Ok(PlanNode::Insert(InsertNode {
3053                table_name: insert.table_name.clone(),
3054                columns: insert.columns.clone(),
3055                source,
3056            }))
3057        }
3058        PlanNode::Update(update) => {
3059            let assignments = update
3060                .assignments
3061                .iter()
3062                .map(|(col, expr)| Ok((col.clone(), substitute_expr(expr, params)?)))
3063                .collect::<ExecutorResult<Vec<_>>>()?;
3064            let where_clause = update
3065                .where_clause
3066                .as_ref()
3067                .map(|p| substitute_expr(p, params))
3068                .transpose()?;
3069            Ok(PlanNode::Update(UpdateNode {
3070                table_name: update.table_name.clone(),
3071                assignments,
3072                where_clause,
3073            }))
3074        }
3075        PlanNode::Delete(delete) => {
3076            let where_clause = delete
3077                .where_clause
3078                .as_ref()
3079                .map(|p| substitute_expr(p, params))
3080                .transpose()?;
3081            Ok(PlanNode::Delete(DeleteNode {
3082                table_name: delete.table_name.clone(),
3083                where_clause,
3084            }))
3085        }
3086        PlanNode::SetOperation(set_op) => {
3087            let left = Box::new(substitute_parameters(&set_op.left, params)?);
3088            let right = Box::new(substitute_parameters(&set_op.right, params)?);
3089            Ok(PlanNode::SetOperation(SetOperationNode {
3090                op: set_op.op,
3091                left,
3092                right,
3093            }))
3094        }
3095        // DDL operations don't have parameters, return as-is
3096        PlanNode::CreateTable(_)
3097        | PlanNode::DropTable(_)
3098        | PlanNode::AlterTable(_)
3099        | PlanNode::CreateIndex(_)
3100        | PlanNode::DropIndex(_)
3101        | PlanNode::Empty
3102        | PlanNode::BeginTransaction
3103        | PlanNode::CommitTransaction
3104        | PlanNode::RollbackTransaction => Ok(node.clone()),
3105    }
3106}
3107
3108/// Substitute parameter placeholders in an expression with actual values.
3109fn substitute_expr(expr: &PlanExpression, params: &[Value]) -> ExecutorResult<PlanExpression> {
3110    match expr {
3111        PlanExpression::Placeholder(idx) => {
3112            // Parameters are 1-indexed ($1, $2, etc.)
3113            let param_idx = *idx - 1;
3114            params.get(param_idx).map(value_to_literal).ok_or_else(|| {
3115                ExecutorError::Internal(format!(
3116                    "Missing parameter ${}: expected {} parameters but got {}",
3117                    idx,
3118                    idx,
3119                    params.len()
3120                ))
3121            })
3122        }
3123        PlanExpression::BinaryOp { op, left, right } => Ok(PlanExpression::BinaryOp {
3124            op: *op,
3125            left: Box::new(substitute_expr(left, params)?),
3126            right: Box::new(substitute_expr(right, params)?),
3127        }),
3128        PlanExpression::UnaryOp { op, expr: inner } => Ok(PlanExpression::UnaryOp {
3129            op: *op,
3130            expr: Box::new(substitute_expr(inner, params)?),
3131        }),
3132        PlanExpression::Function {
3133            name,
3134            args,
3135            return_type,
3136        } => {
3137            let new_args = args
3138                .iter()
3139                .map(|a| substitute_expr(a, params))
3140                .collect::<ExecutorResult<Vec<_>>>()?;
3141            Ok(PlanExpression::Function {
3142                name: name.clone(),
3143                args: new_args,
3144                return_type: return_type.clone(),
3145            })
3146        }
3147        PlanExpression::Case {
3148            operand,
3149            conditions,
3150            else_result,
3151        } => {
3152            let new_operand = operand
3153                .as_ref()
3154                .map(|o| substitute_expr(o, params))
3155                .transpose()?
3156                .map(Box::new);
3157            let new_conditions = conditions
3158                .iter()
3159                .map(|(cond, result)| {
3160                    Ok((
3161                        substitute_expr(cond, params)?,
3162                        substitute_expr(result, params)?,
3163                    ))
3164                })
3165                .collect::<ExecutorResult<Vec<_>>>()?;
3166            let new_else = else_result
3167                .as_ref()
3168                .map(|e| substitute_expr(e, params))
3169                .transpose()?
3170                .map(Box::new);
3171            Ok(PlanExpression::Case {
3172                operand: new_operand,
3173                conditions: new_conditions,
3174                else_result: new_else,
3175            })
3176        }
3177        PlanExpression::InList {
3178            expr: inner,
3179            list,
3180            negated,
3181        } => {
3182            let new_inner = Box::new(substitute_expr(inner, params)?);
3183            let new_list = list
3184                .iter()
3185                .map(|e| substitute_expr(e, params))
3186                .collect::<ExecutorResult<Vec<_>>>()?;
3187            Ok(PlanExpression::InList {
3188                expr: new_inner,
3189                list: new_list,
3190                negated: *negated,
3191            })
3192        }
3193        PlanExpression::InSubquery {
3194            expr: inner,
3195            subquery,
3196            negated,
3197        } => {
3198            let new_inner = Box::new(substitute_expr(inner, params)?);
3199            let new_subquery = Box::new(substitute_parameters(subquery, params)?);
3200            Ok(PlanExpression::InSubquery {
3201                expr: new_inner,
3202                subquery: new_subquery,
3203                negated: *negated,
3204            })
3205        }
3206        PlanExpression::Exists { subquery, negated } => {
3207            let new_subquery = Box::new(substitute_parameters(subquery, params)?);
3208            Ok(PlanExpression::Exists {
3209                subquery: new_subquery,
3210                negated: *negated,
3211            })
3212        }
3213        PlanExpression::ScalarSubquery(subquery) => {
3214            let new_subquery = Box::new(substitute_parameters(subquery, params)?);
3215            Ok(PlanExpression::ScalarSubquery(new_subquery))
3216        }
3217        PlanExpression::Between {
3218            expr: inner,
3219            low,
3220            high,
3221            negated,
3222        } => Ok(PlanExpression::Between {
3223            expr: Box::new(substitute_expr(inner, params)?),
3224            low: Box::new(substitute_expr(low, params)?),
3225            high: Box::new(substitute_expr(high, params)?),
3226            negated: *negated,
3227        }),
3228        PlanExpression::Like {
3229            expr: inner,
3230            pattern,
3231            negated,
3232        } => Ok(PlanExpression::Like {
3233            expr: Box::new(substitute_expr(inner, params)?),
3234            pattern: Box::new(substitute_expr(pattern, params)?),
3235            negated: *negated,
3236        }),
3237        PlanExpression::IsNull {
3238            expr: inner,
3239            negated,
3240        } => Ok(PlanExpression::IsNull {
3241            expr: Box::new(substitute_expr(inner, params)?),
3242            negated: *negated,
3243        }),
3244        PlanExpression::Cast {
3245            expr: inner,
3246            target_type,
3247        } => Ok(PlanExpression::Cast {
3248            expr: Box::new(substitute_expr(inner, params)?),
3249            target_type: target_type.clone(),
3250        }),
3251        // Literals and columns don't have parameters
3252        PlanExpression::Literal(_) | PlanExpression::Column { .. } => Ok(expr.clone()),
3253    }
3254}
3255
3256/// Convert a Value back to a PlanExpression::Literal.
3257fn value_to_literal(value: &Value) -> PlanExpression {
3258    let lit = match value {
3259        Value::Null => PlanLiteral::Null,
3260        Value::Boolean(b) => PlanLiteral::Boolean(*b),
3261        Value::Integer(i) => PlanLiteral::Integer(*i),
3262        Value::Float(f) => PlanLiteral::Float(*f),
3263        Value::String(s) => PlanLiteral::String(s.clone()),
3264        Value::Bytes(b) => PlanLiteral::String(String::from_utf8_lossy(b).to_string()),
3265        Value::Timestamp(t) => PlanLiteral::Integer(t.timestamp_millis()),
3266        // Arrays and objects can be serialized to string representation for parameters
3267        Value::Array(arr) => PlanLiteral::String(format!("{:?}", arr)),
3268        Value::Object(obj) => PlanLiteral::String(format!("{:?}", obj)),
3269    };
3270    PlanExpression::Literal(lit)
3271}
3272
3273/// Evaluate expression without subquery support (backwards compatible).
3274fn evaluate_expression(
3275    expr: &PlanExpression,
3276    row: &Row,
3277    columns: &[String],
3278) -> ExecutorResult<Value> {
3279    evaluate_expression_with_context(expr, row, columns, &EvalContext::new())
3280}
3281
3282/// Evaluate expression with optional subquery support.
3283fn evaluate_expression_with_context(
3284    expr: &PlanExpression,
3285    row: &Row,
3286    columns: &[String],
3287    ctx: &EvalContext,
3288) -> ExecutorResult<Value> {
3289    match expr {
3290        PlanExpression::Literal(lit) => Ok(match lit {
3291            PlanLiteral::Null => Value::Null,
3292            PlanLiteral::Boolean(b) => Value::Boolean(*b),
3293            PlanLiteral::Integer(i) => Value::Integer(*i),
3294            PlanLiteral::Float(f) => Value::Float(*f),
3295            PlanLiteral::String(s) => Value::String(s.clone()),
3296        }),
3297
3298        PlanExpression::Column { table: _, name, .. } => {
3299            if name == "*" {
3300                return Ok(Value::Null);
3301            }
3302
3303            let idx = columns
3304                .iter()
3305                .position(|c| c == name)
3306                .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
3307
3308            Ok(row.values.get(idx).cloned().unwrap_or(Value::Null))
3309        }
3310
3311        PlanExpression::BinaryOp { left, op, right } => {
3312            let left_val = evaluate_expression_with_context(left, row, columns, ctx)?;
3313            let right_val = evaluate_expression_with_context(right, row, columns, ctx)?;
3314            evaluate_binary_op(*op, &left_val, &right_val)
3315        }
3316
3317        PlanExpression::UnaryOp { op, expr } => {
3318            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3319            evaluate_unary_op(*op, &val)
3320        }
3321
3322        PlanExpression::IsNull { expr, negated } => {
3323            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3324            let is_null = matches!(val, Value::Null);
3325            Ok(Value::Boolean(if *negated { !is_null } else { is_null }))
3326        }
3327
3328        PlanExpression::Cast { expr, target_type } => {
3329            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3330            cast_value(&val, target_type)
3331        }
3332
3333        PlanExpression::Function { name, args, .. } => {
3334            let arg_values: Vec<Value> = args
3335                .iter()
3336                .map(|a| evaluate_expression_with_context(a, row, columns, ctx))
3337                .collect::<ExecutorResult<Vec<_>>>()?;
3338            evaluate_function(name, &arg_values)
3339        }
3340
3341        PlanExpression::Case {
3342            operand,
3343            conditions,
3344            else_result,
3345        } => {
3346            match operand {
3347                Some(operand_expr) => {
3348                    // CASE operand WHEN value THEN result ... END
3349                    let operand_val =
3350                        evaluate_expression_with_context(operand_expr, row, columns, ctx)?;
3351                    for (when_expr, then_expr) in conditions {
3352                        let when_val =
3353                            evaluate_expression_with_context(when_expr, row, columns, ctx)?;
3354                        if compare_values(&operand_val, &when_val)? == std::cmp::Ordering::Equal {
3355                            return evaluate_expression_with_context(then_expr, row, columns, ctx);
3356                        }
3357                    }
3358                }
3359                None => {
3360                    // CASE WHEN condition THEN result ... END
3361                    for (when_expr, then_expr) in conditions {
3362                        let when_val =
3363                            evaluate_expression_with_context(when_expr, row, columns, ctx)?;
3364                        if matches!(when_val, Value::Boolean(true)) {
3365                            return evaluate_expression_with_context(then_expr, row, columns, ctx);
3366                        }
3367                    }
3368                }
3369            }
3370            // Return ELSE result or NULL
3371            match else_result {
3372                Some(else_expr) => evaluate_expression_with_context(else_expr, row, columns, ctx),
3373                None => Ok(Value::Null),
3374            }
3375        }
3376
3377        PlanExpression::InList {
3378            expr,
3379            list,
3380            negated,
3381        } => {
3382            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3383            let mut found = false;
3384            for item in list {
3385                let item_val = evaluate_expression_with_context(item, row, columns, ctx)?;
3386                if compare_values(&val, &item_val)? == std::cmp::Ordering::Equal {
3387                    found = true;
3388                    break;
3389                }
3390            }
3391            Ok(Value::Boolean(if *negated { !found } else { found }))
3392        }
3393
3394        PlanExpression::Between {
3395            expr,
3396            low,
3397            high,
3398            negated,
3399        } => {
3400            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3401            let low_val = evaluate_expression_with_context(low, row, columns, ctx)?;
3402            let high_val = evaluate_expression_with_context(high, row, columns, ctx)?;
3403
3404            let ge_low = compare_values(&val, &low_val)? != std::cmp::Ordering::Less;
3405            let le_high = compare_values(&val, &high_val)? != std::cmp::Ordering::Greater;
3406            let in_range = ge_low && le_high;
3407
3408            Ok(Value::Boolean(if *negated { !in_range } else { in_range }))
3409        }
3410
3411        PlanExpression::Like {
3412            expr,
3413            pattern,
3414            negated,
3415        } => {
3416            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3417            let pattern_val = evaluate_expression_with_context(pattern, row, columns, ctx)?;
3418
3419            let val_str = match val {
3420                Value::String(s) => s,
3421                Value::Null => return Ok(Value::Null),
3422                Value::Integer(i) => i.to_string(),
3423                Value::Float(f) => f.to_string(),
3424                Value::Boolean(b) => b.to_string(),
3425                Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
3426                Value::Timestamp(t) => t.to_rfc3339(),
3427                Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
3428            };
3429
3430            let pattern_str = match pattern_val {
3431                Value::String(s) => s,
3432                Value::Null => return Ok(Value::Null),
3433                Value::Integer(i) => i.to_string(),
3434                Value::Float(f) => f.to_string(),
3435                Value::Boolean(b) => b.to_string(),
3436                Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
3437                Value::Timestamp(t) => t.to_rfc3339(),
3438                Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
3439            };
3440
3441            // Convert SQL LIKE pattern to regex
3442            let regex_pattern = pattern_str.replace('%', ".*").replace('_', ".");
3443            let regex_pattern = format!("^{}$", regex_pattern);
3444
3445            // Simple pattern matching (could use regex crate for full support)
3446            let matches = if regex_pattern == "^.*$" {
3447                true
3448            } else if regex_pattern.starts_with("^") && regex_pattern.ends_with("$") {
3449                let inner = &regex_pattern[1..regex_pattern.len() - 1];
3450                if inner.contains(".*") || inner.contains('.') {
3451                    // For patterns with wildcards, do simple matching
3452                    let parts: Vec<&str> = inner.split(".*").collect();
3453                    if parts.len() == 1 {
3454                        val_str == parts[0]
3455                    } else {
3456                        let mut pos = 0;
3457                        let mut matched = true;
3458                        for (i, part) in parts.iter().enumerate() {
3459                            if part.is_empty() {
3460                                continue;
3461                            }
3462                            if let Some(found_pos) = val_str[pos..].find(part) {
3463                                if i == 0 && found_pos != 0 {
3464                                    matched = false;
3465                                    break;
3466                                }
3467                                pos += found_pos + part.len();
3468                            } else {
3469                                matched = false;
3470                                break;
3471                            }
3472                        }
3473                        matched
3474                    }
3475                } else {
3476                    val_str == inner
3477                }
3478            } else {
3479                val_str.contains(&pattern_str)
3480            };
3481
3482            Ok(Value::Boolean(if *negated { !matches } else { matches }))
3483        }
3484
3485        PlanExpression::InSubquery {
3486            expr,
3487            subquery,
3488            negated,
3489        } => {
3490            // Evaluate the expression to check
3491            let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3492
3493            // Execute the subquery
3494            let subquery_rows = ctx.execute_subquery(subquery)?;
3495
3496            // Check if val is in any of the subquery results (first column)
3497            let mut found = false;
3498            for subquery_row in &subquery_rows {
3499                if let Some(subquery_val) = subquery_row.values.first() {
3500                    if compare_values(&val, subquery_val)? == std::cmp::Ordering::Equal {
3501                        found = true;
3502                        break;
3503                    }
3504                }
3505            }
3506
3507            Ok(Value::Boolean(if *negated { !found } else { found }))
3508        }
3509
3510        PlanExpression::Exists { subquery, negated } => {
3511            // Execute the subquery and check if it returns any rows
3512            let subquery_rows = ctx.execute_subquery(subquery)?;
3513            let exists = !subquery_rows.is_empty();
3514
3515            Ok(Value::Boolean(if *negated { !exists } else { exists }))
3516        }
3517
3518        PlanExpression::ScalarSubquery(subquery) => {
3519            // Execute the subquery and return the single value
3520            let subquery_rows = ctx.execute_subquery(subquery)?;
3521
3522            if subquery_rows.is_empty() {
3523                return Ok(Value::Null);
3524            }
3525
3526            if subquery_rows.len() > 1 {
3527                return Err(ExecutorError::Internal(
3528                    "Scalar subquery returned more than one row".to_string(),
3529                ));
3530            }
3531
3532            // Return the first column of the first row
3533            subquery_rows[0].values.first().cloned().ok_or_else(|| {
3534                ExecutorError::Internal("Scalar subquery returned no columns".to_string())
3535            })
3536        }
3537
3538        PlanExpression::Placeholder(idx) => {
3539            // Placeholders should be replaced before execution
3540            Err(ExecutorError::Internal(format!(
3541                "Unresolved placeholder ${}",
3542                idx
3543            )))
3544        }
3545    }
3546}
3547
3548fn evaluate_binary_op(op: PlanBinaryOp, left: &Value, right: &Value) -> ExecutorResult<Value> {
3549    if matches!(left, Value::Null) || matches!(right, Value::Null) {
3550        if matches!(op, PlanBinaryOp::Equal | PlanBinaryOp::NotEqual) {
3551            return Ok(Value::Null);
3552        }
3553        return Ok(Value::Null);
3554    }
3555
3556    match op {
3557        PlanBinaryOp::Add => {
3558            let l = value_to_f64(left)?;
3559            let r = value_to_f64(right)?;
3560            Ok(Value::Float(l + r))
3561        }
3562        PlanBinaryOp::Subtract => {
3563            let l = value_to_f64(left)?;
3564            let r = value_to_f64(right)?;
3565            Ok(Value::Float(l - r))
3566        }
3567        PlanBinaryOp::Multiply => {
3568            let l = value_to_f64(left)?;
3569            let r = value_to_f64(right)?;
3570            Ok(Value::Float(l * r))
3571        }
3572        PlanBinaryOp::Divide => {
3573            let l = value_to_f64(left)?;
3574            let r = value_to_f64(right)?;
3575            if r == 0.0 {
3576                return Err(ExecutorError::DivisionByZero);
3577            }
3578            Ok(Value::Float(l / r))
3579        }
3580        PlanBinaryOp::Modulo => {
3581            let l = value_to_i64(left)?;
3582            let r = value_to_i64(right)?;
3583            if r == 0 {
3584                return Err(ExecutorError::DivisionByZero);
3585            }
3586            Ok(Value::Integer(l % r))
3587        }
3588        PlanBinaryOp::Equal => Ok(Value::Boolean(
3589            compare_values(left, right)? == std::cmp::Ordering::Equal,
3590        )),
3591        PlanBinaryOp::NotEqual => Ok(Value::Boolean(
3592            compare_values(left, right)? != std::cmp::Ordering::Equal,
3593        )),
3594        PlanBinaryOp::LessThan => Ok(Value::Boolean(
3595            compare_values(left, right)? == std::cmp::Ordering::Less,
3596        )),
3597        PlanBinaryOp::LessThanOrEqual => Ok(Value::Boolean(
3598            compare_values(left, right)? != std::cmp::Ordering::Greater,
3599        )),
3600        PlanBinaryOp::GreaterThan => Ok(Value::Boolean(
3601            compare_values(left, right)? == std::cmp::Ordering::Greater,
3602        )),
3603        PlanBinaryOp::GreaterThanOrEqual => Ok(Value::Boolean(
3604            compare_values(left, right)? != std::cmp::Ordering::Less,
3605        )),
3606        PlanBinaryOp::And => {
3607            let l = value_to_bool(left)?;
3608            let r = value_to_bool(right)?;
3609            Ok(Value::Boolean(l && r))
3610        }
3611        PlanBinaryOp::Or => {
3612            let l = value_to_bool(left)?;
3613            let r = value_to_bool(right)?;
3614            Ok(Value::Boolean(l || r))
3615        }
3616        PlanBinaryOp::Concat => {
3617            let l = value_to_string(left);
3618            let r = value_to_string(right);
3619            Ok(Value::String(format!("{}{}", l, r)))
3620        }
3621    }
3622}
3623
3624fn evaluate_unary_op(op: PlanUnaryOp, value: &Value) -> ExecutorResult<Value> {
3625    match op {
3626        PlanUnaryOp::Not => {
3627            let b = value_to_bool(value)?;
3628            Ok(Value::Boolean(!b))
3629        }
3630        PlanUnaryOp::Negative => {
3631            let f = value_to_f64(value)?;
3632            Ok(Value::Float(-f))
3633        }
3634    }
3635}
3636
3637fn evaluate_function(name: &str, args: &[Value]) -> ExecutorResult<Value> {
3638    match name.to_uppercase().as_str() {
3639        "UPPER" => {
3640            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3641            Ok(Value::String(s.to_uppercase()))
3642        }
3643        "LOWER" => {
3644            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3645            Ok(Value::String(s.to_lowercase()))
3646        }
3647        "LENGTH" => {
3648            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3649            Ok(Value::Integer(s.len() as i64))
3650        }
3651        "ABS" => {
3652            let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3653            Ok(Value::Float(f.abs()))
3654        }
3655        "COALESCE" => {
3656            for arg in args {
3657                if !matches!(arg, Value::Null) {
3658                    return Ok(arg.clone());
3659                }
3660            }
3661            Ok(Value::Null)
3662        }
3663        "ROUND" => {
3664            let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3665            let decimals = args
3666                .get(1)
3667                .and_then(|v| match v {
3668                    Value::Integer(i) => Some(*i as i32),
3669                    _ => None,
3670                })
3671                .unwrap_or(0);
3672            let factor = 10_f64.powi(decimals);
3673            Ok(Value::Float((f * factor).round() / factor))
3674        }
3675        "CEIL" | "CEILING" => {
3676            let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3677            Ok(Value::Float(f.ceil()))
3678        }
3679        "FLOOR" => {
3680            let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3681            Ok(Value::Float(f.floor()))
3682        }
3683        "SUBSTRING" | "SUBSTR" => {
3684            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3685            let start = args
3686                .get(1)
3687                .and_then(|v| match v {
3688                    Value::Integer(i) => Some((*i as usize).saturating_sub(1)), // SQL is 1-indexed
3689                    _ => None,
3690                })
3691                .unwrap_or(0);
3692            let len = args.get(2).and_then(|v| match v {
3693                Value::Integer(i) => Some(*i as usize),
3694                _ => None,
3695            });
3696            let result: String = match len {
3697                Some(l) => s.chars().skip(start).take(l).collect(),
3698                None => s.chars().skip(start).collect(),
3699            };
3700            Ok(Value::String(result))
3701        }
3702        "TRIM" => {
3703            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3704            Ok(Value::String(s.trim().to_string()))
3705        }
3706        "LTRIM" => {
3707            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3708            Ok(Value::String(s.trim_start().to_string()))
3709        }
3710        "RTRIM" => {
3711            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3712            Ok(Value::String(s.trim_end().to_string()))
3713        }
3714        "NULLIF" => {
3715            if args.len() >= 2 && args[0] == args[1] {
3716                Ok(Value::Null)
3717            } else {
3718                Ok(args.first().cloned().unwrap_or(Value::Null))
3719            }
3720        }
3721        "NOW" | "CURRENT_TIMESTAMP" => Ok(Value::String(
3722            chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
3723        )),
3724        "CURRENT_DATE" => Ok(Value::String(
3725            chrono::Utc::now().format("%Y-%m-%d").to_string(),
3726        )),
3727        "CURRENT_TIME" => Ok(Value::String(
3728            chrono::Utc::now().format("%H:%M:%S").to_string(),
3729        )),
3730        "EXTRACT" => {
3731            // EXTRACT is typically handled specially by the parser, but handle the function form
3732            let s = value_to_string(&args.last().cloned().unwrap_or(Value::Null));
3733            let part =
3734                value_to_string(&args.first().cloned().unwrap_or(Value::Null)).to_uppercase();
3735            if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") {
3736                let val = match part.as_str() {
3737                    "YEAR" => dt.format("%Y").to_string().parse::<i64>().unwrap_or(0),
3738                    "MONTH" => dt.format("%m").to_string().parse::<i64>().unwrap_or(0),
3739                    "DAY" => dt.format("%d").to_string().parse::<i64>().unwrap_or(0),
3740                    "HOUR" => dt.format("%H").to_string().parse::<i64>().unwrap_or(0),
3741                    "MINUTE" => dt.format("%M").to_string().parse::<i64>().unwrap_or(0),
3742                    "SECOND" => dt.format("%S").to_string().parse::<i64>().unwrap_or(0),
3743                    _ => 0,
3744                };
3745                Ok(Value::Integer(val))
3746            } else {
3747                Ok(Value::Null)
3748            }
3749        }
3750        "REPLACE" => {
3751            let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3752            let from = value_to_string(&args.get(1).cloned().unwrap_or(Value::Null));
3753            let to = value_to_string(&args.get(2).cloned().unwrap_or(Value::Null));
3754            Ok(Value::String(s.replace(&from, &to)))
3755        }
3756        "CONCAT" => {
3757            let result: String = args.iter().map(|a| value_to_string(a)).collect();
3758            Ok(Value::String(result))
3759        }
3760        _ => Err(ExecutorError::InvalidOperation(format!(
3761            "Unknown function: {}",
3762            name
3763        ))),
3764    }
3765}
3766
3767// =============================================================================
3768// Value Conversion Utilities
3769// =============================================================================
3770
3771fn value_to_f64(value: &Value) -> ExecutorResult<f64> {
3772    match value {
3773        Value::Integer(i) => Ok(*i as f64),
3774        Value::Float(f) => Ok(*f),
3775        Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
3776            expected: "number".to_string(),
3777            actual: "string".to_string(),
3778        }),
3779        _ => Err(ExecutorError::TypeMismatch {
3780            expected: "number".to_string(),
3781            actual: format!("{:?}", value),
3782        }),
3783    }
3784}
3785
3786fn value_to_i64(value: &Value) -> ExecutorResult<i64> {
3787    match value {
3788        Value::Integer(i) => Ok(*i),
3789        Value::Float(f) => Ok(*f as i64),
3790        Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
3791            expected: "integer".to_string(),
3792            actual: "string".to_string(),
3793        }),
3794        _ => Err(ExecutorError::TypeMismatch {
3795            expected: "integer".to_string(),
3796            actual: format!("{:?}", value),
3797        }),
3798    }
3799}
3800
3801fn value_to_bool(value: &Value) -> ExecutorResult<bool> {
3802    match value {
3803        Value::Boolean(b) => Ok(*b),
3804        Value::Integer(i) => Ok(*i != 0),
3805        Value::Null => Ok(false),
3806        _ => Err(ExecutorError::TypeMismatch {
3807            expected: "boolean".to_string(),
3808            actual: format!("{:?}", value),
3809        }),
3810    }
3811}
3812
3813fn value_to_string(value: &Value) -> String {
3814    match value {
3815        Value::String(s) => s.clone(),
3816        Value::Integer(i) => i.to_string(),
3817        Value::Float(f) => f.to_string(),
3818        Value::Boolean(b) => b.to_string(),
3819        Value::Null => String::new(),
3820        _ => format!("{:?}", value),
3821    }
3822}
3823
3824fn compare_values(left: &Value, right: &Value) -> ExecutorResult<std::cmp::Ordering> {
3825    match (left, right) {
3826        (Value::Integer(l), Value::Integer(r)) => Ok(l.cmp(r)),
3827        (Value::Float(l), Value::Float(r)) => {
3828            Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
3829        }
3830        (Value::Integer(l), Value::Float(r)) => {
3831            let l = *l as f64;
3832            Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
3833        }
3834        (Value::Float(l), Value::Integer(r)) => {
3835            let r = *r as f64;
3836            Ok(l.partial_cmp(&r).unwrap_or(std::cmp::Ordering::Equal))
3837        }
3838        (Value::String(l), Value::String(r)) => Ok(l.cmp(r)),
3839        (Value::Boolean(l), Value::Boolean(r)) => Ok(l.cmp(r)),
3840        _ => Ok(std::cmp::Ordering::Equal),
3841    }
3842}
3843
3844fn cast_value(value: &Value, target_type: &DataType) -> ExecutorResult<Value> {
3845    match target_type {
3846        DataType::Integer => Ok(Value::Integer(value_to_i64(value)?)),
3847        DataType::Float => Ok(Value::Float(value_to_f64(value)?)),
3848        DataType::Text => Ok(Value::String(value_to_string(value))),
3849        DataType::Boolean => Ok(Value::Boolean(value_to_bool(value)?)),
3850        _ => Ok(value.clone()),
3851    }
3852}
3853
3854// =============================================================================
3855// Tests
3856// =============================================================================
3857
3858#[cfg(test)]
3859mod tests {
3860    use super::*;
3861    use crate::planner::{LimitNode, PlanNode, ProjectNode, ProjectionExpr, QueryPlan, ScanNode};
3862
3863    fn create_test_context() -> ExecutionContext {
3864        let mut context = ExecutionContext::new();
3865
3866        context.add_table(TableData {
3867            name: "users".to_string(),
3868            columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
3869            rows: vec![
3870                Row {
3871                    values: vec![
3872                        Value::Integer(1),
3873                        Value::String("Alice".to_string()),
3874                        Value::Integer(30),
3875                    ],
3876                },
3877                Row {
3878                    values: vec![
3879                        Value::Integer(2),
3880                        Value::String("Bob".to_string()),
3881                        Value::Integer(25),
3882                    ],
3883                },
3884                Row {
3885                    values: vec![
3886                        Value::Integer(3),
3887                        Value::String("Charlie".to_string()),
3888                        Value::Integer(35),
3889                    ],
3890                },
3891            ],
3892            row_created_version: vec![0, 0, 0],
3893            row_deleted_version: vec![0, 0, 0],
3894        });
3895
3896        context
3897    }
3898
3899    #[test]
3900    fn test_scan_operator() {
3901        let context = create_test_context();
3902        let executor = Executor::new(context);
3903
3904        let plan = QueryPlan {
3905            root: PlanNode::Project(ProjectNode {
3906                input: Box::new(PlanNode::Scan(ScanNode {
3907                    table_name: "users".to_string(),
3908                    alias: None,
3909                    columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
3910                    index_scan: None,
3911                })),
3912                expressions: vec![
3913                    ProjectionExpr {
3914                        expr: PlanExpression::Column {
3915                            table: None,
3916                            name: "id".to_string(),
3917                            data_type: DataType::Integer,
3918                        },
3919                        alias: Some("id".to_string()),
3920                    },
3921                    ProjectionExpr {
3922                        expr: PlanExpression::Column {
3923                            table: None,
3924                            name: "name".to_string(),
3925                            data_type: DataType::Text,
3926                        },
3927                        alias: Some("name".to_string()),
3928                    },
3929                ],
3930            }),
3931            estimated_cost: 100.0,
3932            estimated_rows: 3,
3933        };
3934
3935        let result = executor.execute(&plan).unwrap();
3936
3937        assert_eq!(result.rows.len(), 3);
3938        assert_eq!(result.columns.len(), 2);
3939    }
3940
3941    #[test]
3942    fn test_filter_operator() {
3943        let context = create_test_context();
3944        let executor = Executor::new(context);
3945
3946        let plan = QueryPlan {
3947            root: PlanNode::Project(ProjectNode {
3948                input: Box::new(PlanNode::Filter(crate::planner::FilterNode {
3949                    input: Box::new(PlanNode::Scan(ScanNode {
3950                        table_name: "users".to_string(),
3951                        alias: None,
3952                        columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
3953                        index_scan: None,
3954                    })),
3955                    predicate: PlanExpression::BinaryOp {
3956                        left: Box::new(PlanExpression::Column {
3957                            table: None,
3958                            name: "age".to_string(),
3959                            data_type: DataType::Integer,
3960                        }),
3961                        op: PlanBinaryOp::GreaterThan,
3962                        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(28))),
3963                    },
3964                })),
3965                expressions: vec![ProjectionExpr {
3966                    expr: PlanExpression::Column {
3967                        table: None,
3968                        name: "name".to_string(),
3969                        data_type: DataType::Text,
3970                    },
3971                    alias: Some("name".to_string()),
3972                }],
3973            }),
3974            estimated_cost: 100.0,
3975            estimated_rows: 2,
3976        };
3977
3978        let result = executor.execute(&plan).unwrap();
3979
3980        assert_eq!(result.rows.len(), 2);
3981    }
3982
3983    #[test]
3984    fn test_limit_operator() {
3985        let context = create_test_context();
3986        let executor = Executor::new(context);
3987
3988        let plan = QueryPlan {
3989            root: PlanNode::Limit(LimitNode {
3990                input: Box::new(PlanNode::Project(ProjectNode {
3991                    input: Box::new(PlanNode::Scan(ScanNode {
3992                        table_name: "users".to_string(),
3993                        alias: None,
3994                        columns: vec!["id".to_string()],
3995                        index_scan: None,
3996                    })),
3997                    expressions: vec![ProjectionExpr {
3998                        expr: PlanExpression::Column {
3999                            table: None,
4000                            name: "id".to_string(),
4001                            data_type: DataType::Integer,
4002                        },
4003                        alias: Some("id".to_string()),
4004                    }],
4005                })),
4006                limit: Some(2),
4007                offset: None,
4008            }),
4009            estimated_cost: 100.0,
4010            estimated_rows: 2,
4011        };
4012
4013        let result = executor.execute(&plan).unwrap();
4014
4015        assert_eq!(result.rows.len(), 2);
4016    }
4017
4018    #[test]
4019    fn test_create_table() {
4020        use crate::planner::{CreateColumnDef, CreateTableNode};
4021
4022        let executor = Executor::new(ExecutionContext::new());
4023
4024        let plan = QueryPlan {
4025            root: PlanNode::CreateTable(CreateTableNode {
4026                table_name: "test_table".to_string(),
4027                columns: vec![
4028                    CreateColumnDef {
4029                        name: "id".to_string(),
4030                        data_type: DataType::Integer,
4031                        nullable: false,
4032                        default: None,
4033                        primary_key: true,
4034                        unique: false,
4035                    },
4036                    CreateColumnDef {
4037                        name: "name".to_string(),
4038                        data_type: DataType::Text,
4039                        nullable: true,
4040                        default: None,
4041                        primary_key: false,
4042                        unique: false,
4043                    },
4044                ],
4045                constraints: vec![],
4046                if_not_exists: false,
4047            }),
4048            estimated_cost: 1.0,
4049            estimated_rows: 0,
4050        };
4051
4052        let result = executor.execute(&plan).unwrap();
4053        match &result.rows[0].values[0] {
4054            Value::String(s) => assert!(s.contains("created")),
4055            _ => panic!("Expected string result"),
4056        }
4057
4058        // Verify table exists by listing tables
4059        let tables = executor.context.read().unwrap().list_tables();
4060        assert!(tables.contains(&"test_table".to_string()));
4061    }
4062
4063    #[test]
4064    fn test_insert_into_table() {
4065        use crate::planner::{CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource};
4066
4067        let executor = Executor::new(ExecutionContext::new());
4068
4069        // First create a table
4070        let create_plan = QueryPlan {
4071            root: PlanNode::CreateTable(CreateTableNode {
4072                table_name: "test_insert".to_string(),
4073                columns: vec![
4074                    CreateColumnDef {
4075                        name: "id".to_string(),
4076                        data_type: DataType::Integer,
4077                        nullable: false,
4078                        default: None,
4079                        primary_key: true,
4080                        unique: false,
4081                    },
4082                    CreateColumnDef {
4083                        name: "value".to_string(),
4084                        data_type: DataType::Text,
4085                        nullable: true,
4086                        default: None,
4087                        primary_key: false,
4088                        unique: false,
4089                    },
4090                ],
4091                constraints: vec![],
4092                if_not_exists: false,
4093            }),
4094            estimated_cost: 1.0,
4095            estimated_rows: 0,
4096        };
4097        executor.execute(&create_plan).unwrap();
4098
4099        // Now insert data
4100        let insert_plan = QueryPlan {
4101            root: PlanNode::Insert(InsertNode {
4102                table_name: "test_insert".to_string(),
4103                columns: vec!["id".to_string(), "value".to_string()],
4104                source: InsertPlanSource::Values(vec![
4105                    vec![
4106                        PlanExpression::Literal(PlanLiteral::Integer(1)),
4107                        PlanExpression::Literal(PlanLiteral::String("hello".to_string())),
4108                    ],
4109                    vec![
4110                        PlanExpression::Literal(PlanLiteral::Integer(2)),
4111                        PlanExpression::Literal(PlanLiteral::String("world".to_string())),
4112                    ],
4113                ]),
4114            }),
4115            estimated_cost: 2.0,
4116            estimated_rows: 2,
4117        };
4118
4119        let result = executor.execute(&insert_plan).unwrap();
4120        assert_eq!(result.rows_affected, 2);
4121
4122        // Query the data
4123        let query_plan = QueryPlan {
4124            root: PlanNode::Project(ProjectNode {
4125                input: Box::new(PlanNode::Scan(ScanNode {
4126                    table_name: "test_insert".to_string(),
4127                    alias: None,
4128                    columns: vec!["id".to_string(), "value".to_string()],
4129                    index_scan: None,
4130                })),
4131                expressions: vec![ProjectionExpr {
4132                    expr: PlanExpression::Column {
4133                        table: None,
4134                        name: "id".to_string(),
4135                        data_type: DataType::Integer,
4136                    },
4137                    alias: Some("id".to_string()),
4138                }],
4139            }),
4140            estimated_cost: 100.0,
4141            estimated_rows: 2,
4142        };
4143
4144        let query_result = executor.execute(&query_plan).unwrap();
4145        assert_eq!(query_result.rows.len(), 2);
4146    }
4147
4148    #[test]
4149    fn test_drop_table() {
4150        use crate::planner::{CreateColumnDef, CreateTableNode, DropTableNode};
4151
4152        let executor = Executor::new(ExecutionContext::new());
4153
4154        // Create a table
4155        let create_plan = QueryPlan {
4156            root: PlanNode::CreateTable(CreateTableNode {
4157                table_name: "to_drop".to_string(),
4158                columns: vec![CreateColumnDef {
4159                    name: "id".to_string(),
4160                    data_type: DataType::Integer,
4161                    nullable: false,
4162                    default: None,
4163                    primary_key: true,
4164                    unique: false,
4165                }],
4166                constraints: vec![],
4167                if_not_exists: false,
4168            }),
4169            estimated_cost: 1.0,
4170            estimated_rows: 0,
4171        };
4172        executor.execute(&create_plan).unwrap();
4173
4174        // Verify it exists
4175        let tables = executor.context.read().unwrap().list_tables();
4176        assert!(tables.contains(&"to_drop".to_string()));
4177
4178        // Drop the table
4179        let drop_plan = QueryPlan {
4180            root: PlanNode::DropTable(DropTableNode {
4181                table_name: "to_drop".to_string(),
4182                if_exists: false,
4183            }),
4184            estimated_cost: 1.0,
4185            estimated_rows: 0,
4186        };
4187        executor.execute(&drop_plan).unwrap();
4188
4189        // Verify it's gone
4190        let tables = executor.context.read().unwrap().list_tables();
4191        assert!(!tables.contains(&"to_drop".to_string()));
4192    }
4193
4194    #[test]
4195    fn test_shared_context_persistence() {
4196        use crate::planner::{CreateColumnDef, CreateTableNode};
4197        use std::sync::{Arc, RwLock};
4198
4199        // Create a shared context
4200        let shared_context = Arc::new(RwLock::new(ExecutionContext::new()));
4201
4202        // Create executor with shared context
4203        let executor = Executor::with_shared_context(shared_context.clone());
4204
4205        // Create a table
4206        let create_plan = QueryPlan {
4207            root: PlanNode::CreateTable(CreateTableNode {
4208                table_name: "shared_table".to_string(),
4209                columns: vec![CreateColumnDef {
4210                    name: "id".to_string(),
4211                    data_type: DataType::Integer,
4212                    nullable: false,
4213                    default: None,
4214                    primary_key: true,
4215                    unique: false,
4216                }],
4217                constraints: vec![],
4218                if_not_exists: false,
4219            }),
4220            estimated_cost: 1.0,
4221            estimated_rows: 0,
4222        };
4223        executor.execute(&create_plan).unwrap();
4224
4225        // Create a new executor with the SAME shared context
4226        let executor2 = Executor::with_shared_context(shared_context.clone());
4227
4228        // Verify the table exists from the second executor's perspective
4229        let tables = executor2.context.read().unwrap().list_tables();
4230        assert!(tables.contains(&"shared_table".to_string()));
4231    }
4232
4233    #[test]
4234    fn test_insert_select() {
4235        use crate::planner::{
4236            CreateColumnDef, CreateTableNode, FilterNode, InsertNode, InsertPlanSource,
4237        };
4238
4239        let executor = Executor::new(ExecutionContext::new());
4240
4241        // Create source table
4242        let create_source = QueryPlan {
4243            root: PlanNode::CreateTable(CreateTableNode {
4244                table_name: "source_table".to_string(),
4245                columns: vec![
4246                    CreateColumnDef {
4247                        name: "id".to_string(),
4248                        data_type: DataType::Integer,
4249                        nullable: false,
4250                        default: None,
4251                        primary_key: true,
4252                        unique: false,
4253                    },
4254                    CreateColumnDef {
4255                        name: "value".to_string(),
4256                        data_type: DataType::Text,
4257                        nullable: true,
4258                        default: None,
4259                        primary_key: false,
4260                        unique: false,
4261                    },
4262                ],
4263                constraints: vec![],
4264                if_not_exists: false,
4265            }),
4266            estimated_cost: 1.0,
4267            estimated_rows: 0,
4268        };
4269        executor.execute(&create_source).unwrap();
4270
4271        // Create destination table
4272        let create_dest = QueryPlan {
4273            root: PlanNode::CreateTable(CreateTableNode {
4274                table_name: "dest_table".to_string(),
4275                columns: vec![
4276                    CreateColumnDef {
4277                        name: "id".to_string(),
4278                        data_type: DataType::Integer,
4279                        nullable: false,
4280                        default: None,
4281                        primary_key: true,
4282                        unique: false,
4283                    },
4284                    CreateColumnDef {
4285                        name: "value".to_string(),
4286                        data_type: DataType::Text,
4287                        nullable: true,
4288                        default: None,
4289                        primary_key: false,
4290                        unique: false,
4291                    },
4292                ],
4293                constraints: vec![],
4294                if_not_exists: false,
4295            }),
4296            estimated_cost: 1.0,
4297            estimated_rows: 0,
4298        };
4299        executor.execute(&create_dest).unwrap();
4300
4301        // Insert data into source table
4302        let insert_source = QueryPlan {
4303            root: PlanNode::Insert(InsertNode {
4304                table_name: "source_table".to_string(),
4305                columns: vec!["id".to_string(), "value".to_string()],
4306                source: InsertPlanSource::Values(vec![
4307                    vec![
4308                        PlanExpression::Literal(PlanLiteral::Integer(1)),
4309                        PlanExpression::Literal(PlanLiteral::String("one".to_string())),
4310                    ],
4311                    vec![
4312                        PlanExpression::Literal(PlanLiteral::Integer(2)),
4313                        PlanExpression::Literal(PlanLiteral::String("two".to_string())),
4314                    ],
4315                    vec![
4316                        PlanExpression::Literal(PlanLiteral::Integer(3)),
4317                        PlanExpression::Literal(PlanLiteral::String("three".to_string())),
4318                    ],
4319                ]),
4320            }),
4321            estimated_cost: 3.0,
4322            estimated_rows: 3,
4323        };
4324        executor.execute(&insert_source).unwrap();
4325
4326        // INSERT INTO dest_table SELECT * FROM source_table WHERE id > 1
4327        let insert_select = QueryPlan {
4328            root: PlanNode::Insert(InsertNode {
4329                table_name: "dest_table".to_string(),
4330                columns: vec!["id".to_string(), "value".to_string()],
4331                source: InsertPlanSource::Query(Box::new(PlanNode::Project(ProjectNode {
4332                    input: Box::new(PlanNode::Filter(FilterNode {
4333                        input: Box::new(PlanNode::Scan(ScanNode {
4334                            table_name: "source_table".to_string(),
4335                            alias: None,
4336                            columns: vec!["id".to_string(), "value".to_string()],
4337                            index_scan: None,
4338                        })),
4339                        predicate: PlanExpression::BinaryOp {
4340                            left: Box::new(PlanExpression::Column {
4341                                table: None,
4342                                name: "id".to_string(),
4343                                data_type: DataType::Integer,
4344                            }),
4345                            op: PlanBinaryOp::GreaterThan,
4346                            right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
4347                        },
4348                    })),
4349                    expressions: vec![
4350                        ProjectionExpr {
4351                            expr: PlanExpression::Column {
4352                                table: None,
4353                                name: "id".to_string(),
4354                                data_type: DataType::Integer,
4355                            },
4356                            alias: Some("id".to_string()),
4357                        },
4358                        ProjectionExpr {
4359                            expr: PlanExpression::Column {
4360                                table: None,
4361                                name: "value".to_string(),
4362                                data_type: DataType::Text,
4363                            },
4364                            alias: Some("value".to_string()),
4365                        },
4366                    ],
4367                }))),
4368            }),
4369            estimated_cost: 2.0,
4370            estimated_rows: 2,
4371        };
4372
4373        let result = executor.execute(&insert_select).unwrap();
4374        assert_eq!(result.rows_affected, 2); // Only rows with id > 1 (id=2 and id=3)
4375
4376        // Query dest_table to verify
4377        let query_plan = QueryPlan {
4378            root: PlanNode::Project(ProjectNode {
4379                input: Box::new(PlanNode::Scan(ScanNode {
4380                    table_name: "dest_table".to_string(),
4381                    alias: None,
4382                    columns: vec!["id".to_string(), "value".to_string()],
4383                    index_scan: None,
4384                })),
4385                expressions: vec![ProjectionExpr {
4386                    expr: PlanExpression::Column {
4387                        table: None,
4388                        name: "id".to_string(),
4389                        data_type: DataType::Integer,
4390                    },
4391                    alias: Some("id".to_string()),
4392                }],
4393            }),
4394            estimated_cost: 100.0,
4395            estimated_rows: 2,
4396        };
4397
4398        let query_result = executor.execute(&query_plan).unwrap();
4399        assert_eq!(query_result.rows.len(), 2);
4400    }
4401
4402    #[test]
4403    fn test_case_expression() {
4404        let row = Row {
4405            values: vec![Value::Integer(2)],
4406        };
4407        let columns = vec!["status".to_string()];
4408
4409        // CASE WHEN status = 1 THEN 'one' WHEN status = 2 THEN 'two' ELSE 'other' END
4410        let case_expr = PlanExpression::Case {
4411            operand: None,
4412            conditions: vec![
4413                (
4414                    PlanExpression::BinaryOp {
4415                        left: Box::new(PlanExpression::Column {
4416                            table: None,
4417                            name: "status".to_string(),
4418                            data_type: DataType::Integer,
4419                        }),
4420                        op: PlanBinaryOp::Equal,
4421                        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
4422                    },
4423                    PlanExpression::Literal(PlanLiteral::String("one".to_string())),
4424                ),
4425                (
4426                    PlanExpression::BinaryOp {
4427                        left: Box::new(PlanExpression::Column {
4428                            table: None,
4429                            name: "status".to_string(),
4430                            data_type: DataType::Integer,
4431                        }),
4432                        op: PlanBinaryOp::Equal,
4433                        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(2))),
4434                    },
4435                    PlanExpression::Literal(PlanLiteral::String("two".to_string())),
4436                ),
4437            ],
4438            else_result: Some(Box::new(PlanExpression::Literal(PlanLiteral::String(
4439                "other".to_string(),
4440            )))),
4441        };
4442
4443        let result = evaluate_expression(&case_expr, &row, &columns).unwrap();
4444        assert_eq!(result, Value::String("two".to_string()));
4445    }
4446
4447    #[test]
4448    fn test_in_list_expression() {
4449        let row = Row {
4450            values: vec![Value::Integer(3)],
4451        };
4452        let columns = vec!["id".to_string()];
4453
4454        // id IN (1, 2, 3, 4, 5)
4455        let in_expr = PlanExpression::InList {
4456            expr: Box::new(PlanExpression::Column {
4457                table: None,
4458                name: "id".to_string(),
4459                data_type: DataType::Integer,
4460            }),
4461            list: vec![
4462                PlanExpression::Literal(PlanLiteral::Integer(1)),
4463                PlanExpression::Literal(PlanLiteral::Integer(2)),
4464                PlanExpression::Literal(PlanLiteral::Integer(3)),
4465                PlanExpression::Literal(PlanLiteral::Integer(4)),
4466                PlanExpression::Literal(PlanLiteral::Integer(5)),
4467            ],
4468            negated: false,
4469        };
4470
4471        let result = evaluate_expression(&in_expr, &row, &columns).unwrap();
4472        assert_eq!(result, Value::Boolean(true));
4473
4474        // id NOT IN (1, 2, 3, 4, 5) - should be false
4475        let not_in_expr = PlanExpression::InList {
4476            expr: Box::new(PlanExpression::Column {
4477                table: None,
4478                name: "id".to_string(),
4479                data_type: DataType::Integer,
4480            }),
4481            list: vec![
4482                PlanExpression::Literal(PlanLiteral::Integer(1)),
4483                PlanExpression::Literal(PlanLiteral::Integer(2)),
4484                PlanExpression::Literal(PlanLiteral::Integer(3)),
4485            ],
4486            negated: true,
4487        };
4488
4489        let result = evaluate_expression(&not_in_expr, &row, &columns).unwrap();
4490        assert_eq!(result, Value::Boolean(false));
4491    }
4492
4493    #[test]
4494    fn test_between_expression() {
4495        let row = Row {
4496            values: vec![Value::Integer(50)],
4497        };
4498        let columns = vec!["value".to_string()];
4499
4500        // value BETWEEN 10 AND 100
4501        let between_expr = PlanExpression::Between {
4502            expr: Box::new(PlanExpression::Column {
4503                table: None,
4504                name: "value".to_string(),
4505                data_type: DataType::Integer,
4506            }),
4507            low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
4508            high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(100))),
4509            negated: false,
4510        };
4511
4512        let result = evaluate_expression(&between_expr, &row, &columns).unwrap();
4513        assert_eq!(result, Value::Boolean(true));
4514
4515        // value NOT BETWEEN 10 AND 40 (50 is outside, should be true)
4516        let not_between_expr = PlanExpression::Between {
4517            expr: Box::new(PlanExpression::Column {
4518                table: None,
4519                name: "value".to_string(),
4520                data_type: DataType::Integer,
4521            }),
4522            low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
4523            high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(40))),
4524            negated: true,
4525        };
4526
4527        let result = evaluate_expression(&not_between_expr, &row, &columns).unwrap();
4528        assert_eq!(result, Value::Boolean(true));
4529    }
4530
4531    #[test]
4532    fn test_like_expression() {
4533        let row = Row {
4534            values: vec![Value::String("hello world".to_string())],
4535        };
4536        let columns = vec!["text".to_string()];
4537
4538        // text LIKE 'hello%'
4539        let like_expr = PlanExpression::Like {
4540            expr: Box::new(PlanExpression::Column {
4541                table: None,
4542                name: "text".to_string(),
4543                data_type: DataType::Text,
4544            }),
4545            pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
4546                "hello%".to_string(),
4547            ))),
4548            negated: false,
4549        };
4550
4551        let result = evaluate_expression(&like_expr, &row, &columns).unwrap();
4552        assert_eq!(result, Value::Boolean(true));
4553
4554        // text LIKE '%world'
4555        let like_expr2 = PlanExpression::Like {
4556            expr: Box::new(PlanExpression::Column {
4557                table: None,
4558                name: "text".to_string(),
4559                data_type: DataType::Text,
4560            }),
4561            pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
4562                "%world".to_string(),
4563            ))),
4564            negated: false,
4565        };
4566
4567        let result = evaluate_expression(&like_expr2, &row, &columns).unwrap();
4568        assert_eq!(result, Value::Boolean(true));
4569
4570        // text NOT LIKE '%foo%' (should be true since 'foo' is not in 'hello world')
4571        let not_like_expr = PlanExpression::Like {
4572            expr: Box::new(PlanExpression::Column {
4573                table: None,
4574                name: "text".to_string(),
4575                data_type: DataType::Text,
4576            }),
4577            pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
4578                "%foo%".to_string(),
4579            ))),
4580            negated: true,
4581        };
4582
4583        let result = evaluate_expression(&not_like_expr, &row, &columns).unwrap();
4584        assert_eq!(result, Value::Boolean(true));
4585    }
4586
4587    #[test]
4588    fn test_alter_table() {
4589        use crate::planner::{
4590            AlterTableNode, CreateColumnDef, CreateTableNode, PlanAlterOperation,
4591        };
4592
4593        let executor = Executor::new(ExecutionContext::new());
4594
4595        // Create a table
4596        let create_plan = QueryPlan {
4597            root: PlanNode::CreateTable(CreateTableNode {
4598                table_name: "alter_test".to_string(),
4599                columns: vec![CreateColumnDef {
4600                    name: "id".to_string(),
4601                    data_type: DataType::Integer,
4602                    nullable: false,
4603                    default: None,
4604                    primary_key: true,
4605                    unique: false,
4606                }],
4607                constraints: vec![],
4608                if_not_exists: false,
4609            }),
4610            estimated_cost: 1.0,
4611            estimated_rows: 0,
4612        };
4613        executor.execute(&create_plan).unwrap();
4614
4615        // Add a column
4616        let add_column_plan = QueryPlan {
4617            root: PlanNode::AlterTable(AlterTableNode {
4618                table_name: "alter_test".to_string(),
4619                operations: vec![PlanAlterOperation::AddColumn(CreateColumnDef {
4620                    name: "name".to_string(),
4621                    data_type: DataType::Text,
4622                    nullable: true,
4623                    default: None,
4624                    primary_key: false,
4625                    unique: false,
4626                })],
4627            }),
4628            estimated_cost: 1.0,
4629            estimated_rows: 0,
4630        };
4631        let result = executor.execute(&add_column_plan).unwrap();
4632        match &result.rows[0].values[0] {
4633            Value::String(s) => assert!(s.contains("altered")),
4634            _ => panic!("Expected string result"),
4635        }
4636
4637        // Verify the column was added
4638        let schema = executor
4639            .context
4640            .read()
4641            .unwrap()
4642            .get_table_schema("alter_test")
4643            .unwrap()
4644            .clone();
4645        assert_eq!(schema.columns.len(), 2);
4646        assert_eq!(schema.columns[1].name, "name");
4647
4648        // Rename the column
4649        let rename_column_plan = QueryPlan {
4650            root: PlanNode::AlterTable(AlterTableNode {
4651                table_name: "alter_test".to_string(),
4652                operations: vec![PlanAlterOperation::RenameColumn {
4653                    old_name: "name".to_string(),
4654                    new_name: "full_name".to_string(),
4655                }],
4656            }),
4657            estimated_cost: 1.0,
4658            estimated_rows: 0,
4659        };
4660        executor.execute(&rename_column_plan).unwrap();
4661
4662        // Verify the column was renamed
4663        let schema = executor
4664            .context
4665            .read()
4666            .unwrap()
4667            .get_table_schema("alter_test")
4668            .unwrap()
4669            .clone();
4670        assert_eq!(schema.columns[1].name, "full_name");
4671
4672        // Drop the column
4673        let drop_column_plan = QueryPlan {
4674            root: PlanNode::AlterTable(AlterTableNode {
4675                table_name: "alter_test".to_string(),
4676                operations: vec![PlanAlterOperation::DropColumn {
4677                    name: "full_name".to_string(),
4678                    if_exists: false,
4679                }],
4680            }),
4681            estimated_cost: 1.0,
4682            estimated_rows: 0,
4683        };
4684        executor.execute(&drop_column_plan).unwrap();
4685
4686        // Verify the column was dropped
4687        let schema = executor
4688            .context
4689            .read()
4690            .unwrap()
4691            .get_table_schema("alter_test")
4692            .unwrap()
4693            .clone();
4694        assert_eq!(schema.columns.len(), 1);
4695    }
4696
4697    #[test]
4698    fn test_alter_table_constraints() {
4699        use crate::planner::{
4700            AlterTableNode, CreateColumnDef, CreateTableConstraint, CreateTableNode,
4701            PlanAlterOperation,
4702        };
4703
4704        let executor = Executor::new(ExecutionContext::new());
4705
4706        // Create a table with two columns
4707        let create_plan = QueryPlan {
4708            root: PlanNode::CreateTable(CreateTableNode {
4709                table_name: "constraint_test".to_string(),
4710                columns: vec![
4711                    CreateColumnDef {
4712                        name: "id".to_string(),
4713                        data_type: DataType::Integer,
4714                        nullable: false,
4715                        default: None,
4716                        primary_key: false,
4717                        unique: false,
4718                    },
4719                    CreateColumnDef {
4720                        name: "email".to_string(),
4721                        data_type: DataType::Text,
4722                        nullable: true,
4723                        default: None,
4724                        primary_key: false,
4725                        unique: false,
4726                    },
4727                ],
4728                constraints: vec![],
4729                if_not_exists: false,
4730            }),
4731            estimated_cost: 1.0,
4732            estimated_rows: 0,
4733        };
4734        executor.execute(&create_plan).unwrap();
4735
4736        // Add a UNIQUE constraint
4737        let add_unique_plan = QueryPlan {
4738            root: PlanNode::AlterTable(AlterTableNode {
4739                table_name: "constraint_test".to_string(),
4740                operations: vec![PlanAlterOperation::AddConstraint(
4741                    CreateTableConstraint::Unique {
4742                        columns: vec!["email".to_string()],
4743                    },
4744                )],
4745            }),
4746            estimated_cost: 1.0,
4747            estimated_rows: 0,
4748        };
4749        executor.execute(&add_unique_plan).unwrap();
4750
4751        // Verify the constraint was added
4752        let schema = executor
4753            .context
4754            .read()
4755            .unwrap()
4756            .get_table_schema("constraint_test")
4757            .unwrap()
4758            .clone();
4759        assert_eq!(schema.constraints.len(), 1);
4760        assert!(schema.constraints[0].name.contains("uq"));
4761
4762        // Add a PRIMARY KEY constraint
4763        let add_pk_plan = QueryPlan {
4764            root: PlanNode::AlterTable(AlterTableNode {
4765                table_name: "constraint_test".to_string(),
4766                operations: vec![PlanAlterOperation::AddConstraint(
4767                    CreateTableConstraint::PrimaryKey {
4768                        columns: vec!["id".to_string()],
4769                    },
4770                )],
4771            }),
4772            estimated_cost: 1.0,
4773            estimated_rows: 0,
4774        };
4775        executor.execute(&add_pk_plan).unwrap();
4776
4777        // Verify the primary key was set
4778        let schema = executor
4779            .context
4780            .read()
4781            .unwrap()
4782            .get_table_schema("constraint_test")
4783            .unwrap()
4784            .clone();
4785        assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
4786        assert_eq!(schema.constraints.len(), 2);
4787
4788        // Try adding another PRIMARY KEY (should fail)
4789        let add_dup_pk_plan = QueryPlan {
4790            root: PlanNode::AlterTable(AlterTableNode {
4791                table_name: "constraint_test".to_string(),
4792                operations: vec![PlanAlterOperation::AddConstraint(
4793                    CreateTableConstraint::PrimaryKey {
4794                        columns: vec!["email".to_string()],
4795                    },
4796                )],
4797            }),
4798            estimated_cost: 1.0,
4799            estimated_rows: 0,
4800        };
4801        let result = executor.execute(&add_dup_pk_plan);
4802        assert!(result.is_err());
4803
4804        // Drop the UNIQUE constraint by name
4805        let pk_constraint_name = {
4806            let schema = executor
4807                .context
4808                .read()
4809                .unwrap()
4810                .get_table_schema("constraint_test")
4811                .unwrap()
4812                .clone();
4813            schema
4814                .constraints
4815                .iter()
4816                .find(|c| matches!(c.constraint_type, StoredConstraintType::Unique { .. }))
4817                .map(|c| c.name.clone())
4818                .unwrap()
4819        };
4820
4821        let drop_constraint_plan = QueryPlan {
4822            root: PlanNode::AlterTable(AlterTableNode {
4823                table_name: "constraint_test".to_string(),
4824                operations: vec![PlanAlterOperation::DropConstraint {
4825                    name: pk_constraint_name,
4826                }],
4827            }),
4828            estimated_cost: 1.0,
4829            estimated_rows: 0,
4830        };
4831        executor.execute(&drop_constraint_plan).unwrap();
4832
4833        // Verify the constraint was dropped
4834        let schema = executor
4835            .context
4836            .read()
4837            .unwrap()
4838            .get_table_schema("constraint_test")
4839            .unwrap()
4840            .clone();
4841        assert_eq!(schema.constraints.len(), 1);
4842        // Only the PK constraint should remain
4843        assert!(matches!(
4844            schema.constraints[0].constraint_type,
4845            StoredConstraintType::PrimaryKey { .. }
4846        ));
4847    }
4848
4849    #[test]
4850    fn test_create_table_with_constraints() {
4851        use crate::planner::{CreateColumnDef, CreateTableConstraint, CreateTableNode};
4852
4853        let executor = Executor::new(ExecutionContext::new());
4854
4855        // Create a table with constraints
4856        let create_plan = QueryPlan {
4857            root: PlanNode::CreateTable(CreateTableNode {
4858                table_name: "users".to_string(),
4859                columns: vec![
4860                    CreateColumnDef {
4861                        name: "id".to_string(),
4862                        data_type: DataType::Integer,
4863                        nullable: false,
4864                        default: None,
4865                        primary_key: true,
4866                        unique: false,
4867                    },
4868                    CreateColumnDef {
4869                        name: "email".to_string(),
4870                        data_type: DataType::Text,
4871                        nullable: false,
4872                        default: None,
4873                        primary_key: false,
4874                        unique: true, // Column-level unique
4875                    },
4876                    CreateColumnDef {
4877                        name: "name".to_string(),
4878                        data_type: DataType::Text,
4879                        nullable: true,
4880                        default: None,
4881                        primary_key: false,
4882                        unique: false,
4883                    },
4884                ],
4885                constraints: vec![CreateTableConstraint::Unique {
4886                    columns: vec!["name".to_string()],
4887                }],
4888                if_not_exists: false,
4889            }),
4890            estimated_cost: 1.0,
4891            estimated_rows: 0,
4892        };
4893        executor.execute(&create_plan).unwrap();
4894
4895        // Verify schema has primary key and constraints
4896        let schema = executor
4897            .context
4898            .read()
4899            .unwrap()
4900            .get_table_schema("users")
4901            .unwrap()
4902            .clone();
4903        assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
4904        // Should have: pk, uq for name, uq for email (column-level)
4905        assert!(schema.constraints.len() >= 2);
4906    }
4907
4908    #[test]
4909    fn test_column_default_values() {
4910        use crate::planner::{
4911            CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource, PlanExpression,
4912            PlanLiteral,
4913        };
4914
4915        let executor = Executor::new(ExecutionContext::new());
4916
4917        // Create a table with a default value
4918        let create_plan = QueryPlan {
4919            root: PlanNode::CreateTable(CreateTableNode {
4920                table_name: "defaults_test".to_string(),
4921                columns: vec![
4922                    CreateColumnDef {
4923                        name: "id".to_string(),
4924                        data_type: DataType::Integer,
4925                        nullable: false,
4926                        default: None,
4927                        primary_key: true,
4928                        unique: false,
4929                    },
4930                    CreateColumnDef {
4931                        name: "status".to_string(),
4932                        data_type: DataType::Text,
4933                        nullable: false,
4934                        default: Some(PlanExpression::Literal(PlanLiteral::String(
4935                            "active".to_string(),
4936                        ))),
4937                        primary_key: false,
4938                        unique: false,
4939                    },
4940                    CreateColumnDef {
4941                        name: "count".to_string(),
4942                        data_type: DataType::Integer,
4943                        nullable: true,
4944                        default: Some(PlanExpression::Literal(PlanLiteral::Integer(0))),
4945                        primary_key: false,
4946                        unique: false,
4947                    },
4948                ],
4949                constraints: vec![],
4950                if_not_exists: false,
4951            }),
4952            estimated_cost: 1.0,
4953            estimated_rows: 0,
4954        };
4955        executor.execute(&create_plan).unwrap();
4956
4957        // Verify default values are stored in schema
4958        let schema = executor
4959            .context
4960            .read()
4961            .unwrap()
4962            .get_table_schema("defaults_test")
4963            .unwrap()
4964            .clone();
4965        assert_eq!(
4966            schema.columns[1].default,
4967            Some(Value::String("active".to_string()))
4968        );
4969        assert_eq!(schema.columns[2].default, Some(Value::Integer(0)));
4970
4971        // Insert a row specifying only the id column
4972        let insert_plan = QueryPlan {
4973            root: PlanNode::Insert(InsertNode {
4974                table_name: "defaults_test".to_string(),
4975                columns: vec!["id".to_string()],
4976                source: InsertPlanSource::Values(vec![vec![PlanExpression::Literal(
4977                    PlanLiteral::Integer(1),
4978                )]]),
4979            }),
4980            estimated_cost: 1.0,
4981            estimated_rows: 1,
4982        };
4983        executor.execute(&insert_plan).unwrap();
4984
4985        // Verify the default values were used
4986        let context = executor.context.read().unwrap();
4987        let table = context.get_table("defaults_test").unwrap();
4988        let table_data = table.read().unwrap();
4989
4990        assert_eq!(table_data.rows.len(), 1);
4991        assert_eq!(table_data.rows[0].values[0], Value::Integer(1)); // id
4992        assert_eq!(
4993            table_data.rows[0].values[1],
4994            Value::String("active".to_string())
4995        ); // status default
4996        assert_eq!(table_data.rows[0].values[2], Value::Integer(0)); // count default
4997    }
4998
4999    #[test]
5000    fn test_alter_column_default() {
5001        use crate::planner::{
5002            AlterTableNode, CreateColumnDef, CreateTableNode, PlanAlterOperation, PlanExpression,
5003            PlanLiteral,
5004        };
5005
5006        let executor = Executor::new(ExecutionContext::new());
5007
5008        // Create a table
5009        let create_plan = QueryPlan {
5010            root: PlanNode::CreateTable(CreateTableNode {
5011                table_name: "alter_default_test".to_string(),
5012                columns: vec![
5013                    CreateColumnDef {
5014                        name: "id".to_string(),
5015                        data_type: DataType::Integer,
5016                        nullable: false,
5017                        default: None,
5018                        primary_key: true,
5019                        unique: false,
5020                    },
5021                    CreateColumnDef {
5022                        name: "value".to_string(),
5023                        data_type: DataType::Integer,
5024                        nullable: true,
5025                        default: None,
5026                        primary_key: false,
5027                        unique: false,
5028                    },
5029                ],
5030                constraints: vec![],
5031                if_not_exists: false,
5032            }),
5033            estimated_cost: 1.0,
5034            estimated_rows: 0,
5035        };
5036        executor.execute(&create_plan).unwrap();
5037
5038        // Alter column to add a default
5039        let alter_plan = QueryPlan {
5040            root: PlanNode::AlterTable(AlterTableNode {
5041                table_name: "alter_default_test".to_string(),
5042                operations: vec![PlanAlterOperation::AlterColumn {
5043                    name: "value".to_string(),
5044                    data_type: None,
5045                    set_not_null: None,
5046                    set_default: Some(Some(PlanExpression::Literal(PlanLiteral::Integer(42)))),
5047                }],
5048            }),
5049            estimated_cost: 1.0,
5050            estimated_rows: 0,
5051        };
5052        executor.execute(&alter_plan).unwrap();
5053
5054        // Verify the default was set
5055        let schema = executor
5056            .context
5057            .read()
5058            .unwrap()
5059            .get_table_schema("alter_default_test")
5060            .unwrap()
5061            .clone();
5062        assert_eq!(schema.columns[1].default, Some(Value::Integer(42)));
5063
5064        // Alter column to drop the default
5065        let drop_default_plan = QueryPlan {
5066            root: PlanNode::AlterTable(AlterTableNode {
5067                table_name: "alter_default_test".to_string(),
5068                operations: vec![PlanAlterOperation::AlterColumn {
5069                    name: "value".to_string(),
5070                    data_type: None,
5071                    set_not_null: None,
5072                    set_default: Some(None), // Drop default
5073                }],
5074            }),
5075            estimated_cost: 1.0,
5076            estimated_rows: 0,
5077        };
5078        executor.execute(&drop_default_plan).unwrap();
5079
5080        // Verify the default was removed
5081        let schema = executor
5082            .context
5083            .read()
5084            .unwrap()
5085            .get_table_schema("alter_default_test")
5086            .unwrap()
5087            .clone();
5088        assert_eq!(schema.columns[1].default, None);
5089    }
5090
5091    #[test]
5092    fn test_parameterized_query() {
5093        use crate::planner::{
5094            CreateColumnDef, CreateTableNode, FilterNode, InsertNode, InsertPlanSource,
5095        };
5096
5097        let executor = Executor::new(ExecutionContext::new());
5098
5099        // Create a test table
5100        let create_plan = QueryPlan {
5101            root: PlanNode::CreateTable(CreateTableNode {
5102                table_name: "params_test".to_string(),
5103                columns: vec![
5104                    CreateColumnDef {
5105                        name: "id".to_string(),
5106                        data_type: DataType::Integer,
5107                        nullable: false,
5108                        default: None,
5109                        primary_key: true,
5110                        unique: false,
5111                    },
5112                    CreateColumnDef {
5113                        name: "name".to_string(),
5114                        data_type: DataType::Text,
5115                        nullable: true,
5116                        default: None,
5117                        primary_key: false,
5118                        unique: false,
5119                    },
5120                ],
5121                constraints: vec![],
5122                if_not_exists: false,
5123            }),
5124            estimated_cost: 1.0,
5125            estimated_rows: 0,
5126        };
5127        executor.execute(&create_plan).unwrap();
5128
5129        // Insert some data
5130        let insert_plan = QueryPlan {
5131            root: PlanNode::Insert(InsertNode {
5132                table_name: "params_test".to_string(),
5133                columns: vec!["id".to_string(), "name".to_string()],
5134                source: InsertPlanSource::Values(vec![
5135                    vec![
5136                        PlanExpression::Literal(PlanLiteral::Integer(1)),
5137                        PlanExpression::Literal(PlanLiteral::String("Alice".to_string())),
5138                    ],
5139                    vec![
5140                        PlanExpression::Literal(PlanLiteral::Integer(2)),
5141                        PlanExpression::Literal(PlanLiteral::String("Bob".to_string())),
5142                    ],
5143                    vec![
5144                        PlanExpression::Literal(PlanLiteral::Integer(3)),
5145                        PlanExpression::Literal(PlanLiteral::String("Charlie".to_string())),
5146                    ],
5147                ]),
5148            }),
5149            estimated_cost: 1.0,
5150            estimated_rows: 3,
5151        };
5152        executor.execute(&insert_plan).unwrap();
5153
5154        // Create a query with a placeholder: SELECT * FROM params_test WHERE id = $1
5155        let query_plan = QueryPlan {
5156            root: PlanNode::Filter(FilterNode {
5157                input: Box::new(PlanNode::Scan(ScanNode {
5158                    table_name: "params_test".to_string(),
5159                    alias: None,
5160                    columns: vec!["id".to_string(), "name".to_string()],
5161                    index_scan: None,
5162                })),
5163                predicate: PlanExpression::BinaryOp {
5164                    left: Box::new(PlanExpression::Column {
5165                        table: None,
5166                        name: "id".to_string(),
5167                        data_type: DataType::Integer,
5168                    }),
5169                    op: PlanBinaryOp::Equal,
5170                    right: Box::new(PlanExpression::Placeholder(1)),
5171                },
5172            }),
5173            estimated_cost: 10.0,
5174            estimated_rows: 1,
5175        };
5176
5177        // Execute with parameter $1 = 2 (should find Bob)
5178        let result = executor
5179            .execute_with_params(&query_plan, &[Value::Integer(2)])
5180            .unwrap();
5181        assert_eq!(result.rows.len(), 1);
5182        assert_eq!(result.rows[0].values[0], Value::Integer(2));
5183        assert_eq!(result.rows[0].values[1], Value::String("Bob".to_string()));
5184
5185        // Execute with parameter $1 = 1 (should find Alice)
5186        let result = executor
5187            .execute_with_params(&query_plan, &[Value::Integer(1)])
5188            .unwrap();
5189        assert_eq!(result.rows.len(), 1);
5190        assert_eq!(result.rows[0].values[1], Value::String("Alice".to_string()));
5191
5192        // Execute with parameter $1 = 99 (should find nothing)
5193        let result = executor
5194            .execute_with_params(&query_plan, &[Value::Integer(99)])
5195            .unwrap();
5196        assert_eq!(result.rows.len(), 0);
5197    }
5198
5199    #[test]
5200    fn test_parameterized_insert() {
5201        use crate::planner::{CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource};
5202
5203        let executor = Executor::new(ExecutionContext::new());
5204
5205        // Create a test table
5206        let create_plan = QueryPlan {
5207            root: PlanNode::CreateTable(CreateTableNode {
5208                table_name: "param_insert_test".to_string(),
5209                columns: vec![
5210                    CreateColumnDef {
5211                        name: "id".to_string(),
5212                        data_type: DataType::Integer,
5213                        nullable: false,
5214                        default: None,
5215                        primary_key: true,
5216                        unique: false,
5217                    },
5218                    CreateColumnDef {
5219                        name: "value".to_string(),
5220                        data_type: DataType::Text,
5221                        nullable: true,
5222                        default: None,
5223                        primary_key: false,
5224                        unique: false,
5225                    },
5226                ],
5227                constraints: vec![],
5228                if_not_exists: false,
5229            }),
5230            estimated_cost: 1.0,
5231            estimated_rows: 0,
5232        };
5233        executor.execute(&create_plan).unwrap();
5234
5235        // Create an INSERT with placeholders: INSERT INTO param_insert_test (id, value) VALUES ($1, $2)
5236        let insert_plan = QueryPlan {
5237            root: PlanNode::Insert(InsertNode {
5238                table_name: "param_insert_test".to_string(),
5239                columns: vec!["id".to_string(), "value".to_string()],
5240                source: InsertPlanSource::Values(vec![vec![
5241                    PlanExpression::Placeholder(1),
5242                    PlanExpression::Placeholder(2),
5243                ]]),
5244            }),
5245            estimated_cost: 1.0,
5246            estimated_rows: 1,
5247        };
5248
5249        // Execute insert with parameters
5250        executor
5251            .execute_with_params(
5252                &insert_plan,
5253                &[Value::Integer(42), Value::String("test value".to_string())],
5254            )
5255            .unwrap();
5256
5257        // Verify the data was inserted
5258        let context = executor.context.read().unwrap();
5259        let table = context.get_table("param_insert_test").unwrap();
5260        let table_data = table.read().unwrap();
5261
5262        assert_eq!(table_data.rows.len(), 1);
5263        assert_eq!(table_data.rows[0].values[0], Value::Integer(42));
5264        assert_eq!(
5265            table_data.rows[0].values[1],
5266            Value::String("test value".to_string())
5267        );
5268    }
5269
5270    // =========================================================================
5271    // Set Operation Tests
5272    // =========================================================================
5273
5274    /// Helper: create a context with two tables sharing an "id" column for set op tests.
5275    fn create_set_op_context() -> ExecutionContext {
5276        let mut context = ExecutionContext::new();
5277
5278        context.add_table(TableData {
5279            name: "t1".to_string(),
5280            columns: vec!["id".to_string(), "val".to_string()],
5281            rows: vec![
5282                Row {
5283                    values: vec![Value::Integer(1), Value::String("a".to_string())],
5284                },
5285                Row {
5286                    values: vec![Value::Integer(2), Value::String("b".to_string())],
5287                },
5288                Row {
5289                    values: vec![Value::Integer(3), Value::String("c".to_string())],
5290                },
5291            ],
5292            row_created_version: vec![0, 0, 0],
5293            row_deleted_version: vec![0, 0, 0],
5294        });
5295
5296        context.add_table(TableData {
5297            name: "t2".to_string(),
5298            columns: vec!["id".to_string(), "val".to_string()],
5299            rows: vec![
5300                Row {
5301                    values: vec![Value::Integer(2), Value::String("b".to_string())],
5302                },
5303                Row {
5304                    values: vec![Value::Integer(3), Value::String("c".to_string())],
5305                },
5306                Row {
5307                    values: vec![Value::Integer(4), Value::String("d".to_string())],
5308                },
5309            ],
5310            row_created_version: vec![0, 0, 0],
5311            row_deleted_version: vec![0, 0, 0],
5312        });
5313
5314        context
5315    }
5316
5317    /// Build a SetOperation plan from two table scans with projections.
5318    fn make_set_op_plan(op: SetOperationType) -> QueryPlan {
5319        use crate::planner::SetOperationNode;
5320
5321        let left = PlanNode::Project(ProjectNode {
5322            input: Box::new(PlanNode::Scan(ScanNode {
5323                table_name: "t1".to_string(),
5324                alias: None,
5325                columns: vec!["id".to_string(), "val".to_string()],
5326                index_scan: None,
5327            })),
5328            expressions: vec![
5329                ProjectionExpr {
5330                    expr: PlanExpression::Column {
5331                        table: None,
5332                        name: "id".to_string(),
5333                        data_type: DataType::Integer,
5334                    },
5335                    alias: Some("id".to_string()),
5336                },
5337                ProjectionExpr {
5338                    expr: PlanExpression::Column {
5339                        table: None,
5340                        name: "val".to_string(),
5341                        data_type: DataType::Text,
5342                    },
5343                    alias: Some("val".to_string()),
5344                },
5345            ],
5346        });
5347
5348        let right = PlanNode::Project(ProjectNode {
5349            input: Box::new(PlanNode::Scan(ScanNode {
5350                table_name: "t2".to_string(),
5351                alias: None,
5352                columns: vec!["id".to_string(), "val".to_string()],
5353                index_scan: None,
5354            })),
5355            expressions: vec![
5356                ProjectionExpr {
5357                    expr: PlanExpression::Column {
5358                        table: None,
5359                        name: "id".to_string(),
5360                        data_type: DataType::Integer,
5361                    },
5362                    alias: Some("id".to_string()),
5363                },
5364                ProjectionExpr {
5365                    expr: PlanExpression::Column {
5366                        table: None,
5367                        name: "val".to_string(),
5368                        data_type: DataType::Text,
5369                    },
5370                    alias: Some("val".to_string()),
5371                },
5372            ],
5373        });
5374
5375        QueryPlan {
5376            root: PlanNode::SetOperation(SetOperationNode {
5377                op,
5378                left: Box::new(left),
5379                right: Box::new(right),
5380            }),
5381            estimated_cost: 10.0,
5382            estimated_rows: 6,
5383        }
5384    }
5385
5386    #[test]
5387    fn test_union_all() {
5388        let context = create_set_op_context();
5389        let executor = Executor::new(context);
5390        let plan = make_set_op_plan(crate::ast::SetOperationType::UnionAll);
5391
5392        let result = executor.execute(&plan).unwrap();
5393
5394        // UNION ALL: 3 + 3 = 6 rows, no dedup
5395        assert_eq!(result.rows.len(), 6);
5396        assert_eq!(result.columns, vec!["id".to_string(), "val".to_string()]);
5397    }
5398
5399    #[test]
5400    fn test_union() {
5401        let context = create_set_op_context();
5402        let executor = Executor::new(context);
5403        let plan = make_set_op_plan(crate::ast::SetOperationType::Union);
5404
5405        let result = executor.execute(&plan).unwrap();
5406
5407        // UNION: {1,a}, {2,b}, {3,c}, {4,d} = 4 unique rows
5408        assert_eq!(result.rows.len(), 4);
5409        // Check the IDs are 1, 2, 3, 4
5410        let mut ids: Vec<i64> = result
5411            .rows
5412            .iter()
5413            .map(|r| match &r.values[0] {
5414                Value::Integer(i) => *i,
5415                _ => panic!("Expected int"),
5416            })
5417            .collect();
5418        ids.sort();
5419        assert_eq!(ids, vec![1, 2, 3, 4]);
5420    }
5421
5422    #[test]
5423    fn test_intersect() {
5424        let context = create_set_op_context();
5425        let executor = Executor::new(context);
5426        let plan = make_set_op_plan(crate::ast::SetOperationType::Intersect);
5427
5428        let result = executor.execute(&plan).unwrap();
5429
5430        // INTERSECT: {2,b}, {3,c} = 2 rows
5431        assert_eq!(result.rows.len(), 2);
5432        let mut ids: Vec<i64> = result
5433            .rows
5434            .iter()
5435            .map(|r| match &r.values[0] {
5436                Value::Integer(i) => *i,
5437                _ => panic!("Expected int"),
5438            })
5439            .collect();
5440        ids.sort();
5441        assert_eq!(ids, vec![2, 3]);
5442    }
5443
5444    #[test]
5445    fn test_except() {
5446        let context = create_set_op_context();
5447        let executor = Executor::new(context);
5448        let plan = make_set_op_plan(crate::ast::SetOperationType::Except);
5449
5450        let result = executor.execute(&plan).unwrap();
5451
5452        // EXCEPT: {1,a} only (in t1 but not in t2)
5453        assert_eq!(result.rows.len(), 1);
5454        assert_eq!(result.rows[0].values[0], Value::Integer(1));
5455        assert_eq!(result.rows[0].values[1], Value::String("a".to_string()));
5456    }
5457}