1use std::sync::{Arc, RwLock};
4
5use arrow::record_batch::RecordBatch;
6use llkv_result::{Error, Result};
7use llkv_storage::pager::{BoxedPager, MemPager, Pager};
8use llkv_table::{
9 SingleColumnIndexDescriptor, canonical_table_name, validate_alter_table_operation,
10};
11use simd_r_drive_entry_handle::EntryHandle;
12
13use crate::{
14 AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan,
15 DropIndexPlan, DropTablePlan, InsertPlan, InsertSource, PlanColumnSpec, PlanOperation,
16 PlanValue, RenameTablePlan, RuntimeContext, RuntimeStatementResult, RuntimeTransactionContext,
17 SelectExecution, SelectPlan, SelectProjection, TransactionContext, TransactionKind,
18 TransactionResult, TransactionSession, UpdatePlan,
19};
20use crate::{
21 PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
22 RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID,
23 TemporaryRuntimeNamespace,
24};
25
/// Storage namespaces held by a session: the persistent namespace, an optional
/// temporary namespace, and the registry in which both are registered.
pub(crate) struct SessionNamespaces<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Namespace built around the base runtime context handed to `new`.
    persistent: Arc<PersistentRuntimeNamespace<P>>,
    /// Temporary namespace; `new` always fills this with a `MemPager`-backed context.
    temporary: Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>>,
    /// Lock-protected registry shared with every holder of this namespace set.
    registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
}
34
35impl<P> SessionNamespaces<P>
36where
37 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
38{
39 pub(crate) fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
40 let persistent = Arc::new(PersistentRuntimeNamespace::new(
41 PERSISTENT_NAMESPACE_ID.to_string(),
42 Arc::clone(&base_context),
43 ));
44
45 let mut registry = RuntimeStorageNamespaceRegistry::new(
46 RuntimeStorageNamespace::namespace_id(persistent.as_ref()).clone(),
47 );
48 registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
49
50 let temporary = {
51 let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
52 let temp_context = Arc::new(RuntimeContext::new(temp_pager));
53 let namespace = Arc::new(TemporaryRuntimeNamespace::new(
54 TEMPORARY_NAMESPACE_ID.to_string(),
55 temp_context,
56 ));
57 registry.register_namespace(
58 Arc::clone(&namespace),
59 vec![TEMPORARY_NAMESPACE_ID.to_string()],
60 true,
61 );
62 namespace
63 };
64
65 Self {
66 persistent,
67 temporary: Some(temporary),
68 registry: Arc::new(RwLock::new(registry)),
69 }
70 }
71
72 pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace<P>> {
73 Arc::clone(&self.persistent)
74 }
75
76 pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
77 self.temporary.as_ref().map(Arc::clone)
78 }
79
80 pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
81 Arc::clone(&self.registry)
82 }
83}
84
85impl<P> Drop for SessionNamespaces<P>
86where
87 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
88{
89 fn drop(&mut self) {
90 if let Some(temp) = &self.temporary {
91 let namespace_id = temp.namespace_id().to_string();
92 let canonical_names = {
93 let mut registry = self.registry.write().expect("namespace registry poisoned");
94 registry.drain_namespace_tables(&namespace_id)
95 };
96 temp.clear_tables(canonical_names);
97 }
98 }
99}
100
/// Session handle that pairs a transaction session with the storage
/// namespaces it operates on.
pub struct RuntimeSession<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
{
    /// Underlying transaction session. Its first context type is the one
    /// returned by `inner.context()`; the second is the `MemPager`-backed
    /// staging context type handed to `inner.begin_transaction`.
    inner: TransactionSession<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
    /// Namespace set shared (via `Arc`) with sessions produced by `clone_session`.
    namespaces: Arc<SessionNamespaces<P>>,
}
115
116impl<P> RuntimeSession<P>
117where
118 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
119{
    /// Assembles a session from an existing transaction session and a shared
    /// namespace set. No other work is performed.
    pub(crate) fn from_parts(
        inner: TransactionSession<
            RuntimeTransactionContext<P>,
            RuntimeTransactionContext<MemPager>,
        >,
        namespaces: Arc<SessionNamespaces<P>>,
    ) -> Self {
        Self { inner, namespaces }
    }
129
130 pub(crate) fn clone_session(&self) -> Self {
133 Self {
134 inner: self.inner.clone_session(),
135 namespaces: self.namespaces.clone(),
136 }
137 }
138
    /// Returns a shared handle to the namespace registry held by this
    /// session's namespace set.
    pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
        self.namespaces.registry()
    }
142
143 fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
144 self.namespace_registry()
145 .read()
146 .expect("namespace registry poisoned")
147 .namespace_for_table(canonical)
148 }
149
150 fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
151 if plan.tables.len() != 1 {
152 return None;
153 }
154
155 let qualified = plan.tables[0].qualified_name();
156 let (_, canonical) = canonical_table_name(&qualified).ok()?;
157 Some(self.resolve_namespace_for_table(&canonical))
158 }
159
160 fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
161 let temp_namespace = self
162 .temporary_namespace()
163 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
164
165 let table_name = if plan.tables.len() == 1 {
166 plan.tables[0].qualified_name()
167 } else {
168 String::new()
169 };
170
171 let temp_context = temp_namespace.context();
172 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
173 let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
174 let schema = execution.schema();
175 let batches = execution.collect()?;
176
177 let combined = if batches.is_empty() {
178 RecordBatch::new_empty(Arc::clone(&schema))
179 } else if batches.len() == 1 {
180 batches.into_iter().next().unwrap()
181 } else {
182 let refs: Vec<&RecordBatch> = batches.iter().collect();
183 arrow::compute::concat_batches(&schema, refs)?
184 };
185
186 let execution =
187 SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
188
189 Ok(RuntimeStatementResult::Select {
190 execution: Box::new(execution),
191 table_name,
192 schema,
193 })
194 }
195
    /// Returns a shared handle to the session's persistent namespace.
    fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace<P>> {
        self.namespaces.persistent()
    }
199
200 #[allow(dead_code)]
201 fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
202 self.namespaces.temporary()
203 }
204
205 fn base_transaction_context(&self) -> Arc<RuntimeTransactionContext<P>> {
206 Arc::clone(self.inner.context())
207 }
208
209 fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
210 where
211 F: FnOnce(&RuntimeTransactionContext<P>) -> Result<T>,
212 {
213 let context = self.base_transaction_context();
214 let default_snapshot = context.ctx().default_snapshot();
215 TransactionContext::set_snapshot(&*context, default_snapshot);
216 f(context.as_ref())
217 }
218
219 fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TransactionResult<P>> {
220 self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
221 }
222
223 fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TransactionResult<P>> {
224 self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
225 }
226
227 fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TransactionResult<P>> {
228 self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
229 }
230
231 fn run_autocommit_create_table(
232 &self,
233 plan: CreateTablePlan,
234 ) -> Result<RuntimeStatementResult<P>> {
235 let result =
236 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
237 match result {
238 TransactionResult::CreateTable { table_name } => {
239 Ok(RuntimeStatementResult::CreateTable { table_name })
240 }
241 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
242 _ => Err(Error::Internal(
243 "unexpected transaction result for CREATE TABLE".into(),
244 )),
245 }
246 }
247
248 fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<RuntimeStatementResult<P>> {
249 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
250 Ok(RuntimeStatementResult::NoOp)
251 }
252
253 fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
254 self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
255 }
256
257 fn run_autocommit_alter_table(
258 &self,
259 plan: AlterTablePlan,
260 ) -> Result<RuntimeStatementResult<P>> {
261 let result =
262 self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
263 match result {
264 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
265 TransactionResult::CreateTable { table_name } => {
266 Ok(RuntimeStatementResult::CreateTable { table_name })
267 }
268 TransactionResult::CreateIndex {
269 table_name,
270 index_name,
271 } => Ok(RuntimeStatementResult::CreateIndex {
272 table_name,
273 index_name,
274 }),
275 _ => Err(Error::Internal(
276 "unexpected transaction result for ALTER TABLE".into(),
277 )),
278 }
279 }
280
281 fn run_autocommit_create_index(
282 &self,
283 plan: CreateIndexPlan,
284 ) -> Result<RuntimeStatementResult<P>> {
285 let result =
286 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
287 match result {
288 TransactionResult::CreateIndex {
289 table_name,
290 index_name,
291 } => Ok(RuntimeStatementResult::CreateIndex {
292 table_name,
293 index_name,
294 }),
295 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
296 _ => Err(Error::Internal(
297 "unexpected transaction result for CREATE INDEX".into(),
298 )),
299 }
300 }
301
302 fn run_autocommit_drop_index(
303 &self,
304 plan: DropIndexPlan,
305 ) -> Result<Option<SingleColumnIndexDescriptor>> {
306 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
307 }
308
309 pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
313 let staging_pager = Arc::new(MemPager::default());
314 tracing::trace!(
315 "BEGIN_TRANSACTION: Created staging pager at {:p}",
316 &*staging_pager
317 );
318 let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
319
320 let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
325
326 self.inner.begin_transaction(staging_wrapper)?;
327 Ok(RuntimeStatementResult::Transaction {
328 kind: TransactionKind::Begin,
329 })
330 }
331
    /// Marks the inner transaction session as aborted by delegating to
    /// `inner.abort_transaction()`.
    pub fn abort_transaction(&self) {
        self.inner.abort_transaction();
    }
337
338 pub fn has_active_transaction(&self) -> bool {
340 let result = self.inner.has_active_transaction();
341 tracing::trace!("SESSION: has_active_transaction() = {}", result);
342 result
343 }
344
    /// Reports whether the inner transaction session is in the aborted state.
    pub fn is_aborted(&self) -> bool {
        self.inner.is_aborted()
    }
349
    /// Reports whether the inner session considers `table_name` to have been
    /// created inside the current transaction.
    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
        self.inner.is_table_created_in_transaction(table_name)
    }
354
    /// Returns the column specs the inner session holds for `table_name` in
    /// the current transaction, or `None` when it has none.
    pub fn table_column_specs_from_transaction(
        &self,
        table_name: &str,
    ) -> Option<Vec<PlanColumnSpec>> {
        self.inner.table_column_specs_from_transaction(table_name)
    }
363
    /// Returns the names of tables that, according to the inner session's
    /// current transaction, reference `referenced_table`.
    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
        self.inner
            .tables_referencing_in_transaction(referenced_table)
    }
370
371 pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
374 tracing::trace!("Session::commit_transaction called");
375 let (tx_result, operations) = self.inner.commit_transaction()?;
376 tracing::trace!(
377 "Session::commit_transaction got {} operations",
378 operations.len()
379 );
380
381 if !operations.is_empty() {
382 let dropped_tables = self
383 .inner
384 .context()
385 .ctx()
386 .dropped_tables
387 .read()
388 .unwrap()
389 .clone();
390 if !dropped_tables.is_empty() {
391 for operation in &operations {
392 let table_name_opt = match operation {
393 PlanOperation::Insert(plan) => Some(plan.table.as_str()),
394 PlanOperation::Update(plan) => Some(plan.table.as_str()),
395 PlanOperation::Delete(plan) => Some(plan.table.as_str()),
396 _ => None,
397 };
398 if let Some(table_name) = table_name_opt {
399 let (_, canonical) = canonical_table_name(table_name)?;
400 if dropped_tables.contains(&canonical) {
401 self.abort_transaction();
402 return Err(Error::TransactionContextError(
403 "another transaction has dropped this table".into(),
404 ));
405 }
406 }
407 }
408 }
409 }
410
411 let kind = match tx_result {
413 TransactionResult::Transaction { kind } => kind,
414 _ => {
415 return Err(Error::Internal(
416 "commit_transaction returned non-transaction result".into(),
417 ));
418 }
419 };
420 tracing::trace!("Session::commit_transaction kind={:?}", kind);
421
422 for operation in operations {
424 match operation {
425 PlanOperation::CreateTable(plan) => {
426 TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
427 }
428 PlanOperation::DropTable(plan) => {
429 TransactionContext::drop_table(&**self.inner.context(), plan)?;
430 }
431 PlanOperation::Insert(plan) => {
432 TransactionContext::insert(&**self.inner.context(), plan)?;
433 }
434 PlanOperation::Update(plan) => {
435 TransactionContext::update(&**self.inner.context(), plan)?;
436 }
437 PlanOperation::Delete(plan) => {
438 TransactionContext::delete(&**self.inner.context(), plan)?;
439 }
440 _ => {}
441 }
442 }
443
444 let base_ctx = self.inner.context();
447 let default_snapshot = base_ctx.ctx().default_snapshot();
448 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
449
450 if matches!(kind, TransactionKind::Commit) {
452 let ctx = base_ctx.ctx();
453 let next_txn_id = ctx.txn_manager().current_next_txn_id();
454 if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
455 tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
456 }
457 }
458
459 Ok(RuntimeStatementResult::Transaction { kind })
461 }
462
463 pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
465 self.inner.rollback_transaction()?;
466 let base_ctx = self.inner.context();
467 let default_snapshot = base_ctx.ctx().default_snapshot();
468 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
469 Ok(RuntimeStatementResult::Transaction {
470 kind: TransactionKind::Rollback,
471 })
472 }
473
474 fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
475 if matches!(plan.source, Some(CreateTableSource::Select { .. }))
478 && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
479 {
480 let select_result = self.execute_select_plan(*select_plan)?;
481 let (schema, batches) = match select_result {
482 RuntimeStatementResult::Select {
483 schema, execution, ..
484 } => {
485 let batches = execution.collect()?;
486 (schema, batches)
487 }
488 _ => {
489 return Err(Error::Internal(
490 "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
491 ));
492 }
493 };
494 plan.source = Some(CreateTableSource::Batches { schema, batches });
495 }
496 Ok(plan)
497 }
498
499 fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
500 let InsertPlan {
501 table,
502 columns,
503 source,
504 } = plan;
505
506 match source {
507 InsertSource::Rows(rows) => {
508 let count = rows.len();
509 Ok((
510 InsertPlan {
511 table,
512 columns,
513 source: InsertSource::Rows(rows),
514 },
515 count,
516 ))
517 }
518 InsertSource::Batches(batches) => {
519 let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
520 Ok((
521 InsertPlan {
522 table,
523 columns,
524 source: InsertSource::Batches(batches),
525 },
526 count,
527 ))
528 }
529 InsertSource::Select { plan: select_plan } => {
530 let select_result = self.execute_select_plan(*select_plan)?;
531 let rows = match select_result {
532 RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
533 _ => {
534 return Err(Error::Internal(
535 "expected Select result when executing INSERT ... SELECT".into(),
536 ));
537 }
538 };
539 let count = rows.len();
540 Ok((
541 InsertPlan {
542 table,
543 columns,
544 source: InsertSource::Rows(rows),
545 },
546 count,
547 ))
548 }
549 }
550 }
551
552 pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
554 tracing::trace!("Session::insert called for table={}", plan.table);
555 let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
556 let table_name = plan.table.clone();
557 let (_, canonical_table) = canonical_table_name(&plan.table)?;
558 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
559
560 match namespace_id.as_str() {
561 TEMPORARY_NAMESPACE_ID => {
562 let temp_namespace = self
563 .temporary_namespace()
564 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
565 let temp_context = temp_namespace.context();
566 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
567 match TransactionContext::insert(&temp_tx_context, plan)? {
568 TransactionResult::Insert { .. } => {}
569 _ => {
570 return Err(Error::Internal(
571 "unexpected transaction result for temporary INSERT".into(),
572 ));
573 }
574 }
575 Ok(RuntimeStatementResult::Insert {
576 rows_inserted,
577 table_name,
578 })
579 }
580 PERSISTENT_NAMESPACE_ID => {
581 if self.has_active_transaction() {
582 match self.inner.execute_operation(PlanOperation::Insert(plan)) {
583 Ok(_) => {
584 tracing::trace!("Session::insert succeeded for table={}", table_name);
585 Ok(RuntimeStatementResult::Insert {
586 rows_inserted,
587 table_name,
588 })
589 }
590 Err(e) => {
591 tracing::trace!(
592 "Session::insert failed for table={}, error={:?}",
593 table_name,
594 e
595 );
596 if matches!(e, Error::ConstraintError(_)) {
597 tracing::trace!("Transaction is_aborted=true");
598 self.abort_transaction();
599 }
600 Err(e)
601 }
602 }
603 } else {
604 let result = self.run_autocommit_insert(plan)?;
605 if !matches!(result, TransactionResult::Insert { .. }) {
606 return Err(Error::Internal(
607 "unexpected transaction result for INSERT operation".into(),
608 ));
609 }
610 Ok(RuntimeStatementResult::Insert {
611 rows_inserted,
612 table_name,
613 })
614 }
615 }
616 other => Err(Error::InvalidArgumentError(format!(
617 "Unknown storage namespace '{}'",
618 other
619 ))),
620 }
621 }
622
623 pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
625 if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
626 && namespace_id == TEMPORARY_NAMESPACE_ID
627 {
628 return self.select_from_temporary(plan);
629 }
630
631 if self.has_active_transaction() {
632 let tx_result = match self
633 .inner
634 .execute_operation(PlanOperation::Select(plan.clone()))
635 {
636 Ok(result) => result,
637 Err(e) => {
638 if matches!(e, Error::ConstraintError(_)) {
641 self.abort_transaction();
642 }
643 return Err(e);
644 }
645 };
646 match tx_result {
647 TransactionResult::Select {
648 table_name,
649 schema,
650 execution: staging_execution,
651 } => {
652 let batches = staging_execution.collect().unwrap_or_default();
655 let combined = if batches.is_empty() {
656 RecordBatch::new_empty(Arc::clone(&schema))
657 } else if batches.len() == 1 {
658 batches.into_iter().next().unwrap()
659 } else {
660 let refs: Vec<&RecordBatch> = batches.iter().collect();
661 arrow::compute::concat_batches(&schema, refs)?
662 };
663
664 let execution = SelectExecution::from_batch(
665 table_name.clone(),
666 Arc::clone(&schema),
667 combined,
668 );
669
670 Ok(RuntimeStatementResult::Select {
671 execution: Box::new(execution),
672 table_name,
673 schema,
674 })
675 }
676 _ => Err(Error::Internal("expected Select result".into())),
677 }
678 } else {
679 let table_name = if plan.tables.len() == 1 {
681 plan.tables[0].qualified_name()
682 } else {
683 String::new()
684 };
685 let execution = self.with_autocommit_transaction_context(|ctx| {
686 TransactionContext::execute_select(ctx, plan)
687 })?;
688 let schema = execution.schema();
689 Ok(RuntimeStatementResult::Select {
690 execution: Box::new(execution),
691 table_name,
692 schema,
693 })
694 }
695 }
696
697 pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
699 let plan =
700 SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
701 match self.execute_select_plan(plan)? {
702 RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
703 other => Err(Error::Internal(format!(
704 "expected Select result when reading table '{}', got {:?}",
705 table, other
706 ))),
707 }
708 }
709
710 pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
711 let (_, canonical_table) = canonical_table_name(&plan.table)?;
712 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
713
714 match namespace_id.as_str() {
715 TEMPORARY_NAMESPACE_ID => {
716 let temp_namespace = self
717 .temporary_namespace()
718 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
719 let temp_context = temp_namespace.context();
720 let table_name = plan.table.clone();
721 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
722 match TransactionContext::update(&temp_tx_context, plan)? {
723 TransactionResult::Update { rows_updated, .. } => {
724 Ok(RuntimeStatementResult::Update {
725 rows_updated,
726 table_name,
727 })
728 }
729 _ => Err(Error::Internal(
730 "unexpected transaction result for temporary UPDATE".into(),
731 )),
732 }
733 }
734 PERSISTENT_NAMESPACE_ID => {
735 if self.has_active_transaction() {
736 let table_name = plan.table.clone();
737 let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
738 Ok(result) => result,
739 Err(e) => {
740 self.abort_transaction();
742 return Err(e);
743 }
744 };
745 match result {
746 TransactionResult::Update {
747 rows_matched: _,
748 rows_updated,
749 } => Ok(RuntimeStatementResult::Update {
750 rows_updated,
751 table_name,
752 }),
753 _ => Err(Error::Internal("expected Update result".into())),
754 }
755 } else {
756 let table_name = plan.table.clone();
757 let result = self.run_autocommit_update(plan)?;
758 match result {
759 TransactionResult::Update {
760 rows_matched: _,
761 rows_updated,
762 } => Ok(RuntimeStatementResult::Update {
763 rows_updated,
764 table_name,
765 }),
766 _ => Err(Error::Internal("expected Update result".into())),
767 }
768 }
769 }
770 other => Err(Error::InvalidArgumentError(format!(
771 "Unknown storage namespace '{}'",
772 other
773 ))),
774 }
775 }
776
777 pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
778 let (_, canonical_table) = canonical_table_name(&plan.table)?;
779 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
780
781 match namespace_id.as_str() {
782 TEMPORARY_NAMESPACE_ID => {
783 let temp_namespace = self
784 .temporary_namespace()
785 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
786 let temp_context = temp_namespace.context();
787 let table_name = plan.table.clone();
788 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
789 match TransactionContext::delete(&temp_tx_context, plan)? {
790 TransactionResult::Delete { rows_deleted } => {
791 Ok(RuntimeStatementResult::Delete {
792 rows_deleted,
793 table_name,
794 })
795 }
796 _ => Err(Error::Internal(
797 "unexpected transaction result for temporary DELETE".into(),
798 )),
799 }
800 }
801 PERSISTENT_NAMESPACE_ID => {
802 if self.has_active_transaction() {
803 let table_name = plan.table.clone();
804 let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
805 Ok(result) => result,
806 Err(e) => {
807 self.abort_transaction();
809 return Err(e);
810 }
811 };
812 match result {
813 TransactionResult::Delete { rows_deleted } => {
814 Ok(RuntimeStatementResult::Delete {
815 rows_deleted,
816 table_name,
817 })
818 }
819 _ => Err(Error::Internal("expected Delete result".into())),
820 }
821 } else {
822 let table_name = plan.table.clone();
823 let result = self.run_autocommit_delete(plan)?;
824 match result {
825 TransactionResult::Delete { rows_deleted } => {
826 Ok(RuntimeStatementResult::Delete {
827 rows_deleted,
828 table_name,
829 })
830 }
831 _ => Err(Error::Internal("expected Delete result".into())),
832 }
833 }
834 }
835 other => Err(Error::InvalidArgumentError(format!(
836 "Unknown storage namespace '{}'",
837 other
838 ))),
839 }
840 }
841}
842
843impl<P> CatalogDdl for RuntimeSession<P>
847where
848 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
849{
    // Every DDL entry point reports a statement result, except rename, which
    // yields nothing on success.
    type CreateTableOutput = RuntimeStatementResult<P>;
    type DropTableOutput = RuntimeStatementResult<P>;
    type RenameTableOutput = ();
    type AlterTableOutput = RuntimeStatementResult<P>;
    type CreateIndexOutput = RuntimeStatementResult<P>;
    type DropIndexOutput = RuntimeStatementResult<P>;
856
857 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
858 let target_namespace = plan
859 .namespace
860 .clone()
861 .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
862 .to_ascii_lowercase();
863
864 let plan = self.materialize_ctas_plan(plan)?;
865
866 match target_namespace.as_str() {
867 TEMPORARY_NAMESPACE_ID => {
868 let temp_namespace = self
869 .temporary_namespace()
870 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
871 let (_, canonical) = canonical_table_name(&plan.name)?;
872 let result = temp_namespace.create_table(plan)?;
873 if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
874 let namespace_id = temp_namespace.namespace_id().to_string();
875 let registry = self.namespace_registry();
876 registry
877 .write()
878 .expect("namespace registry poisoned")
879 .register_table(&namespace_id, canonical);
880 }
881 result.convert_pager_type::<P>()
882 }
883 PERSISTENT_NAMESPACE_ID => {
884 if self.has_active_transaction() {
885 match self
886 .inner
887 .execute_operation(PlanOperation::CreateTable(plan))
888 {
889 Ok(TransactionResult::CreateTable { table_name }) => {
890 Ok(RuntimeStatementResult::CreateTable { table_name })
891 }
892 Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
893 Ok(_) => Err(Error::Internal(
894 "expected CreateTable result during transactional CREATE TABLE".into(),
895 )),
896 Err(err) => {
897 self.abort_transaction();
898 Err(err)
899 }
900 }
901 } else {
902 if self.inner.has_table_locked_by_other_session(&plan.name) {
903 return Err(Error::TransactionContextError(format!(
904 "table '{}' is locked by another active transaction",
905 plan.name
906 )));
907 }
908 self.run_autocommit_create_table(plan)
909 }
910 }
911 other => Err(Error::InvalidArgumentError(format!(
912 "Unknown storage namespace '{}'",
913 other
914 ))),
915 }
916 }
917
918 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
919 let (_, canonical_table) = canonical_table_name(&plan.name)?;
920 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
921
922 match namespace_id.as_str() {
923 TEMPORARY_NAMESPACE_ID => {
924 let temp_namespace = self
925 .temporary_namespace()
926 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
927 temp_namespace.drop_table(plan)?;
928 let registry = self.namespace_registry();
929 registry
930 .write()
931 .expect("namespace registry poisoned")
932 .unregister_table(&canonical_table);
933 Ok(RuntimeStatementResult::NoOp)
934 }
935 PERSISTENT_NAMESPACE_ID => {
936 if self.has_active_transaction() {
937 let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
938 if !referencing_tables.is_empty() {
939 let referencing_table = &referencing_tables[0];
940 self.abort_transaction();
941 return Err(Error::CatalogError(format!(
942 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
943 referencing_table
944 )));
945 }
946
947 match self
948 .inner
949 .execute_operation(PlanOperation::DropTable(plan.clone()))
950 {
951 Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
952 Ok(_) => Err(Error::Internal(
953 "expected NoOp result for DROP TABLE during transactional execution"
954 .into(),
955 )),
956 Err(err) => {
957 self.abort_transaction();
958 Err(err)
959 }
960 }
961 } else {
962 if self.inner.has_table_locked_by_other_session(&plan.name) {
963 return Err(Error::TransactionContextError(format!(
964 "table '{}' is locked by another active transaction",
965 plan.name
966 )));
967 }
968 self.run_autocommit_drop_table(plan)
969 }
970 }
971 other => Err(Error::InvalidArgumentError(format!(
972 "Unknown storage namespace '{}'",
973 other
974 ))),
975 }
976 }
977
978 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
979 if self.has_active_transaction() {
980 return Err(Error::InvalidArgumentError(
981 "ALTER TABLE RENAME is not supported inside an active transaction".into(),
982 ));
983 }
984
985 let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
986 let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
987 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
988
989 match namespace_id.as_str() {
990 TEMPORARY_NAMESPACE_ID => {
991 let temp_namespace = self
992 .temporary_namespace()
993 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
994 match temp_namespace.rename_table(plan.clone()) {
995 Ok(()) => {
996 let namespace_id = temp_namespace.namespace_id().to_string();
997 let registry = self.namespace_registry();
998 let mut registry = registry.write().expect("namespace registry poisoned");
999 registry.unregister_table(&canonical_table);
1000 registry.register_table(&namespace_id, new_canonical);
1001 Ok(())
1002 }
1003 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1004 Err(err) => Err(err),
1005 }
1006 }
1007 PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1008 Ok(()) => Ok(()),
1009 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1010 Err(err) => Err(err),
1011 },
1012 other => Err(Error::InvalidArgumentError(format!(
1013 "Unknown storage namespace '{}'",
1014 other
1015 ))),
1016 }
1017 }
1018
1019 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1020 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1021 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1022
1023 match namespace_id.as_str() {
1024 TEMPORARY_NAMESPACE_ID => {
1025 let temp_namespace = self
1026 .temporary_namespace()
1027 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1028
1029 let context = temp_namespace.context();
1030 let catalog_service = &context.catalog_service;
1031 let view = match catalog_service.table_view(&canonical_table) {
1032 Ok(view) => view,
1033 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1034 return Ok(RuntimeStatementResult::NoOp);
1035 }
1036 Err(err) => return Err(err),
1037 };
1038 let table_id = view
1039 .table_meta
1040 .as_ref()
1041 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1042 .table_id;
1043
1044 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1045
1046 temp_namespace.alter_table(plan)?.convert_pager_type::<P>()
1047 }
1048 PERSISTENT_NAMESPACE_ID => {
1049 let persistent = self.persistent_namespace();
1050 let context = persistent.context();
1051 let catalog_service = &context.catalog_service;
1052 let view = match catalog_service.table_view(&canonical_table) {
1053 Ok(view) => view,
1054 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1055 return Ok(RuntimeStatementResult::NoOp);
1056 }
1057 Err(err) => return Err(err),
1058 };
1059 let table_id = view
1060 .table_meta
1061 .as_ref()
1062 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1063 .table_id;
1064
1065 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1066
1067 self.run_autocommit_alter_table(plan)
1068 }
1069 other => Err(Error::InvalidArgumentError(format!(
1070 "Unknown storage namespace '{}'",
1071 other
1072 ))),
1073 }
1074 }
1075
1076 fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1077 if plan.columns.is_empty() {
1078 return Err(Error::InvalidArgumentError(
1079 "CREATE INDEX requires at least one column".into(),
1080 ));
1081 }
1082
1083 for column_plan in &plan.columns {
1084 if !column_plan.ascending || column_plan.nulls_first {
1085 return Err(Error::InvalidArgumentError(
1086 "only ASC indexes with NULLS LAST are supported".into(),
1087 ));
1088 }
1089 }
1090
1091 let (_, canonical_table) = canonical_table_name(&plan.table)?;
1092 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1093
1094 match namespace_id.as_str() {
1095 TEMPORARY_NAMESPACE_ID => {
1096 let temp_namespace = self
1097 .temporary_namespace()
1098 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1099 temp_namespace.create_index(plan)?.convert_pager_type::<P>()
1100 }
1101 PERSISTENT_NAMESPACE_ID => {
1102 if self.has_active_transaction() {
1103 return Err(Error::InvalidArgumentError(
1104 "CREATE INDEX is not supported inside an active transaction".into(),
1105 ));
1106 }
1107
1108 self.run_autocommit_create_index(plan)
1109 }
1110 other => Err(Error::InvalidArgumentError(format!(
1111 "Unknown storage namespace '{}'",
1112 other
1113 ))),
1114 }
1115 }
1116
1117 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1118 if self.has_active_transaction() {
1119 return Err(Error::InvalidArgumentError(
1120 "DROP INDEX is not supported inside an active transaction".into(),
1121 ));
1122 }
1123
1124 let mut dropped = false;
1125
1126 match self.run_autocommit_drop_index(plan.clone()) {
1127 Ok(Some(_)) => {
1128 dropped = true;
1129 }
1130 Ok(None) => {}
1131 Err(err) => {
1132 if !super::is_index_not_found_error(&err) {
1133 return Err(err);
1134 }
1135 }
1136 }
1137
1138 if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1139 match temp_namespace.drop_index(plan.clone()) {
1140 Ok(Some(_)) => {
1141 dropped = true;
1142 }
1143 Ok(None) => {}
1144 Err(err) => {
1145 if !super::is_index_not_found_error(&err) {
1146 return Err(err);
1147 }
1148 }
1149 }
1150 }
1151
1152 if dropped || plan.if_exists {
1153 Ok(RuntimeStatementResult::NoOp)
1154 } else {
1155 Err(Error::CatalogError(format!(
1156 "Index '{}' does not exist",
1157 plan.name
1158 )))
1159 }
1160 }
1161}