llkv_runtime/
runtime_session.rs

// TODO: Introduce a common trait (similar to `CatalogDdl`) shared by runtime sessions and llkv-transaction sessions.
2
3use std::sync::{Arc, RwLock};
4
5use arrow::record_batch::RecordBatch;
6use llkv_result::{Error, Result};
7use llkv_storage::pager::{BoxedPager, MemPager};
8use llkv_table::types::TableId;
9use llkv_table::{
10    SingleColumnIndexDescriptor, canonical_table_name, validate_alter_table_operation,
11};
12
13use crate::{
14    AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource,
15    CreateViewPlan, DeletePlan, DropIndexPlan, DropTablePlan, DropViewPlan, InsertPlan,
16    InsertSource, PlanColumnSpec, PlanOperation, PlanValue, RenameTablePlan, RuntimeContext,
17    RuntimeStatementResult, RuntimeTransactionContext, SelectExecution, SelectPlan,
18    SelectProjection, TransactionContext, TransactionKind, TransactionResult, TransactionSession,
19    UpdatePlan,
20};
21use crate::{
22    PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
23    RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID,
24    TemporaryRuntimeNamespace,
25};
26use llkv_plan::TruncatePlan;
27
28type StatementResult = RuntimeStatementResult<BoxedPager>;
29type TxnResult = TransactionResult<BoxedPager>;
30type BaseTxnContext = RuntimeTransactionContext<BoxedPager>;
31
32pub(crate) struct SessionNamespaces {
33    persistent: Arc<PersistentRuntimeNamespace>,
34    temporary: Option<Arc<TemporaryRuntimeNamespace>>,
35    registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
36}
37
38impl SessionNamespaces {
39    pub(crate) fn new(base_context: Arc<RuntimeContext<BoxedPager>>) -> Self {
40        let persistent = Arc::new(PersistentRuntimeNamespace::new(
41            PERSISTENT_NAMESPACE_ID.to_string(),
42            Arc::clone(&base_context),
43        ));
44
45        let mut registry = RuntimeStorageNamespaceRegistry::new(
46            RuntimeStorageNamespace::namespace_id(persistent.as_ref()).clone(),
47        );
48        registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
49
50        let temporary = {
51            // ARCHITECTURAL DECISION: Multi-pager arena via fallback lookup
52            //
53            // Temporary tables use an isolated MemPager-backed ColumnStore while sharing the
54            // persistent namespace's catalog. When a temporary object references persistent
55            // data (e.g., CREATE TEMP VIEW ... FROM main.t1), the temporary context forwards
56            // lookups to the persistent context via fallback. This keeps temporary storage
57            // purely in-memory while preserving cross-namespace visibility.
58            //
59            // Implementation steps:
60            // 1. Wrap a fresh MemPager in BoxedPager so it uses the same runtime pager type
61            //    as the persistent context (BoxedPager).
62            // 2. Reuse the persistent catalog handle so both namespaces observe identical
63            //    table metadata.
64            // 3. Install the persistent context as the fallback lookup target so cache misses
65            //    in the temporary namespace transparently resolve to persistent tables.
66            let shared_catalog = base_context.table_catalog();
67            let temp_mem_pager = Arc::new(MemPager::default());
68            let temp_boxed_pager = Arc::new(BoxedPager::from_arc(temp_mem_pager));
69            let temp_context = Arc::new(
70                RuntimeContext::new_with_catalog(temp_boxed_pager, Arc::clone(&shared_catalog))
71                    .with_fallback_lookup(Arc::clone(&base_context)),
72            );
73
74            const TEMPORARY_TABLE_ID_START: TableId = 0x8000;
75            temp_context
76                .ensure_next_table_id_at_least(TEMPORARY_TABLE_ID_START)
77                .expect("failed to seed temporary namespace table id counter");
78
79            let namespace = Arc::new(TemporaryRuntimeNamespace::new(
80                TEMPORARY_NAMESPACE_ID.to_string(),
81                temp_context,
82            ));
83            registry.register_namespace(
84                Arc::clone(&namespace),
85                vec![TEMPORARY_NAMESPACE_ID.to_string()],
86                true,
87            );
88            namespace
89        };
90
91        Self {
92            persistent,
93            temporary: Some(temporary),
94            registry: Arc::new(RwLock::new(registry)),
95        }
96    }
97
98    pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace> {
99        Arc::clone(&self.persistent)
100    }
101
102    pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
103        self.temporary.as_ref().map(Arc::clone)
104    }
105
106    pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
107        Arc::clone(&self.registry)
108    }
109}
110
111impl Drop for SessionNamespaces {
112    fn drop(&mut self) {
113        if let Some(temp) = &self.temporary {
114            let namespace_id = temp.namespace_id().to_string();
115            let canonical_names = {
116                let mut registry = self.registry.write().expect("namespace registry poisoned");
117                registry.drain_namespace_tables(&namespace_id)
118            };
119            temp.clear_tables(canonical_names);
120        }
121    }
122}
123
124/// A session for executing operations with optional transaction support.
125///
126/// This is a high-level wrapper around the transaction machinery that provides
127/// a clean API for users. Operations can be executed directly or within a transaction.
128pub struct RuntimeSession {
129    // Transaction session using BoxedPager for base storage and MemPager for staging tables
130    inner: TransactionSession<
131        RuntimeTransactionContext<BoxedPager>,
132        RuntimeTransactionContext<MemPager>,
133    >,
134    namespaces: Arc<SessionNamespaces>,
135}
136
137impl RuntimeSession {
138    pub(crate) fn from_parts(
139        inner: TransactionSession<
140            RuntimeTransactionContext<BoxedPager>,
141            RuntimeTransactionContext<MemPager>,
142        >,
143        namespaces: Arc<SessionNamespaces>,
144    ) -> Self {
145        Self { inner, namespaces }
146    }
147
148    /// Clone this session (reuses the same underlying TransactionSession).
149    /// This is necessary to maintain transaction state across Engine clones.
150    pub(crate) fn clone_session(&self) -> Self {
151        Self {
152            inner: self.inner.clone_session(),
153            namespaces: self.namespaces.clone(),
154        }
155    }
156
157    pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
158        self.namespaces.registry()
159    }
160
161    fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
162        self.namespace_registry()
163            .read()
164            .expect("namespace registry poisoned")
165            .namespace_for_table(canonical)
166    }
167
168    fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
169        if plan.tables.len() != 1 {
170            return None;
171        }
172
173        let qualified = plan.tables[0].qualified_name();
174        let (_, canonical) = canonical_table_name(&qualified).ok()?;
175        Some(self.resolve_namespace_for_table(&canonical))
176    }
177
178    fn select_from_temporary(&self, plan: SelectPlan) -> Result<StatementResult> {
179        let temp_namespace = self
180            .temporary_namespace()
181            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
182
183        let table_name = if plan.tables.len() == 1 {
184            plan.tables[0].qualified_name()
185        } else {
186            String::new()
187        };
188
189        let temp_context = temp_namespace.context();
190        let temp_tx_context = RuntimeTransactionContext::new(temp_context);
191        let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
192        let schema = execution.schema();
193        let batches = execution.collect()?;
194
195        let combined = if batches.is_empty() {
196            RecordBatch::new_empty(Arc::clone(&schema))
197        } else if batches.len() == 1 {
198            batches.into_iter().next().unwrap()
199        } else {
200            let refs: Vec<&RecordBatch> = batches.iter().collect();
201            arrow::compute::concat_batches(&schema, refs)?
202        };
203
204        let execution =
205            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
206
207        Ok(RuntimeStatementResult::Select {
208            execution: Box::new(execution),
209            table_name,
210            schema,
211        })
212    }
213
214    fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace> {
215        self.namespaces.persistent()
216    }
217
218    #[allow(dead_code)]
219    fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace>> {
220        self.namespaces.temporary()
221    }
222
223    fn base_transaction_context(&self) -> Arc<BaseTxnContext> {
224        Arc::clone(self.inner.context())
225    }
226
227    fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
228    where
229        F: FnOnce(&BaseTxnContext) -> Result<T>,
230    {
231        let context = self.base_transaction_context();
232        let default_snapshot = context.ctx().default_snapshot();
233        TransactionContext::set_snapshot(&*context, default_snapshot);
234        f(context.as_ref())
235    }
236
237    fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TxnResult> {
238        self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
239    }
240
241    fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TxnResult> {
242        self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
243    }
244
245    fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TxnResult> {
246        self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
247    }
248
249    fn run_autocommit_truncate(&self, plan: TruncatePlan) -> Result<TxnResult> {
250        self.with_autocommit_transaction_context(|ctx| TransactionContext::truncate(ctx, plan))
251    }
252
253    fn run_autocommit_create_table(&self, plan: CreateTablePlan) -> Result<StatementResult> {
254        let result =
255            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
256        match result {
257            TransactionResult::CreateTable { table_name } => {
258                Ok(RuntimeStatementResult::CreateTable { table_name })
259            }
260            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
261            _ => Err(Error::Internal(
262                "unexpected transaction result for CREATE TABLE".into(),
263            )),
264        }
265    }
266
267    fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<StatementResult> {
268        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
269        Ok(RuntimeStatementResult::NoOp)
270    }
271
272    fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
273        self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
274    }
275
276    fn run_autocommit_alter_table(&self, plan: AlterTablePlan) -> Result<StatementResult> {
277        let result =
278            self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
279        match result {
280            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
281            TransactionResult::CreateTable { table_name } => {
282                Ok(RuntimeStatementResult::CreateTable { table_name })
283            }
284            TransactionResult::CreateIndex {
285                table_name,
286                index_name,
287            } => Ok(RuntimeStatementResult::CreateIndex {
288                table_name,
289                index_name,
290            }),
291            _ => Err(Error::Internal(
292                "unexpected transaction result for ALTER TABLE".into(),
293            )),
294        }
295    }
296
297    fn run_autocommit_create_index(&self, plan: CreateIndexPlan) -> Result<StatementResult> {
298        let result =
299            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
300        match result {
301            TransactionResult::CreateIndex {
302                table_name,
303                index_name,
304            } => Ok(RuntimeStatementResult::CreateIndex {
305                table_name,
306                index_name,
307            }),
308            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
309            _ => Err(Error::Internal(
310                "unexpected transaction result for CREATE INDEX".into(),
311            )),
312        }
313    }
314
315    fn run_autocommit_drop_index(
316        &self,
317        plan: DropIndexPlan,
318    ) -> Result<Option<SingleColumnIndexDescriptor>> {
319        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
320    }
321
322    /// Begin a transaction in this session.
323    /// Creates an empty staging context for new tables created within the transaction.
324    /// Existing tables are accessed via MVCC visibility filtering - NO data copying occurs.
325    pub fn begin_transaction(&self) -> Result<StatementResult> {
326        let staging_pager = Arc::new(MemPager::default());
327        tracing::trace!(
328            "BEGIN_TRANSACTION: Created staging pager at {:p}",
329            &*staging_pager
330        );
331        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
332
333        // Staging context is EMPTY - used only for tables created within the transaction.
334        // Existing tables are read from base context with MVCC visibility filtering.
335        // No data copying occurs at BEGIN - this is pure MVCC.
336
337        let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
338
339        self.inner.begin_transaction(staging_wrapper)?;
340        Ok(RuntimeStatementResult::Transaction {
341            kind: TransactionKind::Begin,
342        })
343    }
344
345    /// Mark the current transaction as aborted due to an error.
346    /// This should be called when any error occurs during a transaction.
347    pub fn abort_transaction(&self) {
348        self.inner.abort_transaction();
349    }
350
351    /// Check if this session has an active transaction.
352    pub fn has_active_transaction(&self) -> bool {
353        let result = self.inner.has_active_transaction();
354        tracing::trace!("SESSION: has_active_transaction() = {}", result);
355        result
356    }
357
358    /// Check if the current transaction has been aborted due to an error.
359    pub fn is_aborted(&self) -> bool {
360        self.inner.is_aborted()
361    }
362
363    /// Check if a table was created in the current active transaction.
364    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
365        self.inner.is_table_created_in_transaction(table_name)
366    }
367
368    /// Get column specifications for a table created in the current transaction.
369    /// Returns `None` if there's no active transaction or the table wasn't created in it.
370    pub fn table_column_specs_from_transaction(
371        &self,
372        table_name: &str,
373    ) -> Option<Vec<PlanColumnSpec>> {
374        self.inner.table_column_specs_from_transaction(table_name)
375    }
376
377    /// Get tables that reference the given table via foreign keys created in the current transaction.
378    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
379    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
380        self.inner
381            .tables_referencing_in_transaction(referenced_table)
382    }
383
384    /// Commit the current transaction and apply changes to the base context.
385    /// If the transaction was aborted, this acts as a ROLLBACK instead.
386    pub fn commit_transaction(&self) -> Result<StatementResult> {
387        tracing::trace!("Session::commit_transaction called");
388        let (tx_result, operations) = self.inner.commit_transaction()?;
389        tracing::trace!(
390            "Session::commit_transaction got {} operations",
391            operations.len()
392        );
393
394        if !operations.is_empty() {
395            let dropped_tables = self
396                .inner
397                .context()
398                .ctx()
399                .dropped_tables
400                .read()
401                .unwrap()
402                .clone();
403            if !dropped_tables.is_empty() {
404                for operation in &operations {
405                    let table_name_opt = match operation {
406                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
407                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
408                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
409                        _ => None,
410                    };
411                    if let Some(table_name) = table_name_opt {
412                        let (_, canonical) = canonical_table_name(table_name)?;
413                        if dropped_tables.contains(&canonical) {
414                            self.abort_transaction();
415                            return Err(Error::TransactionContextError(
416                                "another transaction has dropped this table".into(),
417                            ));
418                        }
419                    }
420                }
421            }
422        }
423
424        // Extract the transaction kind from the transaction module's result
425        let kind = match tx_result {
426            TransactionResult::Transaction { kind } => kind,
427            _ => {
428                return Err(Error::Internal(
429                    "commit_transaction returned non-transaction result".into(),
430                ));
431            }
432        };
433        tracing::trace!("Session::commit_transaction kind={:?}", kind);
434
435        // Only replay operations if there are any (empty if transaction was aborted)
436        for operation in operations {
437            match operation {
438                PlanOperation::CreateTable(plan) => {
439                    TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
440                }
441                PlanOperation::DropTable(plan) => {
442                    TransactionContext::drop_table(&**self.inner.context(), plan)?;
443                }
444                PlanOperation::Insert(plan) => {
445                    TransactionContext::insert(&**self.inner.context(), plan)?;
446                }
447                PlanOperation::Update(plan) => {
448                    TransactionContext::update(&**self.inner.context(), plan)?;
449                }
450                PlanOperation::Delete(plan) => {
451                    TransactionContext::delete(&**self.inner.context(), plan)?;
452                }
453                _ => {}
454            }
455        }
456
457        // Reset the base context snapshot to the default auto-commit view now that
458        // the transaction has been replayed onto the base tables.
459        let base_ctx = self.inner.context();
460        let default_snapshot = base_ctx.ctx().default_snapshot();
461        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
462
463        // Persist the next_txn_id to the catalog after a successful commit
464        if matches!(kind, TransactionKind::Commit) {
465            let ctx = base_ctx.ctx();
466            let next_txn_id = ctx.txn_manager().current_next_txn_id();
467            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
468                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
469            }
470        }
471
472        // Return a StatementResult with the correct kind (Commit or Rollback)
473        Ok(RuntimeStatementResult::Transaction { kind })
474    }
475
476    /// Rollback the current transaction, discarding all changes.
477    pub fn rollback_transaction(&self) -> Result<StatementResult> {
478        self.inner.rollback_transaction()?;
479        let base_ctx = self.inner.context();
480        let default_snapshot = base_ctx.ctx().default_snapshot();
481        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
482        Ok(RuntimeStatementResult::Transaction {
483            kind: TransactionKind::Rollback,
484        })
485    }
486
487    fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
488        // Only materialize if source is a SELECT query
489        // If source is already Batches, leave it alone
490        if matches!(plan.source, Some(CreateTableSource::Select { .. }))
491            && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
492        {
493            let select_result = self.execute_select_plan(*select_plan)?;
494            let (schema, batches) = match select_result {
495                RuntimeStatementResult::Select {
496                    schema, execution, ..
497                } => {
498                    let batches = execution.collect()?;
499                    (schema, batches)
500                }
501                _ => {
502                    return Err(Error::Internal(
503                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
504                    ));
505                }
506            };
507            plan.source = Some(CreateTableSource::Batches { schema, batches });
508        }
509        Ok(plan)
510    }
511
512    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
513        let InsertPlan {
514            table,
515            columns,
516            source,
517            on_conflict,
518        } = plan;
519
520        match source {
521            InsertSource::Rows(rows) => {
522                let count = rows.len();
523                Ok((
524                    InsertPlan {
525                        table,
526                        columns,
527                        source: InsertSource::Rows(rows),
528                        on_conflict,
529                    },
530                    count,
531                ))
532            }
533            InsertSource::Batches(batches) => {
534                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
535                Ok((
536                    InsertPlan {
537                        table,
538                        columns,
539                        source: InsertSource::Batches(batches),
540                        on_conflict,
541                    },
542                    count,
543                ))
544            }
545            InsertSource::Select { plan: select_plan } => {
546                let select_result = self.execute_select_plan(*select_plan)?;
547                let rows = match select_result {
548                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
549                    _ => {
550                        return Err(Error::Internal(
551                            "expected Select result when executing INSERT ... SELECT".into(),
552                        ));
553                    }
554                };
555                let count = rows.len();
556                Ok((
557                    InsertPlan {
558                        table,
559                        columns,
560                        source: InsertSource::Rows(rows),
561                        on_conflict,
562                    },
563                    count,
564                ))
565            }
566        }
567    }
568
569    /// Insert rows (outside or inside transaction).
570    pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<StatementResult> {
571        tracing::trace!("Session::insert called for table={}", plan.table);
572        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
573        let table_name = plan.table.clone();
574        let (_, canonical_table) = canonical_table_name(&plan.table)?;
575        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
576
577        match namespace_id.as_str() {
578            TEMPORARY_NAMESPACE_ID => {
579                let temp_namespace = self
580                    .temporary_namespace()
581                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
582                let temp_context = temp_namespace.context();
583                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
584                match TransactionContext::insert(&temp_tx_context, plan)? {
585                    TransactionResult::Insert { .. } => {}
586                    _ => {
587                        return Err(Error::Internal(
588                            "unexpected transaction result for temporary INSERT".into(),
589                        ));
590                    }
591                }
592                Ok(RuntimeStatementResult::Insert {
593                    rows_inserted,
594                    table_name,
595                })
596            }
597            PERSISTENT_NAMESPACE_ID => {
598                if self.has_active_transaction() {
599                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
600                        Ok(_) => {
601                            tracing::trace!("Session::insert succeeded for table={}", table_name);
602                            Ok(RuntimeStatementResult::Insert {
603                                rows_inserted,
604                                table_name,
605                            })
606                        }
607                        Err(e) => {
608                            tracing::trace!(
609                                "Session::insert failed for table={}, error={:?}",
610                                table_name,
611                                e
612                            );
613                            if matches!(e, Error::ConstraintError(_)) {
614                                tracing::trace!("Transaction is_aborted=true");
615                                self.abort_transaction();
616                            }
617                            Err(e)
618                        }
619                    }
620                } else {
621                    let result = self.run_autocommit_insert(plan)?;
622                    if !matches!(result, TransactionResult::Insert { .. }) {
623                        return Err(Error::Internal(
624                            "unexpected transaction result for INSERT operation".into(),
625                        ));
626                    }
627                    Ok(RuntimeStatementResult::Insert {
628                        rows_inserted,
629                        table_name,
630                    })
631                }
632            }
633            other => Err(Error::InvalidArgumentError(format!(
634                "Unknown storage namespace '{}'",
635                other
636            ))),
637        }
638    }
639
640    /// Select rows (outside or inside transaction).
641    pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<StatementResult> {
642        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
643            && namespace_id == TEMPORARY_NAMESPACE_ID
644        {
645            return self.select_from_temporary(plan);
646        }
647
648        if self.has_active_transaction() {
649            let tx_result = match self
650                .inner
651                .execute_operation(PlanOperation::Select(Box::new(plan.clone())))
652            {
653                Ok(result) => result,
654                Err(e) => {
655                    // Only abort transaction on specific errors (constraint violations, etc.)
656                    // Don't abort on catalog errors (table doesn't exist) or similar
657                    if matches!(e, Error::ConstraintError(_)) {
658                        self.abort_transaction();
659                    }
660                    return Err(e);
661                }
662            };
663            match tx_result {
664                TransactionResult::Select {
665                    table_name,
666                    schema,
667                    execution: staging_execution,
668                } => {
669                    // Convert from staging (MemPager) execution to base pager execution
670                    // by collecting batches and rebuilding
671                    let batches = staging_execution.collect().unwrap_or_default();
672                    let combined = if batches.is_empty() {
673                        RecordBatch::new_empty(Arc::clone(&schema))
674                    } else if batches.len() == 1 {
675                        batches.into_iter().next().unwrap()
676                    } else {
677                        let refs: Vec<&RecordBatch> = batches.iter().collect();
678                        arrow::compute::concat_batches(&schema, refs)?
679                    };
680
681                    let execution = SelectExecution::from_batch(
682                        table_name.clone(),
683                        Arc::clone(&schema),
684                        combined,
685                    );
686
687                    Ok(RuntimeStatementResult::Select {
688                        execution: Box::new(execution),
689                        table_name,
690                        schema,
691                    })
692                }
693                _ => Err(Error::Internal("expected Select result".into())),
694            }
695        } else {
696            // Call via TransactionContext trait
697            let table_name = if plan.tables.len() == 1 {
698                plan.tables[0].qualified_name()
699            } else {
700                String::new()
701            };
702            let execution = self.with_autocommit_transaction_context(|ctx| {
703                TransactionContext::execute_select(ctx, plan)
704            })?;
705            let schema = execution.schema();
706            Ok(RuntimeStatementResult::Select {
707                execution: Box::new(execution),
708                table_name,
709                schema,
710            })
711        }
712    }
713
714    /// Convenience helper to fetch all rows from a table within this session.
715    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
716        let plan =
717            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
718        match self.execute_select_plan(plan)? {
719            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
720            other => Err(Error::Internal(format!(
721                "expected Select result when reading table '{}', got {:?}",
722                table, other
723            ))),
724        }
725    }
726
727    pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<StatementResult> {
728        let (_, canonical_table) = canonical_table_name(&plan.table)?;
729        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
730
731        match namespace_id.as_str() {
732            TEMPORARY_NAMESPACE_ID => {
733                let temp_namespace = self
734                    .temporary_namespace()
735                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
736                let temp_context = temp_namespace.context();
737                let table_name = plan.table.clone();
738                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
739                match TransactionContext::update(&temp_tx_context, plan)? {
740                    TransactionResult::Update { rows_updated, .. } => {
741                        Ok(RuntimeStatementResult::Update {
742                            rows_updated,
743                            table_name,
744                        })
745                    }
746                    _ => Err(Error::Internal(
747                        "unexpected transaction result for temporary UPDATE".into(),
748                    )),
749                }
750            }
751            PERSISTENT_NAMESPACE_ID => {
752                if self.has_active_transaction() {
753                    let table_name = plan.table.clone();
754                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
755                        Ok(result) => result,
756                        Err(e) => {
757                            // If an error occurs during a transaction, abort it
758                            self.abort_transaction();
759                            return Err(e);
760                        }
761                    };
762                    match result {
763                        TransactionResult::Update {
764                            rows_matched: _,
765                            rows_updated,
766                        } => Ok(RuntimeStatementResult::Update {
767                            rows_updated,
768                            table_name,
769                        }),
770                        _ => Err(Error::Internal("expected Update result".into())),
771                    }
772                } else {
773                    let table_name = plan.table.clone();
774                    let result = self.run_autocommit_update(plan)?;
775                    match result {
776                        TransactionResult::Update {
777                            rows_matched: _,
778                            rows_updated,
779                        } => Ok(RuntimeStatementResult::Update {
780                            rows_updated,
781                            table_name,
782                        }),
783                        _ => Err(Error::Internal("expected Update result".into())),
784                    }
785                }
786            }
787            other => Err(Error::InvalidArgumentError(format!(
788                "Unknown storage namespace '{}'",
789                other
790            ))),
791        }
792    }
793
794    pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<StatementResult> {
795        let (_, canonical_table) = canonical_table_name(&plan.table)?;
796        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
797
798        match namespace_id.as_str() {
799            TEMPORARY_NAMESPACE_ID => {
800                let temp_namespace = self
801                    .temporary_namespace()
802                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
803                let temp_context = temp_namespace.context();
804                let table_name = plan.table.clone();
805                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
806                match TransactionContext::delete(&temp_tx_context, plan)? {
807                    TransactionResult::Delete { rows_deleted } => {
808                        Ok(RuntimeStatementResult::Delete {
809                            rows_deleted,
810                            table_name,
811                        })
812                    }
813                    _ => Err(Error::Internal(
814                        "unexpected transaction result for temporary DELETE".into(),
815                    )),
816                }
817            }
818            PERSISTENT_NAMESPACE_ID => {
819                if self.has_active_transaction() {
820                    let table_name = plan.table.clone();
821                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
822                        Ok(result) => result,
823                        Err(e) => {
824                            // If an error occurs during a transaction, abort it
825                            self.abort_transaction();
826                            return Err(e);
827                        }
828                    };
829                    match result {
830                        TransactionResult::Delete { rows_deleted } => {
831                            Ok(RuntimeStatementResult::Delete {
832                                rows_deleted,
833                                table_name,
834                            })
835                        }
836                        _ => Err(Error::Internal("expected Delete result".into())),
837                    }
838                } else {
839                    let table_name = plan.table.clone();
840                    let result = self.run_autocommit_delete(plan)?;
841                    match result {
842                        TransactionResult::Delete { rows_deleted } => {
843                            Ok(RuntimeStatementResult::Delete {
844                                rows_deleted,
845                                table_name,
846                            })
847                        }
848                        _ => Err(Error::Internal("expected Delete result".into())),
849                    }
850                }
851            }
852            other => Err(Error::InvalidArgumentError(format!(
853                "Unknown storage namespace '{}'",
854                other
855            ))),
856        }
857    }
858
859    pub fn execute_truncate_plan(&self, plan: TruncatePlan) -> Result<StatementResult> {
860        let (_, canonical_table) = canonical_table_name(&plan.table)?;
861        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
862
863        match namespace_id.as_str() {
864            TEMPORARY_NAMESPACE_ID => {
865                let temp_namespace = self
866                    .temporary_namespace()
867                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
868                let temp_context = temp_namespace.context();
869                let table_name = plan.table.clone();
870                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
871                match TransactionContext::truncate(&temp_tx_context, plan)? {
872                    TransactionResult::Delete { rows_deleted } => {
873                        Ok(RuntimeStatementResult::Delete {
874                            rows_deleted,
875                            table_name,
876                        })
877                    }
878                    _ => Err(Error::Internal(
879                        "unexpected transaction result for temporary TRUNCATE".into(),
880                    )),
881                }
882            }
883            PERSISTENT_NAMESPACE_ID => {
884                if self.has_active_transaction() {
885                    let table_name = plan.table.clone();
886                    let result = match self.inner.execute_operation(PlanOperation::Truncate(plan)) {
887                        Ok(result) => result,
888                        Err(e) => {
889                            // If an error occurs during a transaction, abort it
890                            self.abort_transaction();
891                            return Err(e);
892                        }
893                    };
894                    match result {
895                        TransactionResult::Delete { rows_deleted } => {
896                            Ok(RuntimeStatementResult::Delete {
897                                rows_deleted,
898                                table_name,
899                            })
900                        }
901                        _ => Err(Error::Internal("expected Delete result".into())),
902                    }
903                } else {
904                    let table_name = plan.table.clone();
905                    let result = self.run_autocommit_truncate(plan)?;
906                    match result {
907                        TransactionResult::Delete { rows_deleted } => {
908                            Ok(RuntimeStatementResult::Delete {
909                                rows_deleted,
910                                table_name,
911                            })
912                        }
913                        _ => Err(Error::Internal("expected Delete result".into())),
914                    }
915                }
916            }
917            other => Err(Error::InvalidArgumentError(format!(
918                "Unknown storage namespace '{}'",
919                other
920            ))),
921        }
922    }
923}
924
925/// Implement [`CatalogDdl`] directly on the session so callers must import the trait
926/// to perform schema mutations. This keeps all runtime DDL entry points aligned with
927/// the shared contract used by contexts and storage namespaces.
928impl CatalogDdl for RuntimeSession {
929    type CreateTableOutput = StatementResult;
930    type DropTableOutput = StatementResult;
931    type RenameTableOutput = ();
932    type AlterTableOutput = StatementResult;
933    type CreateIndexOutput = StatementResult;
934    type DropIndexOutput = StatementResult;
935
936    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
937        let target_namespace = plan
938            .namespace
939            .clone()
940            .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
941            .to_ascii_lowercase();
942
943        let plan = self.materialize_ctas_plan(plan)?;
944
945        match target_namespace.as_str() {
946            TEMPORARY_NAMESPACE_ID => {
947                let temp_namespace = self
948                    .temporary_namespace()
949                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
950                let (_, canonical) = canonical_table_name(&plan.name)?;
951                let result = temp_namespace.create_table(plan)?;
952                if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
953                    let namespace_id = temp_namespace.namespace_id().to_string();
954                    let registry = self.namespace_registry();
955                    registry
956                        .write()
957                        .expect("namespace registry poisoned")
958                        .register_table(&namespace_id, canonical);
959                }
960                Ok(result)
961            }
962            PERSISTENT_NAMESPACE_ID => {
963                if self.has_active_transaction() {
964                    match self
965                        .inner
966                        .execute_operation(PlanOperation::CreateTable(plan))
967                    {
968                        Ok(TransactionResult::CreateTable { table_name }) => {
969                            Ok(RuntimeStatementResult::CreateTable { table_name })
970                        }
971                        Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
972                        Ok(_) => Err(Error::Internal(
973                            "expected CreateTable result during transactional CREATE TABLE".into(),
974                        )),
975                        Err(err) => {
976                            self.abort_transaction();
977                            Err(err)
978                        }
979                    }
980                } else {
981                    if self.inner.has_table_locked_by_other_session(&plan.name) {
982                        return Err(Error::TransactionContextError(format!(
983                            "table '{}' is locked by another active transaction",
984                            plan.name
985                        )));
986                    }
987                    self.run_autocommit_create_table(plan)
988                }
989            }
990            other => Err(Error::InvalidArgumentError(format!(
991                "Unknown storage namespace '{}'",
992                other
993            ))),
994        }
995    }
996
997    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
998        let (_, canonical_table) = canonical_table_name(&plan.name)?;
999        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1000
1001        match namespace_id.as_str() {
1002            TEMPORARY_NAMESPACE_ID => {
1003                let temp_namespace = self
1004                    .temporary_namespace()
1005                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1006                temp_namespace.drop_table(plan)?;
1007                let registry = self.namespace_registry();
1008                registry
1009                    .write()
1010                    .expect("namespace registry poisoned")
1011                    .unregister_table(&canonical_table);
1012                Ok(RuntimeStatementResult::NoOp)
1013            }
1014            PERSISTENT_NAMESPACE_ID => {
1015                if self.has_active_transaction() {
1016                    let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
1017                    if !referencing_tables.is_empty() {
1018                        let referencing_table = &referencing_tables[0];
1019                        self.abort_transaction();
1020                        return Err(Error::CatalogError(format!(
1021                            "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
1022                            referencing_table
1023                        )));
1024                    }
1025
1026                    match self
1027                        .inner
1028                        .execute_operation(PlanOperation::DropTable(plan.clone()))
1029                    {
1030                        Ok(TransactionResult::NoOp) => {
1031                            let registry = self.namespace_registry();
1032                            registry
1033                                .write()
1034                                .expect("namespace registry poisoned")
1035                                .unregister_table(&canonical_table);
1036                            Ok(RuntimeStatementResult::NoOp)
1037                        }
1038                        Ok(_) => Err(Error::Internal(
1039                            "expected NoOp result for DROP TABLE during transactional execution"
1040                                .into(),
1041                        )),
1042                        Err(err) => {
1043                            self.abort_transaction();
1044                            Err(err)
1045                        }
1046                    }
1047                } else {
1048                    if self.inner.has_table_locked_by_other_session(&plan.name) {
1049                        return Err(Error::TransactionContextError(format!(
1050                            "table '{}' is locked by another active transaction",
1051                            plan.name
1052                        )));
1053                    }
1054                    let result = self.run_autocommit_drop_table(plan)?;
1055                    let registry = self.namespace_registry();
1056                    registry
1057                        .write()
1058                        .expect("namespace registry poisoned")
1059                        .unregister_table(&canonical_table);
1060                    Ok(result)
1061                }
1062            }
1063            other => Err(Error::InvalidArgumentError(format!(
1064                "Unknown storage namespace '{}'",
1065                other
1066            ))),
1067        }
1068    }
1069
1070    fn create_view(&self, plan: CreateViewPlan) -> Result<()> {
1071        let target_namespace = plan
1072            .namespace
1073            .clone()
1074            .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
1075            .to_ascii_lowercase();
1076
1077        match target_namespace.as_str() {
1078            TEMPORARY_NAMESPACE_ID => {
1079                let temp_namespace = self
1080                    .temporary_namespace()
1081                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1082                let (_, canonical) = canonical_table_name(&plan.name)?;
1083                temp_namespace.create_view(plan)?;
1084                let namespace_id = temp_namespace.namespace_id().to_string();
1085                let registry = self.namespace_registry();
1086                registry
1087                    .write()
1088                    .expect("namespace registry poisoned")
1089                    .register_table(&namespace_id, canonical);
1090                Ok(())
1091            }
1092            PERSISTENT_NAMESPACE_ID => {
1093                let persistent_namespace = self.persistent_namespace();
1094                persistent_namespace.create_view(plan)
1095            }
1096            other => Err(Error::InvalidArgumentError(format!(
1097                "Unknown storage namespace '{}'",
1098                other
1099            ))),
1100        }
1101    }
1102
1103    fn drop_view(&self, plan: DropViewPlan) -> Result<()> {
1104        let (_, canonical_view) = canonical_table_name(&plan.name)?;
1105        let namespace_id = self.resolve_namespace_for_table(&canonical_view);
1106
1107        match namespace_id.as_str() {
1108            TEMPORARY_NAMESPACE_ID => {
1109                let temp_namespace = self
1110                    .temporary_namespace()
1111                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1112                temp_namespace.drop_view(plan)?;
1113                let registry = self.namespace_registry();
1114                registry
1115                    .write()
1116                    .expect("namespace registry poisoned")
1117                    .unregister_table(&canonical_view);
1118                Ok(())
1119            }
1120            PERSISTENT_NAMESPACE_ID => {
1121                let persistent_namespace = self.persistent_namespace();
1122                persistent_namespace.drop_view(plan)
1123            }
1124            other => Err(Error::InvalidArgumentError(format!(
1125                "Unknown storage namespace '{}'",
1126                other
1127            ))),
1128        }
1129    }
1130
1131    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
1132        if self.has_active_transaction() {
1133            return Err(Error::InvalidArgumentError(
1134                "ALTER TABLE RENAME is not supported inside an active transaction".into(),
1135            ));
1136        }
1137
1138        let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
1139        let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
1140        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1141
1142        match namespace_id.as_str() {
1143            TEMPORARY_NAMESPACE_ID => {
1144                let temp_namespace = self
1145                    .temporary_namespace()
1146                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1147                match temp_namespace.rename_table(plan.clone()) {
1148                    Ok(()) => {
1149                        let namespace_id = temp_namespace.namespace_id().to_string();
1150                        let registry = self.namespace_registry();
1151                        let mut registry = registry.write().expect("namespace registry poisoned");
1152                        registry.unregister_table(&canonical_table);
1153                        registry.register_table(&namespace_id, new_canonical);
1154                        Ok(())
1155                    }
1156                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1157                    Err(err) => Err(err),
1158                }
1159            }
1160            PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1161                Ok(()) => Ok(()),
1162                Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1163                Err(err) => Err(err),
1164            },
1165            other => Err(Error::InvalidArgumentError(format!(
1166                "Unknown storage namespace '{}'",
1167                other
1168            ))),
1169        }
1170    }
1171
1172    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1173        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1174        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1175
1176        match namespace_id.as_str() {
1177            TEMPORARY_NAMESPACE_ID => {
1178                let temp_namespace = self
1179                    .temporary_namespace()
1180                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1181
1182                let context = temp_namespace.context();
1183                let catalog_service = &context.catalog_service;
1184                let view = match catalog_service.table_view(&canonical_table) {
1185                    Ok(view) => view,
1186                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1187                        return Ok(RuntimeStatementResult::NoOp);
1188                    }
1189                    Err(err) => return Err(err),
1190                };
1191                let table_id = view
1192                    .table_meta
1193                    .as_ref()
1194                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1195                    .table_id;
1196
1197                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1198
1199                Ok(temp_namespace.alter_table(plan)?)
1200            }
1201            PERSISTENT_NAMESPACE_ID => {
1202                let persistent = self.persistent_namespace();
1203                let context = persistent.context();
1204                let catalog_service = &context.catalog_service;
1205                let view = match catalog_service.table_view(&canonical_table) {
1206                    Ok(view) => view,
1207                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1208                        return Ok(RuntimeStatementResult::NoOp);
1209                    }
1210                    Err(err) => return Err(err),
1211                };
1212                let table_id = view
1213                    .table_meta
1214                    .as_ref()
1215                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1216                    .table_id;
1217
1218                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1219
1220                self.run_autocommit_alter_table(plan)
1221            }
1222            other => Err(Error::InvalidArgumentError(format!(
1223                "Unknown storage namespace '{}'",
1224                other
1225            ))),
1226        }
1227    }
1228
1229    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1230        if plan.columns.is_empty() {
1231            return Err(Error::InvalidArgumentError(
1232                "CREATE INDEX requires at least one column".into(),
1233            ));
1234        }
1235
1236        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1237        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1238
1239        match namespace_id.as_str() {
1240            TEMPORARY_NAMESPACE_ID => {
1241                let temp_namespace = self
1242                    .temporary_namespace()
1243                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1244                Ok(temp_namespace.create_index(plan)?)
1245            }
1246            PERSISTENT_NAMESPACE_ID => {
1247                if self.has_active_transaction() {
1248                    return Err(Error::InvalidArgumentError(
1249                        "CREATE INDEX is not supported inside an active transaction".into(),
1250                    ));
1251                }
1252
1253                self.run_autocommit_create_index(plan)
1254            }
1255            other => Err(Error::InvalidArgumentError(format!(
1256                "Unknown storage namespace '{}'",
1257                other
1258            ))),
1259        }
1260    }
1261
1262    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1263        if self.has_active_transaction() {
1264            return Err(Error::InvalidArgumentError(
1265                "DROP INDEX is not supported inside an active transaction".into(),
1266            ));
1267        }
1268
1269        let mut dropped = false;
1270
1271        match self.run_autocommit_drop_index(plan.clone()) {
1272            Ok(Some(_)) => {
1273                dropped = true;
1274            }
1275            Ok(None) => {}
1276            Err(err) => {
1277                if !super::is_index_not_found_error(&err) {
1278                    return Err(err);
1279                }
1280            }
1281        }
1282
1283        if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1284            match temp_namespace.drop_index(plan.clone()) {
1285                Ok(Some(_)) => {
1286                    dropped = true;
1287                }
1288                Ok(None) => {}
1289                Err(err) => {
1290                    if !super::is_index_not_found_error(&err) {
1291                        return Err(err);
1292                    }
1293                }
1294            }
1295        }
1296
1297        if dropped || plan.if_exists {
1298            Ok(RuntimeStatementResult::NoOp)
1299        } else {
1300            Err(Error::CatalogError(format!(
1301                "Index '{}' does not exist",
1302                plan.name
1303            )))
1304        }
1305    }
1306}