1use std::{collections::HashSet, pin::Pin, sync::Arc};
2
3use bao_tree::ChunkRanges;
4use genawaiter::sync::{Co, Gen};
5use n0_future::{Stream, StreamExt};
6use tracing::{debug, error, info, warn};
7
8use crate::{api::Store, Hash, HashAndFormat};
9
/// An event emitted by the mark phase of garbage collection.
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A debug-level progress message from the mark phase.
    CustomDebug(String),
    /// A non-fatal problem; the optional error carries additional detail.
    CustomWarning(String, Option<crate::api::Error>),
    /// The error that terminated the mark phase.
    Error(crate::api::Error),
}
20
/// An event emitted by the sweep phase of garbage collection.
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A debug-level progress message from the sweep phase.
    CustomDebug(String),
    /// A non-fatal problem; the optional error carries additional detail.
    // Nothing in this file constructs this variant yet, hence the `dead_code`
    // allow. It mirrors `GcMarkEvent::CustomWarning`, and `gc_run_once`
    // already handles it.
    #[allow(dead_code)]
    CustomWarning(String, Option<crate::api::Error>),
    /// The error that terminated the sweep phase.
    Error(crate::api::Error),
}
32
33pub(super) async fn gc_mark_task(
35 store: &Store,
36 live: &mut HashSet<Hash>,
37 co: &Co<GcMarkEvent>,
38) -> crate::api::Result<()> {
39 macro_rules! trace {
40 ($($arg:tt)*) => {
41 co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
42 };
43 }
44 macro_rules! warn {
45 ($($arg:tt)*) => {
46 co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
47 };
48 }
49 let mut roots = HashSet::new();
50 trace!("traversing tags");
51 let mut tags = store.tags().list().await?;
52 while let Some(tag) = tags.next().await {
53 let info = tag?;
54 trace!("adding root {:?} {:?}", info.name, info.hash_and_format());
55 roots.insert(info.hash_and_format());
56 }
57 trace!("traversing temp roots");
58 let mut tts = store.tags().list_temp_tags().await?;
59 while let Some(tt) = tts.next().await {
60 trace!("adding temp root {:?}", tt);
61 roots.insert(tt);
62 }
63 for HashAndFormat { hash, format } in roots {
64 if live.insert(hash) && !format.is_raw() {
66 let mut stream = store.export_bao(hash, ChunkRanges::all()).hashes();
67 while let Some(hash) = stream.next().await {
68 match hash {
69 Ok(hash) => {
70 live.insert(hash);
71 }
72 Err(e) => {
73 warn!("error while traversing hashseq: {e:?}");
74 }
75 }
76 }
77 }
78 }
79 trace!("gc mark done. found {} live blobs", live.len());
80 Ok(())
81}
82
83async fn gc_sweep_task(
84 store: &Store,
85 live: &HashSet<Hash>,
86 co: &Co<GcSweepEvent>,
87) -> crate::api::Result<()> {
88 let mut blobs = store.blobs().list().stream().await?;
89 let mut count = 0;
90 let mut batch = Vec::new();
91 while let Some(hash) = blobs.next().await {
92 let hash = hash?;
93 if !live.contains(&hash) {
94 batch.push(hash);
95 count += 1;
96 }
97 if batch.len() >= 100 {
98 store.blobs().delete(batch.clone()).await?;
99 batch.clear();
100 }
101 }
102 if !batch.is_empty() {
103 store.blobs().delete(batch).await?;
104 }
105 store.sync_db().await?;
106 co.yield_(GcSweepEvent::CustomDebug(format!("deleted {count} blobs")))
107 .await;
108 Ok(())
109}
110
111fn gc_mark<'a>(
112 store: &'a Store,
113 live: &'a mut HashSet<Hash>,
114) -> impl Stream<Item = GcMarkEvent> + 'a {
115 Gen::new(|co| async move {
116 if let Err(e) = gc_mark_task(store, live, &co).await {
117 co.yield_(GcMarkEvent::Error(e)).await;
118 }
119 })
120}
121
122fn gc_sweep<'a>(
123 store: &'a Store,
124 live: &'a HashSet<Hash>,
125) -> impl Stream<Item = GcSweepEvent> + 'a {
126 Gen::new(|co| async move {
127 if let Err(e) = gc_sweep_task(store, live, &co).await {
128 co.yield_(GcSweepEvent::Error(e)).await;
129 }
130 })
131}
132
/// Configuration for the periodic garbage collector driven by [`run_gc`].
#[derive(derive_more::Debug, Clone)]
pub struct GcConfig {
    /// How long to sleep before each GC run (including the first one).
    pub interval: std::time::Duration,
    /// Optional callback invoked before each run. It can add externally
    /// protected hashes to the live set, or return
    /// [`ProtectOutcome::Abort`] to skip that run.
    ///
    /// The callback is not `Debug`, so it is rendered as a fixed placeholder.
    #[debug("ProtectCallback")]
    pub add_protected: Option<ProtectCb>,
}
151
/// Value returned by a [`ProtectCb`] that tells [`run_gc`] whether to proceed.
#[derive(Debug)]
pub enum ProtectOutcome {
    /// Proceed with this GC run.
    Continue,
    /// Skip this GC run. The collector waits for the next interval and tries again.
    Abort,
}
162
/// Callback that protects additional hashes from garbage collection.
///
/// [`run_gc`] calls it before each run with the live set, which has just been
/// cleared. Any hash the callback inserts is treated as live for that run.
/// The returned future may borrow the set for `'a` and resolves to a
/// [`ProtectOutcome`] deciding whether the run proceeds.
// NOTE(review): requiring the returned future to be `Sync` (not just `Send`)
// is stricter than usual; relaxing it would change this public type.
pub type ProtectCb = Arc<
    dyn for<'a> Fn(
            &'a mut HashSet<Hash>,
        ) -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
        + Send
        + Sync
        + 'static,
>;
175
176pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
177 debug!(externally_protected = live.len(), "gc: start");
178 {
179 store.clear_protected().await?;
180 let mut stream = gc_mark(store, live);
181 while let Some(ev) = stream.next().await {
182 match ev {
183 GcMarkEvent::CustomDebug(msg) => {
184 debug!("{}", msg);
185 }
186 GcMarkEvent::CustomWarning(msg, err) => {
187 warn!("{}: {:?}", msg, err);
188 }
189 GcMarkEvent::Error(err) => {
190 error!("error during gc mark: {:?}", err);
191 return Err(err);
192 }
193 }
194 }
195 }
196 debug!(total_protected = live.len(), "gc: sweep");
197 {
198 let mut stream = gc_sweep(store, live);
199 while let Some(ev) = stream.next().await {
200 match ev {
201 GcSweepEvent::CustomDebug(msg) => {
202 debug!("{}", msg);
203 }
204 GcSweepEvent::CustomWarning(msg, err) => {
205 warn!("{}: {:?}", msg, err);
206 }
207 GcSweepEvent::Error(err) => {
208 error!("error during gc sweep: {:?}", err);
209 return Err(err);
210 }
211 }
212 }
213 }
214 debug!("gc: done");
215
216 Ok(())
217}
218
219pub async fn run_gc(store: Store, config: GcConfig) {
220 debug!("gc enabled with interval {:?}", config.interval);
221 let mut live = HashSet::new();
222 loop {
223 live.clear();
224 tokio::time::sleep(config.interval).await;
225 if let Some(ref cb) = config.add_protected {
226 match (cb)(&mut live).await {
227 ProtectOutcome::Continue => {}
228 ProtectOutcome::Abort => {
229 info!("abort gc run: protect callback indicated abort");
230 continue;
231 }
232 }
233 }
234 if let Err(e) = gc_run_once(&store, &mut live).await {
235 error!("error during gc run: {e}");
236 break;
237 }
238 }
239}
240
#[cfg(test)]
mod tests {
    use std::io::{self};

    use bao_tree::io::EncodeError;
    use range_collections::RangeSet2;
    use testresult::TestResult;

    use super::*;
    use crate::{
        api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store},
        hashseq::HashSeq,
        BlobFormat,
    };

    /// Builds blobs protected in different ways, runs one GC pass, and checks
    /// which blobs survive. The protection cases are: temp tag, named tag,
    /// hash-sequence membership, and none.
    async fn gc_smoke(store: &Store) -> TestResult<()> {
        let blobs = store.blobs();
        let at = blobs.add_slice("a").temp_tag().await?;
        let bt = blobs.add_slice("b").temp_tag().await?;
        let ct = blobs.add_slice("c").temp_tag().await?;
        let dt = blobs.add_slice("d").temp_tag().await?;
        let et = blobs.add_slice("e").temp_tag().await?;
        let ft = blobs.add_slice("f").temp_tag().await?;
        let gt = blobs.add_slice("g").temp_tag().await?;
        let ht = blobs.add_slice("h").with_named_tag("h").await?;
        let a = at.hash();
        let b = bt.hash();
        let c = ct.hash();
        let d = dt.hash();
        let e = et.hash();
        let f = ft.hash();
        let g = gt.hash();
        let h = ht.hash;
        // `c` additionally gets the named tag "c".
        store.tags().set("c", ct.hash_and_format()).await?;
        // Hash sequence [d, e]. `hehs` is held until the end of the test;
        // presumably it is a temp tag protecting the sequence (confirm).
        let dehs = [d, e].into_iter().collect::<HashSeq>();
        let hehs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: dehs.into(),
                format: BlobFormat::HashSeq,
            })
            .await?;
        // Hash sequence [f, g], protected by the named tag "fg" once its
        // temp tag is dropped below.
        let fghs = [f, g].into_iter().collect::<HashSeq>();
        let fghs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: fghs.into(),
                format: BlobFormat::HashSeq,
            })
            .temp_tag()
            .await?;
        store.tags().set("fg", fghs.hash_and_format()).await?;
        drop(fghs);
        // `b` loses its only temp tag; `h` loses its only named tag.
        drop(bt);
        store.tags().delete("h").await?;
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;
        // NOTE(review): `ct`, `dt`, `et`, `ft` and `gt` are still alive here,
        // so c..g are also protected by their own temp tags. This test does
        // not on its own prove that hash-sequence children are marked.
        // `a` is protected by the temp tag `at`.
        assert!(live.contains(&a));
        assert!(store.has(a).await?);
        // `b`'s temp tag was dropped, so it must be collected.
        assert!(!live.contains(&b));
        assert!(!store.has(b).await?);
        // `c` has the named tag "c".
        assert!(live.contains(&c));
        assert!(store.has(c).await?);
        // `d` and `e` are entries of the [d, e] hash sequence.
        assert!(live.contains(&d));
        assert!(store.has(d).await?);
        assert!(live.contains(&e));
        assert!(store.has(e).await?);
        // `f` and `g` are entries of the hash sequence tagged "fg".
        assert!(live.contains(&f));
        assert!(store.has(f).await?);
        assert!(live.contains(&g));
        assert!(store.has(g).await?);
        // `h`'s only named tag was deleted, so it must be collected.
        assert!(!live.contains(&h));
        assert!(!store.has(h).await?);
        drop(at);
        drop(hehs);
        Ok(())
    }

    /// Checks that GC removes the on-disk files of unprotected blobs in the
    /// file-system store. `path` is the test directory containing `db`.
    #[cfg(feature = "fs-store")]
    async fn gc_file_delete(path: &std::path::Path, store: &Store) -> TestResult<()> {
        use bao_tree::ChunkNum;

        use crate::store::{fs::options::PathOptions, util::tests::create_n0_bao};
        let mut live = HashSet::new();
        let options = PathOptions::new(&path.join("db"));
        // Complete 8 MB blob: its data and outboard files must exist before GC
        // and be gone after its temp tag is dropped and GC runs.
        {
            let a = store
                .blobs()
                .add_slice(vec![0u8; 8000000])
                .temp_tag()
                .await?;
            let ah = a.hash();
            let data_path = options.data_path(&ah);
            let outboard_path = options.outboard_path(&ah);
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(store.has(ah).await?);
            drop(a);
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
        }
        live.clear();
        // Partial blob: only the first 19 chunks of an 8 MB blob are imported
        // via bao. Its data, outboard, sizes and bitfield files must all be
        // removed by GC (nothing protects it).
        {
            let data = vec![1u8; 8000000];
            let ranges = ChunkRanges::from(..ChunkNum(19));
            let (bh, b_bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(bh, ranges, b_bao).await?;
            let data_path = options.data_path(&bh);
            let outboard_path = options.outboard_path(&bh);
            let sizes_path = options.sizes_path(&bh);
            let bitfield_path = options.bitfield_path(&bh);
            store.wait_idle().await?;
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(sizes_path.exists());
            assert!(bitfield_path.exists());
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
            assert!(!sizes_path.exists());
            assert!(!bitfield_path.exists());
        }
        Ok(())
    }

    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn gc_smoke_fs() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_path = testdir.path().join("db");
        let store = crate::store::fs::FsStore::load(&db_path).await?;
        gc_smoke(&store).await?;
        gc_file_delete(testdir.path(), &store).await?;
        Ok(())
    }

    #[tokio::test]
    async fn gc_smoke_mem() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let store = crate::store::mem::MemStore::new();
        gc_smoke(&store).await?;
        Ok(())
    }

    #[tokio::test]
    #[cfg(feature = "fs-store")]
    async fn gc_check_deletion_fs() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_path = testdir.path().join("db");
        let store = crate::store::fs::FsStore::load(&db_path).await?;
        gc_check_deletion(&store).await
    }

    #[tokio::test]
    async fn gc_check_deletion_mem() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let store = crate::store::mem::MemStore::default();
        gc_check_deletion(&store).await
    }

    /// After an unprotected blob is collected, every read/export path must
    /// fail with an I/O error of kind `NotFound`.
    async fn gc_check_deletion(store: &Store) -> TestResult {
        let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?;
        let hash = temp_tag.hash();
        assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo");
        drop(temp_tag);
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;

        // `get_bytes` must fail with NotFound.
        let res = store.get_bytes(hash).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(ExportBaoError::ExportBaoInner {
                source: EncodeError::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // `export_ranges` must fail with NotFound.
        let res = store
            .export_ranges(hash, RangeSet2::all())
            .concatenate()
            .await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // `export_bao` must fail with NotFound.
        let res = store
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await;
        assert!(res.is_err());
        println!("export_bao res {res:?}");
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // `export` to a file must fail with NotFound.
        let target = tempfile::NamedTempFile::new()?;
        let path = target.path();
        let res = store.export(hash, path).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));
        Ok(())
    }
}