1use crate::pinning::{PinManager, PinType};
23use crate::traits::BlockStore;
24use ipfrs_core::{Cid, Result};
25use std::collections::HashSet;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::Arc;
28use std::time::{Duration, Instant};
29
30#[derive(Debug, Clone)]
32pub struct GcConfig {
33 pub max_blocks_per_run: usize,
35 pub time_limit: Option<Duration>,
37 pub incremental: bool,
39 pub batch_size: usize,
41 pub batch_delay: Duration,
43 pub dry_run: bool,
45}
46
47impl Default for GcConfig {
48 fn default() -> Self {
49 Self {
50 max_blocks_per_run: 0, time_limit: None, incremental: false, batch_size: 1000, batch_delay: Duration::from_millis(10), dry_run: false,
56 }
57 }
58}
59
60impl GcConfig {
61 pub fn incremental() -> Self {
63 Self {
64 incremental: true,
65 ..Default::default()
66 }
67 }
68
69 pub fn dry_run() -> Self {
71 Self {
72 dry_run: true,
73 ..Default::default()
74 }
75 }
76
77 pub fn with_max_blocks(mut self, max: usize) -> Self {
79 self.max_blocks_per_run = max;
80 self
81 }
82
83 pub fn with_time_limit(mut self, duration: Duration) -> Self {
85 self.time_limit = Some(duration);
86 self
87 }
88}
89
/// Outcome of a single garbage-collection run.
#[derive(Debug, Clone, Default)]
pub struct GcResult {
    /// Blocks actually deleted (or that would have been, in dry-run mode).
    pub blocks_collected: u64,
    /// Total size in bytes of the collected blocks.
    pub bytes_freed: u64,
    /// Blocks found reachable during the mark phase.
    pub blocks_marked: u64,
    /// Blocks examined during the sweep phase.
    pub blocks_scanned: u64,
    /// Wall-clock duration of the run.
    pub duration: Duration,
    /// True when the run stopped early (cancellation, time limit, or block cap).
    pub interrupted: bool,
    /// Non-fatal errors encountered (link resolution or deletion failures).
    pub errors: Vec<String>,
}
108
/// Lifetime counters accumulated across all GC runs.
/// Atomics allow updates through a shared reference without locking.
#[derive(Debug, Default)]
pub struct GcStats {
    /// Number of completed runs.
    pub total_runs: AtomicU64,
    /// Sum of `GcResult::blocks_collected` over all runs.
    pub total_blocks_collected: AtomicU64,
    /// Sum of `GcResult::bytes_freed` over all runs.
    pub total_bytes_freed: AtomicU64,
    /// Unix timestamp (seconds) of the most recent run; 0 if never run.
    pub last_run_timestamp: AtomicU64,
}
121
122impl GcStats {
123 pub fn record_run(&self, result: &GcResult) {
125 self.total_runs.fetch_add(1, Ordering::Relaxed);
126 self.total_blocks_collected
127 .fetch_add(result.blocks_collected, Ordering::Relaxed);
128 self.total_bytes_freed
129 .fetch_add(result.bytes_freed, Ordering::Relaxed);
130 self.last_run_timestamp.store(
131 std::time::SystemTime::now()
132 .duration_since(std::time::UNIX_EPOCH)
133 .unwrap_or_default()
134 .as_secs(),
135 Ordering::Relaxed,
136 );
137 }
138
139 pub fn snapshot(&self) -> GcStatsSnapshot {
141 GcStatsSnapshot {
142 total_runs: self.total_runs.load(Ordering::Relaxed),
143 total_blocks_collected: self.total_blocks_collected.load(Ordering::Relaxed),
144 total_bytes_freed: self.total_bytes_freed.load(Ordering::Relaxed),
145 last_run_timestamp: self.last_run_timestamp.load(Ordering::Relaxed),
146 }
147 }
148}
149
/// Point-in-time, plain-value copy of [`GcStats`].
#[derive(Debug, Clone)]
pub struct GcStatsSnapshot {
    /// Number of completed runs.
    pub total_runs: u64,
    /// Total blocks collected across all runs.
    pub total_blocks_collected: u64,
    /// Total bytes freed across all runs.
    pub total_bytes_freed: u64,
    /// Unix timestamp (seconds) of the most recent run; 0 if never run.
    pub last_run_timestamp: u64,
}
158
/// Callback that returns the child CIDs referenced by a block; used during
/// the mark phase to walk the DAG outward from pinned roots.
pub type LinkResolver = Arc<dyn Fn(&Cid) -> Result<Vec<Cid>> + Send + Sync>;
161
/// Mark-and-sweep garbage collector over a [`BlockStore`]: blocks reachable
/// from pins (via `link_resolver`) are kept, everything else is deleted.
pub struct GarbageCollector<S: BlockStore> {
    /// Block store being collected.
    store: Arc<S>,
    /// Source of pinned roots that anchor the mark phase.
    pin_manager: Arc<PinManager>,
    /// Resolves a block's child links during marking.
    link_resolver: LinkResolver,
    /// Per-run tuning (caps, batching, dry-run).
    config: GcConfig,
    /// Lifetime counters updated after every run.
    stats: GcStats,
    /// Cooperative cancellation flag checked inside the mark/sweep loops.
    cancel: AtomicBool,
}
177
178impl<S: BlockStore> GarbageCollector<S> {
179 pub fn new(
187 store: Arc<S>,
188 pin_manager: Arc<PinManager>,
189 link_resolver: LinkResolver,
190 config: GcConfig,
191 ) -> Self {
192 Self {
193 store,
194 pin_manager,
195 link_resolver,
196 config,
197 stats: GcStats::default(),
198 cancel: AtomicBool::new(false),
199 }
200 }
201
202 pub fn new_flat(store: Arc<S>, pin_manager: Arc<PinManager>, config: GcConfig) -> Self {
204 let link_resolver: LinkResolver = Arc::new(|_| Ok(Vec::new()));
205 Self::new(store, pin_manager, link_resolver, config)
206 }
207
208 pub fn cancel(&self) {
210 self.cancel.store(true, Ordering::SeqCst);
211 }
212
213 pub fn reset_cancel(&self) {
215 self.cancel.store(false, Ordering::SeqCst);
216 }
217
218 fn is_cancelled(&self) -> bool {
220 self.cancel.load(Ordering::SeqCst)
221 }
222
223 pub fn stats(&self) -> GcStatsSnapshot {
225 self.stats.snapshot()
226 }
227
228 pub async fn collect(&self) -> Result<GcResult> {
230 self.reset_cancel();
231 let start_time = Instant::now();
232 let mut result = GcResult::default();
233
234 let marked = self.mark_phase(&mut result).await?;
236
237 if self.should_stop(start_time, &result) {
239 result.interrupted = true;
240 result.duration = start_time.elapsed();
241 self.stats.record_run(&result);
242 return Ok(result);
243 }
244
245 self.sweep_phase(&marked, &mut result).await?;
247
248 result.duration = start_time.elapsed();
249 self.stats.record_run(&result);
250 Ok(result)
251 }
252
253 #[allow(clippy::unused_async)]
255 async fn mark_phase(&self, result: &mut GcResult) -> Result<HashSet<Vec<u8>>> {
256 let mut marked: HashSet<Vec<u8>> = HashSet::new();
257 let mut to_process: Vec<Cid> = Vec::new();
258
259 let pins = self.pin_manager.list_pins()?;
261 for (cid, info) in pins {
262 if info.pin_type == PinType::Direct || info.pin_type == PinType::Recursive {
264 to_process.push(cid);
265 }
266 marked.insert(cid.to_bytes());
268 }
269
270 while let Some(cid) = to_process.pop() {
272 if self.is_cancelled() {
273 break;
274 }
275
276 match (self.link_resolver)(&cid) {
278 Ok(links) => {
279 for link in links {
280 let link_bytes = link.to_bytes();
281 if marked.insert(link_bytes) {
282 to_process.push(link);
284 }
285 }
286 }
287 Err(e) => {
288 result
289 .errors
290 .push(format!("Error resolving links for {cid}: {e}"));
291 }
292 }
293 }
294
295 result.blocks_marked = marked.len() as u64;
296 Ok(marked)
297 }
298
299 async fn sweep_phase(&self, marked: &HashSet<Vec<u8>>, result: &mut GcResult) -> Result<()> {
301 let start_time = Instant::now();
302 let all_cids = self.store.list_cids()?;
303 result.blocks_scanned = all_cids.len() as u64;
304
305 let mut to_delete = Vec::new();
306 let mut batch_count = 0;
307
308 for cid in all_cids {
309 if self.is_cancelled() || self.should_stop(start_time, result) {
310 result.interrupted = true;
311 break;
312 }
313
314 if self.config.max_blocks_per_run > 0
316 && result.blocks_collected >= self.config.max_blocks_per_run as u64
317 {
318 result.interrupted = true;
319 break;
320 }
321
322 let cid_bytes = cid.to_bytes();
323 if !marked.contains(&cid_bytes) {
324 if self.config.dry_run {
326 if let Ok(Some(block)) = self.store.get(&cid).await {
328 result.bytes_freed += block.size();
329 }
330 result.blocks_collected += 1;
331 } else {
332 to_delete.push(cid);
333 }
334
335 batch_count += 1;
336
337 if self.config.incremental && batch_count >= self.config.batch_size {
339 if !self.config.dry_run && !to_delete.is_empty() {
340 self.delete_batch(&to_delete, result).await?;
341 to_delete.clear();
342 }
343 batch_count = 0;
344 tokio::time::sleep(self.config.batch_delay).await;
345 }
346 }
347 }
348
349 if !self.config.dry_run && !to_delete.is_empty() {
351 self.delete_batch(&to_delete, result).await?;
352 }
353
354 Ok(())
355 }
356
357 async fn delete_batch(&self, cids: &[Cid], result: &mut GcResult) -> Result<()> {
359 for cid in cids {
360 if let Ok(Some(block)) = self.store.get(cid).await {
362 result.bytes_freed += block.size();
363 }
364
365 match self.store.delete(cid).await {
367 Ok(()) => {
368 result.blocks_collected += 1;
369 }
370 Err(e) => {
371 result
372 .errors
373 .push(format!("Error deleting block {cid}: {e}"));
374 }
375 }
376 }
377 Ok(())
378 }
379
380 fn should_stop(&self, start_time: Instant, _result: &GcResult) -> bool {
382 if self.is_cancelled() {
383 return true;
384 }
385
386 if let Some(limit) = self.config.time_limit {
387 if start_time.elapsed() > limit {
388 return true;
389 }
390 }
391
392 false
393 }
394}
395
/// Policy deciding when the scheduler should trigger a collection.
#[derive(Debug, Clone, Default)]
pub enum GcPolicy {
    /// Never trigger automatically; collection is invoked by the caller.
    #[default]
    Manual,
    /// Trigger once `interval_secs` have elapsed since the last run.
    TimeBased { interval_secs: u64 },
    /// Trigger on store-usage threshold. Not wired up yet: `should_run`
    /// currently always returns false for this variant.
    SpaceBased { threshold_percent: f64 },
    /// Interval trigger plus a space threshold; only the interval is
    /// currently consulted (see `should_run`).
    Combined {
        interval_secs: u64,
        threshold_percent: f64,
    },
}
412
/// Drives a [`GarbageCollector`] according to a [`GcPolicy`].
pub struct GcScheduler<S: BlockStore + 'static> {
    /// Collector to run when the policy fires.
    gc: Arc<GarbageCollector<S>>,
    /// Trigger policy consulted by `should_run`.
    policy: GcPolicy,
    /// Guard ensuring at most one collection runs at a time.
    running: AtomicBool,
}
419
420impl<S: BlockStore + 'static> GcScheduler<S> {
421 pub fn new(gc: Arc<GarbageCollector<S>>, policy: GcPolicy) -> Self {
423 Self {
424 gc,
425 policy,
426 running: AtomicBool::new(false),
427 }
428 }
429
430 pub fn should_run(&self) -> bool {
432 match &self.policy {
433 GcPolicy::Manual => false,
434 GcPolicy::TimeBased { interval_secs } => {
435 let stats = self.gc.stats();
436 let now = std::time::SystemTime::now()
437 .duration_since(std::time::UNIX_EPOCH)
438 .unwrap_or_default()
439 .as_secs();
440 now.saturating_sub(stats.last_run_timestamp) >= *interval_secs
441 }
442 GcPolicy::SpaceBased { .. } => {
443 false
445 }
446 GcPolicy::Combined { interval_secs, .. } => {
447 let stats = self.gc.stats();
448 let now = std::time::SystemTime::now()
449 .duration_since(std::time::UNIX_EPOCH)
450 .unwrap_or_default()
451 .as_secs();
452 now.saturating_sub(stats.last_run_timestamp) >= *interval_secs
453 }
454 }
455 }
456
457 pub async fn maybe_run(&self) -> Option<GcResult> {
459 if !self.should_run() {
460 return None;
461 }
462
463 if self
465 .running
466 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
467 .is_err()
468 {
469 return None;
470 }
471
472 let result = self.gc.collect().await.ok();
473 self.running.store(false, Ordering::SeqCst);
474 result
475 }
476
477 pub fn gc(&self) -> &GarbageCollector<S> {
479 &self.gc
480 }
481}
482
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use bytes::Bytes;
    use ipfrs_core::Block;
    use std::path::PathBuf;

    /// Builds a content-addressed block from raw bytes.
    fn make_test_block(data: &[u8]) -> Block {
        Block::new(Bytes::copy_from_slice(data)).unwrap()
    }

    /// A destructive run keeps pinned blocks and deletes unreachable ones.
    #[tokio::test]
    async fn test_gc_collect_unreachable() {
        // NOTE(review): hardcoded /tmp path — fine while each test uses a
        // distinct directory, but a tempdir crate would avoid collisions.
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-gc"),
            cache_size: 1024 * 1024,
        };
        // Clear leftovers from a previous (possibly failed) run.
        let _ = std::fs::remove_dir_all(&config.path);

        let store = Arc::new(SledBlockStore::new(config).unwrap());
        let pin_manager = Arc::new(PinManager::new());

        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");
        let block3 = make_test_block(b"block3");

        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();
        store.put(&block3).await.unwrap();

        // Only block1 is pinned; block2/block3 become garbage.
        pin_manager.pin(block1.cid()).unwrap();

        // Flat resolver: no links, so reachability == pinned set.
        let gc = GarbageCollector::new_flat(store.clone(), pin_manager, GcConfig::default());

        let result = gc.collect().await.unwrap();

        assert_eq!(result.blocks_collected, 2);
        assert_eq!(result.blocks_marked, 1);

        // Pinned block survives; unpinned blocks are gone.
        assert!(store.has(block1.cid()).await.unwrap());
        assert!(!store.has(block2.cid()).await.unwrap());
        assert!(!store.has(block3.cid()).await.unwrap());
    }

    /// A dry run counts collectable blocks without deleting anything.
    #[tokio::test]
    async fn test_gc_dry_run() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-gc-dry"),
            cache_size: 1024 * 1024,
        };
        // Clear leftovers from a previous (possibly failed) run.
        let _ = std::fs::remove_dir_all(&config.path);

        let store = Arc::new(SledBlockStore::new(config).unwrap());
        let pin_manager = Arc::new(PinManager::new());

        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");

        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();

        pin_manager.pin(block1.cid()).unwrap();

        let gc = GarbageCollector::new_flat(store.clone(), pin_manager, GcConfig::dry_run());

        let result = gc.collect().await.unwrap();

        // block2 is counted as collectable…
        assert_eq!(result.blocks_collected, 1);

        // …but still present, because dry-run never deletes.
        assert!(store.has(block2.cid()).await.unwrap());
    }

    /// Presets and builder-style setters produce the expected flags.
    #[test]
    fn test_gc_config() {
        let config = GcConfig::default();
        assert!(!config.dry_run);
        assert!(!config.incremental);

        let config = GcConfig::incremental();
        assert!(config.incremental);

        let config = GcConfig::dry_run().with_max_blocks(100);
        assert!(config.dry_run);
        assert_eq!(config.max_blocks_per_run, 100);
    }

    /// record_run accumulates counters visible through snapshot().
    #[test]
    fn test_gc_stats() {
        let stats = GcStats::default();
        let result = GcResult {
            blocks_collected: 10,
            bytes_freed: 1024,
            ..Default::default()
        };

        stats.record_run(&result);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.total_runs, 1);
        assert_eq!(snapshot.total_blocks_collected, 10);
        assert_eq!(snapshot.total_bytes_freed, 1024);
    }
}
599}