llkv_transaction/
mvcc.rs

//! Multi-Version Concurrency Control (MVCC) utilities.
//!
//! This module centralizes the transaction ID allocator, row-version metadata,
//! Arrow helpers for MVCC column construction, and visibility checks used across
//! the engine. The overarching goal is to allow transactions to operate directly
//! on the base storage without copying tables into a staging area.
7use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use croaring::Treemap;
11use llkv_column_map::store::{
12    CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
13};
14use llkv_result::Error;
15use llkv_types::{FieldId, RowId};
16use rustc_hash::FxHashMap;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, Mutex};
20
/// Transaction ID type.
pub type TxnId = u64;

/// Sentinel ID meaning "no transaction"; it is what `deleted_by` holds for a row that has
/// not been deleted.
pub const TXN_ID_NONE: TxnId = u64::MAX;

/// The single ID shared by all auto-commit (single-statement) transactions.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// Smallest ID that may be handed out to a multi-statement transaction: the first ID
/// after [`TXN_ID_AUTO_COMMIT`].
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;
32
33/// Check if a transaction ID is reserved (cannot be allocated).
34#[inline]
35pub fn is_reserved_txn_id(id: TxnId) -> bool {
36    id == TXN_ID_NONE || id <= TXN_ID_AUTO_COMMIT
37}
38
39/// Return the error message for attempting to use a reserved transaction ID.
40#[inline]
41pub fn reserved_txn_id_message(id: TxnId) -> String {
42    match id {
43        TXN_ID_NONE => format!(
44            "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
45            id
46        ),
47        0 => "Transaction ID 0 is invalid".to_string(),
48        TXN_ID_AUTO_COMMIT => "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT".to_string(),
49        _ => format!("Transaction ID {} is reserved", id),
50    }
51}
52
53/// Internal state shared across transaction ID managers.
54#[derive(Debug)]
55struct TxnIdManagerInner {
56    /// Next transaction ID to allocate.
57    next_txn_id: AtomicU64,
58    /// Largest committed transaction ID (acts as snapshot watermark).
59    last_committed: AtomicU64,
60    /// Tracking map for transaction statuses.
61    statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
62}
63
64impl TxnIdManagerInner {
65    fn new() -> Self {
66        Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
67    }
68
69    fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
70        Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
71    }
72
73    fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
74        let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
75        statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
76
77        Self {
78            next_txn_id: AtomicU64::new(next_txn_id),
79            last_committed: AtomicU64::new(last_committed),
80            statuses: Mutex::new(statuses),
81        }
82    }
83}
84
85/// Transaction ID manager that hands out IDs and tracks commit status.
86#[derive(Clone, Debug)]
87pub struct TxnIdManager {
88    inner: Arc<TxnIdManagerInner>,
89}
90
91impl TxnIdManager {
92    /// Create a new manager.
93    pub fn new() -> Self {
94        Self {
95            inner: Arc::new(TxnIdManagerInner::new()),
96        }
97    }
98
99    /// Create a new manager with a custom initial transaction ID.
100    /// Used when loading persisted state from the catalog.
101    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
102        Self {
103            inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
104        }
105    }
106
107    /// Create a new manager with custom initial state.
108    /// Used when loading persisted state from the catalog.
109    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
110        Self {
111            inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
112                next_txn_id,
113                last_committed,
114            )),
115        }
116    }
117
118    /// Get the current next_txn_id value (for persistence).
119    pub fn current_next_txn_id(&self) -> TxnId {
120        self.inner.next_txn_id.load(Ordering::SeqCst)
121    }
122
123    /// Begin a new transaction and return its snapshot.
124    ///
125    /// The snapshot captures both the allocated transaction ID and the latest
126    /// committed ID at the moment the transaction starts. These two values are
127    /// required to evaluate row visibility rules.
128    pub fn begin_transaction(&self) -> TransactionSnapshot {
129        let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
130        let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
131
132        {
133            let mut guard = self
134                .inner
135                .statuses
136                .lock()
137                .expect("txn status lock poisoned");
138            guard.insert(txn_id, TxnStatus::Active);
139        }
140
141        TransactionSnapshot {
142            txn_id,
143            snapshot_id,
144        }
145    }
146
147    // TODO: Is this method a good idea?  A transaction has to begin in order to determine the next ID?
148    // NOTE: This helper preserves existing call sites that only need identifiers; revisit after
149    // transaction lifecycle changes land.
150    /// Convenience helper that returns only the allocated transaction ID.
151    /// Prefer [`TxnIdManager::begin_transaction`] when a snapshot is required.
152    pub fn next_txn_id(&self) -> TxnId {
153        self.begin_transaction().txn_id
154    }
155
156    /// Return the status for a given transaction ID.
157    pub fn status(&self, txn_id: TxnId) -> TxnStatus {
158        if txn_id == TXN_ID_NONE {
159            return TxnStatus::None;
160        }
161        if txn_id == TXN_ID_AUTO_COMMIT {
162            return TxnStatus::Committed;
163        }
164
165        let guard = self
166            .inner
167            .statuses
168            .lock()
169            .expect("txn status lock poisoned");
170        guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
171    }
172
173    /// Mark a transaction as committed and advance the global watermark.
174    pub fn mark_committed(&self, txn_id: TxnId) {
175        {
176            let mut guard = self
177                .inner
178                .statuses
179                .lock()
180                .expect("txn status lock poisoned");
181            guard.insert(txn_id, TxnStatus::Committed);
182        }
183
184        // Opportunistically advance the committed watermark. Exact ordering is
185        // not critical; best effort progression keeps snapshots monotonic.
186        let mut current = self.inner.last_committed.load(Ordering::SeqCst);
187        loop {
188            if txn_id <= current {
189                break;
190            }
191            match self.inner.last_committed.compare_exchange(
192                current,
193                txn_id,
194                Ordering::SeqCst,
195                Ordering::SeqCst,
196            ) {
197                Ok(_) => break,
198                Err(observed) => current = observed,
199            }
200        }
201    }
202
203    /// Returns true if any transaction other than `txn_id` is currently active.
204    pub fn has_other_active_transactions(&self, txn_id: TxnId) -> bool {
205        let guard = self
206            .inner
207            .statuses
208            .lock()
209            .expect("txn status lock poisoned");
210        guard
211            .iter()
212            .any(|(&other_id, status)| other_id != txn_id && status.is_active())
213    }
214
215    /// Mark a transaction as aborted.
216    pub fn mark_aborted(&self, txn_id: TxnId) {
217        let mut guard = self
218            .inner
219            .statuses
220            .lock()
221            .expect("txn status lock poisoned");
222        guard.insert(txn_id, TxnStatus::Aborted);
223    }
224
225    /// Return the latest committed transaction ID (snapshot watermark).
226    pub fn last_committed(&self) -> TxnId {
227        self.inner.last_committed.load(Ordering::SeqCst)
228    }
229}
230
231impl Default for TxnIdManager {
232    fn default() -> Self {
233        Self::new()
234    }
235}
236
237/// Metadata tracking when a row was created and deleted.
238///
239/// Each row in a table has associated `created_by` and `deleted_by` values stored
240/// in MVCC metadata columns. These values determine which transactions can see the row.
241///
242/// # Visibility Rules
243///
244/// A row is visible to a transaction if:
245/// 1. The creating transaction committed and its ID ≤ the snapshot ID
246/// 2. The row is not deleted (`deleted_by == TXN_ID_NONE`), or the deleting
247///    transaction either hasn't committed or has ID > snapshot ID
248///
249/// Special case: Rows created by the current transaction are visible to that
250/// transaction immediately, even before commit.
251#[derive(Debug, Clone, Copy, PartialEq, Eq)]
252pub struct RowVersion {
253    /// Transaction ID that created this row version.
254    pub created_by: TxnId,
255    /// Transaction ID that deleted this row version (or [`TXN_ID_NONE`] if still visible).
256    pub deleted_by: TxnId,
257}
258
259impl RowVersion {
260    /// Create a new row version associated with `created_by`.
261    pub fn new(created_by: TxnId) -> Self {
262        Self {
263            created_by,
264            deleted_by: TXN_ID_NONE,
265        }
266    }
267
268    /// Soft-delete the row version by the given transaction.
269    pub fn delete(&mut self, deleted_by: TxnId) {
270        self.deleted_by = deleted_by;
271    }
272
273    /// Basic visibility check using only numeric ordering.
274    ///
275    /// This retains the previous behaviour and is primarily used in tests.
276    pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
277        self.created_by <= snapshot_txn_id
278            && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
279    }
280
281    /// Determine whether the row is visible for the supplied snapshot using full MVCC rules.
282    pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
283        tracing::trace!(
284            "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
285            self.created_by,
286            self.deleted_by,
287            snapshot.txn_id,
288            snapshot.snapshot_id
289        );
290
291        // Rows created inside the current transaction are visible unless this
292        // transaction also deleted them.
293        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
294        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
295            let visible = self.deleted_by != snapshot.txn_id;
296            tracing::trace!("[MVCC] created by current txn, visible={}", visible);
297            return visible;
298        }
299
300        // Ignore rows whose creator has not committed yet.
301        let creator_status = manager.status(self.created_by);
302        tracing::trace!("[MVCC] creator_status={:?}", creator_status);
303        if !creator_status.is_committed() {
304            tracing::trace!("[MVCC] creator not committed, invisible");
305            return false;
306        }
307
308        if self.created_by > snapshot.snapshot_id {
309            tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
310            return false;
311        }
312
313        match self.deleted_by {
314            TXN_ID_NONE => {
315                tracing::trace!("[MVCC] not deleted, visible");
316                true
317            }
318            tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
319                tracing::trace!("[MVCC] deleted by current txn, invisible");
320                false
321            }
322            tx => {
323                if !manager.status(tx).is_committed() {
324                    // A different transaction marked the row deleted but has not
325                    // committed; the row remains visible to others.
326                    tracing::trace!("[MVCC] deleter not committed, visible");
327                    return true;
328                }
329                let visible = tx > snapshot.snapshot_id;
330                tracing::trace!("[MVCC] deleter committed, visible={}", visible);
331                visible
332            }
333        }
334    }
335
336    /// Visibility check for foreign key validation.
337    ///
338    /// For FK checks, we want different semantics:
339    /// - Rows created by current txn are visible (so FK checks see new parent rows)
340    /// - Rows deleted by current txn are STILL VISIBLE for FK purposes
341    ///   (uncommitted deletes shouldn't affect FK validation)
342    ///
343    /// This implements the SQL standard behavior where FK constraints are checked
344    /// against the committed state plus uncommitted inserts, but ignoring uncommitted deletes.
345    pub fn is_visible_for_fk_check(
346        &self,
347        manager: &TxnIdManager,
348        snapshot: TransactionSnapshot,
349    ) -> bool {
350        tracing::trace!(
351            "[MVCC-FK] is_visible_for_fk_check: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
352            self.created_by,
353            self.deleted_by,
354            snapshot.txn_id,
355            snapshot.snapshot_id
356        );
357
358        // Rows created inside the current transaction are visible.
359        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
360        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
361            tracing::trace!("[MVCC-FK] created by current txn, visible");
362            return true;
363        }
364
365        // Ignore rows whose creator has not committed yet.
366        let creator_status = manager.status(self.created_by);
367        tracing::trace!("[MVCC-FK] creator_status={:?}", creator_status);
368        if !creator_status.is_committed() {
369            tracing::trace!("[MVCC-FK] creator not committed, invisible");
370            return false;
371        }
372
373        if self.created_by > snapshot.snapshot_id {
374            tracing::trace!("[MVCC-FK] created_by > snapshot_id, invisible");
375            return false;
376        }
377
378        // For FK checks, treat rows deleted by the current transaction as still visible
379        if self.deleted_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
380            tracing::trace!("[MVCC-FK] deleted by current txn, but still visible for FK check");
381            return true;
382        }
383
384        match self.deleted_by {
385            TXN_ID_NONE => {
386                tracing::trace!("[MVCC-FK] not deleted, visible");
387                true
388            }
389            tx => {
390                if !manager.status(tx).is_committed() {
391                    // A different transaction marked the row deleted but has not
392                    // committed; the row remains visible to others.
393                    tracing::trace!("[MVCC-FK] deleter not committed, visible");
394                    return true;
395                }
396                let visible = tx > snapshot.snapshot_id;
397                tracing::trace!("[MVCC-FK] deleter committed, visible={}", visible);
398                visible
399            }
400        }
401    }
402}
403
404/// Transaction metadata captured when a transaction begins.
405///
406/// A snapshot contains two key pieces of information:
407/// - `txn_id`: The unique ID of this transaction (used for writes)
408/// - `snapshot_id`: The highest committed transaction ID at the time this transaction started
409///
410/// The `snapshot_id` determines which rows are visible: rows created by transactions
411/// with IDs ≤ `snapshot_id` are visible, while rows created by later transactions are not.
412/// This implements snapshot isolation.
413#[derive(Debug, Clone, Copy)]
414pub struct TransactionSnapshot {
415    /// The unique ID assigned to this transaction.
416    pub txn_id: TxnId,
417    /// The highest committed transaction ID when this transaction began.
418    pub snapshot_id: TxnId,
419}
420
/// Lifecycle state of a transaction, as reported by the transaction ID manager.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnStatus {
    /// Begun, and not yet committed or aborted.
    Active,
    /// Committed; rows it wrote are eligible for visibility to other transactions.
    Committed,
    /// Rolled back; rows it created are never visible to other transactions.
    Aborted,
    /// Status reported for the [`TXN_ID_NONE`] sentinel, which is not a real transaction.
    None,
}
429
430impl TxnStatus {
431    pub fn is_committed(self) -> bool {
432        matches!(self, TxnStatus::Committed)
433    }
434
435    pub fn is_active(self) -> bool {
436        matches!(self, TxnStatus::Active)
437    }
438
439    pub fn is_aborted(self) -> bool {
440        matches!(self, TxnStatus::Aborted)
441    }
442}
443
444// ============================================================================
445// Arrow helpers for MVCC column construction
446// ============================================================================
447
448/// Build MVCC columns (row_id, created_by, deleted_by) for INSERT/CTAS operations.
449pub fn build_insert_mvcc_columns(
450    row_count: usize,
451    start_row_id: RowId,
452    creator_txn_id: TxnId,
453    deleted_marker: TxnId,
454) -> (ArrayRef, ArrayRef, ArrayRef) {
455    let mut row_builder = UInt64Builder::with_capacity(row_count);
456    for offset in 0..row_count {
457        row_builder.append_value(start_row_id + offset as u64);
458    }
459
460    let mut created_builder = UInt64Builder::with_capacity(row_count);
461    let mut deleted_builder = UInt64Builder::with_capacity(row_count);
462    for _ in 0..row_count {
463        created_builder.append_value(creator_txn_id);
464        deleted_builder.append_value(deleted_marker);
465    }
466
467    (
468        Arc::new(row_builder.finish()) as ArrayRef,
469        Arc::new(created_builder.finish()) as ArrayRef,
470        Arc::new(deleted_builder.finish()) as ArrayRef,
471    )
472}
473
474/// Build MVCC field definitions (row_id, created_by, deleted_by).
475pub fn build_mvcc_fields() -> Vec<Field> {
476    vec![
477        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
478        Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
479        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
480    ]
481}
482
483/// Build field with field_id metadata for a user column.
484pub fn build_field_with_metadata(
485    name: &str,
486    data_type: DataType,
487    nullable: bool,
488    field_id: FieldId,
489) -> Field {
490    let mut metadata = HashMap::with_capacity(1);
491    metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
492    Field::new(name, data_type, nullable).with_metadata(metadata)
493}
494
495/// Build DELETE batch with row_id and deleted_by columns.
496pub fn build_delete_batch(
497    row_ids: Treemap,
498    deleted_by_txn_id: TxnId,
499) -> llkv_result::Result<RecordBatch> {
500    let row_count = row_ids.cardinality() as usize;
501
502    let fields = vec![
503        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
504        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
505    ];
506
507    let arrays: Vec<ArrayRef> = vec![
508        Arc::new(UInt64Array::from_iter_values(row_ids.iter())),
509        Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
510    ];
511
512    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
513}
514
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Array;

    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let manager = TxnIdManager::new();
        let snapshot1 = manager.begin_transaction();
        let snapshot2 = manager.begin_transaction();
        assert!(snapshot2.txn_id > snapshot1.txn_id);
    }

    #[test]
    fn test_fresh_manager_never_allocates_reserved_ids() {
        let manager = TxnIdManager::new();
        let snapshot = manager.begin_transaction();
        assert!(!is_reserved_txn_id(snapshot.txn_id));
        assert!(snapshot.txn_id >= TXN_ID_MIN_MULTI_STATEMENT);
    }

    #[test]
    fn test_reserved_txn_ids() {
        assert!(is_reserved_txn_id(0));
        assert!(is_reserved_txn_id(TXN_ID_AUTO_COMMIT));
        assert!(is_reserved_txn_id(TXN_ID_NONE));
        assert!(!is_reserved_txn_id(TXN_ID_MIN_MULTI_STATEMENT));
        assert_eq!(reserved_txn_id_message(0), "Transaction ID 0 is invalid");
        assert_eq!(
            reserved_txn_id_message(TXN_ID_AUTO_COMMIT),
            "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT"
        );
    }

    #[test]
    fn test_status_of_reserved_and_tracked_ids() {
        let manager = TxnIdManager::new();
        assert_eq!(manager.status(TXN_ID_NONE), TxnStatus::None);
        assert_eq!(manager.status(TXN_ID_AUTO_COMMIT), TxnStatus::Committed);

        let snapshot = manager.begin_transaction();
        assert_eq!(manager.status(snapshot.txn_id), TxnStatus::Active);

        manager.mark_committed(snapshot.txn_id);
        assert_eq!(manager.status(snapshot.txn_id), TxnStatus::Committed);
        assert_eq!(manager.last_committed(), snapshot.txn_id);
    }

    #[test]
    fn test_row_visibility_simple() {
        let manager = TxnIdManager::new();
        let writer_snapshot = manager.begin_transaction();
        let mut row = RowVersion::new(writer_snapshot.txn_id);

        // Newly created rows are visible to the creating transaction.
        assert!(row.is_visible(writer_snapshot.txn_id));
        assert!(row.is_visible_for(&manager, writer_snapshot));

        // After the writer commits, the row becomes visible to later snapshots.
        manager.mark_committed(writer_snapshot.txn_id);
        let committed = manager.last_committed();
        assert!(row.is_visible(committed));

        let reader_snapshot = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader_snapshot));

        // Deleting transaction must commit before other readers stop seeing the row.
        let deleter_snapshot = manager.begin_transaction();
        row.delete(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        manager.mark_committed(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let post_delete_snapshot = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, post_delete_snapshot));
    }

    #[test]
    fn test_uncommitted_and_aborted_rows_are_hidden_from_others() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let row = RowVersion::new(writer.txn_id);

        // Still-active creator: invisible to a concurrent reader.
        let reader = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, reader));

        // Aborted creator: invisible to every later reader too.
        manager.mark_aborted(writer.txn_id);
        assert!(manager.status(writer.txn_id).is_aborted());
        let later_reader = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, later_reader));
    }

    #[test]
    fn test_own_delete_hides_row_except_for_fk_checks() {
        let manager = TxnIdManager::new();
        let writer = manager.begin_transaction();
        let mut row = RowVersion::new(writer.txn_id);
        manager.mark_committed(writer.txn_id);

        let txn = manager.begin_transaction();
        row.delete(txn.txn_id);
        assert!(!row.is_visible_for(&manager, txn));
        assert!(row.is_visible_for_fk_check(&manager, txn));
    }

    #[test]
    fn test_has_other_active_transactions_tracks_lifecycle() {
        let manager = TxnIdManager::new();
        let first = manager.begin_transaction();
        assert!(!manager.has_other_active_transactions(first.txn_id));

        let second = manager.begin_transaction();
        assert!(manager.has_other_active_transactions(first.txn_id));

        manager.mark_committed(second.txn_id);
        assert!(!manager.has_other_active_transactions(first.txn_id));
    }

    #[test]
    fn test_build_insert_mvcc_columns_values() {
        let as_u64 = |array: &ArrayRef| -> Vec<u64> {
            array
                .as_any()
                .downcast_ref::<UInt64Array>()
                .expect("MVCC columns are UInt64")
                .values()
                .iter()
                .copied()
                .collect()
        };

        let (row_ids, created, deleted) = build_insert_mvcc_columns(3, 10, 7, TXN_ID_NONE);
        assert_eq!(as_u64(&row_ids), vec![10, 11, 12]);
        assert_eq!(as_u64(&created), vec![7, 7, 7]);
        assert_eq!(as_u64(&deleted), vec![TXN_ID_NONE; 3]);

        let (empty, _, _) = build_insert_mvcc_columns(0, 10, 7, TXN_ID_NONE);
        assert_eq!(empty.len(), 0);
    }
}