1use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::{Duration, Instant};
5
6#[derive(Debug, Clone)]
8pub struct GcConfig {
9 pub min_interval_ms: u64,
11 pub max_versions_before_gc: u64,
13 pub adaptive: bool,
15}
16
17impl Default for GcConfig {
18 fn default() -> Self {
19 Self {
20 min_interval_ms: 1000, max_versions_before_gc: 10000,
22 adaptive: true,
23 }
24 }
25}
26
27#[derive(Debug, Default)]
29pub struct GcMetrics {
30 pub versions_created: AtomicU64,
32 pub versions_cleaned: AtomicU64,
34 pub gc_runs: AtomicU64,
36 pub gc_time_us: AtomicU64,
38 pub versions_pending: AtomicU64,
40}
41
42impl GcMetrics {
43 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn record_version_created(&self) {
48 self.versions_created.fetch_add(1, Ordering::Relaxed);
49 self.versions_pending.fetch_add(1, Ordering::Relaxed);
50 }
51
52 pub fn record_versions_cleaned(&self, count: u64) {
53 self.versions_cleaned.fetch_add(count, Ordering::Relaxed);
54 self.versions_pending.fetch_sub(
55 count.min(self.versions_pending.load(Ordering::Relaxed)),
56 Ordering::Relaxed,
57 );
58 }
59
60 pub fn record_gc_run(&self, duration: Duration) {
61 self.gc_runs.fetch_add(1, Ordering::Relaxed);
62 self.gc_time_us
63 .fetch_add(duration.as_micros() as u64, Ordering::Relaxed);
64 }
65
66 pub fn snapshot(&self) -> GcMetricsSnapshot {
68 GcMetricsSnapshot {
69 versions_created: self.versions_created.load(Ordering::Relaxed),
70 versions_cleaned: self.versions_cleaned.load(Ordering::Relaxed),
71 gc_runs: self.gc_runs.load(Ordering::Relaxed),
72 gc_time_us: self.gc_time_us.load(Ordering::Relaxed),
73 versions_pending: self.versions_pending.load(Ordering::Relaxed),
74 }
75 }
76}
77
78#[derive(Debug, Clone)]
79pub struct GcMetricsSnapshot {
80 pub versions_created: u64,
81 pub versions_cleaned: u64,
82 pub gc_runs: u64,
83 pub gc_time_us: u64,
84 pub versions_pending: u64,
85}
86
87pub struct GcScheduler {
89 config: GcConfig,
90 last_gc: Instant,
91 metrics: GcMetrics,
92}
93
94impl GcScheduler {
95 pub fn new(config: GcConfig) -> Self {
96 Self {
97 config,
98 last_gc: Instant::now(),
99 metrics: GcMetrics::new(),
100 }
101 }
102
103 pub fn should_run_gc(&self) -> bool {
105 let elapsed = self.last_gc.elapsed();
106 let min_interval = Duration::from_millis(self.config.min_interval_ms);
107
108 if elapsed < min_interval {
110 return false;
111 }
112
113 let pending = self.metrics.versions_pending.load(Ordering::Relaxed);
115 if pending >= self.config.max_versions_before_gc {
116 return true;
117 }
118
119 if self.config.adaptive {
121 let snapshot = self.metrics.snapshot();
122 if snapshot.gc_runs > 0 {
123 let avg_cleaned_per_run = snapshot.versions_cleaned / snapshot.gc_runs;
124 if pending > avg_cleaned_per_run * 2 {
126 return true;
127 }
128 }
129 }
130
131 elapsed >= min_interval
133 }
134
135 pub fn record_gc_run(&mut self, duration: Duration, versions_cleaned: u64) {
137 self.last_gc = Instant::now();
138 self.metrics.record_gc_run(duration);
139 self.metrics.record_versions_cleaned(versions_cleaned);
140 }
141
142 pub fn record_version_created(&self) {
144 self.metrics.record_version_created();
145 }
146
147 pub fn metrics(&self) -> GcMetricsSnapshot {
149 self.metrics.snapshot()
150 }
151}
152
153#[cfg(test)]
154mod tests {
155 use super::*;
156 use std::thread;
157
158 #[test]
159 fn test_gc_config_default() {
160 let config = GcConfig::default();
161 assert_eq!(config.min_interval_ms, 1000);
162 assert_eq!(config.max_versions_before_gc, 10000);
163 assert!(config.adaptive);
164 }
165
166 #[test]
167 fn test_gc_metrics_creation() {
168 let metrics = GcMetrics::new();
169 metrics.record_version_created();
170 metrics.record_version_created();
171
172 let snapshot = metrics.snapshot();
173 assert_eq!(snapshot.versions_created, 2);
174 assert_eq!(snapshot.versions_pending, 2);
175 assert_eq!(snapshot.versions_cleaned, 0);
176 }
177
178 #[test]
179 fn test_gc_metrics_cleanup() {
180 let metrics = GcMetrics::new();
181
182 for _ in 0..10 {
184 metrics.record_version_created();
185 }
186
187 metrics.record_versions_cleaned(5);
189
190 let snapshot = metrics.snapshot();
191 assert_eq!(snapshot.versions_created, 10);
192 assert_eq!(snapshot.versions_cleaned, 5);
193 assert_eq!(snapshot.versions_pending, 5);
194 }
195
196 #[test]
197 fn test_gc_metrics_gc_run() {
198 let metrics = GcMetrics::new();
199
200 let duration = Duration::from_millis(100);
201 metrics.record_gc_run(duration);
202
203 let snapshot = metrics.snapshot();
204 assert_eq!(snapshot.gc_runs, 1);
205 assert!(snapshot.gc_time_us >= 100_000); }
207
208 #[test]
209 fn test_gc_scheduler_min_interval() {
210 let config = GcConfig {
211 min_interval_ms: 100,
212 max_versions_before_gc: 10000,
213 adaptive: false,
214 };
215
216 let scheduler = GcScheduler::new(config);
217
218 assert!(!scheduler.should_run_gc());
220
221 thread::sleep(Duration::from_millis(150));
223
224 assert!(scheduler.should_run_gc());
226 }
227
228 #[test]
229 fn test_gc_scheduler_max_versions() {
230 let config = GcConfig {
231 min_interval_ms: 10, max_versions_before_gc: 5,
233 adaptive: false,
234 };
235
236 let scheduler = GcScheduler::new(config);
237
238 for _ in 0..6 {
240 scheduler.record_version_created();
241 }
242
243 thread::sleep(Duration::from_millis(1));
245 assert!(!scheduler.should_run_gc());
247
248 thread::sleep(Duration::from_millis(15));
250 assert!(scheduler.should_run_gc());
252 }
253
254 #[test]
255 fn test_gc_scheduler_adaptive() {
256 let config = GcConfig {
257 min_interval_ms: 10,
258 max_versions_before_gc: 100,
259 adaptive: true,
260 };
261
262 let mut scheduler = GcScheduler::new(config);
263
264 thread::sleep(Duration::from_millis(20));
266 scheduler.record_gc_run(Duration::from_millis(5), 10);
267
268 for _ in 0..25 {
270 scheduler.record_version_created();
271 }
272
273 thread::sleep(Duration::from_millis(20));
275
276 assert!(scheduler.should_run_gc());
278 }
279
280 #[test]
281 fn test_gc_scheduler_record_operations() {
282 let config = GcConfig::default();
283 let mut scheduler = GcScheduler::new(config);
284
285 scheduler.record_version_created();
287 scheduler.record_version_created();
288
289 let metrics = scheduler.metrics();
290 assert_eq!(metrics.versions_created, 2);
291 assert_eq!(metrics.versions_pending, 2);
292
293 scheduler.record_gc_run(Duration::from_millis(50), 1);
295
296 let metrics = scheduler.metrics();
297 assert_eq!(metrics.gc_runs, 1);
298 assert_eq!(metrics.versions_cleaned, 1);
299 assert_eq!(metrics.versions_pending, 1);
300 }
301
302 #[test]
303 fn test_gc_metrics_pending_doesnt_underflow() {
304 let metrics = GcMetrics::new();
305
306 metrics.record_versions_cleaned(100);
308
309 let snapshot = metrics.snapshot();
310 assert_eq!(snapshot.versions_pending, 0);
311 }
312}