Skip to main content

reddb_server/storage/btree/
version.rs

//! MVCC Version Management
//!
//! Multi-version concurrency control for B+ tree entries.
//!
//! # Design
//!
//! Each key can have multiple versions, forming a chain from newest to oldest.
//! Transactions see a consistent snapshot based on their start timestamp.
9
10pub use crate::storage::primitives::ids::{current_timestamp, next_timestamp, Timestamp, TxnId};
11
/// Result of testing a single version against a transaction's snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionVisibility {
    /// The transaction may read this version.
    Visible,
    /// The version was created by a transaction that is still active
    /// (not yet committed).
    Uncommitted,
    /// The version was deleted before the transaction started.
    Deleted,
    /// The version was created after the transaction started.
    Future,
}
24
25/// A single version of a value
26#[derive(Debug, Clone)]
27pub struct Version<V: Clone> {
28    /// Transaction that created this version
29    pub created_by: TxnId,
30    /// Timestamp when created
31    pub created_at: Timestamp,
32    /// Transaction that deleted this version (0 if not deleted)
33    pub deleted_by: TxnId,
34    /// Timestamp when deleted (0 if not deleted)
35    pub deleted_at: Timestamp,
36    /// The value (None if this is a tombstone)
37    pub value: Option<V>,
38    /// Pointer to older version
39    pub prev: Option<Box<Version<V>>>,
40}
41
42impl<V: Clone> Version<V> {
43    /// Create new version
44    pub fn new(value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
45        Self {
46            created_by: txn_id,
47            created_at: timestamp,
48            deleted_by: TxnId::ZERO,
49            deleted_at: Timestamp::EPOCH,
50            value: Some(value),
51            prev: None,
52        }
53    }
54
55    /// Create tombstone version (delete marker)
56    pub fn tombstone(txn_id: TxnId, timestamp: Timestamp) -> Self {
57        Self {
58            created_by: txn_id,
59            created_at: timestamp,
60            deleted_by: TxnId::ZERO,
61            deleted_at: Timestamp::EPOCH,
62            value: None,
63            prev: None,
64        }
65    }
66
67    /// Check if this version is a tombstone
68    pub fn is_tombstone(&self) -> bool {
69        self.value.is_none()
70    }
71
72    /// Check if this version is deleted
73    pub fn is_deleted(&self) -> bool {
74        !self.deleted_by.is_zero()
75    }
76
77    /// Mark as deleted by transaction
78    pub fn mark_deleted(&mut self, txn_id: TxnId, timestamp: Timestamp) {
79        self.deleted_by = txn_id;
80        self.deleted_at = timestamp;
81    }
82
83    /// Check visibility for a snapshot
84    pub fn check_visibility(&self, snapshot: &Snapshot) -> VersionVisibility {
85        // If created by an uncommitted transaction (not in snapshot)
86        if !self.created_by.is_zero() && !snapshot.is_committed(self.created_by) {
87            if self.created_by == snapshot.txn_id {
88                // Created by this transaction - visible
89                if self.is_deleted() && self.deleted_by == snapshot.txn_id {
90                    return VersionVisibility::Deleted;
91                }
92                return VersionVisibility::Visible;
93            }
94            return VersionVisibility::Uncommitted;
95        }
96
97        // If created after snapshot started
98        if self.created_at > snapshot.start_ts {
99            return VersionVisibility::Future;
100        }
101
102        // If deleted by a committed transaction before snapshot
103        if self.is_deleted()
104            && snapshot.is_committed(self.deleted_by)
105            && self.deleted_at <= snapshot.start_ts
106        {
107            return VersionVisibility::Deleted;
108        }
109
110        // Visible
111        VersionVisibility::Visible
112    }
113}
114
115/// Version chain for a key (head is newest)
116#[derive(Debug, Clone)]
117pub struct VersionChain<V: Clone> {
118    /// Head of version chain (newest)
119    head: Option<Box<Version<V>>>,
120    /// Number of versions in chain
121    version_count: usize,
122    /// Oldest visible timestamp (for GC)
123    oldest_ts: Timestamp,
124}
125
126impl<V: Clone> VersionChain<V> {
127    /// Create empty chain
128    pub fn new() -> Self {
129        Self {
130            head: None,
131            version_count: 0,
132            oldest_ts: Timestamp::EPOCH,
133        }
134    }
135
136    /// Create chain with initial version
137    pub fn with_value(value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
138        Self {
139            head: Some(Box::new(Version::new(value, txn_id, timestamp))),
140            version_count: 1,
141            oldest_ts: timestamp,
142        }
143    }
144
145    /// Check if chain is empty
146    pub fn is_empty(&self) -> bool {
147        self.head.is_none()
148    }
149
150    /// Get number of versions
151    pub fn len(&self) -> usize {
152        self.version_count
153    }
154
155    /// Get visible value for snapshot
156    pub fn get(&self, snapshot: &Snapshot) -> Option<&V> {
157        let mut current = self.head.as_ref();
158
159        while let Some(version) = current {
160            match version.check_visibility(snapshot) {
161                VersionVisibility::Visible => {
162                    return version.value.as_ref();
163                }
164                VersionVisibility::Deleted => {
165                    return None;
166                }
167                VersionVisibility::Uncommitted | VersionVisibility::Future => {
168                    // Skip to older version
169                    current = version.prev.as_ref();
170                }
171            }
172        }
173
174        None
175    }
176
177    /// Insert new version at head
178    pub fn insert(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
179        let mut new_version = Box::new(Version::new(value, txn_id, timestamp));
180        new_version.prev = self.head.take();
181        self.head = Some(new_version);
182        self.version_count += 1;
183
184        if self.oldest_ts.is_epoch() {
185            self.oldest_ts = timestamp;
186        }
187    }
188
189    /// Update with new version (creates new version, points to old)
190    pub fn update(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
191        self.insert(value, txn_id, timestamp);
192    }
193
194    /// Delete (creates tombstone version)
195    pub fn delete(&mut self, txn_id: TxnId, timestamp: Timestamp) {
196        let mut tombstone = Box::new(Version::tombstone(txn_id, timestamp));
197        tombstone.prev = self.head.take();
198        self.head = Some(tombstone);
199        self.version_count += 1;
200    }
201
202    /// Get head version (for write conflict detection)
203    pub fn head(&self) -> Option<&Version<V>> {
204        self.head.as_ref().map(|v| v.as_ref())
205    }
206
207    /// Get head version mutable
208    pub fn head_mut(&mut self) -> Option<&mut Version<V>> {
209        self.head.as_mut().map(|v| v.as_mut())
210    }
211
212    /// Garbage collect versions older than watermark
213    pub fn gc(&mut self, watermark: Timestamp) -> usize {
214        let mut removed = 0;
215
216        // Find the last version that is still needed
217        let mut current = &mut self.head;
218        let mut found_visible = false;
219
220        while let Some(version) = current {
221            // Keep at least one version before watermark for visibility
222            if version.created_at <= watermark {
223                if found_visible {
224                    // Can remove this and all older versions
225                    if let Some(prev) = version.prev.take() {
226                        removed += 1 + self.count_chain(&prev);
227                    }
228                    break;
229                }
230                found_visible = true;
231            }
232            current = &mut version.prev;
233        }
234
235        self.version_count -= removed;
236        removed
237    }
238
239    /// Count versions in a chain
240    fn count_chain(&self, version: &Version<V>) -> usize {
241        let mut count = 1;
242        let mut current = version.prev.as_ref();
243        while let Some(v) = current {
244            count += 1;
245            current = v.prev.as_ref();
246        }
247        count
248    }
249
250    /// Check if all versions are tombstones (for compaction)
251    pub fn is_all_deleted(&self) -> bool {
252        let mut current = self.head.as_ref();
253        while let Some(version) = current {
254            if !version.is_tombstone() {
255                return false;
256            }
257            current = version.prev.as_ref();
258        }
259        true
260    }
261
262    /// Get oldest timestamp in chain
263    pub fn oldest_timestamp(&self) -> Timestamp {
264        self.oldest_ts
265    }
266}
267
268impl<V: Clone> Default for VersionChain<V> {
269    fn default() -> Self {
270        Self::new()
271    }
272}
273
274/// Transaction snapshot for consistent reads
275#[derive(Debug, Clone)]
276pub struct Snapshot {
277    /// This transaction's ID
278    pub txn_id: TxnId,
279    /// Snapshot start timestamp
280    pub start_ts: Timestamp,
281    /// Set of active (uncommitted) transactions at snapshot time
282    active_txns: Vec<TxnId>,
283    /// Set of committed transactions visible to this snapshot
284    committed_txns: Vec<TxnId>,
285}
286
287impl Snapshot {
288    /// Create new snapshot
289    pub fn new(txn_id: TxnId, start_ts: Timestamp) -> Self {
290        Self {
291            txn_id,
292            start_ts,
293            active_txns: Vec::new(),
294            committed_txns: Vec::new(),
295        }
296    }
297
298    /// Create snapshot with active transactions
299    pub fn with_active(txn_id: TxnId, start_ts: Timestamp, active: Vec<TxnId>) -> Self {
300        Self {
301            txn_id,
302            start_ts,
303            active_txns: active,
304            committed_txns: Vec::new(),
305        }
306    }
307
308    /// Add committed transaction to snapshot
309    pub fn add_committed(&mut self, txn_id: TxnId) {
310        if !self.committed_txns.contains(&txn_id) {
311            self.committed_txns.push(txn_id);
312        }
313    }
314
315    /// Check if transaction is committed (visible to this snapshot)
316    pub fn is_committed(&self, txn_id: TxnId) -> bool {
317        // Transaction 0 is always committed (initial state)
318        if txn_id.is_zero() {
319            return true;
320        }
321
322        // If in active set, not committed
323        if self.active_txns.contains(&txn_id) {
324            return false;
325        }
326
327        // If in committed set, committed
328        if self.committed_txns.contains(&txn_id) {
329            return true;
330        }
331
332        // If started before snapshot and not in active set, committed
333        // (This is a simplification - real impl would track commit timestamps)
334        true
335    }
336
337    /// Check if transaction is active (uncommitted)
338    pub fn is_active(&self, txn_id: TxnId) -> bool {
339        self.active_txns.contains(&txn_id)
340    }
341}
342
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// The transaction is still running.
    Active,
    /// The transaction has been committed.
    Committed,
    /// The transaction has been aborted.
    Aborted,
}
353
354/// Active transaction tracking
355#[derive(Debug)]
356pub struct ActiveTransaction {
357    /// Transaction ID
358    pub id: TxnId,
359    /// Start timestamp
360    pub start_ts: Timestamp,
361    /// State
362    pub state: TxnState,
363    /// Snapshot for reads
364    pub snapshot: Snapshot,
365    /// Write set (keys modified)
366    write_set: Vec<Vec<u8>>,
367    /// Read set (keys read - for validation)
368    read_set: Vec<Vec<u8>>,
369}
370
371impl ActiveTransaction {
372    /// Create new transaction
373    pub fn new(id: TxnId, active_txns: Vec<TxnId>) -> Self {
374        let start_ts = next_timestamp();
375        Self {
376            id,
377            start_ts,
378            state: TxnState::Active,
379            snapshot: Snapshot::with_active(id, start_ts, active_txns),
380            write_set: Vec::new(),
381            read_set: Vec::new(),
382        }
383    }
384
385    /// Record a read
386    pub fn record_read(&mut self, key: &[u8]) {
387        if !self.read_set.iter().any(|k| k == key) {
388            self.read_set.push(key.to_vec());
389        }
390    }
391
392    /// Record a write
393    pub fn record_write(&mut self, key: &[u8]) {
394        if !self.write_set.iter().any(|k| k == key) {
395            self.write_set.push(key.to_vec());
396        }
397    }
398
399    /// Get write set
400    pub fn write_set(&self) -> &[Vec<u8>] {
401        &self.write_set
402    }
403
404    /// Get read set
405    pub fn read_set(&self) -> &[Vec<u8>] {
406        &self.read_set
407    }
408
409    /// Mark as committed
410    pub fn commit(&mut self) {
411        self.state = TxnState::Committed;
412    }
413
414    /// Mark as aborted
415    pub fn abort(&mut self) {
416        self.state = TxnState::Aborted;
417    }
418
419    /// Check if active
420    pub fn is_active(&self) -> bool {
421        self.state == TxnState::Active
422    }
423}
424
#[cfg(test)]
mod tests {
    use super::*;

    /// A new chain is empty; every insert or update adds one version.
    #[test]
    fn test_version_chain_basic() {
        let mut versions: VersionChain<String> = VersionChain::new();
        assert!(versions.is_empty());

        versions.insert(String::from("v1"), TxnId(1), Timestamp(1));
        assert_eq!(versions.len(), 1);

        versions.update(String::from("v2"), TxnId(2), Timestamp(2));
        assert_eq!(versions.len(), 2);
    }

    /// A snapshot taken at the newest version's timestamp reads that version.
    #[test]
    fn test_version_visibility() {
        let mut versions: VersionChain<String> = VersionChain::new();

        // Written by txn 1, then overwritten by txn 2.
        versions.insert(String::from("v1"), TxnId(1), Timestamp(1));
        versions.update(String::from("v2"), TxnId(2), Timestamp(2));

        // A snapshot at timestamp 1 is meant to read v1. Nothing is asserted
        // for it: the simplified implementation does not track commit status
        // precisely, and a real one would compare commit timestamps.
        let _reader_at_1 = Snapshot::new(TxnId(3), Timestamp(1));

        // A snapshot at timestamp 2 reads v2.
        let reader_at_2 = Snapshot::new(TxnId(3), Timestamp(2));
        assert_eq!(versions.get(&reader_at_2).map(String::as_str), Some("v2"));
    }

    /// After a delete, a snapshot at the delete timestamp reads no value.
    #[test]
    fn test_version_delete() {
        let mut versions: VersionChain<String> = VersionChain::new();

        versions.insert(String::from("v1"), TxnId(1), Timestamp(1));
        versions.delete(TxnId(2), Timestamp(2));

        // The tombstone is what the reader finds, so there is no value.
        let reader = Snapshot::new(TxnId(3), Timestamp(2));
        assert_eq!(versions.get(&reader), None);
    }

    /// GC behind a watermark removes something and shrinks the chain.
    #[test]
    fn test_version_gc() {
        let mut versions: VersionChain<String> = VersionChain::new();

        versions.insert(String::from("v1"), TxnId(1), Timestamp(1));
        versions.update(String::from("v2"), TxnId(2), Timestamp(2));
        versions.update(String::from("v3"), TxnId(3), Timestamp(3));
        versions.update(String::from("v4"), TxnId(4), Timestamp(4));

        assert_eq!(versions.len(), 4);

        // Collect versions older than timestamp 3.
        let reclaimed = versions.gc(Timestamp(3));
        assert_ne!(reclaimed, 0);
        assert!(4 > versions.len());
    }

    /// Without an active set, every transaction id counts as committed.
    #[test]
    fn test_snapshot() {
        let reader = Snapshot::new(TxnId(5), Timestamp(10));

        // The zero id is always committed.
        assert!(reader.is_committed(TxnId::ZERO));

        // An id that is not listed as active counts as committed.
        assert!(reader.is_committed(TxnId(3)));
    }

    /// Ids in the active set are uncommitted; all others are committed.
    #[test]
    fn test_snapshot_with_active() {
        let in_flight = vec![TxnId(3), TxnId(4)];
        let reader = Snapshot::with_active(TxnId(5), Timestamp(10), in_flight);

        // Listed as active: not committed.
        assert!(!reader.is_committed(TxnId(3)));
        assert!(!reader.is_committed(TxnId(4)));

        // Not listed: committed.
        assert!(reader.is_committed(TxnId(1)));
        assert!(reader.is_committed(TxnId(2)));
    }

    /// Read and write sets are tracked, and commit ends the active state.
    #[test]
    fn test_active_transaction() {
        let mut txn = ActiveTransaction::new(TxnId(1), Vec::new());

        assert!(txn.is_active());

        txn.record_read(b"key1");
        txn.record_write(b"key2");

        assert_eq!(txn.read_set().len(), 1);
        assert_eq!(txn.write_set().len(), 1);

        txn.commit();
        assert!(!txn.is_active());
        assert_eq!(txn.state, TxnState::Committed);
    }

    /// Consecutive timestamps are strictly increasing.
    #[test]
    fn test_timestamp_generation() {
        let first = next_timestamp();
        let second = next_timestamp();
        let third = next_timestamp();

        assert!(first < second);
        assert!(second < third);
    }
}