alopex_sql/executor/
mod.rs

1//! SQL Executor module for Alopex SQL.
2//!
3//! This module provides the execution engine for SQL statements.
4//!
5//! # Overview
6//!
7//! The Executor takes a [`LogicalPlan`] from the Planner and executes it
8//! against the storage layer. It supports DDL, DML, and Query operations.
9//!
10//! Query execution currently materializes intermediate results per stage;
11//! future versions may add streaming pipelines as requirements grow.
12
13//! # Components
14//!
15//! - [`Executor`]: Main executor struct
16//! - [`ExecutorError`]: Error types for execution
17//! - [`ExecutionResult`]: Execution result types
18//!
19//! # Example
20//!
21//! ```ignore
22//! use std::sync::{Arc, RwLock};
23//! use alopex_core::kv::memory::MemoryKV;
24//! use alopex_sql::executor::Executor;
25//! use alopex_sql::catalog::MemoryCatalog;
26//! use alopex_sql::planner::LogicalPlan;
27//!
28//! // Create storage and catalog
29//! let store = Arc::new(MemoryKV::new());
30//! let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
31//!
32//! // Create executor
33//! let mut executor = Executor::new(store, catalog);
34//!
35//! // Execute a plan
36//! let result = executor.execute(plan)?;
37//! ```
38
39pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use result::{ColumnInfo, ExecutionResult, QueryResult, Row};
50
51use std::sync::{Arc, RwLock};
52
53use alopex_core::kv::KVStore;
54
55use crate::catalog::Catalog;
56use crate::planner::LogicalPlan;
57use crate::storage::{SqlTransaction, TxnBridge};
58
59/// SQL statement executor.
60///
61/// The Executor takes a [`LogicalPlan`] and executes it against the storage layer.
62/// It manages transactions and coordinates between DDL, DML, and Query operations.
63///
64/// # Type Parameters
65///
66/// - `S`: The underlying KV store type (must implement [`KVStore`])
67/// - `C`: The catalog type (must implement [`Catalog`])
68pub struct Executor<S: KVStore, C: Catalog> {
69    /// Transaction bridge for storage operations.
70    bridge: TxnBridge<S>,
71
72    /// Catalog for metadata operations.
73    catalog: Arc<RwLock<C>>,
74}
75
76impl<S: KVStore, C: Catalog> Executor<S, C> {
77    fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
78    where
79        F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
80    {
81        let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
82        match f(&mut txn) {
83            Ok(result) => {
84                txn.commit().map_err(ExecutorError::from)?;
85                Ok(result)
86            }
87            Err(err) => {
88                txn.rollback().map_err(ExecutorError::from)?;
89                Err(err)
90            }
91        }
92    }
93
94    /// Create a new Executor with the given store and catalog.
95    ///
96    /// # Arguments
97    ///
98    /// - `store`: The underlying KV store
99    /// - `catalog`: The catalog for metadata operations
100    pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
101        Self {
102            bridge: TxnBridge::new(store),
103            catalog,
104        }
105    }
106
107    /// Execute a logical plan and return the result.
108    ///
109    /// # Arguments
110    ///
111    /// - `plan`: The logical plan to execute
112    ///
113    /// # Returns
114    ///
115    /// Returns an [`ExecutionResult`] on success, or an [`ExecutorError`] on failure.
116    ///
117    /// # DDL Operations
118    ///
119    /// - `CreateTable`: Creates a new table with optional PK index
120    /// - `DropTable`: Drops a table and its associated indexes
121    /// - `CreateIndex`: Creates a new index
122    /// - `DropIndex`: Drops an index
123    ///
124    /// # DML Operations
125    ///
126    /// - `Insert`: Inserts rows into a table
127    /// - `Update`: Updates rows in a table
128    /// - `Delete`: Deletes rows from a table
129    ///
130    /// # Query Operations
131    ///
132    /// - `Scan`, `Filter`, `Sort`, `Limit`: SELECT query execution
133    pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
134        match plan {
135            // DDL Operations
136            LogicalPlan::CreateTable {
137                table,
138                if_not_exists,
139                with_options,
140            } => self.execute_create_table(table, with_options, if_not_exists),
141            LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
142            LogicalPlan::CreateIndex {
143                index,
144                if_not_exists,
145            } => self.execute_create_index(index, if_not_exists),
146            LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
147
148            // DML Operations
149            LogicalPlan::Insert {
150                table,
151                columns,
152                values,
153            } => self.execute_insert(&table, columns, values),
154            LogicalPlan::Update {
155                table,
156                assignments,
157                filter,
158            } => self.execute_update(&table, assignments, filter),
159            LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
160
161            // Query Operations
162            LogicalPlan::Scan { .. }
163            | LogicalPlan::Filter { .. }
164            | LogicalPlan::Sort { .. }
165            | LogicalPlan::Limit { .. } => self.execute_query(plan),
166        }
167    }
168
169    // ========================================================================
    // DDL Operations (implemented in Phase 2)
171    // ========================================================================
172
173    fn execute_create_table(
174        &mut self,
175        table: crate::catalog::TableMetadata,
176        with_options: Vec<(String, String)>,
177        if_not_exists: bool,
178    ) -> Result<ExecutionResult> {
179        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
180        self.run_in_write_txn(|txn| {
181            ddl::create_table::execute_create_table(
182                txn,
183                &mut *catalog,
184                table,
185                with_options,
186                if_not_exists,
187            )
188        })
189    }
190
191    fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
192        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
193        self.run_in_write_txn(|txn| {
194            ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
195        })
196    }
197
198    fn execute_create_index(
199        &mut self,
200        index: crate::catalog::IndexMetadata,
201        if_not_exists: bool,
202    ) -> Result<ExecutionResult> {
203        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
204        self.run_in_write_txn(|txn| {
205            ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
206        })
207    }
208
209    fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
210        let mut catalog = self.catalog.write().expect("catalog lock poisoned");
211        self.run_in_write_txn(|txn| {
212            ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
213        })
214    }
215
216    // ========================================================================
217    // DML Operations (implemented in Phase 4)
218    // ========================================================================
219
220    fn execute_insert(
221        &mut self,
222        table: &str,
223        columns: Vec<String>,
224        values: Vec<Vec<crate::planner::TypedExpr>>,
225    ) -> Result<ExecutionResult> {
226        let catalog = self.catalog.read().expect("catalog lock poisoned");
227        self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
228    }
229
230    fn execute_update(
231        &mut self,
232        table: &str,
233        assignments: Vec<crate::planner::TypedAssignment>,
234        filter: Option<crate::planner::TypedExpr>,
235    ) -> Result<ExecutionResult> {
236        let catalog = self.catalog.read().expect("catalog lock poisoned");
237        self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
238    }
239
240    fn execute_delete(
241        &mut self,
242        table: &str,
243        filter: Option<crate::planner::TypedExpr>,
244    ) -> Result<ExecutionResult> {
245        let catalog = self.catalog.read().expect("catalog lock poisoned");
246        self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
247    }
248
249    // ========================================================================
    // Query Operations (implemented in Phase 5)
251    // ========================================================================
252
253    fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
254        let catalog = self.catalog.read().expect("catalog lock poisoned");
255        self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::MemoryCatalog;
    use alopex_core::kv::memory::MemoryKV;

    /// Builds an executor over a fresh `MemoryKV` store and a new `MemoryCatalog`.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    /// Constructing an executor must not panic.
    #[test]
    fn test_executor_creation() {
        drop(create_executor());
    }

    /// A `CreateTable` plan succeeds and the table becomes visible in the catalog.
    #[test]
    fn create_table_is_supported() {
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::ResolvedType;

        let mut executor = create_executor();

        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("test", vec![id_column]),
            if_not_exists: false,
            with_options: vec![],
        };

        let outcome = executor.execute(plan);
        assert!(matches!(outcome, Ok(ExecutionResult::Success)));

        // The table must now be registered in the shared catalog.
        assert!(executor.catalog.read().unwrap().table_exists("test"));
    }

    /// Inserting one row into a freshly created table reports one affected row.
    #[test]
    fn insert_is_supported() {
        use crate::Span;
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::typed_expr::TypedExprKind;
        use crate::planner::types::ResolvedType;

        let mut executor = create_executor();

        // Set up a single-column table `t` with `id` as its primary key.
        let schema = TableMetadata::new("t", vec![ColumnMetadata::new("id", ResolvedType::Integer)])
            .with_primary_key(vec!["id".into()]);
        let create_plan = LogicalPlan::CreateTable {
            table: schema,
            if_not_exists: false,
            with_options: vec![],
        };
        executor.execute(create_plan).unwrap();

        // Insert the integer literal `1` as the only row.
        let one = crate::planner::typed_expr::TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let insert_plan = LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![one]],
        };

        let outcome = executor.execute(insert_plan);
        assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
    }
}
331}