Skip to main content

featherdb_mvcc/
gc.rs

//! Garbage collection metrics and adaptive scheduling for MVCC.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::{Duration, Instant};
5
/// Tuning knobs for MVCC garbage-collection scheduling.
///
/// The [`Default`] configuration uses a 1 s minimum interval, a 10 000-version
/// pending threshold, and has adaptive scheduling switched on.
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Smallest gap between two GC runs, in milliseconds.
    pub min_interval_ms: u64,
    /// Pending-version count at or above which the scheduler's threshold check fires.
    pub max_versions_before_gc: u64,
    /// Enables the scheduler's adaptive (pending vs. average-cleaned) check.
    pub adaptive: bool,
}

impl Default for GcConfig {
    /// Stock configuration: 1 s interval, 10 000 versions, adaptive on.
    fn default() -> Self {
        let adaptive = true;
        let max_versions_before_gc = 10_000;
        let min_interval_ms = 1_000; // one second
        GcConfig {
            min_interval_ms,
            max_versions_before_gc,
            adaptive,
        }
    }
}
26
/// GC metrics for monitoring and adaptive scheduling.
///
/// Every counter is an independent [`AtomicU64`] updated with `Relaxed`
/// ordering, and all recording methods take `&self`, so one instance can be
/// shared between threads. A [`GcMetricsSnapshot`] taken while updates are in
/// flight loads each field separately and is therefore not guaranteed to be
/// mutually consistent across fields.
#[derive(Debug, Default)]
pub struct GcMetrics {
    /// Total versions created
    pub versions_created: AtomicU64,
    /// Total versions cleaned
    pub versions_cleaned: AtomicU64,
    /// Number of GC runs
    pub gc_runs: AtomicU64,
    /// Total time spent in GC (microseconds)
    pub gc_time_us: AtomicU64,
    /// Versions pending cleanup
    pub versions_pending: AtomicU64,
}

impl GcMetrics {
    /// Creates a metrics block with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one newly created version, which also counts as pending cleanup.
    pub fn record_version_created(&self) {
        self.versions_created.fetch_add(1, Ordering::Relaxed);
        self.versions_pending.fetch_add(1, Ordering::Relaxed);
    }

    /// Records that GC reclaimed `count` versions.
    ///
    /// `versions_pending` is reduced by `count`, saturating at zero. The
    /// reduction is a single atomic read-modify-write. A separate `load`
    /// followed by `fetch_sub` would let a concurrent cleaner shrink the
    /// counter in between, so the later subtraction could wrap it to near
    /// `u64::MAX`.
    pub fn record_versions_cleaned(&self, count: u64) {
        self.versions_cleaned.fetch_add(count, Ordering::Relaxed);
        Self::atomic_update(&self.versions_pending, |pending| pending.saturating_sub(count));
    }

    /// Records one completed GC run that took `duration`.
    ///
    /// A duration whose microsecond count does not fit in a `u64` is clamped
    /// to `u64::MAX` instead of being silently truncated. The accumulated total
    /// saturates rather than wrapping.
    pub fn record_gc_run(&self, duration: Duration) {
        self.gc_runs.fetch_add(1, Ordering::Relaxed);
        // Clamp before narrowing so the `as` cast below cannot truncate.
        let micros = duration.as_micros().min(u128::from(u64::MAX)) as u64;
        Self::atomic_update(&self.gc_time_us, |total| total.saturating_add(micros));
    }

    /// Get a snapshot of metrics
    pub fn snapshot(&self) -> GcMetricsSnapshot {
        GcMetricsSnapshot {
            versions_created: self.versions_created.load(Ordering::Relaxed),
            versions_cleaned: self.versions_cleaned.load(Ordering::Relaxed),
            gc_runs: self.gc_runs.load(Ordering::Relaxed),
            gc_time_us: self.gc_time_us.load(Ordering::Relaxed),
            versions_pending: self.versions_pending.load(Ordering::Relaxed),
        }
    }

    /// Atomically replaces `counter`'s value with `f(current)`.
    fn atomic_update(counter: &AtomicU64, f: impl Fn(u64) -> u64) {
        // The closure never returns `None`, so `fetch_update` always succeeds.
        // It retries internally if another thread raced the update.
        let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            Some(f(current))
        });
    }
}

/// Point-in-time copy of the [`GcMetrics`] counters, produced by
/// [`GcMetrics::snapshot`].
#[derive(Debug, Clone)]
pub struct GcMetricsSnapshot {
    /// Total versions created.
    pub versions_created: u64,
    /// Total versions cleaned.
    pub versions_cleaned: u64,
    /// Number of GC runs.
    pub gc_runs: u64,
    /// Total time spent in GC, in microseconds.
    pub gc_time_us: u64,
    /// Versions pending cleanup.
    pub versions_pending: u64,
}
86
87/// Adaptive GC scheduler
88pub struct GcScheduler {
89    config: GcConfig,
90    last_gc: Instant,
91    metrics: GcMetrics,
92}
93
94impl GcScheduler {
95    pub fn new(config: GcConfig) -> Self {
96        Self {
97            config,
98            last_gc: Instant::now(),
99            metrics: GcMetrics::new(),
100        }
101    }
102
103    /// Check if GC should run based on adaptive heuristics
104    pub fn should_run_gc(&self) -> bool {
105        let elapsed = self.last_gc.elapsed();
106        let min_interval = Duration::from_millis(self.config.min_interval_ms);
107
108        // Always respect minimum interval
109        if elapsed < min_interval {
110            return false;
111        }
112
113        // Check if too many versions pending
114        let pending = self.metrics.versions_pending.load(Ordering::Relaxed);
115        if pending >= self.config.max_versions_before_gc {
116            return true;
117        }
118
119        // Adaptive: run more frequently if version creation rate is high
120        if self.config.adaptive {
121            let snapshot = self.metrics.snapshot();
122            if snapshot.gc_runs > 0 {
123                let avg_cleaned_per_run = snapshot.versions_cleaned / snapshot.gc_runs;
124                // If pending is > 2x average, run GC
125                if pending > avg_cleaned_per_run * 2 {
126                    return true;
127                }
128            }
129        }
130
131        // Default: run if minimum interval passed
132        elapsed >= min_interval
133    }
134
135    /// Record that GC has run
136    pub fn record_gc_run(&mut self, duration: Duration, versions_cleaned: u64) {
137        self.last_gc = Instant::now();
138        self.metrics.record_gc_run(duration);
139        self.metrics.record_versions_cleaned(versions_cleaned);
140    }
141
142    /// Record a version was created
143    pub fn record_version_created(&self) {
144        self.metrics.record_version_created();
145    }
146
147    /// Get current metrics
148    pub fn metrics(&self) -> GcMetricsSnapshot {
149        self.metrics.snapshot()
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_gc_config_default() {
        let config = GcConfig::default();
        assert_eq!(config.min_interval_ms, 1000);
        assert_eq!(config.max_versions_before_gc, 10000);
        assert!(config.adaptive);
    }

    #[test]
    fn test_gc_metrics_creation() {
        let metrics = GcMetrics::new();
        metrics.record_version_created();
        metrics.record_version_created();

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.versions_created, 2);
        assert_eq!(snapshot.versions_pending, 2);
        assert_eq!(snapshot.versions_cleaned, 0);
    }

    #[test]
    fn test_gc_metrics_cleanup() {
        let metrics = GcMetrics::new();

        // Create 10 versions
        for _ in 0..10 {
            metrics.record_version_created();
        }

        // Clean 5 versions
        metrics.record_versions_cleaned(5);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.versions_created, 10);
        assert_eq!(snapshot.versions_cleaned, 5);
        assert_eq!(snapshot.versions_pending, 5);
    }

    #[test]
    fn test_gc_metrics_gc_run() {
        let metrics = GcMetrics::new();

        metrics.record_gc_run(Duration::from_millis(100));

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.gc_runs, 1);
        // 100 ms expressed in microseconds.
        assert_eq!(snapshot.gc_time_us, 100_000);
    }

    #[test]
    fn test_gc_scheduler_min_interval() {
        let config = GcConfig {
            min_interval_ms: 100,
            max_versions_before_gc: 10000,
            adaptive: false,
        };

        let scheduler = GcScheduler::new(config);

        // Should not run immediately
        assert!(!scheduler.should_run_gc());

        // Wait for interval
        thread::sleep(Duration::from_millis(150));

        // Should run now, even with nothing pending
        assert!(scheduler.should_run_gc());
    }

    #[test]
    fn test_gc_scheduler_max_versions() {
        let config = GcConfig {
            min_interval_ms: 10, // short interval keeps the test fast
            max_versions_before_gc: 5,
            adaptive: false,
        };

        let scheduler = GcScheduler::new(config);

        // Push pending past max_versions_before_gc.
        for _ in 0..6 {
            scheduler.record_version_created();
        }

        // Exceeding the threshold does not bypass the minimum interval:
        // checked right after construction, GC is not yet due.
        assert!(!scheduler.should_run_gc());

        // Once the interval has elapsed, GC is due.
        // NOTE: should_run_gc also returns true after the interval with nothing
        // pending, so this assertion alone does not prove the threshold fired.
        thread::sleep(Duration::from_millis(15));
        assert!(scheduler.should_run_gc());
    }

    #[test]
    fn test_gc_scheduler_adaptive() {
        let config = GcConfig {
            min_interval_ms: 10,
            max_versions_before_gc: 100,
            adaptive: true,
        };

        let mut scheduler = GcScheduler::new(config);

        // Simulate a first GC run that cleaned 10 versions (resets the interval clock).
        scheduler.record_gc_run(Duration::from_millis(5), 10);

        // Create 25 versions (> 2x the average of 10)
        for _ in 0..25 {
            scheduler.record_version_created();
        }

        // Wait for min interval
        thread::sleep(Duration::from_millis(20));

        // GC is due; pending also exceeds 2x the average.
        // NOTE: as with the threshold test, the interval fallback alone would
        // also make this true.
        assert!(scheduler.should_run_gc());
    }

    #[test]
    fn test_gc_scheduler_record_operations() {
        let config = GcConfig::default();
        let mut scheduler = GcScheduler::new(config);

        // Record versions created
        scheduler.record_version_created();
        scheduler.record_version_created();

        let metrics = scheduler.metrics();
        assert_eq!(metrics.versions_created, 2);
        assert_eq!(metrics.versions_pending, 2);

        // Record GC run
        scheduler.record_gc_run(Duration::from_millis(50), 1);

        let metrics = scheduler.metrics();
        assert_eq!(metrics.gc_runs, 1);
        assert_eq!(metrics.versions_cleaned, 1);
        assert_eq!(metrics.versions_pending, 1);
    }

    #[test]
    fn test_gc_metrics_pending_doesnt_underflow() {
        let metrics = GcMetrics::new();

        // Clean more than created (shouldn't underflow)
        metrics.record_versions_cleaned(100);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.versions_pending, 0);
    }
}