1pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use result::{ColumnInfo, ExecutionResult, QueryResult, Row};
50
51use std::sync::{Arc, RwLock};
52
53use alopex_core::kv::KVStore;
54
55use crate::catalog::Catalog;
56use crate::planner::LogicalPlan;
57use crate::storage::{SqlTransaction, TxnBridge};
58
59pub struct Executor<S: KVStore, C: Catalog> {
69 bridge: TxnBridge<S>,
71
72 catalog: Arc<RwLock<C>>,
74}
75
76impl<S: KVStore, C: Catalog> Executor<S, C> {
77 fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
78 where
79 F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
80 {
81 let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
82 match f(&mut txn) {
83 Ok(result) => {
84 txn.commit().map_err(ExecutorError::from)?;
85 Ok(result)
86 }
87 Err(err) => {
88 txn.rollback().map_err(ExecutorError::from)?;
89 Err(err)
90 }
91 }
92 }
93
94 pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
101 Self {
102 bridge: TxnBridge::new(store),
103 catalog,
104 }
105 }
106
107 pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
134 match plan {
135 LogicalPlan::CreateTable {
137 table,
138 if_not_exists,
139 with_options,
140 } => self.execute_create_table(table, with_options, if_not_exists),
141 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
142 LogicalPlan::CreateIndex {
143 index,
144 if_not_exists,
145 } => self.execute_create_index(index, if_not_exists),
146 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
147
148 LogicalPlan::Insert {
150 table,
151 columns,
152 values,
153 } => self.execute_insert(&table, columns, values),
154 LogicalPlan::Update {
155 table,
156 assignments,
157 filter,
158 } => self.execute_update(&table, assignments, filter),
159 LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
160
161 LogicalPlan::Scan { .. }
163 | LogicalPlan::Filter { .. }
164 | LogicalPlan::Sort { .. }
165 | LogicalPlan::Limit { .. } => self.execute_query(plan),
166 }
167 }
168
169 fn execute_create_table(
174 &mut self,
175 table: crate::catalog::TableMetadata,
176 with_options: Vec<(String, String)>,
177 if_not_exists: bool,
178 ) -> Result<ExecutionResult> {
179 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
180 self.run_in_write_txn(|txn| {
181 ddl::create_table::execute_create_table(
182 txn,
183 &mut *catalog,
184 table,
185 with_options,
186 if_not_exists,
187 )
188 })
189 }
190
191 fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
192 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
193 self.run_in_write_txn(|txn| {
194 ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
195 })
196 }
197
198 fn execute_create_index(
199 &mut self,
200 index: crate::catalog::IndexMetadata,
201 if_not_exists: bool,
202 ) -> Result<ExecutionResult> {
203 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
204 self.run_in_write_txn(|txn| {
205 ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
206 })
207 }
208
209 fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
210 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
211 self.run_in_write_txn(|txn| {
212 ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
213 })
214 }
215
216 fn execute_insert(
221 &mut self,
222 table: &str,
223 columns: Vec<String>,
224 values: Vec<Vec<crate::planner::TypedExpr>>,
225 ) -> Result<ExecutionResult> {
226 let catalog = self.catalog.read().expect("catalog lock poisoned");
227 self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
228 }
229
230 fn execute_update(
231 &mut self,
232 table: &str,
233 assignments: Vec<crate::planner::TypedAssignment>,
234 filter: Option<crate::planner::TypedExpr>,
235 ) -> Result<ExecutionResult> {
236 let catalog = self.catalog.read().expect("catalog lock poisoned");
237 self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
238 }
239
240 fn execute_delete(
241 &mut self,
242 table: &str,
243 filter: Option<crate::planner::TypedExpr>,
244 ) -> Result<ExecutionResult> {
245 let catalog = self.catalog.read().expect("catalog lock poisoned");
246 self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
247 }
248
249 fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
254 let catalog = self.catalog.read().expect("catalog lock poisoned");
255 self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::MemoryCatalog;
    use alopex_core::kv::memory::MemoryKV;

    /// Builds an executor over a fresh `MemoryKV` store and a fresh `MemoryCatalog`.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    /// Constructing an executor must not panic.
    #[test]
    fn test_executor_creation() {
        let _built = create_executor();
    }

    /// A `CreateTable` plan reports success and the table shows up in the catalog.
    #[test]
    fn create_table_is_supported() {
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::ResolvedType;

        let mut executor = create_executor();

        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let create_plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("test", vec![id_column]),
            if_not_exists: false,
            with_options: vec![],
        };

        let result = executor.execute(create_plan);
        assert!(matches!(result, Ok(ExecutionResult::Success)));

        // The executor's own catalog handle must now know the table.
        let catalog = executor.catalog.read().unwrap();
        assert!(catalog.table_exists("test"));
    }

    /// Inserting one literal row into a freshly created table affects one row.
    #[test]
    fn insert_is_supported() {
        use crate::Span;
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::typed_expr::TypedExprKind;
        use crate::planner::types::ResolvedType;

        let mut executor = create_executor();

        // Set-up: table `t` with an integer primary-key column `id`.
        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let table = TableMetadata::new("t", vec![id_column]).with_primary_key(vec!["id".into()]);
        let create_plan = LogicalPlan::CreateTable {
            table,
            if_not_exists: false,
            with_options: vec![],
        };
        executor.execute(create_plan).unwrap();

        // Exercise: insert the single row (1).
        let one = crate::planner::typed_expr::TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let insert_plan = LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![one]],
        };

        let result = executor.execute(insert_plan);
        assert!(matches!(result, Ok(ExecutionResult::RowsAffected(1))));
    }
}