llkv_transaction/
mvcc.rs

//! Multi-Version Concurrency Control (MVCC) utilities.
//!
//! This module centralizes the transaction ID allocator, row-version metadata,
//! Arrow helpers for MVCC column construction, and the visibility checks used
//! across the engine. The overarching goal is to let transactions operate
//! directly on the base storage without copying tables into a staging area.
7use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use llkv_column_map::store::{
11    CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
12};
13use llkv_column_map::types::{FieldId, RowId};
14use llkv_result::Error;
15use rustc_hash::FxHashMap;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19
/// Transaction ID type.
///
/// A plain `u64`; the lowest values and `u64::MAX` are reserved (see
/// [`is_reserved_txn_id`]).
pub type TxnId = u64;

/// Transaction ID representing "no transaction" or "not deleted".
///
/// [`RowVersion::new`] stores this value in `deleted_by` for rows that have not
/// been deleted, and [`TxnIdManager::status`] reports [`TxnStatus::None`] for it.
pub const TXN_ID_NONE: TxnId = TxnId::MAX;

/// Transaction ID for auto-commit (single-statement) transactions.
///
/// [`TxnIdManager::status`] always reports this ID as committed, and the
/// visibility checks never treat it as "the current transaction".
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// Minimum valid transaction ID for multi-statement transactions.
///
/// This is the first ID a freshly created [`TxnIdManager`] allocates.
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;
31
32/// Check if a transaction ID is reserved (cannot be allocated).
33#[inline]
34pub fn is_reserved_txn_id(id: TxnId) -> bool {
35    id == TXN_ID_NONE || id <= TXN_ID_AUTO_COMMIT
36}
37
38/// Return the error message for attempting to use a reserved transaction ID.
39#[inline]
40pub fn reserved_txn_id_message(id: TxnId) -> String {
41    match id {
42        TXN_ID_NONE => format!(
43            "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
44            id
45        ),
46        0 => "Transaction ID 0 is invalid".to_string(),
47        TXN_ID_AUTO_COMMIT => "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT".to_string(),
48        _ => format!("Transaction ID {} is reserved", id),
49    }
50}
51
52/// Internal state shared across transaction ID managers.
53#[derive(Debug)]
54struct TxnIdManagerInner {
55    /// Next transaction ID to allocate.
56    next_txn_id: AtomicU64,
57    /// Largest committed transaction ID (acts as snapshot watermark).
58    last_committed: AtomicU64,
59    /// Tracking map for transaction statuses.
60    statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
61}
62
63impl TxnIdManagerInner {
64    fn new() -> Self {
65        Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
66    }
67
68    fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
69        Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
70    }
71
72    fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
73        let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
74        statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
75
76        Self {
77            next_txn_id: AtomicU64::new(next_txn_id),
78            last_committed: AtomicU64::new(last_committed),
79            statuses: Mutex::new(statuses),
80        }
81    }
82}
83
84/// Transaction ID manager that hands out IDs and tracks commit status.
85#[derive(Clone, Debug)]
86pub struct TxnIdManager {
87    inner: Arc<TxnIdManagerInner>,
88}
89
90impl TxnIdManager {
91    /// Create a new manager.
92    pub fn new() -> Self {
93        Self {
94            inner: Arc::new(TxnIdManagerInner::new()),
95        }
96    }
97
98    /// Create a new manager with a custom initial transaction ID.
99    /// Used when loading persisted state from the catalog.
100    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
101        Self {
102            inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
103        }
104    }
105
106    /// Create a new manager with custom initial state.
107    /// Used when loading persisted state from the catalog.
108    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
109        Self {
110            inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
111                next_txn_id,
112                last_committed,
113            )),
114        }
115    }
116
117    /// Get the current next_txn_id value (for persistence).
118    pub fn current_next_txn_id(&self) -> TxnId {
119        self.inner.next_txn_id.load(Ordering::SeqCst)
120    }
121
122    /// Begin a new transaction and return its snapshot.
123    ///
124    /// The snapshot captures both the allocated transaction ID and the latest
125    /// committed ID at the moment the transaction starts. These two values are
126    /// required to evaluate row visibility rules.
127    pub fn begin_transaction(&self) -> TransactionSnapshot {
128        let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
129        let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
130
131        {
132            let mut guard = self
133                .inner
134                .statuses
135                .lock()
136                .expect("txn status lock poisoned");
137            guard.insert(txn_id, TxnStatus::Active);
138        }
139
140        TransactionSnapshot {
141            txn_id,
142            snapshot_id,
143        }
144    }
145
146    // TODO: Is this method a good idea?  A transaction has to begin in order to determine the next ID?
147    // NOTE: This helper preserves existing call sites that only need identifiers; revisit after
148    // transaction lifecycle changes land.
149    /// Convenience helper that returns only the allocated transaction ID.
150    /// Prefer [`TxnIdManager::begin_transaction`] when a snapshot is required.
151    pub fn next_txn_id(&self) -> TxnId {
152        self.begin_transaction().txn_id
153    }
154
155    /// Return the status for a given transaction ID.
156    pub fn status(&self, txn_id: TxnId) -> TxnStatus {
157        if txn_id == TXN_ID_NONE {
158            return TxnStatus::None;
159        }
160        if txn_id == TXN_ID_AUTO_COMMIT {
161            return TxnStatus::Committed;
162        }
163
164        let guard = self
165            .inner
166            .statuses
167            .lock()
168            .expect("txn status lock poisoned");
169        guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
170    }
171
172    /// Mark a transaction as committed and advance the global watermark.
173    pub fn mark_committed(&self, txn_id: TxnId) {
174        {
175            let mut guard = self
176                .inner
177                .statuses
178                .lock()
179                .expect("txn status lock poisoned");
180            guard.insert(txn_id, TxnStatus::Committed);
181        }
182
183        // Opportunistically advance the committed watermark. Exact ordering is
184        // not critical; best effort progression keeps snapshots monotonic.
185        let mut current = self.inner.last_committed.load(Ordering::SeqCst);
186        loop {
187            if txn_id <= current {
188                break;
189            }
190            match self.inner.last_committed.compare_exchange(
191                current,
192                txn_id,
193                Ordering::SeqCst,
194                Ordering::SeqCst,
195            ) {
196                Ok(_) => break,
197                Err(observed) => current = observed,
198            }
199        }
200    }
201
202    /// Mark a transaction as aborted.
203    pub fn mark_aborted(&self, txn_id: TxnId) {
204        let mut guard = self
205            .inner
206            .statuses
207            .lock()
208            .expect("txn status lock poisoned");
209        guard.insert(txn_id, TxnStatus::Aborted);
210    }
211
212    /// Return the latest committed transaction ID (snapshot watermark).
213    pub fn last_committed(&self) -> TxnId {
214        self.inner.last_committed.load(Ordering::SeqCst)
215    }
216}
217
218impl Default for TxnIdManager {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224/// Metadata tracking when a row was created and deleted.
225///
226/// Each row in a table has associated `created_by` and `deleted_by` values stored
227/// in MVCC metadata columns. These values determine which transactions can see the row.
228///
229/// # Visibility Rules
230///
231/// A row is visible to a transaction if:
232/// 1. The creating transaction committed and its ID ≤ the snapshot ID
233/// 2. The row is not deleted (`deleted_by == TXN_ID_NONE`), or the deleting
234///    transaction either hasn't committed or has ID > snapshot ID
235///
236/// Special case: Rows created by the current transaction are visible to that
237/// transaction immediately, even before commit.
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct RowVersion {
240    /// Transaction ID that created this row version.
241    pub created_by: TxnId,
242    /// Transaction ID that deleted this row version (or [`TXN_ID_NONE`] if still visible).
243    pub deleted_by: TxnId,
244}
245
246impl RowVersion {
247    /// Create a new row version associated with `created_by`.
248    pub fn new(created_by: TxnId) -> Self {
249        Self {
250            created_by,
251            deleted_by: TXN_ID_NONE,
252        }
253    }
254
255    /// Soft-delete the row version by the given transaction.
256    pub fn delete(&mut self, deleted_by: TxnId) {
257        self.deleted_by = deleted_by;
258    }
259
260    /// Basic visibility check using only numeric ordering.
261    ///
262    /// This retains the previous behaviour and is primarily used in tests.
263    pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
264        self.created_by <= snapshot_txn_id
265            && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
266    }
267
268    /// Determine whether the row is visible for the supplied snapshot using full MVCC rules.
269    pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
270        tracing::trace!(
271            "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
272            self.created_by,
273            self.deleted_by,
274            snapshot.txn_id,
275            snapshot.snapshot_id
276        );
277
278        // Rows created inside the current transaction are visible unless this
279        // transaction also deleted them.
280        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
281        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
282            let visible = self.deleted_by != snapshot.txn_id;
283            tracing::trace!("[MVCC] created by current txn, visible={}", visible);
284            return visible;
285        }
286
287        // Ignore rows whose creator has not committed yet.
288        let creator_status = manager.status(self.created_by);
289        tracing::trace!("[MVCC] creator_status={:?}", creator_status);
290        if !creator_status.is_committed() {
291            tracing::trace!("[MVCC] creator not committed, invisible");
292            return false;
293        }
294
295        if self.created_by > snapshot.snapshot_id {
296            tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
297            return false;
298        }
299
300        match self.deleted_by {
301            TXN_ID_NONE => {
302                tracing::trace!("[MVCC] not deleted, visible");
303                true
304            }
305            tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
306                tracing::trace!("[MVCC] deleted by current txn, invisible");
307                false
308            }
309            tx => {
310                if !manager.status(tx).is_committed() {
311                    // A different transaction marked the row deleted but has not
312                    // committed; the row remains visible to others.
313                    tracing::trace!("[MVCC] deleter not committed, visible");
314                    return true;
315                }
316                let visible = tx > snapshot.snapshot_id;
317                tracing::trace!("[MVCC] deleter committed, visible={}", visible);
318                visible
319            }
320        }
321    }
322
323    /// Visibility check for foreign key validation.
324    ///
325    /// For FK checks, we want different semantics:
326    /// - Rows created by current txn are visible (so FK checks see new parent rows)
327    /// - Rows deleted by current txn are STILL VISIBLE for FK purposes
328    ///   (uncommitted deletes shouldn't affect FK validation)
329    ///
330    /// This implements the SQL standard behavior where FK constraints are checked
331    /// against the committed state plus uncommitted inserts, but ignoring uncommitted deletes.
332    pub fn is_visible_for_fk_check(
333        &self,
334        manager: &TxnIdManager,
335        snapshot: TransactionSnapshot,
336    ) -> bool {
337        tracing::trace!(
338            "[MVCC-FK] is_visible_for_fk_check: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
339            self.created_by,
340            self.deleted_by,
341            snapshot.txn_id,
342            snapshot.snapshot_id
343        );
344
345        // Rows created inside the current transaction are visible.
346        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
347        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
348            tracing::trace!("[MVCC-FK] created by current txn, visible");
349            return true;
350        }
351
352        // Ignore rows whose creator has not committed yet.
353        let creator_status = manager.status(self.created_by);
354        tracing::trace!("[MVCC-FK] creator_status={:?}", creator_status);
355        if !creator_status.is_committed() {
356            tracing::trace!("[MVCC-FK] creator not committed, invisible");
357            return false;
358        }
359
360        if self.created_by > snapshot.snapshot_id {
361            tracing::trace!("[MVCC-FK] created_by > snapshot_id, invisible");
362            return false;
363        }
364
365        // For FK checks, treat rows deleted by the current transaction as still visible
366        if self.deleted_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
367            tracing::trace!("[MVCC-FK] deleted by current txn, but still visible for FK check");
368            return true;
369        }
370
371        match self.deleted_by {
372            TXN_ID_NONE => {
373                tracing::trace!("[MVCC-FK] not deleted, visible");
374                true
375            }
376            tx => {
377                if !manager.status(tx).is_committed() {
378                    // A different transaction marked the row deleted but has not
379                    // committed; the row remains visible to others.
380                    tracing::trace!("[MVCC-FK] deleter not committed, visible");
381                    return true;
382                }
383                let visible = tx > snapshot.snapshot_id;
384                tracing::trace!("[MVCC-FK] deleter committed, visible={}", visible);
385                visible
386            }
387        }
388    }
389}
390
/// Transaction metadata captured when a transaction begins.
///
/// A snapshot contains two key pieces of information:
/// - `txn_id`: The unique ID of this transaction (used for writes)
/// - `snapshot_id`: The highest committed transaction ID at the time this transaction started
///
/// The `snapshot_id` determines which rows are visible: rows created by transactions
/// with IDs ≤ `snapshot_id` are visible, while rows created by later transactions are not.
/// This implements snapshot isolation.
///
/// The struct is `Copy`, so it is passed by value to the visibility checks on
/// [`RowVersion`].
#[derive(Debug, Clone, Copy)]
pub struct TransactionSnapshot {
    /// The unique ID assigned to this transaction.
    ///
    /// The visibility checks compare it against a row's `created_by` and
    /// `deleted_by` to recognise the transaction's own writes.
    pub txn_id: TxnId,
    /// The highest committed transaction ID when this transaction began.
    ///
    /// Acts as the upper bound for which committed creates and deletes this
    /// transaction observes.
    pub snapshot_id: TxnId,
}
407
/// Lifecycle state of a transaction as recorded by [`TxnIdManager`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// The transaction has begun and has been neither committed nor aborted.
    Active,
    /// The transaction committed.
    Committed,
    /// The transaction was aborted.
    Aborted,
    /// There is no transaction; reported for the `TXN_ID_NONE` sentinel.
    None,
}

impl TxnStatus {
    /// `true` only for [`TxnStatus::Committed`].
    pub fn is_committed(self) -> bool {
        self == Self::Committed
    }

    /// `true` only for [`TxnStatus::Active`].
    pub fn is_active(self) -> bool {
        self == Self::Active
    }

    /// `true` only for [`TxnStatus::Aborted`].
    pub fn is_aborted(self) -> bool {
        self == Self::Aborted
    }
}
430
431// ============================================================================
432// Arrow helpers for MVCC column construction
433// ============================================================================
434
435/// Build MVCC columns (row_id, created_by, deleted_by) for INSERT/CTAS operations.
436pub fn build_insert_mvcc_columns(
437    row_count: usize,
438    start_row_id: RowId,
439    creator_txn_id: TxnId,
440    deleted_marker: TxnId,
441) -> (ArrayRef, ArrayRef, ArrayRef) {
442    let mut row_builder = UInt64Builder::with_capacity(row_count);
443    for offset in 0..row_count {
444        row_builder.append_value(start_row_id + offset as u64);
445    }
446
447    let mut created_builder = UInt64Builder::with_capacity(row_count);
448    let mut deleted_builder = UInt64Builder::with_capacity(row_count);
449    for _ in 0..row_count {
450        created_builder.append_value(creator_txn_id);
451        deleted_builder.append_value(deleted_marker);
452    }
453
454    (
455        Arc::new(row_builder.finish()) as ArrayRef,
456        Arc::new(created_builder.finish()) as ArrayRef,
457        Arc::new(deleted_builder.finish()) as ArrayRef,
458    )
459}
460
461/// Build MVCC field definitions (row_id, created_by, deleted_by).
462pub fn build_mvcc_fields() -> Vec<Field> {
463    vec![
464        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
465        Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
466        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
467    ]
468}
469
470/// Build field with field_id metadata for a user column.
471pub fn build_field_with_metadata(
472    name: &str,
473    data_type: DataType,
474    nullable: bool,
475    field_id: FieldId,
476) -> Field {
477    let mut metadata = HashMap::with_capacity(1);
478    metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
479    Field::new(name, data_type, nullable).with_metadata(metadata)
480}
481
482/// Build DELETE batch with row_id and deleted_by columns.
483pub fn build_delete_batch(
484    row_ids: Vec<RowId>,
485    deleted_by_txn_id: TxnId,
486) -> llkv_result::Result<RecordBatch> {
487    let row_count = row_ids.len();
488
489    let fields = vec![
490        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
491        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
492    ];
493
494    let arrays: Vec<ArrayRef> = vec![
495        Arc::new(UInt64Array::from(row_ids)),
496        Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
497    ];
498
499    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
500}
501
/// Unit tests for ID allocation and the row visibility rules.
#[cfg(test)]
mod tests {
    use super::*;

    /// Two consecutive `begin_transaction` calls must yield strictly increasing IDs.
    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let manager = TxnIdManager::new();
        let snapshot1 = manager.begin_transaction();
        let snapshot2 = manager.begin_transaction();
        assert!(snapshot2.txn_id > snapshot1.txn_id);
    }

    /// Walks one row through create -> commit -> delete -> commit and checks
    /// what each snapshot taken along the way is allowed to see.
    #[test]
    fn test_row_visibility_simple() {
        let manager = TxnIdManager::new();
        let writer_snapshot = manager.begin_transaction();
        let mut row = RowVersion::new(writer_snapshot.txn_id);

        // Newly created rows are visible to the creating transaction.
        assert!(row.is_visible(writer_snapshot.txn_id));
        assert!(row.is_visible_for(&manager, writer_snapshot));

        // After the writer commits, the row becomes visible to later snapshots.
        manager.mark_committed(writer_snapshot.txn_id);
        let committed = manager.last_committed();
        assert!(row.is_visible(committed));

        // The reader starts after the commit, so its snapshot_id covers the writer.
        let reader_snapshot = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader_snapshot));

        // Deleting transaction must commit before other readers stop seeing the row.
        let deleter_snapshot = manager.begin_transaction();
        row.delete(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        // Even after the delete commits, the earlier reader keeps seeing the row:
        // the deleter's ID is greater than the reader's snapshot_id.
        manager.mark_committed(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        // A snapshot taken after the delete committed no longer sees the row.
        let post_delete_snapshot = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, post_delete_snapshot));
    }
}
543}