alopex_sql/executor/
mod.rs

1//! SQL Executor module for Alopex SQL.
2//!
3//! This module provides the execution engine for SQL statements.
4//!
5//! # Overview
6//!
7//! The Executor takes a [`LogicalPlan`] from the Planner and executes it
8//! against the storage layer. It supports DDL, DML, and Query operations.
9//!
10//! Query execution currently materializes intermediate results per stage;
11//! future versions may add streaming pipelines as requirements grow.
12
13//! # Components
14//!
15//! - [`Executor`]: Main executor struct
16//! - [`ExecutorError`]: Error types for execution
17//! - [`ExecutionResult`]: Execution result types
18//!
19//! # Example
20//!
21//! ```ignore
22//! use std::sync::{Arc, RwLock};
23//! use alopex_core::kv::memory::MemoryKV;
24//! use alopex_sql::executor::Executor;
25//! use alopex_sql::catalog::MemoryCatalog;
26//! use alopex_sql::planner::LogicalPlan;
27//!
28//! // Create storage and catalog
29//! let store = Arc::new(MemoryKV::new());
30//! let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
31//!
32//! // Create executor
33//! let mut executor = Executor::new(store, catalog);
34//!
35//! // Execute a plan
36//! let result = executor.execute(plan)?;
37//! ```
38
39pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
50pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
51
52use std::sync::{Arc, RwLock};
53
54use alopex_core::kv::KVStore;
55use alopex_core::types::TxnMode;
56
57use crate::catalog::Catalog;
58use crate::catalog::persistent::{IndexFqn, TableFqn};
59use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
60use crate::planner::LogicalPlan;
61use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
62
63/// SQL statement executor.
64///
65/// The Executor takes a [`LogicalPlan`] and executes it against the storage layer.
66/// It manages transactions and coordinates between DDL, DML, and Query operations.
67///
68/// # Type Parameters
69///
70/// - `S`: The underlying KV store type (must implement [`KVStore`])
71/// - `C`: The catalog type (must implement [`Catalog`])
72pub struct Executor<S: KVStore, C: Catalog> {
73    /// Transaction bridge for storage operations.
74    bridge: TxnBridge<S>,
75
76    /// Catalog for metadata operations.
77    catalog: Arc<RwLock<C>>,
78}
79
80impl<S: KVStore, C: Catalog> Executor<S, C> {
81    fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
82    where
83        F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
84    {
85        let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
86        match f(&mut txn) {
87            Ok(result) => {
88                txn.commit().map_err(ExecutorError::from)?;
89                Ok(result)
90            }
91            Err(err) => {
92                txn.rollback().map_err(ExecutorError::from)?;
93                Err(err)
94            }
95        }
96    }
97
98    /// Create a new Executor with the given store and catalog.
99    ///
100    /// # Arguments
101    ///
102    /// - `store`: The underlying KV store
103    /// - `catalog`: The catalog for metadata operations
104    pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
105        Self {
106            bridge: TxnBridge::new(store),
107            catalog,
108        }
109    }
110
111    /// Execute a logical plan and return the result.
112    ///
113    /// # Arguments
114    ///
115    /// - `plan`: The logical plan to execute
116    ///
117    /// # Returns
118    ///
119    /// Returns an [`ExecutionResult`] on success, or an [`ExecutorError`] on failure.
120    ///
121    /// # DDL Operations
122    ///
123    /// - `CreateTable`: Creates a new table with optional PK index
124    /// - `DropTable`: Drops a table and its associated indexes
125    /// - `CreateIndex`: Creates a new index
126    /// - `DropIndex`: Drops an index
127    ///
128    /// # DML Operations
129    ///
130    /// - `Insert`: Inserts rows into a table
131    /// - `Update`: Updates rows in a table
132    /// - `Delete`: Deletes rows from a table
133    ///
134    /// # Query Operations
135    ///
136    /// - `Scan`, `Filter`, `Sort`, `Limit`: SELECT query execution
137    pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
138        match plan {
139            // DDL Operations
140            LogicalPlan::CreateTable {
141                table,
142                if_not_exists,
143                with_options,
144            } => self.execute_create_table(table, with_options, if_not_exists),
145            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
146            LogicalPlan::CreateIndex {
147                index,
148                if_not_exists,
149            } => self.execute_create_index(index, if_not_exists),
150            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
151
152            // DML Operations
153            LogicalPlan::Insert {
154                table,
155                columns,
156                values,
157            } => self.execute_insert(&table, columns, values),
158            LogicalPlan::Update {
159                table,
160                assignments,
161                filter,
162            } => self.execute_update(&table, assignments, filter),
163            LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
164
165            // Query Operations
166            LogicalPlan::Scan { .. }
167            | LogicalPlan::Filter { .. }
168            | LogicalPlan::Aggregate { .. }
169            | LogicalPlan::Sort { .. }
170            | LogicalPlan::Limit { .. } => self.execute_query(plan),
171        }
172    }
173
174    // ========================================================================
175    // DDL Operations (to be implemented in Phase 2)
176    // ========================================================================
177
178    fn execute_create_table(
179        &mut self,
180        table: crate::catalog::TableMetadata,
181        with_options: Vec<(String, String)>,
182        if_not_exists: bool,
183    ) -> Result<ExecutionResult> {
184        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
185        self.run_in_write_txn(|txn| {
186            ddl::create_table::execute_create_table(
187                txn,
188                &mut *catalog,
189                table,
190                with_options,
191                if_not_exists,
192            )
193        })
194    }
195
196    fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
197        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
198        self.run_in_write_txn(|txn| {
199            ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
200        })
201    }
202
203    fn execute_create_index(
204        &mut self,
205        index: crate::catalog::IndexMetadata,
206        if_not_exists: bool,
207    ) -> Result<ExecutionResult> {
208        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
209        self.run_in_write_txn(|txn| {
210            ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
211        })
212    }
213
214    fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
215        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
216        self.run_in_write_txn(|txn| {
217            ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
218        })
219    }
220
221    // ========================================================================
222    // DML Operations (implemented in Phase 4)
223    // ========================================================================
224
225    fn execute_insert(
226        &mut self,
227        table: &str,
228        columns: Vec<String>,
229        values: Vec<Vec<crate::planner::TypedExpr>>,
230    ) -> Result<ExecutionResult> {
231        let catalog = self.catalog.read().expect("catalog lock poisoned");
232        self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
233    }
234
235    fn execute_update(
236        &mut self,
237        table: &str,
238        assignments: Vec<crate::planner::TypedAssignment>,
239        filter: Option<crate::planner::TypedExpr>,
240    ) -> Result<ExecutionResult> {
241        let catalog = self.catalog.read().expect("catalog lock poisoned");
242        self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
243    }
244
245    fn execute_delete(
246        &mut self,
247        table: &str,
248        filter: Option<crate::planner::TypedExpr>,
249    ) -> Result<ExecutionResult> {
250        let catalog = self.catalog.read().expect("catalog lock poisoned");
251        self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
252    }
253
254    // ========================================================================
255    // Query Operations (to be implemented in Phase 5)
256    // ========================================================================
257
258    fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
259        let catalog = self.catalog.read().expect("catalog lock poisoned");
260        self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
261    }
262}
263
264impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
265    pub fn execute_in_txn<'a, 'b, 'c>(
266        &mut self,
267        plan: LogicalPlan,
268        txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
269    ) -> Result<ExecutionResult> {
270        if txn.mode() == TxnMode::ReadOnly
271            && !matches!(
272                plan,
273                LogicalPlan::Scan { .. }
274                    | LogicalPlan::Filter { .. }
275                    | LogicalPlan::Aggregate { .. }
276                    | LogicalPlan::Sort { .. }
277                    | LogicalPlan::Limit { .. }
278            )
279        {
280            return Err(ExecutorError::ReadOnlyTransaction {
281                operation: plan.operation_name().to_string(),
282            });
283        }
284
285        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
286        let (mut sql_txn, overlay) = txn.split_parts();
287
288        let result = match plan {
289            LogicalPlan::CreateTable {
290                table,
291                if_not_exists,
292                with_options,
293            } => self.execute_create_table_in_txn(
294                &mut *catalog,
295                &mut sql_txn,
296                overlay,
297                table,
298                with_options,
299                if_not_exists,
300            ),
301            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
302                &mut *catalog,
303                &mut sql_txn,
304                overlay,
305                &name,
306                if_exists,
307            ),
308            LogicalPlan::CreateIndex {
309                index,
310                if_not_exists,
311            } => self.execute_create_index_in_txn(
312                &mut *catalog,
313                &mut sql_txn,
314                overlay,
315                index,
316                if_not_exists,
317            ),
318            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
319                &mut *catalog,
320                &mut sql_txn,
321                overlay,
322                &name,
323                if_exists,
324            ),
325            LogicalPlan::Insert {
326                table,
327                columns,
328                values,
329            } => {
330                let view = TxnCatalogView::new(&*catalog, &*overlay);
331                dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
332            }
333            LogicalPlan::Update {
334                table,
335                assignments,
336                filter,
337            } => {
338                let view = TxnCatalogView::new(&*catalog, &*overlay);
339                dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
340            }
341            LogicalPlan::Delete { table, filter } => {
342                let view = TxnCatalogView::new(&*catalog, &*overlay);
343                dml::execute_delete(&mut sql_txn, &view, &table, filter)
344            }
345            LogicalPlan::Scan { .. }
346            | LogicalPlan::Filter { .. }
347            | LogicalPlan::Aggregate { .. }
348            | LogicalPlan::Sort { .. }
349            | LogicalPlan::Limit { .. } => {
350                let view = TxnCatalogView::new(&*catalog, &*overlay);
351                query::execute_query(&mut sql_txn, &view, plan)
352            }
353        };
354
355        match result {
356            Ok(value) => {
357                sql_txn.flush_hnsw()?;
358                Ok(value)
359            }
360            Err(err) => {
361                let _ = sql_txn.abandon_hnsw();
362                Err(err)
363            }
364        }
365    }
366
367    fn map_catalog_error(err: CatalogError) -> ExecutorError {
368        match err {
369            CatalogError::Kv(e) => ExecutorError::Core(e),
370            CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
371                operation: "CatalogPersistence".into(),
372                reason: e.to_string(),
373            },
374            CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
375                operation: "CatalogPersistence".into(),
376                reason,
377            },
378        }
379    }
380
381    fn execute_create_table_in_txn<'txn>(
382        &self,
383        catalog: &mut PersistentCatalog<S>,
384        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
385        overlay: &mut CatalogOverlay,
386        mut table: crate::catalog::TableMetadata,
387        with_options: Vec<(String, String)>,
388        if_not_exists: bool,
389    ) -> Result<ExecutionResult>
390    where
391        S: 'txn,
392    {
393        if catalog.table_exists_in_txn(&table.name, overlay) {
394            return if if_not_exists {
395                Ok(ExecutionResult::Success)
396            } else {
397                Err(ExecutorError::TableAlreadyExists(table.name))
398            };
399        }
400
401        table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
402
403        let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
404            let column_indices = pk_columns
405                .iter()
406                .map(|name| {
407                    table
408                        .get_column_index(name)
409                        .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
410                })
411                .collect::<Result<Vec<_>>>()?;
412            let index_id = catalog.next_index_id();
413            let index_name = ddl::create_pk_index_name(&table.name);
414            let mut index = crate::catalog::IndexMetadata::new(
415                index_id,
416                index_name,
417                table.name.clone(),
418                pk_columns,
419            )
420            .with_column_indices(column_indices)
421            .with_unique(true);
422            index.catalog_name = table.catalog_name.clone();
423            index.namespace_name = table.namespace_name.clone();
424            Some(index)
425        } else {
426            None
427        };
428
429        let table_id = catalog.next_table_id();
430        table = table.with_table_id(table_id);
431
432        // storage keyspace の初期化
433        txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
434        txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
435
436        // 永続化(同一 KV トランザクション内)
437        catalog
438            .persist_create_table(txn.inner_mut(), &table)
439            .map_err(Self::map_catalog_error)?;
440        if let Some(index) = &pk_index {
441            catalog
442                .persist_create_index(txn.inner_mut(), index)
443                .map_err(Self::map_catalog_error)?;
444        }
445
446        // オーバーレイに反映(ベースカタログはコミットまで不変)
447        overlay.add_table(TableFqn::from(&table), table);
448        if let Some(index) = pk_index {
449            overlay.add_index(IndexFqn::from(&index), index);
450        }
451
452        Ok(ExecutionResult::Success)
453    }
454
455    fn execute_drop_table_in_txn<'txn>(
456        &self,
457        catalog: &mut PersistentCatalog<S>,
458        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
459        overlay: &mut CatalogOverlay,
460        table_name: &str,
461        if_exists: bool,
462    ) -> Result<ExecutionResult>
463    where
464        S: 'txn,
465    {
466        let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
467            Some(table) => table.clone(),
468            None => {
469                return if if_exists {
470                    Ok(ExecutionResult::Success)
471                } else {
472                    Err(ExecutorError::TableNotFound(table_name.to_string()))
473                };
474            }
475        };
476        if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
477            return if if_exists {
478                Ok(ExecutionResult::Success)
479            } else {
480                Err(ExecutorError::TableNotFound(table_name.to_string()))
481            };
482        }
483
484        let indexes = TxnCatalogView::new(catalog, overlay)
485            .get_indexes_for_table(table_name)
486            .into_iter()
487            .cloned()
488            .collect::<Vec<_>>();
489
490        for index in &indexes {
491            if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
492                crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
493            } else {
494                txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
495            }
496        }
497
498        txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
499        txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
500
501        catalog
502            .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
503            .map_err(Self::map_catalog_error)?;
504
505        overlay.drop_table(&TableFqn::from(&table_meta));
506
507        Ok(ExecutionResult::Success)
508    }
509
510    fn execute_create_index_in_txn<'txn>(
511        &self,
512        catalog: &mut PersistentCatalog<S>,
513        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
514        overlay: &mut CatalogOverlay,
515        mut index: crate::catalog::IndexMetadata,
516        if_not_exists: bool,
517    ) -> Result<ExecutionResult>
518    where
519        S: 'txn,
520    {
521        if ddl::is_implicit_pk_index(&index.name) {
522            return Err(ExecutorError::InvalidIndexName {
523                name: index.name.clone(),
524                reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
525            });
526        }
527
528        if catalog.index_exists_in_txn(&index.name, overlay) {
529            return if if_not_exists {
530                Ok(ExecutionResult::Success)
531            } else {
532                Err(ExecutorError::IndexAlreadyExists(index.name))
533            };
534        }
535
536        let table = catalog
537            .get_table_in_txn(&index.table, overlay)
538            .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
539            .clone();
540        index.catalog_name = table.catalog_name.clone();
541        index.namespace_name = table.namespace_name.clone();
542
543        let column_indices = index
544            .columns
545            .iter()
546            .map(|name| {
547                table
548                    .get_column_index(name)
549                    .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
550            })
551            .collect::<Result<Vec<_>>>()?;
552
553        let index_id = catalog.next_index_id();
554        index.index_id = index_id;
555        index.column_indices = column_indices.clone();
556
557        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
558            crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
559        } else {
560            ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
561        }
562
563        catalog
564            .persist_create_index(txn.inner_mut(), &index)
565            .map_err(Self::map_catalog_error)?;
566
567        overlay.add_index(IndexFqn::from(&index), index);
568
569        Ok(ExecutionResult::Success)
570    }
571
572    fn execute_drop_index_in_txn<'txn>(
573        &self,
574        catalog: &mut PersistentCatalog<S>,
575        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
576        overlay: &mut CatalogOverlay,
577        index_name: &str,
578        if_exists: bool,
579    ) -> Result<ExecutionResult>
580    where
581        S: 'txn,
582    {
583        if ddl::is_implicit_pk_index(index_name) {
584            return Err(ExecutorError::InvalidOperation {
585                operation: "DROP INDEX".into(),
586                reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
587            });
588        }
589
590        let index = match catalog.get_index_in_txn(index_name, overlay) {
591            Some(index) => index.clone(),
592            None => {
593                return if if_exists {
594                    Ok(ExecutionResult::Success)
595                } else {
596                    Err(ExecutorError::IndexNotFound(index_name.to_string()))
597                };
598            }
599        };
600        if index.catalog_name != "default" || index.namespace_name != "default" {
601            return if if_exists {
602                Ok(ExecutionResult::Success)
603            } else {
604                Err(ExecutorError::IndexNotFound(index_name.to_string()))
605            };
606        }
607
608        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
609            crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
610        } else {
611            txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
612        }
613
614        catalog
615            .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
616            .map_err(Self::map_catalog_error)?;
617
618        overlay.drop_index(&IndexFqn::from(&index));
619
620        Ok(ExecutionResult::Success)
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627    use crate::catalog::MemoryCatalog;
628    use alopex_core::kv::memory::MemoryKV;
629
630    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
631        let store = Arc::new(MemoryKV::new());
632        let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
633        Executor::new(store, catalog)
634    }
635
636    #[test]
637    fn test_executor_creation() {
638        let _executor = create_executor();
639        // Executor should be created without panic
640    }
641
642    #[test]
643    fn create_table_is_supported() {
644        let mut executor = create_executor();
645
646        use crate::catalog::{ColumnMetadata, TableMetadata};
647        use crate::planner::ResolvedType;
648
649        let table = TableMetadata::new(
650            "test",
651            vec![ColumnMetadata::new("id", ResolvedType::Integer)],
652        );
653
654        let result = executor.execute(LogicalPlan::CreateTable {
655            table,
656            if_not_exists: false,
657            with_options: vec![],
658        });
659        assert!(matches!(result, Ok(ExecutionResult::Success)));
660
661        let catalog = executor.catalog.read().unwrap();
662        assert!(catalog.table_exists("test"));
663    }
664
665    #[test]
666    fn insert_is_supported() {
667        use crate::Span;
668        use crate::catalog::{ColumnMetadata, TableMetadata};
669        use crate::planner::typed_expr::TypedExprKind;
670        use crate::planner::types::ResolvedType;
671
672        let mut executor = create_executor();
673
674        let table = TableMetadata::new("t", vec![ColumnMetadata::new("id", ResolvedType::Integer)])
675            .with_primary_key(vec!["id".into()]);
676
677        executor
678            .execute(LogicalPlan::CreateTable {
679                table,
680                if_not_exists: false,
681                with_options: vec![],
682            })
683            .unwrap();
684
685        let result = executor.execute(LogicalPlan::Insert {
686            table: "t".into(),
687            columns: vec!["id".into()],
688            values: vec![vec![crate::planner::typed_expr::TypedExpr {
689                kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
690                resolved_type: ResolvedType::Integer,
691                span: Span::default(),
692            }]],
693        });
694        assert!(matches!(result, Ok(ExecutionResult::RowsAffected(1))));
695    }
696}