llkv_transaction/
context.rs

1//! Transaction context types for orchestrating transaction lifecycles.
2//!
3//! This module contains the core types for managing transactions:
4//! - [`TransactionContext`]: Trait defining operations a context must support
5//! - [`SessionTransaction`]: Per-transaction state machine
6//! - [`TransactionSession`]: Session-level transaction management
7//! - [`TransactionManager`]: Cross-session transaction coordination
8
9// TODO: Swap with `FxHashMap` and `FxHashSet` for performance?
10use std::collections::{HashMap, HashSet};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13
14use arrow::array::RecordBatch;
15use llkv_executor::{ExecutorRowBatch, SelectExecution};
16use llkv_expr::expr::Expr as LlkvExpr;
17use llkv_plan::plans::{
18    CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
19    PlanOperation, SelectPlan, TruncatePlan, UpdatePlan,
20};
21use llkv_result::{Error, Result as LlkvResult};
22use llkv_storage::pager::Pager;
23use llkv_table::CatalogDdl;
24use llkv_types::TableId;
25use simd_r_drive_entry_handle::EntryHandle;
26
27use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
28use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
29
30/// Session identifier type.
31///
32/// Session IDs track client sessions that may spawn multiple transactions.
33/// They are distinct from transaction IDs and managed separately.
34pub type TransactionSessionId = u64;
35
36/// Extracts table name from SelectPlan for single-table queries.
37fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
38    if plan.tables.len() == 1 {
39        Some(plan.tables[0].qualified_name())
40    } else {
41        None
42    }
43}
44
45/// A trait for transaction context operations.
46///
47/// This allows [`SessionTransaction`] to work with any context that implements these operations.
48/// The associated type `Pager` specifies the pager type this context uses.
49pub trait TransactionContext: CatalogDdl + Send + Sync {
50    /// The pager type used by this context
51    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
52
53    /// Snapshot representation returned by this context.
54    type Snapshot: TransactionCatalogSnapshot;
55
56    /// Update the snapshot used for MVCC visibility decisions.
57    fn set_snapshot(&self, snapshot: TransactionSnapshot);
58
59    /// Get the snapshot currently associated with this context.
60    fn snapshot(&self) -> TransactionSnapshot;
61
62    /// Get table column specifications
63    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;
64
65    /// Export table rows for snapshotting
66    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;
67
68    /// Get batches with row IDs for seeding updates
69    fn get_batches_with_row_ids(
70        &self,
71        table_name: &str,
72        filter: Option<LlkvExpr<'static, String>>,
73    ) -> LlkvResult<Vec<RecordBatch>>;
74
75    /// Execute a SELECT plan with this context's pager type
76    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;
77
78    /// Create a table from plan
79    fn apply_create_table_plan(
80        &self,
81        plan: CreateTablePlan,
82    ) -> LlkvResult<TransactionResult<Self::Pager>>;
83
84    /// Drop a table
85    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;
86
87    /// Insert rows
88    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
89
90    /// Update rows
91    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
92
93    /// Delete rows
94    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
95
96    /// Truncate table (delete all rows)
97    fn truncate(&self, plan: TruncatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
98
99    /// Create an index
100    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
101
102    /// Append batches with row IDs
103    fn append_batches_with_row_ids(
104        &self,
105        table_name: &str,
106        batches: Vec<RecordBatch>,
107    ) -> LlkvResult<usize>;
108
109    /// Get table names for catalog snapshot
110    fn table_names(&self) -> Vec<String>;
111
112    /// Get table ID for a given table name (for conflict detection)
113    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;
114
115    /// Get an immutable catalog snapshot for transaction isolation
116    fn catalog_snapshot(&self) -> Self::Snapshot;
117
118    /// Validate any pending commit-time constraints for this transaction.
119    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
120        Ok(())
121    }
122
123    /// Clear any transaction-scoped state retained by the context.
124    fn clear_transaction_state(&self, _txn_id: TxnId) {}
125}
126
127/// Transaction state for the runtime context.
128///
129/// This struct manages the lifecycle of a single transaction, tracking:
130/// - Operations to replay on commit
131/// - Tables created/dropped/accessed within the transaction
132/// - Foreign key constraints for validation
133/// - Transaction isolation via MVCC snapshots
134pub struct SessionTransaction<BaseCtx, StagingCtx>
135where
136    BaseCtx: TransactionContext + 'static,
137    StagingCtx: TransactionContext + 'static,
138{
139    /// Transaction snapshot (contains txn id + snapshot watermark)
140    snapshot: TransactionSnapshot,
141    /// Staging context with MemPager for isolation (only used for tables created in this txn).
142    staging: Arc<StagingCtx>,
143    /// Operations to replay on commit.
144    operations: Vec<PlanOperation>,
145    /// Tables that have been verified to exist (either in base or staging).
146    staged_tables: HashSet<String>,
147    /// Tables created within this transaction (live in staging until commit).
148    new_tables: HashSet<String>,
149    /// Tables known to be missing.
150    missing_tables: HashSet<String>,
151    /// All table names locked by this transaction for DDL operations (for conflict detection).
152    /// This includes tables that were created, dropped, or both within the transaction.
153    locked_table_names: HashSet<String>,
154    /// Immutable catalog snapshot at transaction start (for isolation).
155    /// Contains table name→ID mappings. Replaces separate HashSet and HashMap.
156    catalog_snapshot: BaseCtx::Snapshot,
157    /// Base context for reading existing tables with MVCC visibility.
158    base_context: Arc<BaseCtx>,
159    /// Whether this transaction has been aborted due to an error.
160    is_aborted: bool,
161    /// Transaction ID manager (shared across all transactions)
162    txn_manager: Arc<TxnIdManager>,
163    /// Tables accessed (names only) by this transaction
164    accessed_tables: HashSet<String>,
165    /// Foreign key constraints created in this transaction.
166    /// Maps referenced_table_name -> Vec<referencing_table_name>
167    /// Used to check FK dependencies when dropping tables within a transaction.
168    transactional_foreign_keys: HashMap<String, Vec<String>>,
169}
170
171impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
172where
173    BaseCtx: TransactionContext + 'static,
174    StagingCtx: TransactionContext + 'static,
175{
176    pub fn new(
177        base_context: Arc<BaseCtx>,
178        staging: Arc<StagingCtx>,
179        txn_manager: Arc<TxnIdManager>,
180    ) -> Self {
181        // Get immutable catalog snapshot for transaction isolation
182        // This replaces the previous HashSet<String> and HashMap<String, TableId>
183        let catalog_snapshot = base_context.catalog_snapshot();
184
185        let snapshot = txn_manager.begin_transaction();
186        tracing::debug!(
187            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
188            snapshot.txn_id,
189            snapshot.snapshot_id
190        );
191        TransactionContext::set_snapshot(&*base_context, snapshot);
192        TransactionContext::set_snapshot(&*staging, snapshot);
193
194        Self {
195            staging,
196            operations: Vec::new(),
197            staged_tables: HashSet::new(),
198            new_tables: HashSet::new(),
199            missing_tables: HashSet::new(),
200            locked_table_names: HashSet::new(),
201            catalog_snapshot,
202            base_context,
203            is_aborted: false,
204            accessed_tables: HashSet::new(),
205            snapshot,
206            txn_manager,
207            transactional_foreign_keys: HashMap::new(),
208        }
209    }
210
211    /// Ensure a table exists and is visible to this transaction.
212    /// NO COPYING - just check if table exists in base or was created in this transaction.
213    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
214        tracing::trace!(
215            "[ENSURE] ensure_table_exists called for table='{}'",
216            table_name
217        );
218
219        // If we already checked this table, return early
220        if self.staged_tables.contains(table_name) {
221            tracing::trace!("[ENSURE] table already verified to exist");
222            return Ok(());
223        }
224
225        // Check if table exists in catalog snapshot OR was created in this transaction
226        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
227        {
228            self.missing_tables.insert(table_name.to_string());
229            return Err(Error::CatalogError(format!(
230                "Catalog Error: Table '{table_name}' does not exist"
231            )));
232        }
233
234        if self.missing_tables.contains(table_name) {
235            return Err(Error::CatalogError(format!(
236                "Catalog Error: Table '{table_name}' does not exist"
237            )));
238        }
239
240        // For tables created in this transaction, also create in staging for isolation
241        if self.new_tables.contains(table_name) {
242            tracing::trace!("[ENSURE] Table was created in this transaction");
243            // Check if it exists in staging
244            match self.staging.table_column_specs(table_name) {
245                Ok(_) => {
246                    self.staged_tables.insert(table_name.to_string());
247                    return Ok(());
248                }
249                Err(_) => {
250                    return Err(Error::CatalogError(format!(
251                        "Catalog Error: Table '{table_name}' was created but not found in staging"
252                    )));
253                }
254            }
255        }
256
257        // Table exists in base - mark as verified
258        tracing::trace!(
259            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
260        );
261        self.staged_tables.insert(table_name.to_string());
262        Ok(())
263    }
264
265    /// Execute a SELECT query within transaction isolation.
266    /// For tables created in this transaction: read from staging.
267    /// For existing tables: read from BASE with MVCC visibility filtering.
268    pub fn execute_select(
269        &mut self,
270        plan: SelectPlan,
271    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
272        // Get table name (for single-table queries only)
273        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
274            Error::InvalidArgumentError(
275                "Transaction execute_select requires single-table query".into(),
276            )
277        })?;
278
279        // Ensure table exists
280        self.ensure_table_exists(&table_name)?;
281
282        // If table was created in this transaction, read from staging
283        if self.new_tables.contains(&table_name) {
284            tracing::trace!(
285                "[SELECT] Reading from staging for new table '{}'",
286                table_name
287            );
288            return self.staging.execute_select(plan);
289        }
290
291        // Track access to existing table for conflict detection
292        self.accessed_tables.insert(table_name.clone());
293
294        // Otherwise read from BASE with MVCC visibility
295        // The base context already has the snapshot set in SessionTransaction::new()
296        tracing::trace!(
297            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
298            table_name
299        );
300        match self.base_context.execute_select(plan) {
301            Ok(exec) => {
302                // Convert pager type from BaseCtx to StagingCtx
303                // This is a limitation of the current type system
304                // In practice, we're just collecting and re-packaging
305                let schema = exec.schema();
306                let batches = exec.collect().unwrap_or_default();
307                let combined = if batches.is_empty() {
308                    RecordBatch::new_empty(Arc::clone(&schema))
309                } else if batches.len() == 1 {
310                    batches.into_iter().next().unwrap()
311                } else {
312                    let refs: Vec<&RecordBatch> = batches.iter().collect();
313                    arrow::compute::concat_batches(&schema, refs).map_err(|err| {
314                        Error::Internal(format!("failed to concatenate batches: {err}"))
315                    })?
316                };
317                Ok(SelectExecution::from_batch(
318                    table_name,
319                    Arc::clone(&schema),
320                    combined,
321                ))
322            }
323            Err(Error::NotFound) if self.catalog_snapshot.table_exists(&table_name) => {
324                // Table existed in our snapshot but has been dropped by another
325                // transaction. Return empty result; conflict will be detected at commit.
326                tracing::debug!(
327                    "[SELECT] Table '{}' not found in current catalog but exists in snapshot; returning empty result, deferring conflict to commit",
328                    table_name
329                );
330                // Get schema from the plan if possible, otherwise create a minimal schema
331                let schema = Arc::new(arrow::datatypes::Schema::empty());
332                Ok(SelectExecution::from_batch(
333                    table_name,
334                    schema.clone(),
335                    RecordBatch::new_empty(schema),
336                ))
337            }
338            Err(e) => Err(e),
339        }
340    }
341
342    /// Execute an operation in the transaction staging context
343    pub fn execute_operation(
344        &mut self,
345        operation: PlanOperation,
346    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
347        tracing::trace!(
348            "[TX] SessionTransaction::execute_operation called, operation={:?}",
349            match &operation {
350                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
351                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
352                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
353                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
354                _ => "OTHER".to_string(),
355            }
356        );
357        // Check if transaction is aborted
358        if self.is_aborted {
359            return Err(Error::TransactionContextError(
360                "TransactionContext Error: transaction is aborted".into(),
361            ));
362        }
363
364        // Execute operation and catch errors to mark transaction as aborted
365        let result = match operation {
366            PlanOperation::CreateTable(ref plan) => {
367                // Before creating in staging, validate that all foreign key referenced tables
368                // exist in the base context. This is necessary because the staging context
369                // can't see tables from the base context.
370                for fk in &plan.foreign_keys {
371                    let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
372                    // Check if the referenced table exists in base context or was created in this transaction
373                    if !self.new_tables.contains(&canonical_ref_table)
374                        && !self.catalog_snapshot.table_exists(&canonical_ref_table)
375                    {
376                        self.is_aborted = true;
377                        return Err(Error::CatalogError(format!(
378                            "Catalog Error: referenced table '{}' does not exist",
379                            fk.referenced_table
380                        )));
381                    }
382                }
383
384                // Create a modified plan without foreign keys for staging context.
385                // The staging context can't validate foreign keys against base tables,
386                // so we skip FK registration in staging and will re-apply at commit time.
387                let mut staging_plan = plan.clone();
388                staging_plan.foreign_keys.clear();
389
390                match self.staging.apply_create_table_plan(staging_plan) {
391                    Ok(result) => {
392                        // Track new table so it's visible to subsequent operations in this transaction
393                        self.new_tables.insert(plan.name.clone());
394                        self.missing_tables.remove(&plan.name);
395                        self.staged_tables.insert(plan.name.clone());
396                        // Lock this table name for the duration of the transaction
397                        self.locked_table_names
398                            .insert(plan.name.to_ascii_lowercase());
399
400                        // Track foreign key dependencies for DROP TABLE validation
401                        for fk in &plan.foreign_keys {
402                            let referenced_table = fk.referenced_table.to_ascii_lowercase();
403                            self.transactional_foreign_keys
404                                .entry(referenced_table)
405                                .or_default()
406                                .push(plan.name.to_ascii_lowercase());
407                        }
408
409                        // Track for commit replay WITH original foreign keys
410                        self.operations
411                            .push(PlanOperation::CreateTable(plan.clone()));
412                        result.convert_pager_type()?
413                    }
414                    Err(e) => {
415                        self.is_aborted = true;
416                        return Err(e);
417                    }
418                }
419            }
420            PlanOperation::DropTable(ref plan) => {
421                let canonical_name = plan.name.to_ascii_lowercase();
422
423                // Lock this table name for conflict detection (even if CREATE/DROP cancel out)
424                self.locked_table_names.insert(canonical_name.clone());
425
426                // Check if the table was created in this transaction
427                if self.new_tables.contains(&canonical_name) {
428                    // Table was created in this transaction, so drop it from staging
429                    // and remove from tracking
430                    TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
431                    self.new_tables.remove(&canonical_name);
432                    self.staged_tables.remove(&canonical_name);
433
434                    // Remove FK constraints where this table was the referencing table
435                    self.transactional_foreign_keys.iter_mut().for_each(
436                        |(_, referencing_tables)| {
437                            referencing_tables.retain(|t| t != &canonical_name);
438                        },
439                    );
440                    // Clean up empty entries
441                    self.transactional_foreign_keys
442                        .retain(|_, referencing_tables| !referencing_tables.is_empty());
443
444                    // Remove the CREATE TABLE operation from the operation list
445                    self.operations.retain(|op| {
446                        !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
447                    });
448                    // Don't add DROP to operations since we're canceling out the CREATE
449                    // But keep the table name locked for conflict detection
450                    TransactionResult::NoOp
451                } else {
452                    // Table exists in base context, track the drop for replay at commit
453                    // Verify the table exists
454                    if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
455                        self.is_aborted = true;
456                        return Err(Error::InvalidArgumentError(format!(
457                            "table '{}' does not exist",
458                            plan.name
459                        )));
460                    }
461
462                    if self.catalog_snapshot.table_exists(&canonical_name) {
463                        // Mark as dropped so it's not visible in this transaction
464                        self.missing_tables.insert(canonical_name.clone());
465                        self.staged_tables.remove(&canonical_name);
466                        // Track for commit replay
467                        self.operations.push(PlanOperation::DropTable(plan.clone()));
468                    }
469                    TransactionResult::NoOp
470                }
471            }
472            PlanOperation::Insert(ref plan) => {
473                tracing::trace!(
474                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
475                    plan.table
476                );
477                // Ensure table exists
478                if let Err(e) = self.ensure_table_exists(&plan.table) {
479                    self.is_aborted = true;
480                    return Err(e);
481                }
482
483                // If table was created in this transaction, insert into staging
484                // Otherwise insert directly into BASE (with transaction ID tagging)
485                let is_new_table = self.new_tables.contains(&plan.table);
486                // Track access to existing table for conflict detection
487                if !is_new_table {
488                    self.accessed_tables.insert(plan.table.clone());
489                }
490                let result = if is_new_table {
491                    tracing::trace!("[TX] INSERT into staging for new table");
492                    self.staging.insert(plan.clone())
493                } else {
494                    tracing::trace!(
495                        "[TX] INSERT directly into BASE with txn_id={}",
496                        self.snapshot.txn_id
497                    );
498                    // Insert into base - MVCC tagging happens automatically in insert_rows()
499                    self.base_context
500                        .insert(plan.clone())
501                        .and_then(|r| r.convert_pager_type())
502                };
503
504                match result {
505                    Ok(result) => {
506                        // Only track operations for NEW tables - they need replay on commit
507                        // For existing tables, changes are already in BASE with MVCC tags
508                        if is_new_table {
509                            tracing::trace!(
510                                "[TX] INSERT to new table - tracking for commit replay"
511                            );
512                            self.operations.push(PlanOperation::Insert(plan.clone()));
513                        } else {
514                            tracing::trace!(
515                                "[TX] INSERT to existing table - already in BASE, no replay needed"
516                            );
517                        }
518                        result
519                    }
520                    Err(e) => {
521                        tracing::trace!(
522                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
523                            e
524                        );
525                        tracing::trace!("DEBUG setting is_aborted=true");
526                        self.is_aborted = true;
527                        return Err(e);
528                    }
529                }
530            }
531            PlanOperation::Update(ref plan) => {
532                if let Err(e) = self.ensure_table_exists(&plan.table) {
533                    self.is_aborted = true;
534                    return Err(e);
535                }
536
537                // If table was created in this transaction, update in staging
538                // Otherwise update directly in BASE (with MVCC soft-delete + insert)
539                let is_new_table = self.new_tables.contains(&plan.table);
540                // Track access to existing table for conflict detection
541                if !is_new_table {
542                    self.accessed_tables.insert(plan.table.clone());
543                }
544                let result = if is_new_table {
545                    tracing::trace!("[TX] UPDATE in staging for new table");
546                    self.staging.update(plan.clone())
547                } else {
548                    tracing::trace!(
549                        "[TX] UPDATE directly in BASE with txn_id={}",
550                        self.snapshot.txn_id
551                    );
552                    match self.base_context.update(plan.clone()) {
553                        Ok(r) => r.convert_pager_type(),
554                        Err(Error::NotFound) if self.catalog_snapshot.table_exists(&plan.table) => {
555                            // Table existed in our snapshot but has been dropped by another
556                            // transaction. Return zero rows updated; conflict will be detected at commit.
557                            tracing::debug!(
558                                "[UPDATE] Table '{}' not found in current catalog but exists in snapshot; deferring conflict to commit",
559                                plan.table
560                            );
561                            Ok(TransactionResult::Update {
562                                rows_matched: 0,
563                                rows_updated: 0,
564                            })
565                        }
566                        Err(e) => Err(e),
567                    }
568                };
569
570                match result {
571                    Ok(result) => {
572                        // Only track operations for NEW tables - they need replay on commit
573                        if is_new_table {
574                            tracing::trace!(
575                                "[TX] UPDATE to new table - tracking for commit replay"
576                            );
577                            self.operations.push(PlanOperation::Update(plan.clone()));
578                        } else {
579                            tracing::trace!(
580                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
581                            );
582                        }
583                        result
584                    }
585                    Err(e) => {
586                        self.is_aborted = true;
587                        return Err(e);
588                    }
589                }
590            }
591            PlanOperation::Delete(ref plan) => {
592                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
593                if let Err(e) = self.ensure_table_exists(&plan.table) {
594                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
595                    self.is_aborted = true;
596                    return Err(e);
597                }
598
599                // If table was created in this transaction, delete from staging
600                // Otherwise delete directly in BASE (with MVCC soft-delete)
601                let is_new_table = self.new_tables.contains(&plan.table);
602                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
603                // Track access to existing table for conflict detection
604                if !is_new_table {
605                    tracing::debug!(
606                        "[DELETE] Tracking access to existing table '{}'",
607                        plan.table
608                    );
609                    self.accessed_tables.insert(plan.table.clone());
610                }
611                let result = if is_new_table {
612                    tracing::debug!("[DELETE] Deleting from staging for new table");
613                    self.staging.delete(plan.clone())
614                } else {
615                    tracing::debug!(
616                        "[DELETE] Deleting from BASE with txn_id={}",
617                        self.snapshot.txn_id
618                    );
619                    match self.base_context.delete(plan.clone()) {
620                        Ok(r) => r.convert_pager_type(),
621                        Err(Error::NotFound) if self.catalog_snapshot.table_exists(&plan.table) => {
622                            // Table existed in our snapshot but has been dropped by another
623                            // transaction. Return zero rows deleted; conflict will be detected at commit.
624                            tracing::debug!(
625                                "[DELETE] Table '{}' not found in current catalog but exists in snapshot; deferring conflict to commit",
626                                plan.table
627                            );
628                            Ok(TransactionResult::Delete { rows_deleted: 0 })
629                        }
630                        Err(e) => Err(e),
631                    }
632                };
633
634                tracing::debug!(
635                    "[DELETE] Result: {:?}",
636                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
637                );
638                match result {
639                    Ok(result) => {
640                        // Only track operations for NEW tables - they need replay on commit
641                        if is_new_table {
642                            tracing::trace!(
643                                "[TX] DELETE from new table - tracking for commit replay"
644                            );
645                            self.operations.push(PlanOperation::Delete(plan.clone()));
646                        } else {
647                            tracing::trace!(
648                                "[TX] DELETE from existing table - already in BASE, no replay needed"
649                            );
650                        }
651                        result
652                    }
653                    Err(e) => {
654                        self.is_aborted = true;
655                        return Err(e);
656                    }
657                }
658            }
659            PlanOperation::Truncate(ref plan) => {
660                tracing::debug!("[TRUNCATE] Starting truncate for table '{}'", plan.table);
661                if let Err(e) = self.ensure_table_exists(&plan.table) {
662                    tracing::debug!("[TRUNCATE] ensure_table_exists failed: {}", e);
663                    self.is_aborted = true;
664                    return Err(e);
665                }
666
667                // TRUNCATE is like DELETE without WHERE - same transaction handling
668                let is_new_table = self.new_tables.contains(&plan.table);
669                tracing::debug!("[TRUNCATE] is_new_table={}", is_new_table);
670                // Track access to existing table for conflict detection
671                if !is_new_table {
672                    tracing::debug!(
673                        "[TRUNCATE] Tracking access to existing table '{}'",
674                        plan.table
675                    );
676                    self.accessed_tables.insert(plan.table.clone());
677                }
678                let result = if is_new_table {
679                    tracing::debug!("[TRUNCATE] Truncating staging for new table");
680                    self.staging.truncate(plan.clone())
681                } else {
682                    tracing::debug!(
683                        "[TRUNCATE] Truncating BASE with txn_id={}",
684                        self.snapshot.txn_id
685                    );
686                    self.base_context
687                        .truncate(plan.clone())
688                        .and_then(|r| r.convert_pager_type())
689                };
690
691                tracing::debug!(
692                    "[TRUNCATE] Result: {:?}",
693                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
694                );
695                match result {
696                    Ok(result) => {
697                        // Only track operations for NEW tables - they need replay on commit
698                        if is_new_table {
699                            tracing::trace!(
700                                "[TX] TRUNCATE on new table - tracking for commit replay"
701                            );
702                            self.operations.push(PlanOperation::Truncate(plan.clone()));
703                        } else {
704                            tracing::trace!(
705                                "[TX] TRUNCATE on existing table - already in BASE, no replay needed"
706                            );
707                        }
708                        result
709                    }
710                    Err(e) => {
711                        self.is_aborted = true;
712                        return Err(e);
713                    }
714                }
715            }
716            PlanOperation::Select(plan) => {
717                // SELECT is read-only, not tracked for replay
718                // But still fails if transaction is aborted (already checked above)
719                let table_name = select_plan_table_name(plan.as_ref()).unwrap_or_default();
720                let plan = *plan;
721                match self.execute_select(plan) {
722                    Ok(staging_execution) => {
723                        // Collect staging execution into batches
724                        let schema = staging_execution.schema();
725                        let batches = staging_execution.collect().unwrap_or_default();
726
727                        // Combine into single batch
728                        let combined = if batches.is_empty() {
729                            RecordBatch::new_empty(Arc::clone(&schema))
730                        } else if batches.len() == 1 {
731                            batches.into_iter().next().unwrap()
732                        } else {
733                            let refs: Vec<&RecordBatch> = batches.iter().collect();
734                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
735                                Error::Internal(format!("failed to concatenate batches: {err}"))
736                            })?
737                        };
738
739                        // Return execution with combined batch
740                        let execution = SelectExecution::from_batch(
741                            table_name.clone(),
742                            Arc::clone(&schema),
743                            combined,
744                        );
745
746                        TransactionResult::Select {
747                            table_name,
748                            schema,
749                            execution,
750                        }
751                    }
752                    Err(e) => {
753                        // Don't abort on SELECT errors (like table not found)
754                        // Only Session layer aborts on constraint violations
755                        return Err(e);
756                    }
757                }
758            }
759        };
760
761        Ok(result)
762    }
763
764    /// Get the operations queued for commit
765    pub fn operations(&self) -> &[PlanOperation] {
766        &self.operations
767    }
768}
769
770/// A session handle for transaction management.
771///
772/// When dropped, automatically rolls back any active transaction.
773pub struct TransactionSession<BaseCtx, StagingCtx>
774where
775    BaseCtx: TransactionContext + 'static,
776    StagingCtx: TransactionContext + 'static,
777{
778    context: Arc<BaseCtx>,
779    session_id: TransactionSessionId,
780    transactions:
781        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
782    txn_manager: Arc<TxnIdManager>,
783}
784
785impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
786where
787    BaseCtx: TransactionContext + 'static,
788    StagingCtx: TransactionContext + 'static,
789{
790    pub fn new(
791        context: Arc<BaseCtx>,
792        session_id: TransactionSessionId,
793        transactions: Arc<
794            Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
795        >,
796        txn_manager: Arc<TxnIdManager>,
797    ) -> Self {
798        Self {
799            context,
800            session_id,
801            transactions,
802            txn_manager,
803        }
804    }
805
806    /// Clone this session (reuses the same session_id and shared transaction map).
807    /// This is necessary to maintain transaction state across Engine clones.
808    pub fn clone_session(&self) -> Self {
809        Self {
810            context: Arc::clone(&self.context),
811            session_id: self.session_id,
812            transactions: Arc::clone(&self.transactions),
813            txn_manager: Arc::clone(&self.txn_manager),
814        }
815    }
816
817    /// Get the session ID.
818    pub fn session_id(&self) -> TransactionSessionId {
819        self.session_id
820    }
821
822    /// Get the underlying context (for advanced use).
823    pub fn context(&self) -> &Arc<BaseCtx> {
824        &self.context
825    }
826
827    /// Check if this session has an active transaction.
828    pub fn has_active_transaction(&self) -> bool {
829        self.transactions
830            .lock()
831            .expect("transactions lock poisoned")
832            .contains_key(&self.session_id)
833    }
834
835    /// Check if the current transaction has been aborted due to an error.
836    pub fn is_aborted(&self) -> bool {
837        self.transactions
838            .lock()
839            .expect("transactions lock poisoned")
840            .get(&self.session_id)
841            .map(|tx| tx.is_aborted)
842            .unwrap_or(false)
843    }
844
845    /// Check if a table was created in the current active transaction.
846    /// Returns `true` if there's an active transaction and the table exists in its `new_tables` set.
847    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
848        self.transactions
849            .lock()
850            .expect("transactions lock poisoned")
851            .get(&self.session_id)
852            .map(|tx| tx.new_tables.contains(table_name))
853            .unwrap_or(false)
854    }
855
856    /// Get column specifications for a table created in the current transaction.
857    /// Returns `None` if there's no active transaction or the table wasn't created in it.
858    pub fn table_column_specs_from_transaction(
859        &self,
860        table_name: &str,
861    ) -> Option<Vec<PlanColumnSpec>> {
862        let guard = self
863            .transactions
864            .lock()
865            .expect("transactions lock poisoned");
866
867        let tx = guard.get(&self.session_id)?;
868        if !tx.new_tables.contains(table_name) {
869            return None;
870        }
871
872        // Get column specs from the staging context
873        tx.staging.table_column_specs(table_name).ok()
874    }
875
876    /// Get tables that reference the given table via foreign keys created in the current transaction.
877    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
878    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
879        let canonical = referenced_table.to_ascii_lowercase();
880        let guard = self
881            .transactions
882            .lock()
883            .expect("transactions lock poisoned");
884
885        let tx = match guard.get(&self.session_id) {
886            Some(tx) => tx,
887            None => return Vec::new(),
888        };
889
890        tx.transactional_foreign_keys
891            .get(&canonical)
892            .cloned()
893            .unwrap_or_else(Vec::new)
894    }
895
896    /// Check if a table is locked by another active session's transaction.
897    /// Returns true if ANY other session has this table in their locked_table_names.
898    pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
899        let canonical = table_name.to_ascii_lowercase();
900        let guard = self
901            .transactions
902            .lock()
903            .expect("transactions lock poisoned");
904
905        for (session_id, tx) in guard.iter() {
906            // Skip our own session
907            if *session_id == self.session_id {
908                continue;
909            }
910
911            // Check if this other session has the table locked
912            if tx.locked_table_names.contains(&canonical) {
913                return true;
914            }
915        }
916
917        false
918    }
919
920    /// Mark the current transaction as aborted due to an error.
921    /// This should be called when any error occurs during a transaction.
922    pub fn abort_transaction(&self) {
923        let mut guard = self
924            .transactions
925            .lock()
926            .expect("transactions lock poisoned");
927        if let Some(tx) = guard.get_mut(&self.session_id) {
928            tx.is_aborted = true;
929        }
930    }
931
932    /// Begin a transaction in this session.
933    pub fn begin_transaction(
934        &self,
935        staging: Arc<StagingCtx>,
936    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
937        tracing::debug!(
938            "[BEGIN] begin_transaction called for session_id={}",
939            self.session_id
940        );
941        let mut guard = self
942            .transactions
943            .lock()
944            .expect("transactions lock poisoned");
945        tracing::debug!(
946            "[BEGIN] session_id={}, transactions map has {} entries",
947            self.session_id,
948            guard.len()
949        );
950        if guard.contains_key(&self.session_id) {
951            return Err(Error::InvalidArgumentError(
952                "a transaction is already in progress in this session".into(),
953            ));
954        }
955        guard.insert(
956            self.session_id,
957            SessionTransaction::new(
958                Arc::clone(&self.context),
959                staging,
960                Arc::clone(&self.txn_manager),
961            ),
962        );
963        tracing::debug!(
964            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
965            self.session_id,
966            guard.len()
967        );
968        Ok(TransactionResult::Transaction {
969            kind: TransactionKind::Begin,
970        })
971    }
972
973    /// Commit the transaction in this session.
974    /// If the transaction is aborted, this acts as a ROLLBACK instead.
975    pub fn commit_transaction(
976        &self,
977    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
978        tracing::trace!(
979            "[COMMIT] commit_transaction called for session {:?}",
980            self.session_id
981        );
982        let mut guard = self
983            .transactions
984            .lock()
985            .expect("transactions lock poisoned");
986        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
987        let tx_opt = guard.remove(&self.session_id);
988        tracing::trace!(
989            "[COMMIT] commit_transaction remove returned: {}",
990            tx_opt.is_some()
991        );
992        let tx = tx_opt.ok_or_else(|| {
993            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
994            Error::InvalidArgumentError(
995                "no transaction is currently in progress in this session".into(),
996            )
997        })?;
998        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
999
1000        // If transaction is aborted, commit becomes a rollback (no operations to replay)
1001        if tx.is_aborted {
1002            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1003            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1004            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1005            // Reset context snapshot to auto-commit view (aborted txn's writes should be invisible)
1006            let auto_commit_snapshot = TransactionSnapshot {
1007                txn_id: TXN_ID_AUTO_COMMIT,
1008                snapshot_id: tx.txn_manager.last_committed(),
1009            };
1010            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1011            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
1012            return Ok((
1013                TransactionResult::Transaction {
1014                    kind: TransactionKind::Rollback,
1015                },
1016                Vec::new(),
1017            ));
1018        }
1019
1020        // Check for write-write conflicts: detect if any accessed tables have been dropped or replaced
1021        // We captured (table_name, table_id) pairs at transaction start
1022        tracing::debug!(
1023            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
1024            tx.snapshot.txn_id,
1025            tx.accessed_tables.len()
1026        );
1027        for accessed_table_name in &tx.accessed_tables {
1028            tracing::debug!(
1029                "[COMMIT CONFLICT CHECK] Checking table '{}'",
1030                accessed_table_name
1031            );
1032            // Get the table ID from our catalog snapshot at transaction start
1033            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
1034                // Check current table state
1035                match self.context.table_id(accessed_table_name) {
1036                    Ok(current_table_id) => {
1037                        // If table ID changed, it was dropped and recreated
1038                        if current_table_id != snapshot_table_id {
1039                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1040                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1041                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1042                            let auto_commit_snapshot = TransactionSnapshot {
1043                                txn_id: TXN_ID_AUTO_COMMIT,
1044                                snapshot_id: tx.txn_manager.last_committed(),
1045                            };
1046                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1047                            return Err(Error::TransactionContextError(
1048                                "another transaction has dropped this table".into(),
1049                            ));
1050                        }
1051                    }
1052                    Err(_) => {
1053                        // Table no longer exists - it was dropped
1054                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1055                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1056                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1057                        let auto_commit_snapshot = TransactionSnapshot {
1058                            txn_id: TXN_ID_AUTO_COMMIT,
1059                            snapshot_id: tx.txn_manager.last_committed(),
1060                        };
1061                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1062                        return Err(Error::TransactionContextError(
1063                            "another transaction has dropped this table".into(),
1064                        ));
1065                    }
1066                }
1067            }
1068        }
1069
1070        if let Err(err) = tx
1071            .base_context
1072            .validate_commit_constraints(tx.snapshot.txn_id)
1073        {
1074            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1075            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1076            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1077            let auto_commit_snapshot = TransactionSnapshot {
1078                txn_id: TXN_ID_AUTO_COMMIT,
1079                snapshot_id: tx.txn_manager.last_committed(),
1080            };
1081            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1082            let wrapped = match err {
1083                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
1084                    "TransactionContext Error: constraint violation: {msg}"
1085                )),
1086                other => other,
1087            };
1088            return Err(wrapped);
1089        }
1090
1091        let operations = tx.operations;
1092        tracing::trace!(
1093            "DEBUG commit_transaction: returning Commit with {} operations",
1094            operations.len()
1095        );
1096
1097        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
1098        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1099        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1100        TransactionContext::set_snapshot(&*self.context, tx.snapshot);
1101
1102        Ok((
1103            TransactionResult::Transaction {
1104                kind: TransactionKind::Commit,
1105            },
1106            operations,
1107        ))
1108    }
1109
1110    /// Rollback the transaction in this session.
1111    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1112        let mut guard = self
1113            .transactions
1114            .lock()
1115            .expect("transactions lock poisoned");
1116        if let Some(tx) = guard.remove(&self.session_id) {
1117            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1118            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1119            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1120            // Reset context snapshot to auto-commit view (rolled-back txn's writes should be invisible)
1121            let auto_commit_snapshot = TransactionSnapshot {
1122                txn_id: TXN_ID_AUTO_COMMIT,
1123                snapshot_id: tx.txn_manager.last_committed(),
1124            };
1125            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1126        } else {
1127            return Err(Error::InvalidArgumentError(
1128                "no transaction is currently in progress in this session".into(),
1129            ));
1130        }
1131        Ok(TransactionResult::Transaction {
1132            kind: TransactionKind::Rollback,
1133        })
1134    }
1135
1136    /// Execute an operation in this session's transaction, or directly if no transaction is active.
1137    pub fn execute_operation(
1138        &self,
1139        operation: PlanOperation,
1140    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1141        tracing::debug!(
1142            "[EXECUTE_OP] execute_operation called for session_id={}",
1143            self.session_id
1144        );
1145        if !self.has_active_transaction() {
1146            // No transaction - caller must handle direct execution
1147            return Err(Error::InvalidArgumentError(
1148                "execute_operation called without active transaction".into(),
1149            ));
1150        }
1151
1152        // Check for cross-transaction conflicts before executing
1153        if let PlanOperation::CreateTable(ref plan) = operation {
1154            let guard = self
1155                .transactions
1156                .lock()
1157                .expect("transactions lock poisoned");
1158
1159            let canonical_name = plan.name.to_ascii_lowercase();
1160
1161            // Check if another session has this table locked in their transaction
1162            for (other_session_id, other_tx) in guard.iter() {
1163                if *other_session_id != self.session_id
1164                    && other_tx.locked_table_names.contains(&canonical_name)
1165                {
1166                    return Err(Error::TransactionContextError(format!(
1167                        "table '{}' is locked by another active transaction",
1168                        plan.name
1169                    )));
1170                }
1171            }
1172            drop(guard); // Release lock before continuing
1173        }
1174
1175        // Also check DROP TABLE for conflicts
1176        if let PlanOperation::DropTable(ref plan) = operation {
1177            let guard = self
1178                .transactions
1179                .lock()
1180                .expect("transactions lock poisoned");
1181
1182            let canonical_name = plan.name.to_ascii_lowercase();
1183
1184            // Check if another session has this table locked in their transaction
1185            for (other_session_id, other_tx) in guard.iter() {
1186                if *other_session_id != self.session_id
1187                    && other_tx.locked_table_names.contains(&canonical_name)
1188                {
1189                    return Err(Error::TransactionContextError(format!(
1190                        "table '{}' is locked by another active transaction",
1191                        plan.name
1192                    )));
1193                }
1194            }
1195            drop(guard); // Release lock before continuing
1196        }
1197
1198        // In transaction - add to transaction and execute on staging context
1199        let mut guard = self
1200            .transactions
1201            .lock()
1202            .expect("transactions lock poisoned");
1203        tracing::debug!(
1204            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1205            self.session_id,
1206            guard.len()
1207        );
1208        let tx = guard
1209            .get_mut(&self.session_id)
1210            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1211        tracing::debug!(
1212            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1213            self.session_id,
1214            tx.snapshot.txn_id,
1215            tx.accessed_tables.len()
1216        );
1217
1218        let result = tx.execute_operation(operation);
1219        if let Err(ref e) = result {
1220            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1221            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1222        }
1223        result
1224    }
1225}
1226
1227impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1228where
1229    BaseCtx: TransactionContext,
1230    StagingCtx: TransactionContext,
1231{
1232    fn drop(&mut self) {
1233        // Auto-rollback on drop if transaction is active
1234        // Handle poisoned mutex gracefully to avoid panic during cleanup
1235        match self.transactions.lock() {
1236            Ok(mut guard) => {
1237                if guard.remove(&self.session_id).is_some() {
1238                    tracing::warn!(
1239                        "TransactionSession dropped with active transaction - auto-rolling back"
1240                    );
1241                }
1242            }
1243            Err(_) => {
1244                // Mutex is poisoned, likely due to a panic elsewhere
1245                // Don't panic again during cleanup
1246                tracing::warn!("TransactionSession dropped with poisoned transaction mutex");
1247            }
1248        }
1249    }
1250}
1251
1252/// Transaction manager for coordinating sessions.
1253///
1254/// This is the top-level coordinator that creates sessions and manages
1255/// the shared transaction ID allocator.
1256pub struct TransactionManager<BaseCtx, StagingCtx>
1257where
1258    BaseCtx: TransactionContext + 'static,
1259    StagingCtx: TransactionContext + 'static,
1260{
1261    transactions:
1262        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
1263    next_session_id: AtomicU64,
1264    txn_manager: Arc<TxnIdManager>,
1265}
1266
1267impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1268where
1269    BaseCtx: TransactionContext + 'static,
1270    StagingCtx: TransactionContext + 'static,
1271{
1272    pub fn new() -> Self {
1273        Self {
1274            transactions: Arc::new(Mutex::new(HashMap::new())),
1275            next_session_id: AtomicU64::new(1),
1276            txn_manager: Arc::new(TxnIdManager::new()),
1277        }
1278    }
1279
1280    /// Create a new TransactionManager with a custom initial transaction ID.
1281    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1282        Self {
1283            transactions: Arc::new(Mutex::new(HashMap::new())),
1284            next_session_id: AtomicU64::new(1),
1285            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1286        }
1287    }
1288
1289    /// Create a new TransactionManager with custom initial state.
1290    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1291        Self {
1292            transactions: Arc::new(Mutex::new(HashMap::new())),
1293            next_session_id: AtomicU64::new(1),
1294            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1295                next_txn_id,
1296                last_committed,
1297            )),
1298        }
1299    }
1300
1301    /// Create a new session for transaction management.
1302    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1303        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1304        tracing::debug!(
1305            "[TX_MANAGER] create_session: allocated session_id={}",
1306            session_id
1307        );
1308        TransactionSession::new(
1309            context,
1310            session_id,
1311            Arc::clone(&self.transactions),
1312            Arc::clone(&self.txn_manager),
1313        )
1314    }
1315
1316    /// Obtain the shared transaction ID manager.
1317    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1318        Arc::clone(&self.txn_manager)
1319    }
1320
1321    /// Check if there's an active transaction (checks if ANY session has a transaction).
1322    pub fn has_active_transaction(&self) -> bool {
1323        !self
1324            .transactions
1325            .lock()
1326            .expect("transactions lock poisoned")
1327            .is_empty()
1328    }
1329}
1330
1331impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1332where
1333    BaseCtx: TransactionContext + 'static,
1334    StagingCtx: TransactionContext + 'static,
1335{
1336    fn default() -> Self {
1337        Self::new()
1338    }
1339}