llkv_transaction/
context.rs

1//! Transaction context types for orchestrating transaction lifecycles.
2//!
3//! This module contains the core types for managing transactions:
4//! - [`TransactionContext`]: Trait defining operations a context must support
5//! - [`SessionTransaction`]: Per-transaction state machine
6//! - [`TransactionSession`]: Session-level transaction management
7//! - [`TransactionManager`]: Cross-session transaction coordination
8
9use std::collections::{HashMap, HashSet};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12
13use arrow::array::RecordBatch;
14use llkv_column_map::types::TableId;
15use llkv_executor::{ExecutorRowBatch, SelectExecution};
16use llkv_expr::expr::Expr as LlkvExpr;
17use llkv_plan::plans::{
18    CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
19    PlanOperation, SelectPlan, TruncatePlan, UpdatePlan,
20};
21use llkv_result::{Error, Result as LlkvResult};
22use llkv_storage::pager::Pager;
23use llkv_table::CatalogDdl;
24use simd_r_drive_entry_handle::EntryHandle;
25
26use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
27use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
28
/// Identifier of a client session.
///
/// A single session may run many transactions over its lifetime. Session ids
/// therefore live in their own number space and are allocated independently of
/// transaction ids.
pub type TransactionSessionId = u64;
34
35/// Extracts table name from SelectPlan for single-table queries.
36fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
37    if plan.tables.len() == 1 {
38        Some(plan.tables[0].qualified_name())
39    } else {
40        None
41    }
42}
43
44/// A trait for transaction context operations.
45///
46/// This allows [`SessionTransaction`] to work with any context that implements these operations.
47/// The associated type `Pager` specifies the pager type this context uses.
48pub trait TransactionContext: CatalogDdl + Send + Sync {
49    /// The pager type used by this context
50    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
51
52    /// Snapshot representation returned by this context.
53    type Snapshot: TransactionCatalogSnapshot;
54
55    /// Update the snapshot used for MVCC visibility decisions.
56    fn set_snapshot(&self, snapshot: TransactionSnapshot);
57
58    /// Get the snapshot currently associated with this context.
59    fn snapshot(&self) -> TransactionSnapshot;
60
61    /// Get table column specifications
62    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;
63
64    /// Export table rows for snapshotting
65    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;
66
67    /// Get batches with row IDs for seeding updates
68    fn get_batches_with_row_ids(
69        &self,
70        table_name: &str,
71        filter: Option<LlkvExpr<'static, String>>,
72    ) -> LlkvResult<Vec<RecordBatch>>;
73
74    /// Execute a SELECT plan with this context's pager type
75    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;
76
77    /// Create a table from plan
78    fn apply_create_table_plan(
79        &self,
80        plan: CreateTablePlan,
81    ) -> LlkvResult<TransactionResult<Self::Pager>>;
82
83    /// Drop a table
84    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;
85
86    /// Insert rows
87    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
88
89    /// Update rows
90    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
91
92    /// Delete rows
93    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
94
95    /// Truncate table (delete all rows)
96    fn truncate(&self, plan: TruncatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;
97
98    /// Create an index
99    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;
100
101    /// Append batches with row IDs
102    fn append_batches_with_row_ids(
103        &self,
104        table_name: &str,
105        batches: Vec<RecordBatch>,
106    ) -> LlkvResult<usize>;
107
108    /// Get table names for catalog snapshot
109    fn table_names(&self) -> Vec<String>;
110
111    /// Get table ID for a given table name (for conflict detection)
112    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;
113
114    /// Get an immutable catalog snapshot for transaction isolation
115    fn catalog_snapshot(&self) -> Self::Snapshot;
116
117    /// Validate any pending commit-time constraints for this transaction.
118    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
119        Ok(())
120    }
121
122    /// Clear any transaction-scoped state retained by the context.
123    fn clear_transaction_state(&self, _txn_id: TxnId) {}
124}
125
126/// Transaction state for the runtime context.
127///
128/// This struct manages the lifecycle of a single transaction, tracking:
129/// - Operations to replay on commit
130/// - Tables created/dropped/accessed within the transaction
131/// - Foreign key constraints for validation
132/// - Transaction isolation via MVCC snapshots
133pub struct SessionTransaction<BaseCtx, StagingCtx>
134where
135    BaseCtx: TransactionContext + 'static,
136    StagingCtx: TransactionContext + 'static,
137{
138    /// Transaction snapshot (contains txn id + snapshot watermark)
139    snapshot: TransactionSnapshot,
140    /// Staging context with MemPager for isolation (only used for tables created in this txn).
141    staging: Arc<StagingCtx>,
142    /// Operations to replay on commit.
143    operations: Vec<PlanOperation>,
144    /// Tables that have been verified to exist (either in base or staging).
145    staged_tables: HashSet<String>,
146    /// Tables created within this transaction (live in staging until commit).
147    new_tables: HashSet<String>,
148    /// Tables known to be missing.
149    missing_tables: HashSet<String>,
150    /// All table names locked by this transaction for DDL operations (for conflict detection).
151    /// This includes tables that were created, dropped, or both within the transaction.
152    locked_table_names: HashSet<String>,
153    /// Immutable catalog snapshot at transaction start (for isolation).
154    /// Contains table name→ID mappings. Replaces separate HashSet and HashMap.
155    catalog_snapshot: BaseCtx::Snapshot,
156    /// Base context for reading existing tables with MVCC visibility.
157    base_context: Arc<BaseCtx>,
158    /// Whether this transaction has been aborted due to an error.
159    is_aborted: bool,
160    /// Transaction ID manager (shared across all transactions)
161    txn_manager: Arc<TxnIdManager>,
162    /// Tables accessed (names only) by this transaction
163    accessed_tables: HashSet<String>,
164    /// Foreign key constraints created in this transaction.
165    /// Maps referenced_table_name -> Vec<referencing_table_name>
166    /// Used to check FK dependencies when dropping tables within a transaction.
167    transactional_foreign_keys: HashMap<String, Vec<String>>,
168}
169
170impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
171where
172    BaseCtx: TransactionContext + 'static,
173    StagingCtx: TransactionContext + 'static,
174{
175    pub fn new(
176        base_context: Arc<BaseCtx>,
177        staging: Arc<StagingCtx>,
178        txn_manager: Arc<TxnIdManager>,
179    ) -> Self {
180        // Get immutable catalog snapshot for transaction isolation
181        // This replaces the previous HashSet<String> and HashMap<String, TableId>
182        let catalog_snapshot = base_context.catalog_snapshot();
183
184        let snapshot = txn_manager.begin_transaction();
185        tracing::debug!(
186            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
187            snapshot.txn_id,
188            snapshot.snapshot_id
189        );
190        TransactionContext::set_snapshot(&*base_context, snapshot);
191        TransactionContext::set_snapshot(&*staging, snapshot);
192
193        Self {
194            staging,
195            operations: Vec::new(),
196            staged_tables: HashSet::new(),
197            new_tables: HashSet::new(),
198            missing_tables: HashSet::new(),
199            locked_table_names: HashSet::new(),
200            catalog_snapshot,
201            base_context,
202            is_aborted: false,
203            accessed_tables: HashSet::new(),
204            snapshot,
205            txn_manager,
206            transactional_foreign_keys: HashMap::new(),
207        }
208    }
209
210    /// Ensure a table exists and is visible to this transaction.
211    /// NO COPYING - just check if table exists in base or was created in this transaction.
212    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
213        tracing::trace!(
214            "[ENSURE] ensure_table_exists called for table='{}'",
215            table_name
216        );
217
218        // If we already checked this table, return early
219        if self.staged_tables.contains(table_name) {
220            tracing::trace!("[ENSURE] table already verified to exist");
221            return Ok(());
222        }
223
224        // Check if table exists in catalog snapshot OR was created in this transaction
225        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
226        {
227            self.missing_tables.insert(table_name.to_string());
228            return Err(Error::CatalogError(format!(
229                "Catalog Error: Table '{table_name}' does not exist"
230            )));
231        }
232
233        if self.missing_tables.contains(table_name) {
234            return Err(Error::CatalogError(format!(
235                "Catalog Error: Table '{table_name}' does not exist"
236            )));
237        }
238
239        // For tables created in this transaction, also create in staging for isolation
240        if self.new_tables.contains(table_name) {
241            tracing::trace!("[ENSURE] Table was created in this transaction");
242            // Check if it exists in staging
243            match self.staging.table_column_specs(table_name) {
244                Ok(_) => {
245                    self.staged_tables.insert(table_name.to_string());
246                    return Ok(());
247                }
248                Err(_) => {
249                    return Err(Error::CatalogError(format!(
250                        "Catalog Error: Table '{table_name}' was created but not found in staging"
251                    )));
252                }
253            }
254        }
255
256        // Table exists in base - mark as verified
257        tracing::trace!(
258            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
259        );
260        self.staged_tables.insert(table_name.to_string());
261        Ok(())
262    }
263
264    /// Execute a SELECT query within transaction isolation.
265    /// For tables created in this transaction: read from staging.
266    /// For existing tables: read from BASE with MVCC visibility filtering.
267    pub fn execute_select(
268        &mut self,
269        plan: SelectPlan,
270    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
271        // Get table name (for single-table queries only)
272        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
273            Error::InvalidArgumentError(
274                "Transaction execute_select requires single-table query".into(),
275            )
276        })?;
277
278        // Ensure table exists
279        self.ensure_table_exists(&table_name)?;
280
281        // If table was created in this transaction, read from staging
282        if self.new_tables.contains(&table_name) {
283            tracing::trace!(
284                "[SELECT] Reading from staging for new table '{}'",
285                table_name
286            );
287            return self.staging.execute_select(plan);
288        }
289
290        // Track access to existing table for conflict detection
291        self.accessed_tables.insert(table_name.clone());
292
293        // Otherwise read from BASE with MVCC visibility
294        // The base context already has the snapshot set in SessionTransaction::new()
295        tracing::trace!(
296            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
297            table_name
298        );
299        self.base_context.execute_select(plan).and_then(|exec| {
300            // Convert pager type from BaseCtx to StagingCtx
301            // This is a limitation of the current type system
302            // In practice, we're just collecting and re-packaging
303            let schema = exec.schema();
304            let batches = exec.collect().unwrap_or_default();
305            let combined = if batches.is_empty() {
306                RecordBatch::new_empty(Arc::clone(&schema))
307            } else if batches.len() == 1 {
308                batches.into_iter().next().unwrap()
309            } else {
310                let refs: Vec<&RecordBatch> = batches.iter().collect();
311                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
312                    Error::Internal(format!("failed to concatenate batches: {err}"))
313                })?
314            };
315            Ok(SelectExecution::from_batch(
316                table_name,
317                Arc::clone(&schema),
318                combined,
319            ))
320        })
321    }
322
323    /// Execute an operation in the transaction staging context
324    pub fn execute_operation(
325        &mut self,
326        operation: PlanOperation,
327    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
328        tracing::trace!(
329            "[TX] SessionTransaction::execute_operation called, operation={:?}",
330            match &operation {
331                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
332                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
333                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
334                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
335                _ => "OTHER".to_string(),
336            }
337        );
338        // Check if transaction is aborted
339        if self.is_aborted {
340            return Err(Error::TransactionContextError(
341                "TransactionContext Error: transaction is aborted".into(),
342            ));
343        }
344
345        // Execute operation and catch errors to mark transaction as aborted
346        let result = match operation {
347            PlanOperation::CreateTable(ref plan) => {
348                // Before creating in staging, validate that all foreign key referenced tables
349                // exist in the base context. This is necessary because the staging context
350                // can't see tables from the base context.
351                for fk in &plan.foreign_keys {
352                    let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
353                    // Check if the referenced table exists in base context or was created in this transaction
354                    if !self.new_tables.contains(&canonical_ref_table)
355                        && !self.catalog_snapshot.table_exists(&canonical_ref_table)
356                    {
357                        self.is_aborted = true;
358                        return Err(Error::CatalogError(format!(
359                            "Catalog Error: referenced table '{}' does not exist",
360                            fk.referenced_table
361                        )));
362                    }
363                }
364
365                // Create a modified plan without foreign keys for staging context.
366                // The staging context can't validate foreign keys against base tables,
367                // so we skip FK registration in staging and will re-apply at commit time.
368                let mut staging_plan = plan.clone();
369                staging_plan.foreign_keys.clear();
370
371                match self.staging.apply_create_table_plan(staging_plan) {
372                    Ok(result) => {
373                        // Track new table so it's visible to subsequent operations in this transaction
374                        self.new_tables.insert(plan.name.clone());
375                        self.missing_tables.remove(&plan.name);
376                        self.staged_tables.insert(plan.name.clone());
377                        // Lock this table name for the duration of the transaction
378                        self.locked_table_names
379                            .insert(plan.name.to_ascii_lowercase());
380
381                        // Track foreign key dependencies for DROP TABLE validation
382                        for fk in &plan.foreign_keys {
383                            let referenced_table = fk.referenced_table.to_ascii_lowercase();
384                            self.transactional_foreign_keys
385                                .entry(referenced_table)
386                                .or_default()
387                                .push(plan.name.to_ascii_lowercase());
388                        }
389
390                        // Track for commit replay WITH original foreign keys
391                        self.operations
392                            .push(PlanOperation::CreateTable(plan.clone()));
393                        result.convert_pager_type()?
394                    }
395                    Err(e) => {
396                        self.is_aborted = true;
397                        return Err(e);
398                    }
399                }
400            }
401            PlanOperation::DropTable(ref plan) => {
402                let canonical_name = plan.name.to_ascii_lowercase();
403
404                // Lock this table name for conflict detection (even if CREATE/DROP cancel out)
405                self.locked_table_names.insert(canonical_name.clone());
406
407                // Check if the table was created in this transaction
408                if self.new_tables.contains(&canonical_name) {
409                    // Table was created in this transaction, so drop it from staging
410                    // and remove from tracking
411                    TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
412                    self.new_tables.remove(&canonical_name);
413                    self.staged_tables.remove(&canonical_name);
414
415                    // Remove FK constraints where this table was the referencing table
416                    self.transactional_foreign_keys.iter_mut().for_each(
417                        |(_, referencing_tables)| {
418                            referencing_tables.retain(|t| t != &canonical_name);
419                        },
420                    );
421                    // Clean up empty entries
422                    self.transactional_foreign_keys
423                        .retain(|_, referencing_tables| !referencing_tables.is_empty());
424
425                    // Remove the CREATE TABLE operation from the operation list
426                    self.operations.retain(|op| {
427                        !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
428                    });
429                    // Don't add DROP to operations since we're canceling out the CREATE
430                    // But keep the table name locked for conflict detection
431                    TransactionResult::NoOp
432                } else {
433                    // Table exists in base context, track the drop for replay at commit
434                    // Verify the table exists
435                    if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
436                        self.is_aborted = true;
437                        return Err(Error::InvalidArgumentError(format!(
438                            "table '{}' does not exist",
439                            plan.name
440                        )));
441                    }
442
443                    if self.catalog_snapshot.table_exists(&canonical_name) {
444                        // Mark as dropped so it's not visible in this transaction
445                        self.missing_tables.insert(canonical_name.clone());
446                        self.staged_tables.remove(&canonical_name);
447                        // Track for commit replay
448                        self.operations.push(PlanOperation::DropTable(plan.clone()));
449                    }
450                    TransactionResult::NoOp
451                }
452            }
453            PlanOperation::Insert(ref plan) => {
454                tracing::trace!(
455                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
456                    plan.table
457                );
458                // Ensure table exists
459                if let Err(e) = self.ensure_table_exists(&plan.table) {
460                    self.is_aborted = true;
461                    return Err(e);
462                }
463
464                // If table was created in this transaction, insert into staging
465                // Otherwise insert directly into BASE (with transaction ID tagging)
466                let is_new_table = self.new_tables.contains(&plan.table);
467                // Track access to existing table for conflict detection
468                if !is_new_table {
469                    self.accessed_tables.insert(plan.table.clone());
470                }
471                let result = if is_new_table {
472                    tracing::trace!("[TX] INSERT into staging for new table");
473                    self.staging.insert(plan.clone())
474                } else {
475                    tracing::trace!(
476                        "[TX] INSERT directly into BASE with txn_id={}",
477                        self.snapshot.txn_id
478                    );
479                    // Insert into base - MVCC tagging happens automatically in insert_rows()
480                    self.base_context
481                        .insert(plan.clone())
482                        .and_then(|r| r.convert_pager_type())
483                };
484
485                match result {
486                    Ok(result) => {
487                        // Only track operations for NEW tables - they need replay on commit
488                        // For existing tables, changes are already in BASE with MVCC tags
489                        if is_new_table {
490                            tracing::trace!(
491                                "[TX] INSERT to new table - tracking for commit replay"
492                            );
493                            self.operations.push(PlanOperation::Insert(plan.clone()));
494                        } else {
495                            tracing::trace!(
496                                "[TX] INSERT to existing table - already in BASE, no replay needed"
497                            );
498                        }
499                        result
500                    }
501                    Err(e) => {
502                        tracing::trace!(
503                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
504                            e
505                        );
506                        tracing::trace!("DEBUG setting is_aborted=true");
507                        self.is_aborted = true;
508                        return Err(e);
509                    }
510                }
511            }
512            PlanOperation::Update(ref plan) => {
513                if let Err(e) = self.ensure_table_exists(&plan.table) {
514                    self.is_aborted = true;
515                    return Err(e);
516                }
517
518                // If table was created in this transaction, update in staging
519                // Otherwise update directly in BASE (with MVCC soft-delete + insert)
520                let is_new_table = self.new_tables.contains(&plan.table);
521                // Track access to existing table for conflict detection
522                if !is_new_table {
523                    self.accessed_tables.insert(plan.table.clone());
524                }
525                let result = if is_new_table {
526                    tracing::trace!("[TX] UPDATE in staging for new table");
527                    self.staging.update(plan.clone())
528                } else {
529                    tracing::trace!(
530                        "[TX] UPDATE directly in BASE with txn_id={}",
531                        self.snapshot.txn_id
532                    );
533                    self.base_context
534                        .update(plan.clone())
535                        .and_then(|r| r.convert_pager_type())
536                };
537
538                match result {
539                    Ok(result) => {
540                        // Only track operations for NEW tables - they need replay on commit
541                        if is_new_table {
542                            tracing::trace!(
543                                "[TX] UPDATE to new table - tracking for commit replay"
544                            );
545                            self.operations.push(PlanOperation::Update(plan.clone()));
546                        } else {
547                            tracing::trace!(
548                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
549                            );
550                        }
551                        result
552                    }
553                    Err(e) => {
554                        self.is_aborted = true;
555                        return Err(e);
556                    }
557                }
558            }
559            PlanOperation::Delete(ref plan) => {
560                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
561                if let Err(e) = self.ensure_table_exists(&plan.table) {
562                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
563                    self.is_aborted = true;
564                    return Err(e);
565                }
566
567                // If table was created in this transaction, delete from staging
568                // Otherwise delete directly in BASE (with MVCC soft-delete)
569                let is_new_table = self.new_tables.contains(&plan.table);
570                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
571                // Track access to existing table for conflict detection
572                if !is_new_table {
573                    tracing::debug!(
574                        "[DELETE] Tracking access to existing table '{}'",
575                        plan.table
576                    );
577                    self.accessed_tables.insert(plan.table.clone());
578                }
579                let result = if is_new_table {
580                    tracing::debug!("[DELETE] Deleting from staging for new table");
581                    self.staging.delete(plan.clone())
582                } else {
583                    tracing::debug!(
584                        "[DELETE] Deleting from BASE with txn_id={}",
585                        self.snapshot.txn_id
586                    );
587                    self.base_context
588                        .delete(plan.clone())
589                        .and_then(|r| r.convert_pager_type())
590                };
591
592                tracing::debug!(
593                    "[DELETE] Result: {:?}",
594                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
595                );
596                match result {
597                    Ok(result) => {
598                        // Only track operations for NEW tables - they need replay on commit
599                        if is_new_table {
600                            tracing::trace!(
601                                "[TX] DELETE from new table - tracking for commit replay"
602                            );
603                            self.operations.push(PlanOperation::Delete(plan.clone()));
604                        } else {
605                            tracing::trace!(
606                                "[TX] DELETE from existing table - already in BASE, no replay needed"
607                            );
608                        }
609                        result
610                    }
611                    Err(e) => {
612                        self.is_aborted = true;
613                        return Err(e);
614                    }
615                }
616            }
617            PlanOperation::Truncate(ref plan) => {
618                tracing::debug!("[TRUNCATE] Starting truncate for table '{}'", plan.table);
619                if let Err(e) = self.ensure_table_exists(&plan.table) {
620                    tracing::debug!("[TRUNCATE] ensure_table_exists failed: {}", e);
621                    self.is_aborted = true;
622                    return Err(e);
623                }
624
625                // TRUNCATE is like DELETE without WHERE - same transaction handling
626                let is_new_table = self.new_tables.contains(&plan.table);
627                tracing::debug!("[TRUNCATE] is_new_table={}", is_new_table);
628                // Track access to existing table for conflict detection
629                if !is_new_table {
630                    tracing::debug!(
631                        "[TRUNCATE] Tracking access to existing table '{}'",
632                        plan.table
633                    );
634                    self.accessed_tables.insert(plan.table.clone());
635                }
636                let result = if is_new_table {
637                    tracing::debug!("[TRUNCATE] Truncating staging for new table");
638                    self.staging.truncate(plan.clone())
639                } else {
640                    tracing::debug!(
641                        "[TRUNCATE] Truncating BASE with txn_id={}",
642                        self.snapshot.txn_id
643                    );
644                    self.base_context
645                        .truncate(plan.clone())
646                        .and_then(|r| r.convert_pager_type())
647                };
648
649                tracing::debug!(
650                    "[TRUNCATE] Result: {:?}",
651                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
652                );
653                match result {
654                    Ok(result) => {
655                        // Only track operations for NEW tables - they need replay on commit
656                        if is_new_table {
657                            tracing::trace!(
658                                "[TX] TRUNCATE on new table - tracking for commit replay"
659                            );
660                            self.operations.push(PlanOperation::Truncate(plan.clone()));
661                        } else {
662                            tracing::trace!(
663                                "[TX] TRUNCATE on existing table - already in BASE, no replay needed"
664                            );
665                        }
666                        result
667                    }
668                    Err(e) => {
669                        self.is_aborted = true;
670                        return Err(e);
671                    }
672                }
673            }
674            PlanOperation::Select(ref plan) => {
675                // SELECT is read-only, not tracked for replay
676                // But still fails if transaction is aborted (already checked above)
677                let table_name = select_plan_table_name(plan).unwrap_or_default();
678                match self.execute_select(plan.clone()) {
679                    Ok(staging_execution) => {
680                        // Collect staging execution into batches
681                        let schema = staging_execution.schema();
682                        let batches = staging_execution.collect().unwrap_or_default();
683
684                        // Combine into single batch
685                        let combined = if batches.is_empty() {
686                            RecordBatch::new_empty(Arc::clone(&schema))
687                        } else if batches.len() == 1 {
688                            batches.into_iter().next().unwrap()
689                        } else {
690                            let refs: Vec<&RecordBatch> = batches.iter().collect();
691                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
692                                Error::Internal(format!("failed to concatenate batches: {err}"))
693                            })?
694                        };
695
696                        // Return execution with combined batch
697                        let execution = SelectExecution::from_batch(
698                            table_name.clone(),
699                            Arc::clone(&schema),
700                            combined,
701                        );
702
703                        TransactionResult::Select {
704                            table_name,
705                            schema,
706                            execution,
707                        }
708                    }
709                    Err(e) => {
710                        // Don't abort on SELECT errors (like table not found)
711                        // Only Session layer aborts on constraint violations
712                        return Err(e);
713                    }
714                }
715            }
716        };
717
718        Ok(result)
719    }
720
721    /// Get the operations queued for commit
722    pub fn operations(&self) -> &[PlanOperation] {
723        &self.operations
724    }
725}
726
727/// A session handle for transaction management.
728///
729/// When dropped, automatically rolls back any active transaction.
730pub struct TransactionSession<BaseCtx, StagingCtx>
731where
732    BaseCtx: TransactionContext + 'static,
733    StagingCtx: TransactionContext + 'static,
734{
735    context: Arc<BaseCtx>,
736    session_id: TransactionSessionId,
737    transactions:
738        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
739    txn_manager: Arc<TxnIdManager>,
740}
741
742impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
743where
744    BaseCtx: TransactionContext + 'static,
745    StagingCtx: TransactionContext + 'static,
746{
747    pub fn new(
748        context: Arc<BaseCtx>,
749        session_id: TransactionSessionId,
750        transactions: Arc<
751            Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
752        >,
753        txn_manager: Arc<TxnIdManager>,
754    ) -> Self {
755        Self {
756            context,
757            session_id,
758            transactions,
759            txn_manager,
760        }
761    }
762
763    /// Clone this session (reuses the same session_id and shared transaction map).
764    /// This is necessary to maintain transaction state across Engine clones.
765    pub fn clone_session(&self) -> Self {
766        Self {
767            context: Arc::clone(&self.context),
768            session_id: self.session_id,
769            transactions: Arc::clone(&self.transactions),
770            txn_manager: Arc::clone(&self.txn_manager),
771        }
772    }
773
774    /// Get the session ID.
775    pub fn session_id(&self) -> TransactionSessionId {
776        self.session_id
777    }
778
779    /// Get the underlying context (for advanced use).
780    pub fn context(&self) -> &Arc<BaseCtx> {
781        &self.context
782    }
783
784    /// Check if this session has an active transaction.
785    pub fn has_active_transaction(&self) -> bool {
786        self.transactions
787            .lock()
788            .expect("transactions lock poisoned")
789            .contains_key(&self.session_id)
790    }
791
792    /// Check if the current transaction has been aborted due to an error.
793    pub fn is_aborted(&self) -> bool {
794        self.transactions
795            .lock()
796            .expect("transactions lock poisoned")
797            .get(&self.session_id)
798            .map(|tx| tx.is_aborted)
799            .unwrap_or(false)
800    }
801
802    /// Check if a table was created in the current active transaction.
803    /// Returns `true` if there's an active transaction and the table exists in its `new_tables` set.
804    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
805        self.transactions
806            .lock()
807            .expect("transactions lock poisoned")
808            .get(&self.session_id)
809            .map(|tx| tx.new_tables.contains(table_name))
810            .unwrap_or(false)
811    }
812
813    /// Get column specifications for a table created in the current transaction.
814    /// Returns `None` if there's no active transaction or the table wasn't created in it.
815    pub fn table_column_specs_from_transaction(
816        &self,
817        table_name: &str,
818    ) -> Option<Vec<PlanColumnSpec>> {
819        let guard = self
820            .transactions
821            .lock()
822            .expect("transactions lock poisoned");
823
824        let tx = guard.get(&self.session_id)?;
825        if !tx.new_tables.contains(table_name) {
826            return None;
827        }
828
829        // Get column specs from the staging context
830        tx.staging.table_column_specs(table_name).ok()
831    }
832
833    /// Get tables that reference the given table via foreign keys created in the current transaction.
834    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
835    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
836        let canonical = referenced_table.to_ascii_lowercase();
837        let guard = self
838            .transactions
839            .lock()
840            .expect("transactions lock poisoned");
841
842        let tx = match guard.get(&self.session_id) {
843            Some(tx) => tx,
844            None => return Vec::new(),
845        };
846
847        tx.transactional_foreign_keys
848            .get(&canonical)
849            .cloned()
850            .unwrap_or_else(Vec::new)
851    }
852
853    /// Check if a table is locked by another active session's transaction.
854    /// Returns true if ANY other session has this table in their locked_table_names.
855    pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
856        let canonical = table_name.to_ascii_lowercase();
857        let guard = self
858            .transactions
859            .lock()
860            .expect("transactions lock poisoned");
861
862        for (session_id, tx) in guard.iter() {
863            // Skip our own session
864            if *session_id == self.session_id {
865                continue;
866            }
867
868            // Check if this other session has the table locked
869            if tx.locked_table_names.contains(&canonical) {
870                return true;
871            }
872        }
873
874        false
875    }
876
877    /// Mark the current transaction as aborted due to an error.
878    /// This should be called when any error occurs during a transaction.
879    pub fn abort_transaction(&self) {
880        let mut guard = self
881            .transactions
882            .lock()
883            .expect("transactions lock poisoned");
884        if let Some(tx) = guard.get_mut(&self.session_id) {
885            tx.is_aborted = true;
886        }
887    }
888
889    /// Begin a transaction in this session.
890    pub fn begin_transaction(
891        &self,
892        staging: Arc<StagingCtx>,
893    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
894        tracing::debug!(
895            "[BEGIN] begin_transaction called for session_id={}",
896            self.session_id
897        );
898        let mut guard = self
899            .transactions
900            .lock()
901            .expect("transactions lock poisoned");
902        tracing::debug!(
903            "[BEGIN] session_id={}, transactions map has {} entries",
904            self.session_id,
905            guard.len()
906        );
907        if guard.contains_key(&self.session_id) {
908            return Err(Error::InvalidArgumentError(
909                "a transaction is already in progress in this session".into(),
910            ));
911        }
912        guard.insert(
913            self.session_id,
914            SessionTransaction::new(
915                Arc::clone(&self.context),
916                staging,
917                Arc::clone(&self.txn_manager),
918            ),
919        );
920        tracing::debug!(
921            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
922            self.session_id,
923            guard.len()
924        );
925        Ok(TransactionResult::Transaction {
926            kind: TransactionKind::Begin,
927        })
928    }
929
930    /// Commit the transaction in this session.
931    /// If the transaction is aborted, this acts as a ROLLBACK instead.
932    pub fn commit_transaction(
933        &self,
934    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
935        tracing::trace!(
936            "[COMMIT] commit_transaction called for session {:?}",
937            self.session_id
938        );
939        let mut guard = self
940            .transactions
941            .lock()
942            .expect("transactions lock poisoned");
943        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
944        let tx_opt = guard.remove(&self.session_id);
945        tracing::trace!(
946            "[COMMIT] commit_transaction remove returned: {}",
947            tx_opt.is_some()
948        );
949        let tx = tx_opt.ok_or_else(|| {
950            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
951            Error::InvalidArgumentError(
952                "no transaction is currently in progress in this session".into(),
953            )
954        })?;
955        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
956
957        // If transaction is aborted, commit becomes a rollback (no operations to replay)
958        if tx.is_aborted {
959            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
960            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
961            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
962            // Reset context snapshot to auto-commit view (aborted txn's writes should be invisible)
963            let auto_commit_snapshot = TransactionSnapshot {
964                txn_id: TXN_ID_AUTO_COMMIT,
965                snapshot_id: tx.txn_manager.last_committed(),
966            };
967            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
968            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
969            return Ok((
970                TransactionResult::Transaction {
971                    kind: TransactionKind::Rollback,
972                },
973                Vec::new(),
974            ));
975        }
976
977        // Check for write-write conflicts: detect if any accessed tables have been dropped or replaced
978        // We captured (table_name, table_id) pairs at transaction start
979        tracing::debug!(
980            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
981            tx.snapshot.txn_id,
982            tx.accessed_tables.len()
983        );
984        for accessed_table_name in &tx.accessed_tables {
985            tracing::debug!(
986                "[COMMIT CONFLICT CHECK] Checking table '{}'",
987                accessed_table_name
988            );
989            // Get the table ID from our catalog snapshot at transaction start
990            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
991                // Check current table state
992                match self.context.table_id(accessed_table_name) {
993                    Ok(current_table_id) => {
994                        // If table ID changed, it was dropped and recreated
995                        if current_table_id != snapshot_table_id {
996                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
997                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
998                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
999                            let auto_commit_snapshot = TransactionSnapshot {
1000                                txn_id: TXN_ID_AUTO_COMMIT,
1001                                snapshot_id: tx.txn_manager.last_committed(),
1002                            };
1003                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1004                            return Err(Error::TransactionContextError(
1005                                "another transaction has dropped this table".into(),
1006                            ));
1007                        }
1008                    }
1009                    Err(_) => {
1010                        // Table no longer exists - it was dropped
1011                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1012                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1013                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1014                        let auto_commit_snapshot = TransactionSnapshot {
1015                            txn_id: TXN_ID_AUTO_COMMIT,
1016                            snapshot_id: tx.txn_manager.last_committed(),
1017                        };
1018                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1019                        return Err(Error::TransactionContextError(
1020                            "another transaction has dropped this table".into(),
1021                        ));
1022                    }
1023                }
1024            }
1025        }
1026
1027        if let Err(err) = tx
1028            .base_context
1029            .validate_commit_constraints(tx.snapshot.txn_id)
1030        {
1031            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1032            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1033            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1034            let auto_commit_snapshot = TransactionSnapshot {
1035                txn_id: TXN_ID_AUTO_COMMIT,
1036                snapshot_id: tx.txn_manager.last_committed(),
1037            };
1038            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1039            let wrapped = match err {
1040                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
1041                    "TransactionContext Error: constraint violation: {msg}"
1042                )),
1043                other => other,
1044            };
1045            return Err(wrapped);
1046        }
1047
1048        let operations = tx.operations;
1049        tracing::trace!(
1050            "DEBUG commit_transaction: returning Commit with {} operations",
1051            operations.len()
1052        );
1053
1054        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
1055        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1056        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1057        TransactionContext::set_snapshot(&*self.context, tx.snapshot);
1058
1059        Ok((
1060            TransactionResult::Transaction {
1061                kind: TransactionKind::Commit,
1062            },
1063            operations,
1064        ))
1065    }
1066
1067    /// Rollback the transaction in this session.
1068    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1069        let mut guard = self
1070            .transactions
1071            .lock()
1072            .expect("transactions lock poisoned");
1073        if let Some(tx) = guard.remove(&self.session_id) {
1074            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1075            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1076            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1077            // Reset context snapshot to auto-commit view (rolled-back txn's writes should be invisible)
1078            let auto_commit_snapshot = TransactionSnapshot {
1079                txn_id: TXN_ID_AUTO_COMMIT,
1080                snapshot_id: tx.txn_manager.last_committed(),
1081            };
1082            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1083        } else {
1084            return Err(Error::InvalidArgumentError(
1085                "no transaction is currently in progress in this session".into(),
1086            ));
1087        }
1088        Ok(TransactionResult::Transaction {
1089            kind: TransactionKind::Rollback,
1090        })
1091    }
1092
1093    /// Execute an operation in this session's transaction, or directly if no transaction is active.
1094    pub fn execute_operation(
1095        &self,
1096        operation: PlanOperation,
1097    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1098        tracing::debug!(
1099            "[EXECUTE_OP] execute_operation called for session_id={}",
1100            self.session_id
1101        );
1102        if !self.has_active_transaction() {
1103            // No transaction - caller must handle direct execution
1104            return Err(Error::InvalidArgumentError(
1105                "execute_operation called without active transaction".into(),
1106            ));
1107        }
1108
1109        // Check for cross-transaction conflicts before executing
1110        if let PlanOperation::CreateTable(ref plan) = operation {
1111            let guard = self
1112                .transactions
1113                .lock()
1114                .expect("transactions lock poisoned");
1115
1116            let canonical_name = plan.name.to_ascii_lowercase();
1117
1118            // Check if another session has this table locked in their transaction
1119            for (other_session_id, other_tx) in guard.iter() {
1120                if *other_session_id != self.session_id
1121                    && other_tx.locked_table_names.contains(&canonical_name)
1122                {
1123                    return Err(Error::TransactionContextError(format!(
1124                        "table '{}' is locked by another active transaction",
1125                        plan.name
1126                    )));
1127                }
1128            }
1129            drop(guard); // Release lock before continuing
1130        }
1131
1132        // Also check DROP TABLE for conflicts
1133        if let PlanOperation::DropTable(ref plan) = operation {
1134            let guard = self
1135                .transactions
1136                .lock()
1137                .expect("transactions lock poisoned");
1138
1139            let canonical_name = plan.name.to_ascii_lowercase();
1140
1141            // Check if another session has this table locked in their transaction
1142            for (other_session_id, other_tx) in guard.iter() {
1143                if *other_session_id != self.session_id
1144                    && other_tx.locked_table_names.contains(&canonical_name)
1145                {
1146                    return Err(Error::TransactionContextError(format!(
1147                        "table '{}' is locked by another active transaction",
1148                        plan.name
1149                    )));
1150                }
1151            }
1152            drop(guard); // Release lock before continuing
1153        }
1154
1155        // In transaction - add to transaction and execute on staging context
1156        let mut guard = self
1157            .transactions
1158            .lock()
1159            .expect("transactions lock poisoned");
1160        tracing::debug!(
1161            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1162            self.session_id,
1163            guard.len()
1164        );
1165        let tx = guard
1166            .get_mut(&self.session_id)
1167            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1168        tracing::debug!(
1169            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1170            self.session_id,
1171            tx.snapshot.txn_id,
1172            tx.accessed_tables.len()
1173        );
1174
1175        let result = tx.execute_operation(operation);
1176        if let Err(ref e) = result {
1177            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1178            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1179        }
1180        result
1181    }
1182}
1183
1184impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1185where
1186    BaseCtx: TransactionContext,
1187    StagingCtx: TransactionContext,
1188{
1189    fn drop(&mut self) {
1190        // Auto-rollback on drop if transaction is active
1191        // Handle poisoned mutex gracefully to avoid panic during cleanup
1192        match self.transactions.lock() {
1193            Ok(mut guard) => {
1194                if guard.remove(&self.session_id).is_some() {
1195                    eprintln!(
1196                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
1197                    );
1198                }
1199            }
1200            Err(_) => {
1201                // Mutex is poisoned, likely due to a panic elsewhere
1202                // Don't panic again during cleanup
1203                tracing::trace!(
1204                    "Warning: TransactionSession dropped with poisoned transaction mutex"
1205                );
1206            }
1207        }
1208    }
1209}
1210
1211/// Transaction manager for coordinating sessions.
1212///
1213/// This is the top-level coordinator that creates sessions and manages
1214/// the shared transaction ID allocator.
1215pub struct TransactionManager<BaseCtx, StagingCtx>
1216where
1217    BaseCtx: TransactionContext + 'static,
1218    StagingCtx: TransactionContext + 'static,
1219{
1220    transactions:
1221        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
1222    next_session_id: AtomicU64,
1223    txn_manager: Arc<TxnIdManager>,
1224}
1225
1226impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1227where
1228    BaseCtx: TransactionContext + 'static,
1229    StagingCtx: TransactionContext + 'static,
1230{
1231    pub fn new() -> Self {
1232        Self {
1233            transactions: Arc::new(Mutex::new(HashMap::new())),
1234            next_session_id: AtomicU64::new(1),
1235            txn_manager: Arc::new(TxnIdManager::new()),
1236        }
1237    }
1238
1239    /// Create a new TransactionManager with a custom initial transaction ID.
1240    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1241        Self {
1242            transactions: Arc::new(Mutex::new(HashMap::new())),
1243            next_session_id: AtomicU64::new(1),
1244            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1245        }
1246    }
1247
1248    /// Create a new TransactionManager with custom initial state.
1249    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1250        Self {
1251            transactions: Arc::new(Mutex::new(HashMap::new())),
1252            next_session_id: AtomicU64::new(1),
1253            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1254                next_txn_id,
1255                last_committed,
1256            )),
1257        }
1258    }
1259
1260    /// Create a new session for transaction management.
1261    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1262        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1263        tracing::debug!(
1264            "[TX_MANAGER] create_session: allocated session_id={}",
1265            session_id
1266        );
1267        TransactionSession::new(
1268            context,
1269            session_id,
1270            Arc::clone(&self.transactions),
1271            Arc::clone(&self.txn_manager),
1272        )
1273    }
1274
1275    /// Obtain the shared transaction ID manager.
1276    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1277        Arc::clone(&self.txn_manager)
1278    }
1279
1280    /// Check if there's an active transaction (checks if ANY session has a transaction).
1281    pub fn has_active_transaction(&self) -> bool {
1282        !self
1283            .transactions
1284            .lock()
1285            .expect("transactions lock poisoned")
1286            .is_empty()
1287    }
1288}
1289
1290impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1291where
1292    BaseCtx: TransactionContext + 'static,
1293    StagingCtx: TransactionContext + 'static,
1294{
1295    fn default() -> Self {
1296        Self::new()
1297    }
1298}