Skip to main content

sochdb_core/
epoch_gc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Epoch-Based Garbage Collection for Version Cleanup
19//!
20//! This module provides epoch-based garbage collection for cleaning up
21//! old versions of data in a multi-version concurrency control (MVCC) system.
22//!
23//! # Design
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │                    Epoch-Based GC                               │
28//! │                                                                 │
29//! │  Time ──────────────────────────────────────────────────────→  │
30//! │                                                                 │
31//! │  Epoch 0     Epoch 1     Epoch 2     Epoch 3     Epoch 4       │
32//! │  ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐       │
33//! │  │v1,v2│     │v3,v4│     │v5   │     │v6,v7│     │v8   │       │
34//! │  └─────┘     └─────┘     └─────┘     └─────┘     └─────┘       │
35//! │    ↑                                               ↑           │
36//! │    │                                               │           │
37//! │  Min active                                    Current         │
38//! │  reader epoch                                   epoch          │
39//! │                                                                 │
40//! │  Versions in epochs < min_active can be safely collected       │
41//! └─────────────────────────────────────────────────────────────────┘
42//! ```
43//!
44//! # Key Concepts
45//!
46//! - **Version**: A specific snapshot of data at a point in time
47//! - **Epoch**: A logical time period during which versions are created
48//! - **Watermark**: The minimum epoch that is still accessible by readers
49//! - **GC Cycle**: Periodic cleanup of versions older than watermark
50
51use parking_lot::RwLock;
52use std::collections::{HashMap, VecDeque};
53use std::sync::Arc;
54use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
55
/// Identifies a single version as an `(epoch, sequence)` pair.
///
/// The derived ordering compares `epoch` first and `sequence` second, so the
/// field order below is significant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct VersionId {
    /// Epoch in which the version was created.
    pub epoch: u64,
    /// Position of the version within its epoch.
    pub sequence: u32,
}

impl VersionId {
    /// Builds a version id from its two components.
    pub fn new(epoch: u64, sequence: u32) -> Self {
        VersionId { epoch, sequence }
    }

    /// Returns `true` when this version's epoch lies strictly below `watermark`.
    ///
    /// Only the epoch is consulted; the sequence number plays no role.
    pub fn is_stale(&self, watermark: u64) -> bool {
        watermark > self.epoch
    }
}
73
74/// A versioned value wrapper
75#[derive(Debug, Clone)]
76pub struct VersionedValue<T> {
77    pub version: VersionId,
78    pub value: T,
79    /// Deletion marker (tombstone)
80    pub deleted: bool,
81}
82
83impl<T> VersionedValue<T> {
84    pub fn new(version: VersionId, value: T) -> Self {
85        Self {
86            version,
87            value,
88            deleted: false,
89        }
90    }
91
92    pub fn tombstone(version: VersionId, value: T) -> Self {
93        Self {
94            version,
95            value,
96            deleted: true,
97        }
98    }
99}
100
101/// Version chain for a single key
102#[derive(Debug)]
103pub struct VersionChain<T> {
104    /// Versions ordered from newest to oldest
105    versions: VecDeque<VersionedValue<T>>,
106    /// Total count of versions ever created
107    total_versions: u64,
108}
109
110impl<T: Clone> VersionChain<T> {
111    pub fn new() -> Self {
112        Self {
113            versions: VecDeque::new(),
114            total_versions: 0,
115        }
116    }
117
118    /// Add a new version (prepends to front as newest)
119    pub fn add_version(&mut self, version: VersionedValue<T>) {
120        self.versions.push_front(version);
121        self.total_versions += 1;
122    }
123
124    /// Get the latest version
125    pub fn latest(&self) -> Option<&VersionedValue<T>> {
126        self.versions.front()
127    }
128
129    /// Get version visible at specific epoch
130    /// Returns None if the key was deleted at or before the given epoch
131    pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
132        for v in &self.versions {
133            if v.version.epoch <= epoch {
134                // If this version (the most recent at or before the epoch) is deleted,
135                // the key is considered deleted at this point in time
136                if v.deleted {
137                    return None;
138                }
139                return Some(v);
140            }
141        }
142        None
143    }
144
145    /// Clean up versions older than watermark
146    /// Returns (versions_removed, bytes_freed estimate)
147    pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
148        let initial_len = self.versions.len();
149
150        // Keep at least one version (the latest visible)
151        let mut kept = 0;
152        let mut last_visible_idx = None;
153
154        for (i, v) in self.versions.iter().enumerate() {
155            if v.version.epoch >= watermark {
156                kept += 1;
157            } else {
158                // This is below watermark, but we keep the first one as the base
159                if last_visible_idx.is_none() {
160                    last_visible_idx = Some(i);
161                    kept += 1;
162                }
163            }
164        }
165
166        // Remove versions beyond what we're keeping
167        while self.versions.len() > kept {
168            self.versions.pop_back();
169        }
170
171        let removed = initial_len - self.versions.len();
172        let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
173
174        (removed, bytes_freed)
175    }
176
177    /// Number of versions in chain
178    pub fn len(&self) -> usize {
179        self.versions.len()
180    }
181
182    pub fn is_empty(&self) -> bool {
183        self.versions.is_empty()
184    }
185}
186
187impl<T: Clone> Default for VersionChain<T> {
188    fn default() -> Self {
189        Self::new()
190    }
191}
192
193/// Reader registration for tracking active epochs
194#[derive(Debug)]
195pub struct ReaderRegistry {
196    /// Active reader epochs (reader_id -> epoch)
197    active_readers: RwLock<HashMap<u64, u64>>,
198    /// Next reader ID
199    next_reader_id: AtomicU64,
200    /// Count of active readers
201    active_count: AtomicUsize,
202}
203
204impl ReaderRegistry {
205    pub fn new() -> Self {
206        Self {
207            active_readers: RwLock::new(HashMap::new()),
208            next_reader_id: AtomicU64::new(1),
209            active_count: AtomicUsize::new(0),
210        }
211    }
212
213    /// Register a reader at the current epoch
214    pub fn register(&self, epoch: u64) -> u64 {
215        let reader_id = self.next_reader_id.fetch_add(1, Ordering::Relaxed);
216        let mut readers = self.active_readers.write();
217        readers.insert(reader_id, epoch);
218        self.active_count.fetch_add(1, Ordering::Relaxed);
219        reader_id
220    }
221
222    /// Unregister a reader
223    pub fn unregister(&self, reader_id: u64) {
224        let mut readers = self.active_readers.write();
225        if readers.remove(&reader_id).is_some() {
226            self.active_count.fetch_sub(1, Ordering::Relaxed);
227        }
228    }
229
230    /// Get the minimum epoch among all active readers
231    pub fn min_active_epoch(&self) -> Option<u64> {
232        let readers = self.active_readers.read();
233        readers.values().copied().min()
234    }
235
236    /// Get count of active readers
237    pub fn active_count(&self) -> usize {
238        self.active_count.load(Ordering::Relaxed)
239    }
240}
241
242impl Default for ReaderRegistry {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248/// GC statistics
249#[derive(Debug, Default)]
250pub struct GCStats {
251    pub gc_cycles: AtomicU64,
252    pub versions_collected: AtomicU64,
253    pub bytes_freed: AtomicU64,
254    pub chains_scanned: AtomicU64,
255    pub last_gc_epoch: AtomicU64,
256    pub last_gc_duration_us: AtomicU64,
257}
258
259impl GCStats {
260    pub fn snapshot(&self) -> GCStatsSnapshot {
261        GCStatsSnapshot {
262            gc_cycles: self.gc_cycles.load(Ordering::Relaxed),
263            versions_collected: self.versions_collected.load(Ordering::Relaxed),
264            bytes_freed: self.bytes_freed.load(Ordering::Relaxed),
265            chains_scanned: self.chains_scanned.load(Ordering::Relaxed),
266            last_gc_epoch: self.last_gc_epoch.load(Ordering::Relaxed),
267            last_gc_duration_us: self.last_gc_duration_us.load(Ordering::Relaxed),
268        }
269    }
270}
271
/// Plain-value copy of the GC counters taken at one moment.
#[derive(Clone, Debug)]
pub struct GCStatsSnapshot {
    /// Number of GC cycles recorded
    pub gc_cycles: u64,
    /// Total versions removed across cycles
    pub versions_collected: u64,
    /// Total estimated bytes freed across cycles
    pub bytes_freed: u64,
    /// Total chains scanned across cycles
    pub chains_scanned: u64,
    /// Value recorded for the most recent cycle's epoch
    pub last_gc_epoch: u64,
    /// Duration of the most recent cycle, in microseconds
    pub last_gc_duration_us: u64,
}
281
/// Tunables for the garbage collector.
#[derive(Debug, Clone)]
pub struct GCConfig {
    /// Minimum number of epochs to keep (grace period)
    pub min_epochs_to_keep: u64,
    /// Trigger GC after this many new versions
    pub gc_trigger_threshold: usize,
    /// Per-cycle scan budget. Note that `try_gc` in this file compares it
    /// against the number of chains scanned, not individual versions.
    pub max_versions_per_cycle: usize,
}

impl Default for GCConfig {
    /// Defaults: 2 epochs of grace, GC trigger at 1000, budget of 10000 per cycle.
    fn default() -> Self {
        GCConfig {
            min_epochs_to_keep: 2,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 10000,
        }
    }
}
302
303/// Epoch-based garbage collector for versioned data
304pub struct EpochGC<K, V>
305where
306    K: Eq + std::hash::Hash + Clone,
307    V: Clone,
308{
309    /// Global epoch counter
310    current_epoch: AtomicU64,
311    /// Sequence counter within epoch
312    current_sequence: AtomicU64,
313    /// Version chains by key
314    chains: RwLock<HashMap<K, VersionChain<V>>>,
315    /// Reader registry
316    readers: Arc<ReaderRegistry>,
317    /// GC configuration
318    config: GCConfig,
319    /// GC statistics
320    stats: GCStats,
321    /// Pending versions since last GC
322    pending_versions: AtomicUsize,
323}
324
325impl<K, V> EpochGC<K, V>
326where
327    K: Eq + std::hash::Hash + Clone,
328    V: Clone,
329{
330    /// Create new epoch GC with default config
331    pub fn new() -> Self {
332        Self::with_config(GCConfig::default())
333    }
334
335    /// Create with custom config
336    pub fn with_config(config: GCConfig) -> Self {
337        Self {
338            current_epoch: AtomicU64::new(0),
339            current_sequence: AtomicU64::new(0),
340            chains: RwLock::new(HashMap::new()),
341            readers: Arc::new(ReaderRegistry::new()),
342            config,
343            stats: GCStats::default(),
344            pending_versions: AtomicUsize::new(0),
345        }
346    }
347
348    /// Get current epoch
349    pub fn current_epoch(&self) -> u64 {
350        self.current_epoch.load(Ordering::SeqCst)
351    }
352
353    /// Advance to next epoch
354    pub fn advance_epoch(&self) -> u64 {
355        self.current_sequence.store(0, Ordering::SeqCst);
356        self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
357    }
358
359    /// Allocate a new version ID
360    pub fn next_version(&self) -> VersionId {
361        let epoch = self.current_epoch.load(Ordering::SeqCst);
362        let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
363        VersionId::new(epoch, seq)
364    }
365
366    /// Insert a new version
367    pub fn insert(&self, key: K, value: V) -> VersionId {
368        let version = self.next_version();
369        let versioned = VersionedValue::new(version, value);
370
371        {
372            let mut chains = self.chains.write();
373            chains.entry(key).or_default().add_version(versioned);
374        }
375
376        let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
377
378        // Trigger GC if threshold exceeded
379        if pending >= self.config.gc_trigger_threshold {
380            self.try_gc();
381        }
382
383        version
384    }
385
386    /// Delete a key (insert tombstone)
387    pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
388        let version = self.next_version();
389        let versioned = VersionedValue::tombstone(version, tombstone_value);
390
391        {
392            let mut chains = self.chains.write();
393            chains.entry(key).or_default().add_version(versioned);
394        }
395
396        self.pending_versions.fetch_add(1, Ordering::Relaxed);
397        version
398    }
399
400    /// Get latest version of a key
401    pub fn get(&self, key: &K) -> Option<V> {
402        let chains = self.chains.read();
403        chains
404            .get(key)
405            .and_then(|chain| chain.latest())
406            .filter(|v| !v.deleted)
407            .map(|v| v.value.clone())
408    }
409
410    /// Get version at specific epoch
411    pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
412        let chains = self.chains.read();
413        chains
414            .get(key)
415            .and_then(|chain| chain.version_at(epoch))
416            .map(|v| v.value.clone())
417    }
418
419    /// Begin a read transaction at current epoch
420    pub fn begin_read(&self) -> ReadGuard {
421        let epoch = self.current_epoch.load(Ordering::SeqCst);
422        let reader_id = self.readers.register(epoch);
423        ReadGuard {
424            epoch,
425            reader_id,
426            registry: Arc::clone(&self.readers),
427        }
428    }
429
430    /// Calculate the GC watermark (safe to collect below this)
431    pub fn watermark(&self) -> u64 {
432        let current = self.current_epoch.load(Ordering::SeqCst);
433        let min_reader = self.readers.min_active_epoch().unwrap_or(current);
434
435        // Watermark is the minimum of current - grace period and min active reader
436        let grace = current.saturating_sub(self.config.min_epochs_to_keep);
437        grace.min(min_reader)
438    }
439
440    /// Try to run a GC cycle
441    pub fn try_gc(&self) -> GCResult {
442        let start = std::time::Instant::now();
443        let watermark = self.watermark();
444
445        let mut versions_collected = 0;
446        let mut bytes_freed = 0;
447        let mut chains_scanned = 0;
448
449        {
450            let mut chains = self.chains.write();
451            let keys: Vec<K> = chains.keys().cloned().collect();
452
453            for key in keys {
454                if chains_scanned >= self.config.max_versions_per_cycle {
455                    break;
456                }
457
458                if let Some(chain) = chains.get_mut(&key) {
459                    let (removed, freed) = chain.gc(watermark);
460                    versions_collected += removed;
461                    bytes_freed += freed;
462                    chains_scanned += 1;
463
464                    // Remove empty chains
465                    if chain.is_empty() {
466                        chains.remove(&key);
467                    }
468                }
469            }
470        }
471
472        let duration = start.elapsed();
473
474        // Update stats
475        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
476        self.stats
477            .versions_collected
478            .fetch_add(versions_collected as u64, Ordering::Relaxed);
479        self.stats
480            .bytes_freed
481            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
482        self.stats
483            .chains_scanned
484            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
485        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
486        self.stats
487            .last_gc_duration_us
488            .store(duration.as_micros() as u64, Ordering::Relaxed);
489
490        // Reset pending counter
491        self.pending_versions.store(0, Ordering::Relaxed);
492
493        GCResult {
494            versions_collected,
495            bytes_freed,
496            chains_scanned,
497            watermark,
498            duration_us: duration.as_micros() as u64,
499        }
500    }
501
502    /// Force a full GC (ignore limits)
503    pub fn force_gc(&self) -> GCResult {
504        let _old_limit = self.config.max_versions_per_cycle;
505        // Temporarily set limit to max
506        let _config = GCConfig {
507            max_versions_per_cycle: usize::MAX,
508            ..self.config.clone()
509        };
510
511        let start = std::time::Instant::now();
512        let watermark = self.watermark();
513
514        let mut versions_collected = 0;
515        let mut bytes_freed = 0;
516        let mut chains_scanned = 0;
517
518        {
519            let mut chains = self.chains.write();
520            for chain in chains.values_mut() {
521                let (removed, freed) = chain.gc(watermark);
522                versions_collected += removed;
523                bytes_freed += freed;
524                chains_scanned += 1;
525            }
526
527            // Remove empty chains
528            chains.retain(|_, chain| !chain.is_empty());
529        }
530
531        let duration = start.elapsed();
532
533        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
534        self.stats
535            .versions_collected
536            .fetch_add(versions_collected as u64, Ordering::Relaxed);
537        self.stats
538            .bytes_freed
539            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
540        self.stats
541            .chains_scanned
542            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
543        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
544        self.stats
545            .last_gc_duration_us
546            .store(duration.as_micros() as u64, Ordering::Relaxed);
547        self.pending_versions.store(0, Ordering::Relaxed);
548
549        GCResult {
550            versions_collected,
551            bytes_freed,
552            chains_scanned,
553            watermark,
554            duration_us: duration.as_micros() as u64,
555        }
556    }
557
558    /// Get GC statistics
559    pub fn stats(&self) -> GCStatsSnapshot {
560        self.stats.snapshot()
561    }
562
563    /// Get total version count
564    pub fn version_count(&self) -> usize {
565        let chains = self.chains.read();
566        chains.values().map(|c| c.len()).sum()
567    }
568
569    /// Get chain count
570    pub fn chain_count(&self) -> usize {
571        self.chains.read().len()
572    }
573}
574
575impl<K, V> Default for EpochGC<K, V>
576where
577    K: Eq + std::hash::Hash + Clone,
578    V: Clone,
579{
580    fn default() -> Self {
581        Self::new()
582    }
583}
584
/// Outcome of one GC cycle.
#[derive(Clone, Debug)]
pub struct GCResult {
    /// Versions removed during this cycle
    pub versions_collected: usize,
    /// Estimated bytes freed during this cycle
    pub bytes_freed: usize,
    /// Chains scanned during this cycle
    pub chains_scanned: usize,
    /// Watermark the cycle collected against
    pub watermark: u64,
    /// Wall-clock duration of the cycle, in microseconds
    pub duration_us: u64,
}
594
595/// Guard for read transactions
596pub struct ReadGuard {
597    pub epoch: u64,
598    reader_id: u64,
599    registry: Arc<ReaderRegistry>,
600}
601
602impl Drop for ReadGuard {
603    fn drop(&mut self) {
604        self.registry.unregister(self.reader_id);
605    }
606}
607
#[cfg(test)]
mod tests {
    use super::*;

    /// Ordering is epoch-first; staleness is a strict `<` against the watermark.
    #[test]
    fn test_version_id() {
        let v1 = VersionId::new(1, 0);
        let v2 = VersionId::new(2, 0);

        assert!(v1 < v2);
        assert!(v1.is_stale(2));
        assert!(!v2.is_stale(2));
    }

    /// The most recently added version is reported as the latest.
    #[test]
    fn test_version_chain_basic() {
        let mut chain: VersionChain<String> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));

        assert_eq!(chain.len(), 2);
        assert_eq!(chain.latest().unwrap().value, "v2");
    }

    /// GC keeps everything at or above the watermark plus one base version.
    #[test]
    fn test_version_chain_gc() {
        let mut chain: VersionChain<String> = VersionChain::new();

        // Add versions at epochs 0..=4
        for epoch in 0..5 {
            chain.add_version(VersionedValue::new(
                VersionId::new(epoch, 0),
                format!("v{}", epoch),
            ));
        }

        assert_eq!(chain.len(), 5);

        // GC with watermark at epoch 3: epochs 3 and 4 stay, epoch 2 is the base,
        // epochs 0 and 1 are collected.
        let (removed, bytes_freed) = chain.gc(3);

        assert_eq!(removed, 2);
        assert_eq!(
            bytes_freed,
            2 * std::mem::size_of::<VersionedValue<String>>()
        );
        assert_eq!(chain.len(), 3);
        assert_eq!(chain.latest().unwrap().value, "v4");
        assert_eq!(chain.version_at(3).unwrap().value, "v3");
        assert_eq!(chain.version_at(2).unwrap().value, "v2");
    }

    /// The registry reports the minimum pinned epoch and the reader count.
    #[test]
    fn test_reader_registry() {
        let registry = ReaderRegistry::new();

        let r1 = registry.register(10);
        let _r2 = registry.register(20);

        assert_eq!(registry.active_count(), 2);
        assert_eq!(registry.min_active_epoch(), Some(10));

        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
        assert_eq!(registry.min_active_epoch(), Some(20));

        // Unregistering an id twice must not underflow the count.
        registry.unregister(r1);
        assert_eq!(registry.active_count(), 1);
    }

    #[test]
    fn test_epoch_gc_basic() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        let _v1 = gc.insert("key1".to_string(), 100);
        let _v2 = gc.insert("key1".to_string(), 200);

        assert_eq!(gc.get(&"key1".to_string()), Some(200));
        assert_eq!(gc.version_count(), 2);
    }

    #[test]
    fn test_epoch_gc_delete() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.delete("key1".to_string(), 0); // tombstone

        assert_eq!(gc.get(&"key1".to_string()), None);
    }

    #[test]
    fn test_epoch_gc_at_epoch() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 200);
        gc.advance_epoch();
        gc.insert("key1".to_string(), 300);

        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), Some(100));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(200));
        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(300));
    }

    /// A guard registers a reader for its lifetime only.
    #[test]
    fn test_read_guard() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 100);

        {
            let _guard = gc.begin_read();
            assert_eq!(gc.readers.active_count(), 1);
        }

        assert_eq!(gc.readers.active_count(), 0);
    }

    /// The watermark is the minimum of the grace bound and the oldest reader.
    #[test]
    fn test_watermark_calculation() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 2,
            ..Default::default()
        });

        // Epoch 0
        gc.insert("k".to_string(), 1);
        gc.advance_epoch(); // Epoch 1
        gc.insert("k".to_string(), 2);

        // Pin a reader while the current epoch is still 1.
        let guard = gc.begin_read();
        assert_eq!(guard.epoch, 1);

        gc.advance_epoch(); // Epoch 2
        gc.insert("k".to_string(), 3);
        gc.advance_epoch(); // Epoch 3
        gc.insert("k".to_string(), 4);
        gc.advance_epoch(); // Epoch 4
        assert_eq!(gc.current_epoch(), 4);

        // Grace bound is 4 - 2 = 2, but the reader at epoch 1 holds it at 1.
        assert_eq!(gc.watermark(), 1);

        // Once the reader is gone only the grace period applies.
        drop(guard);
        assert_eq!(gc.watermark(), 2);
    }

    #[test]
    fn test_gc_cycle() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 1,
            gc_trigger_threshold: 100,
            max_versions_per_cycle: 100,
        });

        // One version in each of epochs 0..=9; current epoch ends at 10.
        for i in 0..10 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        assert_eq!(gc.version_count(), 10);

        // Run GC: watermark is 10 - 1 = 9, so epoch 9 stays, epoch 8 is the
        // base, and the remaining eight versions are collected.
        let result = gc.try_gc();

        assert_eq!(result.watermark, 9);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 8);
        assert_eq!(gc.version_count(), 2);
        assert_eq!(gc.get(&"key".to_string()), Some(9));
    }

    #[test]
    fn test_gc_stats() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        gc.try_gc();

        // Default grace period is 2, so the watermark is 5 - 2 = 3 and the
        // versions from epochs 0 and 1 are collected.
        let stats = gc.stats();
        assert_eq!(stats.gc_cycles, 1);
        assert_eq!(stats.chains_scanned, 1);
        assert_eq!(stats.versions_collected, 2);
        assert_eq!(
            stats.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>() as u64
        );
        assert_eq!(stats.last_gc_epoch, 3);
    }

    #[test]
    fn test_force_gc() {
        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
            min_epochs_to_keep: 0,
            gc_trigger_threshold: 1000,
            max_versions_per_cycle: 1, // Very limited
        });

        for i in 0..20 {
            gc.insert(format!("key{}", i), i);
        }

        gc.advance_epoch();
        gc.advance_epoch();

        // A regular cycle honours the limit of one chain.
        assert_eq!(gc.try_gc().chains_scanned, 1);

        // Force GC processes all chains regardless of the limit.
        let result = gc.force_gc();
        assert_eq!(result.chains_scanned, 20);

        // Every chain holds a single live version, which is kept as its base.
        assert_eq!(result.versions_collected, 0);
        assert_eq!(gc.version_count(), 20);
        assert_eq!(gc.chain_count(), 20);
    }

    #[test]
    fn test_chain_count() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        gc.insert("key1".to_string(), 1);
        gc.insert("key2".to_string(), 2);
        gc.insert("key3".to_string(), 3);

        assert_eq!(gc.chain_count(), 3);
    }

    #[test]
    fn test_version_at_respects_tombstone() {
        let mut chain: VersionChain<i32> = VersionChain::new();

        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));

        // Should not return tombstone
        assert!(chain.version_at(1).is_none());
        // Should return the version before tombstone
        assert_eq!(chain.version_at(0).map(|v| v.value), Some(100));
    }

    #[test]
    fn test_gc_result_fields() {
        let gc: EpochGC<String, i32> = EpochGC::new();

        for i in 0..5 {
            gc.insert("key".to_string(), i);
            gc.advance_epoch();
        }

        let result = gc.try_gc();

        // Current epoch is 5 and the default grace period is 2.
        assert_eq!(result.watermark, 3);
        assert_eq!(result.chains_scanned, 1);
        assert_eq!(result.versions_collected, 2);
        assert_eq!(
            result.bytes_freed,
            2 * std::mem::size_of::<VersionedValue<i32>>()
        );
    }
}