Skip to main content

featherdb_mvcc/
version.rs

1//! Version chain storage for MVCC
2
3use crate::Snapshot;
4use bytes::{Buf, BufMut};
5use featherdb_core::{Error, Result, TransactionId, Value};
6use std::collections::HashMap;
7use std::sync::atomic::{AtomicU64, Ordering};
8
9/// Pointer to a version in the version store
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct VersionPtr(pub u64);
12
13impl VersionPtr {
14    pub const NULL: VersionPtr = VersionPtr(0);
15
16    pub fn is_null(&self) -> bool {
17        self.0 == 0
18    }
19}
20
21/// A versioned value with MVCC metadata
22#[derive(Debug, Clone)]
23pub struct VersionedValue {
24    /// Current (latest) version data
25    pub current: Vec<Value>,
26    /// Transaction that created the current version
27    pub created_by: TransactionId,
28    /// Transaction that deleted this row (if any)
29    pub deleted_by: Option<TransactionId>,
30    /// Pointer to older versions
31    pub version_chain: Option<VersionPtr>,
32}
33
34impl VersionedValue {
35    /// Create a new versioned value
36    pub fn new(data: Vec<Value>, created_by: TransactionId) -> Self {
37        VersionedValue {
38            current: data,
39            created_by,
40            deleted_by: None,
41            version_chain: None,
42        }
43    }
44
45    /// Mark as deleted
46    pub fn mark_deleted(&mut self, deleted_by: TransactionId) {
47        self.deleted_by = Some(deleted_by);
48    }
49
50    /// Check if visible to a snapshot
51    pub fn is_visible(&self, snapshot: &Snapshot) -> bool {
52        snapshot.is_visible(self.created_by, self.deleted_by)
53    }
54
55    /// Serialize to bytes
56    pub fn serialize(&self) -> Vec<u8> {
57        let mut buf = Vec::new();
58
59        // created_by
60        buf.put_u64_le(self.created_by.0);
61
62        // deleted_by
63        match self.deleted_by {
64            Some(txn) => {
65                buf.put_u8(1);
66                buf.put_u64_le(txn.0);
67            }
68            None => {
69                buf.put_u8(0);
70            }
71        }
72
73        // version_chain
74        match self.version_chain {
75            Some(ptr) => {
76                buf.put_u8(1);
77                buf.put_u64_le(ptr.0);
78            }
79            None => {
80                buf.put_u8(0);
81            }
82        }
83
84        // Number of values
85        buf.put_u32_le(self.current.len() as u32);
86
87        // Values
88        for value in &self.current {
89            value.serialize(&mut buf);
90        }
91
92        buf
93    }
94
95    /// Deserialize from bytes
96    pub fn deserialize(data: &[u8]) -> Result<Self> {
97        let mut cursor = data;
98
99        if cursor.remaining() < 9 {
100            return Err(Error::Serialization("VersionedValue too small".into()));
101        }
102
103        let created_by = TransactionId(cursor.get_u64_le());
104
105        let deleted_by = if cursor.get_u8() == 1 {
106            Some(TransactionId(cursor.get_u64_le()))
107        } else {
108            None
109        };
110
111        let version_chain = if cursor.get_u8() == 1 {
112            Some(VersionPtr(cursor.get_u64_le()))
113        } else {
114            None
115        };
116
117        let value_count = cursor.get_u32_le() as usize;
118        let mut current = Vec::with_capacity(value_count);
119
120        for _ in 0..value_count {
121            current.push(Value::deserialize(&mut cursor)?);
122        }
123
124        Ok(VersionedValue {
125            current,
126            created_by,
127            deleted_by,
128            version_chain,
129        })
130    }
131}
132
133/// An old version in the version chain
134#[derive(Debug, Clone)]
135pub struct OldVersion {
136    /// Row data
137    pub data: Vec<Value>,
138    /// Transaction that created this version
139    pub created_by: TransactionId,
140    /// Transaction that deleted this version (replaced it)
141    pub deleted_by: Option<TransactionId>,
142    /// Pointer to even older version
143    pub prev_version: Option<VersionPtr>,
144}
145
146impl OldVersion {
147    /// Serialize to bytes
148    pub fn serialize(&self) -> Vec<u8> {
149        let mut buf = Vec::new();
150
151        buf.put_u64_le(self.created_by.0);
152
153        match self.deleted_by {
154            Some(txn) => {
155                buf.put_u8(1);
156                buf.put_u64_le(txn.0);
157            }
158            None => {
159                buf.put_u8(0);
160            }
161        }
162
163        match self.prev_version {
164            Some(ptr) => {
165                buf.put_u8(1);
166                buf.put_u64_le(ptr.0);
167            }
168            None => {
169                buf.put_u8(0);
170            }
171        }
172
173        buf.put_u32_le(self.data.len() as u32);
174        for value in &self.data {
175            value.serialize(&mut buf);
176        }
177
178        buf
179    }
180
181    /// Deserialize from bytes
182    pub fn deserialize(data: &[u8]) -> Result<Self> {
183        let mut cursor = data;
184
185        let created_by = TransactionId(cursor.get_u64_le());
186
187        let deleted_by = if cursor.get_u8() == 1 {
188            Some(TransactionId(cursor.get_u64_le()))
189        } else {
190            None
191        };
192
193        let prev_version = if cursor.get_u8() == 1 {
194            Some(VersionPtr(cursor.get_u64_le()))
195        } else {
196            None
197        };
198
199        let value_count = cursor.get_u32_le() as usize;
200        let mut data_values = Vec::with_capacity(value_count);
201
202        for _ in 0..value_count {
203            data_values.push(Value::deserialize(&mut cursor)?);
204        }
205
206        Ok(OldVersion {
207            data: data_values,
208            created_by,
209            deleted_by,
210            prev_version,
211        })
212    }
213}
214
215/// In-memory version store for old versions
216pub struct VersionStore {
217    /// Old versions, keyed by VersionPtr
218    versions: HashMap<VersionPtr, OldVersion>,
219    /// Next version pointer to allocate
220    next_ptr: AtomicU64,
221}
222
223impl VersionStore {
224    /// Create a new version store
225    pub fn new() -> Self {
226        VersionStore {
227            versions: HashMap::new(),
228            next_ptr: AtomicU64::new(1), // 0 is NULL
229        }
230    }
231
232    /// Insert an old version
233    pub fn insert(&mut self, version: OldVersion) -> VersionPtr {
234        let ptr = VersionPtr(self.next_ptr.fetch_add(1, Ordering::SeqCst));
235        self.versions.insert(ptr, version);
236        ptr
237    }
238
239    /// Get an old version
240    pub fn get(&self, ptr: VersionPtr) -> Option<&OldVersion> {
241        self.versions.get(&ptr)
242    }
243
244    /// Remove an old version (during GC)
245    pub fn remove(&mut self, ptr: VersionPtr) -> Option<OldVersion> {
246        self.versions.remove(&ptr)
247    }
248
249    /// Get the number of versions stored
250    pub fn len(&self) -> usize {
251        self.versions.len()
252    }
253
254    /// Check if empty
255    pub fn is_empty(&self) -> bool {
256        self.versions.is_empty()
257    }
258}
259
260impl Default for VersionStore {
261    fn default() -> Self {
262        Self::new()
263    }
264}
265
/// Outcome of one garbage-collection pass over a `VersionStore`.
#[derive(Debug, Clone, Default)]
pub struct GcStats {
    /// How many old versions the pass deleted.
    pub versions_removed: usize,
    /// Rough count of bytes released, estimated from the removed row values only.
    pub bytes_freed: usize,
}
274
275impl VersionStore {
276    /// Garbage collect old versions that no active transaction can see.
277    ///
278    /// A version is garbage if:
279    /// 1. It was deleted by a transaction older than the oldest active transaction
280    ///    (meaning all active transactions can see the deletion)
281    /// 2. There's no active transaction that could possibly need this version
282    ///
283    /// # Arguments
284    /// * `oldest_active_txn` - The ID of the oldest active transaction, or None if no active transactions
285    ///
286    /// # Returns
287    /// Statistics about what was collected
288    pub fn gc(&mut self, oldest_active_txn: Option<featherdb_core::TransactionId>) -> GcStats {
289        let mut stats = GcStats::default();
290
291        // If no active transactions, everything can be garbage collected
292        // except versions that have never been deleted (they're still current)
293        let oldest = match oldest_active_txn {
294            Some(txn) => txn.0,
295            None => u64::MAX, // All versions with deleted_by can be collected
296        };
297
298        // Collect pointers to remove (can't modify while iterating)
299        let to_remove: Vec<VersionPtr> = self
300            .versions
301            .iter()
302            .filter_map(|(ptr, version)| {
303                // Version is garbage if it was deleted by a transaction
304                // that's older than all active transactions
305                // (meaning everyone can see the deletion)
306                if let Some(deleted_by) = version.deleted_by {
307                    if deleted_by.0 < oldest {
308                        // All active transactions can see this deletion,
309                        // so this old version is no longer needed
310                        return Some(*ptr);
311                    }
312                }
313                None
314            })
315            .collect();
316
317        // Remove garbage versions
318        for ptr in to_remove {
319            if let Some(version) = self.versions.remove(&ptr) {
320                stats.versions_removed += 1;
321                // Estimate bytes: rough size of the value data
322                stats.bytes_freed += version
323                    .data
324                    .iter()
325                    .map(|v| v.size_estimate())
326                    .sum::<usize>();
327            }
328        }
329
330        stats
331    }
332
333    /// Get all version pointers (for iteration/inspection)
334    pub fn pointers(&self) -> impl Iterator<Item = &VersionPtr> {
335        self.versions.keys()
336    }
337}
338
339/// Version chain helper for navigating versions
340pub struct VersionChain<'a> {
341    version_store: &'a VersionStore,
342    current: Option<VersionPtr>,
343}
344
345impl<'a> VersionChain<'a> {
346    /// Create a new version chain starting from a pointer
347    pub fn new(version_store: &'a VersionStore, start: Option<VersionPtr>) -> Self {
348        VersionChain {
349            version_store,
350            current: start,
351        }
352    }
353
354    /// Find a version visible to the given snapshot
355    pub fn find_visible(&mut self, snapshot: &Snapshot) -> Option<&'a OldVersion> {
356        while let Some(ptr) = self.current {
357            if let Some(version) = self.version_store.get(ptr) {
358                if snapshot.is_visible(version.created_by, version.deleted_by) {
359                    return Some(version);
360                }
361                self.current = version.prev_version;
362            } else {
363                break;
364            }
365        }
366        None
367    }
368}
369
370impl<'a> Iterator for VersionChain<'a> {
371    type Item = &'a OldVersion;
372
373    fn next(&mut self) -> Option<Self::Item> {
374        let ptr = self.current?;
375        let version = self.version_store.get(ptr)?;
376        self.current = version.prev_version;
377        Some(version)
378    }
379}
380
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_versioned_value_serialization() {
        let value = VersionedValue {
            current: vec![Value::Integer(42), Value::Text("hello".into())],
            created_by: TransactionId(10),
            deleted_by: Some(TransactionId(20)),
            version_chain: Some(VersionPtr(5)),
        };

        let serialized = value.serialize();
        let deserialized = VersionedValue::deserialize(&serialized).unwrap();

        assert_eq!(value.created_by, deserialized.created_by);
        assert_eq!(value.deleted_by, deserialized.deleted_by);
        assert_eq!(value.version_chain, deserialized.version_chain);
        // Compare the actual column values, not just how many there are.
        assert_eq!(value.current, deserialized.current);
    }

    #[test]
    fn test_versioned_value_serialization_without_optional_fields() {
        let value = VersionedValue::new(vec![Value::Integer(7)], TransactionId(3));

        let deserialized = VersionedValue::deserialize(&value.serialize()).unwrap();

        assert_eq!(deserialized.created_by, TransactionId(3));
        assert_eq!(deserialized.deleted_by, None);
        assert_eq!(deserialized.version_chain, None);
        assert_eq!(deserialized.current, vec![Value::Integer(7)]);
    }

    #[test]
    fn test_mark_deleted_sets_deleter() {
        let mut value = VersionedValue::new(vec![], TransactionId(1));
        assert_eq!(value.deleted_by, None);

        value.mark_deleted(TransactionId(9));
        assert_eq!(value.deleted_by, Some(TransactionId(9)));
    }

    #[test]
    fn test_old_version_serialization() {
        let version = OldVersion {
            data: vec![Value::Integer(42), Value::Text("old".into())],
            created_by: TransactionId(4),
            deleted_by: Some(TransactionId(8)),
            prev_version: Some(VersionPtr(2)),
        };

        let decoded = OldVersion::deserialize(&version.serialize()).unwrap();

        assert_eq!(decoded.created_by, version.created_by);
        assert_eq!(decoded.deleted_by, version.deleted_by);
        assert_eq!(decoded.prev_version, version.prev_version);
        assert_eq!(decoded.data, version.data);
    }

    #[test]
    fn test_version_store() {
        let mut store = VersionStore::new();

        let v1 = OldVersion {
            data: vec![Value::Integer(1)],
            created_by: TransactionId(1),
            deleted_by: None,
            prev_version: None,
        };

        let ptr1 = store.insert(v1);
        assert!(!ptr1.is_null());
        assert!(store.get(ptr1).is_some());
    }

    #[test]
    fn test_version_store_allocates_distinct_pointers_and_removes() {
        let mut store = VersionStore::new();
        assert!(store.is_empty());

        let make = |n| OldVersion {
            data: vec![Value::Integer(n)],
            created_by: TransactionId(1),
            deleted_by: None,
            prev_version: None,
        };
        let ptr1 = store.insert(make(1));
        let ptr2 = store.insert(make(2));
        assert_ne!(ptr1, ptr2);
        assert_eq!(store.len(), 2);
        assert_eq!(store.pointers().count(), 2);

        assert!(store.remove(ptr1).is_some());
        assert!(store.remove(ptr1).is_none());
        assert!(store.get(ptr1).is_none());
        assert_eq!(store.len(), 1);
    }

    #[test]
    fn test_version_chain() {
        let mut store = VersionStore::new();

        // Create a chain of versions
        let v1 = OldVersion {
            data: vec![Value::Integer(1)],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(2)),
            prev_version: None,
        };
        let ptr1 = store.insert(v1);

        let v2 = OldVersion {
            data: vec![Value::Integer(2)],
            created_by: TransactionId(2),
            deleted_by: Some(TransactionId(3)),
            prev_version: Some(ptr1),
        };
        let ptr2 = store.insert(v2);

        // Iterate through chain
        let chain: Vec<_> = VersionChain::new(&store, Some(ptr2)).collect();
        assert_eq!(chain.len(), 2);
        assert_eq!(chain[0].data[0], Value::Integer(2));
        assert_eq!(chain[1].data[0], Value::Integer(1));
    }

    #[test]
    fn test_version_chain_stops_at_missing_pointer() {
        let store = VersionStore::new();

        let mut chain = VersionChain::new(&store, Some(VersionPtr(999)));
        assert!(chain.next().is_none());
        assert_eq!(VersionChain::new(&store, None).count(), 0);
    }

    #[test]
    fn test_gc_removes_old_deleted_versions() {
        let mut store = VersionStore::new();

        // Insert version deleted by transaction 5
        let v1 = OldVersion {
            data: vec![Value::Integer(100)],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(5)),
            prev_version: None,
        };
        store.insert(v1);

        // Insert version deleted by transaction 10
        let v2 = OldVersion {
            data: vec![Value::Integer(200)],
            created_by: TransactionId(2),
            deleted_by: Some(TransactionId(10)),
            prev_version: None,
        };
        store.insert(v2);

        // Insert version NOT deleted (still current)
        let v3 = OldVersion {
            data: vec![Value::Integer(300)],
            created_by: TransactionId(3),
            deleted_by: None,
            prev_version: None,
        };
        store.insert(v3);

        assert_eq!(store.len(), 3);

        // GC with oldest active = 7
        // Should remove v1 (deleted by 5 < 7)
        // Should keep v2 (deleted by 10 >= 7)
        // Should keep v3 (not deleted)
        let stats = store.gc(Some(TransactionId(7)));

        assert_eq!(stats.versions_removed, 1);
        assert_eq!(store.len(), 2);
    }

    #[test]
    fn test_gc_keeps_version_deleted_by_oldest_active() {
        let mut store = VersionStore::new();

        // The deleter *is* the oldest active transaction, so its deletion is
        // not yet final and the old version must survive.
        let ptr = store.insert(OldVersion {
            data: vec![Value::Integer(1)],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(7)),
            prev_version: None,
        });

        let stats = store.gc(Some(TransactionId(7)));

        assert_eq!(stats.versions_removed, 0);
        assert_eq!(stats.bytes_freed, 0);
        assert!(store.get(ptr).is_some());
    }

    #[test]
    fn test_gc_with_no_active_transactions() {
        let mut store = VersionStore::new();

        // Insert deleted versions
        let v1 = OldVersion {
            data: vec![Value::Integer(100)],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(5)),
            prev_version: None,
        };
        store.insert(v1);

        let v2 = OldVersion {
            data: vec![Value::Integer(200)],
            created_by: TransactionId(2),
            deleted_by: Some(TransactionId(10)),
            prev_version: None,
        };
        store.insert(v2);

        // Version not deleted
        let v3 = OldVersion {
            data: vec![Value::Integer(300)],
            created_by: TransactionId(3),
            deleted_by: None,
            prev_version: None,
        };
        store.insert(v3);

        assert_eq!(store.len(), 3);

        // GC with no active transactions (oldest = None)
        // Should remove all deleted versions
        let stats = store.gc(None);

        assert_eq!(stats.versions_removed, 2);
        assert_eq!(store.len(), 1); // Only v3 remains
    }

    #[test]
    fn test_gc_bytes_freed() {
        let mut store = VersionStore::new();

        // Insert version with substantial data
        let v1 = OldVersion {
            data: vec![Value::Text("hello world".to_string())],
            created_by: TransactionId(1),
            deleted_by: Some(TransactionId(5)),
            prev_version: None,
        };
        store.insert(v1);

        let stats = store.gc(Some(TransactionId(10)));

        assert_eq!(stats.versions_removed, 1);
        assert!(stats.bytes_freed > 0);
    }
}