ipfrs_storage/
gc.rs

//! Garbage Collection for block storage.
//!
//! Implements mark-and-sweep GC to reclaim space from unreferenced blocks.
//! Works with the pin management system to ensure pinned blocks are retained.
//!
//! # Algorithm
//!
//! 1. **Mark Phase**: Starting from pinned root blocks, traverse all links
//!    to mark reachable blocks.
//! 2. **Sweep Phase**: Delete all blocks that weren't marked as reachable.
//!
//! # Example
//!
//! ```rust,ignore
//! use ipfrs_storage::gc::{GarbageCollector, GcConfig};
//!
//! let gc = GarbageCollector::new_flat(store, pin_manager, GcConfig::default());
//! let result = gc.collect().await?;
//! println!("Collected {} blocks, freed {} bytes", result.blocks_collected, result.bytes_freed);
//! ```

use crate::pinning::{PinManager, PinType};
use crate::traits::BlockStore;
use ipfrs_core::{Cid, Result};
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

/// GC configuration
#[derive(Debug, Clone)]
pub struct GcConfig {
    /// Maximum blocks to collect in a single run (0 = unlimited)
    pub max_blocks_per_run: usize,
    /// Time limit for a single GC run (None = unlimited)
    pub time_limit: Option<Duration>,
    /// Whether to run incrementally (pause between batches)
    pub incremental: bool,
    /// Batch size for incremental GC
    pub batch_size: usize,
    /// Delay between incremental batches
    pub batch_delay: Duration,
    /// Whether to perform a dry run (don't actually delete)
    pub dry_run: bool,
}

impl Default for GcConfig {
    fn default() -> Self {
        Self {
            max_blocks_per_run: 0,                  // Unlimited
            time_limit: None,                       // No limit
            incremental: false,                     // Full GC by default
            batch_size: 1000,                       // Process 1000 blocks per batch
            batch_delay: Duration::from_millis(10), // 10ms between batches
            dry_run: false,
        }
    }
}

impl GcConfig {
    /// Create a configuration for incremental GC
    pub fn incremental() -> Self {
        Self {
            incremental: true,
            ..Default::default()
        }
    }

    /// Create a configuration for a dry run (no actual deletion)
    pub fn dry_run() -> Self {
        Self {
            dry_run: true,
            ..Default::default()
        }
    }

    /// Set the maximum number of blocks to collect per run
    pub fn with_max_blocks(mut self, max: usize) -> Self {
        self.max_blocks_per_run = max;
        self
    }

    /// Set the time limit for a single run
    pub fn with_time_limit(mut self, duration: Duration) -> Self {
        self.time_limit = Some(duration);
        self
    }
}
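
// A sketch of composing a bounded, incremental configuration from the
// builders above (the limits are illustrative, not recommended values):
//
//     let config = GcConfig::incremental()
//         .with_max_blocks(10_000)
//         .with_time_limit(Duration::from_secs(30));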

/// Result of a GC run
#[derive(Debug, Clone, Default)]
pub struct GcResult {
    /// Number of blocks collected (deleted)
    pub blocks_collected: u64,
    /// Bytes freed
    pub bytes_freed: u64,
    /// Number of blocks marked as reachable
    pub blocks_marked: u64,
    /// Number of blocks scanned
    pub blocks_scanned: u64,
    /// Duration of the GC run
    pub duration: Duration,
    /// Whether GC was interrupted (cancellation, time limit, or block cap)
    pub interrupted: bool,
    /// Non-fatal errors encountered during GC
    pub errors: Vec<String>,
}

/// GC statistics tracking
#[derive(Debug, Default)]
pub struct GcStats {
    /// Total GC runs
    pub total_runs: AtomicU64,
    /// Total blocks collected across all runs
    pub total_blocks_collected: AtomicU64,
    /// Total bytes freed across all runs
    pub total_bytes_freed: AtomicU64,
    /// Last GC run time (as a Unix timestamp, in seconds)
    pub last_run_timestamp: AtomicU64,
}

impl GcStats {
    /// Record a GC run
    pub fn record_run(&self, result: &GcResult) {
        self.total_runs.fetch_add(1, Ordering::Relaxed);
        self.total_blocks_collected
            .fetch_add(result.blocks_collected, Ordering::Relaxed);
        self.total_bytes_freed
            .fetch_add(result.bytes_freed, Ordering::Relaxed);
        self.last_run_timestamp.store(
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs(),
            Ordering::Relaxed,
        );
    }

    /// Get a snapshot of statistics
    pub fn snapshot(&self) -> GcStatsSnapshot {
        GcStatsSnapshot {
            total_runs: self.total_runs.load(Ordering::Relaxed),
            total_blocks_collected: self.total_blocks_collected.load(Ordering::Relaxed),
            total_bytes_freed: self.total_bytes_freed.load(Ordering::Relaxed),
            last_run_timestamp: self.last_run_timestamp.load(Ordering::Relaxed),
        }
    }
}
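
// `snapshot()` assembles its fields from individual relaxed loads, so each
// counter is accurate on its own but the set is not captured atomically.
// A reporting sketch:
//
//     let snap = stats.snapshot();
//     println!(
//         "runs={} collected={} freed={}B",
//         snap.total_runs, snap.total_blocks_collected, snap.total_bytes_freed
//     );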

/// Snapshot of GC statistics
#[derive(Debug, Clone)]
pub struct GcStatsSnapshot {
    pub total_runs: u64,
    pub total_blocks_collected: u64,
    pub total_bytes_freed: u64,
    pub last_run_timestamp: u64,
}

/// Resolver that returns the child CIDs (links) of a block, used to traverse
/// the DAG during the mark phase
pub type LinkResolver = Arc<dyn Fn(&Cid) -> Result<Vec<Cid>> + Send + Sync>;
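
// For DAG-aware stores, the resolver should return the child CIDs of the
// given block, e.g. by decoding dag-pb/dag-cbor links. A sketch; note the
// resolver is synchronous, and `lookup_links` is a hypothetical index
// lookup standing in for real link decoding:
//
//     let resolver: LinkResolver = Arc::new(|cid| {
//         let links: Vec<Cid> = lookup_links(cid)?;
//         Ok(links)
//     });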

/// Garbage collector for block storage
pub struct GarbageCollector<S: BlockStore> {
    /// The block store to collect from
    store: Arc<S>,
    /// Pin manager for determining roots
    pin_manager: Arc<PinManager>,
    /// Link resolver for traversing DAG structure
    link_resolver: LinkResolver,
    /// Configuration
    config: GcConfig,
    /// Statistics
    stats: GcStats,
    /// Cancel flag for stopping GC
    cancel: AtomicBool,
}

impl<S: BlockStore> GarbageCollector<S> {
    /// Create a new garbage collector
    ///
    /// # Arguments
    /// * `store` - The block store to collect from
    /// * `pin_manager` - Pin manager for determining root blocks
    /// * `link_resolver` - Function to get links from a block
    /// * `config` - GC configuration
    pub fn new(
        store: Arc<S>,
        pin_manager: Arc<PinManager>,
        link_resolver: LinkResolver,
        config: GcConfig,
    ) -> Self {
        Self {
            store,
            pin_manager,
            link_resolver,
            config,
            stats: GcStats::default(),
            cancel: AtomicBool::new(false),
        }
    }

    /// Create with a no-op link resolver (for flat storage without DAG links)
    pub fn new_flat(store: Arc<S>, pin_manager: Arc<PinManager>, config: GcConfig) -> Self {
        let link_resolver: LinkResolver = Arc::new(|_| Ok(Vec::new()));
        Self::new(store, pin_manager, link_resolver, config)
    }

    /// Request cancellation of the current GC run
    pub fn cancel(&self) {
        self.cancel.store(true, Ordering::SeqCst);
    }

    /// Reset the cancel flag
    pub fn reset_cancel(&self) {
        self.cancel.store(false, Ordering::SeqCst);
    }

    /// Check if GC has been cancelled
    fn is_cancelled(&self) -> bool {
        self.cancel.load(Ordering::SeqCst)
    }

    /// Get GC statistics
    pub fn stats(&self) -> GcStatsSnapshot {
        self.stats.snapshot()
    }

    /// Run garbage collection
    pub async fn collect(&self) -> Result<GcResult> {
        self.reset_cancel();
        let start_time = Instant::now();
        let mut result = GcResult::default();

        // Phase 1: Mark - find all reachable blocks
        let marked = self.mark_phase(&mut result).await?;

        // Check for cancellation or time limit before sweeping
        if self.should_stop(start_time, &result) {
            result.interrupted = true;
            result.duration = start_time.elapsed();
            self.stats.record_run(&result);
            return Ok(result);
        }

        // Phase 2: Sweep - delete unreachable blocks
        self.sweep_phase(&marked, &mut result).await?;

        result.duration = start_time.elapsed();
        self.stats.record_run(&result);
        Ok(result)
    }

    /// Mark phase: traverse from roots to find all reachable blocks
    #[allow(clippy::unused_async)]
    async fn mark_phase(&self, result: &mut GcResult) -> Result<HashSet<Vec<u8>>> {
        let mut marked: HashSet<Vec<u8>> = HashSet::new();
        let mut to_process: Vec<Cid> = Vec::new();

        // Get all pinned CIDs as roots
        let pins = self.pin_manager.list_pins()?;
        for (cid, info) in pins {
            // All pinned blocks (including indirect) should be marked
            marked.insert(cid.to_bytes());
            // Direct and recursive pins are traversal roots
            if info.pin_type == PinType::Direct || info.pin_type == PinType::Recursive {
                to_process.push(cid);
            }
        }

        // Traverse from roots
        while let Some(cid) = to_process.pop() {
            if self.is_cancelled() {
                break;
            }

            // Get links from this block
            match (self.link_resolver)(&cid) {
                Ok(links) => {
                    for link in links {
                        let link_bytes = link.to_bytes();
                        if marked.insert(link_bytes) {
                            // Newly marked, add to process queue
                            to_process.push(link);
                        }
                    }
                }
                Err(e) => {
                    result
                        .errors
                        .push(format!("Error resolving links for {cid}: {e}"));
                }
            }
        }

        result.blocks_marked = marked.len() as u64;
        Ok(marked)
    }

    /// Sweep phase: delete unreachable blocks
    async fn sweep_phase(&self, marked: &HashSet<Vec<u8>>, result: &mut GcResult) -> Result<()> {
        // Note: the time limit is measured from the start of the sweep phase,
        // independent of time already spent in the mark phase.
        let start_time = Instant::now();
        let all_cids = self.store.list_cids()?;
        result.blocks_scanned = all_cids.len() as u64;

        let mut to_delete = Vec::new();
        let mut batch_count = 0;

        for cid in all_cids {
            if self.is_cancelled() || self.should_stop(start_time, result) {
                result.interrupted = true;
                break;
            }

            // Check max blocks limit
            if self.config.max_blocks_per_run > 0
                && result.blocks_collected >= self.config.max_blocks_per_run as u64
            {
                result.interrupted = true;
                break;
            }

            let cid_bytes = cid.to_bytes();
            if !marked.contains(&cid_bytes) {
                // Block is not marked - collect it
                if self.config.dry_run {
                    // In dry-run mode, just count what would be freed
                    if let Ok(Some(block)) = self.store.get(&cid).await {
                        result.bytes_freed += block.size();
                    }
                    result.blocks_collected += 1;
                } else {
                    to_delete.push(cid);
                }

                batch_count += 1;

                // Incremental GC: flush deletions in batches, pausing between them
                if self.config.incremental && batch_count >= self.config.batch_size {
                    if !self.config.dry_run && !to_delete.is_empty() {
                        self.delete_batch(&to_delete, result).await?;
                        to_delete.clear();
                    }
                    batch_count = 0;
                    tokio::time::sleep(self.config.batch_delay).await;
                }
            }
        }

        // Delete remaining blocks
        if !self.config.dry_run && !to_delete.is_empty() {
            self.delete_batch(&to_delete, result).await?;
        }

        Ok(())
    }

    /// Delete a batch of blocks
    async fn delete_batch(&self, cids: &[Cid], result: &mut GcResult) -> Result<()> {
        for cid in cids {
            // Record the size before deletion so freed bytes can be counted
            if let Ok(Some(block)) = self.store.get(cid).await {
                result.bytes_freed += block.size();
            }

            // Delete the block; failures are recorded as non-fatal errors
            match self.store.delete(cid).await {
                Ok(()) => {
                    result.blocks_collected += 1;
                }
                Err(e) => {
                    result
                        .errors
                        .push(format!("Error deleting block {cid}: {e}"));
                }
            }
        }
        Ok(())
    }

    /// Check if GC should stop
    fn should_stop(&self, start_time: Instant, _result: &GcResult) -> bool {
        if self.is_cancelled() {
            return true;
        }

        if let Some(limit) = self.config.time_limit {
            if start_time.elapsed() > limit {
                return true;
            }
        }

        false
    }
}

/// GC policy for automatic garbage collection
#[derive(Debug, Clone, Default)]
pub enum GcPolicy {
    /// Manual GC only
    #[default]
    Manual,
    /// Time-based: run every N seconds
    TimeBased { interval_secs: u64 },
    /// Space-based: run when disk usage exceeds a threshold
    SpaceBased { threshold_percent: f64 },
    /// Combined: run when either condition is met
    Combined {
        interval_secs: u64,
        threshold_percent: f64,
    },
}
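
// Policies are plain enum values. A sketch of each automatic variant:
//
//     let hourly = GcPolicy::TimeBased { interval_secs: 3600 };
//     let on_pressure = GcPolicy::SpaceBased { threshold_percent: 90.0 };
//     let either = GcPolicy::Combined { interval_secs: 3600, threshold_percent: 90.0 };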

/// Automatic GC scheduler
pub struct GcScheduler<S: BlockStore + 'static> {
    gc: Arc<GarbageCollector<S>>,
    policy: GcPolicy,
    running: AtomicBool,
}

impl<S: BlockStore + 'static> GcScheduler<S> {
    /// Create a new GC scheduler
    pub fn new(gc: Arc<GarbageCollector<S>>, policy: GcPolicy) -> Self {
        Self {
            gc,
            policy,
            running: AtomicBool::new(false),
        }
    }

    /// Check if GC should run based on policy
    pub fn should_run(&self) -> bool {
        match &self.policy {
            GcPolicy::Manual => false,
            GcPolicy::TimeBased { interval_secs } => {
                let stats = self.gc.stats();
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                now.saturating_sub(stats.last_run_timestamp) >= *interval_secs
            }
            GcPolicy::SpaceBased { .. } => {
                // Would need disk usage info - not implemented yet
                false
            }
            GcPolicy::Combined { interval_secs, .. } => {
                // The space condition is not implemented yet, so only the
                // time condition is evaluated here.
                let stats = self.gc.stats();
                let now = std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs();
                now.saturating_sub(stats.last_run_timestamp) >= *interval_secs
            }
        }
    }

    /// Run GC if policy conditions are met
    pub async fn maybe_run(&self) -> Option<GcResult> {
        if !self.should_run() {
            return None;
        }

        // Prevent concurrent runs
        if self
            .running
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return None;
        }

        let result = self.gc.collect().await.ok();
        self.running.store(false, Ordering::SeqCst);
        result
    }

    /// Get reference to the garbage collector
    pub fn gc(&self) -> &GarbageCollector<S> {
        &self.gc
    }
}
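
// A minimal polling loop driving the scheduler (a sketch; assumes a tokio
// runtime and a `scheduler: Arc<GcScheduler<S>>` built as above):
//
//     tokio::spawn(async move {
//         loop {
//             if let Some(result) = scheduler.maybe_run().await {
//                 println!("gc freed {} bytes", result.bytes_freed);
//             }
//             tokio::time::sleep(std::time::Duration::from_secs(60)).await;
//         }
//     });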

#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use bytes::Bytes;
    use ipfrs_core::Block;
    use std::path::PathBuf;

    fn make_test_block(data: &[u8]) -> Block {
        Block::new(Bytes::copy_from_slice(data)).unwrap()
    }

    #[tokio::test]
    async fn test_gc_collect_unreachable() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-gc"),
            cache_size: 1024 * 1024,
        };
        let _ = std::fs::remove_dir_all(&config.path);

        let store = Arc::new(SledBlockStore::new(config).unwrap());
        let pin_manager = Arc::new(PinManager::new());

        // Add some blocks
        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");
        let block3 = make_test_block(b"block3");

        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();
        store.put(&block3).await.unwrap();

        // Pin only block1
        pin_manager.pin(block1.cid()).unwrap();

        // Create GC
        let gc = GarbageCollector::new_flat(store.clone(), pin_manager, GcConfig::default());

        // Run GC
        let result = gc.collect().await.unwrap();

        // Should have collected 2 blocks (block2 and block3)
        assert_eq!(result.blocks_collected, 2);
        assert_eq!(result.blocks_marked, 1);

        // Verify block1 still exists
        assert!(store.has(block1.cid()).await.unwrap());
        // Verify block2 and block3 are gone
        assert!(!store.has(block2.cid()).await.unwrap());
        assert!(!store.has(block3.cid()).await.unwrap());
    }

    #[tokio::test]
    async fn test_gc_dry_run() {
        let config = BlockStoreConfig {
            path: PathBuf::from("/tmp/ipfrs-test-gc-dry"),
            cache_size: 1024 * 1024,
        };
        let _ = std::fs::remove_dir_all(&config.path);

        let store = Arc::new(SledBlockStore::new(config).unwrap());
        let pin_manager = Arc::new(PinManager::new());

        // Add some blocks
        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");

        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();

        // Pin only block1
        pin_manager.pin(block1.cid()).unwrap();

        // Create GC with dry run
        let gc = GarbageCollector::new_flat(store.clone(), pin_manager, GcConfig::dry_run());

        // Run GC
        let result = gc.collect().await.unwrap();

        // Should report 1 block to collect
        assert_eq!(result.blocks_collected, 1);

        // But block2 should still exist (dry run)
        assert!(store.has(block2.cid()).await.unwrap());
    }

    #[test]
    fn test_gc_config() {
        let config = GcConfig::default();
        assert!(!config.dry_run);
        assert!(!config.incremental);

        let config = GcConfig::incremental();
        assert!(config.incremental);

        let config = GcConfig::dry_run().with_max_blocks(100);
        assert!(config.dry_run);
        assert_eq!(config.max_blocks_per_run, 100);
    }

    #[test]
    fn test_gc_stats() {
        let stats = GcStats::default();
        let result = GcResult {
            blocks_collected: 10,
            bytes_freed: 1024,
            ..Default::default()
        };

        stats.record_run(&result);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.total_runs, 1);
        assert_eq!(snapshot.total_blocks_collected, 10);
        assert_eq!(snapshot.total_bytes_freed, 1024);
    }
}
599}