1use std::sync::{Arc, RwLock};
4
5use arrow::record_batch::RecordBatch;
6use llkv_result::{Error, Result};
7use llkv_storage::pager::{BoxedPager, MemPager};
8use llkv_table::types::TableId;
9use llkv_table::{
10 SingleColumnIndexDescriptor, canonical_table_name, validate_alter_table_operation,
11};
12
13use crate::{
14 AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource,
15 CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan, DropViewPlan, InsertPlan,
16 InsertSource, PlanColumnSpec, PlanOperation, PlanValue, RenameTablePlan, RuntimeContext,
17 RuntimeStatementResult, RuntimeTransactionContext, SelectExecution, SelectPlan,
18 SelectProjection, TransactionContext, TransactionKind, TransactionResult, TransactionSession,
19 UpdatePlan,
20};
21use crate::{
22 PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
23 RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID,
24 TemporaryRuntimeNamespace,
25};
26use llkv_plan::TruncatePlan;
27
28type StatementResult = RuntimeStatementResult<BoxedPager>;
29type TxnResult = TransactionResult<BoxedPager>;
30type BaseTxnContext = RuntimeTransactionContext<BoxedPager>;
31
32pub(crate) struct SessionNamespaces {
33 persistent: Arc<PersistentRuntimeNamespace>,
34 temporary: Option<Arc<TemporaryRuntimeNamespace>>,
35 registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
36}
37
38impl SessionNamespaces {
39 pub(crate) fn new(base_context: Arc<RuntimeContext<BoxedPager>>) -> Self {
40 let persistent = Arc::new(PersistentRuntimeNamespace::new(
41 PERSISTENT_NAMESPACE_ID.to_string(),
42 Arc::clone(&base_context),
43 ));
44
45 let mut registry = RuntimeStorageNamespaceRegistry::new(
46 RuntimeStorageNamespace::namespace_id(persistent.as_ref()).clone(),
47 );
48 registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
49
50 let temporary = {
51 let shared_catalog = base_context.table_catalog();
67 let temp_mem_pager = Arc::new(MemPager::default());
68 let temp_boxed_pager = Arc::new(BoxedPager::from_arc(temp_mem_pager));
69 let temp_context = Arc::new(
70 RuntimeContext::new_with_catalog(temp_boxed_pager, Arc::clone(&shared_catalog))
71 .with_fallback_lookup(Arc::clone(&base_context)),
72 );
73
74 const TEMPORARY_TABLE_ID_START: TableId = 0x8000;
75 temp_context
76 .ensure_next_table_id_at_least(TEMPORARY_TABLE_ID_START)
77 .expect("failed to seed temporary namespace table id counter");
78
79 let namespace = Arc::new(TemporaryRuntimeNamespace::new(
80 TEMPORARY_NAMESPACE_ID.to_string(),
81 temp_context,
82 ));
83 registry.register_namespace(
84 Arc::clone(&namespace),
85 vec![TEMPORARY_NAMESPACE_ID.to_string()],
86 true,
87 );
88 namespace
89 };
90
91 Self {
92 persistent,
93 temporary: Some(temporary),
94 registry: Arc::new(RwLock::new(registry)),
95 }
96 }
97
98 pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace> {
99 Arc::clone(&self.persistent)
100 }
101
102 pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
103 self.temporary.as_ref().map(Arc::clone)
104 }
105
106 pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
107 Arc::clone(&self.registry)
108 }
109}
110
111impl Drop for SessionNamespaces {
112 fn drop(&mut self) {
113 if let Some(temp) = &self.temporary {
114 let namespace_id = temp.namespace_id().to_string();
115 let canonical_names = {
116 let mut registry = self.registry.write().expect("namespace registry poisoned");
117 registry.drain_namespace_tables(&namespace_id)
118 };
119 temp.clear_tables(canonical_names);
120 }
121 }
122}
123
124pub struct RuntimeSession {
129 inner: TransactionSession<
131 RuntimeTransactionContext<BoxedPager>,
132 RuntimeTransactionContext<MemPager>,
133 >,
134 namespaces: Arc<SessionNamespaces>,
135}
136
137impl RuntimeSession {
138 pub(crate) fn from_parts(
139 inner: TransactionSession<
140 RuntimeTransactionContext<BoxedPager>,
141 RuntimeTransactionContext<MemPager>,
142 >,
143 namespaces: Arc<SessionNamespaces>,
144 ) -> Self {
145 Self { inner, namespaces }
146 }
147
148 pub(crate) fn clone_session(&self) -> Self {
151 Self {
152 inner: self.inner.clone_session(),
153 namespaces: self.namespaces.clone(),
154 }
155 }
156
157 pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
158 self.namespaces.registry()
159 }
160
161 fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
162 self.namespace_registry()
163 .read()
164 .expect("namespace registry poisoned")
165 .namespace_for_table(canonical)
166 }
167
168 fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
169 if plan.tables.len() != 1 {
170 return None;
171 }
172
173 let qualified = plan.tables[0].qualified_name();
174 let (_, canonical) = canonical_table_name(&qualified).ok()?;
175 Some(self.resolve_namespace_for_table(&canonical))
176 }
177
178 fn select_from_temporary(&self, plan: SelectPlan) -> Result<StatementResult> {
179 let temp_namespace = self
180 .temporary_namespace()
181 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
182
183 let table_name = if plan.tables.len() == 1 {
184 plan.tables[0].qualified_name()
185 } else {
186 String::new()
187 };
188
189 let temp_context = temp_namespace.context();
190 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
191 let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
192 let schema = execution.schema();
193 let batches = execution.collect()?;
194
195 let combined = if batches.is_empty() {
196 RecordBatch::new_empty(Arc::clone(&schema))
197 } else if batches.len() == 1 {
198 batches.into_iter().next().unwrap()
199 } else {
200 let refs: Vec<&RecordBatch> = batches.iter().collect();
201 arrow::compute::concat_batches(&schema, refs)?
202 };
203
204 let execution =
205 SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
206
207 Ok(RuntimeStatementResult::Select {
208 execution: Box::new(execution),
209 table_name,
210 schema,
211 })
212 }
213
214 fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace> {
215 self.namespaces.persistent()
216 }
217
218 #[allow(dead_code)]
219 fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
220 self.namespaces.temporary()
221 }
222
223 fn base_transaction_context(&self) -> Arc<BaseTxnContext> {
224 Arc::clone(self.inner.context())
225 }
226
227 fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
228 where
229 F: FnOnce(&BaseTxnContext) -> Result<T>,
230 {
231 let context = self.base_transaction_context();
232 let default_snapshot = context.ctx().default_snapshot();
233 TransactionContext::set_snapshot(&*context, default_snapshot);
234 f(context.as_ref())
235 }
236
237 fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TxnResult> {
238 self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
239 }
240
241 fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TxnResult> {
242 self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
243 }
244
245 fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TxnResult> {
246 self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
247 }
248
249 fn run_autocommit_truncate(&self, plan: TruncatePlan) -> Result<TxnResult> {
250 self.with_autocommit_transaction_context(|ctx| TransactionContext::truncate(ctx, plan))
251 }
252
253 fn run_autocommit_create_table(&self, plan: CreateTablePlan) -> Result<StatementResult> {
254 let result =
255 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
256 match result {
257 TransactionResult::CreateTable { table_name } => {
258 Ok(RuntimeStatementResult::CreateTable { table_name })
259 }
260 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
261 _ => Err(Error::Internal(
262 "unexpected transaction result for CREATE TABLE".into(),
263 )),
264 }
265 }
266
267 fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<StatementResult> {
268 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
269 Ok(RuntimeStatementResult::NoOp)
270 }
271
272 fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
273 self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
274 }
275
276 fn run_autocommit_alter_table(&self, plan: AlterTablePlan) -> Result<StatementResult> {
277 let result =
278 self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
279 match result {
280 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
281 TransactionResult::CreateTable { table_name } => {
282 Ok(RuntimeStatementResult::CreateTable { table_name })
283 }
284 TransactionResult::CreateIndex {
285 table_name,
286 index_name,
287 } => Ok(RuntimeStatementResult::CreateIndex {
288 table_name,
289 index_name,
290 }),
291 _ => Err(Error::Internal(
292 "unexpected transaction result for ALTER TABLE".into(),
293 )),
294 }
295 }
296
297 fn run_autocommit_create_index(&self, plan: CreateIndexPlan) -> Result<StatementResult> {
298 let result =
299 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
300 match result {
301 TransactionResult::CreateIndex {
302 table_name,
303 index_name,
304 } => Ok(RuntimeStatementResult::CreateIndex {
305 table_name,
306 index_name,
307 }),
308 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
309 _ => Err(Error::Internal(
310 "unexpected transaction result for CREATE INDEX".into(),
311 )),
312 }
313 }
314
315 fn run_autocommit_drop_index(
316 &self,
317 plan: DropIndexPlan,
318 ) -> Result<Option<SingleColumnIndexDescriptor>> {
319 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
320 }
321
322 pub fn begin_transaction(&self) -> Result<StatementResult> {
326 let staging_pager = Arc::new(MemPager::default());
327 tracing::trace!(
328 "BEGIN_TRANSACTION: Created staging pager at {:p}",
329 &*staging_pager
330 );
331 let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
332
333 let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
338
339 self.inner.begin_transaction(staging_wrapper)?;
340 Ok(RuntimeStatementResult::Transaction {
341 kind: TransactionKind::Begin,
342 })
343 }
344
345 pub fn abort_transaction(&self) {
348 self.inner.abort_transaction();
349 }
350
351 pub fn has_active_transaction(&self) -> bool {
353 let result = self.inner.has_active_transaction();
354 tracing::trace!("SESSION: has_active_transaction() = {}", result);
355 result
356 }
357
358 pub fn is_aborted(&self) -> bool {
360 self.inner.is_aborted()
361 }
362
363 pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
365 self.inner.is_table_created_in_transaction(table_name)
366 }
367
368 pub fn table_column_specs_from_transaction(
371 &self,
372 table_name: &str,
373 ) -> Option<Vec<PlanColumnSpec>> {
374 self.inner.table_column_specs_from_transaction(table_name)
375 }
376
377 pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
380 self.inner
381 .tables_referencing_in_transaction(referenced_table)
382 }
383
384 pub fn commit_transaction(&self) -> Result<StatementResult> {
387 tracing::trace!("Session::commit_transaction called");
388 let (tx_result, operations) = self.inner.commit_transaction()?;
389 tracing::trace!(
390 "Session::commit_transaction got {} operations",
391 operations.len()
392 );
393
394 if !operations.is_empty() {
395 let dropped_tables = self
396 .inner
397 .context()
398 .ctx()
399 .dropped_tables
400 .read()
401 .unwrap()
402 .clone();
403 if !dropped_tables.is_empty() {
404 for operation in &operations {
405 let table_name_opt = match operation {
406 PlanOperation::Insert(plan) => Some(plan.table.as_str()),
407 PlanOperation::Update(plan) => Some(plan.table.as_str()),
408 PlanOperation::Delete(plan) => Some(plan.table.as_str()),
409 _ => None,
410 };
411 if let Some(table_name) = table_name_opt {
412 let (_, canonical) = canonical_table_name(table_name)?;
413 if dropped_tables.contains(&canonical) {
414 self.abort_transaction();
415 return Err(Error::TransactionContextError(
416 "another transaction has dropped this table".into(),
417 ));
418 }
419 }
420 }
421 }
422 }
423
424 let kind = match tx_result {
426 TransactionResult::Transaction { kind } => kind,
427 _ => {
428 return Err(Error::Internal(
429 "commit_transaction returned non-transaction result".into(),
430 ));
431 }
432 };
433 tracing::trace!("Session::commit_transaction kind={:?}", kind);
434
435 for operation in operations {
437 match operation {
438 PlanOperation::CreateTable(plan) => {
439 TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
440 }
441 PlanOperation::DropTable(plan) => {
442 TransactionContext::drop_table(&**self.inner.context(), plan)?;
443 }
444 PlanOperation::Insert(plan) => {
445 TransactionContext::insert(&**self.inner.context(), plan)?;
446 }
447 PlanOperation::Update(plan) => {
448 TransactionContext::update(&**self.inner.context(), plan)?;
449 }
450 PlanOperation::Delete(plan) => {
451 TransactionContext::delete(&**self.inner.context(), plan)?;
452 }
453 _ => {}
454 }
455 }
456
457 let base_ctx = self.inner.context();
460 let default_snapshot = base_ctx.ctx().default_snapshot();
461 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
462
463 if matches!(kind, TransactionKind::Commit) {
465 let ctx = base_ctx.ctx();
466 let next_txn_id = ctx.txn_manager().current_next_txn_id();
467 if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
468 tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
469 }
470 }
471
472 Ok(RuntimeStatementResult::Transaction { kind })
474 }
475
476 pub fn rollback_transaction(&self) -> Result<StatementResult> {
478 self.inner.rollback_transaction()?;
479 let base_ctx = self.inner.context();
480 let default_snapshot = base_ctx.ctx().default_snapshot();
481 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
482 Ok(RuntimeStatementResult::Transaction {
483 kind: TransactionKind::Rollback,
484 })
485 }
486
487 fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
488 if matches!(plan.source, Some(CreateTableSource::Select { .. }))
491 && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
492 {
493 let select_result = self.execute_select_plan(*select_plan)?;
494 let (schema, batches) = match select_result {
495 RuntimeStatementResult::Select {
496 schema, execution, ..
497 } => {
498 let batches = execution.collect()?;
499 (schema, batches)
500 }
501 _ => {
502 return Err(Error::Internal(
503 "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
504 ));
505 }
506 };
507 plan.source = Some(CreateTableSource::Batches { schema, batches });
508 }
509 Ok(plan)
510 }
511
512 fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
513 let InsertPlan {
514 table,
515 columns,
516 source,
517 on_conflict,
518 } = plan;
519
520 match source {
521 InsertSource::Rows(rows) => {
522 let count = rows.len();
523 Ok((
524 InsertPlan {
525 table,
526 columns,
527 source: InsertSource::Rows(rows),
528 on_conflict,
529 },
530 count,
531 ))
532 }
533 InsertSource::Batches(batches) => {
534 let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
535 Ok((
536 InsertPlan {
537 table,
538 columns,
539 source: InsertSource::Batches(batches),
540 on_conflict,
541 },
542 count,
543 ))
544 }
545 InsertSource::Select { plan: select_plan } => {
546 let select_result = self.execute_select_plan(*select_plan)?;
547 let rows = match select_result {
548 RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
549 _ => {
550 return Err(Error::Internal(
551 "expected Select result when executing INSERT ... SELECT".into(),
552 ));
553 }
554 };
555 let count = rows.len();
556 Ok((
557 InsertPlan {
558 table,
559 columns,
560 source: InsertSource::Rows(rows),
561 on_conflict,
562 },
563 count,
564 ))
565 }
566 }
567 }
568
569 pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<StatementResult> {
571 tracing::trace!("Session::insert called for table={}", plan.table);
572 let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
573 let table_name = plan.table.clone();
574 let (_, canonical_table) = canonical_table_name(&plan.table)?;
575 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
576
577 match namespace_id.as_str() {
578 TEMPORARY_NAMESPACE_ID => {
579 let temp_namespace = self
580 .temporary_namespace()
581 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
582 let temp_context = temp_namespace.context();
583 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
584 match TransactionContext::insert(&temp_tx_context, plan)? {
585 TransactionResult::Insert { .. } => {}
586 _ => {
587 return Err(Error::Internal(
588 "unexpected transaction result for temporary INSERT".into(),
589 ));
590 }
591 }
592 Ok(RuntimeStatementResult::Insert {
593 rows_inserted,
594 table_name,
595 })
596 }
597 PERSISTENT_NAMESPACE_ID => {
598 if self.has_active_transaction() {
599 match self.inner.execute_operation(PlanOperation::Insert(plan)) {
600 Ok(_) => {
601 tracing::trace!("Session::insert succeeded for table={}", table_name);
602 Ok(RuntimeStatementResult::Insert {
603 rows_inserted,
604 table_name,
605 })
606 }
607 Err(e) => {
608 tracing::trace!(
609 "Session::insert failed for table={}, error={:?}",
610 table_name,
611 e
612 );
613 if matches!(e, Error::ConstraintError(_)) {
614 tracing::trace!("Transaction is_aborted=true");
615 self.abort_transaction();
616 }
617 Err(e)
618 }
619 }
620 } else {
621 let result = self.run_autocommit_insert(plan)?;
622 if !matches!(result, TransactionResult::Insert { .. }) {
623 return Err(Error::Internal(
624 "unexpected transaction result for INSERT operation".into(),
625 ));
626 }
627 Ok(RuntimeStatementResult::Insert {
628 rows_inserted,
629 table_name,
630 })
631 }
632 }
633 other => Err(Error::InvalidArgumentError(format!(
634 "Unknown storage namespace '{}'",
635 other
636 ))),
637 }
638 }
639
640 pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<StatementResult> {
642 if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
643 && namespace_id == TEMPORARY_NAMESPACE_ID
644 {
645 return self.select_from_temporary(plan);
646 }
647
648 if self.has_active_transaction() {
649 let tx_result = match self
650 .inner
651 .execute_operation(PlanOperation::Select(Box::new(plan.clone())))
652 {
653 Ok(result) => result,
654 Err(e) => {
655 if matches!(e, Error::ConstraintError(_)) {
658 self.abort_transaction();
659 }
660 return Err(e);
661 }
662 };
663 match tx_result {
664 TransactionResult::Select {
665 table_name,
666 schema,
667 execution: staging_execution,
668 } => {
669 let batches = staging_execution.collect().unwrap_or_default();
672 let combined = if batches.is_empty() {
673 RecordBatch::new_empty(Arc::clone(&schema))
674 } else if batches.len() == 1 {
675 batches.into_iter().next().unwrap()
676 } else {
677 let refs: Vec<&RecordBatch> = batches.iter().collect();
678 arrow::compute::concat_batches(&schema, refs)?
679 };
680
681 let execution = SelectExecution::from_batch(
682 table_name.clone(),
683 Arc::clone(&schema),
684 combined,
685 );
686
687 Ok(RuntimeStatementResult::Select {
688 execution: Box::new(execution),
689 table_name,
690 schema,
691 })
692 }
693 _ => Err(Error::Internal("expected Select result".into())),
694 }
695 } else {
696 let table_name = if plan.tables.len() == 1 {
698 plan.tables[0].qualified_name()
699 } else {
700 String::new()
701 };
702 let execution = self.with_autocommit_transaction_context(|ctx| {
703 TransactionContext::execute_select(ctx, plan)
704 })?;
705 let schema = execution.schema();
706 Ok(RuntimeStatementResult::Select {
707 execution: Box::new(execution),
708 table_name,
709 schema,
710 })
711 }
712 }
713
714 pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
716 let plan =
717 SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
718 match self.execute_select_plan(plan)? {
719 RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
720 other => Err(Error::Internal(format!(
721 "expected Select result when reading table '{}', got {:?}",
722 table, other
723 ))),
724 }
725 }
726
727 pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<StatementResult> {
728 let (_, canonical_table) = canonical_table_name(&plan.table)?;
729 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
730
731 match namespace_id.as_str() {
732 TEMPORARY_NAMESPACE_ID => {
733 let temp_namespace = self
734 .temporary_namespace()
735 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
736 let temp_context = temp_namespace.context();
737 let table_name = plan.table.clone();
738 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
739 match TransactionContext::update(&temp_tx_context, plan)? {
740 TransactionResult::Update { rows_updated, .. } => {
741 Ok(RuntimeStatementResult::Update {
742 rows_updated,
743 table_name,
744 })
745 }
746 _ => Err(Error::Internal(
747 "unexpected transaction result for temporary UPDATE".into(),
748 )),
749 }
750 }
751 PERSISTENT_NAMESPACE_ID => {
752 if self.has_active_transaction() {
753 let table_name = plan.table.clone();
754 let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
755 Ok(result) => result,
756 Err(e) => {
757 self.abort_transaction();
759 return Err(e);
760 }
761 };
762 match result {
763 TransactionResult::Update {
764 rows_matched: _,
765 rows_updated,
766 } => Ok(RuntimeStatementResult::Update {
767 rows_updated,
768 table_name,
769 }),
770 _ => Err(Error::Internal("expected Update result".into())),
771 }
772 } else {
773 let table_name = plan.table.clone();
774 let result = self.run_autocommit_update(plan)?;
775 match result {
776 TransactionResult::Update {
777 rows_matched: _,
778 rows_updated,
779 } => Ok(RuntimeStatementResult::Update {
780 rows_updated,
781 table_name,
782 }),
783 _ => Err(Error::Internal("expected Update result".into())),
784 }
785 }
786 }
787 other => Err(Error::InvalidArgumentError(format!(
788 "Unknown storage namespace '{}'",
789 other
790 ))),
791 }
792 }
793
794 pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<StatementResult> {
795 let (_, canonical_table) = canonical_table_name(&plan.table)?;
796 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
797
798 match namespace_id.as_str() {
799 TEMPORARY_NAMESPACE_ID => {
800 let temp_namespace = self
801 .temporary_namespace()
802 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
803 let temp_context = temp_namespace.context();
804 let table_name = plan.table.clone();
805 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
806 match TransactionContext::delete(&temp_tx_context, plan)? {
807 TransactionResult::Delete { rows_deleted } => {
808 Ok(RuntimeStatementResult::Delete {
809 rows_deleted,
810 table_name,
811 })
812 }
813 _ => Err(Error::Internal(
814 "unexpected transaction result for temporary DELETE".into(),
815 )),
816 }
817 }
818 PERSISTENT_NAMESPACE_ID => {
819 if self.has_active_transaction() {
820 let table_name = plan.table.clone();
821 let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
822 Ok(result) => result,
823 Err(e) => {
824 self.abort_transaction();
826 return Err(e);
827 }
828 };
829 match result {
830 TransactionResult::Delete { rows_deleted } => {
831 Ok(RuntimeStatementResult::Delete {
832 rows_deleted,
833 table_name,
834 })
835 }
836 _ => Err(Error::Internal("expected Delete result".into())),
837 }
838 } else {
839 let table_name = plan.table.clone();
840 let result = self.run_autocommit_delete(plan)?;
841 match result {
842 TransactionResult::Delete { rows_deleted } => {
843 Ok(RuntimeStatementResult::Delete {
844 rows_deleted,
845 table_name,
846 })
847 }
848 _ => Err(Error::Internal("expected Delete result".into())),
849 }
850 }
851 }
852 other => Err(Error::InvalidArgumentError(format!(
853 "Unknown storage namespace '{}'",
854 other
855 ))),
856 }
857 }
858
859 pub fn execute_truncate_plan(&self, plan: TruncatePlan) -> Result<StatementResult> {
860 let (_, canonical_table) = canonical_table_name(&plan.table)?;
861 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
862
863 match namespace_id.as_str() {
864 TEMPORARY_NAMESPACE_ID => {
865 let temp_namespace = self
866 .temporary_namespace()
867 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
868 let temp_context = temp_namespace.context();
869 let table_name = plan.table.clone();
870 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
871 match TransactionContext::truncate(&temp_tx_context, plan)? {
872 TransactionResult::Delete { rows_deleted } => {
873 Ok(RuntimeStatementResult::Delete {
874 rows_deleted,
875 table_name,
876 })
877 }
878 _ => Err(Error::Internal(
879 "unexpected transaction result for temporary TRUNCATE".into(),
880 )),
881 }
882 }
883 PERSISTENT_NAMESPACE_ID => {
884 if self.has_active_transaction() {
885 let table_name = plan.table.clone();
886 let result = match self.inner.execute_operation(PlanOperation::Truncate(plan)) {
887 Ok(result) => result,
888 Err(e) => {
889 self.abort_transaction();
891 return Err(e);
892 }
893 };
894 match result {
895 TransactionResult::Delete { rows_deleted } => {
896 Ok(RuntimeStatementResult::Delete {
897 rows_deleted,
898 table_name,
899 })
900 }
901 _ => Err(Error::Internal("expected Delete result".into())),
902 }
903 } else {
904 let table_name = plan.table.clone();
905 let result = self.run_autocommit_truncate(plan)?;
906 match result {
907 TransactionResult::Delete { rows_deleted } => {
908 Ok(RuntimeStatementResult::Delete {
909 rows_deleted,
910 table_name,
911 })
912 }
913 _ => Err(Error::Internal("expected Delete result".into())),
914 }
915 }
916 }
917 other => Err(Error::InvalidArgumentError(format!(
918 "Unknown storage namespace '{}'",
919 other
920 ))),
921 }
922 }
923}
924
925impl CatalogDdl for RuntimeSession {
929 type CreateTableOutput = StatementResult;
930 type DropTableOutput = StatementResult;
931 type RenameTableOutput = ();
932 type AlterTableOutput = StatementResult;
933 type CreateIndexOutput = StatementResult;
934 type DropIndexOutput = StatementResult;
935
936 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
937 let target_namespace = plan
938 .namespace
939 .clone()
940 .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
941 .to_ascii_lowercase();
942
943 let plan = self.materialize_ctas_plan(plan)?;
944
945 match target_namespace.as_str() {
946 TEMPORARY_NAMESPACE_ID => {
947 let temp_namespace = self
948 .temporary_namespace()
949 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
950 let (_, canonical) = canonical_table_name(&plan.name)?;
951 let result = temp_namespace.create_table(plan)?;
952 if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
953 let namespace_id = temp_namespace.namespace_id().to_string();
954 let registry = self.namespace_registry();
955 registry
956 .write()
957 .expect("namespace registry poisoned")
958 .register_table(&namespace_id, canonical);
959 }
960 Ok(result)
961 }
962 PERSISTENT_NAMESPACE_ID => {
963 if self.has_active_transaction() {
964 match self
965 .inner
966 .execute_operation(PlanOperation::CreateTable(plan))
967 {
968 Ok(TransactionResult::CreateTable { table_name }) => {
969 Ok(RuntimeStatementResult::CreateTable { table_name })
970 }
971 Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
972 Ok(_) => Err(Error::Internal(
973 "expected CreateTable result during transactional CREATE TABLE".into(),
974 )),
975 Err(err) => {
976 self.abort_transaction();
977 Err(err)
978 }
979 }
980 } else {
981 if self.inner.has_table_locked_by_other_session(&plan.name) {
982 return Err(Error::TransactionContextError(format!(
983 "table '{}' is locked by another active transaction",
984 plan.name
985 )));
986 }
987 self.run_autocommit_create_table(plan)
988 }
989 }
990 other => Err(Error::InvalidArgumentError(format!(
991 "Unknown storage namespace '{}'",
992 other
993 ))),
994 }
995 }
996
997 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
998 let (_, canonical_table) = canonical_table_name(&plan.name)?;
999 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1000
1001 match namespace_id.as_str() {
1002 TEMPORARY_NAMESPACE_ID => {
1003 let temp_namespace = self
1004 .temporary_namespace()
1005 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1006 temp_namespace.drop_table(plan)?;
1007 let registry = self.namespace_registry();
1008 registry
1009 .write()
1010 .expect("namespace registry poisoned")
1011 .unregister_table(&canonical_table);
1012 Ok(RuntimeStatementResult::NoOp)
1013 }
1014 PERSISTENT_NAMESPACE_ID => {
1015 if self.has_active_transaction() {
1016 let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
1017 if !referencing_tables.is_empty() {
1018 let referencing_table = &referencing_tables[0];
1019 self.abort_transaction();
1020 return Err(Error::CatalogError(format!(
1021 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
1022 referencing_table
1023 )));
1024 }
1025
1026 match self
1027 .inner
1028 .execute_operation(PlanOperation::DropTable(plan.clone()))
1029 {
1030 Ok(TransactionResult::NoOp) => {
1031 let registry = self.namespace_registry();
1032 registry
1033 .write()
1034 .expect("namespace registry poisoned")
1035 .unregister_table(&canonical_table);
1036 Ok(RuntimeStatementResult::NoOp)
1037 }
1038 Ok(_) => Err(Error::Internal(
1039 "expected NoOp result for DROP TABLE during transactional execution"
1040 .into(),
1041 )),
1042 Err(err) => {
1043 self.abort_transaction();
1044 Err(err)
1045 }
1046 }
1047 } else {
1048 if self.inner.has_table_locked_by_other_session(&plan.name) {
1049 return Err(Error::TransactionContextError(format!(
1050 "table '{}' is locked by another active transaction",
1051 plan.name
1052 )));
1053 }
1054 let result = self.run_autocommit_drop_table(plan)?;
1055 let registry = self.namespace_registry();
1056 registry
1057 .write()
1058 .expect("namespace registry poisoned")
1059 .unregister_table(&canonical_table);
1060 Ok(result)
1061 }
1062 }
1063 other => Err(Error::InvalidArgumentError(format!(
1064 "Unknown storage namespace '{}'",
1065 other
1066 ))),
1067 }
1068 }
1069
1070 fn create_view(&self, plan: CreateViewPlan) -> Result<()> {
1071 let target_namespace = plan
1072 .namespace
1073 .clone()
1074 .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
1075 .to_ascii_lowercase();
1076
1077 match target_namespace.as_str() {
1078 TEMPORARY_NAMESPACE_ID => {
1079 let temp_namespace = self
1080 .temporary_namespace()
1081 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1082 let (_, canonical) = canonical_table_name(&plan.name)?;
1083 temp_namespace.create_view(plan)?;
1084 let namespace_id = temp_namespace.namespace_id().to_string();
1085 let registry = self.namespace_registry();
1086 registry
1087 .write()
1088 .expect("namespace registry poisoned")
1089 .register_table(&namespace_id, canonical);
1090 Ok(())
1091 }
1092 PERSISTENT_NAMESPACE_ID => {
1093 let persistent_namespace = self.persistent_namespace();
1094 persistent_namespace.create_view(plan)
1095 }
1096 other => Err(Error::InvalidArgumentError(format!(
1097 "Unknown storage namespace '{}'",
1098 other
1099 ))),
1100 }
1101 }
1102
1103 fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1104 let (_, canonical_view) = canonical_table_name(&plan.name)?;
1105 let namespace_id = self.resolve_namespace_for_table(&canonical_view);
1106
1107 match namespace_id.as_str() {
1108 TEMPORARY_NAMESPACE_ID => {
1109 let temp_namespace = self
1110 .temporary_namespace()
1111 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1112 temp_namespace.drop_view(plan)?;
1113 let registry = self.namespace_registry();
1114 registry
1115 .write()
1116 .expect("namespace registry poisoned")
1117 .unregister_table(&canonical_view);
1118 Ok(())
1119 }
1120 PERSISTENT_NAMESPACE_ID => {
1121 let persistent_namespace = self.persistent_namespace();
1122 persistent_namespace.drop_view(plan)
1123 }
1124 other => Err(Error::InvalidArgumentError(format!(
1125 "Unknown storage namespace '{}'",
1126 other
1127 ))),
1128 }
1129 }
1130
1131 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
1132 if self.has_active_transaction() {
1133 return Err(Error::InvalidArgumentError(
1134 "ALTER TABLE RENAME is not supported inside an active transaction".into(),
1135 ));
1136 }
1137
1138 let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
1139 let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
1140 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1141
1142 match namespace_id.as_str() {
1143 TEMPORARY_NAMESPACE_ID => {
1144 let temp_namespace = self
1145 .temporary_namespace()
1146 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1147 match temp_namespace.rename_table(plan.clone()) {
1148 Ok(()) => {
1149 let namespace_id = temp_namespace.namespace_id().to_string();
1150 let registry = self.namespace_registry();
1151 let mut registry = registry.write().expect("namespace registry poisoned");
1152 registry.unregister_table(&canonical_table);
1153 registry.register_table(&namespace_id, new_canonical);
1154 Ok(())
1155 }
1156 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1157 Err(err) => Err(err),
1158 }
1159 }
1160 PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1161 Ok(()) => Ok(()),
1162 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1163 Err(err) => Err(err),
1164 },
1165 other => Err(Error::InvalidArgumentError(format!(
1166 "Unknown storage namespace '{}'",
1167 other
1168 ))),
1169 }
1170 }
1171
1172 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1173 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1174 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1175
1176 match namespace_id.as_str() {
1177 TEMPORARY_NAMESPACE_ID => {
1178 let temp_namespace = self
1179 .temporary_namespace()
1180 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1181
1182 let context = temp_namespace.context();
1183 let catalog_service = &context.catalog_service;
1184 let view = match catalog_service.table_view(&canonical_table) {
1185 Ok(view) => view,
1186 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1187 return Ok(RuntimeStatementResult::NoOp);
1188 }
1189 Err(err) => return Err(err),
1190 };
1191 let table_id = view
1192 .table_meta
1193 .as_ref()
1194 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1195 .table_id;
1196
1197 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1198
1199 Ok(temp_namespace.alter_table(plan)?)
1200 }
1201 PERSISTENT_NAMESPACE_ID => {
1202 let persistent = self.persistent_namespace();
1203 let context = persistent.context();
1204 let catalog_service = &context.catalog_service;
1205 let view = match catalog_service.table_view(&canonical_table) {
1206 Ok(view) => view,
1207 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1208 return Ok(RuntimeStatementResult::NoOp);
1209 }
1210 Err(err) => return Err(err),
1211 };
1212 let table_id = view
1213 .table_meta
1214 .as_ref()
1215 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1216 .table_id;
1217
1218 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1219
1220 self.run_autocommit_alter_table(plan)
1221 }
1222 other => Err(Error::InvalidArgumentError(format!(
1223 "Unknown storage namespace '{}'",
1224 other
1225 ))),
1226 }
1227 }
1228
1229 fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1230 if plan.columns.is_empty() {
1231 return Err(Error::InvalidArgumentError(
1232 "CREATE INDEX requires at least one column".into(),
1233 ));
1234 }
1235
1236 let (_, canonical_table) = canonical_table_name(&plan.table)?;
1237 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1238
1239 match namespace_id.as_str() {
1240 TEMPORARY_NAMESPACE_ID => {
1241 let temp_namespace = self
1242 .temporary_namespace()
1243 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1244 Ok(temp_namespace.create_index(plan)?)
1245 }
1246 PERSISTENT_NAMESPACE_ID => {
1247 if self.has_active_transaction() {
1248 return Err(Error::InvalidArgumentError(
1249 "CREATE INDEX is not supported inside an active transaction".into(),
1250 ));
1251 }
1252
1253 self.run_autocommit_create_index(plan)
1254 }
1255 other => Err(Error::InvalidArgumentError(format!(
1256 "Unknown storage namespace '{}'",
1257 other
1258 ))),
1259 }
1260 }
1261
1262 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1263 if self.has_active_transaction() {
1264 return Err(Error::InvalidArgumentError(
1265 "DROP INDEX is not supported inside an active transaction".into(),
1266 ));
1267 }
1268
1269 let mut dropped = false;
1270
1271 match self.run_autocommit_drop_index(plan.clone()) {
1272 Ok(Some(_)) => {
1273 dropped = true;
1274 }
1275 Ok(None) => {}
1276 Err(err) => {
1277 if !super::is_index_not_found_error(&err) {
1278 return Err(err);
1279 }
1280 }
1281 }
1282
1283 if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1284 match temp_namespace.drop_index(plan.clone()) {
1285 Ok(Some(_)) => {
1286 dropped = true;
1287 }
1288 Ok(None) => {}
1289 Err(err) => {
1290 if !super::is_index_not_found_error(&err) {
1291 return Err(err);
1292 }
1293 }
1294 }
1295 }
1296
1297 if dropped || plan.if_exists {
1298 Ok(RuntimeStatementResult::NoOp)
1299 } else {
1300 Err(Error::CatalogError(format!(
1301 "Index '{}' does not exist",
1302 plan.name
1303 )))
1304 }
1305 }
1306}