1use std::{collections::HashSet, pin::Pin, sync::Arc};
2
3use bao_tree::ChunkRanges;
4use genawaiter::sync::{Co, Gen};
5use n0_future::{Stream, StreamExt};
6use tracing::{debug, error, info, warn};
7
8use crate::{api::Store, Hash, HashAndFormat};
9
/// Event emitted by the GC mark phase (see `gc_mark`).
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A debug-level progress message.
    CustomDebug(String),
    /// A non-fatal warning, optionally carrying the error that caused it.
    /// The mark phase keeps going after emitting it.
    CustomWarning(String, Option<crate::api::Error>),
    /// A fatal error; `gc_mark` emits this as its last item when the mark task fails.
    Error(crate::api::Error),
}
20
/// Event emitted by the GC sweep phase (see `gc_sweep`).
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A debug-level progress message.
    CustomDebug(String),
    /// A non-fatal warning, optionally carrying the error that caused it.
    // Not constructed anywhere in this module, hence the `allow(dead_code)`;
    // presumably kept for symmetry with `GcMarkEvent` — confirm before removing.
    #[allow(dead_code)]
    CustomWarning(String, Option<crate::api::Error>),
    /// A fatal error; `gc_sweep` emits this as its last item when the sweep task fails.
    Error(crate::api::Error),
}
32
33pub(super) async fn gc_mark_task(
35 store: &Store,
36 live: &mut HashSet<Hash>,
37 co: &Co<GcMarkEvent>,
38) -> crate::api::Result<()> {
39 macro_rules! trace {
40 ($($arg:tt)*) => {
41 co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
42 };
43 }
44 macro_rules! warn {
45 ($($arg:tt)*) => {
46 co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
47 };
48 }
49 let mut roots = HashSet::new();
50 trace!("traversing tags");
51 let mut tags = store.tags().list().await?;
52 while let Some(tag) = tags.next().await {
53 let info = tag?;
54 trace!("adding root {:?} {:?}", info.name, info.hash_and_format());
55 roots.insert(info.hash_and_format());
56 }
57 trace!("traversing temp roots");
58 let mut tts = store.tags().list_temp_tags().await?;
59 while let Some(tt) = tts.next().await {
60 trace!("adding temp root {:?}", tt);
61 roots.insert(tt);
62 }
63 for HashAndFormat { hash, format } in roots {
64 if live.insert(hash) && !format.is_raw() {
66 let mut stream = store.export_bao(hash, ChunkRanges::all()).hashes();
67 while let Some(hash) = stream.next().await {
68 match hash {
69 Ok(hash) => {
70 live.insert(hash);
71 }
72 Err(e) => {
73 warn!("error while traversing hashseq: {e:?}");
74 }
75 }
76 }
77 }
78 }
79 trace!("gc mark done. found {} live blobs", live.len());
80 Ok(())
81}
82
83async fn gc_sweep_task(
84 store: &Store,
85 live: &HashSet<Hash>,
86 co: &Co<GcSweepEvent>,
87) -> crate::api::Result<()> {
88 let mut blobs = store.blobs().list().stream().await?;
89 let mut count = 0;
90 let mut batch = Vec::new();
91 while let Some(hash) = blobs.next().await {
92 let hash = hash?;
93 if !live.contains(&hash) {
94 batch.push(hash);
95 count += 1;
96 }
97 if batch.len() >= 100 {
98 store.blobs().delete(batch.clone()).await?;
99 batch.clear();
100 }
101 }
102 if !batch.is_empty() {
103 store.blobs().delete(batch).await?;
104 }
105 store.sync_db().await?;
106 co.yield_(GcSweepEvent::CustomDebug(format!("deleted {count} blobs")))
107 .await;
108 Ok(())
109}
110
111fn gc_mark<'a>(
112 store: &'a Store,
113 live: &'a mut HashSet<Hash>,
114) -> impl Stream<Item = GcMarkEvent> + 'a {
115 Gen::new(|co| async move {
116 if let Err(e) = gc_mark_task(store, live, &co).await {
117 co.yield_(GcMarkEvent::Error(e)).await;
118 }
119 })
120}
121
122fn gc_sweep<'a>(
123 store: &'a Store,
124 live: &'a HashSet<Hash>,
125) -> impl Stream<Item = GcSweepEvent> + 'a {
126 Gen::new(|co| async move {
127 if let Err(e) = gc_sweep_task(store, live, &co).await {
128 co.yield_(GcSweepEvent::Error(e)).await;
129 }
130 })
131}
132
/// Configuration for the background garbage collector driven by `run_gc`.
#[derive(derive_more::Debug, Clone)]
pub struct GcConfig {
    /// How long `run_gc` sleeps before each GC pass.
    pub interval: std::time::Duration,
    /// Optional callback invoked before each GC pass with the (empty) live set.
    /// It can add externally protected hashes, or return
    /// `ProtectOutcome::Abort` to skip that pass.
    ///
    /// Rendered as the literal `ProtectCallback` in `Debug` output, because the
    /// boxed closure itself does not implement `Debug`.
    #[debug("ProtectCallback")]
    pub add_protected: Option<ProtectCb>,
}
151
/// Result of the protect callback ([`ProtectCb`]) that runs before each GC pass.
///
/// Derives `Clone`, `Copy`, `PartialEq` and `Eq` so callers can store, copy and
/// compare outcomes (e.g. `outcome == ProtectOutcome::Abort`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtectOutcome {
    /// Proceed with the GC pass.
    Continue,
    /// Skip this GC pass; the collector tries again after the next interval.
    Abort,
}
162
/// Callback that adds externally protected hashes to the GC live set.
///
/// It receives a mutable reference to the live set and returns a boxed future
/// resolving to a [`ProtectOutcome`]. The future may borrow the set for as long
/// as it runs.
// NOTE(review): the `Sync` bound on the returned future looks stricter than
// necessary (`run_gc` only awaits it). Relaxing it would break callers that
// spell out this exact closure return type, so it is left unchanged.
pub type ProtectCb = Arc<
    dyn for<'a> Fn(
            &'a mut HashSet<Hash>,
        )
            -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
        + Send
        + Sync
        + 'static,
>;
175
176pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
177 debug!(externally_protected = live.len(), "gc: start");
178 {
179 store.clear_protected().await?;
180 let mut stream = gc_mark(store, live);
181 while let Some(ev) = stream.next().await {
182 match ev {
183 GcMarkEvent::CustomDebug(msg) => {
184 debug!("{}", msg);
185 }
186 GcMarkEvent::CustomWarning(msg, err) => {
187 warn!("{}: {:?}", msg, err);
188 }
189 GcMarkEvent::Error(err) => {
190 error!("error during gc mark: {:?}", err);
191 return Err(err);
192 }
193 }
194 }
195 }
196 debug!(total_protected = live.len(), "gc: sweep");
197 {
198 let mut stream = gc_sweep(store, live);
199 while let Some(ev) = stream.next().await {
200 match ev {
201 GcSweepEvent::CustomDebug(msg) => {
202 debug!("{}", msg);
203 }
204 GcSweepEvent::CustomWarning(msg, err) => {
205 warn!("{}: {:?}", msg, err);
206 }
207 GcSweepEvent::Error(err) => {
208 error!("error during gc sweep: {:?}", err);
209 return Err(err);
210 }
211 }
212 }
213 }
214 debug!("gc: done");
215
216 Ok(())
217}
218
219pub async fn run_gc(store: Store, config: GcConfig) {
220 debug!("gc enabled with interval {:?}", config.interval);
221 let mut live = HashSet::new();
222 loop {
223 live.clear();
224 tokio::time::sleep(config.interval).await;
225 if let Some(ref cb) = config.add_protected {
226 match (cb)(&mut live).await {
227 ProtectOutcome::Continue => {}
228 ProtectOutcome::Abort => {
229 info!("abort gc run: protect callback indicated abort");
230 continue;
231 }
232 }
233 }
234 if let Err(e) = gc_run_once(&store, &mut live).await {
235 error!("error during gc run: {e}");
236 break;
237 }
238 }
239}
240
#[cfg(test)]
mod tests {
    use std::{
        io::{self},
        path::Path,
    };

    use bao_tree::{io::EncodeError, ChunkNum};
    use range_collections::RangeSet2;
    use testresult::TestResult;

    use super::*;
    use crate::{
        api::{blobs::AddBytesOptions, ExportBaoError, RequestError, Store},
        hashseq::HashSeq,
        store::fs::{options::PathOptions, tests::create_n0_bao},
        BlobFormat,
    };

    /// Shared GC smoke test, run against both the fs and the mem store.
    ///
    /// Adds blobs `a`..`g` and runs one GC pass. Only `b` loses all protection
    /// before the pass: its temp tag is dropped and it has no named tag, so it
    /// must be deleted. All other blobs must survive.
    // NOTE(review): the temp tags `ct`..`gt` are only dropped at the end of the
    // function, so `c`..`g` are also protected by live temp tags during GC. This
    // test therefore does not isolate named-tag or hash-sequence protection.
    async fn gc_smoke(store: &Store) -> TestResult<()> {
        let blobs = store.blobs();
        let at = blobs.add_slice("a").temp_tag().await?;
        let bt = blobs.add_slice("b").temp_tag().await?;
        let ct = blobs.add_slice("c").temp_tag().await?;
        let dt = blobs.add_slice("d").temp_tag().await?;
        let et = blobs.add_slice("e").temp_tag().await?;
        let ft = blobs.add_slice("f").temp_tag().await?;
        let gt = blobs.add_slice("g").temp_tag().await?;
        let a = *at.hash();
        let b = *bt.hash();
        let c = *ct.hash();
        let d = *dt.hash();
        let e = *et.hash();
        let f = *ft.hash();
        let g = *gt.hash();
        // `c` gets a named tag.
        store.tags().set("c", *ct.hash_and_format()).await?;
        // Hash sequence [d, e], added without taking a temp tag.
        let dehs = [d, e].into_iter().collect::<HashSeq>();
        let hehs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: dehs.into(),
                format: BlobFormat::HashSeq,
            })
            .await?;
        // Hash sequence [f, g], protected by the named tag "fg"; its temp tag
        // is dropped before GC.
        let fghs = [f, g].into_iter().collect::<HashSeq>();
        let fghs = blobs
            .add_bytes_with_opts(AddBytesOptions {
                data: fghs.into(),
                format: BlobFormat::HashSeq,
            })
            .temp_tag()
            .await?;
        store.tags().set("fg", *fghs.hash_and_format()).await?;
        drop(fghs);
        // `b` loses its only protection.
        drop(bt);
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;
        // `a`: protected by the temp tag `at`, which is still alive here.
        assert!(live.contains(&a));
        assert!(store.has(a).await?);
        // `b`: no temp tag and no named tag left, so it must be gone.
        assert!(!live.contains(&b));
        assert!(!store.has(b).await?);
        // `c`: named tag "c" (and temp tag `ct`).
        assert!(live.contains(&c));
        assert!(store.has(c).await?);
        // `d`, `e`: children of the [d, e] hash sequence (temp tags also alive).
        assert!(live.contains(&d));
        assert!(store.has(d).await?);
        assert!(live.contains(&e));
        assert!(store.has(e).await?);
        // `f`, `g`: children of the hash sequence tagged "fg" (temp tags also alive).
        assert!(live.contains(&f));
        assert!(store.has(f).await?);
        assert!(live.contains(&g));
        assert!(store.has(g).await?);
        drop(at);
        drop(hehs);
        Ok(())
    }

    /// Checks that GC removes the on-disk files of unprotected blobs in an fs
    /// store rooted at `path.join("db")`.
    async fn gc_file_delete(path: &Path, store: &Store) -> TestResult<()> {
        let mut live = HashSet::new();
        let options = PathOptions::new(&path.join("db"));
        {
            // Complete 8 MB blob. The asserts below expect it to have separate
            // data and outboard files (presumably because it exceeds the inline
            // size threshold).
            let a = store
                .blobs()
                .add_slice(vec![0u8; 8000000])
                .temp_tag()
                .await?;
            let ah = a.hash();
            let data_path = options.data_path(ah);
            let outboard_path = options.outboard_path(ah);
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(store.has(*ah).await?);
            // Drop the only protection, then GC must remove both files.
            drop(a);
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
        }
        live.clear();
        {
            // Partial blob: only chunks ..19 of an 8 MB blob are imported. It is
            // never tagged, and GC must remove its data, outboard, sizes and
            // bitfield files.
            let data = vec![1u8; 8000000];
            let ranges = ChunkRanges::from(..ChunkNum(19));
            let (bh, b_bao) = create_n0_bao(&data, &ranges)?;
            store.import_bao_bytes(bh, ranges, b_bao).await?;
            let data_path = options.data_path(&bh);
            let outboard_path = options.outboard_path(&bh);
            let sizes_path = options.sizes_path(&bh);
            let bitfield_path = options.bitfield_path(&bh);
            assert!(data_path.exists());
            assert!(outboard_path.exists());
            assert!(sizes_path.exists());
            assert!(bitfield_path.exists());
            gc_run_once(store, &mut live).await?;
            assert!(!data_path.exists());
            assert!(!outboard_path.exists());
            assert!(!sizes_path.exists());
            assert!(!bitfield_path.exists());
        }
        Ok(())
    }

    #[tokio::test]
    async fn gc_smoke_fs() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_path = testdir.path().join("db");
        let store = crate::store::fs::FsStore::load(&db_path).await?;
        gc_smoke(&store).await?;
        gc_file_delete(testdir.path(), &store).await?;
        Ok(())
    }

    #[tokio::test]
    async fn gc_smoke_mem() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let store = crate::store::mem::MemStore::new();
        gc_smoke(&store).await?;
        Ok(())
    }

    #[tokio::test]
    async fn gc_check_deletion_fs() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let testdir = tempfile::tempdir()?;
        let db_path = testdir.path().join("db");
        let store = crate::store::fs::FsStore::load(&db_path).await?;
        gc_check_deletion(&store).await
    }

    #[tokio::test]
    async fn gc_check_deletion_mem() -> TestResult {
        tracing_subscriber::fmt::try_init().ok();
        let store = crate::store::mem::MemStore::default();
        gc_check_deletion(&store).await
    }

    /// After a blob is garbage collected, every read/export API must fail with
    /// an I/O `NotFound` error wrapped in the API-specific error type.
    async fn gc_check_deletion(store: &Store) -> TestResult {
        let temp_tag = store.add_bytes(b"foo".to_vec()).temp_tag().await?;
        let hash = *temp_tag.hash();
        assert_eq!(store.get_bytes(hash).await?.as_ref(), b"foo");
        drop(temp_tag);
        let mut live = HashSet::new();
        gc_run_once(store, &mut live).await?;

        // `get_bytes` must fail with NotFound.
        let res = store.get_bytes(hash).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(ExportBaoError::ExportBaoInner {
                source: EncodeError::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // `export_ranges` must fail with NotFound.
        let res = store
            .export_ranges(hash, RangeSet2::all())
            .concatenate()
            .await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // `export_bao` must fail with NotFound.
        let res = store
            .export_bao(hash, ChunkRanges::all())
            .bao_to_vec()
            .await;
        assert!(res.is_err());
        println!("export_bao res {res:?}");
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));

        // `export` to a file must fail with NotFound.
        let target = tempfile::NamedTempFile::new()?;
        let path = target.path();
        let res = store.export(hash, path).await;
        assert!(res.is_err());
        assert!(matches!(
            res,
            Err(RequestError::Inner{
                source: crate::api::Error::Io(cause),
                ..
            }) if cause.kind() == io::ErrorKind::NotFound
        ));
        Ok(())
    }
}