1#[cfg(feature = "tokio")]
40pub mod async_executor;
41pub mod bulk;
42pub(crate) mod ddl;
43pub(crate) mod dml;
44mod error;
45pub mod evaluator;
46mod hnsw_bridge;
47pub mod query;
48mod result;
49
50#[cfg(feature = "tokio")]
51pub use async_executor::AsyncExecutor;
52pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
53pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
54pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
55
56use std::sync::{Arc, RwLock};
57
58use alopex_core::kv::KVStore;
59use alopex_core::types::TxnMode;
60
61use crate::catalog::Catalog;
62use crate::catalog::persistent::{IndexFqn, TableFqn};
63use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
64use crate::planner::LogicalPlan;
65use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
66
67pub struct Executor<S: KVStore, C: Catalog> {
77 bridge: TxnBridge<S>,
79
80 catalog: Arc<RwLock<C>>,
82}
83
84impl<S: KVStore, C: Catalog> Executor<S, C> {
85 fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
86 where
87 F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
88 {
89 let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
90 match f(&mut txn) {
91 Ok(result) => {
92 txn.commit().map_err(ExecutorError::from)?;
93 Ok(result)
94 }
95 Err(err) => {
96 txn.rollback().map_err(ExecutorError::from)?;
97 Err(err)
98 }
99 }
100 }
101
102 pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
109 Self {
110 bridge: TxnBridge::new(store),
111 catalog,
112 }
113 }
114
115 pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
142 match plan {
143 LogicalPlan::CreateTable {
145 table,
146 if_not_exists,
147 with_options,
148 } => self.execute_create_table(table, with_options, if_not_exists),
149 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
150 LogicalPlan::CreateIndex {
151 index,
152 if_not_exists,
153 } => self.execute_create_index(index, if_not_exists),
154 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
155
156 LogicalPlan::Insert {
158 table,
159 columns,
160 values,
161 } => self.execute_insert(&table, columns, values),
162 LogicalPlan::Update {
163 table,
164 assignments,
165 filter,
166 } => self.execute_update(&table, assignments, filter),
167 LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
168
169 LogicalPlan::Scan { .. }
171 | LogicalPlan::Filter { .. }
172 | LogicalPlan::Aggregate { .. }
173 | LogicalPlan::Sort { .. }
174 | LogicalPlan::Limit { .. } => self.execute_query(plan),
175 }
176 }
177
178 fn execute_create_table(
183 &mut self,
184 table: crate::catalog::TableMetadata,
185 with_options: Vec<(String, String)>,
186 if_not_exists: bool,
187 ) -> Result<ExecutionResult> {
188 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
189 self.run_in_write_txn(|txn| {
190 ddl::create_table::execute_create_table(
191 txn,
192 &mut *catalog,
193 table,
194 with_options,
195 if_not_exists,
196 )
197 })
198 }
199
200 fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
201 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
202 self.run_in_write_txn(|txn| {
203 ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
204 })
205 }
206
207 fn execute_create_index(
208 &mut self,
209 index: crate::catalog::IndexMetadata,
210 if_not_exists: bool,
211 ) -> Result<ExecutionResult> {
212 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
213 self.run_in_write_txn(|txn| {
214 ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
215 })
216 }
217
218 fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
219 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
220 self.run_in_write_txn(|txn| {
221 ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
222 })
223 }
224
225 fn execute_insert(
230 &mut self,
231 table: &str,
232 columns: Vec<String>,
233 values: Vec<Vec<crate::planner::TypedExpr>>,
234 ) -> Result<ExecutionResult> {
235 let catalog = self.catalog.read().expect("catalog lock poisoned");
236 self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
237 }
238
239 fn execute_update(
240 &mut self,
241 table: &str,
242 assignments: Vec<crate::planner::TypedAssignment>,
243 filter: Option<crate::planner::TypedExpr>,
244 ) -> Result<ExecutionResult> {
245 let catalog = self.catalog.read().expect("catalog lock poisoned");
246 self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
247 }
248
249 fn execute_delete(
250 &mut self,
251 table: &str,
252 filter: Option<crate::planner::TypedExpr>,
253 ) -> Result<ExecutionResult> {
254 let catalog = self.catalog.read().expect("catalog lock poisoned");
255 self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
256 }
257
258 fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
263 let catalog = self.catalog.read().expect("catalog lock poisoned");
264 self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
265 }
266}
267
268impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
269 pub fn execute_in_txn<'a, 'b, 'c>(
270 &mut self,
271 plan: LogicalPlan,
272 txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
273 ) -> Result<ExecutionResult> {
274 if txn.mode() == TxnMode::ReadOnly
275 && !matches!(
276 plan,
277 LogicalPlan::Scan { .. }
278 | LogicalPlan::Filter { .. }
279 | LogicalPlan::Aggregate { .. }
280 | LogicalPlan::Sort { .. }
281 | LogicalPlan::Limit { .. }
282 )
283 {
284 return Err(ExecutorError::ReadOnlyTransaction {
285 operation: plan.operation_name().to_string(),
286 });
287 }
288
289 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
290 let (mut sql_txn, overlay) = txn.split_parts();
291
292 let result = match plan {
293 LogicalPlan::CreateTable {
294 table,
295 if_not_exists,
296 with_options,
297 } => self.execute_create_table_in_txn(
298 &mut *catalog,
299 &mut sql_txn,
300 overlay,
301 table,
302 with_options,
303 if_not_exists,
304 ),
305 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
306 &mut *catalog,
307 &mut sql_txn,
308 overlay,
309 &name,
310 if_exists,
311 ),
312 LogicalPlan::CreateIndex {
313 index,
314 if_not_exists,
315 } => self.execute_create_index_in_txn(
316 &mut *catalog,
317 &mut sql_txn,
318 overlay,
319 index,
320 if_not_exists,
321 ),
322 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
323 &mut *catalog,
324 &mut sql_txn,
325 overlay,
326 &name,
327 if_exists,
328 ),
329 LogicalPlan::Insert {
330 table,
331 columns,
332 values,
333 } => {
334 let view = TxnCatalogView::new(&*catalog, &*overlay);
335 dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
336 }
337 LogicalPlan::Update {
338 table,
339 assignments,
340 filter,
341 } => {
342 let view = TxnCatalogView::new(&*catalog, &*overlay);
343 dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
344 }
345 LogicalPlan::Delete { table, filter } => {
346 let view = TxnCatalogView::new(&*catalog, &*overlay);
347 dml::execute_delete(&mut sql_txn, &view, &table, filter)
348 }
349 LogicalPlan::Scan { .. }
350 | LogicalPlan::Filter { .. }
351 | LogicalPlan::Aggregate { .. }
352 | LogicalPlan::Sort { .. }
353 | LogicalPlan::Limit { .. } => {
354 let view = TxnCatalogView::new(&*catalog, &*overlay);
355 query::execute_query(&mut sql_txn, &view, plan)
356 }
357 };
358
359 match result {
360 Ok(value) => {
361 sql_txn.flush_hnsw()?;
362 Ok(value)
363 }
364 Err(err) => {
365 let _ = sql_txn.abandon_hnsw();
366 Err(err)
367 }
368 }
369 }
370
371 fn map_catalog_error(err: CatalogError) -> ExecutorError {
372 match err {
373 CatalogError::Kv(e) => ExecutorError::Core(e),
374 CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
375 operation: "CatalogPersistence".into(),
376 reason: e.to_string(),
377 },
378 CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
379 operation: "CatalogPersistence".into(),
380 reason,
381 },
382 }
383 }
384
385 fn execute_create_table_in_txn<'txn>(
386 &self,
387 catalog: &mut PersistentCatalog<S>,
388 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
389 overlay: &mut CatalogOverlay,
390 mut table: crate::catalog::TableMetadata,
391 with_options: Vec<(String, String)>,
392 if_not_exists: bool,
393 ) -> Result<ExecutionResult>
394 where
395 S: 'txn,
396 {
397 if catalog.table_exists_in_txn(&table.name, overlay) {
398 return if if_not_exists {
399 Ok(ExecutionResult::Success)
400 } else {
401 Err(ExecutorError::TableAlreadyExists(table.name))
402 };
403 }
404
405 table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
406
407 let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
408 let column_indices = pk_columns
409 .iter()
410 .map(|name| {
411 table
412 .get_column_index(name)
413 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
414 })
415 .collect::<Result<Vec<_>>>()?;
416 let index_id = catalog.next_index_id();
417 let index_name = ddl::create_pk_index_name(&table.name);
418 let mut index = crate::catalog::IndexMetadata::new(
419 index_id,
420 index_name,
421 table.name.clone(),
422 pk_columns,
423 )
424 .with_column_indices(column_indices)
425 .with_unique(true);
426 index.catalog_name = table.catalog_name.clone();
427 index.namespace_name = table.namespace_name.clone();
428 Some(index)
429 } else {
430 None
431 };
432
433 let table_id = catalog.next_table_id();
434 table = table.with_table_id(table_id);
435
436 txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
438 txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
439
440 catalog
442 .persist_create_table(txn.inner_mut(), &table)
443 .map_err(Self::map_catalog_error)?;
444 if let Some(index) = &pk_index {
445 catalog
446 .persist_create_index(txn.inner_mut(), index)
447 .map_err(Self::map_catalog_error)?;
448 }
449
450 overlay.add_table(TableFqn::from(&table), table);
452 if let Some(index) = pk_index {
453 overlay.add_index(IndexFqn::from(&index), index);
454 }
455
456 Ok(ExecutionResult::Success)
457 }
458
459 fn execute_drop_table_in_txn<'txn>(
460 &self,
461 catalog: &mut PersistentCatalog<S>,
462 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
463 overlay: &mut CatalogOverlay,
464 table_name: &str,
465 if_exists: bool,
466 ) -> Result<ExecutionResult>
467 where
468 S: 'txn,
469 {
470 let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
471 Some(table) => table.clone(),
472 None => {
473 return if if_exists {
474 Ok(ExecutionResult::Success)
475 } else {
476 Err(ExecutorError::TableNotFound(table_name.to_string()))
477 };
478 }
479 };
480 if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
481 return if if_exists {
482 Ok(ExecutionResult::Success)
483 } else {
484 Err(ExecutorError::TableNotFound(table_name.to_string()))
485 };
486 }
487
488 let indexes = TxnCatalogView::new(catalog, overlay)
489 .get_indexes_for_table(table_name)
490 .into_iter()
491 .cloned()
492 .collect::<Vec<_>>();
493
494 for index in &indexes {
495 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
496 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
497 } else {
498 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
499 }
500 }
501
502 txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
503 txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
504
505 catalog
506 .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
507 .map_err(Self::map_catalog_error)?;
508
509 overlay.drop_table(&TableFqn::from(&table_meta));
510
511 Ok(ExecutionResult::Success)
512 }
513
514 fn execute_create_index_in_txn<'txn>(
515 &self,
516 catalog: &mut PersistentCatalog<S>,
517 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
518 overlay: &mut CatalogOverlay,
519 mut index: crate::catalog::IndexMetadata,
520 if_not_exists: bool,
521 ) -> Result<ExecutionResult>
522 where
523 S: 'txn,
524 {
525 if ddl::is_implicit_pk_index(&index.name) {
526 return Err(ExecutorError::InvalidIndexName {
527 name: index.name.clone(),
528 reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
529 });
530 }
531
532 if catalog.index_exists_in_txn(&index.name, overlay) {
533 return if if_not_exists {
534 Ok(ExecutionResult::Success)
535 } else {
536 Err(ExecutorError::IndexAlreadyExists(index.name))
537 };
538 }
539
540 let table = catalog
541 .get_table_in_txn(&index.table, overlay)
542 .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
543 .clone();
544 index.catalog_name = table.catalog_name.clone();
545 index.namespace_name = table.namespace_name.clone();
546
547 let column_indices = index
548 .columns
549 .iter()
550 .map(|name| {
551 table
552 .get_column_index(name)
553 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
554 })
555 .collect::<Result<Vec<_>>>()?;
556
557 let index_id = catalog.next_index_id();
558 index.index_id = index_id;
559 index.column_indices = column_indices.clone();
560
561 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
562 crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
563 } else {
564 ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
565 }
566
567 catalog
568 .persist_create_index(txn.inner_mut(), &index)
569 .map_err(Self::map_catalog_error)?;
570
571 overlay.add_index(IndexFqn::from(&index), index);
572
573 Ok(ExecutionResult::Success)
574 }
575
576 fn execute_drop_index_in_txn<'txn>(
577 &self,
578 catalog: &mut PersistentCatalog<S>,
579 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
580 overlay: &mut CatalogOverlay,
581 index_name: &str,
582 if_exists: bool,
583 ) -> Result<ExecutionResult>
584 where
585 S: 'txn,
586 {
587 if ddl::is_implicit_pk_index(index_name) {
588 return Err(ExecutorError::InvalidOperation {
589 operation: "DROP INDEX".into(),
590 reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
591 });
592 }
593
594 let index = match catalog.get_index_in_txn(index_name, overlay) {
595 Some(index) => index.clone(),
596 None => {
597 return if if_exists {
598 Ok(ExecutionResult::Success)
599 } else {
600 Err(ExecutorError::IndexNotFound(index_name.to_string()))
601 };
602 }
603 };
604 if index.catalog_name != "default" || index.namespace_name != "default" {
605 return if if_exists {
606 Ok(ExecutionResult::Success)
607 } else {
608 Err(ExecutorError::IndexNotFound(index_name.to_string()))
609 };
610 }
611
612 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
613 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
614 } else {
615 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
616 }
617
618 catalog
619 .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
620 .map_err(Self::map_catalog_error)?;
621
622 overlay.drop_index(&IndexFqn::from(&index));
623
624 Ok(ExecutionResult::Success)
625 }
626}
627
628#[cfg(test)]
629mod tests {
630 use super::*;
631 use crate::catalog::MemoryCatalog;
632 use alopex_core::kv::memory::MemoryKV;
633
634 fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
635 let store = Arc::new(MemoryKV::new());
636 let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
637 Executor::new(store, catalog)
638 }
639
640 #[test]
641 fn test_executor_creation() {
642 let _executor = create_executor();
643 }
645
646 #[test]
647 fn create_table_is_supported() {
648 let mut executor = create_executor();
649
650 use crate::catalog::{ColumnMetadata, TableMetadata};
651 use crate::planner::ResolvedType;
652
653 let table = TableMetadata::new(
654 "test",
655 vec![ColumnMetadata::new("id", ResolvedType::Integer)],
656 );
657
658 let result = executor.execute(LogicalPlan::CreateTable {
659 table,
660 if_not_exists: false,
661 with_options: vec![],
662 });
663 assert!(matches!(result, Ok(ExecutionResult::Success)));
664
665 let catalog = executor.catalog.read().unwrap();
666 assert!(catalog.table_exists("test"));
667 }
668
669 #[test]
670 fn insert_is_supported() {
671 use crate::Span;
672 use crate::catalog::{ColumnMetadata, TableMetadata};
673 use crate::planner::typed_expr::TypedExprKind;
674 use crate::planner::types::ResolvedType;
675
676 let mut executor = create_executor();
677
678 let table = TableMetadata::new("t", vec![ColumnMetadata::new("id", ResolvedType::Integer)])
679 .with_primary_key(vec!["id".into()]);
680
681 executor
682 .execute(LogicalPlan::CreateTable {
683 table,
684 if_not_exists: false,
685 with_options: vec![],
686 })
687 .unwrap();
688
689 let result = executor.execute(LogicalPlan::Insert {
690 table: "t".into(),
691 columns: vec!["id".into()],
692 values: vec![vec![crate::planner::typed_expr::TypedExpr {
693 kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
694 resolved_type: ResolvedType::Integer,
695 span: Span::default(),
696 }]],
697 });
698 assert!(matches!(result, Ok(ExecutionResult::RowsAffected(1))));
699 }
700}