llkv_transaction/
context.rs

1//! Transaction context types for orchestrating transaction lifecycles.
2//!
3//! This module contains the core types for managing transactions:
4//! - [`TransactionContext`]: Trait defining operations a context must support
5//! - [`SessionTransaction`]: Per-transaction state machine
6//! - [`TransactionSession`]: Session-level transaction management
7//! - [`TransactionManager`]: Cross-session transaction coordination
8
9use std::collections::{HashMap, HashSet};
10use std::sync::atomic::{AtomicU64, Ordering};
11use std::sync::{Arc, Mutex};
12
13use arrow::array::RecordBatch;
14use llkv_column_map::types::TableId;
15use llkv_executor::{ExecutorRowBatch, SelectExecution};
16use llkv_expr::expr::Expr as LlkvExpr;
17use llkv_plan::plans::{
18    CreateIndexPlan, CreateTablePlan, DeletePlan, DropTablePlan, InsertPlan, PlanColumnSpec,
19    PlanOperation, SelectPlan, UpdatePlan,
20};
21use llkv_result::{Error, Result as LlkvResult};
22use llkv_storage::pager::Pager;
23use llkv_table::CatalogDdl;
24use simd_r_drive_entry_handle::EntryHandle;
25
26use crate::mvcc::{TXN_ID_AUTO_COMMIT, TransactionSnapshot, TxnId, TxnIdManager};
27use crate::types::{TransactionCatalogSnapshot, TransactionKind, TransactionResult};
28
/// Session identifier type.
///
/// Session IDs track client sessions that may spawn multiple transactions.
/// They are distinct from transaction IDs and managed separately.
///
/// [`TransactionSession`] uses this value as the key into its shared map of
/// in-flight [`SessionTransaction`]s.
pub type TransactionSessionId = u64;
34
35/// Extracts table name from SelectPlan for single-table queries.
36fn select_plan_table_name(plan: &SelectPlan) -> Option<String> {
37    if plan.tables.len() == 1 {
38        Some(plan.tables[0].qualified_name())
39    } else {
40        None
41    }
42}
43
/// A trait for transaction context operations.
///
/// This allows [`SessionTransaction`] to work with any context that implements these operations.
/// The associated type `Pager` specifies the pager type this context uses.
///
/// [`SessionTransaction`] is generic over two implementations of this trait: a base context
/// holding the tables that existed when the transaction began, and a staging context holding
/// tables created inside the transaction.
pub trait TransactionContext: CatalogDdl + Send + Sync {
    /// The pager type used by this context
    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;

    /// Snapshot representation returned by this context.
    type Snapshot: TransactionCatalogSnapshot;

    /// Update the snapshot used for MVCC visibility decisions.
    ///
    /// [`SessionTransaction::new`] calls this on both the base and the staging context with
    /// the snapshot of the transaction it has just begun.
    fn set_snapshot(&self, snapshot: TransactionSnapshot);

    /// Get the snapshot currently associated with this context.
    fn snapshot(&self) -> TransactionSnapshot;

    /// Get table column specifications
    ///
    /// `SessionTransaction` also uses a successful return from the staging context as proof
    /// that a table created inside the transaction really exists there.
    fn table_column_specs(&self, table_name: &str) -> LlkvResult<Vec<PlanColumnSpec>>;

    /// Export table rows for snapshotting
    fn export_table_rows(&self, table_name: &str) -> LlkvResult<ExecutorRowBatch>;

    /// Get batches with row IDs for seeding updates
    ///
    /// `filter` is optional; NOTE(review): `None` presumably means "all rows" — confirm
    /// against the implementations.
    fn get_batches_with_row_ids(
        &self,
        table_name: &str,
        filter: Option<LlkvExpr<'static, String>>,
    ) -> LlkvResult<Vec<RecordBatch>>;

    /// Execute a SELECT plan with this context's pager type
    fn execute_select(&self, plan: SelectPlan) -> LlkvResult<SelectExecution<Self::Pager>>;

    /// Create a table from plan
    fn apply_create_table_plan(
        &self,
        plan: CreateTablePlan,
    ) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Drop a table
    ///
    /// Callers in this module invoke this as `TransactionContext::drop_table(..)`;
    /// NOTE(review): presumably to disambiguate from a same-named `CatalogDdl` method — confirm.
    fn drop_table(&self, plan: DropTablePlan) -> LlkvResult<()>;

    /// Insert rows
    fn insert(&self, plan: InsertPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Update rows
    fn update(&self, plan: UpdatePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Delete rows
    fn delete(&self, plan: DeletePlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Create an index
    fn create_index(&self, plan: CreateIndexPlan) -> LlkvResult<TransactionResult<Self::Pager>>;

    /// Append batches with row IDs
    ///
    /// NOTE(review): the returned `usize` is presumably the number of rows appended — confirm.
    fn append_batches_with_row_ids(
        &self,
        table_name: &str,
        batches: Vec<RecordBatch>,
    ) -> LlkvResult<usize>;

    /// Get table names for catalog snapshot
    fn table_names(&self) -> Vec<String>;

    /// Get table ID for a given table name (for conflict detection)
    fn table_id(&self, table_name: &str) -> LlkvResult<TableId>;

    /// Get an immutable catalog snapshot for transaction isolation
    ///
    /// [`SessionTransaction::new`] captures this once, from the base context, before it
    /// begins the transaction.
    fn catalog_snapshot(&self) -> Self::Snapshot;

    /// Validate any pending commit-time constraints for this transaction.
    ///
    /// The default implementation performs no validation and always returns `Ok(())`.
    fn validate_commit_constraints(&self, _txn_id: TxnId) -> LlkvResult<()> {
        Ok(())
    }

    /// Clear any transaction-scoped state retained by the context.
    ///
    /// The default implementation is a no-op.
    fn clear_transaction_state(&self, _txn_id: TxnId) {}
}
122
/// Transaction state for the runtime context.
///
/// This struct manages the lifecycle of a single transaction, tracking:
/// - Operations to replay on commit
/// - Tables created/dropped/accessed within the transaction
/// - Foreign key constraints for validation
/// - Transaction isolation via MVCC snapshots
///
/// Writes to tables that already existed when the transaction began are applied directly to
/// the base context. Only work against tables created inside the transaction goes through the
/// staging context and is queued in `operations`.
pub struct SessionTransaction<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Transaction snapshot (contains txn id + snapshot watermark)
    snapshot: TransactionSnapshot,
    /// Staging context with MemPager for isolation (only used for tables created in this txn).
    staging: Arc<StagingCtx>,
    /// Operations to replay on commit.
    /// Holds CREATE TABLE, DROP TABLE of pre-existing tables, and DML against tables created
    /// in this transaction; DML against pre-existing tables is not recorded here.
    operations: Vec<PlanOperation>,
    /// Tables that have been verified to exist (either in base or staging).
    /// Acts as a positive cache for `ensure_table_exists`.
    staged_tables: HashSet<String>,
    /// Tables created within this transaction (live in staging until commit).
    new_tables: HashSet<String>,
    /// Tables known to be missing.
    /// Filled on failed lookups and when a pre-existing table is dropped in this transaction.
    missing_tables: HashSet<String>,
    /// All table names locked by this transaction for DDL operations (for conflict detection).
    /// This includes tables that were created, dropped, or both within the transaction.
    /// Names are stored ASCII-lowercased.
    locked_table_names: HashSet<String>,
    /// Immutable catalog snapshot at transaction start (for isolation).
    /// Contains table name→ID mappings. Replaces separate HashSet and HashMap.
    catalog_snapshot: BaseCtx::Snapshot,
    /// Base context for reading existing tables with MVCC visibility.
    base_context: Arc<BaseCtx>,
    /// Whether this transaction has been aborted due to an error.
    /// Once set, `execute_operation` rejects every further operation.
    is_aborted: bool,
    /// Transaction ID manager (shared across all transactions)
    txn_manager: Arc<TxnIdManager>,
    /// Tables accessed (names only) by this transaction
    /// Only pre-existing (base) tables are recorded; tables created in this transaction are not.
    accessed_tables: HashSet<String>,
    /// Foreign key constraints created in this transaction.
    /// Maps referenced_table_name -> Vec<referencing_table_name>
    /// Used to check FK dependencies when dropping tables within a transaction.
    /// Both keys and values are ASCII-lowercased table names.
    transactional_foreign_keys: HashMap<String, Vec<String>>,
}
166
167impl<BaseCtx, StagingCtx> SessionTransaction<BaseCtx, StagingCtx>
168where
169    BaseCtx: TransactionContext + 'static,
170    StagingCtx: TransactionContext + 'static,
171{
172    pub fn new(
173        base_context: Arc<BaseCtx>,
174        staging: Arc<StagingCtx>,
175        txn_manager: Arc<TxnIdManager>,
176    ) -> Self {
177        // Get immutable catalog snapshot for transaction isolation
178        // This replaces the previous HashSet<String> and HashMap<String, TableId>
179        let catalog_snapshot = base_context.catalog_snapshot();
180
181        let snapshot = txn_manager.begin_transaction();
182        tracing::debug!(
183            "[SESSION_TX] new() created transaction with txn_id={}, snapshot_id={}",
184            snapshot.txn_id,
185            snapshot.snapshot_id
186        );
187        TransactionContext::set_snapshot(&*base_context, snapshot);
188        TransactionContext::set_snapshot(&*staging, snapshot);
189
190        Self {
191            staging,
192            operations: Vec::new(),
193            staged_tables: HashSet::new(),
194            new_tables: HashSet::new(),
195            missing_tables: HashSet::new(),
196            locked_table_names: HashSet::new(),
197            catalog_snapshot,
198            base_context,
199            is_aborted: false,
200            accessed_tables: HashSet::new(),
201            snapshot,
202            txn_manager,
203            transactional_foreign_keys: HashMap::new(),
204        }
205    }
206
207    /// Ensure a table exists and is visible to this transaction.
208    /// NO COPYING - just check if table exists in base or was created in this transaction.
209    fn ensure_table_exists(&mut self, table_name: &str) -> LlkvResult<()> {
210        tracing::trace!(
211            "[ENSURE] ensure_table_exists called for table='{}'",
212            table_name
213        );
214
215        // If we already checked this table, return early
216        if self.staged_tables.contains(table_name) {
217            tracing::trace!("[ENSURE] table already verified to exist");
218            return Ok(());
219        }
220
221        // Check if table exists in catalog snapshot OR was created in this transaction
222        if !self.catalog_snapshot.table_exists(table_name) && !self.new_tables.contains(table_name)
223        {
224            self.missing_tables.insert(table_name.to_string());
225            return Err(Error::CatalogError(format!(
226                "Catalog Error: Table '{table_name}' does not exist"
227            )));
228        }
229
230        if self.missing_tables.contains(table_name) {
231            return Err(Error::CatalogError(format!(
232                "Catalog Error: Table '{table_name}' does not exist"
233            )));
234        }
235
236        // For tables created in this transaction, also create in staging for isolation
237        if self.new_tables.contains(table_name) {
238            tracing::trace!("[ENSURE] Table was created in this transaction");
239            // Check if it exists in staging
240            match self.staging.table_column_specs(table_name) {
241                Ok(_) => {
242                    self.staged_tables.insert(table_name.to_string());
243                    return Ok(());
244                }
245                Err(_) => {
246                    return Err(Error::CatalogError(format!(
247                        "Catalog Error: Table '{table_name}' was created but not found in staging"
248                    )));
249                }
250            }
251        }
252
253        // Table exists in base - mark as verified
254        tracing::trace!(
255            "[ENSURE] Table exists in base, no copying needed (MVCC will handle visibility)"
256        );
257        self.staged_tables.insert(table_name.to_string());
258        Ok(())
259    }
260
261    /// Execute a SELECT query within transaction isolation.
262    /// For tables created in this transaction: read from staging.
263    /// For existing tables: read from BASE with MVCC visibility filtering.
264    pub fn execute_select(
265        &mut self,
266        plan: SelectPlan,
267    ) -> LlkvResult<SelectExecution<StagingCtx::Pager>> {
268        // Get table name (for single-table queries only)
269        let table_name = select_plan_table_name(&plan).ok_or_else(|| {
270            Error::InvalidArgumentError(
271                "Transaction execute_select requires single-table query".into(),
272            )
273        })?;
274
275        // Ensure table exists
276        self.ensure_table_exists(&table_name)?;
277
278        // If table was created in this transaction, read from staging
279        if self.new_tables.contains(&table_name) {
280            tracing::trace!(
281                "[SELECT] Reading from staging for new table '{}'",
282                table_name
283            );
284            return self.staging.execute_select(plan);
285        }
286
287        // Track access to existing table for conflict detection
288        self.accessed_tables.insert(table_name.clone());
289
290        // Otherwise read from BASE with MVCC visibility
291        // The base context already has the snapshot set in SessionTransaction::new()
292        tracing::trace!(
293            "[SELECT] Reading from BASE with MVCC for existing table '{}'",
294            table_name
295        );
296        self.base_context.execute_select(plan).and_then(|exec| {
297            // Convert pager type from BaseCtx to StagingCtx
298            // This is a limitation of the current type system
299            // In practice, we're just collecting and re-packaging
300            let schema = exec.schema();
301            let batches = exec.collect().unwrap_or_default();
302            let combined = if batches.is_empty() {
303                RecordBatch::new_empty(Arc::clone(&schema))
304            } else if batches.len() == 1 {
305                batches.into_iter().next().unwrap()
306            } else {
307                let refs: Vec<&RecordBatch> = batches.iter().collect();
308                arrow::compute::concat_batches(&schema, refs).map_err(|err| {
309                    Error::Internal(format!("failed to concatenate batches: {err}"))
310                })?
311            };
312            Ok(SelectExecution::from_batch(
313                table_name,
314                Arc::clone(&schema),
315                combined,
316            ))
317        })
318    }
319
320    /// Execute an operation in the transaction staging context
321    pub fn execute_operation(
322        &mut self,
323        operation: PlanOperation,
324    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
325        tracing::trace!(
326            "[TX] SessionTransaction::execute_operation called, operation={:?}",
327            match &operation {
328                PlanOperation::Insert(p) => format!("INSERT({})", p.table),
329                PlanOperation::Update(p) => format!("UPDATE({})", p.table),
330                PlanOperation::Delete(p) => format!("DELETE({})", p.table),
331                PlanOperation::CreateTable(p) => format!("CREATE_TABLE({})", p.name),
332                _ => "OTHER".to_string(),
333            }
334        );
335        // Check if transaction is aborted
336        if self.is_aborted {
337            return Err(Error::TransactionContextError(
338                "TransactionContext Error: transaction is aborted".into(),
339            ));
340        }
341
342        // Execute operation and catch errors to mark transaction as aborted
343        let result = match operation {
344            PlanOperation::CreateTable(ref plan) => {
345                // Before creating in staging, validate that all foreign key referenced tables
346                // exist in the base context. This is necessary because the staging context
347                // can't see tables from the base context.
348                for fk in &plan.foreign_keys {
349                    let canonical_ref_table = fk.referenced_table.to_ascii_lowercase();
350                    // Check if the referenced table exists in base context or was created in this transaction
351                    if !self.new_tables.contains(&canonical_ref_table)
352                        && !self.catalog_snapshot.table_exists(&canonical_ref_table)
353                    {
354                        self.is_aborted = true;
355                        return Err(Error::CatalogError(format!(
356                            "Catalog Error: referenced table '{}' does not exist",
357                            fk.referenced_table
358                        )));
359                    }
360                }
361
362                // Create a modified plan without foreign keys for staging context.
363                // The staging context can't validate foreign keys against base tables,
364                // so we skip FK registration in staging and will re-apply at commit time.
365                let mut staging_plan = plan.clone();
366                staging_plan.foreign_keys.clear();
367
368                match self.staging.apply_create_table_plan(staging_plan) {
369                    Ok(result) => {
370                        // Track new table so it's visible to subsequent operations in this transaction
371                        self.new_tables.insert(plan.name.clone());
372                        self.missing_tables.remove(&plan.name);
373                        self.staged_tables.insert(plan.name.clone());
374                        // Lock this table name for the duration of the transaction
375                        self.locked_table_names
376                            .insert(plan.name.to_ascii_lowercase());
377
378                        // Track foreign key dependencies for DROP TABLE validation
379                        for fk in &plan.foreign_keys {
380                            let referenced_table = fk.referenced_table.to_ascii_lowercase();
381                            self.transactional_foreign_keys
382                                .entry(referenced_table)
383                                .or_default()
384                                .push(plan.name.to_ascii_lowercase());
385                        }
386
387                        // Track for commit replay WITH original foreign keys
388                        self.operations
389                            .push(PlanOperation::CreateTable(plan.clone()));
390                        result.convert_pager_type()?
391                    }
392                    Err(e) => {
393                        self.is_aborted = true;
394                        return Err(e);
395                    }
396                }
397            }
398            PlanOperation::DropTable(ref plan) => {
399                let canonical_name = plan.name.to_ascii_lowercase();
400
401                // Lock this table name for conflict detection (even if CREATE/DROP cancel out)
402                self.locked_table_names.insert(canonical_name.clone());
403
404                // Check if the table was created in this transaction
405                if self.new_tables.contains(&canonical_name) {
406                    // Table was created in this transaction, so drop it from staging
407                    // and remove from tracking
408                    TransactionContext::drop_table(self.staging.as_ref(), plan.clone())?;
409                    self.new_tables.remove(&canonical_name);
410                    self.staged_tables.remove(&canonical_name);
411
412                    // Remove FK constraints where this table was the referencing table
413                    self.transactional_foreign_keys.iter_mut().for_each(
414                        |(_, referencing_tables)| {
415                            referencing_tables.retain(|t| t != &canonical_name);
416                        },
417                    );
418                    // Clean up empty entries
419                    self.transactional_foreign_keys
420                        .retain(|_, referencing_tables| !referencing_tables.is_empty());
421
422                    // Remove the CREATE TABLE operation from the operation list
423                    self.operations.retain(|op| {
424                        !matches!(op, PlanOperation::CreateTable(p) if p.name.to_ascii_lowercase() == canonical_name)
425                    });
426                    // Don't add DROP to operations since we're canceling out the CREATE
427                    // But keep the table name locked for conflict detection
428                    TransactionResult::NoOp
429                } else {
430                    // Table exists in base context, track the drop for replay at commit
431                    // Verify the table exists
432                    if !self.catalog_snapshot.table_exists(&canonical_name) && !plan.if_exists {
433                        self.is_aborted = true;
434                        return Err(Error::InvalidArgumentError(format!(
435                            "table '{}' does not exist",
436                            plan.name
437                        )));
438                    }
439
440                    if self.catalog_snapshot.table_exists(&canonical_name) {
441                        // Mark as dropped so it's not visible in this transaction
442                        self.missing_tables.insert(canonical_name.clone());
443                        self.staged_tables.remove(&canonical_name);
444                        // Track for commit replay
445                        self.operations.push(PlanOperation::DropTable(plan.clone()));
446                    }
447                    TransactionResult::NoOp
448                }
449            }
450            PlanOperation::Insert(ref plan) => {
451                tracing::trace!(
452                    "[TX] SessionTransaction::execute_operation INSERT for table='{}'",
453                    plan.table
454                );
455                // Ensure table exists
456                if let Err(e) = self.ensure_table_exists(&plan.table) {
457                    self.is_aborted = true;
458                    return Err(e);
459                }
460
461                // If table was created in this transaction, insert into staging
462                // Otherwise insert directly into BASE (with transaction ID tagging)
463                let is_new_table = self.new_tables.contains(&plan.table);
464                // Track access to existing table for conflict detection
465                if !is_new_table {
466                    self.accessed_tables.insert(plan.table.clone());
467                }
468                let result = if is_new_table {
469                    tracing::trace!("[TX] INSERT into staging for new table");
470                    self.staging.insert(plan.clone())
471                } else {
472                    tracing::trace!(
473                        "[TX] INSERT directly into BASE with txn_id={}",
474                        self.snapshot.txn_id
475                    );
476                    // Insert into base - MVCC tagging happens automatically in insert_rows()
477                    self.base_context
478                        .insert(plan.clone())
479                        .and_then(|r| r.convert_pager_type())
480                };
481
482                match result {
483                    Ok(result) => {
484                        // Only track operations for NEW tables - they need replay on commit
485                        // For existing tables, changes are already in BASE with MVCC tags
486                        if is_new_table {
487                            tracing::trace!(
488                                "[TX] INSERT to new table - tracking for commit replay"
489                            );
490                            self.operations.push(PlanOperation::Insert(plan.clone()));
491                        } else {
492                            tracing::trace!(
493                                "[TX] INSERT to existing table - already in BASE, no replay needed"
494                            );
495                        }
496                        result
497                    }
498                    Err(e) => {
499                        tracing::trace!(
500                            "DEBUG SessionTransaction::execute_operation INSERT failed: {:?}",
501                            e
502                        );
503                        tracing::trace!("DEBUG setting is_aborted=true");
504                        self.is_aborted = true;
505                        return Err(e);
506                    }
507                }
508            }
509            PlanOperation::Update(ref plan) => {
510                if let Err(e) = self.ensure_table_exists(&plan.table) {
511                    self.is_aborted = true;
512                    return Err(e);
513                }
514
515                // If table was created in this transaction, update in staging
516                // Otherwise update directly in BASE (with MVCC soft-delete + insert)
517                let is_new_table = self.new_tables.contains(&plan.table);
518                // Track access to existing table for conflict detection
519                if !is_new_table {
520                    self.accessed_tables.insert(plan.table.clone());
521                }
522                let result = if is_new_table {
523                    tracing::trace!("[TX] UPDATE in staging for new table");
524                    self.staging.update(plan.clone())
525                } else {
526                    tracing::trace!(
527                        "[TX] UPDATE directly in BASE with txn_id={}",
528                        self.snapshot.txn_id
529                    );
530                    self.base_context
531                        .update(plan.clone())
532                        .and_then(|r| r.convert_pager_type())
533                };
534
535                match result {
536                    Ok(result) => {
537                        // Only track operations for NEW tables - they need replay on commit
538                        if is_new_table {
539                            tracing::trace!(
540                                "[TX] UPDATE to new table - tracking for commit replay"
541                            );
542                            self.operations.push(PlanOperation::Update(plan.clone()));
543                        } else {
544                            tracing::trace!(
545                                "[TX] UPDATE to existing table - already in BASE, no replay needed"
546                            );
547                        }
548                        result
549                    }
550                    Err(e) => {
551                        self.is_aborted = true;
552                        return Err(e);
553                    }
554                }
555            }
556            PlanOperation::Delete(ref plan) => {
557                tracing::debug!("[DELETE] Starting delete for table '{}'", plan.table);
558                if let Err(e) = self.ensure_table_exists(&plan.table) {
559                    tracing::debug!("[DELETE] ensure_table_exists failed: {}", e);
560                    self.is_aborted = true;
561                    return Err(e);
562                }
563
564                // If table was created in this transaction, delete from staging
565                // Otherwise delete directly in BASE (with MVCC soft-delete)
566                let is_new_table = self.new_tables.contains(&plan.table);
567                tracing::debug!("[DELETE] is_new_table={}", is_new_table);
568                // Track access to existing table for conflict detection
569                if !is_new_table {
570                    tracing::debug!(
571                        "[DELETE] Tracking access to existing table '{}'",
572                        plan.table
573                    );
574                    self.accessed_tables.insert(plan.table.clone());
575                }
576                let result = if is_new_table {
577                    tracing::debug!("[DELETE] Deleting from staging for new table");
578                    self.staging.delete(plan.clone())
579                } else {
580                    tracing::debug!(
581                        "[DELETE] Deleting from BASE with txn_id={}",
582                        self.snapshot.txn_id
583                    );
584                    self.base_context
585                        .delete(plan.clone())
586                        .and_then(|r| r.convert_pager_type())
587                };
588
589                tracing::debug!(
590                    "[DELETE] Result: {:?}",
591                    result.as_ref().map(|_| "Ok").map_err(|e| format!("{}", e))
592                );
593                match result {
594                    Ok(result) => {
595                        // Only track operations for NEW tables - they need replay on commit
596                        if is_new_table {
597                            tracing::trace!(
598                                "[TX] DELETE from new table - tracking for commit replay"
599                            );
600                            self.operations.push(PlanOperation::Delete(plan.clone()));
601                        } else {
602                            tracing::trace!(
603                                "[TX] DELETE from existing table - already in BASE, no replay needed"
604                            );
605                        }
606                        result
607                    }
608                    Err(e) => {
609                        self.is_aborted = true;
610                        return Err(e);
611                    }
612                }
613            }
614            PlanOperation::Select(ref plan) => {
615                // SELECT is read-only, not tracked for replay
616                // But still fails if transaction is aborted (already checked above)
617                let table_name = select_plan_table_name(plan).unwrap_or_default();
618                match self.execute_select(plan.clone()) {
619                    Ok(staging_execution) => {
620                        // Collect staging execution into batches
621                        let schema = staging_execution.schema();
622                        let batches = staging_execution.collect().unwrap_or_default();
623
624                        // Combine into single batch
625                        let combined = if batches.is_empty() {
626                            RecordBatch::new_empty(Arc::clone(&schema))
627                        } else if batches.len() == 1 {
628                            batches.into_iter().next().unwrap()
629                        } else {
630                            let refs: Vec<&RecordBatch> = batches.iter().collect();
631                            arrow::compute::concat_batches(&schema, refs).map_err(|err| {
632                                Error::Internal(format!("failed to concatenate batches: {err}"))
633                            })?
634                        };
635
636                        // Return execution with combined batch
637                        let execution = SelectExecution::from_batch(
638                            table_name.clone(),
639                            Arc::clone(&schema),
640                            combined,
641                        );
642
643                        TransactionResult::Select {
644                            table_name,
645                            schema,
646                            execution,
647                        }
648                    }
649                    Err(e) => {
650                        // Don't abort on SELECT errors (like table not found)
651                        // Only Session layer aborts on constraint violations
652                        return Err(e);
653                    }
654                }
655            }
656        };
657
658        Ok(result)
659    }
660
661    /// Get the operations queued for commit
662    pub fn operations(&self) -> &[PlanOperation] {
663        &self.operations
664    }
665}
666
/// A session handle for transaction management.
///
/// When dropped, automatically rolls back any active transaction.
///
/// Handles are lightweight: every field is either an `Arc` or a plain id, and
/// `clone_session` produces a second handle onto the same session state.
pub struct TransactionSession<BaseCtx, StagingCtx>
where
    BaseCtx: TransactionContext + 'static,
    StagingCtx: TransactionContext + 'static,
{
    /// Base context this session operates on; exposed through `context()`.
    context: Arc<BaseCtx>,
    /// Identifier of this session; used as the key into `transactions`.
    session_id: TransactionSessionId,
    /// Shared map from session id to that session's in-flight transaction.
    /// A session has an active transaction exactly when its id is present here.
    transactions:
        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
    /// Shared transaction ID manager.
    txn_manager: Arc<TxnIdManager>,
}
681
682impl<BaseCtx, StagingCtx> TransactionSession<BaseCtx, StagingCtx>
683where
684    BaseCtx: TransactionContext + 'static,
685    StagingCtx: TransactionContext + 'static,
686{
687    pub fn new(
688        context: Arc<BaseCtx>,
689        session_id: TransactionSessionId,
690        transactions: Arc<
691            Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>,
692        >,
693        txn_manager: Arc<TxnIdManager>,
694    ) -> Self {
695        Self {
696            context,
697            session_id,
698            transactions,
699            txn_manager,
700        }
701    }
702
703    /// Clone this session (reuses the same session_id and shared transaction map).
704    /// This is necessary to maintain transaction state across Engine clones.
705    pub fn clone_session(&self) -> Self {
706        Self {
707            context: Arc::clone(&self.context),
708            session_id: self.session_id,
709            transactions: Arc::clone(&self.transactions),
710            txn_manager: Arc::clone(&self.txn_manager),
711        }
712    }
713
714    /// Get the session ID.
715    pub fn session_id(&self) -> TransactionSessionId {
716        self.session_id
717    }
718
719    /// Get the underlying context (for advanced use).
720    pub fn context(&self) -> &Arc<BaseCtx> {
721        &self.context
722    }
723
724    /// Check if this session has an active transaction.
725    pub fn has_active_transaction(&self) -> bool {
726        self.transactions
727            .lock()
728            .expect("transactions lock poisoned")
729            .contains_key(&self.session_id)
730    }
731
732    /// Check if the current transaction has been aborted due to an error.
733    pub fn is_aborted(&self) -> bool {
734        self.transactions
735            .lock()
736            .expect("transactions lock poisoned")
737            .get(&self.session_id)
738            .map(|tx| tx.is_aborted)
739            .unwrap_or(false)
740    }
741
742    /// Check if a table was created in the current active transaction.
743    /// Returns `true` if there's an active transaction and the table exists in its `new_tables` set.
744    pub fn is_table_created_in_transaction(&self, table_name: &str) -> bool {
745        self.transactions
746            .lock()
747            .expect("transactions lock poisoned")
748            .get(&self.session_id)
749            .map(|tx| tx.new_tables.contains(table_name))
750            .unwrap_or(false)
751    }
752
753    /// Get column specifications for a table created in the current transaction.
754    /// Returns `None` if there's no active transaction or the table wasn't created in it.
755    pub fn table_column_specs_from_transaction(
756        &self,
757        table_name: &str,
758    ) -> Option<Vec<PlanColumnSpec>> {
759        let guard = self
760            .transactions
761            .lock()
762            .expect("transactions lock poisoned");
763
764        let tx = guard.get(&self.session_id)?;
765        if !tx.new_tables.contains(table_name) {
766            return None;
767        }
768
769        // Get column specs from the staging context
770        tx.staging.table_column_specs(table_name).ok()
771    }
772
773    /// Get tables that reference the given table via foreign keys created in the current transaction.
774    /// Returns an empty vector if there's no active transaction or no transactional FKs reference this table.
775    pub fn tables_referencing_in_transaction(&self, referenced_table: &str) -> Vec<String> {
776        let canonical = referenced_table.to_ascii_lowercase();
777        let guard = self
778            .transactions
779            .lock()
780            .expect("transactions lock poisoned");
781
782        let tx = match guard.get(&self.session_id) {
783            Some(tx) => tx,
784            None => return Vec::new(),
785        };
786
787        tx.transactional_foreign_keys
788            .get(&canonical)
789            .cloned()
790            .unwrap_or_else(Vec::new)
791    }
792
793    /// Check if a table is locked by another active session's transaction.
794    /// Returns true if ANY other session has this table in their locked_table_names.
795    pub fn has_table_locked_by_other_session(&self, table_name: &str) -> bool {
796        let canonical = table_name.to_ascii_lowercase();
797        let guard = self
798            .transactions
799            .lock()
800            .expect("transactions lock poisoned");
801
802        for (session_id, tx) in guard.iter() {
803            // Skip our own session
804            if *session_id == self.session_id {
805                continue;
806            }
807
808            // Check if this other session has the table locked
809            if tx.locked_table_names.contains(&canonical) {
810                return true;
811            }
812        }
813
814        false
815    }
816
817    /// Mark the current transaction as aborted due to an error.
818    /// This should be called when any error occurs during a transaction.
819    pub fn abort_transaction(&self) {
820        let mut guard = self
821            .transactions
822            .lock()
823            .expect("transactions lock poisoned");
824        if let Some(tx) = guard.get_mut(&self.session_id) {
825            tx.is_aborted = true;
826        }
827    }
828
829    /// Begin a transaction in this session.
830    pub fn begin_transaction(
831        &self,
832        staging: Arc<StagingCtx>,
833    ) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
834        tracing::debug!(
835            "[BEGIN] begin_transaction called for session_id={}",
836            self.session_id
837        );
838        let mut guard = self
839            .transactions
840            .lock()
841            .expect("transactions lock poisoned");
842        tracing::debug!(
843            "[BEGIN] session_id={}, transactions map has {} entries",
844            self.session_id,
845            guard.len()
846        );
847        if guard.contains_key(&self.session_id) {
848            return Err(Error::InvalidArgumentError(
849                "a transaction is already in progress in this session".into(),
850            ));
851        }
852        guard.insert(
853            self.session_id,
854            SessionTransaction::new(
855                Arc::clone(&self.context),
856                staging,
857                Arc::clone(&self.txn_manager),
858            ),
859        );
860        tracing::debug!(
861            "[BEGIN] session_id={}, inserted transaction, map now has {} entries",
862            self.session_id,
863            guard.len()
864        );
865        Ok(TransactionResult::Transaction {
866            kind: TransactionKind::Begin,
867        })
868    }
869
870    /// Commit the transaction in this session.
871    /// If the transaction is aborted, this acts as a ROLLBACK instead.
872    pub fn commit_transaction(
873        &self,
874    ) -> LlkvResult<(TransactionResult<BaseCtx::Pager>, Vec<PlanOperation>)> {
875        tracing::trace!(
876            "[COMMIT] commit_transaction called for session {:?}",
877            self.session_id
878        );
879        let mut guard = self
880            .transactions
881            .lock()
882            .expect("transactions lock poisoned");
883        tracing::trace!("[COMMIT] commit_transaction got lock, checking for transaction...");
884        let tx_opt = guard.remove(&self.session_id);
885        tracing::trace!(
886            "[COMMIT] commit_transaction remove returned: {}",
887            tx_opt.is_some()
888        );
889        let tx = tx_opt.ok_or_else(|| {
890            tracing::trace!("[COMMIT] commit_transaction: no transaction found!");
891            Error::InvalidArgumentError(
892                "no transaction is currently in progress in this session".into(),
893            )
894        })?;
895        tracing::trace!("DEBUG commit_transaction: is_aborted={}", tx.is_aborted);
896
897        // If transaction is aborted, commit becomes a rollback (no operations to replay)
898        if tx.is_aborted {
899            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
900            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
901            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
902            // Reset context snapshot to auto-commit view (aborted txn's writes should be invisible)
903            let auto_commit_snapshot = TransactionSnapshot {
904                txn_id: TXN_ID_AUTO_COMMIT,
905                snapshot_id: tx.txn_manager.last_committed(),
906            };
907            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
908            tracing::trace!("DEBUG commit_transaction: returning Rollback with 0 operations");
909            return Ok((
910                TransactionResult::Transaction {
911                    kind: TransactionKind::Rollback,
912                },
913                Vec::new(),
914            ));
915        }
916
917        // Check for write-write conflicts: detect if any accessed tables have been dropped or replaced
918        // We captured (table_name, table_id) pairs at transaction start
919        tracing::debug!(
920            "[COMMIT CONFLICT CHECK] Transaction {} accessed {} tables",
921            tx.snapshot.txn_id,
922            tx.accessed_tables.len()
923        );
924        for accessed_table_name in &tx.accessed_tables {
925            tracing::debug!(
926                "[COMMIT CONFLICT CHECK] Checking table '{}'",
927                accessed_table_name
928            );
929            // Get the table ID from our catalog snapshot at transaction start
930            if let Some(snapshot_table_id) = tx.catalog_snapshot.table_id(accessed_table_name) {
931                // Check current table state
932                match self.context.table_id(accessed_table_name) {
933                    Ok(current_table_id) => {
934                        // If table ID changed, it was dropped and recreated
935                        if current_table_id != snapshot_table_id {
936                            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
937                            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
938                            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
939                            let auto_commit_snapshot = TransactionSnapshot {
940                                txn_id: TXN_ID_AUTO_COMMIT,
941                                snapshot_id: tx.txn_manager.last_committed(),
942                            };
943                            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
944                            return Err(Error::TransactionContextError(
945                                "another transaction has dropped this table".into(),
946                            ));
947                        }
948                    }
949                    Err(_) => {
950                        // Table no longer exists - it was dropped
951                        tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
952                        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
953                        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
954                        let auto_commit_snapshot = TransactionSnapshot {
955                            txn_id: TXN_ID_AUTO_COMMIT,
956                            snapshot_id: tx.txn_manager.last_committed(),
957                        };
958                        TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
959                        return Err(Error::TransactionContextError(
960                            "another transaction has dropped this table".into(),
961                        ));
962                    }
963                }
964            }
965        }
966
967        if let Err(err) = tx
968            .base_context
969            .validate_commit_constraints(tx.snapshot.txn_id)
970        {
971            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
972            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
973            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
974            let auto_commit_snapshot = TransactionSnapshot {
975                txn_id: TXN_ID_AUTO_COMMIT,
976                snapshot_id: tx.txn_manager.last_committed(),
977            };
978            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
979            let wrapped = match err {
980                Error::ConstraintError(msg) => Error::TransactionContextError(format!(
981                    "TransactionContext Error: constraint violation: {msg}"
982                )),
983                other => other,
984            };
985            return Err(wrapped);
986        }
987
988        let operations = tx.operations;
989        tracing::trace!(
990            "DEBUG commit_transaction: returning Commit with {} operations",
991            operations.len()
992        );
993
994        tx.txn_manager.mark_committed(tx.snapshot.txn_id);
995        tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
996        tx.staging.clear_transaction_state(tx.snapshot.txn_id);
997        TransactionContext::set_snapshot(&*self.context, tx.snapshot);
998
999        Ok((
1000            TransactionResult::Transaction {
1001                kind: TransactionKind::Commit,
1002            },
1003            operations,
1004        ))
1005    }
1006
1007    /// Rollback the transaction in this session.
1008    pub fn rollback_transaction(&self) -> LlkvResult<TransactionResult<BaseCtx::Pager>> {
1009        let mut guard = self
1010            .transactions
1011            .lock()
1012            .expect("transactions lock poisoned");
1013        if let Some(tx) = guard.remove(&self.session_id) {
1014            tx.txn_manager.mark_aborted(tx.snapshot.txn_id);
1015            tx.base_context.clear_transaction_state(tx.snapshot.txn_id);
1016            tx.staging.clear_transaction_state(tx.snapshot.txn_id);
1017            // Reset context snapshot to auto-commit view (rolled-back txn's writes should be invisible)
1018            let auto_commit_snapshot = TransactionSnapshot {
1019                txn_id: TXN_ID_AUTO_COMMIT,
1020                snapshot_id: tx.txn_manager.last_committed(),
1021            };
1022            TransactionContext::set_snapshot(&*self.context, auto_commit_snapshot);
1023        } else {
1024            return Err(Error::InvalidArgumentError(
1025                "no transaction is currently in progress in this session".into(),
1026            ));
1027        }
1028        Ok(TransactionResult::Transaction {
1029            kind: TransactionKind::Rollback,
1030        })
1031    }
1032
1033    /// Execute an operation in this session's transaction, or directly if no transaction is active.
1034    pub fn execute_operation(
1035        &self,
1036        operation: PlanOperation,
1037    ) -> LlkvResult<TransactionResult<StagingCtx::Pager>> {
1038        tracing::debug!(
1039            "[EXECUTE_OP] execute_operation called for session_id={}",
1040            self.session_id
1041        );
1042        if !self.has_active_transaction() {
1043            // No transaction - caller must handle direct execution
1044            return Err(Error::InvalidArgumentError(
1045                "execute_operation called without active transaction".into(),
1046            ));
1047        }
1048
1049        // Check for cross-transaction conflicts before executing
1050        if let PlanOperation::CreateTable(ref plan) = operation {
1051            let guard = self
1052                .transactions
1053                .lock()
1054                .expect("transactions lock poisoned");
1055
1056            let canonical_name = plan.name.to_ascii_lowercase();
1057
1058            // Check if another session has this table locked in their transaction
1059            for (other_session_id, other_tx) in guard.iter() {
1060                if *other_session_id != self.session_id
1061                    && other_tx.locked_table_names.contains(&canonical_name)
1062                {
1063                    return Err(Error::TransactionContextError(format!(
1064                        "table '{}' is locked by another active transaction",
1065                        plan.name
1066                    )));
1067                }
1068            }
1069            drop(guard); // Release lock before continuing
1070        }
1071
1072        // Also check DROP TABLE for conflicts
1073        if let PlanOperation::DropTable(ref plan) = operation {
1074            let guard = self
1075                .transactions
1076                .lock()
1077                .expect("transactions lock poisoned");
1078
1079            let canonical_name = plan.name.to_ascii_lowercase();
1080
1081            // Check if another session has this table locked in their transaction
1082            for (other_session_id, other_tx) in guard.iter() {
1083                if *other_session_id != self.session_id
1084                    && other_tx.locked_table_names.contains(&canonical_name)
1085                {
1086                    return Err(Error::TransactionContextError(format!(
1087                        "table '{}' is locked by another active transaction",
1088                        plan.name
1089                    )));
1090                }
1091            }
1092            drop(guard); // Release lock before continuing
1093        }
1094
1095        // In transaction - add to transaction and execute on staging context
1096        let mut guard = self
1097            .transactions
1098            .lock()
1099            .expect("transactions lock poisoned");
1100        tracing::debug!(
1101            "[EXECUTE_OP] session_id={}, transactions map has {} entries",
1102            self.session_id,
1103            guard.len()
1104        );
1105        let tx = guard
1106            .get_mut(&self.session_id)
1107            .ok_or_else(|| Error::Internal("transaction disappeared during execution".into()))?;
1108        tracing::debug!(
1109            "[EXECUTE_OP] session_id={}, found transaction with txn_id={}, accessed_tables={}",
1110            self.session_id,
1111            tx.snapshot.txn_id,
1112            tx.accessed_tables.len()
1113        );
1114
1115        let result = tx.execute_operation(operation);
1116        if let Err(ref e) = result {
1117            tracing::trace!("DEBUG TransactionSession::execute_operation error: {:?}", e);
1118            tracing::trace!("DEBUG Transaction is_aborted={}", tx.is_aborted);
1119        }
1120        result
1121    }
1122}
1123
1124impl<BaseCtx, StagingCtx> Drop for TransactionSession<BaseCtx, StagingCtx>
1125where
1126    BaseCtx: TransactionContext,
1127    StagingCtx: TransactionContext,
1128{
1129    fn drop(&mut self) {
1130        // Auto-rollback on drop if transaction is active
1131        // Handle poisoned mutex gracefully to avoid panic during cleanup
1132        match self.transactions.lock() {
1133            Ok(mut guard) => {
1134                if guard.remove(&self.session_id).is_some() {
1135                    eprintln!(
1136                        "Warning: TransactionSession dropped with active transaction - auto-rolling back"
1137                    );
1138                }
1139            }
1140            Err(_) => {
1141                // Mutex is poisoned, likely due to a panic elsewhere
1142                // Don't panic again during cleanup
1143                tracing::trace!(
1144                    "Warning: TransactionSession dropped with poisoned transaction mutex"
1145                );
1146            }
1147        }
1148    }
1149}
1150
1151/// Transaction manager for coordinating sessions.
1152///
1153/// This is the top-level coordinator that creates sessions and manages
1154/// the shared transaction ID allocator.
1155pub struct TransactionManager<BaseCtx, StagingCtx>
1156where
1157    BaseCtx: TransactionContext + 'static,
1158    StagingCtx: TransactionContext + 'static,
1159{
1160    transactions:
1161        Arc<Mutex<HashMap<TransactionSessionId, SessionTransaction<BaseCtx, StagingCtx>>>>,
1162    next_session_id: AtomicU64,
1163    txn_manager: Arc<TxnIdManager>,
1164}
1165
1166impl<BaseCtx, StagingCtx> TransactionManager<BaseCtx, StagingCtx>
1167where
1168    BaseCtx: TransactionContext + 'static,
1169    StagingCtx: TransactionContext + 'static,
1170{
1171    pub fn new() -> Self {
1172        Self {
1173            transactions: Arc::new(Mutex::new(HashMap::new())),
1174            next_session_id: AtomicU64::new(1),
1175            txn_manager: Arc::new(TxnIdManager::new()),
1176        }
1177    }
1178
1179    /// Create a new TransactionManager with a custom initial transaction ID.
1180    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
1181        Self {
1182            transactions: Arc::new(Mutex::new(HashMap::new())),
1183            next_session_id: AtomicU64::new(1),
1184            txn_manager: Arc::new(TxnIdManager::new_with_initial_txn_id(next_txn_id)),
1185        }
1186    }
1187
1188    /// Create a new TransactionManager with custom initial state.
1189    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
1190        Self {
1191            transactions: Arc::new(Mutex::new(HashMap::new())),
1192            next_session_id: AtomicU64::new(1),
1193            txn_manager: Arc::new(TxnIdManager::new_with_initial_state(
1194                next_txn_id,
1195                last_committed,
1196            )),
1197        }
1198    }
1199
1200    /// Create a new session for transaction management.
1201    pub fn create_session(&self, context: Arc<BaseCtx>) -> TransactionSession<BaseCtx, StagingCtx> {
1202        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
1203        tracing::debug!(
1204            "[TX_MANAGER] create_session: allocated session_id={}",
1205            session_id
1206        );
1207        TransactionSession::new(
1208            context,
1209            session_id,
1210            Arc::clone(&self.transactions),
1211            Arc::clone(&self.txn_manager),
1212        )
1213    }
1214
1215    /// Obtain the shared transaction ID manager.
1216    pub fn txn_manager(&self) -> Arc<TxnIdManager> {
1217        Arc::clone(&self.txn_manager)
1218    }
1219
1220    /// Check if there's an active transaction (checks if ANY session has a transaction).
1221    pub fn has_active_transaction(&self) -> bool {
1222        !self
1223            .transactions
1224            .lock()
1225            .expect("transactions lock poisoned")
1226            .is_empty()
1227    }
1228}
1229
1230impl<BaseCtx, StagingCtx> Default for TransactionManager<BaseCtx, StagingCtx>
1231where
1232    BaseCtx: TransactionContext + 'static,
1233    StagingCtx: TransactionContext + 'static,
1234{
1235    fn default() -> Self {
1236        Self::new()
1237    }
1238}