llkv_runtime/
runtime_session.rs

1// TODO: Implement a common trait (similar to CatalogDdl) for runtime sessions and llkv-transaction sessions
2
3use std::collections::HashMap;
4use std::sync::{Arc, Mutex, OnceLock, RwLock, Weak};
5
6use arrow::record_batch::RecordBatch;
7use llkv_result::{Error, Result};
8use llkv_storage::pager::{BoxedPager, MemPager};
9use llkv_table::{
10    ConstraintEnforcementMode, INFORMATION_SCHEMA_TABLE_ID_START, SingleColumnIndexDescriptor,
11    TEMPORARY_TABLE_ID_START, canonical_table_name, validate_alter_table_operation,
12};
13
14use crate::{
15    AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource,
16    CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan, DropViewPlan, InsertPlan,
17    InsertSource, PlanColumnSpec, PlanOperation, PlanValue, RenameTablePlan, RuntimeContext,
18    RuntimeStatementResult, RuntimeTransactionContext, SelectExecution, SelectPlan,
19    SelectProjection, TransactionContext, TransactionKind, TransactionResult, TransactionSession,
20    UpdatePlan, information_schema::refresh_information_schema,
21};
22use crate::{
23    INFORMATION_SCHEMA_NAMESPACE_ID, PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace,
24    RuntimeNamespaceId, RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry,
25    TEMPORARY_NAMESPACE_ID, TemporaryRuntimeNamespace,
26};
27use llkv_plan::TruncatePlan;
28
29type StatementResult = RuntimeStatementResult<BoxedPager>;
30type TxnResult = TransactionResult<BoxedPager>;
31type BaseTxnContext = RuntimeTransactionContext<BoxedPager>;
32
33static INFORMATION_SCHEMA_NAMESPACE_POOL: OnceLock<
34    Mutex<HashMap<usize, Weak<TemporaryRuntimeNamespace>>>,
35> = OnceLock::new();
36
37fn information_schema_namespace_pool()
38-> &'static Mutex<HashMap<usize, Weak<TemporaryRuntimeNamespace>>> {
39    INFORMATION_SCHEMA_NAMESPACE_POOL.get_or_init(|| Mutex::new(HashMap::new()))
40}
41
42fn information_schema_read_only_error(operation: &str) -> Error {
43    Error::CatalogError(format!(
44        "information_schema is read-only: {} operations are not supported",
45        operation
46    ))
47}
48
49pub(crate) struct SessionNamespaces {
50    persistent: Arc<PersistentRuntimeNamespace>,
51    temporary: Option<Arc<TemporaryRuntimeNamespace>>,
52    information_schema: Arc<TemporaryRuntimeNamespace>,
53    registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
54}
55
56impl SessionNamespaces {
57    pub(crate) fn new(base_context: Arc<RuntimeContext<BoxedPager>>) -> Self {
58        let mut registry =
59            RuntimeStorageNamespaceRegistry::new(PERSISTENT_NAMESPACE_ID.to_string());
60
61        // Create information_schema namespace FIRST so we can set it as fallback for persistent
62        let information_schema = {
63            let key = Arc::as_ptr(&base_context) as usize;
64            let namespace = {
65                let mut pool = information_schema_namespace_pool()
66                    .lock()
67                    .expect("information_schema namespace pool poisoned");
68                if let Some(existing) = pool.get(&key).and_then(|weak| weak.upgrade()) {
69                    existing
70                } else {
71                    let shared_catalog = base_context.table_catalog();
72                    let mem_pager = Arc::new(MemPager::default());
73                    let boxed_pager = Arc::new(BoxedPager::from_arc(mem_pager));
74                    let context = Arc::new(RuntimeContext::new_with_catalog(
75                        boxed_pager,
76                        Arc::clone(&shared_catalog),
77                    ));
78                    context
79                        .ensure_next_table_id_at_least(INFORMATION_SCHEMA_TABLE_ID_START)
80                        .expect("failed to seed information_schema table id counter");
81
82                    let namespace = Arc::new(TemporaryRuntimeNamespace::new(
83                        INFORMATION_SCHEMA_NAMESPACE_ID.to_string(),
84                        context,
85                    ));
86                    pool.insert(key, Arc::downgrade(&namespace));
87                    namespace
88                }
89            };
90            registry.register_namespace(
91                Arc::clone(&namespace),
92                vec![INFORMATION_SCHEMA_NAMESPACE_ID.to_string()],
93                false,
94            );
95            namespace
96        };
97
98        // Set information_schema as fallback for base_context so persistent namespace can resolve it
99        base_context.set_fallback_lookup(information_schema.context());
100
101        // Create persistent namespace using the base_context (now with fallback configured)
102        let persistent = Arc::new(PersistentRuntimeNamespace::new(
103            PERSISTENT_NAMESPACE_ID.to_string(),
104            Arc::clone(&base_context),
105        ));
106
107        registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
108
109        let information_schema = {
110            let key = Arc::as_ptr(&base_context) as usize;
111            let namespace = {
112                let mut pool = information_schema_namespace_pool()
113                    .lock()
114                    .expect("information_schema namespace pool poisoned");
115                if let Some(existing) = pool.get(&key).and_then(|weak| weak.upgrade()) {
116                    existing
117                } else {
118                    let shared_catalog = base_context.table_catalog();
119                    let mem_pager = Arc::new(MemPager::default());
120                    let boxed_pager = Arc::new(BoxedPager::from_arc(mem_pager));
121                    let context = Arc::new(RuntimeContext::new_with_catalog(
122                        boxed_pager,
123                        Arc::clone(&shared_catalog),
124                    ));
125                    context
126                        .ensure_next_table_id_at_least(INFORMATION_SCHEMA_TABLE_ID_START)
127                        .expect("failed to seed information_schema table id counter");
128
129                    let namespace = Arc::new(TemporaryRuntimeNamespace::new(
130                        INFORMATION_SCHEMA_NAMESPACE_ID.to_string(),
131                        context,
132                    ));
133                    pool.insert(key, Arc::downgrade(&namespace));
134                    namespace
135                }
136            };
137            registry.register_namespace(
138                Arc::clone(&namespace),
139                vec![INFORMATION_SCHEMA_NAMESPACE_ID.to_string()],
140                false,
141            );
142            namespace
143        };
144
145        let temporary = {
146            // ARCHITECTURAL DECISION: Multi-pager arena via fallback lookup
147            //
148            // Temporary tables use an isolated MemPager-backed ColumnStore while sharing the
149            // persistent namespace's catalog. When a temporary object references persistent
150            // data (e.g., CREATE TEMP VIEW ... FROM main.t1), the temporary context forwards
151            // lookups to the persistent context via fallback. This keeps temporary storage
152            // purely in-memory while preserving cross-namespace visibility.
153            //
154            // Implementation steps:
155            // 1. Wrap a fresh MemPager in BoxedPager so it uses the same runtime pager type
156            //    as the persistent context (BoxedPager).
157            // 2. Reuse the persistent catalog handle so both namespaces observe identical
158            //    table metadata.
159            // 3. Install the persistent context as the fallback lookup target so cache misses
160            //    in the temporary namespace transparently resolve to persistent tables.
161            let shared_catalog = base_context.table_catalog();
162            let temp_mem_pager = Arc::new(MemPager::default());
163            let temp_boxed_pager = Arc::new(BoxedPager::from_arc(temp_mem_pager));
164            let temp_context = Arc::new(
165                RuntimeContext::new_with_catalog(temp_boxed_pager, Arc::clone(&shared_catalog))
166                    .with_fallback_lookup(Arc::clone(&base_context)),
167            );
168
169            temp_context
170                .ensure_next_table_id_at_least(TEMPORARY_TABLE_ID_START)
171                .expect("failed to seed temporary namespace table id counter");
172
173            let namespace = Arc::new(TemporaryRuntimeNamespace::new(
174                TEMPORARY_NAMESPACE_ID.to_string(),
175                temp_context,
176            ));
177            registry.register_namespace(
178                Arc::clone(&namespace),
179                vec![TEMPORARY_NAMESPACE_ID.to_string()],
180                true,
181            );
182            namespace
183        };
184
185        Self {
186            persistent,
187            temporary: Some(temporary),
188            information_schema,
189            registry: Arc::new(RwLock::new(registry)),
190        }
191    }
192
193    pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace> {
194        Arc::clone(&self.persistent)
195    }
196
197    pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
198        self.temporary.as_ref().map(Arc::clone)
199    }
200
201    pub(crate) fn information_schema(&self) -> Arc<TemporaryRuntimeNamespace> {
202        Arc::clone(&self.information_schema)
203    }
204
205    pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
206        Arc::clone(&self.registry)
207    }
208}
209
210impl Drop for SessionNamespaces {
211    fn drop(&mut self) {
212        if let Some(temp) = &self.temporary {
213            let namespace_id = temp.namespace_id().to_string();
214            let canonical_names = {
215                let mut registry = self.registry.write().expect("namespace registry poisoned");
216                registry.drain_namespace_tables(&namespace_id)
217            };
218            temp.clear_tables(canonical_names);
219        }
220    }
221}
222
223/// A session for executing operations with optional transaction support.
224///
225/// This is a high-level wrapper around the transaction machinery that provides
226/// a clean API for users. Operations can be executed directly or within a transaction.
227pub struct RuntimeSession {
228    // Transaction session using BoxedPager for base storage and MemPager for staging tables
229    inner: TransactionSession<
230        RuntimeTransactionContext<BoxedPager>,
231        RuntimeTransactionContext<MemPager>,
232    >,
233    namespaces: Arc<SessionNamespaces>,
234    constraint_mode: Arc<RwLock<ConstraintEnforcementMode>>,
235}
236
237impl RuntimeSession {
238    pub(crate) fn from_parts(
239        inner: TransactionSession<
240            RuntimeTransactionContext<BoxedPager>,
241            RuntimeTransactionContext<MemPager>,
242        >,
243        namespaces: Arc<SessionNamespaces>,
244    ) -> Self {
245        let session = Self {
246            inner,
247            namespaces,
248            constraint_mode: Arc::new(RwLock::new(ConstraintEnforcementMode::Immediate)),
249        };
250        session.apply_constraint_mode_to_base(ConstraintEnforcementMode::Immediate);
251        session
252    }
253
254    /// Clone this session (reuses the same underlying TransactionSession).
255    /// This is necessary to maintain transaction state across Engine clones.
256    pub(crate) fn clone_session(&self) -> Self {
257        let session = Self {
258            inner: self.inner.clone_session(),
259            namespaces: self.namespaces.clone(),
260            constraint_mode: Arc::clone(&self.constraint_mode),
261        };
262        let mode = session.constraint_enforcement_mode();
263        session.apply_constraint_mode_to_base(mode);
264        session
265    }
266
267    fn new_temp_tx_context(
268        &self,
269        context: Arc<RuntimeContext<BoxedPager>>,
270    ) -> RuntimeTransactionContext<BoxedPager> {
271        let tx = RuntimeTransactionContext::new(context);
272        tx.set_constraint_mode(self.constraint_enforcement_mode());
273        tx
274    }
275
276    pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
277        self.namespaces.registry()
278    }
279
280    fn apply_constraint_mode_to_base(&self, mode: ConstraintEnforcementMode) {
281        self.inner.context().set_constraint_mode(mode);
282    }
283
284    pub fn set_constraint_enforcement_mode(&self, mode: ConstraintEnforcementMode) {
285        let previous = {
286            let mut guard = self
287                .constraint_mode
288                .write()
289                .expect("constraint mode lock poisoned");
290            if *guard == mode {
291                None
292            } else {
293                let old = *guard;
294                *guard = mode;
295                Some(old)
296            }
297        };
298        if let Some(old) = previous {
299            tracing::warn!(
300                "Session constraint enforcement mode changed from {:?} to {:?}",
301                old,
302                mode
303            );
304        }
305        self.apply_constraint_mode_to_base(mode);
306    }
307
308    pub fn constraint_enforcement_mode(&self) -> ConstraintEnforcementMode {
309        *self
310            .constraint_mode
311            .read()
312            .expect("constraint mode lock poisoned")
313    }
314
315    fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
316        self.namespace_registry()
317            .read()
318            .expect("namespace registry poisoned")
319            .namespace_for_table(canonical)
320    }
321
322    fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
323        if plan.tables.len() != 1 {
324            return None;
325        }
326
327        let qualified = plan.tables[0].qualified_name();
328        let (_, canonical) = canonical_table_name(&qualified).ok()?;
329        Some(self.resolve_namespace_for_table(&canonical))
330    }
331
332    fn select_from_namespace(
333        &self,
334        namespace: Arc<TemporaryRuntimeNamespace>,
335        plan: SelectPlan,
336    ) -> Result<StatementResult> {
337        let table_name = if plan.tables.len() == 1 {
338            plan.tables[0].qualified_name()
339        } else {
340            String::new()
341        };
342
343        let context = namespace.context();
344        let temp_tx_context = self.new_temp_tx_context(context);
345        let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
346        let schema = execution.schema();
347        let batches = execution.collect()?;
348
349        let combined = if batches.is_empty() {
350            RecordBatch::new_empty(Arc::clone(&schema))
351        } else if batches.len() == 1 {
352            batches.into_iter().next().unwrap()
353        } else {
354            let refs: Vec<&RecordBatch> = batches.iter().collect();
355            arrow::compute::concat_batches(&schema, refs)?
356        };
357
358        let execution =
359            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
360
361        Ok(RuntimeStatementResult::Select {
362            execution: Box::new(execution),
363            table_name,
364            schema,
365        })
366    }
367
368    fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace> {
369        self.namespaces.persistent()
370    }
371
372    #[allow(dead_code)]
373    fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
374        self.namespaces.temporary()
375    }
376
377    fn information_schema_namespace(&self) -> Arc<TemporaryRuntimeNamespace> {
378        self.namespaces.information_schema()
379    }
380
381    fn base_transaction_context(&self) -> Arc<BaseTxnContext> {
382        let ctx = Arc::clone(self.inner.context());
383        ctx.set_constraint_mode(self.constraint_enforcement_mode());
384        ctx
385    }
386
387    fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
388    where
389        F: FnOnce(&BaseTxnContext) -> Result<T>,
390    {
391        let context = self.base_transaction_context();
392        let default_snapshot = context.ctx().default_snapshot();
393        TransactionContext::set_snapshot(&*context, default_snapshot);
394        f(context.as_ref())
395    }
396
397    fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TxnResult> {
398        self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
399    }
400
401    fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TxnResult> {
402        self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
403    }
404
405    fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TxnResult> {
406        self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
407    }
408
409    fn run_autocommit_truncate(&self, plan: TruncatePlan) -> Result<TxnResult> {
410        self.with_autocommit_transaction_context(|ctx| TransactionContext::truncate(ctx, plan))
411    }
412
413    fn run_autocommit_create_table(&self, plan: CreateTablePlan) -> Result<StatementResult> {
414        let result =
415            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
416        match result {
417            TransactionResult::CreateTable { table_name } => {
418                Ok(RuntimeStatementResult::CreateTable { table_name })
419            }
420            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
421            _ => Err(Error::Internal(
422                "unexpected transaction result for CREATE TABLE".into(),
423            )),
424        }
425    }
426
427    fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<StatementResult> {
428        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
429        Ok(RuntimeStatementResult::NoOp)
430    }
431
432    fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
433        self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
434    }
435
436    fn run_autocommit_alter_table(&self, plan: AlterTablePlan) -> Result<StatementResult> {
437        let result =
438            self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
439        match result {
440            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
441            TransactionResult::CreateTable { table_name } => {
442                Ok(RuntimeStatementResult::CreateTable { table_name })
443            }
444            TransactionResult::CreateIndex {
445                table_name,
446                index_name,
447            } => Ok(RuntimeStatementResult::CreateIndex {
448                table_name,
449                index_name,
450            }),
451            _ => Err(Error::Internal(
452                "unexpected transaction result for ALTER TABLE".into(),
453            )),
454        }
455    }
456
457    fn run_autocommit_create_index(&self, plan: CreateIndexPlan) -> Result<StatementResult> {
458        let result =
459            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
460        match result {
461            TransactionResult::CreateIndex {
462                table_name,
463                index_name,
464            } => Ok(RuntimeStatementResult::CreateIndex {
465                table_name,
466                index_name,
467            }),
468            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
469            _ => Err(Error::Internal(
470                "unexpected transaction result for CREATE INDEX".into(),
471            )),
472        }
473    }
474
475    fn run_autocommit_drop_index(
476        &self,
477        plan: DropIndexPlan,
478    ) -> Result<Option<SingleColumnIndexDescriptor>> {
479        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
480    }
481
482    /// Begin a transaction in this session.
483    /// Creates an empty staging context for new tables created within the transaction.
484    /// Existing tables are accessed via MVCC visibility filtering - NO data copying occurs.
485    pub fn begin_transaction(&self) -> Result<StatementResult> {
486        let staging_pager = Arc::new(MemPager::default());
487        tracing::trace!(
488            "BEGIN_TRANSACTION: Created staging pager at {:p}",
489            &*staging_pager
490        );
491        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
492
493        // Staging context is EMPTY - used only for tables created within the transaction.
494        // Existing tables are read from base context with MVCC visibility filtering.
495        // No data copying occurs at BEGIN - this is pure MVCC.
496
497        let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
498        staging_wrapper.set_constraint_mode(self.constraint_enforcement_mode());
499
500        self.inner.begin_transaction(staging_wrapper)?;
501        Ok(RuntimeStatementResult::Transaction {
502            kind: TransactionKind::Begin,
503        })
504    }
505
506    /// Mark the current transaction as aborted due to an error.
507    /// This should be called when any error occurs during a transaction.
508    pub fn abort_transaction(&self) {
509        self.inner.abort_transaction();
510    }
511
512    /// Check if this session has an active transaction.
513    pub fn has_active_transaction(&self) -> bool {
514        let result = self.inner.has_active_transaction();
515        tracing::trace!("SESSION: has_active_transaction() = {}", result);
516        result
517    }
518
519    /// Check if the current transaction has been aborted due to an error.
520    pub fn is_aborted(&self) -> bool {
521        self.inner.is_aborted()
522    }
523
524    /// Check if a table was created in the current active transaction.
525    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
526        self.inner.is_table_created_in_transaction(table_name)
527    }
528
529    /// Get column specifications for a table created in the current transaction.
530    /// Returns `None` if there's no active transaction or the table wasn't created in it.
531    pub fn table_column_specs_from_transaction(
532        &self,
533        table_name: &str,
534    ) -> Option<Vec<PlanColumnSpec>> {
535        self.inner.table_column_specs_from_transaction(table_name)
536    }
537
538    /// Get tables that reference the given table via foreign keys created in the current transaction.
539    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
540    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
541        self.inner
542            .tables_referencing_in_transaction(referenced_table)
543    }
544
545    /// Commit the current transaction and apply changes to the base context.
546    /// If the transaction was aborted, this acts as a ROLLBACK instead.
547    pub fn commit_transaction(&self) -> Result<StatementResult> {
548        tracing::trace!("Session::commit_transaction called");
549        let (tx_result, operations) = self.inner.commit_transaction()?;
550        tracing::trace!(
551            "Session::commit_transaction got {} operations",
552            operations.len()
553        );
554
555        if !operations.is_empty() {
556            let dropped_tables = self
557                .inner
558                .context()
559                .ctx()
560                .dropped_tables
561                .read()
562                .unwrap()
563                .clone();
564            if !dropped_tables.is_empty() {
565                for operation in &operations {
566                    let table_name_opt = match operation {
567                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
568                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
569                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
570                        _ => None,
571                    };
572                    if let Some(table_name) = table_name_opt {
573                        let (_, canonical) = canonical_table_name(table_name)?;
574                        if dropped_tables.contains(&canonical) {
575                            self.abort_transaction();
576                            return Err(Error::TransactionContextError(
577                                "another transaction has dropped this table".into(),
578                            ));
579                        }
580                    }
581                }
582            }
583        }
584
585        // Extract the transaction kind from the transaction module's result
586        let kind = match tx_result {
587            TransactionResult::Transaction { kind } => kind,
588            _ => {
589                return Err(Error::Internal(
590                    "commit_transaction returned non-transaction result".into(),
591                ));
592            }
593        };
594        tracing::trace!("Session::commit_transaction kind={:?}", kind);
595
596        // Only replay operations if there are any (empty if transaction was aborted)
597        for operation in operations {
598            match operation {
599                PlanOperation::CreateTable(plan) => {
600                    TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
601                }
602                PlanOperation::DropTable(plan) => {
603                    TransactionContext::drop_table(&**self.inner.context(), plan)?;
604                }
605                PlanOperation::Insert(plan) => {
606                    TransactionContext::insert(&**self.inner.context(), plan)?;
607                }
608                PlanOperation::Update(plan) => {
609                    TransactionContext::update(&**self.inner.context(), plan)?;
610                }
611                PlanOperation::Delete(plan) => {
612                    TransactionContext::delete(&**self.inner.context(), plan)?;
613                }
614                _ => {}
615            }
616        }
617
618        // Reset the base context snapshot to the default auto-commit view now that
619        // the transaction has been replayed onto the base tables.
620        let base_ctx = self.inner.context();
621        let default_snapshot = base_ctx.ctx().default_snapshot();
622        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
623
624        // Persist the next_txn_id to the catalog after a successful commit
625        if matches!(kind, TransactionKind::Commit) {
626            let ctx = base_ctx.ctx();
627            let next_txn_id = ctx.txn_manager().current_next_txn_id();
628            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
629                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
630            }
631        }
632
633        // Return a StatementResult with the correct kind (Commit or Rollback)
634        Ok(RuntimeStatementResult::Transaction { kind })
635    }
636
637    /// Rollback the current transaction, discarding all changes.
638    pub fn rollback_transaction(&self) -> Result<StatementResult> {
639        self.inner.rollback_transaction()?;
640        let base_ctx = self.inner.context();
641        let default_snapshot = base_ctx.ctx().default_snapshot();
642        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
643        Ok(RuntimeStatementResult::Transaction {
644            kind: TransactionKind::Rollback,
645        })
646    }
647
648    fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
649        // Only materialize if source is a SELECT query
650        // If source is already Batches, leave it alone
651        if matches!(plan.source, Some(CreateTableSource::Select { .. }))
652            && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
653        {
654            let select_result = self.execute_select_plan(*select_plan)?;
655            let (schema, batches) = match select_result {
656                RuntimeStatementResult::Select {
657                    schema, execution, ..
658                } => {
659                    let batches = execution.collect()?;
660                    (schema, batches)
661                }
662                _ => {
663                    return Err(Error::Internal(
664                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
665                    ));
666                }
667            };
668            plan.source = Some(CreateTableSource::Batches { schema, batches });
669        }
670        Ok(plan)
671    }
672
673    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
674        let InsertPlan {
675            table,
676            columns,
677            source,
678            on_conflict,
679        } = plan;
680
681        match source {
682            InsertSource::Rows(rows) => {
683                let count = rows.len();
684                Ok((
685                    InsertPlan {
686                        table,
687                        columns,
688                        source: InsertSource::Rows(rows),
689                        on_conflict,
690                    },
691                    count,
692                ))
693            }
694            InsertSource::Batches(batches) => {
695                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
696                Ok((
697                    InsertPlan {
698                        table,
699                        columns,
700                        source: InsertSource::Batches(batches),
701                        on_conflict,
702                    },
703                    count,
704                ))
705            }
706            InsertSource::Select { plan: select_plan } => {
707                let select_result = self.execute_select_plan(*select_plan)?;
708                let rows = match select_result {
709                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
710                    _ => {
711                        return Err(Error::Internal(
712                            "expected Select result when executing INSERT ... SELECT".into(),
713                        ));
714                    }
715                };
716                let count = rows.len();
717                Ok((
718                    InsertPlan {
719                        table,
720                        columns,
721                        source: InsertSource::Rows(rows),
722                        on_conflict,
723                    },
724                    count,
725                ))
726            }
727        }
728    }
729
730    /// Insert rows (outside or inside transaction).
731    pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<StatementResult> {
732        tracing::trace!("Session::insert called for table={}", plan.table);
733        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
734        let table_name = plan.table.clone();
735        let (_, canonical_table) = canonical_table_name(&plan.table)?;
736        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
737
738        match namespace_id.as_str() {
739            TEMPORARY_NAMESPACE_ID => {
740                let temp_namespace = self
741                    .temporary_namespace()
742                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
743                let temp_context = temp_namespace.context();
744                let temp_tx_context = self.new_temp_tx_context(temp_context);
745                match TransactionContext::insert(&temp_tx_context, plan)? {
746                    TransactionResult::Insert { .. } => {}
747                    _ => {
748                        return Err(Error::Internal(
749                            "unexpected transaction result for temporary INSERT".into(),
750                        ));
751                    }
752                }
753                Ok(RuntimeStatementResult::Insert {
754                    rows_inserted,
755                    table_name,
756                })
757            }
758            INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("INSERT")),
759            PERSISTENT_NAMESPACE_ID => {
760                if self.has_active_transaction() {
761                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
762                        Ok(_) => {
763                            tracing::trace!("Session::insert succeeded for table={}", table_name);
764                            Ok(RuntimeStatementResult::Insert {
765                                rows_inserted,
766                                table_name,
767                            })
768                        }
769                        Err(e) => {
770                            tracing::trace!(
771                                "Session::insert failed for table={}, error={:?}",
772                                table_name,
773                                e
774                            );
775                            if matches!(e, Error::ConstraintError(_)) {
776                                tracing::trace!("Transaction is_aborted=true");
777                                self.abort_transaction();
778                            }
779                            Err(e)
780                        }
781                    }
782                } else {
783                    let result = self.run_autocommit_insert(plan)?;
784                    if !matches!(result, TransactionResult::Insert { .. }) {
785                        return Err(Error::Internal(
786                            "unexpected transaction result for INSERT operation".into(),
787                        ));
788                    }
789                    Ok(RuntimeStatementResult::Insert {
790                        rows_inserted,
791                        table_name,
792                    })
793                }
794            }
795            other => Err(Error::InvalidArgumentError(format!(
796                "Unknown storage namespace '{}'",
797                other
798            ))),
799        }
800    }
801
802    /// Select rows (outside or inside transaction).
803    pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<StatementResult> {
804        if let Some(namespace_id) = self.namespace_for_select_plan(&plan) {
805            if namespace_id == TEMPORARY_NAMESPACE_ID {
806                let namespace = self
807                    .temporary_namespace()
808                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
809                return self.select_from_namespace(namespace, plan);
810            }
811            if namespace_id == INFORMATION_SCHEMA_NAMESPACE_ID {
812                let namespace = self.information_schema_namespace();
813                return self.select_from_namespace(namespace, plan);
814            }
815        }
816
817        if self.has_active_transaction() {
818            let tx_result = match self
819                .inner
820                .execute_operation(PlanOperation::Select(Box::new(plan.clone())))
821            {
822                Ok(result) => result,
823                Err(e) => {
824                    // Only abort transaction on specific errors (constraint violations, etc.)
825                    // Don't abort on catalog errors (table doesn't exist) or similar
826                    if matches!(e, Error::ConstraintError(_)) {
827                        self.abort_transaction();
828                    }
829                    return Err(e);
830                }
831            };
832            match tx_result {
833                TransactionResult::Select {
834                    table_name,
835                    schema,
836                    execution: staging_execution,
837                } => {
838                    // Convert from staging (MemPager) execution to base pager execution
839                    // by collecting batches and rebuilding
840                    let batches = staging_execution.collect().unwrap_or_default();
841                    let combined = if batches.is_empty() {
842                        RecordBatch::new_empty(Arc::clone(&schema))
843                    } else if batches.len() == 1 {
844                        batches.into_iter().next().unwrap()
845                    } else {
846                        let refs: Vec<&RecordBatch> = batches.iter().collect();
847                        arrow::compute::concat_batches(&schema, refs)?
848                    };
849
850                    let execution = SelectExecution::from_batch(
851                        table_name.clone(),
852                        Arc::clone(&schema),
853                        combined,
854                    );
855
856                    Ok(RuntimeStatementResult::Select {
857                        execution: Box::new(execution),
858                        table_name,
859                        schema,
860                    })
861                }
862                _ => Err(Error::Internal("expected Select result".into())),
863            }
864        } else {
865            // Call via TransactionContext trait
866            let table_name = if plan.tables.len() == 1 {
867                plan.tables[0].qualified_name()
868            } else {
869                String::new()
870            };
871            let execution = self.with_autocommit_transaction_context(|ctx| {
872                TransactionContext::execute_select(ctx, plan)
873            })?;
874            let schema = execution.schema();
875            Ok(RuntimeStatementResult::Select {
876                execution: Box::new(execution),
877                table_name,
878                schema,
879            })
880        }
881    }
882
883    /// Convenience helper to fetch all rows from a table within this session.
884    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
885        let plan =
886            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
887        match self.execute_select_plan(plan)? {
888            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
889            other => Err(Error::Internal(format!(
890                "expected Select result when reading table '{}', got {:?}",
891                table, other
892            ))),
893        }
894    }
895
896    /// Rebuilds `information_schema.*` tables inside the in-memory namespace.
897    ///
898    /// This forwards to the runtime’s refresh helper, which issues in-memory CTAS
899    /// batches derived from catalog metadata. No user tables are scanned, no data
900    /// reaches the persistent pager heap, and only the MemPager-backed
901    /// information_schema objects are dropped and recreated.
902    pub fn refresh_information_schema(&self) -> Result<()> {
903        let persistent = self.persistent_namespace();
904        let info_namespace = self.information_schema_namespace();
905        let registry = self.namespace_registry();
906        refresh_information_schema(
907            &persistent.context(),
908            &info_namespace.context(),
909            &registry,
910            info_namespace.namespace_id(),
911        )
912    }
913
914    pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<StatementResult> {
915        let (_, canonical_table) = canonical_table_name(&plan.table)?;
916        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
917
918        match namespace_id.as_str() {
919            TEMPORARY_NAMESPACE_ID => {
920                let temp_namespace = self
921                    .temporary_namespace()
922                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
923                let temp_context = temp_namespace.context();
924                let table_name = plan.table.clone();
925                let temp_tx_context = self.new_temp_tx_context(temp_context);
926                match TransactionContext::update(&temp_tx_context, plan)? {
927                    TransactionResult::Update { rows_updated, .. } => {
928                        Ok(RuntimeStatementResult::Update {
929                            rows_updated,
930                            table_name,
931                        })
932                    }
933                    _ => Err(Error::Internal(
934                        "unexpected transaction result for temporary UPDATE".into(),
935                    )),
936                }
937            }
938            INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("UPDATE")),
939            PERSISTENT_NAMESPACE_ID => {
940                if self.has_active_transaction() {
941                    let table_name = plan.table.clone();
942                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
943                        Ok(result) => result,
944                        Err(e) => {
945                            // If an error occurs during a transaction, abort it
946                            self.abort_transaction();
947                            return Err(e);
948                        }
949                    };
950                    match result {
951                        TransactionResult::Update {
952                            rows_matched: _,
953                            rows_updated,
954                        } => Ok(RuntimeStatementResult::Update {
955                            rows_updated,
956                            table_name,
957                        }),
958                        _ => Err(Error::Internal("expected Update result".into())),
959                    }
960                } else {
961                    let table_name = plan.table.clone();
962                    let result = self.run_autocommit_update(plan)?;
963                    match result {
964                        TransactionResult::Update {
965                            rows_matched: _,
966                            rows_updated,
967                        } => Ok(RuntimeStatementResult::Update {
968                            rows_updated,
969                            table_name,
970                        }),
971                        _ => Err(Error::Internal("expected Update result".into())),
972                    }
973                }
974            }
975            other => Err(Error::InvalidArgumentError(format!(
976                "Unknown storage namespace '{}'",
977                other
978            ))),
979        }
980    }
981
982    pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<StatementResult> {
983        let (_, canonical_table) = canonical_table_name(&plan.table)?;
984        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
985
986        match namespace_id.as_str() {
987            TEMPORARY_NAMESPACE_ID => {
988                let temp_namespace = self
989                    .temporary_namespace()
990                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
991                let temp_context = temp_namespace.context();
992                let table_name = plan.table.clone();
993                let temp_tx_context = self.new_temp_tx_context(temp_context);
994                match TransactionContext::delete(&temp_tx_context, plan)? {
995                    TransactionResult::Delete { rows_deleted } => {
996                        Ok(RuntimeStatementResult::Delete {
997                            rows_deleted,
998                            table_name,
999                        })
1000                    }
1001                    _ => Err(Error::Internal(
1002                        "unexpected transaction result for temporary DELETE".into(),
1003                    )),
1004                }
1005            }
1006            INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("DELETE")),
1007            PERSISTENT_NAMESPACE_ID => {
1008                if self.has_active_transaction() {
1009                    let table_name = plan.table.clone();
1010                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
1011                        Ok(result) => result,
1012                        Err(e) => {
1013                            // If an error occurs during a transaction, abort it
1014                            self.abort_transaction();
1015                            return Err(e);
1016                        }
1017                    };
1018                    match result {
1019                        TransactionResult::Delete { rows_deleted } => {
1020                            Ok(RuntimeStatementResult::Delete {
1021                                rows_deleted,
1022                                table_name,
1023                            })
1024                        }
1025                        _ => Err(Error::Internal("expected Delete result".into())),
1026                    }
1027                } else {
1028                    let table_name = plan.table.clone();
1029                    let result = self.run_autocommit_delete(plan)?;
1030                    match result {
1031                        TransactionResult::Delete { rows_deleted } => {
1032                            Ok(RuntimeStatementResult::Delete {
1033                                rows_deleted,
1034                                table_name,
1035                            })
1036                        }
1037                        _ => Err(Error::Internal("expected Delete result".into())),
1038                    }
1039                }
1040            }
1041            other => Err(Error::InvalidArgumentError(format!(
1042                "Unknown storage namespace '{}'",
1043                other
1044            ))),
1045        }
1046    }
1047
1048    pub fn execute_truncate_plan(&self, plan: TruncatePlan) -> Result<StatementResult> {
1049        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1050        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1051
1052        match namespace_id.as_str() {
1053            TEMPORARY_NAMESPACE_ID => {
1054                let temp_namespace = self
1055                    .temporary_namespace()
1056                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1057                let temp_context = temp_namespace.context();
1058                let table_name = plan.table.clone();
1059                let temp_tx_context = self.new_temp_tx_context(temp_context);
1060                match TransactionContext::truncate(&temp_tx_context, plan)? {
1061                    TransactionResult::Delete { rows_deleted } => {
1062                        Ok(RuntimeStatementResult::Delete {
1063                            rows_deleted,
1064                            table_name,
1065                        })
1066                    }
1067                    _ => Err(Error::Internal(
1068                        "unexpected transaction result for temporary TRUNCATE".into(),
1069                    )),
1070                }
1071            }
1072            INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("TRUNCATE")),
1073            PERSISTENT_NAMESPACE_ID => {
1074                if self.has_active_transaction() {
1075                    let table_name = plan.table.clone();
1076                    let result = match self.inner.execute_operation(PlanOperation::Truncate(plan)) {
1077                        Ok(result) => result,
1078                        Err(e) => {
1079                            // If an error occurs during a transaction, abort it
1080                            self.abort_transaction();
1081                            return Err(e);
1082                        }
1083                    };
1084                    match result {
1085                        TransactionResult::Delete { rows_deleted } => {
1086                            Ok(RuntimeStatementResult::Delete {
1087                                rows_deleted,
1088                                table_name,
1089                            })
1090                        }
1091                        _ => Err(Error::Internal("expected Delete result".into())),
1092                    }
1093                } else {
1094                    let table_name = plan.table.clone();
1095                    let result = self.run_autocommit_truncate(plan)?;
1096                    match result {
1097                        TransactionResult::Delete { rows_deleted } => {
1098                            Ok(RuntimeStatementResult::Delete {
1099                                rows_deleted,
1100                                table_name,
1101                            })
1102                        }
1103                        _ => Err(Error::Internal("expected Delete result".into())),
1104                    }
1105                }
1106            }
1107            other => Err(Error::InvalidArgumentError(format!(
1108                "Unknown storage namespace '{}'",
1109                other
1110            ))),
1111        }
1112    }
1113}
1114
1115/// Implement [`CatalogDdl`] directly on the session so callers must import the trait
1116/// to perform schema mutations. This keeps all runtime DDL entry points aligned with
1117/// the shared contract used by contexts and storage namespaces.
1118impl CatalogDdl for RuntimeSession {
1119    type CreateTableOutput = StatementResult;
1120    type DropTableOutput = StatementResult;
1121    type RenameTableOutput = ();
1122    type AlterTableOutput = StatementResult;
1123    type CreateIndexOutput = StatementResult;
1124    type DropIndexOutput = StatementResult;
1125
1126    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
1127        let target_namespace = plan
1128            .namespace
1129            .clone()
1130            .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
1131            .to_ascii_lowercase();
1132
1133        let plan = self.materialize_ctas_plan(plan)?;
1134
1135        match target_namespace.as_str() {
1136            INFORMATION_SCHEMA_NAMESPACE_ID => {
1137                Err(information_schema_read_only_error("CREATE TABLE"))
1138            }
1139            TEMPORARY_NAMESPACE_ID => {
1140                let temp_namespace = self
1141                    .temporary_namespace()
1142                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1143                let (_, canonical) = canonical_table_name(&plan.name)?;
1144                let result = temp_namespace.create_table(plan)?;
1145                if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
1146                    let namespace_id = temp_namespace.namespace_id().to_string();
1147                    let registry = self.namespace_registry();
1148                    registry
1149                        .write()
1150                        .expect("namespace registry poisoned")
1151                        .register_table(&namespace_id, canonical);
1152                }
1153                Ok(result)
1154            }
1155            PERSISTENT_NAMESPACE_ID => {
1156                if self.has_active_transaction() {
1157                    match self
1158                        .inner
1159                        .execute_operation(PlanOperation::CreateTable(plan))
1160                    {
1161                        Ok(TransactionResult::CreateTable { table_name }) => {
1162                            Ok(RuntimeStatementResult::CreateTable { table_name })
1163                        }
1164                        Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
1165                        Ok(_) => Err(Error::Internal(
1166                            "expected CreateTable result during transactional CREATE TABLE".into(),
1167                        )),
1168                        Err(err) => {
1169                            self.abort_transaction();
1170                            Err(err)
1171                        }
1172                    }
1173                } else {
1174                    if self.inner.has_table_locked_by_other_session(&plan.name) {
1175                        return Err(Error::TransactionContextError(format!(
1176                            "table '{}' is locked by another active transaction",
1177                            plan.name
1178                        )));
1179                    }
1180                    self.run_autocommit_create_table(plan)
1181                }
1182            }
1183            other => Err(Error::InvalidArgumentError(format!(
1184                "Unknown storage namespace '{}'",
1185                other
1186            ))),
1187        }
1188    }
1189
1190    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
1191        let (_, canonical_table) = canonical_table_name(&plan.name)?;
1192        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1193
1194        match namespace_id.as_str() {
1195            TEMPORARY_NAMESPACE_ID => {
1196                let temp_namespace = self
1197                    .temporary_namespace()
1198                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1199                temp_namespace.drop_table(plan)?;
1200                let registry = self.namespace_registry();
1201                registry
1202                    .write()
1203                    .expect("namespace registry poisoned")
1204                    .unregister_table(&canonical_table);
1205                Ok(RuntimeStatementResult::NoOp)
1206            }
1207            INFORMATION_SCHEMA_NAMESPACE_ID => {
1208                Err(information_schema_read_only_error("DROP TABLE"))
1209            }
1210            PERSISTENT_NAMESPACE_ID => {
1211                if self.has_active_transaction() {
1212                    let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
1213                    if !referencing_tables.is_empty() {
1214                        let referencing_table = &referencing_tables[0];
1215                        self.abort_transaction();
1216                        return Err(Error::CatalogError(format!(
1217                            "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
1218                            referencing_table
1219                        )));
1220                    }
1221
1222                    match self
1223                        .inner
1224                        .execute_operation(PlanOperation::DropTable(plan.clone()))
1225                    {
1226                        Ok(TransactionResult::NoOp) => {
1227                            let registry = self.namespace_registry();
1228                            registry
1229                                .write()
1230                                .expect("namespace registry poisoned")
1231                                .unregister_table(&canonical_table);
1232                            Ok(RuntimeStatementResult::NoOp)
1233                        }
1234                        Ok(_) => Err(Error::Internal(
1235                            "expected NoOp result for DROP TABLE during transactional execution"
1236                                .into(),
1237                        )),
1238                        Err(err) => {
1239                            self.abort_transaction();
1240                            Err(err)
1241                        }
1242                    }
1243                } else {
1244                    if self.inner.has_table_locked_by_other_session(&plan.name) {
1245                        return Err(Error::TransactionContextError(format!(
1246                            "table '{}' is locked by another active transaction",
1247                            plan.name
1248                        )));
1249                    }
1250                    let result = self.run_autocommit_drop_table(plan)?;
1251                    let registry = self.namespace_registry();
1252                    registry
1253                        .write()
1254                        .expect("namespace registry poisoned")
1255                        .unregister_table(&canonical_table);
1256                    Ok(result)
1257                }
1258            }
1259            other => Err(Error::InvalidArgumentError(format!(
1260                "Unknown storage namespace '{}'",
1261                other
1262            ))),
1263        }
1264    }
1265
1266    fn create_view(&self, plan: CreateViewPlan) -> Result<()> {
1267        let target_namespace = plan
1268            .namespace
1269            .clone()
1270            .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
1271            .to_ascii_lowercase();
1272
1273        match target_namespace.as_str() {
1274            INFORMATION_SCHEMA_NAMESPACE_ID => {
1275                Err(information_schema_read_only_error("CREATE VIEW"))
1276            }
1277            TEMPORARY_NAMESPACE_ID => {
1278                let temp_namespace = self
1279                    .temporary_namespace()
1280                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1281                let (_, canonical) = canonical_table_name(&plan.name)?;
1282                temp_namespace.create_view(plan)?;
1283                let namespace_id = temp_namespace.namespace_id().to_string();
1284                let registry = self.namespace_registry();
1285                registry
1286                    .write()
1287                    .expect("namespace registry poisoned")
1288                    .register_table(&namespace_id, canonical);
1289                Ok(())
1290            }
1291            PERSISTENT_NAMESPACE_ID => {
1292                let persistent_namespace = self.persistent_namespace();
1293                persistent_namespace.create_view(plan)
1294            }
1295            other => Err(Error::InvalidArgumentError(format!(
1296                "Unknown storage namespace '{}'",
1297                other
1298            ))),
1299        }
1300    }
1301
1302    fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1303        let (_, canonical_view) = canonical_table_name(&plan.name)?;
1304        let namespace_id = self.resolve_namespace_for_table(&canonical_view);
1305
1306        match namespace_id.as_str() {
1307            TEMPORARY_NAMESPACE_ID => {
1308                let temp_namespace = self
1309                    .temporary_namespace()
1310                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1311                temp_namespace.drop_view(plan)?;
1312                let registry = self.namespace_registry();
1313                registry
1314                    .write()
1315                    .expect("namespace registry poisoned")
1316                    .unregister_table(&canonical_view);
1317                Ok(())
1318            }
1319            INFORMATION_SCHEMA_NAMESPACE_ID => Err(information_schema_read_only_error("DROP VIEW")),
1320            PERSISTENT_NAMESPACE_ID => {
1321                let persistent_namespace = self.persistent_namespace();
1322                persistent_namespace.drop_view(plan)
1323            }
1324            other => Err(Error::InvalidArgumentError(format!(
1325                "Unknown storage namespace '{}'",
1326                other
1327            ))),
1328        }
1329    }
1330
1331    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
1332        if self.has_active_transaction() {
1333            return Err(Error::InvalidArgumentError(
1334                "ALTER TABLE RENAME is not supported inside an active transaction".into(),
1335            ));
1336        }
1337
1338        let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
1339        let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
1340        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1341
1342        match namespace_id.as_str() {
1343            TEMPORARY_NAMESPACE_ID => {
1344                let temp_namespace = self
1345                    .temporary_namespace()
1346                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1347                match temp_namespace.rename_table(plan.clone()) {
1348                    Ok(()) => {
1349                        let namespace_id = temp_namespace.namespace_id().to_string();
1350                        let registry = self.namespace_registry();
1351                        let mut registry = registry.write().expect("namespace registry poisoned");
1352                        registry.unregister_table(&canonical_table);
1353                        registry.register_table(&namespace_id, new_canonical);
1354                        Ok(())
1355                    }
1356                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1357                    Err(err) => Err(err),
1358                }
1359            }
1360            INFORMATION_SCHEMA_NAMESPACE_ID => {
1361                Err(information_schema_read_only_error("RENAME TABLE"))
1362            }
1363            PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1364                Ok(()) => Ok(()),
1365                Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1366                Err(err) => Err(err),
1367            },
1368            other => Err(Error::InvalidArgumentError(format!(
1369                "Unknown storage namespace '{}'",
1370                other
1371            ))),
1372        }
1373    }
1374
1375    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1376        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1377        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1378
1379        match namespace_id.as_str() {
1380            TEMPORARY_NAMESPACE_ID => {
1381                let temp_namespace = self
1382                    .temporary_namespace()
1383                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1384
1385                let context = temp_namespace.context();
1386                let catalog_service = &context.catalog_service;
1387                let view = match catalog_service.table_view(&canonical_table) {
1388                    Ok(view) => view,
1389                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1390                        return Ok(RuntimeStatementResult::NoOp);
1391                    }
1392                    Err(err) => return Err(err),
1393                };
1394                let table_id = view
1395                    .table_meta
1396                    .as_ref()
1397                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1398                    .table_id;
1399
1400                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1401
1402                Ok(temp_namespace.alter_table(plan)?)
1403            }
1404            INFORMATION_SCHEMA_NAMESPACE_ID => {
1405                Err(information_schema_read_only_error("ALTER TABLE"))
1406            }
1407            PERSISTENT_NAMESPACE_ID => {
1408                let persistent = self.persistent_namespace();
1409                let context = persistent.context();
1410                let catalog_service = &context.catalog_service;
1411                let view = match catalog_service.table_view(&canonical_table) {
1412                    Ok(view) => view,
1413                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1414                        return Ok(RuntimeStatementResult::NoOp);
1415                    }
1416                    Err(err) => return Err(err),
1417                };
1418                let table_id = view
1419                    .table_meta
1420                    .as_ref()
1421                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1422                    .table_id;
1423
1424                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1425
1426                self.run_autocommit_alter_table(plan)
1427            }
1428            other => Err(Error::InvalidArgumentError(format!(
1429                "Unknown storage namespace '{}'",
1430                other
1431            ))),
1432        }
1433    }
1434
1435    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1436        if plan.columns.is_empty() {
1437            return Err(Error::InvalidArgumentError(
1438                "CREATE INDEX requires at least one column".into(),
1439            ));
1440        }
1441
1442        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1443        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1444
1445        match namespace_id.as_str() {
1446            TEMPORARY_NAMESPACE_ID => {
1447                let temp_namespace = self
1448                    .temporary_namespace()
1449                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1450                Ok(temp_namespace.create_index(plan)?)
1451            }
1452            INFORMATION_SCHEMA_NAMESPACE_ID => {
1453                Err(information_schema_read_only_error("CREATE INDEX"))
1454            }
1455            PERSISTENT_NAMESPACE_ID => {
1456                if self.has_active_transaction() {
1457                    return Err(Error::InvalidArgumentError(
1458                        "CREATE INDEX is not supported inside an active transaction".into(),
1459                    ));
1460                }
1461
1462                self.run_autocommit_create_index(plan)
1463            }
1464            other => Err(Error::InvalidArgumentError(format!(
1465                "Unknown storage namespace '{}'",
1466                other
1467            ))),
1468        }
1469    }
1470
1471    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1472        if self.has_active_transaction() {
1473            return Err(Error::InvalidArgumentError(
1474                "DROP INDEX is not supported inside an active transaction".into(),
1475            ));
1476        }
1477
1478        let mut dropped = false;
1479
1480        match self.run_autocommit_drop_index(plan.clone()) {
1481            Ok(Some(_)) => {
1482                dropped = true;
1483            }
1484            Ok(None) => {}
1485            Err(err) => {
1486                if !super::is_index_not_found_error(&err) {
1487                    return Err(err);
1488                }
1489            }
1490        }
1491
1492        if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1493            match temp_namespace.drop_index(plan.clone()) {
1494                Ok(Some(_)) => {
1495                    dropped = true;
1496                }
1497                Ok(None) => {}
1498                Err(err) => {
1499                    if !super::is_index_not_found_error(&err) {
1500                        return Err(err);
1501                    }
1502                }
1503            }
1504        }
1505
1506        if dropped || plan.if_exists {
1507            Ok(RuntimeStatementResult::NoOp)
1508        } else {
1509            Err(Error::CatalogError(format!(
1510                "Index '{}' does not exist",
1511                plan.name
1512            )))
1513        }
1514    }
1515}