Skip to main content

sochdb_core/
epoch_gc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Epoch-Based Garbage Collection for Version Cleanup
19//!
20//! This module provides epoch-based garbage collection for cleaning up
21//! old versions of data in a multi-version concurrency control (MVCC) system.
22//!
23//! # Design
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │                    Epoch-Based GC                               │
28//! │                                                                 │
29//! │  Time ──────────────────────────────────────────────────────→  │
30//! │                                                                 │
31//! │  Epoch 0     Epoch 1     Epoch 2     Epoch 3     Epoch 4       │
32//! │  ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐       │
33//! │  │v1,v2│     │v3,v4│     │v5   │     │v6,v7│     │v8   │       │
34//! │  └─────┘     └─────┘     └─────┘     └─────┘     └─────┘       │
35//! │    ↑                                               ↑           │
36//! │    │                                               │           │
37//! │  Min active                                    Current         │
38//! │  reader epoch                                   epoch          │
39//! │                                                                 │
40//! │  Versions in epochs < min_active can be safely collected       │
41//! └─────────────────────────────────────────────────────────────────┘
42//! ```
43//!
44//! # Key Concepts
45//!
46//! - **Version**: A specific snapshot of data at a point in time
47//! - **Epoch**: A logical time period during which versions are created
48//! - **Watermark**: The minimum epoch that is still accessible by readers
49//! - **GC Cycle**: Periodic cleanup of versions older than watermark
50
51use dashmap::DashMap;
52use std::collections::VecDeque;
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55
/// Identifies one version of a value as an `(epoch, sequence)` pair.
///
/// The derived ordering compares `epoch` first and `sequence` second, so
/// identifiers sort by epoch and then by position inside that epoch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct VersionId {
    /// Epoch in which the version was created.
    pub epoch: u64,
    /// Position of the version within its epoch.
    pub sequence: u32,
}

impl VersionId {
    /// Builds an identifier from its two components.
    pub fn new(epoch: u64, sequence: u32) -> Self {
        VersionId { epoch, sequence }
    }

    /// Returns `true` when this version's epoch lies strictly below `watermark`.
    ///
    /// Only the epoch is consulted; the sequence number plays no role.
    pub fn is_stale(&self, watermark: u64) -> bool {
        watermark > self.epoch
    }
}
73
74/// A versioned value wrapper
75#[derive(Debug, Clone)]
76pub struct VersionedValue<T> {
77    pub version: VersionId,
78    pub value: T,
79    /// Deletion marker (tombstone)
80    pub deleted: bool,
81}
82
83impl<T> VersionedValue<T> {
84    pub fn new(version: VersionId, value: T) -> Self {
85        Self {
86            version,
87            value,
88            deleted: false,
89        }
90    }
91
92    pub fn tombstone(version: VersionId, value: T) -> Self {
93        Self {
94            version,
95            value,
96            deleted: true,
97        }
98    }
99}
100
101/// Version chain for a single key
102#[derive(Debug)]
103pub struct VersionChain<T> {
104    /// Versions ordered from newest to oldest
105    versions: VecDeque<VersionedValue<T>>,
106    /// Total count of versions ever created
107    total_versions: u64,
108}
109
110impl<T: Clone> VersionChain<T> {
111    pub fn new() -> Self {
112        Self {
113            versions: VecDeque::new(),
114            total_versions: 0,
115        }
116    }
117
118    /// Add a new version (prepends to front as newest)
119    pub fn add_version(&mut self, version: VersionedValue<T>) {
120        self.versions.push_front(version);
121        self.total_versions += 1;
122    }
123
124    /// Get the latest version
125    pub fn latest(&self) -> Option<&VersionedValue<T>> {
126        self.versions.front()
127    }
128
129    /// Get version visible at specific epoch (strict less-than for consistency with MVCC)
130    /// Returns None if the key was deleted before the given epoch
131    pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
132        for v in &self.versions {
133            if v.version.epoch < epoch {
134                // If this version (the most recent before the epoch) is deleted,
135                // the key is considered deleted at this point in time
136                if v.deleted {
137                    return None;
138                }
139                return Some(v);
140            }
141        }
142        None
143    }
144
145    /// Clean up versions older than watermark
146    /// Returns (versions_removed, bytes_freed estimate)
147    pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
148        let initial_len = self.versions.len();
149
150        // Keep at least one version (the latest visible)
151        let mut kept = 0;
152        let mut found_base = false;
153
154        for v in self.versions.iter() {
155            if v.version.epoch >= watermark {
156                kept += 1;
157            } else if !found_base {
158                // Keep first version below watermark as base
159                found_base = true;
160                kept += 1;
161            }
162        }
163
164        // Truncate in one operation — O(k) drops but O(1) metadata update
165        self.versions.truncate(kept);
166
167        let removed = initial_len - self.versions.len();
168        let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
169
170        (removed, bytes_freed)
171    }
172
173    /// Number of versions in chain
174    pub fn len(&self) -> usize {
175        self.versions.len()
176    }
177
178    pub fn is_empty(&self) -> bool {
179        self.versions.is_empty()
180    }
181}
182
183impl<T: Clone> Default for VersionChain<T> {
184    fn default() -> Self {
185        Self::new()
186    }
187}
188
/// Upper bound on the number of readers that can be registered at once.
///
/// One `EpochSlot` exists per potential reader.
const MAX_READER_SLOTS: usize = 256;

/// Marker stored in a slot that no reader currently owns.
const SLOT_EMPTY: u64 = u64::MAX;

/// One reader's published epoch, padded out to its own 64-byte cell.
///
/// The `align(64)` representation forces both the alignment and the size of
/// the struct to 64 bytes, so neighbouring slots in an array never share a
/// 64-byte line. The intent is to keep concurrent register/unregister calls
/// on different slots from contending through false sharing.
#[repr(C, align(64))]
#[derive(Debug)]
struct EpochSlot {
    /// Epoch held by the owning reader, or `SLOT_EMPTY` when unowned.
    epoch: AtomicU64,
}

impl EpochSlot {
    /// Creates a slot in the unowned state.
    const fn empty() -> Self {
        EpochSlot {
            epoch: AtomicU64::new(SLOT_EMPTY),
        }
    }
}
214
215/// Lock-free reader registration for tracking active epochs.
216///
217/// Replaces the original `RwLock<HashMap<u64, u64>>` with a fixed-size
218/// array of cache-line-aligned `AtomicU64` slots.  Every operation is
219/// O(1) amortised and fully lock-free:
220///
221/// * `register()` — CAS-loop to claim an empty slot
222/// * `unregister()` — single atomic store (SLOT_EMPTY)
223/// * `min_active_epoch()` — relaxed scan of all slots, no locking
224#[derive(Debug)]
225pub struct ReaderRegistry {
226    /// Fixed-size slot array, one epoch per slot.
227    slots: Box<[EpochSlot; MAX_READER_SLOTS]>,
228    /// Count of active readers (informational).
229    active_count: AtomicUsize,
230}
231
232impl ReaderRegistry {
233    pub fn new() -> Self {
234        // Initialize all slots to SLOT_EMPTY
235        let slots: Box<[EpochSlot; MAX_READER_SLOTS]> = {
236            let mut v: Vec<EpochSlot> = Vec::with_capacity(MAX_READER_SLOTS);
237            for _ in 0..MAX_READER_SLOTS {
238                v.push(EpochSlot::empty());
239            }
240            v.into_boxed_slice().try_into().ok().unwrap()
241        };
242        Self {
243            slots,
244            active_count: AtomicUsize::new(0),
245        }
246    }
247
248    /// Register a reader at `epoch`.  Returns the slot index (used as reader_id).
249    ///
250    /// Cost: O(MAX_READER_SLOTS) worst-case scan, O(1) amortised with low occupancy.
251    /// Fully lock-free — uses compare_exchange to claim an empty slot.
252    ///
253    /// Returns `None` if all slots are exhausted. Callers must back-pressure
254    /// or retry after a brief yield — silently overwriting a slot would allow
255    /// the GC watermark to advance past a still-active reader's epoch,
256    /// leading to use-after-free on version data.
257    pub fn register(&self, epoch: u64) -> Option<u64> {
258        for (i, slot) in self.slots.iter().enumerate() {
259            if slot
260                .epoch
261                .compare_exchange(SLOT_EMPTY, epoch, Ordering::AcqRel, Ordering::Relaxed)
262                .is_ok()
263            {
264                self.active_count.fetch_add(1, Ordering::Relaxed);
265                return Some(i as u64);
266            }
267        }
268        // All slots full — must NOT overwrite an occupied slot.
269        // Overwriting would erase a live reader's epoch, causing
270        // min_active_epoch() to return a value that is too high,
271        // which lets GC reclaim versions still visible to that reader.
272        None
273    }
274
275    /// Unregister a reader by slot index.
276    ///
277    /// Cost: O(1), lock-free.
278    pub fn unregister(&self, reader_id: u64) {
279        let idx = reader_id as usize;
280        if idx < MAX_READER_SLOTS {
281            let prev = self.slots[idx].epoch.swap(SLOT_EMPTY, Ordering::Release);
282            if prev != SLOT_EMPTY {
283                self.active_count.fetch_sub(1, Ordering::Relaxed);
284            }
285        }
286    }
287
288    /// Compute the minimum epoch across all active readers, lock-free.
289    ///
290    /// Cost: O(MAX_READER_SLOTS) with relaxed loads — no locking, no contention.
291    pub fn min_active_epoch(&self) -> Option<u64> {
292        let mut min_epoch = u64::MAX;
293        for slot in self.slots.iter() {
294            let e = slot.epoch.load(Ordering::Relaxed);
295            if e != SLOT_EMPTY && e < min_epoch {
296                min_epoch = e;
297            }
298        }
299        if min_epoch == u64::MAX {
300            None
301        } else {
302            Some(min_epoch)
303        }
304    }
305
306    /// Get count of active readers.
307    pub fn active_count(&self) -> usize {
308        self.active_count.load(Ordering::Relaxed)
309    }
310}
311
312impl Default for ReaderRegistry {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318/// GC statistics
319#[derive(Debug, Default)]
320pub struct GCStats {
321    pub gc_cycles: AtomicU64,
322    pub versions_collected: AtomicU64,
323    pub bytes_freed: AtomicU64,
324    pub chains_scanned: AtomicU64,
325    pub last_gc_epoch: AtomicU64,
326    pub last_gc_duration_us: AtomicU64,
327}
328
329impl GCStats {
330    pub fn snapshot(&self) -> GCStatsSnapshot {
331        GCStatsSnapshot {
332            gc_cycles: self.gc_cycles.load(Ordering::Relaxed),
333            versions_collected: self.versions_collected.load(Ordering::Relaxed),
334            bytes_freed: self.bytes_freed.load(Ordering::Relaxed),
335            chains_scanned: self.chains_scanned.load(Ordering::Relaxed),
336            last_gc_epoch: self.last_gc_epoch.load(Ordering::Relaxed),
337            last_gc_duration_us: self.last_gc_duration_us.load(Ordering::Relaxed),
338        }
339    }
340}
341
/// Plain-value copy of the GC counters taken at one point in time.
///
/// Field names mirror those of `GCStats`.
#[derive(Clone, Debug)]
pub struct GCStatsSnapshot {
    /// Number of GC cycles that had run.
    pub gc_cycles: u64,
    /// Total number of versions removed.
    pub versions_collected: u64,
    /// Total of the bytes-freed estimates.
    pub bytes_freed: u64,
    /// Total number of chains visited.
    pub chains_scanned: u64,
    /// Watermark used by the most recent cycle.
    pub last_gc_epoch: u64,
    /// Duration of the most recent cycle, in microseconds.
    pub last_gc_duration_us: u64,
}
351
/// Tunables for the epoch garbage collector.
#[derive(Clone, Debug)]
pub struct GCConfig {
    /// Grace period: the watermark is never placed closer than this many
    /// epochs behind the current epoch.
    pub min_epochs_to_keep: u64,
    /// Number of pending versions after which an insert triggers a GC cycle.
    pub gc_trigger_threshold: usize,
    /// Scan budget for one `try_gc` cycle.
    ///
    /// Despite the name, `try_gc` compares this limit against the number of
    /// chains it has scanned, not against individual versions.
    pub max_versions_per_cycle: usize,
}

impl Default for GCConfig {
    /// Two epochs of grace, GC after 1000 pending versions, and a budget of
    /// 10000 per cycle.
    fn default() -> Self {
        GCConfig {
            min_epochs_to_keep: 2,
            gc_trigger_threshold: 1_000,
            max_versions_per_cycle: 10_000,
        }
    }
}
372
373/// Epoch-based garbage collector for versioned data
374pub struct EpochGC<K, V>
375where
376    K: Eq + std::hash::Hash + Clone,
377    V: Clone,
378{
379    /// Global epoch counter
380    current_epoch: AtomicU64,
381    /// Sequence counter within epoch
382    current_sequence: AtomicU64,
383    /// Version chains by key — DashMap provides 64-shard concurrent access,
384    /// eliminating the global RwLock that serialised all insert/get/gc calls.
385    chains: DashMap<K, VersionChain<V>>,
386    /// Reader registry
387    readers: Arc<ReaderRegistry>,
388    /// GC configuration
389    config: GCConfig,
390    /// GC statistics
391    stats: GCStats,
392    /// Pending versions since last GC
393    pending_versions: AtomicUsize,
394}
395
396impl<K, V> EpochGC<K, V>
397where
398    K: Eq + std::hash::Hash + Clone,
399    V: Clone,
400{
401    /// Create new epoch GC with default config
402    pub fn new() -> Self {
403        Self::with_config(GCConfig::default())
404    }
405
406    /// Create with custom config
407    pub fn with_config(config: GCConfig) -> Self {
408        Self {
409            current_epoch: AtomicU64::new(0),
410            current_sequence: AtomicU64::new(0),
411            chains: DashMap::new(),
412            readers: Arc::new(ReaderRegistry::new()),
413            config,
414            stats: GCStats::default(),
415            pending_versions: AtomicUsize::new(0),
416        }
417    }
418
419    /// Get current epoch
420    pub fn current_epoch(&self) -> u64 {
421        self.current_epoch.load(Ordering::SeqCst)
422    }
423
424    /// Advance to next epoch
425    pub fn advance_epoch(&self) -> u64 {
426        self.current_sequence.store(0, Ordering::SeqCst);
427        self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
428    }
429
430    /// Allocate a new version ID
431    pub fn next_version(&self) -> VersionId {
432        let epoch = self.current_epoch.load(Ordering::SeqCst);
433        let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
434        VersionId::new(epoch, seq)
435    }
436
437    /// Insert a new version
438    pub fn insert(&self, key: K, value: V) -> VersionId {
439        let version = self.next_version();
440        let versioned = VersionedValue::new(version, value);
441
442        self.chains.entry(key).or_default().add_version(versioned);
443
444        let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
445
446        // Trigger GC if threshold exceeded
447        if pending >= self.config.gc_trigger_threshold {
448            self.try_gc();
449        }
450
451        version
452    }
453
454    /// Delete a key (insert tombstone)
455    pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
456        let version = self.next_version();
457        let versioned = VersionedValue::tombstone(version, tombstone_value);
458
459        self.chains.entry(key).or_default().add_version(versioned);
460
461        self.pending_versions.fetch_add(1, Ordering::Relaxed);
462        version
463    }
464
465    /// Get latest version of a key
466    pub fn get(&self, key: &K) -> Option<V> {
467        self.chains
468            .get(key)
469            .and_then(|entry| entry.latest().cloned())
470            .filter(|v| !v.deleted)
471            .map(|v| v.value.clone())
472    }
473
474    /// Get version at specific epoch
475    pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
476        self.chains
477            .get(key)
478            .and_then(|entry| entry.version_at(epoch).cloned())
479            .map(|v| v.value.clone())
480    }
481
482    /// Begin a read transaction at current epoch.
483    ///
484    /// Panics if all reader slots are exhausted (256 concurrent readers).
485    /// This indicates a concurrency design issue in the caller — either
486    /// readers are not being dropped or the workload exceeds the slot limit.
487    pub fn begin_read(&self) -> ReadGuard {
488        let epoch = self.current_epoch.load(Ordering::SeqCst);
489        let reader_id = self.readers.register(epoch).unwrap_or_else(|| {
490            panic!(
491                "EpochGC: all {} reader slots exhausted — too many concurrent readers. \
492                 Ensure ReadGuards are dropped promptly.",
493                MAX_READER_SLOTS
494            )
495        });
496        ReadGuard {
497            epoch,
498            reader_id,
499            registry: Arc::clone(&self.readers),
500        }
501    }
502
503    /// Calculate the GC watermark (safe to collect below this)
504    pub fn watermark(&self) -> u64 {
505        let current = self.current_epoch.load(Ordering::SeqCst);
506        let min_reader = self.readers.min_active_epoch().unwrap_or(current);
507
508        // Watermark is the minimum of current - grace period and min active reader
509        let grace = current.saturating_sub(self.config.min_epochs_to_keep);
510        grace.min(min_reader)
511    }
512
513    /// Try to run a GC cycle
514    pub fn try_gc(&self) -> GCResult {
515        let start = std::time::Instant::now();
516        let watermark = self.watermark();
517
518        let mut versions_collected = 0;
519        let mut bytes_freed = 0;
520        let mut chains_scanned = 0;
521
522        // Collect keys to scan (DashMap doesn't need a global lock)
523        let keys: Vec<K> = self.chains.iter().map(|entry| entry.key().clone()).collect();
524
525        for key in keys {
526            if chains_scanned >= self.config.max_versions_per_cycle {
527                break;
528            }
529
530            if let Some(mut entry) = self.chains.get_mut(&key) {
531                let (removed, freed) = entry.gc(watermark);
532                versions_collected += removed;
533                bytes_freed += freed;
534                chains_scanned += 1;
535            }
536        }
537
538        // Remove empty chains in a separate pass
539        self.chains.retain(|_, chain| !chain.is_empty());
540
541        let duration = start.elapsed();
542
543        // Update stats
544        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
545        self.stats
546            .versions_collected
547            .fetch_add(versions_collected as u64, Ordering::Relaxed);
548        self.stats
549            .bytes_freed
550            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
551        self.stats
552            .chains_scanned
553            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
554        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
555        self.stats
556            .last_gc_duration_us
557            .store(duration.as_micros() as u64, Ordering::Relaxed);
558
559        // Reset pending counter
560        self.pending_versions.store(0, Ordering::Relaxed);
561
562        GCResult {
563            versions_collected,
564            bytes_freed,
565            chains_scanned,
566            watermark,
567            duration_us: duration.as_micros() as u64,
568        }
569    }
570
571    /// Force a full GC (ignore limits)
572    pub fn force_gc(&self) -> GCResult {
573        let _old_limit = self.config.max_versions_per_cycle;
574        // Temporarily set limit to max
575        let _config = GCConfig {
576            max_versions_per_cycle: usize::MAX,
577            ..self.config.clone()
578        };
579
580        let start = std::time::Instant::now();
581        let watermark = self.watermark();
582
583        let mut versions_collected = 0;
584        let mut bytes_freed = 0;
585        let mut chains_scanned = 0;
586
587        for mut entry in self.chains.iter_mut() {
588            let (removed, freed) = entry.value_mut().gc(watermark);
589            versions_collected += removed;
590            bytes_freed += freed;
591            chains_scanned += 1;
592        }
593
594        // Remove empty chains
595        self.chains.retain(|_, chain| !chain.is_empty());
596
597        let duration = start.elapsed();
598
599        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
600        self.stats
601            .versions_collected
602            .fetch_add(versions_collected as u64, Ordering::Relaxed);
603        self.stats
604            .bytes_freed
605            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
606        self.stats
607            .chains_scanned
608            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
609        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
610        self.stats
611            .last_gc_duration_us
612            .store(duration.as_micros() as u64, Ordering::Relaxed);
613        self.pending_versions.store(0, Ordering::Relaxed);
614
615        GCResult {
616            versions_collected,
617            bytes_freed,
618            chains_scanned,
619            watermark,
620            duration_us: duration.as_micros() as u64,
621        }
622    }
623
624    /// Get GC statistics
625    pub fn stats(&self) -> GCStatsSnapshot {
626        self.stats.snapshot()
627    }
628
629    /// Get total version count
630    pub fn version_count(&self) -> usize {
631        self.chains.iter().map(|entry| entry.value().len()).sum()
632    }
633
634    /// Get chain count
635    pub fn chain_count(&self) -> usize {
636        self.chains.len()
637    }
638}
639
640impl<K, V> Default for EpochGC<K, V>
641where
642    K: Eq + std::hash::Hash + Clone,
643    V: Clone,
644{
645    fn default() -> Self {
646        Self::new()
647    }
648}
649
/// Outcome of a single GC cycle.
#[derive(Clone, Debug)]
pub struct GCResult {
    /// Number of versions removed during the cycle.
    pub versions_collected: usize,
    /// Estimated number of bytes released during the cycle.
    pub bytes_freed: usize,
    /// Number of chains visited during the cycle.
    pub chains_scanned: usize,
    /// Watermark the cycle collected against.
    pub watermark: u64,
    /// Duration of the cycle, in microseconds.
    pub duration_us: u64,
}
659
660/// Guard for read transactions
661pub struct ReadGuard {
662    pub epoch: u64,
663    reader_id: u64,
664    registry: Arc<ReaderRegistry>,
665}
666
667impl Drop for ReadGuard {
668    fn drop(&mut self) {
669        self.registry.unregister(self.reader_id);
670    }
671}
672
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert_eq!(chain.latest().unwrap().value, "v2");
    }

    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        // Add versions at different epochs
        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }

        assert_eq!(chain.len(), 5);

        // GC with watermark at epoch 3
        let (removed, bytes_freed) = chain.gc(3);

        // Keeps epochs 3 and 4 plus epoch 2 as the base; drops epochs 0 and 1.
        assert_eq!(removed, 2);
        assert_eq!(
            bytes_freed,
            2 * std::mem::size_of::<VersionedValue<String>>()
        );
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.latest().unwrap().value, "v4");
        // A reader exactly at the watermark still resolves to the base.
        assert_eq!(chain.version_at(3).unwrap().value, "v2");
    }

    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();

        let r1 = registry.register(10).unwrap();
        let _r2 = registry.register(20).unwrap();

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let _v1 = gc.insert("key1".to_string(), 100);
        let _v2 = gc.insert("key1".to_string(), 200);

        assert_eq!(gc.get(&"key1".to_string()), Some(200));
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.delete("key1".to_string(), 0); // tombstone

        assert_eq!(gc.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 200);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 300);

        // With strict < semantics: version_at(N) sees versions with epoch < N
        // v=100 at epoch 0, v=200 at epoch 1, v=300 at epoch 2
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(100)); // epoch 0 < 1
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(200)); // epoch 1 < 2
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 3), Some(300)); // epoch 2 < 3
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), None); // no epoch < 0
    }

    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);

        {
            let _guard = gc.begin_read();
            assert_eq!(gc.readers.active_count(), 1);
        }

        assert_eq!(gc.readers.active_count(), 0);
    }

    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        // Epoch 0
        gc.insert("k".to_string(), 1);
        gc.advance_epoch(); // Epoch 1

        // Pin a reader at epoch 1 before the epoch moves on.
        let guard = gc.begin_read();
        assert_eq!(guard.epoch, 1);

        gc.insert("k".to_string(), 2);
        gc.advance_epoch(); // Epoch 2
        gc.insert("k".to_string(), 3);
        gc.advance_epoch(); // Epoch 3
        gc.insert("k".to_string(), 4);
        gc.advance_epoch(); // Epoch 4
        assert_eq!(gc.current_epoch(), 4);

        // The reader at epoch 1 holds the watermark back: min(4 - 2, 1) = 1
        assert_eq!(gc.watermark(), 1);

        // With no readers only the grace period applies: 4 - 2 = 2
        drop(guard);
        assert_eq!(gc.watermark(), 2);
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        // Create one version per epoch for epochs 0..=9; current epoch ends at 10
        for i in 0..10 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        assert_eq!(gc.version_count(), 10);

        // Run GC
        let result = gc.try_gc();

        // Watermark is 10 - 1 = 9: epoch 9 is kept, epoch 8 survives as the
        // base, and epochs 0..=7 are collected.
        assert_eq!(result.watermark, 9);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&"key".to_string()), Some(9));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        gc.try_gc();

        // Current epoch 5 with the default grace of 2 gives watermark 3:
        // epochs 3 and 4 stay, epoch 2 is the base, epochs 0 and 1 go.
        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.last_gc_epoch, 3);
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 1, // Very limited
        });

        // Three versions per key, one in each of epochs 0, 1 and 2.
        for round in 0..3 {
            for i in 0..20 {
                gc.insert(format!("key{}", i), round);
            }
            gc.advance_epoch();
        }
        assert_eq!(gc.version_count(), 60);

        // A regular cycle is capped at a single chain.
        let capped = gc.try_gc();
        assert_eq!(capped.chains_scanned, 1);
        assert_eq!(capped.versions_collected, 2);
        assert_eq!(gc.version_count(), 58);

        // Force GC processes every chain regardless of the limit.
        let forced = gc.force_gc();
        assert_eq!(forced.chains_scanned, 20);
        assert_eq!(forced.versions_collected, 38);
        assert_eq!(gc.version_count(), 20);

        // The newest version of every key survives.
        for i in 0..20 {
            assert_eq!(gc.get(&format!("key{}", i)), Some(2));
        }
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 1);
        gc.insert("key2".to_string(), 2);
        gc.insert("key3".to_string(), 3);

        assert_eq!(gc.chain_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // With strict < semantics:
        // version_at(2) should see tombstone at epoch 1 → return None
        assert!(chain.version_at(2).is_none());
        // version_at(1) should see version at epoch 0 → return 100
        assert_eq!(chain.version_at(1).map(|v| v.value), Some(100));
        // version_at(0) → no version with epoch < 0
        assert!(chain.version_at(0).is_none());
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        // Same setup as test_gc_stats: watermark 3, one chain, two versions dropped.
        assert_eq!(result.watermark, 3);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(
            result.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>()
        );
        assert_eq!(gc.chain_count(), 1);
    }

    #[test]
    fn test_lock_free_slot_registration() {
        let registry = ReaderRegistry::new();

        // Register multiple readers — each gets a unique slot
        let id0 = registry.register(10).unwrap();
        let id1 = registry.register(20).unwrap();
        let id2 = registry.register(30).unwrap();

        assert_ne!(id0, id1);
        assert_ne!(id1, id2);
        assert_ne!(id0, id2);
        assert_eq!(registry.active_count(), 3);
        assert_eq!(registry.min_active_epoch(), Some(10));

        // Unregister middle reader — min should stay at 10
        registry.unregister(id1);
        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        // Unregister first reader — min moves to 30
        registry.unregister(id0);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(30));

        // Unregister last — no active readers
        registry.unregister(id2);
        assert_eq!(registry.active_count(), 0);
        assert_eq!(registry.min_active_epoch(), None);
    }

    #[test]
    fn test_concurrent_insert_and_gc() {
        use std::sync::Arc;
        use std::thread;

        let gc = Arc::new(EpochGC::<u64, u64>::new());

        // Spawn writers; each thread writes its own disjoint key range
        let mut handles = Vec::new();
        for t in 0..4 {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    gc.insert(t * 1000 + i, i);
                }
            }));
        }

        // Spawn a GC thread
        {
            let gc = Arc::clone(&gc);
            handles.push(thread::spawn(move || {
                for _ in 0..10 {
                    gc.advance_epoch();
                    gc.try_gc();
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // Should have 400 unique keys
        assert_eq!(gc.chain_count(), 400);
    }
}