1use parking_lot::{Mutex, RwLock};
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::sync::{Arc, Weak};
45
/// An immutable, point-in-time description of the column groups in the store.
///
/// A snapshot records its version number, the wall-clock time at which it was
/// built, the column groups organised per level, and the inclusive range of
/// transaction ids that are visible through it.
#[derive(Debug, Clone)]
pub struct Snapshot {
    /// Version number of this snapshot.
    pub version: u64,
    /// Wall-clock creation time in microseconds since the Unix epoch.
    pub timestamp_us: u64,
    /// Column groups per level: `column_groups[level]` holds that level's groups.
    pub column_groups: Vec<Vec<ColumnGroupRef>>,
    /// Lowest transaction id (inclusive) visible through this snapshot.
    pub min_visible_txn: u64,
    /// Highest transaction id (inclusive) visible through this snapshot.
    pub max_visible_txn: u64,
}

/// Metadata describing a single column group referenced by a [`Snapshot`].
#[derive(Debug, Clone)]
pub struct ColumnGroupRef {
    /// Identifier of the column group.
    pub id: u64,
    /// Location of the column group (presumably a file path; confirm with the writer side).
    pub path: String,
    /// Level the group is assigned to.
    pub level: u32,
    /// Sequence number of the group.
    pub sequence: u64,
    /// Number of rows stored in the group.
    pub row_count: u64,
    /// Smallest timestamp contained in the group (unit not visible here; TODO confirm).
    pub min_timestamp: u64,
    /// Largest timestamp contained in the group (unit not visible here; TODO confirm).
    pub max_timestamp: u64,
}

impl Snapshot {
    /// Creates a snapshot without any column groups whose visibility range
    /// covers every transaction id (`0..=u64::MAX`).
    pub fn empty(version: u64) -> Self {
        Self::new(version, Vec::new(), 0, u64::MAX)
    }

    /// Creates a snapshot from per-level column groups and an inclusive range
    /// of visible transaction ids. The creation timestamp is taken from the
    /// system clock at the time of the call.
    pub fn new(
        version: u64,
        column_groups: Vec<Vec<ColumnGroupRef>>,
        min_visible_txn: u64,
        max_visible_txn: u64,
    ) -> Self {
        Self {
            version,
            timestamp_us: Self::now_us(),
            column_groups,
            min_visible_txn,
            max_visible_txn,
        }
    }

    /// Returns the current wall-clock time in microseconds since the Unix epoch.
    ///
    /// Never panics: a system clock set before the epoch yields `0`, and a value
    /// that does not fit in 64 bits saturates at `u64::MAX`.
    fn now_us() -> u64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            // `duration_since` fails when the clock is earlier than the epoch.
            .map_or(0, |elapsed| {
                // Clamp first so the narrowing cast below cannot truncate.
                elapsed.as_micros().min(u128::from(u64::MAX)) as u64
            })
    }

    /// Returns the column groups stored at `level`, or an empty slice when the
    /// snapshot has no such level.
    pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
        self.column_groups
            .get(level)
            .map(Vec::as_slice)
            .unwrap_or_default()
    }

    /// Returns the number of column groups summed over all levels.
    pub fn total_groups(&self) -> usize {
        self.column_groups.iter().map(Vec::len).sum()
    }

    /// Returns `true` when `txn_id` lies inside the inclusive range
    /// `min_visible_txn..=max_visible_txn`.
    pub fn is_visible(&self, txn_id: u64) -> bool {
        (self.min_visible_txn..=self.max_visible_txn).contains(&txn_id)
    }
}
136
137#[derive(Debug)]
142pub struct SnapshotGuard {
143 snapshot: Arc<Snapshot>,
144}
145
146impl SnapshotGuard {
147 pub fn version(&self) -> u64 {
149 self.snapshot.version
150 }
151
152 pub fn snapshot(&self) -> &Snapshot {
154 &self.snapshot
155 }
156
157 pub fn level_groups(&self, level: usize) -> &[ColumnGroupRef] {
159 self.snapshot.level_groups(level)
160 }
161
162 pub fn is_visible(&self, txn_id: u64) -> bool {
164 self.snapshot.is_visible(txn_id)
165 }
166}
167
168impl std::ops::Deref for SnapshotGuard {
169 type Target = Snapshot;
170
171 fn deref(&self) -> &Self::Target {
172 &self.snapshot
173 }
174}
175
176pub struct VersionSet {
181 current: RwLock<Arc<Snapshot>>,
183 active_snapshots: Mutex<Vec<Weak<Snapshot>>>,
185 next_version: AtomicU64,
187 stats: VersionSetStats,
189}
190
191impl VersionSet {
192 pub fn new() -> Self {
194 let initial = Arc::new(Snapshot::empty(0));
195 Self {
196 current: RwLock::new(initial),
197 active_snapshots: Mutex::new(Vec::new()),
198 next_version: AtomicU64::new(1),
199 stats: VersionSetStats::default(),
200 }
201 }
202
203 pub fn acquire(&self) -> SnapshotGuard {
207 self.stats.acquires.fetch_add(1, Ordering::Relaxed);
208 let current = self.current.read();
209 SnapshotGuard {
210 snapshot: Arc::clone(¤t),
211 }
212 }
213
214 pub fn install(&self, snapshot: Snapshot) {
218 self.stats.installs.fetch_add(1, Ordering::Relaxed);
219
220 let new_snapshot = Arc::new(snapshot);
221
222 let old = {
224 let mut current = self.current.write();
225 let old = Arc::clone(¤t);
226 *current = new_snapshot;
227 old
228 };
229
230 let mut active = self.active_snapshots.lock();
232 active.push(Arc::downgrade(&old));
233
234 let current_count = active.len() as u64;
236 self.stats
237 .peak_versions
238 .fetch_max(current_count, Ordering::Relaxed);
239 }
240
241 pub fn create_snapshot(&self, column_groups: Vec<Vec<ColumnGroupRef>>) -> u64 {
243 let version = self.next_version.fetch_add(1, Ordering::SeqCst);
244 let snapshot = Snapshot::new(version, column_groups, 0, u64::MAX);
245 self.install(snapshot);
246 version
247 }
248
249 pub fn cleanup(&self) {
251 let mut active = self.active_snapshots.lock();
252 let before_len = active.len();
253 active.retain(|weak| weak.strong_count() > 0);
254 let cleaned = before_len - active.len();
255 self.stats
256 .cleanups
257 .fetch_add(cleaned as u64, Ordering::Relaxed);
258 }
259
260 pub fn current_version(&self) -> u64 {
262 self.current.read().version
263 }
264
265 pub fn active_count(&self) -> usize {
267 let active = self.active_snapshots.lock();
268 active.iter().filter(|w| w.strong_count() > 0).count() + 1
269 }
270
271 pub fn stats(&self) -> VersionSetStatsSnapshot {
273 VersionSetStatsSnapshot {
274 installs: self.stats.installs.load(Ordering::Relaxed),
275 acquires: self.stats.acquires.load(Ordering::Relaxed),
276 cleanups: self.stats.cleanups.load(Ordering::Relaxed),
277 peak_versions: self.stats.peak_versions.load(Ordering::Relaxed),
278 active_versions: self.active_count(),
279 }
280 }
281}
282
283impl Default for VersionSet {
284 fn default() -> Self {
285 Self::new()
286 }
287}
288
/// Live counters maintained by a `VersionSet`.
///
/// All fields are atomics, so they can be updated through a shared reference.
#[derive(Default, Debug)]
pub struct VersionSetStats {
    /// Number of `install` calls.
    pub installs: AtomicU64,
    /// Number of `acquire` calls.
    pub acquires: AtomicU64,
    /// Total number of dead entries removed by `cleanup`.
    pub cleanups: AtomicU64,
    /// Largest length the list of replaced snapshots has reached.
    pub peak_versions: AtomicU64,
}
301
/// Plain-value copy of the `VersionSet` counters, as returned by
/// `VersionSet::stats`.
#[derive(Clone, Debug, Default)]
pub struct VersionSetStatsSnapshot {
    /// Number of `install` calls at the time of the copy.
    pub installs: u64,
    /// Number of `acquire` calls at the time of the copy.
    pub acquires: u64,
    /// Total number of dead entries removed by `cleanup` at the time of the copy.
    pub cleanups: u64,
    /// Largest length the list of replaced snapshots had reached.
    pub peak_versions: u64,
    /// Result of `VersionSet::active_count` at the time of the copy.
    pub active_versions: usize,
}
311
312pub use Snapshot as ReadVersion;
314pub use SnapshotGuard as VersionGuard;
315
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh version set exposes version 0 and counts only the current snapshot.
    #[test]
    fn test_empty_version_set() {
        let versions = VersionSet::new();
        assert_eq!(versions.current_version(), 0);
        assert_eq!(versions.active_count(), 1);
    }

    /// Acquiring from a fresh version set pins the initial snapshot.
    #[test]
    fn test_acquire_snapshot() {
        let versions = VersionSet::new();
        let reader = versions.acquire();
        assert_eq!(reader.version(), 0);
    }

    /// An installed snapshot becomes current and its visibility range is honoured.
    #[test]
    fn test_install_snapshot() {
        let versions = VersionSet::new();

        versions.install(Snapshot::new(1, vec![], 0, 100));

        assert_eq!(versions.current_version(), 1);

        let reader = versions.acquire();
        assert_eq!(reader.version(), 1);
        assert!(reader.is_visible(50));
        assert!(!reader.is_visible(101));
    }

    /// A guard taken before an install keeps observing the snapshot it pinned.
    #[test]
    fn test_old_snapshot_stays_valid() {
        let versions = VersionSet::new();

        let before = versions.acquire();
        assert_eq!(before.version(), 0);

        versions.install(Snapshot::new(1, vec![], 0, u64::MAX));

        // The earlier guard is unaffected by the install.
        assert_eq!(before.version(), 0);

        let after = versions.acquire();
        assert_eq!(after.version(), 1);
    }

    /// Without readers, only the current snapshot remains after cleanup.
    #[test]
    fn test_cleanup() {
        let versions = VersionSet::new();

        (1..=5).for_each(|version| {
            versions.install(Snapshot::new(version, vec![], 0, u64::MAX));
        });

        versions.cleanup();

        assert_eq!(versions.active_count(), 1);
    }

    /// A pinned snapshot survives cleanup until its guard is dropped.
    #[test]
    fn test_cleanup_with_active_readers() {
        let versions = VersionSet::new();

        // Pins version 0.
        let reader = versions.acquire();

        versions.install(Snapshot::new(1, vec![], 0, u64::MAX));

        versions.install(Snapshot::new(2, vec![], 0, u64::MAX));

        versions.cleanup();

        // Version 0 is still pinned, and version 2 is current.
        assert!(versions.active_count() >= 2);

        drop(reader);
        versions.cleanup();

        assert_eq!(versions.active_count(), 1);
    }

    /// Acquire and install calls are counted.
    #[test]
    fn test_stats() {
        let versions = VersionSet::new();

        let _first = versions.acquire();
        let _second = versions.acquire();

        versions.install(Snapshot::new(1, vec![], 0, u64::MAX));

        let counters = versions.stats();
        assert_eq!(counters.acquires, 2);
        assert_eq!(counters.installs, 1);
    }
}