1use std::sync::{Arc, RwLock};
4
5use arrow::record_batch::RecordBatch;
6use llkv_result::{Error, Result};
7use llkv_storage::pager::{BoxedPager, MemPager, Pager};
8use llkv_table::{
9 SingleColumnIndexDescriptor, canonical_table_name, validate_alter_table_operation,
10};
11use simd_r_drive_entry_handle::EntryHandle;
12
13use crate::{
14 AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan,
15 DropIndexPlan, DropTablePlan, InsertPlan, InsertSource, PlanColumnSpec, PlanOperation,
16 PlanValue, RenameTablePlan, RuntimeContext, RuntimeStatementResult, RuntimeTransactionContext,
17 SelectExecution, SelectPlan, SelectProjection, TransactionContext, TransactionKind,
18 TransactionResult, TransactionSession, UpdatePlan,
19};
20use crate::{
21 PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
22 RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID,
23 TemporaryRuntimeNamespace,
24};
25use llkv_plan::TruncatePlan;
26
27pub(crate) struct SessionNamespaces<P>
28where
29 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
30{
31 persistent: Arc<PersistentRuntimeNamespace<P>>,
32 temporary: Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>>,
33 registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
34}
35
36impl<P> SessionNamespaces<P>
37where
38 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
39{
40 pub(crate) fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
41 let persistent = Arc::new(PersistentRuntimeNamespace::new(
42 PERSISTENT_NAMESPACE_ID.to_string(),
43 Arc::clone(&base_context),
44 ));
45
46 let mut registry = RuntimeStorageNamespaceRegistry::new(
47 RuntimeStorageNamespace::namespace_id(persistent.as_ref()).clone(),
48 );
49 registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
50
51 let temporary = {
52 let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
53 let temp_context = Arc::new(RuntimeContext::new(temp_pager));
54 let namespace = Arc::new(TemporaryRuntimeNamespace::new(
55 TEMPORARY_NAMESPACE_ID.to_string(),
56 temp_context,
57 ));
58 registry.register_namespace(
59 Arc::clone(&namespace),
60 vec![TEMPORARY_NAMESPACE_ID.to_string()],
61 true,
62 );
63 namespace
64 };
65
66 Self {
67 persistent,
68 temporary: Some(temporary),
69 registry: Arc::new(RwLock::new(registry)),
70 }
71 }
72
73 pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace<P>> {
74 Arc::clone(&self.persistent)
75 }
76
77 pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
78 self.temporary.as_ref().map(Arc::clone)
79 }
80
81 pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
82 Arc::clone(&self.registry)
83 }
84}
85
86impl<P> Drop for SessionNamespaces<P>
87where
88 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
89{
90 fn drop(&mut self) {
91 if let Some(temp) = &self.temporary {
92 let namespace_id = temp.namespace_id().to_string();
93 let canonical_names = {
94 let mut registry = self.registry.write().expect("namespace registry poisoned");
95 registry.drain_namespace_tables(&namespace_id)
96 };
97 temp.clear_tables(canonical_names);
98 }
99 }
100}
101
102pub struct RuntimeSession<P>
107where
108 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
109{
110 inner: TransactionSession<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
114 namespaces: Arc<SessionNamespaces<P>>,
115}
116
117impl<P> RuntimeSession<P>
118where
119 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
120{
121 pub(crate) fn from_parts(
122 inner: TransactionSession<
123 RuntimeTransactionContext<P>,
124 RuntimeTransactionContext<MemPager>,
125 >,
126 namespaces: Arc<SessionNamespaces<P>>,
127 ) -> Self {
128 Self { inner, namespaces }
129 }
130
131 pub(crate) fn clone_session(&self) -> Self {
134 Self {
135 inner: self.inner.clone_session(),
136 namespaces: self.namespaces.clone(),
137 }
138 }
139
140 pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
141 self.namespaces.registry()
142 }
143
144 fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
145 self.namespace_registry()
146 .read()
147 .expect("namespace registry poisoned")
148 .namespace_for_table(canonical)
149 }
150
151 fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
152 if plan.tables.len() != 1 {
153 return None;
154 }
155
156 let qualified = plan.tables[0].qualified_name();
157 let (_, canonical) = canonical_table_name(&qualified).ok()?;
158 Some(self.resolve_namespace_for_table(&canonical))
159 }
160
161 fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
162 let temp_namespace = self
163 .temporary_namespace()
164 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
165
166 let table_name = if plan.tables.len() == 1 {
167 plan.tables[0].qualified_name()
168 } else {
169 String::new()
170 };
171
172 let temp_context = temp_namespace.context();
173 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
174 let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
175 let schema = execution.schema();
176 let batches = execution.collect()?;
177
178 let combined = if batches.is_empty() {
179 RecordBatch::new_empty(Arc::clone(&schema))
180 } else if batches.len() == 1 {
181 batches.into_iter().next().unwrap()
182 } else {
183 let refs: Vec<&RecordBatch> = batches.iter().collect();
184 arrow::compute::concat_batches(&schema, refs)?
185 };
186
187 let execution =
188 SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
189
190 Ok(RuntimeStatementResult::Select {
191 execution: Box::new(execution),
192 table_name,
193 schema,
194 })
195 }
196
197 fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace<P>> {
198 self.namespaces.persistent()
199 }
200
201 #[allow(dead_code)]
202 fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
203 self.namespaces.temporary()
204 }
205
206 fn base_transaction_context(&self) -> Arc<RuntimeTransactionContext<P>> {
207 Arc::clone(self.inner.context())
208 }
209
210 fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
211 where
212 F: FnOnce(&RuntimeTransactionContext<P>) -> Result<T>,
213 {
214 let context = self.base_transaction_context();
215 let default_snapshot = context.ctx().default_snapshot();
216 TransactionContext::set_snapshot(&*context, default_snapshot);
217 f(context.as_ref())
218 }
219
220 fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TransactionResult<P>> {
221 self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
222 }
223
224 fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TransactionResult<P>> {
225 self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
226 }
227
228 fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TransactionResult<P>> {
229 self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
230 }
231
232 fn run_autocommit_truncate(&self, plan: TruncatePlan) -> Result<TransactionResult<P>> {
233 self.with_autocommit_transaction_context(|ctx| TransactionContext::truncate(ctx, plan))
234 }
235
236 fn run_autocommit_create_table(
237 &self,
238 plan: CreateTablePlan,
239 ) -> Result<RuntimeStatementResult<P>> {
240 let result =
241 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
242 match result {
243 TransactionResult::CreateTable { table_name } => {
244 Ok(RuntimeStatementResult::CreateTable { table_name })
245 }
246 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
247 _ => Err(Error::Internal(
248 "unexpected transaction result for CREATE TABLE".into(),
249 )),
250 }
251 }
252
253 fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<RuntimeStatementResult<P>> {
254 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
255 Ok(RuntimeStatementResult::NoOp)
256 }
257
258 fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
259 self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
260 }
261
262 fn run_autocommit_alter_table(
263 &self,
264 plan: AlterTablePlan,
265 ) -> Result<RuntimeStatementResult<P>> {
266 let result =
267 self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
268 match result {
269 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
270 TransactionResult::CreateTable { table_name } => {
271 Ok(RuntimeStatementResult::CreateTable { table_name })
272 }
273 TransactionResult::CreateIndex {
274 table_name,
275 index_name,
276 } => Ok(RuntimeStatementResult::CreateIndex {
277 table_name,
278 index_name,
279 }),
280 _ => Err(Error::Internal(
281 "unexpected transaction result for ALTER TABLE".into(),
282 )),
283 }
284 }
285
286 fn run_autocommit_create_index(
287 &self,
288 plan: CreateIndexPlan,
289 ) -> Result<RuntimeStatementResult<P>> {
290 let result =
291 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
292 match result {
293 TransactionResult::CreateIndex {
294 table_name,
295 index_name,
296 } => Ok(RuntimeStatementResult::CreateIndex {
297 table_name,
298 index_name,
299 }),
300 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
301 _ => Err(Error::Internal(
302 "unexpected transaction result for CREATE INDEX".into(),
303 )),
304 }
305 }
306
307 fn run_autocommit_drop_index(
308 &self,
309 plan: DropIndexPlan,
310 ) -> Result<Option<SingleColumnIndexDescriptor>> {
311 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
312 }
313
314 pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
318 let staging_pager = Arc::new(MemPager::default());
319 tracing::trace!(
320 "BEGIN_TRANSACTION: Created staging pager at {:p}",
321 &*staging_pager
322 );
323 let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
324
325 let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
330
331 self.inner.begin_transaction(staging_wrapper)?;
332 Ok(RuntimeStatementResult::Transaction {
333 kind: TransactionKind::Begin,
334 })
335 }
336
337 pub fn abort_transaction(&self) {
340 self.inner.abort_transaction();
341 }
342
343 pub fn has_active_transaction(&self) -> bool {
345 let result = self.inner.has_active_transaction();
346 tracing::trace!("SESSION: has_active_transaction() = {}", result);
347 result
348 }
349
350 pub fn is_aborted(&self) -> bool {
352 self.inner.is_aborted()
353 }
354
355 pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
357 self.inner.is_table_created_in_transaction(table_name)
358 }
359
360 pub fn table_column_specs_from_transaction(
363 &self,
364 table_name: &str,
365 ) -> Option<Vec<PlanColumnSpec>> {
366 self.inner.table_column_specs_from_transaction(table_name)
367 }
368
369 pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
372 self.inner
373 .tables_referencing_in_transaction(referenced_table)
374 }
375
376 pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
379 tracing::trace!("Session::commit_transaction called");
380 let (tx_result, operations) = self.inner.commit_transaction()?;
381 tracing::trace!(
382 "Session::commit_transaction got {} operations",
383 operations.len()
384 );
385
386 if !operations.is_empty() {
387 let dropped_tables = self
388 .inner
389 .context()
390 .ctx()
391 .dropped_tables
392 .read()
393 .unwrap()
394 .clone();
395 if !dropped_tables.is_empty() {
396 for operation in &operations {
397 let table_name_opt = match operation {
398 PlanOperation::Insert(plan) => Some(plan.table.as_str()),
399 PlanOperation::Update(plan) => Some(plan.table.as_str()),
400 PlanOperation::Delete(plan) => Some(plan.table.as_str()),
401 _ => None,
402 };
403 if let Some(table_name) = table_name_opt {
404 let (_, canonical) = canonical_table_name(table_name)?;
405 if dropped_tables.contains(&canonical) {
406 self.abort_transaction();
407 return Err(Error::TransactionContextError(
408 "another transaction has dropped this table".into(),
409 ));
410 }
411 }
412 }
413 }
414 }
415
416 let kind = match tx_result {
418 TransactionResult::Transaction { kind } => kind,
419 _ => {
420 return Err(Error::Internal(
421 "commit_transaction returned non-transaction result".into(),
422 ));
423 }
424 };
425 tracing::trace!("Session::commit_transaction kind={:?}", kind);
426
427 for operation in operations {
429 match operation {
430 PlanOperation::CreateTable(plan) => {
431 TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
432 }
433 PlanOperation::DropTable(plan) => {
434 TransactionContext::drop_table(&**self.inner.context(), plan)?;
435 }
436 PlanOperation::Insert(plan) => {
437 TransactionContext::insert(&**self.inner.context(), plan)?;
438 }
439 PlanOperation::Update(plan) => {
440 TransactionContext::update(&**self.inner.context(), plan)?;
441 }
442 PlanOperation::Delete(plan) => {
443 TransactionContext::delete(&**self.inner.context(), plan)?;
444 }
445 _ => {}
446 }
447 }
448
449 let base_ctx = self.inner.context();
452 let default_snapshot = base_ctx.ctx().default_snapshot();
453 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
454
455 if matches!(kind, TransactionKind::Commit) {
457 let ctx = base_ctx.ctx();
458 let next_txn_id = ctx.txn_manager().current_next_txn_id();
459 if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
460 tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
461 }
462 }
463
464 Ok(RuntimeStatementResult::Transaction { kind })
466 }
467
468 pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
470 self.inner.rollback_transaction()?;
471 let base_ctx = self.inner.context();
472 let default_snapshot = base_ctx.ctx().default_snapshot();
473 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
474 Ok(RuntimeStatementResult::Transaction {
475 kind: TransactionKind::Rollback,
476 })
477 }
478
479 fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
480 if matches!(plan.source, Some(CreateTableSource::Select { .. }))
483 && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
484 {
485 let select_result = self.execute_select_plan(*select_plan)?;
486 let (schema, batches) = match select_result {
487 RuntimeStatementResult::Select {
488 schema, execution, ..
489 } => {
490 let batches = execution.collect()?;
491 (schema, batches)
492 }
493 _ => {
494 return Err(Error::Internal(
495 "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
496 ));
497 }
498 };
499 plan.source = Some(CreateTableSource::Batches { schema, batches });
500 }
501 Ok(plan)
502 }
503
504 fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
505 let InsertPlan {
506 table,
507 columns,
508 source,
509 } = plan;
510
511 match source {
512 InsertSource::Rows(rows) => {
513 let count = rows.len();
514 Ok((
515 InsertPlan {
516 table,
517 columns,
518 source: InsertSource::Rows(rows),
519 },
520 count,
521 ))
522 }
523 InsertSource::Batches(batches) => {
524 let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
525 Ok((
526 InsertPlan {
527 table,
528 columns,
529 source: InsertSource::Batches(batches),
530 },
531 count,
532 ))
533 }
534 InsertSource::Select { plan: select_plan } => {
535 let select_result = self.execute_select_plan(*select_plan)?;
536 let rows = match select_result {
537 RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
538 _ => {
539 return Err(Error::Internal(
540 "expected Select result when executing INSERT ... SELECT".into(),
541 ));
542 }
543 };
544 let count = rows.len();
545 Ok((
546 InsertPlan {
547 table,
548 columns,
549 source: InsertSource::Rows(rows),
550 },
551 count,
552 ))
553 }
554 }
555 }
556
557 pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
559 tracing::trace!("Session::insert called for table={}", plan.table);
560 let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
561 let table_name = plan.table.clone();
562 let (_, canonical_table) = canonical_table_name(&plan.table)?;
563 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
564
565 match namespace_id.as_str() {
566 TEMPORARY_NAMESPACE_ID => {
567 let temp_namespace = self
568 .temporary_namespace()
569 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
570 let temp_context = temp_namespace.context();
571 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
572 match TransactionContext::insert(&temp_tx_context, plan)? {
573 TransactionResult::Insert { .. } => {}
574 _ => {
575 return Err(Error::Internal(
576 "unexpected transaction result for temporary INSERT".into(),
577 ));
578 }
579 }
580 Ok(RuntimeStatementResult::Insert {
581 rows_inserted,
582 table_name,
583 })
584 }
585 PERSISTENT_NAMESPACE_ID => {
586 if self.has_active_transaction() {
587 match self.inner.execute_operation(PlanOperation::Insert(plan)) {
588 Ok(_) => {
589 tracing::trace!("Session::insert succeeded for table={}", table_name);
590 Ok(RuntimeStatementResult::Insert {
591 rows_inserted,
592 table_name,
593 })
594 }
595 Err(e) => {
596 tracing::trace!(
597 "Session::insert failed for table={}, error={:?}",
598 table_name,
599 e
600 );
601 if matches!(e, Error::ConstraintError(_)) {
602 tracing::trace!("Transaction is_aborted=true");
603 self.abort_transaction();
604 }
605 Err(e)
606 }
607 }
608 } else {
609 let result = self.run_autocommit_insert(plan)?;
610 if !matches!(result, TransactionResult::Insert { .. }) {
611 return Err(Error::Internal(
612 "unexpected transaction result for INSERT operation".into(),
613 ));
614 }
615 Ok(RuntimeStatementResult::Insert {
616 rows_inserted,
617 table_name,
618 })
619 }
620 }
621 other => Err(Error::InvalidArgumentError(format!(
622 "Unknown storage namespace '{}'",
623 other
624 ))),
625 }
626 }
627
628 pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
630 if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
631 && namespace_id == TEMPORARY_NAMESPACE_ID
632 {
633 return self.select_from_temporary(plan);
634 }
635
636 if self.has_active_transaction() {
637 let tx_result = match self
638 .inner
639 .execute_operation(PlanOperation::Select(plan.clone()))
640 {
641 Ok(result) => result,
642 Err(e) => {
643 if matches!(e, Error::ConstraintError(_)) {
646 self.abort_transaction();
647 }
648 return Err(e);
649 }
650 };
651 match tx_result {
652 TransactionResult::Select {
653 table_name,
654 schema,
655 execution: staging_execution,
656 } => {
657 let batches = staging_execution.collect().unwrap_or_default();
660 let combined = if batches.is_empty() {
661 RecordBatch::new_empty(Arc::clone(&schema))
662 } else if batches.len() == 1 {
663 batches.into_iter().next().unwrap()
664 } else {
665 let refs: Vec<&RecordBatch> = batches.iter().collect();
666 arrow::compute::concat_batches(&schema, refs)?
667 };
668
669 let execution = SelectExecution::from_batch(
670 table_name.clone(),
671 Arc::clone(&schema),
672 combined,
673 );
674
675 Ok(RuntimeStatementResult::Select {
676 execution: Box::new(execution),
677 table_name,
678 schema,
679 })
680 }
681 _ => Err(Error::Internal("expected Select result".into())),
682 }
683 } else {
684 let table_name = if plan.tables.len() == 1 {
686 plan.tables[0].qualified_name()
687 } else {
688 String::new()
689 };
690 let execution = self.with_autocommit_transaction_context(|ctx| {
691 TransactionContext::execute_select(ctx, plan)
692 })?;
693 let schema = execution.schema();
694 Ok(RuntimeStatementResult::Select {
695 execution: Box::new(execution),
696 table_name,
697 schema,
698 })
699 }
700 }
701
702 pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
704 let plan =
705 SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
706 match self.execute_select_plan(plan)? {
707 RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
708 other => Err(Error::Internal(format!(
709 "expected Select result when reading table '{}', got {:?}",
710 table, other
711 ))),
712 }
713 }
714
715 pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
716 let (_, canonical_table) = canonical_table_name(&plan.table)?;
717 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
718
719 match namespace_id.as_str() {
720 TEMPORARY_NAMESPACE_ID => {
721 let temp_namespace = self
722 .temporary_namespace()
723 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
724 let temp_context = temp_namespace.context();
725 let table_name = plan.table.clone();
726 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
727 match TransactionContext::update(&temp_tx_context, plan)? {
728 TransactionResult::Update { rows_updated, .. } => {
729 Ok(RuntimeStatementResult::Update {
730 rows_updated,
731 table_name,
732 })
733 }
734 _ => Err(Error::Internal(
735 "unexpected transaction result for temporary UPDATE".into(),
736 )),
737 }
738 }
739 PERSISTENT_NAMESPACE_ID => {
740 if self.has_active_transaction() {
741 let table_name = plan.table.clone();
742 let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
743 Ok(result) => result,
744 Err(e) => {
745 self.abort_transaction();
747 return Err(e);
748 }
749 };
750 match result {
751 TransactionResult::Update {
752 rows_matched: _,
753 rows_updated,
754 } => Ok(RuntimeStatementResult::Update {
755 rows_updated,
756 table_name,
757 }),
758 _ => Err(Error::Internal("expected Update result".into())),
759 }
760 } else {
761 let table_name = plan.table.clone();
762 let result = self.run_autocommit_update(plan)?;
763 match result {
764 TransactionResult::Update {
765 rows_matched: _,
766 rows_updated,
767 } => Ok(RuntimeStatementResult::Update {
768 rows_updated,
769 table_name,
770 }),
771 _ => Err(Error::Internal("expected Update result".into())),
772 }
773 }
774 }
775 other => Err(Error::InvalidArgumentError(format!(
776 "Unknown storage namespace '{}'",
777 other
778 ))),
779 }
780 }
781
782 pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
783 let (_, canonical_table) = canonical_table_name(&plan.table)?;
784 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
785
786 match namespace_id.as_str() {
787 TEMPORARY_NAMESPACE_ID => {
788 let temp_namespace = self
789 .temporary_namespace()
790 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
791 let temp_context = temp_namespace.context();
792 let table_name = plan.table.clone();
793 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
794 match TransactionContext::delete(&temp_tx_context, plan)? {
795 TransactionResult::Delete { rows_deleted } => {
796 Ok(RuntimeStatementResult::Delete {
797 rows_deleted,
798 table_name,
799 })
800 }
801 _ => Err(Error::Internal(
802 "unexpected transaction result for temporary DELETE".into(),
803 )),
804 }
805 }
806 PERSISTENT_NAMESPACE_ID => {
807 if self.has_active_transaction() {
808 let table_name = plan.table.clone();
809 let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
810 Ok(result) => result,
811 Err(e) => {
812 self.abort_transaction();
814 return Err(e);
815 }
816 };
817 match result {
818 TransactionResult::Delete { rows_deleted } => {
819 Ok(RuntimeStatementResult::Delete {
820 rows_deleted,
821 table_name,
822 })
823 }
824 _ => Err(Error::Internal("expected Delete result".into())),
825 }
826 } else {
827 let table_name = plan.table.clone();
828 let result = self.run_autocommit_delete(plan)?;
829 match result {
830 TransactionResult::Delete { rows_deleted } => {
831 Ok(RuntimeStatementResult::Delete {
832 rows_deleted,
833 table_name,
834 })
835 }
836 _ => Err(Error::Internal("expected Delete result".into())),
837 }
838 }
839 }
840 other => Err(Error::InvalidArgumentError(format!(
841 "Unknown storage namespace '{}'",
842 other
843 ))),
844 }
845 }
846
847 pub fn execute_truncate_plan(&self, plan: TruncatePlan) -> Result<RuntimeStatementResult<P>> {
848 let (_, canonical_table) = canonical_table_name(&plan.table)?;
849 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
850
851 match namespace_id.as_str() {
852 TEMPORARY_NAMESPACE_ID => {
853 let temp_namespace = self
854 .temporary_namespace()
855 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
856 let temp_context = temp_namespace.context();
857 let table_name = plan.table.clone();
858 let temp_tx_context = RuntimeTransactionContext::new(temp_context);
859 match TransactionContext::truncate(&temp_tx_context, plan)? {
860 TransactionResult::Delete { rows_deleted } => {
861 Ok(RuntimeStatementResult::Delete {
862 rows_deleted,
863 table_name,
864 })
865 }
866 _ => Err(Error::Internal(
867 "unexpected transaction result for temporary TRUNCATE".into(),
868 )),
869 }
870 }
871 PERSISTENT_NAMESPACE_ID => {
872 if self.has_active_transaction() {
873 let table_name = plan.table.clone();
874 let result = match self.inner.execute_operation(PlanOperation::Truncate(plan)) {
875 Ok(result) => result,
876 Err(e) => {
877 self.abort_transaction();
879 return Err(e);
880 }
881 };
882 match result {
883 TransactionResult::Delete { rows_deleted } => {
884 Ok(RuntimeStatementResult::Delete {
885 rows_deleted,
886 table_name,
887 })
888 }
889 _ => Err(Error::Internal("expected Delete result".into())),
890 }
891 } else {
892 let table_name = plan.table.clone();
893 let result = self.run_autocommit_truncate(plan)?;
894 match result {
895 TransactionResult::Delete { rows_deleted } => {
896 Ok(RuntimeStatementResult::Delete {
897 rows_deleted,
898 table_name,
899 })
900 }
901 _ => Err(Error::Internal("expected Delete result".into())),
902 }
903 }
904 }
905 other => Err(Error::InvalidArgumentError(format!(
906 "Unknown storage namespace '{}'",
907 other
908 ))),
909 }
910 }
911}
912
913impl<P> CatalogDdl for RuntimeSession<P>
917where
918 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
919{
920 type CreateTableOutput = RuntimeStatementResult<P>;
921 type DropTableOutput = RuntimeStatementResult<P>;
922 type RenameTableOutput = ();
923 type AlterTableOutput = RuntimeStatementResult<P>;
924 type CreateIndexOutput = RuntimeStatementResult<P>;
925 type DropIndexOutput = RuntimeStatementResult<P>;
926
927 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
928 let target_namespace = plan
929 .namespace
930 .clone()
931 .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
932 .to_ascii_lowercase();
933
934 let plan = self.materialize_ctas_plan(plan)?;
935
936 match target_namespace.as_str() {
937 TEMPORARY_NAMESPACE_ID => {
938 let temp_namespace = self
939 .temporary_namespace()
940 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
941 let (_, canonical) = canonical_table_name(&plan.name)?;
942 let result = temp_namespace.create_table(plan)?;
943 if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
944 let namespace_id = temp_namespace.namespace_id().to_string();
945 let registry = self.namespace_registry();
946 registry
947 .write()
948 .expect("namespace registry poisoned")
949 .register_table(&namespace_id, canonical);
950 }
951 result.convert_pager_type::<P>()
952 }
953 PERSISTENT_NAMESPACE_ID => {
954 if self.has_active_transaction() {
955 match self
956 .inner
957 .execute_operation(PlanOperation::CreateTable(plan))
958 {
959 Ok(TransactionResult::CreateTable { table_name }) => {
960 Ok(RuntimeStatementResult::CreateTable { table_name })
961 }
962 Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
963 Ok(_) => Err(Error::Internal(
964 "expected CreateTable result during transactional CREATE TABLE".into(),
965 )),
966 Err(err) => {
967 self.abort_transaction();
968 Err(err)
969 }
970 }
971 } else {
972 if self.inner.has_table_locked_by_other_session(&plan.name) {
973 return Err(Error::TransactionContextError(format!(
974 "table '{}' is locked by another active transaction",
975 plan.name
976 )));
977 }
978 self.run_autocommit_create_table(plan)
979 }
980 }
981 other => Err(Error::InvalidArgumentError(format!(
982 "Unknown storage namespace '{}'",
983 other
984 ))),
985 }
986 }
987
988 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
989 let (_, canonical_table) = canonical_table_name(&plan.name)?;
990 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
991
992 match namespace_id.as_str() {
993 TEMPORARY_NAMESPACE_ID => {
994 let temp_namespace = self
995 .temporary_namespace()
996 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
997 temp_namespace.drop_table(plan)?;
998 let registry = self.namespace_registry();
999 registry
1000 .write()
1001 .expect("namespace registry poisoned")
1002 .unregister_table(&canonical_table);
1003 Ok(RuntimeStatementResult::NoOp)
1004 }
1005 PERSISTENT_NAMESPACE_ID => {
1006 if self.has_active_transaction() {
1007 let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
1008 if !referencing_tables.is_empty() {
1009 let referencing_table = &referencing_tables[0];
1010 self.abort_transaction();
1011 return Err(Error::CatalogError(format!(
1012 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
1013 referencing_table
1014 )));
1015 }
1016
1017 match self
1018 .inner
1019 .execute_operation(PlanOperation::DropTable(plan.clone()))
1020 {
1021 Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
1022 Ok(_) => Err(Error::Internal(
1023 "expected NoOp result for DROP TABLE during transactional execution"
1024 .into(),
1025 )),
1026 Err(err) => {
1027 self.abort_transaction();
1028 Err(err)
1029 }
1030 }
1031 } else {
1032 if self.inner.has_table_locked_by_other_session(&plan.name) {
1033 return Err(Error::TransactionContextError(format!(
1034 "table '{}' is locked by another active transaction",
1035 plan.name
1036 )));
1037 }
1038 self.run_autocommit_drop_table(plan)
1039 }
1040 }
1041 other => Err(Error::InvalidArgumentError(format!(
1042 "Unknown storage namespace '{}'",
1043 other
1044 ))),
1045 }
1046 }
1047
1048 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
1049 if self.has_active_transaction() {
1050 return Err(Error::InvalidArgumentError(
1051 "ALTER TABLE RENAME is not supported inside an active transaction".into(),
1052 ));
1053 }
1054
1055 let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
1056 let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
1057 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1058
1059 match namespace_id.as_str() {
1060 TEMPORARY_NAMESPACE_ID => {
1061 let temp_namespace = self
1062 .temporary_namespace()
1063 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1064 match temp_namespace.rename_table(plan.clone()) {
1065 Ok(()) => {
1066 let namespace_id = temp_namespace.namespace_id().to_string();
1067 let registry = self.namespace_registry();
1068 let mut registry = registry.write().expect("namespace registry poisoned");
1069 registry.unregister_table(&canonical_table);
1070 registry.register_table(&namespace_id, new_canonical);
1071 Ok(())
1072 }
1073 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1074 Err(err) => Err(err),
1075 }
1076 }
1077 PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1078 Ok(()) => Ok(()),
1079 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1080 Err(err) => Err(err),
1081 },
1082 other => Err(Error::InvalidArgumentError(format!(
1083 "Unknown storage namespace '{}'",
1084 other
1085 ))),
1086 }
1087 }
1088
1089 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1090 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1091 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1092
1093 match namespace_id.as_str() {
1094 TEMPORARY_NAMESPACE_ID => {
1095 let temp_namespace = self
1096 .temporary_namespace()
1097 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1098
1099 let context = temp_namespace.context();
1100 let catalog_service = &context.catalog_service;
1101 let view = match catalog_service.table_view(&canonical_table) {
1102 Ok(view) => view,
1103 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1104 return Ok(RuntimeStatementResult::NoOp);
1105 }
1106 Err(err) => return Err(err),
1107 };
1108 let table_id = view
1109 .table_meta
1110 .as_ref()
1111 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1112 .table_id;
1113
1114 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1115
1116 temp_namespace.alter_table(plan)?.convert_pager_type::<P>()
1117 }
1118 PERSISTENT_NAMESPACE_ID => {
1119 let persistent = self.persistent_namespace();
1120 let context = persistent.context();
1121 let catalog_service = &context.catalog_service;
1122 let view = match catalog_service.table_view(&canonical_table) {
1123 Ok(view) => view,
1124 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1125 return Ok(RuntimeStatementResult::NoOp);
1126 }
1127 Err(err) => return Err(err),
1128 };
1129 let table_id = view
1130 .table_meta
1131 .as_ref()
1132 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1133 .table_id;
1134
1135 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1136
1137 self.run_autocommit_alter_table(plan)
1138 }
1139 other => Err(Error::InvalidArgumentError(format!(
1140 "Unknown storage namespace '{}'",
1141 other
1142 ))),
1143 }
1144 }
1145
1146 fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1147 if plan.columns.is_empty() {
1148 return Err(Error::InvalidArgumentError(
1149 "CREATE INDEX requires at least one column".into(),
1150 ));
1151 }
1152
1153 let (_, canonical_table) = canonical_table_name(&plan.table)?;
1154 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1155
1156 match namespace_id.as_str() {
1157 TEMPORARY_NAMESPACE_ID => {
1158 let temp_namespace = self
1159 .temporary_namespace()
1160 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1161 temp_namespace.create_index(plan)?.convert_pager_type::<P>()
1162 }
1163 PERSISTENT_NAMESPACE_ID => {
1164 if self.has_active_transaction() {
1165 return Err(Error::InvalidArgumentError(
1166 "CREATE INDEX is not supported inside an active transaction".into(),
1167 ));
1168 }
1169
1170 self.run_autocommit_create_index(plan)
1171 }
1172 other => Err(Error::InvalidArgumentError(format!(
1173 "Unknown storage namespace '{}'",
1174 other
1175 ))),
1176 }
1177 }
1178
1179 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1180 if self.has_active_transaction() {
1181 return Err(Error::InvalidArgumentError(
1182 "DROP INDEX is not supported inside an active transaction".into(),
1183 ));
1184 }
1185
1186 let mut dropped = false;
1187
1188 match self.run_autocommit_drop_index(plan.clone()) {
1189 Ok(Some(_)) => {
1190 dropped = true;
1191 }
1192 Ok(None) => {}
1193 Err(err) => {
1194 if !super::is_index_not_found_error(&err) {
1195 return Err(err);
1196 }
1197 }
1198 }
1199
1200 if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1201 match temp_namespace.drop_index(plan.clone()) {
1202 Ok(Some(_)) => {
1203 dropped = true;
1204 }
1205 Ok(None) => {}
1206 Err(err) => {
1207 if !super::is_index_not_found_error(&err) {
1208 return Err(err);
1209 }
1210 }
1211 }
1212 }
1213
1214 if dropped || plan.if_exists {
1215 Ok(RuntimeStatementResult::NoOp)
1216 } else {
1217 Err(Error::CatalogError(format!(
1218 "Index '{}' does not exist",
1219 plan.name
1220 )))
1221 }
1222 }
1223}