alopex_sql/executor/
mod.rs

1//! SQL Executor module for Alopex SQL.
2//!
3//! This module provides the execution engine for SQL statements.
4//!
5//! # Overview
6//!
7//! The Executor takes a [`LogicalPlan`] from the Planner and executes it
8//! against the storage layer. It supports DDL, DML, and Query operations.
9//!
10//! Query execution currently materializes intermediate results per stage;
11//! future versions may add streaming pipelines as requirements grow.
12
13//! # Components
14//!
15//! - [`Executor`]: Main executor struct
16//! - [`ExecutorError`]: Error types for execution
17//! - [`ExecutionResult`]: Execution result types
18//!
19//! # Example
20//!
21//! ```ignore
22//! use std::sync::{Arc, RwLock};
23//! use alopex_core::kv::memory::MemoryKV;
24//! use alopex_sql::executor::Executor;
25//! use alopex_sql::catalog::MemoryCatalog;
26//! use alopex_sql::planner::LogicalPlan;
27//!
28//! // Create storage and catalog
29//! let store = Arc::new(MemoryKV::new());
30//! let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
31//!
32//! // Create executor
33//! let mut executor = Executor::new(store, catalog);
34//!
35//! // Execute a plan
36//! let result = executor.execute(plan)?;
37//! ```
38
39#[cfg(feature = "tokio")]
40pub mod async_executor;
41pub mod bulk;
42pub(crate) mod ddl;
43pub(crate) mod dml;
44mod error;
45pub mod evaluator;
46mod hnsw_bridge;
47pub mod query;
48mod result;
49
50#[cfg(feature = "tokio")]
51pub use async_executor::AsyncExecutor;
52pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
53pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
54pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
55
56use std::sync::{Arc, RwLock};
57
58use alopex_core::kv::KVStore;
59use alopex_core::types::TxnMode;
60
61use crate::catalog::Catalog;
62use crate::catalog::persistent::{IndexFqn, TableFqn};
63use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
64use crate::planner::LogicalPlan;
65use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
66
/// SQL statement executor.
///
/// The Executor takes a [`LogicalPlan`] and executes it against the storage layer.
/// It dispatches DDL, DML, and query plans to the `ddl`, `dml`, and `query`
/// submodules, and wraps each standalone statement in its own transaction.
///
/// # Type Parameters
///
/// - `S`: The underlying KV store type (must implement [`KVStore`])
/// - `C`: The catalog type (must implement [`Catalog`])
pub struct Executor<S: KVStore, C: Catalog> {
    /// Transaction bridge used to begin storage transactions on the KV store.
    bridge: TxnBridge<S>,

    /// Shared catalog handle; locked for reading or writing per statement.
    catalog: Arc<RwLock<C>>,
}
83
84impl<S: KVStore, C: Catalog> Executor<S, C> {
85    fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
86    where
87        F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
88    {
89        let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
90        match f(&mut txn) {
91            Ok(result) => {
92                txn.commit().map_err(ExecutorError::from)?;
93                Ok(result)
94            }
95            Err(err) => {
96                txn.rollback().map_err(ExecutorError::from)?;
97                Err(err)
98            }
99        }
100    }
101
102    /// Create a new Executor with the given store and catalog.
103    ///
104    /// # Arguments
105    ///
106    /// - `store`: The underlying KV store
107    /// - `catalog`: The catalog for metadata operations
108    pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
109        Self {
110            bridge: TxnBridge::new(store),
111            catalog,
112        }
113    }
114
115    /// Execute a logical plan and return the result.
116    ///
117    /// # Arguments
118    ///
119    /// - `plan`: The logical plan to execute
120    ///
121    /// # Returns
122    ///
123    /// Returns an [`ExecutionResult`] on success, or an [`ExecutorError`] on failure.
124    ///
125    /// # DDL Operations
126    ///
127    /// - `CreateTable`: Creates a new table with optional PK index
128    /// - `DropTable`: Drops a table and its associated indexes
129    /// - `CreateIndex`: Creates a new index
130    /// - `DropIndex`: Drops an index
131    ///
132    /// # DML Operations
133    ///
134    /// - `Insert`: Inserts rows into a table
135    /// - `Update`: Updates rows in a table
136    /// - `Delete`: Deletes rows from a table
137    ///
138    /// # Query Operations
139    ///
140    /// - `Scan`, `Filter`, `Sort`, `Limit`: SELECT query execution
141    pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
142        match plan {
143            // DDL Operations
144            LogicalPlan::CreateTable {
145                table,
146                if_not_exists,
147                with_options,
148            } => self.execute_create_table(table, with_options, if_not_exists),
149            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
150            LogicalPlan::CreateIndex {
151                index,
152                if_not_exists,
153            } => self.execute_create_index(index, if_not_exists),
154            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
155
156            // DML Operations
157            LogicalPlan::Insert {
158                table,
159                columns,
160                values,
161            } => self.execute_insert(&table, columns, values),
162            LogicalPlan::Update {
163                table,
164                assignments,
165                filter,
166            } => self.execute_update(&table, assignments, filter),
167            LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
168
169            // Query Operations
170            LogicalPlan::Scan { .. }
171            | LogicalPlan::Filter { .. }
172            | LogicalPlan::Aggregate { .. }
173            | LogicalPlan::Sort { .. }
174            | LogicalPlan::Limit { .. } => self.execute_query(plan),
175        }
176    }
177
178    // ========================================================================
    // DDL Operations (Phase 2)
180    // ========================================================================
181
182    fn execute_create_table(
183        &mut self,
184        table: crate::catalog::TableMetadata,
185        with_options: Vec<(String, String)>,
186        if_not_exists: bool,
187    ) -> Result<ExecutionResult> {
188        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
189        self.run_in_write_txn(|txn| {
190            ddl::create_table::execute_create_table(
191                txn,
192                &mut *catalog,
193                table,
194                with_options,
195                if_not_exists,
196            )
197        })
198    }
199
200    fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
201        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
202        self.run_in_write_txn(|txn| {
203            ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
204        })
205    }
206
207    fn execute_create_index(
208        &mut self,
209        index: crate::catalog::IndexMetadata,
210        if_not_exists: bool,
211    ) -> Result<ExecutionResult> {
212        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
213        self.run_in_write_txn(|txn| {
214            ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
215        })
216    }
217
218    fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
219        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
220        self.run_in_write_txn(|txn| {
221            ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
222        })
223    }
224
225    // ========================================================================
226    // DML Operations (implemented in Phase 4)
227    // ========================================================================
228
229    fn execute_insert(
230        &mut self,
231        table: &str,
232        columns: Vec<String>,
233        values: Vec<Vec<crate::planner::TypedExpr>>,
234    ) -> Result<ExecutionResult> {
235        let catalog = self.catalog.read().expect("catalog lock poisoned");
236        self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
237    }
238
239    fn execute_update(
240        &mut self,
241        table: &str,
242        assignments: Vec<crate::planner::TypedAssignment>,
243        filter: Option<crate::planner::TypedExpr>,
244    ) -> Result<ExecutionResult> {
245        let catalog = self.catalog.read().expect("catalog lock poisoned");
246        self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
247    }
248
249    fn execute_delete(
250        &mut self,
251        table: &str,
252        filter: Option<crate::planner::TypedExpr>,
253    ) -> Result<ExecutionResult> {
254        let catalog = self.catalog.read().expect("catalog lock poisoned");
255        self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
256    }
257
258    // ========================================================================
    // Query Operations (Phase 5)
260    // ========================================================================
261
262    fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
263        let catalog = self.catalog.read().expect("catalog lock poisoned");
264        self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
265    }
266}
267
268impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
269    pub fn execute_in_txn<'a, 'b, 'c>(
270        &mut self,
271        plan: LogicalPlan,
272        txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
273    ) -> Result<ExecutionResult> {
274        if txn.mode() == TxnMode::ReadOnly
275            && !matches!(
276                plan,
277                LogicalPlan::Scan { .. }
278                    | LogicalPlan::Filter { .. }
279                    | LogicalPlan::Aggregate { .. }
280                    | LogicalPlan::Sort { .. }
281                    | LogicalPlan::Limit { .. }
282            )
283        {
284            return Err(ExecutorError::ReadOnlyTransaction {
285                operation: plan.operation_name().to_string(),
286            });
287        }
288
289        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
290        let (mut sql_txn, overlay) = txn.split_parts();
291
292        let result = match plan {
293            LogicalPlan::CreateTable {
294                table,
295                if_not_exists,
296                with_options,
297            } => self.execute_create_table_in_txn(
298                &mut *catalog,
299                &mut sql_txn,
300                overlay,
301                table,
302                with_options,
303                if_not_exists,
304            ),
305            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
306                &mut *catalog,
307                &mut sql_txn,
308                overlay,
309                &name,
310                if_exists,
311            ),
312            LogicalPlan::CreateIndex {
313                index,
314                if_not_exists,
315            } => self.execute_create_index_in_txn(
316                &mut *catalog,
317                &mut sql_txn,
318                overlay,
319                index,
320                if_not_exists,
321            ),
322            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
323                &mut *catalog,
324                &mut sql_txn,
325                overlay,
326                &name,
327                if_exists,
328            ),
329            LogicalPlan::Insert {
330                table,
331                columns,
332                values,
333            } => {
334                let view = TxnCatalogView::new(&*catalog, &*overlay);
335                dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
336            }
337            LogicalPlan::Update {
338                table,
339                assignments,
340                filter,
341            } => {
342                let view = TxnCatalogView::new(&*catalog, &*overlay);
343                dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
344            }
345            LogicalPlan::Delete { table, filter } => {
346                let view = TxnCatalogView::new(&*catalog, &*overlay);
347                dml::execute_delete(&mut sql_txn, &view, &table, filter)
348            }
349            LogicalPlan::Scan { .. }
350            | LogicalPlan::Filter { .. }
351            | LogicalPlan::Aggregate { .. }
352            | LogicalPlan::Sort { .. }
353            | LogicalPlan::Limit { .. } => {
354                let view = TxnCatalogView::new(&*catalog, &*overlay);
355                query::execute_query(&mut sql_txn, &view, plan)
356            }
357        };
358
359        match result {
360            Ok(value) => {
361                sql_txn.flush_hnsw()?;
362                Ok(value)
363            }
364            Err(err) => {
365                let _ = sql_txn.abandon_hnsw();
366                Err(err)
367            }
368        }
369    }
370
371    fn map_catalog_error(err: CatalogError) -> ExecutorError {
372        match err {
373            CatalogError::Kv(e) => ExecutorError::Core(e),
374            CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
375                operation: "CatalogPersistence".into(),
376                reason: e.to_string(),
377            },
378            CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
379                operation: "CatalogPersistence".into(),
380                reason,
381            },
382        }
383    }
384
385    fn execute_create_table_in_txn<'txn>(
386        &self,
387        catalog: &mut PersistentCatalog<S>,
388        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
389        overlay: &mut CatalogOverlay,
390        mut table: crate::catalog::TableMetadata,
391        with_options: Vec<(String, String)>,
392        if_not_exists: bool,
393    ) -> Result<ExecutionResult>
394    where
395        S: 'txn,
396    {
397        if catalog.table_exists_in_txn(&table.name, overlay) {
398            return if if_not_exists {
399                Ok(ExecutionResult::Success)
400            } else {
401                Err(ExecutorError::TableAlreadyExists(table.name))
402            };
403        }
404
405        table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
406
407        let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
408            let column_indices = pk_columns
409                .iter()
410                .map(|name| {
411                    table
412                        .get_column_index(name)
413                        .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
414                })
415                .collect::<Result<Vec<_>>>()?;
416            let index_id = catalog.next_index_id();
417            let index_name = ddl::create_pk_index_name(&table.name);
418            let mut index = crate::catalog::IndexMetadata::new(
419                index_id,
420                index_name,
421                table.name.clone(),
422                pk_columns,
423            )
424            .with_column_indices(column_indices)
425            .with_unique(true);
426            index.catalog_name = table.catalog_name.clone();
427            index.namespace_name = table.namespace_name.clone();
428            Some(index)
429        } else {
430            None
431        };
432
433        let table_id = catalog.next_table_id();
434        table = table.with_table_id(table_id);
435
436        // storage keyspace の初期化
437        txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
438        txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
439
440        // 永続化(同一 KV トランザクション内)
441        catalog
442            .persist_create_table(txn.inner_mut(), &table)
443            .map_err(Self::map_catalog_error)?;
444        if let Some(index) = &pk_index {
445            catalog
446                .persist_create_index(txn.inner_mut(), index)
447                .map_err(Self::map_catalog_error)?;
448        }
449
450        // オーバーレイに反映(ベースカタログはコミットまで不変)
451        overlay.add_table(TableFqn::from(&table), table);
452        if let Some(index) = pk_index {
453            overlay.add_index(IndexFqn::from(&index), index);
454        }
455
456        Ok(ExecutionResult::Success)
457    }
458
459    fn execute_drop_table_in_txn<'txn>(
460        &self,
461        catalog: &mut PersistentCatalog<S>,
462        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
463        overlay: &mut CatalogOverlay,
464        table_name: &str,
465        if_exists: bool,
466    ) -> Result<ExecutionResult>
467    where
468        S: 'txn,
469    {
470        let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
471            Some(table) => table.clone(),
472            None => {
473                return if if_exists {
474                    Ok(ExecutionResult::Success)
475                } else {
476                    Err(ExecutorError::TableNotFound(table_name.to_string()))
477                };
478            }
479        };
480        if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
481            return if if_exists {
482                Ok(ExecutionResult::Success)
483            } else {
484                Err(ExecutorError::TableNotFound(table_name.to_string()))
485            };
486        }
487
488        let indexes = TxnCatalogView::new(catalog, overlay)
489            .get_indexes_for_table(table_name)
490            .into_iter()
491            .cloned()
492            .collect::<Vec<_>>();
493
494        for index in &indexes {
495            if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
496                crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
497            } else {
498                txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
499            }
500        }
501
502        txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
503        txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
504
505        catalog
506            .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
507            .map_err(Self::map_catalog_error)?;
508
509        overlay.drop_table(&TableFqn::from(&table_meta));
510
511        Ok(ExecutionResult::Success)
512    }
513
514    fn execute_create_index_in_txn<'txn>(
515        &self,
516        catalog: &mut PersistentCatalog<S>,
517        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
518        overlay: &mut CatalogOverlay,
519        mut index: crate::catalog::IndexMetadata,
520        if_not_exists: bool,
521    ) -> Result<ExecutionResult>
522    where
523        S: 'txn,
524    {
525        if ddl::is_implicit_pk_index(&index.name) {
526            return Err(ExecutorError::InvalidIndexName {
527                name: index.name.clone(),
528                reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
529            });
530        }
531
532        if catalog.index_exists_in_txn(&index.name, overlay) {
533            return if if_not_exists {
534                Ok(ExecutionResult::Success)
535            } else {
536                Err(ExecutorError::IndexAlreadyExists(index.name))
537            };
538        }
539
540        let table = catalog
541            .get_table_in_txn(&index.table, overlay)
542            .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
543            .clone();
544        index.catalog_name = table.catalog_name.clone();
545        index.namespace_name = table.namespace_name.clone();
546
547        let column_indices = index
548            .columns
549            .iter()
550            .map(|name| {
551                table
552                    .get_column_index(name)
553                    .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
554            })
555            .collect::<Result<Vec<_>>>()?;
556
557        let index_id = catalog.next_index_id();
558        index.index_id = index_id;
559        index.column_indices = column_indices.clone();
560
561        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
562            crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
563        } else {
564            ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
565        }
566
567        catalog
568            .persist_create_index(txn.inner_mut(), &index)
569            .map_err(Self::map_catalog_error)?;
570
571        overlay.add_index(IndexFqn::from(&index), index);
572
573        Ok(ExecutionResult::Success)
574    }
575
576    fn execute_drop_index_in_txn<'txn>(
577        &self,
578        catalog: &mut PersistentCatalog<S>,
579        txn: &mut impl crate::storage::SqlTxn<'txn, S>,
580        overlay: &mut CatalogOverlay,
581        index_name: &str,
582        if_exists: bool,
583    ) -> Result<ExecutionResult>
584    where
585        S: 'txn,
586    {
587        if ddl::is_implicit_pk_index(index_name) {
588            return Err(ExecutorError::InvalidOperation {
589                operation: "DROP INDEX".into(),
590                reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
591            });
592        }
593
594        let index = match catalog.get_index_in_txn(index_name, overlay) {
595            Some(index) => index.clone(),
596            None => {
597                return if if_exists {
598                    Ok(ExecutionResult::Success)
599                } else {
600                    Err(ExecutorError::IndexNotFound(index_name.to_string()))
601                };
602            }
603        };
604        if index.catalog_name != "default" || index.namespace_name != "default" {
605            return if if_exists {
606                Ok(ExecutionResult::Success)
607            } else {
608                Err(ExecutorError::IndexNotFound(index_name.to_string()))
609            };
610        }
611
612        if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
613            crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
614        } else {
615            txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
616        }
617
618        catalog
619            .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
620            .map_err(Self::map_catalog_error)?;
621
622        overlay.drop_index(&IndexFqn::from(&index));
623
624        Ok(ExecutionResult::Success)
625    }
626}
627
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::MemoryCatalog;
    use alopex_core::kv::memory::MemoryKV;

    /// Build an executor backed by an in-memory store and an in-memory catalog.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    /// Constructing an executor must not panic.
    #[test]
    fn test_executor_creation() {
        let _executor = create_executor();
    }

    /// `CREATE TABLE` succeeds and the table becomes visible in the catalog.
    #[test]
    fn create_table_is_supported() {
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::ResolvedType;

        let mut executor = create_executor();

        let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
        let plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("test", columns),
            if_not_exists: false,
            with_options: vec![],
        };

        let outcome = executor.execute(plan);
        assert!(matches!(outcome, Ok(ExecutionResult::Success)));

        let catalog = executor.catalog.read().unwrap();
        assert!(catalog.table_exists("test"));
    }

    /// Inserting one row into a freshly created table reports one affected row.
    #[test]
    fn insert_is_supported() {
        use crate::Span;
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::typed_expr::TypedExprKind;
        use crate::planner::types::ResolvedType;

        let mut executor = create_executor();

        let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
        let table = TableMetadata::new("t", columns).with_primary_key(vec!["id".into()]);
        let create_plan = LogicalPlan::CreateTable {
            table,
            if_not_exists: false,
            with_options: vec![],
        };
        executor.execute(create_plan).unwrap();

        let literal_one = crate::planner::typed_expr::TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let insert_plan = LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![literal_one]],
        };

        let outcome = executor.execute(insert_plan);
        assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
    }
}