llkv_transaction/
context.rs

//! Transaction context types for orchestrating transaction lifecycles.
//!
//! This module contains the core types for managing transactions:
//! - [`TransactionContext`]: Trait defining operations a context must support
//! - [`SessionTransaction`]: Per-transaction state machine
//! - [`TransactionSession`]: Session-level transaction management
//! - [`TransactionManager`]: Cross-session transaction coordination
8
9// TODO: Swap with `FxHashMap` and `FxHashSet` for performance?
10use std::collections::{HashMap, HashSet};
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::{Arc, Mutex};
13
14use arrow::array::RecordBatch;
15use llkv_column_map::types::TableId;
16use llkv_executor::{ExecutorRowBatch, SelectExecution};
17use llkv_expr::expr::Expr as LlkvExpr;
18use llkv_plan::plans::{
19    CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
20    PlanOperation, SelectPlan, TruncatePlan, UpdatePlan,
21};
22use llkv_result::{Error, Result as LlkvResult};
23use llkv_storage::pager::Pager;
24use llkv_table::CatalogDdl;
25use simd_r_drive_entry_handle::EntryHandle;
26
27use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
28use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
29
/// Identifier assigned to a client session.
///
/// A session may spawn multiple transactions over its lifetime; session IDs are
/// distinct from transaction IDs and are managed separately from them.
pub type TransactionSessionId = u64;
35
36/// Extracts table name from SelectPlan for single-table queries.
37fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
38    if plan.tables.len() == 1 {
39        Some(plan.tables[0].qualified_name())
40    } else {
41        None
42    }
43}
44
45/// A trait for transaction context operations.
46///
47/// This allows [`SessionTransaction`] to work with any context that implements these operations.
48/// The associated type `Pager` specifies the pager type this context uses.
49pub trait TransactionContext: CatalogDdl + Send + Sync {
50    /// The pager type used by this context
51    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
52
53    /// Snapshot representation returned by this context.
54    type Snapshot: TransactionCatalogSnapshot;
55
56    /// Update the snapshot used for MVCC visibility decisions.
57    fn set_snapshot(&self, snapshot: TransactionSnapshot);
58
59    /// Get the snapshot currently associated with this context.
60    fn snapshot(&self) -> TransactionSnapshot;
61
62    /// Get table column specifications
63    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;
64
65    /// Export table rows for snapshotting
66    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;
67
68    /// Get batches with row IDs for seeding updates
69    fn get_batches_with_row_ids(
70        &self,
71        table_name: &str,
72        filter: Option<LlkvExpr<'static, String>>,
73    ) -> LlkvResult<Vec<RecordBatch>>;
74
75    /// Execute a SELECT plan with this context's pager type
76    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;
77
78    /// Create a table from plan
79    fn apply_create_table_plan(
80        &self,
81        plan: CreateTablePlan,
82    ) -> LlkvResult<TransactionResult<Self::Pager>>;
83
84    /// Drop a table
85    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;
86
87    /// Insert rows
88    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
89
90    /// Update rows
91    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
92
93    /// Delete rows
94    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
95
96    /// Truncate table (delete all rows)
97    fn truncate(&self, plan: TruncatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
98
99    /// Create an index
100    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
101
102    /// Append batches with row IDs
103    fn append_batches_with_row_ids(
104        &self,
105        table_name: &str,
106        batches: Vec<RecordBatch>,
107    ) -> LlkvResult<usize>;
108
109    /// Get table names for catalog snapshot
110    fn table_names(&self) -> Vec<String>;
111
112    /// Get table ID for a given table name (for conflict detection)
113    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;
114
115    /// Get an immutable catalog snapshot for transaction isolation
116    fn catalog_snapshot(&self) -> Self::Snapshot;
117
118    /// Validate any pending commit-time constraints for this transaction.
119    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
120        Ok(())
121    }
122
123    /// Clear any transaction-scoped state retained by the context.
124    fn clear_transaction_state(&self, _txn_id: TxnId) {}
125}
126
127/// Transaction state for the runtime context.
128///
129/// This struct manages the lifecycle of a single transaction, tracking:
130/// - Operations to replay on commit
131/// - Tables created/dropped/accessed within the transaction
132/// - Foreign key constraints for validation
133/// - Transaction isolation via MVCC snapshots
134pub struct SessionTransaction<BaseCtx, StagingCtx>
135where
136    BaseCtx: TransactionContext + 'static,
137    StagingCtx: TransactionContext + 'static,
138{
139    /// Transaction snapshot (contains txn id + snapshot watermark)
140    snapshot: TransactionSnapshot,
141    /// Staging context with MemPager for isolation (only used for tables created in this txn).
142    staging: Arc<StagingCtx>,
143    /// Operations to replay on commit.
144    operations: Vec<PlanOperation>,
145    /// Tables that have been verified to exist (either in base or staging).
146    staged_tables: HashSet<String>,
147    /// Tables created within this transaction (live in staging until commit).
148    new_tables: HashSet<String>,
149    /// Tables known to be missing.
150    missing_tables: HashSet<String>,
151    /// All table names locked by this transaction for DDL operations (for conflict detection).
152    /// This includes tables that were created, dropped, or both within the transaction.
153    locked_table_names: HashSet<String>,
154    /// Immutable catalog snapshot at transaction start (for isolation).
155    /// Contains table name→ID mappings. Replaces separate HashSet and HashMap.
156    catalog_snapshot: BaseCtx::Snapshot,
157    /// Base context for reading existing tables with MVCC visibility.
158    base_context: Arc<BaseCtx>,
159    /// Whether this transaction has been aborted due to an error.
160    is_aborted: bool,
161    /// Transaction ID manager (shared across all transactions)
162    txn_manager: Arc<TxnIdManager>,
163    /// Tables accessed (names only) by this transaction
164    accessed_tables: HashSet<String>,
165    /// Foreign key constraints created in this transaction.
166    /// Maps referenced_table_name -> Vec<referencing_table_name>
167    /// Used to check FK dependencies when dropping tables within a transaction.
168    transactional_foreign_keys: HashMap<String, Vec<String>>,
169}
170
171impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
172where
173    BaseCtx: TransactionContext + 'static,
174    StagingCtx: TransactionContext + 'static,
175{
176    pub fn new(
177        base_context: Arc<BaseCtx>,
178        staging: Arc<StagingCtx>,
179        txn_manager: Arc<TxnIdManager>,
180    ) -> Self {
181        // Get immutable catalog snapshot for transaction isolation
182        // This replaces the previous HashSet<String> and HashMap<String, TableId>
183        let catalog_snapshot = base_context.catalog_snapshot();
184
185        let snapshot = txn_manager.begin_transaction();
186        tracing::debug!(
187            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
188            snapshot.txn_id,
189            snapshot.snapshot_id
190        );
191        TransactionContext::set_snapshot(&*base_context, snapshot);
192        TransactionContext::set_snapshot(&*staging, snapshot);
193
194        Self {
195            staging,
196            operations: Vec::new(),
197            staged_tables: HashSet::new(),
198            new_tables: HashSet::new(),
199            missing_tables: HashSet::new(),
200            locked_table_names: HashSet::new(),
201            catalog_snapshot,
202            base_context,
203            is_aborted: false,
204            accessed_tables: HashSet::new(),
205            snapshot,
206            txn_manager,
207            transactional_foreign_keys: HashMap::new(),
208        }
209    }
210
211    /// Ensure a table exists and is visible to this transaction.
212    /// NO COPYING - just check if table exists in base or was created in this transaction.
213    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
214        tracing::trace!(
215            "[ENSURE] ensure_table_exists called for table='{}'",
216            table_name
217        );
218
219        // If we already checked this table, return early
220        if self.staged_tables.contains(table_name) {
221            tracing::trace!("[ENSURE] table already verified to exist");
222            return Ok(());
223        }
224
225        // Check if table exists in catalog snapshot OR was created in this transaction
226        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
227        {
228            self.missing_tables.insert(table_name.to_string());
229            return Err(Error::CatalogError(format!(
230                "Catalog Error: Table '{table_name}' does not exist"
231            )));
232        }
233
234        if self.missing_tables.contains(table_name) {
235            return Err(Error::CatalogError(format!(
236                "Catalog Error: Table '{table_name}' does not exist"
237            )));
238        }
239
240        // For tables created in this transaction, also create in staging for isolation
241        if self.new_tables.contains(table_name) {
242            tracing::trace!("[ENSURE] Table was created in this transaction");
243            // Check if it exists in staging
244            match self.staging.table_column_specs(table_name) {
245                Ok(_) => {
246                    self.staged_tables.insert(table_name.to_string());
247                    return Ok(());
248                }
249                Err(_) => {
250                    return Err(Error::CatalogError(format!(
251                        "Catalog Error: Table '{table_name}' was created but not found in staging"
252                    )));
253                }
254            }
255        }
256
257        // Table exists in base - mark as verified
258        tracing::trace!(
259            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
260        );
261        self.staged_tables.insert(table_name.to_string());
262        Ok(())
263    }
264
265    /// Execute a SELECT query within transaction isolation.
266    /// For tables created in this transaction: read from staging.
267    /// For existing tables: read from BASE with MVCC visibility filtering.
268    pub fn execute_select(
269        &mut self,
270        plan: SelectPlan,
271    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
272        // Get table name (for single-table queries only)
273        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
274            Error::InvalidArgumentError(
275                "Transaction execute_select requires single-table query".into(),
276            )
277        })?;
278
279        // Ensure table exists
280        self.ensure_table_exists(&table_name)?;
281
282        // If table was created in this transaction, read from staging
283        if self.new_tables.contains(&table_name) {
284            tracing::trace!(
285                "[SELECT] Reading from staging for new table '{}'",
286                table_name
287            );
288            return self.staging.execute_select(plan);
289        }
290
291        // Track access to existing table for conflict detection
292        self.accessed_tables.insert(table_name.clone());
293
294        // Otherwise read from BASE with MVCC visibility
295        // The base context already has the snapshot set in SessionTransaction::new()
296        tracing::trace!(
297            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
298            table_name
299        );
300        self.base_context.execute_select(plan).and_then(|exec| {
301            // Convert pager type from BaseCtx to StagingCtx
302            // This is a limitation of the current type system
303            // In practice, we're just collecting and re-packaging
304            let schema = exec.schema();
305            let batches = exec.collect().unwrap_or_default();
306            let combined = if batches.is_empty() {
307                RecordBatch::new_empty(Arc::clone(&schema))
308            } else if batches.len() == 1 {
309                batches.into_iter().next().unwrap()
310            } else {
311                let refs: Vec<&RecordBatch> = batches.iter().collect();
312                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
313                    Error::Internal(format!("failed to concatenate batches: {err}"))
314                })?
315            };
316            Ok(SelectExecution::from_batch(
317                table_name,
318                Arc::clone(&schema),
319                combined,
320            ))
321        })
322    }
323
324    /// Execute an operation in the transaction staging context
325    pub fn execute_operation(
326        &mut self,
327        operation: PlanOperation,
328    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
329        tracing::trace!(
330            "[TX] SessionTransaction::execute_operation called, operation={:?}",
331            match &operation {
332                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
333                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
334                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
335                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
336                _ => "OTHER".to_string(),
337            }
338        );
339        // Check if transaction is aborted
340        if self.is_aborted {
341            return Err(Error::TransactionContextError(
342                "TransactionContext Error: transaction is aborted".into(),
343            ));
344        }
345
346        // Execute operation and catch errors to mark transaction as aborted
347        let result = match operation {
348            PlanOperation::CreateTable(ref plan) => {
349                // Before creating in staging, validate that all foreign key referenced tables
350                // exist in the base context. This is necessary because the staging context
351                // can't see tables from the base context.
352                for fk in &plan.foreign_keys {
353                    let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
354                    // Check if the referenced table exists in base context or was created in this transaction
355                    if !self.new_tables.contains(&canonical_ref_table)
356                        && !self.catalog_snapshot.table_exists(&canonical_ref_table)
357                    {
358                        self.is_aborted = true;
359                        return Err(Error::CatalogError(format!(
360                            "Catalog Error: referenced table '{}' does not exist",
361                            fk.referenced_table
362                        )));
363                    }
364                }
365
366                // Create a modified plan without foreign keys for staging context.
367                // The staging context can't validate foreign keys against base tables,
368                // so we skip FK registration in staging and will re-apply at commit time.
369                let mut staging_plan = plan.clone();
370                staging_plan.foreign_keys.clear();
371
372                match self.staging.apply_create_table_plan(staging_plan) {
373                    Ok(result) => {
374                        // Track new table so it's visible to subsequent operations in this transaction
375                        self.new_tables.insert(plan.name.clone());
376                        self.missing_tables.remove(&plan.name);
377                        self.staged_tables.insert(plan.name.clone());
378                        // Lock this table name for the duration of the transaction
379                        self.locked_table_names
380                            .insert(plan.name.to_ascii_lowercase());
381
382                        // Track foreign key dependencies for DROP TABLE validation
383                        for fk in &plan.foreign_keys {
384                            let referenced_table = fk.referenced_table.to_ascii_lowercase();
385                            self.transactional_foreign_keys
386                                .entry(referenced_table)
387                                .or_default()
388                                .push(plan.name.to_ascii_lowercase());
389                        }
390
391                        // Track for commit replay WITH original foreign keys
392                        self.operations
393                            .push(PlanOperation::CreateTable(plan.clone()));
394                        result.convert_pager_type()?
395                    }
396                    Err(e) => {
397                        self.is_aborted = true;
398                        return Err(e);
399                    }
400                }
401            }
402            PlanOperation::DropTable(ref plan) => {
403                let canonical_name = plan.name.to_ascii_lowercase();
404
405                // Lock this table name for conflict detection (even if CREATE/DROP cancel out)
406                self.locked_table_names.insert(canonical_name.clone());
407
408                // Check if the table was created in this transaction
409                if self.new_tables.contains(&canonical_name) {
410                    // Table was created in this transaction, so drop it from staging
411                    // and remove from tracking
412                    TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
413                    self.new_tables.remove(&canonical_name);
414                    self.staged_tables.remove(&canonical_name);
415
416                    // Remove FK constraints where this table was the referencing table
417                    self.transactional_foreign_keys.iter_mut().for_each(
418                        |(_, referencing_tables)| {
419                            referencing_tables.retain(|t| t != &canonical_name);
420                        },
421                    );
422                    // Clean up empty entries
423                    self.transactional_foreign_keys
424                        .retain(|_, referencing_tables| !referencing_tables.is_empty());
425
426                    // Remove the CREATE TABLE operation from the operation list
427                    self.operations.retain(|op| {
428                        !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
429                    });
430                    // Don't add DROP to operations since we're canceling out the CREATE
431                    // But keep the table name locked for conflict detection
432                    TransactionResult::NoOp
433                } else {
434                    // Table exists in base context, track the drop for replay at commit
435                    // Verify the table exists
436                    if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
437                        self.is_aborted = true;
438                        return Err(Error::InvalidArgumentError(format!(
439                            "table '{}' does not exist",
440                            plan.name
441                        )));
442                    }
443
444                    if self.catalog_snapshot.table_exists(&canonical_name) {
445                        // Mark as dropped so it's not visible in this transaction
446                        self.missing_tables.insert(canonical_name.clone());
447                        self.staged_tables.remove(&canonical_name);
448                        // Track for commit replay
449                        self.operations.push(PlanOperation::DropTable(plan.clone()));
450                    }
451                    TransactionResult::NoOp
452                }
453            }
454            PlanOperation::Insert(ref plan) => {
455                tracing::trace!(
456                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
457                    plan.table
458                );
459                // Ensure table exists
460                if let Err(e) = self.ensure_table_exists(&plan.table) {
461                    self.is_aborted = true;
462                    return Err(e);
463                }
464
465                // If table was created in this transaction, insert into staging
466                // Otherwise insert directly into BASE (with transaction ID tagging)
467                let is_new_table = self.new_tables.contains(&plan.table);
468                // Track access to existing table for conflict detection
469                if !is_new_table {
470                    self.accessed_tables.insert(plan.table.clone());
471                }
472                let result = if is_new_table {
473                    tracing::trace!("[TX] INSERT into staging for new table");
474                    self.staging.insert(plan.clone())
475                } else {
476                    tracing::trace!(
477                        "[TX] INSERT directly into BASE with txn_id={}",
478                        self.snapshot.txn_id
479                    );
480                    // Insert into base - MVCC tagging happens automatically in insert_rows()
481                    self.base_context
482                        .insert(plan.clone())
483                        .and_then(|r| r.convert_pager_type())
484                };
485
486                match result {
487                    Ok(result) => {
488                        // Only track operations for NEW tables - they need replay on commit
489                        // For existing tables, changes are already in BASE with MVCC tags
490                        if is_new_table {
491                            tracing::trace!(
492                                "[TX] INSERT to new table - tracking for commit replay"
493                            );
494                            self.operations.push(PlanOperation::Insert(plan.clone()));
495                        } else {
496                            tracing::trace!(
497                                "[TX] INSERT to existing table - already in BASE, no replay needed"
498                            );
499                        }
500                        result
501                    }
502                    Err(e) => {
503                        tracing::trace!(
504                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
505                            e
506                        );
507                        tracing::trace!("DEBUG setting is_aborted=true");
508                        self.is_aborted = true;
509                        return Err(e);
510                    }
511                }
512            }
513            PlanOperation::Update(ref plan) => {
514                if let Err(e) = self.ensure_table_exists(&plan.table) {
515                    self.is_aborted = true;
516                    return Err(e);
517                }
518
519                // If table was created in this transaction, update in staging
520                // Otherwise update directly in BASE (with MVCC soft-delete + insert)
521                let is_new_table = self.new_tables.contains(&plan.table);
522                // Track access to existing table for conflict detection
523                if !is_new_table {
524                    self.accessed_tables.insert(plan.table.clone());
525                }
526                let result = if is_new_table {
527                    tracing::trace!("[TX] UPDATE in staging for new table");
528                    self.staging.update(plan.clone())
529                } else {
530                    tracing::trace!(
531                        "[TX] UPDATE directly in BASE with txn_id={}",
532                        self.snapshot.txn_id
533                    );
534                    self.base_context
535                        .update(plan.clone())
536                        .and_then(|r| r.convert_pager_type())
537                };
538
539                match result {
540                    Ok(result) => {
541                        // Only track operations for NEW tables - they need replay on commit
542                        if is_new_table {
543                            tracing::trace!(
544                                "[TX] UPDATE to new table - tracking for commit replay"
545                            );
546                            self.operations.push(PlanOperation::Update(plan.clone()));
547                        } else {
548                            tracing::trace!(
549                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
550                            );
551                        }
552                        result
553                    }
554                    Err(e) => {
555                        self.is_aborted = true;
556                        return Err(e);
557                    }
558                }
559            }
560            PlanOperation::Delete(ref plan) => {
561                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
562                if let Err(e) = self.ensure_table_exists(&plan.table) {
563                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
564                    self.is_aborted = true;
565                    return Err(e);
566                }
567
568                // If table was created in this transaction, delete from staging
569                // Otherwise delete directly in BASE (with MVCC soft-delete)
570                let is_new_table = self.new_tables.contains(&plan.table);
571                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
572                // Track access to existing table for conflict detection
573                if !is_new_table {
574                    tracing::debug!(
575                        "[DELETE] Tracking access to existing table '{}'",
576                        plan.table
577                    );
578                    self.accessed_tables.insert(plan.table.clone());
579                }
580                let result = if is_new_table {
581                    tracing::debug!("[DELETE] Deleting from staging for new table");
582                    self.staging.delete(plan.clone())
583                } else {
584                    tracing::debug!(
585                        "[DELETE] Deleting from BASE with txn_id={}",
586                        self.snapshot.txn_id
587                    );
588                    self.base_context
589                        .delete(plan.clone())
590                        .and_then(|r| r.convert_pager_type())
591                };
592
593                tracing::debug!(
594                    "[DELETE] Result: {:?}",
595                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
596                );
597                match result {
598                    Ok(result) => {
599                        // Only track operations for NEW tables - they need replay on commit
600                        if is_new_table {
601                            tracing::trace!(
602                                "[TX] DELETE from new table - tracking for commit replay"
603                            );
604                            self.operations.push(PlanOperation::Delete(plan.clone()));
605                        } else {
606                            tracing::trace!(
607                                "[TX] DELETE from existing table - already in BASE, no replay needed"
608                            );
609                        }
610                        result
611                    }
612                    Err(e) => {
613                        self.is_aborted = true;
614                        return Err(e);
615                    }
616                }
617            }
618            PlanOperation::Truncate(ref plan) => {
619                tracing::debug!("[TRUNCATE] Starting truncate for table '{}'", plan.table);
620                if let Err(e) = self.ensure_table_exists(&plan.table) {
621                    tracing::debug!("[TRUNCATE] ensure_table_exists failed: {}", e);
622                    self.is_aborted = true;
623                    return Err(e);
624                }
625
626                // TRUNCATE is like DELETE without WHERE - same transaction handling
627                let is_new_table = self.new_tables.contains(&plan.table);
628                tracing::debug!("[TRUNCATE] is_new_table={}", is_new_table);
629                // Track access to existing table for conflict detection
630                if !is_new_table {
631                    tracing::debug!(
632                        "[TRUNCATE] Tracking access to existing table '{}'",
633                        plan.table
634                    );
635                    self.accessed_tables.insert(plan.table.clone());
636                }
637                let result = if is_new_table {
638                    tracing::debug!("[TRUNCATE] Truncating staging for new table");
639                    self.staging.truncate(plan.clone())
640                } else {
641                    tracing::debug!(
642                        "[TRUNCATE] Truncating BASE with txn_id={}",
643                        self.snapshot.txn_id
644                    );
645                    self.base_context
646                        .truncate(plan.clone())
647                        .and_then(|r| r.convert_pager_type())
648                };
649
650                tracing::debug!(
651                    "[TRUNCATE] Result: {:?}",
652                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
653                );
654                match result {
655                    Ok(result) => {
656                        // Only track operations for NEW tables - they need replay on commit
657                        if is_new_table {
658                            tracing::trace!(
659                                "[TX] TRUNCATE on new table - tracking for commit replay"
660                            );
661                            self.operations.push(PlanOperation::Truncate(plan.clone()));
662                        } else {
663                            tracing::trace!(
664                                "[TX] TRUNCATE on existing table - already in BASE, no replay needed"
665                            );
666                        }
667                        result
668                    }
669                    Err(e) => {
670                        self.is_aborted = true;
671                        return Err(e);
672                    }
673                }
674            }
675            PlanOperation::Select(plan) => {
676                // SELECT is read-only, not tracked for replay
677                // But still fails if transaction is aborted (already checked above)
678                let table_name = select_plan_table_name(plan.as_ref()).unwrap_or_default();
679                let plan = *plan;
680                match self.execute_select(plan) {
681                    Ok(staging_execution) => {
682                        // Collect staging execution into batches
683                        let schema = staging_execution.schema();
684                        let batches = staging_execution.collect().unwrap_or_default();
685
686                        // Combine into single batch
687                        let combined = if batches.is_empty() {
688                            RecordBatch::new_empty(Arc::clone(&schema))
689                        } else if batches.len() == 1 {
690                            batches.into_iter().next().unwrap()
691                        } else {
692                            let refs: Vec<&RecordBatch> = batches.iter().collect();
693                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
694                                Error::Internal(format!("failed to concatenate batches: {err}"))
695                            })?
696                        };
697
698                        // Return execution with combined batch
699                        let execution = SelectExecution::from_batch(
700                            table_name.clone(),
701                            Arc::clone(&schema),
702                            combined,
703                        );
704
705                        TransactionResult::Select {
706                            table_name,
707                            schema,
708                            execution,
709                        }
710                    }
711                    Err(e) => {
712                        // Don't abort on SELECT errors (like table not found)
713                        // Only Session layer aborts on constraint violations
714                        return Err(e);
715                    }
716                }
717            }
718        };
719
720        Ok(result)
721    }
722
723    /// Get the operations queued for commit
724    pub fn operations(&self) -> &[PlanOperation] {
725        &self.operations
726    }
727}
728
729/// A session handle for transaction management.
730///
731/// When dropped, automatically rolls back any active transaction.
732pub struct TransactionSession<BaseCtx, StagingCtx>
733where
734    BaseCtx: TransactionContext + 'static,
735    StagingCtx: TransactionContext + 'static,
736{
737    context: Arc<BaseCtx>,
738    session_id: TransactionSessionId,
739    transactions:
740        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
741    txn_manager: Arc<TxnIdManager>,
742}
743
744impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
745where
746    BaseCtx: TransactionContext + 'static,
747    StagingCtx: TransactionContext + 'static,
748{
749    pub fn new(
750        context: Arc<BaseCtx>,
751        session_id: TransactionSessionId,
752        transactions: Arc<
753            Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
754        >,
755        txn_manager: Arc<TxnIdManager>,
756    ) -> Self {
757        Self {
758            context,
759            session_id,
760            transactions,
761            txn_manager,
762        }
763    }
764
765    /// Clone this session (reuses the same session_id and shared transaction map).
766    /// This is necessary to maintain transaction state across Engine clones.
767    pub fn clone_session(&self) -> Self {
768        Self {
769            context: Arc::clone(&self.context),
770            session_id: self.session_id,
771            transactions: Arc::clone(&self.transactions),
772            txn_manager: Arc::clone(&self.txn_manager),
773        }
774    }
775
776    /// Get the session ID.
777    pub fn session_id(&self) -> TransactionSessionId {
778        self.session_id
779    }
780
781    /// Get the underlying context (for advanced use).
782    pub fn context(&self) -> &Arc<BaseCtx> {
783        &self.context
784    }
785
786    /// Check if this session has an active transaction.
787    pub fn has_active_transaction(&self) -> bool {
788        self.transactions
789            .lock()
790            .expect("transactions lock poisoned")
791            .contains_key(&self.session_id)
792    }
793
794    /// Check if the current transaction has been aborted due to an error.
795    pub fn is_aborted(&self) -> bool {
796        self.transactions
797            .lock()
798            .expect("transactions lock poisoned")
799            .get(&self.session_id)
800            .map(|tx| tx.is_aborted)
801            .unwrap_or(false)
802    }
803
804    /// Check if a table was created in the current active transaction.
805    /// Returns `true` if there's an active transaction and the table exists in its `new_tables` set.
806    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
807        self.transactions
808            .lock()
809            .expect("transactions lock poisoned")
810            .get(&self.session_id)
811            .map(|tx| tx.new_tables.contains(table_name))
812            .unwrap_or(false)
813    }
814
815    /// Get column specifications for a table created in the current transaction.
816    /// Returns `None` if there's no active transaction or the table wasn't created in it.
817    pub fn table_column_specs_from_transaction(
818        &self,
819        table_name: &str,
820    ) -> Option<Vec<PlanColumnSpec>> {
821        let guard = self
822            .transactions
823            .lock()
824            .expect("transactions lock poisoned");
825
826        let tx = guard.get(&self.session_id)?;
827        if !tx.new_tables.contains(table_name) {
828            return None;
829        }
830
831        // Get column specs from the staging context
832        tx.staging.table_column_specs(table_name).ok()
833    }
834
835    /// Get tables that reference the given table via foreign keys created in the current transaction.
836    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
837    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
838        let canonical = referenced_table.to_ascii_lowercase();
839        let guard = self
840            .transactions
841            .lock()
842            .expect("transactions lock poisoned");
843
844        let tx = match guard.get(&self.session_id) {
845            Some(tx) => tx,
846            None => return Vec::new(),
847        };
848
849        tx.transactional_foreign_keys
850            .get(&canonical)
851            .cloned()
852            .unwrap_or_else(Vec::new)
853    }
854
855    /// Check if a table is locked by another active session's transaction.
856    /// Returns true if ANY other session has this table in their locked_table_names.
857    pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
858        let canonical = table_name.to_ascii_lowercase();
859        let guard = self
860            .transactions
861            .lock()
862            .expect("transactions lock poisoned");
863
864        for (session_id, tx) in guard.iter() {
865            // Skip our own session
866            if *session_id == self.session_id {
867                continue;
868            }
869
870            // Check if this other session has the table locked
871            if tx.locked_table_names.contains(&canonical) {
872                return true;
873            }
874        }
875
876        false
877    }
878
879    /// Mark the current transaction as aborted due to an error.
880    /// This should be called when any error occurs during a transaction.
881    pub fn abort_transaction(&self) {
882        let mut guard = self
883            .transactions
884            .lock()
885            .expect("transactions lock poisoned");
886        if let Some(tx) = guard.get_mut(&self.session_id) {
887            tx.is_aborted = true;
888        }
889    }
890
891    /// Begin a transaction in this session.
892    pub fn begin_transaction(
893        &self,
894        staging: Arc<StagingCtx>,
895    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
896        tracing::debug!(
897            "[BEGIN] begin_transaction called for session_id={}",
898            self.session_id
899        );
900        let mut guard = self
901            .transactions
902            .lock()
903            .expect("transactions lock poisoned");
904        tracing::debug!(
905            "[BEGIN] session_id={}, transactions map has {} entries",
906            self.session_id,
907            guard.len()
908        );
909        if guard.contains_key(&self.session_id) {
910            return Err(Error::InvalidArgumentError(
911                "a transaction is already in progress in this session".into(),
912            ));
913        }
914        guard.insert(
915            self.session_id,
916            SessionTransaction::new(
917                Arc::clone(&self.context),
918                staging,
919                Arc::clone(&self.txn_manager),
920            ),
921        );
922        tracing::debug!(
923            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
924            self.session_id,
925            guard.len()
926        );
927        Ok(TransactionResult::Transaction {
928            kind: TransactionKind::Begin,
929        })
930    }
931
932    /// Commit the transaction in this session.
933    /// If the transaction is aborted, this acts as a ROLLBACK instead.
934    pub fn commit_transaction(
935        &self,
936    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
937        tracing::trace!(
938            "[COMMIT] commit_transaction called for session {:?}",
939            self.session_id
940        );
941        let mut guard = self
942            .transactions
943            .lock()
944            .expect("transactions lock poisoned");
945        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
946        let tx_opt = guard.remove(&self.session_id);
947        tracing::trace!(
948            "[COMMIT] commit_transaction remove returned: {}",
949            tx_opt.is_some()
950        );
951        let tx = tx_opt.ok_or_else(|| {
952            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
953            Error::InvalidArgumentError(
954                "no transaction is currently in progress in this session".into(),
955            )
956        })?;
957        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
958
959        // If transaction is aborted, commit becomes a rollback (no operations to replay)
960        if tx.is_aborted {
961            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
962            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
963            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
964            // Reset context snapshot to auto-commit view (aborted txn's writes should be invisible)
965            let auto_commit_snapshot = TransactionSnapshot {
966                txn_id: TXN_ID_AUTO_COMMIT,
967                snapshot_id: tx.txn_manager.last_committed(),
968            };
969            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
970            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
971            return Ok((
972                TransactionResult::Transaction {
973                    kind: TransactionKind::Rollback,
974                },
975                Vec::new(),
976            ));
977        }
978
979        // Check for write-write conflicts: detect if any accessed tables have been dropped or replaced
980        // We captured (table_name, table_id) pairs at transaction start
981        tracing::debug!(
982            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
983            tx.snapshot.txn_id,
984            tx.accessed_tables.len()
985        );
986        for accessed_table_name in &tx.accessed_tables {
987            tracing::debug!(
988                "[COMMIT CONFLICT CHECK] Checking table '{}'",
989                accessed_table_name
990            );
991            // Get the table ID from our catalog snapshot at transaction start
992            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
993                // Check current table state
994                match self.context.table_id(accessed_table_name) {
995                    Ok(current_table_id) => {
996                        // If table ID changed, it was dropped and recreated
997                        if current_table_id != snapshot_table_id {
998                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
999                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1000                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1001                            let auto_commit_snapshot = TransactionSnapshot {
1002                                txn_id: TXN_ID_AUTO_COMMIT,
1003                                snapshot_id: tx.txn_manager.last_committed(),
1004                            };
1005                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1006                            return Err(Error::TransactionContextError(
1007                                "another transaction has dropped this table".into(),
1008                            ));
1009                        }
1010                    }
1011                    Err(_) => {
1012                        // Table no longer exists - it was dropped
1013                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1014                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1015                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1016                        let auto_commit_snapshot = TransactionSnapshot {
1017                            txn_id: TXN_ID_AUTO_COMMIT,
1018                            snapshot_id: tx.txn_manager.last_committed(),
1019                        };
1020                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1021                        return Err(Error::TransactionContextError(
1022                            "another transaction has dropped this table".into(),
1023                        ));
1024                    }
1025                }
1026            }
1027        }
1028
1029        if let Err(err) = tx
1030            .base_context
1031            .validate_commit_constraints(tx.snapshot.txn_id)
1032        {
1033            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1034            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1035            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1036            let auto_commit_snapshot = TransactionSnapshot {
1037                txn_id: TXN_ID_AUTO_COMMIT,
1038                snapshot_id: tx.txn_manager.last_committed(),
1039            };
1040            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1041            let wrapped = match err {
1042                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
1043                    "TransactionContext Error: constraint violation: {msg}"
1044                )),
1045                other => other,
1046            };
1047            return Err(wrapped);
1048        }
1049
1050        let operations = tx.operations;
1051        tracing::trace!(
1052            "DEBUG commit_transaction: returning Commit with {} operations",
1053            operations.len()
1054        );
1055
1056        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
1057        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1058        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1059        TransactionContext::set_snapshot(&*self.context, tx.snapshot);
1060
1061        Ok((
1062            TransactionResult::Transaction {
1063                kind: TransactionKind::Commit,
1064            },
1065            operations,
1066        ))
1067    }
1068
1069    /// Rollback the transaction in this session.
1070    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1071        let mut guard = self
1072            .transactions
1073            .lock()
1074            .expect("transactions lock poisoned");
1075        if let Some(tx) = guard.remove(&self.session_id) {
1076            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1077            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1078            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1079            // Reset context snapshot to auto-commit view (rolled-back txn's writes should be invisible)
1080            let auto_commit_snapshot = TransactionSnapshot {
1081                txn_id: TXN_ID_AUTO_COMMIT,
1082                snapshot_id: tx.txn_manager.last_committed(),
1083            };
1084            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1085        } else {
1086            return Err(Error::InvalidArgumentError(
1087                "no transaction is currently in progress in this session".into(),
1088            ));
1089        }
1090        Ok(TransactionResult::Transaction {
1091            kind: TransactionKind::Rollback,
1092        })
1093    }
1094
1095    /// Execute an operation in this session's transaction, or directly if no transaction is active.
1096    pub fn execute_operation(
1097        &self,
1098        operation: PlanOperation,
1099    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1100        tracing::debug!(
1101            "[EXECUTE_OP] execute_operation called for session_id={}",
1102            self.session_id
1103        );
1104        if !self.has_active_transaction() {
1105            // No transaction - caller must handle direct execution
1106            return Err(Error::InvalidArgumentError(
1107                "execute_operation called without active transaction".into(),
1108            ));
1109        }
1110
1111        // Check for cross-transaction conflicts before executing
1112        if let PlanOperation::CreateTable(ref plan) = operation {
1113            let guard = self
1114                .transactions
1115                .lock()
1116                .expect("transactions lock poisoned");
1117
1118            let canonical_name = plan.name.to_ascii_lowercase();
1119
1120            // Check if another session has this table locked in their transaction
1121            for (other_session_id, other_tx) in guard.iter() {
1122                if *other_session_id != self.session_id
1123                    && other_tx.locked_table_names.contains(&canonical_name)
1124                {
1125                    return Err(Error::TransactionContextError(format!(
1126                        "table '{}' is locked by another active transaction",
1127                        plan.name
1128                    )));
1129                }
1130            }
1131            drop(guard); // Release lock before continuing
1132        }
1133
1134        // Also check DROP TABLE for conflicts
1135        if let PlanOperation::DropTable(ref plan) = operation {
1136            let guard = self
1137                .transactions
1138                .lock()
1139                .expect("transactions lock poisoned");
1140
1141            let canonical_name = plan.name.to_ascii_lowercase();
1142
1143            // Check if another session has this table locked in their transaction
1144            for (other_session_id, other_tx) in guard.iter() {
1145                if *other_session_id != self.session_id
1146                    && other_tx.locked_table_names.contains(&canonical_name)
1147                {
1148                    return Err(Error::TransactionContextError(format!(
1149                        "table '{}' is locked by another active transaction",
1150                        plan.name
1151                    )));
1152                }
1153            }
1154            drop(guard); // Release lock before continuing
1155        }
1156
1157        // In transaction - add to transaction and execute on staging context
1158        let mut guard = self
1159            .transactions
1160            .lock()
1161            .expect("transactions lock poisoned");
1162        tracing::debug!(
1163            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1164            self.session_id,
1165            guard.len()
1166        );
1167        let tx = guard
1168            .get_mut(&self.session_id)
1169            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1170        tracing::debug!(
1171            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1172            self.session_id,
1173            tx.snapshot.txn_id,
1174            tx.accessed_tables.len()
1175        );
1176
1177        let result = tx.execute_operation(operation);
1178        if let Err(ref e) = result {
1179            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1180            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1181        }
1182        result
1183    }
1184}
1185
1186impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1187where
1188    BaseCtx: TransactionContext,
1189    StagingCtx: TransactionContext,
1190{
1191    fn drop(&mut self) {
1192        // Auto-rollback on drop if transaction is active
1193        // Handle poisoned mutex gracefully to avoid panic during cleanup
1194        match self.transactions.lock() {
1195            Ok(mut guard) => {
1196                if guard.remove(&self.session_id).is_some() {
1197                    eprintln!(
1198                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
1199                    );
1200                }
1201            }
1202            Err(_) => {
1203                // Mutex is poisoned, likely due to a panic elsewhere
1204                // Don't panic again during cleanup
1205                tracing::trace!(
1206                    "Warning: TransactionSession dropped with poisoned transaction mutex"
1207                );
1208            }
1209        }
1210    }
1211}
1212
1213/// Transaction manager for coordinating sessions.
1214///
1215/// This is the top-level coordinator that creates sessions and manages
1216/// the shared transaction ID allocator.
1217pub struct TransactionManager<BaseCtx, StagingCtx>
1218where
1219    BaseCtx: TransactionContext + 'static,
1220    StagingCtx: TransactionContext + 'static,
1221{
1222    transactions:
1223        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
1224    next_session_id: AtomicU64,
1225    txn_manager: Arc<TxnIdManager>,
1226}
1227
1228impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1229where
1230    BaseCtx: TransactionContext + 'static,
1231    StagingCtx: TransactionContext + 'static,
1232{
1233    pub fn new() -> Self {
1234        Self {
1235            transactions: Arc::new(Mutex::new(HashMap::new())),
1236            next_session_id: AtomicU64::new(1),
1237            txn_manager: Arc::new(TxnIdManager::new()),
1238        }
1239    }
1240
1241    /// Create a new TransactionManager with a custom initial transaction ID.
1242    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1243        Self {
1244            transactions: Arc::new(Mutex::new(HashMap::new())),
1245            next_session_id: AtomicU64::new(1),
1246            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1247        }
1248    }
1249
1250    /// Create a new TransactionManager with custom initial state.
1251    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1252        Self {
1253            transactions: Arc::new(Mutex::new(HashMap::new())),
1254            next_session_id: AtomicU64::new(1),
1255            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1256                next_txn_id,
1257                last_committed,
1258            )),
1259        }
1260    }
1261
1262    /// Create a new session for transaction management.
1263    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1264        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1265        tracing::debug!(
1266            "[TX_MANAGER] create_session: allocated session_id={}",
1267            session_id
1268        );
1269        TransactionSession::new(
1270            context,
1271            session_id,
1272            Arc::clone(&self.transactions),
1273            Arc::clone(&self.txn_manager),
1274        )
1275    }
1276
1277    /// Obtain the shared transaction ID manager.
1278    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1279        Arc::clone(&self.txn_manager)
1280    }
1281
1282    /// Check if there's an active transaction (checks if ANY session has a transaction).
1283    pub fn has_active_transaction(&self) -> bool {
1284        !self
1285            .transactions
1286            .lock()
1287            .expect("transactions lock poisoned")
1288            .is_empty()
1289    }
1290}
1291
1292impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1293where
1294    BaseCtx: TransactionContext + 'static,
1295    StagingCtx: TransactionContext + 'static,
1296{
1297    fn default() -> Self {
1298        Self::new()
1299    }
1300}