iroh_blobs/store/fs/
gc.rs

1use std::{collections::HashSet, pin::Pin, sync::Arc};
2
3use bao_tree::ChunkRanges;
4use genawaiter::sync::{Co, Gen};
5use n0_future::{Stream, StreamExt};
6use tracing::{debug, error, info, warn};
7
8use crate::{api::Store, Hash, HashAndFormat};
9
/// An event emitted while the mark phase of garbage collection runs.
///
/// Events are produced by `gc_mark_task` (and by `gc_mark` for the final
/// error) and consumed by [`gc_run_once`], which turns them into log output
/// or an early return.
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom informational message; [`gc_run_once`] logs it at debug level.
    CustomDebug(String),
    /// A custom non critical error; the mark phase continues after it.
    CustomWarning(String, Option<crate::api::Error>),
    /// An unrecoverable error during GC; [`gc_run_once`] returns it to the caller.
    Error(crate::api::Error),
}
20
/// An event emitted while the sweep phase of garbage collection runs.
///
/// Events are produced by `gc_sweep_task` (and by `gc_sweep` for the final
/// error) and consumed by [`gc_run_once`].
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    // Nothing in this file constructs this variant; it is only matched in
    // `gc_run_once`, hence the `dead_code` allowance.
    #[allow(dead_code)]
    CustomWarning(String, Option<crate::api::Error>),
    /// An unrecoverable error during GC; [`gc_run_once`] returns it to the caller.
    Error(crate::api::Error),
}
32
33/// Compute the set of live hashes
34pub(super) async fn gc_mark_task(
35    store: &Store,
36    live: &mut HashSet<Hash>,
37    co: &Co<GcMarkEvent>,
38) -> crate::api::Result<()> {
39    macro_rules! trace {
40        ($($arg:tt)*) => {
41            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
42        };
43    }
44    macro_rules! warn {
45        ($($arg:tt)*) => {
46            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
47        };
48    }
49    let mut roots = HashSet::new();
50    trace!("traversing tags");
51    let mut tags = store.tags().list().await?;
52    while let Some(tag) = tags.next().await {
53        let info = tag?;
54        trace!("adding root {:?} {:?}", info.name, info.hash_and_format());
55        roots.insert(info.hash_and_format());
56    }
57    trace!("traversing temp roots");
58    let mut tts = store.tags().list_temp_tags().await?;
59    while let Some(tt) = tts.next().await {
60        trace!("adding temp root {:?}", tt);
61        roots.insert(tt);
62    }
63    for HashAndFormat { hash, format } in roots {
64        // we need to do this for all formats except raw
65        if live.insert(hash) && !format.is_raw() {
66            let mut stream = store.export_bao(hash, ChunkRanges::all()).hashes();
67            while let Some(hash) = stream.next().await {
68                match hash {
69                    Ok(hash) => {
70                        live.insert(hash);
71                    }
72                    Err(e) => {
73                        warn!("error while traversing hashseq: {e:?}");
74                    }
75                }
76            }
77        }
78    }
79    trace!("gc mark done. found {} live blobs", live.len());
80    Ok(())
81}
82
83async fn gc_sweep_task(
84    store: &Store,
85    live: &HashSet<Hash>,
86    co: &Co<GcSweepEvent>,
87) -> crate::api::Result<()> {
88    let mut blobs = store.blobs().list().stream().await?;
89    let mut count = 0;
90    let mut batch = Vec::new();
91    while let Some(hash) = blobs.next().await {
92        let hash = hash?;
93        if !live.contains(&hash) {
94            batch.push(hash);
95            count += 1;
96        }
97        if batch.len() >= 100 {
98            store.blobs().delete(batch.clone()).await?;
99            batch.clear();
100        }
101    }
102    if !batch.is_empty() {
103        store.blobs().delete(batch).await?;
104    }
105    store.sync_db().await?;
106    co.yield_(GcSweepEvent::CustomDebug(format!("deleted {count} blobs")))
107        .await;
108    Ok(())
109}
110
111fn gc_mark<'a>(
112    store: &'a Store,
113    live: &'a mut HashSet<Hash>,
114) -> impl Stream<Item = GcMarkEvent> + 'a {
115    Gen::new(|co| async move {
116        if let Err(e) = gc_mark_task(store, live, &co).await {
117            co.yield_(GcMarkEvent::Error(e)).await;
118        }
119    })
120}
121
122fn gc_sweep<'a>(
123    store: &'a Store,
124    live: &'a HashSet<Hash>,
125) -> impl Stream<Item = GcSweepEvent> + 'a {
126    Gen::new(|co| async move {
127        if let Err(e) = gc_sweep_task(store, live, &co).await {
128            co.yield_(GcSweepEvent::Error(e)).await;
129        }
130    })
131}
132
/// Configuration for garbage collection.
#[derive(derive_more::Debug, Clone)]
pub struct GcConfig {
    /// Interval in which to run garbage collection.
    ///
    /// [`run_gc`] sleeps for this duration before every run, including the first one.
    pub interval: std::time::Duration,
    /// Optional callback to manually add protected blobs.
    ///
    /// The callback is called before each garbage collection run. It gets a `&mut HashSet<Hash>`
    /// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the
    /// [`HashSet`] will be protected from garbage collection during this run.
    ///
    /// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return
    /// [`ProtectOutcome::Abort`], the garbage collection run will be aborted. Use this if your
    /// source of hashes to protect returned an error, and thus garbage collection should be skipped
    /// completely to not unintentionally delete blobs that should be protected.
    #[debug("ProtectCallback")]
    pub add_protected: Option<ProtectCb>,
}
151
/// Returned from [`ProtectCb`].
///
/// See [`GcConfig::add_protected`] for details.
#[derive(Debug)]
pub enum ProtectOutcome {
    /// Continue with the garbage collection run.
    Continue,
    /// Abort the garbage collection run.
    Abort,
}
162
/// The type of the garbage collection callback.
///
/// The callback receives the set of protected hashes for the upcoming run and
/// returns a boxed future that resolves to a [`ProtectOutcome`].
///
/// See [`GcConfig::add_protected`] for details.
pub type ProtectCb = Arc<
    dyn for<'a> Fn(
            &'a mut HashSet<Hash>,
        )
            -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
        + Send
        + Sync
        + 'static,
>;
175
176pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
177    debug!(externally_protected = live.len(), "gc: start");
178    {
179        store.clear_protected().await?;
180        let mut stream = gc_mark(store, live);
181        while let Some(ev) = stream.next().await {
182            match ev {
183                GcMarkEvent::CustomDebug(msg) => {
184                    debug!("{}", msg);
185                }
186                GcMarkEvent::CustomWarning(msg, err) => {
187                    warn!("{}: {:?}", msg, err);
188                }
189                GcMarkEvent::Error(err) => {
190                    error!("error during gc mark: {:?}", err);
191                    return Err(err);
192                }
193            }
194        }
195    }
196    debug!(total_protected = live.len(), "gc: sweep");
197    {
198        let mut stream = gc_sweep(store, live);
199        while let Some(ev) = stream.next().await {
200            match ev {
201                GcSweepEvent::CustomDebug(msg) => {
202                    debug!("{}", msg);
203                }
204                GcSweepEvent::CustomWarning(msg, err) => {
205                    warn!("{}: {:?}", msg, err);
206                }
207                GcSweepEvent::Error(err) => {
208                    error!("error during gc sweep: {:?}", err);
209                    return Err(err);
210                }
211            }
212        }
213    }
214    debug!("gc: done");
215
216    Ok(())
217}
218
219pub async fn run_gc(store: Store, config: GcConfig) {
220    debug!("gc enabled with interval {:?}", config.interval);
221    let mut live = HashSet::new();
222    loop {
223        live.clear();
224        tokio::time::sleep(config.interval).await;
225        if let Some(ref cb) = config.add_protected {
226            match (cb)(&mut live).await {
227                ProtectOutcome::Continue => {}
228                ProtectOutcome::Abort => {
229                    info!("abort gc run: protect callback indicated abort");
230                    continue;
231                }
232            }
233        }
234        if let Err(e) = gc_run_once(&store, &mut live).await {
235            error!("error during gc run: {e}");
236            break;
237        }
238    }
239}
240
241#[cfg(test)]
242mod tests {
243    use std::{
244        io::{self},
245        path::Path,
246    };
247
248    use bao_tree::{io::EncodeError, ChunkNum};
249    use range_collections::RangeSet2;
250    use testresult::TestResult;
251
252    use super::*;
253    use crate::{
254        api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store},
255        hashseq::HashSeq,
256        store::fs::{options::PathOptions, tests::create_n0_bao},
257        BlobFormat,
258    };
259
    /// Smoke test for a single gc run, shared by the fs and mem store tests.
    ///
    /// Adds seven small blobs `a`..`g` and two hash sequences (`[d, e]` and
    /// `[f, g]`), protects some of them via tags or temp tags, runs gc once and
    /// checks both the `live` set and the store content.
    async fn gc_smoke(store: &Store) -> TestResult<()> {
        let blobs = store.blobs();
        let at = blobs.add_slice("a").temp_tag().await?;
        let bt = blobs.add_slice("b").temp_tag().await?;
        let ct = blobs.add_slice("c").temp_tag().await?;
        let dt = blobs.add_slice("d").temp_tag().await?;
        let et = blobs.add_slice("e").temp_tag().await?;
        let ft = blobs.add_slice("f").temp_tag().await?;
        let gt = blobs.add_slice("g").temp_tag().await?;
        let a = *at.hash();
        let b = *bt.hash();
        let c = *ct.hash();
        let d = *dt.hash();
        let e = *et.hash();
        let f = *ft.hash();
        let g = *gt.hash();
        store.tags().set("c", *ct.hash_and_format()).await?;
        // hash sequence [d, e]; `hehs` is kept alive until after the gc run
        let dehs = [d, e].into_iter().collect::<HashSeq>();
        let hehs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: dehs.into(),
                format: BlobFormat::HashSeq,
            })
            .await?;
        // hash sequence [f, g]; protected by the persistent tag "fg" only,
        // since its temp tag is dropped right after the tag is set
        let fghs = [f, g].into_iter().collect::<HashSeq>();
        let fghs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: fghs.into(),
                format: BlobFormat::HashSeq,
            })
            .temp_tag()
            .await?;
        store.tags().set("fg", *fghs.hash_and_format()).await?;
        drop(fghs);
        drop(bt);
        // NOTE(review): `dt`, `et`, `ft` and `gt` are still alive during the gc
        // run below, so d, e, f and g are also direct temp roots. The hash
        // sequence assertions would presumably hold even without hash sequence
        // traversal - consider dropping these temp tags before the run.
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;
        // a is protected because we keep the temp tag
        assert!(live.contains(&a));
        assert!(store.has(a).await?);
        // b is not protected because we drop the temp tag
        assert!(!live.contains(&b));
        assert!(!store.has(b).await?);
        // c is protected because we set an explicit tag
        assert!(live.contains(&c));
        assert!(store.has(c).await?);
        // d and e are protected because they are part of a hashseq protected by a temp tag
        // NOTE(review): `hehs` is created without `.temp_tag()`; confirm what
        // kind of protection the plain `.await` result provides.
        assert!(live.contains(&d));
        assert!(store.has(d).await?);
        assert!(live.contains(&e));
        assert!(store.has(e).await?);
        // f and g are protected because they are part of a hashseq protected by a tag
        assert!(live.contains(&f));
        assert!(store.has(f).await?);
        assert!(live.contains(&g));
        assert!(store.has(g).await?);
        // keep these alive until all assertions are done
        drop(at);
        drop(hehs);
        Ok(())
    }
320
    /// Check that gc removes the files that belong to unprotected blobs.
    ///
    /// `path` is the test directory; the store files are looked up below
    /// `path/db` via [`PathOptions`]. Covers one complete blob and one partial
    /// blob, both 8000000 bytes long.
    async fn gc_file_delete(path: &Path, store: &Store) -> TestResult<()> {
        let mut live = HashSet::new();
        let options = PathOptions::new(&path.join("db"));
        // create a large complete file and check that the data and outboard files are deleted by gc
        {
            let a = store
                .blobs()
                .add_slice(vec![0u8; 8000000])
                .temp_tag()
                .await?;
            let ah = a.hash();
            let data_path = options.data_path(ah);
            let outboard_path = options.outboard_path(ah);
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(store.has(*ah).await?);
            // dropping the temp tag removes the only protection of the blob
            drop(a);
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
        }
        // start the second run without the hashes marked by the first one
        live.clear();
        // create a large partial file and check that the data and outboard file as well as
        // the sizes and bitfield files are deleted by gc
        {
            let data = vec![1u8; 8000000];
            // only the first 19 chunks are imported, so the blob stays partial
            let ranges = ChunkRanges::from(..ChunkNum(19));
            let (bh, b_bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(bh, ranges, b_bao).await?;
            let data_path = options.data_path(&bh);
            let outboard_path = options.outboard_path(&bh);
            let sizes_path = options.sizes_path(&bh);
            let bitfield_path = options.bitfield_path(&bh);
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(sizes_path.exists());
            assert!(bitfield_path.exists());
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
            assert!(!sizes_path.exists());
            assert!(!bitfield_path.exists());
        }
        Ok(())
    }
366
367    #[tokio::test]
368    async fn gc_smoke_fs() -> TestResult {
369        tracing_subscriber::fmt::try_init().ok();
370        let testdir = tempfile::tempdir()?;
371        let db_path = testdir.path().join("db");
372        let store = crate::store::fs::FsStore::load(&db_path).await?;
373        gc_smoke(&store).await?;
374        gc_file_delete(testdir.path(), &store).await?;
375        Ok(())
376    }
377
378    #[tokio::test]
379    async fn gc_smoke_mem() -> TestResult {
380        tracing_subscriber::fmt::try_init().ok();
381        let store = crate::store::mem::MemStore::new();
382        gc_smoke(&store).await?;
383        Ok(())
384    }
385
386    #[tokio::test]
387    async fn gc_check_deletion_fs() -> TestResult {
388        tracing_subscriber::fmt::try_init().ok();
389        let testdir = tempfile::tempdir()?;
390        let db_path = testdir.path().join("db");
391        let store = crate::store::fs::FsStore::load(&db_path).await?;
392        gc_check_deletion(&store).await
393    }
394
395    #[tokio::test]
396    async fn gc_check_deletion_mem() -> TestResult {
397        tracing_subscriber::fmt::try_init().ok();
398        let store = crate::store::mem::MemStore::default();
399        gc_check_deletion(&store).await
400    }
401
    /// Check that all read APIs report `NotFound` for a blob removed by gc.
    ///
    /// Adds a blob, drops its temp tag, runs gc once and then expects
    /// `get_bytes`, `export_ranges`, `export_bao` and `export` to fail with an
    /// io error of kind [`io::ErrorKind::NotFound`].
    async fn gc_check_deletion(store: &Store) -> TestResult {
        let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?;
        let hash = *temp_tag.hash();
        assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo");
        // without the temp tag nothing protects the blob from the gc run below
        drop(temp_tag);
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;

        // check that `get_bytes` returns an error.
        let res = store.get_bytes(hash).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(ExportBaoError::ExportBaoInner {
                source: EncodeError::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // check that `export_ranges` returns an error.
        let res = store
            .export_ranges(hash, RangeSet2::all())
            .concatenate()
            .await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // check that `export_bao` returns an error.
        let res = store
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await;
        assert!(res.is_err());
        // printed to make a mismatch in the assertion below easier to diagnose
        println!("export_bao res {res:?}");
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // check that `export` returns an error.
        let target = tempfile::NamedTempFile::new()?;
        let path = target.path();
        let res = store.export(hash, path).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));
        Ok(())
    }
464}