1use dashmap::DashMap;
52use std::collections::VecDeque;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55
/// Identifier of a single version: the epoch it was created in plus a
/// sequence number within that epoch.
///
/// The derived ordering compares `epoch` first and `sequence` second, so ids
/// from a later epoch always sort after ids from an earlier one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct VersionId {
    /// Epoch in which the version was created.
    pub epoch: u64,
    /// Position of the version within its epoch.
    pub sequence: u32,
}

impl VersionId {
    /// Builds a version id from its two components.
    pub fn new(epoch: u64, sequence: u32) -> Self {
        VersionId { epoch, sequence }
    }

    /// Returns `true` when this version's epoch lies strictly below
    /// `watermark`. The sequence number plays no part in the decision.
    pub fn is_stale(&self, watermark: u64) -> bool {
        watermark > self.epoch
    }
}
73
74#[derive(Debug, Clone)]
76pub struct VersionedValue<T> {
77 pub version: VersionId,
78 pub value: T,
79 pub deleted: bool,
81}
82
83impl<T> VersionedValue<T> {
84 pub fn new(version: VersionId, value: T) -> Self {
85 Self {
86 version,
87 value,
88 deleted: false,
89 }
90 }
91
92 pub fn tombstone(version: VersionId, value: T) -> Self {
93 Self {
94 version,
95 value,
96 deleted: true,
97 }
98 }
99}
100
101#[derive(Debug)]
103pub struct VersionChain<T> {
104 versions: VecDeque<VersionedValue<T>>,
106 total_versions: u64,
108}
109
110impl<T: Clone> VersionChain<T> {
111 pub fn new() -> Self {
112 Self {
113 versions: VecDeque::new(),
114 total_versions: 0,
115 }
116 }
117
118 pub fn add_version(&mut self, version: VersionedValue<T>) {
120 self.versions.push_front(version);
121 self.total_versions += 1;
122 }
123
124 pub fn latest(&self) -> Option<&VersionedValue<T>> {
126 self.versions.front()
127 }
128
129 pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
132 for v in &self.versions {
133 if v.version.epoch < epoch {
134 if v.deleted {
137 return None;
138 }
139 return Some(v);
140 }
141 }
142 None
143 }
144
145 pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
148 let initial_len = self.versions.len();
149
150 let mut kept = 0;
152 let mut found_base = false;
153
154 for v in self.versions.iter() {
155 if v.version.epoch >= watermark {
156 kept += 1;
157 } else if !found_base {
158 found_base = true;
160 kept += 1;
161 }
162 }
163
164 self.versions.truncate(kept);
166
167 let removed = initial_len - self.versions.len();
168 let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
169
170 (removed, bytes_freed)
171 }
172
173 pub fn len(&self) -> usize {
175 self.versions.len()
176 }
177
178 pub fn is_empty(&self) -> bool {
179 self.versions.is_empty()
180 }
181}
182
183impl<T: Clone> Default for VersionChain<T> {
184 fn default() -> Self {
185 Self::new()
186 }
187}
188
/// Number of reader slots held by a `ReaderRegistry`.
const MAX_READER_SLOTS: usize = 256;

/// Epoch value stored in a slot that no reader currently occupies.
const SLOT_EMPTY: u64 = u64::MAX;

/// One reader slot: the epoch a reader registered with, or `SLOT_EMPTY`.
///
/// The type is aligned to 64 bytes, so every slot occupies its own 64-byte
/// region (presumably to keep slots on separate cache lines — TODO confirm).
#[derive(Debug)]
#[repr(C, align(64))]
struct EpochSlot {
    epoch: AtomicU64,
}

impl EpochSlot {
    /// Creates an unoccupied slot.
    const fn empty() -> Self {
        EpochSlot {
            epoch: AtomicU64::new(SLOT_EMPTY),
        }
    }
}
214
215#[derive(Debug)]
225pub struct ReaderRegistry {
226 slots: Box<[EpochSlot; MAX_READER_SLOTS]>,
228 active_count: AtomicUsize,
230}
231
232impl ReaderRegistry {
233 pub fn new() -> Self {
234 let slots: Box<[EpochSlot; MAX_READER_SLOTS]> = {
236 let mut v: Vec<EpochSlot> = Vec::with_capacity(MAX_READER_SLOTS);
237 for _ in 0..MAX_READER_SLOTS {
238 v.push(EpochSlot::empty());
239 }
240 v.into_boxed_slice().try_into().ok().unwrap()
241 };
242 Self {
243 slots,
244 active_count: AtomicUsize::new(0),
245 }
246 }
247
248 pub fn register(&self, epoch: u64) -> Option<u64> {
258 for (i, slot) in self.slots.iter().enumerate() {
259 if slot
260 .epoch
261 .compare_exchange(SLOT_EMPTY, epoch, Ordering::AcqRel, Ordering::Relaxed)
262 .is_ok()
263 {
264 self.active_count.fetch_add(1, Ordering::Relaxed);
265 return Some(i as u64);
266 }
267 }
268 None
273 }
274
275 pub fn unregister(&self, reader_id: u64) {
279 let idx = reader_id as usize;
280 if idx < MAX_READER_SLOTS {
281 let prev = self.slots[idx].epoch.swap(SLOT_EMPTY, Ordering::Release);
282 if prev != SLOT_EMPTY {
283 self.active_count.fetch_sub(1, Ordering::Relaxed);
284 }
285 }
286 }
287
288 pub fn min_active_epoch(&self) -> Option<u64> {
292 let mut min_epoch = u64::MAX;
293 for slot in self.slots.iter() {
294 let e = slot.epoch.load(Ordering::Relaxed);
295 if e != SLOT_EMPTY && e < min_epoch {
296 min_epoch = e;
297 }
298 }
299 if min_epoch == u64::MAX {
300 None
301 } else {
302 Some(min_epoch)
303 }
304 }
305
306 pub fn active_count(&self) -> usize {
308 self.active_count.load(Ordering::Relaxed)
309 }
310}
311
312impl Default for ReaderRegistry {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
/// Running garbage-collection counters, updatable through a shared reference.
///
/// All fields start at zero (derived `Default`).
#[derive(Debug, Default)]
pub struct GCStats {
    /// Number of GC cycles recorded.
    pub gc_cycles: AtomicU64,
    /// Total versions removed.
    pub versions_collected: AtomicU64,
    /// Total bytes reported as freed.
    pub bytes_freed: AtomicU64,
    /// Total chains visited.
    pub chains_scanned: AtomicU64,
    /// Value recorded for the most recent cycle's epoch.
    pub last_gc_epoch: AtomicU64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: AtomicU64,
}

impl GCStats {
    /// Copies every counter into a plain-value [`GCStatsSnapshot`].
    ///
    /// Each counter is read with its own relaxed load, so the snapshot is not
    /// an atomic view of all six fields taken together.
    pub fn snapshot(&self) -> GCStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);

        let gc_cycles = read(&self.gc_cycles);
        let versions_collected = read(&self.versions_collected);
        let bytes_freed = read(&self.bytes_freed);
        let chains_scanned = read(&self.chains_scanned);
        let last_gc_epoch = read(&self.last_gc_epoch);
        let last_gc_duration_us = read(&self.last_gc_duration_us);

        GCStatsSnapshot {
            gc_cycles,
            versions_collected,
            bytes_freed,
            chains_scanned,
            last_gc_epoch,
            last_gc_duration_us,
        }
    }
}

/// Plain-value copy of [`GCStats`] produced by [`GCStats::snapshot`].
#[derive(Debug, Clone)]
pub struct GCStatsSnapshot {
    /// Number of GC cycles recorded.
    pub gc_cycles: u64,
    /// Total versions removed.
    pub versions_collected: u64,
    /// Total bytes reported as freed.
    pub bytes_freed: u64,
    /// Total chains visited.
    pub chains_scanned: u64,
    /// Value recorded for the most recent cycle's epoch.
    pub last_gc_epoch: u64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: u64,
}
351
/// Tunables for the epoch-based garbage collector.
#[derive(Debug, Clone)]
pub struct GCConfig {
    /// Number of most recent epochs whose versions are always retained; it
    /// is subtracted (saturating) from the current epoch when the watermark
    /// is computed.
    pub min_epochs_to_keep: u64,
    /// Pending-version count at which an insert triggers a GC cycle.
    pub gc_trigger_threshold: usize,
    /// Upper bound applied to a single `try_gc` cycle.
    pub max_versions_per_cycle: usize,
}

impl Default for GCConfig {
    /// Keeps 2 epochs, triggers at 1000 pending versions and bounds a cycle
    /// at 10000.
    fn default() -> Self {
        GCConfig {
            max_versions_per_cycle: 10000,
            gc_trigger_threshold: 1000,
            min_epochs_to_keep: 2,
        }
    }
}
372
373pub struct EpochGC<K, V>
375where
376 K: Eq + std::hash::Hash + Clone,
377 V: Clone,
378{
379 current_epoch: AtomicU64,
381 current_sequence: AtomicU64,
383 chains: DashMap<K, VersionChain<V>>,
386 readers: Arc<ReaderRegistry>,
388 config: GCConfig,
390 stats: GCStats,
392 pending_versions: AtomicUsize,
394}
395
396impl<K, V> EpochGC<K, V>
397where
398 K: Eq + std::hash::Hash + Clone,
399 V: Clone,
400{
401 pub fn new() -> Self {
403 Self::with_config(GCConfig::default())
404 }
405
406 pub fn with_config(config: GCConfig) -> Self {
408 Self {
409 current_epoch: AtomicU64::new(0),
410 current_sequence: AtomicU64::new(0),
411 chains: DashMap::new(),
412 readers: Arc::new(ReaderRegistry::new()),
413 config,
414 stats: GCStats::default(),
415 pending_versions: AtomicUsize::new(0),
416 }
417 }
418
419 pub fn current_epoch(&self) -> u64 {
421 self.current_epoch.load(Ordering::SeqCst)
422 }
423
424 pub fn advance_epoch(&self) -> u64 {
426 self.current_sequence.store(0, Ordering::SeqCst);
427 self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
428 }
429
430 pub fn next_version(&self) -> VersionId {
432 let epoch = self.current_epoch.load(Ordering::SeqCst);
433 let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
434 VersionId::new(epoch, seq)
435 }
436
437 pub fn insert(&self, key: K, value: V) -> VersionId {
439 let version = self.next_version();
440 let versioned = VersionedValue::new(version, value);
441
442 self.chains.entry(key).or_default().add_version(versioned);
443
444 let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
445
446 if pending >= self.config.gc_trigger_threshold {
448 self.try_gc();
449 }
450
451 version
452 }
453
454 pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
456 let version = self.next_version();
457 let versioned = VersionedValue::tombstone(version, tombstone_value);
458
459 self.chains.entry(key).or_default().add_version(versioned);
460
461 self.pending_versions.fetch_add(1, Ordering::Relaxed);
462 version
463 }
464
465 pub fn get(&self, key: &K) -> Option<V> {
467 self.chains
468 .get(key)
469 .and_then(|entry| entry.latest().cloned())
470 .filter(|v| !v.deleted)
471 .map(|v| v.value.clone())
472 }
473
474 pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
476 self.chains
477 .get(key)
478 .and_then(|entry| entry.version_at(epoch).cloned())
479 .map(|v| v.value.clone())
480 }
481
482 pub fn begin_read(&self) -> ReadGuard {
488 let epoch = self.current_epoch.load(Ordering::SeqCst);
489 let reader_id = self.readers.register(epoch).unwrap_or_else(|| {
490 panic!(
491 "EpochGC: all {} reader slots exhausted — too many concurrent readers. \
492 Ensure ReadGuards are dropped promptly.",
493 MAX_READER_SLOTS
494 )
495 });
496 ReadGuard {
497 epoch,
498 reader_id,
499 registry: Arc::clone(&self.readers),
500 }
501 }
502
503 pub fn watermark(&self) -> u64 {
505 let current = self.current_epoch.load(Ordering::SeqCst);
506 let min_reader = self.readers.min_active_epoch().unwrap_or(current);
507
508 let grace = current.saturating_sub(self.config.min_epochs_to_keep);
510 grace.min(min_reader)
511 }
512
513 pub fn try_gc(&self) -> GCResult {
515 let start = std::time::Instant::now();
516 let watermark = self.watermark();
517
518 let mut versions_collected = 0;
519 let mut bytes_freed = 0;
520 let mut chains_scanned = 0;
521
522 let keys: Vec<K> = self
524 .chains
525 .iter()
526 .map(|entry| entry.key().clone())
527 .collect();
528
529 for key in keys {
530 if chains_scanned >= self.config.max_versions_per_cycle {
531 break;
532 }
533
534 if let Some(mut entry) = self.chains.get_mut(&key) {
535 let (removed, freed) = entry.gc(watermark);
536 versions_collected += removed;
537 bytes_freed += freed;
538 chains_scanned += 1;
539 }
540 }
541
542 self.chains.retain(|_, chain| !chain.is_empty());
544
545 let duration = start.elapsed();
546
547 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
549 self.stats
550 .versions_collected
551 .fetch_add(versions_collected as u64, Ordering::Relaxed);
552 self.stats
553 .bytes_freed
554 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
555 self.stats
556 .chains_scanned
557 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
558 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
559 self.stats
560 .last_gc_duration_us
561 .store(duration.as_micros() as u64, Ordering::Relaxed);
562
563 self.pending_versions.store(0, Ordering::Relaxed);
565
566 GCResult {
567 versions_collected,
568 bytes_freed,
569 chains_scanned,
570 watermark,
571 duration_us: duration.as_micros() as u64,
572 }
573 }
574
575 pub fn force_gc(&self) -> GCResult {
577 let _old_limit = self.config.max_versions_per_cycle;
578 let _config = GCConfig {
580 max_versions_per_cycle: usize::MAX,
581 ..self.config.clone()
582 };
583
584 let start = std::time::Instant::now();
585 let watermark = self.watermark();
586
587 let mut versions_collected = 0;
588 let mut bytes_freed = 0;
589 let mut chains_scanned = 0;
590
591 for mut entry in self.chains.iter_mut() {
592 let (removed, freed) = entry.value_mut().gc(watermark);
593 versions_collected += removed;
594 bytes_freed += freed;
595 chains_scanned += 1;
596 }
597
598 self.chains.retain(|_, chain| !chain.is_empty());
600
601 let duration = start.elapsed();
602
603 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
604 self.stats
605 .versions_collected
606 .fetch_add(versions_collected as u64, Ordering::Relaxed);
607 self.stats
608 .bytes_freed
609 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
610 self.stats
611 .chains_scanned
612 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
613 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
614 self.stats
615 .last_gc_duration_us
616 .store(duration.as_micros() as u64, Ordering::Relaxed);
617 self.pending_versions.store(0, Ordering::Relaxed);
618
619 GCResult {
620 versions_collected,
621 bytes_freed,
622 chains_scanned,
623 watermark,
624 duration_us: duration.as_micros() as u64,
625 }
626 }
627
628 pub fn stats(&self) -> GCStatsSnapshot {
630 self.stats.snapshot()
631 }
632
633 pub fn version_count(&self) -> usize {
635 self.chains.iter().map(|entry| entry.value().len()).sum()
636 }
637
638 pub fn chain_count(&self) -> usize {
640 self.chains.len()
641 }
642}
643
644impl<K, V> Default for EpochGC<K, V>
645where
646 K: Eq + std::hash::Hash + Clone,
647 V: Clone,
648{
649 fn default() -> Self {
650 Self::new()
651 }
652}
653
/// Outcome of a single garbage-collection cycle.
#[derive(Clone, Debug)]
pub struct GCResult {
    /// Number of versions removed in this cycle.
    pub versions_collected: usize,
    /// Bytes reported as freed in this cycle.
    pub bytes_freed: usize,
    /// Number of chains visited in this cycle.
    pub chains_scanned: usize,
    /// Watermark the cycle collected against.
    pub watermark: u64,
    /// Duration of the cycle, in microseconds.
    pub duration_us: u64,
}
663
664pub struct ReadGuard {
666 pub epoch: u64,
667 reader_id: u64,
668 registry: Arc<ReaderRegistry>,
669}
670
671impl Drop for ReadGuard {
672 fn drop(&mut self) {
673 self.registry.unregister(self.reader_id);
674 }
675}
676
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert_eq!(chain.latest().unwrap().value, "v2");
    }

    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }
        assert_eq!(chain.len(), 5);

        // Watermark 3 keeps epochs 4 and 3 plus epoch 2 as the base version;
        // epochs 1 and 0 are collected.
        let (removed, bytes_freed) = chain.gc(3);

        assert_eq!(removed, 2);
        assert_eq!(
            bytes_freed,
            2 * std::mem::size_of::<VersionedValue<String>>()
        );
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.latest().unwrap().value, "v4");
        assert_eq!(chain.version_at(3).unwrap().value, "v2");
    }

    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();

        let r1 = registry.register(10).unwrap();
        let _r2 = registry.register(20).unwrap();

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let _v1 = gc.insert("key1".to_string(), 100);
        let _v2 = gc.insert("key1".to_string(), 200);

        assert_eq!(gc.get(&"key1".to_string()), Some(200));
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.delete("key1".to_string(), 0);

        assert_eq!(gc.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();
        let key = "key1".to_string();

        gc.insert(key.clone(), 100); // written in epoch 0
        gc.advance_epoch();
        gc.insert(key.clone(), 200); // written in epoch 1
        gc.advance_epoch();
        gc.insert(key.clone(), 300); // written in epoch 2

        // A reader at epoch E sees the newest version written before E.
        assert_eq!(gc.get_at_epoch(&key, 1), Some(100));
        assert_eq!(gc.get_at_epoch(&key, 2), Some(200));
        assert_eq!(gc.get_at_epoch(&key, 3), Some(300));
        assert_eq!(gc.get_at_epoch(&key, 0), None);
    }

    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);

        {
            let _guard = gc.begin_read();
            assert_eq!(gc.readers.active_count(), 1);
        }

        assert_eq!(gc.readers.active_count(), 0);
    }

    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        gc.insert("k".to_string(), 1);
        gc.advance_epoch();
        gc.insert("k".to_string(), 2);
        gc.advance_epoch();
        gc.insert("k".to_string(), 3);
        gc.advance_epoch();
        gc.insert("k".to_string(), 4);
        gc.advance_epoch();

        // Current epoch is 4 and two epochs are always kept.
        assert_eq!(gc.current_epoch(), 4);
        assert_eq!(gc.watermark(), 2);

        // A reader pinned at epoch 4 is newer than the grace window, so it
        // does not lower the watermark.
        let guard = gc.begin_read();
        assert_eq!(guard.epoch, 4);
        assert_eq!(gc.watermark(), 2);
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        for i in 0..10 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }
        assert_eq!(gc.version_count(), 10);

        // Current epoch is 10, so the watermark is 9: the epoch-9 version
        // stays, epoch 8 is kept as the base, epochs 0..=7 are collected.
        let result = gc.try_gc();

        assert_eq!(result.watermark, 9);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&"key".to_string()), Some(9));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        gc.try_gc();

        // Current epoch 5 with the default grace of 2 gives watermark 3.
        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(stats.last_gc_epoch, 3);
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            // Deliberately tiny: `force_gc` must ignore this bound.
            max_versions_per_cycle: 1,
        });

        for i in 0..20 {
            gc.insert(format!("key{}", i), i);
        }

        gc.advance_epoch();
        gc.advance_epoch();

        let initial_count = gc.version_count();
        let result = gc.force_gc();
        let final_count = gc.version_count();

        // Every chain is visited despite the bound of 1, and each chain keeps
        // its single version as the base.
        assert_eq!(initial_count, 20);
        assert_eq!(result.chains_scanned, 20);
        assert_eq!(result.versions_collected, 0);
        assert_eq!(final_count, 20);
        assert_eq!(gc.chain_count(), 20);
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 1);
        gc.insert("key2".to_string(), 2);
        gc.insert("key3".to_string(), 3);

        assert_eq!(gc.chain_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // At epoch 2 the tombstone is the visible version.
        assert!(chain.version_at(2).is_none());
        // At epoch 1 only the epoch-0 value is visible.
        assert_eq!(chain.version_at(1).map(|v| v.value), Some(100));
        // Nothing was written before epoch 0.
        assert!(chain.version_at(0).is_none());
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        assert_eq!(result.watermark, 3);
        assert!(result.watermark <= gc.current_epoch());
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(
            result.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>()
        );
    }

    #[test]
    fn test_lock_free_slot_registration() {
        let registry = ReaderRegistry::new();

        let id0 = registry.register(10).unwrap();
        let id1 = registry.register(20).unwrap();
        let id2 = registry.register(30).unwrap();

        assert_ne!(id0, id1);
        assert_ne!(id1, id2);
        assert_eq!(registry.active_count(), 3);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(id1);
        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(id0);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(30));

        registry.unregister(id2);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_epoch(), None);
    }

    #[test]
    fn test_concurrent_insert_and_gc() {
        use std::sync::Arc;
        use std::thread;

        let gc = Arc::new(EpochGC::<u64, u64>::new());

        let mut handles = Vec::new();

        // Four writers, each inserting 100 distinct keys.
        for t in 0..4 {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    gc.insert(t * 1000 + i, i);
                }
            }));
        }

        // One thread advancing the epoch and collecting concurrently.
        {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for _ in 0..10 {
                    gc.advance_epoch();
                    gc.try_gc();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // GC never removes the only version of a live key.
        assert_eq!(gc.chain_count(), 400);
        assert_eq!(gc.version_count(), 400);
    }
}