llkv_transaction/
lib.rs

1//! Transaction management and MVCC (Multi-Version Concurrency Control) for LLKV.
2//!
3//! This crate provides transaction isolation using MVCC semantics. Each transaction
4//! operates with a consistent snapshot of the database, determined by its transaction
5//! ID and snapshot timestamp.
6//!
7//! # Key Concepts
8//!
9//! - **Transaction ID ([`TxnId`])**: Unique 64-bit identifier for each transaction
10//! - **Snapshot Isolation**: Transactions see a consistent view of data as of their start time
11//! - **Row Versioning**: Each row tracks when it was created and deleted via `created_by` and `deleted_by` columns
12//! - **Transaction Snapshot ([`TransactionSnapshot`])**: Captures transaction ID and snapshot timestamp
13//!
14//! # Reserved Transaction IDs
15//!
16//! - **[`TXN_ID_NONE`] (u64::MAX)**: Indicates no transaction (uninitialized state)
17//! - **[`TXN_ID_AUTO_COMMIT`] (1)**: Used for auto-commit (single-statement) transactions
18//! - **IDs 2+**: Multi-statement transactions (allocated by [`TxnIdManager`])
19//!
20//! # Visibility Rules
21//!
22//! A row is visible to a transaction if:
23//! 1. It was created before the transaction's snapshot (`created_by <= snapshot_id`)
24//! 2. It was not deleted, or deleted after the snapshot (`deleted_by == TXN_ID_NONE || deleted_by > snapshot_id`)
25//!
26//! # Architecture
27//!
28//! - **[`TxnIdManager`]**: Allocates transaction IDs and tracks commit status
29//! - **[`TransactionSnapshot`]**: Immutable view of transaction state for visibility checks
30//! - **[`TransactionContext`]**: Main interface for executing operations within a transaction
31//! - **[`RowVersion`]**: Metadata tracking which transaction created/deleted a row
32pub mod mvcc;
33
34use std::collections::{HashMap, HashSet};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, Mutex};
37
38use arrow::array::RecordBatch;
39use arrow::datatypes::Schema;
40
41pub use mvcc::{
42    RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionSnapshot, TxnId, TxnIdManager,
43};
44
45/// Session identifier type.
46///
47/// Session IDs track client sessions that may spawn multiple transactions.
48/// They are distinct from transaction IDs and managed separately.
49pub type SessionId = u64;
50
51use llkv_expr::expr::Expr as LlkvExpr;
52use llkv_plan::plans::{
53    ColumnSpec, CreateIndexPlan, CreateTablePlan, DeletePlan, InsertPlan, PlanOperation, PlanValue,
54    SelectPlan, UpdatePlan,
55};
56use llkv_result::{Error, Result as LlkvResult};
57use llkv_storage::pager::Pager;
58use simd_r_drive_entry_handle::EntryHandle;
59
60use llkv_executor::SelectExecution;
61
62// ============================================================================
63// Type Definitions
64// ============================================================================
65
66/// Simplified row batch for export/import
67pub struct RowBatch {
68    pub columns: Vec<String>,
69    pub rows: Vec<Vec<PlanValue>>,
70}
71
/// The transaction-control statement a [`TransactionResult::Transaction`] reports.
///
/// The enum is fieldless, so it derives `Copy`, equality and hashing in addition
/// to `Clone`/`Debug`; callers can compare kinds directly (`kind == TransactionKind::Commit`)
/// instead of pattern matching, and pass them by value without cloning.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum TransactionKind {
    /// A transaction was started.
    Begin,
    /// A transaction was committed.
    Commit,
    /// A transaction was rolled back.
    Rollback,
}
79
80/// Helper to extract table name from SelectPlan (only for single-table queries)
81fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
82    if plan.tables.len() == 1 {
83        Some(plan.tables[0].qualified_name())
84    } else {
85        None
86    }
87}
88
89/// Transaction result enum (simplified version for transaction module)
90#[allow(clippy::large_enum_variant)] // TODO: Consider refactoring large variants
91#[derive(Clone, Debug)]
92pub enum TransactionResult<P>
93where
94    P: Pager<Blob = EntryHandle> + Send + Sync,
95{
96    CreateTable {
97        table_name: String,
98    },
99    Insert {
100        rows_inserted: usize,
101    },
102    Update {
103        rows_matched: usize,
104        rows_updated: usize,
105    },
106    Delete {
107        rows_deleted: usize,
108    },
109    CreateIndex {
110        table_name: String,
111        index_name: Option<String>,
112    },
113    Select {
114        table_name: String,
115        schema: Arc<Schema>,
116        execution: SelectExecution<P>,
117    },
118    Transaction {
119        kind: TransactionKind,
120    },
121}
122
123impl<P> TransactionResult<P>
124where
125    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
126{
127    /// Convert pager type for compatibility
128    pub fn convert_pager_type<P2>(self) -> LlkvResult<TransactionResult<P2>>
129    where
130        P2: Pager<Blob = EntryHandle> + Send + Sync + 'static,
131    {
132        match self {
133            TransactionResult::CreateTable { table_name } => {
134                Ok(TransactionResult::CreateTable { table_name })
135            }
136            TransactionResult::Insert { rows_inserted } => {
137                Ok(TransactionResult::Insert { rows_inserted })
138            }
139            TransactionResult::Update {
140                rows_matched,
141                rows_updated,
142            } => Ok(TransactionResult::Update {
143                rows_matched,
144                rows_updated,
145            }),
146            TransactionResult::Delete { rows_deleted } => {
147                Ok(TransactionResult::Delete { rows_deleted })
148            }
149            TransactionResult::CreateIndex {
150                table_name,
151                index_name,
152            } => Ok(TransactionResult::CreateIndex {
153                table_name,
154                index_name,
155            }),
156            TransactionResult::Transaction { kind } => Ok(TransactionResult::Transaction { kind }),
157            TransactionResult::Select { .. } => Err(Error::Internal(
158                "cannot convert SELECT TransactionResult between pager types".into(),
159            )),
160        }
161    }
162}
163
164// ============================================================================
165// Transaction Management Types
166// ============================================================================
167
168/// A trait for transaction context operations.
169/// This allows SessionTransaction to work with any context that implements these operations.
170/// The associated type P specifies the pager type this context uses.
171pub trait TransactionContext: Send + Sync {
172    /// The pager type used by this context
173    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
174
175    /// Update the snapshot used for MVCC visibility decisions.
176    fn set_snapshot(&self, snapshot: mvcc::TransactionSnapshot);
177
178    /// Get the snapshot currently associated with this context.
179    fn snapshot(&self) -> mvcc::TransactionSnapshot;
180
181    /// Get table column specifications
182    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<ColumnSpec>>;
183
184    /// Export table rows for snapshotting
185    fn export_table_rows(&self, table_name: &str) -> LlkvResult<RowBatch>;
186
187    /// Get batches with row IDs for seeding updates
188    fn get_batches_with_row_ids(
189        &self,
190        table_name: &str,
191        filter: Option<LlkvExpr<'static, String>>,
192    ) -> LlkvResult<Vec<RecordBatch>>;
193
194    /// Execute a SELECT plan with this context's pager type
195    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;
196
197    /// Create a table from plan
198    fn create_table_plan(
199        &self,
200        plan: CreateTablePlan,
201    ) -> LlkvResult<TransactionResult<Self::Pager>>;
202
203    /// Insert rows
204    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
205
206    /// Update rows
207    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
208
209    /// Delete rows
210    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
211
212    /// Create an index
213    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
214
215    /// Append batches with row IDs
216    fn append_batches_with_row_ids(
217        &self,
218        table_name: &str,
219        batches: Vec<RecordBatch>,
220    ) -> LlkvResult<usize>;
221
222    /// Get table names for catalog snapshot
223    fn table_names(&self) -> Vec<String>;
224
225    /// Get table ID for a given table name (for conflict detection)
226    fn table_id(&self, table_name: &str) -> LlkvResult<llkv_table::types::TableId>;
227
228    /// Get an immutable catalog snapshot for transaction isolation
229    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot;
230
231    /// Validate any pending commit-time constraints for this transaction.
232    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
233        Ok(())
234    }
235
236    /// Clear any transaction-scoped state retained by the context.
237    fn clear_transaction_state(&self, _txn_id: TxnId) {}
238}
239
240/// Transaction state for the runtime context.
241pub struct SessionTransaction<BaseCtx, StagingCtx>
242where
243    BaseCtx: TransactionContext + 'static,
244    StagingCtx: TransactionContext + 'static,
245{
246    /// Transaction snapshot (contains txn id + snapshot watermark)
247    snapshot: mvcc::TransactionSnapshot,
248    /// Staging context with MemPager for isolation (only used for tables created in this txn).
249    staging: Arc<StagingCtx>,
250    /// Operations to replay on commit.
251    operations: Vec<PlanOperation>,
252    /// Tables that have been verified to exist (either in base or staging).
253    staged_tables: HashSet<String>,
254    /// Tables created within this transaction (live in staging until commit).
255    new_tables: HashSet<String>,
256    /// Tables known to be missing.
257    missing_tables: HashSet<String>,
258    /// Immutable catalog snapshot at transaction start (for isolation).
259    /// Contains table name→ID mappings. Replaces separate HashSet and HashMap.
260    catalog_snapshot: llkv_table::catalog::TableCatalogSnapshot,
261    /// Base context for reading existing tables with MVCC visibility.
262    base_context: Arc<BaseCtx>,
263    /// Whether this transaction has been aborted due to an error.
264    is_aborted: bool,
265    /// Transaction ID manager (shared across all transactions)
266    txn_manager: Arc<TxnIdManager>,
267    /// Tables accessed (names only) by this transaction
268    accessed_tables: HashSet<String>,
269}
270
271impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
272where
273    BaseCtx: TransactionContext + 'static,
274    StagingCtx: TransactionContext + 'static,
275{
276    pub fn new(
277        base_context: Arc<BaseCtx>,
278        staging: Arc<StagingCtx>,
279        txn_manager: Arc<TxnIdManager>,
280    ) -> Self {
281        // Get immutable catalog snapshot for transaction isolation
282        // This replaces the previous HashSet<String> and HashMap<String, TableId>
283        let catalog_snapshot = base_context.catalog_snapshot();
284
285        let snapshot = txn_manager.begin_transaction();
286        tracing::debug!(
287            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
288            snapshot.txn_id,
289            snapshot.snapshot_id
290        );
291        TransactionContext::set_snapshot(&*base_context, snapshot);
292        TransactionContext::set_snapshot(&*staging, snapshot);
293
294        Self {
295            staging,
296            operations: Vec::new(),
297            staged_tables: HashSet::new(),
298            new_tables: HashSet::new(),
299            missing_tables: HashSet::new(),
300            catalog_snapshot,
301            base_context,
302            is_aborted: false,
303            accessed_tables: HashSet::new(),
304            snapshot,
305            txn_manager,
306        }
307    }
308
309    /// Ensure a table exists and is visible to this transaction.
310    /// NO COPYING - just check if table exists in base or was created in this transaction.
311    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
312        tracing::trace!(
313            "[ENSURE] ensure_table_exists called for table='{}'",
314            table_name
315        );
316
317        // If we already checked this table, return early
318        if self.staged_tables.contains(table_name) {
319            tracing::trace!("[ENSURE] table already verified to exist");
320            return Ok(());
321        }
322
323        // Check if table exists in catalog snapshot OR was created in this transaction
324        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
325        {
326            self.missing_tables.insert(table_name.to_string());
327            return Err(Error::CatalogError(format!(
328                "Catalog Error: Table '{table_name}' does not exist"
329            )));
330        }
331
332        if self.missing_tables.contains(table_name) {
333            return Err(Error::CatalogError(format!(
334                "Catalog Error: Table '{table_name}' does not exist"
335            )));
336        }
337
338        // For tables created in this transaction, also create in staging for isolation
339        if self.new_tables.contains(table_name) {
340            tracing::trace!("[ENSURE] Table was created in this transaction");
341            // Check if it exists in staging
342            match self.staging.table_column_specs(table_name) {
343                Ok(_) => {
344                    self.staged_tables.insert(table_name.to_string());
345                    return Ok(());
346                }
347                Err(_) => {
348                    return Err(Error::CatalogError(format!(
349                        "Catalog Error: Table '{table_name}' was created but not found in staging"
350                    )));
351                }
352            }
353        }
354
355        // Table exists in base - mark as verified
356        tracing::trace!(
357            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
358        );
359        self.staged_tables.insert(table_name.to_string());
360        Ok(())
361    }
362
363    /// Execute a SELECT query within transaction isolation.
364    /// For tables created in this transaction: read from staging.
365    /// For existing tables: read from BASE with MVCC visibility filtering.
366    pub fn execute_select(
367        &mut self,
368        plan: SelectPlan,
369    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
370        // Get table name (for single-table queries only)
371        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
372            Error::InvalidArgumentError(
373                "Transaction execute_select requires single-table query".into(),
374            )
375        })?;
376
377        // Ensure table exists
378        self.ensure_table_exists(&table_name)?;
379
380        // If table was created in this transaction, read from staging
381        if self.new_tables.contains(&table_name) {
382            tracing::trace!(
383                "[SELECT] Reading from staging for new table '{}'",
384                table_name
385            );
386            return self.staging.execute_select(plan);
387        }
388
389        // Track access to existing table for conflict detection
390        self.accessed_tables.insert(table_name.clone());
391
392        // Otherwise read from BASE with MVCC visibility
393        // The base context already has the snapshot set in SessionTransaction::new()
394        tracing::trace!(
395            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
396            table_name
397        );
398        self.base_context.execute_select(plan).and_then(|exec| {
399            // Convert pager type from BaseCtx to StagingCtx
400            // This is a limitation of the current type system
401            // In practice, we're just collecting and re-packaging
402            let schema = exec.schema();
403            let batches = exec.collect().unwrap_or_default();
404            let combined = if batches.is_empty() {
405                RecordBatch::new_empty(Arc::clone(&schema))
406            } else if batches.len() == 1 {
407                batches.into_iter().next().unwrap()
408            } else {
409                let refs: Vec<&RecordBatch> = batches.iter().collect();
410                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
411                    Error::Internal(format!("failed to concatenate batches: {err}"))
412                })?
413            };
414            Ok(SelectExecution::from_batch(
415                table_name,
416                Arc::clone(&schema),
417                combined,
418            ))
419        })
420    }
421
422    /// Execute an operation in the transaction staging context
423    pub fn execute_operation(
424        &mut self,
425        operation: PlanOperation,
426    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
427        tracing::trace!(
428            "[TX] SessionTransaction::execute_operation called, operation={:?}",
429            match &operation {
430                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
431                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
432                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
433                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
434                _ => "OTHER".to_string(),
435            }
436        );
437        // Check if transaction is aborted
438        if self.is_aborted {
439            return Err(Error::TransactionContextError(
440                "TransactionContext Error: transaction is aborted".into(),
441            ));
442        }
443
444        // Execute operation and catch errors to mark transaction as aborted
445        let result = match operation {
446            PlanOperation::CreateTable(ref plan) => {
447                match self.staging.create_table_plan(plan.clone()) {
448                    Ok(result) => {
449                        // Track new table so it's visible to subsequent operations in this transaction
450                        self.new_tables.insert(plan.name.clone());
451                        self.missing_tables.remove(&plan.name);
452                        self.staged_tables.insert(plan.name.clone());
453                        // Track for commit replay
454                        self.operations
455                            .push(PlanOperation::CreateTable(plan.clone()));
456                        result.convert_pager_type()?
457                    }
458                    Err(e) => {
459                        self.is_aborted = true;
460                        return Err(e);
461                    }
462                }
463            }
464            PlanOperation::Insert(ref plan) => {
465                tracing::trace!(
466                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
467                    plan.table
468                );
469                // Ensure table exists
470                if let Err(e) = self.ensure_table_exists(&plan.table) {
471                    self.is_aborted = true;
472                    return Err(e);
473                }
474
475                // If table was created in this transaction, insert into staging
476                // Otherwise insert directly into BASE (with transaction ID tagging)
477                let is_new_table = self.new_tables.contains(&plan.table);
478                // Track access to existing table for conflict detection
479                if !is_new_table {
480                    self.accessed_tables.insert(plan.table.clone());
481                }
482                let result = if is_new_table {
483                    tracing::trace!("[TX] INSERT into staging for new table");
484                    self.staging.insert(plan.clone())
485                } else {
486                    tracing::trace!(
487                        "[TX] INSERT directly into BASE with txn_id={}",
488                        self.snapshot.txn_id
489                    );
490                    // Insert into base - MVCC tagging happens automatically in insert_rows()
491                    self.base_context
492                        .insert(plan.clone())
493                        .and_then(|r| r.convert_pager_type())
494                };
495
496                match result {
497                    Ok(result) => {
498                        // Only track operations for NEW tables - they need replay on commit
499                        // For existing tables, changes are already in BASE with MVCC tags
500                        if is_new_table {
501                            tracing::trace!(
502                                "[TX] INSERT to new table - tracking for commit replay"
503                            );
504                            self.operations.push(PlanOperation::Insert(plan.clone()));
505                        } else {
506                            tracing::trace!(
507                                "[TX] INSERT to existing table - already in BASE, no replay needed"
508                            );
509                        }
510                        result
511                    }
512                    Err(e) => {
513                        tracing::trace!(
514                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
515                            e
516                        );
517                        tracing::trace!("DEBUG setting is_aborted=true");
518                        self.is_aborted = true;
519                        return Err(e);
520                    }
521                }
522            }
523            PlanOperation::Update(ref plan) => {
524                if let Err(e) = self.ensure_table_exists(&plan.table) {
525                    self.is_aborted = true;
526                    return Err(e);
527                }
528
529                // If table was created in this transaction, update in staging
530                // Otherwise update directly in BASE (with MVCC soft-delete + insert)
531                let is_new_table = self.new_tables.contains(&plan.table);
532                // Track access to existing table for conflict detection
533                if !is_new_table {
534                    self.accessed_tables.insert(plan.table.clone());
535                }
536                let result = if is_new_table {
537                    tracing::trace!("[TX] UPDATE in staging for new table");
538                    self.staging.update(plan.clone())
539                } else {
540                    tracing::trace!(
541                        "[TX] UPDATE directly in BASE with txn_id={}",
542                        self.snapshot.txn_id
543                    );
544                    self.base_context
545                        .update(plan.clone())
546                        .and_then(|r| r.convert_pager_type())
547                };
548
549                match result {
550                    Ok(result) => {
551                        // Only track operations for NEW tables - they need replay on commit
552                        if is_new_table {
553                            tracing::trace!(
554                                "[TX] UPDATE to new table - tracking for commit replay"
555                            );
556                            self.operations.push(PlanOperation::Update(plan.clone()));
557                        } else {
558                            tracing::trace!(
559                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
560                            );
561                        }
562                        result
563                    }
564                    Err(e) => {
565                        self.is_aborted = true;
566                        return Err(e);
567                    }
568                }
569            }
570            PlanOperation::Delete(ref plan) => {
571                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
572                if let Err(e) = self.ensure_table_exists(&plan.table) {
573                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
574                    self.is_aborted = true;
575                    return Err(e);
576                }
577
578                // If table was created in this transaction, delete from staging
579                // Otherwise delete directly in BASE (with MVCC soft-delete)
580                let is_new_table = self.new_tables.contains(&plan.table);
581                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
582                // Track access to existing table for conflict detection
583                if !is_new_table {
584                    tracing::debug!(
585                        "[DELETE] Tracking access to existing table '{}'",
586                        plan.table
587                    );
588                    self.accessed_tables.insert(plan.table.clone());
589                }
590                let result = if is_new_table {
591                    tracing::debug!("[DELETE] Deleting from staging for new table");
592                    self.staging.delete(plan.clone())
593                } else {
594                    tracing::debug!(
595                        "[DELETE] Deleting from BASE with txn_id={}",
596                        self.snapshot.txn_id
597                    );
598                    self.base_context
599                        .delete(plan.clone())
600                        .and_then(|r| r.convert_pager_type())
601                };
602
603                tracing::debug!(
604                    "[DELETE] Result: {:?}",
605                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
606                );
607                match result {
608                    Ok(result) => {
609                        // Only track operations for NEW tables - they need replay on commit
610                        if is_new_table {
611                            tracing::trace!(
612                                "[TX] DELETE from new table - tracking for commit replay"
613                            );
614                            self.operations.push(PlanOperation::Delete(plan.clone()));
615                        } else {
616                            tracing::trace!(
617                                "[TX] DELETE from existing table - already in BASE, no replay needed"
618                            );
619                        }
620                        result
621                    }
622                    Err(e) => {
623                        self.is_aborted = true;
624                        return Err(e);
625                    }
626                }
627            }
628            PlanOperation::Select(ref plan) => {
629                // SELECT is read-only, not tracked for replay
630                // But still fails if transaction is aborted (already checked above)
631                let table_name = select_plan_table_name(plan).unwrap_or_default();
632                match self.execute_select(plan.clone()) {
633                    Ok(staging_execution) => {
634                        // Collect staging execution into batches
635                        let schema = staging_execution.schema();
636                        let batches = staging_execution.collect().unwrap_or_default();
637
638                        // Combine into single batch
639                        let combined = if batches.is_empty() {
640                            RecordBatch::new_empty(Arc::clone(&schema))
641                        } else if batches.len() == 1 {
642                            batches.into_iter().next().unwrap()
643                        } else {
644                            let refs: Vec<&RecordBatch> = batches.iter().collect();
645                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
646                                Error::Internal(format!("failed to concatenate batches: {err}"))
647                            })?
648                        };
649
650                        // Return execution with combined batch
651                        let execution = SelectExecution::from_batch(
652                            table_name.clone(),
653                            Arc::clone(&schema),
654                            combined,
655                        );
656
657                        TransactionResult::Select {
658                            table_name,
659                            schema,
660                            execution,
661                        }
662                    }
663                    Err(e) => {
664                        // Don't abort on SELECT errors (like table not found)
665                        // Only Session layer aborts on constraint violations
666                        return Err(e);
667                    }
668                }
669            }
670        };
671
672        Ok(result)
673    }
674
675    /// Get the operations queued for commit
676    pub fn operations(&self) -> &[PlanOperation] {
677        &self.operations
678    }
679}
680
681/// A session handle for transaction management.
682/// When dropped, automatically rolls back any active transaction.
683pub struct TransactionSession<BaseCtx, StagingCtx>
684where
685    BaseCtx: TransactionContext + 'static,
686    StagingCtx: TransactionContext + 'static,
687{
688    context: Arc<BaseCtx>,
689    session_id: SessionId,
690    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
691    txn_manager: Arc<TxnIdManager>,
692}
693
694impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
695where
696    BaseCtx: TransactionContext + 'static,
697    StagingCtx: TransactionContext + 'static,
698{
699    pub fn new(
700        context: Arc<BaseCtx>,
701        session_id: SessionId,
702        transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
703        txn_manager: Arc<TxnIdManager>,
704    ) -> Self {
705        Self {
706            context,
707            session_id,
708            transactions,
709            txn_manager,
710        }
711    }
712
713    /// Clone this session (reuses the same session_id and shared transaction map).
714    /// This is necessary to maintain transaction state across Engine clones.
715    pub fn clone_session(&self) -> Self {
716        Self {
717            context: Arc::clone(&self.context),
718            session_id: self.session_id,
719            transactions: Arc::clone(&self.transactions),
720            txn_manager: Arc::clone(&self.txn_manager),
721        }
722    }
723
724    /// Get the session ID.
725    pub fn session_id(&self) -> SessionId {
726        self.session_id
727    }
728
729    /// Get the underlying context (for advanced use).
730    pub fn context(&self) -> &Arc<BaseCtx> {
731        &self.context
732    }
733
734    /// Check if this session has an active transaction.
735    pub fn has_active_transaction(&self) -> bool {
736        self.transactions
737            .lock()
738            .expect("transactions lock poisoned")
739            .contains_key(&self.session_id)
740    }
741
742    /// Check if the current transaction has been aborted due to an error.
743    pub fn is_aborted(&self) -> bool {
744        self.transactions
745            .lock()
746            .expect("transactions lock poisoned")
747            .get(&self.session_id)
748            .map(|tx| tx.is_aborted)
749            .unwrap_or(false)
750    }
751
752    /// Mark the current transaction as aborted due to an error.
753    /// This should be called when any error occurs during a transaction.
754    pub fn abort_transaction(&self) {
755        let mut guard = self
756            .transactions
757            .lock()
758            .expect("transactions lock poisoned");
759        if let Some(tx) = guard.get_mut(&self.session_id) {
760            tx.is_aborted = true;
761        }
762    }
763
764    /// Begin a transaction in this session.
765    pub fn begin_transaction(
766        &self,
767        staging: Arc<StagingCtx>,
768    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
769        tracing::debug!(
770            "[BEGIN] begin_transaction called for session_id={}",
771            self.session_id
772        );
773        let mut guard = self
774            .transactions
775            .lock()
776            .expect("transactions lock poisoned");
777        tracing::debug!(
778            "[BEGIN] session_id={}, transactions map has {} entries",
779            self.session_id,
780            guard.len()
781        );
782        if guard.contains_key(&self.session_id) {
783            return Err(Error::InvalidArgumentError(
784                "a transaction is already in progress in this session".into(),
785            ));
786        }
787        guard.insert(
788            self.session_id,
789            SessionTransaction::new(
790                Arc::clone(&self.context),
791                staging,
792                Arc::clone(&self.txn_manager),
793            ),
794        );
795        tracing::debug!(
796            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
797            self.session_id,
798            guard.len()
799        );
800        Ok(TransactionResult::Transaction {
801            kind: TransactionKind::Begin,
802        })
803    }
804
805    /// Commit the transaction in this session.
806    /// If the transaction is aborted, this acts as a ROLLBACK instead.
807    pub fn commit_transaction(
808        &self,
809    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
810        tracing::trace!(
811            "[COMMIT] commit_transaction called for session {:?}",
812            self.session_id
813        );
814        let mut guard = self
815            .transactions
816            .lock()
817            .expect("transactions lock poisoned");
818        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
819        let tx_opt = guard.remove(&self.session_id);
820        tracing::trace!(
821            "[COMMIT] commit_transaction remove returned: {}",
822            tx_opt.is_some()
823        );
824        let tx = tx_opt.ok_or_else(|| {
825            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
826            Error::InvalidArgumentError(
827                "no transaction is currently in progress in this session".into(),
828            )
829        })?;
830        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
831
832        // If transaction is aborted, commit becomes a rollback (no operations to replay)
833        if tx.is_aborted {
834            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
835            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
836            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
837            // Reset context snapshot to auto-commit view (aborted txn's writes should be invisible)
838            let auto_commit_snapshot = TransactionSnapshot {
839                txn_id: TXN_ID_AUTO_COMMIT,
840                snapshot_id: tx.txn_manager.last_committed(),
841            };
842            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
843            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
844            return Ok((
845                TransactionResult::Transaction {
846                    kind: TransactionKind::Rollback,
847                },
848                Vec::new(),
849            ));
850        }
851
852        // Check for write-write conflicts: detect if any accessed tables have been dropped or replaced
853        // We captured (table_name, table_id) pairs at transaction start
854        tracing::debug!(
855            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
856            tx.snapshot.txn_id,
857            tx.accessed_tables.len()
858        );
859        for accessed_table_name in &tx.accessed_tables {
860            tracing::debug!(
861                "[COMMIT CONFLICT CHECK] Checking table '{}'",
862                accessed_table_name
863            );
864            // Get the table ID from our catalog snapshot at transaction start
865            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
866                // Check current table state
867                match self.context.table_id(accessed_table_name) {
868                    Ok(current_table_id) => {
869                        // If table ID changed, it was dropped and recreated
870                        if current_table_id != snapshot_table_id {
871                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
872                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
873                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
874                            let auto_commit_snapshot = TransactionSnapshot {
875                                txn_id: TXN_ID_AUTO_COMMIT,
876                                snapshot_id: tx.txn_manager.last_committed(),
877                            };
878                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
879                            return Err(Error::TransactionContextError(
880                                "another transaction has dropped this table".into(),
881                            ));
882                        }
883                    }
884                    Err(_) => {
885                        // Table no longer exists - it was dropped
886                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
887                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
888                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
889                        let auto_commit_snapshot = TransactionSnapshot {
890                            txn_id: TXN_ID_AUTO_COMMIT,
891                            snapshot_id: tx.txn_manager.last_committed(),
892                        };
893                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
894                        return Err(Error::TransactionContextError(
895                            "another transaction has dropped this table".into(),
896                        ));
897                    }
898                }
899            }
900        }
901
902        if let Err(err) = tx
903            .base_context
904            .validate_commit_constraints(tx.snapshot.txn_id)
905        {
906            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
907            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
908            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
909            let auto_commit_snapshot = TransactionSnapshot {
910                txn_id: TXN_ID_AUTO_COMMIT,
911                snapshot_id: tx.txn_manager.last_committed(),
912            };
913            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
914            let wrapped = match err {
915                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
916                    "TransactionContext Error: constraint violation: {msg}"
917                )),
918                other => other,
919            };
920            return Err(wrapped);
921        }
922
923        let operations = tx.operations;
924        tracing::trace!(
925            "DEBUG commit_transaction: returning Commit with {} operations",
926            operations.len()
927        );
928
929        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
930        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
931        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
932        TransactionContext::set_snapshot(&*self.context, tx.snapshot);
933
934        Ok((
935            TransactionResult::Transaction {
936                kind: TransactionKind::Commit,
937            },
938            operations,
939        ))
940    }
941
942    /// Rollback the transaction in this session.
943    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
944        let mut guard = self
945            .transactions
946            .lock()
947            .expect("transactions lock poisoned");
948        if let Some(tx) = guard.remove(&self.session_id) {
949            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
950            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
951            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
952            // Reset context snapshot to auto-commit view (rolled-back txn's writes should be invisible)
953            let auto_commit_snapshot = TransactionSnapshot {
954                txn_id: TXN_ID_AUTO_COMMIT,
955                snapshot_id: tx.txn_manager.last_committed(),
956            };
957            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
958        } else {
959            return Err(Error::InvalidArgumentError(
960                "no transaction is currently in progress in this session".into(),
961            ));
962        }
963        Ok(TransactionResult::Transaction {
964            kind: TransactionKind::Rollback,
965        })
966    }
967
968    /// Execute an operation in this session's transaction, or directly if no transaction is active.
969    pub fn execute_operation(
970        &self,
971        operation: PlanOperation,
972    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
973        tracing::debug!(
974            "[EXECUTE_OP] execute_operation called for session_id={}",
975            self.session_id
976        );
977        if !self.has_active_transaction() {
978            // No transaction - caller must handle direct execution
979            return Err(Error::InvalidArgumentError(
980                "execute_operation called without active transaction".into(),
981            ));
982        }
983
984        // In transaction - add to transaction and execute on staging context
985        let mut guard = self
986            .transactions
987            .lock()
988            .expect("transactions lock poisoned");
989        tracing::debug!(
990            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
991            self.session_id,
992            guard.len()
993        );
994        let tx = guard
995            .get_mut(&self.session_id)
996            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
997        tracing::debug!(
998            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
999            self.session_id,
1000            tx.snapshot.txn_id,
1001            tx.accessed_tables.len()
1002        );
1003
1004        let result = tx.execute_operation(operation);
1005        if let Err(ref e) = result {
1006            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1007            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1008        }
1009        result
1010    }
1011}
1012
1013impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1014where
1015    BaseCtx: TransactionContext,
1016    StagingCtx: TransactionContext,
1017{
1018    fn drop(&mut self) {
1019        // Auto-rollback on drop if transaction is active
1020        // Handle poisoned mutex gracefully to avoid panic during cleanup
1021        match self.transactions.lock() {
1022            Ok(mut guard) => {
1023                if guard.remove(&self.session_id).is_some() {
1024                    eprintln!(
1025                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
1026                    );
1027                }
1028            }
1029            Err(_) => {
1030                // Mutex is poisoned, likely due to a panic elsewhere
1031                // Don't panic again during cleanup
1032                tracing::trace!(
1033                    "Warning: TransactionSession dropped with poisoned transaction mutex"
1034                );
1035            }
1036        }
1037    }
1038}
1039
1040/// Transaction manager for coordinating sessions
1041pub struct TransactionManager<BaseCtx, StagingCtx>
1042where
1043    BaseCtx: TransactionContext + 'static,
1044    StagingCtx: TransactionContext + 'static,
1045{
1046    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
1047    next_session_id: AtomicU64,
1048    txn_manager: Arc<TxnIdManager>,
1049}
1050
1051impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1052where
1053    BaseCtx: TransactionContext + 'static,
1054    StagingCtx: TransactionContext + 'static,
1055{
1056    pub fn new() -> Self {
1057        Self {
1058            transactions: Arc::new(Mutex::new(HashMap::new())),
1059            next_session_id: AtomicU64::new(1),
1060            txn_manager: Arc::new(TxnIdManager::new()),
1061        }
1062    }
1063
1064    /// Create a new TransactionManager with a custom initial transaction ID.
1065    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1066        Self {
1067            transactions: Arc::new(Mutex::new(HashMap::new())),
1068            next_session_id: AtomicU64::new(1),
1069            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1070        }
1071    }
1072
1073    /// Create a new TransactionManager with custom initial state.
1074    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1075        Self {
1076            transactions: Arc::new(Mutex::new(HashMap::new())),
1077            next_session_id: AtomicU64::new(1),
1078            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1079                next_txn_id,
1080                last_committed,
1081            )),
1082        }
1083    }
1084
1085    /// Create a new session for transaction management.
1086    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1087        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1088        tracing::debug!(
1089            "[TX_MANAGER] create_session: allocated session_id={}",
1090            session_id
1091        );
1092        TransactionSession::new(
1093            context,
1094            session_id,
1095            Arc::clone(&self.transactions),
1096            Arc::clone(&self.txn_manager),
1097        )
1098    }
1099
1100    /// Obtain the shared transaction ID manager.
1101    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1102        Arc::clone(&self.txn_manager)
1103    }
1104
1105    /// Check if there's an active transaction (checks if ANY session has a transaction).
1106    pub fn has_active_transaction(&self) -> bool {
1107        !self
1108            .transactions
1109            .lock()
1110            .expect("transactions lock poisoned")
1111            .is_empty()
1112    }
1113}
1114
1115impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1116where
1117    BaseCtx: TransactionContext + 'static,
1118    StagingCtx: TransactionContext + 'static,
1119{
1120    fn default() -> Self {
1121        Self::new()
1122    }
1123}