alopex_sql/executor/
mod.rs

1//! SQL Executor module for Alopex SQL.
2//!
3//! This module provides the execution engine for SQL statements.
4//!
5//! # Overview
6//!
7//! The Executor takes a [`LogicalPlan`] from the Planner and executes it
8//! against the storage layer. It supports DDL, DML, and Query operations.
9//!
10//! Query execution currently materializes intermediate results per stage;
11//! future versions may add streaming pipelines as requirements grow.
12
13//! # Components
14//!
15//! - [`Executor`]: Main executor struct
16//! - [`ExecutorError`]: Error types for execution
17//! - [`ExecutionResult`]: Execution result types
18//!
19//! # Example
20//!
21//! ```ignore
22//! use std::sync::{Arc, RwLock};
23//! use alopex_core::kv::memory::MemoryKV;
24//! use alopex_sql::executor::Executor;
25//! use alopex_sql::catalog::MemoryCatalog;
26//! use alopex_sql::planner::LogicalPlan;
27//!
28//! // Create storage and catalog
29//! let store = Arc::new(MemoryKV::new());
30//! let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
31//!
32//! // Create executor
33//! let mut executor = Executor::new(store, catalog);
34//!
35//! // Execute a plan
36//! let result = executor.execute(plan)?;
37//! ```
38
39pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use result::{ColumnInfo, ExecutionResult, QueryResult, Row};
50
51use std::sync::{Arc, RwLock};
52
53use alopex_core::kv::KVStore;
54use alopex_core::types::TxnMode;
55
56use crate::catalog::Catalog;
57use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
58use crate::planner::LogicalPlan;
59use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
60
61/// SQL statement executor.
62///
63/// The Executor takes a [`LogicalPlan`] and executes it against the storage layer.
64/// It manages transactions and coordinates between DDL, DML, and Query operations.
65///
66/// # Type Parameters
67///
68/// - `S`: The underlying KV store type (must implement [`KVStore`])
69/// - `C`: The catalog type (must implement [`Catalog`])
70pub struct Executor<S: KVStore, C: Catalog> {
71    /// Transaction bridge for storage operations.
72    bridge: TxnBridge<S>,
73
74    /// Catalog for metadata operations.
75    catalog: Arc<RwLock<C>>,
76}
77
78impl<S: KVStore, C: Catalog> Executor<S, C> {
79    fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
80    where
81        F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
82    {
83        let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
84        match f(&mut txn) {
85            Ok(result) => {
86                txn.commit().map_err(ExecutorError::from)?;
87                Ok(result)
88            }
89            Err(err) => {
90                txn.rollback().map_err(ExecutorError::from)?;
91                Err(err)
92            }
93        }
94    }
95
96    /// Create a new Executor with the given store and catalog.
97    ///
98    /// # Arguments
99    ///
100    /// - `store`: The underlying KV store
101    /// - `catalog`: The catalog for metadata operations
102    pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
103        Self {
104            bridge: TxnBridge::new(store),
105            catalog,
106        }
107    }
108
109    /// Execute a logical plan and return the result.
110    ///
111    /// # Arguments
112    ///
113    /// - `plan`: The logical plan to execute
114    ///
115    /// # Returns
116    ///
117    /// Returns an [`ExecutionResult`] on success, or an [`ExecutorError`] on failure.
118    ///
119    /// # DDL Operations
120    ///
121    /// - `CreateTable`: Creates a new table with optional PK index
122    /// - `DropTable`: Drops a table and its associated indexes
123    /// - `CreateIndex`: Creates a new index
124    /// - `DropIndex`: Drops an index
125    ///
126    /// # DML Operations
127    ///
128    /// - `Insert`: Inserts rows into a table
129    /// - `Update`: Updates rows in a table
130    /// - `Delete`: Deletes rows from a table
131    ///
132    /// # Query Operations
133    ///
134    /// - `Scan`, `Filter`, `Sort`, `Limit`: SELECT query execution
135    pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
136        match plan {
137            // DDL Operations
138            LogicalPlan::CreateTable {
139                table,
140                if_not_exists,
141                with_options,
142            } => self.execute_create_table(table, with_options, if_not_exists),
143            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
144            LogicalPlan::CreateIndex {
145                index,
146                if_not_exists,
147            } => self.execute_create_index(index, if_not_exists),
148            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
149
150            // DML Operations
151            LogicalPlan::Insert {
152                table,
153                columns,
154                values,
155            } => self.execute_insert(&table, columns, values),
156            LogicalPlan::Update {
157                table,
158                assignments,
159                filter,
160            } => self.execute_update(&table, assignments, filter),
161            LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
162
163            // Query Operations
164            LogicalPlan::Scan { .. }
165            | LogicalPlan::Filter { .. }
166            | LogicalPlan::Sort { .. }
167            | LogicalPlan::Limit { .. } => self.execute_query(plan),
168        }
169    }
170
171    // ========================================================================
172    // DDL Operations (to be implemented in Phase 2)
173    // ========================================================================
174
175    fn execute_create_table(
176        &mut self,
177        table: crate::catalog::TableMetadata,
178        with_options: Vec<(String, String)>,
179        if_not_exists: bool,
180    ) -> Result<ExecutionResult> {
181        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
182        self.run_in_write_txn(|txn| {
183            ddl::create_table::execute_create_table(
184                txn,
185                &mut *catalog,
186                table,
187                with_options,
188                if_not_exists,
189            )
190        })
191    }
192
193    fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
194        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
195        self.run_in_write_txn(|txn| {
196            ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
197        })
198    }
199
200    fn execute_create_index(
201        &mut self,
202        index: crate::catalog::IndexMetadata,
203        if_not_exists: bool,
204    ) -> Result<ExecutionResult> {
205        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
206        self.run_in_write_txn(|txn| {
207            ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
208        })
209    }
210
211    fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
212        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
213        self.run_in_write_txn(|txn| {
214            ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
215        })
216    }
217
218    // ========================================================================
219    // DML Operations (implemented in Phase 4)
220    // ========================================================================
221
222    fn execute_insert(
223        &mut self,
224        table: &str,
225        columns: Vec<String>,
226        values: Vec<Vec<crate::planner::TypedExpr>>,
227    ) -> Result<ExecutionResult> {
228        let catalog = self.catalog.read().expect("catalog lock poisoned");
229        self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
230    }
231
232    fn execute_update(
233        &mut self,
234        table: &str,
235        assignments: Vec<crate::planner::TypedAssignment>,
236        filter: Option<crate::planner::TypedExpr>,
237    ) -> Result<ExecutionResult> {
238        let catalog = self.catalog.read().expect("catalog lock poisoned");
239        self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
240    }
241
242    fn execute_delete(
243        &mut self,
244        table: &str,
245        filter: Option<crate::planner::TypedExpr>,
246    ) -> Result<ExecutionResult> {
247        let catalog = self.catalog.read().expect("catalog lock poisoned");
248        self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
249    }
250
251    // ========================================================================
252    // Query Operations (to be implemented in Phase 5)
253    // ========================================================================
254
255    fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
256        let catalog = self.catalog.read().expect("catalog lock poisoned");
257        self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
258    }
259}
260
261impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
262    pub fn execute_in_txn<'a, 'b, 'c>(
263        &mut self,
264        plan: LogicalPlan,
265        txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
266    ) -> Result<ExecutionResult> {
267        if txn.mode() == TxnMode::ReadOnly
268            && !matches!(
269                plan,
270                LogicalPlan::Scan { .. }
271                    | LogicalPlan::Filter { .. }
272                    | LogicalPlan::Sort { .. }
273                    | LogicalPlan::Limit { .. }
274            )
275        {
276            return Err(ExecutorError::ReadOnlyTransaction {
277                operation: plan.operation_name().to_string(),
278            });
279        }
280
281        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
282        let (mut sql_txn, overlay) = txn.split_parts();
283
284        let result = match plan {
285            LogicalPlan::CreateTable {
286                table,
287                if_not_exists,
288                with_options,
289            } => self.execute_create_table_in_txn(
290                &mut *catalog,
291                &mut sql_txn,
292                overlay,
293                table,
294                with_options,
295                if_not_exists,
296            ),
297            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
298                &mut *catalog,
299                &mut sql_txn,
300                overlay,
301                &name,
302                if_exists,
303            ),
304            LogicalPlan::CreateIndex {
305                index,
306                if_not_exists,
307            } => self.execute_create_index_in_txn(
308                &mut *catalog,
309                &mut sql_txn,
310                overlay,
311                index,
312                if_not_exists,
313            ),
314            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
315                &mut *catalog,
316                &mut sql_txn,
317                overlay,
318                &name,
319                if_exists,
320            ),
321            LogicalPlan::Insert {
322                table,
323                columns,
324                values,
325            } => {
326                let view = TxnCatalogView::new(&*catalog, &*overlay);
327                dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
328            }
329            LogicalPlan::Update {
330                table,
331                assignments,
332                filter,
333            } => {
334                let view = TxnCatalogView::new(&*catalog, &*overlay);
335                dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
336            }
337            LogicalPlan::Delete { table, filter } => {
338                let view = TxnCatalogView::new(&*catalog, &*overlay);
339                dml::execute_delete(&mut sql_txn, &view, &table, filter)
340            }
341            LogicalPlan::Scan { .. }
342            | LogicalPlan::Filter { .. }
343            | LogicalPlan::Sort { .. }
344            | LogicalPlan::Limit { .. } => {
345                let view = TxnCatalogView::new(&*catalog, &*overlay);
346                query::execute_query(&mut sql_txn, &view, plan)
347            }
348        };
349
350        match result {
351            Ok(value) => {
352                sql_txn.flush_hnsw()?;
353                Ok(value)
354            }
355            Err(err) => {
356                let _ = sql_txn.abandon_hnsw();
357                Err(err)
358            }
359        }
360    }
361
362    fn map_catalog_error(err: CatalogError) -> ExecutorError {
363        match err {
364            CatalogError::Kv(e) => ExecutorError::Core(e),
365            CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
366                operation: "CatalogPersistence".into(),
367                reason: e.to_string(),
368            },
369            CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
370                operation: "CatalogPersistence".into(),
371                reason,
372            },
373        }
374    }
375
376    fn execute_create_table_in_txn<'txn>(
377        &self,
378        catalog: &mut PersistentCatalog<S>,
379        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
380        overlay: &mut CatalogOverlay,
381        mut table: crate::catalog::TableMetadata,
382        with_options: Vec<(String, String)>,
383        if_not_exists: bool,
384    ) -> Result<ExecutionResult>
385    where
386        S: 'txn,
387    {
388        if catalog.table_exists_in_txn(&table.name, overlay) {
389            return if if_not_exists {
390                Ok(ExecutionResult::Success)
391            } else {
392                Err(ExecutorError::TableAlreadyExists(table.name))
393            };
394        }
395
396        table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
397
398        let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
399            let column_indices = pk_columns
400                .iter()
401                .map(|name| {
402                    table
403                        .get_column_index(name)
404                        .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
405                })
406                .collect::<Result<Vec<_>>>()?;
407            let index_id = catalog.next_index_id();
408            let index_name = ddl::create_pk_index_name(&table.name);
409            Some(
410                crate::catalog::IndexMetadata::new(
411                    index_id,
412                    index_name,
413                    table.name.clone(),
414                    pk_columns,
415                )
416                .with_column_indices(column_indices)
417                .with_unique(true),
418            )
419        } else {
420            None
421        };
422
423        let table_id = catalog.next_table_id();
424        table = table.with_table_id(table_id);
425
426        // storage keyspace の初期化
427        txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
428        txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
429
430        // 永続化(同一 KV トランザクション内)
431        catalog
432            .persist_create_table(txn.inner_mut(), &table)
433            .map_err(Self::map_catalog_error)?;
434        if let Some(index) = &pk_index {
435            catalog
436                .persist_create_index(txn.inner_mut(), index)
437                .map_err(Self::map_catalog_error)?;
438        }
439
440        // オーバーレイに反映(ベースカタログはコミットまで不変)
441        overlay.add_table(table);
442        if let Some(index) = pk_index {
443            overlay.add_index(index);
444        }
445
446        Ok(ExecutionResult::Success)
447    }
448
449    fn execute_drop_table_in_txn<'txn>(
450        &self,
451        catalog: &mut PersistentCatalog<S>,
452        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
453        overlay: &mut CatalogOverlay,
454        table_name: &str,
455        if_exists: bool,
456    ) -> Result<ExecutionResult>
457    where
458        S: 'txn,
459    {
460        let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
461            Some(table) => table.clone(),
462            None => {
463                return if if_exists {
464                    Ok(ExecutionResult::Success)
465                } else {
466                    Err(ExecutorError::TableNotFound(table_name.to_string()))
467                };
468            }
469        };
470
471        let indexes = TxnCatalogView::new(catalog, overlay)
472            .get_indexes_for_table(table_name)
473            .into_iter()
474            .cloned()
475            .collect::<Vec<_>>();
476
477        for index in &indexes {
478            if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
479                crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
480            } else {
481                txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
482            }
483        }
484
485        txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
486        txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
487
488        catalog
489            .persist_drop_table(txn.inner_mut(), table_name)
490            .map_err(Self::map_catalog_error)?;
491
492        overlay.drop_table(table_name);
493
494        Ok(ExecutionResult::Success)
495    }
496
497    fn execute_create_index_in_txn<'txn>(
498        &self,
499        catalog: &mut PersistentCatalog<S>,
500        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
501        overlay: &mut CatalogOverlay,
502        mut index: crate::catalog::IndexMetadata,
503        if_not_exists: bool,
504    ) -> Result<ExecutionResult>
505    where
506        S: 'txn,
507    {
508        if ddl::is_implicit_pk_index(&index.name) {
509            return Err(ExecutorError::InvalidIndexName {
510                name: index.name.clone(),
511                reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
512            });
513        }
514
515        if catalog.index_exists_in_txn(&index.name, overlay) {
516            return if if_not_exists {
517                Ok(ExecutionResult::Success)
518            } else {
519                Err(ExecutorError::IndexAlreadyExists(index.name))
520            };
521        }
522
523        let table = catalog
524            .get_table_in_txn(&index.table, overlay)
525            .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
526            .clone();
527
528        let column_indices = index
529            .columns
530            .iter()
531            .map(|name| {
532                table
533                    .get_column_index(name)
534                    .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
535            })
536            .collect::<Result<Vec<_>>>()?;
537
538        let index_id = catalog.next_index_id();
539        index.index_id = index_id;
540        index.column_indices = column_indices.clone();
541
542        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
543            crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
544        } else {
545            ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
546        }
547
548        catalog
549            .persist_create_index(txn.inner_mut(), &index)
550            .map_err(Self::map_catalog_error)?;
551
552        overlay.add_index(index);
553
554        Ok(ExecutionResult::Success)
555    }
556
557    fn execute_drop_index_in_txn<'txn>(
558        &self,
559        catalog: &mut PersistentCatalog<S>,
560        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
561        overlay: &mut CatalogOverlay,
562        index_name: &str,
563        if_exists: bool,
564    ) -> Result<ExecutionResult>
565    where
566        S: 'txn,
567    {
568        if ddl::is_implicit_pk_index(index_name) {
569            return Err(ExecutorError::InvalidOperation {
570                operation: "DROP INDEX".into(),
571                reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
572            });
573        }
574
575        let index = match catalog.get_index_in_txn(index_name, overlay) {
576            Some(index) => index.clone(),
577            None => {
578                return if if_exists {
579                    Ok(ExecutionResult::Success)
580                } else {
581                    Err(ExecutorError::IndexNotFound(index_name.to_string()))
582                };
583            }
584        };
585
586        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
587            crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
588        } else {
589            txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
590        }
591
592        catalog
593            .persist_drop_index(txn.inner_mut(), index_name)
594            .map_err(Self::map_catalog_error)?;
595
596        overlay.drop_index(index_name);
597
598        Ok(ExecutionResult::Success)
599    }
600}
601
602#[cfg(test)]
603mod tests {
604    use super::*;
605    use crate::catalog::MemoryCatalog;
606    use alopex_core::kv::memory::MemoryKV;
607
608    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
609        let store = Arc::new(MemoryKV::new());
610        let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
611        Executor::new(store, catalog)
612    }
613
614    #[test]
615    fn test_executor_creation() {
616        let _executor = create_executor();
617        // Executor should be created without panic
618    }
619
620    #[test]
621    fn create_table_is_supported() {
622        let mut executor = create_executor();
623
624        use crate::catalog::{ColumnMetadata, TableMetadata};
625        use crate::planner::ResolvedType;
626
627        let table = TableMetadata::new(
628            "test",
629            vec![ColumnMetadata::new("id", ResolvedType::Integer)],
630        );
631
632        let result = executor.execute(LogicalPlan::CreateTable {
633            table,
634            if_not_exists: false,
635            with_options: vec![],
636        });
637        assert!(matches!(result, Ok(ExecutionResult::Success)));
638
639        let catalog = executor.catalog.read().unwrap();
640        assert!(catalog.table_exists("test"));
641    }
642
643    #[test]
644    fn insert_is_supported() {
645        use crate::Span;
646        use crate::catalog::{ColumnMetadata, TableMetadata};
647        use crate::planner::typed_expr::TypedExprKind;
648        use crate::planner::types::ResolvedType;
649
650        let mut executor = create_executor();
651
652        let table = TableMetadata::new("t", vec![ColumnMetadata::new("id", ResolvedType::Integer)])
653            .with_primary_key(vec!["id".into()]);
654
655        executor
656            .execute(LogicalPlan::CreateTable {
657                table,
658                if_not_exists: false,
659                with_options: vec![],
660            })
661            .unwrap();
662
663        let result = executor.execute(LogicalPlan::Insert {
664            table: "t".into(),
665            columns: vec!["id".into()],
666            values: vec![vec![crate::planner::typed_expr::TypedExpr {
667                kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
668                resolved_type: ResolvedType::Integer,
669                span: Span::default(),
670            }]],
671        });
672        assert!(matches!(result, Ok(ExecutionResult::RowsAffected(1))));
673    }
674}