sochdb_core/
epoch_gc.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Epoch-Based Garbage Collection for Version Cleanup
16//!
17//! This module provides epoch-based garbage collection for cleaning up
18//! old versions of data in a multi-version concurrency control (MVCC) system.
19//!
20//! # Design
21//!
22//! ```text
23//! ┌─────────────────────────────────────────────────────────────────┐
24//! │                    Epoch-Based GC                               │
25//! │                                                                 │
26//! │  Time ──────────────────────────────────────────────────────→  │
27//! │                                                                 │
28//! │  Epoch 0     Epoch 1     Epoch 2     Epoch 3     Epoch 4       │
29//! │  ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐     ┌─────┐       │
30//! │  │v1,v2│     │v3,v4│     │v5   │     │v6,v7│     │v8   │       │
31//! │  └─────┘     └─────┘     └─────┘     └─────┘     └─────┘       │
32//! │    ↑                                               ↑           │
33//! │    │                                               │           │
34//! │  Min active                                    Current         │
35//! │  reader epoch                                   epoch          │
36//! │                                                                 │
37//! │  Versions in epochs < min_active can be safely collected       │
38//! └─────────────────────────────────────────────────────────────────┘
39//! ```
40//!
41//! # Key Concepts
42//!
43//! - **Version**: A specific snapshot of data at a point in time
44//! - **Epoch**: A logical time period during which versions are created
45//! - **Watermark**: The minimum epoch that is still accessible by readers
46//! - **GC Cycle**: Periodic cleanup of versions older than watermark
47
48use parking_lot::RwLock;
49use std::collections::{HashMap, VecDeque};
50use std::sync::Arc;
51use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
52
53/// Version identifier (epoch, sequence within epoch)
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
55pub struct VersionId {
56    pub epoch: u64,
57    pub sequence: u32,
58}
59
60impl VersionId {
61    pub fn new(epoch: u64, sequence: u32) -> Self {
62        Self { epoch, sequence }
63    }
64
65    /// Check if this version is older than watermark
66    pub fn is_stale(&self, watermark: u64) -> bool {
67        self.epoch < watermark
68    }
69}
70
71/// A versioned value wrapper
72#[derive(Debug, Clone)]
73pub struct VersionedValue<T> {
74    pub version: VersionId,
75    pub value: T,
76    /// Deletion marker (tombstone)
77    pub deleted: bool,
78}
79
80impl<T> VersionedValue<T> {
81    pub fn new(version: VersionId, value: T) -> Self {
82        Self {
83            version,
84            value,
85            deleted: false,
86        }
87    }
88
89    pub fn tombstone(version: VersionId, value: T) -> Self {
90        Self {
91            version,
92            value,
93            deleted: true,
94        }
95    }
96}
97
98/// Version chain for a single key
99#[derive(Debug)]
100pub struct VersionChain<T> {
101    /// Versions ordered from newest to oldest
102    versions: VecDeque<VersionedValue<T>>,
103    /// Total count of versions ever created
104    total_versions: u64,
105}
106
107impl<T: Clone> VersionChain<T> {
108    pub fn new() -> Self {
109        Self {
110            versions: VecDeque::new(),
111            total_versions: 0,
112        }
113    }
114
115    /// Add a new version (prepends to front as newest)
116    pub fn add_version(&mut self, version: VersionedValue<T>) {
117        self.versions.push_front(version);
118        self.total_versions += 1;
119    }
120
121    /// Get the latest version
122    pub fn latest(&self) -> Option<&VersionedValue<T>> {
123        self.versions.front()
124    }
125
126    /// Get version visible at specific epoch
127    /// Returns None if the key was deleted at or before the given epoch
128    pub fn version_at(&self, epoch: u64) -> Option<&VersionedValue<T>> {
129        for v in &self.versions {
130            if v.version.epoch <= epoch {
131                // If this version (the most recent at or before the epoch) is deleted,
132                // the key is considered deleted at this point in time
133                if v.deleted {
134                    return None;
135                }
136                return Some(v);
137            }
138        }
139        None
140    }
141
142    /// Clean up versions older than watermark
143    /// Returns (versions_removed, bytes_freed estimate)
144    pub fn gc(&mut self, watermark: u64) -> (usize, usize) {
145        let initial_len = self.versions.len();
146
147        // Keep at least one version (the latest visible)
148        let mut kept = 0;
149        let mut last_visible_idx = None;
150
151        for (i, v) in self.versions.iter().enumerate() {
152            if v.version.epoch >= watermark {
153                kept += 1;
154            } else {
155                // This is below watermark, but we keep the first one as the base
156                if last_visible_idx.is_none() {
157                    last_visible_idx = Some(i);
158                    kept += 1;
159                }
160            }
161        }
162
163        // Remove versions beyond what we're keeping
164        while self.versions.len() > kept {
165            self.versions.pop_back();
166        }
167
168        let removed = initial_len - self.versions.len();
169        let bytes_freed = removed * std::mem::size_of::<VersionedValue<T>>();
170
171        (removed, bytes_freed)
172    }
173
174    /// Number of versions in chain
175    pub fn len(&self) -> usize {
176        self.versions.len()
177    }
178
179    pub fn is_empty(&self) -> bool {
180        self.versions.is_empty()
181    }
182}
183
184impl<T: Clone> Default for VersionChain<T> {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190/// Reader registration for tracking active epochs
191#[derive(Debug)]
192pub struct ReaderRegistry {
193    /// Active reader epochs (reader_id -> epoch)
194    active_readers: RwLock<HashMap<u64, u64>>,
195    /// Next reader ID
196    next_reader_id: AtomicU64,
197    /// Count of active readers
198    active_count: AtomicUsize,
199}
200
201impl ReaderRegistry {
202    pub fn new() -> Self {
203        Self {
204            active_readers: RwLock::new(HashMap::new()),
205            next_reader_id: AtomicU64::new(1),
206            active_count: AtomicUsize::new(0),
207        }
208    }
209
210    /// Register a reader at the current epoch
211    pub fn register(&self, epoch: u64) -> u64 {
212        let reader_id = self.next_reader_id.fetch_add(1, Ordering::Relaxed);
213        let mut readers = self.active_readers.write();
214        readers.insert(reader_id, epoch);
215        self.active_count.fetch_add(1, Ordering::Relaxed);
216        reader_id
217    }
218
219    /// Unregister a reader
220    pub fn unregister(&self, reader_id: u64) {
221        let mut readers = self.active_readers.write();
222        if readers.remove(&reader_id).is_some() {
223            self.active_count.fetch_sub(1, Ordering::Relaxed);
224        }
225    }
226
227    /// Get the minimum epoch among all active readers
228    pub fn min_active_epoch(&self) -> Option<u64> {
229        let readers = self.active_readers.read();
230        readers.values().copied().min()
231    }
232
233    /// Get count of active readers
234    pub fn active_count(&self) -> usize {
235        self.active_count.load(Ordering::Relaxed)
236    }
237}
238
239impl Default for ReaderRegistry {
240    fn default() -> Self {
241        Self::new()
242    }
243}
244
245/// GC statistics
246#[derive(Debug, Default)]
247pub struct GCStats {
248    pub gc_cycles: AtomicU64,
249    pub versions_collected: AtomicU64,
250    pub bytes_freed: AtomicU64,
251    pub chains_scanned: AtomicU64,
252    pub last_gc_epoch: AtomicU64,
253    pub last_gc_duration_us: AtomicU64,
254}
255
256impl GCStats {
257    pub fn snapshot(&self) -> GCStatsSnapshot {
258        GCStatsSnapshot {
259            gc_cycles: self.gc_cycles.load(Ordering::Relaxed),
260            versions_collected: self.versions_collected.load(Ordering::Relaxed),
261            bytes_freed: self.bytes_freed.load(Ordering::Relaxed),
262            chains_scanned: self.chains_scanned.load(Ordering::Relaxed),
263            last_gc_epoch: self.last_gc_epoch.load(Ordering::Relaxed),
264            last_gc_duration_us: self.last_gc_duration_us.load(Ordering::Relaxed),
265        }
266    }
267}
268
269#[derive(Debug, Clone)]
270pub struct GCStatsSnapshot {
271    pub gc_cycles: u64,
272    pub versions_collected: u64,
273    pub bytes_freed: u64,
274    pub chains_scanned: u64,
275    pub last_gc_epoch: u64,
276    pub last_gc_duration_us: u64,
277}
278
279/// GC configuration
280#[derive(Debug, Clone)]
281pub struct GCConfig {
282    /// Minimum number of epochs to keep (grace period)
283    pub min_epochs_to_keep: u64,
284    /// Trigger GC after this many new versions
285    pub gc_trigger_threshold: usize,
286    /// Maximum versions to scan per GC cycle
287    pub max_versions_per_cycle: usize,
288}
289
290impl Default for GCConfig {
291    fn default() -> Self {
292        Self {
293            min_epochs_to_keep: 2,
294            gc_trigger_threshold: 1000,
295            max_versions_per_cycle: 10000,
296        }
297    }
298}
299
300/// Epoch-based garbage collector for versioned data
301pub struct EpochGC<K, V>
302where
303    K: Eq + std::hash::Hash + Clone,
304    V: Clone,
305{
306    /// Global epoch counter
307    current_epoch: AtomicU64,
308    /// Sequence counter within epoch
309    current_sequence: AtomicU64,
310    /// Version chains by key
311    chains: RwLock<HashMap<K, VersionChain<V>>>,
312    /// Reader registry
313    readers: Arc<ReaderRegistry>,
314    /// GC configuration
315    config: GCConfig,
316    /// GC statistics
317    stats: GCStats,
318    /// Pending versions since last GC
319    pending_versions: AtomicUsize,
320}
321
322impl<K, V> EpochGC<K, V>
323where
324    K: Eq + std::hash::Hash + Clone,
325    V: Clone,
326{
327    /// Create new epoch GC with default config
328    pub fn new() -> Self {
329        Self::with_config(GCConfig::default())
330    }
331
332    /// Create with custom config
333    pub fn with_config(config: GCConfig) -> Self {
334        Self {
335            current_epoch: AtomicU64::new(0),
336            current_sequence: AtomicU64::new(0),
337            chains: RwLock::new(HashMap::new()),
338            readers: Arc::new(ReaderRegistry::new()),
339            config,
340            stats: GCStats::default(),
341            pending_versions: AtomicUsize::new(0),
342        }
343    }
344
345    /// Get current epoch
346    pub fn current_epoch(&self) -> u64 {
347        self.current_epoch.load(Ordering::SeqCst)
348    }
349
350    /// Advance to next epoch
351    pub fn advance_epoch(&self) -> u64 {
352        self.current_sequence.store(0, Ordering::SeqCst);
353        self.current_epoch.fetch_add(1, Ordering::SeqCst) + 1
354    }
355
356    /// Allocate a new version ID
357    pub fn next_version(&self) -> VersionId {
358        let epoch = self.current_epoch.load(Ordering::SeqCst);
359        let seq = self.current_sequence.fetch_add(1, Ordering::SeqCst) as u32;
360        VersionId::new(epoch, seq)
361    }
362
363    /// Insert a new version
364    pub fn insert(&self, key: K, value: V) -> VersionId {
365        let version = self.next_version();
366        let versioned = VersionedValue::new(version, value);
367
368        {
369            let mut chains = self.chains.write();
370            chains.entry(key).or_default().add_version(versioned);
371        }
372
373        let pending = self.pending_versions.fetch_add(1, Ordering::Relaxed);
374
375        // Trigger GC if threshold exceeded
376        if pending >= self.config.gc_trigger_threshold {
377            self.try_gc();
378        }
379
380        version
381    }
382
383    /// Delete a key (insert tombstone)
384    pub fn delete(&self, key: K, tombstone_value: V) -> VersionId {
385        let version = self.next_version();
386        let versioned = VersionedValue::tombstone(version, tombstone_value);
387
388        {
389            let mut chains = self.chains.write();
390            chains.entry(key).or_default().add_version(versioned);
391        }
392
393        self.pending_versions.fetch_add(1, Ordering::Relaxed);
394        version
395    }
396
397    /// Get latest version of a key
398    pub fn get(&self, key: &K) -> Option<V> {
399        let chains = self.chains.read();
400        chains
401            .get(key)
402            .and_then(|chain| chain.latest())
403            .filter(|v| !v.deleted)
404            .map(|v| v.value.clone())
405    }
406
407    /// Get version at specific epoch
408    pub fn get_at_epoch(&self, key: &K, epoch: u64) -> Option<V> {
409        let chains = self.chains.read();
410        chains
411            .get(key)
412            .and_then(|chain| chain.version_at(epoch))
413            .map(|v| v.value.clone())
414    }
415
416    /// Begin a read transaction at current epoch
417    pub fn begin_read(&self) -> ReadGuard {
418        let epoch = self.current_epoch.load(Ordering::SeqCst);
419        let reader_id = self.readers.register(epoch);
420        ReadGuard {
421            epoch,
422            reader_id,
423            registry: Arc::clone(&self.readers),
424        }
425    }
426
427    /// Calculate the GC watermark (safe to collect below this)
428    pub fn watermark(&self) -> u64 {
429        let current = self.current_epoch.load(Ordering::SeqCst);
430        let min_reader = self.readers.min_active_epoch().unwrap_or(current);
431
432        // Watermark is the minimum of current - grace period and min active reader
433        let grace = current.saturating_sub(self.config.min_epochs_to_keep);
434        grace.min(min_reader)
435    }
436
437    /// Try to run a GC cycle
438    pub fn try_gc(&self) -> GCResult {
439        let start = std::time::Instant::now();
440        let watermark = self.watermark();
441
442        let mut versions_collected = 0;
443        let mut bytes_freed = 0;
444        let mut chains_scanned = 0;
445
446        {
447            let mut chains = self.chains.write();
448            let keys: Vec<K> = chains.keys().cloned().collect();
449
450            for key in keys {
451                if chains_scanned >= self.config.max_versions_per_cycle {
452                    break;
453                }
454
455                if let Some(chain) = chains.get_mut(&key) {
456                    let (removed, freed) = chain.gc(watermark);
457                    versions_collected += removed;
458                    bytes_freed += freed;
459                    chains_scanned += 1;
460
461                    // Remove empty chains
462                    if chain.is_empty() {
463                        chains.remove(&key);
464                    }
465                }
466            }
467        }
468
469        let duration = start.elapsed();
470
471        // Update stats
472        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
473        self.stats
474            .versions_collected
475            .fetch_add(versions_collected as u64, Ordering::Relaxed);
476        self.stats
477            .bytes_freed
478            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
479        self.stats
480            .chains_scanned
481            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
482        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
483        self.stats
484            .last_gc_duration_us
485            .store(duration.as_micros() as u64, Ordering::Relaxed);
486
487        // Reset pending counter
488        self.pending_versions.store(0, Ordering::Relaxed);
489
490        GCResult {
491            versions_collected,
492            bytes_freed,
493            chains_scanned,
494            watermark,
495            duration_us: duration.as_micros() as u64,
496        }
497    }
498
499    /// Force a full GC (ignore limits)
500    pub fn force_gc(&self) -> GCResult {
501        let _old_limit = self.config.max_versions_per_cycle;
502        // Temporarily set limit to max
503        let _config = GCConfig {
504            max_versions_per_cycle: usize::MAX,
505            ..self.config.clone()
506        };
507
508        let start = std::time::Instant::now();
509        let watermark = self.watermark();
510
511        let mut versions_collected = 0;
512        let mut bytes_freed = 0;
513        let mut chains_scanned = 0;
514
515        {
516            let mut chains = self.chains.write();
517            for chain in chains.values_mut() {
518                let (removed, freed) = chain.gc(watermark);
519                versions_collected += removed;
520                bytes_freed += freed;
521                chains_scanned += 1;
522            }
523
524            // Remove empty chains
525            chains.retain(|_, chain| !chain.is_empty());
526        }
527
528        let duration = start.elapsed();
529
530        self.stats.gc_cycles.fetch_add(1, Ordering::Relaxed);
531        self.stats
532            .versions_collected
533            .fetch_add(versions_collected as u64, Ordering::Relaxed);
534        self.stats
535            .bytes_freed
536            .fetch_add(bytes_freed as u64, Ordering::Relaxed);
537        self.stats
538            .chains_scanned
539            .fetch_add(chains_scanned as u64, Ordering::Relaxed);
540        self.stats.last_gc_epoch.store(watermark, Ordering::Relaxed);
541        self.stats
542            .last_gc_duration_us
543            .store(duration.as_micros() as u64, Ordering::Relaxed);
544        self.pending_versions.store(0, Ordering::Relaxed);
545
546        GCResult {
547            versions_collected,
548            bytes_freed,
549            chains_scanned,
550            watermark,
551            duration_us: duration.as_micros() as u64,
552        }
553    }
554
555    /// Get GC statistics
556    pub fn stats(&self) -> GCStatsSnapshot {
557        self.stats.snapshot()
558    }
559
560    /// Get total version count
561    pub fn version_count(&self) -> usize {
562        let chains = self.chains.read();
563        chains.values().map(|c| c.len()).sum()
564    }
565
566    /// Get chain count
567    pub fn chain_count(&self) -> usize {
568        self.chains.read().len()
569    }
570}
571
572impl<K, V> Default for EpochGC<K, V>
573where
574    K: Eq + std::hash::Hash + Clone,
575    V: Clone,
576{
577    fn default() -> Self {
578        Self::new()
579    }
580}
581
582/// Result of a GC cycle
583#[derive(Debug, Clone)]
584pub struct GCResult {
585    pub versions_collected: usize,
586    pub bytes_freed: usize,
587    pub chains_scanned: usize,
588    pub watermark: u64,
589    pub duration_us: u64,
590}
591
592/// Guard for read transactions
593pub struct ReadGuard {
594    pub epoch: u64,
595    reader_id: u64,
596    registry: Arc<ReaderRegistry>,
597}
598
599impl Drop for ReadGuard {
600    fn drop(&mut self) {
601        self.registry.unregister(self.reader_id);
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use super::*;
608
609    #[test]
610    fn test_version_id() {
611        let v1 = VersionId::new(1, 0);
612        let v2 = VersionId::new(2, 0);
613
614        assert!(v1 < v2);
615        assert!(v1.is_stale(2));
616        assert!(!v2.is_stale(2));
617    }
618
619    #[test]
620    fn test_version_chain_basic() {
621        let mut chain: VersionChain<String> = VersionChain::new();
622
623        chain.add_version(VersionedValue::new(VersionId::new(0, 0), "v1".to_string()));
624        chain.add_version(VersionedValue::new(VersionId::new(1, 0), "v2".to_string()));
625
626        assert_eq!(chain.len(), 2);
627        assert_eq!(chain.latest().unwrap().value, "v2");
628    }
629
630    #[test]
631    fn test_version_chain_gc() {
632        let mut chain: VersionChain<String> = VersionChain::new();
633
634        // Add versions at different epochs
635        for epoch in 0..5 {
636            chain.add_version(VersionedValue::new(
637                VersionId::new(epoch, 0),
638                format!("v{}", epoch),
639            ));
640        }
641
642        assert_eq!(chain.len(), 5);
643
644        // GC with watermark at epoch 3
645        let (removed, _) = chain.gc(3);
646
647        // Should keep epochs 3,4 and one base version
648        assert!(removed > 0);
649        assert!(chain.len() < 5);
650    }
651
652    #[test]
653    fn test_reader_registry() {
654        let registry = ReaderRegistry::new();
655
656        let r1 = registry.register(10);
657        let _r2 = registry.register(20);
658
659        assert_eq!(registry.active_count(), 2);
660        assert_eq!(registry.min_active_epoch(), Some(10));
661
662        registry.unregister(r1);
663        assert_eq!(registry.active_count(), 1);
664        assert_eq!(registry.min_active_epoch(), Some(20));
665    }
666
667    #[test]
668    fn test_epoch_gc_basic() {
669        let gc: EpochGC<String, i32> = EpochGC::new();
670
671        let _v1 = gc.insert("key1".to_string(), 100);
672        let _v2 = gc.insert("key1".to_string(), 200);
673
674        assert_eq!(gc.get(&"key1".to_string()), Some(200));
675        assert_eq!(gc.version_count(), 2);
676    }
677
678    #[test]
679    fn test_epoch_gc_delete() {
680        let gc: EpochGC<String, i32> = EpochGC::new();
681
682        gc.insert("key1".to_string(), 100);
683        gc.delete("key1".to_string(), 0); // tombstone
684
685        assert_eq!(gc.get(&"key1".to_string()), None);
686    }
687
688    #[test]
689    fn test_epoch_gc_at_epoch() {
690        let gc: EpochGC<String, i32> = EpochGC::new();
691
692        gc.insert("key1".to_string(), 100);
693        gc.advance_epoch();
694        gc.insert("key1".to_string(), 200);
695        gc.advance_epoch();
696        gc.insert("key1".to_string(), 300);
697
698        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 0), Some(100));
699        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 1), Some(200));
700        assert_eq!(gc.get_at_epoch(&"key1".to_string(), 2), Some(300));
701    }
702
703    #[test]
704    fn test_read_guard() {
705        let gc: EpochGC<String, i32> = EpochGC::new();
706
707        gc.insert("key1".to_string(), 100);
708
709        {
710            let _guard = gc.begin_read();
711            assert_eq!(gc.readers.active_count(), 1);
712        }
713
714        assert_eq!(gc.readers.active_count(), 0);
715    }
716
717    #[test]
718    fn test_watermark_calculation() {
719        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
720            min_epochs_to_keep: 2,
721            ..Default::default()
722        });
723
724        // Epoch 0
725        gc.insert("k".to_string(), 1);
726        gc.advance_epoch(); // Epoch 1
727        gc.insert("k".to_string(), 2);
728        gc.advance_epoch(); // Epoch 2
729        gc.insert("k".to_string(), 3);
730        gc.advance_epoch(); // Epoch 3
731        gc.insert("k".to_string(), 4);
732        gc.advance_epoch(); // Epoch 4
733
734        // With no readers and grace period 2, watermark should be 4-2=2
735        assert!(gc.watermark() <= 2);
736
737        // With a reader at epoch 1, watermark should be min(2, 1) = 1
738        let _guard = gc.begin_read();
739        assert!(gc.watermark() <= gc.current_epoch());
740    }
741
742    #[test]
743    fn test_gc_cycle() {
744        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
745            min_epochs_to_keep: 1,
746            gc_trigger_threshold: 100,
747            max_versions_per_cycle: 100,
748        });
749
750        // Create multiple versions across epochs
751        for i in 0..10 {
752            gc.insert("key".to_string(), i);
753            gc.advance_epoch();
754        }
755
756        assert_eq!(gc.version_count(), 10);
757
758        // Run GC
759        let result = gc.try_gc();
760
761        // Should have collected some versions
762        assert!(result.versions_collected > 0 || gc.version_count() < 10);
763    }
764
765    #[test]
766    fn test_gc_stats() {
767        let gc: EpochGC<String, i32> = EpochGC::new();
768
769        for i in 0..5 {
770            gc.insert("key".to_string(), i);
771            gc.advance_epoch();
772        }
773
774        gc.try_gc();
775
776        let stats = gc.stats();
777        assert!(stats.gc_cycles >= 1);
778    }
779
780    #[test]
781    fn test_force_gc() {
782        let gc: EpochGC<String, i32> = EpochGC::with_config(GCConfig {
783            min_epochs_to_keep: 0,
784            gc_trigger_threshold: 1000,
785            max_versions_per_cycle: 1, // Very limited
786        });
787
788        for i in 0..20 {
789            gc.insert(format!("key{}", i), i);
790        }
791
792        gc.advance_epoch();
793        gc.advance_epoch();
794
795        let initial_count = gc.version_count();
796        gc.force_gc();
797        let final_count = gc.version_count();
798
799        // Force GC should process all chains regardless of limit
800        assert!(final_count <= initial_count);
801    }
802
803    #[test]
804    fn test_chain_count() {
805        let gc: EpochGC<String, i32> = EpochGC::new();
806
807        gc.insert("key1".to_string(), 1);
808        gc.insert("key2".to_string(), 2);
809        gc.insert("key3".to_string(), 3);
810
811        assert_eq!(gc.chain_count(), 3);
812    }
813
814    #[test]
815    fn test_version_at_respects_tombstone() {
816        let mut chain: VersionChain<i32> = VersionChain::new();
817
818        chain.add_version(VersionedValue::new(VersionId::new(0, 0), 100));
819        chain.add_version(VersionedValue::tombstone(VersionId::new(1, 0), 0));
820
821        // Should not return tombstone
822        assert!(chain.version_at(1).is_none());
823        // Should return the version before tombstone
824        assert_eq!(chain.version_at(0).map(|v| v.value), Some(100));
825    }
826
827    #[test]
828    fn test_gc_result_fields() {
829        let gc: EpochGC<String, i32> = EpochGC::new();
830
831        for i in 0..5 {
832            gc.insert("key".to_string(), i);
833            gc.advance_epoch();
834        }
835
836        let result = gc.try_gc();
837
838        // Verify result has reasonable values
839        assert!(result.watermark <= gc.current_epoch());
840        assert!(result.chains_scanned <= gc.chain_count() + 1);
841    }
842}