llkv_transaction/
mvcc.rs

//! Multi-Version Concurrency Control (MVCC) utilities.
//!
//! This module centralizes the transaction ID allocator, row-version metadata,
//! Arrow helpers for MVCC column construction, and visibility checks used across
//! the engine. The overarching goal is to allow transactions to operate directly
//! on the base storage without copying tables into a staging area.
7use arrow::array::{ArrayRef, UInt64Array, UInt64Builder};
8use arrow::datatypes::{DataType, Field, Schema};
9use arrow::record_batch::RecordBatch;
10use llkv_column_map::store::{
11    CREATED_BY_COLUMN_NAME, DELETED_BY_COLUMN_NAME, FIELD_ID_META_KEY, ROW_ID_COLUMN_NAME,
12};
13use llkv_column_map::types::{FieldId, RowId};
14use llkv_result::Error;
15use rustc_hash::FxHashMap;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU64, Ordering};
18use std::sync::{Arc, Mutex};
19
/// Identifier assigned to each transaction.
///
/// The same values are stamped into the MVCC `created_by` / `deleted_by` columns.
pub type TxnId = u64;

/// Sentinel meaning "no transaction"; as a `deleted_by` value it marks a row
/// that has not been deleted.
pub const TXN_ID_NONE: TxnId = TxnId::MAX;

/// Reserved ID used by auto-commit (single-statement) transactions.
pub const TXN_ID_AUTO_COMMIT: TxnId = 1;

/// Smallest ID a multi-statement transaction can receive (the one right after
/// [`TXN_ID_AUTO_COMMIT`]).
pub const TXN_ID_MIN_MULTI_STATEMENT: TxnId = TXN_ID_AUTO_COMMIT + 1;
31
32/// Check if a transaction ID is reserved (cannot be allocated).
33#[inline]
34pub fn is_reserved_txn_id(id: TxnId) -> bool {
35    id == TXN_ID_NONE || id <= TXN_ID_AUTO_COMMIT
36}
37
38/// Return the error message for attempting to use a reserved transaction ID.
39#[inline]
40pub fn reserved_txn_id_message(id: TxnId) -> String {
41    match id {
42        TXN_ID_NONE => format!(
43            "Transaction ID {} (u64::MAX) is reserved for TXN_ID_NONE",
44            id
45        ),
46        0 => "Transaction ID 0 is invalid".to_string(),
47        TXN_ID_AUTO_COMMIT => "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT".to_string(),
48        _ => format!("Transaction ID {} is reserved", id),
49    }
50}
51
52/// Internal state shared across transaction ID managers.
53#[derive(Debug)]
54struct TxnIdManagerInner {
55    /// Next transaction ID to allocate.
56    next_txn_id: AtomicU64,
57    /// Largest committed transaction ID (acts as snapshot watermark).
58    last_committed: AtomicU64,
59    /// Tracking map for transaction statuses.
60    statuses: Mutex<FxHashMap<TxnId, TxnStatus>>,
61}
62
63impl TxnIdManagerInner {
64    fn new() -> Self {
65        Self::new_with_initial_txn_id(TXN_ID_AUTO_COMMIT + 1)
66    }
67
68    fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
69        Self::new_with_initial_state(next_txn_id, TXN_ID_AUTO_COMMIT)
70    }
71
72    fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
73        let mut statuses = FxHashMap::with_capacity_and_hasher(1, Default::default());
74        statuses.insert(TXN_ID_AUTO_COMMIT, TxnStatus::Committed);
75
76        Self {
77            next_txn_id: AtomicU64::new(next_txn_id),
78            last_committed: AtomicU64::new(last_committed),
79            statuses: Mutex::new(statuses),
80        }
81    }
82}
83
84/// Transaction ID manager that hands out IDs and tracks commit status.
85#[derive(Clone, Debug)]
86pub struct TxnIdManager {
87    inner: Arc<TxnIdManagerInner>,
88}
89
90impl TxnIdManager {
91    /// Create a new manager.
92    pub fn new() -> Self {
93        Self {
94            inner: Arc::new(TxnIdManagerInner::new()),
95        }
96    }
97
98    /// Create a new manager with a custom initial transaction ID.
99    /// Used when loading persisted state from the catalog.
100    pub fn new_with_initial_txn_id(next_txn_id: TxnId) -> Self {
101        Self {
102            inner: Arc::new(TxnIdManagerInner::new_with_initial_txn_id(next_txn_id)),
103        }
104    }
105
106    /// Create a new manager with custom initial state.
107    /// Used when loading persisted state from the catalog.
108    pub fn new_with_initial_state(next_txn_id: TxnId, last_committed: TxnId) -> Self {
109        Self {
110            inner: Arc::new(TxnIdManagerInner::new_with_initial_state(
111                next_txn_id,
112                last_committed,
113            )),
114        }
115    }
116
117    /// Get the current next_txn_id value (for persistence).
118    pub fn current_next_txn_id(&self) -> TxnId {
119        self.inner.next_txn_id.load(Ordering::SeqCst)
120    }
121
122    /// Begin a new transaction and return its snapshot.
123    ///
124    /// The snapshot captures both the allocated transaction ID and the latest
125    /// committed ID at the moment the transaction starts. These two values are
126    /// required to evaluate row visibility rules.
127    pub fn begin_transaction(&self) -> TransactionSnapshot {
128        let snapshot_id = self.inner.last_committed.load(Ordering::SeqCst);
129        let txn_id = self.inner.next_txn_id.fetch_add(1, Ordering::SeqCst);
130
131        {
132            let mut guard = self
133                .inner
134                .statuses
135                .lock()
136                .expect("txn status lock poisoned");
137            guard.insert(txn_id, TxnStatus::Active);
138        }
139
140        TransactionSnapshot {
141            txn_id,
142            snapshot_id,
143        }
144    }
145
146    // TODO: Is this method a good idea?  A transaction has to begin in order to determine the next ID?
147    // NOTE: This helper preserves existing call sites that only need identifiers; revisit after
148    // transaction lifecycle changes land.
149    /// Convenience helper that returns only the allocated transaction ID.
150    /// Prefer [`TxnIdManager::begin_transaction`] when a snapshot is required.
151    pub fn next_txn_id(&self) -> TxnId {
152        self.begin_transaction().txn_id
153    }
154
155    /// Return the status for a given transaction ID.
156    pub fn status(&self, txn_id: TxnId) -> TxnStatus {
157        if txn_id == TXN_ID_NONE {
158            return TxnStatus::None;
159        }
160        if txn_id == TXN_ID_AUTO_COMMIT {
161            return TxnStatus::Committed;
162        }
163
164        let guard = self
165            .inner
166            .statuses
167            .lock()
168            .expect("txn status lock poisoned");
169        guard.get(&txn_id).copied().unwrap_or(TxnStatus::Committed)
170    }
171
172    /// Mark a transaction as committed and advance the global watermark.
173    pub fn mark_committed(&self, txn_id: TxnId) {
174        {
175            let mut guard = self
176                .inner
177                .statuses
178                .lock()
179                .expect("txn status lock poisoned");
180            guard.insert(txn_id, TxnStatus::Committed);
181        }
182
183        // Opportunistically advance the committed watermark. Exact ordering is
184        // not critical; best effort progression keeps snapshots monotonic.
185        let mut current = self.inner.last_committed.load(Ordering::SeqCst);
186        loop {
187            if txn_id <= current {
188                break;
189            }
190            match self.inner.last_committed.compare_exchange(
191                current,
192                txn_id,
193                Ordering::SeqCst,
194                Ordering::SeqCst,
195            ) {
196                Ok(_) => break,
197                Err(observed) => current = observed,
198            }
199        }
200    }
201
202    /// Mark a transaction as aborted.
203    pub fn mark_aborted(&self, txn_id: TxnId) {
204        let mut guard = self
205            .inner
206            .statuses
207            .lock()
208            .expect("txn status lock poisoned");
209        guard.insert(txn_id, TxnStatus::Aborted);
210    }
211
212    /// Return the latest committed transaction ID (snapshot watermark).
213    pub fn last_committed(&self) -> TxnId {
214        self.inner.last_committed.load(Ordering::SeqCst)
215    }
216}
217
218impl Default for TxnIdManager {
219    fn default() -> Self {
220        Self::new()
221    }
222}
223
224/// Metadata tracking when a row was created and deleted.
225///
226/// Each row in a table has associated `created_by` and `deleted_by` values stored
227/// in MVCC metadata columns. These values determine which transactions can see the row.
228///
229/// # Visibility Rules
230///
231/// A row is visible to a transaction if:
232/// 1. The creating transaction committed and its ID ≤ the snapshot ID
233/// 2. The row is not deleted (`deleted_by == TXN_ID_NONE`), or the deleting
234///    transaction either hasn't committed or has ID > snapshot ID
235///
236/// Special case: Rows created by the current transaction are visible to that
237/// transaction immediately, even before commit.
238#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub struct RowVersion {
240    /// Transaction ID that created this row version.
241    pub created_by: TxnId,
242    /// Transaction ID that deleted this row version (or [`TXN_ID_NONE`] if still visible).
243    pub deleted_by: TxnId,
244}
245
246impl RowVersion {
247    /// Create a new row version associated with `created_by`.
248    pub fn new(created_by: TxnId) -> Self {
249        Self {
250            created_by,
251            deleted_by: TXN_ID_NONE,
252        }
253    }
254
255    /// Soft-delete the row version by the given transaction.
256    pub fn delete(&mut self, deleted_by: TxnId) {
257        self.deleted_by = deleted_by;
258    }
259
260    /// Basic visibility check using only numeric ordering.
261    ///
262    /// This retains the previous behaviour and is primarily used in tests.
263    pub fn is_visible(&self, snapshot_txn_id: TxnId) -> bool {
264        self.created_by <= snapshot_txn_id
265            && (self.deleted_by == TXN_ID_NONE || self.deleted_by > snapshot_txn_id)
266    }
267
268    /// Determine whether the row is visible for the supplied snapshot using full MVCC rules.
269    pub fn is_visible_for(&self, manager: &TxnIdManager, snapshot: TransactionSnapshot) -> bool {
270        tracing::trace!(
271            "[MVCC] is_visible_for: created_by={}, deleted_by={}, snapshot.txn_id={}, snapshot.snapshot_id={}",
272            self.created_by,
273            self.deleted_by,
274            snapshot.txn_id,
275            snapshot.snapshot_id
276        );
277
278        // Rows created inside the current transaction are visible unless this
279        // transaction also deleted them.
280        // IMPORTANT: TXN_ID_AUTO_COMMIT is never treated as "current transaction"
281        if self.created_by == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT {
282            let visible = self.deleted_by != snapshot.txn_id;
283            tracing::trace!("[MVCC] created by current txn, visible={}", visible);
284            return visible;
285        }
286
287        // Ignore rows whose creator has not committed yet.
288        let creator_status = manager.status(self.created_by);
289        tracing::trace!("[MVCC] creator_status={:?}", creator_status);
290        if !creator_status.is_committed() {
291            tracing::trace!("[MVCC] creator not committed, invisible");
292            return false;
293        }
294
295        if self.created_by > snapshot.snapshot_id {
296            tracing::trace!("[MVCC] created_by > snapshot_id, invisible");
297            return false;
298        }
299
300        match self.deleted_by {
301            TXN_ID_NONE => {
302                tracing::trace!("[MVCC] not deleted, visible");
303                true
304            }
305            tx if tx == snapshot.txn_id && snapshot.txn_id != TXN_ID_AUTO_COMMIT => {
306                tracing::trace!("[MVCC] deleted by current txn, invisible");
307                false
308            }
309            tx => {
310                if !manager.status(tx).is_committed() {
311                    // A different transaction marked the row deleted but has not
312                    // committed; the row remains visible to others.
313                    tracing::trace!("[MVCC] deleter not committed, visible");
314                    return true;
315                }
316                let visible = tx > snapshot.snapshot_id;
317                tracing::trace!("[MVCC] deleter committed, visible={}", visible);
318                visible
319            }
320        }
321    }
322}
323
324/// Transaction metadata captured when a transaction begins.
325///
326/// A snapshot contains two key pieces of information:
327/// - `txn_id`: The unique ID of this transaction (used for writes)
328/// - `snapshot_id`: The highest committed transaction ID at the time this transaction started
329///
330/// The `snapshot_id` determines which rows are visible: rows created by transactions
331/// with IDs ≤ `snapshot_id` are visible, while rows created by later transactions are not.
332/// This implements snapshot isolation.
333#[derive(Debug, Clone, Copy)]
334pub struct TransactionSnapshot {
335    /// The unique ID assigned to this transaction.
336    pub txn_id: TxnId,
337    /// The highest committed transaction ID when this transaction began.
338    pub snapshot_id: TxnId,
339}
340
/// Transaction status values tracked by the manager.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnStatus {
    /// Recorded when a transaction begins; stays until it is marked committed or aborted.
    Active,
    /// Recorded by `mark_committed`. It is also what the manager reports for
    /// `TXN_ID_AUTO_COMMIT` and for IDs it has no record of.
    Committed,
    /// Recorded by `mark_aborted`.
    Aborted,
    /// Reported for `TXN_ID_NONE`, which names no transaction.
    None,
}

impl TxnStatus {
    /// Returns `true` only for [`TxnStatus::Committed`].
    pub fn is_committed(self) -> bool {
        self == Self::Committed
    }

    /// Returns `true` only for [`TxnStatus::Active`].
    pub fn is_active(self) -> bool {
        self == Self::Active
    }

    /// Returns `true` only for [`TxnStatus::Aborted`].
    pub fn is_aborted(self) -> bool {
        self == Self::Aborted
    }
}
363
364// ============================================================================
365// Arrow helpers for MVCC column construction
366// ============================================================================
367
368/// Build MVCC columns (row_id, created_by, deleted_by) for INSERT/CTAS operations.
369pub fn build_insert_mvcc_columns(
370    row_count: usize,
371    start_row_id: RowId,
372    creator_txn_id: TxnId,
373    deleted_marker: TxnId,
374) -> (ArrayRef, ArrayRef, ArrayRef) {
375    let mut row_builder = UInt64Builder::with_capacity(row_count);
376    for offset in 0..row_count {
377        row_builder.append_value(start_row_id + offset as u64);
378    }
379
380    let mut created_builder = UInt64Builder::with_capacity(row_count);
381    let mut deleted_builder = UInt64Builder::with_capacity(row_count);
382    for _ in 0..row_count {
383        created_builder.append_value(creator_txn_id);
384        deleted_builder.append_value(deleted_marker);
385    }
386
387    (
388        Arc::new(row_builder.finish()) as ArrayRef,
389        Arc::new(created_builder.finish()) as ArrayRef,
390        Arc::new(deleted_builder.finish()) as ArrayRef,
391    )
392}
393
394/// Build MVCC field definitions (row_id, created_by, deleted_by).
395pub fn build_mvcc_fields() -> Vec<Field> {
396    vec![
397        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
398        Field::new(CREATED_BY_COLUMN_NAME, DataType::UInt64, false),
399        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
400    ]
401}
402
403/// Build field with field_id metadata for a user column.
404pub fn build_field_with_metadata(
405    name: &str,
406    data_type: DataType,
407    nullable: bool,
408    field_id: FieldId,
409) -> Field {
410    let mut metadata = HashMap::with_capacity(1);
411    metadata.insert(FIELD_ID_META_KEY.to_string(), field_id.to_string());
412    Field::new(name, data_type, nullable).with_metadata(metadata)
413}
414
415/// Build DELETE batch with row_id and deleted_by columns.
416pub fn build_delete_batch(
417    row_ids: Vec<RowId>,
418    deleted_by_txn_id: TxnId,
419) -> llkv_result::Result<RecordBatch> {
420    let row_count = row_ids.len();
421
422    let fields = vec![
423        Field::new(ROW_ID_COLUMN_NAME, DataType::UInt64, false),
424        Field::new(DELETED_BY_COLUMN_NAME, DataType::UInt64, false),
425    ];
426
427    let arrays: Vec<ArrayRef> = vec![
428        Arc::new(UInt64Array::from(row_ids)),
429        Arc::new(UInt64Array::from(vec![deleted_by_txn_id; row_count])),
430    ];
431
432    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays).map_err(Error::Arrow)
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_txn_id_manager_allocates_monotonic_ids() {
        let manager = TxnIdManager::new();
        let snapshot1 = manager.begin_transaction();
        let snapshot2 = manager.begin_transaction();
        assert!(snapshot2.txn_id > snapshot1.txn_id);
    }

    #[test]
    fn test_fresh_manager_starts_after_auto_commit() {
        let manager = TxnIdManager::new();
        assert_eq!(manager.last_committed(), TXN_ID_AUTO_COMMIT);

        let first = manager.begin_transaction();
        assert_eq!(first.txn_id, TXN_ID_MIN_MULTI_STATEMENT);
        assert_eq!(first.snapshot_id, TXN_ID_AUTO_COMMIT);
        assert_eq!(manager.status(first.txn_id), TxnStatus::Active);
    }

    #[test]
    fn test_reserved_txn_ids() {
        assert!(is_reserved_txn_id(0));
        assert!(is_reserved_txn_id(TXN_ID_AUTO_COMMIT));
        assert!(is_reserved_txn_id(TXN_ID_NONE));
        assert!(!is_reserved_txn_id(TXN_ID_MIN_MULTI_STATEMENT));

        assert_eq!(reserved_txn_id_message(0), "Transaction ID 0 is invalid");
        assert_eq!(
            reserved_txn_id_message(TXN_ID_AUTO_COMMIT),
            "Transaction ID 1 is reserved for TXN_ID_AUTO_COMMIT"
        );
    }

    #[test]
    fn test_status_of_sentinel_ids() {
        let manager = TxnIdManager::new();
        assert_eq!(manager.status(TXN_ID_NONE), TxnStatus::None);
        assert_eq!(manager.status(TXN_ID_AUTO_COMMIT), TxnStatus::Committed);
    }

    #[test]
    fn test_out_of_order_commit_keeps_watermark_monotonic() {
        let manager = TxnIdManager::new();
        let first = manager.begin_transaction();
        let second = manager.begin_transaction();

        manager.mark_committed(second.txn_id);
        assert_eq!(manager.last_committed(), second.txn_id);

        // Committing an older transaction records its status but must not move
        // the watermark backwards.
        manager.mark_committed(first.txn_id);
        assert_eq!(manager.last_committed(), second.txn_id);
        assert_eq!(manager.status(first.txn_id), TxnStatus::Committed);
    }

    #[test]
    fn test_row_visibility_simple() {
        let manager = TxnIdManager::new();
        let writer_snapshot = manager.begin_transaction();
        let mut row = RowVersion::new(writer_snapshot.txn_id);

        // Newly created rows are visible to the creating transaction.
        assert!(row.is_visible(writer_snapshot.txn_id));
        assert!(row.is_visible_for(&manager, writer_snapshot));

        // After the writer commits, the row becomes visible to later snapshots.
        manager.mark_committed(writer_snapshot.txn_id);
        let committed = manager.last_committed();
        assert!(row.is_visible(committed));

        let reader_snapshot = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader_snapshot));

        // Deleting transaction must commit before other readers stop seeing the row.
        let deleter_snapshot = manager.begin_transaction();
        row.delete(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        manager.mark_committed(deleter_snapshot.txn_id);
        assert!(row.is_visible_for(&manager, reader_snapshot));

        let post_delete_snapshot = manager.begin_transaction();
        assert!(!row.is_visible_for(&manager, post_delete_snapshot));
    }

    #[test]
    fn test_row_deleted_by_own_txn_is_hidden_from_it() {
        let manager = TxnIdManager::new();
        let txn = manager.begin_transaction();
        let mut row = RowVersion::new(txn.txn_id);
        row.delete(txn.txn_id);
        assert!(!row.is_visible_for(&manager, txn));
    }

    #[test]
    fn test_aborted_creator_rows_stay_invisible() {
        let manager = TxnIdManager::new();
        let aborted = manager.begin_transaction();
        let row = RowVersion::new(aborted.txn_id);
        manager.mark_aborted(aborted.txn_id);

        // Commit a later transaction so the watermark moves past the aborted ID.
        // From then on, only the status check can hide the row.
        let later = manager.begin_transaction();
        manager.mark_committed(later.txn_id);

        let reader = manager.begin_transaction();
        assert!(reader.snapshot_id >= aborted.txn_id);
        assert_eq!(manager.status(aborted.txn_id), TxnStatus::Aborted);
        assert!(!row.is_visible_for(&manager, reader));
    }

    #[test]
    fn test_auto_commit_rows_visible_to_new_snapshots() {
        let manager = TxnIdManager::new();
        let row = RowVersion::new(TXN_ID_AUTO_COMMIT);
        let reader = manager.begin_transaction();
        assert!(row.is_visible_for(&manager, reader));
    }
}