1use parking_lot::{Mutex, RwLock};
40use std::sync::atomic::{AtomicU64, Ordering};
41use std::sync::{Arc, Weak};
42
/// Describes the set of column groups that make up one version of the data.
///
/// Cloning copies the whole `column_groups` table; code in this file shares
/// snapshots through `Arc<Snapshot>` instead of cloning them.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Version number supplied by whoever constructed the snapshot.
    pub version: u64,
    /// Wall-clock time at which `Snapshot::new` / `Snapshot::empty` built this
    /// value, in microseconds since the UNIX epoch.
    pub timestamp_us: u64,
    /// Column groups, where the outer index is the level (see `level_groups`).
    pub column_groups: Vec<Vec<ColumnGroupRef>>,
    /// Lowest transaction id (inclusive) accepted by `is_visible`.
    pub min_visible_txn: u64,
    /// Highest transaction id (inclusive) accepted by `is_visible`.
    pub max_visible_txn: u64,
}
60
/// Metadata record for one column group listed in a `Snapshot`.
///
/// NOTE(review): nothing in this file reads these fields; the per-field notes
/// below are inferred from the field names and should be confirmed against the
/// code that produces and consumes these records.
#[derive(Debug, Clone)]
pub struct ColumnGroupRef {
    /// Identifier of the column group.
    pub id: u64,
    /// Presumably the storage location of the group's data — TODO confirm.
    pub path: String,
    /// Presumably the level this group belongs to — TODO confirm that it
    /// matches the outer index used in `Snapshot::column_groups`.
    pub level: u32,
    /// Sequence number associated with the group — ordering semantics are not
    /// visible here.
    pub sequence: u64,
    /// Presumably the number of rows stored in the group — TODO confirm.
    pub row_count: u64,
    /// Presumably the smallest timestamp contained in the group; units are not
    /// visible here — TODO confirm.
    pub min_timestamp: u64,
    /// Presumably the largest timestamp contained in the group; units are not
    /// visible here — TODO confirm.
    pub max_timestamp: u64,
}
79
80impl Snapshot {
81 pub fn empty(version: u64) -> Self {
83 Self {
84 version,
85 timestamp_us: Self::now_us(),
86 column_groups: Vec::new(),
87 min_visible_txn: 0,
88 max_visible_txn: u64::MAX,
89 }
90 }
91
92 pub fn new(
94 version: u64,
95 column_groups: Vec<Vec<ColumnGroupRef>>,
96 min_visible_txn: u64,
97 max_visible_txn: u64,
98 ) -> Self {
99 Self {
100 version,
101 timestamp_us: Self::now_us(),
102 column_groups,
103 min_visible_txn,
104 max_visible_txn,
105 }
106 }
107
108 fn now_us() -> u64 {
109 std::time::SystemTime::now()
110 .duration_since(std::time::UNIX_EPOCH)
111 .unwrap()
112 .as_micros() as u64
113 }
114
115 pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
117 self.column_groups
118 .get(level)
119 .map(|v| v.as_slice())
120 .unwrap_or(&[])
121 }
122
123 pub fn total_groups(&self) -> usize {
125 self.column_groups.iter().map(|l| l.len()).sum()
126 }
127
128 pub fn is_visible(&self, txn_id: u64) -> bool {
130 txn_id >= self.min_visible_txn && txn_id <= self.max_visible_txn
131 }
132}
133
/// Handle that pins one `Snapshot`.
///
/// The guard owns a strong `Arc` reference, so the snapshot it points at stays
/// allocated for as long as the guard exists, even after a different snapshot
/// has become current.
#[derive(Debug)]
pub struct SnapshotGuard {
    /// Shared reference to the pinned snapshot.
    snapshot: Arc<Snapshot>,
}
142
143impl SnapshotGuard {
144 pub fn version(&self) -> u64 {
146 self.snapshot.version
147 }
148
149 pub fn snapshot(&self) -> &Snapshot {
151 &self.snapshot
152 }
153
154 pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
156 self.snapshot.level_groups(level)
157 }
158
159 pub fn is_visible(&self, txn_id: u64) -> bool {
161 self.snapshot.is_visible(txn_id)
162 }
163}
164
165impl std::ops::Deref for SnapshotGuard {
166 type Target = Snapshot;
167
168 fn deref(&self) -> &Self::Target {
169 &self.snapshot
170 }
171}
172
/// Owns the current `Snapshot` and keeps track of superseded snapshots that may
/// still be referenced by outstanding `SnapshotGuard`s.
pub struct VersionSet {
    /// The snapshot handed out by `acquire`; replaced wholesale by `install`.
    current: RwLock<Arc<Snapshot>>,
    /// Weak references to snapshots that `install` has replaced. Entries whose
    /// snapshot has already been dropped remain in the list until `cleanup`
    /// removes them.
    active_snapshots: Mutex<Vec<Weak<Snapshot>>>,
    /// Next version number handed out by `create_snapshot`; starts at 1.
    next_version: AtomicU64,
    /// Operation counters, exposed through `stats`.
    stats: VersionSetStats,
}
187
188impl VersionSet {
189 pub fn new() -> Self {
191 let initial = Arc::new(Snapshot::empty(0));
192 Self {
193 current: RwLock::new(initial),
194 active_snapshots: Mutex::new(Vec::new()),
195 next_version: AtomicU64::new(1),
196 stats: VersionSetStats::default(),
197 }
198 }
199
200 pub fn acquire(&self) -> SnapshotGuard {
204 self.stats.acquires.fetch_add(1, Ordering::Relaxed);
205 let current = self.current.read();
206 SnapshotGuard {
207 snapshot: Arc::clone(¤t),
208 }
209 }
210
211 pub fn install(&self, snapshot: Snapshot) {
215 self.stats.installs.fetch_add(1, Ordering::Relaxed);
216
217 let new_snapshot = Arc::new(snapshot);
218
219 let old = {
221 let mut current = self.current.write();
222 let old = Arc::clone(¤t);
223 *current = new_snapshot;
224 old
225 };
226
227 let mut active = self.active_snapshots.lock();
229 active.push(Arc::downgrade(&old));
230
231 let current_count = active.len() as u64;
233 self.stats
234 .peak_versions
235 .fetch_max(current_count, Ordering::Relaxed);
236 }
237
238 pub fn create_snapshot(&self, column_groups: Vec<Vec<ColumnGroupRef>>) -> u64 {
240 let version = self.next_version.fetch_add(1, Ordering::SeqCst);
241 let snapshot = Snapshot::new(version, column_groups, 0, u64::MAX);
242 self.install(snapshot);
243 version
244 }
245
246 pub fn cleanup(&self) {
248 let mut active = self.active_snapshots.lock();
249 let before_len = active.len();
250 active.retain(|weak| weak.strong_count() > 0);
251 let cleaned = before_len - active.len();
252 self.stats
253 .cleanups
254 .fetch_add(cleaned as u64, Ordering::Relaxed);
255 }
256
257 pub fn current_version(&self) -> u64 {
259 self.current.read().version
260 }
261
262 pub fn active_count(&self) -> usize {
264 let active = self.active_snapshots.lock();
265 active.iter().filter(|w| w.strong_count() > 0).count() + 1
266 }
267
268 pub fn stats(&self) -> VersionSetStatsSnapshot {
270 VersionSetStatsSnapshot {
271 installs: self.stats.installs.load(Ordering::Relaxed),
272 acquires: self.stats.acquires.load(Ordering::Relaxed),
273 cleanups: self.stats.cleanups.load(Ordering::Relaxed),
274 peak_versions: self.stats.peak_versions.load(Ordering::Relaxed),
275 active_versions: self.active_count(),
276 }
277 }
278}
279
280impl Default for VersionSet {
281 fn default() -> Self {
282 Self::new()
283 }
284}
285
/// Live counters maintained by `VersionSet`; all start at zero.
#[derive(Debug, Default)]
pub struct VersionSetStats {
    /// Incremented once per `VersionSet::install` call.
    pub installs: AtomicU64,
    /// Incremented once per `VersionSet::acquire` call.
    pub acquires: AtomicU64,
    /// Running total of retired-list entries removed by `VersionSet::cleanup`
    /// (not the number of `cleanup` calls).
    pub cleanups: AtomicU64,
    /// Largest retired-list length observed right after an install. The length
    /// includes entries whose snapshot is already gone but not yet cleaned up.
    pub peak_versions: AtomicU64,
}
298
/// Plain-value copy of `VersionSetStats` as returned by `VersionSet::stats`.
#[derive(Debug, Clone, Default)]
pub struct VersionSetStatsSnapshot {
    /// Value of the `installs` counter when the copy was taken.
    pub installs: u64,
    /// Value of the `acquires` counter when the copy was taken.
    pub acquires: u64,
    /// Value of the `cleanups` counter when the copy was taken.
    pub cleanups: u64,
    /// Value of the `peak_versions` counter when the copy was taken.
    pub peak_versions: u64,
    /// Result of `VersionSet::active_count` when the copy was taken.
    pub active_versions: usize,
}
308
/// Alternate name for `Snapshot`.
pub use Snapshot as ReadVersion;
/// Alternate name for `SnapshotGuard`.
pub use SnapshotGuard as VersionGuard;
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a column-group record with placeholder metadata for tests that
    /// only care about how many groups exist at which level.
    fn group(id: u64, level: u32) -> ColumnGroupRef {
        ColumnGroupRef {
            id,
            path: format!("cg-{id}"),
            level,
            sequence: id,
            row_count: 1,
            min_timestamp: 0,
            max_timestamp: 0,
        }
    }

    #[test]
    fn test_empty_version_set() {
        let vs = VersionSet::new();
        assert_eq!(vs.current_version(), 0);
        assert_eq!(vs.active_count(), 1);
    }

    #[test]
    fn test_acquire_snapshot() {
        let vs = VersionSet::new();
        let guard = vs.acquire();
        assert_eq!(guard.version(), 0);
    }

    #[test]
    fn test_install_snapshot() {
        let vs = VersionSet::new();

        let snapshot = Snapshot::new(1, vec![], 0, 100);
        vs.install(snapshot);

        assert_eq!(vs.current_version(), 1);

        let guard = vs.acquire();
        assert_eq!(guard.version(), 1);
        assert!(guard.is_visible(50));
        assert!(!guard.is_visible(101));
    }

    #[test]
    fn test_old_snapshot_stays_valid() {
        let vs = VersionSet::new();

        // A guard taken before an install must keep seeing its own version.
        let old_guard = vs.acquire();
        assert_eq!(old_guard.version(), 0);

        let snapshot = Snapshot::new(1, vec![], 0, u64::MAX);
        vs.install(snapshot);

        assert_eq!(old_guard.version(), 0);

        let new_guard = vs.acquire();
        assert_eq!(new_guard.version(), 1);
    }

    #[test]
    fn test_cleanup() {
        let vs = VersionSet::new();

        for i in 1..=5 {
            let snapshot = Snapshot::new(i, vec![], 0, u64::MAX);
            vs.install(snapshot);
        }

        vs.cleanup();

        // No guards are held, so only the current snapshot remains.
        assert_eq!(vs.active_count(), 1);
    }

    #[test]
    fn test_cleanup_with_active_readers() {
        let vs = VersionSet::new();

        // Pins version 0.
        let guard = vs.acquire();

        vs.install(Snapshot::new(1, vec![], 0, u64::MAX));

        vs.install(Snapshot::new(2, vec![], 0, u64::MAX));

        vs.cleanup();

        // Exactly two snapshots are alive: pinned version 0 and current
        // version 2. Version 1 was never pinned and is already gone.
        assert_eq!(vs.active_count(), 2);

        drop(guard);
        vs.cleanup();

        assert_eq!(vs.active_count(), 1);
    }

    #[test]
    fn test_stats() {
        let vs = VersionSet::new();

        let _g1 = vs.acquire();
        let _g2 = vs.acquire();

        vs.install(Snapshot::new(1, vec![], 0, u64::MAX));

        let stats = vs.stats();
        assert_eq!(stats.acquires, 2);
        assert_eq!(stats.installs, 1);
        // Version 0 is still pinned by the two guards; version 1 is current.
        assert_eq!(stats.active_versions, 2);
    }

    #[test]
    fn test_create_snapshot_assigns_increasing_versions() {
        let vs = VersionSet::new();

        assert_eq!(vs.create_snapshot(vec![]), 1);
        assert_eq!(vs.create_snapshot(vec![vec![group(1, 0)]]), 2);

        assert_eq!(vs.current_version(), 2);
        assert_eq!(vs.stats().installs, 2);

        let guard = vs.acquire();
        assert_eq!(guard.level_groups(0).len(), 1);
        assert!(guard.is_visible(0));
        assert!(guard.is_visible(u64::MAX));
    }

    #[test]
    fn test_level_groups_and_total_groups() {
        let snapshot = Snapshot::new(
            1,
            vec![vec![group(1, 0), group(2, 0)], vec![], vec![group(3, 2)]],
            0,
            u64::MAX,
        );

        assert_eq!(snapshot.level_groups(0).len(), 2);
        assert!(snapshot.level_groups(1).is_empty());
        assert_eq!(snapshot.level_groups(2)[0].id, 3);
        // A level past the end yields an empty slice rather than panicking.
        assert!(snapshot.level_groups(42).is_empty());
        assert_eq!(snapshot.total_groups(), 3);
    }

    #[test]
    fn test_is_visible_bounds_are_inclusive() {
        let snapshot = Snapshot::new(1, vec![], 10, 20);

        assert!(!snapshot.is_visible(9));
        assert!(snapshot.is_visible(10));
        assert!(snapshot.is_visible(20));
        assert!(!snapshot.is_visible(21));
    }

    #[test]
    fn test_guard_derefs_to_snapshot() {
        let vs = VersionSet::new();
        vs.install(Snapshot::new(1, vec![vec![group(1, 0)]], 0, u64::MAX));

        let guard = vs.acquire();
        // `total_groups` exists only on `Snapshot`, so this goes through `Deref`.
        assert_eq!(guard.total_groups(), 1);
        assert_eq!(guard.snapshot().version, 1);
    }
}