alopex_sql/executor/
mod.rs

1//! SQL Executor module for Alopex SQL.
2//!
3//! This module provides the execution engine for SQL statements.
4//!
5//! # Overview
6//!
7//! The Executor takes a [`LogicalPlan`] from the Planner and executes it
8//! against the storage layer. It supports DDL, DML, and Query operations.
9//!
10//! Query execution currently materializes intermediate results per stage;
11//! future versions may add streaming pipelines as requirements grow.
12
13//! # Components
14//!
15//! - [`Executor`]: Main executor struct
16//! - [`ExecutorError`]: Error types for execution
17//! - [`ExecutionResult`]: Execution result types
18//!
19//! # Example
20//!
21//! ```ignore
22//! use std::sync::{Arc, RwLock};
23//! use alopex_core::kv::memory::MemoryKV;
24//! use alopex_sql::executor::Executor;
25//! use alopex_sql::catalog::MemoryCatalog;
26//! use alopex_sql::planner::LogicalPlan;
27//!
28//! // Create storage and catalog
29//! let store = Arc::new(MemoryKV::new());
30//! let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
31//!
32//! // Create executor
33//! let mut executor = Executor::new(store, catalog);
34//!
35//! // Execute a plan
36//! let result = executor.execute(plan)?;
37//! ```
38
39pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
50pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
51
52use std::sync::{Arc, RwLock};
53
54use alopex_core::kv::KVStore;
55use alopex_core::types::TxnMode;
56
57use crate::catalog::Catalog;
58use crate::catalog::persistent::{IndexFqn, TableFqn};
59use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
60use crate::planner::LogicalPlan;
61use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
62
63/// SQL statement executor.
64///
65/// The Executor takes a [`LogicalPlan`] and executes it against the storage layer.
66/// It manages transactions and coordinates between DDL, DML, and Query operations.
67///
68/// # Type Parameters
69///
70/// - `S`: The underlying KV store type (must implement [`KVStore`])
71/// - `C`: The catalog type (must implement [`Catalog`])
72pub struct Executor<S: KVStore, C: Catalog> {
73    /// Transaction bridge for storage operations.
74    bridge: TxnBridge<S>,
75
76    /// Catalog for metadata operations.
77    catalog: Arc<RwLock<C>>,
78}
79
80impl<S: KVStore, C: Catalog> Executor<S, C> {
81    fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
82    where
83        F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
84    {
85        let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
86        match f(&mut txn) {
87            Ok(result) => {
88                txn.commit().map_err(ExecutorError::from)?;
89                Ok(result)
90            }
91            Err(err) => {
92                txn.rollback().map_err(ExecutorError::from)?;
93                Err(err)
94            }
95        }
96    }
97
98    /// Create a new Executor with the given store and catalog.
99    ///
100    /// # Arguments
101    ///
102    /// - `store`: The underlying KV store
103    /// - `catalog`: The catalog for metadata operations
104    pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
105        Self {
106            bridge: TxnBridge::new(store),
107            catalog,
108        }
109    }
110
111    /// Execute a logical plan and return the result.
112    ///
113    /// # Arguments
114    ///
115    /// - `plan`: The logical plan to execute
116    ///
117    /// # Returns
118    ///
119    /// Returns an [`ExecutionResult`] on success, or an [`ExecutorError`] on failure.
120    ///
121    /// # DDL Operations
122    ///
123    /// - `CreateTable`: Creates a new table with optional PK index
124    /// - `DropTable`: Drops a table and its associated indexes
125    /// - `CreateIndex`: Creates a new index
126    /// - `DropIndex`: Drops an index
127    ///
128    /// # DML Operations
129    ///
130    /// - `Insert`: Inserts rows into a table
131    /// - `Update`: Updates rows in a table
132    /// - `Delete`: Deletes rows from a table
133    ///
134    /// # Query Operations
135    ///
136    /// - `Scan`, `Filter`, `Sort`, `Limit`: SELECT query execution
137    pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
138        match plan {
139            // DDL Operations
140            LogicalPlan::CreateTable {
141                table,
142                if_not_exists,
143                with_options,
144            } => self.execute_create_table(table, with_options, if_not_exists),
145            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
146            LogicalPlan::CreateIndex {
147                index,
148                if_not_exists,
149            } => self.execute_create_index(index, if_not_exists),
150            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
151
152            // DML Operations
153            LogicalPlan::Insert {
154                table,
155                columns,
156                values,
157            } => self.execute_insert(&table, columns, values),
158            LogicalPlan::Update {
159                table,
160                assignments,
161                filter,
162            } => self.execute_update(&table, assignments, filter),
163            LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
164
165            // Query Operations
166            LogicalPlan::Scan { .. }
167            | LogicalPlan::Filter { .. }
168            | LogicalPlan::Sort { .. }
169            | LogicalPlan::Limit { .. } => self.execute_query(plan),
170        }
171    }
172
173    // ========================================================================
    // DDL Operations (Phase 2)
175    // ========================================================================
176
177    fn execute_create_table(
178        &mut self,
179        table: crate::catalog::TableMetadata,
180        with_options: Vec<(String, String)>,
181        if_not_exists: bool,
182    ) -> Result<ExecutionResult> {
183        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
184        self.run_in_write_txn(|txn| {
185            ddl::create_table::execute_create_table(
186                txn,
187                &mut *catalog,
188                table,
189                with_options,
190                if_not_exists,
191            )
192        })
193    }
194
195    fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
196        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
197        self.run_in_write_txn(|txn| {
198            ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
199        })
200    }
201
202    fn execute_create_index(
203        &mut self,
204        index: crate::catalog::IndexMetadata,
205        if_not_exists: bool,
206    ) -> Result<ExecutionResult> {
207        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
208        self.run_in_write_txn(|txn| {
209            ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
210        })
211    }
212
213    fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
214        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
215        self.run_in_write_txn(|txn| {
216            ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
217        })
218    }
219
220    // ========================================================================
221    // DML Operations (implemented in Phase 4)
222    // ========================================================================
223
224    fn execute_insert(
225        &mut self,
226        table: &str,
227        columns: Vec<String>,
228        values: Vec<Vec<crate::planner::TypedExpr>>,
229    ) -> Result<ExecutionResult> {
230        let catalog = self.catalog.read().expect("catalog lock poisoned");
231        self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
232    }
233
234    fn execute_update(
235        &mut self,
236        table: &str,
237        assignments: Vec<crate::planner::TypedAssignment>,
238        filter: Option<crate::planner::TypedExpr>,
239    ) -> Result<ExecutionResult> {
240        let catalog = self.catalog.read().expect("catalog lock poisoned");
241        self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
242    }
243
244    fn execute_delete(
245        &mut self,
246        table: &str,
247        filter: Option<crate::planner::TypedExpr>,
248    ) -> Result<ExecutionResult> {
249        let catalog = self.catalog.read().expect("catalog lock poisoned");
250        self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
251    }
252
253    // ========================================================================
    // Query Operations (Phase 5)
255    // ========================================================================
256
257    fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
258        let catalog = self.catalog.read().expect("catalog lock poisoned");
259        self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
260    }
261}
262
263impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
264    pub fn execute_in_txn<'a, 'b, 'c>(
265        &mut self,
266        plan: LogicalPlan,
267        txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
268    ) -> Result<ExecutionResult> {
269        if txn.mode() == TxnMode::ReadOnly
270            && !matches!(
271                plan,
272                LogicalPlan::Scan { .. }
273                    | LogicalPlan::Filter { .. }
274                    | LogicalPlan::Sort { .. }
275                    | LogicalPlan::Limit { .. }
276            )
277        {
278            return Err(ExecutorError::ReadOnlyTransaction {
279                operation: plan.operation_name().to_string(),
280            });
281        }
282
283        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
284        let (mut sql_txn, overlay) = txn.split_parts();
285
286        let result = match plan {
287            LogicalPlan::CreateTable {
288                table,
289                if_not_exists,
290                with_options,
291            } => self.execute_create_table_in_txn(
292                &mut *catalog,
293                &mut sql_txn,
294                overlay,
295                table,
296                with_options,
297                if_not_exists,
298            ),
299            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
300                &mut *catalog,
301                &mut sql_txn,
302                overlay,
303                &name,
304                if_exists,
305            ),
306            LogicalPlan::CreateIndex {
307                index,
308                if_not_exists,
309            } => self.execute_create_index_in_txn(
310                &mut *catalog,
311                &mut sql_txn,
312                overlay,
313                index,
314                if_not_exists,
315            ),
316            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
317                &mut *catalog,
318                &mut sql_txn,
319                overlay,
320                &name,
321                if_exists,
322            ),
323            LogicalPlan::Insert {
324                table,
325                columns,
326                values,
327            } => {
328                let view = TxnCatalogView::new(&*catalog, &*overlay);
329                dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
330            }
331            LogicalPlan::Update {
332                table,
333                assignments,
334                filter,
335            } => {
336                let view = TxnCatalogView::new(&*catalog, &*overlay);
337                dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
338            }
339            LogicalPlan::Delete { table, filter } => {
340                let view = TxnCatalogView::new(&*catalog, &*overlay);
341                dml::execute_delete(&mut sql_txn, &view, &table, filter)
342            }
343            LogicalPlan::Scan { .. }
344            | LogicalPlan::Filter { .. }
345            | LogicalPlan::Sort { .. }
346            | LogicalPlan::Limit { .. } => {
347                let view = TxnCatalogView::new(&*catalog, &*overlay);
348                query::execute_query(&mut sql_txn, &view, plan)
349            }
350        };
351
352        match result {
353            Ok(value) => {
354                sql_txn.flush_hnsw()?;
355                Ok(value)
356            }
357            Err(err) => {
358                let _ = sql_txn.abandon_hnsw();
359                Err(err)
360            }
361        }
362    }
363
364    fn map_catalog_error(err: CatalogError) -> ExecutorError {
365        match err {
366            CatalogError::Kv(e) => ExecutorError::Core(e),
367            CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
368                operation: "CatalogPersistence".into(),
369                reason: e.to_string(),
370            },
371            CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
372                operation: "CatalogPersistence".into(),
373                reason,
374            },
375        }
376    }
377
378    fn execute_create_table_in_txn<'txn>(
379        &self,
380        catalog: &mut PersistentCatalog<S>,
381        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
382        overlay: &mut CatalogOverlay,
383        mut table: crate::catalog::TableMetadata,
384        with_options: Vec<(String, String)>,
385        if_not_exists: bool,
386    ) -> Result<ExecutionResult>
387    where
388        S: 'txn,
389    {
390        if catalog.table_exists_in_txn(&table.name, overlay) {
391            return if if_not_exists {
392                Ok(ExecutionResult::Success)
393            } else {
394                Err(ExecutorError::TableAlreadyExists(table.name))
395            };
396        }
397
398        table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
399
400        let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
401            let column_indices = pk_columns
402                .iter()
403                .map(|name| {
404                    table
405                        .get_column_index(name)
406                        .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
407                })
408                .collect::<Result<Vec<_>>>()?;
409            let index_id = catalog.next_index_id();
410            let index_name = ddl::create_pk_index_name(&table.name);
411            let mut index = crate::catalog::IndexMetadata::new(
412                index_id,
413                index_name,
414                table.name.clone(),
415                pk_columns,
416            )
417            .with_column_indices(column_indices)
418            .with_unique(true);
419            index.catalog_name = table.catalog_name.clone();
420            index.namespace_name = table.namespace_name.clone();
421            Some(index)
422        } else {
423            None
424        };
425
426        let table_id = catalog.next_table_id();
427        table = table.with_table_id(table_id);
428
429        // storage keyspace の初期化
430        txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
431        txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
432
433        // 永続化(同一 KV トランザクション内)
434        catalog
435            .persist_create_table(txn.inner_mut(), &table)
436            .map_err(Self::map_catalog_error)?;
437        if let Some(index) = &pk_index {
438            catalog
439                .persist_create_index(txn.inner_mut(), index)
440                .map_err(Self::map_catalog_error)?;
441        }
442
443        // オーバーレイに反映(ベースカタログはコミットまで不変)
444        overlay.add_table(TableFqn::from(&table), table);
445        if let Some(index) = pk_index {
446            overlay.add_index(IndexFqn::from(&index), index);
447        }
448
449        Ok(ExecutionResult::Success)
450    }
451
452    fn execute_drop_table_in_txn<'txn>(
453        &self,
454        catalog: &mut PersistentCatalog<S>,
455        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
456        overlay: &mut CatalogOverlay,
457        table_name: &str,
458        if_exists: bool,
459    ) -> Result<ExecutionResult>
460    where
461        S: 'txn,
462    {
463        let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
464            Some(table) => table.clone(),
465            None => {
466                return if if_exists {
467                    Ok(ExecutionResult::Success)
468                } else {
469                    Err(ExecutorError::TableNotFound(table_name.to_string()))
470                };
471            }
472        };
473        if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
474            return if if_exists {
475                Ok(ExecutionResult::Success)
476            } else {
477                Err(ExecutorError::TableNotFound(table_name.to_string()))
478            };
479        }
480
481        let indexes = TxnCatalogView::new(catalog, overlay)
482            .get_indexes_for_table(table_name)
483            .into_iter()
484            .cloned()
485            .collect::<Vec<_>>();
486
487        for index in &indexes {
488            if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
489                crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
490            } else {
491                txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
492            }
493        }
494
495        txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
496        txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
497
498        catalog
499            .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
500            .map_err(Self::map_catalog_error)?;
501
502        overlay.drop_table(&TableFqn::from(&table_meta));
503
504        Ok(ExecutionResult::Success)
505    }
506
507    fn execute_create_index_in_txn<'txn>(
508        &self,
509        catalog: &mut PersistentCatalog<S>,
510        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
511        overlay: &mut CatalogOverlay,
512        mut index: crate::catalog::IndexMetadata,
513        if_not_exists: bool,
514    ) -> Result<ExecutionResult>
515    where
516        S: 'txn,
517    {
518        if ddl::is_implicit_pk_index(&index.name) {
519            return Err(ExecutorError::InvalidIndexName {
520                name: index.name.clone(),
521                reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
522            });
523        }
524
525        if catalog.index_exists_in_txn(&index.name, overlay) {
526            return if if_not_exists {
527                Ok(ExecutionResult::Success)
528            } else {
529                Err(ExecutorError::IndexAlreadyExists(index.name))
530            };
531        }
532
533        let table = catalog
534            .get_table_in_txn(&index.table, overlay)
535            .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
536            .clone();
537        index.catalog_name = table.catalog_name.clone();
538        index.namespace_name = table.namespace_name.clone();
539
540        let column_indices = index
541            .columns
542            .iter()
543            .map(|name| {
544                table
545                    .get_column_index(name)
546                    .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
547            })
548            .collect::<Result<Vec<_>>>()?;
549
550        let index_id = catalog.next_index_id();
551        index.index_id = index_id;
552        index.column_indices = column_indices.clone();
553
554        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
555            crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
556        } else {
557            ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
558        }
559
560        catalog
561            .persist_create_index(txn.inner_mut(), &index)
562            .map_err(Self::map_catalog_error)?;
563
564        overlay.add_index(IndexFqn::from(&index), index);
565
566        Ok(ExecutionResult::Success)
567    }
568
569    fn execute_drop_index_in_txn<'txn>(
570        &self,
571        catalog: &mut PersistentCatalog<S>,
572        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
573        overlay: &mut CatalogOverlay,
574        index_name: &str,
575        if_exists: bool,
576    ) -> Result<ExecutionResult>
577    where
578        S: 'txn,
579    {
580        if ddl::is_implicit_pk_index(index_name) {
581            return Err(ExecutorError::InvalidOperation {
582                operation: "DROP INDEX".into(),
583                reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
584            });
585        }
586
587        let index = match catalog.get_index_in_txn(index_name, overlay) {
588            Some(index) => index.clone(),
589            None => {
590                return if if_exists {
591                    Ok(ExecutionResult::Success)
592                } else {
593                    Err(ExecutorError::IndexNotFound(index_name.to_string()))
594                };
595            }
596        };
597        if index.catalog_name != "default" || index.namespace_name != "default" {
598            return if if_exists {
599                Ok(ExecutionResult::Success)
600            } else {
601                Err(ExecutorError::IndexNotFound(index_name.to_string()))
602            };
603        }
604
605        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
606            crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
607        } else {
608            txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
609        }
610
611        catalog
612            .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
613            .map_err(Self::map_catalog_error)?;
614
615        overlay.drop_index(&IndexFqn::from(&index));
616
617        Ok(ExecutionResult::Success)
618    }
619}
620
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::MemoryCatalog;
    use alopex_core::kv::memory::MemoryKV;

    /// Builds an executor backed by an in-memory store and an in-memory catalog.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    #[test]
    fn test_executor_creation() {
        // Construction alone must not panic.
        let _executor = create_executor();
    }

    #[test]
    fn create_table_is_supported() {
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::ResolvedType;

        let mut executor = create_executor();

        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("test", vec![id_column]),
            if_not_exists: false,
            with_options: vec![],
        };

        let outcome = executor.execute(plan);
        assert!(matches!(outcome, Ok(ExecutionResult::Success)));

        // The new table must be visible through the executor's catalog.
        let catalog = executor.catalog.read().unwrap();
        assert!(catalog.table_exists("test"));
    }

    #[test]
    fn insert_is_supported() {
        use crate::Span;
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::typed_expr::TypedExprKind;
        use crate::planner::types::ResolvedType;

        let mut executor = create_executor();

        // Set up a single-column table keyed on `id`.
        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let table = TableMetadata::new("t", vec![id_column]).with_primary_key(vec!["id".into()]);
        executor
            .execute(LogicalPlan::CreateTable {
                table,
                if_not_exists: false,
                with_options: vec![],
            })
            .unwrap();

        // Insert one row whose only value is the integer literal 1.
        let one = crate::planner::typed_expr::TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let outcome = executor.execute(LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![one]],
        });

        assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
    }
}
691        assert!(matches!(result, Ok(ExecutionResult::RowsAffected(1))));
692    }
693}