// iroh_blobs/store/gc.rs

use std::{collections::HashSet, mem, pin::Pin, sync::Arc};

use bao_tree::ChunkRanges;
use genawaiter::sync::{Co, Gen};
use n0_future::{Stream, StreamExt};
use tracing::{debug, error, info, warn};

use crate::{api::Store, Hash, HashAndFormat};

10/// An event related to GC
11#[derive(Debug)]
12pub enum GcMarkEvent {
13    /// A custom event (info)
14    CustomDebug(String),
15    /// A custom non critical error
16    CustomWarning(String, Option<crate::api::Error>),
17    /// An unrecoverable error during GC
18    Error(crate::api::Error),
19}
20
21/// An event related to GC
22#[derive(Debug)]
23pub enum GcSweepEvent {
24    /// A custom event (debug)
25    CustomDebug(String),
26    /// A custom non critical error
27    #[allow(dead_code)]
28    CustomWarning(String, Option<crate::api::Error>),
29    /// An unrecoverable error during GC
30    Error(crate::api::Error),
31}
32
33/// Compute the set of live hashes
34pub(super) async fn gc_mark_task(
35    store: &Store,
36    live: &mut HashSet<Hash>,
37    co: &Co<GcMarkEvent>,
38) -> crate::api::Result<()> {
39    macro_rules! trace {
40        ($($arg:tt)*) => {
41            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
42        };
43    }
44    macro_rules! warn {
45        ($($arg:tt)*) => {
46            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
47        };
48    }
49    let mut roots = HashSet::new();
50    trace!("traversing tags");
51    let mut tags = store.tags().list().await?;
52    while let Some(tag) = tags.next().await {
53        let info = tag?;
54        trace!("adding root {:?} {:?}", info.name, info.hash_and_format());
55        roots.insert(info.hash_and_format());
56    }
57    trace!("traversing temp roots");
58    let mut tts = store.tags().list_temp_tags().await?;
59    while let Some(tt) = tts.next().await {
60        trace!("adding temp root {:?}", tt);
61        roots.insert(tt);
62    }
63    for HashAndFormat { hash, format } in roots {
64        // we need to do this for all formats except raw
65        if live.insert(hash) && !format.is_raw() {
66            let mut stream = store.export_bao(hash, ChunkRanges::all()).hashes();
67            while let Some(hash) = stream.next().await {
68                match hash {
69                    Ok(hash) => {
70                        live.insert(hash);
71                    }
72                    Err(e) => {
73                        warn!("error while traversing hashseq: {e:?}");
74                    }
75                }
76            }
77        }
78    }
79    trace!("gc mark done. found {} live blobs", live.len());
80    Ok(())
81}
82
83async fn gc_sweep_task(
84    store: &Store,
85    live: &HashSet<Hash>,
86    co: &Co<GcSweepEvent>,
87) -> crate::api::Result<()> {
88    let mut blobs = store.blobs().list().stream().await?;
89    let mut count = 0;
90    let mut batch = Vec::new();
91    while let Some(hash) = blobs.next().await {
92        let hash = hash?;
93        if !live.contains(&hash) {
94            batch.push(hash);
95            count += 1;
96        }
97        if batch.len() >= 100 {
98            store.blobs().delete(batch.clone()).await?;
99            batch.clear();
100        }
101    }
102    if !batch.is_empty() {
103        store.blobs().delete(batch).await?;
104    }
105    store.sync_db().await?;
106    co.yield_(GcSweepEvent::CustomDebug(format!("deleted {count} blobs")))
107        .await;
108    Ok(())
109}
110
111fn gc_mark<'a>(
112    store: &'a Store,
113    live: &'a mut HashSet<Hash>,
114) -> impl Stream<Item = GcMarkEvent> + 'a {
115    Gen::new(|co| async move {
116        if let Err(e) = gc_mark_task(store, live, &co).await {
117            co.yield_(GcMarkEvent::Error(e)).await;
118        }
119    })
120}
121
122fn gc_sweep<'a>(
123    store: &'a Store,
124    live: &'a HashSet<Hash>,
125) -> impl Stream<Item = GcSweepEvent> + 'a {
126    Gen::new(|co| async move {
127        if let Err(e) = gc_sweep_task(store, live, &co).await {
128            co.yield_(GcSweepEvent::Error(e)).await;
129        }
130    })
131}
132
133/// Configuration for garbage collection.
134#[derive(derive_more::Debug, Clone)]
135pub struct GcConfig {
136    /// Interval in which to run garbage collection.
137    pub interval: std::time::Duration,
138    /// Optional callback to manually add protected blobs.
139    ///
140    /// The callback is called before each garbage collection run. It gets a `&mut HashSet<Hash>`
141    /// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the
142    /// [`HashSet`] will be protected from garbage collection during this run.
143    ///
144    /// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return
145    /// [`ProtectOutcome::Abort`], the garbage collection run will be aborted.Use this if your
146    /// source of hashes to protect returned an error, and thus garbage collection should be skipped
147    /// completely to not unintentionally delete blobs that should be protected.
148    #[debug("ProtectCallback")]
149    pub add_protected: Option<ProtectCb>,
150}
151
/// Returned from [`ProtectCb`].
///
/// See [`GcConfig::add_protected`] for details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtectOutcome {
    /// Continue with the garbage collection run.
    Continue,
    /// Abort the garbage collection run.
    ///
    /// Only the current run is skipped; the next one is attempted after the configured interval.
    Abort,
}

163/// The type of the garbage collection callback.
164///
165/// See [`GcConfig::add_protected] for details.
166pub type ProtectCb = Arc<
167    dyn for<'a> Fn(
168            &'a mut HashSet<Hash>,
169        )
170            -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
171        + Send
172        + Sync
173        + 'static,
174>;
175
176pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
177    debug!(externally_protected = live.len(), "gc: start");
178    {
179        store.clear_protected().await?;
180        let mut stream = gc_mark(store, live);
181        while let Some(ev) = stream.next().await {
182            match ev {
183                GcMarkEvent::CustomDebug(msg) => {
184                    debug!("{}", msg);
185                }
186                GcMarkEvent::CustomWarning(msg, err) => {
187                    warn!("{}: {:?}", msg, err);
188                }
189                GcMarkEvent::Error(err) => {
190                    error!("error during gc mark: {:?}", err);
191                    return Err(err);
192                }
193            }
194        }
195    }
196    debug!(total_protected = live.len(), "gc: sweep");
197    {
198        let mut stream = gc_sweep(store, live);
199        while let Some(ev) = stream.next().await {
200            match ev {
201                GcSweepEvent::CustomDebug(msg) => {
202                    debug!("{}", msg);
203                }
204                GcSweepEvent::CustomWarning(msg, err) => {
205                    warn!("{}: {:?}", msg, err);
206                }
207                GcSweepEvent::Error(err) => {
208                    error!("error during gc sweep: {:?}", err);
209                    return Err(err);
210                }
211            }
212        }
213    }
214    debug!("gc: done");
215
216    Ok(())
217}
218
219pub async fn run_gc(store: Store, config: GcConfig) {
220    debug!("gc enabled with interval {:?}", config.interval);
221    let mut live = HashSet::new();
222    loop {
223        live.clear();
224        tokio::time::sleep(config.interval).await;
225        if let Some(ref cb) = config.add_protected {
226            match (cb)(&mut live).await {
227                ProtectOutcome::Continue => {}
228                ProtectOutcome::Abort => {
229                    info!("abort gc run: protect callback indicated abort");
230                    continue;
231                }
232            }
233        }
234        if let Err(e) = gc_run_once(&store, &mut live).await {
235            error!("error during gc run: {e}");
236            break;
237        }
238    }
239}
240
#[cfg(test)]
mod tests {
    use std::io;

    use bao_tree::io::EncodeError;
    use range_collections::RangeSet2;
    use testresult::TestResult;

    use super::*;
    use crate::{
        api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store},
        hashseq::HashSeq,
        BlobFormat,
    };

    /// Exercises the main protection rules: temp tags, named tags, hash sequences and tag deletion.
    async fn gc_smoke(store: &Store) -> TestResult<()> {
        let blobs = store.blobs();
        let at = blobs.add_slice("a").temp_tag().await?;
        let bt = blobs.add_slice("b").temp_tag().await?;
        let ct = blobs.add_slice("c").temp_tag().await?;
        let dt = blobs.add_slice("d").temp_tag().await?;
        let et = blobs.add_slice("e").temp_tag().await?;
        let ft = blobs.add_slice("f").temp_tag().await?;
        let gt = blobs.add_slice("g").temp_tag().await?;
        let ht = blobs.add_slice("h").with_named_tag("h").await?;
        let a = at.hash();
        let b = bt.hash();
        let c = ct.hash();
        let d = dt.hash();
        let e = et.hash();
        let f = ft.hash();
        let g = gt.hash();
        let h = ht.hash;
        store.tags().set("c", ct.hash_and_format()).await?;
        let dehs = [d, e].into_iter().collect::<HashSeq>();
        let hehs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: dehs.into(),
                format: BlobFormat::HashSeq,
            })
            .await?;
        let fghs = [f, g].into_iter().collect::<HashSeq>();
        let fghs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: fghs.into(),
                format: BlobFormat::HashSeq,
            })
            .temp_tag()
            .await?;
        store.tags().set("fg", fghs.hash_and_format()).await?;
        drop(fghs);
        drop(bt);
        store.tags().delete("h").await?;
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;
        // a is protected because we keep the temp tag
        assert!(live.contains(&a));
        assert!(store.has(a).await?);
        // b is not protected because we drop the temp tag
        assert!(!live.contains(&b));
        assert!(!store.has(b).await?);
        // c is protected because we set an explicit tag
        assert!(live.contains(&c));
        assert!(store.has(c).await?);
        // d and e are protected because they are part of a hashseq protected by a temp tag
        assert!(live.contains(&d));
        assert!(store.has(d).await?);
        assert!(live.contains(&e));
        assert!(store.has(e).await?);
        // f and g are protected because they are part of a hashseq protected by a tag
        assert!(live.contains(&f));
        assert!(store.has(f).await?);
        assert!(live.contains(&g));
        assert!(store.has(g).await?);
        // h is not protected because we deleted the tag before gc ran
        assert!(!live.contains(&h));
        assert!(!store.has(h).await?);
        drop(at);
        drop(hehs);
        Ok(())
    }

    /// Checks that gc removes all on-disk files of complete and partial blobs.
    #[cfg(feature = "fs-store")]
    async fn gc_file_delete(path: &std::path::Path, store: &Store) -> TestResult<()> {
        use bao_tree::ChunkNum;

        use crate::store::{fs::options::PathOptions, util::tests::create_n0_bao};
        let mut live = HashSet::new();
        let options = PathOptions::new(&path.join("db"));
        // create a large complete file and check that the data and outboard files are deleted by gc
        {
            let a = store
                .blobs()
                .add_slice(vec![0u8; 8000000])
                .temp_tag()
                .await?;
            let ah = a.hash();
            let data_path = options.data_path(&ah);
            let outboard_path = options.outboard_path(&ah);
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(store.has(ah).await?);
            drop(a);
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
        }
        live.clear();
        // create a large partial file and check that the data and outboard file as well as
        // the sizes and bitfield files are deleted by gc
        {
            let data = vec![1u8; 8000000];
            let ranges = ChunkRanges::from(..ChunkNum(19));
            let (bh, b_bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(bh, ranges, b_bao).await?;
            let data_path = options.data_path(&bh);
            let outboard_path = options.outboard_path(&bh);
            let sizes_path = options.sizes_path(&bh);
            let bitfield_path = options.bitfield_path(&bh);
            store.wait_idle().await?;
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(sizes_path.exists());
            assert!(bitfield_path.exists());
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
            assert!(!sizes_path.exists());
            assert!(!bitfield_path.exists());
        }
        Ok(())
    }

    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn gc_smoke_fs() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_path = testdir.path().join("db");
        let store = crate::store::fs::FsStore::load(&db_path).await?;
        gc_smoke(&store).await?;
        gc_file_delete(testdir.path(), &store).await?;
        Ok(())
    }

    #[tokio::test]
    async fn gc_smoke_mem() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let store = crate::store::mem::MemStore::new();
        gc_smoke(&store).await?;
        Ok(())
    }

    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn gc_check_deletion_fs() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_path = testdir.path().join("db");
        let store = crate::store::fs::FsStore::load(&db_path).await?;
        gc_check_deletion(&store).await
    }

    #[tokio::test]
    async fn gc_check_deletion_mem() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let store = crate::store::mem::MemStore::default();
        gc_check_deletion(&store).await
    }

    /// Checks that every export API reports `NotFound` for a blob removed by gc.
    async fn gc_check_deletion(store: &Store) -> TestResult {
        let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?;
        let hash = temp_tag.hash();
        assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo");
        drop(temp_tag);
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;

        // check that `get_bytes` returns an error.
        let res = store.get_bytes(hash).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(ExportBaoError::ExportBaoInner {
                source: EncodeError::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // check that `export_ranges` returns an error.
        let res = store
            .export_ranges(hash, RangeSet2::all())
            .concatenate()
            .await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // check that `export_bao` returns an error.
        let res = store
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await;
        assert!(res.is_err());
        // Match by reference so `res` is still available for the failure message.
        assert!(
            matches!(
                &res,
                Err(RequestError::Inner {
                    source: crate::api::Error::Io(cause),
                    ..
                }) if cause.kind() == io::ErrorKind::NotFound
            ),
            "export_bao res {res:?}"
        );

        // check that `export` returns an error.
        let target = tempfile::NamedTempFile::new()?;
        let path = target.path();
        let res = store.export(hash, path).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));
        Ok(())
    }
}