Skip to main content

sochdb_storage/
mvcc_new.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! MVCC Version Management for LSCS
//!
//! Implements Multi-Version Concurrency Control for the Log-Structured Column Store.
//! Readers clone an `Arc` to the current snapshot and then read without holding any
//! lock, so reads can continue while compaction installs newer versions.
//!
//! ## Architecture
//!
//! ```text
//! VersionSet
//! ├── current: RwLock<Arc<Snapshot>>                (latest version for new reads)
//! ├── active_snapshots: Mutex<Vec<Weak<Snapshot>>>  (retired snapshots that may still be in use)
//! ├── next_version: AtomicU64                       (version counter used by create_snapshot)
//! └── stats: VersionSetStats                        (install / acquire / cleanup counters)
//!
//! Read Path:
//!   guard = version_set.acquire();  // brief read lock + Arc clone
//!   // ... perform reads using guard ...
//!   // Drop guard -> decrements ref count
//!
//! Write Path:
//!   new_snapshot = Snapshot::new(...);
//!   version_set.install(new_snapshot);  // swap under a short write lock
//!   // Old snapshots are dropped from tracking by cleanup() once all readers finish
//! ```
41
42use parking_lot::{Mutex, RwLock};
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::{Arc, Weak};
45
46/// A read-only snapshot of the storage state.
47///
48/// Readers acquire this via `VersionSet::acquire()`. The snapshot remains
49/// valid for the lifetime of the Arc, even if newer snapshots are installed.
50#[derive(Debug, Clone)]
51pub struct Snapshot {
52    /// Snapshot/version number (monotonically increasing)
53    pub version: u64,
54    /// Timestamp when this snapshot was created
55    pub timestamp_us: u64,
56    /// Column groups visible in this snapshot (by level)
57    pub column_groups: Vec<Vec<ColumnGroupRef>>,
58    /// Minimum visible transaction ID (for MVCC reads)
59    pub min_visible_txn: u64,
60    /// Maximum visible transaction ID
61    pub max_visible_txn: u64,
62}
63
/// Descriptor pointing at one on-disk column group.
#[derive(Clone, Debug)]
pub struct ColumnGroupRef {
    /// Unique identifier of the column group
    pub id: u64,
    /// Path to the directory holding the column group
    pub path: String,
    /// LSM tree level the group belongs to
    pub level: u32,
    /// Sequence number of the group
    pub sequence: u64,
    /// Number of rows stored in the group
    pub row_count: u64,
    /// Smallest timestamp contained in the group
    pub min_timestamp: u64,
    /// Largest timestamp contained in the group
    pub max_timestamp: u64,
}
82
83impl Snapshot {
84    /// Create a new empty snapshot
85    pub fn empty(version: u64) -> Self {
86        Self {
87            version,
88            timestamp_us: Self::now_us(),
89            column_groups: Vec::new(),
90            min_visible_txn: 0,
91            max_visible_txn: u64::MAX,
92        }
93    }
94
95    /// Create a snapshot with column groups
96    pub fn new(
97        version: u64,
98        column_groups: Vec<Vec<ColumnGroupRef>>,
99        min_visible_txn: u64,
100        max_visible_txn: u64,
101    ) -> Self {
102        Self {
103            version,
104            timestamp_us: Self::now_us(),
105            column_groups,
106            min_visible_txn,
107            max_visible_txn,
108        }
109    }
110
111    fn now_us() -> u64 {
112        std::time::SystemTime::now()
113            .duration_since(std::time::UNIX_EPOCH)
114            .unwrap()
115            .as_micros() as u64
116    }
117
118    /// Get column groups at a specific level
119    pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
120        self.column_groups
121            .get(level)
122            .map(|v| v.as_slice())
123            .unwrap_or(&[])
124    }
125
126    /// Get total number of column groups
127    pub fn total_groups(&self) -> usize {
128        self.column_groups.iter().map(|l| l.len()).sum()
129    }
130
131    /// Check if a transaction is visible in this snapshot
132    pub fn is_visible(&self, txn_id: u64) -> bool {
133        txn_id >= self.min_visible_txn && txn_id <= self.max_visible_txn
134    }
135}
136
137/// RAII guard for a snapshot.
138///
139/// Holds an Arc to the Snapshot, ensuring it remains valid
140/// for the lifetime of this guard.
141#[derive(Debug)]
142pub struct SnapshotGuard {
143    snapshot: Arc<Snapshot>,
144}
145
146impl SnapshotGuard {
147    /// Get the version number
148    pub fn version(&self) -> u64 {
149        self.snapshot.version
150    }
151
152    /// Access the underlying Snapshot
153    pub fn snapshot(&self) -> &Snapshot {
154        &self.snapshot
155    }
156
157    /// Get column groups at a level
158    pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
159        self.snapshot.level_groups(level)
160    }
161
162    /// Check if a transaction is visible
163    pub fn is_visible(&self, txn_id: u64) -> bool {
164        self.snapshot.is_visible(txn_id)
165    }
166}
167
168impl std::ops::Deref for SnapshotGuard {
169    type Target = Snapshot;
170
171    fn deref(&self) -> &Self::Target {
172        &self.snapshot
173    }
174}
175
176/// Manages multiple versions/snapshots of storage state.
177///
178/// Provides lock-free reads by allowing readers to hold references to
179/// older snapshots while compaction creates new versions.
180pub struct VersionSet {
181    /// Current snapshot (latest) - readers acquire this
182    current: RwLock<Arc<Snapshot>>,
183    /// Active snapshots that may still have readers
184    active_snapshots: Mutex<Vec<Weak<Snapshot>>>,
185    /// Version number counter
186    next_version: AtomicU64,
187    /// Statistics
188    stats: VersionSetStats,
189}
190
191impl VersionSet {
192    /// Create a new version set with an empty initial snapshot
193    pub fn new() -> Self {
194        let initial = Arc::new(Snapshot::empty(0));
195        Self {
196            current: RwLock::new(initial),
197            active_snapshots: Mutex::new(Vec::new()),
198            next_version: AtomicU64::new(1),
199            stats: VersionSetStats::default(),
200        }
201    }
202
203    /// Acquire a snapshot for reading
204    ///
205    /// Returns a guard that keeps the snapshot alive until dropped.
206    pub fn acquire(&self) -> SnapshotGuard {
207        self.stats.acquires.fetch_add(1, Ordering::Relaxed);
208        let current = self.current.read();
209        SnapshotGuard {
210            snapshot: Arc::clone(&current),
211        }
212    }
213
214    /// Install a new snapshot
215    ///
216    /// The old snapshot remains valid for existing readers.
217    pub fn install(&self, snapshot: Snapshot) {
218        self.stats.installs.fetch_add(1, Ordering::Relaxed);
219
220        let new_snapshot = Arc::new(snapshot);
221
222        // Track the old snapshot
223        let old = {
224            let mut current = self.current.write();
225            let old = Arc::clone(&current);
226            *current = new_snapshot;
227            old
228        };
229
230        // Add old snapshot to active list for cleanup tracking
231        let mut active = self.active_snapshots.lock();
232        active.push(Arc::downgrade(&old));
233
234        // Update peak versions
235        let current_count = active.len() as u64;
236        self.stats
237            .peak_versions
238            .fetch_max(current_count, Ordering::Relaxed);
239    }
240
241    /// Create and install a new snapshot with column groups
242    pub fn create_snapshot(&self, column_groups: Vec<Vec<ColumnGroupRef>>) -> u64 {
243        let version = self.next_version.fetch_add(1, Ordering::SeqCst);
244        let snapshot = Snapshot::new(version, column_groups, 0, u64::MAX);
245        self.install(snapshot);
246        version
247    }
248
249    /// Clean up old snapshots that have no readers
250    pub fn cleanup(&self) {
251        let mut active = self.active_snapshots.lock();
252        let before_len = active.len();
253        active.retain(|weak| weak.strong_count() > 0);
254        let cleaned = before_len - active.len();
255        self.stats
256            .cleanups
257            .fetch_add(cleaned as u64, Ordering::Relaxed);
258    }
259
260    /// Get current version number
261    pub fn current_version(&self) -> u64 {
262        self.current.read().version
263    }
264
265    /// Get number of active snapshots (includes current)
266    pub fn active_count(&self) -> usize {
267        let active = self.active_snapshots.lock();
268        active.iter().filter(|w| w.strong_count() > 0).count() + 1
269    }
270
271    /// Get statistics snapshot
272    pub fn stats(&self) -> VersionSetStatsSnapshot {
273        VersionSetStatsSnapshot {
274            installs: self.stats.installs.load(Ordering::Relaxed),
275            acquires: self.stats.acquires.load(Ordering::Relaxed),
276            cleanups: self.stats.cleanups.load(Ordering::Relaxed),
277            peak_versions: self.stats.peak_versions.load(Ordering::Relaxed),
278            active_versions: self.active_count(),
279        }
280    }
281}
282
283impl Default for VersionSet {
284    fn default() -> Self {
285        Self::new()
286    }
287}
288
/// Live counters describing activity on a version set.
#[derive(Default, Debug)]
pub struct VersionSetStats {
    /// How many snapshots have been installed
    pub installs: AtomicU64,
    /// How many times a snapshot has been acquired
    pub acquires: AtomicU64,
    /// How many old snapshots have been cleaned up
    pub cleanups: AtomicU64,
    /// Highest number of concurrent snapshots recorded
    pub peak_versions: AtomicU64,
}
301
/// Plain-value copy of the version set counters taken at one moment.
#[derive(Clone, Debug, Default)]
pub struct VersionSetStatsSnapshot {
    /// Value of the install counter
    pub installs: u64,
    /// Value of the acquire counter
    pub acquires: u64,
    /// Value of the cleanup counter
    pub cleanups: u64,
    /// Value of the peak concurrent snapshot counter
    pub peak_versions: u64,
    /// Number of active snapshots (including the current one) when captured
    pub active_versions: usize,
}
311
312// Re-export for backwards compatibility with lib.rs
313pub use Snapshot as ReadVersion;
314pub use SnapshotGuard as VersionGuard;
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a snapshot with no column groups and a fully open visibility window.
    fn unbounded(version: u64) -> Snapshot {
        Snapshot::new(version, vec![], 0, u64::MAX)
    }

    /// Build a column group reference; only `id` and `level` matter to the tests.
    fn group(id: u64, level: u32) -> ColumnGroupRef {
        ColumnGroupRef {
            id,
            path: format!("cg-{}", id),
            level,
            sequence: id,
            row_count: 0,
            min_timestamp: 0,
            max_timestamp: 0,
        }
    }

    #[test]
    fn test_empty_version_set() {
        let vs = VersionSet::new();
        assert_eq!(vs.current_version(), 0);
        assert_eq!(vs.active_count(), 1);
    }

    #[test]
    fn test_acquire_snapshot() {
        let vs = VersionSet::new();
        let guard = vs.acquire();
        assert_eq!(guard.version(), 0);
    }

    #[test]
    fn test_install_snapshot() {
        let vs = VersionSet::new();

        vs.install(Snapshot::new(1, vec![], 0, 100));
        assert_eq!(vs.current_version(), 1);

        let guard = vs.acquire();
        assert_eq!(guard.version(), 1);
        // Both bounds of the visibility window are inclusive.
        assert!(guard.is_visible(0));
        assert!(guard.is_visible(50));
        assert!(guard.is_visible(100));
        assert!(!guard.is_visible(101));
    }

    #[test]
    fn test_old_snapshot_stays_valid() {
        let vs = VersionSet::new();

        // Acquire before install
        let old_guard = vs.acquire();
        assert_eq!(old_guard.version(), 0);

        // Install new version
        vs.install(unbounded(1));

        // Old guard still sees the version it acquired
        assert_eq!(old_guard.version(), 0);

        // New acquire gets new version
        let new_guard = vs.acquire();
        assert_eq!(new_guard.version(), 1);
    }

    #[test]
    fn test_cleanup() {
        let vs = VersionSet::new();

        // Install several versions
        for version in 1..=5 {
            vs.install(unbounded(version));
        }

        // No guards are held, so every replaced version can be released
        vs.cleanup();

        // Only current should remain
        assert_eq!(vs.active_count(), 1);
        assert_eq!(vs.stats().cleanups, 5);
    }

    #[test]
    fn test_cleanup_with_active_readers() {
        let vs = VersionSet::new();

        // Acquire v0
        let guard = vs.acquire();

        // Install v1, then v2
        vs.install(unbounded(1));
        vs.install(unbounded(2));

        // v0 is still held by the guard, so cleanup must keep tracking it
        vs.cleanup();

        // v0 (held) + v2 (current); v1 never had a reader
        assert_eq!(vs.active_count(), 2);

        // Drop guard
        drop(guard);
        vs.cleanup();

        // Now only current should remain
        assert_eq!(vs.active_count(), 1);
        assert_eq!(vs.stats().cleanups, 2);
    }

    #[test]
    fn test_create_snapshot() {
        let vs = VersionSet::new();

        assert_eq!(vs.create_snapshot(vec![vec![group(1, 0)]]), 1);
        assert_eq!(
            vs.create_snapshot(vec![vec![group(1, 0)], vec![group(2, 1)]]),
            2
        );
        assert_eq!(vs.current_version(), 2);

        let guard = vs.acquire();
        assert_eq!(guard.version(), 2);
        assert_eq!(guard.total_groups(), 2);
        assert_eq!(guard.level_groups(0)[0].id, 1);
        assert_eq!(guard.level_groups(1)[0].id, 2);
        // A level the snapshot does not have yields an empty slice
        assert!(guard.level_groups(5).is_empty());
    }

    #[test]
    fn test_stats() {
        let vs = VersionSet::new();

        let _g1 = vs.acquire();
        let _g2 = vs.acquire();

        vs.install(unbounded(1));

        let stats = vs.stats();
        assert_eq!(stats.acquires, 2);
        assert_eq!(stats.installs, 1);
        assert_eq!(stats.cleanups, 0);
        // v0 is still held by the two guards, v1 is current
        assert_eq!(stats.active_versions, 2);
    }
}