1use crate::ast::SetOperationType;
16use crate::index::{IndexError, IndexKey, IndexType, TableIndexManager};
17use crate::planner::{
18 AggregateExpr, AggregateFunction, AggregateNode, AlterTableNode, CreateIndexNode,
19 CreateTableConstraint, CreateTableNode, DeleteNode, DropIndexNode, DropTableNode, FilterNode,
20 InsertNode, InsertPlanSource, JoinNode, JoinStrategy, LimitNode, PlanAlterOperation,
21 PlanBinaryOp, PlanExpression, PlanJoinType, PlanLiteral, PlanNode, PlanUnaryOp, ProjectNode,
22 ProjectionExpr, QueryPlan, ScanNode, SetOperationNode, SortKey, SortNode, UpdateNode,
23};
24use aegis_common::{DataType, Row, Value};
25use std::collections::{HashMap, HashSet};
26use std::sync::{Arc, RwLock};
27use thiserror::Error;
28
/// Errors reported by the execution context and the executor.
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// The named table is not registered in the execution context.
    #[error("Table not found: {0}")]
    TableNotFound(String),

    /// The named column does not exist in the table being operated on.
    #[error("Column not found: {0}")]
    ColumnNotFound(String),

    /// A value had a different type than the one required.
    #[error("Type mismatch: expected {expected}, got {actual}")]
    TypeMismatch { expected: String, actual: String },

    /// A division by zero was attempted.
    #[error("Division by zero")]
    DivisionByZero,

    /// The request is not valid in the current state (duplicate table or
    /// index, violated constraint, ...). The payload is the full message.
    #[error("Invalid operation: {0}")]
    InvalidOperation(String),

    /// An internal failure such as a poisoned lock.
    #[error("Execution error: {0}")]
    Internal(String),

    /// An error raised by the index layer, carried as its display text.
    #[error("Index error: {0}")]
    IndexError(String),
}
56
57impl From<IndexError> for ExecutorError {
58 fn from(e: IndexError) -> Self {
59 ExecutorError::IndexError(e.to_string())
60 }
61}
62
/// Result alias used by every fallible executor operation.
pub type ExecutorResult<T> = Result<T, ExecutorError>;
64
/// In-memory catalogue and storage used while executing plans.
pub struct ExecutionContext {
    /// Row storage per table name; each table sits behind its own lock.
    tables: HashMap<String, Arc<RwLock<TableData>>>,
    /// Schema (columns, primary key, constraints) per table name.
    table_schemas: HashMap<String, TableSchema>,
    /// Index definitions per table name.
    indexes: HashMap<String, Vec<IndexSchema>>,
    /// Index manager per table name, holding the built index structures.
    table_indexes: HashMap<String, Arc<TableIndexManager>>,
    /// Configured batch size (1024 unless changed via `with_batch_size`).
    batch_size: usize,
    /// Version counter: stamped on inserted/deleted rows and incremented by
    /// `commit_snapshot`.
    version_clock: u64,
    /// Version pinned by `begin_snapshot`; `None` when no snapshot is active.
    snapshot_version: Option<u64>,
}
83
/// Stored rows of one table plus per-row version bookkeeping.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableData {
    /// Table name; also used as the map key when a snapshot is loaded.
    pub name: String,
    /// Column names, in the order values appear in each row.
    pub columns: Vec<String>,
    /// Row storage.
    pub rows: Vec<Row>,
    /// Per-row version at which the row was inserted (parallel to `rows`).
    /// Missing entries are treated as 0 by the visibility check.
    #[serde(default)]
    pub row_created_version: Vec<u64>,
    /// Per-row version at which the row was deleted (parallel to `rows`);
    /// 0 means the row has not been deleted.
    #[serde(default)]
    pub row_deleted_version: Vec<u64>,
}
98
/// A named table constraint as kept in the table schema.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StoredConstraint {
    /// Constraint name (generated as `<table>_<kind><n>` by this file).
    pub name: String,
    /// What the constraint enforces.
    pub constraint_type: StoredConstraintType,
}
105
/// The kinds of constraint that can be stored on a table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum StoredConstraintType {
    /// Primary key over the listed columns.
    PrimaryKey {
        columns: Vec<String>,
    },
    /// Uniqueness over the listed columns.
    Unique {
        columns: Vec<String>,
    },
    /// Foreign key from `columns` to `ref_columns` of `ref_table`.
    ForeignKey {
        columns: Vec<String>,
        ref_table: String,
        ref_columns: Vec<String>,
    },
    /// Check constraint, stored as the `Debug` rendering of its expression.
    Check {
        expression_text: String,
    },
}
124
/// Schema of a single table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct TableSchema {
    /// Table name.
    pub name: String,
    /// Column definitions in declaration order.
    pub columns: Vec<ColumnSchema>,
    /// Primary-key column names, if the table has a primary key.
    pub primary_key: Option<Vec<String>>,
    /// Named constraints attached to the table.
    pub constraints: Vec<StoredConstraint>,
}
133
/// Definition of a single column.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ColumnSchema {
    /// Column name.
    pub name: String,
    /// Declared data type.
    pub data_type: DataType,
    /// Whether NULL is accepted; `insert_rows` rejects NULL when `false`.
    pub nullable: bool,
    /// Value used by `insert_rows` when an INSERT supplies none.
    pub default: Option<Value>,
}
142
/// Definition of an index, as kept in the catalogue and in snapshots.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct IndexSchema {
    /// Index name.
    pub name: String,
    /// Name of the indexed table.
    pub table: String,
    /// Indexed columns, in key order.
    pub columns: Vec<String>,
    /// Whether the index is declared unique.
    pub unique: bool,
}
151
/// Serializable copy of an [`ExecutionContext`]'s tables, schemas and index
/// definitions. Built index structures are not included; they are rebuilt
/// from `indexes` when a snapshot is loaded.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ExecutionContextSnapshot {
    /// Data of every table.
    pub tables: Vec<TableData>,
    /// Schema of every table.
    pub schemas: Vec<TableSchema>,
    /// Index definitions keyed by table name.
    pub indexes: HashMap<String, Vec<IndexSchema>>,
}
159
160impl ExecutionContext {
    /// Creates an empty context: no tables, schemas or indexes, a batch size
    /// of 1024, the version clock at 1 and no active snapshot.
    pub fn new() -> Self {
        Self {
            tables: HashMap::new(),
            table_schemas: HashMap::new(),
            indexes: HashMap::new(),
            table_indexes: HashMap::new(),
            batch_size: 1024,
            version_clock: 1,
            snapshot_version: None,
        }
    }
172
    /// Pins the current version clock as the snapshot version. While a
    /// snapshot is active, row visibility is evaluated against that version.
    pub fn begin_snapshot(&mut self) {
        self.snapshot_version = Some(self.version_clock);
    }
177
    /// Ends the active snapshot and advances the version clock by one.
    pub fn commit_snapshot(&mut self) {
        self.version_clock += 1;
        self.snapshot_version = None;
    }
183
    /// Ends the active snapshot without advancing the version clock. Table
    /// contents are not touched by this method.
    pub fn rollback_snapshot(&mut self) {
        self.snapshot_version = None;
    }
188
    /// Returns the current value of the version clock.
    pub fn current_version(&self) -> u64 {
        self.version_clock
    }
193
194 fn is_row_visible(&self, table: &TableData, i: usize) -> bool {
196 let created = table.row_created_version.get(i).copied().unwrap_or(0);
197 let deleted = table.row_deleted_version.get(i).copied().unwrap_or(0);
198
199 if let Some(snap) = self.snapshot_version {
200 created <= snap && (deleted == 0 || deleted > snap)
202 } else {
203 deleted == 0
205 }
206 }
207
    /// Builder-style setter: returns the context with `batch_size` set to
    /// `size`.
    pub fn with_batch_size(mut self, size: usize) -> Self {
        self.batch_size = size;
        self
    }
212
213 pub fn add_table(&mut self, table: TableData) {
214 self.tables
215 .insert(table.name.clone(), Arc::new(RwLock::new(table)));
216 }
217
    /// Returns a shared handle to the data of table `name`, or `None` if no
    /// such table is registered.
    pub fn get_table(&self, name: &str) -> Option<Arc<RwLock<TableData>>> {
        self.tables.get(name).cloned()
    }
221
    /// Returns the configured batch size.
    pub fn batch_size(&self) -> usize {
        self.batch_size
    }
225
226 pub fn create_table(
232 &mut self,
233 name: String,
234 columns: Vec<ColumnSchema>,
235 primary_key: Option<Vec<String>>,
236 constraints: Vec<StoredConstraint>,
237 if_not_exists: bool,
238 ) -> ExecutorResult<()> {
239 if self.tables.contains_key(&name) {
240 if if_not_exists {
241 return Ok(());
242 }
243 return Err(ExecutorError::InvalidOperation(format!(
244 "Table '{}' already exists",
245 name
246 )));
247 }
248
249 let column_names: Vec<String> = columns.iter().map(|c| c.name.clone()).collect();
250
251 let schema = TableSchema {
253 name: name.clone(),
254 columns,
255 primary_key,
256 constraints,
257 };
258 self.table_schemas.insert(name.clone(), schema);
259
260 let table_data = TableData {
262 name: name.clone(),
263 columns: column_names,
264 rows: Vec::new(),
265 row_created_version: Vec::new(),
266 row_deleted_version: Vec::new(),
267 };
268 self.tables.insert(name, Arc::new(RwLock::new(table_data)));
269
270 Ok(())
271 }
272
273 pub fn drop_table(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
275 if !self.tables.contains_key(name) {
276 if if_exists {
277 return Ok(());
278 }
279 return Err(ExecutorError::TableNotFound(name.to_string()));
280 }
281
282 self.tables.remove(name);
283 self.table_schemas.remove(name);
284 self.indexes.remove(name);
285
286 Ok(())
287 }
288
289 pub fn alter_table(&mut self, name: &str, op: &PlanAlterOperation) -> ExecutorResult<()> {
291 let schema = self
292 .table_schemas
293 .get_mut(name)
294 .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
295
296 match op {
297 PlanAlterOperation::AddColumn(col) => {
298 if schema.columns.iter().any(|c| c.name == col.name) {
300 return Err(ExecutorError::InvalidOperation(format!(
301 "Column '{}' already exists in table '{}'",
302 col.name, name
303 )));
304 }
305
306 let default = col
308 .default
309 .as_ref()
310 .map(evaluate_default_expression)
311 .transpose()?;
312
313 schema.columns.push(ColumnSchema {
314 name: col.name.clone(),
315 data_type: col.data_type.clone(),
316 nullable: col.nullable,
317 default: default.clone(),
318 });
319
320 let fill_value = default.unwrap_or(Value::Null);
322 if let Some(table_data) = self.tables.get(name) {
323 let mut table = table_data
324 .write()
325 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
326 for row in table.rows.iter_mut() {
327 row.values.push(fill_value.clone());
328 }
329 }
330 }
331 PlanAlterOperation::DropColumn {
332 name: col_name,
333 if_exists,
334 } => {
335 let col_idx = schema.columns.iter().position(|c| c.name == *col_name);
336 match col_idx {
337 Some(idx) => {
338 schema.columns.remove(idx);
339 if let Some(table_data) = self.tables.get(name) {
341 let mut table = table_data.write().map_err(|_| {
342 ExecutorError::Internal("Lock poisoned".to_string())
343 })?;
344 for row in table.rows.iter_mut() {
345 if idx < row.values.len() {
346 row.values.remove(idx);
347 }
348 }
349 }
350 }
351 None if *if_exists => {}
352 None => {
353 return Err(ExecutorError::ColumnNotFound(col_name.clone()));
354 }
355 }
356 }
357 PlanAlterOperation::RenameColumn { old_name, new_name } => {
358 let col = schema
359 .columns
360 .iter_mut()
361 .find(|c| c.name == *old_name)
362 .ok_or_else(|| ExecutorError::ColumnNotFound(old_name.clone()))?;
363 col.name = new_name.clone();
364 }
365 PlanAlterOperation::AlterColumn {
366 name: col_name,
367 data_type,
368 set_not_null,
369 set_default,
370 } => {
371 let col = schema
372 .columns
373 .iter_mut()
374 .find(|c| c.name == *col_name)
375 .ok_or_else(|| ExecutorError::ColumnNotFound(col_name.clone()))?;
376
377 if let Some(dt) = data_type {
378 col.data_type = dt.clone();
379 }
380 if let Some(not_null) = set_not_null {
381 col.nullable = !not_null;
382 }
383 if let Some(new_default) = set_default {
385 col.default = new_default
386 .as_ref()
387 .map(evaluate_default_expression)
388 .transpose()?;
389 }
390 }
391 PlanAlterOperation::RenameTable { new_name } => {
392 if let Some(rows) = self.tables.remove(name) {
394 self.tables.insert(new_name.clone(), rows);
395 }
396 if let Some(mut schema) = self.table_schemas.remove(name) {
397 schema.name = new_name.clone();
398 self.table_schemas.insert(new_name.clone(), schema);
399 }
400 if let Some(indexes) = self.indexes.remove(name) {
401 self.indexes.insert(new_name.clone(), indexes);
402 }
403 return Ok(());
405 }
406 PlanAlterOperation::AddConstraint(constraint) => {
407 if let CreateTableConstraint::ForeignKey { ref_table, .. } = constraint {
409 if !self.table_schemas.contains_key(ref_table) {
410 return Err(ExecutorError::TableNotFound(ref_table.clone()));
411 }
412 }
413
414 let schema = self
416 .table_schemas
417 .get_mut(name)
418 .ok_or_else(|| ExecutorError::TableNotFound(name.to_string()))?;
419
420 let constraint_count = schema.constraints.len() + 1;
421 let (name_suffix, constraint_type) = match constraint {
422 CreateTableConstraint::PrimaryKey { columns } => {
423 if schema.primary_key.is_some() {
425 return Err(ExecutorError::InvalidOperation(format!(
426 "Table '{}' already has a primary key",
427 name
428 )));
429 }
430 schema.primary_key = Some(columns.clone());
431 (
432 "pk",
433 StoredConstraintType::PrimaryKey {
434 columns: columns.clone(),
435 },
436 )
437 }
438 CreateTableConstraint::Unique { columns } => {
439 for col in columns {
441 if !schema.columns.iter().any(|c| c.name == *col) {
442 return Err(ExecutorError::ColumnNotFound(col.clone()));
443 }
444 }
445 (
446 "uq",
447 StoredConstraintType::Unique {
448 columns: columns.clone(),
449 },
450 )
451 }
452 CreateTableConstraint::ForeignKey {
453 columns,
454 ref_table,
455 ref_columns,
456 } => {
457 for col in columns {
459 if !schema.columns.iter().any(|c| c.name == *col) {
460 return Err(ExecutorError::ColumnNotFound(col.clone()));
461 }
462 }
463 (
465 "fk",
466 StoredConstraintType::ForeignKey {
467 columns: columns.clone(),
468 ref_table: ref_table.clone(),
469 ref_columns: ref_columns.clone(),
470 },
471 )
472 }
473 CreateTableConstraint::Check { expression } => (
474 "ck",
475 StoredConstraintType::Check {
476 expression_text: format!("{:?}", expression),
477 },
478 ),
479 };
480 schema.constraints.push(StoredConstraint {
481 name: format!("{}_{}{}", name, name_suffix, constraint_count),
482 constraint_type,
483 });
484 return Ok(());
485 }
486 PlanAlterOperation::DropConstraint {
487 name: constraint_name,
488 } => {
489 let pos = schema
490 .constraints
491 .iter()
492 .position(|c| c.name == *constraint_name);
493 match pos {
494 Some(idx) => {
495 let removed = schema.constraints.remove(idx);
496 if matches!(
498 removed.constraint_type,
499 StoredConstraintType::PrimaryKey { .. }
500 ) {
501 schema.primary_key = None;
502 }
503 }
504 None => {
505 return Err(ExecutorError::InvalidOperation(format!(
506 "Constraint '{}' not found on table '{}'",
507 constraint_name, name
508 )));
509 }
510 }
511 }
512 }
513
514 Ok(())
515 }
516
517 pub fn create_index(
519 &mut self,
520 name: String,
521 table: String,
522 columns: Vec<String>,
523 unique: bool,
524 if_not_exists: bool,
525 ) -> ExecutorResult<()> {
526 let table_data = self
527 .tables
528 .get(&table)
529 .ok_or_else(|| ExecutorError::TableNotFound(table.clone()))?
530 .clone();
531
532 let indexes = self.indexes.entry(table.clone()).or_default();
533
534 if indexes.iter().any(|idx| idx.name == name) {
536 if if_not_exists {
537 return Ok(());
538 }
539 return Err(ExecutorError::InvalidOperation(format!(
540 "Index '{}' already exists",
541 name
542 )));
543 }
544
545 indexes.push(IndexSchema {
547 name: name.clone(),
548 table: table.clone(),
549 columns: columns.clone(),
550 unique,
551 });
552
553 let index_manager = self
555 .table_indexes
556 .entry(table.clone())
557 .or_insert_with(|| Arc::new(TableIndexManager::new(table.clone())));
558
559 Arc::get_mut(index_manager)
561 .ok_or_else(|| {
562 ExecutorError::Internal("Cannot modify shared index manager".to_string())
563 })?
564 .create_index(name.clone(), columns.clone(), unique, IndexType::BTree)?;
565
566 let table_guard = table_data
568 .read()
569 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
570
571 let index_manager = self
572 .table_indexes
573 .get(&table)
574 .expect("index_manager was just inserted for this table");
575 for (row_id, row) in table_guard.rows.iter().enumerate() {
576 let column_values: HashMap<String, Value> = table_guard
577 .columns
578 .iter()
579 .zip(row.values.iter())
580 .map(|(col, val)| (col.clone(), val.clone()))
581 .collect();
582
583 let key_values: Vec<crate::index::IndexValue> = columns
585 .iter()
586 .map(|col| {
587 column_values
588 .get(col)
589 .map(IndexKey::from_value)
590 .unwrap_or(crate::index::IndexValue::Null)
591 })
592 .collect();
593 let key = IndexKey::new(key_values);
594
595 index_manager.insert_into_index(&name, key, row_id)?;
597 }
598
599 Ok(())
600 }
601
602 pub fn drop_index(&mut self, name: &str, if_exists: bool) -> ExecutorResult<()> {
604 let mut found = false;
605 let mut table_name = None;
606
607 for (tbl, indexes) in self.indexes.iter_mut() {
609 if let Some(pos) = indexes.iter().position(|idx| idx.name == name) {
610 indexes.remove(pos);
611 table_name = Some(tbl.clone());
612 found = true;
613 break;
614 }
615 }
616
617 if let Some(tbl) = table_name {
619 if let Some(manager) = self.table_indexes.get_mut(&tbl) {
620 if let Some(m) = Arc::get_mut(manager) {
621 let _ = m.drop_index(name);
622 }
623 }
624 }
625
626 if !found && !if_exists {
627 return Err(ExecutorError::InvalidOperation(format!(
628 "Index '{}' not found",
629 name
630 )));
631 }
632
633 Ok(())
634 }
635
    /// Returns a shared handle to the index manager of `table`, if one has
    /// been created.
    pub fn get_index_manager(&self, table: &str) -> Option<Arc<TableIndexManager>> {
        self.table_indexes.get(table).cloned()
    }
640
    /// Returns the index definitions recorded for `table`, if any.
    pub fn get_indexes(&self, table: &str) -> Option<&Vec<IndexSchema>> {
        self.indexes.get(table)
    }
645
    /// Returns the schema of table `name`, if the table has one registered.
    pub fn get_table_schema(&self, name: &str) -> Option<&TableSchema> {
        self.table_schemas.get(name)
    }
650
    /// Returns the names of all tables that have data registered. The order
    /// follows the hash map's iteration order and is therefore unspecified.
    pub fn list_tables(&self) -> Vec<String> {
        self.tables.keys().cloned().collect()
    }
655
656 pub fn to_snapshot(&self) -> ExecutionContextSnapshot {
662 let tables: Vec<TableData> = self
663 .tables
664 .values()
665 .filter_map(|t| t.read().ok().map(|t| t.clone()))
666 .collect();
667
668 ExecutionContextSnapshot {
669 tables,
670 schemas: self.table_schemas.values().cloned().collect(),
671 indexes: self.indexes.clone(),
672 }
673 }
674
675 pub fn from_snapshot(snapshot: ExecutionContextSnapshot) -> Self {
677 let mut ctx = Self::new();
678
679 for schema in snapshot.schemas {
681 ctx.table_schemas.insert(schema.name.clone(), schema);
682 }
683
684 for table in snapshot.tables {
686 ctx.tables
687 .insert(table.name.clone(), Arc::new(RwLock::new(table)));
688 }
689
690 ctx.indexes = snapshot.indexes.clone();
692
693 for (table_name, index_schemas) in snapshot.indexes {
695 for index_schema in index_schemas {
696 let _ = ctx.rebuild_index(
698 &index_schema.name,
699 &table_name,
700 &index_schema.columns,
701 index_schema.unique,
702 );
703 }
704 }
705
706 ctx
707 }
708
709 pub fn restore_from_snapshot(&mut self, snapshot: ExecutionContextSnapshot) {
712 self.tables.clear();
713 self.table_schemas.clear();
714 self.indexes.clear();
715 self.table_indexes.clear();
716 self.snapshot_version = None;
718
719 for schema in snapshot.schemas {
720 self.table_schemas.insert(schema.name.clone(), schema);
721 }
722 for table in snapshot.tables {
723 self.tables
724 .insert(table.name.clone(), Arc::new(RwLock::new(table)));
725 }
726 self.indexes = snapshot.indexes.clone();
727 for (table_name, index_schemas) in snapshot.indexes {
728 for index_schema in index_schemas {
729 let _ = self.rebuild_index(
730 &index_schema.name,
731 &table_name,
732 &index_schema.columns,
733 index_schema.unique,
734 );
735 }
736 }
737 }
738
739 fn rebuild_index(
741 &mut self,
742 name: &str,
743 table: &str,
744 columns: &[String],
745 unique: bool,
746 ) -> ExecutorResult<()> {
747 let table_data = self
748 .tables
749 .get(table)
750 .ok_or_else(|| ExecutorError::TableNotFound(table.to_string()))?
751 .clone();
752
753 let index_manager = self
755 .table_indexes
756 .entry(table.to_string())
757 .or_insert_with(|| Arc::new(TableIndexManager::new(table.to_string())));
758
759 if let Some(m) = Arc::get_mut(index_manager) {
761 m.create_index(name.to_string(), columns.to_vec(), unique, IndexType::BTree)?;
762 } else {
763 return Err(ExecutorError::Internal(
764 "Cannot modify shared index manager".to_string(),
765 ));
766 }
767
768 let table_guard = table_data
770 .read()
771 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
772
773 let index_manager = self
774 .table_indexes
775 .get(table)
776 .expect("index_manager was just inserted for this table");
777 for (row_id, row) in table_guard.rows.iter().enumerate() {
778 let column_values: HashMap<String, Value> = table_guard
779 .columns
780 .iter()
781 .zip(row.values.iter())
782 .map(|(col, val)| (col.clone(), val.clone()))
783 .collect();
784
785 let key_values: Vec<crate::index::IndexValue> = columns
786 .iter()
787 .map(|col| {
788 column_values
789 .get(col)
790 .map(IndexKey::from_value)
791 .unwrap_or(crate::index::IndexValue::Null)
792 })
793 .collect();
794 let key = IndexKey::new(key_values);
795
796 index_manager.insert_into_index(name, key, row_id)?;
797 }
798
799 Ok(())
800 }
801
802 pub fn save_to_file(&self, path: &std::path::Path) -> std::io::Result<()> {
804 let snapshot = self.to_snapshot();
805 let json = serde_json::to_string_pretty(&snapshot)
806 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
807 std::fs::write(path, json)
808 }
809
810 pub fn load_from_file(path: &std::path::Path) -> std::io::Result<Self> {
812 let json = std::fs::read_to_string(path)?;
813 let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
814 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
815 Ok(Self::from_snapshot(snapshot))
816 }
817
818 pub fn merge_from_file(&mut self, path: &std::path::Path) -> std::io::Result<()> {
820 let json = std::fs::read_to_string(path)?;
821 let snapshot: ExecutionContextSnapshot = serde_json::from_str(&json)
822 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
823
824 for schema in snapshot.schemas {
826 self.table_schemas.insert(schema.name.clone(), schema);
827 }
828
829 for table in snapshot.tables {
831 self.tables
832 .insert(table.name.clone(), Arc::new(RwLock::new(table)));
833 }
834
835 for (table, idxs) in snapshot.indexes {
837 self.indexes.insert(table, idxs);
838 }
839
840 Ok(())
841 }
842
843 pub fn insert_rows(
849 &self,
850 table_name: &str,
851 columns: &[String],
852 rows: Vec<Vec<Value>>,
853 ) -> ExecutorResult<u64> {
854 let table = self
855 .get_table(table_name)
856 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
857
858 let mut table_data = table
859 .write()
860 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
861
862 let schema = self
864 .table_schemas
865 .get(table_name)
866 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
867
868 let target_columns: Vec<String> = if columns.is_empty() {
870 table_data.columns.clone()
871 } else {
872 columns.to_vec()
873 };
874
875 let mut column_indices: Vec<usize> = Vec::new();
877 for col in &target_columns {
878 let idx = table_data
879 .columns
880 .iter()
881 .position(|c| c == col)
882 .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
883 column_indices.push(idx);
884 }
885
886 let mut inserted = 0u64;
887
888 let defaults: Vec<Value> = schema
890 .columns
891 .iter()
892 .map(|col| col.default.clone().unwrap_or(Value::Null))
893 .collect();
894
895 let pk_columns: Vec<usize> = schema
897 .primary_key
898 .as_ref()
899 .map(|pk| {
900 pk.iter()
901 .filter_map(|c| table_data.columns.iter().position(|tc| tc == c))
902 .collect()
903 })
904 .unwrap_or_default();
905 let unique_columns: Vec<Vec<usize>> = schema
906 .constraints
907 .iter()
908 .filter_map(|c| match &c.constraint_type {
909 StoredConstraintType::Unique { columns: cols } => Some(
910 cols.iter()
911 .filter_map(|c| table_data.columns.iter().position(|tc| tc == c))
912 .collect(),
913 ),
914 _ => None,
915 })
916 .collect();
917 let not_null_columns: Vec<usize> = schema
918 .columns
919 .iter()
920 .enumerate()
921 .filter(|(_, c)| !c.nullable)
922 .map(|(i, _)| i)
923 .collect();
924
925 for row_values in rows {
926 let mut new_row: Vec<Value> = defaults.clone();
928
929 for (i, &col_idx) in column_indices.iter().enumerate() {
931 if let Some(value) = row_values.get(i) {
932 new_row[col_idx] = value.clone();
933 }
934 }
935
936 for &col_idx in ¬_null_columns {
938 if matches!(new_row.get(col_idx), Some(Value::Null) | None) {
939 let col_name = table_data.columns.get(col_idx).cloned().unwrap_or_default();
940 return Err(ExecutorError::InvalidOperation(format!(
941 "NOT NULL constraint violated for column '{}'",
942 col_name
943 )));
944 }
945 }
946
947 if !pk_columns.is_empty() {
949 let pk_vals: Vec<&Value> = pk_columns.iter().map(|&i| &new_row[i]).collect();
950 for (ri, existing) in table_data.rows.iter().enumerate() {
951 if !self.is_row_visible(&table_data, ri) {
952 continue;
953 }
954 let existing_pk: Vec<&Value> =
955 pk_columns.iter().map(|&i| &existing.values[i]).collect();
956 if pk_vals == existing_pk {
957 return Err(ExecutorError::InvalidOperation(format!(
958 "PRIMARY KEY constraint violated: duplicate key in table '{}'",
959 table_name
960 )));
961 }
962 }
963 }
964
965 for unique_cols in &unique_columns {
967 let vals: Vec<&Value> = unique_cols.iter().map(|&i| &new_row[i]).collect();
968 if vals.iter().any(|v| matches!(v, Value::Null)) {
970 continue;
971 }
972 for (ri, existing) in table_data.rows.iter().enumerate() {
973 if !self.is_row_visible(&table_data, ri) {
974 continue;
975 }
976 let existing_vals: Vec<&Value> =
977 unique_cols.iter().map(|&i| &existing.values[i]).collect();
978 if vals == existing_vals {
979 return Err(ExecutorError::InvalidOperation(
980 "UNIQUE constraint violated".to_string(),
981 ));
982 }
983 }
984 }
985
986 table_data.rows.push(Row { values: new_row });
987 table_data.row_created_version.push(self.version_clock);
988 table_data.row_deleted_version.push(0);
989 inserted += 1;
990 }
991
992 Ok(inserted)
993 }
994
995 pub fn update_rows(
997 &self,
998 table_name: &str,
999 assignments: &[(String, Value)],
1000 predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
1001 ) -> ExecutorResult<u64> {
1002 let table = self
1003 .get_table(table_name)
1004 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
1005
1006 let mut table_data = table
1007 .write()
1008 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1009 let columns = table_data.columns.clone();
1010
1011 let mut assignment_indices: Vec<(usize, Value)> = Vec::new();
1013 for (col, val) in assignments {
1014 let idx = columns
1015 .iter()
1016 .position(|c| c == col)
1017 .ok_or_else(|| ExecutorError::ColumnNotFound(col.clone()))?;
1018 assignment_indices.push((idx, val.clone()));
1019 }
1020
1021 let mut updated = 0u64;
1022
1023 for i in 0..table_data.rows.len() {
1024 if !self.is_row_visible(&table_data, i) {
1025 continue;
1026 }
1027 let should_update = predicate
1028 .map(|p| p(&table_data.rows[i], &columns))
1029 .unwrap_or(true);
1030 if should_update {
1031 for (col_idx, value) in &assignment_indices {
1032 table_data.rows[i].values[*col_idx] = value.clone();
1033 }
1034 updated += 1;
1035 }
1036 }
1037
1038 Ok(updated)
1039 }
1040
1041 pub fn delete_rows(
1043 &self,
1044 table_name: &str,
1045 predicate: Option<&dyn Fn(&Row, &[String]) -> bool>,
1046 ) -> ExecutorResult<u64> {
1047 let table = self
1048 .get_table(table_name)
1049 .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
1050
1051 let mut table_data = table
1052 .write()
1053 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1054 let columns = table_data.columns.clone();
1055 let ver = self.version_clock;
1056
1057 let mut deleted = 0u64;
1058
1059 if let Some(pred) = predicate {
1060 for i in 0..table_data.rows.len() {
1061 if !self.is_row_visible(&table_data, i) {
1063 continue;
1064 }
1065 if pred(&table_data.rows[i], &columns) {
1066 if i < table_data.row_deleted_version.len() {
1068 table_data.row_deleted_version[i] = ver;
1069 }
1070 deleted += 1;
1071 }
1072 }
1073 } else {
1074 for i in 0..table_data.rows.len() {
1075 if self.is_row_visible(&table_data, i) {
1076 if i < table_data.row_deleted_version.len() {
1077 table_data.row_deleted_version[i] = ver;
1078 }
1079 deleted += 1;
1080 }
1081 }
1082 }
1083
1084 if self.snapshot_version.is_none() {
1086 let mut i = 0;
1087 while i < table_data.rows.len() {
1088 let del_ver = table_data.row_deleted_version.get(i).copied().unwrap_or(0);
1089 if del_ver > 0 {
1090 table_data.rows.remove(i);
1091 if i < table_data.row_created_version.len() {
1092 table_data.row_created_version.remove(i);
1093 }
1094 if i < table_data.row_deleted_version.len() {
1095 table_data.row_deleted_version.remove(i);
1096 }
1097 } else {
1098 i += 1;
1099 }
1100 }
1101 }
1102
1103 Ok(deleted)
1104 }
1105}
1106
/// The default context is the empty context produced by
/// [`ExecutionContext::new`].
impl Default for ExecutionContext {
    fn default() -> Self {
        Self::new()
    }
}
1112
/// A batch of rows together with the names of their columns.
#[derive(Debug, Clone)]
pub struct ResultBatch {
    /// Column names, in the order values appear in each row.
    pub columns: Vec<String>,
    /// Rows of this batch.
    pub rows: Vec<Row>,
}
1123
1124impl ResultBatch {
1125 pub fn new(columns: Vec<String>) -> Self {
1126 Self {
1127 columns,
1128 rows: Vec::new(),
1129 }
1130 }
1131
1132 pub fn with_rows(columns: Vec<String>, rows: Vec<Row>) -> Self {
1133 Self { columns, rows }
1134 }
1135
1136 pub fn is_empty(&self) -> bool {
1137 self.rows.is_empty()
1138 }
1139
1140 pub fn len(&self) -> usize {
1141 self.rows.len()
1142 }
1143}
1144
/// Final result of executing a plan.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Column names of the result set.
    pub columns: Vec<String>,
    /// Result rows.
    pub rows: Vec<Row>,
    /// Row count reported for the statement; `QueryResult::new` sets it to
    /// the number of result rows.
    pub rows_affected: u64,
}
1156
1157impl QueryResult {
1158 pub fn new(columns: Vec<String>, rows: Vec<Row>) -> Self {
1159 let rows_affected = rows.len() as u64;
1160 Self {
1161 columns,
1162 rows,
1163 rows_affected,
1164 }
1165 }
1166
1167 pub fn empty() -> Self {
1168 Self {
1169 columns: Vec::new(),
1170 rows: Vec::new(),
1171 rows_affected: 0,
1172 }
1173 }
1174}
1175
/// Executes query plans against a lock-protected, shareable
/// [`ExecutionContext`].
pub struct Executor {
    /// The context all statements run against.
    context: Arc<RwLock<ExecutionContext>>,
}
1184
1185impl Executor {
    /// Creates an executor that owns `context`, wrapping it in a fresh
    /// `Arc<RwLock<..>>`.
    pub fn new(context: ExecutionContext) -> Self {
        Self {
            context: Arc::new(RwLock::new(context)),
        }
    }
1191
    /// Creates an executor that operates on an already shared context.
    pub fn with_shared_context(context: Arc<RwLock<ExecutionContext>>) -> Self {
        Self { context }
    }
1196
    /// Executes `plan`, starting at its root node.
    pub fn execute(&self, plan: &QueryPlan) -> ExecutorResult<QueryResult> {
        self.execute_internal(&plan.root)
    }
1201
    /// Executes `plan` after substituting `params` into it.
    ///
    /// The substitution produces a new plan tree via `substitute_parameters`;
    /// `plan` itself is not modified.
    ///
    /// # Errors
    /// Any error from parameter substitution or from execution.
    pub fn execute_with_params(
        &self,
        plan: &QueryPlan,
        params: &[Value],
    ) -> ExecutorResult<QueryResult> {
        let bound_plan = substitute_parameters(&plan.root, params)?;
        self.execute_internal(&bound_plan)
    }
1213
1214 fn execute_internal(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
1215 match root {
1216 PlanNode::CreateTable(node) => self.execute_create_table(node),
1218 PlanNode::DropTable(node) => self.execute_drop_table(node),
1219 PlanNode::AlterTable(node) => self.execute_alter_table(node),
1220 PlanNode::CreateIndex(node) => self.execute_create_index(node),
1221 PlanNode::DropIndex(node) => self.execute_drop_index(node),
1222
1223 PlanNode::Insert(node) => self.execute_insert(node),
1225 PlanNode::Update(node) => self.execute_update(node),
1226 PlanNode::Delete(node) => self.execute_delete(node),
1227
1228 PlanNode::BeginTransaction => Ok(QueryResult {
1230 columns: vec!["status".to_string()],
1231 rows: vec![Row {
1232 values: vec![Value::String("BEGIN".to_string())],
1233 }],
1234 rows_affected: 0,
1235 }),
1236 PlanNode::CommitTransaction => Ok(QueryResult {
1237 columns: vec!["status".to_string()],
1238 rows: vec![Row {
1239 values: vec![Value::String("COMMIT".to_string())],
1240 }],
1241 rows_affected: 0,
1242 }),
1243 PlanNode::RollbackTransaction => Ok(QueryResult {
1244 columns: vec!["status".to_string()],
1245 rows: vec![Row {
1246 values: vec![Value::String("ROLLBACK".to_string())],
1247 }],
1248 rows_affected: 0,
1249 }),
1250
1251 _ => self.execute_query(root),
1253 }
1254 }
1255
    /// Maximum number of rows `execute_query` returns; rows beyond this limit
    /// are discarded.
    const MAX_RESULT_ROWS: usize = 100_000;
1259
1260 fn execute_query(&self, root: &PlanNode) -> ExecutorResult<QueryResult> {
1261 let context = self
1262 .context
1263 .read()
1264 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1265 let mut operator = self.create_operator(root, &context)?;
1266 let mut all_rows = Vec::new();
1267 let mut columns = Vec::new();
1268
1269 while let Some(batch) = operator.next_batch()? {
1270 if columns.is_empty() {
1271 columns = batch.columns.clone();
1272 }
1273 all_rows.extend(batch.rows);
1274 if all_rows.len() > Self::MAX_RESULT_ROWS {
1275 all_rows.truncate(Self::MAX_RESULT_ROWS);
1276 break;
1277 }
1278 }
1279
1280 Ok(QueryResult::new(columns, all_rows))
1281 }
1282
1283 fn execute_create_table(&self, node: &CreateTableNode) -> ExecutorResult<QueryResult> {
1285 let columns: Vec<ColumnSchema> = node
1286 .columns
1287 .iter()
1288 .map(|col| {
1289 let default = col
1290 .default
1291 .as_ref()
1292 .map(evaluate_default_expression)
1293 .transpose()?;
1294 Ok(ColumnSchema {
1295 name: col.name.clone(),
1296 data_type: col.data_type.clone(),
1297 nullable: col.nullable,
1298 default,
1299 })
1300 })
1301 .collect::<ExecutorResult<Vec<_>>>()?;
1302
1303 let primary_key = node
1305 .constraints
1306 .iter()
1307 .find_map(|c| {
1308 if let CreateTableConstraint::PrimaryKey { columns } = c {
1309 Some(columns.clone())
1310 } else {
1311 None
1312 }
1313 })
1314 .or_else(|| {
1315 let pk_cols: Vec<String> = node
1317 .columns
1318 .iter()
1319 .filter(|c| c.primary_key)
1320 .map(|c| c.name.clone())
1321 .collect();
1322 if pk_cols.is_empty() {
1323 None
1324 } else {
1325 Some(pk_cols)
1326 }
1327 });
1328
1329 let mut stored_constraints = Vec::new();
1331 let mut constraint_counter = 0;
1332 for c in &node.constraints {
1333 constraint_counter += 1;
1334 let (name_suffix, constraint_type) = match c {
1335 CreateTableConstraint::PrimaryKey { columns } => (
1336 "pk",
1337 StoredConstraintType::PrimaryKey {
1338 columns: columns.clone(),
1339 },
1340 ),
1341 CreateTableConstraint::Unique { columns } => (
1342 "uq",
1343 StoredConstraintType::Unique {
1344 columns: columns.clone(),
1345 },
1346 ),
1347 CreateTableConstraint::ForeignKey {
1348 columns,
1349 ref_table,
1350 ref_columns,
1351 } => (
1352 "fk",
1353 StoredConstraintType::ForeignKey {
1354 columns: columns.clone(),
1355 ref_table: ref_table.clone(),
1356 ref_columns: ref_columns.clone(),
1357 },
1358 ),
1359 CreateTableConstraint::Check { expression } => (
1360 "ck",
1361 StoredConstraintType::Check {
1362 expression_text: format!("{:?}", expression),
1363 },
1364 ),
1365 };
1366 stored_constraints.push(StoredConstraint {
1367 name: format!("{}_{}{}", node.table_name, name_suffix, constraint_counter),
1368 constraint_type,
1369 });
1370 }
1371
1372 for col in &node.columns {
1374 if col.unique {
1375 constraint_counter += 1;
1376 stored_constraints.push(StoredConstraint {
1377 name: format!("{}_{}_uq{}", node.table_name, col.name, constraint_counter),
1378 constraint_type: StoredConstraintType::Unique {
1379 columns: vec![col.name.clone()],
1380 },
1381 });
1382 }
1383 }
1384
1385 self.context
1386 .write()
1387 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1388 .create_table(
1389 node.table_name.clone(),
1390 columns,
1391 primary_key,
1392 stored_constraints,
1393 node.if_not_exists,
1394 )?;
1395
1396 Ok(QueryResult {
1397 columns: vec!["result".to_string()],
1398 rows: vec![Row {
1399 values: vec![Value::String(format!(
1400 "Table '{}' created",
1401 node.table_name
1402 ))],
1403 }],
1404 rows_affected: 0,
1405 })
1406 }
1407
1408 fn execute_drop_table(&self, node: &DropTableNode) -> ExecutorResult<QueryResult> {
1410 self.context
1411 .write()
1412 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1413 .drop_table(&node.table_name, node.if_exists)?;
1414
1415 Ok(QueryResult {
1416 columns: vec!["result".to_string()],
1417 rows: vec![Row {
1418 values: vec![Value::String(format!(
1419 "Table '{}' dropped",
1420 node.table_name
1421 ))],
1422 }],
1423 rows_affected: 0,
1424 })
1425 }
1426
1427 fn execute_alter_table(&self, node: &AlterTableNode) -> ExecutorResult<QueryResult> {
1429 let mut ctx = self
1430 .context
1431 .write()
1432 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1433
1434 for op in &node.operations {
1435 ctx.alter_table(&node.table_name, op)?;
1436 }
1437
1438 let op_count = node.operations.len();
1439 let msg = if op_count == 1 {
1440 format!("Table '{}' altered", node.table_name)
1441 } else {
1442 format!(
1443 "Table '{}' altered ({} operations)",
1444 node.table_name, op_count
1445 )
1446 };
1447
1448 Ok(QueryResult {
1449 columns: vec!["result".to_string()],
1450 rows: vec![Row {
1451 values: vec![Value::String(msg)],
1452 }],
1453 rows_affected: 0,
1454 })
1455 }
1456
1457 fn execute_create_index(&self, node: &CreateIndexNode) -> ExecutorResult<QueryResult> {
1459 self.context
1460 .write()
1461 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1462 .create_index(
1463 node.index_name.clone(),
1464 node.table_name.clone(),
1465 node.columns.clone(),
1466 node.unique,
1467 node.if_not_exists,
1468 )?;
1469
1470 Ok(QueryResult {
1471 columns: vec!["result".to_string()],
1472 rows: vec![Row {
1473 values: vec![Value::String(format!(
1474 "Index '{}' created",
1475 node.index_name
1476 ))],
1477 }],
1478 rows_affected: 0,
1479 })
1480 }
1481
1482 fn execute_drop_index(&self, node: &DropIndexNode) -> ExecutorResult<QueryResult> {
1484 self.context
1485 .write()
1486 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1487 .drop_index(&node.index_name, node.if_exists)?;
1488
1489 Ok(QueryResult {
1490 columns: vec!["result".to_string()],
1491 rows: vec![Row {
1492 values: vec![Value::String(format!(
1493 "Index '{}' dropped",
1494 node.index_name
1495 ))],
1496 }],
1497 rows_affected: 0,
1498 })
1499 }
1500
1501 fn execute_insert(&self, node: &InsertNode) -> ExecutorResult<QueryResult> {
1503 let rows: Vec<Vec<Value>> = match &node.source {
1504 InsertPlanSource::Values(values) => {
1505 let empty_row = Row { values: vec![] };
1507 let empty_columns: Vec<String> = vec![];
1508
1509 values
1510 .iter()
1511 .map(|row_exprs| {
1512 row_exprs
1513 .iter()
1514 .map(|expr| evaluate_expression(expr, &empty_row, &empty_columns))
1515 .collect::<ExecutorResult<Vec<_>>>()
1516 })
1517 .collect::<ExecutorResult<Vec<_>>>()?
1518 }
1519 InsertPlanSource::Query(subquery) => {
1520 let context = self
1522 .context
1523 .read()
1524 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1525 let mut operator = self.create_operator(subquery, &context)?;
1526 let mut all_rows = Vec::new();
1527
1528 while let Some(batch) = operator.next_batch()? {
1529 for row in batch.rows {
1530 all_rows.push(row.values);
1531 }
1532 }
1533
1534 all_rows
1535 }
1536 };
1537
1538 let inserted = self
1539 .context
1540 .read()
1541 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?
1542 .insert_rows(&node.table_name, &node.columns, rows)?;
1543
1544 Ok(QueryResult {
1545 columns: vec!["rows_affected".to_string()],
1546 rows: vec![Row {
1547 values: vec![Value::Integer(inserted as i64)],
1548 }],
1549 rows_affected: inserted,
1550 })
1551 }
1552
1553 fn execute_update(&self, node: &UpdateNode) -> ExecutorResult<QueryResult> {
1555 let context = self
1556 .context
1557 .read()
1558 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1559
1560 let table = context
1562 .get_table(&node.table_name)
1563 .ok_or_else(|| ExecutorError::TableNotFound(node.table_name.clone()))?;
1564 let table_data = table
1565 .read()
1566 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1567 let _columns = table_data.columns.clone();
1568 drop(table_data);
1569
1570 let empty_row = Row { values: vec![] };
1572 let assignments: Vec<(String, Value)> = node
1573 .assignments
1574 .iter()
1575 .map(|(col, expr)| {
1576 let value = evaluate_expression(expr, &empty_row, &[])?;
1577 Ok((col.clone(), value))
1578 })
1579 .collect::<ExecutorResult<Vec<_>>>()?;
1580
1581 let where_clause = node.where_clause.clone();
1583 let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1584 Box::new(move |row: &Row, cols: &[String]| {
1585 evaluate_expression(&wc, row, cols)
1586 .map(|v| matches!(v, Value::Boolean(true)))
1587 .unwrap_or(false)
1588 }) as Box<dyn Fn(&Row, &[String]) -> bool>
1589 });
1590
1591 let updated = context.update_rows(
1592 &node.table_name,
1593 &assignments,
1594 predicate.as_ref().map(|p| p.as_ref()),
1595 )?;
1596
1597 Ok(QueryResult {
1598 columns: vec!["rows_affected".to_string()],
1599 rows: vec![Row {
1600 values: vec![Value::Integer(updated as i64)],
1601 }],
1602 rows_affected: updated,
1603 })
1604 }
1605
1606 fn execute_delete(&self, node: &DeleteNode) -> ExecutorResult<QueryResult> {
1608 let context = self
1609 .context
1610 .read()
1611 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1612
1613 let where_clause = node.where_clause.clone();
1615 let predicate: Option<Box<dyn Fn(&Row, &[String]) -> bool>> = where_clause.map(|wc| {
1616 Box::new(move |row: &Row, cols: &[String]| {
1617 evaluate_expression(&wc, row, cols)
1618 .map(|v| matches!(v, Value::Boolean(true)))
1619 .unwrap_or(false)
1620 }) as Box<dyn Fn(&Row, &[String]) -> bool>
1621 });
1622
1623 let deleted =
1624 context.delete_rows(&node.table_name, predicate.as_ref().map(|p| p.as_ref()))?;
1625
1626 Ok(QueryResult {
1627 columns: vec!["rows_affected".to_string()],
1628 rows: vec![Row {
1629 values: vec![Value::Integer(deleted as i64)],
1630 }],
1631 rows_affected: deleted,
1632 })
1633 }
1634
1635 fn create_operator<'a>(
1637 &'a self,
1638 node: &PlanNode,
1639 context: &'a ExecutionContext,
1640 ) -> ExecutorResult<Box<dyn Operator + 'a>> {
1641 match node {
1642 PlanNode::Scan(scan) => Ok(Box::new(ScanOperator::new(scan.clone(), context)?)),
1643
1644 PlanNode::Filter(filter) => {
1645 let input = self.create_operator(&filter.input, context)?;
1646 Ok(Box::new(FilterOperator::new(
1647 input,
1648 filter.predicate.clone(),
1649 )))
1650 }
1651
1652 PlanNode::Project(project) => {
1653 let input = self.create_operator(&project.input, context)?;
1654 Ok(Box::new(ProjectOperator::new(
1655 input,
1656 project.expressions.clone(),
1657 )))
1658 }
1659
1660 PlanNode::Join(join) => {
1661 let left = self.create_operator(&join.left, context)?;
1662 let right = self.create_operator(&join.right, context)?;
1663 match join.strategy {
1664 JoinStrategy::HashJoin => Ok(Box::new(HashJoinOperator::new(
1665 left,
1666 right,
1667 join.join_type,
1668 join.condition.clone(),
1669 )?)),
1670 _ => Ok(Box::new(JoinOperator::new(
1671 left,
1672 right,
1673 join.join_type,
1674 join.condition.clone(),
1675 join.strategy,
1676 )?)),
1677 }
1678 }
1679
1680 PlanNode::Aggregate(agg) => {
1681 let input = self.create_operator(&agg.input, context)?;
1682 Ok(Box::new(AggregateOperator::new(
1683 input,
1684 agg.group_by.clone(),
1685 agg.aggregates.clone(),
1686 )))
1687 }
1688
1689 PlanNode::Sort(sort) => {
1690 let input = self.create_operator(&sort.input, context)?;
1691 Ok(Box::new(SortOperator::new(input, sort.order_by.clone())))
1692 }
1693
1694 PlanNode::Limit(limit) => {
1695 let input = self.create_operator(&limit.input, context)?;
1696 Ok(Box::new(LimitOperator::new(
1697 input,
1698 limit.limit,
1699 limit.offset,
1700 )))
1701 }
1702
1703 PlanNode::Empty => Ok(Box::new(EmptyOperator::new())),
1704
1705 PlanNode::SetOperation(set_op) => {
1706 let left = self.create_operator(&set_op.left, context)?;
1707 let right = self.create_operator(&set_op.right, context)?;
1708 Ok(Box::new(SetOperationOperator::new(left, right, set_op.op)))
1709 }
1710
1711 PlanNode::CreateTable(_)
1713 | PlanNode::DropTable(_)
1714 | PlanNode::AlterTable(_)
1715 | PlanNode::CreateIndex(_)
1716 | PlanNode::DropIndex(_)
1717 | PlanNode::Insert(_)
1718 | PlanNode::Update(_)
1719 | PlanNode::Delete(_)
1720 | PlanNode::BeginTransaction
1721 | PlanNode::CommitTransaction
1722 | PlanNode::RollbackTransaction => Err(ExecutorError::Internal(
1723 "DDL/DML/transaction nodes should not be in operator tree".to_string(),
1724 )),
1725 }
1726 }
1727}
1728
1729pub trait Operator {
1735 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>>;
1737
1738 fn columns(&self) -> &[String];
1740}
1741
1742struct ScanOperator {
1747 table: Arc<RwLock<TableData>>,
1748 columns: Vec<String>,
1749 position: usize,
1750 batch_size: usize,
1751 cached_rows: Option<Vec<Row>>,
1753 snapshot_version: Option<u64>,
1755}
1756
1757impl ScanOperator {
1758 fn new(scan: ScanNode, context: &ExecutionContext) -> ExecutorResult<Self> {
1759 let table = context
1760 .get_table(&scan.table_name)
1761 .ok_or_else(|| ExecutorError::TableNotFound(scan.table_name.clone()))?;
1762
1763 let columns = {
1765 let table_data = table
1766 .read()
1767 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1768 if scan.columns.is_empty() {
1769 table_data.columns.clone()
1770 } else {
1771 scan.columns.clone()
1772 }
1773 }; Ok(Self {
1776 table,
1777 columns,
1778 position: 0,
1779 batch_size: context.batch_size(),
1780 cached_rows: None,
1781 snapshot_version: context.snapshot_version,
1782 })
1783 }
1784}
1785
1786impl Operator for ScanOperator {
1787 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1788 if self.cached_rows.is_none() {
1790 let table_data = self
1791 .table
1792 .read()
1793 .map_err(|_| ExecutorError::Internal("Lock poisoned".to_string()))?;
1794 let mut visible = Vec::new();
1795 for i in 0..table_data.rows.len() {
1796 let created = table_data.row_created_version.get(i).copied().unwrap_or(0);
1797 let deleted = table_data.row_deleted_version.get(i).copied().unwrap_or(0);
1798 let vis = if let Some(snap) = self.snapshot_version {
1799 created <= snap && (deleted == 0 || deleted > snap)
1800 } else {
1801 deleted == 0
1802 };
1803 if vis {
1804 visible.push(table_data.rows[i].clone());
1805 }
1806 }
1807 self.cached_rows = Some(visible);
1808 }
1809
1810 let rows = self
1811 .cached_rows
1812 .as_ref()
1813 .expect("cached_rows was just set to Some");
1814
1815 if self.position >= rows.len() {
1816 return Ok(None);
1817 }
1818
1819 let end = (self.position + self.batch_size).min(rows.len());
1820 let batch_rows: Vec<Row> = rows[self.position..end].to_vec();
1821 self.position = end;
1822
1823 Ok(Some(ResultBatch::with_rows(
1824 self.columns.clone(),
1825 batch_rows,
1826 )))
1827 }
1828
1829 fn columns(&self) -> &[String] {
1830 &self.columns
1831 }
1832}
1833
1834struct FilterOperator<'a> {
1839 input: Box<dyn Operator + 'a>,
1840 predicate: PlanExpression,
1841 columns: Vec<String>,
1842}
1843
1844impl<'a> FilterOperator<'a> {
1845 fn new(input: Box<dyn Operator + 'a>, predicate: PlanExpression) -> Self {
1846 let columns = input.columns().to_vec();
1847 Self {
1848 input,
1849 predicate,
1850 columns,
1851 }
1852 }
1853
1854 fn evaluate_predicate(&self, row: &Row, columns: &[String]) -> ExecutorResult<bool> {
1855 let value = evaluate_expression(&self.predicate, row, columns)?;
1856 match value {
1857 Value::Boolean(b) => Ok(b),
1858 Value::Null => Ok(false),
1859 _ => Err(ExecutorError::TypeMismatch {
1860 expected: "boolean".to_string(),
1861 actual: format!("{:?}", value),
1862 }),
1863 }
1864 }
1865}
1866
1867impl<'a> Operator for FilterOperator<'a> {
1868 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1869 while let Some(batch) = self.input.next_batch()? {
1870 let filtered: Vec<Row> = batch
1871 .rows
1872 .into_iter()
1873 .filter(|row| {
1874 self.evaluate_predicate(row, &batch.columns)
1875 .unwrap_or(false)
1876 })
1877 .collect();
1878
1879 if !filtered.is_empty() {
1880 return Ok(Some(ResultBatch::with_rows(self.columns.clone(), filtered)));
1881 }
1882 }
1883 Ok(None)
1884 }
1885
1886 fn columns(&self) -> &[String] {
1887 &self.columns
1888 }
1889}
1890
1891struct ProjectOperator<'a> {
1896 input: Box<dyn Operator + 'a>,
1897 expressions: Vec<crate::planner::ProjectionExpr>,
1898 columns: Vec<String>,
1899 input_columns: Vec<String>,
1900}
1901
1902impl<'a> ProjectOperator<'a> {
1903 fn new(
1904 input: Box<dyn Operator + 'a>,
1905 expressions: Vec<crate::planner::ProjectionExpr>,
1906 ) -> Self {
1907 let input_columns = input.columns().to_vec();
1908
1909 let mut expanded_expressions = Vec::new();
1911 let mut columns = Vec::new();
1912
1913 for (i, proj_expr) in expressions.iter().enumerate() {
1914 if let PlanExpression::Column { name, table, .. } = &proj_expr.expr {
1916 if name == "*" {
1917 for input_col in &input_columns {
1919 if let Some(tbl) = table {
1922 let prefix = format!("{}.", tbl);
1923 if !input_col.starts_with(&prefix) && input_col != tbl {
1924 if input_col.contains('.') {
1927 continue;
1928 }
1929 }
1930 }
1931 expanded_expressions.push(crate::planner::ProjectionExpr {
1932 expr: PlanExpression::Column {
1933 table: None,
1934 name: input_col.clone(),
1935 data_type: DataType::Any,
1936 },
1937 alias: None,
1938 });
1939 columns.push(input_col.clone());
1940 }
1941 continue;
1942 }
1943 }
1944
1945 expanded_expressions.push(proj_expr.clone());
1947 let col_name = proj_expr.alias.clone().unwrap_or_else(|| {
1948 extract_column_name(&proj_expr.expr).unwrap_or_else(|| format!("column_{}", i))
1949 });
1950 columns.push(col_name);
1951 }
1952
1953 Self {
1954 input,
1955 expressions: expanded_expressions,
1956 columns,
1957 input_columns,
1958 }
1959 }
1960}
1961
1962fn extract_column_name(expr: &PlanExpression) -> Option<String> {
1964 match expr {
1965 PlanExpression::Column { name, .. } => Some(name.clone()),
1966 PlanExpression::Function { name, .. } => Some(name.clone()),
1967 PlanExpression::Cast { expr, .. } => extract_column_name(expr),
1968 PlanExpression::UnaryOp { expr, .. } => extract_column_name(expr),
1969 _ => None,
1970 }
1971}
1972
1973impl<'a> Operator for ProjectOperator<'a> {
1974 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
1975 if let Some(batch) = self.input.next_batch()? {
1976 let mut result_rows = Vec::with_capacity(batch.rows.len());
1977
1978 for row in &batch.rows {
1979 let mut projected_values = Vec::with_capacity(self.expressions.len());
1980
1981 for expr in &self.expressions {
1982 let value = evaluate_expression(&expr.expr, row, &self.input_columns)?;
1983 projected_values.push(value);
1984 }
1985
1986 result_rows.push(Row {
1987 values: projected_values,
1988 });
1989 }
1990
1991 Ok(Some(ResultBatch::with_rows(
1992 self.columns.clone(),
1993 result_rows,
1994 )))
1995 } else {
1996 Ok(None)
1997 }
1998 }
1999
2000 fn columns(&self) -> &[String] {
2001 &self.columns
2002 }
2003}
2004
2005struct JoinOperator<'a> {
2010 left: Box<dyn Operator + 'a>,
2011 right: Box<dyn Operator + 'a>,
2012 join_type: PlanJoinType,
2013 condition: Option<PlanExpression>,
2014 _strategy: JoinStrategy,
2015 columns: Vec<String>,
2016 right_col_count: usize,
2017 right_data: Option<Vec<Row>>,
2018 left_batch: Option<ResultBatch>,
2019 left_row_idx: usize,
2020 right_row_idx: usize,
2021}
2022
2023impl<'a> JoinOperator<'a> {
2024 fn new(
2025 left: Box<dyn Operator + 'a>,
2026 right: Box<dyn Operator + 'a>,
2027 join_type: PlanJoinType,
2028 condition: Option<PlanExpression>,
2029 strategy: JoinStrategy,
2030 ) -> ExecutorResult<Self> {
2031 let mut columns = left.columns().to_vec();
2032 let right_col_count = right.columns().len();
2033 columns.extend(right.columns().to_vec());
2034
2035 Ok(Self {
2036 left,
2037 right,
2038 join_type,
2039 condition,
2040 _strategy: strategy,
2041 columns,
2042 right_col_count,
2043 right_data: None,
2044 left_batch: None,
2045 left_row_idx: 0,
2046 right_row_idx: 0,
2047 })
2048 }
2049
2050 fn materialize_right(&mut self) -> ExecutorResult<()> {
2051 if self.right_data.is_some() {
2052 return Ok(());
2053 }
2054
2055 let mut all_rows = Vec::new();
2056 while let Some(batch) = self.right.next_batch()? {
2057 all_rows.extend(batch.rows);
2058 }
2059 self.right_data = Some(all_rows);
2060 Ok(())
2061 }
2062
2063 fn evaluate_join_condition(&self, left: &Row, right: &Row) -> ExecutorResult<bool> {
2064 match &self.condition {
2065 None => Ok(true),
2066 Some(expr) => {
2067 let mut combined = left.values.clone();
2068 combined.extend(right.values.clone());
2069 let combined_row = Row { values: combined };
2070
2071 let value = evaluate_expression(expr, &combined_row, &self.columns)?;
2072 match value {
2073 Value::Boolean(b) => Ok(b),
2074 Value::Null => Ok(false),
2075 _ => Ok(false),
2076 }
2077 }
2078 }
2079 }
2080}
2081
2082impl<'a> Operator for JoinOperator<'a> {
2083 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2084 self.materialize_right()?;
2085 let right_data = self
2087 .right_data
2088 .as_ref()
2089 .expect("right_data was set by materialize_right");
2090
2091 let mut result_rows = Vec::new();
2092
2093 loop {
2094 if self.left_batch.is_none() {
2095 self.left_batch = self.left.next_batch()?;
2096 self.left_row_idx = 0;
2097 self.right_row_idx = 0;
2098
2099 if self.left_batch.is_none() {
2100 break;
2101 }
2102 }
2103
2104 let left_batch = self
2106 .left_batch
2107 .as_ref()
2108 .expect("left_batch verified to be Some");
2109
2110 while self.left_row_idx < left_batch.rows.len() {
2111 let left_row = &left_batch.rows[self.left_row_idx];
2112 let mut matched = false;
2113
2114 while self.right_row_idx < right_data.len() {
2115 let right_row = &right_data[self.right_row_idx];
2116 self.right_row_idx += 1;
2117
2118 if self.evaluate_join_condition(left_row, right_row)? {
2119 let mut combined = left_row.values.clone();
2120 combined.extend(right_row.values.clone());
2121 result_rows.push(Row { values: combined });
2122 matched = true;
2123
2124 if result_rows.len() >= 1024 {
2125 return Ok(Some(ResultBatch::with_rows(
2126 self.columns.clone(),
2127 result_rows,
2128 )));
2129 }
2130 }
2131 }
2132
2133 if !matched && self.join_type == PlanJoinType::Left {
2135 let mut combined = left_row.values.clone();
2136 combined.extend(vec![Value::Null; self.right_col_count]);
2137 result_rows.push(Row { values: combined });
2138 }
2139
2140 self.left_row_idx += 1;
2141 self.right_row_idx = 0;
2142 }
2143
2144 self.left_batch = None;
2145 }
2146
2147 if result_rows.is_empty() {
2148 Ok(None)
2149 } else {
2150 Ok(Some(ResultBatch::with_rows(
2151 self.columns.clone(),
2152 result_rows,
2153 )))
2154 }
2155 }
2156
2157 fn columns(&self) -> &[String] {
2158 &self.columns
2159 }
2160}
2161
2162struct HashJoinOperator<'a> {
2167 left: Box<dyn Operator + 'a>,
2168 join_type: PlanJoinType,
2169 condition: Option<PlanExpression>,
2170 columns: Vec<String>,
2171 right_columns: Vec<String>,
2172 hash_table: Option<HashMap<String, Vec<Row>>>,
2174 left_key_idx: Option<usize>,
2176 done: bool,
2177}
2178
2179impl<'a> HashJoinOperator<'a> {
2180 fn new(
2181 left: Box<dyn Operator + 'a>,
2182 mut right: Box<dyn Operator + 'a>,
2183 join_type: PlanJoinType,
2184 condition: Option<PlanExpression>,
2185 ) -> ExecutorResult<Self> {
2186 let left_columns = left.columns().to_vec();
2187 let right_columns = right.columns().to_vec();
2188
2189 let mut columns = left_columns.clone();
2190 columns.extend(right_columns.clone());
2191
2192 let (left_key_idx, right_key_idx) =
2194 Self::extract_key_indices(&condition, &left_columns, &right_columns);
2195
2196 let mut hash_table: HashMap<String, Vec<Row>> = HashMap::new();
2198 while let Some(batch) = right.next_batch()? {
2199 for row in batch.rows {
2200 let key = if let Some(idx) = right_key_idx {
2201 format!("{:?}", row.values.get(idx).unwrap_or(&Value::Null))
2202 } else {
2203 String::new() };
2205 hash_table.entry(key).or_default().push(row);
2206 }
2207 }
2208
2209 Ok(Self {
2210 left,
2211 join_type,
2212 condition,
2213 columns,
2214 right_columns,
2215 hash_table: Some(hash_table),
2216 left_key_idx,
2217 done: false,
2218 })
2219 }
2220
2221 fn extract_key_indices(
2223 condition: &Option<PlanExpression>,
2224 left_columns: &[String],
2225 right_columns: &[String],
2226 ) -> (Option<usize>, Option<usize>) {
2227 if let Some(PlanExpression::BinaryOp {
2228 left,
2229 op: PlanBinaryOp::Equal,
2230 right,
2231 }) = condition
2232 {
2233 let left_name = Self::extract_column_name(left);
2234 let right_name = Self::extract_column_name(right);
2235
2236 if let (Some(ln), Some(rn)) = (left_name, right_name) {
2237 let li = left_columns
2239 .iter()
2240 .position(|c| c == &ln || c.ends_with(&format!(".{}", ln)));
2241 let ri = right_columns
2242 .iter()
2243 .position(|c| c == &rn || c.ends_with(&format!(".{}", rn)));
2244 if li.is_some() && ri.is_some() {
2245 return (li, ri);
2246 }
2247 let li = left_columns
2249 .iter()
2250 .position(|c| c == &rn || c.ends_with(&format!(".{}", rn)));
2251 let ri = right_columns
2252 .iter()
2253 .position(|c| c == &ln || c.ends_with(&format!(".{}", ln)));
2254 if li.is_some() && ri.is_some() {
2255 return (li, ri);
2256 }
2257 }
2258 }
2259 (None, None)
2260 }
2261
2262 fn extract_column_name(expr: &PlanExpression) -> Option<String> {
2263 match expr {
2264 PlanExpression::Column { name, .. } => Some(name.clone()),
2265 _ => None,
2266 }
2267 }
2268
2269 fn evaluate_join_condition(&self, left: &Row, right: &Row) -> ExecutorResult<bool> {
2270 match &self.condition {
2271 None => Ok(true),
2272 Some(expr) => {
2273 let mut combined = left.values.clone();
2274 combined.extend(right.values.clone());
2275 let combined_row = Row { values: combined };
2276 let value = evaluate_expression(expr, &combined_row, &self.columns)?;
2277 match value {
2278 Value::Boolean(b) => Ok(b),
2279 _ => Ok(false),
2280 }
2281 }
2282 }
2283 }
2284}
2285
2286impl<'a> Operator for HashJoinOperator<'a> {
2287 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2288 if self.done {
2289 return Ok(None);
2290 }
2291
2292 let hash_table = self.hash_table.as_ref().unwrap();
2293 let mut result_rows = Vec::new();
2294
2295 while let Some(left_batch) = self.left.next_batch()? {
2296 for left_row in &left_batch.rows {
2297 let probe_key = if let Some(idx) = self.left_key_idx {
2299 format!("{:?}", left_row.values.get(idx).unwrap_or(&Value::Null))
2300 } else {
2301 String::new()
2302 };
2303
2304 let candidates = if self.left_key_idx.is_some() {
2305 hash_table
2306 .get(&probe_key)
2307 .map(|v| v.as_slice())
2308 .unwrap_or(&[])
2309 } else {
2310 &[]
2313 };
2314
2315 let mut matched = false;
2316 for right_row in candidates {
2317 if self.evaluate_join_condition(left_row, right_row)? {
2318 let mut combined = left_row.values.clone();
2319 combined.extend(right_row.values.clone());
2320 result_rows.push(Row { values: combined });
2321 matched = true;
2322
2323 if result_rows.len() >= 1024 {
2324 return Ok(Some(ResultBatch::with_rows(
2325 self.columns.clone(),
2326 result_rows,
2327 )));
2328 }
2329 }
2330 }
2331
2332 if !matched && self.join_type == PlanJoinType::Left {
2334 let mut combined = left_row.values.clone();
2335 combined.extend(vec![Value::Null; self.right_columns.len()]);
2336 result_rows.push(Row { values: combined });
2337 }
2338 }
2339 }
2340
2341 self.done = true;
2342
2343 if result_rows.is_empty() {
2344 Ok(None)
2345 } else {
2346 Ok(Some(ResultBatch::with_rows(
2347 self.columns.clone(),
2348 result_rows,
2349 )))
2350 }
2351 }
2352
2353 fn columns(&self) -> &[String] {
2354 &self.columns
2355 }
2356}
2357
2358struct AggregateOperator<'a> {
2363 input: Box<dyn Operator + 'a>,
2364 _group_by: Vec<PlanExpression>,
2365 aggregates: Vec<crate::planner::AggregateExpr>,
2366 columns: Vec<String>,
2367 input_columns: Vec<String>,
2368 done: bool,
2369}
2370
2371impl<'a> AggregateOperator<'a> {
2372 fn new(
2373 input: Box<dyn Operator + 'a>,
2374 group_by: Vec<PlanExpression>,
2375 aggregates: Vec<crate::planner::AggregateExpr>,
2376 ) -> Self {
2377 let input_columns = input.columns().to_vec();
2378
2379 let columns: Vec<String> = aggregates
2380 .iter()
2381 .enumerate()
2382 .map(|(i, agg)| agg.alias.clone().unwrap_or_else(|| format!("agg_{}", i)))
2383 .collect();
2384
2385 Self {
2386 input,
2387 _group_by: group_by,
2388 aggregates,
2389 columns,
2390 input_columns,
2391 done: false,
2392 }
2393 }
2394}
2395
2396impl<'a> Operator for AggregateOperator<'a> {
2397 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2398 if self.done {
2399 return Ok(None);
2400 }
2401
2402 let mut accumulators: Vec<Accumulator> = self
2403 .aggregates
2404 .iter()
2405 .map(|agg| Accumulator::new(agg.function))
2406 .collect();
2407
2408 while let Some(batch) = self.input.next_batch()? {
2409 for row in &batch.rows {
2410 for (i, agg) in self.aggregates.iter().enumerate() {
2411 let value = if let Some(ref arg) = agg.argument {
2412 evaluate_expression(arg, row, &self.input_columns)?
2413 } else {
2414 Value::Integer(1)
2415 };
2416 accumulators[i].accumulate(&value)?;
2417 }
2418 }
2419 }
2420
2421 let result_values: Vec<Value> = accumulators.iter().map(|acc| acc.finalize()).collect();
2422
2423 self.done = true;
2424
2425 Ok(Some(ResultBatch::with_rows(
2426 self.columns.clone(),
2427 vec![Row {
2428 values: result_values,
2429 }],
2430 )))
2431 }
2432
2433 fn columns(&self) -> &[String] {
2434 &self.columns
2435 }
2436}
2437
2438struct Accumulator {
2440 function: AggregateFunction,
2441 count: i64,
2442 sum: f64,
2443 min: Option<Value>,
2444 max: Option<Value>,
2445}
2446
2447impl Accumulator {
2448 fn new(function: AggregateFunction) -> Self {
2449 Self {
2450 function,
2451 count: 0,
2452 sum: 0.0,
2453 min: None,
2454 max: None,
2455 }
2456 }
2457
2458 fn accumulate(&mut self, value: &Value) -> ExecutorResult<()> {
2459 if matches!(value, Value::Null) {
2460 return Ok(());
2461 }
2462
2463 self.count += 1;
2464
2465 match self.function {
2466 AggregateFunction::Count => {}
2467 AggregateFunction::Sum | AggregateFunction::Avg => {
2468 self.sum += value_to_f64(value)?;
2469 }
2470 AggregateFunction::Min => {
2471 if self.min.is_none()
2473 || compare_values(
2474 value,
2475 self.min
2476 .as_ref()
2477 .expect("min verified to be Some in || branch"),
2478 )? == std::cmp::Ordering::Less
2479 {
2480 self.min = Some(value.clone());
2481 }
2482 }
2483 AggregateFunction::Max => {
2484 if self.max.is_none()
2486 || compare_values(
2487 value,
2488 self.max
2489 .as_ref()
2490 .expect("max verified to be Some in || branch"),
2491 )? == std::cmp::Ordering::Greater
2492 {
2493 self.max = Some(value.clone());
2494 }
2495 }
2496 }
2497
2498 Ok(())
2499 }
2500
2501 fn finalize(&self) -> Value {
2502 match self.function {
2503 AggregateFunction::Count => Value::Integer(self.count),
2504 AggregateFunction::Sum => Value::Float(self.sum),
2505 AggregateFunction::Avg => {
2506 if self.count == 0 {
2507 Value::Null
2508 } else {
2509 Value::Float(self.sum / self.count as f64)
2510 }
2511 }
2512 AggregateFunction::Min => self.min.clone().unwrap_or(Value::Null),
2513 AggregateFunction::Max => self.max.clone().unwrap_or(Value::Null),
2514 }
2515 }
2516}
2517
2518struct SortOperator<'a> {
2523 input: Box<dyn Operator + 'a>,
2524 order_by: Vec<crate::planner::SortKey>,
2525 columns: Vec<String>,
2526 sorted_data: Option<Vec<Row>>,
2527 position: usize,
2528}
2529
2530impl<'a> SortOperator<'a> {
2531 fn new(input: Box<dyn Operator + 'a>, order_by: Vec<crate::planner::SortKey>) -> Self {
2532 let columns = input.columns().to_vec();
2533 Self {
2534 input,
2535 order_by,
2536 columns,
2537 sorted_data: None,
2538 position: 0,
2539 }
2540 }
2541}
2542
2543impl<'a> Operator for SortOperator<'a> {
2544 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2545 if self.sorted_data.is_none() {
2546 let mut all_rows = Vec::new();
2547 while let Some(batch) = self.input.next_batch()? {
2548 all_rows.extend(batch.rows);
2549 }
2550
2551 let columns = self.columns.clone();
2552 let order_by = self.order_by.clone();
2553
2554 all_rows.sort_by(|a, b| {
2555 for key in &order_by {
2556 let a_val = evaluate_expression(&key.expr, a, &columns).unwrap_or(Value::Null);
2557 let b_val = evaluate_expression(&key.expr, b, &columns).unwrap_or(Value::Null);
2558
2559 let cmp = compare_values(&a_val, &b_val).unwrap_or(std::cmp::Ordering::Equal);
2560
2561 if cmp != std::cmp::Ordering::Equal {
2562 return if key.ascending { cmp } else { cmp.reverse() };
2563 }
2564 }
2565 std::cmp::Ordering::Equal
2566 });
2567
2568 self.sorted_data = Some(all_rows);
2569 }
2570
2571 let data = self
2573 .sorted_data
2574 .as_ref()
2575 .expect("sorted_data was just set to Some");
2576
2577 if self.position >= data.len() {
2578 return Ok(None);
2579 }
2580
2581 let end = (self.position + 1024).min(data.len());
2582 let rows = data[self.position..end].to_vec();
2583 self.position = end;
2584
2585 Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)))
2586 }
2587
2588 fn columns(&self) -> &[String] {
2589 &self.columns
2590 }
2591}
2592
2593struct LimitOperator<'a> {
2598 input: Box<dyn Operator + 'a>,
2599 limit: Option<u64>,
2600 offset: Option<u64>,
2601 columns: Vec<String>,
2602 rows_skipped: u64,
2603 rows_returned: u64,
2604}
2605
2606impl<'a> LimitOperator<'a> {
2607 fn new(input: Box<dyn Operator + 'a>, limit: Option<u64>, offset: Option<u64>) -> Self {
2608 let columns = input.columns().to_vec();
2609 Self {
2610 input,
2611 limit,
2612 offset,
2613 columns,
2614 rows_skipped: 0,
2615 rows_returned: 0,
2616 }
2617 }
2618}
2619
2620impl<'a> Operator for LimitOperator<'a> {
2621 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2622 if let Some(limit) = self.limit {
2623 if self.rows_returned >= limit {
2624 return Ok(None);
2625 }
2626 }
2627
2628 while let Some(batch) = self.input.next_batch()? {
2629 let mut rows = batch.rows;
2630
2631 let offset = self.offset.unwrap_or(0);
2632 if self.rows_skipped < offset {
2633 let skip = (offset - self.rows_skipped) as usize;
2634 if skip >= rows.len() {
2635 self.rows_skipped += rows.len() as u64;
2636 continue;
2637 }
2638 rows = rows[skip..].to_vec();
2639 self.rows_skipped = offset;
2640 }
2641
2642 if let Some(limit) = self.limit {
2643 let remaining = limit - self.rows_returned;
2644 if rows.len() as u64 > remaining {
2645 rows.truncate(remaining as usize);
2646 }
2647 }
2648
2649 if rows.is_empty() {
2650 continue;
2651 }
2652
2653 self.rows_returned += rows.len() as u64;
2654
2655 return Ok(Some(ResultBatch::with_rows(self.columns.clone(), rows)));
2656 }
2657
2658 Ok(None)
2659 }
2660
2661 fn columns(&self) -> &[String] {
2662 &self.columns
2663 }
2664}
2665
2666struct EmptyOperator {
2671 done: bool,
2672}
2673
2674impl EmptyOperator {
2675 fn new() -> Self {
2676 Self { done: false }
2677 }
2678}
2679
2680impl Operator for EmptyOperator {
2681 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2682 if self.done {
2683 return Ok(None);
2684 }
2685 self.done = true;
2686 Ok(Some(ResultBatch::with_rows(
2687 vec![],
2688 vec![Row { values: vec![] }],
2689 )))
2690 }
2691
2692 fn columns(&self) -> &[String] {
2693 &[]
2694 }
2695}
2696
2697struct SetOperationOperator<'a> {
2702 left: Box<dyn Operator + 'a>,
2703 right: Box<dyn Operator + 'a>,
2704 op: SetOperationType,
2705 result_rows: Option<Vec<Row>>,
2706 result_columns: Vec<String>,
2707 position: usize,
2708}
2709
2710impl<'a> SetOperationOperator<'a> {
2711 fn new(
2712 left: Box<dyn Operator + 'a>,
2713 right: Box<dyn Operator + 'a>,
2714 op: SetOperationType,
2715 ) -> Self {
2716 Self {
2717 left,
2718 right,
2719 op,
2720 result_rows: None,
2721 result_columns: Vec::new(),
2722 position: 0,
2723 }
2724 }
2725
2726 fn drain_operator(op: &mut Box<dyn Operator + 'a>) -> ExecutorResult<(Vec<String>, Vec<Row>)> {
2728 let mut all_rows = Vec::new();
2729 let mut columns = Vec::new();
2730 while let Some(batch) = op.next_batch()? {
2731 if columns.is_empty() {
2732 columns = batch.columns;
2733 }
2734 all_rows.extend(batch.rows);
2735 }
2736 Ok((columns, all_rows))
2737 }
2738
2739 fn compute(&mut self) -> ExecutorResult<()> {
2741 if self.result_rows.is_some() {
2742 return Ok(());
2743 }
2744
2745 let (left_cols, left_rows) = Self::drain_operator(&mut self.left)?;
2746 let (right_cols, right_rows) = Self::drain_operator(&mut self.right)?;
2747
2748 self.result_columns = if !left_cols.is_empty() {
2749 left_cols
2750 } else {
2751 right_cols
2752 };
2753
2754 let result = match self.op {
2755 SetOperationType::UnionAll => {
2756 let mut combined = left_rows;
2757 combined.extend(right_rows);
2758 combined
2759 }
2760 SetOperationType::Union => {
2761 let mut combined = left_rows;
2762 combined.extend(right_rows);
2763 deduplicate_rows(combined)
2764 }
2765 SetOperationType::Intersect => {
2766 let right_set: HashSet<Vec<ValueKey>> = right_rows
2768 .iter()
2769 .map(|r| r.values.iter().map(value_to_key).collect())
2770 .collect();
2771 let mut result = Vec::new();
2772 let mut seen = HashSet::new();
2773 for row in left_rows {
2774 let key: Vec<ValueKey> = row.values.iter().map(value_to_key).collect();
2775 if right_set.contains(&key) && seen.insert(key) {
2776 result.push(row);
2777 }
2778 }
2779 result
2780 }
2781 SetOperationType::Except => {
2782 let right_set: HashSet<Vec<ValueKey>> = right_rows
2784 .iter()
2785 .map(|r| r.values.iter().map(value_to_key).collect())
2786 .collect();
2787 let mut result = Vec::new();
2788 let mut seen = HashSet::new();
2789 for row in left_rows {
2790 let key: Vec<ValueKey> = row.values.iter().map(value_to_key).collect();
2791 if !right_set.contains(&key) && seen.insert(key) {
2792 result.push(row);
2793 }
2794 }
2795 result
2796 }
2797 };
2798
2799 self.result_rows = Some(result);
2800 Ok(())
2801 }
2802}
2803
2804impl<'a> Operator for SetOperationOperator<'a> {
2805 fn next_batch(&mut self) -> ExecutorResult<Option<ResultBatch>> {
2806 self.compute()?;
2807
2808 let rows = self.result_rows.as_ref().unwrap();
2809 if self.position >= rows.len() {
2810 return Ok(None);
2811 }
2812
2813 let end = (self.position + 1024).min(rows.len());
2814 let batch_rows = rows[self.position..end].to_vec();
2815 self.position = end;
2816
2817 Ok(Some(ResultBatch::with_rows(
2818 self.result_columns.clone(),
2819 batch_rows,
2820 )))
2821 }
2822
2823 fn columns(&self) -> &[String] {
2824 &self.result_columns
2825 }
2826}
2827
2828#[derive(Hash, Eq, PartialEq, Clone, Debug)]
2830enum ValueKey {
2831 Null,
2832 Boolean(bool),
2833 Integer(i64),
2834 Float(u64), String(String),
2836}
2837
2838fn value_to_key(v: &Value) -> ValueKey {
2839 match v {
2840 Value::Null => ValueKey::Null,
2841 Value::Boolean(b) => ValueKey::Boolean(*b),
2842 Value::Integer(i) => ValueKey::Integer(*i),
2843 Value::Float(f) => ValueKey::Float(f.to_bits()),
2844 Value::String(s) => ValueKey::String(s.clone()),
2845 other => ValueKey::String(format!("{:?}", other)),
2847 }
2848}
2849
2850fn deduplicate_rows(rows: Vec<Row>) -> Vec<Row> {
2851 let mut seen = HashSet::new();
2852 let mut result = Vec::new();
2853 for row in rows {
2854 let key: Vec<ValueKey> = row.values.iter().map(value_to_key).collect();
2855 if seen.insert(key) {
2856 result.push(row);
2857 }
2858 }
2859 result
2860}
2861
2862pub struct EvalContext<'a> {
2868 executor: Option<&'a Executor>,
2869}
2870
2871impl<'a> EvalContext<'a> {
2872 pub fn new() -> Self {
2873 Self { executor: None }
2874 }
2875
2876 pub fn with_executor(executor: &'a Executor) -> Self {
2877 Self {
2878 executor: Some(executor),
2879 }
2880 }
2881
2882 fn execute_subquery(&self, subquery: &PlanNode) -> ExecutorResult<Vec<Row>> {
2884 let executor = self.executor.ok_or_else(|| {
2885 ExecutorError::Internal("Subquery requires execution context".to_string())
2886 })?;
2887 let plan = QueryPlan {
2888 root: subquery.clone(),
2889 estimated_cost: 0.0,
2890 estimated_rows: 0,
2891 };
2892 let result = executor.execute(&plan)?;
2893 Ok(result.rows)
2894 }
2895}
2896
2897impl Default for EvalContext<'_> {
2898 fn default() -> Self {
2899 Self::new()
2900 }
2901}
2902
2903fn evaluate_default_expression(expr: &PlanExpression) -> ExecutorResult<Value> {
2905 let empty_row = Row { values: vec![] };
2906 let empty_columns: Vec<String> = vec![];
2907 evaluate_expression(expr, &empty_row, &empty_columns)
2908}
2909
2910fn substitute_parameters(node: &PlanNode, params: &[Value]) -> ExecutorResult<PlanNode> {
2912 match node {
2913 PlanNode::Scan(scan) => {
2914 let index_scan = if let Some(idx) = &scan.index_scan {
2917 let start = idx
2918 .key_range
2919 .start
2920 .as_ref()
2921 .map(|e| substitute_expr(e, params))
2922 .transpose()?;
2923 let end = idx
2924 .key_range
2925 .end
2926 .as_ref()
2927 .map(|e| substitute_expr(e, params))
2928 .transpose()?;
2929 Some(crate::planner::IndexScan {
2930 index_name: idx.index_name.clone(),
2931 key_range: crate::planner::KeyRange {
2932 start,
2933 start_inclusive: idx.key_range.start_inclusive,
2934 end,
2935 end_inclusive: idx.key_range.end_inclusive,
2936 },
2937 })
2938 } else {
2939 None
2940 };
2941 Ok(PlanNode::Scan(ScanNode {
2942 table_name: scan.table_name.clone(),
2943 alias: scan.alias.clone(),
2944 columns: scan.columns.clone(),
2945 index_scan,
2946 }))
2947 }
2948 PlanNode::Filter(filter) => {
2949 let input = Box::new(substitute_parameters(&filter.input, params)?);
2950 let predicate = substitute_expr(&filter.predicate, params)?;
2951 Ok(PlanNode::Filter(FilterNode { input, predicate }))
2952 }
2953 PlanNode::Project(proj) => {
2954 let input = Box::new(substitute_parameters(&proj.input, params)?);
2955 let expressions = proj
2956 .expressions
2957 .iter()
2958 .map(|pe| {
2959 Ok(ProjectionExpr {
2960 expr: substitute_expr(&pe.expr, params)?,
2961 alias: pe.alias.clone(),
2962 })
2963 })
2964 .collect::<ExecutorResult<Vec<_>>>()?;
2965 Ok(PlanNode::Project(ProjectNode { input, expressions }))
2966 }
2967 PlanNode::Sort(sort) => {
2968 let input = Box::new(substitute_parameters(&sort.input, params)?);
2969 let order_by = sort
2970 .order_by
2971 .iter()
2972 .map(|sk| {
2973 Ok(SortKey {
2974 expr: substitute_expr(&sk.expr, params)?,
2975 ascending: sk.ascending,
2976 nulls_first: sk.nulls_first,
2977 })
2978 })
2979 .collect::<ExecutorResult<Vec<_>>>()?;
2980 Ok(PlanNode::Sort(SortNode { input, order_by }))
2981 }
2982 PlanNode::Limit(limit) => {
2983 let input = Box::new(substitute_parameters(&limit.input, params)?);
2984 Ok(PlanNode::Limit(LimitNode {
2985 input,
2986 limit: limit.limit,
2987 offset: limit.offset,
2988 }))
2989 }
2990 PlanNode::Aggregate(agg) => {
2991 let input = Box::new(substitute_parameters(&agg.input, params)?);
2992 let group_by = agg
2993 .group_by
2994 .iter()
2995 .map(|e| substitute_expr(e, params))
2996 .collect::<ExecutorResult<Vec<_>>>()?;
2997 let aggregates = agg
2998 .aggregates
2999 .iter()
3000 .map(|ae| {
3001 Ok(AggregateExpr {
3002 function: ae.function,
3003 argument: ae
3004 .argument
3005 .as_ref()
3006 .map(|a| substitute_expr(a, params))
3007 .transpose()?,
3008 distinct: ae.distinct,
3009 alias: ae.alias.clone(),
3010 })
3011 })
3012 .collect::<ExecutorResult<Vec<_>>>()?;
3013 Ok(PlanNode::Aggregate(AggregateNode {
3014 input,
3015 group_by,
3016 aggregates,
3017 }))
3018 }
3019 PlanNode::Join(join) => {
3020 let left = Box::new(substitute_parameters(&join.left, params)?);
3021 let right = Box::new(substitute_parameters(&join.right, params)?);
3022 let condition = join
3023 .condition
3024 .as_ref()
3025 .map(|c| substitute_expr(c, params))
3026 .transpose()?;
3027 Ok(PlanNode::Join(JoinNode {
3028 left,
3029 right,
3030 join_type: join.join_type,
3031 condition,
3032 strategy: join.strategy,
3033 }))
3034 }
3035 PlanNode::Insert(insert) => {
3036 let source = match &insert.source {
3037 InsertPlanSource::Values(rows) => {
3038 let substituted = rows
3039 .iter()
3040 .map(|row| {
3041 row.iter()
3042 .map(|e| substitute_expr(e, params))
3043 .collect::<ExecutorResult<Vec<_>>>()
3044 })
3045 .collect::<ExecutorResult<Vec<_>>>()?;
3046 InsertPlanSource::Values(substituted)
3047 }
3048 InsertPlanSource::Query(q) => {
3049 InsertPlanSource::Query(Box::new(substitute_parameters(q, params)?))
3050 }
3051 };
3052 Ok(PlanNode::Insert(InsertNode {
3053 table_name: insert.table_name.clone(),
3054 columns: insert.columns.clone(),
3055 source,
3056 }))
3057 }
3058 PlanNode::Update(update) => {
3059 let assignments = update
3060 .assignments
3061 .iter()
3062 .map(|(col, expr)| Ok((col.clone(), substitute_expr(expr, params)?)))
3063 .collect::<ExecutorResult<Vec<_>>>()?;
3064 let where_clause = update
3065 .where_clause
3066 .as_ref()
3067 .map(|p| substitute_expr(p, params))
3068 .transpose()?;
3069 Ok(PlanNode::Update(UpdateNode {
3070 table_name: update.table_name.clone(),
3071 assignments,
3072 where_clause,
3073 }))
3074 }
3075 PlanNode::Delete(delete) => {
3076 let where_clause = delete
3077 .where_clause
3078 .as_ref()
3079 .map(|p| substitute_expr(p, params))
3080 .transpose()?;
3081 Ok(PlanNode::Delete(DeleteNode {
3082 table_name: delete.table_name.clone(),
3083 where_clause,
3084 }))
3085 }
3086 PlanNode::SetOperation(set_op) => {
3087 let left = Box::new(substitute_parameters(&set_op.left, params)?);
3088 let right = Box::new(substitute_parameters(&set_op.right, params)?);
3089 Ok(PlanNode::SetOperation(SetOperationNode {
3090 op: set_op.op,
3091 left,
3092 right,
3093 }))
3094 }
3095 PlanNode::CreateTable(_)
3097 | PlanNode::DropTable(_)
3098 | PlanNode::AlterTable(_)
3099 | PlanNode::CreateIndex(_)
3100 | PlanNode::DropIndex(_)
3101 | PlanNode::Empty
3102 | PlanNode::BeginTransaction
3103 | PlanNode::CommitTransaction
3104 | PlanNode::RollbackTransaction => Ok(node.clone()),
3105 }
3106}
3107
3108fn substitute_expr(expr: &PlanExpression, params: &[Value]) -> ExecutorResult<PlanExpression> {
3110 match expr {
3111 PlanExpression::Placeholder(idx) => {
3112 let param_idx = *idx - 1;
3114 params.get(param_idx).map(value_to_literal).ok_or_else(|| {
3115 ExecutorError::Internal(format!(
3116 "Missing parameter ${}: expected {} parameters but got {}",
3117 idx,
3118 idx,
3119 params.len()
3120 ))
3121 })
3122 }
3123 PlanExpression::BinaryOp { op, left, right } => Ok(PlanExpression::BinaryOp {
3124 op: *op,
3125 left: Box::new(substitute_expr(left, params)?),
3126 right: Box::new(substitute_expr(right, params)?),
3127 }),
3128 PlanExpression::UnaryOp { op, expr: inner } => Ok(PlanExpression::UnaryOp {
3129 op: *op,
3130 expr: Box::new(substitute_expr(inner, params)?),
3131 }),
3132 PlanExpression::Function {
3133 name,
3134 args,
3135 return_type,
3136 } => {
3137 let new_args = args
3138 .iter()
3139 .map(|a| substitute_expr(a, params))
3140 .collect::<ExecutorResult<Vec<_>>>()?;
3141 Ok(PlanExpression::Function {
3142 name: name.clone(),
3143 args: new_args,
3144 return_type: return_type.clone(),
3145 })
3146 }
3147 PlanExpression::Case {
3148 operand,
3149 conditions,
3150 else_result,
3151 } => {
3152 let new_operand = operand
3153 .as_ref()
3154 .map(|o| substitute_expr(o, params))
3155 .transpose()?
3156 .map(Box::new);
3157 let new_conditions = conditions
3158 .iter()
3159 .map(|(cond, result)| {
3160 Ok((
3161 substitute_expr(cond, params)?,
3162 substitute_expr(result, params)?,
3163 ))
3164 })
3165 .collect::<ExecutorResult<Vec<_>>>()?;
3166 let new_else = else_result
3167 .as_ref()
3168 .map(|e| substitute_expr(e, params))
3169 .transpose()?
3170 .map(Box::new);
3171 Ok(PlanExpression::Case {
3172 operand: new_operand,
3173 conditions: new_conditions,
3174 else_result: new_else,
3175 })
3176 }
3177 PlanExpression::InList {
3178 expr: inner,
3179 list,
3180 negated,
3181 } => {
3182 let new_inner = Box::new(substitute_expr(inner, params)?);
3183 let new_list = list
3184 .iter()
3185 .map(|e| substitute_expr(e, params))
3186 .collect::<ExecutorResult<Vec<_>>>()?;
3187 Ok(PlanExpression::InList {
3188 expr: new_inner,
3189 list: new_list,
3190 negated: *negated,
3191 })
3192 }
3193 PlanExpression::InSubquery {
3194 expr: inner,
3195 subquery,
3196 negated,
3197 } => {
3198 let new_inner = Box::new(substitute_expr(inner, params)?);
3199 let new_subquery = Box::new(substitute_parameters(subquery, params)?);
3200 Ok(PlanExpression::InSubquery {
3201 expr: new_inner,
3202 subquery: new_subquery,
3203 negated: *negated,
3204 })
3205 }
3206 PlanExpression::Exists { subquery, negated } => {
3207 let new_subquery = Box::new(substitute_parameters(subquery, params)?);
3208 Ok(PlanExpression::Exists {
3209 subquery: new_subquery,
3210 negated: *negated,
3211 })
3212 }
3213 PlanExpression::ScalarSubquery(subquery) => {
3214 let new_subquery = Box::new(substitute_parameters(subquery, params)?);
3215 Ok(PlanExpression::ScalarSubquery(new_subquery))
3216 }
3217 PlanExpression::Between {
3218 expr: inner,
3219 low,
3220 high,
3221 negated,
3222 } => Ok(PlanExpression::Between {
3223 expr: Box::new(substitute_expr(inner, params)?),
3224 low: Box::new(substitute_expr(low, params)?),
3225 high: Box::new(substitute_expr(high, params)?),
3226 negated: *negated,
3227 }),
3228 PlanExpression::Like {
3229 expr: inner,
3230 pattern,
3231 negated,
3232 } => Ok(PlanExpression::Like {
3233 expr: Box::new(substitute_expr(inner, params)?),
3234 pattern: Box::new(substitute_expr(pattern, params)?),
3235 negated: *negated,
3236 }),
3237 PlanExpression::IsNull {
3238 expr: inner,
3239 negated,
3240 } => Ok(PlanExpression::IsNull {
3241 expr: Box::new(substitute_expr(inner, params)?),
3242 negated: *negated,
3243 }),
3244 PlanExpression::Cast {
3245 expr: inner,
3246 target_type,
3247 } => Ok(PlanExpression::Cast {
3248 expr: Box::new(substitute_expr(inner, params)?),
3249 target_type: target_type.clone(),
3250 }),
3251 PlanExpression::Literal(_) | PlanExpression::Column { .. } => Ok(expr.clone()),
3253 }
3254}
3255
3256fn value_to_literal(value: &Value) -> PlanExpression {
3258 let lit = match value {
3259 Value::Null => PlanLiteral::Null,
3260 Value::Boolean(b) => PlanLiteral::Boolean(*b),
3261 Value::Integer(i) => PlanLiteral::Integer(*i),
3262 Value::Float(f) => PlanLiteral::Float(*f),
3263 Value::String(s) => PlanLiteral::String(s.clone()),
3264 Value::Bytes(b) => PlanLiteral::String(String::from_utf8_lossy(b).to_string()),
3265 Value::Timestamp(t) => PlanLiteral::Integer(t.timestamp_millis()),
3266 Value::Array(arr) => PlanLiteral::String(format!("{:?}", arr)),
3268 Value::Object(obj) => PlanLiteral::String(format!("{:?}", obj)),
3269 };
3270 PlanExpression::Literal(lit)
3271}
3272
3273fn evaluate_expression(
3275 expr: &PlanExpression,
3276 row: &Row,
3277 columns: &[String],
3278) -> ExecutorResult<Value> {
3279 evaluate_expression_with_context(expr, row, columns, &EvalContext::new())
3280}
3281
3282fn evaluate_expression_with_context(
3284 expr: &PlanExpression,
3285 row: &Row,
3286 columns: &[String],
3287 ctx: &EvalContext,
3288) -> ExecutorResult<Value> {
3289 match expr {
3290 PlanExpression::Literal(lit) => Ok(match lit {
3291 PlanLiteral::Null => Value::Null,
3292 PlanLiteral::Boolean(b) => Value::Boolean(*b),
3293 PlanLiteral::Integer(i) => Value::Integer(*i),
3294 PlanLiteral::Float(f) => Value::Float(*f),
3295 PlanLiteral::String(s) => Value::String(s.clone()),
3296 }),
3297
3298 PlanExpression::Column { table: _, name, .. } => {
3299 if name == "*" {
3300 return Ok(Value::Null);
3301 }
3302
3303 let idx = columns
3304 .iter()
3305 .position(|c| c == name)
3306 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))?;
3307
3308 Ok(row.values.get(idx).cloned().unwrap_or(Value::Null))
3309 }
3310
3311 PlanExpression::BinaryOp { left, op, right } => {
3312 let left_val = evaluate_expression_with_context(left, row, columns, ctx)?;
3313 let right_val = evaluate_expression_with_context(right, row, columns, ctx)?;
3314 evaluate_binary_op(*op, &left_val, &right_val)
3315 }
3316
3317 PlanExpression::UnaryOp { op, expr } => {
3318 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3319 evaluate_unary_op(*op, &val)
3320 }
3321
3322 PlanExpression::IsNull { expr, negated } => {
3323 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3324 let is_null = matches!(val, Value::Null);
3325 Ok(Value::Boolean(if *negated { !is_null } else { is_null }))
3326 }
3327
3328 PlanExpression::Cast { expr, target_type } => {
3329 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3330 cast_value(&val, target_type)
3331 }
3332
3333 PlanExpression::Function { name, args, .. } => {
3334 let arg_values: Vec<Value> = args
3335 .iter()
3336 .map(|a| evaluate_expression_with_context(a, row, columns, ctx))
3337 .collect::<ExecutorResult<Vec<_>>>()?;
3338 evaluate_function(name, &arg_values)
3339 }
3340
3341 PlanExpression::Case {
3342 operand,
3343 conditions,
3344 else_result,
3345 } => {
3346 match operand {
3347 Some(operand_expr) => {
3348 let operand_val =
3350 evaluate_expression_with_context(operand_expr, row, columns, ctx)?;
3351 for (when_expr, then_expr) in conditions {
3352 let when_val =
3353 evaluate_expression_with_context(when_expr, row, columns, ctx)?;
3354 if compare_values(&operand_val, &when_val)? == std::cmp::Ordering::Equal {
3355 return evaluate_expression_with_context(then_expr, row, columns, ctx);
3356 }
3357 }
3358 }
3359 None => {
3360 for (when_expr, then_expr) in conditions {
3362 let when_val =
3363 evaluate_expression_with_context(when_expr, row, columns, ctx)?;
3364 if matches!(when_val, Value::Boolean(true)) {
3365 return evaluate_expression_with_context(then_expr, row, columns, ctx);
3366 }
3367 }
3368 }
3369 }
3370 match else_result {
3372 Some(else_expr) => evaluate_expression_with_context(else_expr, row, columns, ctx),
3373 None => Ok(Value::Null),
3374 }
3375 }
3376
3377 PlanExpression::InList {
3378 expr,
3379 list,
3380 negated,
3381 } => {
3382 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3383 let mut found = false;
3384 for item in list {
3385 let item_val = evaluate_expression_with_context(item, row, columns, ctx)?;
3386 if compare_values(&val, &item_val)? == std::cmp::Ordering::Equal {
3387 found = true;
3388 break;
3389 }
3390 }
3391 Ok(Value::Boolean(if *negated { !found } else { found }))
3392 }
3393
3394 PlanExpression::Between {
3395 expr,
3396 low,
3397 high,
3398 negated,
3399 } => {
3400 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3401 let low_val = evaluate_expression_with_context(low, row, columns, ctx)?;
3402 let high_val = evaluate_expression_with_context(high, row, columns, ctx)?;
3403
3404 let ge_low = compare_values(&val, &low_val)? != std::cmp::Ordering::Less;
3405 let le_high = compare_values(&val, &high_val)? != std::cmp::Ordering::Greater;
3406 let in_range = ge_low && le_high;
3407
3408 Ok(Value::Boolean(if *negated { !in_range } else { in_range }))
3409 }
3410
3411 PlanExpression::Like {
3412 expr,
3413 pattern,
3414 negated,
3415 } => {
3416 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3417 let pattern_val = evaluate_expression_with_context(pattern, row, columns, ctx)?;
3418
3419 let val_str = match val {
3420 Value::String(s) => s,
3421 Value::Null => return Ok(Value::Null),
3422 Value::Integer(i) => i.to_string(),
3423 Value::Float(f) => f.to_string(),
3424 Value::Boolean(b) => b.to_string(),
3425 Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
3426 Value::Timestamp(t) => t.to_rfc3339(),
3427 Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
3428 };
3429
3430 let pattern_str = match pattern_val {
3431 Value::String(s) => s,
3432 Value::Null => return Ok(Value::Null),
3433 Value::Integer(i) => i.to_string(),
3434 Value::Float(f) => f.to_string(),
3435 Value::Boolean(b) => b.to_string(),
3436 Value::Bytes(b) => String::from_utf8_lossy(&b).to_string(),
3437 Value::Timestamp(t) => t.to_rfc3339(),
3438 Value::Array(_) | Value::Object(_) => return Ok(Value::Boolean(false)),
3439 };
3440
3441 let regex_pattern = pattern_str.replace('%', ".*").replace('_', ".");
3443 let regex_pattern = format!("^{}$", regex_pattern);
3444
3445 let matches = if regex_pattern == "^.*$" {
3447 true
3448 } else if regex_pattern.starts_with("^") && regex_pattern.ends_with("$") {
3449 let inner = ®ex_pattern[1..regex_pattern.len() - 1];
3450 if inner.contains(".*") || inner.contains('.') {
3451 let parts: Vec<&str> = inner.split(".*").collect();
3453 if parts.len() == 1 {
3454 val_str == parts[0]
3455 } else {
3456 let mut pos = 0;
3457 let mut matched = true;
3458 for (i, part) in parts.iter().enumerate() {
3459 if part.is_empty() {
3460 continue;
3461 }
3462 if let Some(found_pos) = val_str[pos..].find(part) {
3463 if i == 0 && found_pos != 0 {
3464 matched = false;
3465 break;
3466 }
3467 pos += found_pos + part.len();
3468 } else {
3469 matched = false;
3470 break;
3471 }
3472 }
3473 matched
3474 }
3475 } else {
3476 val_str == inner
3477 }
3478 } else {
3479 val_str.contains(&pattern_str)
3480 };
3481
3482 Ok(Value::Boolean(if *negated { !matches } else { matches }))
3483 }
3484
3485 PlanExpression::InSubquery {
3486 expr,
3487 subquery,
3488 negated,
3489 } => {
3490 let val = evaluate_expression_with_context(expr, row, columns, ctx)?;
3492
3493 let subquery_rows = ctx.execute_subquery(subquery)?;
3495
3496 let mut found = false;
3498 for subquery_row in &subquery_rows {
3499 if let Some(subquery_val) = subquery_row.values.first() {
3500 if compare_values(&val, subquery_val)? == std::cmp::Ordering::Equal {
3501 found = true;
3502 break;
3503 }
3504 }
3505 }
3506
3507 Ok(Value::Boolean(if *negated { !found } else { found }))
3508 }
3509
3510 PlanExpression::Exists { subquery, negated } => {
3511 let subquery_rows = ctx.execute_subquery(subquery)?;
3513 let exists = !subquery_rows.is_empty();
3514
3515 Ok(Value::Boolean(if *negated { !exists } else { exists }))
3516 }
3517
3518 PlanExpression::ScalarSubquery(subquery) => {
3519 let subquery_rows = ctx.execute_subquery(subquery)?;
3521
3522 if subquery_rows.is_empty() {
3523 return Ok(Value::Null);
3524 }
3525
3526 if subquery_rows.len() > 1 {
3527 return Err(ExecutorError::Internal(
3528 "Scalar subquery returned more than one row".to_string(),
3529 ));
3530 }
3531
3532 subquery_rows[0].values.first().cloned().ok_or_else(|| {
3534 ExecutorError::Internal("Scalar subquery returned no columns".to_string())
3535 })
3536 }
3537
3538 PlanExpression::Placeholder(idx) => {
3539 Err(ExecutorError::Internal(format!(
3541 "Unresolved placeholder ${}",
3542 idx
3543 )))
3544 }
3545 }
3546}
3547
3548fn evaluate_binary_op(op: PlanBinaryOp, left: &Value, right: &Value) -> ExecutorResult<Value> {
3549 if matches!(left, Value::Null) || matches!(right, Value::Null) {
3550 if matches!(op, PlanBinaryOp::Equal | PlanBinaryOp::NotEqual) {
3551 return Ok(Value::Null);
3552 }
3553 return Ok(Value::Null);
3554 }
3555
3556 match op {
3557 PlanBinaryOp::Add => {
3558 let l = value_to_f64(left)?;
3559 let r = value_to_f64(right)?;
3560 Ok(Value::Float(l + r))
3561 }
3562 PlanBinaryOp::Subtract => {
3563 let l = value_to_f64(left)?;
3564 let r = value_to_f64(right)?;
3565 Ok(Value::Float(l - r))
3566 }
3567 PlanBinaryOp::Multiply => {
3568 let l = value_to_f64(left)?;
3569 let r = value_to_f64(right)?;
3570 Ok(Value::Float(l * r))
3571 }
3572 PlanBinaryOp::Divide => {
3573 let l = value_to_f64(left)?;
3574 let r = value_to_f64(right)?;
3575 if r == 0.0 {
3576 return Err(ExecutorError::DivisionByZero);
3577 }
3578 Ok(Value::Float(l / r))
3579 }
3580 PlanBinaryOp::Modulo => {
3581 let l = value_to_i64(left)?;
3582 let r = value_to_i64(right)?;
3583 if r == 0 {
3584 return Err(ExecutorError::DivisionByZero);
3585 }
3586 Ok(Value::Integer(l % r))
3587 }
3588 PlanBinaryOp::Equal => Ok(Value::Boolean(
3589 compare_values(left, right)? == std::cmp::Ordering::Equal,
3590 )),
3591 PlanBinaryOp::NotEqual => Ok(Value::Boolean(
3592 compare_values(left, right)? != std::cmp::Ordering::Equal,
3593 )),
3594 PlanBinaryOp::LessThan => Ok(Value::Boolean(
3595 compare_values(left, right)? == std::cmp::Ordering::Less,
3596 )),
3597 PlanBinaryOp::LessThanOrEqual => Ok(Value::Boolean(
3598 compare_values(left, right)? != std::cmp::Ordering::Greater,
3599 )),
3600 PlanBinaryOp::GreaterThan => Ok(Value::Boolean(
3601 compare_values(left, right)? == std::cmp::Ordering::Greater,
3602 )),
3603 PlanBinaryOp::GreaterThanOrEqual => Ok(Value::Boolean(
3604 compare_values(left, right)? != std::cmp::Ordering::Less,
3605 )),
3606 PlanBinaryOp::And => {
3607 let l = value_to_bool(left)?;
3608 let r = value_to_bool(right)?;
3609 Ok(Value::Boolean(l && r))
3610 }
3611 PlanBinaryOp::Or => {
3612 let l = value_to_bool(left)?;
3613 let r = value_to_bool(right)?;
3614 Ok(Value::Boolean(l || r))
3615 }
3616 PlanBinaryOp::Concat => {
3617 let l = value_to_string(left);
3618 let r = value_to_string(right);
3619 Ok(Value::String(format!("{}{}", l, r)))
3620 }
3621 }
3622}
3623
3624fn evaluate_unary_op(op: PlanUnaryOp, value: &Value) -> ExecutorResult<Value> {
3625 match op {
3626 PlanUnaryOp::Not => {
3627 let b = value_to_bool(value)?;
3628 Ok(Value::Boolean(!b))
3629 }
3630 PlanUnaryOp::Negative => {
3631 let f = value_to_f64(value)?;
3632 Ok(Value::Float(-f))
3633 }
3634 }
3635}
3636
3637fn evaluate_function(name: &str, args: &[Value]) -> ExecutorResult<Value> {
3638 match name.to_uppercase().as_str() {
3639 "UPPER" => {
3640 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3641 Ok(Value::String(s.to_uppercase()))
3642 }
3643 "LOWER" => {
3644 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3645 Ok(Value::String(s.to_lowercase()))
3646 }
3647 "LENGTH" => {
3648 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3649 Ok(Value::Integer(s.len() as i64))
3650 }
3651 "ABS" => {
3652 let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3653 Ok(Value::Float(f.abs()))
3654 }
3655 "COALESCE" => {
3656 for arg in args {
3657 if !matches!(arg, Value::Null) {
3658 return Ok(arg.clone());
3659 }
3660 }
3661 Ok(Value::Null)
3662 }
3663 "ROUND" => {
3664 let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3665 let decimals = args
3666 .get(1)
3667 .and_then(|v| match v {
3668 Value::Integer(i) => Some(*i as i32),
3669 _ => None,
3670 })
3671 .unwrap_or(0);
3672 let factor = 10_f64.powi(decimals);
3673 Ok(Value::Float((f * factor).round() / factor))
3674 }
3675 "CEIL" | "CEILING" => {
3676 let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3677 Ok(Value::Float(f.ceil()))
3678 }
3679 "FLOOR" => {
3680 let f = value_to_f64(&args.first().cloned().unwrap_or(Value::Null))?;
3681 Ok(Value::Float(f.floor()))
3682 }
3683 "SUBSTRING" | "SUBSTR" => {
3684 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3685 let start = args
3686 .get(1)
3687 .and_then(|v| match v {
3688 Value::Integer(i) => Some((*i as usize).saturating_sub(1)), _ => None,
3690 })
3691 .unwrap_or(0);
3692 let len = args.get(2).and_then(|v| match v {
3693 Value::Integer(i) => Some(*i as usize),
3694 _ => None,
3695 });
3696 let result: String = match len {
3697 Some(l) => s.chars().skip(start).take(l).collect(),
3698 None => s.chars().skip(start).collect(),
3699 };
3700 Ok(Value::String(result))
3701 }
3702 "TRIM" => {
3703 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3704 Ok(Value::String(s.trim().to_string()))
3705 }
3706 "LTRIM" => {
3707 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3708 Ok(Value::String(s.trim_start().to_string()))
3709 }
3710 "RTRIM" => {
3711 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3712 Ok(Value::String(s.trim_end().to_string()))
3713 }
3714 "NULLIF" => {
3715 if args.len() >= 2 && args[0] == args[1] {
3716 Ok(Value::Null)
3717 } else {
3718 Ok(args.first().cloned().unwrap_or(Value::Null))
3719 }
3720 }
3721 "NOW" | "CURRENT_TIMESTAMP" => Ok(Value::String(
3722 chrono::Utc::now().format("%Y-%m-%d %H:%M:%S").to_string(),
3723 )),
3724 "CURRENT_DATE" => Ok(Value::String(
3725 chrono::Utc::now().format("%Y-%m-%d").to_string(),
3726 )),
3727 "CURRENT_TIME" => Ok(Value::String(
3728 chrono::Utc::now().format("%H:%M:%S").to_string(),
3729 )),
3730 "EXTRACT" => {
3731 let s = value_to_string(&args.last().cloned().unwrap_or(Value::Null));
3733 let part =
3734 value_to_string(&args.first().cloned().unwrap_or(Value::Null)).to_uppercase();
3735 if let Ok(dt) = chrono::NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S") {
3736 let val = match part.as_str() {
3737 "YEAR" => dt.format("%Y").to_string().parse::<i64>().unwrap_or(0),
3738 "MONTH" => dt.format("%m").to_string().parse::<i64>().unwrap_or(0),
3739 "DAY" => dt.format("%d").to_string().parse::<i64>().unwrap_or(0),
3740 "HOUR" => dt.format("%H").to_string().parse::<i64>().unwrap_or(0),
3741 "MINUTE" => dt.format("%M").to_string().parse::<i64>().unwrap_or(0),
3742 "SECOND" => dt.format("%S").to_string().parse::<i64>().unwrap_or(0),
3743 _ => 0,
3744 };
3745 Ok(Value::Integer(val))
3746 } else {
3747 Ok(Value::Null)
3748 }
3749 }
3750 "REPLACE" => {
3751 let s = value_to_string(&args.first().cloned().unwrap_or(Value::Null));
3752 let from = value_to_string(&args.get(1).cloned().unwrap_or(Value::Null));
3753 let to = value_to_string(&args.get(2).cloned().unwrap_or(Value::Null));
3754 Ok(Value::String(s.replace(&from, &to)))
3755 }
3756 "CONCAT" => {
3757 let result: String = args.iter().map(|a| value_to_string(a)).collect();
3758 Ok(Value::String(result))
3759 }
3760 _ => Err(ExecutorError::InvalidOperation(format!(
3761 "Unknown function: {}",
3762 name
3763 ))),
3764 }
3765}
3766
3767fn value_to_f64(value: &Value) -> ExecutorResult<f64> {
3772 match value {
3773 Value::Integer(i) => Ok(*i as f64),
3774 Value::Float(f) => Ok(*f),
3775 Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
3776 expected: "number".to_string(),
3777 actual: "string".to_string(),
3778 }),
3779 _ => Err(ExecutorError::TypeMismatch {
3780 expected: "number".to_string(),
3781 actual: format!("{:?}", value),
3782 }),
3783 }
3784}
3785
3786fn value_to_i64(value: &Value) -> ExecutorResult<i64> {
3787 match value {
3788 Value::Integer(i) => Ok(*i),
3789 Value::Float(f) => Ok(*f as i64),
3790 Value::String(s) => s.parse().map_err(|_| ExecutorError::TypeMismatch {
3791 expected: "integer".to_string(),
3792 actual: "string".to_string(),
3793 }),
3794 _ => Err(ExecutorError::TypeMismatch {
3795 expected: "integer".to_string(),
3796 actual: format!("{:?}", value),
3797 }),
3798 }
3799}
3800
3801fn value_to_bool(value: &Value) -> ExecutorResult<bool> {
3802 match value {
3803 Value::Boolean(b) => Ok(*b),
3804 Value::Integer(i) => Ok(*i != 0),
3805 Value::Null => Ok(false),
3806 _ => Err(ExecutorError::TypeMismatch {
3807 expected: "boolean".to_string(),
3808 actual: format!("{:?}", value),
3809 }),
3810 }
3811}
3812
3813fn value_to_string(value: &Value) -> String {
3814 match value {
3815 Value::String(s) => s.clone(),
3816 Value::Integer(i) => i.to_string(),
3817 Value::Float(f) => f.to_string(),
3818 Value::Boolean(b) => b.to_string(),
3819 Value::Null => String::new(),
3820 _ => format!("{:?}", value),
3821 }
3822}
3823
3824fn compare_values(left: &Value, right: &Value) -> ExecutorResult<std::cmp::Ordering> {
3825 match (left, right) {
3826 (Value::Integer(l), Value::Integer(r)) => Ok(l.cmp(r)),
3827 (Value::Float(l), Value::Float(r)) => {
3828 Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
3829 }
3830 (Value::Integer(l), Value::Float(r)) => {
3831 let l = *l as f64;
3832 Ok(l.partial_cmp(r).unwrap_or(std::cmp::Ordering::Equal))
3833 }
3834 (Value::Float(l), Value::Integer(r)) => {
3835 let r = *r as f64;
3836 Ok(l.partial_cmp(&r).unwrap_or(std::cmp::Ordering::Equal))
3837 }
3838 (Value::String(l), Value::String(r)) => Ok(l.cmp(r)),
3839 (Value::Boolean(l), Value::Boolean(r)) => Ok(l.cmp(r)),
3840 _ => Ok(std::cmp::Ordering::Equal),
3841 }
3842}
3843
3844fn cast_value(value: &Value, target_type: &DataType) -> ExecutorResult<Value> {
3845 match target_type {
3846 DataType::Integer => Ok(Value::Integer(value_to_i64(value)?)),
3847 DataType::Float => Ok(Value::Float(value_to_f64(value)?)),
3848 DataType::Text => Ok(Value::String(value_to_string(value))),
3849 DataType::Boolean => Ok(Value::Boolean(value_to_bool(value)?)),
3850 _ => Ok(value.clone()),
3851 }
3852}
3853
3854#[cfg(test)]
3859mod tests {
3860 use super::*;
3861 use crate::planner::{LimitNode, PlanNode, ProjectNode, ProjectionExpr, QueryPlan, ScanNode};
3862
3863 fn create_test_context() -> ExecutionContext {
3864 let mut context = ExecutionContext::new();
3865
3866 context.add_table(TableData {
3867 name: "users".to_string(),
3868 columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
3869 rows: vec![
3870 Row {
3871 values: vec![
3872 Value::Integer(1),
3873 Value::String("Alice".to_string()),
3874 Value::Integer(30),
3875 ],
3876 },
3877 Row {
3878 values: vec![
3879 Value::Integer(2),
3880 Value::String("Bob".to_string()),
3881 Value::Integer(25),
3882 ],
3883 },
3884 Row {
3885 values: vec![
3886 Value::Integer(3),
3887 Value::String("Charlie".to_string()),
3888 Value::Integer(35),
3889 ],
3890 },
3891 ],
3892 row_created_version: vec![0, 0, 0],
3893 row_deleted_version: vec![0, 0, 0],
3894 });
3895
3896 context
3897 }
3898
3899 #[test]
3900 fn test_scan_operator() {
3901 let context = create_test_context();
3902 let executor = Executor::new(context);
3903
3904 let plan = QueryPlan {
3905 root: PlanNode::Project(ProjectNode {
3906 input: Box::new(PlanNode::Scan(ScanNode {
3907 table_name: "users".to_string(),
3908 alias: None,
3909 columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
3910 index_scan: None,
3911 })),
3912 expressions: vec![
3913 ProjectionExpr {
3914 expr: PlanExpression::Column {
3915 table: None,
3916 name: "id".to_string(),
3917 data_type: DataType::Integer,
3918 },
3919 alias: Some("id".to_string()),
3920 },
3921 ProjectionExpr {
3922 expr: PlanExpression::Column {
3923 table: None,
3924 name: "name".to_string(),
3925 data_type: DataType::Text,
3926 },
3927 alias: Some("name".to_string()),
3928 },
3929 ],
3930 }),
3931 estimated_cost: 100.0,
3932 estimated_rows: 3,
3933 };
3934
3935 let result = executor.execute(&plan).unwrap();
3936
3937 assert_eq!(result.rows.len(), 3);
3938 assert_eq!(result.columns.len(), 2);
3939 }
3940
3941 #[test]
3942 fn test_filter_operator() {
3943 let context = create_test_context();
3944 let executor = Executor::new(context);
3945
3946 let plan = QueryPlan {
3947 root: PlanNode::Project(ProjectNode {
3948 input: Box::new(PlanNode::Filter(crate::planner::FilterNode {
3949 input: Box::new(PlanNode::Scan(ScanNode {
3950 table_name: "users".to_string(),
3951 alias: None,
3952 columns: vec!["id".to_string(), "name".to_string(), "age".to_string()],
3953 index_scan: None,
3954 })),
3955 predicate: PlanExpression::BinaryOp {
3956 left: Box::new(PlanExpression::Column {
3957 table: None,
3958 name: "age".to_string(),
3959 data_type: DataType::Integer,
3960 }),
3961 op: PlanBinaryOp::GreaterThan,
3962 right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(28))),
3963 },
3964 })),
3965 expressions: vec![ProjectionExpr {
3966 expr: PlanExpression::Column {
3967 table: None,
3968 name: "name".to_string(),
3969 data_type: DataType::Text,
3970 },
3971 alias: Some("name".to_string()),
3972 }],
3973 }),
3974 estimated_cost: 100.0,
3975 estimated_rows: 2,
3976 };
3977
3978 let result = executor.execute(&plan).unwrap();
3979
3980 assert_eq!(result.rows.len(), 2);
3981 }
3982
3983 #[test]
3984 fn test_limit_operator() {
3985 let context = create_test_context();
3986 let executor = Executor::new(context);
3987
3988 let plan = QueryPlan {
3989 root: PlanNode::Limit(LimitNode {
3990 input: Box::new(PlanNode::Project(ProjectNode {
3991 input: Box::new(PlanNode::Scan(ScanNode {
3992 table_name: "users".to_string(),
3993 alias: None,
3994 columns: vec!["id".to_string()],
3995 index_scan: None,
3996 })),
3997 expressions: vec![ProjectionExpr {
3998 expr: PlanExpression::Column {
3999 table: None,
4000 name: "id".to_string(),
4001 data_type: DataType::Integer,
4002 },
4003 alias: Some("id".to_string()),
4004 }],
4005 })),
4006 limit: Some(2),
4007 offset: None,
4008 }),
4009 estimated_cost: 100.0,
4010 estimated_rows: 2,
4011 };
4012
4013 let result = executor.execute(&plan).unwrap();
4014
4015 assert_eq!(result.rows.len(), 2);
4016 }
4017
4018 #[test]
4019 fn test_create_table() {
4020 use crate::planner::{CreateColumnDef, CreateTableNode};
4021
4022 let executor = Executor::new(ExecutionContext::new());
4023
4024 let plan = QueryPlan {
4025 root: PlanNode::CreateTable(CreateTableNode {
4026 table_name: "test_table".to_string(),
4027 columns: vec![
4028 CreateColumnDef {
4029 name: "id".to_string(),
4030 data_type: DataType::Integer,
4031 nullable: false,
4032 default: None,
4033 primary_key: true,
4034 unique: false,
4035 },
4036 CreateColumnDef {
4037 name: "name".to_string(),
4038 data_type: DataType::Text,
4039 nullable: true,
4040 default: None,
4041 primary_key: false,
4042 unique: false,
4043 },
4044 ],
4045 constraints: vec![],
4046 if_not_exists: false,
4047 }),
4048 estimated_cost: 1.0,
4049 estimated_rows: 0,
4050 };
4051
4052 let result = executor.execute(&plan).unwrap();
4053 match &result.rows[0].values[0] {
4054 Value::String(s) => assert!(s.contains("created")),
4055 _ => panic!("Expected string result"),
4056 }
4057
4058 let tables = executor.context.read().unwrap().list_tables();
4060 assert!(tables.contains(&"test_table".to_string()));
4061 }
4062
4063 #[test]
4064 fn test_insert_into_table() {
4065 use crate::planner::{CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource};
4066
4067 let executor = Executor::new(ExecutionContext::new());
4068
4069 let create_plan = QueryPlan {
4071 root: PlanNode::CreateTable(CreateTableNode {
4072 table_name: "test_insert".to_string(),
4073 columns: vec![
4074 CreateColumnDef {
4075 name: "id".to_string(),
4076 data_type: DataType::Integer,
4077 nullable: false,
4078 default: None,
4079 primary_key: true,
4080 unique: false,
4081 },
4082 CreateColumnDef {
4083 name: "value".to_string(),
4084 data_type: DataType::Text,
4085 nullable: true,
4086 default: None,
4087 primary_key: false,
4088 unique: false,
4089 },
4090 ],
4091 constraints: vec![],
4092 if_not_exists: false,
4093 }),
4094 estimated_cost: 1.0,
4095 estimated_rows: 0,
4096 };
4097 executor.execute(&create_plan).unwrap();
4098
4099 let insert_plan = QueryPlan {
4101 root: PlanNode::Insert(InsertNode {
4102 table_name: "test_insert".to_string(),
4103 columns: vec!["id".to_string(), "value".to_string()],
4104 source: InsertPlanSource::Values(vec![
4105 vec![
4106 PlanExpression::Literal(PlanLiteral::Integer(1)),
4107 PlanExpression::Literal(PlanLiteral::String("hello".to_string())),
4108 ],
4109 vec![
4110 PlanExpression::Literal(PlanLiteral::Integer(2)),
4111 PlanExpression::Literal(PlanLiteral::String("world".to_string())),
4112 ],
4113 ]),
4114 }),
4115 estimated_cost: 2.0,
4116 estimated_rows: 2,
4117 };
4118
4119 let result = executor.execute(&insert_plan).unwrap();
4120 assert_eq!(result.rows_affected, 2);
4121
4122 let query_plan = QueryPlan {
4124 root: PlanNode::Project(ProjectNode {
4125 input: Box::new(PlanNode::Scan(ScanNode {
4126 table_name: "test_insert".to_string(),
4127 alias: None,
4128 columns: vec!["id".to_string(), "value".to_string()],
4129 index_scan: None,
4130 })),
4131 expressions: vec![ProjectionExpr {
4132 expr: PlanExpression::Column {
4133 table: None,
4134 name: "id".to_string(),
4135 data_type: DataType::Integer,
4136 },
4137 alias: Some("id".to_string()),
4138 }],
4139 }),
4140 estimated_cost: 100.0,
4141 estimated_rows: 2,
4142 };
4143
4144 let query_result = executor.execute(&query_plan).unwrap();
4145 assert_eq!(query_result.rows.len(), 2);
4146 }
4147
4148 #[test]
4149 fn test_drop_table() {
4150 use crate::planner::{CreateColumnDef, CreateTableNode, DropTableNode};
4151
4152 let executor = Executor::new(ExecutionContext::new());
4153
4154 let create_plan = QueryPlan {
4156 root: PlanNode::CreateTable(CreateTableNode {
4157 table_name: "to_drop".to_string(),
4158 columns: vec![CreateColumnDef {
4159 name: "id".to_string(),
4160 data_type: DataType::Integer,
4161 nullable: false,
4162 default: None,
4163 primary_key: true,
4164 unique: false,
4165 }],
4166 constraints: vec![],
4167 if_not_exists: false,
4168 }),
4169 estimated_cost: 1.0,
4170 estimated_rows: 0,
4171 };
4172 executor.execute(&create_plan).unwrap();
4173
4174 let tables = executor.context.read().unwrap().list_tables();
4176 assert!(tables.contains(&"to_drop".to_string()));
4177
4178 let drop_plan = QueryPlan {
4180 root: PlanNode::DropTable(DropTableNode {
4181 table_name: "to_drop".to_string(),
4182 if_exists: false,
4183 }),
4184 estimated_cost: 1.0,
4185 estimated_rows: 0,
4186 };
4187 executor.execute(&drop_plan).unwrap();
4188
4189 let tables = executor.context.read().unwrap().list_tables();
4191 assert!(!tables.contains(&"to_drop".to_string()));
4192 }
4193
4194 #[test]
4195 fn test_shared_context_persistence() {
4196 use crate::planner::{CreateColumnDef, CreateTableNode};
4197 use std::sync::{Arc, RwLock};
4198
4199 let shared_context = Arc::new(RwLock::new(ExecutionContext::new()));
4201
4202 let executor = Executor::with_shared_context(shared_context.clone());
4204
4205 let create_plan = QueryPlan {
4207 root: PlanNode::CreateTable(CreateTableNode {
4208 table_name: "shared_table".to_string(),
4209 columns: vec![CreateColumnDef {
4210 name: "id".to_string(),
4211 data_type: DataType::Integer,
4212 nullable: false,
4213 default: None,
4214 primary_key: true,
4215 unique: false,
4216 }],
4217 constraints: vec![],
4218 if_not_exists: false,
4219 }),
4220 estimated_cost: 1.0,
4221 estimated_rows: 0,
4222 };
4223 executor.execute(&create_plan).unwrap();
4224
4225 let executor2 = Executor::with_shared_context(shared_context.clone());
4227
4228 let tables = executor2.context.read().unwrap().list_tables();
4230 assert!(tables.contains(&"shared_table".to_string()));
4231 }
4232
4233 #[test]
4234 fn test_insert_select() {
4235 use crate::planner::{
4236 CreateColumnDef, CreateTableNode, FilterNode, InsertNode, InsertPlanSource,
4237 };
4238
4239 let executor = Executor::new(ExecutionContext::new());
4240
4241 let create_source = QueryPlan {
4243 root: PlanNode::CreateTable(CreateTableNode {
4244 table_name: "source_table".to_string(),
4245 columns: vec![
4246 CreateColumnDef {
4247 name: "id".to_string(),
4248 data_type: DataType::Integer,
4249 nullable: false,
4250 default: None,
4251 primary_key: true,
4252 unique: false,
4253 },
4254 CreateColumnDef {
4255 name: "value".to_string(),
4256 data_type: DataType::Text,
4257 nullable: true,
4258 default: None,
4259 primary_key: false,
4260 unique: false,
4261 },
4262 ],
4263 constraints: vec![],
4264 if_not_exists: false,
4265 }),
4266 estimated_cost: 1.0,
4267 estimated_rows: 0,
4268 };
4269 executor.execute(&create_source).unwrap();
4270
4271 let create_dest = QueryPlan {
4273 root: PlanNode::CreateTable(CreateTableNode {
4274 table_name: "dest_table".to_string(),
4275 columns: vec![
4276 CreateColumnDef {
4277 name: "id".to_string(),
4278 data_type: DataType::Integer,
4279 nullable: false,
4280 default: None,
4281 primary_key: true,
4282 unique: false,
4283 },
4284 CreateColumnDef {
4285 name: "value".to_string(),
4286 data_type: DataType::Text,
4287 nullable: true,
4288 default: None,
4289 primary_key: false,
4290 unique: false,
4291 },
4292 ],
4293 constraints: vec![],
4294 if_not_exists: false,
4295 }),
4296 estimated_cost: 1.0,
4297 estimated_rows: 0,
4298 };
4299 executor.execute(&create_dest).unwrap();
4300
4301 let insert_source = QueryPlan {
4303 root: PlanNode::Insert(InsertNode {
4304 table_name: "source_table".to_string(),
4305 columns: vec!["id".to_string(), "value".to_string()],
4306 source: InsertPlanSource::Values(vec![
4307 vec![
4308 PlanExpression::Literal(PlanLiteral::Integer(1)),
4309 PlanExpression::Literal(PlanLiteral::String("one".to_string())),
4310 ],
4311 vec![
4312 PlanExpression::Literal(PlanLiteral::Integer(2)),
4313 PlanExpression::Literal(PlanLiteral::String("two".to_string())),
4314 ],
4315 vec![
4316 PlanExpression::Literal(PlanLiteral::Integer(3)),
4317 PlanExpression::Literal(PlanLiteral::String("three".to_string())),
4318 ],
4319 ]),
4320 }),
4321 estimated_cost: 3.0,
4322 estimated_rows: 3,
4323 };
4324 executor.execute(&insert_source).unwrap();
4325
4326 let insert_select = QueryPlan {
4328 root: PlanNode::Insert(InsertNode {
4329 table_name: "dest_table".to_string(),
4330 columns: vec!["id".to_string(), "value".to_string()],
4331 source: InsertPlanSource::Query(Box::new(PlanNode::Project(ProjectNode {
4332 input: Box::new(PlanNode::Filter(FilterNode {
4333 input: Box::new(PlanNode::Scan(ScanNode {
4334 table_name: "source_table".to_string(),
4335 alias: None,
4336 columns: vec!["id".to_string(), "value".to_string()],
4337 index_scan: None,
4338 })),
4339 predicate: PlanExpression::BinaryOp {
4340 left: Box::new(PlanExpression::Column {
4341 table: None,
4342 name: "id".to_string(),
4343 data_type: DataType::Integer,
4344 }),
4345 op: PlanBinaryOp::GreaterThan,
4346 right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
4347 },
4348 })),
4349 expressions: vec![
4350 ProjectionExpr {
4351 expr: PlanExpression::Column {
4352 table: None,
4353 name: "id".to_string(),
4354 data_type: DataType::Integer,
4355 },
4356 alias: Some("id".to_string()),
4357 },
4358 ProjectionExpr {
4359 expr: PlanExpression::Column {
4360 table: None,
4361 name: "value".to_string(),
4362 data_type: DataType::Text,
4363 },
4364 alias: Some("value".to_string()),
4365 },
4366 ],
4367 }))),
4368 }),
4369 estimated_cost: 2.0,
4370 estimated_rows: 2,
4371 };
4372
4373 let result = executor.execute(&insert_select).unwrap();
4374 assert_eq!(result.rows_affected, 2); let query_plan = QueryPlan {
4378 root: PlanNode::Project(ProjectNode {
4379 input: Box::new(PlanNode::Scan(ScanNode {
4380 table_name: "dest_table".to_string(),
4381 alias: None,
4382 columns: vec!["id".to_string(), "value".to_string()],
4383 index_scan: None,
4384 })),
4385 expressions: vec![ProjectionExpr {
4386 expr: PlanExpression::Column {
4387 table: None,
4388 name: "id".to_string(),
4389 data_type: DataType::Integer,
4390 },
4391 alias: Some("id".to_string()),
4392 }],
4393 }),
4394 estimated_cost: 100.0,
4395 estimated_rows: 2,
4396 };
4397
4398 let query_result = executor.execute(&query_plan).unwrap();
4399 assert_eq!(query_result.rows.len(), 2);
4400 }
4401
4402 #[test]
4403 fn test_case_expression() {
4404 let row = Row {
4405 values: vec![Value::Integer(2)],
4406 };
4407 let columns = vec!["status".to_string()];
4408
4409 let case_expr = PlanExpression::Case {
4411 operand: None,
4412 conditions: vec![
4413 (
4414 PlanExpression::BinaryOp {
4415 left: Box::new(PlanExpression::Column {
4416 table: None,
4417 name: "status".to_string(),
4418 data_type: DataType::Integer,
4419 }),
4420 op: PlanBinaryOp::Equal,
4421 right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(1))),
4422 },
4423 PlanExpression::Literal(PlanLiteral::String("one".to_string())),
4424 ),
4425 (
4426 PlanExpression::BinaryOp {
4427 left: Box::new(PlanExpression::Column {
4428 table: None,
4429 name: "status".to_string(),
4430 data_type: DataType::Integer,
4431 }),
4432 op: PlanBinaryOp::Equal,
4433 right: Box::new(PlanExpression::Literal(PlanLiteral::Integer(2))),
4434 },
4435 PlanExpression::Literal(PlanLiteral::String("two".to_string())),
4436 ),
4437 ],
4438 else_result: Some(Box::new(PlanExpression::Literal(PlanLiteral::String(
4439 "other".to_string(),
4440 )))),
4441 };
4442
4443 let result = evaluate_expression(&case_expr, &row, &columns).unwrap();
4444 assert_eq!(result, Value::String("two".to_string()));
4445 }
4446
4447 #[test]
4448 fn test_in_list_expression() {
4449 let row = Row {
4450 values: vec![Value::Integer(3)],
4451 };
4452 let columns = vec!["id".to_string()];
4453
4454 let in_expr = PlanExpression::InList {
4456 expr: Box::new(PlanExpression::Column {
4457 table: None,
4458 name: "id".to_string(),
4459 data_type: DataType::Integer,
4460 }),
4461 list: vec![
4462 PlanExpression::Literal(PlanLiteral::Integer(1)),
4463 PlanExpression::Literal(PlanLiteral::Integer(2)),
4464 PlanExpression::Literal(PlanLiteral::Integer(3)),
4465 PlanExpression::Literal(PlanLiteral::Integer(4)),
4466 PlanExpression::Literal(PlanLiteral::Integer(5)),
4467 ],
4468 negated: false,
4469 };
4470
4471 let result = evaluate_expression(&in_expr, &row, &columns).unwrap();
4472 assert_eq!(result, Value::Boolean(true));
4473
4474 let not_in_expr = PlanExpression::InList {
4476 expr: Box::new(PlanExpression::Column {
4477 table: None,
4478 name: "id".to_string(),
4479 data_type: DataType::Integer,
4480 }),
4481 list: vec![
4482 PlanExpression::Literal(PlanLiteral::Integer(1)),
4483 PlanExpression::Literal(PlanLiteral::Integer(2)),
4484 PlanExpression::Literal(PlanLiteral::Integer(3)),
4485 ],
4486 negated: true,
4487 };
4488
4489 let result = evaluate_expression(¬_in_expr, &row, &columns).unwrap();
4490 assert_eq!(result, Value::Boolean(false));
4491 }
4492
4493 #[test]
4494 fn test_between_expression() {
4495 let row = Row {
4496 values: vec![Value::Integer(50)],
4497 };
4498 let columns = vec!["value".to_string()];
4499
4500 let between_expr = PlanExpression::Between {
4502 expr: Box::new(PlanExpression::Column {
4503 table: None,
4504 name: "value".to_string(),
4505 data_type: DataType::Integer,
4506 }),
4507 low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
4508 high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(100))),
4509 negated: false,
4510 };
4511
4512 let result = evaluate_expression(&between_expr, &row, &columns).unwrap();
4513 assert_eq!(result, Value::Boolean(true));
4514
4515 let not_between_expr = PlanExpression::Between {
4517 expr: Box::new(PlanExpression::Column {
4518 table: None,
4519 name: "value".to_string(),
4520 data_type: DataType::Integer,
4521 }),
4522 low: Box::new(PlanExpression::Literal(PlanLiteral::Integer(10))),
4523 high: Box::new(PlanExpression::Literal(PlanLiteral::Integer(40))),
4524 negated: true,
4525 };
4526
4527 let result = evaluate_expression(¬_between_expr, &row, &columns).unwrap();
4528 assert_eq!(result, Value::Boolean(true));
4529 }
4530
4531 #[test]
4532 fn test_like_expression() {
4533 let row = Row {
4534 values: vec![Value::String("hello world".to_string())],
4535 };
4536 let columns = vec!["text".to_string()];
4537
4538 let like_expr = PlanExpression::Like {
4540 expr: Box::new(PlanExpression::Column {
4541 table: None,
4542 name: "text".to_string(),
4543 data_type: DataType::Text,
4544 }),
4545 pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
4546 "hello%".to_string(),
4547 ))),
4548 negated: false,
4549 };
4550
4551 let result = evaluate_expression(&like_expr, &row, &columns).unwrap();
4552 assert_eq!(result, Value::Boolean(true));
4553
4554 let like_expr2 = PlanExpression::Like {
4556 expr: Box::new(PlanExpression::Column {
4557 table: None,
4558 name: "text".to_string(),
4559 data_type: DataType::Text,
4560 }),
4561 pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
4562 "%world".to_string(),
4563 ))),
4564 negated: false,
4565 };
4566
4567 let result = evaluate_expression(&like_expr2, &row, &columns).unwrap();
4568 assert_eq!(result, Value::Boolean(true));
4569
4570 let not_like_expr = PlanExpression::Like {
4572 expr: Box::new(PlanExpression::Column {
4573 table: None,
4574 name: "text".to_string(),
4575 data_type: DataType::Text,
4576 }),
4577 pattern: Box::new(PlanExpression::Literal(PlanLiteral::String(
4578 "%foo%".to_string(),
4579 ))),
4580 negated: true,
4581 };
4582
4583 let result = evaluate_expression(¬_like_expr, &row, &columns).unwrap();
4584 assert_eq!(result, Value::Boolean(true));
4585 }
4586
4587 #[test]
4588 fn test_alter_table() {
4589 use crate::planner::{
4590 AlterTableNode, CreateColumnDef, CreateTableNode, PlanAlterOperation,
4591 };
4592
4593 let executor = Executor::new(ExecutionContext::new());
4594
4595 let create_plan = QueryPlan {
4597 root: PlanNode::CreateTable(CreateTableNode {
4598 table_name: "alter_test".to_string(),
4599 columns: vec![CreateColumnDef {
4600 name: "id".to_string(),
4601 data_type: DataType::Integer,
4602 nullable: false,
4603 default: None,
4604 primary_key: true,
4605 unique: false,
4606 }],
4607 constraints: vec![],
4608 if_not_exists: false,
4609 }),
4610 estimated_cost: 1.0,
4611 estimated_rows: 0,
4612 };
4613 executor.execute(&create_plan).unwrap();
4614
4615 let add_column_plan = QueryPlan {
4617 root: PlanNode::AlterTable(AlterTableNode {
4618 table_name: "alter_test".to_string(),
4619 operations: vec![PlanAlterOperation::AddColumn(CreateColumnDef {
4620 name: "name".to_string(),
4621 data_type: DataType::Text,
4622 nullable: true,
4623 default: None,
4624 primary_key: false,
4625 unique: false,
4626 })],
4627 }),
4628 estimated_cost: 1.0,
4629 estimated_rows: 0,
4630 };
4631 let result = executor.execute(&add_column_plan).unwrap();
4632 match &result.rows[0].values[0] {
4633 Value::String(s) => assert!(s.contains("altered")),
4634 _ => panic!("Expected string result"),
4635 }
4636
4637 let schema = executor
4639 .context
4640 .read()
4641 .unwrap()
4642 .get_table_schema("alter_test")
4643 .unwrap()
4644 .clone();
4645 assert_eq!(schema.columns.len(), 2);
4646 assert_eq!(schema.columns[1].name, "name");
4647
4648 let rename_column_plan = QueryPlan {
4650 root: PlanNode::AlterTable(AlterTableNode {
4651 table_name: "alter_test".to_string(),
4652 operations: vec![PlanAlterOperation::RenameColumn {
4653 old_name: "name".to_string(),
4654 new_name: "full_name".to_string(),
4655 }],
4656 }),
4657 estimated_cost: 1.0,
4658 estimated_rows: 0,
4659 };
4660 executor.execute(&rename_column_plan).unwrap();
4661
4662 let schema = executor
4664 .context
4665 .read()
4666 .unwrap()
4667 .get_table_schema("alter_test")
4668 .unwrap()
4669 .clone();
4670 assert_eq!(schema.columns[1].name, "full_name");
4671
4672 let drop_column_plan = QueryPlan {
4674 root: PlanNode::AlterTable(AlterTableNode {
4675 table_name: "alter_test".to_string(),
4676 operations: vec![PlanAlterOperation::DropColumn {
4677 name: "full_name".to_string(),
4678 if_exists: false,
4679 }],
4680 }),
4681 estimated_cost: 1.0,
4682 estimated_rows: 0,
4683 };
4684 executor.execute(&drop_column_plan).unwrap();
4685
4686 let schema = executor
4688 .context
4689 .read()
4690 .unwrap()
4691 .get_table_schema("alter_test")
4692 .unwrap()
4693 .clone();
4694 assert_eq!(schema.columns.len(), 1);
4695 }
4696
4697 #[test]
4698 fn test_alter_table_constraints() {
4699 use crate::planner::{
4700 AlterTableNode, CreateColumnDef, CreateTableConstraint, CreateTableNode,
4701 PlanAlterOperation,
4702 };
4703
4704 let executor = Executor::new(ExecutionContext::new());
4705
4706 let create_plan = QueryPlan {
4708 root: PlanNode::CreateTable(CreateTableNode {
4709 table_name: "constraint_test".to_string(),
4710 columns: vec![
4711 CreateColumnDef {
4712 name: "id".to_string(),
4713 data_type: DataType::Integer,
4714 nullable: false,
4715 default: None,
4716 primary_key: false,
4717 unique: false,
4718 },
4719 CreateColumnDef {
4720 name: "email".to_string(),
4721 data_type: DataType::Text,
4722 nullable: true,
4723 default: None,
4724 primary_key: false,
4725 unique: false,
4726 },
4727 ],
4728 constraints: vec![],
4729 if_not_exists: false,
4730 }),
4731 estimated_cost: 1.0,
4732 estimated_rows: 0,
4733 };
4734 executor.execute(&create_plan).unwrap();
4735
4736 let add_unique_plan = QueryPlan {
4738 root: PlanNode::AlterTable(AlterTableNode {
4739 table_name: "constraint_test".to_string(),
4740 operations: vec![PlanAlterOperation::AddConstraint(
4741 CreateTableConstraint::Unique {
4742 columns: vec!["email".to_string()],
4743 },
4744 )],
4745 }),
4746 estimated_cost: 1.0,
4747 estimated_rows: 0,
4748 };
4749 executor.execute(&add_unique_plan).unwrap();
4750
4751 let schema = executor
4753 .context
4754 .read()
4755 .unwrap()
4756 .get_table_schema("constraint_test")
4757 .unwrap()
4758 .clone();
4759 assert_eq!(schema.constraints.len(), 1);
4760 assert!(schema.constraints[0].name.contains("uq"));
4761
4762 let add_pk_plan = QueryPlan {
4764 root: PlanNode::AlterTable(AlterTableNode {
4765 table_name: "constraint_test".to_string(),
4766 operations: vec![PlanAlterOperation::AddConstraint(
4767 CreateTableConstraint::PrimaryKey {
4768 columns: vec!["id".to_string()],
4769 },
4770 )],
4771 }),
4772 estimated_cost: 1.0,
4773 estimated_rows: 0,
4774 };
4775 executor.execute(&add_pk_plan).unwrap();
4776
4777 let schema = executor
4779 .context
4780 .read()
4781 .unwrap()
4782 .get_table_schema("constraint_test")
4783 .unwrap()
4784 .clone();
4785 assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
4786 assert_eq!(schema.constraints.len(), 2);
4787
4788 let add_dup_pk_plan = QueryPlan {
4790 root: PlanNode::AlterTable(AlterTableNode {
4791 table_name: "constraint_test".to_string(),
4792 operations: vec![PlanAlterOperation::AddConstraint(
4793 CreateTableConstraint::PrimaryKey {
4794 columns: vec!["email".to_string()],
4795 },
4796 )],
4797 }),
4798 estimated_cost: 1.0,
4799 estimated_rows: 0,
4800 };
4801 let result = executor.execute(&add_dup_pk_plan);
4802 assert!(result.is_err());
4803
4804 let pk_constraint_name = {
4806 let schema = executor
4807 .context
4808 .read()
4809 .unwrap()
4810 .get_table_schema("constraint_test")
4811 .unwrap()
4812 .clone();
4813 schema
4814 .constraints
4815 .iter()
4816 .find(|c| matches!(c.constraint_type, StoredConstraintType::Unique { .. }))
4817 .map(|c| c.name.clone())
4818 .unwrap()
4819 };
4820
4821 let drop_constraint_plan = QueryPlan {
4822 root: PlanNode::AlterTable(AlterTableNode {
4823 table_name: "constraint_test".to_string(),
4824 operations: vec![PlanAlterOperation::DropConstraint {
4825 name: pk_constraint_name,
4826 }],
4827 }),
4828 estimated_cost: 1.0,
4829 estimated_rows: 0,
4830 };
4831 executor.execute(&drop_constraint_plan).unwrap();
4832
4833 let schema = executor
4835 .context
4836 .read()
4837 .unwrap()
4838 .get_table_schema("constraint_test")
4839 .unwrap()
4840 .clone();
4841 assert_eq!(schema.constraints.len(), 1);
4842 assert!(matches!(
4844 schema.constraints[0].constraint_type,
4845 StoredConstraintType::PrimaryKey { .. }
4846 ));
4847 }
4848
4849 #[test]
4850 fn test_create_table_with_constraints() {
4851 use crate::planner::{CreateColumnDef, CreateTableConstraint, CreateTableNode};
4852
4853 let executor = Executor::new(ExecutionContext::new());
4854
4855 let create_plan = QueryPlan {
4857 root: PlanNode::CreateTable(CreateTableNode {
4858 table_name: "users".to_string(),
4859 columns: vec![
4860 CreateColumnDef {
4861 name: "id".to_string(),
4862 data_type: DataType::Integer,
4863 nullable: false,
4864 default: None,
4865 primary_key: true,
4866 unique: false,
4867 },
4868 CreateColumnDef {
4869 name: "email".to_string(),
4870 data_type: DataType::Text,
4871 nullable: false,
4872 default: None,
4873 primary_key: false,
4874 unique: true, },
4876 CreateColumnDef {
4877 name: "name".to_string(),
4878 data_type: DataType::Text,
4879 nullable: true,
4880 default: None,
4881 primary_key: false,
4882 unique: false,
4883 },
4884 ],
4885 constraints: vec![CreateTableConstraint::Unique {
4886 columns: vec!["name".to_string()],
4887 }],
4888 if_not_exists: false,
4889 }),
4890 estimated_cost: 1.0,
4891 estimated_rows: 0,
4892 };
4893 executor.execute(&create_plan).unwrap();
4894
4895 let schema = executor
4897 .context
4898 .read()
4899 .unwrap()
4900 .get_table_schema("users")
4901 .unwrap()
4902 .clone();
4903 assert_eq!(schema.primary_key, Some(vec!["id".to_string()]));
4904 assert!(schema.constraints.len() >= 2);
4906 }
4907
4908 #[test]
4909 fn test_column_default_values() {
4910 use crate::planner::{
4911 CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource, PlanExpression,
4912 PlanLiteral,
4913 };
4914
4915 let executor = Executor::new(ExecutionContext::new());
4916
4917 let create_plan = QueryPlan {
4919 root: PlanNode::CreateTable(CreateTableNode {
4920 table_name: "defaults_test".to_string(),
4921 columns: vec![
4922 CreateColumnDef {
4923 name: "id".to_string(),
4924 data_type: DataType::Integer,
4925 nullable: false,
4926 default: None,
4927 primary_key: true,
4928 unique: false,
4929 },
4930 CreateColumnDef {
4931 name: "status".to_string(),
4932 data_type: DataType::Text,
4933 nullable: false,
4934 default: Some(PlanExpression::Literal(PlanLiteral::String(
4935 "active".to_string(),
4936 ))),
4937 primary_key: false,
4938 unique: false,
4939 },
4940 CreateColumnDef {
4941 name: "count".to_string(),
4942 data_type: DataType::Integer,
4943 nullable: true,
4944 default: Some(PlanExpression::Literal(PlanLiteral::Integer(0))),
4945 primary_key: false,
4946 unique: false,
4947 },
4948 ],
4949 constraints: vec![],
4950 if_not_exists: false,
4951 }),
4952 estimated_cost: 1.0,
4953 estimated_rows: 0,
4954 };
4955 executor.execute(&create_plan).unwrap();
4956
4957 let schema = executor
4959 .context
4960 .read()
4961 .unwrap()
4962 .get_table_schema("defaults_test")
4963 .unwrap()
4964 .clone();
4965 assert_eq!(
4966 schema.columns[1].default,
4967 Some(Value::String("active".to_string()))
4968 );
4969 assert_eq!(schema.columns[2].default, Some(Value::Integer(0)));
4970
4971 let insert_plan = QueryPlan {
4973 root: PlanNode::Insert(InsertNode {
4974 table_name: "defaults_test".to_string(),
4975 columns: vec!["id".to_string()],
4976 source: InsertPlanSource::Values(vec![vec![PlanExpression::Literal(
4977 PlanLiteral::Integer(1),
4978 )]]),
4979 }),
4980 estimated_cost: 1.0,
4981 estimated_rows: 1,
4982 };
4983 executor.execute(&insert_plan).unwrap();
4984
4985 let context = executor.context.read().unwrap();
4987 let table = context.get_table("defaults_test").unwrap();
4988 let table_data = table.read().unwrap();
4989
4990 assert_eq!(table_data.rows.len(), 1);
4991 assert_eq!(table_data.rows[0].values[0], Value::Integer(1)); assert_eq!(
4993 table_data.rows[0].values[1],
4994 Value::String("active".to_string())
4995 ); assert_eq!(table_data.rows[0].values[2], Value::Integer(0)); }
4998
4999 #[test]
5000 fn test_alter_column_default() {
5001 use crate::planner::{
5002 AlterTableNode, CreateColumnDef, CreateTableNode, PlanAlterOperation, PlanExpression,
5003 PlanLiteral,
5004 };
5005
5006 let executor = Executor::new(ExecutionContext::new());
5007
5008 let create_plan = QueryPlan {
5010 root: PlanNode::CreateTable(CreateTableNode {
5011 table_name: "alter_default_test".to_string(),
5012 columns: vec![
5013 CreateColumnDef {
5014 name: "id".to_string(),
5015 data_type: DataType::Integer,
5016 nullable: false,
5017 default: None,
5018 primary_key: true,
5019 unique: false,
5020 },
5021 CreateColumnDef {
5022 name: "value".to_string(),
5023 data_type: DataType::Integer,
5024 nullable: true,
5025 default: None,
5026 primary_key: false,
5027 unique: false,
5028 },
5029 ],
5030 constraints: vec![],
5031 if_not_exists: false,
5032 }),
5033 estimated_cost: 1.0,
5034 estimated_rows: 0,
5035 };
5036 executor.execute(&create_plan).unwrap();
5037
5038 let alter_plan = QueryPlan {
5040 root: PlanNode::AlterTable(AlterTableNode {
5041 table_name: "alter_default_test".to_string(),
5042 operations: vec![PlanAlterOperation::AlterColumn {
5043 name: "value".to_string(),
5044 data_type: None,
5045 set_not_null: None,
5046 set_default: Some(Some(PlanExpression::Literal(PlanLiteral::Integer(42)))),
5047 }],
5048 }),
5049 estimated_cost: 1.0,
5050 estimated_rows: 0,
5051 };
5052 executor.execute(&alter_plan).unwrap();
5053
5054 let schema = executor
5056 .context
5057 .read()
5058 .unwrap()
5059 .get_table_schema("alter_default_test")
5060 .unwrap()
5061 .clone();
5062 assert_eq!(schema.columns[1].default, Some(Value::Integer(42)));
5063
5064 let drop_default_plan = QueryPlan {
5066 root: PlanNode::AlterTable(AlterTableNode {
5067 table_name: "alter_default_test".to_string(),
5068 operations: vec![PlanAlterOperation::AlterColumn {
5069 name: "value".to_string(),
5070 data_type: None,
5071 set_not_null: None,
5072 set_default: Some(None), }],
5074 }),
5075 estimated_cost: 1.0,
5076 estimated_rows: 0,
5077 };
5078 executor.execute(&drop_default_plan).unwrap();
5079
5080 let schema = executor
5082 .context
5083 .read()
5084 .unwrap()
5085 .get_table_schema("alter_default_test")
5086 .unwrap()
5087 .clone();
5088 assert_eq!(schema.columns[1].default, None);
5089 }
5090
5091 #[test]
5092 fn test_parameterized_query() {
5093 use crate::planner::{
5094 CreateColumnDef, CreateTableNode, FilterNode, InsertNode, InsertPlanSource,
5095 };
5096
5097 let executor = Executor::new(ExecutionContext::new());
5098
5099 let create_plan = QueryPlan {
5101 root: PlanNode::CreateTable(CreateTableNode {
5102 table_name: "params_test".to_string(),
5103 columns: vec![
5104 CreateColumnDef {
5105 name: "id".to_string(),
5106 data_type: DataType::Integer,
5107 nullable: false,
5108 default: None,
5109 primary_key: true,
5110 unique: false,
5111 },
5112 CreateColumnDef {
5113 name: "name".to_string(),
5114 data_type: DataType::Text,
5115 nullable: true,
5116 default: None,
5117 primary_key: false,
5118 unique: false,
5119 },
5120 ],
5121 constraints: vec![],
5122 if_not_exists: false,
5123 }),
5124 estimated_cost: 1.0,
5125 estimated_rows: 0,
5126 };
5127 executor.execute(&create_plan).unwrap();
5128
5129 let insert_plan = QueryPlan {
5131 root: PlanNode::Insert(InsertNode {
5132 table_name: "params_test".to_string(),
5133 columns: vec!["id".to_string(), "name".to_string()],
5134 source: InsertPlanSource::Values(vec![
5135 vec![
5136 PlanExpression::Literal(PlanLiteral::Integer(1)),
5137 PlanExpression::Literal(PlanLiteral::String("Alice".to_string())),
5138 ],
5139 vec![
5140 PlanExpression::Literal(PlanLiteral::Integer(2)),
5141 PlanExpression::Literal(PlanLiteral::String("Bob".to_string())),
5142 ],
5143 vec![
5144 PlanExpression::Literal(PlanLiteral::Integer(3)),
5145 PlanExpression::Literal(PlanLiteral::String("Charlie".to_string())),
5146 ],
5147 ]),
5148 }),
5149 estimated_cost: 1.0,
5150 estimated_rows: 3,
5151 };
5152 executor.execute(&insert_plan).unwrap();
5153
5154 let query_plan = QueryPlan {
5156 root: PlanNode::Filter(FilterNode {
5157 input: Box::new(PlanNode::Scan(ScanNode {
5158 table_name: "params_test".to_string(),
5159 alias: None,
5160 columns: vec!["id".to_string(), "name".to_string()],
5161 index_scan: None,
5162 })),
5163 predicate: PlanExpression::BinaryOp {
5164 left: Box::new(PlanExpression::Column {
5165 table: None,
5166 name: "id".to_string(),
5167 data_type: DataType::Integer,
5168 }),
5169 op: PlanBinaryOp::Equal,
5170 right: Box::new(PlanExpression::Placeholder(1)),
5171 },
5172 }),
5173 estimated_cost: 10.0,
5174 estimated_rows: 1,
5175 };
5176
5177 let result = executor
5179 .execute_with_params(&query_plan, &[Value::Integer(2)])
5180 .unwrap();
5181 assert_eq!(result.rows.len(), 1);
5182 assert_eq!(result.rows[0].values[0], Value::Integer(2));
5183 assert_eq!(result.rows[0].values[1], Value::String("Bob".to_string()));
5184
5185 let result = executor
5187 .execute_with_params(&query_plan, &[Value::Integer(1)])
5188 .unwrap();
5189 assert_eq!(result.rows.len(), 1);
5190 assert_eq!(result.rows[0].values[1], Value::String("Alice".to_string()));
5191
5192 let result = executor
5194 .execute_with_params(&query_plan, &[Value::Integer(99)])
5195 .unwrap();
5196 assert_eq!(result.rows.len(), 0);
5197 }
5198
5199 #[test]
5200 fn test_parameterized_insert() {
5201 use crate::planner::{CreateColumnDef, CreateTableNode, InsertNode, InsertPlanSource};
5202
5203 let executor = Executor::new(ExecutionContext::new());
5204
5205 let create_plan = QueryPlan {
5207 root: PlanNode::CreateTable(CreateTableNode {
5208 table_name: "param_insert_test".to_string(),
5209 columns: vec![
5210 CreateColumnDef {
5211 name: "id".to_string(),
5212 data_type: DataType::Integer,
5213 nullable: false,
5214 default: None,
5215 primary_key: true,
5216 unique: false,
5217 },
5218 CreateColumnDef {
5219 name: "value".to_string(),
5220 data_type: DataType::Text,
5221 nullable: true,
5222 default: None,
5223 primary_key: false,
5224 unique: false,
5225 },
5226 ],
5227 constraints: vec![],
5228 if_not_exists: false,
5229 }),
5230 estimated_cost: 1.0,
5231 estimated_rows: 0,
5232 };
5233 executor.execute(&create_plan).unwrap();
5234
5235 let insert_plan = QueryPlan {
5237 root: PlanNode::Insert(InsertNode {
5238 table_name: "param_insert_test".to_string(),
5239 columns: vec!["id".to_string(), "value".to_string()],
5240 source: InsertPlanSource::Values(vec![vec![
5241 PlanExpression::Placeholder(1),
5242 PlanExpression::Placeholder(2),
5243 ]]),
5244 }),
5245 estimated_cost: 1.0,
5246 estimated_rows: 1,
5247 };
5248
5249 executor
5251 .execute_with_params(
5252 &insert_plan,
5253 &[Value::Integer(42), Value::String("test value".to_string())],
5254 )
5255 .unwrap();
5256
5257 let context = executor.context.read().unwrap();
5259 let table = context.get_table("param_insert_test").unwrap();
5260 let table_data = table.read().unwrap();
5261
5262 assert_eq!(table_data.rows.len(), 1);
5263 assert_eq!(table_data.rows[0].values[0], Value::Integer(42));
5264 assert_eq!(
5265 table_data.rows[0].values[1],
5266 Value::String("test value".to_string())
5267 );
5268 }
5269
5270 fn create_set_op_context() -> ExecutionContext {
5276 let mut context = ExecutionContext::new();
5277
5278 context.add_table(TableData {
5279 name: "t1".to_string(),
5280 columns: vec!["id".to_string(), "val".to_string()],
5281 rows: vec![
5282 Row {
5283 values: vec![Value::Integer(1), Value::String("a".to_string())],
5284 },
5285 Row {
5286 values: vec![Value::Integer(2), Value::String("b".to_string())],
5287 },
5288 Row {
5289 values: vec![Value::Integer(3), Value::String("c".to_string())],
5290 },
5291 ],
5292 row_created_version: vec![0, 0, 0],
5293 row_deleted_version: vec![0, 0, 0],
5294 });
5295
5296 context.add_table(TableData {
5297 name: "t2".to_string(),
5298 columns: vec!["id".to_string(), "val".to_string()],
5299 rows: vec![
5300 Row {
5301 values: vec![Value::Integer(2), Value::String("b".to_string())],
5302 },
5303 Row {
5304 values: vec![Value::Integer(3), Value::String("c".to_string())],
5305 },
5306 Row {
5307 values: vec![Value::Integer(4), Value::String("d".to_string())],
5308 },
5309 ],
5310 row_created_version: vec![0, 0, 0],
5311 row_deleted_version: vec![0, 0, 0],
5312 });
5313
5314 context
5315 }
5316
5317 fn make_set_op_plan(op: SetOperationType) -> QueryPlan {
5319 use crate::planner::SetOperationNode;
5320
5321 let left = PlanNode::Project(ProjectNode {
5322 input: Box::new(PlanNode::Scan(ScanNode {
5323 table_name: "t1".to_string(),
5324 alias: None,
5325 columns: vec!["id".to_string(), "val".to_string()],
5326 index_scan: None,
5327 })),
5328 expressions: vec![
5329 ProjectionExpr {
5330 expr: PlanExpression::Column {
5331 table: None,
5332 name: "id".to_string(),
5333 data_type: DataType::Integer,
5334 },
5335 alias: Some("id".to_string()),
5336 },
5337 ProjectionExpr {
5338 expr: PlanExpression::Column {
5339 table: None,
5340 name: "val".to_string(),
5341 data_type: DataType::Text,
5342 },
5343 alias: Some("val".to_string()),
5344 },
5345 ],
5346 });
5347
5348 let right = PlanNode::Project(ProjectNode {
5349 input: Box::new(PlanNode::Scan(ScanNode {
5350 table_name: "t2".to_string(),
5351 alias: None,
5352 columns: vec!["id".to_string(), "val".to_string()],
5353 index_scan: None,
5354 })),
5355 expressions: vec![
5356 ProjectionExpr {
5357 expr: PlanExpression::Column {
5358 table: None,
5359 name: "id".to_string(),
5360 data_type: DataType::Integer,
5361 },
5362 alias: Some("id".to_string()),
5363 },
5364 ProjectionExpr {
5365 expr: PlanExpression::Column {
5366 table: None,
5367 name: "val".to_string(),
5368 data_type: DataType::Text,
5369 },
5370 alias: Some("val".to_string()),
5371 },
5372 ],
5373 });
5374
5375 QueryPlan {
5376 root: PlanNode::SetOperation(SetOperationNode {
5377 op,
5378 left: Box::new(left),
5379 right: Box::new(right),
5380 }),
5381 estimated_cost: 10.0,
5382 estimated_rows: 6,
5383 }
5384 }
5385
5386 #[test]
5387 fn test_union_all() {
5388 let context = create_set_op_context();
5389 let executor = Executor::new(context);
5390 let plan = make_set_op_plan(crate::ast::SetOperationType::UnionAll);
5391
5392 let result = executor.execute(&plan).unwrap();
5393
5394 assert_eq!(result.rows.len(), 6);
5396 assert_eq!(result.columns, vec!["id".to_string(), "val".to_string()]);
5397 }
5398
5399 #[test]
5400 fn test_union() {
5401 let context = create_set_op_context();
5402 let executor = Executor::new(context);
5403 let plan = make_set_op_plan(crate::ast::SetOperationType::Union);
5404
5405 let result = executor.execute(&plan).unwrap();
5406
5407 assert_eq!(result.rows.len(), 4);
5409 let mut ids: Vec<i64> = result
5411 .rows
5412 .iter()
5413 .map(|r| match &r.values[0] {
5414 Value::Integer(i) => *i,
5415 _ => panic!("Expected int"),
5416 })
5417 .collect();
5418 ids.sort();
5419 assert_eq!(ids, vec![1, 2, 3, 4]);
5420 }
5421
5422 #[test]
5423 fn test_intersect() {
5424 let context = create_set_op_context();
5425 let executor = Executor::new(context);
5426 let plan = make_set_op_plan(crate::ast::SetOperationType::Intersect);
5427
5428 let result = executor.execute(&plan).unwrap();
5429
5430 assert_eq!(result.rows.len(), 2);
5432 let mut ids: Vec<i64> = result
5433 .rows
5434 .iter()
5435 .map(|r| match &r.values[0] {
5436 Value::Integer(i) => *i,
5437 _ => panic!("Expected int"),
5438 })
5439 .collect();
5440 ids.sort();
5441 assert_eq!(ids, vec![2, 3]);
5442 }
5443
5444 #[test]
5445 fn test_except() {
5446 let context = create_set_op_context();
5447 let executor = Executor::new(context);
5448 let plan = make_set_op_plan(crate::ast::SetOperationType::Except);
5449
5450 let result = executor.execute(&plan).unwrap();
5451
5452 assert_eq!(result.rows.len(), 1);
5454 assert_eq!(result.rows[0].values[0], Value::Integer(1));
5455 assert_eq!(result.rows[0].values[1], Value::String("a".to_string()));
5456 }
5457}