1use parking_lot::RwLock;
49use std::collections::{HashMap, VecDeque};
50use std::sync::Arc;
51use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
52
/// Identifies one version of a value: the epoch it was written in plus a
/// sequence number within that epoch.
///
/// The derived ordering compares `epoch` first and `sequence` second (field
/// declaration order), so ids from a later epoch always sort after ids from an
/// earlier one.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct VersionId {
    /// Epoch in which this version was created.
    pub epoch: u64,
    /// Position of this version within its epoch.
    pub sequence: u32,
}
59
60impl VersionId {
61 pub fn new(epoch: u64, sequence: u32) -> Self {
62 Self { epoch, sequence }
63 }
64
65 pub fn is_stale(&self, watermark: u64) -> bool {
67 self.epoch < watermark
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct VersionedValue<T> {
74 pub version: VersionId,
75 pub value: T,
76 pub deleted: bool,
78}
79
80impl<T> VersionedValue<T> {
81 pub fn new(version: VersionId, value: T) -> Self {
82 Self {
83 version,
84 value,
85 deleted: false,
86 }
87 }
88
89 pub fn tombstone(version: VersionId, value: T) -> Self {
90 Self {
91 version,
92 value,
93 deleted: true,
94 }
95 }
96}
97
98#[derive(Debug)]
100pub struct VersionChain<T> {
101 versions: VecDeque<VersionedValue<T>>,
103 total_versions: u64,
105}
106
107impl<T: Clone> VersionChain<T> {
108 pub fn new() -> Self {
109 Self {
110 versions: VecDeque::new(),
111 total_versions: 0,
112 }
113 }
114
115 pub fn add_version(&mut self, version: VersionedValue<T>) {
117 self.versions.push_front(version);
118 self.total_versions += 1;
119 }
120
121 pub fn latest(&self) -> Option<&VersionedValue<T>> {
123 self.versions.front()
124 }
125
126 pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
129 for v in &self.versions {
130 if v.version.epoch <= epoch {
131 if v.deleted {
134 return None;
135 }
136 return Some(v);
137 }
138 }
139 None
140 }
141
142 pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
145 let initial_len = self.versions.len();
146
147 let mut kept = 0;
149 let mut last_visible_idx = None;
150
151 for (i, v) in self.versions.iter().enumerate() {
152 if v.version.epoch >= watermark {
153 kept += 1;
154 } else {
155 if last_visible_idx.is_none() {
157 last_visible_idx = Some(i);
158 kept += 1;
159 }
160 }
161 }
162
163 while self.versions.len() > kept {
165 self.versions.pop_back();
166 }
167
168 let removed = initial_len - self.versions.len();
169 let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
170
171 (removed, bytes_freed)
172 }
173
174 pub fn len(&self) -> usize {
176 self.versions.len()
177 }
178
179 pub fn is_empty(&self) -> bool {
180 self.versions.is_empty()
181 }
182}
183
184impl<T: Clone> Default for VersionChain<T> {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190#[derive(Debug)]
192pub struct ReaderRegistry {
193 active_readers: RwLock<HashMap<u64, u64>>,
195 next_reader_id: AtomicU64,
197 active_count: AtomicUsize,
199}
200
201impl ReaderRegistry {
202 pub fn new() -> Self {
203 Self {
204 active_readers: RwLock::new(HashMap::new()),
205 next_reader_id: AtomicU64::new(1),
206 active_count: AtomicUsize::new(0),
207 }
208 }
209
210 pub fn register(&self, epoch: u64) -> u64 {
212 let reader_id = self.next_reader_id.fetch_add(1, Ordering::Relaxed);
213 let mut readers = self.active_readers.write();
214 readers.insert(reader_id, epoch);
215 self.active_count.fetch_add(1, Ordering::Relaxed);
216 reader_id
217 }
218
219 pub fn unregister(&self, reader_id: u64) {
221 let mut readers = self.active_readers.write();
222 if readers.remove(&reader_id).is_some() {
223 self.active_count.fetch_sub(1, Ordering::Relaxed);
224 }
225 }
226
227 pub fn min_active_epoch(&self) -> Option<u64> {
229 let readers = self.active_readers.read();
230 readers.values().copied().min()
231 }
232
233 pub fn active_count(&self) -> usize {
235 self.active_count.load(Ordering::Relaxed)
236 }
237}
238
239impl Default for ReaderRegistry {
240 fn default() -> Self {
241 Self::new()
242 }
243}
244
/// Cumulative counters describing garbage-collection activity.
///
/// Each counter is an independent atomic, so values read from different
/// fields are not necessarily consistent with one another.
#[derive(Default, Debug)]
pub struct GCStats {
    /// Number of GC cycles run.
    pub gc_cycles: AtomicU64,
    /// Total versions removed across all cycles.
    pub versions_collected: AtomicU64,
    /// Total estimated bytes released across all cycles.
    pub bytes_freed: AtomicU64,
    /// Total chains visited across all cycles.
    pub chains_scanned: AtomicU64,
    /// Watermark used by the most recent cycle.
    pub last_gc_epoch: AtomicU64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: AtomicU64,
}
255
256impl GCStats {
257 pub fn snapshot(&self) -> GCStatsSnapshot {
258 GCStatsSnapshot {
259 gc_cycles: self.gc_cycles.load(Ordering::Relaxed),
260 versions_collected: self.versions_collected.load(Ordering::Relaxed),
261 bytes_freed: self.bytes_freed.load(Ordering::Relaxed),
262 chains_scanned: self.chains_scanned.load(Ordering::Relaxed),
263 last_gc_epoch: self.last_gc_epoch.load(Ordering::Relaxed),
264 last_gc_duration_us: self.last_gc_duration_us.load(Ordering::Relaxed),
265 }
266 }
267}
268
/// Plain-value copy of [`GCStats`] taken at one point in time.
#[derive(Clone, Debug)]
pub struct GCStatsSnapshot {
    /// Number of GC cycles run.
    pub gc_cycles: u64,
    /// Total versions removed across all cycles.
    pub versions_collected: u64,
    /// Total estimated bytes released across all cycles.
    pub bytes_freed: u64,
    /// Total chains visited across all cycles.
    pub chains_scanned: u64,
    /// Watermark used by the most recent cycle.
    pub last_gc_epoch: u64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: u64,
}
278
/// Tuning parameters for [`EpochGC`].
#[derive(Clone, Debug)]
pub struct GCConfig {
    /// Grace period in epochs: the GC watermark never rises above
    /// `current_epoch - min_epochs_to_keep`, even when no reader is active.
    pub min_epochs_to_keep: u64,
    /// Pending-version count at which `insert` runs an automatic `try_gc`.
    pub gc_trigger_threshold: usize,
    /// Per-cycle work limit applied by `try_gc`; `force_gc` ignores it.
    pub max_versions_per_cycle: usize,
}
289
290impl Default for GCConfig {
291 fn default() -> Self {
292 Self {
293 min_epochs_to_keep: 2,
294 gc_trigger_threshold: 1000,
295 max_versions_per_cycle: 10000,
296 }
297 }
298}
299
300pub struct EpochGC<K, V>
302where
303 K: Eq + std::hash::Hash + Clone,
304 V: Clone,
305{
306 current_epoch: AtomicU64,
308 current_sequence: AtomicU64,
310 chains: RwLock<HashMap<K, VersionChain<V>>>,
312 readers: Arc<ReaderRegistry>,
314 config: GCConfig,
316 stats: GCStats,
318 pending_versions: AtomicUsize,
320}
321
322impl<K, V> EpochGC<K, V>
323where
324 K: Eq + std::hash::Hash + Clone,
325 V: Clone,
326{
327 pub fn new() -> Self {
329 Self::with_config(GCConfig::default())
330 }
331
332 pub fn with_config(config: GCConfig) -> Self {
334 Self {
335 current_epoch: AtomicU64::new(0),
336 current_sequence: AtomicU64::new(0),
337 chains: RwLock::new(HashMap::new()),
338 readers: Arc::new(ReaderRegistry::new()),
339 config,
340 stats: GCStats::default(),
341 pending_versions: AtomicUsize::new(0),
342 }
343 }
344
345 pub fn current_epoch(&self) -> u64 {
347 self.current_epoch.load(Ordering::SeqCst)
348 }
349
350 pub fn advance_epoch(&self) -> u64 {
352 self.current_sequence.store(0, Ordering::SeqCst);
353 self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
354 }
355
356 pub fn next_version(&self) -> VersionId {
358 let epoch = self.current_epoch.load(Ordering::SeqCst);
359 let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
360 VersionId::new(epoch, seq)
361 }
362
363 pub fn insert(&self, key: K, value: V) -> VersionId {
365 let version = self.next_version();
366 let versioned = VersionedValue::new(version, value);
367
368 {
369 let mut chains = self.chains.write();
370 chains.entry(key).or_default().add_version(versioned);
371 }
372
373 let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
374
375 if pending >= self.config.gc_trigger_threshold {
377 self.try_gc();
378 }
379
380 version
381 }
382
383 pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
385 let version = self.next_version();
386 let versioned = VersionedValue::tombstone(version, tombstone_value);
387
388 {
389 let mut chains = self.chains.write();
390 chains.entry(key).or_default().add_version(versioned);
391 }
392
393 self.pending_versions.fetch_add(1, Ordering::Relaxed);
394 version
395 }
396
397 pub fn get(&self, key: &K) -> Option<V> {
399 let chains = self.chains.read();
400 chains
401 .get(key)
402 .and_then(|chain| chain.latest())
403 .filter(|v| !v.deleted)
404 .map(|v| v.value.clone())
405 }
406
407 pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
409 let chains = self.chains.read();
410 chains
411 .get(key)
412 .and_then(|chain| chain.version_at(epoch))
413 .map(|v| v.value.clone())
414 }
415
416 pub fn begin_read(&self) -> ReadGuard {
418 let epoch = self.current_epoch.load(Ordering::SeqCst);
419 let reader_id = self.readers.register(epoch);
420 ReadGuard {
421 epoch,
422 reader_id,
423 registry: Arc::clone(&self.readers),
424 }
425 }
426
427 pub fn watermark(&self) -> u64 {
429 let current = self.current_epoch.load(Ordering::SeqCst);
430 let min_reader = self.readers.min_active_epoch().unwrap_or(current);
431
432 let grace = current.saturating_sub(self.config.min_epochs_to_keep);
434 grace.min(min_reader)
435 }
436
437 pub fn try_gc(&self) -> GCResult {
439 let start = std::time::Instant::now();
440 let watermark = self.watermark();
441
442 let mut versions_collected = 0;
443 let mut bytes_freed = 0;
444 let mut chains_scanned = 0;
445
446 {
447 let mut chains = self.chains.write();
448 let keys: Vec<K> = chains.keys().cloned().collect();
449
450 for key in keys {
451 if chains_scanned >= self.config.max_versions_per_cycle {
452 break;
453 }
454
455 if let Some(chain) = chains.get_mut(&key) {
456 let (removed, freed) = chain.gc(watermark);
457 versions_collected += removed;
458 bytes_freed += freed;
459 chains_scanned += 1;
460
461 if chain.is_empty() {
463 chains.remove(&key);
464 }
465 }
466 }
467 }
468
469 let duration = start.elapsed();
470
471 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
473 self.stats
474 .versions_collected
475 .fetch_add(versions_collected as u64, Ordering::Relaxed);
476 self.stats
477 .bytes_freed
478 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
479 self.stats
480 .chains_scanned
481 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
482 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
483 self.stats
484 .last_gc_duration_us
485 .store(duration.as_micros() as u64, Ordering::Relaxed);
486
487 self.pending_versions.store(0, Ordering::Relaxed);
489
490 GCResult {
491 versions_collected,
492 bytes_freed,
493 chains_scanned,
494 watermark,
495 duration_us: duration.as_micros() as u64,
496 }
497 }
498
499 pub fn force_gc(&self) -> GCResult {
501 let _old_limit = self.config.max_versions_per_cycle;
502 let _config = GCConfig {
504 max_versions_per_cycle: usize::MAX,
505 ..self.config.clone()
506 };
507
508 let start = std::time::Instant::now();
509 let watermark = self.watermark();
510
511 let mut versions_collected = 0;
512 let mut bytes_freed = 0;
513 let mut chains_scanned = 0;
514
515 {
516 let mut chains = self.chains.write();
517 for chain in chains.values_mut() {
518 let (removed, freed) = chain.gc(watermark);
519 versions_collected += removed;
520 bytes_freed += freed;
521 chains_scanned += 1;
522 }
523
524 chains.retain(|_, chain| !chain.is_empty());
526 }
527
528 let duration = start.elapsed();
529
530 self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
531 self.stats
532 .versions_collected
533 .fetch_add(versions_collected as u64, Ordering::Relaxed);
534 self.stats
535 .bytes_freed
536 .fetch_add(bytes_freed as u64, Ordering::Relaxed);
537 self.stats
538 .chains_scanned
539 .fetch_add(chains_scanned as u64, Ordering::Relaxed);
540 self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
541 self.stats
542 .last_gc_duration_us
543 .store(duration.as_micros() as u64, Ordering::Relaxed);
544 self.pending_versions.store(0, Ordering::Relaxed);
545
546 GCResult {
547 versions_collected,
548 bytes_freed,
549 chains_scanned,
550 watermark,
551 duration_us: duration.as_micros() as u64,
552 }
553 }
554
555 pub fn stats(&self) -> GCStatsSnapshot {
557 self.stats.snapshot()
558 }
559
560 pub fn version_count(&self) -> usize {
562 let chains = self.chains.read();
563 chains.values().map(|c| c.len()).sum()
564 }
565
566 pub fn chain_count(&self) -> usize {
568 self.chains.read().len()
569 }
570}
571
572impl<K, V> Default for EpochGC<K, V>
573where
574 K: Eq + std::hash::Hash + Clone,
575 V: Clone,
576{
577 fn default() -> Self {
578 Self::new()
579 }
580}
581
/// Outcome of a single GC cycle.
#[derive(Clone, Debug)]
pub struct GCResult {
    /// Versions removed during the cycle.
    pub versions_collected: usize,
    /// Estimated bytes released (inline entry size only).
    pub bytes_freed: usize,
    /// Chains visited during the cycle.
    pub chains_scanned: usize,
    /// Watermark the cycle collected against.
    pub watermark: u64,
    /// Wall-clock duration of the cycle, in microseconds.
    pub duration_us: u64,
}
591
592pub struct ReadGuard {
594 pub epoch: u64,
595 reader_id: u64,
596 registry: Arc<ReaderRegistry>,
597}
598
599impl Drop for ReadGuard {
600 fn drop(&mut self) {
601 self.registry.unregister(self.reader_id);
602 }
603}
604
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        // Within one epoch the sequence number breaks ties.
        assert!(VersionId::new(1, 0) < VersionId::new(1, 1));
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert_eq!(chain.latest().unwrap().value, "v2");
        assert_eq!(chain.version_at(0).unwrap().value, "v1");
    }

    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }

        assert_eq!(chain.len(), 5);

        // Watermark 3 keeps epochs 4 and 3, plus epoch 2 as the newest version
        // below the watermark; epochs 1 and 0 go.
        let (removed, _) = chain.gc(3);

        assert_eq!(removed, 2);
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.latest().unwrap().value, "v4");
        assert_eq!(chain.version_at(2).unwrap().value, "v2");
    }

    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();

        let r1 = registry.register(10);
        let _r2 = registry.register(20);

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let _v1 = gc.insert("key1".to_string(), 100);
        let _v2 = gc.insert("key1".to_string(), 200);

        assert_eq!(gc.get(&"key1".to_string()), Some(200));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), Some(200));
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.delete("key1".to_string(), 0);

        assert_eq!(gc.get(&"key1".to_string()), None);
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), None);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 200);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 300);

        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), Some(100));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(200));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(300));
    }

    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);

        {
            let guard = gc.begin_read();
            assert_eq!(guard.epoch, 0);
            assert_eq!(gc.readers.active_count(), 1);
        }

        assert_eq!(gc.readers.active_count(), 0);
    }

    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        for value in 1..=4 {
            gc.insert("k".to_string(), value);
            gc.advance_epoch();
        }
        assert_eq!(gc.current_epoch(), 4);

        // No readers: the grace period alone sets the watermark (4 - 2).
        assert_eq!(gc.watermark(), 2);

        // A reader at the current epoch does not lower it any further.
        let _guard = gc.begin_read();
        assert_eq!(gc.watermark(), 2);
        assert!(gc.watermark() <= gc.current_epoch());
    }

    #[test]
    fn test_reader_pins_watermark() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            ..Default::default()
        });

        gc.insert("k".to_string(), 1);
        let guard = gc.begin_read();
        assert_eq!(guard.epoch, 0);

        gc.advance_epoch();
        gc.insert("k".to_string(), 2);
        gc.advance_epoch();
        gc.advance_epoch();

        // The reader at epoch 0 holds the watermark down, so its version survives.
        assert_eq!(gc.watermark(), 0);
        gc.force_gc();
        assert_eq!(gc.get_at_epoch(&"k".to_string(), guard.epoch), Some(1));

        // Once the reader is gone, the epoch-0 version becomes collectable.
        drop(guard);
        assert_eq!(gc.watermark(), 3);
        let result = gc.force_gc();
        assert_eq!(result.versions_collected, 1);
        assert_eq!(gc.version_count(), 1);
        assert_eq!(gc.get(&"k".to_string()), Some(2));
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        for i in 0..10 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        assert_eq!(gc.version_count(), 10);

        let result = gc.try_gc();

        // Watermark is 10 - 1 = 9: epoch 9 stays, plus epoch 8 as the newest
        // version below the watermark.
        assert_eq!(result.watermark, 9);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&"key".to_string()), Some(9));
        assert_eq!(gc.get_at_epoch(&"key".to_string(), 8), Some(8));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        gc.try_gc();

        // Default grace of 2 epochs: watermark 5 - 2 = 3, so epochs 0 and 1 go.
        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.last_gc_epoch, 3);
        assert_eq!(
            stats.bytes_freed,
            (2 * std::mem::size_of::<VersionedValue<i32>>()) as u64
        );
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 1,
        });

        for i in 0..20 {
            gc.insert(format!("key{}", i), i);
        }
        gc.advance_epoch();
        for i in 0..20 {
            gc.insert(format!("key{}", i), i + 100);
        }
        gc.advance_epoch();
        gc.advance_epoch();
        assert_eq!(gc.version_count(), 40);

        // The tight per-cycle limit stops an incremental cycle after one chain.
        let partial = gc.try_gc();
        assert_eq!(partial.chains_scanned, 1);
        assert_eq!(partial.versions_collected, 1);

        // force_gc ignores the limit and finishes the remaining chains.
        let full = gc.force_gc();
        assert_eq!(full.chains_scanned, 20);
        assert_eq!(full.versions_collected, 19);
        assert_eq!(gc.version_count(), 20);
        for i in 0..20 {
            assert_eq!(gc.get(&format!("key{}", i)), Some(i + 100));
        }
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 1);
        gc.insert("key2".to_string(), 2);
        gc.insert("key3".to_string(), 3);

        assert_eq!(gc.chain_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // At epoch 1 the key is deleted; at epoch 0 the old value is visible.
        assert!(chain.version_at(1).is_none());
        assert_eq!(chain.version_at(0).map(|v| v.value), Some(100));
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        assert_eq!(result.watermark, 3);
        assert!(result.watermark <= gc.current_epoch());
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(
            result.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>()
        );
    }
}