llkv_runtime/
runtime_session.rs

// TODO: Implement a common trait (similar to `CatalogDdl`) for runtime sessions and llkv-transaction sessions.
2
3use std::sync::{Arc, RwLock};
4
5use arrow::record_batch::RecordBatch;
6use llkv_result::{Error, Result};
7use llkv_storage::pager::{BoxedPager, MemPager, Pager};
8use llkv_table::{
9    SingleColumnIndexDescriptor, canonical_table_name, validate_alter_table_operation,
10};
11use simd_r_drive_entry_handle::EntryHandle;
12
13use crate::{
14    AlterTablePlan, CatalogDdl, CreateIndexPlan, CreateTablePlan, CreateTableSource, DeletePlan,
15    DropIndexPlan, DropTablePlan, InsertPlan, InsertSource, PlanColumnSpec, PlanOperation,
16    PlanValue, RenameTablePlan, RuntimeContext, RuntimeStatementResult, RuntimeTransactionContext,
17    SelectExecution, SelectPlan, SelectProjection, TransactionContext, TransactionKind,
18    TransactionResult, TransactionSession, UpdatePlan,
19};
20use crate::{
21    PERSISTENT_NAMESPACE_ID, PersistentRuntimeNamespace, RuntimeNamespaceId,
22    RuntimeStorageNamespace, RuntimeStorageNamespaceRegistry, TEMPORARY_NAMESPACE_ID,
23    TemporaryRuntimeNamespace,
24};
25use llkv_plan::TruncatePlan;
26
27pub(crate) struct SessionNamespaces<P>
28where
29    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
30{
31    persistent: Arc<PersistentRuntimeNamespace<P>>,
32    temporary: Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>>,
33    registry: Arc<RwLock<RuntimeStorageNamespaceRegistry>>,
34}
35
36impl<P> SessionNamespaces<P>
37where
38    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
39{
40    pub(crate) fn new(base_context: Arc<RuntimeContext<P>>) -> Self {
41        let persistent = Arc::new(PersistentRuntimeNamespace::new(
42            PERSISTENT_NAMESPACE_ID.to_string(),
43            Arc::clone(&base_context),
44        ));
45
46        let mut registry = RuntimeStorageNamespaceRegistry::new(
47            RuntimeStorageNamespace::namespace_id(persistent.as_ref()).clone(),
48        );
49        registry.register_namespace(Arc::clone(&persistent), Vec::<String>::new(), false);
50
51        let temporary = {
52            let temp_pager = Arc::new(BoxedPager::from_arc(Arc::new(MemPager::default())));
53            let temp_context = Arc::new(RuntimeContext::new(temp_pager));
54            let namespace = Arc::new(TemporaryRuntimeNamespace::new(
55                TEMPORARY_NAMESPACE_ID.to_string(),
56                temp_context,
57            ));
58            registry.register_namespace(
59                Arc::clone(&namespace),
60                vec![TEMPORARY_NAMESPACE_ID.to_string()],
61                true,
62            );
63            namespace
64        };
65
66        Self {
67            persistent,
68            temporary: Some(temporary),
69            registry: Arc::new(RwLock::new(registry)),
70        }
71    }
72
73    pub(crate) fn persistent(&self) -> Arc<PersistentRuntimeNamespace<P>> {
74        Arc::clone(&self.persistent)
75    }
76
77    pub(crate) fn temporary(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
78        self.temporary.as_ref().map(Arc::clone)
79    }
80
81    pub(crate) fn registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
82        Arc::clone(&self.registry)
83    }
84}
85
86impl<P> Drop for SessionNamespaces<P>
87where
88    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
89{
90    fn drop(&mut self) {
91        if let Some(temp) = &self.temporary {
92            let namespace_id = temp.namespace_id().to_string();
93            let canonical_names = {
94                let mut registry = self.registry.write().expect("namespace registry poisoned");
95                registry.drain_namespace_tables(&namespace_id)
96            };
97            temp.clear_tables(canonical_names);
98        }
99    }
100}
101
102/// A session for executing operations with optional transaction support.
103///
104/// This is a high-level wrapper around the transaction machinery that provides
105/// a clean API for users. Operations can be executed directly or within a transaction.
106pub struct RuntimeSession<P>
107where
108    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
109{
110    // TODO: Allow generic pager type for the secondary pager context
111    // NOTE: Sessions always embed a `MemPager` for temporary namespaces; extend the
112    // wrapper when pluggable temp storage is supported.
113    inner: TransactionSession<RuntimeTransactionContext<P>, RuntimeTransactionContext<MemPager>>,
114    namespaces: Arc<SessionNamespaces<P>>,
115}
116
117impl<P> RuntimeSession<P>
118where
119    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
120{
121    pub(crate) fn from_parts(
122        inner: TransactionSession<
123            RuntimeTransactionContext<P>,
124            RuntimeTransactionContext<MemPager>,
125        >,
126        namespaces: Arc<SessionNamespaces<P>>,
127    ) -> Self {
128        Self { inner, namespaces }
129    }
130
131    /// Clone this session (reuses the same underlying TransactionSession).
132    /// This is necessary to maintain transaction state across Engine clones.
133    pub(crate) fn clone_session(&self) -> Self {
134        Self {
135            inner: self.inner.clone_session(),
136            namespaces: self.namespaces.clone(),
137        }
138    }
139
140    pub fn namespace_registry(&self) -> Arc<RwLock<RuntimeStorageNamespaceRegistry>> {
141        self.namespaces.registry()
142    }
143
144    fn resolve_namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
145        self.namespace_registry()
146            .read()
147            .expect("namespace registry poisoned")
148            .namespace_for_table(canonical)
149    }
150
151    fn namespace_for_select_plan(&self, plan: &SelectPlan) -> Option<RuntimeNamespaceId> {
152        if plan.tables.len() != 1 {
153            return None;
154        }
155
156        let qualified = plan.tables[0].qualified_name();
157        let (_, canonical) = canonical_table_name(&qualified).ok()?;
158        Some(self.resolve_namespace_for_table(&canonical))
159    }
160
161    fn select_from_temporary(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
162        let temp_namespace = self
163            .temporary_namespace()
164            .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
165
166        let table_name = if plan.tables.len() == 1 {
167            plan.tables[0].qualified_name()
168        } else {
169            String::new()
170        };
171
172        let temp_context = temp_namespace.context();
173        let temp_tx_context = RuntimeTransactionContext::new(temp_context);
174        let execution = TransactionContext::execute_select(&temp_tx_context, plan)?;
175        let schema = execution.schema();
176        let batches = execution.collect()?;
177
178        let combined = if batches.is_empty() {
179            RecordBatch::new_empty(Arc::clone(&schema))
180        } else if batches.len() == 1 {
181            batches.into_iter().next().unwrap()
182        } else {
183            let refs: Vec<&RecordBatch> = batches.iter().collect();
184            arrow::compute::concat_batches(&schema, refs)?
185        };
186
187        let execution =
188            SelectExecution::from_batch(table_name.clone(), Arc::clone(&schema), combined);
189
190        Ok(RuntimeStatementResult::Select {
191            execution: Box::new(execution),
192            table_name,
193            schema,
194        })
195    }
196
197    fn persistent_namespace(&self) -> Arc<PersistentRuntimeNamespace<P>> {
198        self.namespaces.persistent()
199    }
200
201    #[allow(dead_code)]
202    fn temporary_namespace(&self) -> Option<Arc<TemporaryRuntimeNamespace<BoxedPager>>> {
203        self.namespaces.temporary()
204    }
205
206    fn base_transaction_context(&self) -> Arc<RuntimeTransactionContext<P>> {
207        Arc::clone(self.inner.context())
208    }
209
210    fn with_autocommit_transaction_context<F, T>(&self, f: F) -> Result<T>
211    where
212        F: FnOnce(&RuntimeTransactionContext<P>) -> Result<T>,
213    {
214        let context = self.base_transaction_context();
215        let default_snapshot = context.ctx().default_snapshot();
216        TransactionContext::set_snapshot(&*context, default_snapshot);
217        f(context.as_ref())
218    }
219
220    fn run_autocommit_insert(&self, plan: InsertPlan) -> Result<TransactionResult<P>> {
221        self.with_autocommit_transaction_context(|ctx| TransactionContext::insert(ctx, plan))
222    }
223
224    fn run_autocommit_update(&self, plan: UpdatePlan) -> Result<TransactionResult<P>> {
225        self.with_autocommit_transaction_context(|ctx| TransactionContext::update(ctx, plan))
226    }
227
228    fn run_autocommit_delete(&self, plan: DeletePlan) -> Result<TransactionResult<P>> {
229        self.with_autocommit_transaction_context(|ctx| TransactionContext::delete(ctx, plan))
230    }
231
232    fn run_autocommit_truncate(&self, plan: TruncatePlan) -> Result<TransactionResult<P>> {
233        self.with_autocommit_transaction_context(|ctx| TransactionContext::truncate(ctx, plan))
234    }
235
236    fn run_autocommit_create_table(
237        &self,
238        plan: CreateTablePlan,
239    ) -> Result<RuntimeStatementResult<P>> {
240        let result =
241            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_table(ctx, plan))?;
242        match result {
243            TransactionResult::CreateTable { table_name } => {
244                Ok(RuntimeStatementResult::CreateTable { table_name })
245            }
246            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
247            _ => Err(Error::Internal(
248                "unexpected transaction result for CREATE TABLE".into(),
249            )),
250        }
251    }
252
253    fn run_autocommit_drop_table(&self, plan: DropTablePlan) -> Result<RuntimeStatementResult<P>> {
254        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_table(ctx, plan))?;
255        Ok(RuntimeStatementResult::NoOp)
256    }
257
258    fn run_autocommit_rename_table(&self, plan: RenameTablePlan) -> Result<()> {
259        self.with_autocommit_transaction_context(|ctx| CatalogDdl::rename_table(ctx, plan))
260    }
261
262    fn run_autocommit_alter_table(
263        &self,
264        plan: AlterTablePlan,
265    ) -> Result<RuntimeStatementResult<P>> {
266        let result =
267            self.with_autocommit_transaction_context(|ctx| CatalogDdl::alter_table(ctx, plan))?;
268        match result {
269            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
270            TransactionResult::CreateTable { table_name } => {
271                Ok(RuntimeStatementResult::CreateTable { table_name })
272            }
273            TransactionResult::CreateIndex {
274                table_name,
275                index_name,
276            } => Ok(RuntimeStatementResult::CreateIndex {
277                table_name,
278                index_name,
279            }),
280            _ => Err(Error::Internal(
281                "unexpected transaction result for ALTER TABLE".into(),
282            )),
283        }
284    }
285
286    fn run_autocommit_create_index(
287        &self,
288        plan: CreateIndexPlan,
289    ) -> Result<RuntimeStatementResult<P>> {
290        let result =
291            self.with_autocommit_transaction_context(|ctx| CatalogDdl::create_index(ctx, plan))?;
292        match result {
293            TransactionResult::CreateIndex {
294                table_name,
295                index_name,
296            } => Ok(RuntimeStatementResult::CreateIndex {
297                table_name,
298                index_name,
299            }),
300            TransactionResult::NoOp => Ok(RuntimeStatementResult::NoOp),
301            _ => Err(Error::Internal(
302                "unexpected transaction result for CREATE INDEX".into(),
303            )),
304        }
305    }
306
307    fn run_autocommit_drop_index(
308        &self,
309        plan: DropIndexPlan,
310    ) -> Result<Option<SingleColumnIndexDescriptor>> {
311        self.with_autocommit_transaction_context(|ctx| CatalogDdl::drop_index(ctx, plan))
312    }
313
314    /// Begin a transaction in this session.
315    /// Creates an empty staging context for new tables created within the transaction.
316    /// Existing tables are accessed via MVCC visibility filtering - NO data copying occurs.
317    pub fn begin_transaction(&self) -> Result<RuntimeStatementResult<P>> {
318        let staging_pager = Arc::new(MemPager::default());
319        tracing::trace!(
320            "BEGIN_TRANSACTION: Created staging pager at {:p}",
321            &*staging_pager
322        );
323        let staging_ctx = Arc::new(RuntimeContext::new(staging_pager));
324
325        // Staging context is EMPTY - used only for tables created within the transaction.
326        // Existing tables are read from base context with MVCC visibility filtering.
327        // No data copying occurs at BEGIN - this is pure MVCC.
328
329        let staging_wrapper = Arc::new(RuntimeTransactionContext::new(staging_ctx));
330
331        self.inner.begin_transaction(staging_wrapper)?;
332        Ok(RuntimeStatementResult::Transaction {
333            kind: TransactionKind::Begin,
334        })
335    }
336
337    /// Mark the current transaction as aborted due to an error.
338    /// This should be called when any error occurs during a transaction.
339    pub fn abort_transaction(&self) {
340        self.inner.abort_transaction();
341    }
342
343    /// Check if this session has an active transaction.
344    pub fn has_active_transaction(&self) -> bool {
345        let result = self.inner.has_active_transaction();
346        tracing::trace!("SESSION: has_active_transaction() = {}", result);
347        result
348    }
349
350    /// Check if the current transaction has been aborted due to an error.
351    pub fn is_aborted(&self) -> bool {
352        self.inner.is_aborted()
353    }
354
355    /// Check if a table was created in the current active transaction.
356    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
357        self.inner.is_table_created_in_transaction(table_name)
358    }
359
360    /// Get column specifications for a table created in the current transaction.
361    /// Returns `None` if there's no active transaction or the table wasn't created in it.
362    pub fn table_column_specs_from_transaction(
363        &self,
364        table_name: &str,
365    ) -> Option<Vec<PlanColumnSpec>> {
366        self.inner.table_column_specs_from_transaction(table_name)
367    }
368
369    /// Get tables that reference the given table via foreign keys created in the current transaction.
370    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
371    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
372        self.inner
373            .tables_referencing_in_transaction(referenced_table)
374    }
375
376    /// Commit the current transaction and apply changes to the base context.
377    /// If the transaction was aborted, this acts as a ROLLBACK instead.
378    pub fn commit_transaction(&self) -> Result<RuntimeStatementResult<P>> {
379        tracing::trace!("Session::commit_transaction called");
380        let (tx_result, operations) = self.inner.commit_transaction()?;
381        tracing::trace!(
382            "Session::commit_transaction got {} operations",
383            operations.len()
384        );
385
386        if !operations.is_empty() {
387            let dropped_tables = self
388                .inner
389                .context()
390                .ctx()
391                .dropped_tables
392                .read()
393                .unwrap()
394                .clone();
395            if !dropped_tables.is_empty() {
396                for operation in &operations {
397                    let table_name_opt = match operation {
398                        PlanOperation::Insert(plan) => Some(plan.table.as_str()),
399                        PlanOperation::Update(plan) => Some(plan.table.as_str()),
400                        PlanOperation::Delete(plan) => Some(plan.table.as_str()),
401                        _ => None,
402                    };
403                    if let Some(table_name) = table_name_opt {
404                        let (_, canonical) = canonical_table_name(table_name)?;
405                        if dropped_tables.contains(&canonical) {
406                            self.abort_transaction();
407                            return Err(Error::TransactionContextError(
408                                "another transaction has dropped this table".into(),
409                            ));
410                        }
411                    }
412                }
413            }
414        }
415
416        // Extract the transaction kind from the transaction module's result
417        let kind = match tx_result {
418            TransactionResult::Transaction { kind } => kind,
419            _ => {
420                return Err(Error::Internal(
421                    "commit_transaction returned non-transaction result".into(),
422                ));
423            }
424        };
425        tracing::trace!("Session::commit_transaction kind={:?}", kind);
426
427        // Only replay operations if there are any (empty if transaction was aborted)
428        for operation in operations {
429            match operation {
430                PlanOperation::CreateTable(plan) => {
431                    TransactionContext::apply_create_table_plan(&**self.inner.context(), plan)?;
432                }
433                PlanOperation::DropTable(plan) => {
434                    TransactionContext::drop_table(&**self.inner.context(), plan)?;
435                }
436                PlanOperation::Insert(plan) => {
437                    TransactionContext::insert(&**self.inner.context(), plan)?;
438                }
439                PlanOperation::Update(plan) => {
440                    TransactionContext::update(&**self.inner.context(), plan)?;
441                }
442                PlanOperation::Delete(plan) => {
443                    TransactionContext::delete(&**self.inner.context(), plan)?;
444                }
445                _ => {}
446            }
447        }
448
449        // Reset the base context snapshot to the default auto-commit view now that
450        // the transaction has been replayed onto the base tables.
451        let base_ctx = self.inner.context();
452        let default_snapshot = base_ctx.ctx().default_snapshot();
453        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
454
455        // Persist the next_txn_id to the catalog after a successful commit
456        if matches!(kind, TransactionKind::Commit) {
457            let ctx = base_ctx.ctx();
458            let next_txn_id = ctx.txn_manager().current_next_txn_id();
459            if let Err(e) = ctx.persist_next_txn_id(next_txn_id) {
460                tracing::warn!("[COMMIT] Failed to persist next_txn_id: {}", e);
461            }
462        }
463
464        // Return a StatementResult with the correct kind (Commit or Rollback)
465        Ok(RuntimeStatementResult::Transaction { kind })
466    }
467
468    /// Rollback the current transaction, discarding all changes.
469    pub fn rollback_transaction(&self) -> Result<RuntimeStatementResult<P>> {
470        self.inner.rollback_transaction()?;
471        let base_ctx = self.inner.context();
472        let default_snapshot = base_ctx.ctx().default_snapshot();
473        TransactionContext::set_snapshot(&**base_ctx, default_snapshot);
474        Ok(RuntimeStatementResult::Transaction {
475            kind: TransactionKind::Rollback,
476        })
477    }
478
479    fn materialize_ctas_plan(&self, mut plan: CreateTablePlan) -> Result<CreateTablePlan> {
480        // Only materialize if source is a SELECT query
481        // If source is already Batches, leave it alone
482        if matches!(plan.source, Some(CreateTableSource::Select { .. }))
483            && let Some(CreateTableSource::Select { plan: select_plan }) = plan.source.take()
484        {
485            let select_result = self.execute_select_plan(*select_plan)?;
486            let (schema, batches) = match select_result {
487                RuntimeStatementResult::Select {
488                    schema, execution, ..
489                } => {
490                    let batches = execution.collect()?;
491                    (schema, batches)
492                }
493                _ => {
494                    return Err(Error::Internal(
495                        "expected SELECT result while executing CREATE TABLE AS SELECT".into(),
496                    ));
497                }
498            };
499            plan.source = Some(CreateTableSource::Batches { schema, batches });
500        }
501        Ok(plan)
502    }
503
504    fn normalize_insert_plan(&self, plan: InsertPlan) -> Result<(InsertPlan, usize)> {
505        let InsertPlan {
506            table,
507            columns,
508            source,
509        } = plan;
510
511        match source {
512            InsertSource::Rows(rows) => {
513                let count = rows.len();
514                Ok((
515                    InsertPlan {
516                        table,
517                        columns,
518                        source: InsertSource::Rows(rows),
519                    },
520                    count,
521                ))
522            }
523            InsertSource::Batches(batches) => {
524                let count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
525                Ok((
526                    InsertPlan {
527                        table,
528                        columns,
529                        source: InsertSource::Batches(batches),
530                    },
531                    count,
532                ))
533            }
534            InsertSource::Select { plan: select_plan } => {
535                let select_result = self.execute_select_plan(*select_plan)?;
536                let rows = match select_result {
537                    RuntimeStatementResult::Select { execution, .. } => execution.into_rows()?,
538                    _ => {
539                        return Err(Error::Internal(
540                            "expected Select result when executing INSERT ... SELECT".into(),
541                        ));
542                    }
543                };
544                let count = rows.len();
545                Ok((
546                    InsertPlan {
547                        table,
548                        columns,
549                        source: InsertSource::Rows(rows),
550                    },
551                    count,
552                ))
553            }
554        }
555    }
556
557    /// Insert rows (outside or inside transaction).
558    pub fn execute_insert_plan(&self, plan: InsertPlan) -> Result<RuntimeStatementResult<P>> {
559        tracing::trace!("Session::insert called for table={}", plan.table);
560        let (plan, rows_inserted) = self.normalize_insert_plan(plan)?;
561        let table_name = plan.table.clone();
562        let (_, canonical_table) = canonical_table_name(&plan.table)?;
563        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
564
565        match namespace_id.as_str() {
566            TEMPORARY_NAMESPACE_ID => {
567                let temp_namespace = self
568                    .temporary_namespace()
569                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
570                let temp_context = temp_namespace.context();
571                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
572                match TransactionContext::insert(&temp_tx_context, plan)? {
573                    TransactionResult::Insert { .. } => {}
574                    _ => {
575                        return Err(Error::Internal(
576                            "unexpected transaction result for temporary INSERT".into(),
577                        ));
578                    }
579                }
580                Ok(RuntimeStatementResult::Insert {
581                    rows_inserted,
582                    table_name,
583                })
584            }
585            PERSISTENT_NAMESPACE_ID => {
586                if self.has_active_transaction() {
587                    match self.inner.execute_operation(PlanOperation::Insert(plan)) {
588                        Ok(_) => {
589                            tracing::trace!("Session::insert succeeded for table={}", table_name);
590                            Ok(RuntimeStatementResult::Insert {
591                                rows_inserted,
592                                table_name,
593                            })
594                        }
595                        Err(e) => {
596                            tracing::trace!(
597                                "Session::insert failed for table={}, error={:?}",
598                                table_name,
599                                e
600                            );
601                            if matches!(e, Error::ConstraintError(_)) {
602                                tracing::trace!("Transaction is_aborted=true");
603                                self.abort_transaction();
604                            }
605                            Err(e)
606                        }
607                    }
608                } else {
609                    let result = self.run_autocommit_insert(plan)?;
610                    if !matches!(result, TransactionResult::Insert { .. }) {
611                        return Err(Error::Internal(
612                            "unexpected transaction result for INSERT operation".into(),
613                        ));
614                    }
615                    Ok(RuntimeStatementResult::Insert {
616                        rows_inserted,
617                        table_name,
618                    })
619                }
620            }
621            other => Err(Error::InvalidArgumentError(format!(
622                "Unknown storage namespace '{}'",
623                other
624            ))),
625        }
626    }
627
628    /// Select rows (outside or inside transaction).
629    pub fn execute_select_plan(&self, plan: SelectPlan) -> Result<RuntimeStatementResult<P>> {
630        if let Some(namespace_id) = self.namespace_for_select_plan(&plan)
631            && namespace_id == TEMPORARY_NAMESPACE_ID
632        {
633            return self.select_from_temporary(plan);
634        }
635
636        if self.has_active_transaction() {
637            let tx_result = match self
638                .inner
639                .execute_operation(PlanOperation::Select(Box::new(plan.clone())))
640            {
641                Ok(result) => result,
642                Err(e) => {
643                    // Only abort transaction on specific errors (constraint violations, etc.)
644                    // Don't abort on catalog errors (table doesn't exist) or similar
645                    if matches!(e, Error::ConstraintError(_)) {
646                        self.abort_transaction();
647                    }
648                    return Err(e);
649                }
650            };
651            match tx_result {
652                TransactionResult::Select {
653                    table_name,
654                    schema,
655                    execution: staging_execution,
656                } => {
657                    // Convert from staging (MemPager) execution to base pager execution
658                    // by collecting batches and rebuilding
659                    let batches = staging_execution.collect().unwrap_or_default();
660                    let combined = if batches.is_empty() {
661                        RecordBatch::new_empty(Arc::clone(&schema))
662                    } else if batches.len() == 1 {
663                        batches.into_iter().next().unwrap()
664                    } else {
665                        let refs: Vec<&RecordBatch> = batches.iter().collect();
666                        arrow::compute::concat_batches(&schema, refs)?
667                    };
668
669                    let execution = SelectExecution::from_batch(
670                        table_name.clone(),
671                        Arc::clone(&schema),
672                        combined,
673                    );
674
675                    Ok(RuntimeStatementResult::Select {
676                        execution: Box::new(execution),
677                        table_name,
678                        schema,
679                    })
680                }
681                _ => Err(Error::Internal("expected Select result".into())),
682            }
683        } else {
684            // Call via TransactionContext trait
685            let table_name = if plan.tables.len() == 1 {
686                plan.tables[0].qualified_name()
687            } else {
688                String::new()
689            };
690            let execution = self.with_autocommit_transaction_context(|ctx| {
691                TransactionContext::execute_select(ctx, plan)
692            })?;
693            let schema = execution.schema();
694            Ok(RuntimeStatementResult::Select {
695                execution: Box::new(execution),
696                table_name,
697                schema,
698            })
699        }
700    }
701
702    /// Convenience helper to fetch all rows from a table within this session.
703    pub fn table_rows(&self, table: &str) -> Result<Vec<Vec<PlanValue>>> {
704        let plan =
705            SelectPlan::new(table.to_string()).with_projections(vec![SelectProjection::AllColumns]);
706        match self.execute_select_plan(plan)? {
707            RuntimeStatementResult::Select { execution, .. } => Ok(execution.collect_rows()?.rows),
708            other => Err(Error::Internal(format!(
709                "expected Select result when reading table '{}', got {:?}",
710                table, other
711            ))),
712        }
713    }
714
715    pub fn execute_update_plan(&self, plan: UpdatePlan) -> Result<RuntimeStatementResult<P>> {
716        let (_, canonical_table) = canonical_table_name(&plan.table)?;
717        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
718
719        match namespace_id.as_str() {
720            TEMPORARY_NAMESPACE_ID => {
721                let temp_namespace = self
722                    .temporary_namespace()
723                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
724                let temp_context = temp_namespace.context();
725                let table_name = plan.table.clone();
726                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
727                match TransactionContext::update(&temp_tx_context, plan)? {
728                    TransactionResult::Update { rows_updated, .. } => {
729                        Ok(RuntimeStatementResult::Update {
730                            rows_updated,
731                            table_name,
732                        })
733                    }
734                    _ => Err(Error::Internal(
735                        "unexpected transaction result for temporary UPDATE".into(),
736                    )),
737                }
738            }
739            PERSISTENT_NAMESPACE_ID => {
740                if self.has_active_transaction() {
741                    let table_name = plan.table.clone();
742                    let result = match self.inner.execute_operation(PlanOperation::Update(plan)) {
743                        Ok(result) => result,
744                        Err(e) => {
745                            // If an error occurs during a transaction, abort it
746                            self.abort_transaction();
747                            return Err(e);
748                        }
749                    };
750                    match result {
751                        TransactionResult::Update {
752                            rows_matched: _,
753                            rows_updated,
754                        } => Ok(RuntimeStatementResult::Update {
755                            rows_updated,
756                            table_name,
757                        }),
758                        _ => Err(Error::Internal("expected Update result".into())),
759                    }
760                } else {
761                    let table_name = plan.table.clone();
762                    let result = self.run_autocommit_update(plan)?;
763                    match result {
764                        TransactionResult::Update {
765                            rows_matched: _,
766                            rows_updated,
767                        } => Ok(RuntimeStatementResult::Update {
768                            rows_updated,
769                            table_name,
770                        }),
771                        _ => Err(Error::Internal("expected Update result".into())),
772                    }
773                }
774            }
775            other => Err(Error::InvalidArgumentError(format!(
776                "Unknown storage namespace '{}'",
777                other
778            ))),
779        }
780    }
781
782    pub fn execute_delete_plan(&self, plan: DeletePlan) -> Result<RuntimeStatementResult<P>> {
783        let (_, canonical_table) = canonical_table_name(&plan.table)?;
784        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
785
786        match namespace_id.as_str() {
787            TEMPORARY_NAMESPACE_ID => {
788                let temp_namespace = self
789                    .temporary_namespace()
790                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
791                let temp_context = temp_namespace.context();
792                let table_name = plan.table.clone();
793                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
794                match TransactionContext::delete(&temp_tx_context, plan)? {
795                    TransactionResult::Delete { rows_deleted } => {
796                        Ok(RuntimeStatementResult::Delete {
797                            rows_deleted,
798                            table_name,
799                        })
800                    }
801                    _ => Err(Error::Internal(
802                        "unexpected transaction result for temporary DELETE".into(),
803                    )),
804                }
805            }
806            PERSISTENT_NAMESPACE_ID => {
807                if self.has_active_transaction() {
808                    let table_name = plan.table.clone();
809                    let result = match self.inner.execute_operation(PlanOperation::Delete(plan)) {
810                        Ok(result) => result,
811                        Err(e) => {
812                            // If an error occurs during a transaction, abort it
813                            self.abort_transaction();
814                            return Err(e);
815                        }
816                    };
817                    match result {
818                        TransactionResult::Delete { rows_deleted } => {
819                            Ok(RuntimeStatementResult::Delete {
820                                rows_deleted,
821                                table_name,
822                            })
823                        }
824                        _ => Err(Error::Internal("expected Delete result".into())),
825                    }
826                } else {
827                    let table_name = plan.table.clone();
828                    let result = self.run_autocommit_delete(plan)?;
829                    match result {
830                        TransactionResult::Delete { rows_deleted } => {
831                            Ok(RuntimeStatementResult::Delete {
832                                rows_deleted,
833                                table_name,
834                            })
835                        }
836                        _ => Err(Error::Internal("expected Delete result".into())),
837                    }
838                }
839            }
840            other => Err(Error::InvalidArgumentError(format!(
841                "Unknown storage namespace '{}'",
842                other
843            ))),
844        }
845    }
846
847    pub fn execute_truncate_plan(&self, plan: TruncatePlan) -> Result<RuntimeStatementResult<P>> {
848        let (_, canonical_table) = canonical_table_name(&plan.table)?;
849        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
850
851        match namespace_id.as_str() {
852            TEMPORARY_NAMESPACE_ID => {
853                let temp_namespace = self
854                    .temporary_namespace()
855                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
856                let temp_context = temp_namespace.context();
857                let table_name = plan.table.clone();
858                let temp_tx_context = RuntimeTransactionContext::new(temp_context);
859                match TransactionContext::truncate(&temp_tx_context, plan)? {
860                    TransactionResult::Delete { rows_deleted } => {
861                        Ok(RuntimeStatementResult::Delete {
862                            rows_deleted,
863                            table_name,
864                        })
865                    }
866                    _ => Err(Error::Internal(
867                        "unexpected transaction result for temporary TRUNCATE".into(),
868                    )),
869                }
870            }
871            PERSISTENT_NAMESPACE_ID => {
872                if self.has_active_transaction() {
873                    let table_name = plan.table.clone();
874                    let result = match self.inner.execute_operation(PlanOperation::Truncate(plan)) {
875                        Ok(result) => result,
876                        Err(e) => {
877                            // If an error occurs during a transaction, abort it
878                            self.abort_transaction();
879                            return Err(e);
880                        }
881                    };
882                    match result {
883                        TransactionResult::Delete { rows_deleted } => {
884                            Ok(RuntimeStatementResult::Delete {
885                                rows_deleted,
886                                table_name,
887                            })
888                        }
889                        _ => Err(Error::Internal("expected Delete result".into())),
890                    }
891                } else {
892                    let table_name = plan.table.clone();
893                    let result = self.run_autocommit_truncate(plan)?;
894                    match result {
895                        TransactionResult::Delete { rows_deleted } => {
896                            Ok(RuntimeStatementResult::Delete {
897                                rows_deleted,
898                                table_name,
899                            })
900                        }
901                        _ => Err(Error::Internal("expected Delete result".into())),
902                    }
903                }
904            }
905            other => Err(Error::InvalidArgumentError(format!(
906                "Unknown storage namespace '{}'",
907                other
908            ))),
909        }
910    }
911}
912
913/// Implement [`CatalogDdl`] directly on the session so callers must import the trait
914/// to perform schema mutations. This keeps all runtime DDL entry points aligned with
915/// the shared contract used by contexts and storage namespaces.
916impl<P> CatalogDdl for RuntimeSession<P>
917where
918    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
919{
920    type CreateTableOutput = RuntimeStatementResult<P>;
921    type DropTableOutput = RuntimeStatementResult<P>;
922    type RenameTableOutput = ();
923    type AlterTableOutput = RuntimeStatementResult<P>;
924    type CreateIndexOutput = RuntimeStatementResult<P>;
925    type DropIndexOutput = RuntimeStatementResult<P>;
926
927    fn create_table(&self, plan: CreateTablePlan) -> Result<Self::CreateTableOutput> {
928        let target_namespace = plan
929            .namespace
930            .clone()
931            .unwrap_or_else(|| PERSISTENT_NAMESPACE_ID.to_string())
932            .to_ascii_lowercase();
933
934        let plan = self.materialize_ctas_plan(plan)?;
935
936        match target_namespace.as_str() {
937            TEMPORARY_NAMESPACE_ID => {
938                let temp_namespace = self
939                    .temporary_namespace()
940                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
941                let (_, canonical) = canonical_table_name(&plan.name)?;
942                let result = temp_namespace.create_table(plan)?;
943                if matches!(result, RuntimeStatementResult::CreateTable { .. }) {
944                    let namespace_id = temp_namespace.namespace_id().to_string();
945                    let registry = self.namespace_registry();
946                    registry
947                        .write()
948                        .expect("namespace registry poisoned")
949                        .register_table(&namespace_id, canonical);
950                }
951                result.convert_pager_type::<P>()
952            }
953            PERSISTENT_NAMESPACE_ID => {
954                if self.has_active_transaction() {
955                    match self
956                        .inner
957                        .execute_operation(PlanOperation::CreateTable(plan))
958                    {
959                        Ok(TransactionResult::CreateTable { table_name }) => {
960                            Ok(RuntimeStatementResult::CreateTable { table_name })
961                        }
962                        Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
963                        Ok(_) => Err(Error::Internal(
964                            "expected CreateTable result during transactional CREATE TABLE".into(),
965                        )),
966                        Err(err) => {
967                            self.abort_transaction();
968                            Err(err)
969                        }
970                    }
971                } else {
972                    if self.inner.has_table_locked_by_other_session(&plan.name) {
973                        return Err(Error::TransactionContextError(format!(
974                            "table '{}' is locked by another active transaction",
975                            plan.name
976                        )));
977                    }
978                    self.run_autocommit_create_table(plan)
979                }
980            }
981            other => Err(Error::InvalidArgumentError(format!(
982                "Unknown storage namespace '{}'",
983                other
984            ))),
985        }
986    }
987
988    fn drop_table(&self, plan: DropTablePlan) -> Result<Self::DropTableOutput> {
989        let (_, canonical_table) = canonical_table_name(&plan.name)?;
990        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
991
992        match namespace_id.as_str() {
993            TEMPORARY_NAMESPACE_ID => {
994                let temp_namespace = self
995                    .temporary_namespace()
996                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
997                temp_namespace.drop_table(plan)?;
998                let registry = self.namespace_registry();
999                registry
1000                    .write()
1001                    .expect("namespace registry poisoned")
1002                    .unregister_table(&canonical_table);
1003                Ok(RuntimeStatementResult::NoOp)
1004            }
1005            PERSISTENT_NAMESPACE_ID => {
1006                if self.has_active_transaction() {
1007                    let referencing_tables = self.tables_referencing_in_transaction(&plan.name);
1008                    if !referencing_tables.is_empty() {
1009                        let referencing_table = &referencing_tables[0];
1010                        self.abort_transaction();
1011                        return Err(Error::CatalogError(format!(
1012                            "Catalog Error: Could not drop the table because this table is main key table of the table \"{}\".",
1013                            referencing_table
1014                        )));
1015                    }
1016
1017                    match self
1018                        .inner
1019                        .execute_operation(PlanOperation::DropTable(plan.clone()))
1020                    {
1021                        Ok(TransactionResult::NoOp) => Ok(RuntimeStatementResult::NoOp),
1022                        Ok(_) => Err(Error::Internal(
1023                            "expected NoOp result for DROP TABLE during transactional execution"
1024                                .into(),
1025                        )),
1026                        Err(err) => {
1027                            self.abort_transaction();
1028                            Err(err)
1029                        }
1030                    }
1031                } else {
1032                    if self.inner.has_table_locked_by_other_session(&plan.name) {
1033                        return Err(Error::TransactionContextError(format!(
1034                            "table '{}' is locked by another active transaction",
1035                            plan.name
1036                        )));
1037                    }
1038                    self.run_autocommit_drop_table(plan)
1039                }
1040            }
1041            other => Err(Error::InvalidArgumentError(format!(
1042                "Unknown storage namespace '{}'",
1043                other
1044            ))),
1045        }
1046    }
1047
1048    fn rename_table(&self, plan: RenameTablePlan) -> Result<Self::RenameTableOutput> {
1049        if self.has_active_transaction() {
1050            return Err(Error::InvalidArgumentError(
1051                "ALTER TABLE RENAME is not supported inside an active transaction".into(),
1052            ));
1053        }
1054
1055        let (_, canonical_table) = canonical_table_name(&plan.current_name)?;
1056        let (_, new_canonical) = canonical_table_name(&plan.new_name)?;
1057        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1058
1059        match namespace_id.as_str() {
1060            TEMPORARY_NAMESPACE_ID => {
1061                let temp_namespace = self
1062                    .temporary_namespace()
1063                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1064                match temp_namespace.rename_table(plan.clone()) {
1065                    Ok(()) => {
1066                        let namespace_id = temp_namespace.namespace_id().to_string();
1067                        let registry = self.namespace_registry();
1068                        let mut registry = registry.write().expect("namespace registry poisoned");
1069                        registry.unregister_table(&canonical_table);
1070                        registry.register_table(&namespace_id, new_canonical);
1071                        Ok(())
1072                    }
1073                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1074                    Err(err) => Err(err),
1075                }
1076            }
1077            PERSISTENT_NAMESPACE_ID => match self.run_autocommit_rename_table(plan.clone()) {
1078                Ok(()) => Ok(()),
1079                Err(err) if plan.if_exists && super::is_table_missing_error(&err) => Ok(()),
1080                Err(err) => Err(err),
1081            },
1082            other => Err(Error::InvalidArgumentError(format!(
1083                "Unknown storage namespace '{}'",
1084                other
1085            ))),
1086        }
1087    }
1088
1089    fn alter_table(&self, plan: AlterTablePlan) -> Result<Self::AlterTableOutput> {
1090        let (_, canonical_table) = canonical_table_name(&plan.table_name)?;
1091        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1092
1093        match namespace_id.as_str() {
1094            TEMPORARY_NAMESPACE_ID => {
1095                let temp_namespace = self
1096                    .temporary_namespace()
1097                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1098
1099                let context = temp_namespace.context();
1100                let catalog_service = &context.catalog_service;
1101                let view = match catalog_service.table_view(&canonical_table) {
1102                    Ok(view) => view,
1103                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1104                        return Ok(RuntimeStatementResult::NoOp);
1105                    }
1106                    Err(err) => return Err(err),
1107                };
1108                let table_id = view
1109                    .table_meta
1110                    .as_ref()
1111                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1112                    .table_id;
1113
1114                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1115
1116                temp_namespace.alter_table(plan)?.convert_pager_type::<P>()
1117            }
1118            PERSISTENT_NAMESPACE_ID => {
1119                let persistent = self.persistent_namespace();
1120                let context = persistent.context();
1121                let catalog_service = &context.catalog_service;
1122                let view = match catalog_service.table_view(&canonical_table) {
1123                    Ok(view) => view,
1124                    Err(err) if plan.if_exists && super::is_table_missing_error(&err) => {
1125                        return Ok(RuntimeStatementResult::NoOp);
1126                    }
1127                    Err(err) => return Err(err),
1128                };
1129                let table_id = view
1130                    .table_meta
1131                    .as_ref()
1132                    .ok_or_else(|| Error::Internal("table metadata missing".into()))?
1133                    .table_id;
1134
1135                validate_alter_table_operation(&plan.operation, &view, table_id, catalog_service)?;
1136
1137                self.run_autocommit_alter_table(plan)
1138            }
1139            other => Err(Error::InvalidArgumentError(format!(
1140                "Unknown storage namespace '{}'",
1141                other
1142            ))),
1143        }
1144    }
1145
1146    fn create_index(&self, plan: CreateIndexPlan) -> Result<Self::CreateIndexOutput> {
1147        if plan.columns.is_empty() {
1148            return Err(Error::InvalidArgumentError(
1149                "CREATE INDEX requires at least one column".into(),
1150            ));
1151        }
1152
1153        let (_, canonical_table) = canonical_table_name(&plan.table)?;
1154        let namespace_id = self.resolve_namespace_for_table(&canonical_table);
1155
1156        match namespace_id.as_str() {
1157            TEMPORARY_NAMESPACE_ID => {
1158                let temp_namespace = self
1159                    .temporary_namespace()
1160                    .ok_or_else(|| Error::Internal("temporary namespace unavailable".into()))?;
1161                temp_namespace.create_index(plan)?.convert_pager_type::<P>()
1162            }
1163            PERSISTENT_NAMESPACE_ID => {
1164                if self.has_active_transaction() {
1165                    return Err(Error::InvalidArgumentError(
1166                        "CREATE INDEX is not supported inside an active transaction".into(),
1167                    ));
1168                }
1169
1170                self.run_autocommit_create_index(plan)
1171            }
1172            other => Err(Error::InvalidArgumentError(format!(
1173                "Unknown storage namespace '{}'",
1174                other
1175            ))),
1176        }
1177    }
1178
1179    fn drop_index(&self, plan: DropIndexPlan) -> Result<Self::DropIndexOutput> {
1180        if self.has_active_transaction() {
1181            return Err(Error::InvalidArgumentError(
1182                "DROP INDEX is not supported inside an active transaction".into(),
1183            ));
1184        }
1185
1186        let mut dropped = false;
1187
1188        match self.run_autocommit_drop_index(plan.clone()) {
1189            Ok(Some(_)) => {
1190                dropped = true;
1191            }
1192            Ok(None) => {}
1193            Err(err) => {
1194                if !super::is_index_not_found_error(&err) {
1195                    return Err(err);
1196                }
1197            }
1198        }
1199
1200        if !dropped && let Some(temp_namespace) = self.temporary_namespace() {
1201            match temp_namespace.drop_index(plan.clone()) {
1202                Ok(Some(_)) => {
1203                    dropped = true;
1204                }
1205                Ok(None) => {}
1206                Err(err) => {
1207                    if !super::is_index_not_found_error(&err) {
1208                        return Err(err);
1209                    }
1210                }
1211            }
1212        }
1213
1214        if dropped || plan.if_exists {
1215            Ok(RuntimeStatementResult::NoOp)
1216        } else {
1217            Err(Error::CatalogError(format!(
1218                "Index '{}' does not exist",
1219                plan.name
1220            )))
1221        }
1222    }
1223}