1pub mod bulk;
40mod ddl;
41mod dml;
42mod error;
43pub mod evaluator;
44mod hnsw_bridge;
45pub mod query;
46mod result;
47
48pub use error::{ConstraintViolation, EvaluationError, ExecutorError, Result};
49pub use query::{RowIterator, ScanIterator, build_streaming_pipeline};
50pub use result::{ColumnInfo, ExecutionResult, QueryResult, QueryRowIterator, Row};
51
52use std::sync::{Arc, RwLock};
53
54use alopex_core::kv::KVStore;
55use alopex_core::types::TxnMode;
56
57use crate::catalog::Catalog;
58use crate::catalog::persistent::{IndexFqn, TableFqn};
59use crate::catalog::{CatalogError, CatalogOverlay, PersistentCatalog, TxnCatalogView};
60use crate::planner::LogicalPlan;
61use crate::storage::{BorrowedSqlTransaction, KeyEncoder, SqlTransaction, SqlTxn as _, TxnBridge};
62
/// Executes [`LogicalPlan`]s against a key-value store `S` using a catalog `C`.
///
/// Storage transactions are opened through `bridge`; the catalog is shared
/// behind an `Arc<RwLock<_>>` and locked around each statement.
pub struct Executor<S: KVStore, C: Catalog> {
    /// Transaction bridge over the underlying store.
    bridge: TxnBridge<S>,

    /// Shared catalog, read- or write-locked depending on the statement kind.
    catalog: Arc<RwLock<C>>,
}
79
80impl<S: KVStore, C: Catalog> Executor<S, C> {
81 fn run_in_write_txn<R, F>(&self, f: F) -> Result<R>
82 where
83 F: FnOnce(&mut SqlTransaction<'_, S>) -> Result<R>,
84 {
85 let mut txn = self.bridge.begin_write().map_err(ExecutorError::from)?;
86 match f(&mut txn) {
87 Ok(result) => {
88 txn.commit().map_err(ExecutorError::from)?;
89 Ok(result)
90 }
91 Err(err) => {
92 txn.rollback().map_err(ExecutorError::from)?;
93 Err(err)
94 }
95 }
96 }
97
98 pub fn new(store: Arc<S>, catalog: Arc<RwLock<C>>) -> Self {
105 Self {
106 bridge: TxnBridge::new(store),
107 catalog,
108 }
109 }
110
111 pub fn execute(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
138 match plan {
139 LogicalPlan::CreateTable {
141 table,
142 if_not_exists,
143 with_options,
144 } => self.execute_create_table(table, with_options, if_not_exists),
145 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table(&name, if_exists),
146 LogicalPlan::CreateIndex {
147 index,
148 if_not_exists,
149 } => self.execute_create_index(index, if_not_exists),
150 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index(&name, if_exists),
151
152 LogicalPlan::Insert {
154 table,
155 columns,
156 values,
157 } => self.execute_insert(&table, columns, values),
158 LogicalPlan::Update {
159 table,
160 assignments,
161 filter,
162 } => self.execute_update(&table, assignments, filter),
163 LogicalPlan::Delete { table, filter } => self.execute_delete(&table, filter),
164
165 LogicalPlan::Scan { .. }
167 | LogicalPlan::Filter { .. }
168 | LogicalPlan::Aggregate { .. }
169 | LogicalPlan::Sort { .. }
170 | LogicalPlan::Limit { .. } => self.execute_query(plan),
171 }
172 }
173
174 fn execute_create_table(
179 &mut self,
180 table: crate::catalog::TableMetadata,
181 with_options: Vec<(String, String)>,
182 if_not_exists: bool,
183 ) -> Result<ExecutionResult> {
184 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
185 self.run_in_write_txn(|txn| {
186 ddl::create_table::execute_create_table(
187 txn,
188 &mut *catalog,
189 table,
190 with_options,
191 if_not_exists,
192 )
193 })
194 }
195
196 fn execute_drop_table(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
197 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
198 self.run_in_write_txn(|txn| {
199 ddl::drop_table::execute_drop_table(txn, &mut *catalog, name, if_exists)
200 })
201 }
202
203 fn execute_create_index(
204 &mut self,
205 index: crate::catalog::IndexMetadata,
206 if_not_exists: bool,
207 ) -> Result<ExecutionResult> {
208 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
209 self.run_in_write_txn(|txn| {
210 ddl::create_index::execute_create_index(txn, &mut *catalog, index, if_not_exists)
211 })
212 }
213
214 fn execute_drop_index(&mut self, name: &str, if_exists: bool) -> Result<ExecutionResult> {
215 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
216 self.run_in_write_txn(|txn| {
217 ddl::drop_index::execute_drop_index(txn, &mut *catalog, name, if_exists)
218 })
219 }
220
221 fn execute_insert(
226 &mut self,
227 table: &str,
228 columns: Vec<String>,
229 values: Vec<Vec<crate::planner::TypedExpr>>,
230 ) -> Result<ExecutionResult> {
231 let catalog = self.catalog.read().expect("catalog lock poisoned");
232 self.run_in_write_txn(|txn| dml::execute_insert(txn, &*catalog, table, columns, values))
233 }
234
235 fn execute_update(
236 &mut self,
237 table: &str,
238 assignments: Vec<crate::planner::TypedAssignment>,
239 filter: Option<crate::planner::TypedExpr>,
240 ) -> Result<ExecutionResult> {
241 let catalog = self.catalog.read().expect("catalog lock poisoned");
242 self.run_in_write_txn(|txn| dml::execute_update(txn, &*catalog, table, assignments, filter))
243 }
244
245 fn execute_delete(
246 &mut self,
247 table: &str,
248 filter: Option<crate::planner::TypedExpr>,
249 ) -> Result<ExecutionResult> {
250 let catalog = self.catalog.read().expect("catalog lock poisoned");
251 self.run_in_write_txn(|txn| dml::execute_delete(txn, &*catalog, table, filter))
252 }
253
254 fn execute_query(&mut self, plan: LogicalPlan) -> Result<ExecutionResult> {
259 let catalog = self.catalog.read().expect("catalog lock poisoned");
260 self.run_in_write_txn(|txn| query::execute_query(txn, &*catalog, plan))
261 }
262}
263
264impl<S: KVStore> Executor<S, PersistentCatalog<S>> {
265 pub fn execute_in_txn<'a, 'b, 'c>(
266 &mut self,
267 plan: LogicalPlan,
268 txn: &mut BorrowedSqlTransaction<'a, 'b, 'c, S>,
269 ) -> Result<ExecutionResult> {
270 if txn.mode() == TxnMode::ReadOnly
271 && !matches!(
272 plan,
273 LogicalPlan::Scan { .. }
274 | LogicalPlan::Filter { .. }
275 | LogicalPlan::Aggregate { .. }
276 | LogicalPlan::Sort { .. }
277 | LogicalPlan::Limit { .. }
278 )
279 {
280 return Err(ExecutorError::ReadOnlyTransaction {
281 operation: plan.operation_name().to_string(),
282 });
283 }
284
285 let mut catalog = self.catalog.write().expect("catalog lock poisoned");
286 let (mut sql_txn, overlay) = txn.split_parts();
287
288 let result = match plan {
289 LogicalPlan::CreateTable {
290 table,
291 if_not_exists,
292 with_options,
293 } => self.execute_create_table_in_txn(
294 &mut *catalog,
295 &mut sql_txn,
296 overlay,
297 table,
298 with_options,
299 if_not_exists,
300 ),
301 LogicalPlan::DropTable { name, if_exists } => self.execute_drop_table_in_txn(
302 &mut *catalog,
303 &mut sql_txn,
304 overlay,
305 &name,
306 if_exists,
307 ),
308 LogicalPlan::CreateIndex {
309 index,
310 if_not_exists,
311 } => self.execute_create_index_in_txn(
312 &mut *catalog,
313 &mut sql_txn,
314 overlay,
315 index,
316 if_not_exists,
317 ),
318 LogicalPlan::DropIndex { name, if_exists } => self.execute_drop_index_in_txn(
319 &mut *catalog,
320 &mut sql_txn,
321 overlay,
322 &name,
323 if_exists,
324 ),
325 LogicalPlan::Insert {
326 table,
327 columns,
328 values,
329 } => {
330 let view = TxnCatalogView::new(&*catalog, &*overlay);
331 dml::execute_insert(&mut sql_txn, &view, &table, columns, values)
332 }
333 LogicalPlan::Update {
334 table,
335 assignments,
336 filter,
337 } => {
338 let view = TxnCatalogView::new(&*catalog, &*overlay);
339 dml::execute_update(&mut sql_txn, &view, &table, assignments, filter)
340 }
341 LogicalPlan::Delete { table, filter } => {
342 let view = TxnCatalogView::new(&*catalog, &*overlay);
343 dml::execute_delete(&mut sql_txn, &view, &table, filter)
344 }
345 LogicalPlan::Scan { .. }
346 | LogicalPlan::Filter { .. }
347 | LogicalPlan::Aggregate { .. }
348 | LogicalPlan::Sort { .. }
349 | LogicalPlan::Limit { .. } => {
350 let view = TxnCatalogView::new(&*catalog, &*overlay);
351 query::execute_query(&mut sql_txn, &view, plan)
352 }
353 };
354
355 match result {
356 Ok(value) => {
357 sql_txn.flush_hnsw()?;
358 Ok(value)
359 }
360 Err(err) => {
361 let _ = sql_txn.abandon_hnsw();
362 Err(err)
363 }
364 }
365 }
366
367 fn map_catalog_error(err: CatalogError) -> ExecutorError {
368 match err {
369 CatalogError::Kv(e) => ExecutorError::Core(e),
370 CatalogError::Serialize(e) => ExecutorError::InvalidOperation {
371 operation: "CatalogPersistence".into(),
372 reason: e.to_string(),
373 },
374 CatalogError::InvalidKey(reason) => ExecutorError::InvalidOperation {
375 operation: "CatalogPersistence".into(),
376 reason,
377 },
378 }
379 }
380
381 fn execute_create_table_in_txn<'txn>(
382 &self,
383 catalog: &mut PersistentCatalog<S>,
384 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
385 overlay: &mut CatalogOverlay,
386 mut table: crate::catalog::TableMetadata,
387 with_options: Vec<(String, String)>,
388 if_not_exists: bool,
389 ) -> Result<ExecutionResult>
390 where
391 S: 'txn,
392 {
393 if catalog.table_exists_in_txn(&table.name, overlay) {
394 return if if_not_exists {
395 Ok(ExecutionResult::Success)
396 } else {
397 Err(ExecutorError::TableAlreadyExists(table.name))
398 };
399 }
400
401 table.storage_options = ddl::create_table::parse_storage_options(&with_options)?;
402
403 let pk_index = if let Some(pk_columns) = table.primary_key.clone() {
404 let column_indices = pk_columns
405 .iter()
406 .map(|name| {
407 table
408 .get_column_index(name)
409 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
410 })
411 .collect::<Result<Vec<_>>>()?;
412 let index_id = catalog.next_index_id();
413 let index_name = ddl::create_pk_index_name(&table.name);
414 let mut index = crate::catalog::IndexMetadata::new(
415 index_id,
416 index_name,
417 table.name.clone(),
418 pk_columns,
419 )
420 .with_column_indices(column_indices)
421 .with_unique(true);
422 index.catalog_name = table.catalog_name.clone();
423 index.namespace_name = table.namespace_name.clone();
424 Some(index)
425 } else {
426 None
427 };
428
429 let table_id = catalog.next_table_id();
430 table = table.with_table_id(table_id);
431
432 txn.delete_prefix(&KeyEncoder::table_prefix(table_id))?;
434 txn.delete_prefix(&KeyEncoder::sequence_key(table_id))?;
435
436 catalog
438 .persist_create_table(txn.inner_mut(), &table)
439 .map_err(Self::map_catalog_error)?;
440 if let Some(index) = &pk_index {
441 catalog
442 .persist_create_index(txn.inner_mut(), index)
443 .map_err(Self::map_catalog_error)?;
444 }
445
446 overlay.add_table(TableFqn::from(&table), table);
448 if let Some(index) = pk_index {
449 overlay.add_index(IndexFqn::from(&index), index);
450 }
451
452 Ok(ExecutionResult::Success)
453 }
454
455 fn execute_drop_table_in_txn<'txn>(
456 &self,
457 catalog: &mut PersistentCatalog<S>,
458 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
459 overlay: &mut CatalogOverlay,
460 table_name: &str,
461 if_exists: bool,
462 ) -> Result<ExecutionResult>
463 where
464 S: 'txn,
465 {
466 let table_meta = match catalog.get_table_in_txn(table_name, overlay) {
467 Some(table) => table.clone(),
468 None => {
469 return if if_exists {
470 Ok(ExecutionResult::Success)
471 } else {
472 Err(ExecutorError::TableNotFound(table_name.to_string()))
473 };
474 }
475 };
476 if table_meta.catalog_name != "default" || table_meta.namespace_name != "default" {
477 return if if_exists {
478 Ok(ExecutionResult::Success)
479 } else {
480 Err(ExecutorError::TableNotFound(table_name.to_string()))
481 };
482 }
483
484 let indexes = TxnCatalogView::new(catalog, overlay)
485 .get_indexes_for_table(table_name)
486 .into_iter()
487 .cloned()
488 .collect::<Vec<_>>();
489
490 for index in &indexes {
491 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
492 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, index, false)?;
493 } else {
494 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
495 }
496 }
497
498 txn.delete_prefix(&KeyEncoder::table_prefix(table_meta.table_id))?;
499 txn.delete_prefix(&KeyEncoder::sequence_key(table_meta.table_id))?;
500
501 catalog
502 .persist_drop_table(txn.inner_mut(), &TableFqn::from(&table_meta))
503 .map_err(Self::map_catalog_error)?;
504
505 overlay.drop_table(&TableFqn::from(&table_meta));
506
507 Ok(ExecutionResult::Success)
508 }
509
510 fn execute_create_index_in_txn<'txn>(
511 &self,
512 catalog: &mut PersistentCatalog<S>,
513 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
514 overlay: &mut CatalogOverlay,
515 mut index: crate::catalog::IndexMetadata,
516 if_not_exists: bool,
517 ) -> Result<ExecutionResult>
518 where
519 S: 'txn,
520 {
521 if ddl::is_implicit_pk_index(&index.name) {
522 return Err(ExecutorError::InvalidIndexName {
523 name: index.name.clone(),
524 reason: "Index names starting with '__pk_' are reserved for PRIMARY KEY".into(),
525 });
526 }
527
528 if catalog.index_exists_in_txn(&index.name, overlay) {
529 return if if_not_exists {
530 Ok(ExecutionResult::Success)
531 } else {
532 Err(ExecutorError::IndexAlreadyExists(index.name))
533 };
534 }
535
536 let table = catalog
537 .get_table_in_txn(&index.table, overlay)
538 .ok_or_else(|| ExecutorError::TableNotFound(index.table.clone()))?
539 .clone();
540 index.catalog_name = table.catalog_name.clone();
541 index.namespace_name = table.namespace_name.clone();
542
543 let column_indices = index
544 .columns
545 .iter()
546 .map(|name| {
547 table
548 .get_column_index(name)
549 .ok_or_else(|| ExecutorError::ColumnNotFound(name.clone()))
550 })
551 .collect::<Result<Vec<_>>>()?;
552
553 let index_id = catalog.next_index_id();
554 index.index_id = index_id;
555 index.column_indices = column_indices.clone();
556
557 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
558 crate::executor::hnsw_bridge::HnswBridge::create_index(txn, &table, &index)?;
559 } else {
560 ddl::create_index::build_index_for_existing_rows(txn, &table, &index, column_indices)?;
561 }
562
563 catalog
564 .persist_create_index(txn.inner_mut(), &index)
565 .map_err(Self::map_catalog_error)?;
566
567 overlay.add_index(IndexFqn::from(&index), index);
568
569 Ok(ExecutionResult::Success)
570 }
571
572 fn execute_drop_index_in_txn<'txn>(
573 &self,
574 catalog: &mut PersistentCatalog<S>,
575 txn: &mut impl crate::storage::SqlTxn<'txn, S>,
576 overlay: &mut CatalogOverlay,
577 index_name: &str,
578 if_exists: bool,
579 ) -> Result<ExecutionResult>
580 where
581 S: 'txn,
582 {
583 if ddl::is_implicit_pk_index(index_name) {
584 return Err(ExecutorError::InvalidOperation {
585 operation: "DROP INDEX".into(),
586 reason: "Cannot drop implicit PRIMARY KEY index directly; use DROP TABLE".into(),
587 });
588 }
589
590 let index = match catalog.get_index_in_txn(index_name, overlay) {
591 Some(index) => index.clone(),
592 None => {
593 return if if_exists {
594 Ok(ExecutionResult::Success)
595 } else {
596 Err(ExecutorError::IndexNotFound(index_name.to_string()))
597 };
598 }
599 };
600 if index.catalog_name != "default" || index.namespace_name != "default" {
601 return if if_exists {
602 Ok(ExecutionResult::Success)
603 } else {
604 Err(ExecutorError::IndexNotFound(index_name.to_string()))
605 };
606 }
607
608 if matches!(index.method, Some(crate::ast::ddl::IndexMethod::Hnsw)) {
609 crate::executor::hnsw_bridge::HnswBridge::drop_index(txn, &index, if_exists)?;
610 } else {
611 txn.delete_prefix(&KeyEncoder::index_prefix(index.index_id))?;
612 }
613
614 catalog
615 .persist_drop_index(txn.inner_mut(), &IndexFqn::from(&index))
616 .map_err(Self::map_catalog_error)?;
617
618 overlay.drop_index(&IndexFqn::from(&index));
619
620 Ok(ExecutionResult::Success)
621 }
622}
623
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Span;
    use crate::catalog::{ColumnMetadata, MemoryCatalog, TableMetadata};
    use crate::planner::typed_expr::{TypedExpr, TypedExprKind};
    use crate::planner::types::ResolvedType;
    use alopex_core::kv::memory::MemoryKV;

    /// Builds an executor over a fresh in-memory store and catalog.
    fn create_executor() -> Executor<MemoryKV, MemoryCatalog> {
        Executor::new(
            Arc::new(MemoryKV::new()),
            Arc::new(RwLock::new(MemoryCatalog::new())),
        )
    }

    /// Wraps `table` in a plain `CREATE TABLE` plan (no `IF NOT EXISTS`, no options).
    fn create_table_plan(table: TableMetadata) -> LogicalPlan {
        LogicalPlan::CreateTable {
            table,
            if_not_exists: false,
            with_options: vec![],
        }
    }

    /// Constructing an executor must not panic.
    #[test]
    fn test_executor_creation() {
        let _executor = create_executor();
    }

    /// `CREATE TABLE` succeeds and the table becomes visible in the catalog.
    #[test]
    fn create_table_is_supported() {
        let mut executor = create_executor();

        let definition = TableMetadata::new(
            "test",
            vec![ColumnMetadata::new("id", ResolvedType::Integer)],
        );

        let outcome = executor.execute(create_table_plan(definition));
        assert!(matches!(outcome, Ok(ExecutionResult::Success)));

        let catalog = executor.catalog.read().unwrap();
        assert!(catalog.table_exists("test"));
    }

    /// A single-row `INSERT` into a freshly created table reports one affected row.
    #[test]
    fn insert_is_supported() {
        let mut executor = create_executor();

        let definition =
            TableMetadata::new("t", vec![ColumnMetadata::new("id", ResolvedType::Integer)])
                .with_primary_key(vec!["id".into()]);
        executor.execute(create_table_plan(definition)).unwrap();

        let one = TypedExpr {
            kind: TypedExprKind::Literal(crate::ast::expr::Literal::Number("1".into())),
            resolved_type: ResolvedType::Integer,
            span: Span::default(),
        };
        let outcome = executor.execute(LogicalPlan::Insert {
            table: "t".into(),
            columns: vec!["id".into()],
            values: vec![vec![one]],
        });
        assert!(matches!(outcome, Ok(ExecutionResult::RowsAffected(1))));
    }
}