1use dashmap::DashMap;
52use std::collections::VecDeque;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55
/// Identifies a single version of a value: the epoch it was written in plus
/// a sequence number within that epoch.
///
/// Field order is significant: the derived `Ord` compares `epoch` first and
/// falls back to `sequence` only on ties.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct VersionId {
    /// Epoch in which the version was created.
    pub epoch: u64,
    /// Ordinal of the version among those created in the same epoch.
    pub sequence: u32,
}

impl VersionId {
    /// Creates a version id from its components.
    pub fn new(epoch: u64, sequence: u32) -> Self {
        Self { epoch, sequence }
    }

    /// Returns `true` if this version's epoch is strictly below `watermark`.
    pub fn is_stale(&self, watermark: u64) -> bool {
        self.epoch < watermark
    }
}

/// A value tagged with the version that produced it.
///
/// When `deleted` is set the entry is a tombstone recording a deletion; its
/// `value` is only a placeholder, and [`VersionChain::version_at`] reports
/// the key as absent when it lands on one.
#[derive(Debug, Clone)]
pub struct VersionedValue<T> {
    /// Version that wrote this entry.
    pub version: VersionId,
    /// Stored value (a placeholder for tombstones).
    pub value: T,
    /// `true` if this entry marks a deletion.
    pub deleted: bool,
}

impl<T> VersionedValue<T> {
    /// A live value written by `version`.
    pub fn new(version: VersionId, value: T) -> Self {
        Self {
            version,
            value,
            deleted: false,
        }
    }

    /// A deletion marker written by `version`; `value` is only a placeholder.
    pub fn tombstone(version: VersionId, value: T) -> Self {
        Self {
            version,
            value,
            deleted: true,
        }
    }
}

/// All retained versions of one key, newest first.
///
/// [`add_version`](Self::add_version) pushes to the front, so lookups treat
/// the front as the most recent write.
#[derive(Debug)]
pub struct VersionChain<T> {
    /// Retained versions; index 0 is the most recently added.
    versions: VecDeque<VersionedValue<T>>,
    /// Number of versions ever added, including ones since garbage-collected.
    total_versions: u64,
}

impl<T> VersionChain<T> {
    /// Creates an empty chain.
    pub fn new() -> Self {
        Self {
            versions: VecDeque::new(),
            total_versions: 0,
        }
    }

    /// Records `version` as the newest version of this key.
    pub fn add_version(&mut self, version: VersionedValue<T>) {
        self.versions.push_front(version);
        self.total_versions += 1;
    }

    /// Returns the most recently added version (possibly a tombstone).
    pub fn latest(&self) -> Option<&VersionedValue<T>> {
        self.versions.front()
    }

    /// Returns what a snapshot taken at the start of `epoch` observes: the
    /// most recently added version whose epoch is strictly less than `epoch`.
    ///
    /// Returns `None` when no such version is retained or when it is a
    /// tombstone.
    pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
        self.versions
            .iter()
            .find(|v| v.version.epoch < epoch)
            .filter(|v| !v.deleted)
    }

    /// Discards versions that no snapshot at an epoch `>= watermark` can reach.
    ///
    /// The *base* is the first (newest) version with an epoch below
    /// `watermark`. [`version_at`](Self::version_at) for any epoch
    /// `>= watermark` stops at the base or at an entry in front of it, so
    /// everything behind the base is unreachable and is removed. The base
    /// itself is removed too when it is a tombstone, since landing on it
    /// yields `None` exactly like running off the end of the chain; this lets
    /// a fully deleted key end up empty so its owner can drop it.
    ///
    /// Returns `(versions_removed, approximate_bytes_freed)`, where the byte
    /// count is `versions_removed * size_of::<VersionedValue<T>>()` and does
    /// not include heap memory owned by `T`.
    pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
        let base = match self
            .versions
            .iter()
            .position(|v| v.version.epoch < watermark)
        {
            Some(index) => index,
            // Every version is at or above the watermark: all still visible.
            None => return (0, 0),
        };

        let keep = if self.versions[base].deleted {
            base
        } else {
            base + 1
        };
        let removed = self.versions.len() - keep;
        self.versions.truncate(keep);

        (removed, removed * std::mem::size_of::<VersionedValue<T>>())
    }

    /// Number of versions currently retained.
    pub fn len(&self) -> usize {
        self.versions.len()
    }

    /// `true` if no versions are retained.
    pub fn is_empty(&self) -> bool {
        self.versions.is_empty()
    }

    /// Number of versions ever added to this chain, including collected ones.
    pub fn total_versions(&self) -> u64 {
        self.total_versions
    }
}

impl<T> Default for VersionChain<T> {
    fn default() -> Self {
        Self::new()
    }
}
188
/// Capacity of [`ReaderRegistry`]: the maximum number of simultaneously
/// registered readers.
const MAX_READER_SLOTS: usize = 256;

/// Sentinel stored in a slot no reader owns. Because of it, `u64::MAX` cannot
/// itself be registered as a reader epoch.
const SLOT_EMPTY: u64 = u64::MAX;

/// One reader's published epoch, or [`SLOT_EMPTY`] when unclaimed.
///
/// `align(64)` gives every slot its own 64-byte block, presumably so readers
/// publishing into neighbouring slots do not contend on one cache line.
#[derive(Debug)]
#[repr(C, align(64))]
struct EpochSlot {
    epoch: AtomicU64,
}

impl EpochSlot {
    /// An unclaimed slot.
    const fn empty() -> Self {
        Self {
            epoch: AtomicU64::new(SLOT_EMPTY),
        }
    }
}

/// Fixed-size, lock-free table of the epochs that active readers are pinned to.
///
/// Readers claim a free slot with a CAS in [`register`](Self::register) and
/// release it in [`unregister`](Self::unregister); the garbage collector scans
/// every slot in [`min_active_epoch`](Self::min_active_epoch).
#[derive(Debug)]
pub struct ReaderRegistry {
    slots: Box<[EpochSlot; MAX_READER_SLOTS]>,
    /// Number of claimed slots; statistics only, hence `Relaxed`.
    active_count: AtomicUsize,
}

impl ReaderRegistry {
    /// Creates a registry with every slot empty.
    pub fn new() -> Self {
        // Collected straight into a boxed slice so the table is built on the heap.
        let slots: Box<[EpochSlot]> = (0..MAX_READER_SLOTS).map(|_| EpochSlot::empty()).collect();
        let slots: Box<[EpochSlot; MAX_READER_SLOTS]> = slots
            .try_into()
            .expect("slice was built with exactly MAX_READER_SLOTS elements");
        Self {
            slots,
            active_count: AtomicUsize::new(0),
        }
    }

    /// Claims the first free slot and publishes `epoch` in it.
    ///
    /// Returns the slot index to hand back to [`unregister`](Self::unregister),
    /// or `None` when all `MAX_READER_SLOTS` slots are taken.
    ///
    /// The claim is `SeqCst` so it is totally ordered with the `SeqCst` loads in
    /// [`min_active_epoch`](Self::min_active_epoch). A caller that publishes
    /// its epoch and then re-reads a `SeqCst` global epoch can therefore rely
    /// on any scan that missed its slot having read the global epoch first.
    pub fn register(&self, epoch: u64) -> Option<u64> {
        debug_assert_ne!(
            epoch, SLOT_EMPTY,
            "u64::MAX is reserved as the empty-slot sentinel"
        );
        for (index, slot) in self.slots.iter().enumerate() {
            let claimed = slot
                .epoch
                .compare_exchange(SLOT_EMPTY, epoch, Ordering::SeqCst, Ordering::Relaxed)
                .is_ok();
            if claimed {
                self.active_count.fetch_add(1, Ordering::Relaxed);
                return Some(index as u64);
            }
        }
        None
    }

    /// Releases slot `reader_id`.
    ///
    /// Out-of-range ids and already-free slots are ignored; `active_count` is
    /// decremented only when a claimed slot is actually freed.
    pub fn unregister(&self, reader_id: u64) {
        let idx = reader_id as usize;
        if let Some(slot) = self.slots.get(idx) {
            let prev = slot.epoch.swap(SLOT_EMPTY, Ordering::Release);
            if prev != SLOT_EMPTY {
                self.active_count.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }

    /// Returns the smallest epoch currently published by any reader, or `None`
    /// when no slot is claimed.
    pub fn min_active_epoch(&self) -> Option<u64> {
        // SeqCst pairs with the SeqCst claim in `register` (see there).
        self.slots
            .iter()
            .map(|slot| slot.epoch.load(Ordering::SeqCst))
            .filter(|&epoch| epoch != SLOT_EMPTY)
            .min()
    }

    /// Number of currently claimed slots (approximate under concurrency).
    pub fn active_count(&self) -> usize {
        self.active_count.load(Ordering::Relaxed)
    }
}

impl Default for ReaderRegistry {
    fn default() -> Self {
        Self::new()
    }
}
317
/// Cumulative garbage-collection counters.
///
/// Every counter is an independent atomic read and written with `Relaxed`
/// ordering, so a [`GCStatsSnapshot`] taken during a cycle may mix values from
/// before and after it.
#[derive(Default, Debug)]
pub struct GCStats {
    /// Number of completed GC cycles.
    pub gc_cycles: AtomicU64,
    /// Total versions discarded across all cycles.
    pub versions_collected: AtomicU64,
    /// Total approximate bytes reported as freed across all cycles.
    pub bytes_freed: AtomicU64,
    /// Total chains examined across all cycles.
    pub chains_scanned: AtomicU64,
    /// Watermark used by the most recent cycle.
    pub last_gc_epoch: AtomicU64,
    /// Wall-clock duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: AtomicU64,
}

impl GCStats {
    /// Copies every counter into a plain-value [`GCStatsSnapshot`].
    pub fn snapshot(&self) -> GCStatsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        GCStatsSnapshot {
            gc_cycles: read(&self.gc_cycles),
            versions_collected: read(&self.versions_collected),
            bytes_freed: read(&self.bytes_freed),
            chains_scanned: read(&self.chains_scanned),
            last_gc_epoch: read(&self.last_gc_epoch),
            last_gc_duration_us: read(&self.last_gc_duration_us),
        }
    }
}

/// Point-in-time copy of [`GCStats`], with the same fields as plain integers.
#[derive(Clone, Debug)]
pub struct GCStatsSnapshot {
    /// See [`GCStats::gc_cycles`].
    pub gc_cycles: u64,
    /// See [`GCStats::versions_collected`].
    pub versions_collected: u64,
    /// See [`GCStats::bytes_freed`].
    pub bytes_freed: u64,
    /// See [`GCStats::chains_scanned`].
    pub chains_scanned: u64,
    /// See [`GCStats::last_gc_epoch`].
    pub last_gc_epoch: u64,
    /// See [`GCStats::last_gc_duration_us`].
    pub last_gc_duration_us: u64,
}
351
/// Tuning knobs for the epoch garbage collector.
#[derive(Clone, Debug)]
pub struct GCConfig {
    /// Number of most recent epochs whose versions are always retained: the
    /// watermark never exceeds `current_epoch - min_epochs_to_keep`.
    pub min_epochs_to_keep: u64,
    /// Pending-version count at which a write runs `try_gc` on the writing
    /// thread.
    pub gc_trigger_threshold: usize,
    /// Budget for a single `try_gc` call. Despite the name, it bounds the
    /// number of *chains* (keys) examined, not individual versions.
    pub max_versions_per_cycle: usize,
}

impl Default for GCConfig {
    /// Keep two epochs, trigger after 1000 pending versions, scan at most
    /// 10 000 chains per `try_gc`.
    fn default() -> Self {
        GCConfig {
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 10_000,
            min_epochs_to_keep: 2,
        }
    }
}
372
373pub struct EpochGC<K, V>
375where
376 K: Eq + std::hash::Hash + Clone,
377 V: Clone,
378{
379 current_epoch: AtomicU64,
381 current_sequence: AtomicU64,
383 chains: DashMap<K, VersionChain<V>>,
386 readers: Arc<ReaderRegistry>,
388 config: GCConfig,
390 stats: GCStats,
392 pending_versions: AtomicUsize,
394}
395
396impl<K, V> EpochGC<K, V>
397where
398 K: Eq + std::hash::Hash + Clone,
399 V: Clone,
400{
401 pub fn new() -> Self {
403 Self::with_config(GCConfig::default())
404 }
405
406 pub fn with_config(config: GCConfig) -> Self {
408 Self {
409 current_epoch: AtomicU64::new(0),
410 current_sequence: AtomicU64::new(0),
411 chains: DashMap::new(),
412 readers: Arc::new(ReaderRegistry::new()),
413 config,
414 stats: GCStats::default(),
415 pending_versions: AtomicUsize::new(0),
416 }
417 }
418
419 pub fn current_epoch(&self) -> u64 {
421 self.current_epoch.load(Ordering::SeqCst)
422 }
423
424 pub fn advance_epoch(&self) -> u64 {
426 self.current_sequence.store(0, Ordering::SeqCst);
427 self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
428 }
429
430 pub fn next_version(&self) -> VersionId {
432 let epoch = self.current_epoch.load(Ordering::SeqCst);
433 let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
434 VersionId::new(epoch, seq)
435 }
436
437 pub fn insert(&self, key: K, value: V) -> VersionId {
439 let version = self.next_version();
440 let versioned = VersionedValue::new(version, value);
441
442 self.chains.entry(key).or_default().add_version(versioned);
443
444 let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
445
446 if pending >= self.config.gc_trigger_threshold {
448 self.try_gc();
449 }
450
451 version
452 }
453
454 pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
456 let version = self.next_version();
457 let versioned = VersionedValue::tombstone(version, tombstone_value);
458
459 self.chains.entry(key).or_default().add_version(versioned);
460
461 self.pending_versions.fetch_add(1, Ordering::Relaxed);
462 version
463 }
464
465 pub fn get(&self, key: &K) -> Option<V> {
467 self.chains
468 .get(key)
469 .and_then(|entry| entry.latest().cloned())
470 .filter(|v| !v.deleted)
471 .map(|v| v.value.clone())
472 }
473
474 pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
476 self.chains
477 .get(key)
478 .and_then(|entry| entry.version_at(epoch).cloned())
479 .map(|v| v.value.clone())
480 }
481
482 pub fn begin_read(&self) -> ReadGuard {
488 let epoch = self.current_epoch.load(Ordering::SeqCst);
489 let reader_id = self.readers.register(epoch).unwrap_or_else(|| {
490 panic!(
491 "EpochGC: all {} reader slots exhausted — too many concurrent readers. \
492 Ensure ReadGuards are dropped promptly.",
493 MAX_READER_SLOTS
494 )
495 });
496 ReadGuard {
497 epoch,
498 reader_id,
499 registry: Arc::clone(&self.readers),
500 }
501 }
502
503 pub fn watermark(&self) -> u64 {
505 let current = self.current_epoch.load(Ordering::SeqCst);
506 let min_reader = self.readers.min_active_epoch().unwrap_or(current);
507
508 let grace = current.saturating_sub(self.config.min_epochs_to_keep);
510 grace.min(min_reader)
511 }
512
513 pub fn try_gc(&self) -> GCResult {
515 let start = std::time::Instant::now();
516 let watermark = self.watermark();
517
518 let mut versions_collected = 0;
519 let mut bytes_freed = 0;
520 let mut chains_scanned = 0;
521
522 let keys: Vec<K> = self.chains.iter().map(|entry| entry.key().clone()).collect();
524
525 for key in keys {
526 if chains_scanned >= self.config.max_versions_per_cycle {
527 break;
528 }
529
530 if let Some(mut entry) = self.chains.get_mut(&key) {
531 let (removed, freed) = entry.gc(watermark);
532 versions_collected += removed;
533 bytes_freed += freed;
534 chains_scanned += 1;
535 }
536 }
537
538 self.chains.retain(|_, chain| !chain.is_empty());
540
541 let duration = start.elapsed();
542
543 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
545 self.stats
546 .versions_collected
547 .fetch_add(versions_collected as u64, Ordering::Relaxed);
548 self.stats
549 .bytes_freed
550 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
551 self.stats
552 .chains_scanned
553 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
554 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
555 self.stats
556 .last_gc_duration_us
557 .store(duration.as_micros() as u64, Ordering::Relaxed);
558
559 self.pending_versions.store(0, Ordering::Relaxed);
561
562 GCResult {
563 versions_collected,
564 bytes_freed,
565 chains_scanned,
566 watermark,
567 duration_us: duration.as_micros() as u64,
568 }
569 }
570
571 pub fn force_gc(&self) -> GCResult {
573 let _old_limit = self.config.max_versions_per_cycle;
574 let _config = GCConfig {
576 max_versions_per_cycle: usize::MAX,
577 ..self.config.clone()
578 };
579
580 let start = std::time::Instant::now();
581 let watermark = self.watermark();
582
583 let mut versions_collected = 0;
584 let mut bytes_freed = 0;
585 let mut chains_scanned = 0;
586
587 for mut entry in self.chains.iter_mut() {
588 let (removed, freed) = entry.value_mut().gc(watermark);
589 versions_collected += removed;
590 bytes_freed += freed;
591 chains_scanned += 1;
592 }
593
594 self.chains.retain(|_, chain| !chain.is_empty());
596
597 let duration = start.elapsed();
598
599 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
600 self.stats
601 .versions_collected
602 .fetch_add(versions_collected as u64, Ordering::Relaxed);
603 self.stats
604 .bytes_freed
605 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
606 self.stats
607 .chains_scanned
608 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
609 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
610 self.stats
611 .last_gc_duration_us
612 .store(duration.as_micros() as u64, Ordering::Relaxed);
613 self.pending_versions.store(0, Ordering::Relaxed);
614
615 GCResult {
616 versions_collected,
617 bytes_freed,
618 chains_scanned,
619 watermark,
620 duration_us: duration.as_micros() as u64,
621 }
622 }
623
624 pub fn stats(&self) -> GCStatsSnapshot {
626 self.stats.snapshot()
627 }
628
629 pub fn version_count(&self) -> usize {
631 self.chains.iter().map(|entry| entry.value().len()).sum()
632 }
633
634 pub fn chain_count(&self) -> usize {
636 self.chains.len()
637 }
638}
639
640impl<K, V> Default for EpochGC<K, V>
641where
642 K: Eq + std::hash::Hash + Clone,
643 V: Clone,
644{
645 fn default() -> Self {
646 Self::new()
647 }
648}
649
/// Outcome of a single GC cycle (`try_gc` or `force_gc`).
#[derive(Clone, Debug)]
pub struct GCResult {
    /// Versions discarded during this cycle.
    pub versions_collected: usize,
    /// Approximate bytes released: discarded versions multiplied by the size
    /// of one `VersionedValue`; heap memory owned by the values is not counted.
    pub bytes_freed: usize,
    /// Chains examined during this cycle.
    pub chains_scanned: usize,
    /// Epoch watermark the cycle pruned against.
    pub watermark: u64,
    /// Wall-clock duration of the cycle, in microseconds.
    pub duration_us: u64,
}
659
660pub struct ReadGuard {
662 pub epoch: u64,
663 reader_id: u64,
664 registry: Arc<ReaderRegistry>,
665}
666
667impl Drop for ReadGuard {
668 fn drop(&mut self) {
669 self.registry.unregister(self.reader_id);
670 }
671}
672
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        assert!(VersionId::new(1, 0) < VersionId::new(1, 1));
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();
        assert!(chain.is_empty());
        assert!(chain.latest().is_none());

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert_eq!(chain.latest().unwrap().value, "v2");
    }

    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();
        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }
        assert_eq!(chain.len(), 5);

        let (removed, bytes_freed) = chain.gc(3);

        // Epochs 3 and 4 are at or above the watermark and epoch 2 is the base
        // a reader at epoch 3 resolves to; epochs 0 and 1 are unreachable.
        assert_eq!(removed, 2);
        assert_eq!(
            bytes_freed,
            2 * std::mem::size_of::<VersionedValue<String>>()
        );
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.version_at(3).map(|v| v.value.as_str()), Some("v2"));

        // A second pass at the same watermark has nothing left to collect.
        assert_eq!(chain.gc(3), (0, 0));
    }

    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();

        let r1 = registry.register(10).unwrap();
        let _r2 = registry.register(20).unwrap();

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let _v1 = gc.insert("key1".to_string(), 100);
        let _v2 = gc.insert("key1".to_string(), 200);

        assert_eq!(gc.get(&"key1".to_string()), Some(200));
        assert_eq!(gc.get(&"missing".to_string()), None);
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.delete("key1".to_string(), 0);

        assert_eq!(gc.get(&"key1".to_string()), None);
        // The tombstone is a version of its own.
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 200);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 300);

        // A snapshot at epoch `e` sees writes from epochs strictly before `e`.
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(100));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(200));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 3), Some(300));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), None);
        assert_eq!(gc.get_at_epoch(&"missing".to_string(), 3), None);
    }

    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);

        {
            let guard = gc.begin_read();
            assert_eq!(guard.epoch, 0);
            assert_eq!(gc.readers.active_count(), 1);
        }

        assert_eq!(gc.readers.active_count(), 0);
    }

    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        gc.insert("k".to_string(), 1);
        gc.advance_epoch();
        gc.insert("k".to_string(), 2);
        gc.advance_epoch();
        gc.insert("k".to_string(), 3);
        gc.advance_epoch();
        gc.insert("k".to_string(), 4);
        gc.advance_epoch();

        // current = 4, no readers: watermark = 4 - 2.
        assert_eq!(gc.current_epoch(), 4);
        assert_eq!(gc.watermark(), 2);

        // A reader at the current epoch does not lower the watermark.
        let guard = gc.begin_read();
        assert_eq!(guard.epoch, 4);
        assert_eq!(gc.watermark(), 2);
    }

    #[test]
    fn test_reader_pins_versions() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            ..Default::default()
        });
        let key = "k".to_string();

        gc.insert(key.clone(), 1);
        gc.advance_epoch();
        let guard = gc.begin_read();
        gc.insert(key.clone(), 2);
        gc.advance_epoch();
        gc.insert(key.clone(), 3);
        gc.advance_epoch();

        // The reader at epoch 1 holds the watermark down, so nothing it can
        // see is collected.
        assert_eq!(gc.watermark(), 1);
        let pinned = gc.try_gc();
        assert_eq!(pinned.versions_collected, 0);
        assert_eq!(gc.get_at_epoch(&key, guard.epoch), Some(1));

        drop(guard);

        // Without readers the watermark jumps to the current epoch and only
        // the newest version survives.
        let released = gc.force_gc();
        assert_eq!(released.watermark, 3);
        assert_eq!(released.versions_collected, 2);
        assert_eq!(gc.version_count(), 1);
        assert_eq!(gc.get(&key), Some(3));
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        for i in 0..10 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        assert_eq!(gc.version_count(), 10);

        let result = gc.try_gc();

        // current = 10, watermark = 9: keep epoch 9 plus base epoch 8.
        assert_eq!(result.watermark, 9);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&"key".to_string()), Some(9));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        gc.try_gc();

        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.last_gc_epoch, 3);
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 1,
        });

        for i in 0..20 {
            gc.insert(format!("key{}", i), i);
        }

        gc.advance_epoch();
        gc.advance_epoch();

        let initial_count = gc.version_count();
        assert_eq!(initial_count, 20);

        // try_gc honours the one-chain budget...
        assert_eq!(gc.try_gc().chains_scanned, 1);

        // ...while force_gc scans every chain. Each chain holds only its base
        // version, so nothing is collected.
        let result = gc.force_gc();
        assert_eq!(result.chains_scanned, 20);
        assert_eq!(result.versions_collected, 0);
        assert_eq!(gc.version_count(), initial_count);
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 1);
        gc.insert("key2".to_string(), 2);
        gc.insert("key3".to_string(), 3);

        assert_eq!(gc.chain_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // Epoch 2 sees the tombstone, epoch 1 the live value, epoch 0 nothing.
        assert!(chain.version_at(2).is_none());
        assert_eq!(chain.version_at(1).map(|v| v.value), Some(100));
        assert!(chain.version_at(0).is_none());
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        // current = 5 and min_epochs_to_keep = 2, so the watermark is 3.
        assert_eq!(result.watermark, 3);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(
            result.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>()
        );
    }

    #[test]
    fn test_lock_free_slot_registration() {
        let registry = ReaderRegistry::new();

        let id0 = registry.register(10).unwrap();
        let id1 = registry.register(20).unwrap();
        let id2 = registry.register(30).unwrap();

        assert_ne!(id0, id1);
        assert_ne!(id1, id2);
        assert_eq!(registry.active_count(), 3);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(id1);
        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(id0);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(30));

        registry.unregister(id2);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_epoch(), None);
    }

    #[test]
    fn test_concurrent_insert_and_gc() {
        use std::sync::Arc;
        use std::thread;

        let gc = Arc::new(EpochGC::<u64, u64>::new());

        let mut handles = Vec::new();
        for t in 0..4 {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    gc.insert(t * 1000 + i, i);
                }
            }));
        }

        {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for _ in 0..10 {
                    gc.advance_epoch();
                    gc.try_gc();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // Every key was written once, and GC always keeps a live base version.
        assert_eq!(gc.chain_count(), 400);
        assert_eq!(gc.version_count(), 400);
    }
}