fluxmap 0.3.7

A thread-safe, transactional, and concurrent in-memory key-value store for Rust. Offers ACID guarantees with Serializable Snapshot Isolation (SSI) and optional durability via a Write-Ahead Log (WAL). Designed for ease of use, high performance, and modern async Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
//! Manages transactions, snapshots, and versions for MVCC.
//!
//! This module is the heart of FluxMap's concurrency control system. It implements
//! a Serializable Snapshot Isolation (SSI) protocol, which provides the highest
//! level of isolation defined by the SQL standard, preventing all common read/write
//! anomalies, including write skew.
//!
//! # Core Components
//!
//! -   **`TransactionManager`**: A global service that creates new transactions,
//!     tracks their status (`Active`, `Committed`, `Aborted`), and validates
//!     them upon commit.
//!
//! -   **`Transaction`**: Represents a single, isolated operation. It holds a
//!     `Snapshot` of the database at the time it began, and it tracks its own
//!     reads and writes in `read_set` and `write_set` to detect conflicts.
//!     Uncommitted writes are stored in a private `workspace`.
//!
//! -   **`Snapshot`**: An immutable view of the database state at a single point
//!     in time. It contains the information needed to determine which data
//!     versions are "visible" to its transaction.
//!
//! -   **`Version`**: A single version of a value, containing the value itself,
//!     the ID of the transaction that created it (`creator_txid`), and the ID
//!     of the transaction that expired it (`expirer_txid`).

use crate::db::IsolationLevel;
use crate::error::FluxError;
use dashmap::{DashMap, DashSet};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

// --- Core MVCC Types ---

/// A unique identifier for a transaction.
///
/// IDs are handed out by `TransactionManager::new_txid` with an atomic
/// `fetch_add`, starting at 1. ID 0 is reserved for pre-existing data loaded
/// from snapshots, and the value 0 is also used by `Version::expirer_txid` to
/// mean "not expired".
pub type TxId = u64;

/// The status of a transaction.
///
/// `TransactionManager` records one status per `TxId`. Statuses with an ID below
/// its `min_retainable_txid` may be removed by `prune_statuses`; when a status is
/// missing, `Snapshot::is_visible` falls back to comparing the ID with the
/// snapshot's `xmin`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionStatus {
    /// The transaction is currently in progress (recorded by `TransactionManager::begin`).
    Active,
    /// The transaction has successfully committed.
    Committed,
    /// The transaction has been aborted and its changes were discarded.
    Aborted,
}

/// Represents a transaction's private workspace for uncommitted changes.
///
/// Maps each key the transaction has touched to its pending state:
/// a `Some(Arc<V>)` indicates an insertion or update, while `None` indicates a deletion.
/// NOTE(review): values are wrapped in `Arc`, presumably so copies of the workspace
/// (e.g. for savepoints) share value allocations — confirm.
pub type Workspace<K, V> = HashMap<K, Option<Arc<V>>>;

/// Manages the lifecycle and status of all transactions in the system.
///
/// This is the central authority for creating transactions, managing their
/// state, and ensuring serializability by detecting and resolving conflicts.
pub struct TransactionManager<K: Eq + std::hash::Hash, V> {
    /// The next transaction ID to be allocated.
    next_txid: AtomicU64,
    /// A map tracking the status of every transaction (Active, Committed, or Aborted).
    /// Entries with an ID below `min_retainable_txid` are removed by `prune_statuses`,
    /// so a missing entry does not imply the ID was never used.
    statuses: DashMap<TxId, TransactionStatus>,
    /// A map of currently active transactions.
    /// Inserted by `begin`; removed when the transaction commits or aborts.
    active_transactions: DashMap<TxId, Arc<Transaction<K, V>>>,
    /// The minimum transaction ID that must be kept in the `statuses` map.
    /// This is used by the vacuum process to prune old transaction statuses.
    /// Starts at 0 (nothing is pruned until it is advanced).
    pub min_retainable_txid: AtomicU64,
    /// Tracks which active transactions have read which keys.
    /// This is a key part of the SSI conflict detection mechanism.
    /// `DashMap<Key, DashSet<TxId>>`
    ///
    /// `commit` uses it to flag readers of keys being written, and
    /// `cleanup_read_trackers` removes a transaction's entries (dropping empty sets)
    /// on commit or abort. NOTE(review): the code that inserts into this map is not
    /// part of this module's visible code — presumably the read path.
    pub read_trackers: DashMap<K, DashSet<TxId>>,
    /// NOTE(review): `K` and `V` already appear in the fields above, so this marker
    /// appears redundant.
    _phantom: std::marker::PhantomData<(K, V)>,
}

// ...

impl<K, V> TransactionManager<K, V>
where
    K: Eq + std::hash::Hash + Clone + Serialize + DeserializeOwned,
    V: Clone + Serialize + DeserializeOwned,
{
    /// Creates a new `TransactionManager` with no transactions.
    ///
    /// The ID counter starts at 1 because TXID 0 is reserved for pre-existing
    /// data loaded from snapshots.
    pub fn new() -> Self {
        Self {
            next_txid: AtomicU64::new(1),
            statuses: DashMap::new(),
            active_transactions: DashMap::new(),
            min_retainable_txid: AtomicU64::new(0),
            read_trackers: DashMap::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Allocates a new, unique transaction ID by atomically bumping the counter.
    pub fn new_txid(&self) -> TxId {
        self.next_txid.fetch_add(1, Ordering::SeqCst)
    }

    /// Begins a new transaction.
    ///
    /// Allocates a transaction ID, records it as `Active`, builds its snapshot
    /// from the currently active transactions, and finally registers it in
    /// `active_transactions`.
    ///
    /// NOTE(review): other threads' snapshots only see this ID in `xip` once it is
    /// inserted into `active_transactions` (the last step). A snapshot created
    /// concurrently before that point relies on `Snapshot::is_visible`'s status
    /// lookup instead — confirm this is sufficient.
    pub fn begin(&self) -> Arc<Transaction<K, V>> {
        let txid = self.new_txid();
        self.statuses.insert(txid, TransactionStatus::Active);
        let snapshot = self.create_snapshot(txid);
        let tx = Arc::new(Transaction::new(txid, snapshot));
        self.active_transactions.insert(txid, tx.clone());
        tx
    }

    /// Attempts to commit a transaction.
    ///
    /// Under [`IsolationLevel::Serializable`] this runs the SSI conflict checks:
    ///
    /// 1. **Incoming:** if another committing transaction already flagged `tx`
    ///    (it wrote a key `tx` read, or inserted into a range/prefix `tx`
    ///    scanned), `tx` is aborted.
    /// 2. **Outgoing phantoms:** every other active transaction whose recorded
    ///    range scans (inclusive bounds) or prefix scans cover a key in
    ///    `tx.insert_set` is flagged.
    /// 3. **Outgoing read-write:** every other active transaction tracked as a
    ///    reader of a key in `tx.write_set` is flagged.
    /// 4. **Re-check:** the incoming check is repeated to catch a flag raised by a
    ///    concurrently committing transaction while steps 2-3 were running.
    ///
    /// Under other isolation levels these checks are skipped. In every case
    /// `on_pre_commit` (e.g. WAL logging) runs next; only if it succeeds is `tx`
    /// marked committed and removed from the active set.
    ///
    /// # Errors
    ///
    /// - [`FluxError::SerializationConflict`] if an SSI check fails; `tx` is aborted.
    /// - Any error returned by `on_pre_commit`; `tx` is aborted and the error is
    ///   returned unchanged.
    pub fn commit<F>(
        &self,
        tx: &Transaction<K, V>,
        on_pre_commit: F,
        isolation_level: IsolationLevel,
    ) -> Result<(), FluxError>
    where
        F: FnOnce() -> Result<(), FluxError>,
        K: Ord + std::borrow::Borrow<str>,
    {
        if isolation_level == IsolationLevel::Serializable {
            // SSI: Incoming conflict check.
            if tx.in_conflict.load(Ordering::SeqCst) {
                self.abort(tx);
                return Err(FluxError::SerializationConflict);
            }

            // SSI: Outgoing conflict check for phantoms. Each other transaction's
            // scan locks are taken once, and all of our inserted keys are tested
            // against them.
            if !tx.insert_set.is_empty() {
                for other_entry in self.active_transactions.iter() {
                    let other_tx = other_entry.value();
                    // Skip ourselves and transactions that are already flagged.
                    if other_tx.id == tx.id || other_tx.in_conflict.load(Ordering::SeqCst) {
                        continue;
                    }

                    let other_ranges = other_tx.range_scans.read().unwrap();
                    let other_prefixes = other_tx.prefix_scans.read().unwrap();
                    let creates_phantom = tx.insert_set.iter().any(|inserted| {
                        let key: &K = inserted.key();
                        let in_scanned_range = other_ranges
                            .iter()
                            .any(|(start, end)| key >= start && key <= end);
                        in_scanned_range
                            || other_prefixes
                                .iter()
                                .any(|prefix| key.borrow().starts_with(prefix.as_str()))
                    });

                    if creates_phantom {
                        other_tx.in_conflict.store(true, Ordering::SeqCst);
                    }
                }
            }

            // SSI: Outgoing conflict check for read-write conflicts.
            // Notify other active transactions that read keys we are now writing to.
            for written_key in tx.write_set.iter() {
                let key: &K = written_key.key();
                if let Some(reader_tx_ids) = self.read_trackers.get(key.borrow()) {
                    for reader in reader_tx_ids.iter() {
                        let reader_id: TxId = *reader;
                        // Only signal other transactions, not ourselves.
                        if reader_id == tx.id {
                            continue;
                        }
                        if let Some(reader_tx) = self.active_transactions.get(&reader_id) {
                            reader_tx.value().in_conflict.store(true, Ordering::SeqCst);
                        }
                    }
                }
            }

            // SSI: Re-check our own flag now that our outgoing signals are published.
            // Two transactions that each read what the other writes can both pass
            // the first check when they commit concurrently. Because every store
            // and load of `in_conflict` here is SeqCst, at least one of them
            // observes the other's signal at this point and aborts, instead of
            // both committing (write skew). This must happen before the
            // pre-commit hook, so the hook (e.g. WAL logging) never runs for a
            // transaction that is then aborted.
            if tx.in_conflict.load(Ordering::SeqCst) {
                self.abort(tx);
                return Err(FluxError::SerializationConflict);
            }
        }

        // The transaction is valid to commit from an SSI perspective,
        // or we are in a weaker isolation level.
        // Now, execute the pre-commit hook (e.g., for WAL logging).
        if let Err(e) = on_pre_commit() {
            // If the pre-commit hook fails, we must abort the transaction.
            self.abort(tx);
            return Err(e);
        }

        // All checks passed and the pre-commit hook succeeded. Finalize the commit.
        self.cleanup_read_trackers(tx);
        self.statuses.insert(tx.id, TransactionStatus::Committed);
        self.active_transactions.remove(&tx.id);
        Ok(())
    }

    /// Aborts a transaction.
    ///
    /// Removes it from the read trackers, records its status as `Aborted`, and
    /// removes it from the active set.
    pub fn abort(&self, tx: &Transaction<K, V>) {
        self.cleanup_read_trackers(tx);
        self.statuses.insert(tx.id, TransactionStatus::Aborted);
        self.active_transactions.remove(&tx.id);
    }

    /// Returns a set of all currently active transaction IDs.
    pub fn get_active_txids(&self) -> HashSet<TxId> {
        self.active_transactions
            .iter()
            .map(|entry| *entry.key())
            .collect()
    }

    /// Returns the next transaction ID that will be allocated.
    pub fn get_current_txid(&self) -> TxId {
        self.next_txid.load(Ordering::SeqCst)
    }

    /// Retrieves the status of a given transaction ID.
    ///
    /// Returns `None` if the ID was never registered or its status has been pruned.
    pub fn get_status(&self, txid: TxId) -> Option<TransactionStatus> {
        self.statuses.get(&txid).map(|s| *s)
    }

    /// Removes transaction statuses whose ID is below `min_retainable_txid`.
    /// This is called by the vacuum process to prevent unbounded growth of the `statuses` map.
    pub fn prune_statuses(&self) {
        let min_txid = self.min_retainable_txid.load(Ordering::Acquire);
        self.statuses.retain(|&txid, _| txid >= min_txid);
    }

    /// Returns the number of transaction statuses currently stored. Used for metrics/testing.
    pub fn statuses_len(&self) -> usize {
        self.statuses.len()
    }

    /// Removes the given transaction from the read tracker of every key it read,
    /// dropping trackers that become empty.
    fn cleanup_read_trackers(&self, tx: &Transaction<K, V>) {
        use dashmap::mapref::entry::Entry;

        for read_key in tx.read_set.iter() {
            // Use the entry API to ensure the check-and-remove is atomic.
            if let Entry::Occupied(mut o) = self.read_trackers.entry(read_key.key().clone()) {
                o.get_mut().remove(&tx.id);
                if o.get().is_empty() {
                    o.remove();
                }
            }
        }
    }

    /// Creates a snapshot for transaction `txid`.
    ///
    /// `xmax` is the next ID to be allocated. `xip` is the set of IDs currently in
    /// `active_transactions`; when called from `begin`, this does not yet include
    /// `txid` itself. `xmin` is the smallest of those IDs, or `xmax` if there are none.
    fn create_snapshot(&self, txid: TxId) -> Snapshot {
        let xmax = self.next_txid.load(Ordering::SeqCst);
        let active_txids: HashSet<u64> = self
            .active_transactions
            .iter()
            .map(|entry| *entry.key())
            .collect();
        let xmin = active_txids.iter().min().copied().unwrap_or(xmax);

        Snapshot {
            txid,
            xmin,
            xmax,
            xip: active_txids,
        }
    }
}

/// Represents a transaction's consistent view of the database.
///
/// The snapshot defines which versions of data are visible to the transaction
/// (see `Snapshot::is_visible`). The visibility rules are based on the transaction
/// IDs of the versions relative to the state of the system when the snapshot was
/// created.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// The ID of the transaction this snapshot belongs to.
    pub txid: TxId,
    /// The smallest ID in `xip` at the time of snapshot creation, or `xmax` if
    /// `xip` was empty.
    ///
    /// `is_visible` consults it only when a transaction's status is no longer
    /// recorded (pruned). In that case an ID below `xmin` is assumed committed;
    /// IDs below `xmin` are not otherwise treated specially.
    pub xmin: TxId,
    /// The next transaction ID to be handed out. Any transaction with an ID
    /// greater than or equal to `xmax` was not yet started when the snapshot
    /// was created and is therefore invisible.
    pub xmax: TxId,
    /// The set of in-progress (active) transaction IDs at the time of snapshot creation.
    /// When built by `TransactionManager::begin`, this does not include `txid`
    /// itself, because the owning transaction is registered as active only after
    /// its snapshot is created.
    pub xip: HashSet<TxId>,
}

impl Snapshot {
    /// Decides whether `version` belongs to the database state this snapshot sees.
    ///
    /// # Visibility Rules
    ///
    /// A version is visible exactly when both of the following hold:
    ///
    /// 1. **Its creation is in the snapshot.** The creator is not the owning
    ///    transaction (own writes are read from the workspace instead), started
    ///    before the snapshot (`creator_txid < xmax`), was not in progress when the
    ///    snapshot was taken (not in `xip`), and is recorded as `Committed`. If its
    ///    status has been pruned, the creator is assumed committed when it is
    ///    older than `xmin`.
    ///
    /// 2. **Its deletion is not in the snapshot.** `expirer_txid` is 0 (never
    ///    expired), or the expirer is not the owning transaction and fails the
    ///    test from rule 1 (started too late, still in progress, or not committed).
    pub fn is_visible<K, V, F>(
        &self,
        version: &Version<V>,
        tx_manager: &TransactionManager<K, F>,
    ) -> bool
    where
        K: Eq + std::hash::Hash + Clone + Serialize + DeserializeOwned,
        F: Clone + Serialize + DeserializeOwned,
    {
        // Whether the effects of transaction `id` are part of this snapshot.
        // The owning transaction is special-cased separately for creator and expirer.
        let effects_in_snapshot = |id: u64| -> bool {
            let started_before = id < self.xmax;
            started_before
                && !self.xip.contains(&id)
                && tx_manager
                    .get_status(id)
                    .map_or(id < self.xmin, |status| status == TransactionStatus::Committed)
        };

        let creator = version.creator_txid;
        if creator == self.txid || !effects_in_snapshot(creator) {
            return false;
        }

        match version.expirer_txid.load(Ordering::Acquire) {
            // Never expired.
            0 => true,
            // Deleted by ourselves: the deletion is always visible to us.
            expirer if expirer == self.txid => false,
            // Deleted by someone else: hidden only if that deletion is in our snapshot.
            expirer => !effects_in_snapshot(expirer),
        }
    }
}

/// Represents a single, isolated transaction.
///
/// `TransactionManager::begin` wraps it in an `Arc` that is shared between the
/// caller and the manager's active-transaction map. The bookkeeping fields use
/// interior mutability (`DashMap`/`DashSet`, `RwLock`, `AtomicBool`), so they can be
/// updated through a shared reference.
#[derive(Debug)]
pub struct Transaction<K: Eq + std::hash::Hash, V> {
    /// The unique ID of this transaction.
    pub id: TxId,
    /// The consistent snapshot of the database state for this transaction.
    pub snapshot: Snapshot,
    /// The set of keys read by this transaction, used for SSI conflict detection.
    /// Maps Key -> Creator TxId of the version that was read.
    /// On commit/abort, `TransactionManager` uses these keys to remove this
    /// transaction from its read trackers.
    pub read_set: DashMap<K, TxId>,
    /// The set of keys written to by this transaction, used for SSI conflict detection.
    /// On commit, readers of these keys are flagged via `in_conflict`.
    pub write_set: DashSet<K>,
    /// The set of keys inserted by this transaction, used for phantom detection.
    pub insert_set: DashSet<K>,
    /// The set of ranges scanned by this transaction, used for phantom detection.
    /// `TransactionManager::commit` treats both bounds of each `(start, end)` pair
    /// as inclusive.
    pub range_scans: RwLock<Vec<(K, K)>>,
    /// The set of prefixes scanned by this transaction, used for phantom detection.
    /// An inserted key matches when its string form starts with the prefix.
    pub prefix_scans: RwLock<Vec<String>>,
    /// A flag indicating if a read-write conflict has been detected by another committing transaction.
    /// If true when this transaction commits under Serializable isolation, it is aborted.
    pub in_conflict: AtomicBool,
    /// The transaction's private workspace for pending changes (inserts, updates, deletes).
    /// These changes are only applied to the main database upon a successful commit.
    pub workspace: RwLock<Workspace<K, V>>,
    /// A stack of named savepoints, each holding a copy of the workspace
    /// (a workspace copy, not an MVCC `Snapshot`).
    pub savepoints: RwLock<Vec<(String, Workspace<K, V>)>>,
    /// NOTE(review): `V` already appears in `workspace`, so this marker appears redundant.
    _phantom: std::marker::PhantomData<V>,
}

impl<K, V> Transaction<K, V>
where
    K: Eq + std::hash::Hash + Serialize + DeserializeOwned,
    V: Serialize + DeserializeOwned,
{
    /// Builds a transaction with ID `id` that reads through `snapshot`.
    ///
    /// The transaction starts with an empty workspace, no savepoints, no recorded
    /// reads, writes, inserts, or scans, and with its conflict flag cleared.
    pub fn new(id: TxId, snapshot: Snapshot) -> Self {
        // Conflict-detection bookkeeping, consulted by `TransactionManager::commit`.
        let read_set = DashMap::new();
        let write_set = DashSet::new();
        let insert_set = DashSet::new();
        let range_scans = RwLock::new(Vec::new());
        let prefix_scans = RwLock::new(Vec::new());

        // Pending changes and the savepoint stack begin empty.
        let workspace = RwLock::new(HashMap::new());
        let savepoints = RwLock::new(Vec::new());

        Self {
            id,
            snapshot,
            read_set,
            write_set,
            insert_set,
            range_scans,
            prefix_scans,
            in_conflict: AtomicBool::new(false),
            workspace,
            savepoints,
            _phantom: std::marker::PhantomData,
        }
    }
}

/// Represents a single version of a value in the multi-version store.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`, so reordering or
/// renaming fields may change its serialized form — confirm before altering the layout.
#[derive(Debug, Serialize, Deserialize)]
pub struct Version<V> {
    /// The actual value.
    pub value: V,
    /// The ID of the transaction that created this version.
    /// TXID 0 is reserved for pre-existing data loaded from snapshots.
    pub creator_txid: TxId,
    /// The ID of the transaction that expired (deleted) this version.
    /// A value of `0` means this version is not expired.
    ///
    /// Atomic, presumably so it can be set through a shared reference;
    /// `Snapshot::is_visible` reads it with `Ordering::Acquire`.
    pub expirer_txid: AtomicU64,
}