1use parking_lot::RwLock;
52use std::collections::{HashMap, VecDeque};
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55
/// Identifier of one stored version: an epoch paired with a sequence number.
///
/// The derived ordering compares `epoch` first and `sequence` second, because
/// that is the order in which the fields are declared.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct VersionId {
    /// Epoch the version was created in.
    pub epoch: u64,
    /// Sequence number paired with the epoch.
    pub sequence: u32,
}

impl VersionId {
    /// Builds a `VersionId` from its two components.
    pub fn new(epoch: u64, sequence: u32) -> Self {
        VersionId { epoch, sequence }
    }

    /// Returns `true` when this version's epoch lies strictly below `watermark`.
    ///
    /// A version whose epoch equals the watermark is not stale.
    pub fn is_stale(&self, watermark: u64) -> bool {
        watermark > self.epoch
    }
}
73
74#[derive(Debug, Clone)]
76pub struct VersionedValue<T> {
77 pub version: VersionId,
78 pub value: T,
79 pub deleted: bool,
81}
82
83impl<T> VersionedValue<T> {
84 pub fn new(version: VersionId, value: T) -> Self {
85 Self {
86 version,
87 value,
88 deleted: false,
89 }
90 }
91
92 pub fn tombstone(version: VersionId, value: T) -> Self {
93 Self {
94 version,
95 value,
96 deleted: true,
97 }
98 }
99}
100
101#[derive(Debug)]
103pub struct VersionChain<T> {
104 versions: VecDeque<VersionedValue<T>>,
106 total_versions: u64,
108}
109
110impl<T: Clone> VersionChain<T> {
111 pub fn new() -> Self {
112 Self {
113 versions: VecDeque::new(),
114 total_versions: 0,
115 }
116 }
117
118 pub fn add_version(&mut self, version: VersionedValue<T>) {
120 self.versions.push_front(version);
121 self.total_versions += 1;
122 }
123
124 pub fn latest(&self) -> Option<&VersionedValue<T>> {
126 self.versions.front()
127 }
128
129 pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
132 for v in &self.versions {
133 if v.version.epoch <= epoch {
134 if v.deleted {
137 return None;
138 }
139 return Some(v);
140 }
141 }
142 None
143 }
144
145 pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
148 let initial_len = self.versions.len();
149
150 let mut kept = 0;
152 let mut last_visible_idx = None;
153
154 for (i, v) in self.versions.iter().enumerate() {
155 if v.version.epoch >= watermark {
156 kept += 1;
157 } else {
158 if last_visible_idx.is_none() {
160 last_visible_idx = Some(i);
161 kept += 1;
162 }
163 }
164 }
165
166 while self.versions.len() > kept {
168 self.versions.pop_back();
169 }
170
171 let removed = initial_len - self.versions.len();
172 let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
173
174 (removed, bytes_freed)
175 }
176
177 pub fn len(&self) -> usize {
179 self.versions.len()
180 }
181
182 pub fn is_empty(&self) -> bool {
183 self.versions.is_empty()
184 }
185}
186
187impl<T: Clone> Default for VersionChain<T> {
188 fn default() -> Self {
189 Self::new()
190 }
191}
192
193#[derive(Debug)]
195pub struct ReaderRegistry {
196 active_readers: RwLock<HashMap<u64, u64>>,
198 next_reader_id: AtomicU64,
200 active_count: AtomicUsize,
202}
203
204impl ReaderRegistry {
205 pub fn new() -> Self {
206 Self {
207 active_readers: RwLock::new(HashMap::new()),
208 next_reader_id: AtomicU64::new(1),
209 active_count: AtomicUsize::new(0),
210 }
211 }
212
213 pub fn register(&self, epoch: u64) -> u64 {
215 let reader_id = self.next_reader_id.fetch_add(1, Ordering::Relaxed);
216 let mut readers = self.active_readers.write();
217 readers.insert(reader_id, epoch);
218 self.active_count.fetch_add(1, Ordering::Relaxed);
219 reader_id
220 }
221
222 pub fn unregister(&self, reader_id: u64) {
224 let mut readers = self.active_readers.write();
225 if readers.remove(&reader_id).is_some() {
226 self.active_count.fetch_sub(1, Ordering::Relaxed);
227 }
228 }
229
230 pub fn min_active_epoch(&self) -> Option<u64> {
232 let readers = self.active_readers.read();
233 readers.values().copied().min()
234 }
235
236 pub fn active_count(&self) -> usize {
238 self.active_count.load(Ordering::Relaxed)
239 }
240}
241
242impl Default for ReaderRegistry {
243 fn default() -> Self {
244 Self::new()
245 }
246}
247
/// Atomic counters describing garbage-collection activity.
#[derive(Debug, Default)]
pub struct GCStats {
    /// Number of collection cycles recorded.
    pub gc_cycles: AtomicU64,
    /// Total number of versions removed.
    pub versions_collected: AtomicU64,
    /// Total number of bytes reported as freed.
    pub bytes_freed: AtomicU64,
    /// Total number of chains examined.
    pub chains_scanned: AtomicU64,
    /// Value recorded for the most recent cycle.
    pub last_gc_epoch: AtomicU64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: AtomicU64,
}

impl GCStats {
    /// Copies every counter into a plain-value [`GCStatsSnapshot`].
    ///
    /// Each counter is read with its own relaxed load, so the snapshot is not
    /// an atomic view across all fields.
    pub fn snapshot(&self) -> GCStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let gc_cycles = read(&self.gc_cycles);
        let versions_collected = read(&self.versions_collected);
        let bytes_freed = read(&self.bytes_freed);
        let chains_scanned = read(&self.chains_scanned);
        let last_gc_epoch = read(&self.last_gc_epoch);
        let last_gc_duration_us = read(&self.last_gc_duration_us);

        GCStatsSnapshot {
            gc_cycles,
            versions_collected,
            bytes_freed,
            chains_scanned,
            last_gc_epoch,
            last_gc_duration_us,
        }
    }
}

/// Plain-value copy of [`GCStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct GCStatsSnapshot {
    /// Number of collection cycles recorded.
    pub gc_cycles: u64,
    /// Total number of versions removed.
    pub versions_collected: u64,
    /// Total number of bytes reported as freed.
    pub bytes_freed: u64,
    /// Total number of chains examined.
    pub chains_scanned: u64,
    /// Value recorded for the most recent cycle.
    pub last_gc_epoch: u64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: u64,
}
281
/// Tuning parameters for the garbage collector.
#[derive(Debug, Clone)]
pub struct GCConfig {
    /// Number of epochs subtracted from the current epoch when the watermark
    /// is computed.
    pub min_epochs_to_keep: u64,
    /// Pending-version count at which a write triggers a collection cycle.
    pub gc_trigger_threshold: usize,
    /// Upper bound on the work performed by one `try_gc` cycle; `force_gc`
    /// ignores it.
    pub max_versions_per_cycle: usize,
}

impl Default for GCConfig {
    /// Returns the default configuration: keep 2 epochs, trigger at 1000
    /// pending versions, and cap a cycle at 10000.
    fn default() -> Self {
        const MIN_EPOCHS_TO_KEEP: u64 = 2;
        const GC_TRIGGER_THRESHOLD: usize = 1000;
        const MAX_VERSIONS_PER_CYCLE: usize = 10000;

        GCConfig {
            min_epochs_to_keep: MIN_EPOCHS_TO_KEEP,
            gc_trigger_threshold: GC_TRIGGER_THRESHOLD,
            max_versions_per_cycle: MAX_VERSIONS_PER_CYCLE,
        }
    }
}
302
303pub struct EpochGC<K, V>
305where
306 K: Eq + std::hash::Hash + Clone,
307 V: Clone,
308{
309 current_epoch: AtomicU64,
311 current_sequence: AtomicU64,
313 chains: RwLock<HashMap<K, VersionChain<V>>>,
315 readers: Arc<ReaderRegistry>,
317 config: GCConfig,
319 stats: GCStats,
321 pending_versions: AtomicUsize,
323}
324
325impl<K, V> EpochGC<K, V>
326where
327 K: Eq + std::hash::Hash + Clone,
328 V: Clone,
329{
330 pub fn new() -> Self {
332 Self::with_config(GCConfig::default())
333 }
334
335 pub fn with_config(config: GCConfig) -> Self {
337 Self {
338 current_epoch: AtomicU64::new(0),
339 current_sequence: AtomicU64::new(0),
340 chains: RwLock::new(HashMap::new()),
341 readers: Arc::new(ReaderRegistry::new()),
342 config,
343 stats: GCStats::default(),
344 pending_versions: AtomicUsize::new(0),
345 }
346 }
347
348 pub fn current_epoch(&self) -> u64 {
350 self.current_epoch.load(Ordering::SeqCst)
351 }
352
353 pub fn advance_epoch(&self) -> u64 {
355 self.current_sequence.store(0, Ordering::SeqCst);
356 self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
357 }
358
359 pub fn next_version(&self) -> VersionId {
361 let epoch = self.current_epoch.load(Ordering::SeqCst);
362 let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
363 VersionId::new(epoch, seq)
364 }
365
366 pub fn insert(&self, key: K, value: V) -> VersionId {
368 let version = self.next_version();
369 let versioned = VersionedValue::new(version, value);
370
371 {
372 let mut chains = self.chains.write();
373 chains.entry(key).or_default().add_version(versioned);
374 }
375
376 let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
377
378 if pending >= self.config.gc_trigger_threshold {
380 self.try_gc();
381 }
382
383 version
384 }
385
386 pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
388 let version = self.next_version();
389 let versioned = VersionedValue::tombstone(version, tombstone_value);
390
391 {
392 let mut chains = self.chains.write();
393 chains.entry(key).or_default().add_version(versioned);
394 }
395
396 self.pending_versions.fetch_add(1, Ordering::Relaxed);
397 version
398 }
399
400 pub fn get(&self, key: &K) -> Option<V> {
402 let chains = self.chains.read();
403 chains
404 .get(key)
405 .and_then(|chain| chain.latest())
406 .filter(|v| !v.deleted)
407 .map(|v| v.value.clone())
408 }
409
410 pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
412 let chains = self.chains.read();
413 chains
414 .get(key)
415 .and_then(|chain| chain.version_at(epoch))
416 .map(|v| v.value.clone())
417 }
418
419 pub fn begin_read(&self) -> ReadGuard {
421 let epoch = self.current_epoch.load(Ordering::SeqCst);
422 let reader_id = self.readers.register(epoch);
423 ReadGuard {
424 epoch,
425 reader_id,
426 registry: Arc::clone(&self.readers),
427 }
428 }
429
430 pub fn watermark(&self) -> u64 {
432 let current = self.current_epoch.load(Ordering::SeqCst);
433 let min_reader = self.readers.min_active_epoch().unwrap_or(current);
434
435 let grace = current.saturating_sub(self.config.min_epochs_to_keep);
437 grace.min(min_reader)
438 }
439
440 pub fn try_gc(&self) -> GCResult {
442 let start = std::time::Instant::now();
443 let watermark = self.watermark();
444
445 let mut versions_collected = 0;
446 let mut bytes_freed = 0;
447 let mut chains_scanned = 0;
448
449 {
450 let mut chains = self.chains.write();
451 let keys: Vec<K> = chains.keys().cloned().collect();
452
453 for key in keys {
454 if chains_scanned >= self.config.max_versions_per_cycle {
455 break;
456 }
457
458 if let Some(chain) = chains.get_mut(&key) {
459 let (removed, freed) = chain.gc(watermark);
460 versions_collected += removed;
461 bytes_freed += freed;
462 chains_scanned += 1;
463
464 if chain.is_empty() {
466 chains.remove(&key);
467 }
468 }
469 }
470 }
471
472 let duration = start.elapsed();
473
474 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
476 self.stats
477 .versions_collected
478 .fetch_add(versions_collected as u64, Ordering::Relaxed);
479 self.stats
480 .bytes_freed
481 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
482 self.stats
483 .chains_scanned
484 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
485 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
486 self.stats
487 .last_gc_duration_us
488 .store(duration.as_micros() as u64, Ordering::Relaxed);
489
490 self.pending_versions.store(0, Ordering::Relaxed);
492
493 GCResult {
494 versions_collected,
495 bytes_freed,
496 chains_scanned,
497 watermark,
498 duration_us: duration.as_micros() as u64,
499 }
500 }
501
502 pub fn force_gc(&self) -> GCResult {
504 let _old_limit = self.config.max_versions_per_cycle;
505 let _config = GCConfig {
507 max_versions_per_cycle: usize::MAX,
508 ..self.config.clone()
509 };
510
511 let start = std::time::Instant::now();
512 let watermark = self.watermark();
513
514 let mut versions_collected = 0;
515 let mut bytes_freed = 0;
516 let mut chains_scanned = 0;
517
518 {
519 let mut chains = self.chains.write();
520 for chain in chains.values_mut() {
521 let (removed, freed) = chain.gc(watermark);
522 versions_collected += removed;
523 bytes_freed += freed;
524 chains_scanned += 1;
525 }
526
527 chains.retain(|_, chain| !chain.is_empty());
529 }
530
531 let duration = start.elapsed();
532
533 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
534 self.stats
535 .versions_collected
536 .fetch_add(versions_collected as u64, Ordering::Relaxed);
537 self.stats
538 .bytes_freed
539 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
540 self.stats
541 .chains_scanned
542 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
543 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
544 self.stats
545 .last_gc_duration_us
546 .store(duration.as_micros() as u64, Ordering::Relaxed);
547 self.pending_versions.store(0, Ordering::Relaxed);
548
549 GCResult {
550 versions_collected,
551 bytes_freed,
552 chains_scanned,
553 watermark,
554 duration_us: duration.as_micros() as u64,
555 }
556 }
557
558 pub fn stats(&self) -> GCStatsSnapshot {
560 self.stats.snapshot()
561 }
562
563 pub fn version_count(&self) -> usize {
565 let chains = self.chains.read();
566 chains.values().map(|c| c.len()).sum()
567 }
568
569 pub fn chain_count(&self) -> usize {
571 self.chains.read().len()
572 }
573}
574
575impl<K, V> Default for EpochGC<K, V>
576where
577 K: Eq + std::hash::Hash + Clone,
578 V: Clone,
579{
580 fn default() -> Self {
581 Self::new()
582 }
583}
584
/// Outcome of a single garbage-collection cycle.
#[derive(Clone, Debug)]
pub struct GCResult {
    /// Number of versions removed during the cycle.
    pub versions_collected: usize,
    /// Number of bytes reported as freed during the cycle.
    pub bytes_freed: usize,
    /// Number of chains examined during the cycle.
    pub chains_scanned: usize,
    /// Watermark the cycle collected against.
    pub watermark: u64,
    /// Duration of the cycle, in microseconds.
    pub duration_us: u64,
}
594
595pub struct ReadGuard {
597 pub epoch: u64,
598 reader_id: u64,
599 registry: Arc<ReaderRegistry>,
600}
601
602impl Drop for ReadGuard {
603 fn drop(&mut self) {
604 self.registry.unregister(self.reader_id);
605 }
606}
607
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the owned `String` keys used throughout these tests.
    fn k(name: &str) -> String {
        name.to_string()
    }

    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        // Within one epoch the sequence number breaks the tie.
        assert!(VersionId::new(1, 0) < VersionId::new(1, 1));
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();
        assert!(chain.is_empty());
        assert!(chain.latest().is_none());

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert_eq!(chain.latest().unwrap().value, "v2");
    }

    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }

        assert_eq!(chain.len(), 5);

        // Epochs 4 and 3 are live, epoch 2 is the boundary version that a
        // reader at the watermark still resolves to; epochs 1 and 0 go away.
        let (removed, bytes_freed) = chain.gc(3);

        assert_eq!(removed, 2);
        assert_eq!(
            bytes_freed,
            2 * std::mem::size_of::<VersionedValue<String>>()
        );
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.latest().unwrap().value, "v4");
        assert_eq!(chain.version_at(2).unwrap().value, "v2");

        // A second pass with the same watermark has nothing left to do.
        assert_eq!(chain.gc(3), (0, 0));
    }

    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();
        assert_eq!(registry.min_active_epoch(), None);

        let r1 = registry.register(10);
        let r2 = registry.register(20);
        assert_ne!(r1, r2);

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));

        // Unregistering the same id again must not disturb the counter.
        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);

        registry.unregister(r2);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_epoch(), None);
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let v1 = gc.insert(k("key1"), 100);
        let v2 = gc.insert(k("key1"), 200);

        assert_eq!(v1, VersionId::new(0, 0));
        assert_eq!(v2, VersionId::new(0, 1));
        assert_eq!(gc.get(&k("key1")), Some(200));
        assert_eq!(gc.get(&k("missing")), None);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.chain_count(), 1);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert(k("key1"), 100);
        gc.delete(k("key1"), 0);

        assert_eq!(gc.get(&k("key1")), None);
        // The tombstone is the newest version at epoch 0, so it wins there too.
        assert_eq!(gc.get_at_epoch(&k("key1"), 0), None);
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert(k("key1"), 100);
        gc.advance_epoch();
        gc.insert(k("key1"), 200);
        gc.advance_epoch();
        gc.insert(k("key1"), 300);

        assert_eq!(gc.get_at_epoch(&k("key1"), 0), Some(100));
        assert_eq!(gc.get_at_epoch(&k("key1"), 1), Some(200));
        assert_eq!(gc.get_at_epoch(&k("key1"), 2), Some(300));
        // Epochs past the newest version resolve to the newest version.
        assert_eq!(gc.get_at_epoch(&k("key1"), 5), Some(300));
        assert_eq!(gc.get_at_epoch(&k("missing"), 2), None);
    }

    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert(k("key1"), 100);

        {
            let guard = gc.begin_read();
            assert_eq!(guard.epoch, gc.current_epoch());
            assert_eq!(gc.readers.active_count(), 1);
        }

        assert_eq!(gc.readers.active_count(), 0);
    }

    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        for value in 1..=4 {
            gc.insert(k("k"), value);
            gc.advance_epoch();
        }

        // Current epoch 4, grace period 2, no readers.
        assert_eq!(gc.current_epoch(), 4);
        assert_eq!(gc.watermark(), 2);

        // A reader pinned at epoch 4 is above the grace bound, so no change.
        let guard = gc.begin_read();
        assert_eq!(gc.watermark(), 2);

        // Once the grace bound passes the reader, the reader holds it back.
        for _ in 0..3 {
            gc.advance_epoch();
        }
        assert_eq!(gc.current_epoch(), 7);
        assert_eq!(gc.watermark(), 4);

        drop(guard);
        assert_eq!(gc.watermark(), 5);
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        for i in 0..10 {
            gc.insert(k("key"), i);
            gc.advance_epoch();
        }

        assert_eq!(gc.version_count(), 10);

        // Watermark is 10 - 1 = 9: epoch 9 is live, epoch 8 is the boundary
        // version, and the eight older versions are collected.
        let result = gc.try_gc();

        assert_eq!(result.watermark, 9);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&k("key")), Some(9));
        assert_eq!(gc.get_at_epoch(&k("key"), 8), Some(8));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert(k("key"), i);
            gc.advance_epoch();
        }

        gc.try_gc();

        // Default grace period is 2, so the watermark is 5 - 2 = 3.
        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.last_gc_epoch, 3);
        assert_eq!(
            stats.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>() as u64
        );
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            // Deliberately tiny: `force_gc` must not be limited by it.
            max_versions_per_cycle: 1,
        });

        // Two versions per key, one in epoch 0 and one in epoch 1.
        for i in 0..20 {
            gc.insert(format!("key{}", i), i);
        }
        gc.advance_epoch();
        for i in 0..20 {
            gc.insert(format!("key{}", i), i + 100);
        }
        gc.advance_epoch();

        assert_eq!(gc.version_count(), 40);

        // Watermark is 2: per key the epoch-1 version is the boundary and the
        // epoch-0 version is collected.
        let result = gc.force_gc();

        assert_eq!(result.watermark, 2);
        assert_eq!(result.chains_scanned, 20);
        assert_eq!(result.versions_collected, 20);
        assert_eq!(gc.version_count(), 20);
        assert_eq!(gc.chain_count(), 20);
        assert_eq!(gc.get(&k("key7")), Some(107));
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert(k("key1"), 1);
        gc.insert(k("key2"), 2);
        gc.insert(k("key3"), 3);

        assert_eq!(gc.chain_count(), 3);
        assert_eq!(gc.version_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // At epoch 1 the tombstone is the visible entry, so the key is absent.
        assert!(chain.version_at(1).is_none());
        // Before the deletion the original value is still visible.
        assert_eq!(chain.version_at(0).map(|v| v.value), Some(100));
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert(k("key"), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        assert_eq!(result.watermark, 3);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(
            result.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>()
        );
    }
}