llkv_runtime/
runtime_session.rs

// TODO: Implement a common trait (similar to `CatalogDdl`) shared by runtime sessions and llkv-transaction sessions.
2
3use std::sync::{Arc, RwLock};
4
5use arrow::record_batch::RecordBatch;
6use llkv_result::{Error, Result};
7use llkv_storage::pager::{BoxedPager, MemPager, Pager};
8use llkv_table::{
9    SingleColumnIndexDescriptor, canonical_table_name, validate_alter_table_operation,
10};
11use simd_r_drive_entry_handle::EntryHandle;
12
13use crate::{
14    AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan,
15    DropIndexPlan, DropTablePlan, InsertPlan, InsertSource, PlanColumnSpec, PlanOperation,
16    PlanValue, RenameTablePlan, RuntimeContext, RuntimeStatementResult, RuntimeTransactionContext,
17    SelectExecution, SelectPlan, SelectProjection, TransactionContext, TransactionKind,
18    TransactionResult, TransactionSession, UpdatePlan,
19};
20use crate::{
21    PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
22    RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID,
23    TemporaryRuntimeNamespace,
24};
25
26pub(crate) struct SessionNamespaces<P>
27where
28    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
29{
30    persistent: Arc<PersistentRuntimeNamespace<P>>,
31    temporary: Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>>,
32    registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
33}
34
35impl<P> SessionNamespaces<P>
36where
37    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
38{
39    pub(crate) fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
40        let persistent = Arc::new(PersistentRuntimeNamespace::new(
41            PERSISTENT_NAMESPACE_ID.to_string(),
42            Arc::clone(&base_context),
43        ));
44
45        let mut registry = RuntimeStorageNamespaceRegistry::new(
46            RuntimeStorageNamespace::namespace_id(persistent.as_ref()).clone(),
47        );
48        registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
49
50        let temporary = {
51            let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
52            let temp_context = Arc::new(RuntimeContext::new(temp_pager));
53            let namespace = Arc::new(TemporaryRuntimeNamespace::new(
54                TEMPORARY_NAMESPACE_ID.to_string(),
55                temp_context,
56            ));
57            registry.register_namespace(
58                Arc::clone(&namespace),
59                vec![TEMPORARY_NAMESPACE_ID.to_string()],
60                true,
61            );
62            namespace
63        };
64
65        Self {
66            persistent,
67            temporary: Some(temporary),
68            registry: Arc::new(RwLock::new(registry)),
69        }
70    }
71
72    pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace<P>> {
73        Arc::clone(&self.persistent)
74    }
75
76    pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
77        self.temporary.as_ref().map(Arc::clone)
78    }
79
80    pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
81        Arc::clone(&self.registry)
82    }
83}
84
85impl<P> Drop for SessionNamespaces<P>
86where
87    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
88{
89    fn drop(&mut self) {
90        if let Some(temp) = &self.temporary {
91            let namespace_id = temp.namespace_id().to_string();
92            let canonical_names = {
93                let mut registry = self.registry.write().expect("namespace registry poisoned");
94                registry.drain_namespace_tables(&namespace_id)
95            };
96            temp.clear_tables(canonical_names);
97        }
98    }
99}
100
101/// A session for executing operations with optional transaction support.
102///
103/// This is a high-level wrapper around the transaction machinery that provides
104/// a clean API for users. Operations can be executed directly or within a transaction.
105pub struct RuntimeSession<P>
106where
107    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
108{
109    // TODO: Allow generic pager type for the secondary pager context
110    // NOTE: Sessions always embed a `MemPager` for temporary namespaces; extend the
111    // wrapper when pluggable temp storage is supported.
112    inner: TransactionSession<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
113    namespaces: Arc<SessionNamespaces<P>>,
114}
115
116impl<P> RuntimeSession<P>
117where
118    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
119{
120    pub(crate) fn from_parts(
121        inner: TransactionSession<
122            RuntimeTransactionContext<P>,
123            RuntimeTransactionContext<MemPager>,
124        >,
125        namespaces: Arc<SessionNamespaces<P>>,
126    ) -> Self {
127        Self { inner, namespaces }
128    }
129
130    /// Clone this session (reuses the same underlying TransactionSession).
131    /// This is necessary to maintain transaction state across Engine clones.
132    pub(crate) fn clone_session(&self) -> Self {
133        Self {
134            inner: self.inner.clone_session(),
135            namespaces: self.namespaces.clone(),
136        }
137    }
138
139    pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
140        self.namespaces.registry()
141    }
142
143    fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
144        self.namespace_registry()
145            .read()
146            .expect("namespace registry poisoned")
147            .namespace_for_table(canonical)
148    }
149
150    fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
151        if plan.tables.len() != 1 {
152            return None;
153        }
154
155        let qualified = plan.tables[0].qualified_name();
156        let (_, canonical) = canonical_table_name(&qualified).ok()?;
157        Some(self.resolve_namespace_for_table(&canonical))
158    }
159
160    fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
161        let temp_namespace = self
162            .temporary_namespace()
163            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
164
165        let table_name = if plan.tables.len() == 1 {
166            plan.tables[0].qualified_name()
167        } else {
168            String::new()
169        };
170
171        let temp_context = temp_namespace.context();
172        let temp_tx_context = RuntimeTransactionContext::new(temp_context);
173        let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
174        let schema = execution.schema();
175        let batches = execution.collect()?;
176
177        let combined = if batches.is_empty() {
178            RecordBatch::new_empty(Arc::clone(&schema))
179        } else if batches.len() == 1 {
180            batches.into_iter().next().unwrap()
181        } else {
182            let refs: Vec<&RecordBatch> = batches.iter().collect();
183            arrow::compute::concat_batches(&schema, refs)?
184        };
185
186        let execution =
187            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
188
189        Ok(RuntimeStatementResult::Select {
190            execution: Box::new(execution),
191            table_name,
192            schema,
193        })
194    }
195
196    fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace<P>> {
197        self.namespaces.persistent()
198    }
199
200    #[allow(dead_code)]
201    fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
202        self.namespaces.temporary()
203    }
204
205    fn base_transaction_context(&self) -> Arc<RuntimeTransactionContext<P>> {
206        Arc::clone(self.inner.context())
207    }
208
209    fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
210    where
211        F: FnOnce(&RuntimeTransactionContext<P>) -> Result<T>,
212    {
213        let context = self.base_transaction_context();
214        let default_snapshot = context.ctx().default_snapshot();
215        TransactionContext::set_snapshot(&*context, default_snapshot);
216        f(context.as_ref())
217    }
218
219    fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TransactionResult<P>> {
220        self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
221    }
222
223    fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TransactionResult<P>> {
224        self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
225    }
226
227    fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TransactionResult<P>> {
228        self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
229    }
230
231    fn run_autocommit_create_table(
232        &self,
233        plan: CreateTablePlan,
234    ) -> Result<RuntimeStatementResult<P>> {
235        let result =
236            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
237        match result {
238            TransactionResult::CreateTable { table_name } => {
239                Ok(RuntimeStatementResult::CreateTable { table_name })
240            }
241            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
242            _ => Err(Error::Internal(
243                "unexpected transaction result for CREATE TABLE".into(),
244            )),
245        }
246    }
247
248    fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<RuntimeStatementResult<P>> {
249        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
250        Ok(RuntimeStatementResult::NoOp)
251    }
252
253    fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
254        self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
255    }
256
257    fn run_autocommit_alter_table(
258        &self,
259        plan: AlterTablePlan,
260    ) -> Result<RuntimeStatementResult<P>> {
261        let result =
262            self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
263        match result {
264            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
265            TransactionResult::CreateTable { table_name } => {
266                Ok(RuntimeStatementResult::CreateTable { table_name })
267            }
268            TransactionResult::CreateIndex {
269                table_name,
270                index_name,
271            } => Ok(RuntimeStatementResult::CreateIndex {
272                table_name,
273                index_name,
274            }),
275            _ => Err(Error::Internal(
276                "unexpected transaction result for ALTER TABLE".into(),
277            )),
278        }
279    }
280
281    fn run_autocommit_create_index(
282        &self,
283        plan: CreateIndexPlan,
284    ) -> Result<RuntimeStatementResult<P>> {
285        let result =
286            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
287        match result {
288            TransactionResult::CreateIndex {
289                table_name,
290                index_name,
291            } => Ok(RuntimeStatementResult::CreateIndex {
292                table_name,
293                index_name,
294            }),
295            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
296            _ => Err(Error::Internal(
297                "unexpected transaction result for CREATE INDEX".into(),
298            )),
299        }
300    }
301
302    fn run_autocommit_drop_index(
303        &self,
304        plan: DropIndexPlan,
305    ) -> Result<Option<SingleColumnIndexDescriptor>> {
306        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
307    }
308
309    /// Begin a transaction in this session.
310    /// Creates an empty staging context for new tables created within the transaction.
311    /// Existing tables are accessed via MVCC visibility filtering - NO data copying occurs.
312    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
313        let staging_pager = Arc::new(MemPager::default());
314        tracing::trace!(
315            "BEGIN_TRANSACTION: Created staging pager at {:p}",
316            &*staging_pager
317        );
318        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
319
320        // Staging context is EMPTY - used only for tables created within the transaction.
321        // Existing tables are read from base context with MVCC visibility filtering.
322        // No data copying occurs at BEGIN - this is pure MVCC.
323
324        let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
325
326        self.inner.begin_transaction(staging_wrapper)?;
327        Ok(RuntimeStatementResult::Transaction {
328            kind: TransactionKind::Begin,
329        })
330    }
331
332    /// Mark the current transaction as aborted due to an error.
333    /// This should be called when any error occurs during a transaction.
334    pub fn abort_transaction(&self) {
335        self.inner.abort_transaction();
336    }
337
338    /// Check if this session has an active transaction.
339    pub fn has_active_transaction(&self) -> bool {
340        let result = self.inner.has_active_transaction();
341        tracing::trace!("SESSION: has_active_transaction() = {}", result);
342        result
343    }
344
345    /// Check if the current transaction has been aborted due to an error.
346    pub fn is_aborted(&self) -> bool {
347        self.inner.is_aborted()
348    }
349
350    /// Check if a table was created in the current active transaction.
351    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
352        self.inner.is_table_created_in_transaction(table_name)
353    }
354
355    /// Get column specifications for a table created in the current transaction.
356    /// Returns `None` if there's no active transaction or the table wasn't created in it.
357    pub fn table_column_specs_from_transaction(
358        &self,
359        table_name: &str,
360    ) -> Option<Vec<PlanColumnSpec>> {
361        self.inner.table_column_specs_from_transaction(table_name)
362    }
363
364    /// Get tables that reference the given table via foreign keys created in the current transaction.
365    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
366    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
367        self.inner
368            .tables_referencing_in_transaction(referenced_table)
369    }
370
371    /// Commit the current transaction and apply changes to the base context.
372    /// If the transaction was aborted, this acts as a ROLLBACK instead.
373    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
374        tracing::trace!("Session::commit_transaction called");
375        let (tx_result, operations) = self.inner.commit_transaction()?;
376        tracing::trace!(
377            "Session::commit_transaction got {} operations",
378            operations.len()
379        );
380
381        if !operations.is_empty() {
382            let dropped_tables = self
383                .inner
384                .context()
385                .ctx()
386                .dropped_tables
387                .read()
388                .unwrap()
389                .clone();
390            if !dropped_tables.is_empty() {
391                for operation in &operations {
392                    let table_name_opt = match operation {
393                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
394                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
395                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
396                        _ => None,
397                    };
398                    if let Some(table_name) = table_name_opt {
399                        let (_, canonical) = canonical_table_name(table_name)?;
400                        if dropped_tables.contains(&canonical) {
401                            self.abort_transaction();
402                            return Err(Error::TransactionContextError(
403                                "another transaction has dropped this table".into(),
404                            ));
405                        }
406                    }
407                }
408            }
409        }
410
411        // Extract the transaction kind from the transaction module's result
412        let kind = match tx_result {
413            TransactionResult::Transaction { kind } => kind,
414            _ => {
415                return Err(Error::Internal(
416                    "commit_transaction returned non-transaction result".into(),
417                ));
418            }
419        };
420        tracing::trace!("Session::commit_transaction kind={:?}", kind);
421
422        // Only replay operations if there are any (empty if transaction was aborted)
423        for operation in operations {
424            match operation {
425                PlanOperation::CreateTable(plan) => {
426                    TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
427                }
428                PlanOperation::DropTable(plan) => {
429                    TransactionContext::drop_table(&**self.inner.context(), plan)?;
430                }
431                PlanOperation::Insert(plan) => {
432                    TransactionContext::insert(&**self.inner.context(), plan)?;
433                }
434                PlanOperation::Update(plan) => {
435                    TransactionContext::update(&**self.inner.context(), plan)?;
436                }
437                PlanOperation::Delete(plan) => {
438                    TransactionContext::delete(&**self.inner.context(), plan)?;
439                }
440                _ => {}
441            }
442        }
443
444        // Reset the base context snapshot to the default auto-commit view now that
445        // the transaction has been replayed onto the base tables.
446        let base_ctx = self.inner.context();
447        let default_snapshot = base_ctx.ctx().default_snapshot();
448        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
449
450        // Persist the next_txn_id to the catalog after a successful commit
451        if matches!(kind, TransactionKind::Commit) {
452            let ctx = base_ctx.ctx();
453            let next_txn_id = ctx.txn_manager().current_next_txn_id();
454            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
455                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
456            }
457        }
458
459        // Return a StatementResult with the correct kind (Commit or Rollback)
460        Ok(RuntimeStatementResult::Transaction { kind })
461    }
462
463    /// Rollback the current transaction, discarding all changes.
464    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
465        self.inner.rollback_transaction()?;
466        let base_ctx = self.inner.context();
467        let default_snapshot = base_ctx.ctx().default_snapshot();
468        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
469        Ok(RuntimeStatementResult::Transaction {
470            kind: TransactionKind::Rollback,
471        })
472    }
473
474    fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
475        // Only materialize if source is a SELECT query
476        // If source is already Batches, leave it alone
477        if matches!(plan.source, Some(CreateTableSource::Select { .. }))
478            && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
479        {
480            let select_result = self.execute_select_plan(*select_plan)?;
481            let (schema, batches) = match select_result {
482                RuntimeStatementResult::Select {
483                    schema, execution, ..
484                } => {
485                    let batches = execution.collect()?;
486                    (schema, batches)
487                }
488                _ => {
489                    return Err(Error::Internal(
490                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
491                    ));
492                }
493            };
494            plan.source = Some(CreateTableSource::Batches { schema, batches });
495        }
496        Ok(plan)
497    }
498
499    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
500        let InsertPlan {
501            table,
502            columns,
503            source,
504        } = plan;
505
506        match source {
507            InsertSource::Rows(rows) => {
508                let count = rows.len();
509                Ok((
510                    InsertPlan {
511                        table,
512                        columns,
513                        source: InsertSource::Rows(rows),
514                    },
515                    count,
516                ))
517            }
518            InsertSource::Batches(batches) => {
519                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
520                Ok((
521                    InsertPlan {
522                        table,
523                        columns,
524                        source: InsertSource::Batches(batches),
525                    },
526                    count,
527                ))
528            }
529            InsertSource::Select { plan: select_plan } => {
530                let select_result = self.execute_select_plan(*select_plan)?;
531                let rows = match select_result {
532                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
533                    _ => {
534                        return Err(Error::Internal(
535                            "expected Select result when executing INSERT ... SELECT".into(),
536                        ));
537                    }
538                };
539                let count = rows.len();
540                Ok((
541                    InsertPlan {
542                        table,
543                        columns,
544                        source: InsertSource::Rows(rows),
545                    },
546                    count,
547                ))
548            }
549        }
550    }
551
552    /// Insert rows (outside or inside transaction).
553    pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
554        tracing::trace!("Session::insert called for table={}", plan.table);
555        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
556        let table_name = plan.table.clone();
557        let (_, canonical_table) = canonical_table_name(&plan.table)?;
558        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
559
560        match namespace_id.as_str() {
561            TEMPORARY_NAMESPACE_ID => {
562                let temp_namespace = self
563                    .temporary_namespace()
564                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
565                let temp_context = temp_namespace.context();
566                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
567                match TransactionContext::insert(&temp_tx_context, plan)? {
568                    TransactionResult::Insert { .. } => {}
569                    _ => {
570                        return Err(Error::Internal(
571                            "unexpected transaction result for temporary INSERT".into(),
572                        ));
573                    }
574                }
575                Ok(RuntimeStatementResult::Insert {
576                    rows_inserted,
577                    table_name,
578                })
579            }
580            PERSISTENT_NAMESPACE_ID => {
581                if self.has_active_transaction() {
582                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
583                        Ok(_) => {
584                            tracing::trace!("Session::insert succeeded for table={}", table_name);
585                            Ok(RuntimeStatementResult::Insert {
586                                rows_inserted,
587                                table_name,
588                            })
589                        }
590                        Err(e) => {
591                            tracing::trace!(
592                                "Session::insert failed for table={}, error={:?}",
593                                table_name,
594                                e
595                            );
596                            if matches!(e, Error::ConstraintError(_)) {
597                                tracing::trace!("Transaction is_aborted=true");
598                                self.abort_transaction();
599                            }
600                            Err(e)
601                        }
602                    }
603                } else {
604                    let result = self.run_autocommit_insert(plan)?;
605                    if !matches!(result, TransactionResult::Insert { .. }) {
606                        return Err(Error::Internal(
607                            "unexpected transaction result for INSERT operation".into(),
608                        ));
609                    }
610                    Ok(RuntimeStatementResult::Insert {
611                        rows_inserted,
612                        table_name,
613                    })
614                }
615            }
616            other => Err(Error::InvalidArgumentError(format!(
617                "Unknown storage namespace '{}'",
618                other
619            ))),
620        }
621    }
622
623    /// Select rows (outside or inside transaction).
624    pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
625        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
626            && namespace_id == TEMPORARY_NAMESPACE_ID
627        {
628            return self.select_from_temporary(plan);
629        }
630
631        if self.has_active_transaction() {
632            let tx_result = match self
633                .inner
634                .execute_operation(PlanOperation::Select(plan.clone()))
635            {
636                Ok(result) => result,
637                Err(e) => {
638                    // Only abort transaction on specific errors (constraint violations, etc.)
639                    // Don't abort on catalog errors (table doesn't exist) or similar
640                    if matches!(e, Error::ConstraintError(_)) {
641                        self.abort_transaction();
642                    }
643                    return Err(e);
644                }
645            };
646            match tx_result {
647                TransactionResult::Select {
648                    table_name,
649                    schema,
650                    execution: staging_execution,
651                } => {
652                    // Convert from staging (MemPager) execution to base pager execution
653                    // by collecting batches and rebuilding
654                    let batches = staging_execution.collect().unwrap_or_default();
655                    let combined = if batches.is_empty() {
656                        RecordBatch::new_empty(Arc::clone(&schema))
657                    } else if batches.len() == 1 {
658                        batches.into_iter().next().unwrap()
659                    } else {
660                        let refs: Vec<&RecordBatch> = batches.iter().collect();
661                        arrow::compute::concat_batches(&schema, refs)?
662                    };
663
664                    let execution = SelectExecution::from_batch(
665                        table_name.clone(),
666                        Arc::clone(&schema),
667                        combined,
668                    );
669
670                    Ok(RuntimeStatementResult::Select {
671                        execution: Box::new(execution),
672                        table_name,
673                        schema,
674                    })
675                }
676                _ => Err(Error::Internal("expected Select result".into())),
677            }
678        } else {
679            // Call via TransactionContext trait
680            let table_name = if plan.tables.len() == 1 {
681                plan.tables[0].qualified_name()
682            } else {
683                String::new()
684            };
685            let execution = self.with_autocommit_transaction_context(|ctx| {
686                TransactionContext::execute_select(ctx, plan)
687            })?;
688            let schema = execution.schema();
689            Ok(RuntimeStatementResult::Select {
690                execution: Box::new(execution),
691                table_name,
692                schema,
693            })
694        }
695    }
696
697    /// Convenience helper to fetch all rows from a table within this session.
698    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
699        let plan =
700            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
701        match self.execute_select_plan(plan)? {
702            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
703            other => Err(Error::Internal(format!(
704                "expected Select result when reading table '{}', got {:?}",
705                table, other
706            ))),
707        }
708    }
709
710    pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
711        let (_, canonical_table) = canonical_table_name(&plan.table)?;
712        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
713
714        match namespace_id.as_str() {
715            TEMPORARY_NAMESPACE_ID => {
716                let temp_namespace = self
717                    .temporary_namespace()
718                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
719                let temp_context = temp_namespace.context();
720                let table_name = plan.table.clone();
721                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
722                match TransactionContext::update(&temp_tx_context, plan)? {
723                    TransactionResult::Update { rows_updated, .. } => {
724                        Ok(RuntimeStatementResult::Update {
725                            rows_updated,
726                            table_name,
727                        })
728                    }
729                    _ => Err(Error::Internal(
730                        "unexpected transaction result for temporary UPDATE".into(),
731                    )),
732                }
733            }
734            PERSISTENT_NAMESPACE_ID => {
735                if self.has_active_transaction() {
736                    let table_name = plan.table.clone();
737                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
738                        Ok(result) => result,
739                        Err(e) => {
740                            // If an error occurs during a transaction, abort it
741                            self.abort_transaction();
742                            return Err(e);
743                        }
744                    };
745                    match result {
746                        TransactionResult::Update {
747                            rows_matched: _,
748                            rows_updated,
749                        } => Ok(RuntimeStatementResult::Update {
750                            rows_updated,
751                            table_name,
752                        }),
753                        _ => Err(Error::Internal("expected Update result".into())),
754                    }
755                } else {
756                    let table_name = plan.table.clone();
757                    let result = self.run_autocommit_update(plan)?;
758                    match result {
759                        TransactionResult::Update {
760                            rows_matched: _,
761                            rows_updated,
762                        } => Ok(RuntimeStatementResult::Update {
763                            rows_updated,
764                            table_name,
765                        }),
766                        _ => Err(Error::Internal("expected Update result".into())),
767                    }
768                }
769            }
770            other => Err(Error::InvalidArgumentError(format!(
771                "Unknown storage namespace '{}'",
772                other
773            ))),
774        }
775    }
776
777    pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
778        let (_, canonical_table) = canonical_table_name(&plan.table)?;
779        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
780
781        match namespace_id.as_str() {
782            TEMPORARY_NAMESPACE_ID => {
783                let temp_namespace = self
784                    .temporary_namespace()
785                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
786                let temp_context = temp_namespace.context();
787                let table_name = plan.table.clone();
788                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
789                match TransactionContext::delete(&temp_tx_context, plan)? {
790                    TransactionResult::Delete { rows_deleted } => {
791                        Ok(RuntimeStatementResult::Delete {
792                            rows_deleted,
793                            table_name,
794                        })
795                    }
796                    _ => Err(Error::Internal(
797                        "unexpected transaction result for temporary DELETE".into(),
798                    )),
799                }
800            }
801            PERSISTENT_NAMESPACE_ID => {
802                if self.has_active_transaction() {
803                    let table_name = plan.table.clone();
804                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
805                        Ok(result) => result,
806                        Err(e) => {
807                            // If an error occurs during a transaction, abort it
808                            self.abort_transaction();
809                            return Err(e);
810                        }
811                    };
812                    match result {
813                        TransactionResult::Delete { rows_deleted } => {
814                            Ok(RuntimeStatementResult::Delete {
815                                rows_deleted,
816                                table_name,
817                            })
818                        }
819                        _ => Err(Error::Internal("expected Delete result".into())),
820                    }
821                } else {
822                    let table_name = plan.table.clone();
823                    let result = self.run_autocommit_delete(plan)?;
824                    match result {
825                        TransactionResult::Delete { rows_deleted } => {
826                            Ok(RuntimeStatementResult::Delete {
827                                rows_deleted,
828                                table_name,
829                            })
830                        }
831                        _ => Err(Error::Internal("expected Delete result".into())),
832                    }
833                }
834            }
835            other => Err(Error::InvalidArgumentError(format!(
836                "Unknown storage namespace '{}'",
837                other
838            ))),
839        }
840    }
841}
842
843/// Implement [`CatalogDdl`] directly on the session so callers must import the trait
844/// to perform schema mutations. This keeps all runtime DDL entry points aligned with
845/// the shared contract used by contexts and storage namespaces.
846impl<P> CatalogDdl for RuntimeSession<P>
847where
848    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
849{
850    type CreateTableOutput = RuntimeStatementResult<P>;
851    type DropTableOutput = RuntimeStatementResult<P>;
852    type RenameTableOutput = ();
853    type AlterTableOutput = RuntimeStatementResult<P>;
854    type CreateIndexOutput = RuntimeStatementResult<P>;
855    type DropIndexOutput = RuntimeStatementResult<P>;
856
857    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
858        let target_namespace = plan
859            .namespace
860            .clone()
861            .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
862            .to_ascii_lowercase();
863
864        let plan = self.materialize_ctas_plan(plan)?;
865
866        match target_namespace.as_str() {
867            TEMPORARY_NAMESPACE_ID => {
868                let temp_namespace = self
869                    .temporary_namespace()
870                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
871                let (_, canonical) = canonical_table_name(&plan.name)?;
872                let result = temp_namespace.create_table(plan)?;
873                if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
874                    let namespace_id = temp_namespace.namespace_id().to_string();
875                    let registry = self.namespace_registry();
876                    registry
877                        .write()
878                        .expect("namespace registry poisoned")
879                        .register_table(&namespace_id, canonical);
880                }
881                result.convert_pager_type::<P>()
882            }
883            PERSISTENT_NAMESPACE_ID => {
884                if self.has_active_transaction() {
885                    match self
886                        .inner
887                        .execute_operation(PlanOperation::CreateTable(plan))
888                    {
889                        Ok(TransactionResult::CreateTable { table_name }) => {
890                            Ok(RuntimeStatementResult::CreateTable { table_name })
891                        }
892                        Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
893                        Ok(_) => Err(Error::Internal(
894                            "expected CreateTable result during transactional CREATE TABLE".into(),
895                        )),
896                        Err(err) => {
897                            self.abort_transaction();
898                            Err(err)
899                        }
900                    }
901                } else {
902                    if self.inner.has_table_locked_by_other_session(&plan.name) {
903                        return Err(Error::TransactionContextError(format!(
904                            "table '{}' is locked by another active transaction",
905                            plan.name
906                        )));
907                    }
908                    self.run_autocommit_create_table(plan)
909                }
910            }
911            other => Err(Error::InvalidArgumentError(format!(
912                "Unknown storage namespace '{}'",
913                other
914            ))),
915        }
916    }
917
918    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
919        let (_, canonical_table) = canonical_table_name(&plan.name)?;
920        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
921
922        match namespace_id.as_str() {
923            TEMPORARY_NAMESPACE_ID => {
924                let temp_namespace = self
925                    .temporary_namespace()
926                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
927                temp_namespace.drop_table(plan)?;
928                let registry = self.namespace_registry();
929                registry
930                    .write()
931                    .expect("namespace registry poisoned")
932                    .unregister_table(&canonical_table);
933                Ok(RuntimeStatementResult::NoOp)
934            }
935            PERSISTENT_NAMESPACE_ID => {
936                if self.has_active_transaction() {
937                    let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
938                    if !referencing_tables.is_empty() {
939                        let referencing_table = &referencing_tables[0];
940                        self.abort_transaction();
941                        return Err(Error::CatalogError(format!(
942                            "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
943                            referencing_table
944                        )));
945                    }
946
947                    match self
948                        .inner
949                        .execute_operation(PlanOperation::DropTable(plan.clone()))
950                    {
951                        Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
952                        Ok(_) => Err(Error::Internal(
953                            "expected NoOp result for DROP TABLE during transactional execution"
954                                .into(),
955                        )),
956                        Err(err) => {
957                            self.abort_transaction();
958                            Err(err)
959                        }
960                    }
961                } else {
962                    if self.inner.has_table_locked_by_other_session(&plan.name) {
963                        return Err(Error::TransactionContextError(format!(
964                            "table '{}' is locked by another active transaction",
965                            plan.name
966                        )));
967                    }
968                    self.run_autocommit_drop_table(plan)
969                }
970            }
971            other => Err(Error::InvalidArgumentError(format!(
972                "Unknown storage namespace '{}'",
973                other
974            ))),
975        }
976    }
977
978    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
979        if self.has_active_transaction() {
980            return Err(Error::InvalidArgumentError(
981                "ALTER TABLE RENAME is not supported inside an active transaction".into(),
982            ));
983        }
984
985        let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
986        let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
987        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
988
989        match namespace_id.as_str() {
990            TEMPORARY_NAMESPACE_ID => {
991                let temp_namespace = self
992                    .temporary_namespace()
993                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
994                match temp_namespace.rename_table(plan.clone()) {
995                    Ok(()) => {
996                        let namespace_id = temp_namespace.namespace_id().to_string();
997                        let registry = self.namespace_registry();
998                        let mut registry = registry.write().expect("namespace registry poisoned");
999                        registry.unregister_table(&canonical_table);
1000                        registry.register_table(&namespace_id, new_canonical);
1001                        Ok(())
1002                    }
1003                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1004                    Err(err) => Err(err),
1005                }
1006            }
1007            PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1008                Ok(()) => Ok(()),
1009                Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1010                Err(err) => Err(err),
1011            },
1012            other => Err(Error::InvalidArgumentError(format!(
1013                "Unknown storage namespace '{}'",
1014                other
1015            ))),
1016        }
1017    }
1018
1019    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1020        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1021        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1022
1023        match namespace_id.as_str() {
1024            TEMPORARY_NAMESPACE_ID => {
1025                let temp_namespace = self
1026                    .temporary_namespace()
1027                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1028
1029                let context = temp_namespace.context();
1030                let catalog_service = &context.catalog_service;
1031                let view = match catalog_service.table_view(&canonical_table) {
1032                    Ok(view) => view,
1033                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1034                        return Ok(RuntimeStatementResult::NoOp);
1035                    }
1036                    Err(err) => return Err(err),
1037                };
1038                let table_id = view
1039                    .table_meta
1040                    .as_ref()
1041                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1042                    .table_id;
1043
1044                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1045
1046                temp_namespace.alter_table(plan)?.convert_pager_type::<P>()
1047            }
1048            PERSISTENT_NAMESPACE_ID => {
1049                let persistent = self.persistent_namespace();
1050                let context = persistent.context();
1051                let catalog_service = &context.catalog_service;
1052                let view = match catalog_service.table_view(&canonical_table) {
1053                    Ok(view) => view,
1054                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1055                        return Ok(RuntimeStatementResult::NoOp);
1056                    }
1057                    Err(err) => return Err(err),
1058                };
1059                let table_id = view
1060                    .table_meta
1061                    .as_ref()
1062                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1063                    .table_id;
1064
1065                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1066
1067                self.run_autocommit_alter_table(plan)
1068            }
1069            other => Err(Error::InvalidArgumentError(format!(
1070                "Unknown storage namespace '{}'",
1071                other
1072            ))),
1073        }
1074    }
1075
1076    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1077        if plan.columns.is_empty() {
1078            return Err(Error::InvalidArgumentError(
1079                "CREATE INDEX requires at least one column".into(),
1080            ));
1081        }
1082
1083        for column_plan in &plan.columns {
1084            if !column_plan.ascending || column_plan.nulls_first {
1085                return Err(Error::InvalidArgumentError(
1086                    "only ASC indexes with NULLS LAST are supported".into(),
1087                ));
1088            }
1089        }
1090
1091        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1092        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1093
1094        match namespace_id.as_str() {
1095            TEMPORARY_NAMESPACE_ID => {
1096                let temp_namespace = self
1097                    .temporary_namespace()
1098                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1099                temp_namespace.create_index(plan)?.convert_pager_type::<P>()
1100            }
1101            PERSISTENT_NAMESPACE_ID => {
1102                if self.has_active_transaction() {
1103                    return Err(Error::InvalidArgumentError(
1104                        "CREATE INDEX is not supported inside an active transaction".into(),
1105                    ));
1106                }
1107
1108                self.run_autocommit_create_index(plan)
1109            }
1110            other => Err(Error::InvalidArgumentError(format!(
1111                "Unknown storage namespace '{}'",
1112                other
1113            ))),
1114        }
1115    }
1116
1117    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1118        if self.has_active_transaction() {
1119            return Err(Error::InvalidArgumentError(
1120                "DROP INDEX is not supported inside an active transaction".into(),
1121            ));
1122        }
1123
1124        let mut dropped = false;
1125
1126        match self.run_autocommit_drop_index(plan.clone()) {
1127            Ok(Some(_)) => {
1128                dropped = true;
1129            }
1130            Ok(None) => {}
1131            Err(err) => {
1132                if !super::is_index_not_found_error(&err) {
1133                    return Err(err);
1134                }
1135            }
1136        }
1137
1138        if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1139            match temp_namespace.drop_index(plan.clone()) {
1140                Ok(Some(_)) => {
1141                    dropped = true;
1142                }
1143                Ok(None) => {}
1144                Err(err) => {
1145                    if !super::is_index_not_found_error(&err) {
1146                        return Err(err);
1147                    }
1148                }
1149            }
1150        }
1151
1152        if dropped || plan.if_exists {
1153            Ok(RuntimeStatementResult::NoOp)
1154        } else {
1155            Err(Error::CatalogError(format!(
1156                "Index '{}' does not exist",
1157                plan.name
1158            )))
1159        }
1160    }
1161}