sochdb_storage/
mvcc_new.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! MVCC Version Management for LSCS
16//!
17//! Implements Multi-Version Concurrency Control for the Log-Structured Column Store.
18//! Provides lock-free reads during compaction by maintaining version snapshots.
19//!
20//! ## Architecture
21//!
22//! ```text
23//! VersionSet
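//! ├── current: RwLock<Arc<Snapshot>>               (latest version for new reads)
//! ├── active_snapshots: Mutex<Vec<Weak<Snapshot>>> (replaced snapshots that may still have readers)
//! ├── next_version: AtomicU64                      (version counter used by create_snapshot)
//! └── stats: VersionSetStats                       (install/acquire/cleanup counters)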
27//!
28//! Read Path:
29//!   snapshot = version_set.acquire();  // Arc clone + ref count
30//!   // ... perform reads using snapshot ...
31//!   // Drop snapshot -> decrements ref count
32//!
33//! Write Path:
34//!   new_snapshot = create_new_snapshot(...);
35//!   version_set.install(new_snapshot);  // Atomic swap
36//!   // Old snapshot cleaned when all readers finish
37//! ```
38
39use parking_lot::{Mutex, RwLock};
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::sync::{Arc, Weak};
42
/// A read-only, point-in-time view of the storage state.
///
/// Readers obtain one via `VersionSet::acquire()`. The snapshot is shared
/// behind an `Arc`, so it remains valid for as long as any holder keeps it,
/// even if newer snapshots are installed in the meantime.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Snapshot/version number (monotonically increasing)
    pub version: u64,
    /// Wall-clock creation time, in microseconds since the Unix epoch
    pub timestamp_us: u64,
    /// Column groups visible in this snapshot; the outer index is the level
    pub column_groups: Vec<Vec<ColumnGroupRef>>,
    /// Minimum visible transaction ID (inclusive, for MVCC reads)
    pub min_visible_txn: u64,
    /// Maximum visible transaction ID (inclusive)
    pub max_visible_txn: u64,
}

/// Reference to a column group file
#[derive(Debug, Clone)]
pub struct ColumnGroupRef {
    /// Unique identifier
    pub id: u64,
    /// Path to column group directory
    pub path: String,
    /// Level in LSM tree
    pub level: u32,
    /// Sequence number
    pub sequence: u64,
    /// Row count
    pub row_count: u64,
    /// Minimum timestamp in this group
    pub min_timestamp: u64,
    /// Maximum timestamp in this group
    pub max_timestamp: u64,
}

impl Snapshot {
    /// Create a snapshot with no column groups in which every transaction ID
    /// (`0..=u64::MAX`) is visible.
    pub fn empty(version: u64) -> Self {
        Self::new(version, Vec::new(), 0, u64::MAX)
    }

    /// Create a snapshot over `column_groups` (indexed by level) whose visible
    /// transaction IDs are `min_visible_txn..=max_visible_txn`.
    ///
    /// `timestamp_us` is set to the current wall-clock time.
    pub fn new(
        version: u64,
        column_groups: Vec<Vec<ColumnGroupRef>>,
        min_visible_txn: u64,
        max_visible_txn: u64,
    ) -> Self {
        Self {
            version,
            timestamp_us: Self::now_us(),
            column_groups,
            min_visible_txn,
            max_visible_txn,
        }
    }

    /// Current wall-clock time in microseconds since the Unix epoch.
    fn now_us() -> u64 {
        Self::micros_since_epoch(std::time::SystemTime::now())
    }

    /// Convert `at` to microseconds since the Unix epoch without panicking.
    ///
    /// A clock set before the epoch makes `duration_since` return an error;
    /// creating a snapshot must not panic because of that, so such times map
    /// to 0. Values that do not fit in a `u64` saturate at `u64::MAX` instead
    /// of being silently truncated.
    fn micros_since_epoch(at: std::time::SystemTime) -> u64 {
        at.duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_micros().min(u128::from(u64::MAX)) as u64)
            .unwrap_or(0)
    }

    /// Get the column groups at `level`, or an empty slice if the snapshot has
    /// no such level.
    pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
        match self.column_groups.get(level) {
            Some(groups) => groups.as_slice(),
            None => &[],
        }
    }

    /// Get the total number of column groups across all levels
    pub fn total_groups(&self) -> usize {
        self.column_groups.iter().map(Vec::len).sum()
    }

    /// Check if a transaction is visible in this snapshot, i.e. whether
    /// `txn_id` lies in `min_visible_txn..=max_visible_txn`.
    pub fn is_visible(&self, txn_id: u64) -> bool {
        (self.min_visible_txn..=self.max_visible_txn).contains(&txn_id)
    }
}
133
/// RAII guard for a snapshot.
///
/// Owns a strong `Arc` reference to the `Snapshot`, so the snapshot remains
/// valid for the lifetime of this guard; the reference is released when the
/// guard is dropped.
#[derive(Debug)]
pub struct SnapshotGuard {
    /// Strong reference that keeps the guarded snapshot alive
    snapshot: Arc<Snapshot>,
}
142
143impl SnapshotGuard {
144    /// Get the version number
145    pub fn version(&self) -> u64 {
146        self.snapshot.version
147    }
148
149    /// Access the underlying Snapshot
150    pub fn snapshot(&self) -> &Snapshot {
151        &self.snapshot
152    }
153
154    /// Get column groups at a level
155    pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
156        self.snapshot.level_groups(level)
157    }
158
159    /// Check if a transaction is visible
160    pub fn is_visible(&self, txn_id: u64) -> bool {
161        self.snapshot.is_visible(txn_id)
162    }
163}
164
165impl std::ops::Deref for SnapshotGuard {
166    type Target = Snapshot;
167
168    fn deref(&self) -> &Self::Target {
169        &self.snapshot
170    }
171}
172
/// Manages multiple versions/snapshots of storage state.
///
/// Readers clone the current `Arc<Snapshot>` (under a brief read lock) and
/// keep using it without holding any lock, so installing a newer snapshot,
/// e.g. after compaction, does not invalidate reads already in progress.
pub struct VersionSet {
    /// Current snapshot (latest) - `acquire()` hands out clones of this `Arc`
    current: RwLock<Arc<Snapshot>>,
    /// Weak references to previously current snapshots that may still have
    /// readers; an entry with a strong count of zero has no readers left
    active_snapshots: Mutex<Vec<Weak<Snapshot>>>,
    /// Next version number handed out by `create_snapshot()`
    next_version: AtomicU64,
    /// Operation counters, exposed through `stats()`
    stats: VersionSetStats,
}
187
188impl VersionSet {
189    /// Create a new version set with an empty initial snapshot
190    pub fn new() -> Self {
191        let initial = Arc::new(Snapshot::empty(0));
192        Self {
193            current: RwLock::new(initial),
194            active_snapshots: Mutex::new(Vec::new()),
195            next_version: AtomicU64::new(1),
196            stats: VersionSetStats::default(),
197        }
198    }
199
200    /// Acquire a snapshot for reading
201    ///
202    /// Returns a guard that keeps the snapshot alive until dropped.
203    pub fn acquire(&self) -> SnapshotGuard {
204        self.stats.acquires.fetch_add(1, Ordering::Relaxed);
205        let current = self.current.read();
206        SnapshotGuard {
207            snapshot: Arc::clone(&current),
208        }
209    }
210
211    /// Install a new snapshot
212    ///
213    /// The old snapshot remains valid for existing readers.
214    pub fn install(&self, snapshot: Snapshot) {
215        self.stats.installs.fetch_add(1, Ordering::Relaxed);
216
217        let new_snapshot = Arc::new(snapshot);
218
219        // Track the old snapshot
220        let old = {
221            let mut current = self.current.write();
222            let old = Arc::clone(&current);
223            *current = new_snapshot;
224            old
225        };
226
227        // Add old snapshot to active list for cleanup tracking
228        let mut active = self.active_snapshots.lock();
229        active.push(Arc::downgrade(&old));
230
231        // Update peak versions
232        let current_count = active.len() as u64;
233        self.stats
234            .peak_versions
235            .fetch_max(current_count, Ordering::Relaxed);
236    }
237
238    /// Create and install a new snapshot with column groups
239    pub fn create_snapshot(&self, column_groups: Vec<Vec<ColumnGroupRef>>) -> u64 {
240        let version = self.next_version.fetch_add(1, Ordering::SeqCst);
241        let snapshot = Snapshot::new(version, column_groups, 0, u64::MAX);
242        self.install(snapshot);
243        version
244    }
245
246    /// Clean up old snapshots that have no readers
247    pub fn cleanup(&self) {
248        let mut active = self.active_snapshots.lock();
249        let before_len = active.len();
250        active.retain(|weak| weak.strong_count() > 0);
251        let cleaned = before_len - active.len();
252        self.stats
253            .cleanups
254            .fetch_add(cleaned as u64, Ordering::Relaxed);
255    }
256
257    /// Get current version number
258    pub fn current_version(&self) -> u64 {
259        self.current.read().version
260    }
261
262    /// Get number of active snapshots (includes current)
263    pub fn active_count(&self) -> usize {
264        let active = self.active_snapshots.lock();
265        active.iter().filter(|w| w.strong_count() > 0).count() + 1
266    }
267
268    /// Get statistics snapshot
269    pub fn stats(&self) -> VersionSetStatsSnapshot {
270        VersionSetStatsSnapshot {
271            installs: self.stats.installs.load(Ordering::Relaxed),
272            acquires: self.stats.acquires.load(Ordering::Relaxed),
273            cleanups: self.stats.cleanups.load(Ordering::Relaxed),
274            peak_versions: self.stats.peak_versions.load(Ordering::Relaxed),
275            active_versions: self.active_count(),
276        }
277    }
278}
279
280impl Default for VersionSet {
281    fn default() -> Self {
282        Self::new()
283    }
284}
285
/// Statistics for the version set
///
/// Every counter is an atomic so it can be updated through a shared
/// reference to the owning `VersionSet`.
#[derive(Debug, Default)]
pub struct VersionSetStats {
    /// Number of snapshot installs
    pub installs: AtomicU64,
    /// Number of snapshot acquires
    pub acquires: AtomicU64,
    /// Number of old snapshots cleaned up
    pub cleanups: AtomicU64,
    /// Largest number of tracked old snapshots observed at install time
    pub peak_versions: AtomicU64,
}
298
/// Snapshot of version set statistics
///
/// Plain-value copy of the `VersionSetStats` counters, as returned by
/// `VersionSet::stats()`.
#[derive(Debug, Clone, Default)]
pub struct VersionSetStatsSnapshot {
    /// Number of snapshot installs
    pub installs: u64,
    /// Number of snapshot acquires
    pub acquires: u64,
    /// Number of old snapshots cleaned up
    pub cleanups: u64,
    /// Largest number of tracked old snapshots observed at install time
    pub peak_versions: u64,
    /// Snapshots still alive when the stats were read, including the current one
    pub active_versions: usize,
}
308
309// Re-export for backwards compatibility with lib.rs
310pub use Snapshot as ReadVersion;
311pub use SnapshotGuard as VersionGuard;
312
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh version set starts at version 0 with only the current snapshot.
    #[test]
    fn test_empty_version_set() {
        let vs = VersionSet::new();
        assert_eq!(vs.current_version(), 0);
        assert_eq!(vs.active_count(), 1);
    }

    /// Acquiring from a fresh set yields the initial version.
    #[test]
    fn test_acquire_snapshot() {
        let vs = VersionSet::new();
        let guard = vs.acquire();
        assert_eq!(guard.version(), 0);
    }

    /// An installed snapshot becomes current and its visibility bounds apply.
    #[test]
    fn test_install_snapshot() {
        let vs = VersionSet::new();

        vs.install(Snapshot::new(1, vec![], 0, 100));
        assert_eq!(vs.current_version(), 1);

        let guard = vs.acquire();
        assert_eq!(guard.version(), 1);
        assert!(guard.is_visible(50));
        assert!(!guard.is_visible(101));
    }

    /// A guard acquired before an install keeps pointing at the old version.
    #[test]
    fn test_old_snapshot_stays_valid() {
        let vs = VersionSet::new();

        // Acquire before install
        let old_guard = vs.acquire();
        assert_eq!(old_guard.version(), 0);

        // Install new version
        vs.install(Snapshot::new(1, vec![], 0, u64::MAX));

        // Old guard still valid
        assert_eq!(old_guard.version(), 0);

        // New acquire gets new version
        let new_guard = vs.acquire();
        assert_eq!(new_guard.version(), 1);
    }

    /// With no guards held, cleanup leaves only the current snapshot.
    #[test]
    fn test_cleanup() {
        let vs = VersionSet::new();

        // Install several versions
        for i in 1..=5 {
            vs.install(Snapshot::new(i, vec![], 0, u64::MAX));
        }

        // Without any guards held, cleanup should remove all old versions
        vs.cleanup();

        // Only current should remain
        assert_eq!(vs.active_count(), 1);
    }

    /// A snapshot that still has a reader survives cleanup until the reader
    /// is dropped.
    #[test]
    fn test_cleanup_with_active_readers() {
        let vs = VersionSet::new();

        // Acquire v0
        let guard = vs.acquire();

        // Install v1
        vs.install(Snapshot::new(1, vec![], 0, u64::MAX));

        // Install v2
        vs.install(Snapshot::new(2, vec![], 0, u64::MAX));

        // v0 is still held by guard, so cleanup won't remove it
        vs.cleanup();

        // v0 is held by the guard, v1 has no readers and is gone, v2 is
        // current: exactly two snapshots are alive.
        assert_eq!(vs.active_count(), 2);

        // Drop guard
        drop(guard);
        vs.cleanup();

        // Now only current should remain
        assert_eq!(vs.active_count(), 1);
    }

    /// Acquire and install calls are counted.
    #[test]
    fn test_stats() {
        let vs = VersionSet::new();

        let _g1 = vs.acquire();
        let _g2 = vs.acquire();

        vs.install(Snapshot::new(1, vec![], 0, u64::MAX));

        let stats = vs.stats();
        assert_eq!(stats.acquires, 2);
        assert_eq!(stats.installs, 1);
    }
}