1pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
50pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
51
52use std::sync::{Arc, RwLock};
53
54use alopex_core::kv::KVStore;
55use alopex_core::types::TxnMode;
56
57use crate::catalog::Catalog;
58use crate::catalog::persistent::{IndexFqn, TableFqn};
59use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
60use crate::planner::LogicalPlan;
61use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
62
63pub struct Executor<S: KVStore, C: Catalog> {
73 bridge: TxnBridge<S>,
75
76 catalog: Arc<RwLock<C>>,
78}
79
80impl<S: KVStore, C: Catalog> Executor<S, C> {
81 fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
82 where
83 F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
84 {
85 let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
86 match f(&mut txn) {
87 Ok(result) => {
88 txn.commit().map_err(ExecutorError::from)?;
89 Ok(result)
90 }
91 Err(err) => {
92 txn.rollback().map_err(ExecutorError::from)?;
93 Err(err)
94 }
95 }
96 }
97
98 pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
105 Self {
106 bridge: TxnBridge::new(store),
107 catalog,
108 }
109 }
110
111 pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
138 match plan {
139 LogicalPlan::CreateTable {
141 table,
142 if_not_exists,
143 with_options,
144 } => self.execute_create_table(table, with_options, if_not_exists),
145 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
146 LogicalPlan::CreateIndex {
147 index,
148 if_not_exists,
149 } => self.execute_create_index(index, if_not_exists),
150 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
151
152 LogicalPlan::Insert {
154 table,
155 columns,
156 values,
157 } => self.execute_insert(&table, columns, values),
158 LogicalPlan::Update {
159 table,
160 assignments,
161 filter,
162 } => self.execute_update(&table, assignments, filter),
163 LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
164
165 LogicalPlan::Scan { .. }
167 | LogicalPlan::Filter { .. }
168 | LogicalPlan::Sort { .. }
169 | LogicalPlan::Limit { .. } => self.execute_query(plan),
170 }
171 }
172
173 fn execute_create_table(
178 &mut self,
179 table: crate::catalog::TableMetadata,
180 with_options: Vec<(String, String)>,
181 if_not_exists: bool,
182 ) -> Result<ExecutionResult> {
183 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
184 self.run_in_write_txn(|txn| {
185 ddl::create_table::execute_create_table(
186 txn,
187 &mut *catalog,
188 table,
189 with_options,
190 if_not_exists,
191 )
192 })
193 }
194
195 fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
196 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
197 self.run_in_write_txn(|txn| {
198 ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
199 })
200 }
201
202 fn execute_create_index(
203 &mut self,
204 index: crate::catalog::IndexMetadata,
205 if_not_exists: bool,
206 ) -> Result<ExecutionResult> {
207 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
208 self.run_in_write_txn(|txn| {
209 ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
210 })
211 }
212
213 fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
214 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
215 self.run_in_write_txn(|txn| {
216 ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
217 })
218 }
219
220 fn execute_insert(
225 &mut self,
226 table: &str,
227 columns: Vec<String>,
228 values: Vec<Vec<crate::planner::TypedExpr>>,
229 ) -> Result<ExecutionResult> {
230 let catalog = self.catalog.read().expect("catalog lock poisoned");
231 self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
232 }
233
234 fn execute_update(
235 &mut self,
236 table: &str,
237 assignments: Vec<crate::planner::TypedAssignment>,
238 filter: Option<crate::planner::TypedExpr>,
239 ) -> Result<ExecutionResult> {
240 let catalog = self.catalog.read().expect("catalog lock poisoned");
241 self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
242 }
243
244 fn execute_delete(
245 &mut self,
246 table: &str,
247 filter: Option<crate::planner::TypedExpr>,
248 ) -> Result<ExecutionResult> {
249 let catalog = self.catalog.read().expect("catalog lock poisoned");
250 self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
251 }
252
253 fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
258 let catalog = self.catalog.read().expect("catalog lock poisoned");
259 self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
260 }
261}
262
263impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
264 pub fn execute_in_txn<'a, 'b, 'c>(
265 &mut self,
266 plan: LogicalPlan,
267 txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
268 ) -> Result<ExecutionResult> {
269 if txn.mode() == TxnMode::ReadOnly
270 && !matches!(
271 plan,
272 LogicalPlan::Scan { .. }
273 | LogicalPlan::Filter { .. }
274 | LogicalPlan::Sort { .. }
275 | LogicalPlan::Limit { .. }
276 )
277 {
278 return Err(ExecutorError::ReadOnlyTransaction {
279 operation: plan.operation_name().to_string(),
280 });
281 }
282
283 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
284 let (mut sql_txn, overlay) = txn.split_parts();
285
286 let result = match plan {
287 LogicalPlan::CreateTable {
288 table,
289 if_not_exists,
290 with_options,
291 } => self.execute_create_table_in_txn(
292 &mut *catalog,
293 &mut sql_txn,
294 overlay,
295 table,
296 with_options,
297 if_not_exists,
298 ),
299 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
300 &mut *catalog,
301 &mut sql_txn,
302 overlay,
303 &name,
304 if_exists,
305 ),
306 LogicalPlan::CreateIndex {
307 index,
308 if_not_exists,
309 } => self.execute_create_index_in_txn(
310 &mut *catalog,
311 &mut sql_txn,
312 overlay,
313 index,
314 if_not_exists,
315 ),
316 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
317 &mut *catalog,
318 &mut sql_txn,
319 overlay,
320 &name,
321 if_exists,
322 ),
323 LogicalPlan::Insert {
324 table,
325 columns,
326 values,
327 } => {
328 let view = TxnCatalogView::new(&*catalog, &*overlay);
329 dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
330 }
331 LogicalPlan::Update {
332 table,
333 assignments,
334 filter,
335 } => {
336 let view = TxnCatalogView::new(&*catalog, &*overlay);
337 dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
338 }
339 LogicalPlan::Delete { table, filter } => {
340 let view = TxnCatalogView::new(&*catalog, &*overlay);
341 dml::execute_delete(&mut sql_txn, &view, &table, filter)
342 }
343 LogicalPlan::Scan { .. }
344 | LogicalPlan::Filter { .. }
345 | LogicalPlan::Sort { .. }
346 | LogicalPlan::Limit { .. } => {
347 let view = TxnCatalogView::new(&*catalog, &*overlay);
348 query::execute_query(&mut sql_txn, &view, plan)
349 }
350 };
351
352 match result {
353 Ok(value) => {
354 sql_txn.flush_hnsw()?;
355 Ok(value)
356 }
357 Err(err) => {
358 let _ = sql_txn.abandon_hnsw();
359 Err(err)
360 }
361 }
362 }
363
364 fn map_catalog_error(err: CatalogError) -> ExecutorError {
365 match err {
366 CatalogError::Kv(e) => ExecutorError::Core(e),
367 CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
368 operation: "CatalogPersistence".into(),
369 reason: e.to_string(),
370 },
371 CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
372 operation: "CatalogPersistence".into(),
373 reason,
374 },
375 }
376 }
377
378 fn execute_create_table_in_txn<'txn>(
379 &self,
380 catalog: &mut PersistentCatalog<S>,
381 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
382 overlay: &mut CatalogOverlay,
383 mut table: crate::catalog::TableMetadata,
384 with_options: Vec<(String, String)>,
385 if_not_exists: bool,
386 ) -> Result<ExecutionResult>
387 where
388 S: 'txn,
389 {
390 if catalog.table_exists_in_txn(&table.name, overlay) {
391 return if if_not_exists {
392 Ok(ExecutionResult::Success)
393 } else {
394 Err(ExecutorError::TableAlreadyExists(table.name))
395 };
396 }
397
398 table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
399
400 let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
401 let column_indices = pk_columns
402 .iter()
403 .map(|name| {
404 table
405 .get_column_index(name)
406 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
407 })
408 .collect::<Result<Vec<_>>>()?;
409 let index_id = catalog.next_index_id();
410 let index_name = ddl::create_pk_index_name(&table.name);
411 let mut index = crate::catalog::IndexMetadata::new(
412 index_id,
413 index_name,
414 table.name.clone(),
415 pk_columns,
416 )
417 .with_column_indices(column_indices)
418 .with_unique(true);
419 index.catalog_name = table.catalog_name.clone();
420 index.namespace_name = table.namespace_name.clone();
421 Some(index)
422 } else {
423 None
424 };
425
426 let table_id = catalog.next_table_id();
427 table = table.with_table_id(table_id);
428
429 txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
431 txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
432
433 catalog
435 .persist_create_table(txn.inner_mut(), &table)
436 .map_err(Self::map_catalog_error)?;
437 if let Some(index) = &pk_index {
438 catalog
439 .persist_create_index(txn.inner_mut(), index)
440 .map_err(Self::map_catalog_error)?;
441 }
442
443 overlay.add_table(TableFqn::from(&table), table);
445 if let Some(index) = pk_index {
446 overlay.add_index(IndexFqn::from(&index), index);
447 }
448
449 Ok(ExecutionResult::Success)
450 }
451
452 fn execute_drop_table_in_txn<'txn>(
453 &self,
454 catalog: &mut PersistentCatalog<S>,
455 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
456 overlay: &mut CatalogOverlay,
457 table_name: &str,
458 if_exists: bool,
459 ) -> Result<ExecutionResult>
460 where
461 S: 'txn,
462 {
463 let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
464 Some(table) => table.clone(),
465 None => {
466 return if if_exists {
467 Ok(ExecutionResult::Success)
468 } else {
469 Err(ExecutorError::TableNotFound(table_name.to_string()))
470 };
471 }
472 };
473 if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
474 return if if_exists {
475 Ok(ExecutionResult::Success)
476 } else {
477 Err(ExecutorError::TableNotFound(table_name.to_string()))
478 };
479 }
480
481 let indexes = TxnCatalogView::new(catalog, overlay)
482 .get_indexes_for_table(table_name)
483 .into_iter()
484 .cloned()
485 .collect::<Vec<_>>();
486
487 for index in &indexes {
488 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
489 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
490 } else {
491 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
492 }
493 }
494
495 txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
496 txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
497
498 catalog
499 .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
500 .map_err(Self::map_catalog_error)?;
501
502 overlay.drop_table(&TableFqn::from(&table_meta));
503
504 Ok(ExecutionResult::Success)
505 }
506
507 fn execute_create_index_in_txn<'txn>(
508 &self,
509 catalog: &mut PersistentCatalog<S>,
510 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
511 overlay: &mut CatalogOverlay,
512 mut index: crate::catalog::IndexMetadata,
513 if_not_exists: bool,
514 ) -> Result<ExecutionResult>
515 where
516 S: 'txn,
517 {
518 if ddl::is_implicit_pk_index(&index.name) {
519 return Err(ExecutorError::InvalidIndexName {
520 name: index.name.clone(),
521 reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
522 });
523 }
524
525 if catalog.index_exists_in_txn(&index.name, overlay) {
526 return if if_not_exists {
527 Ok(ExecutionResult::Success)
528 } else {
529 Err(ExecutorError::IndexAlreadyExists(index.name))
530 };
531 }
532
533 let table = catalog
534 .get_table_in_txn(&index.table, overlay)
535 .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
536 .clone();
537 index.catalog_name = table.catalog_name.clone();
538 index.namespace_name = table.namespace_name.clone();
539
540 let column_indices = index
541 .columns
542 .iter()
543 .map(|name| {
544 table
545 .get_column_index(name)
546 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
547 })
548 .collect::<Result<Vec<_>>>()?;
549
550 let index_id = catalog.next_index_id();
551 index.index_id = index_id;
552 index.column_indices = column_indices.clone();
553
554 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
555 crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
556 } else {
557 ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
558 }
559
560 catalog
561 .persist_create_index(txn.inner_mut(), &index)
562 .map_err(Self::map_catalog_error)?;
563
564 overlay.add_index(IndexFqn::from(&index), index);
565
566 Ok(ExecutionResult::Success)
567 }
568
569 fn execute_drop_index_in_txn<'txn>(
570 &self,
571 catalog: &mut PersistentCatalog<S>,
572 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
573 overlay: &mut CatalogOverlay,
574 index_name: &str,
575 if_exists: bool,
576 ) -> Result<ExecutionResult>
577 where
578 S: 'txn,
579 {
580 if ddl::is_implicit_pk_index(index_name) {
581 return Err(ExecutorError::InvalidOperation {
582 operation: "DROP INDEX".into(),
583 reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
584 });
585 }
586
587 let index = match catalog.get_index_in_txn(index_name, overlay) {
588 Some(index) => index.clone(),
589 None => {
590 return if if_exists {
591 Ok(ExecutionResult::Success)
592 } else {
593 Err(ExecutorError::IndexNotFound(index_name.to_string()))
594 };
595 }
596 };
597 if index.catalog_name != "default" || index.namespace_name != "default" {
598 return if if_exists {
599 Ok(ExecutionResult::Success)
600 } else {
601 Err(ExecutorError::IndexNotFound(index_name.to_string()))
602 };
603 }
604
605 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
606 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
607 } else {
608 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
609 }
610
611 catalog
612 .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
613 .map_err(Self::map_catalog_error)?;
614
615 overlay.drop_index(&IndexFqn::from(&index));
616
617 Ok(ExecutionResult::Success)
618 }
619}
620
621#[cfg(test)]
622mod tests {
623 use super::*;
624 use crate::catalog::MemoryCatalog;
625 use alopex_core::kv::memory::MemoryKV;
626
627 fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
628 let store = Arc::new(MemoryKV::new());
629 let catalog = Arc::new(RwLock::new(MemoryCatalog::new()));
630 Executor::new(store, catalog)
631 }
632
/// Constructing an executor must not panic.
#[test]
fn test_executor_creation() {
    let _executor: Executor<MemoryKV, MemoryCatalog> = create_executor();
}
638
/// `CREATE TABLE` succeeds and the new table becomes visible in the catalog.
#[test]
fn create_table_is_supported() {
    use crate::catalog::{ColumnMetadata, TableMetadata};
    use crate::planner::ResolvedType;

    let mut executor = create_executor();

    let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
    let plan = LogicalPlan::CreateTable {
        table: TableMetadata::new("test", columns),
        if_not_exists: false,
        with_options: vec![],
    };

    let outcome = executor.execute(plan);
    assert!(matches!(outcome, Ok(ExecutionResult::Success)));

    let catalog = executor.catalog.read().unwrap();
    assert!(catalog.table_exists("test"));
}
661
/// Inserting one row into a freshly created table reports one affected row.
#[test]
fn insert_is_supported() {
    use crate::Span;
    use crate::catalog::{ColumnMetadata, TableMetadata};
    use crate::planner::typed_expr::TypedExprKind;
    use crate::planner::types::ResolvedType;

    let mut executor = create_executor();

    // Table `t` with a single integer primary-key column `id`.
    let columns = vec![ColumnMetadata::new("id", ResolvedType::Integer)];
    let table = TableMetadata::new("t", columns).with_primary_key(vec!["id".into()]);
    let create_plan = LogicalPlan::CreateTable {
        table,
        if_not_exists: false,
        with_options: vec![],
    };
    executor.execute(create_plan).unwrap();

    // A single row whose only value is the integer literal 1.
    let id_value = crate::planner::typed_expr::TypedExpr {
        kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
        resolved_type: ResolvedType::Integer,
        span: Span::default(),
    };
    let insert_plan = LogicalPlan::Insert {
        table: "t".into(),
        columns: vec!["id".into()],
        values: vec![vec![id_value]],
    };

    let outcome = executor.execute(insert_plan);
    assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
}
693}