1use std::collections::HashMap;
4use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
5
6use arrow::record_batch::RecordBatch;
7use llkv_result::{Error, Result};
8use llkv_storage::pager::{BoxedPager, MemPager};
9use llkv_table::{
10 ConstraintEnforcementMode, INFORMATION_SCHEMA_TABLE_ID_START, SingleColumnIndexDescriptor,
11 TEMPORARY_TABLE_ID_START, canonical_table_name, validate_alter_table_operation,
12};
13
14use crate::{
15 AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource,
16 CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan, DropViewPlan, InsertPlan,
17 InsertSource, PlanColumnSpec, PlanOperation, PlanValue, RenameTablePlan, RuntimeContext,
18 RuntimeStatementResult, RuntimeTransactionContext, SelectExecution, SelectPlan,
19 SelectProjection, TransactionContext, TransactionKind, TransactionResult, TransactionSession,
20 UpdatePlan, information_schema::refresh_information_schema,
21};
22use crate::{
23 INFORMATION_SCHEMA_NAMESPACE_ID, PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace,
24 RuntimeNamespaceId, RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry,
25 TEMPORARY_NAMESPACE_ID, TemporaryRuntimeNamespace,
26};
27use llkv_plan::TruncatePlan;
28
/// Statement result specialised to the boxed pager.
type StatementResult = RuntimeStatementResult<BoxedPager>;
/// Transaction result specialised to the boxed pager.
type TxnResult = TransactionResult<BoxedPager>;
/// Transaction context specialised to the boxed pager; this is the type
/// handed to autocommit closures in this file.
type BaseTxnContext = RuntimeTransactionContext<BoxedPager>;
32
33static INFORMATION_SCHEMA_NAMESPACE_POOL: OnceLock<
34 Mutex<HashMap<usize, Weak<TemporaryRuntimeNamespace>>>,
35> = OnceLock::new();
36
37fn information_schema_namespace_pool()
38-> &'static Mutex<HashMap<usize, Weak<TemporaryRuntimeNamespace>>> {
39 INFORMATION_SCHEMA_NAMESPACE_POOL.get_or_init(|| Mutex::new(HashMap::new()))
40}
41
42fn information_schema_read_only_error(operation: &str) -> Error {
43 Error::CatalogError(format!(
44 "information_schema is read-only: {} operations are not supported",
45 operation
46 ))
47}
48
49pub(crate) struct SessionNamespaces {
50 persistent: Arc<PersistentRuntimeNamespace>,
51 temporary: Option<Arc<TemporaryRuntimeNamespace>>,
52 information_schema: Arc<TemporaryRuntimeNamespace>,
53 registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
54}
55
56impl SessionNamespaces {
57 pub(crate) fn new(base_context: Arc<RuntimeContext<BoxedPager>>) -> Self {
58 let mut registry =
59 RuntimeStorageNamespaceRegistry::new(PERSISTENT_NAMESPACE_ID.to_string());
60
61 let information_schema = {
63 let key = Arc::as_ptr(&base_context) as usize;
64 let namespace = {
65 let mut pool = information_schema_namespace_pool()
66 .lock()
67 .expect("information_schema namespace pool poisoned");
68 if let Some(existing) = pool.get(&key).and_then(|weak| weak.upgrade()) {
69 existing
70 } else {
71 let shared_catalog = base_context.table_catalog();
72 let mem_pager = Arc::new(MemPager::default());
73 let boxed_pager = Arc::new(BoxedPager::from_arc(mem_pager));
74 let context = Arc::new(RuntimeContext::new_with_catalog(
75 boxed_pager,
76 Arc::clone(&shared_catalog),
77 ));
78 context
79 .ensure_next_table_id_at_least(INFORMATION_SCHEMA_TABLE_ID_START)
80 .expect("failed to seed information_schema table id counter");
81
82 let namespace = Arc::new(TemporaryRuntimeNamespace::new(
83 INFORMATION_SCHEMA_NAMESPACE_ID.to_string(),
84 context,
85 ));
86 pool.insert(key, Arc::downgrade(&namespace));
87 namespace
88 }
89 };
90 registry.register_namespace(
91 Arc::clone(&namespace),
92 vec![INFORMATION_SCHEMA_NAMESPACE_ID.to_string()],
93 false,
94 );
95 namespace
96 };
97
98 base_context.set_fallback_lookup(information_schema.context());
100
101 let persistent = Arc::new(PersistentRuntimeNamespace::new(
103 PERSISTENT_NAMESPACE_ID.to_string(),
104 Arc::clone(&base_context),
105 ));
106
107 registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
108
109 let information_schema = {
110 let key = Arc::as_ptr(&base_context) as usize;
111 let namespace = {
112 let mut pool = information_schema_namespace_pool()
113 .lock()
114 .expect("information_schema namespace pool poisoned");
115 if let Some(existing) = pool.get(&key).and_then(|weak| weak.upgrade()) {
116 existing
117 } else {
118 let shared_catalog = base_context.table_catalog();
119 let mem_pager = Arc::new(MemPager::default());
120 let boxed_pager = Arc::new(BoxedPager::from_arc(mem_pager));
121 let context = Arc::new(RuntimeContext::new_with_catalog(
122 boxed_pager,
123 Arc::clone(&shared_catalog),
124 ));
125 context
126 .ensure_next_table_id_at_least(INFORMATION_SCHEMA_TABLE_ID_START)
127 .expect("failed to seed information_schema table id counter");
128
129 let namespace = Arc::new(TemporaryRuntimeNamespace::new(
130 INFORMATION_SCHEMA_NAMESPACE_ID.to_string(),
131 context,
132 ));
133 pool.insert(key, Arc::downgrade(&namespace));
134 namespace
135 }
136 };
137 registry.register_namespace(
138 Arc::clone(&namespace),
139 vec![INFORMATION_SCHEMA_NAMESPACE_ID.to_string()],
140 false,
141 );
142 namespace
143 };
144
145 let temporary = {
146 let shared_catalog = base_context.table_catalog();
162 let temp_mem_pager = Arc::new(MemPager::default());
163 let temp_boxed_pager = Arc::new(BoxedPager::from_arc(temp_mem_pager));
164 let temp_context = Arc::new(
165 RuntimeContext::new_with_catalog(temp_boxed_pager, Arc::clone(&shared_catalog))
166 .with_fallback_lookup(Arc::clone(&base_context)),
167 );
168
169 temp_context
170 .ensure_next_table_id_at_least(TEMPORARY_TABLE_ID_START)
171 .expect("failed to seed temporary namespace table id counter");
172
173 let namespace = Arc::new(TemporaryRuntimeNamespace::new(
174 TEMPORARY_NAMESPACE_ID.to_string(),
175 temp_context,
176 ));
177 registry.register_namespace(
178 Arc::clone(&namespace),
179 vec![TEMPORARY_NAMESPACE_ID.to_string()],
180 true,
181 );
182 namespace
183 };
184
185 Self {
186 persistent,
187 temporary: Some(temporary),
188 information_schema,
189 registry: Arc::new(RwLock::new(registry)),
190 }
191 }
192
193 pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace> {
194 Arc::clone(&self.persistent)
195 }
196
197 pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
198 self.temporary.as_ref().map(Arc::clone)
199 }
200
201 pub(crate) fn information_schema(&self) -> Arc<TemporaryRuntimeNamespace> {
202 Arc::clone(&self.information_schema)
203 }
204
205 pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
206 Arc::clone(&self.registry)
207 }
208}
209
210impl Drop for SessionNamespaces {
211 fn drop(&mut self) {
212 if let Some(temp) = &self.temporary {
213 let namespace_id = temp.namespace_id().to_string();
214 let canonical_names = {
215 let mut registry = self.registry.write().expect("namespace registry poisoned");
216 registry.drain_namespace_tables(&namespace_id)
217 };
218 temp.clear_tables(canonical_names);
219 }
220 }
221}
222
/// A runtime session: a transaction session bundled with the namespaces it
/// routes statements to and the constraint enforcement mode it applies.
pub struct RuntimeSession {
    /// Underlying transaction session. The first context type is built over
    /// `BoxedPager`; the second, over `MemPager`, is the type of the staging
    /// context handed over in `begin_transaction`.
    inner: TransactionSession<
        RuntimeTransactionContext<BoxedPager>,
        RuntimeTransactionContext<MemPager>,
    >,
    /// Namespaces shared with every session cloned from this one.
    namespaces: Arc<SessionNamespaces>,
    /// Current constraint enforcement mode, shared with cloned sessions.
    constraint_mode: Arc<RwLock<ConstraintEnforcementMode>>,
}
236
237impl RuntimeSession {
238 pub(crate) fn from_parts(
239 inner: TransactionSession<
240 RuntimeTransactionContext<BoxedPager>,
241 RuntimeTransactionContext<MemPager>,
242 >,
243 namespaces: Arc<SessionNamespaces>,
244 ) -> Self {
245 let session = Self {
246 inner,
247 namespaces,
248 constraint_mode: Arc::new(RwLock::new(ConstraintEnforcementMode::Immediate)),
249 };
250 session.apply_constraint_mode_to_base(ConstraintEnforcementMode::Immediate);
251 session
252 }
253
254 pub(crate) fn clone_session(&self) -> Self {
257 let session = Self {
258 inner: self.inner.clone_session(),
259 namespaces: self.namespaces.clone(),
260 constraint_mode: Arc::clone(&self.constraint_mode),
261 };
262 let mode = session.constraint_enforcement_mode();
263 session.apply_constraint_mode_to_base(mode);
264 session
265 }
266
267 fn new_temp_tx_context(
268 &self,
269 context: Arc<RuntimeContext<BoxedPager>>,
270 ) -> RuntimeTransactionContext<BoxedPager> {
271 let tx = RuntimeTransactionContext::new(context);
272 tx.set_constraint_mode(self.constraint_enforcement_mode());
273 tx
274 }
275
276 pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
277 self.namespaces.registry()
278 }
279
280 fn apply_constraint_mode_to_base(&self, mode: ConstraintEnforcementMode) {
281 self.inner.context().set_constraint_mode(mode);
282 }
283
284 pub fn set_constraint_enforcement_mode(&self, mode: ConstraintEnforcementMode) {
285 let previous = {
286 let mut guard = self
287 .constraint_mode
288 .write()
289 .expect("constraint mode lock poisoned");
290 if *guard == mode {
291 None
292 } else {
293 let old = *guard;
294 *guard = mode;
295 Some(old)
296 }
297 };
298 if let Some(old) = previous {
299 tracing::warn!(
300 "Session constraint enforcement mode changed from {:?} to {:?}",
301 old,
302 mode
303 );
304 }
305 self.apply_constraint_mode_to_base(mode);
306 }
307
308 pub fn constraint_enforcement_mode(&self) -> ConstraintEnforcementMode {
309 *self
310 .constraint_mode
311 .read()
312 .expect("constraint mode lock poisoned")
313 }
314
315 fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
316 self.namespace_registry()
317 .read()
318 .expect("namespace registry poisoned")
319 .namespace_for_table(canonical)
320 }
321
322 fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
323 if plan.tables.len() != 1 {
324 return None;
325 }
326
327 let qualified = plan.tables[0].qualified_name();
328 let (_, canonical) = canonical_table_name(&qualified).ok()?;
329 Some(self.resolve_namespace_for_table(&canonical))
330 }
331
332 fn select_from_namespace(
333 &self,
334 namespace: Arc<TemporaryRuntimeNamespace>,
335 plan: SelectPlan,
336 ) -> Result<StatementResult> {
337 let table_name = if plan.tables.len() == 1 {
338 plan.tables[0].qualified_name()
339 } else {
340 String::new()
341 };
342
343 let context = namespace.context();
344 let temp_tx_context = self.new_temp_tx_context(context);
345 let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
346 let schema = execution.schema();
347 let batches = execution.collect()?;
348
349 let combined = if batches.is_empty() {
350 RecordBatch::new_empty(Arc::clone(&schema))
351 } else if batches.len() == 1 {
352 batches.into_iter().next().unwrap()
353 } else {
354 let refs: Vec<&RecordBatch> = batches.iter().collect();
355 arrow::compute::concat_batches(&schema, refs)?
356 };
357
358 let execution =
359 SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
360
361 Ok(RuntimeStatementResult::Select {
362 execution: Box::new(execution),
363 table_name,
364 schema,
365 })
366 }
367
368 fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace> {
369 self.namespaces.persistent()
370 }
371
372 #[allow(dead_code)]
373 fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
374 self.namespaces.temporary()
375 }
376
377 fn information_schema_namespace(&self) -> Arc<TemporaryRuntimeNamespace> {
378 self.namespaces.information_schema()
379 }
380
381 fn base_transaction_context(&self) -> Arc<BaseTxnContext> {
382 let ctx = Arc::clone(self.inner.context());
383 ctx.set_constraint_mode(self.constraint_enforcement_mode());
384 ctx
385 }
386
387 fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
388 where
389 F: FnOnce(&BaseTxnContext) -> Result<T>,
390 {
391 let context = self.base_transaction_context();
392 let default_snapshot = context.ctx().default_snapshot();
393 TransactionContext::set_snapshot(&*context, default_snapshot);
394 f(context.as_ref())
395 }
396
397 fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TxnResult> {
398 self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
399 }
400
401 fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TxnResult> {
402 self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
403 }
404
405 fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TxnResult> {
406 self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
407 }
408
409 fn run_autocommit_truncate(&self, plan: TruncatePlan) -> Result<TxnResult> {
410 self.with_autocommit_transaction_context(|ctx| TransactionContext::truncate(ctx, plan))
411 }
412
413 fn run_autocommit_create_table(&self, plan: CreateTablePlan) -> Result<StatementResult> {
414 let result =
415 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
416 match result {
417 TransactionResult::CreateTable { table_name } => {
418 Ok(RuntimeStatementResult::CreateTable { table_name })
419 }
420 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
421 _ => Err(Error::Internal(
422 "unexpected transaction result for CREATE TABLE".into(),
423 )),
424 }
425 }
426
427 fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<StatementResult> {
428 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
429 Ok(RuntimeStatementResult::NoOp)
430 }
431
432 fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
433 self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
434 }
435
436 fn run_autocommit_alter_table(&self, plan: AlterTablePlan) -> Result<StatementResult> {
437 let result =
438 self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
439 match result {
440 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
441 TransactionResult::CreateTable { table_name } => {
442 Ok(RuntimeStatementResult::CreateTable { table_name })
443 }
444 TransactionResult::CreateIndex {
445 table_name,
446 index_name,
447 } => Ok(RuntimeStatementResult::CreateIndex {
448 table_name,
449 index_name,
450 }),
451 _ => Err(Error::Internal(
452 "unexpected transaction result for ALTER TABLE".into(),
453 )),
454 }
455 }
456
457 fn run_autocommit_create_index(&self, plan: CreateIndexPlan) -> Result<StatementResult> {
458 let result =
459 self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
460 match result {
461 TransactionResult::CreateIndex {
462 table_name,
463 index_name,
464 } => Ok(RuntimeStatementResult::CreateIndex {
465 table_name,
466 index_name,
467 }),
468 TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
469 _ => Err(Error::Internal(
470 "unexpected transaction result for CREATE INDEX".into(),
471 )),
472 }
473 }
474
475 fn run_autocommit_drop_index(
476 &self,
477 plan: DropIndexPlan,
478 ) -> Result<Option<SingleColumnIndexDescriptor>> {
479 self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
480 }
481
482 pub fn begin_transaction(&self) -> Result<StatementResult> {
486 let staging_pager = Arc::new(MemPager::default());
487 tracing::trace!(
488 "BEGIN_TRANSACTION: Created staging pager at {:p}",
489 &*staging_pager
490 );
491 let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
492
493 let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
498 staging_wrapper.set_constraint_mode(self.constraint_enforcement_mode());
499
500 self.inner.begin_transaction(staging_wrapper)?;
501 Ok(RuntimeStatementResult::Transaction {
502 kind: TransactionKind::Begin,
503 })
504 }
505
506 pub fn abort_transaction(&self) {
509 self.inner.abort_transaction();
510 }
511
512 pub fn has_active_transaction(&self) -> bool {
514 let result = self.inner.has_active_transaction();
515 tracing::trace!("SESSION: has_active_transaction() = {}", result);
516 result
517 }
518
519 pub fn is_aborted(&self) -> bool {
521 self.inner.is_aborted()
522 }
523
524 pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
526 self.inner.is_table_created_in_transaction(table_name)
527 }
528
529 pub fn table_column_specs_from_transaction(
532 &self,
533 table_name: &str,
534 ) -> Option<Vec<PlanColumnSpec>> {
535 self.inner.table_column_specs_from_transaction(table_name)
536 }
537
538 pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
541 self.inner
542 .tables_referencing_in_transaction(referenced_table)
543 }
544
545 pub fn commit_transaction(&self) -> Result<StatementResult> {
548 tracing::trace!("Session::commit_transaction called");
549 let (tx_result, operations) = self.inner.commit_transaction()?;
550 tracing::trace!(
551 "Session::commit_transaction got {} operations",
552 operations.len()
553 );
554
555 if !operations.is_empty() {
556 let dropped_tables = self
557 .inner
558 .context()
559 .ctx()
560 .dropped_tables
561 .read()
562 .unwrap()
563 .clone();
564 if !dropped_tables.is_empty() {
565 for operation in &operations {
566 let table_name_opt = match operation {
567 PlanOperation::Insert(plan) => Some(plan.table.as_str()),
568 PlanOperation::Update(plan) => Some(plan.table.as_str()),
569 PlanOperation::Delete(plan) => Some(plan.table.as_str()),
570 _ => None,
571 };
572 if let Some(table_name) = table_name_opt {
573 let (_, canonical) = canonical_table_name(table_name)?;
574 if dropped_tables.contains(&canonical) {
575 self.abort_transaction();
576 return Err(Error::TransactionContextError(
577 "another transaction has dropped this table".into(),
578 ));
579 }
580 }
581 }
582 }
583 }
584
585 let kind = match tx_result {
587 TransactionResult::Transaction { kind } => kind,
588 _ => {
589 return Err(Error::Internal(
590 "commit_transaction returned non-transaction result".into(),
591 ));
592 }
593 };
594 tracing::trace!("Session::commit_transaction kind={:?}", kind);
595
596 for operation in operations {
598 match operation {
599 PlanOperation::CreateTable(plan) => {
600 TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
601 }
602 PlanOperation::DropTable(plan) => {
603 TransactionContext::drop_table(&**self.inner.context(), plan)?;
604 }
605 PlanOperation::Insert(plan) => {
606 TransactionContext::insert(&**self.inner.context(), plan)?;
607 }
608 PlanOperation::Update(plan) => {
609 TransactionContext::update(&**self.inner.context(), plan)?;
610 }
611 PlanOperation::Delete(plan) => {
612 TransactionContext::delete(&**self.inner.context(), plan)?;
613 }
614 _ => {}
615 }
616 }
617
618 let base_ctx = self.inner.context();
621 let default_snapshot = base_ctx.ctx().default_snapshot();
622 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
623
624 if matches!(kind, TransactionKind::Commit) {
626 let ctx = base_ctx.ctx();
627 let next_txn_id = ctx.txn_manager().current_next_txn_id();
628 if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
629 tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
630 }
631 }
632
633 Ok(RuntimeStatementResult::Transaction { kind })
635 }
636
637 pub fn rollback_transaction(&self) -> Result<StatementResult> {
639 self.inner.rollback_transaction()?;
640 let base_ctx = self.inner.context();
641 let default_snapshot = base_ctx.ctx().default_snapshot();
642 TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
643 Ok(RuntimeStatementResult::Transaction {
644 kind: TransactionKind::Rollback,
645 })
646 }
647
648 fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
649 if matches!(plan.source, Some(CreateTableSource::Select { .. }))
652 && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
653 {
654 let select_result = self.execute_select_plan(*select_plan)?;
655 let (schema, batches) = match select_result {
656 RuntimeStatementResult::Select {
657 schema, execution, ..
658 } => {
659 let batches = execution.collect()?;
660 (schema, batches)
661 }
662 _ => {
663 return Err(Error::Internal(
664 "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
665 ));
666 }
667 };
668 plan.source = Some(CreateTableSource::Batches { schema, batches });
669 }
670 Ok(plan)
671 }
672
673 fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
674 let InsertPlan {
675 table,
676 columns,
677 source,
678 on_conflict,
679 } = plan;
680
681 match source {
682 InsertSource::Rows(rows) => {
683 let count = rows.len();
684 Ok((
685 InsertPlan {
686 table,
687 columns,
688 source: InsertSource::Rows(rows),
689 on_conflict,
690 },
691 count,
692 ))
693 }
694 InsertSource::Batches(batches) => {
695 let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
696 Ok((
697 InsertPlan {
698 table,
699 columns,
700 source: InsertSource::Batches(batches),
701 on_conflict,
702 },
703 count,
704 ))
705 }
706 InsertSource::Select { plan: select_plan } => {
707 let select_result = self.execute_select_plan(*select_plan)?;
708 let rows = match select_result {
709 RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
710 _ => {
711 return Err(Error::Internal(
712 "expected Select result when executing INSERT ... SELECT".into(),
713 ));
714 }
715 };
716 let count = rows.len();
717 Ok((
718 InsertPlan {
719 table,
720 columns,
721 source: InsertSource::Rows(rows),
722 on_conflict,
723 },
724 count,
725 ))
726 }
727 }
728 }
729
730 pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<StatementResult> {
732 tracing::trace!("Session::insert called for table={}", plan.table);
733 let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
734 let table_name = plan.table.clone();
735 let (_, canonical_table) = canonical_table_name(&plan.table)?;
736 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
737
738 match namespace_id.as_str() {
739 TEMPORARY_NAMESPACE_ID => {
740 let temp_namespace = self
741 .temporary_namespace()
742 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
743 let temp_context = temp_namespace.context();
744 let temp_tx_context = self.new_temp_tx_context(temp_context);
745 match TransactionContext::insert(&temp_tx_context, plan)? {
746 TransactionResult::Insert { .. } => {}
747 _ => {
748 return Err(Error::Internal(
749 "unexpected transaction result for temporary INSERT".into(),
750 ));
751 }
752 }
753 Ok(RuntimeStatementResult::Insert {
754 rows_inserted,
755 table_name,
756 })
757 }
758 INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("INSERT")),
759 PERSISTENT_NAMESPACE_ID => {
760 if self.has_active_transaction() {
761 match self.inner.execute_operation(PlanOperation::Insert(plan)) {
762 Ok(_) => {
763 tracing::trace!("Session::insert succeeded for table={}", table_name);
764 Ok(RuntimeStatementResult::Insert {
765 rows_inserted,
766 table_name,
767 })
768 }
769 Err(e) => {
770 tracing::trace!(
771 "Session::insert failed for table={}, error={:?}",
772 table_name,
773 e
774 );
775 if matches!(e, Error::ConstraintError(_)) {
776 tracing::trace!("Transaction is_aborted=true");
777 self.abort_transaction();
778 }
779 Err(e)
780 }
781 }
782 } else {
783 let result = self.run_autocommit_insert(plan)?;
784 if !matches!(result, TransactionResult::Insert { .. }) {
785 return Err(Error::Internal(
786 "unexpected transaction result for INSERT operation".into(),
787 ));
788 }
789 Ok(RuntimeStatementResult::Insert {
790 rows_inserted,
791 table_name,
792 })
793 }
794 }
795 other => Err(Error::InvalidArgumentError(format!(
796 "Unknown storage namespace '{}'",
797 other
798 ))),
799 }
800 }
801
802 pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<StatementResult> {
804 if let Some(namespace_id) = self.namespace_for_select_plan(&plan) {
805 if namespace_id == TEMPORARY_NAMESPACE_ID {
806 let namespace = self
807 .temporary_namespace()
808 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
809 return self.select_from_namespace(namespace, plan);
810 }
811 if namespace_id == INFORMATION_SCHEMA_NAMESPACE_ID {
812 let namespace = self.information_schema_namespace();
813 return self.select_from_namespace(namespace, plan);
814 }
815 }
816
817 if self.has_active_transaction() {
818 let tx_result = match self
819 .inner
820 .execute_operation(PlanOperation::Select(Box::new(plan.clone())))
821 {
822 Ok(result) => result,
823 Err(e) => {
824 if matches!(e, Error::ConstraintError(_)) {
827 self.abort_transaction();
828 }
829 return Err(e);
830 }
831 };
832 match tx_result {
833 TransactionResult::Select {
834 table_name,
835 schema,
836 execution: staging_execution,
837 } => {
838 let batches = staging_execution.collect().unwrap_or_default();
841 let combined = if batches.is_empty() {
842 RecordBatch::new_empty(Arc::clone(&schema))
843 } else if batches.len() == 1 {
844 batches.into_iter().next().unwrap()
845 } else {
846 let refs: Vec<&RecordBatch> = batches.iter().collect();
847 arrow::compute::concat_batches(&schema, refs)?
848 };
849
850 let execution = SelectExecution::from_batch(
851 table_name.clone(),
852 Arc::clone(&schema),
853 combined,
854 );
855
856 Ok(RuntimeStatementResult::Select {
857 execution: Box::new(execution),
858 table_name,
859 schema,
860 })
861 }
862 _ => Err(Error::Internal("expected Select result".into())),
863 }
864 } else {
865 let table_name = if plan.tables.len() == 1 {
867 plan.tables[0].qualified_name()
868 } else {
869 String::new()
870 };
871 let execution = self.with_autocommit_transaction_context(|ctx| {
872 TransactionContext::execute_select(ctx, plan)
873 })?;
874 let schema = execution.schema();
875 Ok(RuntimeStatementResult::Select {
876 execution: Box::new(execution),
877 table_name,
878 schema,
879 })
880 }
881 }
882
883 pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
885 let plan =
886 SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
887 match self.execute_select_plan(plan)? {
888 RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
889 other => Err(Error::Internal(format!(
890 "expected Select result when reading table '{}', got {:?}",
891 table, other
892 ))),
893 }
894 }
895
896 pub fn refresh_information_schema(&self) -> Result<()> {
903 let persistent = self.persistent_namespace();
904 let info_namespace = self.information_schema_namespace();
905 let registry = self.namespace_registry();
906 refresh_information_schema(
907 &persistent.context(),
908 &info_namespace.context(),
909 ®istry,
910 info_namespace.namespace_id(),
911 )
912 }
913
914 pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<StatementResult> {
915 let (_, canonical_table) = canonical_table_name(&plan.table)?;
916 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
917
918 match namespace_id.as_str() {
919 TEMPORARY_NAMESPACE_ID => {
920 let temp_namespace = self
921 .temporary_namespace()
922 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
923 let temp_context = temp_namespace.context();
924 let table_name = plan.table.clone();
925 let temp_tx_context = self.new_temp_tx_context(temp_context);
926 match TransactionContext::update(&temp_tx_context, plan)? {
927 TransactionResult::Update { rows_updated, .. } => {
928 Ok(RuntimeStatementResult::Update {
929 rows_updated,
930 table_name,
931 })
932 }
933 _ => Err(Error::Internal(
934 "unexpected transaction result for temporary UPDATE".into(),
935 )),
936 }
937 }
938 INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("UPDATE")),
939 PERSISTENT_NAMESPACE_ID => {
940 if self.has_active_transaction() {
941 let table_name = plan.table.clone();
942 let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
943 Ok(result) => result,
944 Err(e) => {
945 self.abort_transaction();
947 return Err(e);
948 }
949 };
950 match result {
951 TransactionResult::Update {
952 rows_matched: _,
953 rows_updated,
954 } => Ok(RuntimeStatementResult::Update {
955 rows_updated,
956 table_name,
957 }),
958 _ => Err(Error::Internal("expected Update result".into())),
959 }
960 } else {
961 let table_name = plan.table.clone();
962 let result = self.run_autocommit_update(plan)?;
963 match result {
964 TransactionResult::Update {
965 rows_matched: _,
966 rows_updated,
967 } => Ok(RuntimeStatementResult::Update {
968 rows_updated,
969 table_name,
970 }),
971 _ => Err(Error::Internal("expected Update result".into())),
972 }
973 }
974 }
975 other => Err(Error::InvalidArgumentError(format!(
976 "Unknown storage namespace '{}'",
977 other
978 ))),
979 }
980 }
981
982 pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<StatementResult> {
983 let (_, canonical_table) = canonical_table_name(&plan.table)?;
984 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
985
986 match namespace_id.as_str() {
987 TEMPORARY_NAMESPACE_ID => {
988 let temp_namespace = self
989 .temporary_namespace()
990 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
991 let temp_context = temp_namespace.context();
992 let table_name = plan.table.clone();
993 let temp_tx_context = self.new_temp_tx_context(temp_context);
994 match TransactionContext::delete(&temp_tx_context, plan)? {
995 TransactionResult::Delete { rows_deleted } => {
996 Ok(RuntimeStatementResult::Delete {
997 rows_deleted,
998 table_name,
999 })
1000 }
1001 _ => Err(Error::Internal(
1002 "unexpected transaction result for temporary DELETE".into(),
1003 )),
1004 }
1005 }
1006 INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("DELETE")),
1007 PERSISTENT_NAMESPACE_ID => {
1008 if self.has_active_transaction() {
1009 let table_name = plan.table.clone();
1010 let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
1011 Ok(result) => result,
1012 Err(e) => {
1013 self.abort_transaction();
1015 return Err(e);
1016 }
1017 };
1018 match result {
1019 TransactionResult::Delete { rows_deleted } => {
1020 Ok(RuntimeStatementResult::Delete {
1021 rows_deleted,
1022 table_name,
1023 })
1024 }
1025 _ => Err(Error::Internal("expected Delete result".into())),
1026 }
1027 } else {
1028 let table_name = plan.table.clone();
1029 let result = self.run_autocommit_delete(plan)?;
1030 match result {
1031 TransactionResult::Delete { rows_deleted } => {
1032 Ok(RuntimeStatementResult::Delete {
1033 rows_deleted,
1034 table_name,
1035 })
1036 }
1037 _ => Err(Error::Internal("expected Delete result".into())),
1038 }
1039 }
1040 }
1041 other => Err(Error::InvalidArgumentError(format!(
1042 "Unknown storage namespace '{}'",
1043 other
1044 ))),
1045 }
1046 }
1047
1048 pub fn execute_truncate_plan(&self, plan: TruncatePlan) -> Result<StatementResult> {
1049 let (_, canonical_table) = canonical_table_name(&plan.table)?;
1050 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1051
1052 match namespace_id.as_str() {
1053 TEMPORARY_NAMESPACE_ID => {
1054 let temp_namespace = self
1055 .temporary_namespace()
1056 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1057 let temp_context = temp_namespace.context();
1058 let table_name = plan.table.clone();
1059 let temp_tx_context = self.new_temp_tx_context(temp_context);
1060 match TransactionContext::truncate(&temp_tx_context, plan)? {
1061 TransactionResult::Delete { rows_deleted } => {
1062 Ok(RuntimeStatementResult::Delete {
1063 rows_deleted,
1064 table_name,
1065 })
1066 }
1067 _ => Err(Error::Internal(
1068 "unexpected transaction result for temporary TRUNCATE".into(),
1069 )),
1070 }
1071 }
1072 INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("TRUNCATE")),
1073 PERSISTENT_NAMESPACE_ID => {
1074 if self.has_active_transaction() {
1075 let table_name = plan.table.clone();
1076 let result = match self.inner.execute_operation(PlanOperation::Truncate(plan)) {
1077 Ok(result) => result,
1078 Err(e) => {
1079 self.abort_transaction();
1081 return Err(e);
1082 }
1083 };
1084 match result {
1085 TransactionResult::Delete { rows_deleted } => {
1086 Ok(RuntimeStatementResult::Delete {
1087 rows_deleted,
1088 table_name,
1089 })
1090 }
1091 _ => Err(Error::Internal("expected Delete result".into())),
1092 }
1093 } else {
1094 let table_name = plan.table.clone();
1095 let result = self.run_autocommit_truncate(plan)?;
1096 match result {
1097 TransactionResult::Delete { rows_deleted } => {
1098 Ok(RuntimeStatementResult::Delete {
1099 rows_deleted,
1100 table_name,
1101 })
1102 }
1103 _ => Err(Error::Internal("expected Delete result".into())),
1104 }
1105 }
1106 }
1107 other => Err(Error::InvalidArgumentError(format!(
1108 "Unknown storage namespace '{}'",
1109 other
1110 ))),
1111 }
1112 }
1113}
1114
1115impl CatalogDdl for RuntimeSession {
1119 type CreateTableOutput = StatementResult;
1120 type DropTableOutput = StatementResult;
1121 type RenameTableOutput = ();
1122 type AlterTableOutput = StatementResult;
1123 type CreateIndexOutput = StatementResult;
1124 type DropIndexOutput = StatementResult;
1125
1126 fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
1127 let target_namespace = plan
1128 .namespace
1129 .clone()
1130 .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
1131 .to_ascii_lowercase();
1132
1133 let plan = self.materialize_ctas_plan(plan)?;
1134
1135 match target_namespace.as_str() {
1136 INFORMATION_SCHEMA_NAMESPACE_ID => {
1137 Err(information_schema_read_only_error("CREATE TABLE"))
1138 }
1139 TEMPORARY_NAMESPACE_ID => {
1140 let temp_namespace = self
1141 .temporary_namespace()
1142 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1143 let (_, canonical) = canonical_table_name(&plan.name)?;
1144 let result = temp_namespace.create_table(plan)?;
1145 if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
1146 let namespace_id = temp_namespace.namespace_id().to_string();
1147 let registry = self.namespace_registry();
1148 registry
1149 .write()
1150 .expect("namespace registry poisoned")
1151 .register_table(&namespace_id, canonical);
1152 }
1153 Ok(result)
1154 }
1155 PERSISTENT_NAMESPACE_ID => {
1156 if self.has_active_transaction() {
1157 match self
1158 .inner
1159 .execute_operation(PlanOperation::CreateTable(plan))
1160 {
1161 Ok(TransactionResult::CreateTable { table_name }) => {
1162 Ok(RuntimeStatementResult::CreateTable { table_name })
1163 }
1164 Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
1165 Ok(_) => Err(Error::Internal(
1166 "expected CreateTable result during transactional CREATE TABLE".into(),
1167 )),
1168 Err(err) => {
1169 self.abort_transaction();
1170 Err(err)
1171 }
1172 }
1173 } else {
1174 if self.inner.has_table_locked_by_other_session(&plan.name) {
1175 return Err(Error::TransactionContextError(format!(
1176 "table '{}' is locked by another active transaction",
1177 plan.name
1178 )));
1179 }
1180 self.run_autocommit_create_table(plan)
1181 }
1182 }
1183 other => Err(Error::InvalidArgumentError(format!(
1184 "Unknown storage namespace '{}'",
1185 other
1186 ))),
1187 }
1188 }
1189
1190 fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
1191 let (_, canonical_table) = canonical_table_name(&plan.name)?;
1192 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1193
1194 match namespace_id.as_str() {
1195 TEMPORARY_NAMESPACE_ID => {
1196 let temp_namespace = self
1197 .temporary_namespace()
1198 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1199 temp_namespace.drop_table(plan)?;
1200 let registry = self.namespace_registry();
1201 registry
1202 .write()
1203 .expect("namespace registry poisoned")
1204 .unregister_table(&canonical_table);
1205 Ok(RuntimeStatementResult::NoOp)
1206 }
1207 INFORMATION_SCHEMA_NAMESPACE_ID => {
1208 Err(information_schema_read_only_error("DROP TABLE"))
1209 }
1210 PERSISTENT_NAMESPACE_ID => {
1211 if self.has_active_transaction() {
1212 let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
1213 if !referencing_tables.is_empty() {
1214 let referencing_table = &referencing_tables[0];
1215 self.abort_transaction();
1216 return Err(Error::CatalogError(format!(
1217 "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
1218 referencing_table
1219 )));
1220 }
1221
1222 match self
1223 .inner
1224 .execute_operation(PlanOperation::DropTable(plan.clone()))
1225 {
1226 Ok(TransactionResult::NoOp) => {
1227 let registry = self.namespace_registry();
1228 registry
1229 .write()
1230 .expect("namespace registry poisoned")
1231 .unregister_table(&canonical_table);
1232 Ok(RuntimeStatementResult::NoOp)
1233 }
1234 Ok(_) => Err(Error::Internal(
1235 "expected NoOp result for DROP TABLE during transactional execution"
1236 .into(),
1237 )),
1238 Err(err) => {
1239 self.abort_transaction();
1240 Err(err)
1241 }
1242 }
1243 } else {
1244 if self.inner.has_table_locked_by_other_session(&plan.name) {
1245 return Err(Error::TransactionContextError(format!(
1246 "table '{}' is locked by another active transaction",
1247 plan.name
1248 )));
1249 }
1250 let result = self.run_autocommit_drop_table(plan)?;
1251 let registry = self.namespace_registry();
1252 registry
1253 .write()
1254 .expect("namespace registry poisoned")
1255 .unregister_table(&canonical_table);
1256 Ok(result)
1257 }
1258 }
1259 other => Err(Error::InvalidArgumentError(format!(
1260 "Unknown storage namespace '{}'",
1261 other
1262 ))),
1263 }
1264 }
1265
1266 fn create_view(&self, plan: CreateViewPlan) -> Result<()> {
1267 let target_namespace = plan
1268 .namespace
1269 .clone()
1270 .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
1271 .to_ascii_lowercase();
1272
1273 match target_namespace.as_str() {
1274 INFORMATION_SCHEMA_NAMESPACE_ID => {
1275 Err(information_schema_read_only_error("CREATE VIEW"))
1276 }
1277 TEMPORARY_NAMESPACE_ID => {
1278 let temp_namespace = self
1279 .temporary_namespace()
1280 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1281 let (_, canonical) = canonical_table_name(&plan.name)?;
1282 temp_namespace.create_view(plan)?;
1283 let namespace_id = temp_namespace.namespace_id().to_string();
1284 let registry = self.namespace_registry();
1285 registry
1286 .write()
1287 .expect("namespace registry poisoned")
1288 .register_table(&namespace_id, canonical);
1289 Ok(())
1290 }
1291 PERSISTENT_NAMESPACE_ID => {
1292 let persistent_namespace = self.persistent_namespace();
1293 persistent_namespace.create_view(plan)
1294 }
1295 other => Err(Error::InvalidArgumentError(format!(
1296 "Unknown storage namespace '{}'",
1297 other
1298 ))),
1299 }
1300 }
1301
1302 fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1303 let (_, canonical_view) = canonical_table_name(&plan.name)?;
1304 let namespace_id = self.resolve_namespace_for_table(&canonical_view);
1305
1306 match namespace_id.as_str() {
1307 TEMPORARY_NAMESPACE_ID => {
1308 let temp_namespace = self
1309 .temporary_namespace()
1310 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1311 temp_namespace.drop_view(plan)?;
1312 let registry = self.namespace_registry();
1313 registry
1314 .write()
1315 .expect("namespace registry poisoned")
1316 .unregister_table(&canonical_view);
1317 Ok(())
1318 }
1319 INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("DROP VIEW")),
1320 PERSISTENT_NAMESPACE_ID => {
1321 let persistent_namespace = self.persistent_namespace();
1322 persistent_namespace.drop_view(plan)
1323 }
1324 other => Err(Error::InvalidArgumentError(format!(
1325 "Unknown storage namespace '{}'",
1326 other
1327 ))),
1328 }
1329 }
1330
1331 fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
1332 if self.has_active_transaction() {
1333 return Err(Error::InvalidArgumentError(
1334 "ALTER TABLE RENAME is not supported inside an active transaction".into(),
1335 ));
1336 }
1337
1338 let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
1339 let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
1340 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1341
1342 match namespace_id.as_str() {
1343 TEMPORARY_NAMESPACE_ID => {
1344 let temp_namespace = self
1345 .temporary_namespace()
1346 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1347 match temp_namespace.rename_table(plan.clone()) {
1348 Ok(()) => {
1349 let namespace_id = temp_namespace.namespace_id().to_string();
1350 let registry = self.namespace_registry();
1351 let mut registry = registry.write().expect("namespace registry poisoned");
1352 registry.unregister_table(&canonical_table);
1353 registry.register_table(&namespace_id, new_canonical);
1354 Ok(())
1355 }
1356 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1357 Err(err) => Err(err),
1358 }
1359 }
1360 INFORMATION_SCHEMA_NAMESPACE_ID => {
1361 Err(information_schema_read_only_error("RENAME TABLE"))
1362 }
1363 PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1364 Ok(()) => Ok(()),
1365 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1366 Err(err) => Err(err),
1367 },
1368 other => Err(Error::InvalidArgumentError(format!(
1369 "Unknown storage namespace '{}'",
1370 other
1371 ))),
1372 }
1373 }
1374
1375 fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1376 let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1377 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1378
1379 match namespace_id.as_str() {
1380 TEMPORARY_NAMESPACE_ID => {
1381 let temp_namespace = self
1382 .temporary_namespace()
1383 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1384
1385 let context = temp_namespace.context();
1386 let catalog_service = &context.catalog_service;
1387 let view = match catalog_service.table_view(&canonical_table) {
1388 Ok(view) => view,
1389 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1390 return Ok(RuntimeStatementResult::NoOp);
1391 }
1392 Err(err) => return Err(err),
1393 };
1394 let table_id = view
1395 .table_meta
1396 .as_ref()
1397 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1398 .table_id;
1399
1400 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1401
1402 Ok(temp_namespace.alter_table(plan)?)
1403 }
1404 INFORMATION_SCHEMA_NAMESPACE_ID => {
1405 Err(information_schema_read_only_error("ALTER TABLE"))
1406 }
1407 PERSISTENT_NAMESPACE_ID => {
1408 let persistent = self.persistent_namespace();
1409 let context = persistent.context();
1410 let catalog_service = &context.catalog_service;
1411 let view = match catalog_service.table_view(&canonical_table) {
1412 Ok(view) => view,
1413 Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1414 return Ok(RuntimeStatementResult::NoOp);
1415 }
1416 Err(err) => return Err(err),
1417 };
1418 let table_id = view
1419 .table_meta
1420 .as_ref()
1421 .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1422 .table_id;
1423
1424 validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1425
1426 self.run_autocommit_alter_table(plan)
1427 }
1428 other => Err(Error::InvalidArgumentError(format!(
1429 "Unknown storage namespace '{}'",
1430 other
1431 ))),
1432 }
1433 }
1434
1435 fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1436 if plan.columns.is_empty() {
1437 return Err(Error::InvalidArgumentError(
1438 "CREATE INDEX requires at least one column".into(),
1439 ));
1440 }
1441
1442 let (_, canonical_table) = canonical_table_name(&plan.table)?;
1443 let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1444
1445 match namespace_id.as_str() {
1446 TEMPORARY_NAMESPACE_ID => {
1447 let temp_namespace = self
1448 .temporary_namespace()
1449 .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1450 Ok(temp_namespace.create_index(plan)?)
1451 }
1452 INFORMATION_SCHEMA_NAMESPACE_ID => {
1453 Err(information_schema_read_only_error("CREATE INDEX"))
1454 }
1455 PERSISTENT_NAMESPACE_ID => {
1456 if self.has_active_transaction() {
1457 return Err(Error::InvalidArgumentError(
1458 "CREATE INDEX is not supported inside an active transaction".into(),
1459 ));
1460 }
1461
1462 self.run_autocommit_create_index(plan)
1463 }
1464 other => Err(Error::InvalidArgumentError(format!(
1465 "Unknown storage namespace '{}'",
1466 other
1467 ))),
1468 }
1469 }
1470
1471 fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1472 if self.has_active_transaction() {
1473 return Err(Error::InvalidArgumentError(
1474 "DROP INDEX is not supported inside an active transaction".into(),
1475 ));
1476 }
1477
1478 let mut dropped = false;
1479
1480 match self.run_autocommit_drop_index(plan.clone()) {
1481 Ok(Some(_)) => {
1482 dropped = true;
1483 }
1484 Ok(None) => {}
1485 Err(err) => {
1486 if !super::is_index_not_found_error(&err) {
1487 return Err(err);
1488 }
1489 }
1490 }
1491
1492 if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1493 match temp_namespace.drop_index(plan.clone()) {
1494 Ok(Some(_)) => {
1495 dropped = true;
1496 }
1497 Ok(None) => {}
1498 Err(err) => {
1499 if !super::is_index_not_found_error(&err) {
1500 return Err(err);
1501 }
1502 }
1503 }
1504 }
1505
1506 if dropped || plan.if_exists {
1507 Ok(RuntimeStatementResult::NoOp)
1508 } else {
1509 Err(Error::CatalogError(format!(
1510 "Index '{}' does not exist",
1511 plan.name
1512 )))
1513 }
1514 }
1515}