1use crate::pinning::{PinRecommendation, PinningOptimizer};
7use crate::storage::{ChunkStorage, StorageError};
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::RwLock;
11use tracing::{debug, info, warn};
12
/// Tuning knobs for [`GarbageCollector`]: scheduling of automatic passes and
/// the thresholds that decide how much content a pass unpins.
#[derive(Debug, Clone)]
pub struct GarbageCollectionConfig {
    /// Period between automatic passes run by `GarbageCollector::start_auto_gc`.
    pub gc_interval: Duration,
    /// Upper bound on items unpinned by a normal (non-aggressive) pass.
    pub max_unpin_per_run: usize,
    /// Usage fraction (compared against `usage_percent / 100`) at or above which
    /// a pass switches to aggressive mode.
    pub aggressive_threshold: f64,
    /// Fraction of `max_bytes` that an aggressive pass tries to bring usage
    /// back down to.
    pub target_usage: f64,
    /// When `false`, `GarbageCollector::start_auto_gc` logs and returns without
    /// scheduling anything.
    pub auto_gc_enabled: bool,
}
27
28impl Default for GarbageCollectionConfig {
29 fn default() -> Self {
30 Self {
31 gc_interval: Duration::from_secs(3600), max_unpin_per_run: 10,
33 aggressive_threshold: 0.9, target_usage: 0.8, auto_gc_enabled: true,
36 }
37 }
38}
39
/// Outcome of a single [`GarbageCollector::run_gc`] pass.
#[derive(Debug, Clone)]
pub struct ContentGcResult {
    /// Number of items successfully unpinned.
    pub unpinned_count: usize,
    /// Sum of the recommendation-reported `size_bytes` of each successfully
    /// unpinned item (not a measurement taken from storage).
    pub bytes_freed: u64,
    /// CIDs successfully unpinned, in the order they were processed.
    pub unpinned_cids: Vec<String>,
    /// Whether the pass ran in aggressive mode.
    pub was_aggressive: bool,
    /// One `"<cid>: <error>"` entry per item that failed to unpin.
    pub errors: Vec<String>,
}
54
55impl ContentGcResult {
56 #[must_use]
57 #[inline]
58 fn new() -> Self {
59 Self {
60 unpinned_count: 0,
61 bytes_freed: 0,
62 unpinned_cids: Vec::new(),
63 was_aggressive: false,
64 errors: Vec::new(),
65 }
66 }
67}
68
/// Unpins content that the [`PinningOptimizer`] recommends dropping, either on
/// demand (`run_gc`) or periodically (`start_auto_gc`).
pub struct GarbageCollector {
    /// Scheduling and threshold settings.
    config: GarbageCollectionConfig,
    /// Shared chunk storage: read for usage stats, write-locked to unpin content.
    storage: Arc<RwLock<ChunkStorage>>,
    /// Shared optimizer: read for recommendations, write-locked to unregister
    /// unpinned content.
    optimizer: Arc<RwLock<PinningOptimizer>>,
}
75
76impl GarbageCollector {
77 #[must_use]
79 #[inline]
80 pub fn new(
81 config: GarbageCollectionConfig,
82 storage: Arc<RwLock<ChunkStorage>>,
83 optimizer: Arc<RwLock<PinningOptimizer>>,
84 ) -> Self {
85 Self {
86 config,
87 storage,
88 optimizer,
89 }
90 }
91
92 #[must_use]
94 pub async fn run_gc(&self) -> ContentGcResult {
95 let mut result = ContentGcResult::new();
96
97 let storage = self.storage.read().await;
99 let stats = storage.stats();
100 drop(storage);
101
102 let usage_ratio = stats.usage_percent / 100.0;
103 let is_aggressive = usage_ratio >= self.config.aggressive_threshold;
104 result.was_aggressive = is_aggressive;
105
106 if is_aggressive {
107 info!(
108 "Storage at {:.1}%, triggering aggressive GC (target: {:.1}%)",
109 stats.usage_percent,
110 self.config.target_usage * 100.0
111 );
112 }
113
114 let optimizer = self.optimizer.read().await;
116 let recommendations = optimizer.get_recommendations();
117 drop(optimizer);
118
119 let unpin_candidates: Vec<_> = recommendations
121 .iter()
122 .filter(|r| r.recommendation == PinRecommendation::Unpin)
123 .collect();
124
125 if unpin_candidates.is_empty() {
126 debug!("No content marked for unpinning");
127 return result;
128 }
129
130 let target_count = if is_aggressive {
132 let target_bytes = (stats.max_bytes as f64 * self.config.target_usage) as u64;
134 let excess_bytes = stats.used_bytes.saturating_sub(target_bytes);
135 let avg_size = stats.used_bytes / stats.pinned_content_count.max(1) as u64;
136
137 ((excess_bytes / avg_size.max(1)) as usize)
138 .max(1)
139 .min(unpin_candidates.len())
140 } else {
141 self.config.max_unpin_per_run.min(unpin_candidates.len())
142 };
143
144 info!(
145 "GC: {} candidates, unpinning up to {}",
146 unpin_candidates.len(),
147 target_count
148 );
149
150 for (i, scored) in unpin_candidates.iter().take(target_count).enumerate() {
152 match self.unpin_content(&scored.cid, scored.size_bytes).await {
153 Ok(freed) => {
154 result.unpinned_count += 1;
155 result.bytes_freed += freed;
156 result.unpinned_cids.push(scored.cid.clone());
157 debug!(
158 "GC unpinned {} ({}/{}): {} bytes freed",
159 scored.cid,
160 i + 1,
161 target_count,
162 freed
163 );
164 }
165 Err(e) => {
166 warn!("GC failed to unpin {}: {}", scored.cid, e);
167 result.errors.push(format!("{}: {}", scored.cid, e));
168 }
169 }
170 }
171
172 info!(
173 "GC completed: {} items unpinned, {} bytes freed",
174 result.unpinned_count, result.bytes_freed
175 );
176
177 result
178 }
179
180 async fn unpin_content(&self, cid: &str, expected_size: u64) -> Result<u64, StorageError> {
182 {
184 let mut optimizer = self.optimizer.write().await;
185 optimizer.unregister_content(cid);
186 }
187
188 {
190 let mut storage = self.storage.write().await;
191 storage.unpin_content(cid).await?;
192 }
193
194 Ok(expected_size)
195 }
196
197 pub async fn start_auto_gc(self: Arc<Self>, shutdown: tokio::sync::watch::Receiver<bool>) {
199 if !self.config.auto_gc_enabled {
200 info!("Auto GC disabled");
201 return;
202 }
203
204 let mut interval = tokio::time::interval(self.config.gc_interval);
205 let mut shutdown = shutdown;
206
207 info!(
208 "Starting auto GC with interval: {:?}",
209 self.config.gc_interval
210 );
211
212 loop {
213 tokio::select! {
214 _ = interval.tick() => {
215 let result = self.run_gc().await;
216 if !result.errors.is_empty() {
217 warn!("GC completed with {} errors", result.errors.len());
218 }
219 }
220 _ = shutdown.changed() => {
221 if *shutdown.borrow() {
222 info!("GC shutdown signal received");
223 break;
224 }
225 }
226 }
227 }
228 }
229
230 #[must_use]
232 pub async fn stats(&self) -> GcStats {
233 let storage = self.storage.read().await;
234 let storage_stats = storage.stats();
235 drop(storage);
236
237 let optimizer = self.optimizer.read().await;
238 let recommendations = optimizer.get_recommendations();
239 drop(optimizer);
240
241 let unpin_candidates = recommendations
242 .iter()
243 .filter(|r| r.recommendation == PinRecommendation::Unpin)
244 .count();
245
246 let bytes_reclaimable: u64 = recommendations
247 .iter()
248 .filter(|r| r.recommendation == PinRecommendation::Unpin)
249 .map(|r| r.size_bytes)
250 .sum();
251
252 GcStats {
253 storage_used_bytes: storage_stats.used_bytes,
254 storage_max_bytes: storage_stats.max_bytes,
255 storage_usage_percent: storage_stats.usage_percent,
256 unpin_candidates,
257 bytes_reclaimable,
258 is_aggressive_threshold: storage_stats.usage_percent
259 >= self.config.aggressive_threshold * 100.0,
260 }
261 }
262}
263
/// Point-in-time view of storage usage and reclaimable content, as produced by
/// `GarbageCollector::stats`.
#[derive(Debug, Clone)]
pub struct GcStats {
    /// Bytes currently used, as reported by storage.
    pub storage_used_bytes: u64,
    /// Maximum storage size in bytes, as reported by storage.
    pub storage_max_bytes: u64,
    /// Storage usage as a percentage (0–100 scale).
    pub storage_usage_percent: f64,
    /// Number of items the optimizer currently recommends unpinning.
    pub unpin_candidates: usize,
    /// Sum of `size_bytes` over those unpin candidates.
    pub bytes_reclaimable: u64,
    /// Whether usage is at or above the configured aggressive-GC threshold.
    pub is_aggressive_threshold: bool,
}
280
281#[must_use]
283pub async fn run_gc_once(
284 storage: Arc<RwLock<ChunkStorage>>,
285 optimizer: Arc<RwLock<PinningOptimizer>>,
286) -> ContentGcResult {
287 let gc = GarbageCollector::new(GarbageCollectionConfig::default(), storage, optimizer);
288 gc.run_gc().await
289}
290
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pinning::PinningConfig;
    use std::time::Duration;
    use tempfile::tempdir;

    /// Builds a collector over empty temp-dir storage (100 MiB cap) and a
    /// default optimizer. The `TempDir` is returned so the directory outlives
    /// the collector for the duration of the test.
    async fn empty_collector(
        config: GarbageCollectionConfig,
    ) -> (tempfile::TempDir, GarbageCollector) {
        let tmp = tempdir().unwrap();
        let storage = ChunkStorage::new(tmp.path().to_path_buf(), 1024 * 1024 * 100)
            .await
            .unwrap();
        let optimizer = PinningOptimizer::new(PinningConfig::default());
        let gc = GarbageCollector::new(
            config,
            Arc::new(RwLock::new(storage)),
            Arc::new(RwLock::new(optimizer)),
        );
        (tmp, gc)
    }

    #[test]
    fn test_gc_config_defaults() {
        let config = GarbageCollectionConfig::default();
        assert!(config.auto_gc_enabled);
        assert_eq!(config.gc_interval, Duration::from_secs(3600));
        assert_eq!(config.max_unpin_per_run, 10);
        assert_eq!(config.aggressive_threshold, 0.9);
        assert_eq!(config.target_usage, 0.8);
    }

    #[test]
    fn test_gc_result_new() {
        let result = ContentGcResult::new();
        assert_eq!(result.unpinned_count, 0);
        assert_eq!(result.bytes_freed, 0);
        assert!(result.unpinned_cids.is_empty());
        assert!(!result.was_aggressive);
        assert!(result.errors.is_empty());
    }

    #[tokio::test]
    async fn test_gc_no_candidates() {
        let (_tmp, gc) = empty_collector(GarbageCollectionConfig::default()).await;
        let result = gc.run_gc().await;

        assert_eq!(result.unpinned_count, 0);
        assert_eq!(result.bytes_freed, 0);
        assert!(result.unpinned_cids.is_empty());
        assert!(result.errors.is_empty());
    }

    #[tokio::test]
    async fn test_stats_no_candidates() {
        let (_tmp, gc) = empty_collector(GarbageCollectionConfig::default()).await;
        let stats = gc.stats().await;

        assert_eq!(stats.unpin_candidates, 0);
        assert_eq!(stats.bytes_reclaimable, 0);
    }

    #[tokio::test]
    async fn test_auto_gc_disabled_returns_immediately() {
        let config = GarbageCollectionConfig {
            auto_gc_enabled: false,
            ..Default::default()
        };
        let (_tmp, gc) = empty_collector(config).await;
        let (_tx, rx) = tokio::sync::watch::channel(false);

        let finished =
            tokio::time::timeout(Duration::from_secs(5), Arc::new(gc).start_auto_gc(rx)).await;
        assert!(finished.is_ok(), "disabled auto GC should return immediately");
    }

    #[tokio::test]
    async fn test_auto_gc_stops_on_shutdown_signal() {
        let (_tmp, gc) = empty_collector(GarbageCollectionConfig::default()).await;
        let (tx, rx) = tokio::sync::watch::channel(false);
        tx.send(true).unwrap();

        let finished =
            tokio::time::timeout(Duration::from_secs(5), Arc::new(gc).start_auto_gc(rx)).await;
        assert!(finished.is_ok(), "auto GC should stop after a shutdown signal");
        drop(tx);
    }
}