1use crate::planner::{
16 AggregateExpr, AggregateFunction, AggregateNode, AlterTableNode, CreateIndexNode,
17 CreateTableConstraint, CreateTableNode, DeleteNode, DropIndexNode, DropTableNode, FilterNode,
18 InsertNode, InsertPlanSource, JoinNode, JoinStrategy, LimitNode, PlanAlterOperation,
19 PlanBinaryOp, PlanExpression, PlanJoinType, PlanLiteral, PlanNode, PlanUnaryOp, ProjectNode,
20 ProjectionExpr, QueryPlan, ScanNode, SortKey, SortNode, UpdateNode,
21};
22use crate::index::{TableIndexManager, IndexType, IndexKey, IndexError};
23use aegis_common::{DataType, Row, Value};
24use std::collections::HashMap;
25use std::sync::{Arc, RwLock};
26use thiserror::Error;
27
/// Errors that can be reported while executing a query plan.
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// The named table is not registered in the execution context.
    #[error("Table not found: {0}")]
    TableNotFound(String),

    /// The named column does not exist in the table being accessed.
    #[error("Column not found: {0}")]
    ColumnNotFound(String),

    /// A value had a different type than the one required.
    #[error("Type mismatch: expected {expected}, got {actual}")]
    TypeMismatch { expected: String, actual: String },

    /// A division had a zero divisor.
    #[error("Division by zero")]
    DivisionByZero,

    /// The request is not valid in the current state (for example creating
    /// a table or index that already exists).
    #[error("Invalid operation: {0}")]
    InvalidOperation(String),

    /// Internal failure, such as a poisoned lock.
    #[error("Execution error: {0}")]
    Internal(String),

    /// Failure reported by the index layer, carried as its display text.
    #[error("Index error: {0}")]
    IndexError(String),
}
55
/// Lets `?` convert index-layer errors into [`ExecutorError::IndexError`].
impl From<IndexError> for ExecutorError {
    /// Wraps the index error's display text.
    fn from(e: IndexError) -> Self {
        ExecutorError::IndexError(e.to_string())
    }
}
61
/// Result alias used throughout the executor; the error is always [`ExecutorError`].
pub type ExecutorResult<T> = Result<T, ExecutorError>;
63
/// In-memory catalog and storage shared by query execution.
pub struct ExecutionContext {
    /// Row storage, keyed by table name; each table has its own lock.
    tables: HashMap<String, Arc<RwLock<TableData>>>,
    /// Column and constraint metadata, keyed by table name.
    table_schemas: HashMap<String, TableSchema>,
    /// Index definitions (metadata only), keyed by table name.
    indexes: HashMap<String, Vec<IndexSchema>>,
    /// Index managers holding the built index structures, keyed by table name.
    table_indexes: HashMap<String, Arc<TableIndexManager>>,
    /// Batch size exposed through `batch_size()`; defaults to 1024 in `new()`.
    batch_size: usize,
}
78
/// Stored contents of one table: its column names and its rows.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableData {
    /// Table name; also used as the key when the table is registered.
    pub name: String,
    /// Column names; row values are matched to these by position.
    pub columns: Vec<String>,
    /// Stored rows.
    pub rows: Vec<Row>,
}
86
/// A named table constraint kept in the table schema.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoredConstraint {
    /// Constraint name (generated by the executor when constraints are stored).
    pub name: String,
    /// Kind of constraint and the columns or expression it covers.
    pub constraint_type: StoredConstraintType,
}
93
/// Serializable description of a table constraint.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StoredConstraintType {
    /// Primary key over the listed columns.
    PrimaryKey { columns: Vec<String> },
    /// Uniqueness constraint over the listed columns.
    Unique { columns: Vec<String> },
    /// Foreign key from `columns` to `ref_columns` of `ref_table`.
    ForeignKey {
        columns: Vec<String>,
        ref_table: String,
        ref_columns: Vec<String>,
    },
    /// Check constraint; the executor stores the `Debug` rendering of the
    /// planner expression as text.
    Check { expression_text: String },
}
106
/// Schema metadata for one table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions in declaration order.
    pub columns: Vec<ColumnSchema>,
    /// Primary-key column names, if the table has a primary key.
    pub primary_key: Option<Vec<String>>,
    /// Named constraints attached to the table.
    pub constraints: Vec<StoredConstraint>,
}
115
/// Schema metadata for one column.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Declared data type.
    pub data_type: DataType,
    /// Whether the column is declared nullable.
    pub nullable: bool,
    /// Already-evaluated default value, if one was declared.
    pub default: Option<Value>,
}
124
/// Definition of an index (metadata only; the built index lives in a
/// `TableIndexManager`).
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Name of the indexed table.
    pub table: String,
    /// Indexed column names, in key order.
    pub columns: Vec<String>,
    /// Whether the index was declared unique.
    pub unique: bool,
}
133
/// Serializable copy of an [`ExecutionContext`]: table data, schemas and
/// index definitions. Built index structures are not part of the snapshot.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExecutionContextSnapshot {
    /// Data of every captured table.
    pub tables: Vec<TableData>,
    /// Schema of every captured table.
    pub schemas: Vec<TableSchema>,
    /// Index definitions keyed by table name.
    pub indexes: HashMap<String, Vec<IndexSchema>>,
}
141
142impl ExecutionContext {
    /// Creates an empty context with no tables or indexes and a batch size of 1024.
    pub fn new() -> Self {
        Self {
            tables: HashMap::new(),
            table_schemas: HashMap::new(),
            indexes: HashMap::new(),
            table_indexes: HashMap::new(),
            batch_size: 1024,
        }
    }
152
    /// Builder-style setter for the batch size; consumes and returns the context.
    ///
    /// The value is stored as given, with no validation.
    /// NOTE(review): a size of 0 is accepted here — confirm consumers of
    /// `batch_size()` tolerate it.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }
157
    /// Registers `table` under its own name, replacing any table data already
    /// stored under that name. No schema entry is created.
    pub fn add_table(&mut self, table: TableData) {
        self.tables.insert(table.name.clone(), Arc::new(RwLock::new(table)));
    }
161
    /// Returns a shared handle to the named table's data, or `None` if it is
    /// not registered.
    pub fn get_table(&self, name: &str) -> Option<Arc<RwLock<TableData>>> {
        self.tables.get(name).cloned()
    }
165
    /// Returns the configured batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
169
170 pub fn create_table(
176 &mut self,
177 name: String,
178 columns: Vec<ColumnSchema>,
179 primary_key: Option<Vec<String>>,
180 constraints: Vec<StoredConstraint>,
181 if_not_exists: bool,
182 ) -> ExecutorResult<()> {
183 if self.tables.contains_key(&name) {
184 if if_not_exists {
185 return Ok(());
186 }
187 return Err(ExecutorError::InvalidOperation(format!(
188 "Table '{}' already exists",
189 name
190 )));
191 }
192
193 let column_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
194
195 let schema = TableSchema {
197 name: name.clone(),
198 columns,
199 primary_key,
200 constraints,
201 };
202 self.table_schemas.insert(name.clone(), schema);
203
204 let table_data = TableData {
206 name: name.clone(),
207 columns: column_names,
208 rows: Vec::new(),
209 };
210 self.tables.insert(name, Arc::new(RwLock::new(table_data)));
211
212 Ok(())
213 }
214
215 pub fn drop_table(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
217 if !self.tables.contains_key(name) {
218 if if_exists {
219 return Ok(());
220 }
221 return Err(ExecutorError::TableNotFound(name.to_string()));
222 }
223
224 self.tables.remove(name);
225 self.table_schemas.remove(name);
226 self.indexes.remove(name);
227
228 Ok(())
229 }
230
231 pub fn alter_table(&mut self, name: &str, op: &PlanAlterOperation) -> ExecutorResult<()> {
233 let schema = self.table_schemas.get_mut(name)
234 .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
235
236 match op {
237 PlanAlterOperation::AddColumn(col) => {
238 if schema.columns.iter().any(|c| c.name == col.name) {
240 return Err(ExecutorError::InvalidOperation(format!(
241 "Column '{}' already exists in table '{}'",
242 col.name, name
243 )));
244 }
245
246 let default = col.default.as_ref()
248 .map(evaluate_default_expression)
249 .transpose()?;
250
251 schema.columns.push(ColumnSchema {
252 name: col.name.clone(),
253 data_type: col.data_type.clone(),
254 nullable: col.nullable,
255 default: default.clone(),
256 });
257
258 let fill_value = default.unwrap_or(Value::Null);
260 if let Some(table_data) = self.tables.get(name) {
261 let mut table = table_data.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
262 for row in table.rows.iter_mut() {
263 row.values.push(fill_value.clone());
264 }
265 }
266 }
267 PlanAlterOperation::DropColumn { name: col_name, if_exists } => {
268 let col_idx = schema.columns.iter().position(|c| c.name == *col_name);
269 match col_idx {
270 Some(idx) => {
271 schema.columns.remove(idx);
272 if let Some(table_data) = self.tables.get(name) {
274 let mut table = table_data.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
275 for row in table.rows.iter_mut() {
276 if idx < row.values.len() {
277 row.values.remove(idx);
278 }
279 }
280 }
281 }
282 None if *if_exists => {}
283 None => {
284 return Err(ExecutorError::ColumnNotFound(col_name.clone()));
285 }
286 }
287 }
288 PlanAlterOperation::RenameColumn { old_name, new_name } => {
289 let col = schema.columns.iter_mut().find(|c| c.name == *old_name)
290 .ok_or_else(|| ExecutorError::ColumnNotFound(old_name.clone()))?;
291 col.name = new_name.clone();
292 }
293 PlanAlterOperation::AlterColumn { name: col_name, data_type, set_not_null, set_default } => {
294 let col = schema.columns.iter_mut().find(|c| c.name == *col_name)
295 .ok_or_else(|| ExecutorError::ColumnNotFound(col_name.clone()))?;
296
297 if let Some(dt) = data_type {
298 col.data_type = dt.clone();
299 }
300 if let Some(not_null) = set_not_null {
301 col.nullable = !not_null;
302 }
303 if let Some(new_default) = set_default {
305 col.default = new_default.as_ref()
306 .map(evaluate_default_expression)
307 .transpose()?;
308 }
309 }
310 PlanAlterOperation::RenameTable { new_name } => {
311 if let Some(rows) = self.tables.remove(name) {
313 self.tables.insert(new_name.clone(), rows);
314 }
315 if let Some(mut schema) = self.table_schemas.remove(name) {
316 schema.name = new_name.clone();
317 self.table_schemas.insert(new_name.clone(), schema);
318 }
319 if let Some(indexes) = self.indexes.remove(name) {
320 self.indexes.insert(new_name.clone(), indexes);
321 }
322 return Ok(());
324 }
325 PlanAlterOperation::AddConstraint(constraint) => {
326 if let CreateTableConstraint::ForeignKey { ref_table, .. } = constraint {
328 if !self.table_schemas.contains_key(ref_table) {
329 return Err(ExecutorError::TableNotFound(ref_table.clone()));
330 }
331 }
332
333 let schema = self.table_schemas.get_mut(name)
335 .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
336
337 let constraint_count = schema.constraints.len() + 1;
338 let (name_suffix, constraint_type) = match constraint {
339 CreateTableConstraint::PrimaryKey { columns } => {
340 if schema.primary_key.is_some() {
342 return Err(ExecutorError::InvalidOperation(
343 format!("Table '{}' already has a primary key", name)
344 ));
345 }
346 schema.primary_key = Some(columns.clone());
347 ("pk", StoredConstraintType::PrimaryKey { columns: columns.clone() })
348 }
349 CreateTableConstraint::Unique { columns } => {
350 for col in columns {
352 if !schema.columns.iter().any(|c| c.name == *col) {
353 return Err(ExecutorError::ColumnNotFound(col.clone()));
354 }
355 }
356 ("uq", StoredConstraintType::Unique { columns: columns.clone() })
357 }
358 CreateTableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
359 for col in columns {
361 if !schema.columns.iter().any(|c| c.name == *col) {
362 return Err(ExecutorError::ColumnNotFound(col.clone()));
363 }
364 }
365 ("fk", StoredConstraintType::ForeignKey {
367 columns: columns.clone(),
368 ref_table: ref_table.clone(),
369 ref_columns: ref_columns.clone(),
370 })
371 }
372 CreateTableConstraint::Check { expression } => {
373 ("ck", StoredConstraintType::Check {
374 expression_text: format!("{:?}", expression),
375 })
376 }
377 };
378 schema.constraints.push(StoredConstraint {
379 name: format!("{}_{}{}", name, name_suffix, constraint_count),
380 constraint_type,
381 });
382 return Ok(());
383 }
384 PlanAlterOperation::DropConstraint { name: constraint_name } => {
385 let pos = schema.constraints.iter().position(|c| c.name == *constraint_name);
386 match pos {
387 Some(idx) => {
388 let removed = schema.constraints.remove(idx);
389 if matches!(removed.constraint_type, StoredConstraintType::PrimaryKey { .. }) {
391 schema.primary_key = None;
392 }
393 }
394 None => {
395 return Err(ExecutorError::InvalidOperation(
396 format!("Constraint '{}' not found on table '{}'", constraint_name, name)
397 ));
398 }
399 }
400 }
401 }
402
403 Ok(())
404 }
405
406 pub fn create_index(
408 &mut self,
409 name: String,
410 table: String,
411 columns: Vec<String>,
412 unique: bool,
413 if_not_exists: bool,
414 ) -> ExecutorResult<()> {
415 let table_data = self.tables.get(&table)
416 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?
417 .clone();
418
419 let indexes = self.indexes.entry(table.clone()).or_default();
420
421 if indexes.iter().any(|idx| idx.name == name) {
423 if if_not_exists {
424 return Ok(());
425 }
426 return Err(ExecutorError::InvalidOperation(format!(
427 "Index '{}' already exists",
428 name
429 )));
430 }
431
432 indexes.push(IndexSchema {
434 name: name.clone(),
435 table: table.clone(),
436 columns: columns.clone(),
437 unique,
438 });
439
440 let index_manager = self.table_indexes.entry(table.clone())
442 .or_insert_with(|| Arc::new(TableIndexManager::new(table.clone())));
443
444 Arc::get_mut(index_manager)
446 .ok_or_else(|| ExecutorError::Internal("Cannot modify shared index manager".to_string()))?
447 .create_index(name.clone(), columns.clone(), unique, IndexType::BTree)?;
448
449 let table_guard = table_data.read()
451 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
452
453 let index_manager = self.table_indexes.get(&table)
454 .expect("index_manager was just inserted for this table");
455 for (row_id, row) in table_guard.rows.iter().enumerate() {
456 let column_values: HashMap<String, Value> = table_guard.columns.iter()
457 .zip(row.values.iter())
458 .map(|(col, val)| (col.clone(), val.clone()))
459 .collect();
460
461 let key_values: Vec<crate::index::IndexValue> = columns.iter()
463 .map(|col| {
464 column_values.get(col)
465 .map(IndexKey::from_value)
466 .unwrap_or(crate::index::IndexValue::Null)
467 })
468 .collect();
469 let key = IndexKey::new(key_values);
470
471 index_manager.insert_into_index(&name, key, row_id)?;
473 }
474
475 Ok(())
476 }
477
478 pub fn drop_index(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
480 let mut found = false;
481 let mut table_name = None;
482
483 for (tbl, indexes) in self.indexes.iter_mut() {
485 if let Some(pos) = indexes.iter().position(|idx| idx.name == name) {
486 indexes.remove(pos);
487 table_name = Some(tbl.clone());
488 found = true;
489 break;
490 }
491 }
492
493 if let Some(tbl) = table_name {
495 if let Some(manager) = self.table_indexes.get_mut(&tbl) {
496 if let Some(m) = Arc::get_mut(manager) {
497 let _ = m.drop_index(name);
498 }
499 }
500 }
501
502 if !found && !if_exists {
503 return Err(ExecutorError::InvalidOperation(format!(
504 "Index '{}' not found",
505 name
506 )));
507 }
508
509 Ok(())
510 }
511
    /// Returns a shared handle to the index manager of `table`, if one exists.
    ///
    /// While the returned `Arc` is alive, `create_index` fails for this table
    /// (and `drop_index` skips the manager) because both rely on
    /// `Arc::get_mut`, which requires exclusive ownership.
    pub fn get_index_manager(&self, table: &str) -> Option<Arc<TableIndexManager>> {
        self.table_indexes.get(table).cloned()
    }
516
    /// Returns the index definitions recorded for `table`, if any entry exists.
    pub fn get_indexes(&self, table: &str) -> Option<&Vec<IndexSchema>> {
        self.indexes.get(table)
    }
521
    /// Returns the schema recorded for table `name`, if any.
    pub fn get_table_schema(&self, name: &str) -> Option<&TableSchema> {
        self.table_schemas.get(name)
    }
526
    /// Returns the names of all tables that have stored data.
    ///
    /// The names come from a `HashMap`, so their order is unspecified.
    pub fn list_tables(&self) -> Vec<String> {
        self.tables.keys().cloned().collect()
    }
531
532 pub fn to_snapshot(&self) -> ExecutionContextSnapshot {
538 let tables: Vec<TableData> = self.tables.values()
539 .filter_map(|t| t.read().ok().map(|t| t.clone()))
540 .collect();
541
542 ExecutionContextSnapshot {
543 tables,
544 schemas: self.table_schemas.values().cloned().collect(),
545 indexes: self.indexes.clone(),
546 }
547 }
548
549 pub fn from_snapshot(snapshot: ExecutionContextSnapshot) -> Self {
551 let mut ctx = Self::new();
552
553 for schema in snapshot.schemas {
555 ctx.table_schemas.insert(schema.name.clone(), schema);
556 }
557
558 for table in snapshot.tables {
560 ctx.tables.insert(table.name.clone(), Arc::new(RwLock::new(table)));
561 }
562
563 ctx.indexes = snapshot.indexes.clone();
565
566 for (table_name, index_schemas) in snapshot.indexes {
568 for index_schema in index_schemas {
569 let _ = ctx.rebuild_index(
571 &index_schema.name,
572 &table_name,
573 &index_schema.columns,
574 index_schema.unique,
575 );
576 }
577 }
578
579 ctx
580 }
581
582 fn rebuild_index(
584 &mut self,
585 name: &str,
586 table: &str,
587 columns: &[String],
588 unique: bool,
589 ) -> ExecutorResult<()> {
590 let table_data = self.tables.get(table)
591 .ok_or_else(|| ExecutorError::TableNotFound(table.to_string()))?
592 .clone();
593
594 let index_manager = self.table_indexes.entry(table.to_string())
596 .or_insert_with(|| Arc::new(TableIndexManager::new(table.to_string())));
597
598 if let Some(m) = Arc::get_mut(index_manager) {
600 m.create_index(name.to_string(), columns.to_vec(), unique, IndexType::BTree)?;
601 } else {
602 return Err(ExecutorError::Internal("Cannot modify shared index manager".to_string()));
603 }
604
605 let table_guard = table_data.read()
607 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
608
609 let index_manager = self.table_indexes.get(table)
610 .expect("index_manager was just inserted for this table");
611 for (row_id, row) in table_guard.rows.iter().enumerate() {
612 let column_values: HashMap<String, Value> = table_guard.columns.iter()
613 .zip(row.values.iter())
614 .map(|(col, val)| (col.clone(), val.clone()))
615 .collect();
616
617 let key_values: Vec<crate::index::IndexValue> = columns.iter()
618 .map(|col| {
619 column_values.get(col)
620 .map(IndexKey::from_value)
621 .unwrap_or(crate::index::IndexValue::Null)
622 })
623 .collect();
624 let key = IndexKey::new(key_values);
625
626 index_manager.insert_into_index(name, key, row_id)?;
627 }
628
629 Ok(())
630 }
631
    /// Writes a pretty-printed JSON snapshot of this context to `path`.
    ///
    /// # Errors
    ///
    /// Serialization failures are reported as `ErrorKind::InvalidData`;
    /// file-system errors from `std::fs::write` are returned unchanged.
    pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
        let snapshot = self.to_snapshot();
        let json = serde_json::to_string_pretty(&snapshot)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        std::fs::write(path, json)
    }
639
    /// Reads a JSON snapshot from `path` and builds a new context from it
    /// via `from_snapshot`.
    ///
    /// # Errors
    ///
    /// Read errors are returned unchanged; JSON that cannot be parsed into a
    /// snapshot is reported as `ErrorKind::InvalidData`.
    pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
        let json = std::fs::read_to_string(path)?;
        let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        Ok(Self::from_snapshot(snapshot))
    }
647
648 pub fn merge_from_file(&mut self, path: &std::path::Path) -> std::io::Result<()> {
650 let json = std::fs::read_to_string(path)?;
651 let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
652 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
653
654 for schema in snapshot.schemas {
656 self.table_schemas.insert(schema.name.clone(), schema);
657 }
658
659 for table in snapshot.tables {
661 self.tables.insert(table.name.clone(), Arc::new(RwLock::new(table)));
662 }
663
664 for (table, idxs) in snapshot.indexes {
666 self.indexes.insert(table, idxs);
667 }
668
669 Ok(())
670 }
671
672 pub fn insert_rows(
678 &self,
679 table_name: &str,
680 columns: &[String],
681 rows: Vec<Vec<Value>>,
682 ) -> ExecutorResult<u64> {
683 let table = self.get_table(table_name)
684 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
685
686 let mut table_data = table.write()
687 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
688
689 let schema = self.table_schemas.get(table_name)
691 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
692
693 let target_columns: Vec<String> = if columns.is_empty() {
695 table_data.columns.clone()
696 } else {
697 columns.to_vec()
698 };
699
700 let mut column_indices: Vec<usize> = Vec::new();
702 for col in &target_columns {
703 let idx = table_data.columns.iter().position(|c| c == col)
704 .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
705 column_indices.push(idx);
706 }
707
708 let mut inserted = 0u64;
709
710 let defaults: Vec<Value> = schema.columns.iter()
712 .map(|col| col.default.clone().unwrap_or(Value::Null))
713 .collect();
714
715 for row_values in rows {
716 let mut new_row: Vec<Value> = defaults.clone();
718
719 for (i, &col_idx) in column_indices.iter().enumerate() {
721 if let Some(value) = row_values.get(i) {
722 new_row[col_idx] = value.clone();
723 }
724 }
725
726 table_data.rows.push(Row { values: new_row });
727 inserted += 1;
728 }
729
730 Ok(inserted)
731 }
732
733 pub fn update_rows(
735 &self,
736 table_name: &str,
737 assignments: &[(String, Value)],
738 predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
739 ) -> ExecutorResult<u64> {
740 let table = self.get_table(table_name)
741 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
742
743 let mut table_data = table.write()
744 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
745 let columns = table_data.columns.clone();
746
747 let mut assignment_indices: Vec<(usize, Value)> = Vec::new();
749 for (col, val) in assignments {
750 let idx = columns.iter().position(|c| c == col)
751 .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
752 assignment_indices.push((idx, val.clone()));
753 }
754
755 let mut updated = 0u64;
756
757 for row in &mut table_data.rows {
758 let should_update = predicate.map(|p| p(row, &columns)).unwrap_or(true);
759 if should_update {
760 for (col_idx, value) in &assignment_indices {
761 row.values[*col_idx] = value.clone();
762 }
763 updated += 1;
764 }
765 }
766
767 Ok(updated)
768 }
769
770 pub fn delete_rows(
772 &self,
773 table_name: &str,
774 predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
775 ) -> ExecutorResult<u64> {
776 let table = self.get_table(table_name)
777 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
778
779 let mut table_data = table.write()
780 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
781 let columns = table_data.columns.clone();
782
783 let original_len = table_data.rows.len();
784
785 if let Some(pred) = predicate {
786 table_data.rows.retain(|row| !pred(row, &columns));
787 } else {
788 table_data.rows.clear();
789 }
790
791 Ok((original_len - table_data.rows.len()) as u64)
792 }
793}
794
/// `Default` is the same as [`ExecutionContext::new`].
impl Default for ExecutionContext {
    /// Returns an empty context.
    fn default() -> Self {
        Self::new()
    }
}
800
/// A batch of rows produced by an operator, together with its column names.
#[derive(Debug, Clone)]
pub struct ResultBatch {
    /// Column names of the rows in this batch.
    pub columns: Vec<String>,
    /// Rows in this batch.
    pub rows: Vec<Row>,
}

impl ResultBatch {
    /// Creates an empty batch with the given column names.
    pub fn new(columns: Vec<String>) -> Self {
        Self {
            columns,
            rows: Vec::new(),
        }
    }

    /// Creates a batch from column names and rows.
    pub fn with_rows(columns: Vec<String>, rows: Vec<Row>) -> Self {
        Self { columns, rows }
    }

    /// Returns `true` when the batch holds no rows.
    pub fn is_empty(&self) -> bool {
        self.rows.is_empty()
    }

    /// Returns the number of rows in the batch.
    pub fn len(&self) -> usize {
        self.rows.len()
    }
}
832
/// Final result of executing a plan.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names of `rows`.
    pub columns: Vec<String>,
    /// Result rows.
    pub rows: Vec<Row>,
    /// Number of affected rows; `new` sets this to the number of result rows.
    pub rows_affected: u64,
}

impl QueryResult {
    /// Creates a result from columns and rows, with `rows_affected` set to
    /// the number of rows.
    pub fn new(columns: Vec<String>, rows: Vec<Row>) -> Self {
        let rows_affected = rows.len() as u64;
        Self {
            columns,
            rows,
            rows_affected,
        }
    }

    /// Creates a result with no columns, no rows and zero affected rows.
    pub fn empty() -> Self {
        Self {
            columns: Vec::new(),
            rows: Vec::new(),
            rows_affected: 0,
        }
    }
}
863
/// Executes query plans against a shared [`ExecutionContext`].
pub struct Executor {
    /// The context, behind a lock so several executors can share it.
    context: Arc<RwLock<ExecutionContext>>,
}
872
873impl Executor {
    /// Creates an executor that owns `context` (wrapped in a fresh lock).
    pub fn new(context: ExecutionContext) -> Self {
        Self { context: Arc::new(RwLock::new(context)) }
    }
877
    /// Creates an executor that operates on an already shared context.
    pub fn with_shared_context(context: Arc<RwLock<ExecutionContext>>) -> Self {
        Self { context }
    }
882
    /// Executes `plan` starting from its root node and returns the result.
    pub fn execute(&self, plan: &QueryPlan) -> ExecutorResult<QueryResult> {
        self.execute_internal(&plan.root)
    }
887
    /// Executes `plan` after binding `params` into it.
    ///
    /// The plan root is first passed through `substitute_parameters` (defined
    /// elsewhere in this module); the resulting plan is then executed like any
    /// other. Errors from substitution are returned unchanged.
    pub fn execute_with_params(&self, plan: &QueryPlan, params: &[Value]) -> ExecutorResult<QueryResult> {
        let bound_plan = substitute_parameters(&plan.root, params)?;
        self.execute_internal(&bound_plan)
    }
895
    /// Dispatches on the root node: DDL and DML nodes go to their dedicated
    /// handlers, every other node is run as a query through the operator tree.
    fn execute_internal(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
        match root {
            // DDL
            PlanNode::CreateTable(node) => self.execute_create_table(node),
            PlanNode::DropTable(node) => self.execute_drop_table(node),
            PlanNode::AlterTable(node) => self.execute_alter_table(node),
            PlanNode::CreateIndex(node) => self.execute_create_index(node),
            PlanNode::DropIndex(node) => self.execute_drop_index(node),

            // DML
            PlanNode::Insert(node) => self.execute_insert(node),
            PlanNode::Update(node) => self.execute_update(node),
            PlanNode::Delete(node) => self.execute_delete(node),

            // Everything else is a query plan.
            _ => self.execute_query(root),
        }
    }
914
915 fn execute_query(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
917 let context = self.context.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
918 let mut operator = self.create_operator(root, &context)?;
919 let mut all_rows = Vec::new();
920 let mut columns = Vec::new();
921
922 while let Some(batch) = operator.next_batch()? {
923 if columns.is_empty() {
924 columns = batch.columns.clone();
925 }
926 all_rows.extend(batch.rows);
927 }
928
929 Ok(QueryResult::new(columns, all_rows))
930 }
931
932 fn execute_create_table(&self, node: &CreateTableNode) -> ExecutorResult<QueryResult> {
934 let columns: Vec<ColumnSchema> = node.columns.iter().map(|col| {
935 let default = col.default.as_ref()
936 .map(evaluate_default_expression)
937 .transpose()?;
938 Ok(ColumnSchema {
939 name: col.name.clone(),
940 data_type: col.data_type.clone(),
941 nullable: col.nullable,
942 default,
943 })
944 }).collect::<ExecutorResult<Vec<_>>>()?;
945
946 let primary_key = node.constraints.iter()
948 .find_map(|c| {
949 if let CreateTableConstraint::PrimaryKey { columns } = c {
950 Some(columns.clone())
951 } else {
952 None
953 }
954 })
955 .or_else(|| {
956 let pk_cols: Vec<String> = node.columns.iter()
958 .filter(|c| c.primary_key)
959 .map(|c| c.name.clone())
960 .collect();
961 if pk_cols.is_empty() { None } else { Some(pk_cols) }
962 });
963
964 let mut stored_constraints = Vec::new();
966 let mut constraint_counter = 0;
967 for c in &node.constraints {
968 constraint_counter += 1;
969 let (name_suffix, constraint_type) = match c {
970 CreateTableConstraint::PrimaryKey { columns } => {
971 ("pk", StoredConstraintType::PrimaryKey { columns: columns.clone() })
972 }
973 CreateTableConstraint::Unique { columns } => {
974 ("uq", StoredConstraintType::Unique { columns: columns.clone() })
975 }
976 CreateTableConstraint::ForeignKey { columns, ref_table, ref_columns } => {
977 ("fk", StoredConstraintType::ForeignKey {
978 columns: columns.clone(),
979 ref_table: ref_table.clone(),
980 ref_columns: ref_columns.clone(),
981 })
982 }
983 CreateTableConstraint::Check { expression } => {
984 ("ck", StoredConstraintType::Check {
985 expression_text: format!("{:?}", expression),
986 })
987 }
988 };
989 stored_constraints.push(StoredConstraint {
990 name: format!("{}_{}{}", node.table_name, name_suffix, constraint_counter),
991 constraint_type,
992 });
993 }
994
995 for col in &node.columns {
997 if col.unique {
998 constraint_counter += 1;
999 stored_constraints.push(StoredConstraint {
1000 name: format!("{}_{}_uq{}", node.table_name, col.name, constraint_counter),
1001 constraint_type: StoredConstraintType::Unique { columns: vec![col.name.clone()] },
1002 });
1003 }
1004 }
1005
1006 self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.create_table(
1007 node.table_name.clone(),
1008 columns,
1009 primary_key,
1010 stored_constraints,
1011 node.if_not_exists,
1012 )?;
1013
1014 Ok(QueryResult {
1015 columns: vec!["result".to_string()],
1016 rows: vec![Row { values: vec![Value::String(format!("Table '{}' created", node.table_name))] }],
1017 rows_affected: 0,
1018 })
1019 }
1020
    /// Executes `DROP TABLE` under the context write lock and returns a
    /// one-row status message.
    ///
    /// The message reads "dropped" even when `IF EXISTS` found no table.
    fn execute_drop_table(&self, node: &DropTableNode) -> ExecutorResult<QueryResult> {
        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.drop_table(&node.table_name, node.if_exists)?;

        Ok(QueryResult {
            columns: vec!["result".to_string()],
            rows: vec![Row { values: vec![Value::String(format!("Table '{}' dropped", node.table_name))] }],
            rows_affected: 0,
        })
    }
1031
1032 fn execute_alter_table(&self, node: &AlterTableNode) -> ExecutorResult<QueryResult> {
1034 let mut ctx = self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1035
1036 for op in &node.operations {
1037 ctx.alter_table(&node.table_name, op)?;
1038 }
1039
1040 let op_count = node.operations.len();
1041 let msg = if op_count == 1 {
1042 format!("Table '{}' altered", node.table_name)
1043 } else {
1044 format!("Table '{}' altered ({} operations)", node.table_name, op_count)
1045 };
1046
1047 Ok(QueryResult {
1048 columns: vec!["result".to_string()],
1049 rows: vec![Row { values: vec![Value::String(msg)] }],
1050 rows_affected: 0,
1051 })
1052 }
1053
    /// Executes `CREATE INDEX` under the context write lock and returns a
    /// one-row status message.
    ///
    /// The message reads "created" even when `IF NOT EXISTS` found an
    /// existing index.
    fn execute_create_index(&self, node: &CreateIndexNode) -> ExecutorResult<QueryResult> {
        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.create_index(
            node.index_name.clone(),
            node.table_name.clone(),
            node.columns.clone(),
            node.unique,
            node.if_not_exists,
        )?;

        Ok(QueryResult {
            columns: vec!["result".to_string()],
            rows: vec![Row { values: vec![Value::String(format!("Index '{}' created", node.index_name))] }],
            rows_affected: 0,
        })
    }
1070
    /// Executes `DROP INDEX` under the context write lock and returns a
    /// one-row status message.
    ///
    /// The message reads "dropped" even when `IF EXISTS` found no index.
    fn execute_drop_index(&self, node: &DropIndexNode) -> ExecutorResult<QueryResult> {
        self.context.write().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?.drop_index(&node.index_name, node.if_exists)?;

        Ok(QueryResult {
            columns: vec!["result".to_string()],
            rows: vec![Row { values: vec![Value::String(format!("Index '{}' dropped", node.index_name))] }],
            rows_affected: 0,
        })
    }
1081
1082 fn execute_insert(&self, node: &InsertNode) -> ExecutorResult<QueryResult> {
1084 let rows: Vec<Vec<Value>> = match &node.source {
1085 InsertPlanSource::Values(values) => {
1086 let empty_row = Row { values: vec![] };
1088 let empty_columns: Vec<String> = vec![];
1089
1090 values.iter()
1091 .map(|row_exprs| {
1092 row_exprs.iter()
1093 .map(|expr| evaluate_expression(expr, &empty_row, &empty_columns))
1094 .collect::<ExecutorResult<Vec<_>>>()
1095 })
1096 .collect::<ExecutorResult<Vec<_>>>()?
1097 }
1098 InsertPlanSource::Query(subquery) => {
1099 let context = self.context.read()
1101 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1102 let mut operator = self.create_operator(subquery, &context)?;
1103 let mut all_rows = Vec::new();
1104
1105 while let Some(batch) = operator.next_batch()? {
1106 for row in batch.rows {
1107 all_rows.push(row.values);
1108 }
1109 }
1110
1111 all_rows
1112 }
1113 };
1114
1115 let inserted = self.context.read()
1116 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1117 .insert_rows(&node.table_name, &node.columns, rows)?;
1118
1119 Ok(QueryResult {
1120 columns: vec!["rows_affected".to_string()],
1121 rows: vec![Row { values: vec![Value::Integer(inserted as i64)] }],
1122 rows_affected: inserted,
1123 })
1124 }
1125
1126 fn execute_update(&self, node: &UpdateNode) -> ExecutorResult<QueryResult> {
1128 let context = self.context.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1129
1130 let table = context.get_table(&node.table_name)
1132 .ok_or_else(|| ExecutorError::TableNotFound(node.table_name.clone()))?;
1133 let table_data = table.read()
1134 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1135 let _columns = table_data.columns.clone();
1136 drop(table_data);
1137
1138 let empty_row = Row { values: vec![] };
1140 let assignments: Vec<(String, Value)> = node.assignments.iter()
1141 .map(|(col, expr)| {
1142 let value = evaluate_expression(expr, &empty_row, &[])?;
1143 Ok((col.clone(), value))
1144 })
1145 .collect::<ExecutorResult<Vec<_>>>()?;
1146
1147 let where_clause = node.where_clause.clone();
1149 let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1150 Box::new(move |row: &Row, cols: &[String]| {
1151 evaluate_expression(&wc, row, cols)
1152 .map(|v| matches!(v, Value::Boolean(true)))
1153 .unwrap_or(false)
1154 }) as Box<dyn Fn(&Row, &[String]) -> bool>
1155 });
1156
1157 let updated = context.update_rows(
1158 &node.table_name,
1159 &assignments,
1160 predicate.as_ref().map(|p| p.as_ref()),
1161 )?;
1162
1163 Ok(QueryResult {
1164 columns: vec!["rows_affected".to_string()],
1165 rows: vec![Row { values: vec![Value::Integer(updated as i64)] }],
1166 rows_affected: updated,
1167 })
1168 }
1169
1170 fn execute_delete(&self, node: &DeleteNode) -> ExecutorResult<QueryResult> {
1172 let context = self.context.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1173
1174 let where_clause = node.where_clause.clone();
1176 let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1177 Box::new(move |row: &Row, cols: &[String]| {
1178 evaluate_expression(&wc, row, cols)
1179 .map(|v| matches!(v, Value::Boolean(true)))
1180 .unwrap_or(false)
1181 }) as Box<dyn Fn(&Row, &[String]) -> bool>
1182 });
1183
1184 let deleted = context.delete_rows(
1185 &node.table_name,
1186 predicate.as_ref().map(|p| p.as_ref()),
1187 )?;
1188
1189 Ok(QueryResult {
1190 columns: vec!["rows_affected".to_string()],
1191 rows: vec![Row { values: vec![Value::Integer(deleted as i64)] }],
1192 rows_affected: deleted,
1193 })
1194 }
1195
1196 fn create_operator<'a>(&'a self, node: &PlanNode, context: &'a ExecutionContext) -> ExecutorResult<Box<dyn Operator + 'a>> {
1198 match node {
1199 PlanNode::Scan(scan) => Ok(Box::new(ScanOperator::new(scan.clone(), context)?)),
1200
1201 PlanNode::Filter(filter) => {
1202 let input = self.create_operator(&filter.input, context)?;
1203 Ok(Box::new(FilterOperator::new(
1204 input,
1205 filter.predicate.clone(),
1206 )))
1207 }
1208
1209 PlanNode::Project(project) => {
1210 let input = self.create_operator(&project.input, context)?;
1211 Ok(Box::new(ProjectOperator::new(
1212 input,
1213 project.expressions.clone(),
1214 )))
1215 }
1216
1217 PlanNode::Join(join) => {
1218 let left = self.create_operator(&join.left, context)?;
1219 let right = self.create_operator(&join.right, context)?;
1220 Ok(Box::new(JoinOperator::new(
1221 left,
1222 right,
1223 join.join_type,
1224 join.condition.clone(),
1225 join.strategy,
1226 )?))
1227 }
1228
1229 PlanNode::Aggregate(agg) => {
1230 let input = self.create_operator(&agg.input, context)?;
1231 Ok(Box::new(AggregateOperator::new(
1232 input,
1233 agg.group_by.clone(),
1234 agg.aggregates.clone(),
1235 )))
1236 }
1237
1238 PlanNode::Sort(sort) => {
1239 let input = self.create_operator(&sort.input, context)?;
1240 Ok(Box::new(SortOperator::new(input, sort.order_by.clone())))
1241 }
1242
1243 PlanNode::Limit(limit) => {
1244 let input = self.create_operator(&limit.input, context)?;
1245 Ok(Box::new(LimitOperator::new(input, limit.limit, limit.offset)))
1246 }
1247
1248 PlanNode::Empty => Ok(Box::new(EmptyOperator::new())),
1249
1250 PlanNode::CreateTable(_) | PlanNode::DropTable(_) | PlanNode::AlterTable(_) |
1252 PlanNode::CreateIndex(_) | PlanNode::DropIndex(_) |
1253 PlanNode::Insert(_) | PlanNode::Update(_) | PlanNode::Delete(_) => {
1254 Err(ExecutorError::Internal("DDL/DML nodes should not be in operator tree".to_string()))
1255 }
1256 }
1257 }
1258}
1259
/// A pull-based node of the query operator tree.
///
/// Consumers call [`Operator::next_batch`] repeatedly until it yields
/// `Ok(None)`.
pub trait Operator {
    /// Produces the next batch of rows, or `Ok(None)` once the operator has
    /// nothing more to emit.
    fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>>;

    /// Names of the columns carried by the batches this operator produces.
    fn columns(&self) -> &[String];
}
1272
1273struct ScanOperator {
1278 table: Arc<RwLock<TableData>>,
1279 columns: Vec<String>,
1280 position: usize,
1281 batch_size: usize,
1282 cached_rows: Option<Vec<Row>>,
1284}
1285
1286impl ScanOperator {
1287 fn new(scan: ScanNode, context: &ExecutionContext) -> ExecutorResult<Self> {
1288 let table = context
1289 .get_table(&scan.table_name)
1290 .ok_or_else(|| ExecutorError::TableNotFound(scan.table_name.clone()))?;
1291
1292 let columns = {
1294 let table_data = table.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1295 if scan.columns.is_empty() {
1296 table_data.columns.clone()
1297 } else {
1298 scan.columns.clone()
1299 }
1300 }; Ok(Self {
1303 table,
1304 columns,
1305 position: 0,
1306 batch_size: context.batch_size(),
1307 cached_rows: None,
1308 })
1309 }
1310}
1311
1312impl Operator for ScanOperator {
1313 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1314 if self.cached_rows.is_none() {
1316 let table_data = self.table.read().map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1317 self.cached_rows = Some(table_data.rows.clone());
1318 }
1319
1320 let rows = self.cached_rows.as_ref().expect("cached_rows was just set to Some");
1322
1323 if self.position >= rows.len() {
1324 return Ok(None);
1325 }
1326
1327 let end = (self.position + self.batch_size).min(rows.len());
1328 let batch_rows: Vec<Row> = rows[self.position..end].to_vec();
1329 self.position = end;
1330
1331 Ok(Some(ResultBatch::with_rows(self.columns.clone(), batch_rows)))
1332 }
1333
1334 fn columns(&self) -> &[String] {
1335 &self.columns
1336 }
1337}
1338
1339struct FilterOperator<'a> {
1344 input: Box<dyn Operator + 'a>,
1345 predicate: PlanExpression,
1346 columns: Vec<String>,
1347}
1348
1349impl<'a> FilterOperator<'a> {
1350 fn new(input: Box<dyn Operator + 'a>, predicate: PlanExpression) -> Self {
1351 let columns = input.columns().to_vec();
1352 Self {
1353 input,
1354 predicate,
1355 columns,
1356 }
1357 }
1358
1359 fn evaluate_predicate(&self, row: &Row, columns: &[String]) -> ExecutorResult<bool> {
1360 let value = evaluate_expression(&self.predicate, row, columns)?;
1361 match value {
1362 Value::Boolean(b) => Ok(b),
1363 Value::Null => Ok(false),
1364 _ => Err(ExecutorError::TypeMismatch {
1365 expected: "boolean".to_string(),
1366 actual: format!("{:?}", value),
1367 }),
1368 }
1369 }
1370}
1371
1372impl<'a> Operator for FilterOperator<'a> {
1373 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1374 while let Some(batch) = self.input.next_batch()? {
1375 let filtered: Vec<Row> = batch
1376 .rows
1377 .into_iter()
1378 .filter(|row| self.evaluate_predicate(row, &batch.columns).unwrap_or(false))
1379 .collect();
1380
1381 if !filtered.is_empty() {
1382 return Ok(Some(ResultBatch::with_rows(self.columns.clone(), filtered)));
1383 }
1384 }
1385 Ok(None)
1386 }
1387
1388 fn columns(&self) -> &[String] {
1389 &self.columns
1390 }
1391}
1392
1393struct ProjectOperator<'a> {
1398 input: Box<dyn Operator + 'a>,
1399 expressions: Vec<crate::planner::ProjectionExpr>,
1400 columns: Vec<String>,
1401 input_columns: Vec<String>,
1402}
1403
1404impl<'a> ProjectOperator<'a> {
1405 fn new(input: Box<dyn Operator + 'a>, expressions: Vec<crate::planner::ProjectionExpr>) -> Self {
1406 let input_columns = input.columns().to_vec();
1407
1408 let mut expanded_expressions = Vec::new();
1410 let mut columns = Vec::new();
1411
1412 for (i, proj_expr) in expressions.iter().enumerate() {
1413 if let PlanExpression::Column { name, table, .. } = &proj_expr.expr {
1415 if name == "*" {
1416 for input_col in &input_columns {
1418 let _ = table; expanded_expressions.push(crate::planner::ProjectionExpr {
1422 expr: PlanExpression::Column {
1423 table: None,
1424 name: input_col.clone(),
1425 data_type: DataType::Any,
1426 },
1427 alias: None,
1428 });
1429 columns.push(input_col.clone());
1430 }
1431 continue;
1432 }
1433 }
1434
1435 expanded_expressions.push(proj_expr.clone());
1437 let col_name = proj_expr.alias.clone().unwrap_or_else(|| {
1438 extract_column_name(&proj_expr.expr)
1439 .unwrap_or_else(|| format!("column_{}", i))
1440 });
1441 columns.push(col_name);
1442 }
1443
1444 Self {
1445 input,
1446 expressions: expanded_expressions,
1447 columns,
1448 input_columns,
1449 }
1450 }
1451}
1452
1453fn extract_column_name(expr: &PlanExpression) -> Option<String> {
1455 match expr {
1456 PlanExpression::Column { name, .. } => Some(name.clone()),
1457 PlanExpression::Function { name, .. } => Some(name.clone()),
1458 PlanExpression::Cast { expr, .. } => extract_column_name(expr),
1459 PlanExpression::UnaryOp { expr, .. } => extract_column_name(expr),
1460 _ => None,
1461 }
1462}
1463
1464impl<'a> Operator for ProjectOperator<'a> {
1465 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1466 if let Some(batch) = self.input.next_batch()? {
1467 let mut result_rows = Vec::with_capacity(batch.rows.len());
1468
1469 for row in &batch.rows {
1470 let mut projected_values = Vec::with_capacity(self.expressions.len());
1471
1472 for expr in &self.expressions {
1473 let value = evaluate_expression(&expr.expr, row, &self.input_columns)?;
1474 projected_values.push(value);
1475 }
1476
1477 result_rows.push(Row {
1478 values: projected_values,
1479 });
1480 }
1481
1482 Ok(Some(ResultBatch::with_rows(
1483 self.columns.clone(),
1484 result_rows,
1485 )))
1486 } else {
1487 Ok(None)
1488 }
1489 }
1490
1491 fn columns(&self) -> &[String] {
1492 &self.columns
1493 }
1494}
1495
1496#[allow(dead_code)]
1501struct JoinOperator<'a> {
1502 left: Box<dyn Operator + 'a>,
1503 right: Box<dyn Operator + 'a>,
1504 join_type: PlanJoinType,
1505 condition: Option<PlanExpression>,
1506 _strategy: JoinStrategy,
1507 columns: Vec<String>,
1508 left_columns: Vec<String>,
1509 right_columns: Vec<String>,
1510 right_data: Option<Vec<Row>>,
1511 left_batch: Option<ResultBatch>,
1512 left_row_idx: usize,
1513 right_row_idx: usize,
1514}
1515
1516impl<'a> JoinOperator<'a> {
1517 fn new(
1518 left: Box<dyn Operator + 'a>,
1519 right: Box<dyn Operator + 'a>,
1520 join_type: PlanJoinType,
1521 condition: Option<PlanExpression>,
1522 strategy: JoinStrategy,
1523 ) -> ExecutorResult<Self> {
1524 let left_columns = left.columns().to_vec();
1525 let right_columns = right.columns().to_vec();
1526
1527 let mut columns = left_columns.clone();
1528 columns.extend(right_columns.clone());
1529
1530 Ok(Self {
1531 left,
1532 right,
1533 join_type,
1534 condition,
1535 _strategy: strategy,
1536 columns,
1537 left_columns,
1538 right_columns,
1539 right_data: None,
1540 left_batch: None,
1541 left_row_idx: 0,
1542 right_row_idx: 0,
1543 })
1544 }
1545
1546 fn materialize_right(&mut self) -> ExecutorResult<()> {
1547 if self.right_data.is_some() {
1548 return Ok(());
1549 }
1550
1551 let mut all_rows = Vec::new();
1552 while let Some(batch) = self.right.next_batch()? {
1553 all_rows.extend(batch.rows);
1554 }
1555 self.right_data = Some(all_rows);
1556 Ok(())
1557 }
1558
1559 fn evaluate_join_condition(&self, left: &Row, right: &Row) -> ExecutorResult<bool> {
1560 match &self.condition {
1561 None => Ok(true),
1562 Some(expr) => {
1563 let mut combined = left.values.clone();
1564 combined.extend(right.values.clone());
1565 let combined_row = Row { values: combined };
1566
1567 let value = evaluate_expression(expr, &combined_row, &self.columns)?;
1568 match value {
1569 Value::Boolean(b) => Ok(b),
1570 Value::Null => Ok(false),
1571 _ => Ok(false),
1572 }
1573 }
1574 }
1575 }
1576}
1577
1578impl<'a> Operator for JoinOperator<'a> {
1579 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1580 self.materialize_right()?;
1581 let right_data = self.right_data.as_ref().expect("right_data was set by materialize_right");
1583
1584 let mut result_rows = Vec::new();
1585
1586 loop {
1587 if self.left_batch.is_none() {
1588 self.left_batch = self.left.next_batch()?;
1589 self.left_row_idx = 0;
1590 self.right_row_idx = 0;
1591
1592 if self.left_batch.is_none() {
1593 break;
1594 }
1595 }
1596
1597 let left_batch = self.left_batch.as_ref().expect("left_batch verified to be Some");
1599
1600 while self.left_row_idx < left_batch.rows.len() {
1601 let left_row = &left_batch.rows[self.left_row_idx];
1602
1603 while self.right_row_idx < right_data.len() {
1604 let right_row = &right_data[self.right_row_idx];
1605 self.right_row_idx += 1;
1606
1607 if self.evaluate_join_condition(left_row, right_row)? {
1608 let mut combined = left_row.values.clone();
1609 combined.extend(right_row.values.clone());
1610 result_rows.push(Row { values: combined });
1611
1612 if result_rows.len() >= 1024 {
1613 return Ok(Some(ResultBatch::with_rows(
1614 self.columns.clone(),
1615 result_rows,
1616 )));
1617 }
1618 }
1619 }
1620
1621 self.left_row_idx += 1;
1622 self.right_row_idx = 0;
1623 }
1624
1625 self.left_batch = None;
1626 }
1627
1628 if result_rows.is_empty() {
1629 Ok(None)
1630 } else {
1631 Ok(Some(ResultBatch::with_rows(
1632 self.columns.clone(),
1633 result_rows,
1634 )))
1635 }
1636 }
1637
1638 fn columns(&self) -> &[String] {
1639 &self.columns
1640 }
1641}
1642
1643struct AggregateOperator<'a> {
1648 input: Box<dyn Operator + 'a>,
1649 _group_by: Vec<PlanExpression>,
1650 aggregates: Vec<crate::planner::AggregateExpr>,
1651 columns: Vec<String>,
1652 input_columns: Vec<String>,
1653 done: bool,
1654}
1655
1656impl<'a> AggregateOperator<'a> {
1657 fn new(
1658 input: Box<dyn Operator + 'a>,
1659 group_by: Vec<PlanExpression>,
1660 aggregates: Vec<crate::planner::AggregateExpr>,
1661 ) -> Self {
1662 let input_columns = input.columns().to_vec();
1663
1664 let columns: Vec<String> = aggregates
1665 .iter()
1666 .enumerate()
1667 .map(|(i, agg)| {
1668 agg.alias
1669 .clone()
1670 .unwrap_or_else(|| format!("agg_{}", i))
1671 })
1672 .collect();
1673
1674 Self {
1675 input,
1676 _group_by: group_by,
1677 aggregates,
1678 columns,
1679 input_columns,
1680 done: false,
1681 }
1682 }
1683}
1684
1685impl<'a> Operator for AggregateOperator<'a> {
1686 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1687 if self.done {
1688 return Ok(None);
1689 }
1690
1691 let mut accumulators: Vec<Accumulator> = self
1692 .aggregates
1693 .iter()
1694 .map(|agg| Accumulator::new(agg.function))
1695 .collect();
1696
1697 while let Some(batch) = self.input.next_batch()? {
1698 for row in &batch.rows {
1699 for (i, agg) in self.aggregates.iter().enumerate() {
1700 let value = if let Some(ref arg) = agg.argument {
1701 evaluate_expression(arg, row, &self.input_columns)?
1702 } else {
1703 Value::Integer(1)
1704 };
1705 accumulators[i].accumulate(&value)?;
1706 }
1707 }
1708 }
1709
1710 let result_values: Vec<Value> = accumulators.iter().map(|acc| acc.finalize()).collect();
1711
1712 self.done = true;
1713
1714 Ok(Some(ResultBatch::with_rows(
1715 self.columns.clone(),
1716 vec![Row {
1717 values: result_values,
1718 }],
1719 )))
1720 }
1721
1722 fn columns(&self) -> &[String] {
1723 &self.columns
1724 }
1725}
1726
1727struct Accumulator {
1729 function: AggregateFunction,
1730 count: i64,
1731 sum: f64,
1732 min: Option<Value>,
1733 max: Option<Value>,
1734}
1735
1736impl Accumulator {
1737 fn new(function: AggregateFunction) -> Self {
1738 Self {
1739 function,
1740 count: 0,
1741 sum: 0.0,
1742 min: None,
1743 max: None,
1744 }
1745 }
1746
1747 fn accumulate(&mut self, value: &Value) -> ExecutorResult<()> {
1748 if matches!(value, Value::Null) {
1749 return Ok(());
1750 }
1751
1752 self.count += 1;
1753
1754 match self.function {
1755 AggregateFunction::Count => {}
1756 AggregateFunction::Sum | AggregateFunction::Avg => {
1757 self.sum += value_to_f64(value)?;
1758 }
1759 AggregateFunction::Min => {
1760 if self.min.is_none() || compare_values(value, self.min.as_ref().expect("min verified to be Some in || branch"))? == std::cmp::Ordering::Less {
1762 self.min = Some(value.clone());
1763 }
1764 }
1765 AggregateFunction::Max => {
1766 if self.max.is_none() || compare_values(value, self.max.as_ref().expect("max verified to be Some in || branch"))? == std::cmp::Ordering::Greater {
1768 self.max = Some(value.clone());
1769 }
1770 }
1771 }
1772
1773 Ok(())
1774 }
1775
1776 fn finalize(&self) -> Value {
1777 match self.function {
1778 AggregateFunction::Count => Value::Integer(self.count),
1779 AggregateFunction::Sum => Value::Float(self.sum),
1780 AggregateFunction::Avg => {
1781 if self.count == 0 {
1782 Value::Null
1783 } else {
1784 Value::Float(self.sum / self.count as f64)
1785 }
1786 }
1787 AggregateFunction::Min => self.min.clone().unwrap_or(Value::Null),
1788 AggregateFunction::Max => self.max.clone().unwrap_or(Value::Null),
1789 }
1790 }
1791}
1792
1793struct SortOperator<'a> {
1798 input: Box<dyn Operator + 'a>,
1799 order_by: Vec<crate::planner::SortKey>,
1800 columns: Vec<String>,
1801 sorted_data: Option<Vec<Row>>,
1802 position: usize,
1803}
1804
1805impl<'a> SortOperator<'a> {
1806 fn new(input: Box<dyn Operator + 'a>, order_by: Vec<crate::planner::SortKey>) -> Self {
1807 let columns = input.columns().to_vec();
1808 Self {
1809 input,
1810 order_by,
1811 columns,
1812 sorted_data: None,
1813 position: 0,
1814 }
1815 }
1816}
1817
1818impl<'a> Operator for SortOperator<'a> {
1819 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1820 if self.sorted_data.is_none() {
1821 let mut all_rows = Vec::new();
1822 while let Some(batch) = self.input.next_batch()? {
1823 all_rows.extend(batch.rows);
1824 }
1825
1826 let columns = self.columns.clone();
1827 let order_by = self.order_by.clone();
1828
1829 all_rows.sort_by(|a, b| {
1830 for key in &order_by {
1831 let a_val = evaluate_expression(&key.expr, a, &columns).unwrap_or(Value::Null);
1832 let b_val = evaluate_expression(&key.expr, b, &columns).unwrap_or(Value::Null);
1833
1834 let cmp = compare_values(&a_val, &b_val).unwrap_or(std::cmp::Ordering::Equal);
1835
1836 if cmp != std::cmp::Ordering::Equal {
1837 return if key.ascending {
1838 cmp
1839 } else {
1840 cmp.reverse()
1841 };
1842 }
1843 }
1844 std::cmp::Ordering::Equal
1845 });
1846
1847 self.sorted_data = Some(all_rows);
1848 }
1849
1850 let data = self.sorted_data.as_ref().expect("sorted_data was just set to Some");
1852
1853 if self.position >= data.len() {
1854 return Ok(None);
1855 }
1856
1857 let end = (self.position + 1024).min(data.len());
1858 let rows = data[self.position..end].to_vec();
1859 self.position = end;
1860
1861 Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)))
1862 }
1863
1864 fn columns(&self) -> &[String] {
1865 &self.columns
1866 }
1867}
1868
1869struct LimitOperator<'a> {
1874 input: Box<dyn Operator + 'a>,
1875 limit: Option<u64>,
1876 offset: Option<u64>,
1877 columns: Vec<String>,
1878 rows_skipped: u64,
1879 rows_returned: u64,
1880}
1881
1882impl<'a> LimitOperator<'a> {
1883 fn new(input: Box<dyn Operator + 'a>, limit: Option<u64>, offset: Option<u64>) -> Self {
1884 let columns = input.columns().to_vec();
1885 Self {
1886 input,
1887 limit,
1888 offset,
1889 columns,
1890 rows_skipped: 0,
1891 rows_returned: 0,
1892 }
1893 }
1894}
1895
1896impl<'a> Operator for LimitOperator<'a> {
1897 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1898 if let Some(limit) = self.limit {
1899 if self.rows_returned >= limit {
1900 return Ok(None);
1901 }
1902 }
1903
1904 while let Some(batch) = self.input.next_batch()? {
1905 let mut rows = batch.rows;
1906
1907 let offset = self.offset.unwrap_or(0);
1908 if self.rows_skipped < offset {
1909 let skip = (offset - self.rows_skipped) as usize;
1910 if skip >= rows.len() {
1911 self.rows_skipped += rows.len() as u64;
1912 continue;
1913 }
1914 rows = rows[skip..].to_vec();
1915 self.rows_skipped = offset;
1916 }
1917
1918 if let Some(limit) = self.limit {
1919 let remaining = limit - self.rows_returned;
1920 if rows.len() as u64 > remaining {
1921 rows.truncate(remaining as usize);
1922 }
1923 }
1924
1925 if rows.is_empty() {
1926 continue;
1927 }
1928
1929 self.rows_returned += rows.len() as u64;
1930
1931 return Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)));
1932 }
1933
1934 Ok(None)
1935 }
1936
1937 fn columns(&self) -> &[String] {
1938 &self.columns
1939 }
1940}
1941
/// Operator for plans without an input relation.
struct EmptyOperator {
    /// Whether the single placeholder batch has already been produced.
    done: bool,
}

impl EmptyOperator {
    /// Creates an operator that has not yet produced its batch.
    fn new() -> Self {
        EmptyOperator { done: false }
    }
}
1955
1956impl Operator for EmptyOperator {
1957 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1958 if self.done {
1959 return Ok(None);
1960 }
1961 self.done = true;
1962 Ok(Some(ResultBatch::with_rows(vec![], vec![Row { values: vec![] }])))
1963 }
1964
1965 fn columns(&self) -> &[String] {
1966 &[]
1967 }
1968}
1969
1970pub struct EvalContext<'a> {
1976 executor: Option<&'a Executor>,
1977}
1978
1979impl<'a> EvalContext<'a> {
1980 pub fn new() -> Self {
1981 Self { executor: None }
1982 }
1983
1984 pub fn with_executor(executor: &'a Executor) -> Self {
1985 Self { executor: Some(executor) }
1986 }
1987
1988 fn execute_subquery(&self, subquery: &PlanNode) -> ExecutorResult<Vec<Row>> {
1990 let executor = self.executor.ok_or_else(|| {
1991 ExecutorError::Internal("Subquery requires execution context".to_string())
1992 })?;
1993 let plan = QueryPlan {
1994 root: subquery.clone(),
1995 estimated_cost: 0.0,
1996 estimated_rows: 0,
1997 };
1998 let result = executor.execute(&plan)?;
1999 Ok(result.rows)
2000 }
2001}
2002
2003impl Default for EvalContext<'_> {
2004 fn default() -> Self {
2005 Self::new()
2006 }
2007}
2008
2009fn evaluate_default_expression(expr: &PlanExpression) -> ExecutorResult<Value> {
2011 let empty_row = Row { values: vec![] };
2012 let empty_columns: Vec<String> = vec![];
2013 evaluate_expression(expr, &empty_row, &empty_columns)
2014}
2015
2016fn substitute_parameters(node: &PlanNode, params: &[Value]) -> ExecutorResult<PlanNode> {
2018 match node {
2019 PlanNode::Scan(scan) => {
2020 let index_scan = if let Some(idx) = &scan.index_scan {
2023 let start = idx.key_range.start.as_ref()
2024 .map(|e| substitute_expr(e, params))
2025 .transpose()?;
2026 let end = idx.key_range.end.as_ref()
2027 .map(|e| substitute_expr(e, params))
2028 .transpose()?;
2029 Some(crate::planner::IndexScan {
2030 index_name: idx.index_name.clone(),
2031 key_range: crate::planner::KeyRange {
2032 start,
2033 start_inclusive: idx.key_range.start_inclusive,
2034 end,
2035 end_inclusive: idx.key_range.end_inclusive,
2036 },
2037 })
2038 } else {
2039 None
2040 };
2041 Ok(PlanNode::Scan(ScanNode {
2042 table_name: scan.table_name.clone(),
2043 alias: scan.alias.clone(),
2044 columns: scan.columns.clone(),
2045 index_scan,
2046 }))
2047 }
2048 PlanNode::Filter(filter) => {
2049 let input = Box::new(substitute_parameters(&filter.input, params)?);
2050 let predicate = substitute_expr(&filter.predicate, params)?;
2051 Ok(PlanNode::Filter(FilterNode { input, predicate }))
2052 }
2053 PlanNode::Project(proj) => {
2054 let input = Box::new(substitute_parameters(&proj.input, params)?);
2055 let expressions = proj.expressions.iter()
2056 .map(|pe| Ok(ProjectionExpr {
2057 expr: substitute_expr(&pe.expr, params)?,
2058 alias: pe.alias.clone(),
2059 }))
2060 .collect::<ExecutorResult<Vec<_>>>()?;
2061 Ok(PlanNode::Project(ProjectNode { input, expressions }))
2062 }
2063 PlanNode::Sort(sort) => {
2064 let input = Box::new(substitute_parameters(&sort.input, params)?);
2065 let order_by = sort.order_by.iter()
2066 .map(|sk| Ok(SortKey {
2067 expr: substitute_expr(&sk.expr, params)?,
2068 ascending: sk.ascending,
2069 nulls_first: sk.nulls_first,
2070 }))
2071 .collect::<ExecutorResult<Vec<_>>>()?;
2072 Ok(PlanNode::Sort(SortNode { input, order_by }))
2073 }
2074 PlanNode::Limit(limit) => {
2075 let input = Box::new(substitute_parameters(&limit.input, params)?);
2076 Ok(PlanNode::Limit(LimitNode {
2077 input,
2078 limit: limit.limit,
2079 offset: limit.offset,
2080 }))
2081 }
2082 PlanNode::Aggregate(agg) => {
2083 let input = Box::new(substitute_parameters(&agg.input, params)?);
2084 let group_by = agg.group_by.iter()
2085 .map(|e| substitute_expr(e, params))
2086 .collect::<ExecutorResult<Vec<_>>>()?;
2087 let aggregates = agg.aggregates.iter()
2088 .map(|ae| Ok(AggregateExpr {
2089 function: ae.function,
2090 argument: ae.argument.as_ref().map(|a| substitute_expr(a, params)).transpose()?,
2091 distinct: ae.distinct,
2092 alias: ae.alias.clone(),
2093 }))
2094 .collect::<ExecutorResult<Vec<_>>>()?;
2095 Ok(PlanNode::Aggregate(AggregateNode { input, group_by, aggregates }))
2096 }
2097 PlanNode::Join(join) => {
2098 let left = Box::new(substitute_parameters(&join.left, params)?);
2099 let right = Box::new(substitute_parameters(&join.right, params)?);
2100 let condition = join.condition.as_ref()
2101 .map(|c| substitute_expr(c, params))
2102 .transpose()?;
2103 Ok(PlanNode::Join(JoinNode {
2104 left,
2105 right,
2106 join_type: join.join_type,
2107 condition,
2108 strategy: join.strategy,
2109 }))
2110 }
2111 PlanNode::Insert(insert) => {
2112 let source = match &insert.source {
2113 InsertPlanSource::Values(rows) => {
2114 let substituted = rows.iter()
2115 .map(|row| row.iter().map(|e| substitute_expr(e, params)).collect::<ExecutorResult<Vec<_>>>())
2116 .collect::<ExecutorResult<Vec<_>>>()?;
2117 InsertPlanSource::Values(substituted)
2118 }
2119 InsertPlanSource::Query(q) => {
2120 InsertPlanSource::Query(Box::new(substitute_parameters(q, params)?))
2121 }
2122 };
2123 Ok(PlanNode::Insert(InsertNode {
2124 table_name: insert.table_name.clone(),
2125 columns: insert.columns.clone(),
2126 source,
2127 }))
2128 }
2129 PlanNode::Update(update) => {
2130 let assignments = update.assignments.iter()
2131 .map(|(col, expr)| Ok((col.clone(), substitute_expr(expr, params)?)))
2132 .collect::<ExecutorResult<Vec<_>>>()?;
2133 let where_clause = update.where_clause.as_ref()
2134 .map(|p| substitute_expr(p, params))
2135 .transpose()?;
2136 Ok(PlanNode::Update(UpdateNode {
2137 table_name: update.table_name.clone(),
2138 assignments,
2139 where_clause,
2140 }))
2141 }
2142 PlanNode::Delete(delete) => {
2143 let where_clause = delete.where_clause.as_ref()
2144 .map(|p| substitute_expr(p, params))
2145 .transpose()?;
2146 Ok(PlanNode::Delete(DeleteNode {
2147 table_name: delete.table_name.clone(),
2148 where_clause,
2149 }))
2150 }
2151 PlanNode::CreateTable(_) |
2153 PlanNode::DropTable(_) |
2154 PlanNode::AlterTable(_) |
2155 PlanNode::CreateIndex(_) |
2156 PlanNode::DropIndex(_) |
2157 PlanNode::Empty => Ok(node.clone()),
2158 }
2159}
2160
2161fn substitute_expr(expr: &PlanExpression, params: &[Value]) -> ExecutorResult<PlanExpression> {
2163 match expr {
2164 PlanExpression::Placeholder(idx) => {
2165 let param_idx = *idx - 1;
2167 params.get(param_idx)
2168 .map(value_to_literal)
2169 .ok_or_else(|| ExecutorError::Internal(format!(
2170 "Missing parameter ${}: expected {} parameters but got {}",
2171 idx, idx, params.len()
2172 )))
2173 }
2174 PlanExpression::BinaryOp { op, left, right } => {
2175 Ok(PlanExpression::BinaryOp {
2176 op: *op,
2177 left: Box::new(substitute_expr(left, params)?),
2178 right: Box::new(substitute_expr(right, params)?),
2179 })
2180 }
2181 PlanExpression::UnaryOp { op, expr: inner } => {
2182 Ok(PlanExpression::UnaryOp {
2183 op: *op,
2184 expr: Box::new(substitute_expr(inner, params)?),
2185 })
2186 }
2187 PlanExpression::Function { name, args, return_type } => {
2188 let new_args = args.iter()
2189 .map(|a| substitute_expr(a, params))
2190 .collect::<ExecutorResult<Vec<_>>>()?;
2191 Ok(PlanExpression::Function { name: name.clone(), args: new_args, return_type: return_type.clone() })
2192 }
2193 PlanExpression::Case { operand, conditions, else_result } => {
2194 let new_operand = operand.as_ref()
2195 .map(|o| substitute_expr(o, params))
2196 .transpose()?
2197 .map(Box::new);
2198 let new_conditions = conditions.iter()
2199 .map(|(cond, result)| Ok((substitute_expr(cond, params)?, substitute_expr(result, params)?)))
2200 .collect::<ExecutorResult<Vec<_>>>()?;
2201 let new_else = else_result.as_ref()
2202 .map(|e| substitute_expr(e, params))
2203 .transpose()?
2204 .map(Box::new);
2205 Ok(PlanExpression::Case {
2206 operand: new_operand,
2207 conditions: new_conditions,
2208 else_result: new_else,
2209 })
2210 }
2211 PlanExpression::InList { expr: inner, list, negated } => {
2212 let new_inner = Box::new(substitute_expr(inner, params)?);
2213 let new_list = list.iter()
2214 .map(|e| substitute_expr(e, params))
2215 .collect::<ExecutorResult<Vec<_>>>()?;
2216 Ok(PlanExpression::InList { expr: new_inner, list: new_list, negated: *negated })
2217 }
2218 PlanExpression::InSubquery { expr: inner, subquery, negated } => {
2219 let new_inner = Box::new(substitute_expr(inner, params)?);
2220 let new_subquery = Box::new(substitute_parameters(subquery, params)?);
2221 Ok(PlanExpression::InSubquery { expr: new_inner, subquery: new_subquery, negated: *negated })
2222 }
2223 PlanExpression::Exists { subquery, negated } => {
2224 let new_subquery = Box::new(substitute_parameters(subquery, params)?);
2225 Ok(PlanExpression::Exists { subquery: new_subquery, negated: *negated })
2226 }
2227 PlanExpression::ScalarSubquery(subquery) => {
2228 let new_subquery = Box::new(substitute_parameters(subquery, params)?);
2229 Ok(PlanExpression::ScalarSubquery(new_subquery))
2230 }
2231 PlanExpression::Between { expr: inner, low, high, negated } => {
2232 Ok(PlanExpression::Between {
2233 expr: Box::new(substitute_expr(inner, params)?),
2234 low: Box::new(substitute_expr(low, params)?),
2235 high: Box::new(substitute_expr(high, params)?),
2236 negated: *negated,
2237 })
2238 }
2239 PlanExpression::Like { expr: inner, pattern, negated } => {
2240 Ok(PlanExpression::Like {
2241 expr: Box::new(substitute_expr(inner, params)?),
2242 pattern: Box::new(substitute_expr(pattern, params)?),
2243 negated: *negated,
2244 })
2245 }
2246 PlanExpression::IsNull { expr: inner, negated } => {
2247 Ok(PlanExpression::IsNull {
2248 expr: Box::new(substitute_expr(inner, params)?),
2249 negated: *negated,
2250 })
2251 }
2252 PlanExpression::Cast { expr: inner, target_type } => {
2253 Ok(PlanExpression::Cast {
2254 expr: Box::new(substitute_expr(inner, params)?),
2255 target_type: target_type.clone(),
2256 })
2257 }
2258 PlanExpression::Literal(_) | PlanExpression::Column { .. } => Ok(expr.clone()),
2260 }
2261}
2262
2263fn value_to_literal(value: &Value) -> PlanExpression {
2265 let lit = match value {
2266 Value::Null => PlanLiteral::Null,
2267 Value::Boolean(b) => PlanLiteral::Boolean(*b),
2268 Value::Integer(i) => PlanLiteral::Integer(*i),
2269 Value::Float(f) => PlanLiteral::Float(*f),
2270 Value::String(s) => PlanLiteral::String(s.clone()),
2271 Value::Bytes(b) => PlanLiteral::String(String::from_utf8_lossy(b).to_string()),
2272 Value::Timestamp(t) => PlanLiteral::Integer(t.timestamp_millis()),
2273 Value::Array(arr) => PlanLiteral::String(format!("{:?}", arr)),
2275 Value::Object(obj) => PlanLiteral::String(format!("{:?}", obj)),
2276 };
2277 PlanExpression::Literal(lit)
2278}
2279
2280fn evaluate_expression(
2282 expr: &PlanExpression,
2283 row: &Row,
2284 columns: &[String],
2285) -> ExecutorResult<Value> {
2286 evaluate_expression_with_context(expr, row, columns, &EvalContext::new())
2287}
2288
2289fn evaluate_expression_with_context(
2291 expr: &PlanExpression,
2292 row: &Row,
2293 columns: &[String],
2294 ctx: &EvalContext,
2295) -> ExecutorResult<Value> {
2296 match expr {
2297 PlanExpression::Literal(lit) => Ok(match lit {
2298 PlanLiteral::Null => Value::Null,
2299 PlanLiteral::Boolean(b) => Value::Boolean(*b),
2300 PlanLiteral::Integer(i) => Value::Integer(*i),
2301 PlanLiteral::Float(f) => Value::Float(*f),
2302 PlanLiteral::String(s) => Value::String(s.clone()),
2303 }),
2304
2305 PlanExpression::Column { table: _, name, .. } => {
2306 if name == "*" {
2307 return Ok(Value::Null);
2308 }
2309
2310 let idx = columns
2311 .iter()
2312 .position(|c| c == name)
2313 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
2314
2315 Ok(row.values.get(idx).cloned().unwrap_or(Value::Null))
2316 }
2317
2318 PlanExpression::BinaryOp { left, op, right } => {
2319 let left_val = evaluate_expression_with_context(left, row, columns, ctx)?;
2320 let right_val = evaluate_expression_with_context(right, row, columns, ctx)?;
2321 evaluate_binary_op(*op, &left_val, &right_val)
2322 }
2323
2324 PlanExpression::UnaryOp { op, expr } => {
2325 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2326 evaluate_unary_op(*op, &val)
2327 }
2328
2329 PlanExpression::IsNull { expr, negated } => {
2330 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2331 let is_null = matches!(val, Value::Null);
2332 Ok(Value::Boolean(if *negated { !is_null } else { is_null }))
2333 }
2334
2335 PlanExpression::Cast { expr, target_type } => {
2336 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2337 cast_value(&val, target_type)
2338 }
2339
2340 PlanExpression::Function { name, args, .. } => {
2341 let arg_values: Vec<Value> = args
2342 .iter()
2343 .map(|a| evaluate_expression_with_context(a, row, columns, ctx))
2344 .collect::<ExecutorResult<Vec<_>>>()?;
2345 evaluate_function(name, &arg_values)
2346 }
2347
2348 PlanExpression::Case { operand, conditions, else_result } => {
2349 match operand {
2350 Some(operand_expr) => {
2351 let operand_val = evaluate_expression_with_context(operand_expr, row, columns, ctx)?;
2353 for (when_expr, then_expr) in conditions {
2354 let when_val = evaluate_expression_with_context(when_expr, row, columns, ctx)?;
2355 if compare_values(&operand_val, &when_val)? == std::cmp::Ordering::Equal {
2356 return evaluate_expression_with_context(then_expr, row, columns, ctx);
2357 }
2358 }
2359 }
2360 None => {
2361 for (when_expr, then_expr) in conditions {
2363 let when_val = evaluate_expression_with_context(when_expr, row, columns, ctx)?;
2364 if matches!(when_val, Value::Boolean(true)) {
2365 return evaluate_expression_with_context(then_expr, row, columns, ctx);
2366 }
2367 }
2368 }
2369 }
2370 match else_result {
2372 Some(else_expr) => evaluate_expression_with_context(else_expr, row, columns, ctx),
2373 None => Ok(Value::Null),
2374 }
2375 }
2376
2377 PlanExpression::InList { expr, list, negated } => {
2378 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2379 let mut found = false;
2380 for item in list {
2381 let item_val = evaluate_expression_with_context(item, row, columns, ctx)?;
2382 if compare_values(&val, &item_val)? == std::cmp::Ordering::Equal {
2383 found = true;
2384 break;
2385 }
2386 }
2387 Ok(Value::Boolean(if *negated { !found } else { found }))
2388 }
2389
2390 PlanExpression::Between { expr, low, high, negated } => {
2391 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2392 let low_val = evaluate_expression_with_context(low, row, columns, ctx)?;
2393 let high_val = evaluate_expression_with_context(high, row, columns, ctx)?;
2394
2395 let ge_low = compare_values(&val, &low_val)? != std::cmp::Ordering::Less;
2396 let le_high = compare_values(&val, &high_val)? != std::cmp::Ordering::Greater;
2397 let in_range = ge_low && le_high;
2398
2399 Ok(Value::Boolean(if *negated { !in_range } else { in_range }))
2400 }
2401
2402 PlanExpression::Like { expr, pattern, negated } => {
2403 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2404 let pattern_val = evaluate_expression_with_context(pattern, row, columns, ctx)?;
2405
2406 let val_str = match val {
2407 Value::String(s) => s,
2408 Value::Null => return Ok(Value::Null),
2409 Value::Integer(i) => i.to_string(),
2410 Value::Float(f) => f.to_string(),
2411 Value::Boolean(b) => b.to_string(),
2412 Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
2413 Value::Timestamp(t) => t.to_rfc3339(),
2414 Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
2415 };
2416
2417 let pattern_str = match pattern_val {
2418 Value::String(s) => s,
2419 Value::Null => return Ok(Value::Null),
2420 Value::Integer(i) => i.to_string(),
2421 Value::Float(f) => f.to_string(),
2422 Value::Boolean(b) => b.to_string(),
2423 Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
2424 Value::Timestamp(t) => t.to_rfc3339(),
2425 Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
2426 };
2427
2428 let regex_pattern = pattern_str
2430 .replace('%', ".*")
2431 .replace('_', ".");
2432 let regex_pattern = format!("^{}$", regex_pattern);
2433
2434 let matches = if regex_pattern == "^.*$" {
2436 true
2437 } else if regex_pattern.starts_with("^") && regex_pattern.ends_with("$") {
2438 let inner = ®ex_pattern[1..regex_pattern.len()-1];
2439 if inner.contains(".*") || inner.contains('.') {
2440 let parts: Vec<&str> = inner.split(".*").collect();
2442 if parts.len() == 1 {
2443 val_str == parts[0]
2444 } else {
2445 let mut pos = 0;
2446 let mut matched = true;
2447 for (i, part) in parts.iter().enumerate() {
2448 if part.is_empty() { continue; }
2449 if let Some(found_pos) = val_str[pos..].find(part) {
2450 if i == 0 && found_pos != 0 {
2451 matched = false;
2452 break;
2453 }
2454 pos += found_pos + part.len();
2455 } else {
2456 matched = false;
2457 break;
2458 }
2459 }
2460 matched
2461 }
2462 } else {
2463 val_str == inner
2464 }
2465 } else {
2466 val_str.contains(&pattern_str)
2467 };
2468
2469 Ok(Value::Boolean(if *negated { !matches } else { matches }))
2470 }
2471
2472 PlanExpression::InSubquery { expr, subquery, negated } => {
2473 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
2475
2476 let subquery_rows = ctx.execute_subquery(subquery)?;
2478
2479 let mut found = false;
2481 for subquery_row in &subquery_rows {
2482 if let Some(subquery_val) = subquery_row.values.first() {
2483 if compare_values(&val, subquery_val)? == std::cmp::Ordering::Equal {
2484 found = true;
2485 break;
2486 }
2487 }
2488 }
2489
2490 Ok(Value::Boolean(if *negated { !found } else { found }))
2491 }
2492
2493 PlanExpression::Exists { subquery, negated } => {
2494 let subquery_rows = ctx.execute_subquery(subquery)?;
2496 let exists = !subquery_rows.is_empty();
2497
2498 Ok(Value::Boolean(if *negated { !exists } else { exists }))
2499 }
2500
2501 PlanExpression::ScalarSubquery(subquery) => {
2502 let subquery_rows = ctx.execute_subquery(subquery)?;
2504
2505 if subquery_rows.is_empty() {
2506 return Ok(Value::Null);
2507 }
2508
2509 if subquery_rows.len() > 1 {
2510 return Err(ExecutorError::Internal(
2511 "Scalar subquery returned more than one row".to_string()
2512 ));
2513 }
2514
2515 subquery_rows[0]
2517 .values
2518 .first()
2519 .cloned()
2520 .ok_or_else(|| ExecutorError::Internal("Scalar subquery returned no columns".to_string()))
2521 }
2522
2523 PlanExpression::Placeholder(idx) => {
2524 Err(ExecutorError::Internal(format!("Unresolved placeholder ${}", idx)))
2526 }
2527 }
2528}
2529
2530fn evaluate_binary_op(op: PlanBinaryOp, left: &Value, right: &Value) -> ExecutorResult<Value> {
2531 if matches!(left, Value::Null) || matches!(right, Value::Null) {
2532 if matches!(op, PlanBinaryOp::Equal | PlanBinaryOp::NotEqual) {
2533 return Ok(Value::Null);
2534 }
2535 return Ok(Value::Null);
2536 }
2537
2538 match op {
2539 PlanBinaryOp::Add => {
2540 let l = value_to_f64(left)?;
2541 let r = value_to_f64(right)?;
2542 Ok(Value::Float(l + r))
2543 }
2544 PlanBinaryOp::Subtract => {
2545 let l = value_to_f64(left)?;
2546 let r = value_to_f64(right)?;
2547 Ok(Value::Float(l - r))
2548 }
2549 PlanBinaryOp::Multiply => {
2550 let l = value_to_f64(left)?;
2551 let r = value_to_f64(right)?;
2552 Ok(Value::Float(l * r))
2553 }
2554 PlanBinaryOp::Divide => {
2555 let l = value_to_f64(left)?;
2556 let r = value_to_f64(right)?;
2557 if r == 0.0 {
2558 return Err(ExecutorError::DivisionByZero);
2559 }
2560 Ok(Value::Float(l / r))
2561 }
2562 PlanBinaryOp::Modulo => {
2563 let l = value_to_i64(left)?;
2564 let r = value_to_i64(right)?;
2565 if r == 0 {
2566 return Err(ExecutorError::DivisionByZero);
2567 }
2568 Ok(Value::Integer(l % r))
2569 }
2570 PlanBinaryOp::Equal => Ok(Value::Boolean(compare_values(left, right)? == std::cmp::Ordering::Equal)),
2571 PlanBinaryOp::NotEqual => Ok(Value::Boolean(compare_values(left, right)? != std::cmp::Ordering::Equal)),
2572 PlanBinaryOp::LessThan => Ok(Value::Boolean(compare_values(left, right)? == std::cmp::Ordering::Less)),
2573 PlanBinaryOp::LessThanOrEqual => Ok(Value::Boolean(compare_values(left, right)? != std::cmp::Ordering::Greater)),
2574 PlanBinaryOp::GreaterThan => Ok(Value::Boolean(compare_values(left, right)? == std::cmp::Ordering::Greater)),
2575 PlanBinaryOp::GreaterThanOrEqual => Ok(Value::Boolean(compare_values(left, right)? != std::cmp::Ordering::Less)),
2576 PlanBinaryOp::And => {
2577 let l = value_to_bool(left)?;
2578 let r = value_to_bool(right)?;
2579 Ok(Value::Boolean(l && r))
2580 }
2581 PlanBinaryOp::Or => {
2582 let l = value_to_bool(left)?;
2583 let r = value_to_bool(right)?;
2584 Ok(Value::Boolean(l || r))
2585 }
2586 PlanBinaryOp::Concat => {
2587 let l = value_to_string(left);
2588 let r = value_to_string(right);
2589 Ok(Value::String(format!("{}{}", l, r)))
2590 }
2591 }
2592}
2593
2594fn evaluate_unary_op(op: PlanUnaryOp, value: &Value) -> ExecutorResult<Value> {
2595 match op {
2596 PlanUnaryOp::Not => {
2597 let b = value_to_bool(value)?;
2598 Ok(Value::Boolean(!b))
2599 }
2600 PlanUnaryOp::Negative => {
2601 let f = value_to_f64(value)?;
2602 Ok(Value::Float(-f))
2603 }
2604 }
2605}
2606
2607fn evaluate_function(name: &str, args: &[Value]) -> ExecutorResult<Value> {
2608 match name.to_uppercase().as_str() {
2609 "UPPER" => {
2610 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
2611 Ok(Value::String(s.to_uppercase()))
2612 }
2613 "LOWER" => {
2614 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
2615 Ok(Value::String(s.to_lowercase()))
2616 }
2617 "LENGTH" => {
2618 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
2619 Ok(Value::Integer(s.len() as i64))
2620 }
2621 "ABS" => {
2622 let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
2623 Ok(Value::Float(f.abs()))
2624 }
2625 "COALESCE" => {
2626 for arg in args {
2627 if !matches!(arg, Value::Null) {
2628 return Ok(arg.clone());
2629 }
2630 }
2631 Ok(Value::Null)
2632 }
2633 _ => Err(ExecutorError::InvalidOperation(format!(
2634 "Unknown function: {}",
2635 name
2636 ))),
2637 }
2638}
2639
2640fn value_to_f64(value: &Value) -> ExecutorResult<f64> {
2645 match value {
2646 Value::Integer(i) => Ok(*i as f64),
2647 Value::Float(f) => Ok(*f),
2648 Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
2649 expected: "number".to_string(),
2650 actual: "string".to_string(),
2651 }),
2652 _ => Err(ExecutorError::TypeMismatch {
2653 expected: "number".to_string(),
2654 actual: format!("{:?}", value),
2655 }),
2656 }
2657}
2658
2659fn value_to_i64(value: &Value) -> ExecutorResult<i64> {
2660 match value {
2661 Value::Integer(i) => Ok(*i),
2662 Value::Float(f) => Ok(*f as i64),
2663 Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
2664 expected: "integer".to_string(),
2665 actual: "string".to_string(),
2666 }),
2667 _ => Err(ExecutorError::TypeMismatch {
2668 expected: "integer".to_string(),
2669 actual: format!("{:?}", value),
2670 }),
2671 }
2672}
2673
2674fn value_to_bool(value: &Value) -> ExecutorResult<bool> {
2675 match value {
2676 Value::Boolean(b) => Ok(*b),
2677 Value::Integer(i) => Ok(*i != 0),
2678 Value::Null => Ok(false),
2679 _ => Err(ExecutorError::TypeMismatch {
2680 expected: "boolean".to_string(),
2681 actual: format!("{:?}", value),
2682 }),
2683 }
2684}
2685
2686fn value_to_string(value: &Value) -> String {
2687 match value {
2688 Value::String(s) => s.clone(),
2689 Value::Integer(i) => i.to_string(),
2690 Value::Float(f) => f.to_string(),
2691 Value::Boolean(b) => b.to_string(),
2692 Value::Null => String::new(),
2693 _ => format!("{:?}", value),
2694 }
2695}
2696
2697fn compare_values(left: &Value, right: &Value) -> ExecutorResult<std::cmp::Ordering> {
2698 match (left, right) {
2699 (Value::Integer(l), Value::Integer(r)) => Ok(l.cmp(r)),
2700 (Value::Float(l), Value::Float(r)) => {
2701 Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
2702 }
2703 (Value::Integer(l), Value::Float(r)) => {
2704 let l = *l as f64;
2705 Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
2706 }
2707 (Value::Float(l), Value::Integer(r)) => {
2708 let r = *r as f64;
2709 Ok(l.partial_cmp(&r).unwrap_or(std::cmp::Ordering::Equal))
2710 }
2711 (Value::String(l), Value::String(r)) => Ok(l.cmp(r)),
2712 (Value::Boolean(l), Value::Boolean(r)) => Ok(l.cmp(r)),
2713 _ => Ok(std::cmp::Ordering::Equal),
2714 }
2715}
2716
2717fn cast_value(value: &Value, target_type: &DataType) -> ExecutorResult<Value> {
2718 match target_type {
2719 DataType::Integer => Ok(Value::Integer(value_to_i64(value)?)),
2720 DataType::Float => Ok(Value::Float(value_to_f64(value)?)),
2721 DataType::Text => Ok(Value::String(value_to_string(value))),
2722 DataType::Boolean => Ok(Value::Boolean(value_to_bool(value)?)),
2723 _ => Ok(value.clone()),
2724 }
2725}
2726
2727#[cfg(test)]
2732mod tests {
2733 use super::*;
2734 use crate::planner::{LimitNode, PlanNode, ProjectNode, ProjectionExpr, QueryPlan, ScanNode};
2735
2736 fn create_test_context() -> ExecutionContext {
2737 let mut context = ExecutionContext::new();
2738
2739 context.add_table(TableData {
2740 name: "users".to_string(),
2741 columns: vec![
2742 "id".to_string(),
2743 "name".to_string(),
2744 "age".to_string(),
2745 ],
2746 rows: vec![
2747 Row {
2748 values: vec![
2749 Value::Integer(1),
2750 Value::String("Alice".to_string()),
2751 Value::Integer(30),
2752 ],
2753 },
2754 Row {
2755 values: vec![
2756 Value::Integer(2),
2757 Value::String("Bob".to_string()),
2758 Value::Integer(25),
2759 ],
2760 },
2761 Row {
2762 values: vec![
2763 Value::Integer(3),
2764 Value::String("Charlie".to_string()),
2765 Value::Integer(35),
2766 ],
2767 },
2768 ],
2769 });
2770
2771 context
2772 }
2773
/// Scanning `users` and projecting `id` and `name` yields all three rows with
/// two output columns.
#[test]
fn test_scan_operator() {
    // Projects a column under an alias equal to its own name.
    let project = |name: &str, data_type: DataType| ProjectionExpr {
        expr: PlanExpression::Column {
            table: None,
            name: name.to_string(),
            data_type,
        },
        alias: Some(name.to_string()),
    };

    let scan = PlanNode::Scan(ScanNode {
        table_name: "users".to_string(),
        alias: None,
        columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
        index_scan: None,
    });
    let plan = QueryPlan {
        root: PlanNode::Project(ProjectNode {
            input: Box::new(scan),
            expressions: vec![
                project("id", DataType::Integer),
                project("name", DataType::Text),
            ],
        }),
        estimated_cost: 100.0,
        estimated_rows: 3,
    };

    let executor = Executor::new(create_test_context());
    let result = executor.execute(&plan).unwrap();

    assert_eq!(result.rows.len(), 3);
    assert_eq!(result.columns.len(), 2);
}
2819
/// Filtering `users` on `age > 28` keeps the two rows with ages 30 and 35.
#[test]
fn test_filter_operator() {
    let column = |name: &str, data_type: DataType| PlanExpression::Column {
        table: None,
        name: name.to_string(),
        data_type,
    };

    // age > 28
    let predicate = PlanExpression::BinaryOp {
        left: Box::new(column("age", DataType::Integer)),
        op: PlanBinaryOp::GreaterThan,
        right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(28))),
    };

    let scan = PlanNode::Scan(ScanNode {
        table_name: "users".to_string(),
        alias: None,
        columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
        index_scan: None,
    });
    let filtered = PlanNode::Filter(crate::planner::FilterNode {
        input: Box::new(scan),
        predicate,
    });
    let plan = QueryPlan {
        root: PlanNode::Project(ProjectNode {
            input: Box::new(filtered),
            expressions: vec![ProjectionExpr {
                expr: column("name", DataType::Text),
                alias: Some("name".to_string()),
            }],
        }),
        estimated_cost: 100.0,
        estimated_rows: 2,
    };

    let executor = Executor::new(create_test_context());
    let result = executor.execute(&plan).unwrap();

    assert_eq!(result.rows.len(), 2);
}
2865
/// A `LIMIT 2` over the projected `id` column of `users` returns two rows.
#[test]
fn test_limit_operator() {
    let scan = PlanNode::Scan(ScanNode {
        table_name: "users".to_string(),
        alias: None,
        columns: vec!["id".to_string()],
        index_scan: None,
    });
    let projection = PlanNode::Project(ProjectNode {
        input: Box::new(scan),
        expressions: vec![ProjectionExpr {
            expr: PlanExpression::Column {
                table: None,
                name: "id".to_string(),
                data_type: DataType::Integer,
            },
            alias: Some("id".to_string()),
        }],
    });
    let plan = QueryPlan {
        root: PlanNode::Limit(LimitNode {
            input: Box::new(projection),
            limit: Some(2),
            offset: None,
        }),
        estimated_cost: 100.0,
        estimated_rows: 2,
    };

    let executor = Executor::new(create_test_context());
    let result = executor.execute(&plan).unwrap();

    assert_eq!(result.rows.len(), 2);
}
2900
/// Creating a table reports a message containing "created" and makes the
/// table show up in the context's table list.
#[test]
fn test_create_table() {
    use crate::planner::{CreateColumnDef, CreateTableNode};

    // Column without default or UNIQUE; only nullability and key-ness vary.
    let column = |name: &str, data_type: DataType, nullable: bool, primary_key: bool| {
        CreateColumnDef {
            name: name.to_string(),
            data_type,
            nullable,
            default: None,
            primary_key,
            unique: false,
        }
    };

    let plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: "test_table".to_string(),
            columns: vec![
                column("id", DataType::Integer, false, true),
                column("name", DataType::Text, true, false),
            ],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };

    let executor = Executor::new(ExecutionContext::new());
    let result = executor.execute(&plan).unwrap();

    if let Value::String(message) = &result.rows[0].values[0] {
        assert!(message.contains("created"));
    } else {
        panic!("Expected string result");
    }

    let expected_name = "test_table".to_string();
    let tables = executor.context.read().unwrap().list_tables();
    assert!(tables.contains(&expected_name));
}
2945
/// Inserting two literal rows reports two affected rows, and a following
/// scan of the table returns both of them.
#[test]
fn test_insert_into_table() {
    use crate::planner::{CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource};

    let executor = Executor::new(ExecutionContext::new());

    // Step 1: create `test_insert` with a non-null integer key and a nullable text column.
    let id_column = CreateColumnDef {
        name: "id".to_string(),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    let value_column = CreateColumnDef {
        name: "value".to_string(),
        data_type: DataType::Text,
        nullable: true,
        default: None,
        primary_key: false,
        unique: false,
    };
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: "test_insert".to_string(),
            columns: vec![id_column, value_column],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // Step 2: insert (1, "hello") and (2, "world").
    let literal_row = |id, text: &str| {
        vec![
            PlanExpression::Literal(PlanLiteral::Integer(id)),
            PlanExpression::Literal(PlanLiteral::String(text.to_string())),
        ]
    };
    let insert_plan = QueryPlan {
        root: PlanNode::Insert(InsertNode {
            table_name: "test_insert".to_string(),
            columns: vec!["id".to_string(), "value".to_string()],
            source: InsertPlanSource::Values(vec![
                literal_row(1, "hello"),
                literal_row(2, "world"),
            ]),
        }),
        estimated_cost: 2.0,
        estimated_rows: 2,
    };
    let insert_result = executor.execute(&insert_plan).unwrap();
    assert_eq!(insert_result.rows_affected, 2);

    // Step 3: read the rows back through a scan + projection of `id`.
    let scan = PlanNode::Scan(ScanNode {
        table_name: "test_insert".to_string(),
        alias: None,
        columns: vec!["id".to_string(), "value".to_string()],
        index_scan: None,
    });
    let query_plan = QueryPlan {
        root: PlanNode::Project(ProjectNode {
            input: Box::new(scan),
            expressions: vec![ProjectionExpr {
                expr: PlanExpression::Column {
                    table: None,
                    name: "id".to_string(),
                    data_type: DataType::Integer,
                },
                alias: Some("id".to_string()),
            }],
        }),
        estimated_cost: 100.0,
        estimated_rows: 2,
    };
    let query_result = executor.execute(&query_plan).unwrap();
    assert_eq!(query_result.rows.len(), 2);
}
3032
/// A table is listed after CREATE TABLE and no longer listed after DROP TABLE.
#[test]
fn test_drop_table() {
    use crate::planner::{CreateColumnDef, CreateTableNode, DropTableNode};

    let executor = Executor::new(ExecutionContext::new());
    let table_name = "to_drop".to_string();
    // Takes a fresh snapshot of the table list on every call.
    let is_listed = || {
        let tables = executor.context.read().unwrap().list_tables();
        tables.contains(&table_name)
    };

    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: "to_drop".to_string(),
            columns: vec![CreateColumnDef {
                name: "id".to_string(),
                data_type: DataType::Integer,
                nullable: false,
                default: None,
                primary_key: true,
                unique: false,
            }],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();
    assert!(is_listed());

    let drop_plan = QueryPlan {
        root: PlanNode::DropTable(DropTableNode {
            table_name: "to_drop".to_string(),
            if_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&drop_plan).unwrap();
    assert!(!is_listed());
}
3078
/// A table created through one executor is visible to a second executor
/// built over the same shared context.
#[test]
fn test_shared_context_persistence() {
    use crate::planner::{CreateColumnDef, CreateTableNode};
    use std::sync::{Arc, RwLock};

    let shared_context = Arc::new(RwLock::new(ExecutionContext::new()));

    let id_column = CreateColumnDef {
        name: "id".to_string(),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: "shared_table".to_string(),
            columns: vec![id_column],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };

    // The first executor performs the write ...
    let writer = Executor::with_shared_context(shared_context.clone());
    writer.execute(&create_plan).unwrap();

    // ... and a second executor over the same context observes it.
    let reader = Executor::with_shared_context(shared_context.clone());
    let expected_name = "shared_table".to_string();
    let tables = reader.context.read().unwrap().list_tables();
    assert!(tables.contains(&expected_name));
}
3117
/// `INSERT ... SELECT` copies the source rows with `id > 1` into the
/// destination table: two rows are affected and two rows can be read back.
#[test]
fn test_insert_select() {
    use crate::planner::{
        CreateColumnDef, CreateTableNode, FilterNode, InsertNode, InsertPlanSource,
    };

    let executor = Executor::new(ExecutionContext::new());

    // CREATE TABLE plan with a non-null integer key `id` and a nullable text `value`.
    let create_table = |table_name: &str| QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: table_name.to_string(),
            columns: vec![
                CreateColumnDef {
                    name: "id".to_string(),
                    data_type: DataType::Integer,
                    nullable: false,
                    default: None,
                    primary_key: true,
                    unique: false,
                },
                CreateColumnDef {
                    name: "value".to_string(),
                    data_type: DataType::Text,
                    nullable: true,
                    default: None,
                    primary_key: false,
                    unique: false,
                },
            ],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    let both_columns = || vec!["id".to_string(), "value".to_string()];
    let scan_of = |table_name: &str| {
        PlanNode::Scan(ScanNode {
            table_name: table_name.to_string(),
            alias: None,
            columns: both_columns(),
            index_scan: None,
        })
    };
    let column = |name: &str, data_type: DataType| PlanExpression::Column {
        table: None,
        name: name.to_string(),
        data_type,
    };
    let project = |name: &str, data_type: DataType| ProjectionExpr {
        expr: column(name, data_type),
        alias: Some(name.to_string()),
    };
    let literal_row = |id, text: &str| {
        vec![
            PlanExpression::Literal(PlanLiteral::Integer(id)),
            PlanExpression::Literal(PlanLiteral::String(text.to_string())),
        ]
    };

    // Both tables share the same shape.
    executor.execute(&create_table("source_table")).unwrap();
    executor.execute(&create_table("dest_table")).unwrap();

    // Seed the source with three rows.
    let insert_source = QueryPlan {
        root: PlanNode::Insert(InsertNode {
            table_name: "source_table".to_string(),
            columns: both_columns(),
            source: InsertPlanSource::Values(vec![
                literal_row(1, "one"),
                literal_row(2, "two"),
                literal_row(3, "three"),
            ]),
        }),
        estimated_cost: 3.0,
        estimated_rows: 3,
    };
    executor.execute(&insert_source).unwrap();

    // Sub-plan feeding the insert: both columns of the source rows with id > 1.
    let select_source = PlanNode::Project(ProjectNode {
        input: Box::new(PlanNode::Filter(FilterNode {
            input: Box::new(scan_of("source_table")),
            predicate: PlanExpression::BinaryOp {
                left: Box::new(column("id", DataType::Integer)),
                op: PlanBinaryOp::GreaterThan,
                right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
            },
        })),
        expressions: vec![
            project("id", DataType::Integer),
            project("value", DataType::Text),
        ],
    });
    let insert_select = QueryPlan {
        root: PlanNode::Insert(InsertNode {
            table_name: "dest_table".to_string(),
            columns: both_columns(),
            source: InsertPlanSource::Query(Box::new(select_source)),
        }),
        estimated_cost: 2.0,
        estimated_rows: 2,
    };
    let result = executor.execute(&insert_select).unwrap();
    assert_eq!(result.rows_affected, 2);

    // The destination now holds exactly the copied rows.
    let query_plan = QueryPlan {
        root: PlanNode::Project(ProjectNode {
            input: Box::new(scan_of("dest_table")),
            expressions: vec![project("id", DataType::Integer)],
        }),
        estimated_cost: 100.0,
        estimated_rows: 2,
    };
    let query_result = executor.execute(&query_plan).unwrap();
    assert_eq!(query_result.rows.len(), 2);
}
3288
/// A searched CASE (no operand) over `status = 2` picks the second branch.
#[test]
fn test_case_expression() {
    // WHEN status = <expected> THEN '<label>'
    let branch = |expected, label: &str| {
        (
            PlanExpression::BinaryOp {
                left: Box::new(PlanExpression::Column {
                    table: None,
                    name: "status".to_string(),
                    data_type: DataType::Integer,
                }),
                op: PlanBinaryOp::Equal,
                right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(expected))),
            },
            PlanExpression::Literal(PlanLiteral::String(label.to_string())),
        )
    };

    let fallback = PlanExpression::Literal(PlanLiteral::String("other".to_string()));
    let case_expr = PlanExpression::Case {
        operand: None,
        conditions: vec![branch(1, "one"), branch(2, "two")],
        else_result: Some(Box::new(fallback)),
    };

    let columns = vec!["status".to_string()];
    let row = Row {
        values: vec![Value::Integer(2)],
    };

    let result = evaluate_expression(&case_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::String("two".to_string()));
}
3329
/// `id IN (1, 2, 3, 4, 5)` is true for `id = 3`, and `id NOT IN (1, 2, 3)`
/// is false for the same row.
#[test]
fn test_in_list_expression() {
    let row = Row {
        values: vec![Value::Integer(3)],
    };
    let columns = vec!["id".to_string()];

    let id_column = || {
        Box::new(PlanExpression::Column {
            table: None,
            name: "id".to_string(),
            data_type: DataType::Integer,
        })
    };
    let lit = |n| PlanExpression::Literal(PlanLiteral::Integer(n));

    let in_expr = PlanExpression::InList {
        expr: id_column(),
        list: vec![lit(1), lit(2), lit(3), lit(4), lit(5)],
        negated: false,
    };
    let result = evaluate_expression(&in_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(true));

    let not_in_expr = PlanExpression::InList {
        expr: id_column(),
        list: vec![lit(1), lit(2), lit(3)],
        negated: true,
    };
    let result = evaluate_expression(&not_in_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(false));
}
3373
/// `value BETWEEN 10 AND 100` is true for `value = 50`, and
/// `value NOT BETWEEN 10 AND 40` is true for the same row.
#[test]
fn test_between_expression() {
    let row = Row {
        values: vec![Value::Integer(50)],
    };
    let columns = vec!["value".to_string()];

    // value [NOT] BETWEEN 10 AND <high>
    let between_10_and = |high, negated: bool| PlanExpression::Between {
        expr: Box::new(PlanExpression::Column {
            table: None,
            name: "value".to_string(),
            data_type: DataType::Integer,
        }),
        low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
        high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(high))),
        negated,
    };

    let between_expr = between_10_and(100, false);
    let result = evaluate_expression(&between_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(true));

    let not_between_expr = between_10_and(40, true);
    let result = evaluate_expression(&not_between_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(true));
}
3409
/// LIKE with a trailing `%`, a leading `%`, and a negated non-matching
/// `%foo%` pattern all evaluate to true for the text "hello world".
#[test]
fn test_like_expression() {
    let row = Row {
        values: vec![Value::String("hello world".to_string())],
    };
    let columns = vec!["text".to_string()];

    // text [NOT] LIKE '<pattern>'
    let like = |pattern: &str, negated: bool| PlanExpression::Like {
        expr: Box::new(PlanExpression::Column {
            table: None,
            name: "text".to_string(),
            data_type: DataType::Text,
        }),
        pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
            pattern.to_string(),
        ))),
        negated,
    };

    // Prefix match.
    let like_expr = like("hello%", false);
    let result = evaluate_expression(&like_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(true));

    // Suffix match.
    let like_expr2 = like("%world", false);
    let result = evaluate_expression(&like_expr2, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(true));

    // Negated match against a substring that is absent.
    let not_like_expr = like("%foo%", true);
    let result = evaluate_expression(&not_like_expr, &row, &columns).unwrap();
    assert_eq!(result, Value::Boolean(true));
}
3457
/// ALTER TABLE can add, rename and drop a column, and the stored table
/// schema reflects each step.
#[test]
fn test_alter_table() {
    use crate::planner::{AlterTableNode, CreateColumnDef, CreateTableNode, PlanAlterOperation};

    let executor = Executor::new(ExecutionContext::new());

    // Runs a single-operation ALTER TABLE against `alter_test`.
    let alter = |operation: PlanAlterOperation| {
        let plan = QueryPlan {
            root: PlanNode::AlterTable(AlterTableNode {
                table_name: "alter_test".to_string(),
                operations: vec![operation],
            }),
            estimated_cost: 1.0,
            estimated_rows: 0,
        };
        executor.execute(&plan).unwrap()
    };
    // Snapshot of the current schema of `alter_test`.
    let current_schema = || {
        executor
            .context
            .read()
            .unwrap()
            .get_table_schema("alter_test")
            .unwrap()
            .clone()
    };

    // Start from a table with a single `id` key column.
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: "alter_test".to_string(),
            columns: vec![CreateColumnDef {
                name: "id".to_string(),
                data_type: DataType::Integer,
                nullable: false,
                default: None,
                primary_key: true,
                unique: false,
            }],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // ADD COLUMN name TEXT
    let result = alter(PlanAlterOperation::AddColumn(CreateColumnDef {
        name: "name".to_string(),
        data_type: DataType::Text,
        nullable: true,
        default: None,
        primary_key: false,
        unique: false,
    }));
    if let Value::String(message) = &result.rows[0].values[0] {
        assert!(message.contains("altered"));
    } else {
        panic!("Expected string result");
    }

    let schema = current_schema();
    assert_eq!(schema.columns.len(), 2);
    assert_eq!(schema.columns[1].name, "name");

    // RENAME COLUMN name TO full_name
    alter(PlanAlterOperation::RenameColumn {
        old_name: "name".to_string(),
        new_name: "full_name".to_string(),
    });

    let schema = current_schema();
    assert_eq!(schema.columns[1].name, "full_name");

    // DROP COLUMN full_name
    alter(PlanAlterOperation::DropColumn {
        name: "full_name".to_string(),
        if_exists: false,
    });

    let schema = current_schema();
    assert_eq!(schema.columns.len(), 1);
}
3555
/// Covers constraint-level `ALTER TABLE` operations on a two-column table:
/// adding a UNIQUE constraint, adding a PRIMARY KEY, rejecting a second
/// PRIMARY KEY, and dropping a constraint by name.
#[test]
fn test_alter_table_constraints() {
    use crate::planner::{CreateTableNode, CreateColumnDef, AlterTableNode, PlanAlterOperation, CreateTableConstraint};

    const TABLE: &str = "constraint_test";

    let executor = Executor::new(ExecutionContext::new());

    // Wraps a single alter operation in a complete plan targeting `TABLE`.
    let alter_plan = |operation: PlanAlterOperation| QueryPlan {
        root: PlanNode::AlterTable(AlterTableNode {
            table_name: TABLE.to_string(),
            operations: vec![operation],
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };

    // Returns an owned copy of the schema so the temporary read guard on the
    // context is released before the next plan runs.
    let current_schema = || {
        executor
            .context
            .read()
            .unwrap()
            .get_table_schema(TABLE)
            .unwrap()
            .clone()
    };

    // The table starts without any primary key or unique column.
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: TABLE.to_string(),
            columns: vec![
                CreateColumnDef {
                    name: "id".to_string(),
                    data_type: DataType::Integer,
                    nullable: false,
                    default: None,
                    primary_key: false,
                    unique: false,
                },
                CreateColumnDef {
                    name: "email".to_string(),
                    data_type: DataType::Text,
                    nullable: true,
                    default: None,
                    primary_key: false,
                    unique: false,
                },
            ],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // ADD CONSTRAINT UNIQUE(email): one constraint whose name contains "uq".
    let add_unique_plan = alter_plan(PlanAlterOperation::AddConstraint(
        CreateTableConstraint::Unique {
            columns: vec!["email".to_string()],
        },
    ));
    executor.execute(&add_unique_plan).unwrap();

    let schema = current_schema();
    assert_eq!(schema.constraints.len(), 1);
    assert!(schema.constraints[0].name.contains("uq"));

    // ADD CONSTRAINT PRIMARY KEY(id): recorded both as the table's primary
    // key and as a second stored constraint.
    let add_pk_plan = alter_plan(PlanAlterOperation::AddConstraint(
        CreateTableConstraint::PrimaryKey {
            columns: vec!["id".to_string()],
        },
    ));
    executor.execute(&add_pk_plan).unwrap();

    let schema = current_schema();
    assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
    assert_eq!(schema.constraints.len(), 2);

    // A second PRIMARY KEY on the same table must be rejected.
    let add_dup_pk_plan = alter_plan(PlanAlterOperation::AddConstraint(
        CreateTableConstraint::PrimaryKey {
            columns: vec!["email".to_string()],
        },
    ));
    let result = executor.execute(&add_dup_pk_plan);
    assert!(result.is_err());

    // Look up the generated name of the UNIQUE constraint: that is the one
    // dropped below, leaving the PRIMARY KEY as the sole survivor.
    let unique_constraint_name = current_schema()
        .constraints
        .iter()
        .find(|c| matches!(c.constraint_type, StoredConstraintType::Unique { .. }))
        .map(|c| c.name.clone())
        .expect("the UNIQUE constraint added earlier should still be stored");

    let drop_constraint_plan = alter_plan(PlanAlterOperation::DropConstraint {
        name: unique_constraint_name,
    });
    executor.execute(&drop_constraint_plan).unwrap();

    let schema = current_schema();
    assert_eq!(schema.constraints.len(), 1);
    assert!(matches!(
        schema.constraints[0].constraint_type,
        StoredConstraintType::PrimaryKey { .. }
    ));
}
3677
/// Creates a table that combines a column-level primary key, a column-level
/// `unique` flag and a table-level UNIQUE constraint, then checks that the
/// stored schema records the primary key and at least two constraints.
#[test]
fn test_create_table_with_constraints() {
    use crate::planner::{CreateTableNode, CreateColumnDef, CreateTableConstraint};

    let executor = Executor::new(ExecutionContext::new());

    let id_column = CreateColumnDef {
        name: String::from("id"),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    // Uniqueness declared directly on the column.
    let email_column = CreateColumnDef {
        name: String::from("email"),
        data_type: DataType::Text,
        nullable: false,
        default: None,
        primary_key: false,
        unique: true,
    };
    let name_column = CreateColumnDef {
        name: String::from("name"),
        data_type: DataType::Text,
        nullable: true,
        default: None,
        primary_key: false,
        unique: false,
    };
    // Uniqueness declared as a table-level constraint instead.
    let name_unique = CreateTableConstraint::Unique {
        columns: vec![String::from("name")],
    };

    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: String::from("users"),
            columns: vec![id_column, email_column, name_column],
            constraints: vec![name_unique],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // Clone the schema so the temporary read guard on the context is dropped
    // at the end of this statement.
    let stored_schema = executor
        .context
        .read()
        .unwrap()
        .get_table_schema("users")
        .unwrap()
        .clone();
    assert_eq!(stored_schema.primary_key, Some(vec!["id".to_string()]));
    assert!(stored_schema.constraints.len() >= 2);
}
3732
/// Checks that literal column defaults given at table creation are stored in
/// the schema and are filled in for columns that an insert leaves out.
#[test]
fn test_column_default_values() {
    use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource, PlanExpression, PlanLiteral};

    const TABLE: &str = "defaults_test";

    let executor = Executor::new(ExecutionContext::new());

    let id_column = CreateColumnDef {
        name: String::from("id"),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    let status_column = CreateColumnDef {
        name: String::from("status"),
        data_type: DataType::Text,
        nullable: false,
        default: Some(PlanExpression::Literal(PlanLiteral::String("active".to_string()))),
        primary_key: false,
        unique: false,
    };
    let count_column = CreateColumnDef {
        name: String::from("count"),
        data_type: DataType::Integer,
        nullable: true,
        default: Some(PlanExpression::Literal(PlanLiteral::Integer(0))),
        primary_key: false,
        unique: false,
    };

    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: TABLE.to_string(),
            columns: vec![id_column, status_column, count_column],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // The schema keeps the defaults as evaluated values.
    let stored_schema = executor
        .context
        .read()
        .unwrap()
        .get_table_schema(TABLE)
        .unwrap()
        .clone();
    assert_eq!(
        stored_schema.columns[1].default,
        Some(Value::String("active".to_string()))
    );
    assert_eq!(stored_schema.columns[2].default, Some(Value::Integer(0)));

    // Insert a row that supplies `id` only.
    let id_only_row = vec![PlanExpression::Literal(PlanLiteral::Integer(1))];
    let insert_plan = QueryPlan {
        root: PlanNode::Insert(InsertNode {
            table_name: TABLE.to_string(),
            columns: vec![String::from("id")],
            source: InsertPlanSource::Values(vec![id_only_row]),
        }),
        estimated_cost: 1.0,
        estimated_rows: 1,
    };
    executor.execute(&insert_plan).unwrap();

    let context = executor.context.read().unwrap();
    let table = context.get_table(TABLE).unwrap();
    let table_data = table.read().unwrap();

    assert_eq!(table_data.rows.len(), 1);

    // `status` and `count` were omitted and must carry their defaults.
    let stored_values = &table_data.rows[0].values;
    assert_eq!(stored_values[0], Value::Integer(1));
    assert_eq!(stored_values[1], Value::String("active".to_string()));
    assert_eq!(stored_values[2], Value::Integer(0));
}
3806
/// Sets and then removes a column default through `ALTER COLUMN`, checking
/// the stored schema after each change.
#[test]
fn test_alter_column_default() {
    use crate::planner::{CreateTableNode, CreateColumnDef, AlterTableNode, PlanAlterOperation, PlanExpression, PlanLiteral};

    const TABLE: &str = "alter_default_test";

    let executor = Executor::new(ExecutionContext::new());

    // Builds an ALTER COLUMN plan for `value` that only touches the default:
    // `Some(expr)` installs a default, `None` removes it.
    let default_change_plan = |new_default: Option<PlanExpression>| QueryPlan {
        root: PlanNode::AlterTable(AlterTableNode {
            table_name: TABLE.to_string(),
            operations: vec![PlanAlterOperation::AlterColumn {
                name: String::from("value"),
                data_type: None,
                set_not_null: None,
                set_default: Some(new_default),
            }],
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };

    // Returns an owned copy of the schema so the temporary read guard on the
    // context is released before the next plan runs.
    let current_schema = || {
        executor
            .context
            .read()
            .unwrap()
            .get_table_schema(TABLE)
            .unwrap()
            .clone()
    };

    let id_column = CreateColumnDef {
        name: String::from("id"),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    let value_column = CreateColumnDef {
        name: String::from("value"),
        data_type: DataType::Integer,
        nullable: true,
        default: None,
        primary_key: false,
        unique: false,
    };
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: TABLE.to_string(),
            columns: vec![id_column, value_column],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // Install a default of 42 on `value`.
    let alter_plan = default_change_plan(Some(PlanExpression::Literal(PlanLiteral::Integer(42))));
    executor.execute(&alter_plan).unwrap();
    assert_eq!(current_schema().columns[1].default, Some(Value::Integer(42)));

    // Remove the default again.
    let drop_default_plan = default_change_plan(None);
    executor.execute(&drop_default_plan).unwrap();
    assert_eq!(current_schema().columns[1].default, None);
}
3887
/// Runs one filter plan whose predicate compares `id` with `Placeholder(1)`
/// several times, each with a different bound parameter, and checks the rows
/// returned for a matching and a non-matching value.
#[test]
fn test_parameterized_query() {
    use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource, FilterNode};

    const TABLE: &str = "params_test";

    let executor = Executor::new(ExecutionContext::new());

    let id_column = CreateColumnDef {
        name: String::from("id"),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    let name_column = CreateColumnDef {
        name: String::from("name"),
        data_type: DataType::Text,
        nullable: true,
        default: None,
        primary_key: false,
        unique: false,
    };
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: TABLE.to_string(),
            columns: vec![id_column, name_column],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // Seed three (id, name) rows with literal values.
    let seed_rows = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
        .iter()
        .map(|&(id, name)| {
            vec![
                PlanExpression::Literal(PlanLiteral::Integer(id)),
                PlanExpression::Literal(PlanLiteral::String(name.to_string())),
            ]
        })
        .collect();
    let insert_plan = QueryPlan {
        root: PlanNode::Insert(InsertNode {
            table_name: TABLE.to_string(),
            columns: vec![String::from("id"), String::from("name")],
            source: InsertPlanSource::Values(seed_rows),
        }),
        estimated_cost: 1.0,
        estimated_rows: 3,
    };
    executor.execute(&insert_plan).unwrap();

    // Full scan filtered by `id = <placeholder 1>`.
    let table_scan = PlanNode::Scan(ScanNode {
        table_name: TABLE.to_string(),
        alias: None,
        columns: vec![String::from("id"), String::from("name")],
        index_scan: None,
    });
    let id_matches_param = PlanExpression::BinaryOp {
        left: Box::new(PlanExpression::Column {
            table: None,
            name: String::from("id"),
            data_type: DataType::Integer,
        }),
        op: PlanBinaryOp::Equal,
        right: Box::new(PlanExpression::Placeholder(1)),
    };
    let query_plan = QueryPlan {
        root: PlanNode::Filter(FilterNode {
            input: Box::new(table_scan),
            predicate: id_matches_param,
        }),
        estimated_cost: 10.0,
        estimated_rows: 1,
    };

    // Executes the same plan with a single bound parameter.
    let run_with = |param: Value| {
        executor
            .execute_with_params(&query_plan, &[param])
            .unwrap()
    };

    let bob = run_with(Value::Integer(2));
    assert_eq!(bob.rows.len(), 1);
    assert_eq!(bob.rows[0].values[0], Value::Integer(2));
    assert_eq!(bob.rows[0].values[1], Value::String("Bob".to_string()));

    let alice = run_with(Value::Integer(1));
    assert_eq!(alice.rows.len(), 1);
    assert_eq!(alice.rows[0].values[1], Value::String("Alice".to_string()));

    // No row has id 99, so the filter yields nothing.
    let nobody = run_with(Value::Integer(99));
    assert_eq!(nobody.rows.len(), 0);
}
3987
/// Inserts a single row whose two values are both placeholders and checks
/// that the bound parameters end up in the stored row.
#[test]
fn test_parameterized_insert() {
    use crate::planner::{CreateTableNode, CreateColumnDef, InsertNode, InsertPlanSource};

    const TABLE: &str = "param_insert_test";

    let executor = Executor::new(ExecutionContext::new());

    let id_column = CreateColumnDef {
        name: String::from("id"),
        data_type: DataType::Integer,
        nullable: false,
        default: None,
        primary_key: true,
        unique: false,
    };
    let value_column = CreateColumnDef {
        name: String::from("value"),
        data_type: DataType::Text,
        nullable: true,
        default: None,
        primary_key: false,
        unique: false,
    };
    let create_plan = QueryPlan {
        root: PlanNode::CreateTable(CreateTableNode {
            table_name: TABLE.to_string(),
            columns: vec![id_column, value_column],
            constraints: vec![],
            if_not_exists: false,
        }),
        estimated_cost: 1.0,
        estimated_rows: 0,
    };
    executor.execute(&create_plan).unwrap();

    // Placeholders 1 and 2 stand in for `id` and `value` respectively.
    let placeholder_row = vec![
        PlanExpression::Placeholder(1),
        PlanExpression::Placeholder(2),
    ];
    let insert_plan = QueryPlan {
        root: PlanNode::Insert(InsertNode {
            table_name: TABLE.to_string(),
            columns: vec![String::from("id"), String::from("value")],
            source: InsertPlanSource::Values(vec![placeholder_row]),
        }),
        estimated_cost: 1.0,
        estimated_rows: 1,
    };

    let bound_params = [
        Value::Integer(42),
        Value::String("test value".to_string()),
    ];
    executor
        .execute_with_params(&insert_plan, &bound_params)
        .unwrap();

    let context = executor.context.read().unwrap();
    let table = context.get_table(TABLE).unwrap();
    let table_data = table.read().unwrap();

    assert_eq!(table_data.rows.len(), 1);

    let stored_values = &table_data.rows[0].values;
    assert_eq!(stored_values[0], Value::Integer(42));
    assert_eq!(stored_values[1], Value::String("test value".to_string()));
}
4055}