1#[cfg(feature = "tokio")]
40pub mod async_executor;
41pub mod bulk;
42pub(crate) mod ddl;
43pub(crate) mod dml;
44mod error;
45pub mod evaluator;
46mod hnsw_bridge;
47pub mod memory;
48pub mod query;
49mod result;
50
51#[cfg(feature = "tokio")]
52pub use async_executor::AsyncExecutor;
53pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
54pub use memory::{MemoryPolicy, SpillPolicy};
55pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
56pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
57
58use std::sync::{Arc, RwLock};
59
60use alopex_core::kv::KVStore;
61use alopex_core::types::TxnMode;
62
63use crate::catalog::Catalog;
64use crate::catalog::persistent::{IndexFqn, TableFqn};
65use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
66use crate::planner::LogicalPlan;
67use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
68
69pub struct Executor<S: KVStore, C: Catalog> {
79 bridge: TxnBridge<S>,
81
82 catalog: Arc<RwLock<C>>,
84}
85
86impl<S: KVStore, C: Catalog> Executor<S, C> {
87 fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
88 where
89 F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
90 {
91 let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
92 match f(&mut txn) {
93 Ok(result) => {
94 txn.commit().map_err(ExecutorError::from)?;
95 Ok(result)
96 }
97 Err(err) => {
98 txn.rollback().map_err(ExecutorError::from)?;
99 Err(err)
100 }
101 }
102 }
103
104 pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
111 Self {
112 bridge: TxnBridge::new(store),
113 catalog,
114 }
115 }
116
117 pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
144 match plan {
145 LogicalPlan::CreateTable {
147 table,
148 if_not_exists,
149 with_options,
150 } => self.execute_create_table(table, with_options, if_not_exists),
151 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
152 LogicalPlan::CreateIndex {
153 index,
154 if_not_exists,
155 } => self.execute_create_index(index, if_not_exists),
156 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
157
158 LogicalPlan::Insert {
160 table,
161 columns,
162 values,
163 } => self.execute_insert(&table, columns, values),
164 LogicalPlan::Update {
165 table,
166 assignments,
167 filter,
168 } => self.execute_update(&table, assignments, filter),
169 LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
170
171 LogicalPlan::Scan { .. }
173 | LogicalPlan::Filter { .. }
174 | LogicalPlan::Aggregate { .. }
175 | LogicalPlan::Sort { .. }
176 | LogicalPlan::Limit { .. } => self.execute_query(plan),
177 }
178 }
179
180 fn execute_create_table(
185 &mut self,
186 table: crate::catalog::TableMetadata,
187 with_options: Vec<(String, String)>,
188 if_not_exists: bool,
189 ) -> Result<ExecutionResult> {
190 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
191 self.run_in_write_txn(|txn| {
192 ddl::create_table::execute_create_table(
193 txn,
194 &mut *catalog,
195 table,
196 with_options,
197 if_not_exists,
198 )
199 })
200 }
201
202 fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
203 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
204 self.run_in_write_txn(|txn| {
205 ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
206 })
207 }
208
209 fn execute_create_index(
210 &mut self,
211 index: crate::catalog::IndexMetadata,
212 if_not_exists: bool,
213 ) -> Result<ExecutionResult> {
214 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
215 self.run_in_write_txn(|txn| {
216 ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
217 })
218 }
219
220 fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
221 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
222 self.run_in_write_txn(|txn| {
223 ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
224 })
225 }
226
227 fn execute_insert(
232 &mut self,
233 table: &str,
234 columns: Vec<String>,
235 values: Vec<Vec<crate::planner::TypedExpr>>,
236 ) -> Result<ExecutionResult> {
237 let catalog = self.catalog.read().expect("catalog lock poisoned");
238 self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
239 }
240
241 fn execute_update(
242 &mut self,
243 table: &str,
244 assignments: Vec<crate::planner::TypedAssignment>,
245 filter: Option<crate::planner::TypedExpr>,
246 ) -> Result<ExecutionResult> {
247 let catalog = self.catalog.read().expect("catalog lock poisoned");
248 self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
249 }
250
251 fn execute_delete(
252 &mut self,
253 table: &str,
254 filter: Option<crate::planner::TypedExpr>,
255 ) -> Result<ExecutionResult> {
256 let catalog = self.catalog.read().expect("catalog lock poisoned");
257 self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
258 }
259
260 fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
265 let catalog = self.catalog.read().expect("catalog lock poisoned");
266 self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
267 }
268}
269
270impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
271 pub fn execute_in_txn<'a, 'b, 'c>(
272 &mut self,
273 plan: LogicalPlan,
274 txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
275 ) -> Result<ExecutionResult> {
276 if txn.mode() == TxnMode::ReadOnly
277 && !matches!(
278 plan,
279 LogicalPlan::Scan { .. }
280 | LogicalPlan::Filter { .. }
281 | LogicalPlan::Aggregate { .. }
282 | LogicalPlan::Sort { .. }
283 | LogicalPlan::Limit { .. }
284 )
285 {
286 return Err(ExecutorError::ReadOnlyTransaction {
287 operation: plan.operation_name().to_string(),
288 });
289 }
290
291 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
292 let (mut sql_txn, overlay) = txn.split_parts();
293
294 let result = match plan {
295 LogicalPlan::CreateTable {
296 table,
297 if_not_exists,
298 with_options,
299 } => self.execute_create_table_in_txn(
300 &mut *catalog,
301 &mut sql_txn,
302 overlay,
303 table,
304 with_options,
305 if_not_exists,
306 ),
307 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
308 &mut *catalog,
309 &mut sql_txn,
310 overlay,
311 &name,
312 if_exists,
313 ),
314 LogicalPlan::CreateIndex {
315 index,
316 if_not_exists,
317 } => self.execute_create_index_in_txn(
318 &mut *catalog,
319 &mut sql_txn,
320 overlay,
321 index,
322 if_not_exists,
323 ),
324 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
325 &mut *catalog,
326 &mut sql_txn,
327 overlay,
328 &name,
329 if_exists,
330 ),
331 LogicalPlan::Insert {
332 table,
333 columns,
334 values,
335 } => {
336 let view = TxnCatalogView::new(&*catalog, &*overlay);
337 dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
338 }
339 LogicalPlan::Update {
340 table,
341 assignments,
342 filter,
343 } => {
344 let view = TxnCatalogView::new(&*catalog, &*overlay);
345 dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
346 }
347 LogicalPlan::Delete { table, filter } => {
348 let view = TxnCatalogView::new(&*catalog, &*overlay);
349 dml::execute_delete(&mut sql_txn, &view, &table, filter)
350 }
351 LogicalPlan::Scan { .. }
352 | LogicalPlan::Filter { .. }
353 | LogicalPlan::Aggregate { .. }
354 | LogicalPlan::Sort { .. }
355 | LogicalPlan::Limit { .. } => {
356 let view = TxnCatalogView::new(&*catalog, &*overlay);
357 query::execute_query(&mut sql_txn, &view, plan)
358 }
359 };
360
361 match result {
362 Ok(value) => {
363 sql_txn.flush_hnsw()?;
364 Ok(value)
365 }
366 Err(err) => {
367 let _ = sql_txn.abandon_hnsw();
368 Err(err)
369 }
370 }
371 }
372
373 fn map_catalog_error(err: CatalogError) -> ExecutorError {
374 match err {
375 CatalogError::Kv(e) => ExecutorError::Core(e),
376 CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
377 operation: "CatalogPersistence".into(),
378 reason: e.to_string(),
379 },
380 CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
381 operation: "CatalogPersistence".into(),
382 reason,
383 },
384 }
385 }
386
387 fn execute_create_table_in_txn<'txn>(
388 &self,
389 catalog: &mut PersistentCatalog<S>,
390 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
391 overlay: &mut CatalogOverlay,
392 mut table: crate::catalog::TableMetadata,
393 with_options: Vec<(String, String)>,
394 if_not_exists: bool,
395 ) -> Result<ExecutionResult>
396 where
397 S: 'txn,
398 {
399 if catalog.table_exists_in_txn(&table.name, overlay) {
400 return if if_not_exists {
401 Ok(ExecutionResult::Success)
402 } else {
403 Err(ExecutorError::TableAlreadyExists(table.name))
404 };
405 }
406
407 table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
408
409 let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
410 let column_indices = pk_columns
411 .iter()
412 .map(|name| {
413 table
414 .get_column_index(name)
415 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
416 })
417 .collect::<Result<Vec<_>>>()?;
418 let index_id = catalog.next_index_id();
419 let index_name = ddl::create_pk_index_name(&table.name);
420 let mut index = crate::catalog::IndexMetadata::new(
421 index_id,
422 index_name,
423 table.name.clone(),
424 pk_columns,
425 )
426 .with_column_indices(column_indices)
427 .with_unique(true);
428 index.catalog_name = table.catalog_name.clone();
429 index.namespace_name = table.namespace_name.clone();
430 Some(index)
431 } else {
432 None
433 };
434
435 let table_id = catalog.next_table_id();
436 table = table.with_table_id(table_id);
437
438 txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
440 txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
441
442 catalog
444 .persist_create_table(txn.inner_mut(), &table)
445 .map_err(Self::map_catalog_error)?;
446 if let Some(index) = &pk_index {
447 catalog
448 .persist_create_index(txn.inner_mut(), index)
449 .map_err(Self::map_catalog_error)?;
450 }
451
452 overlay.add_table(TableFqn::from(&table), table);
454 if let Some(index) = pk_index {
455 overlay.add_index(IndexFqn::from(&index), index);
456 }
457
458 Ok(ExecutionResult::Success)
459 }
460
461 fn execute_drop_table_in_txn<'txn>(
462 &self,
463 catalog: &mut PersistentCatalog<S>,
464 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
465 overlay: &mut CatalogOverlay,
466 table_name: &str,
467 if_exists: bool,
468 ) -> Result<ExecutionResult>
469 where
470 S: 'txn,
471 {
472 let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
473 Some(table) => table.clone(),
474 None => {
475 return if if_exists {
476 Ok(ExecutionResult::Success)
477 } else {
478 Err(ExecutorError::TableNotFound(table_name.to_string()))
479 };
480 }
481 };
482 if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
483 return if if_exists {
484 Ok(ExecutionResult::Success)
485 } else {
486 Err(ExecutorError::TableNotFound(table_name.to_string()))
487 };
488 }
489
490 let indexes = TxnCatalogView::new(catalog, overlay)
491 .get_indexes_for_table(table_name)
492 .into_iter()
493 .cloned()
494 .collect::<Vec<_>>();
495
496 for index in &indexes {
497 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
498 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
499 } else {
500 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
501 }
502 }
503
504 txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
505 txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
506
507 catalog
508 .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
509 .map_err(Self::map_catalog_error)?;
510
511 overlay.drop_table(&TableFqn::from(&table_meta));
512
513 Ok(ExecutionResult::Success)
514 }
515
516 fn execute_create_index_in_txn<'txn>(
517 &self,
518 catalog: &mut PersistentCatalog<S>,
519 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
520 overlay: &mut CatalogOverlay,
521 mut index: crate::catalog::IndexMetadata,
522 if_not_exists: bool,
523 ) -> Result<ExecutionResult>
524 where
525 S: 'txn,
526 {
527 if ddl::is_implicit_pk_index(&index.name) {
528 return Err(ExecutorError::InvalidIndexName {
529 name: index.name.clone(),
530 reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
531 });
532 }
533
534 if catalog.index_exists_in_txn(&index.name, overlay) {
535 return if if_not_exists {
536 Ok(ExecutionResult::Success)
537 } else {
538 Err(ExecutorError::IndexAlreadyExists(index.name))
539 };
540 }
541
542 let table = catalog
543 .get_table_in_txn(&index.table, overlay)
544 .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
545 .clone();
546 index.catalog_name = table.catalog_name.clone();
547 index.namespace_name = table.namespace_name.clone();
548
549 let column_indices = index
550 .columns
551 .iter()
552 .map(|name| {
553 table
554 .get_column_index(name)
555 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
556 })
557 .collect::<Result<Vec<_>>>()?;
558
559 let index_id = catalog.next_index_id();
560 index.index_id = index_id;
561 index.column_indices = column_indices.clone();
562
563 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
564 crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
565 } else {
566 ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
567 }
568
569 catalog
570 .persist_create_index(txn.inner_mut(), &index)
571 .map_err(Self::map_catalog_error)?;
572
573 overlay.add_index(IndexFqn::from(&index), index);
574
575 Ok(ExecutionResult::Success)
576 }
577
578 fn execute_drop_index_in_txn<'txn>(
579 &self,
580 catalog: &mut PersistentCatalog<S>,
581 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
582 overlay: &mut CatalogOverlay,
583 index_name: &str,
584 if_exists: bool,
585 ) -> Result<ExecutionResult>
586 where
587 S: 'txn,
588 {
589 if ddl::is_implicit_pk_index(index_name) {
590 return Err(ExecutorError::InvalidOperation {
591 operation: "DROP INDEX".into(),
592 reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
593 });
594 }
595
596 let index = match catalog.get_index_in_txn(index_name, overlay) {
597 Some(index) => index.clone(),
598 None => {
599 return if if_exists {
600 Ok(ExecutionResult::Success)
601 } else {
602 Err(ExecutorError::IndexNotFound(index_name.to_string()))
603 };
604 }
605 };
606 if index.catalog_name != "default" || index.namespace_name != "default" {
607 return if if_exists {
608 Ok(ExecutionResult::Success)
609 } else {
610 Err(ExecutorError::IndexNotFound(index_name.to_string()))
611 };
612 }
613
614 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
615 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
616 } else {
617 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
618 }
619
620 catalog
621 .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
622 .map_err(Self::map_catalog_error)?;
623
624 overlay.drop_index(&IndexFqn::from(&index));
625
626 Ok(ExecutionResult::Success)
627 }
628}
629
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::MemoryCatalog;
    use alopex_core::kv::memory::MemoryKV;

    /// Builds an executor over a freshly constructed `MemoryKV` store and
    /// `MemoryCatalog`.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    /// Constructing an executor must not panic.
    #[test]
    fn test_executor_creation() {
        let _executor = create_executor();
    }

    /// `CREATE TABLE` succeeds and the table becomes visible in the catalog.
    #[test]
    fn create_table_is_supported() {
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::ResolvedType;

        let mut executor = create_executor();

        let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
        let plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("test", columns),
            if_not_exists: false,
            with_options: vec![],
        };

        let outcome = executor.execute(plan);
        assert!(matches!(outcome, Ok(ExecutionResult::Success)));

        let catalog = executor.catalog.read().unwrap();
        assert!(catalog.table_exists("test"));
    }

    /// `INSERT` of one row into a table with a primary key reports one
    /// affected row.
    #[test]
    fn insert_is_supported() {
        use crate::Span;
        use crate::catalog::{ColumnMetadata, TableMetadata};
        use crate::planner::typed_expr::TypedExprKind;
        use crate::planner::types::ResolvedType;

        let mut executor = create_executor();

        let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
        let create_plan = LogicalPlan::CreateTable {
            table: TableMetadata::new("t", columns).with_primary_key(vec!["id".into()]),
            if_not_exists: false,
            with_options: vec![],
        };
        executor.execute(create_plan).unwrap();

        let one = crate::planner::typed_expr::TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let insert_plan = LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![one]],
        };

        let outcome = executor.execute(insert_plan);
        assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
    }
}