1pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use result::{ColumnInfo, ExecutionResult, QueryResult, Row};
50
51use std::sync::{Arc, RwLock};
52
53use alopex_core::kv::KVStore;
54use alopex_core::types::TxnMode;
55
56use crate::catalog::Catalog;
57use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
58use crate::planner::LogicalPlan;
59use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
60
61pub struct Executor<S: KVStore, C: Catalog> {
71 bridge: TxnBridge<S>,
73
74 catalog: Arc<RwLock<C>>,
76}
77
78impl<S: KVStore, C: Catalog> Executor<S, C> {
79 fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
80 where
81 F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
82 {
83 let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
84 match f(&mut txn) {
85 Ok(result) => {
86 txn.commit().map_err(ExecutorError::from)?;
87 Ok(result)
88 }
89 Err(err) => {
90 txn.rollback().map_err(ExecutorError::from)?;
91 Err(err)
92 }
93 }
94 }
95
96 pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
103 Self {
104 bridge: TxnBridge::new(store),
105 catalog,
106 }
107 }
108
109 pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
136 match plan {
137 LogicalPlan::CreateTable {
139 table,
140 if_not_exists,
141 with_options,
142 } => self.execute_create_table(table, with_options, if_not_exists),
143 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
144 LogicalPlan::CreateIndex {
145 index,
146 if_not_exists,
147 } => self.execute_create_index(index, if_not_exists),
148 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
149
150 LogicalPlan::Insert {
152 table,
153 columns,
154 values,
155 } => self.execute_insert(&table, columns, values),
156 LogicalPlan::Update {
157 table,
158 assignments,
159 filter,
160 } => self.execute_update(&table, assignments, filter),
161 LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
162
163 LogicalPlan::Scan { .. }
165 | LogicalPlan::Filter { .. }
166 | LogicalPlan::Sort { .. }
167 | LogicalPlan::Limit { .. } => self.execute_query(plan),
168 }
169 }
170
171 fn execute_create_table(
176 &mut self,
177 table: crate::catalog::TableMetadata,
178 with_options: Vec<(String, String)>,
179 if_not_exists: bool,
180 ) -> Result<ExecutionResult> {
181 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
182 self.run_in_write_txn(|txn| {
183 ddl::create_table::execute_create_table(
184 txn,
185 &mut *catalog,
186 table,
187 with_options,
188 if_not_exists,
189 )
190 })
191 }
192
193 fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
194 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
195 self.run_in_write_txn(|txn| {
196 ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
197 })
198 }
199
200 fn execute_create_index(
201 &mut self,
202 index: crate::catalog::IndexMetadata,
203 if_not_exists: bool,
204 ) -> Result<ExecutionResult> {
205 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
206 self.run_in_write_txn(|txn| {
207 ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
208 })
209 }
210
211 fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
212 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
213 self.run_in_write_txn(|txn| {
214 ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
215 })
216 }
217
218 fn execute_insert(
223 &mut self,
224 table: &str,
225 columns: Vec<String>,
226 values: Vec<Vec<crate::planner::TypedExpr>>,
227 ) -> Result<ExecutionResult> {
228 let catalog = self.catalog.read().expect("catalog lock poisoned");
229 self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
230 }
231
232 fn execute_update(
233 &mut self,
234 table: &str,
235 assignments: Vec<crate::planner::TypedAssignment>,
236 filter: Option<crate::planner::TypedExpr>,
237 ) -> Result<ExecutionResult> {
238 let catalog = self.catalog.read().expect("catalog lock poisoned");
239 self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
240 }
241
242 fn execute_delete(
243 &mut self,
244 table: &str,
245 filter: Option<crate::planner::TypedExpr>,
246 ) -> Result<ExecutionResult> {
247 let catalog = self.catalog.read().expect("catalog lock poisoned");
248 self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
249 }
250
251 fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
256 let catalog = self.catalog.read().expect("catalog lock poisoned");
257 self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
258 }
259}
260
261impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
262 pub fn execute_in_txn<'a, 'b, 'c>(
263 &mut self,
264 plan: LogicalPlan,
265 txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
266 ) -> Result<ExecutionResult> {
267 if txn.mode() == TxnMode::ReadOnly
268 && !matches!(
269 plan,
270 LogicalPlan::Scan { .. }
271 | LogicalPlan::Filter { .. }
272 | LogicalPlan::Sort { .. }
273 | LogicalPlan::Limit { .. }
274 )
275 {
276 return Err(ExecutorError::ReadOnlyTransaction {
277 operation: plan.operation_name().to_string(),
278 });
279 }
280
281 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
282 let (mut sql_txn, overlay) = txn.split_parts();
283
284 let result = match plan {
285 LogicalPlan::CreateTable {
286 table,
287 if_not_exists,
288 with_options,
289 } => self.execute_create_table_in_txn(
290 &mut *catalog,
291 &mut sql_txn,
292 overlay,
293 table,
294 with_options,
295 if_not_exists,
296 ),
297 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
298 &mut *catalog,
299 &mut sql_txn,
300 overlay,
301 &name,
302 if_exists,
303 ),
304 LogicalPlan::CreateIndex {
305 index,
306 if_not_exists,
307 } => self.execute_create_index_in_txn(
308 &mut *catalog,
309 &mut sql_txn,
310 overlay,
311 index,
312 if_not_exists,
313 ),
314 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
315 &mut *catalog,
316 &mut sql_txn,
317 overlay,
318 &name,
319 if_exists,
320 ),
321 LogicalPlan::Insert {
322 table,
323 columns,
324 values,
325 } => {
326 let view = TxnCatalogView::new(&*catalog, &*overlay);
327 dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
328 }
329 LogicalPlan::Update {
330 table,
331 assignments,
332 filter,
333 } => {
334 let view = TxnCatalogView::new(&*catalog, &*overlay);
335 dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
336 }
337 LogicalPlan::Delete { table, filter } => {
338 let view = TxnCatalogView::new(&*catalog, &*overlay);
339 dml::execute_delete(&mut sql_txn, &view, &table, filter)
340 }
341 LogicalPlan::Scan { .. }
342 | LogicalPlan::Filter { .. }
343 | LogicalPlan::Sort { .. }
344 | LogicalPlan::Limit { .. } => {
345 let view = TxnCatalogView::new(&*catalog, &*overlay);
346 query::execute_query(&mut sql_txn, &view, plan)
347 }
348 };
349
350 match result {
351 Ok(value) => {
352 sql_txn.flush_hnsw()?;
353 Ok(value)
354 }
355 Err(err) => {
356 let _ = sql_txn.abandon_hnsw();
357 Err(err)
358 }
359 }
360 }
361
362 fn map_catalog_error(err: CatalogError) -> ExecutorError {
363 match err {
364 CatalogError::Kv(e) => ExecutorError::Core(e),
365 CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
366 operation: "CatalogPersistence".into(),
367 reason: e.to_string(),
368 },
369 CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
370 operation: "CatalogPersistence".into(),
371 reason,
372 },
373 }
374 }
375
376 fn execute_create_table_in_txn<'txn>(
377 &self,
378 catalog: &mut PersistentCatalog<S>,
379 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
380 overlay: &mut CatalogOverlay,
381 mut table: crate::catalog::TableMetadata,
382 with_options: Vec<(String, String)>,
383 if_not_exists: bool,
384 ) -> Result<ExecutionResult>
385 where
386 S: 'txn,
387 {
388 if catalog.table_exists_in_txn(&table.name, overlay) {
389 return if if_not_exists {
390 Ok(ExecutionResult::Success)
391 } else {
392 Err(ExecutorError::TableAlreadyExists(table.name))
393 };
394 }
395
396 table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
397
398 let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
399 let column_indices = pk_columns
400 .iter()
401 .map(|name| {
402 table
403 .get_column_index(name)
404 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
405 })
406 .collect::<Result<Vec<_>>>()?;
407 let index_id = catalog.next_index_id();
408 let index_name = ddl::create_pk_index_name(&table.name);
409 Some(
410 crate::catalog::IndexMetadata::new(
411 index_id,
412 index_name,
413 table.name.clone(),
414 pk_columns,
415 )
416 .with_column_indices(column_indices)
417 .with_unique(true),
418 )
419 } else {
420 None
421 };
422
423 let table_id = catalog.next_table_id();
424 table = table.with_table_id(table_id);
425
426 txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
428 txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
429
430 catalog
432 .persist_create_table(txn.inner_mut(), &table)
433 .map_err(Self::map_catalog_error)?;
434 if let Some(index) = &pk_index {
435 catalog
436 .persist_create_index(txn.inner_mut(), index)
437 .map_err(Self::map_catalog_error)?;
438 }
439
440 overlay.add_table(table);
442 if let Some(index) = pk_index {
443 overlay.add_index(index);
444 }
445
446 Ok(ExecutionResult::Success)
447 }
448
449 fn execute_drop_table_in_txn<'txn>(
450 &self,
451 catalog: &mut PersistentCatalog<S>,
452 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
453 overlay: &mut CatalogOverlay,
454 table_name: &str,
455 if_exists: bool,
456 ) -> Result<ExecutionResult>
457 where
458 S: 'txn,
459 {
460 let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
461 Some(table) => table.clone(),
462 None => {
463 return if if_exists {
464 Ok(ExecutionResult::Success)
465 } else {
466 Err(ExecutorError::TableNotFound(table_name.to_string()))
467 };
468 }
469 };
470
471 let indexes = TxnCatalogView::new(catalog, overlay)
472 .get_indexes_for_table(table_name)
473 .into_iter()
474 .cloned()
475 .collect::<Vec<_>>();
476
477 for index in &indexes {
478 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
479 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
480 } else {
481 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
482 }
483 }
484
485 txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
486 txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
487
488 catalog
489 .persist_drop_table(txn.inner_mut(), table_name)
490 .map_err(Self::map_catalog_error)?;
491
492 overlay.drop_table(table_name);
493
494 Ok(ExecutionResult::Success)
495 }
496
497 fn execute_create_index_in_txn<'txn>(
498 &self,
499 catalog: &mut PersistentCatalog<S>,
500 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
501 overlay: &mut CatalogOverlay,
502 mut index: crate::catalog::IndexMetadata,
503 if_not_exists: bool,
504 ) -> Result<ExecutionResult>
505 where
506 S: 'txn,
507 {
508 if ddl::is_implicit_pk_index(&index.name) {
509 return Err(ExecutorError::InvalidIndexName {
510 name: index.name.clone(),
511 reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
512 });
513 }
514
515 if catalog.index_exists_in_txn(&index.name, overlay) {
516 return if if_not_exists {
517 Ok(ExecutionResult::Success)
518 } else {
519 Err(ExecutorError::IndexAlreadyExists(index.name))
520 };
521 }
522
523 let table = catalog
524 .get_table_in_txn(&index.table, overlay)
525 .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
526 .clone();
527
528 let column_indices = index
529 .columns
530 .iter()
531 .map(|name| {
532 table
533 .get_column_index(name)
534 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
535 })
536 .collect::<Result<Vec<_>>>()?;
537
538 let index_id = catalog.next_index_id();
539 index.index_id = index_id;
540 index.column_indices = column_indices.clone();
541
542 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
543 crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
544 } else {
545 ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
546 }
547
548 catalog
549 .persist_create_index(txn.inner_mut(), &index)
550 .map_err(Self::map_catalog_error)?;
551
552 overlay.add_index(index);
553
554 Ok(ExecutionResult::Success)
555 }
556
557 fn execute_drop_index_in_txn<'txn>(
558 &self,
559 catalog: &mut PersistentCatalog<S>,
560 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
561 overlay: &mut CatalogOverlay,
562 index_name: &str,
563 if_exists: bool,
564 ) -> Result<ExecutionResult>
565 where
566 S: 'txn,
567 {
568 if ddl::is_implicit_pk_index(index_name) {
569 return Err(ExecutorError::InvalidOperation {
570 operation: "DROP INDEX".into(),
571 reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
572 });
573 }
574
575 let index = match catalog.get_index_in_txn(index_name, overlay) {
576 Some(index) => index.clone(),
577 None => {
578 return if if_exists {
579 Ok(ExecutionResult::Success)
580 } else {
581 Err(ExecutorError::IndexNotFound(index_name.to_string()))
582 };
583 }
584 };
585
586 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
587 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
588 } else {
589 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
590 }
591
592 catalog
593 .persist_drop_index(txn.inner_mut(), index_name)
594 .map_err(Self::map_catalog_error)?;
595
596 overlay.drop_index(index_name);
597
598 Ok(ExecutionResult::Success)
599 }
600}
601
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::MemoryCatalog;
    use alopex_core::kv::memory::MemoryKV;

    /// Builds an executor over a fresh in-memory store and in-memory catalog.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    /// Constructing an executor must not panic.
    #[test]
    fn test_executor_creation() {
        let _executor = create_executor();
    }

    /// `CREATE TABLE` succeeds and the table becomes visible in the catalog.
    #[test]
    fn create_table_is_supported() {
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::ResolvedType;

        let mut executor = create_executor();

        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("test", vec![id_column]),
            if_not_exists: false,
            with_options: vec![],
        };

        let outcome = executor.execute(plan);
        assert!(matches!(outcome, Ok(ExecutionResult::Success)));

        let table_registered = executor.catalog.read().unwrap().table_exists("test");
        assert!(table_registered);
    }

    /// `INSERT` of one row into a freshly created table reports one affected row.
    #[test]
    fn insert_is_supported() {
        use crate::Span;
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::typed_expr::TypedExprKind;
        use crate::planner::types::ResolvedType;

        let mut executor = create_executor();

        // Set up a single-column table keyed on `id`.
        let id_column = ColumnMetadata::new("id", ResolvedType::Integer);
        let create_plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("t", vec![id_column]).with_primary_key(vec!["id".into()]),
            if_not_exists: false,
            with_options: vec![],
        };
        executor.execute(create_plan).unwrap();

        let one = crate::planner::typed_expr::TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let insert_plan = LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![one]],
        };

        let outcome = executor.execute(insert_plan);
        assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
    }
}