llkv_transaction/
mvcc.rs

//! Multi-Version Concurrency Control (MVCC) utilities.
//!
//! This module centralizes the transaction ID allocator, row-version metadata,
//! Arrow helpers for MVCC column construction, and visibility checks used across
//! the engine. The overarching goal is to allow transactions to operate directly
//! on the base storage without copying tables into a staging area.
7use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use croaring::Treemap;
11use llkv_column_map::store::{
12    CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
13};
14use llkv_result::Error;
15use llkv_types::{FieldId, RowId};
16use rustc_hash::FxHashMap;
17use std::collections::HashMap;
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::sync::{Arc, Mutex};
20
/// Identifier assigned to a transaction.
pub type TxnId = u64;

/// Sentinel meaning "no transaction"; also stored in `deleted_by` for rows that
/// have not been deleted.
pub const TXN_ID_NONE: TxnId = u64::MAX;

/// ID shared by every auto-commit (single-statement) transaction.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// Smallest ID that may be handed out to a multi-statement transaction.
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;

/// Returns `true` when `id` can never be allocated to a transaction.
///
/// The reserved IDs are `0` (invalid), [`TXN_ID_AUTO_COMMIT`] and [`TXN_ID_NONE`].
#[inline]
pub fn is_reserved_txn_id(id: TxnId) -> bool {
    id < TXN_ID_MIN_MULTI_STATEMENT || id == TXN_ID_NONE
}

/// Builds the error message reported when a reserved transaction ID is used.
///
/// Any other ID falls through to a generic "is reserved" message.
#[inline]
pub fn reserved_txn_id_message(id: TxnId) -> String {
    if id == TXN_ID_NONE {
        format!(
            "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
            id
        )
    } else if id == 0 {
        String::from("Transaction ID 0 is invalid")
    } else if id == TXN_ID_AUTO_COMMIT {
        String::from("Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT")
    } else {
        format!("Transaction ID {} is reserved", id)
    }
}
52
53/// Internal state shared across transaction ID managers.
54#[derive(Debug)]
55struct TxnIdManagerInner {
56    /// Next transaction ID to allocate.
57    next_txn_id: AtomicU64,
58    /// Largest committed transaction ID (acts as snapshot watermark).
59    last_committed: AtomicU64,
60    /// Tracking map for transaction statuses.
61    statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
62}
63
64impl TxnIdManagerInner {
65    fn new() -> Self {
66        Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
67    }
68
69    fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
70        Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
71    }
72
73    fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
74        let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
75        statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
76
77        Self {
78            next_txn_id: AtomicU64::new(next_txn_id),
79            last_committed: AtomicU64::new(last_committed),
80            statuses: Mutex::new(statuses),
81        }
82    }
83}
84
85/// Transaction ID manager that hands out IDs and tracks commit status.
86#[derive(Clone, Debug)]
87pub struct TxnIdManager {
88    inner: Arc<TxnIdManagerInner>,
89}
90
91impl TxnIdManager {
92    /// Create a new manager.
93    pub fn new() -> Self {
94        Self {
95            inner: Arc::new(TxnIdManagerInner::new()),
96        }
97    }
98
99    /// Create a new manager with a custom initial transaction ID.
100    /// Used when loading persisted state from the catalog.
101    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
102        Self {
103            inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
104        }
105    }
106
107    /// Create a new manager with custom initial state.
108    /// Used when loading persisted state from the catalog.
109    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
110        Self {
111            inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
112                next_txn_id,
113                last_committed,
114            )),
115        }
116    }
117
118    /// Get the current next_txn_id value (for persistence).
119    pub fn current_next_txn_id(&self) -> TxnId {
120        self.inner.next_txn_id.load(Ordering::SeqCst)
121    }
122
123    /// Begin a new transaction and return its snapshot.
124    ///
125    /// The snapshot captures both the allocated transaction ID and the latest
126    /// committed ID at the moment the transaction starts. These two values are
127    /// required to evaluate row visibility rules.
128    pub fn begin_transaction(&self) -> TransactionSnapshot {
129        let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
130        let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
131
132        {
133            let mut guard = self
134                .inner
135                .statuses
136                .lock()
137                .expect("txn status lock poisoned");
138            guard.insert(txn_id, TxnStatus::Active);
139        }
140
141        TransactionSnapshot {
142            txn_id,
143            snapshot_id,
144        }
145    }
146
147    // TODO: Is this method a good idea?  A transaction has to begin in order to determine the next ID?
148    // NOTE: This helper preserves existing call sites that only need identifiers; revisit after
149    // transaction lifecycle changes land.
150    /// Convenience helper that returns only the allocated transaction ID.
151    /// Prefer [`TxnIdManager::begin_transaction`] when a snapshot is required.
152    pub fn next_txn_id(&self) -> TxnId {
153        self.begin_transaction().txn_id
154    }
155
156    /// Return the status for a given transaction ID.
157    pub fn status(&self, txn_id: TxnId) -> TxnStatus {
158        if txn_id == TXN_ID_NONE {
159            return TxnStatus::None;
160        }
161        if txn_id == TXN_ID_AUTO_COMMIT {
162            return TxnStatus::Committed;
163        }
164
165        let guard = self
166            .inner
167            .statuses
168            .lock()
169            .expect("txn status lock poisoned");
170        guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
171    }
172
173    /// Mark a transaction as committed and advance the global watermark.
174    pub fn mark_committed(&self, txn_id: TxnId) {
175        {
176            let mut guard = self
177                .inner
178                .statuses
179                .lock()
180                .expect("txn status lock poisoned");
181            guard.insert(txn_id, TxnStatus::Committed);
182        }
183
184        // Opportunistically advance the committed watermark. Exact ordering is
185        // not critical; best effort progression keeps snapshots monotonic.
186        let mut current = self.inner.last_committed.load(Ordering::SeqCst);
187        loop {
188            if txn_id <= current {
189                break;
190            }
191            match self.inner.last_committed.compare_exchange(
192                current,
193                txn_id,
194                Ordering::SeqCst,
195                Ordering::SeqCst,
196            ) {
197                Ok(_) => break,
198                Err(observed) => current = observed,
199            }
200        }
201    }
202
203    /// Mark a transaction as aborted.
204    pub fn mark_aborted(&self, txn_id: TxnId) {
205        let mut guard = self
206            .inner
207            .statuses
208            .lock()
209            .expect("txn status lock poisoned");
210        guard.insert(txn_id, TxnStatus::Aborted);
211    }
212
213    /// Return the latest committed transaction ID (snapshot watermark).
214    pub fn last_committed(&self) -> TxnId {
215        self.inner.last_committed.load(Ordering::SeqCst)
216    }
217}
218
219impl Default for TxnIdManager {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225/// Metadata tracking when a row was created and deleted.
226///
227/// Each row in a table has associated `created_by` and `deleted_by` values stored
228/// in MVCC metadata columns. These values determine which transactions can see the row.
229///
230/// # Visibility Rules
231///
232/// A row is visible to a transaction if:
233/// 1. The creating transaction committed and its ID ≤ the snapshot ID
234/// 2. The row is not deleted (`deleted_by == TXN_ID_NONE`), or the deleting
235///    transaction either hasn't committed or has ID > snapshot ID
236///
237/// Special case: Rows created by the current transaction are visible to that
238/// transaction immediately, even before commit.
239#[derive(Debug, Clone, Copy, PartialEq, Eq)]
240pub struct RowVersion {
241    /// Transaction ID that created this row version.
242    pub created_by: TxnId,
243    /// Transaction ID that deleted this row version (or [`TXN_ID_NONE`] if still visible).
244    pub deleted_by: TxnId,
245}
246
247impl RowVersion {
248    /// Create a new row version associated with `created_by`.
249    pub fn new(created_by: TxnId) -> Self {
250        Self {
251            created_by,
252            deleted_by: TXN_ID_NONE,
253        }
254    }
255
256    /// Soft-delete the row version by the given transaction.
257    pub fn delete(&mut self, deleted_by: TxnId) {
258        self.deleted_by = deleted_by;
259    }
260
261    /// Basic visibility check using only numeric ordering.
262    ///
263    /// This retains the previous behaviour and is primarily used in tests.
264    pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
265        self.created_by <= snapshot_txn_id
266            && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
267    }
268
269    /// Determine whether the row is visible for the supplied snapshot using full MVCC rules.
270    pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
271        tracing::trace!(
272            "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
273            self.created_by,
274            self.deleted_by,
275            snapshot.txn_id,
276            snapshot.snapshot_id
277        );
278
279        // Rows created inside the current transaction are visible unless this
280        // transaction also deleted them.
281        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
282        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
283            let visible = self.deleted_by != snapshot.txn_id;
284            tracing::trace!("[MVCC] created by current txn, visible={}", visible);
285            return visible;
286        }
287
288        // Ignore rows whose creator has not committed yet.
289        let creator_status = manager.status(self.created_by);
290        tracing::trace!("[MVCC] creator_status={:?}", creator_status);
291        if !creator_status.is_committed() {
292            tracing::trace!("[MVCC] creator not committed, invisible");
293            return false;
294        }
295
296        if self.created_by > snapshot.snapshot_id {
297            tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
298            return false;
299        }
300
301        match self.deleted_by {
302            TXN_ID_NONE => {
303                tracing::trace!("[MVCC] not deleted, visible");
304                true
305            }
306            tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
307                tracing::trace!("[MVCC] deleted by current txn, invisible");
308                false
309            }
310            tx => {
311                if !manager.status(tx).is_committed() {
312                    // A different transaction marked the row deleted but has not
313                    // committed; the row remains visible to others.
314                    tracing::trace!("[MVCC] deleter not committed, visible");
315                    return true;
316                }
317                let visible = tx > snapshot.snapshot_id;
318                tracing::trace!("[MVCC] deleter committed, visible={}", visible);
319                visible
320            }
321        }
322    }
323
324    /// Visibility check for foreign key validation.
325    ///
326    /// For FK checks, we want different semantics:
327    /// - Rows created by current txn are visible (so FK checks see new parent rows)
328    /// - Rows deleted by current txn are STILL VISIBLE for FK purposes
329    ///   (uncommitted deletes shouldn't affect FK validation)
330    ///
331    /// This implements the SQL standard behavior where FK constraints are checked
332    /// against the committed state plus uncommitted inserts, but ignoring uncommitted deletes.
333    pub fn is_visible_for_fk_check(
334        &self,
335        manager: &TxnIdManager,
336        snapshot: TransactionSnapshot,
337    ) -> bool {
338        tracing::trace!(
339            "[MVCC-FK] is_visible_for_fk_check: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
340            self.created_by,
341            self.deleted_by,
342            snapshot.txn_id,
343            snapshot.snapshot_id
344        );
345
346        // Rows created inside the current transaction are visible.
347        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
348        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
349            tracing::trace!("[MVCC-FK] created by current txn, visible");
350            return true;
351        }
352
353        // Ignore rows whose creator has not committed yet.
354        let creator_status = manager.status(self.created_by);
355        tracing::trace!("[MVCC-FK] creator_status={:?}", creator_status);
356        if !creator_status.is_committed() {
357            tracing::trace!("[MVCC-FK] creator not committed, invisible");
358            return false;
359        }
360
361        if self.created_by > snapshot.snapshot_id {
362            tracing::trace!("[MVCC-FK] created_by > snapshot_id, invisible");
363            return false;
364        }
365
366        // For FK checks, treat rows deleted by the current transaction as still visible
367        if self.deleted_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
368            tracing::trace!("[MVCC-FK] deleted by current txn, but still visible for FK check");
369            return true;
370        }
371
372        match self.deleted_by {
373            TXN_ID_NONE => {
374                tracing::trace!("[MVCC-FK] not deleted, visible");
375                true
376            }
377            tx => {
378                if !manager.status(tx).is_committed() {
379                    // A different transaction marked the row deleted but has not
380                    // committed; the row remains visible to others.
381                    tracing::trace!("[MVCC-FK] deleter not committed, visible");
382                    return true;
383                }
384                let visible = tx > snapshot.snapshot_id;
385                tracing::trace!("[MVCC-FK] deleter committed, visible={}", visible);
386                visible
387            }
388        }
389    }
390}
391
392/// Transaction metadata captured when a transaction begins.
393///
394/// A snapshot contains two key pieces of information:
395/// - `txn_id`: The unique ID of this transaction (used for writes)
396/// - `snapshot_id`: The highest committed transaction ID at the time this transaction started
397///
398/// The `snapshot_id` determines which rows are visible: rows created by transactions
399/// with IDs ≤ `snapshot_id` are visible, while rows created by later transactions are not.
400/// This implements snapshot isolation.
401#[derive(Debug, Clone, Copy)]
402pub struct TransactionSnapshot {
403    /// The unique ID assigned to this transaction.
404    pub txn_id: TxnId,
405    /// The highest committed transaction ID when this transaction began.
406    pub snapshot_id: TxnId,
407}
408
/// Lifecycle state of a transaction as reported by the transaction manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Begun and not yet committed or aborted.
    Active,
    /// Committed. Also the status reported for IDs the manager has no record of.
    Committed,
    /// Rolled back.
    Aborted,
    /// Status reported for `TXN_ID_NONE`, i.e. "no transaction".
    None,
}

impl TxnStatus {
    /// Returns `true` only for [`TxnStatus::Committed`].
    pub fn is_committed(self) -> bool {
        match self {
            TxnStatus::Committed => true,
            TxnStatus::Active | TxnStatus::Aborted | TxnStatus::None => false,
        }
    }

    /// Returns `true` only for [`TxnStatus::Active`].
    pub fn is_active(self) -> bool {
        match self {
            TxnStatus::Active => true,
            TxnStatus::Committed | TxnStatus::Aborted | TxnStatus::None => false,
        }
    }

    /// Returns `true` only for [`TxnStatus::Aborted`].
    pub fn is_aborted(self) -> bool {
        match self {
            TxnStatus::Aborted => true,
            TxnStatus::Active | TxnStatus::Committed | TxnStatus::None => false,
        }
    }
}
431
432// ============================================================================
433// Arrow helpers for MVCC column construction
434// ============================================================================
435
436/// Build MVCC columns (row_id, created_by, deleted_by) for INSERT/CTAS operations.
437pub fn build_insert_mvcc_columns(
438    row_count: usize,
439    start_row_id: RowId,
440    creator_txn_id: TxnId,
441    deleted_marker: TxnId,
442) -> (ArrayRef, ArrayRef, ArrayRef) {
443    let mut row_builder = UInt64Builder::with_capacity(row_count);
444    for offset in 0..row_count {
445        row_builder.append_value(start_row_id + offset as u64);
446    }
447
448    let mut created_builder = UInt64Builder::with_capacity(row_count);
449    let mut deleted_builder = UInt64Builder::with_capacity(row_count);
450    for _ in 0..row_count {
451        created_builder.append_value(creator_txn_id);
452        deleted_builder.append_value(deleted_marker);
453    }
454
455    (
456        Arc::new(row_builder.finish()) as ArrayRef,
457        Arc::new(created_builder.finish()) as ArrayRef,
458        Arc::new(deleted_builder.finish()) as ArrayRef,
459    )
460}
461
462/// Build MVCC field definitions (row_id, created_by, deleted_by).
463pub fn build_mvcc_fields() -> Vec<Field> {
464    vec![
465        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
466        Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
467        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
468    ]
469}
470
471/// Build field with field_id metadata for a user column.
472pub fn build_field_with_metadata(
473    name: &str,
474    data_type: DataType,
475    nullable: bool,
476    field_id: FieldId,
477) -> Field {
478    let mut metadata = HashMap::with_capacity(1);
479    metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
480    Field::new(name, data_type, nullable).with_metadata(metadata)
481}
482
483/// Build DELETE batch with row_id and deleted_by columns.
484pub fn build_delete_batch(
485    row_ids: Treemap,
486    deleted_by_txn_id: TxnId,
487) -> llkv_result::Result<RecordBatch> {
488    let row_count = row_ids.cardinality() as usize;
489
490    let fields = vec![
491        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
492        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
493    ];
494
495    let arrays: Vec<ArrayRef> = vec![
496        Arc::new(UInt64Array::from_iter_values(row_ids.iter())),
497        Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
498    ];
499
500    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
501}
502
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let mgr = TxnIdManager::new();
        let earlier = mgr.begin_transaction();
        let later = mgr.begin_transaction();
        assert!(later.txn_id > earlier.txn_id);
    }

    /// Walks one row through create -> commit -> delete -> commit and checks
    /// which snapshots can see it at each stage.
    #[test]
    fn test_row_visibility_simple() {
        let mgr = TxnIdManager::new();

        // A freshly inserted row is visible to its own, still-uncommitted writer.
        let writer = mgr.begin_transaction();
        let mut row = RowVersion::new(writer.txn_id);
        assert!(row.is_visible(writer.txn_id));
        assert!(row.is_visible_for(&mgr, writer));

        // Once the writer commits, snapshots taken afterwards see the row.
        mgr.mark_committed(writer.txn_id);
        assert!(row.is_visible(mgr.last_committed()));
        let reader = mgr.begin_transaction();
        assert!(row.is_visible_for(&mgr, reader));

        // A pending delete does not hide the row from other readers...
        let deleter = mgr.begin_transaction();
        row.delete(deleter.txn_id);
        assert!(row.is_visible_for(&mgr, reader));

        // ...and even after it commits, the older snapshot still sees the row.
        mgr.mark_committed(deleter.txn_id);
        assert!(row.is_visible_for(&mgr, reader));

        // Only snapshots taken after the delete committed lose sight of it.
        let after_delete = mgr.begin_transaction();
        assert!(!row.is_visible_for(&mgr, after_delete));
    }
}