Skip to main content

alopex_sql/executor/
mod.rs

1//! SQL Executor module for Alopex SQL.
2//!
3//! This module provides the execution engine for SQL statements.
4//!
5//! # Overview
6//!
7//! The Executor takes a [`LogicalPlan`] from the Planner and executes it
8//! against the storage layer. It supports DDL, DML, and Query operations.
9//!
10//! Query execution currently materializes intermediate results per stage;
11//! future versions may add streaming pipelines as requirements grow.
12
13//! # Components
14//!
15//! - [`Executor`]: Main executor struct
16//! - [`ExecutorError`]: Error types for execution
17//! - [`ExecutionResult`]: Execution result types
18//!
19//! # Example
20//!
21//! ```ignore
22//! use std::sync::{Arc, RwLock};
23//! use alopex_core::kv::memory::MemoryKV;
24//! use alopex_sql::executor::Executor;
25//! use alopex_sql::catalog::MemoryCatalog;
26//! use alopex_sql::planner::LogicalPlan;
27//!
28//! // Create storage and catalog
29//! let store = Arc::new(MemoryKV::new());
30//! let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
31//!
32//! // Create executor
33//! let mut executor = Executor::new(store, catalog);
34//!
35//! // Execute a plan
36//! let result = executor.execute(plan)?;
37//! ```
38
39#[cfg(feature = "tokio")]
40pub mod async_executor;
41pub mod bulk;
42pub(crate) mod ddl;
43pub(crate) mod dml;
44mod error;
45pub mod evaluator;
46mod hnsw_bridge;
47pub mod memory;
48pub mod query;
49mod result;
50
51#[cfg(feature = "tokio")]
52pub use async_executor::AsyncExecutor;
53pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
54pub use memory::{MemoryPolicy, SpillPolicy};
55pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
56pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
57
58use std::sync::{Arc, RwLock};
59
60use alopex_core::kv::KVStore;
61use alopex_core::types::TxnMode;
62
63use crate::catalog::Catalog;
64use crate::catalog::persistent::{IndexFqn, TableFqn};
65use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
66use crate::planner::LogicalPlan;
67use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
68
69/// SQL statement executor.
70///
71/// The Executor takes a [`LogicalPlan`] and executes it against the storage layer.
72/// It manages transactions and coordinates between DDL, DML, and Query operations.
73///
74/// # Type Parameters
75///
76/// - `S`: The underlying KV store type (must implement [`KVStore`])
77/// - `C`: The catalog type (must implement [`Catalog`])
78pub struct Executor<S: KVStore, C: Catalog> {
79    /// Transaction bridge for storage operations.
80    bridge: TxnBridge<S>,
81
82    /// Catalog for metadata operations.
83    catalog: Arc<RwLock<C>>,
84}
85
86impl<S: KVStore, C: Catalog> Executor<S, C> {
87    fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
88    where
89        F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
90    {
91        let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
92        match f(&mut txn) {
93            Ok(result) => {
94                txn.commit().map_err(ExecutorError::from)?;
95                Ok(result)
96            }
97            Err(err) => {
98                txn.rollback().map_err(ExecutorError::from)?;
99                Err(err)
100            }
101        }
102    }
103
104    /// Create a new Executor with the given store and catalog.
105    ///
106    /// # Arguments
107    ///
108    /// - `store`: The underlying KV store
109    /// - `catalog`: The catalog for metadata operations
110    pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
111        Self {
112            bridge: TxnBridge::new(store),
113            catalog,
114        }
115    }
116
117    /// Execute a logical plan and return the result.
118    ///
119    /// # Arguments
120    ///
121    /// - `plan`: The logical plan to execute
122    ///
123    /// # Returns
124    ///
125    /// Returns an [`ExecutionResult`] on success, or an [`ExecutorError`] on failure.
126    ///
127    /// # DDL Operations
128    ///
129    /// - `CreateTable`: Creates a new table with optional PK index
130    /// - `DropTable`: Drops a table and its associated indexes
131    /// - `CreateIndex`: Creates a new index
132    /// - `DropIndex`: Drops an index
133    ///
134    /// # DML Operations
135    ///
136    /// - `Insert`: Inserts rows into a table
137    /// - `Update`: Updates rows in a table
138    /// - `Delete`: Deletes rows from a table
139    ///
140    /// # Query Operations
141    ///
142    /// - `Scan`, `Filter`, `Sort`, `Limit`: SELECT query execution
143    pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
144        match plan {
145            // DDL Operations
146            LogicalPlan::CreateTable {
147                table,
148                if_not_exists,
149                with_options,
150            } => self.execute_create_table(table, with_options, if_not_exists),
151            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
152            LogicalPlan::CreateIndex {
153                index,
154                if_not_exists,
155            } => self.execute_create_index(index, if_not_exists),
156            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
157
158            // DML Operations
159            LogicalPlan::Insert {
160                table,
161                columns,
162                values,
163            } => self.execute_insert(&table, columns, values),
164            LogicalPlan::Update {
165                table,
166                assignments,
167                filter,
168            } => self.execute_update(&table, assignments, filter),
169            LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
170
171            // Query Operations
172            LogicalPlan::Scan { .. }
173            | LogicalPlan::Filter { .. }
174            | LogicalPlan::Aggregate { .. }
175            | LogicalPlan::Sort { .. }
176            | LogicalPlan::Limit { .. } => self.execute_query(plan),
177        }
178    }
179
180    // ========================================================================
181    // DDL Operations (to be implemented in Phase 2)
182    // ========================================================================
183
184    fn execute_create_table(
185        &mut self,
186        table: crate::catalog::TableMetadata,
187        with_options: Vec<(String, String)>,
188        if_not_exists: bool,
189    ) -> Result<ExecutionResult> {
190        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
191        self.run_in_write_txn(|txn| {
192            ddl::create_table::execute_create_table(
193                txn,
194                &mut *catalog,
195                table,
196                with_options,
197                if_not_exists,
198            )
199        })
200    }
201
202    fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
203        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
204        self.run_in_write_txn(|txn| {
205            ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
206        })
207    }
208
209    fn execute_create_index(
210        &mut self,
211        index: crate::catalog::IndexMetadata,
212        if_not_exists: bool,
213    ) -> Result<ExecutionResult> {
214        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
215        self.run_in_write_txn(|txn| {
216            ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
217        })
218    }
219
220    fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
221        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
222        self.run_in_write_txn(|txn| {
223            ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
224        })
225    }
226
227    // ========================================================================
228    // DML Operations (implemented in Phase 4)
229    // ========================================================================
230
231    fn execute_insert(
232        &mut self,
233        table: &str,
234        columns: Vec<String>,
235        values: Vec<Vec<crate::planner::TypedExpr>>,
236    ) -> Result<ExecutionResult> {
237        let catalog = self.catalog.read().expect("catalog lock poisoned");
238        self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
239    }
240
241    fn execute_update(
242        &mut self,
243        table: &str,
244        assignments: Vec<crate::planner::TypedAssignment>,
245        filter: Option<crate::planner::TypedExpr>,
246    ) -> Result<ExecutionResult> {
247        let catalog = self.catalog.read().expect("catalog lock poisoned");
248        self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
249    }
250
251    fn execute_delete(
252        &mut self,
253        table: &str,
254        filter: Option<crate::planner::TypedExpr>,
255    ) -> Result<ExecutionResult> {
256        let catalog = self.catalog.read().expect("catalog lock poisoned");
257        self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
258    }
259
260    // ========================================================================
261    // Query Operations (to be implemented in Phase 5)
262    // ========================================================================
263
264    fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
265        let catalog = self.catalog.read().expect("catalog lock poisoned");
266        self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
267    }
268}
269
270impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
271    pub fn execute_in_txn<'a, 'b, 'c>(
272        &mut self,
273        plan: LogicalPlan,
274        txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
275    ) -> Result<ExecutionResult> {
276        if txn.mode() == TxnMode::ReadOnly
277            && !matches!(
278                plan,
279                LogicalPlan::Scan { .. }
280                    | LogicalPlan::Filter { .. }
281                    | LogicalPlan::Aggregate { .. }
282                    | LogicalPlan::Sort { .. }
283                    | LogicalPlan::Limit { .. }
284            )
285        {
286            return Err(ExecutorError::ReadOnlyTransaction {
287                operation: plan.operation_name().to_string(),
288            });
289        }
290
291        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
292        let (mut sql_txn, overlay) = txn.split_parts();
293
294        let result = match plan {
295            LogicalPlan::CreateTable {
296                table,
297                if_not_exists,
298                with_options,
299            } => self.execute_create_table_in_txn(
300                &mut *catalog,
301                &mut sql_txn,
302                overlay,
303                table,
304                with_options,
305                if_not_exists,
306            ),
307            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
308                &mut *catalog,
309                &mut sql_txn,
310                overlay,
311                &name,
312                if_exists,
313            ),
314            LogicalPlan::CreateIndex {
315                index,
316                if_not_exists,
317            } => self.execute_create_index_in_txn(
318                &mut *catalog,
319                &mut sql_txn,
320                overlay,
321                index,
322                if_not_exists,
323            ),
324            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
325                &mut *catalog,
326                &mut sql_txn,
327                overlay,
328                &name,
329                if_exists,
330            ),
331            LogicalPlan::Insert {
332                table,
333                columns,
334                values,
335            } => {
336                let view = TxnCatalogView::new(&*catalog, &*overlay);
337                dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
338            }
339            LogicalPlan::Update {
340                table,
341                assignments,
342                filter,
343            } => {
344                let view = TxnCatalogView::new(&*catalog, &*overlay);
345                dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
346            }
347            LogicalPlan::Delete { table, filter } => {
348                let view = TxnCatalogView::new(&*catalog, &*overlay);
349                dml::execute_delete(&mut sql_txn, &view, &table, filter)
350            }
351            LogicalPlan::Scan { .. }
352            | LogicalPlan::Filter { .. }
353            | LogicalPlan::Aggregate { .. }
354            | LogicalPlan::Sort { .. }
355            | LogicalPlan::Limit { .. } => {
356                let view = TxnCatalogView::new(&*catalog, &*overlay);
357                query::execute_query(&mut sql_txn, &view, plan)
358            }
359        };
360
361        match result {
362            Ok(value) => {
363                sql_txn.flush_hnsw()?;
364                Ok(value)
365            }
366            Err(err) => {
367                let _ = sql_txn.abandon_hnsw();
368                Err(err)
369            }
370        }
371    }
372
373    fn map_catalog_error(err: CatalogError) -> ExecutorError {
374        match err {
375            CatalogError::Kv(e) => ExecutorError::Core(e),
376            CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
377                operation: "CatalogPersistence".into(),
378                reason: e.to_string(),
379            },
380            CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
381                operation: "CatalogPersistence".into(),
382                reason,
383            },
384        }
385    }
386
387    fn execute_create_table_in_txn<'txn>(
388        &self,
389        catalog: &mut PersistentCatalog<S>,
390        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
391        overlay: &mut CatalogOverlay,
392        mut table: crate::catalog::TableMetadata,
393        with_options: Vec<(String, String)>,
394        if_not_exists: bool,
395    ) -> Result<ExecutionResult>
396    where
397        S: 'txn,
398    {
399        if catalog.table_exists_in_txn(&table.name, overlay) {
400            return if if_not_exists {
401                Ok(ExecutionResult::Success)
402            } else {
403                Err(ExecutorError::TableAlreadyExists(table.name))
404            };
405        }
406
407        table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
408
409        let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
410            let column_indices = pk_columns
411                .iter()
412                .map(|name| {
413                    table
414                        .get_column_index(name)
415                        .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
416                })
417                .collect::<Result<Vec<_>>>()?;
418            let index_id = catalog.next_index_id();
419            let index_name = ddl::create_pk_index_name(&table.name);
420            let mut index = crate::catalog::IndexMetadata::new(
421                index_id,
422                index_name,
423                table.name.clone(),
424                pk_columns,
425            )
426            .with_column_indices(column_indices)
427            .with_unique(true);
428            index.catalog_name = table.catalog_name.clone();
429            index.namespace_name = table.namespace_name.clone();
430            Some(index)
431        } else {
432            None
433        };
434
435        let table_id = catalog.next_table_id();
436        table = table.with_table_id(table_id);
437
438        // storage keyspace の初期化
439        txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
440        txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
441
442        // 永続化(同一 KV トランザクション内)
443        catalog
444            .persist_create_table(txn.inner_mut(), &table)
445            .map_err(Self::map_catalog_error)?;
446        if let Some(index) = &pk_index {
447            catalog
448                .persist_create_index(txn.inner_mut(), index)
449                .map_err(Self::map_catalog_error)?;
450        }
451
452        // オーバーレイに反映(ベースカタログはコミットまで不変)
453        overlay.add_table(TableFqn::from(&table), table);
454        if let Some(index) = pk_index {
455            overlay.add_index(IndexFqn::from(&index), index);
456        }
457
458        Ok(ExecutionResult::Success)
459    }
460
461    fn execute_drop_table_in_txn<'txn>(
462        &self,
463        catalog: &mut PersistentCatalog<S>,
464        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
465        overlay: &mut CatalogOverlay,
466        table_name: &str,
467        if_exists: bool,
468    ) -> Result<ExecutionResult>
469    where
470        S: 'txn,
471    {
472        let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
473            Some(table) => table.clone(),
474            None => {
475                return if if_exists {
476                    Ok(ExecutionResult::Success)
477                } else {
478                    Err(ExecutorError::TableNotFound(table_name.to_string()))
479                };
480            }
481        };
482        if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
483            return if if_exists {
484                Ok(ExecutionResult::Success)
485            } else {
486                Err(ExecutorError::TableNotFound(table_name.to_string()))
487            };
488        }
489
490        let indexes = TxnCatalogView::new(catalog, overlay)
491            .get_indexes_for_table(table_name)
492            .into_iter()
493            .cloned()
494            .collect::<Vec<_>>();
495
496        for index in &indexes {
497            if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
498                crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
499            } else {
500                txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
501            }
502        }
503
504        txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
505        txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
506
507        catalog
508            .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
509            .map_err(Self::map_catalog_error)?;
510
511        overlay.drop_table(&TableFqn::from(&table_meta));
512
513        Ok(ExecutionResult::Success)
514    }
515
516    fn execute_create_index_in_txn<'txn>(
517        &self,
518        catalog: &mut PersistentCatalog<S>,
519        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
520        overlay: &mut CatalogOverlay,
521        mut index: crate::catalog::IndexMetadata,
522        if_not_exists: bool,
523    ) -> Result<ExecutionResult>
524    where
525        S: 'txn,
526    {
527        if ddl::is_implicit_pk_index(&index.name) {
528            return Err(ExecutorError::InvalidIndexName {
529                name: index.name.clone(),
530                reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
531            });
532        }
533
534        if catalog.index_exists_in_txn(&index.name, overlay) {
535            return if if_not_exists {
536                Ok(ExecutionResult::Success)
537            } else {
538                Err(ExecutorError::IndexAlreadyExists(index.name))
539            };
540        }
541
542        let table = catalog
543            .get_table_in_txn(&index.table, overlay)
544            .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
545            .clone();
546        index.catalog_name = table.catalog_name.clone();
547        index.namespace_name = table.namespace_name.clone();
548
549        let column_indices = index
550            .columns
551            .iter()
552            .map(|name| {
553                table
554                    .get_column_index(name)
555                    .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
556            })
557            .collect::<Result<Vec<_>>>()?;
558
559        let index_id = catalog.next_index_id();
560        index.index_id = index_id;
561        index.column_indices = column_indices.clone();
562
563        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
564            crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
565        } else {
566            ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
567        }
568
569        catalog
570            .persist_create_index(txn.inner_mut(), &index)
571            .map_err(Self::map_catalog_error)?;
572
573        overlay.add_index(IndexFqn::from(&index), index);
574
575        Ok(ExecutionResult::Success)
576    }
577
578    fn execute_drop_index_in_txn<'txn>(
579        &self,
580        catalog: &mut PersistentCatalog<S>,
581        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
582        overlay: &mut CatalogOverlay,
583        index_name: &str,
584        if_exists: bool,
585    ) -> Result<ExecutionResult>
586    where
587        S: 'txn,
588    {
589        if ddl::is_implicit_pk_index(index_name) {
590            return Err(ExecutorError::InvalidOperation {
591                operation: "DROP INDEX".into(),
592                reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
593            });
594        }
595
596        let index = match catalog.get_index_in_txn(index_name, overlay) {
597            Some(index) => index.clone(),
598            None => {
599                return if if_exists {
600                    Ok(ExecutionResult::Success)
601                } else {
602                    Err(ExecutorError::IndexNotFound(index_name.to_string()))
603                };
604            }
605        };
606        if index.catalog_name != "default" || index.namespace_name != "default" {
607            return if if_exists {
608                Ok(ExecutionResult::Success)
609            } else {
610                Err(ExecutorError::IndexNotFound(index_name.to_string()))
611            };
612        }
613
614        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
615            crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
616        } else {
617            txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
618        }
619
620        catalog
621            .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
622            .map_err(Self::map_catalog_error)?;
623
624        overlay.drop_index(&IndexFqn::from(&index));
625
626        Ok(ExecutionResult::Success)
627    }
628}
629
630#[cfg(test)]
631mod tests {
632    use super::*;
633    use crate::catalog::MemoryCatalog;
634    use alopex_core::kv::memory::MemoryKV;
635
636    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
637        let store = Arc::new(MemoryKV::new());
638        let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
639        Executor::new(store, catalog)
640    }
641
642    #[test]
643    fn test_executor_creation() {
644        let _executor = create_executor();
645        // Executor should be created without panic
646    }
647
648    #[test]
649    fn create_table_is_supported() {
650        let mut executor = create_executor();
651
652        use crate::catalog::{ColumnMetadata, TableMetadata};
653        use crate::planner::ResolvedType;
654
655        let table = TableMetadata::new(
656            "test",
657            vec![ColumnMetadata::new("id", ResolvedType::Integer)],
658        );
659
660        let result = executor.execute(LogicalPlan::CreateTable {
661            table,
662            if_not_exists: false,
663            with_options: vec![],
664        });
665        assert!(matches!(result, Ok(ExecutionResult::Success)));
666
667        let catalog = executor.catalog.read().unwrap();
668        assert!(catalog.table_exists("test"));
669    }
670
671    #[test]
672    fn insert_is_supported() {
673        use crate::Span;
674        use crate::catalog::{ColumnMetadata, TableMetadata};
675        use crate::planner::typed_expr::TypedExprKind;
676        use crate::planner::types::ResolvedType;
677
678        let mut executor = create_executor();
679
680        let table = TableMetadata::new("t", vec![ColumnMetadata::new("id", ResolvedType::Integer)])
681            .with_primary_key(vec!["id".into()]);
682
683        executor
684            .execute(LogicalPlan::CreateTable {
685                table,
686                if_not_exists: false,
687                with_options: vec![],
688            })
689            .unwrap();
690
691        let result = executor.execute(LogicalPlan::Insert {
692            table: "t".into(),
693            columns: vec!["id".into()],
694            values: vec![vec![crate::planner::typed_expr::TypedExpr {
695                kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
696                resolved_type: ResolvedType::Integer,
697                span: Span::default(),
698            }]],
699        });
700        assert!(matches!(result, Ok(ExecutionResult::RowsAffected(1))));
701    }
702}