// sochdb_core/epoch_gc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Epoch-Based Garbage Collection for Version Cleanup
19//!
20//! This module provides epoch-based garbage collection for cleaning up
21//! old versions of data in a multi-version concurrency control (MVCC) system.
22//!
23//! # Design
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │                    Epoch-Based GC                               │
28//! │                                                                 │
29//! │  Time ──────────────────────────────────────────────────────→  │
30//! │                                                                 │
31//! │  Epoch 0     Epoch 1     Epoch 2     Epoch 3     Epoch 4       │
32//! │  ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐       │
33//! │  │v1,v2│     │v3,v4│     │v5   │     │v6,v7│     │v8   │       │
34//! │  └─────┘     └─────┘     └─────┘     └─────┘     └─────┘       │
35//! │    ↑                                               ↑           │
36//! │    │                                               │           │
37//! │  Min active                                    Current         │
38//! │  reader epoch                                   epoch          │
39//! │                                                                 │
40//! │  Versions in epochs < min_active can be safely collected       │
41//! └─────────────────────────────────────────────────────────────────┘
42//! ```
43//!
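//! # Key Concepts
//!
//! - **Version**: A specific snapshot of a key's value at a point in time
//! - **Epoch**: A logical time period during which versions are created
//! - **Watermark**: The lower of (current epoch − grace period) and the oldest
//!   epoch pinned by an active reader; no registered reader observes state older than it
//! - **GC Cycle**: Cleanup of versions older than the watermark. For each key the
//!   newest version below the watermark is kept as the base that readers still see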
50
51use dashmap::DashMap;
52use std::collections::VecDeque;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55
/// Identifies a single write: the epoch it happened in plus its position
/// within that epoch.
///
/// The derived ordering is lexicographic over the fields in declaration
/// order, i.e. by `epoch` first and `sequence` second.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct VersionId {
    pub epoch: u64,
    pub sequence: u32,
}

impl VersionId {
    /// Build an identifier from its parts.
    pub fn new(epoch: u64, sequence: u32) -> Self {
        VersionId { epoch, sequence }
    }

    /// `true` when this version belongs to an epoch strictly below `watermark`.
    pub fn is_stale(&self, watermark: u64) -> bool {
        watermark > self.epoch
    }
}
73
74/// A versioned value wrapper
75#[derive(Debug, Clone)]
76pub struct VersionedValue<T> {
77    pub version: VersionId,
78    pub value: T,
79    /// Deletion marker (tombstone)
80    pub deleted: bool,
81}
82
83impl<T> VersionedValue<T> {
84    pub fn new(version: VersionId, value: T) -> Self {
85        Self {
86            version,
87            value,
88            deleted: false,
89        }
90    }
91
92    pub fn tombstone(version: VersionId, value: T) -> Self {
93        Self {
94            version,
95            value,
96            deleted: true,
97        }
98    }
99}
100
101/// Version chain for a single key
102#[derive(Debug)]
103pub struct VersionChain<T> {
104    /// Versions ordered from newest to oldest
105    versions: VecDeque<VersionedValue<T>>,
106    /// Total count of versions ever created
107    total_versions: u64,
108}
109
110impl<T: Clone> VersionChain<T> {
111    pub fn new() -> Self {
112        Self {
113            versions: VecDeque::new(),
114            total_versions: 0,
115        }
116    }
117
118    /// Add a new version (prepends to front as newest)
119    pub fn add_version(&mut self, version: VersionedValue<T>) {
120        self.versions.push_front(version);
121        self.total_versions += 1;
122    }
123
124    /// Get the latest version
125    pub fn latest(&self) -> Option<&VersionedValue<T>> {
126        self.versions.front()
127    }
128
129    /// Get version visible at specific epoch (strict less-than for consistency with MVCC)
130    /// Returns None if the key was deleted before the given epoch
131    pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
132        for v in &self.versions {
133            if v.version.epoch < epoch {
134                // If this version (the most recent before the epoch) is deleted,
135                // the key is considered deleted at this point in time
136                if v.deleted {
137                    return None;
138                }
139                return Some(v);
140            }
141        }
142        None
143    }
144
145    /// Clean up versions older than watermark
146    /// Returns (versions_removed, bytes_freed estimate)
147    pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
148        let initial_len = self.versions.len();
149
150        // Keep at least one version (the latest visible)
151        let mut kept = 0;
152        let mut found_base = false;
153
154        for v in self.versions.iter() {
155            if v.version.epoch >= watermark {
156                kept += 1;
157            } else if !found_base {
158                // Keep first version below watermark as base
159                found_base = true;
160                kept += 1;
161            }
162        }
163
164        // Truncate in one operation — O(k) drops but O(1) metadata update
165        self.versions.truncate(kept);
166
167        let removed = initial_len - self.versions.len();
168        let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
169
170        (removed, bytes_freed)
171    }
172
173    /// Number of versions in chain
174    pub fn len(&self) -> usize {
175        self.versions.len()
176    }
177
178    pub fn is_empty(&self) -> bool {
179        self.versions.is_empty()
180    }
181}
182
183impl<T: Clone> Default for VersionChain<T> {
184    fn default() -> Self {
185        Self::new()
186    }
187}
188
/// Capacity of [`ReaderRegistry`]: at most this many readers can be
/// registered at once. Each slot occupies its own 64-byte-aligned block.
const MAX_READER_SLOTS: usize = 256;

/// Marker stored in a slot that no reader currently owns.
const SLOT_EMPTY: u64 = u64::MAX;

/// One reader slot, aligned (and therefore padded) to 64 bytes.
///
/// The alignment keeps neighbouring slots in separate 64-byte blocks so that
/// readers updating different slots do not share a cache line (assuming
/// 64-byte lines).
#[derive(Debug)]
#[repr(C, align(64))]
struct EpochSlot {
    epoch: AtomicU64,
}

impl EpochSlot {
    /// A slot holding [`SLOT_EMPTY`].
    const fn empty() -> Self {
        EpochSlot {
            epoch: AtomicU64::new(SLOT_EMPTY),
        }
    }
}

/// Fixed-capacity, lock-free table of the epochs pinned by active readers.
///
/// * [`register`](Self::register) claims the first empty slot with a CAS.
/// * [`unregister`](Self::unregister) swaps the slot back to empty.
/// * [`min_active_epoch`](Self::min_active_epoch) scans every slot with relaxed loads.
///
/// No operation takes a lock; `register` and `min_active_epoch` visit up to
/// `MAX_READER_SLOTS` slots.
#[derive(Debug)]
pub struct ReaderRegistry {
    /// One epoch per slot; `SLOT_EMPTY` marks a free slot.
    slots: Box<[EpochSlot; MAX_READER_SLOTS]>,
    /// Number of occupied slots (informational, relaxed counter).
    active_count: AtomicUsize,
}

impl ReaderRegistry {
    /// Create a registry in which every slot is empty.
    pub fn new() -> Self {
        // Built through a heap-allocated boxed slice rather than a stack array.
        let boxed: Box<[EpochSlot]> = (0..MAX_READER_SLOTS).map(|_| EpochSlot::empty()).collect();
        let slots: Box<[EpochSlot; MAX_READER_SLOTS]> = match boxed.try_into() {
            Ok(array) => array,
            Err(_) => unreachable!("exactly MAX_READER_SLOTS slots were collected"),
        };
        ReaderRegistry {
            slots,
            active_count: AtomicUsize::new(0),
        }
    }

    /// Pin `epoch` in the first free slot and return that slot's index,
    /// which doubles as the reader id.
    ///
    /// Returns `None` when every slot is taken. An occupied slot is never
    /// overwritten: erasing a live reader's epoch would let the GC watermark
    /// move past it and reclaim versions that reader can still see.
    pub fn register(&self, epoch: u64) -> Option<u64> {
        let index = self.slots.iter().position(|slot| {
            slot.epoch
                .compare_exchange(SLOT_EMPTY, epoch, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
        })?;
        self.active_count.fetch_add(1, Ordering::Relaxed);
        Some(index as u64)
    }

    /// Release slot `reader_id`.
    ///
    /// Ids outside the table are ignored, and the active count only drops
    /// when the slot was actually occupied.
    pub fn unregister(&self, reader_id: u64) {
        if let Some(slot) = self.slots.get(reader_id as usize) {
            let previous = slot.epoch.swap(SLOT_EMPTY, Ordering::Release);
            if previous != SLOT_EMPTY {
                self.active_count.fetch_sub(1, Ordering::Relaxed);
            }
        }
    }

    /// Smallest epoch pinned by any registered reader, or `None` when no slot
    /// is occupied.
    ///
    /// Slots are read one at a time with relaxed loads, so the result is not
    /// an atomic snapshot of the whole table.
    pub fn min_active_epoch(&self) -> Option<u64> {
        self.slots
            .iter()
            .map(|slot| slot.epoch.load(Ordering::Relaxed))
            .filter(|&epoch| epoch != SLOT_EMPTY)
            .min()
    }

    /// Number of registered readers (relaxed read; may be momentarily stale).
    pub fn active_count(&self) -> usize {
        self.active_count.load(Ordering::Relaxed)
    }
}

impl Default for ReaderRegistry {
    fn default() -> Self {
        ReaderRegistry::new()
    }
}
317
318/// GC statistics
319#[derive(Debug, Default)]
320pub struct GCStats {
321    pub gc_cycles: AtomicU64,
322    pub versions_collected: AtomicU64,
323    pub bytes_freed: AtomicU64,
324    pub chains_scanned: AtomicU64,
325    pub last_gc_epoch: AtomicU64,
326    pub last_gc_duration_us: AtomicU64,
327}
328
329impl GCStats {
330    pub fn snapshot(&self) -> GCStatsSnapshot {
331        GCStatsSnapshot {
332            gc_cycles: self.gc_cycles.load(Ordering::Relaxed),
333            versions_collected: self.versions_collected.load(Ordering::Relaxed),
334            bytes_freed: self.bytes_freed.load(Ordering::Relaxed),
335            chains_scanned: self.chains_scanned.load(Ordering::Relaxed),
336            last_gc_epoch: self.last_gc_epoch.load(Ordering::Relaxed),
337            last_gc_duration_us: self.last_gc_duration_us.load(Ordering::Relaxed),
338        }
339    }
340}
341
/// Plain-value copy of the garbage-collection counters, as produced by
/// `GCStats::snapshot`.
///
/// `Copy`/`PartialEq`/`Default` make snapshots cheap to pass around and easy
/// to compare (e.g. before/after a GC pass).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct GCStatsSnapshot {
    /// Number of GC cycles run.
    pub gc_cycles: u64,
    /// Total versions removed across all cycles.
    pub versions_collected: u64,
    /// Total estimated bytes released across all cycles.
    pub bytes_freed: u64,
    /// Total chains scanned across all cycles.
    pub chains_scanned: u64,
    /// Watermark used by the most recent cycle.
    pub last_gc_epoch: u64,
    /// Duration of the most recent cycle in microseconds.
    pub last_gc_duration_us: u64,
}
351
/// Tuning knobs for `EpochGC`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GCConfig {
    /// Grace period in epochs: the GC watermark never exceeds
    /// `current_epoch - min_epochs_to_keep` (saturating at 0).
    pub min_epochs_to_keep: u64,
    /// Automatic-GC threshold: once more than this many versions are pending
    /// since the last GC pass, `EpochGC::insert` runs `try_gc` inline.
    pub gc_trigger_threshold: usize,
    /// Maximum number of *chains* (keys) `try_gc` scans per cycle.
    ///
    /// Despite the name, the limit counts chains, not individual versions.
    /// `force_gc` ignores it.
    pub max_versions_per_cycle: usize,
}

impl Default for GCConfig {
    /// Grace period of 2 epochs, GC after 1000 pending versions, at most
    /// 10000 chains per bounded cycle.
    fn default() -> Self {
        Self {
            min_epochs_to_keep: 2,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 10000,
        }
    }
}
372
373/// Epoch-based garbage collector for versioned data
374pub struct EpochGC<K, V>
375where
376    K: Eq + std::hash::Hash + Clone,
377    V: Clone,
378{
379    /// Global epoch counter
380    current_epoch: AtomicU64,
381    /// Sequence counter within epoch
382    current_sequence: AtomicU64,
383    /// Version chains by key — DashMap provides 64-shard concurrent access,
384    /// eliminating the global RwLock that serialised all insert/get/gc calls.
385    chains: DashMap<K, VersionChain<V>>,
386    /// Reader registry
387    readers: Arc<ReaderRegistry>,
388    /// GC configuration
389    config: GCConfig,
390    /// GC statistics
391    stats: GCStats,
392    /// Pending versions since last GC
393    pending_versions: AtomicUsize,
394}
395
396impl<K, V> EpochGC<K, V>
397where
398    K: Eq + std::hash::Hash + Clone,
399    V: Clone,
400{
401    /// Create new epoch GC with default config
402    pub fn new() -> Self {
403        Self::with_config(GCConfig::default())
404    }
405
406    /// Create with custom config
407    pub fn with_config(config: GCConfig) -> Self {
408        Self {
409            current_epoch: AtomicU64::new(0),
410            current_sequence: AtomicU64::new(0),
411            chains: DashMap::new(),
412            readers: Arc::new(ReaderRegistry::new()),
413            config,
414            stats: GCStats::default(),
415            pending_versions: AtomicUsize::new(0),
416        }
417    }
418
419    /// Get current epoch
420    pub fn current_epoch(&self) -> u64 {
421        self.current_epoch.load(Ordering::SeqCst)
422    }
423
424    /// Advance to next epoch
425    pub fn advance_epoch(&self) -> u64 {
426        self.current_sequence.store(0, Ordering::SeqCst);
427        self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
428    }
429
430    /// Allocate a new version ID
431    pub fn next_version(&self) -> VersionId {
432        let epoch = self.current_epoch.load(Ordering::SeqCst);
433        let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
434        VersionId::new(epoch, seq)
435    }
436
437    /// Insert a new version
438    pub fn insert(&self, key: K, value: V) -> VersionId {
439        let version = self.next_version();
440        let versioned = VersionedValue::new(version, value);
441
442        self.chains.entry(key).or_default().add_version(versioned);
443
444        let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
445
446        // Trigger GC if threshold exceeded
447        if pending >= self.config.gc_trigger_threshold {
448            self.try_gc();
449        }
450
451        version
452    }
453
454    /// Delete a key (insert tombstone)
455    pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
456        let version = self.next_version();
457        let versioned = VersionedValue::tombstone(version, tombstone_value);
458
459        self.chains.entry(key).or_default().add_version(versioned);
460
461        self.pending_versions.fetch_add(1, Ordering::Relaxed);
462        version
463    }
464
465    /// Get latest version of a key
466    pub fn get(&self, key: &K) -> Option<V> {
467        self.chains
468            .get(key)
469            .and_then(|entry| entry.latest().cloned())
470            .filter(|v| !v.deleted)
471            .map(|v| v.value.clone())
472    }
473
474    /// Get version at specific epoch
475    pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
476        self.chains
477            .get(key)
478            .and_then(|entry| entry.version_at(epoch).cloned())
479            .map(|v| v.value.clone())
480    }
481
482    /// Begin a read transaction at current epoch.
483    ///
484    /// Panics if all reader slots are exhausted (256 concurrent readers).
485    /// This indicates a concurrency design issue in the caller — either
486    /// readers are not being dropped or the workload exceeds the slot limit.
487    pub fn begin_read(&self) -> ReadGuard {
488        let epoch = self.current_epoch.load(Ordering::SeqCst);
489        let reader_id = self.readers.register(epoch).unwrap_or_else(|| {
490            panic!(
491                "EpochGC: all {} reader slots exhausted — too many concurrent readers. \
492                 Ensure ReadGuards are dropped promptly.",
493                MAX_READER_SLOTS
494            )
495        });
496        ReadGuard {
497            epoch,
498            reader_id,
499            registry: Arc::clone(&self.readers),
500        }
501    }
502
503    /// Calculate the GC watermark (safe to collect below this)
504    pub fn watermark(&self) -> u64 {
505        let current = self.current_epoch.load(Ordering::SeqCst);
506        let min_reader = self.readers.min_active_epoch().unwrap_or(current);
507
508        // Watermark is the minimum of current - grace period and min active reader
509        let grace = current.saturating_sub(self.config.min_epochs_to_keep);
510        grace.min(min_reader)
511    }
512
513    /// Try to run a GC cycle
514    pub fn try_gc(&self) -> GCResult {
515        let start = std::time::Instant::now();
516        let watermark = self.watermark();
517
518        let mut versions_collected = 0;
519        let mut bytes_freed = 0;
520        let mut chains_scanned = 0;
521
522        // Collect keys to scan (DashMap doesn't need a global lock)
523        let keys: Vec<K> = self
524            .chains
525            .iter()
526            .map(|entry| entry.key().clone())
527            .collect();
528
529        for key in keys {
530            if chains_scanned >= self.config.max_versions_per_cycle {
531                break;
532            }
533
534            if let Some(mut entry) = self.chains.get_mut(&key) {
535                let (removed, freed) = entry.gc(watermark);
536                versions_collected += removed;
537                bytes_freed += freed;
538                chains_scanned += 1;
539            }
540        }
541
542        // Remove empty chains in a separate pass
543        self.chains.retain(|_, chain| !chain.is_empty());
544
545        let duration = start.elapsed();
546
547        // Update stats
548        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
549        self.stats
550            .versions_collected
551            .fetch_add(versions_collected as u64, Ordering::Relaxed);
552        self.stats
553            .bytes_freed
554            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
555        self.stats
556            .chains_scanned
557            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
558        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
559        self.stats
560            .last_gc_duration_us
561            .store(duration.as_micros() as u64, Ordering::Relaxed);
562
563        // Reset pending counter
564        self.pending_versions.store(0, Ordering::Relaxed);
565
566        GCResult {
567            versions_collected,
568            bytes_freed,
569            chains_scanned,
570            watermark,
571            duration_us: duration.as_micros() as u64,
572        }
573    }
574
575    /// Force a full GC (ignore limits)
576    pub fn force_gc(&self) -> GCResult {
577        let _old_limit = self.config.max_versions_per_cycle;
578        // Temporarily set limit to max
579        let _config = GCConfig {
580            max_versions_per_cycle: usize::MAX,
581            ..self.config.clone()
582        };
583
584        let start = std::time::Instant::now();
585        let watermark = self.watermark();
586
587        let mut versions_collected = 0;
588        let mut bytes_freed = 0;
589        let mut chains_scanned = 0;
590
591        for mut entry in self.chains.iter_mut() {
592            let (removed, freed) = entry.value_mut().gc(watermark);
593            versions_collected += removed;
594            bytes_freed += freed;
595            chains_scanned += 1;
596        }
597
598        // Remove empty chains
599        self.chains.retain(|_, chain| !chain.is_empty());
600
601        let duration = start.elapsed();
602
603        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
604        self.stats
605            .versions_collected
606            .fetch_add(versions_collected as u64, Ordering::Relaxed);
607        self.stats
608            .bytes_freed
609            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
610        self.stats
611            .chains_scanned
612            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
613        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
614        self.stats
615            .last_gc_duration_us
616            .store(duration.as_micros() as u64, Ordering::Relaxed);
617        self.pending_versions.store(0, Ordering::Relaxed);
618
619        GCResult {
620            versions_collected,
621            bytes_freed,
622            chains_scanned,
623            watermark,
624            duration_us: duration.as_micros() as u64,
625        }
626    }
627
628    /// Get GC statistics
629    pub fn stats(&self) -> GCStatsSnapshot {
630        self.stats.snapshot()
631    }
632
633    /// Get total version count
634    pub fn version_count(&self) -> usize {
635        self.chains.iter().map(|entry| entry.value().len()).sum()
636    }
637
638    /// Get chain count
639    pub fn chain_count(&self) -> usize {
640        self.chains.len()
641    }
642}
643
644impl<K, V> Default for EpochGC<K, V>
645where
646    K: Eq + std::hash::Hash + Clone,
647    V: Clone,
648{
649    fn default() -> Self {
650        Self::new()
651    }
652}
653
/// Outcome of a single GC pass, as returned by `try_gc` / `force_gc`.
#[derive(Debug, Clone)]
pub struct GCResult {
    /// Versions removed during this pass.
    pub versions_collected: usize,
    /// Estimated bytes released during this pass.
    pub bytes_freed: usize,
    /// Chains visited during this pass.
    pub chains_scanned: usize,
    /// Watermark the pass collected against.
    pub watermark: u64,
    /// Wall-clock duration of the pass in microseconds.
    pub duration_us: u64,
}
663
664/// Guard for read transactions
665pub struct ReadGuard {
666    pub epoch: u64,
667    reader_id: u64,
668    registry: Arc<ReaderRegistry>,
669}
670
671impl Drop for ReadGuard {
672    fn drop(&mut self) {
673        self.registry.unregister(self.reader_id);
674    }
675}
676
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        // Ordering is lexicographic: epoch first, then sequence.
        assert!(VersionId::new(1, 9) < VersionId::new(2, 0));
        assert!(VersionId::new(1, 0) < VersionId::new(1, 1));
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();
        assert!(chain.is_empty());
        assert!(chain.latest().is_none());

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert!(!chain.is_empty());
        assert_eq!(chain.latest().unwrap().value, "v2");
    }

    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        // Add versions at epochs 0..5 (newest ends up at the front).
        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }
        assert_eq!(chain.len(), 5);

        // Watermark 3: epochs 3 and 4 are kept, plus epoch 2 as the base
        // version a reader at epoch 3 still sees.
        let (removed, bytes) = chain.gc(3);
        assert_eq!(removed, 2);
        assert_eq!(bytes, 2 * std::mem::size_of::<VersionedValue<String>>());
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.version_at(3).map(|v| v.value.as_str()), Some("v2"));
        assert_eq!(chain.latest().map(|v| v.value.as_str()), Some("v4"));

        // A second pass at the same watermark has nothing left to collect.
        assert_eq!(chain.gc(3), (0, 0));
        assert_eq!(chain.len(), 3);
    }

    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();

        let r1 = registry.register(10).unwrap();
        let _r2 = registry.register(20).unwrap();

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));
    }

    #[test]
    fn test_registry_exhaustion_returns_none() {
        let registry = ReaderRegistry::new();
        let ids: Vec<u64> = (0..MAX_READER_SLOTS)
            .map(|i| registry.register(i as u64).unwrap())
            .collect();

        // Full table: registration must fail rather than overwrite a reader.
        assert_eq!(registry.register(0), None);
        assert_eq!(registry.active_count(), MAX_READER_SLOTS);

        // A freed slot is reused.
        registry.unregister(ids[7]);
        assert_eq!(registry.register(99), Some(ids[7]));
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let _v1 = gc.insert("key1".to_string(), 100);
        let _v2 = gc.insert("key1".to_string(), 200);

        assert_eq!(gc.get(&"key1".to_string()), Some(200));
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.delete("key1".to_string(), 0); // tombstone

        assert_eq!(gc.get(&"key1".to_string()), None);
        // Delete appends a tombstone version; it does not remove the old one.
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), None);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 200);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 300);

        // With strict < semantics: version_at(N) sees versions with epoch < N
        // v=100 at epoch 0, v=200 at epoch 1, v=300 at epoch 2
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(100)); // epoch 0 < 1
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(200)); // epoch 1 < 2
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 3), Some(300)); // epoch 2 < 3
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), None); // no epoch < 0
    }

    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.advance_epoch();

        {
            let guard = gc.begin_read();
            assert_eq!(guard.epoch, 1);
            assert_eq!(gc.readers.active_count(), 1);
            assert_eq!(gc.readers.min_active_epoch(), Some(1));
        }

        assert_eq!(gc.readers.active_count(), 0);
        assert_eq!(gc.readers.min_active_epoch(), None);
    }

    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        // One insert per epoch 0..=3, ending at epoch 4.
        for value in 1..=4 {
            gc.insert("k".to_string(), value);
            gc.advance_epoch();
        }
        assert_eq!(gc.current_epoch(), 4);

        // No readers: watermark = current - grace = 4 - 2.
        assert_eq!(gc.watermark(), 2);

        // A reader pinned at the current epoch (4) is newer than the grace
        // bound, so it does not lower the watermark.
        let guard = gc.begin_read();
        assert_eq!(guard.epoch, 4);
        assert_eq!(gc.watermark(), 2);
    }

    #[test]
    fn test_reader_pins_watermark() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        let guard = gc.begin_read(); // pinned at epoch 0
        for _ in 0..4 {
            gc.advance_epoch();
        }

        // The old reader holds the watermark down below the grace bound (2).
        assert_eq!(gc.watermark(), 0);
        drop(guard);
        assert_eq!(gc.watermark(), 2);
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        // One version per epoch 0..10, ending at epoch 10.
        for i in 0..10 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        assert_eq!(gc.version_count(), 10);

        let result = gc.try_gc();

        // Watermark = 10 - 1 = 9: keep epoch 9 plus base epoch 8.
        assert_eq!(result.watermark, 9);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&"key".to_string()), Some(9));
    }

    #[test]
    fn test_gc_preserves_version_visible_to_reader() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            ..Default::default()
        });
        let key = "k".to_string();

        gc.insert(key.clone(), 10); // epoch 0
        gc.insert(key.clone(), 11); // epoch 0, newer
        gc.advance_epoch(); // epoch 1
        let guard = gc.begin_read(); // sees versions with epoch < 1
        gc.insert(key.clone(), 20); // epoch 1
        for _ in 0..5 {
            gc.advance_epoch();
        }

        let result = gc.force_gc();

        // The reader pins the watermark at 1; only the older epoch-0 value goes.
        assert_eq!(result.watermark, 1);
        assert_eq!(result.versions_collected, 1);
        assert_eq!(gc.get_at_epoch(&key, guard.epoch), Some(11));
        assert_eq!(gc.get(&key), Some(20));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        // Epoch 5, default grace 2 → watermark 3; epochs 0 and 1 are collected.
        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.last_gc_epoch, 3);
        assert_eq!(stats.versions_collected, result.versions_collected as u64);
        assert_eq!(stats.bytes_freed, result.bytes_freed as u64);
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 1, // Very limited
        });

        for i in 0..20 {
            gc.insert(format!("key{}", i), i);
        }

        gc.advance_epoch();
        gc.advance_epoch();

        // A bounded pass honours the one-chain limit...
        let limited = gc.try_gc();
        assert_eq!(limited.chains_scanned, 1);

        // ...while force_gc visits every chain regardless of it.
        let full = gc.force_gc();
        assert_eq!(full.chains_scanned, 20);

        // Each chain's only version is its base, so nothing is removed.
        assert_eq!(full.versions_collected, 0);
        assert_eq!(gc.version_count(), 20);
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 1);
        gc.insert("key2".to_string(), 2);
        gc.insert("key3".to_string(), 3);

        assert_eq!(gc.chain_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // With strict < semantics:
        // version_at(2) should see tombstone at epoch 1 → return None
        assert!(chain.version_at(2).is_none());
        // version_at(1) should see version at epoch 0 → return 100
        assert_eq!(chain.version_at(1).map(|v| v.value), Some(100));
        // version_at(0) → no version with epoch < 0
        assert!(chain.version_at(0).is_none());
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        // Epoch 5, default grace 2, no readers → watermark 3.
        assert_eq!(result.watermark, 3);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(gc.version_count(), 3);
    }

    #[test]
    fn test_lock_free_slot_registration() {
        let registry = ReaderRegistry::new();

        // Register multiple readers — each gets a unique slot
        let id0 = registry.register(10).unwrap();
        let id1 = registry.register(20).unwrap();
        let id2 = registry.register(30).unwrap();

        assert_ne!(id0, id1);
        assert_ne!(id1, id2);
        assert_eq!(registry.active_count(), 3);
        assert_eq!(registry.min_active_epoch(), Some(10));

        // Unregister middle reader — min should stay at 10
        registry.unregister(id1);
        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        // Unregister first reader — min moves to 30
        registry.unregister(id0);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(30));

        // Unregister last — no active readers
        registry.unregister(id2);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_epoch(), None);
    }

    #[test]
    fn test_concurrent_insert_and_gc() {
        use std::sync::Arc;
        use std::thread;

        let gc = Arc::new(EpochGC::<u64, u64>::new());

        // Spawn writers
        let mut handles = Vec::new();
        for t in 0..4 {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    gc.insert(t * 1000 + i, i);
                }
            }));
        }

        // Spawn a GC thread
        {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for _ in 0..10 {
                    gc.advance_epoch();
                    gc.try_gc();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // 400 unique keys, each with a single (base) version GC must keep.
        assert_eq!(gc.chain_count(), 400);
        assert_eq!(gc.version_count(), 400);
    }
}