llkv_transaction/
lib.rs

1//! Transaction management and MVCC (Multi-Version Concurrency Control) for LLKV.
2//!
3//! This crate provides transaction isolation using MVCC semantics. Each transaction
4//! operates with a consistent snapshot of the database, determined by its transaction
5//! ID and snapshot timestamp.
6//!
7//! # Key Concepts
8//!
9//! - **Transaction ID ([`TxnId`])**: Unique 64-bit identifier for each transaction
10//! - **Snapshot Isolation**: Transactions see a consistent view of data as of their start time
11//! - **Row Versioning**: Each row tracks when it was created and deleted via `created_by` and `deleted_by` columns
12//! - **Transaction Snapshot ([`TransactionSnapshot`])**: Captures transaction ID and snapshot timestamp
13//!
14//! # Reserved Transaction IDs
15//!
16//! - **[`TXN_ID_NONE`] (u64::MAX)**: Indicates no transaction (uninitialized state)
17//! - **[`TXN_ID_AUTO_COMMIT`] (1)**: Used for auto-commit (single-statement) transactions
18//! - **IDs 2+**: Multi-statement transactions (allocated by [`TxnIdManager`])
19//!
20//! # Visibility Rules
21//!
22//! A row is visible to a transaction if:
23//! 1. It was created before the transaction's snapshot (`created_by <= snapshot_id`)
24//! 2. It was not deleted, or deleted after the snapshot (`deleted_by == TXN_ID_NONE || deleted_by > snapshot_id`)
25//!
26//! # Architecture
27//!
28//! - **[`TxnIdManager`]**: Allocates transaction IDs and tracks commit status
29//! - **[`TransactionSnapshot`]**: Immutable view of transaction state for visibility checks
30//! - **[`TransactionContext`]**: Main interface for executing operations within a transaction
31//! - **[`RowVersion`]**: Metadata tracking which transaction created/deleted a row
32pub mod mvcc;
33
34use std::collections::{HashMap, HashSet};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::{Arc, Mutex};
37
38use arrow::array::RecordBatch;
39use arrow::datatypes::Schema;
40
41pub use mvcc::{
42    RowVersion, TXN_ID_AUTO_COMMIT, TXN_ID_NONE, TransactionSnapshot, TxnId, TxnIdManager,
43};
44
45/// Session identifier type.
46///
47/// Session IDs track client sessions that may spawn multiple transactions.
48/// They are distinct from transaction IDs and managed separately.
49pub type SessionId = u64;
50
51use llkv_expr::expr::Expr as LlkvExpr;
52use llkv_plan::plans::{
53    ColumnSpec, CreateTablePlan, DeletePlan, InsertPlan, PlanOperation, PlanValue, SelectPlan,
54    UpdatePlan,
55};
56use llkv_result::{Error, Result as LlkvResult};
57use llkv_storage::pager::Pager;
58use simd_r_drive_entry_handle::EntryHandle;
59
60use llkv_executor::SelectExecution;
61
62// ============================================================================
63// Type Definitions
64// ============================================================================
65
/// Simplified, row-oriented batch used when exporting or importing table data.
pub struct RowBatch {
    /// Column names of the batch.
    pub columns: Vec<String>,
    /// Row values, one inner vector per row.
    ///
    /// NOTE(review): each row presumably aligns positionally with `columns` —
    /// confirm against the producers of this type.
    pub rows: Vec<Vec<PlanValue>>,
}
71
/// Kind of transaction-control event carried by a `TransactionResult::Transaction`.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare kinds directly
/// (for example with `assert_eq!`) instead of pattern matching.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum TransactionKind {
    /// A transaction was started.
    Begin,
    /// Commit event.
    Commit,
    /// Rollback event.
    Rollback,
}
79
/// Outcome of an operation executed through a transaction context.
///
/// This is a simplified result type local to the transaction module. It is
/// generic over the pager type `P` because the `Select` variant carries a
/// [`SelectExecution`] bound to that pager; every other variant holds plain data.
#[allow(clippy::large_enum_variant)] // TODO: Consider refactoring large variants
#[derive(Clone, Debug)]
pub enum TransactionResult<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Result of a create-table operation.
    CreateTable {
        /// Name of the table.
        table_name: String,
    },
    /// Result of an insert operation.
    Insert {
        /// Count of inserted rows.
        rows_inserted: usize,
    },
    /// Result of an update operation.
    Update {
        /// Count of rows matched by the update.
        rows_matched: usize,
        /// Count of rows updated.
        rows_updated: usize,
    },
    /// Result of a delete operation.
    Delete {
        /// Count of deleted rows.
        rows_deleted: usize,
    },
    /// Result of a select operation.
    Select {
        /// Name of the table that was queried.
        table_name: String,
        /// Schema of the produced batches.
        schema: Arc<Schema>,
        /// Execution handle that yields the result batches.
        execution: SelectExecution<P>,
    },
    /// Result of a transaction-control operation (see [`TransactionKind`]).
    Transaction {
        /// Which transaction-control event occurred.
        kind: TransactionKind,
    },
}
109
110impl<P> TransactionResult<P>
111where
112    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
113{
114    /// Convert pager type for compatibility
115    pub fn convert_pager_type<P2>(self) -> LlkvResult<TransactionResult<P2>>
116    where
117        P2: Pager<Blob = EntryHandle> + Send + Sync + 'static,
118    {
119        match self {
120            TransactionResult::CreateTable { table_name } => {
121                Ok(TransactionResult::CreateTable { table_name })
122            }
123            TransactionResult::Insert { rows_inserted } => {
124                Ok(TransactionResult::Insert { rows_inserted })
125            }
126            TransactionResult::Update {
127                rows_matched,
128                rows_updated,
129            } => Ok(TransactionResult::Update {
130                rows_matched,
131                rows_updated,
132            }),
133            TransactionResult::Delete { rows_deleted } => {
134                Ok(TransactionResult::Delete { rows_deleted })
135            }
136            TransactionResult::Transaction { kind } => Ok(TransactionResult::Transaction { kind }),
137            TransactionResult::Select { .. } => Err(Error::Internal(
138                "cannot convert SELECT TransactionResult between pager types".into(),
139            )),
140        }
141    }
142}
143
144// ============================================================================
145// Transaction Management Types
146// ============================================================================
147
/// Operations a storage context must provide to take part in transactions.
///
/// [`SessionTransaction`] is generic over two implementations of this trait (a
/// base context and a staging context), so it works with any context that
/// implements these operations.
/// The associated type [`TransactionContext::Pager`] specifies the pager type
/// this context uses.
pub trait TransactionContext: Send + Sync {
    /// The pager type used by this context.
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    /// Update the snapshot used for MVCC visibility decisions.
    ///
    /// Takes `&self`, so implementations need interior mutability to store it.
    fn set_snapshot(&self, snapshot: mvcc::TransactionSnapshot);

    /// Get the snapshot currently associated with this context.
    fn snapshot(&self) -> mvcc::TransactionSnapshot;

    /// Get the column specifications of `table_name`.
    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<ColumnSpec>>;

    /// Export the rows of `table_name` as a [`RowBatch`] for snapshotting.
    fn export_table_rows(&self, table_name: &str) -> LlkvResult<RowBatch>;

    /// Get batches with row IDs for seeding updates, optionally restricted by `filter`.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    /// Execute a SELECT plan with this context's pager type.
    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    /// Create a table from a plan.
    fn create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Insert rows as described by `plan`.
    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Update rows as described by `plan`.
    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Delete rows as described by `plan`.
    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Append batches that already carry row IDs to `table_name`.
    ///
    /// NOTE(review): the returned `usize` is presumably the number of rows
    /// appended — confirm against implementations.
    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    /// Get table names for a catalog snapshot.
    fn table_names(&self) -> Vec<String>;

    /// Get the table ID for a given table name (for conflict detection).
    fn table_id(&self, table_name: &str) -> LlkvResult<llkv_table::types::TableId>;

    /// Get an immutable catalog snapshot for transaction isolation.
    fn catalog_snapshot(&self) -> llkv_table::catalog::TableCatalogSnapshot;
}
208
/// Transaction state for the runtime context.
///
/// Holds the MVCC snapshot of one in-progress transaction together with its
/// bookkeeping: the operations queued for replay on commit, the tables it has
/// verified, created, or found missing, and whether it has been aborted.
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Transaction snapshot (contains txn id + snapshot watermark).
    snapshot: mvcc::TransactionSnapshot,
    /// Staging context with MemPager for isolation (only used for tables created in this txn).
    staging: Arc<StagingCtx>,
    /// Operations to replay on commit.
    operations: Vec<PlanOperation>,
    /// Tables that have been verified to exist (either in base or staging).
    staged_tables: HashSet<String>,
    /// Tables created within this transaction (live in staging until commit).
    new_tables: HashSet<String>,
    /// Tables known to be missing.
    missing_tables: HashSet<String>,
    /// Immutable catalog snapshot at transaction start (for isolation).
    /// Contains table name→ID mappings. Replaces separate HashSet and HashMap.
    catalog_snapshot: llkv_table::catalog::TableCatalogSnapshot,
    /// Base context for reading existing tables with MVCC visibility.
    base_context: Arc<BaseCtx>,
    /// Whether this transaction has been aborted due to an error.
    is_aborted: bool,
    /// Transaction ID manager (shared across all transactions).
    txn_manager: Arc<TxnIdManager>,
    /// Tables accessed (names only) by this transaction.
    accessed_tables: HashSet<String>,
}
239
240impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
241where
242    BaseCtx: TransactionContext + 'static,
243    StagingCtx: TransactionContext + 'static,
244{
245    pub fn new(
246        base_context: Arc<BaseCtx>,
247        staging: Arc<StagingCtx>,
248        txn_manager: Arc<TxnIdManager>,
249    ) -> Self {
250        // Get immutable catalog snapshot for transaction isolation
251        // This replaces the previous HashSet<String> and HashMap<String, TableId>
252        let catalog_snapshot = base_context.catalog_snapshot();
253
254        let snapshot = txn_manager.begin_transaction();
255        tracing::debug!(
256            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
257            snapshot.txn_id,
258            snapshot.snapshot_id
259        );
260        TransactionContext::set_snapshot(&*base_context, snapshot);
261        TransactionContext::set_snapshot(&*staging, snapshot);
262
263        Self {
264            staging,
265            operations: Vec::new(),
266            staged_tables: HashSet::new(),
267            new_tables: HashSet::new(),
268            missing_tables: HashSet::new(),
269            catalog_snapshot,
270            base_context,
271            is_aborted: false,
272            accessed_tables: HashSet::new(),
273            snapshot,
274            txn_manager,
275        }
276    }
277
278    /// Ensure a table exists and is visible to this transaction.
279    /// NO COPYING - just check if table exists in base or was created in this transaction.
280    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
281        tracing::trace!(
282            "[ENSURE] ensure_table_exists called for table='{}'",
283            table_name
284        );
285
286        // If we already checked this table, return early
287        if self.staged_tables.contains(table_name) {
288            tracing::trace!("[ENSURE] table already verified to exist");
289            return Ok(());
290        }
291
292        // Check if table exists in catalog snapshot OR was created in this transaction
293        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
294        {
295            self.missing_tables.insert(table_name.to_string());
296            return Err(Error::CatalogError(format!(
297                "Catalog Error: Table '{table_name}' does not exist"
298            )));
299        }
300
301        if self.missing_tables.contains(table_name) {
302            return Err(Error::CatalogError(format!(
303                "Catalog Error: Table '{table_name}' does not exist"
304            )));
305        }
306
307        // For tables created in this transaction, also create in staging for isolation
308        if self.new_tables.contains(table_name) {
309            tracing::trace!("[ENSURE] Table was created in this transaction");
310            // Check if it exists in staging
311            match self.staging.table_column_specs(table_name) {
312                Ok(_) => {
313                    self.staged_tables.insert(table_name.to_string());
314                    return Ok(());
315                }
316                Err(_) => {
317                    return Err(Error::CatalogError(format!(
318                        "Catalog Error: Table '{table_name}' was created but not found in staging"
319                    )));
320                }
321            }
322        }
323
324        // Table exists in base - mark as verified
325        tracing::trace!(
326            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
327        );
328        self.staged_tables.insert(table_name.to_string());
329        Ok(())
330    }
331
332    /// Execute a SELECT query within transaction isolation.
333    /// For tables created in this transaction: read from staging.
334    /// For existing tables: read from BASE with MVCC visibility filtering.
335    pub fn execute_select(
336        &mut self,
337        plan: SelectPlan,
338    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
339        // Ensure table exists
340        self.ensure_table_exists(&plan.table)?;
341
342        // If table was created in this transaction, read from staging
343        if self.new_tables.contains(&plan.table) {
344            tracing::trace!(
345                "[SELECT] Reading from staging for new table '{}'",
346                plan.table
347            );
348            return self.staging.execute_select(plan);
349        }
350
351        // Track access to existing table for conflict detection
352        self.accessed_tables.insert(plan.table.clone());
353
354        // Otherwise read from BASE with MVCC visibility
355        // The base context already has the snapshot set in SessionTransaction::new()
356        tracing::trace!(
357            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
358            plan.table
359        );
360        let table_name = plan.table.clone();
361        self.base_context.execute_select(plan).and_then(|exec| {
362            // Convert pager type from BaseCtx to StagingCtx
363            // This is a limitation of the current type system
364            // In practice, we're just collecting and re-packaging
365            let schema = exec.schema();
366            let batches = exec.collect().unwrap_or_default();
367            let combined = if batches.is_empty() {
368                RecordBatch::new_empty(Arc::clone(&schema))
369            } else if batches.len() == 1 {
370                batches.into_iter().next().unwrap()
371            } else {
372                let refs: Vec<&RecordBatch> = batches.iter().collect();
373                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
374                    Error::Internal(format!("failed to concatenate batches: {err}"))
375                })?
376            };
377            Ok(SelectExecution::from_batch(
378                table_name,
379                Arc::clone(&schema),
380                combined,
381            ))
382        })
383    }
384
385    /// Execute an operation in the transaction staging context
386    pub fn execute_operation(
387        &mut self,
388        operation: PlanOperation,
389    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
390        tracing::trace!(
391            "[TX] SessionTransaction::execute_operation called, operation={:?}",
392            match &operation {
393                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
394                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
395                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
396                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
397                _ => "OTHER".to_string(),
398            }
399        );
400        // Check if transaction is aborted
401        if self.is_aborted {
402            return Err(Error::TransactionContextError(
403                "TransactionContext Error: transaction is aborted".into(),
404            ));
405        }
406
407        // Execute operation and catch errors to mark transaction as aborted
408        let result = match operation {
409            PlanOperation::CreateTable(ref plan) => {
410                match self.staging.create_table_plan(plan.clone()) {
411                    Ok(result) => {
412                        // Track new table so it's visible to subsequent operations in this transaction
413                        self.new_tables.insert(plan.name.clone());
414                        self.missing_tables.remove(&plan.name);
415                        self.staged_tables.insert(plan.name.clone());
416                        // Track for commit replay
417                        self.operations
418                            .push(PlanOperation::CreateTable(plan.clone()));
419                        result.convert_pager_type()?
420                    }
421                    Err(e) => {
422                        self.is_aborted = true;
423                        return Err(e);
424                    }
425                }
426            }
427            PlanOperation::Insert(ref plan) => {
428                tracing::trace!(
429                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
430                    plan.table
431                );
432                // Ensure table exists
433                if let Err(e) = self.ensure_table_exists(&plan.table) {
434                    self.is_aborted = true;
435                    return Err(e);
436                }
437
438                // If table was created in this transaction, insert into staging
439                // Otherwise insert directly into BASE (with transaction ID tagging)
440                let is_new_table = self.new_tables.contains(&plan.table);
441                // Track access to existing table for conflict detection
442                if !is_new_table {
443                    self.accessed_tables.insert(plan.table.clone());
444                }
445                let result = if is_new_table {
446                    tracing::trace!("[TX] INSERT into staging for new table");
447                    self.staging.insert(plan.clone())
448                } else {
449                    tracing::trace!(
450                        "[TX] INSERT directly into BASE with txn_id={}",
451                        self.snapshot.txn_id
452                    );
453                    // Insert into base - MVCC tagging happens automatically in insert_rows()
454                    self.base_context
455                        .insert(plan.clone())
456                        .and_then(|r| r.convert_pager_type())
457                };
458
459                match result {
460                    Ok(result) => {
461                        // Only track operations for NEW tables - they need replay on commit
462                        // For existing tables, changes are already in BASE with MVCC tags
463                        if is_new_table {
464                            tracing::trace!(
465                                "[TX] INSERT to new table - tracking for commit replay"
466                            );
467                            self.operations.push(PlanOperation::Insert(plan.clone()));
468                        } else {
469                            tracing::trace!(
470                                "[TX] INSERT to existing table - already in BASE, no replay needed"
471                            );
472                        }
473                        result
474                    }
475                    Err(e) => {
476                        tracing::trace!(
477                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
478                            e
479                        );
480                        tracing::trace!("DEBUG setting is_aborted=true");
481                        self.is_aborted = true;
482                        return Err(e);
483                    }
484                }
485            }
486            PlanOperation::Update(ref plan) => {
487                if let Err(e) = self.ensure_table_exists(&plan.table) {
488                    self.is_aborted = true;
489                    return Err(e);
490                }
491
492                // If table was created in this transaction, update in staging
493                // Otherwise update directly in BASE (with MVCC soft-delete + insert)
494                let is_new_table = self.new_tables.contains(&plan.table);
495                // Track access to existing table for conflict detection
496                if !is_new_table {
497                    self.accessed_tables.insert(plan.table.clone());
498                }
499                let result = if is_new_table {
500                    tracing::trace!("[TX] UPDATE in staging for new table");
501                    self.staging.update(plan.clone())
502                } else {
503                    tracing::trace!(
504                        "[TX] UPDATE directly in BASE with txn_id={}",
505                        self.snapshot.txn_id
506                    );
507                    self.base_context
508                        .update(plan.clone())
509                        .and_then(|r| r.convert_pager_type())
510                };
511
512                match result {
513                    Ok(result) => {
514                        // Only track operations for NEW tables - they need replay on commit
515                        if is_new_table {
516                            tracing::trace!(
517                                "[TX] UPDATE to new table - tracking for commit replay"
518                            );
519                            self.operations.push(PlanOperation::Update(plan.clone()));
520                        } else {
521                            tracing::trace!(
522                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
523                            );
524                        }
525                        result
526                    }
527                    Err(e) => {
528                        self.is_aborted = true;
529                        return Err(e);
530                    }
531                }
532            }
533            PlanOperation::Delete(ref plan) => {
534                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
535                if let Err(e) = self.ensure_table_exists(&plan.table) {
536                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
537                    self.is_aborted = true;
538                    return Err(e);
539                }
540
541                // If table was created in this transaction, delete from staging
542                // Otherwise delete directly in BASE (with MVCC soft-delete)
543                let is_new_table = self.new_tables.contains(&plan.table);
544                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
545                // Track access to existing table for conflict detection
546                if !is_new_table {
547                    tracing::debug!(
548                        "[DELETE] Tracking access to existing table '{}'",
549                        plan.table
550                    );
551                    self.accessed_tables.insert(plan.table.clone());
552                }
553                let result = if is_new_table {
554                    tracing::debug!("[DELETE] Deleting from staging for new table");
555                    self.staging.delete(plan.clone())
556                } else {
557                    tracing::debug!(
558                        "[DELETE] Deleting from BASE with txn_id={}",
559                        self.snapshot.txn_id
560                    );
561                    self.base_context
562                        .delete(plan.clone())
563                        .and_then(|r| r.convert_pager_type())
564                };
565
566                tracing::debug!(
567                    "[DELETE] Result: {:?}",
568                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
569                );
570                match result {
571                    Ok(result) => {
572                        // Only track operations for NEW tables - they need replay on commit
573                        if is_new_table {
574                            tracing::trace!(
575                                "[TX] DELETE from new table - tracking for commit replay"
576                            );
577                            self.operations.push(PlanOperation::Delete(plan.clone()));
578                        } else {
579                            tracing::trace!(
580                                "[TX] DELETE from existing table - already in BASE, no replay needed"
581                            );
582                        }
583                        result
584                    }
585                    Err(e) => {
586                        self.is_aborted = true;
587                        return Err(e);
588                    }
589                }
590            }
591            PlanOperation::Select(ref plan) => {
592                // SELECT is read-only, not tracked for replay
593                // But still fails if transaction is aborted (already checked above)
594                let table_name = plan.table.clone();
595                match self.execute_select(plan.clone()) {
596                    Ok(staging_execution) => {
597                        // Collect staging execution into batches
598                        let schema = staging_execution.schema();
599                        let batches = staging_execution.collect().unwrap_or_default();
600
601                        // Combine into single batch
602                        let combined = if batches.is_empty() {
603                            RecordBatch::new_empty(Arc::clone(&schema))
604                        } else if batches.len() == 1 {
605                            batches.into_iter().next().unwrap()
606                        } else {
607                            let refs: Vec<&RecordBatch> = batches.iter().collect();
608                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
609                                Error::Internal(format!("failed to concatenate batches: {err}"))
610                            })?
611                        };
612
613                        // Return execution with combined batch
614                        let execution = SelectExecution::from_batch(
615                            table_name.clone(),
616                            Arc::clone(&schema),
617                            combined,
618                        );
619
620                        TransactionResult::Select {
621                            table_name,
622                            schema,
623                            execution,
624                        }
625                    }
626                    Err(e) => {
627                        // Don't abort on SELECT errors (like table not found)
628                        // Only Session layer aborts on constraint violations
629                        return Err(e);
630                    }
631                }
632            }
633        };
634
635        Ok(result)
636    }
637
    /// Get the operations queued for commit.
    ///
    /// The slice is in the order the operations were executed. Only `CreateTable`
    /// and writes to tables created within this transaction are queued; writes to
    /// existing tables are applied to the base context directly and never appear here.
    pub fn operations(&self) -> &[PlanOperation] {
        &self.operations
    }
642}
643
/// A session handle for transaction management.
/// When dropped, automatically rolls back any active transaction.
///
/// The active transaction (if any) is stored in the shared `transactions` map
/// under this handle's `session_id`.
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Base context handed to every transaction started from this session.
    context: Arc<BaseCtx>,
    /// Key under which this session's transaction is stored in `transactions`.
    session_id: SessionId,
    /// Shared map of in-progress transactions, keyed by session ID.
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Transaction ID manager passed to each new transaction.
    txn_manager: Arc<TxnIdManager>,
}
656
657impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
658where
659    BaseCtx: TransactionContext + 'static,
660    StagingCtx: TransactionContext + 'static,
661{
    /// Create a session handle from its parts.
    ///
    /// No transaction is started; the arguments are stored as-is.
    pub fn new(
        context: Arc<BaseCtx>,
        session_id: SessionId,
        transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
        txn_manager: Arc<TxnIdManager>,
    ) -> Self {
        Self {
            context,
            session_id,
            transactions,
            txn_manager,
        }
    }
675
676    /// Clone this session (reuses the same session_id and shared transaction map).
677    /// This is necessary to maintain transaction state across Engine clones.
678    pub fn clone_session(&self) -> Self {
679        Self {
680            context: Arc::clone(&self.context),
681            session_id: self.session_id,
682            transactions: Arc::clone(&self.transactions),
683            txn_manager: Arc::clone(&self.txn_manager),
684        }
685    }
686
    /// Get the session ID (the key under which this session's transaction is stored).
    pub fn session_id(&self) -> SessionId {
        self.session_id
    }
691
    /// Get the underlying base context (for advanced use).
    ///
    /// Returns a reference to the shared `Arc`; clone it to obtain an owned handle.
    pub fn context(&self) -> &Arc<BaseCtx> {
        &self.context
    }
696
697    /// Check if this session has an active transaction.
698    pub fn has_active_transaction(&self) -> bool {
699        self.transactions
700            .lock()
701            .expect("transactions lock poisoned")
702            .contains_key(&self.session_id)
703    }
704
705    /// Check if the current transaction has been aborted due to an error.
706    pub fn is_aborted(&self) -> bool {
707        self.transactions
708            .lock()
709            .expect("transactions lock poisoned")
710            .get(&self.session_id)
711            .map(|tx| tx.is_aborted)
712            .unwrap_or(false)
713    }
714
715    /// Mark the current transaction as aborted due to an error.
716    /// This should be called when any error occurs during a transaction.
717    pub fn abort_transaction(&self) {
718        let mut guard = self
719            .transactions
720            .lock()
721            .expect("transactions lock poisoned");
722        if let Some(tx) = guard.get_mut(&self.session_id) {
723            tx.is_aborted = true;
724        }
725    }
726
727    /// Begin a transaction in this session.
728    pub fn begin_transaction(
729        &self,
730        staging: Arc<StagingCtx>,
731    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
732        tracing::debug!(
733            "[BEGIN] begin_transaction called for session_id={}",
734            self.session_id
735        );
736        let mut guard = self
737            .transactions
738            .lock()
739            .expect("transactions lock poisoned");
740        tracing::debug!(
741            "[BEGIN] session_id={}, transactions map has {} entries",
742            self.session_id,
743            guard.len()
744        );
745        if guard.contains_key(&self.session_id) {
746            return Err(Error::InvalidArgumentError(
747                "a transaction is already in progress in this session".into(),
748            ));
749        }
750        guard.insert(
751            self.session_id,
752            SessionTransaction::new(
753                Arc::clone(&self.context),
754                staging,
755                Arc::clone(&self.txn_manager),
756            ),
757        );
758        tracing::debug!(
759            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
760            self.session_id,
761            guard.len()
762        );
763        Ok(TransactionResult::Transaction {
764            kind: TransactionKind::Begin,
765        })
766    }
767
768    /// Commit the transaction in this session.
769    /// If the transaction is aborted, this acts as a ROLLBACK instead.
770    pub fn commit_transaction(
771        &self,
772    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
773        tracing::trace!(
774            "[COMMIT] commit_transaction called for session {:?}",
775            self.session_id
776        );
777        let mut guard = self
778            .transactions
779            .lock()
780            .expect("transactions lock poisoned");
781        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
782        let tx_opt = guard.remove(&self.session_id);
783        tracing::trace!(
784            "[COMMIT] commit_transaction remove returned: {}",
785            tx_opt.is_some()
786        );
787        let tx = tx_opt.ok_or_else(|| {
788            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
789            Error::InvalidArgumentError(
790                "no transaction is currently in progress in this session".into(),
791            )
792        })?;
793        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
794
795        // If transaction is aborted, commit becomes a rollback (no operations to replay)
796        if tx.is_aborted {
797            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
798            // Reset context snapshot to auto-commit view (aborted txn's writes should be invisible)
799            let auto_commit_snapshot = TransactionSnapshot {
800                txn_id: TXN_ID_AUTO_COMMIT,
801                snapshot_id: tx.txn_manager.last_committed(),
802            };
803            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
804            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
805            return Ok((
806                TransactionResult::Transaction {
807                    kind: TransactionKind::Rollback,
808                },
809                Vec::new(),
810            ));
811        }
812
813        // Check for write-write conflicts: detect if any accessed tables have been dropped or replaced
814        // We captured (table_name, table_id) pairs at transaction start
815        tracing::debug!(
816            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
817            tx.snapshot.txn_id,
818            tx.accessed_tables.len()
819        );
820        for accessed_table_name in &tx.accessed_tables {
821            tracing::debug!(
822                "[COMMIT CONFLICT CHECK] Checking table '{}'",
823                accessed_table_name
824            );
825            // Get the table ID from our catalog snapshot at transaction start
826            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
827                // Check current table state
828                match self.context.table_id(accessed_table_name) {
829                    Ok(current_table_id) => {
830                        // If table ID changed, it was dropped and recreated
831                        if current_table_id != snapshot_table_id {
832                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
833                            let auto_commit_snapshot = TransactionSnapshot {
834                                txn_id: TXN_ID_AUTO_COMMIT,
835                                snapshot_id: tx.txn_manager.last_committed(),
836                            };
837                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
838                            return Err(Error::TransactionContextError(
839                                "another transaction has dropped this table".into(),
840                            ));
841                        }
842                    }
843                    Err(_) => {
844                        // Table no longer exists - it was dropped
845                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
846                        let auto_commit_snapshot = TransactionSnapshot {
847                            txn_id: TXN_ID_AUTO_COMMIT,
848                            snapshot_id: tx.txn_manager.last_committed(),
849                        };
850                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
851                        return Err(Error::TransactionContextError(
852                            "another transaction has dropped this table".into(),
853                        ));
854                    }
855                }
856            }
857        }
858
859        let operations = tx.operations;
860        tracing::trace!(
861            "DEBUG commit_transaction: returning Commit with {} operations",
862            operations.len()
863        );
864
865        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
866        TransactionContext::set_snapshot(&*self.context, tx.snapshot);
867
868        Ok((
869            TransactionResult::Transaction {
870                kind: TransactionKind::Commit,
871            },
872            operations,
873        ))
874    }
875
876    /// Rollback the transaction in this session.
877    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
878        let mut guard = self
879            .transactions
880            .lock()
881            .expect("transactions lock poisoned");
882        if let Some(tx) = guard.remove(&self.session_id) {
883            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
884            // Reset context snapshot to auto-commit view (rolled-back txn's writes should be invisible)
885            let auto_commit_snapshot = TransactionSnapshot {
886                txn_id: TXN_ID_AUTO_COMMIT,
887                snapshot_id: tx.txn_manager.last_committed(),
888            };
889            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
890        } else {
891            return Err(Error::InvalidArgumentError(
892                "no transaction is currently in progress in this session".into(),
893            ));
894        }
895        Ok(TransactionResult::Transaction {
896            kind: TransactionKind::Rollback,
897        })
898    }
899
900    /// Execute an operation in this session's transaction, or directly if no transaction is active.
901    pub fn execute_operation(
902        &self,
903        operation: PlanOperation,
904    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
905        tracing::debug!(
906            "[EXECUTE_OP] execute_operation called for session_id={}",
907            self.session_id
908        );
909        if !self.has_active_transaction() {
910            // No transaction - caller must handle direct execution
911            return Err(Error::InvalidArgumentError(
912                "execute_operation called without active transaction".into(),
913            ));
914        }
915
916        // In transaction - add to transaction and execute on staging context
917        let mut guard = self
918            .transactions
919            .lock()
920            .expect("transactions lock poisoned");
921        tracing::debug!(
922            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
923            self.session_id,
924            guard.len()
925        );
926        let tx = guard
927            .get_mut(&self.session_id)
928            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
929        tracing::debug!(
930            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
931            self.session_id,
932            tx.snapshot.txn_id,
933            tx.accessed_tables.len()
934        );
935
936        let result = tx.execute_operation(operation);
937        if let Err(ref e) = result {
938            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
939            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
940        }
941        result
942    }
943}
944
945impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
946where
947    BaseCtx: TransactionContext,
948    StagingCtx: TransactionContext,
949{
950    fn drop(&mut self) {
951        // Auto-rollback on drop if transaction is active
952        // Handle poisoned mutex gracefully to avoid panic during cleanup
953        match self.transactions.lock() {
954            Ok(mut guard) => {
955                if guard.remove(&self.session_id).is_some() {
956                    eprintln!(
957                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
958                    );
959                }
960            }
961            Err(_) => {
962                // Mutex is poisoned, likely due to a panic elsewhere
963                // Don't panic again during cleanup
964                tracing::trace!(
965                    "Warning: TransactionSession dropped with poisoned transaction mutex"
966                );
967            }
968        }
969    }
970}
971
/// Transaction manager for coordinating sessions.
///
/// Owns the state that every session created through `create_session`
/// shares: the session-to-transaction map and the transaction ID manager.
pub struct TransactionManager<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Transactions keyed by session ID; the same map is handed (via `Arc`)
    /// to every session this manager creates.
    transactions: Arc<Mutex<HashMap<SessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Counter supplying the ID for the next created session.
    next_session_id: AtomicU64,
    /// Transaction ID manager shared with every session this manager creates.
    txn_manager: Arc<TxnIdManager>,
}
982
983impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
984where
985    BaseCtx: TransactionContext + 'static,
986    StagingCtx: TransactionContext + 'static,
987{
988    pub fn new() -> Self {
989        Self {
990            transactions: Arc::new(Mutex::new(HashMap::new())),
991            next_session_id: AtomicU64::new(1),
992            txn_manager: Arc::new(TxnIdManager::new()),
993        }
994    }
995
996    /// Create a new TransactionManager with a custom initial transaction ID.
997    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
998        Self {
999            transactions: Arc::new(Mutex::new(HashMap::new())),
1000            next_session_id: AtomicU64::new(1),
1001            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1002        }
1003    }
1004
1005    /// Create a new TransactionManager with custom initial state.
1006    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1007        Self {
1008            transactions: Arc::new(Mutex::new(HashMap::new())),
1009            next_session_id: AtomicU64::new(1),
1010            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1011                next_txn_id,
1012                last_committed,
1013            )),
1014        }
1015    }
1016
1017    /// Create a new session for transaction management.
1018    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1019        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1020        tracing::debug!(
1021            "[TX_MANAGER] create_session: allocated session_id={}",
1022            session_id
1023        );
1024        TransactionSession::new(
1025            context,
1026            session_id,
1027            Arc::clone(&self.transactions),
1028            Arc::clone(&self.txn_manager),
1029        )
1030    }
1031
1032    /// Obtain the shared transaction ID manager.
1033    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1034        Arc::clone(&self.txn_manager)
1035    }
1036
1037    /// Check if there's an active transaction (checks if ANY session has a transaction).
1038    pub fn has_active_transaction(&self) -> bool {
1039        !self
1040            .transactions
1041            .lock()
1042            .expect("transactions lock poisoned")
1043            .is_empty()
1044    }
1045}
1046
1047impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1048where
1049    BaseCtx: TransactionContext + 'static,
1050    StagingCtx: TransactionContext + 'static,
1051{
1052    fn default() -> Self {
1053        Self::new()
1054    }
1055}