iroh_blobs/store/traits.rs

//! Traits for in-memory or persistent maps of blobs with bao encoded outboards.
use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Duration};

pub use bao_tree;
use bao_tree::{
    io::fsm::{BaoContentItem, Outboard},
    BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
pub use range_collections;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;

use crate::{
    hashseq::parse_hash_seq,
    protocol::RangeSpec,
    util::{
        local_pool::{self, LocalPool},
        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
        Tag,
    },
    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};

/// A fallible but owned iterator over the entries in a store.
pub type DbIter<T> = Box<dyn Iterator<Item = io::Result<T>> + Send + Sync + 'static>;

/// Export progress callback
pub type ExportProgressCb = Box<dyn Fn(u64) -> io::Result<()> + Send + Sync + 'static>;

/// The availability status of an entry in a store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is completely available.
    Complete,
    /// The entry is partially available.
    Partial,
    /// The entry is not in the store.
    NotFound,
}

/// The size of a bao file
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum BaoBlobSize {
    /// A remote side told us the size, but we have insufficient data to verify it.
    Unverified(u64),
    /// We have verified the size.
    Verified(u64),
}

impl BaoBlobSize {
    /// Create a new `BaoBlobSize` with the given size and verification status.
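    ///
    /// A minimal usage sketch (hypothetical values, for illustration only):
    ///
    /// ```ignore
    /// // a size reported by a remote peer that we have not verified yet
    /// let remote = BaoBlobSize::new(1024, false);
    /// assert_eq!(remote.value(), 1024);
    /// // a size we have verified against the bao outboard
    /// let local = BaoBlobSize::new(1024, true);
    /// assert!(matches!(local, BaoBlobSize::Verified(_)));
    /// ```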
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            BaoBlobSize::Verified(size)
        } else {
            BaoBlobSize::Unverified(size)
        }
    }

    /// Get just the value, no matter if it is verified or not.
    pub fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(size) => *size,
            BaoBlobSize::Verified(size) => *size,
        }
    }
}

/// An entry for one hash in a bao map
///
/// The entry has the ability to provide you with an (outboard, data)
/// reader pair. Creating the reader is async and may fail. The futures that
/// create the readers must be `Send`, but the readers themselves don't have to
/// be.
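///
/// A minimal sketch of reading the data of a complete entry (hypothetical
/// helper; assumes the `read_at` method of `iroh_io::AsyncSliceReader`):
///
/// ```ignore
/// async fn read_all<E: MapEntry>(entry: &E) -> io::Result<bytes::Bytes> {
///     let size = entry.size().value();
///     let mut reader = entry.data_reader().await?;
///     // read the whole blob in one go; a real consumer would read in chunks
///     reader.read_at(0, size as usize).await
/// }
/// ```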
pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
    /// The hash of the entry.
    fn hash(&self) -> Hash;
    /// The size of the entry.
    fn size(&self) -> BaoBlobSize;
    /// Returns `true` if the entry is complete.
    ///
    /// Note that this does not actually verify if the bytes on disk are complete,
    /// it only checks if the entry was marked as complete in the store.
    fn is_complete(&self) -> bool;
    /// A future that resolves to a reader that can be used to read the outboard
    fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
    /// A future that resolves to a reader that can be used to read the data
    fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
///
/// This is the readonly view. To allow updates, a concrete implementation must
/// also implement [`MapMut`].
///
/// Entries are *not* guaranteed to be complete for all implementations.
/// They are also not guaranteed to be immutable, since this could be the
/// readonly view of a mutable store.
pub trait Map: Clone + Send + Sync + 'static {
    /// The entry type. An entry is a cheaply cloneable handle that can be used
    /// to open readers for both the data and the outboard
    type Entry: MapEntry;
    /// Get an entry for a hash.
    ///
    /// This can also be used for a membership test by just checking if there
    /// is an entry. Creating an entry should be cheap, any expensive ops should
    /// be deferred to the creation of the actual readers.
    ///
    /// It is not guaranteed that the entry is complete.
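    ///
    /// A minimal membership-test sketch (hypothetical helper):
    ///
    /// ```ignore
    /// async fn contains<M: Map>(map: &M, hash: &Hash) -> io::Result<bool> {
    ///     Ok(map.get(hash).await?.is_some())
    /// }
    /// ```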
    fn get(&self, hash: &Hash) -> impl Future<Output = io::Result<Option<Self::Entry>>> + Send;
}

/// A partial entry
pub trait MapEntryMut: MapEntry {
    /// Get a batch writer
    fn batch_writer(&self) -> impl Future<Output = io::Result<impl BaoBatchWriter>> + Send;
}

/// An async batch interface for writing bao content items to a pair of data and
/// outboard.
///
/// Details like the chunk group size and the actual storage location are left
/// to the implementation.
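///
/// A minimal sketch of a helper that checks the documented size invariant
/// before forwarding a batch (hypothetical, not part of this trait):
///
/// ```ignore
/// async fn write_checked<W: BaoBatchWriter>(
///     writer: &mut W,
///     size: u64,
///     batch: Vec<BaoContentItem>,
/// ) -> io::Result<()> {
///     for item in &batch {
///         if let BaoContentItem::Leaf(leaf) = item {
///             // every leaf must end at or before the announced size
///             debug_assert!(leaf.offset + leaf.data.len() as u64 <= size);
///         }
///     }
///     writer.write_batch(size, batch).await?;
///     writer.sync().await
/// }
/// ```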
pub trait BaoBatchWriter {
    /// Write a batch of bao content items to the underlying storage.
    ///
    /// The batch is guaranteed to be sorted as data is received from the network.
    /// So leaves will be sorted by offset, and parents will be sorted by pre order
    /// traversal offset. There is no guarantee that they will be consecutive
    /// though.
    ///
    /// The size is the total size of the blob that the remote side told us.
    /// It is not guaranteed to be correct, but it is guaranteed to be
    /// consistent with all data in the batch. In particular, the size is an
    /// upper bound on the end offset of every leaf, so
    /// `leaf.offset + leaf.data.len() <= size` holds for all leaf items in the
    /// batch.
    ///
    /// Batches should not become too large. Typically, a batch is just a few
    /// parent nodes and a leaf.
    ///
    /// Batch is a vec so it can be moved into a task, which is unfortunately
    /// necessary in typical io code.
    fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<BaoContentItem>,
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the written data to permanent storage, if applicable.
    /// E.g. for a file based implementation, this would call sync_data
    /// on all files.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

/// Implement BaoBatchWriter for mutable references
impl<W: BaoBatchWriter> BaoBatchWriter for &mut W {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        (**self).write_batch(size, batch).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

/// A wrapper around a batch writer that calls a progress callback for one leaf
/// per batch.
#[derive(Debug)]
pub(crate) struct FallibleProgressBatchWriter<W, F>(W, F);

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
    FallibleProgressBatchWriter<W, F>
{
    /// Create a new `FallibleProgressBatchWriter` from an inner writer and a progress callback
    ///
    /// The `on_write` function is called for each write, with the `offset` as the first and the
    /// length of the data as the second param. `on_write` must return an `io::Result`.
    /// If `on_write` returns an error, the download is aborted.
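    ///
    /// A minimal usage sketch (`inner` stands for any [`BaoBatchWriter`];
    /// hypothetical, for illustration only):
    ///
    /// ```ignore
    /// let writer = FallibleProgressBatchWriter::new(inner, |offset, len| {
    ///     tracing::trace!("wrote {} bytes at offset {}", len, offset);
    ///     Ok(())
    /// });
    /// ```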
    pub fn new(inner: W, on_write: F) -> Self {
        Self(inner, on_write)
    }
}

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static> BaoBatchWriter
    for FallibleProgressBatchWriter<W, F>
{
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        // find the offset and length of the first (usually only) chunk
        let chunk = batch
            .iter()
            .filter_map(|item| {
                if let BaoContentItem::Leaf(leaf) = item {
                    Some((leaf.offset, leaf.data.len()))
                } else {
                    None
                }
            })
            .next();
        self.0.write_batch(size, batch).await?;
        // call the progress callback
        if let Some((offset, len)) = chunk {
            (self.1)(offset, len)?;
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.0.sync().await
    }
}

/// A mutable bao map.
///
/// This extends the readonly [`Map`] trait with methods to create and modify entries.
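///
/// A minimal sketch of the typical write flow (hypothetical helper; assumes
/// `batch` already contains verified bao items covering the whole blob):
///
/// ```ignore
/// async fn ingest<S: MapMut>(
///     store: &S,
///     hash: Hash,
///     size: u64,
///     batch: Vec<BaoContentItem>,
/// ) -> io::Result<()> {
///     // get or create a partial entry for this hash
///     let entry = store.get_or_create(hash, size).await?;
///     let mut writer = entry.batch_writer().await?;
///     writer.write_batch(size, batch).await?;
///     writer.sync().await?;
///     drop(writer);
///     // mark the entry as complete once all data has been written
///     store.insert_complete(entry).await?;
///     Ok(())
/// }
/// ```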
pub trait MapMut: Map {
    /// An entry that is possibly writable
    type EntryMut: MapEntryMut;

    /// Get an existing entry as an EntryMut.
    ///
    /// For implementations where EntryMut and Entry are the same type, this is just an alias for
    /// `get`.
    fn get_mut(
        &self,
        hash: &Hash,
    ) -> impl Future<Output = io::Result<Option<Self::EntryMut>>> + Send;

    /// Get an existing partial entry, or create a new one.
    ///
    /// We need to know the size of the partial entry. This might produce an
    /// error e.g. if there is not enough space on disk.
    fn get_or_create(
        &self,
        hash: Hash,
        size: u64,
    ) -> impl Future<Output = io::Result<Self::EntryMut>> + Send;

    /// Find out if the data behind a `hash` is complete, partial, or not present.
    ///
    /// Note that this does not actually verify the on-disk data, but only checks in which section
    /// of the store the entry is present.
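    ///
    /// A minimal sketch of reacting to the status (hypothetical match arms):
    ///
    /// ```ignore
    /// match store.entry_status(&hash).await? {
    ///     EntryStatus::Complete => { /* the blob can be served as-is */ }
    ///     EntryStatus::Partial => { /* resume an interrupted download */ }
    ///     EntryStatus::NotFound => { /* fetch the blob from scratch */ }
    /// }
    /// ```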
    fn entry_status(&self, hash: &Hash) -> impl Future<Output = io::Result<EntryStatus>> + Send;

    /// Sync version of `entry_status`, for the doc sync engine until we can get rid of it.
    ///
    /// Don't count on this to be efficient.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus>;

    /// Upgrade a partial entry to a complete entry.
    fn insert_complete(&self, entry: Self::EntryMut)
        -> impl Future<Output = io::Result<()>> + Send;
}

/// Extension of [`Map`] to add misc methods used by the rpc calls.
pub trait ReadableStore: Map {
    /// list all blobs in the database. This includes both raw blobs that have
    /// been imported, and hash sequences that have been created internally.
    fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
    /// list all tags (collections or other explicitly added things) in the database
    fn tags(&self) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;

    /// Temp tags
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;

    /// Perform a consistency check on the database
    fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// list partial blobs in the database
    fn partial_blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;

    /// This trait method extracts a file to a local path.
    ///
    /// `hash` is the hash of the file
    /// `target` is the path to the target file
    /// `mode` is a hint how the file should be exported.
    /// `progress` is a callback that is called with the total number of bytes that have been written
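    ///
    /// A minimal sketch of exporting a blob with a no-op progress callback
    /// (hypothetical helper):
    ///
    /// ```ignore
    /// async fn export_to<S: ReadableStore>(
    ///     store: &S,
    ///     hash: Hash,
    ///     target: PathBuf,
    /// ) -> io::Result<()> {
    ///     store
    ///         .export(hash, target, ExportMode::Copy, Box::new(|_offset| Ok(())))
    ///         .await
    /// }
    /// ```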
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> impl Future<Output = io::Result<()>> + Send;
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
    /// This trait method imports a file from a local path.
    ///
    /// `data` is the path to the file.
    /// `mode` is a hint how the file should be imported.
    /// `progress` is a sender that provides a way for the importer to send progress messages
    /// when importing large files. This also serves as a way to cancel the import. If the
    /// consumer of the progress messages is dropped, subsequent attempts to send progress
    /// will fail.
    ///
    /// Returns the hash of the imported file. The reason to have this method is that some database
    /// implementations might be able to import a file without copying it.
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from memory.
    ///
    /// It is a special case of `import` that does not use the file system.
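    ///
    /// A minimal sketch of adding a raw blob from memory (hypothetical helper;
    /// assumes `TempTag::hash` as defined in this crate):
    ///
    /// ```ignore
    /// async fn add_blob<S: Store>(store: &S, data: Bytes) -> io::Result<Hash> {
    ///     let tag = store.import_bytes(data, BlobFormat::Raw).await?;
    ///     // the returned temp tag protects the blob from GC while it is alive
    ///     Ok(*tag.hash())
    /// }
    /// ```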
    fn import_bytes(
        &self,
        bytes: Bytes,
        format: BlobFormat,
    ) -> impl Future<Output = io::Result<TempTag>> + Send;

    /// Import data from a stream of bytes.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from an async byte reader.
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send {
        let stream = tokio_util::io::ReaderStream::new(data);
        self.import_stream(stream, format, progress)
    }

    /// Set a tag
    fn set_tag(
        &self,
        name: Tag,
        hash: Option<HashAndFormat>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Create a new tag
    fn create_tag(&self, hash: HashAndFormat) -> impl Future<Output = io::Result<Tag>> + Send;

    /// Create a temporary pin for this store
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;

    /// Start the GC loop
    ///
    /// The gc task will shut down when the returned future is dropped.
    fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G) -> impl Future<Output = ()>
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send;

    /// physically delete the given hashes from the store.
    fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

    /// Shutdown the store.
    fn shutdown(&self) -> impl Future<Output = ()> + Send;

    /// Sync the store.
    fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;

    /// Validate the database
    ///
    /// This will check that the file and outboard content is correct for all complete
    /// entries, and output valid ranges for all partial entries.
    ///
    /// It will not check the internal consistency of the database.
    fn validate(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ValidateProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send {
        validate_impl(self, repair, tx)
    }
}

async fn validate_impl(
    store: &impl Store,
    repair: bool,
    tx: BoxedProgressSender<ValidateProgress>,
) -> io::Result<()> {
    use futures_buffered::BufferedStreamExt;

    let validate_parallelism: usize = num_cpus::get();
    let lp = LocalPool::new(local_pool::Config {
        threads: validate_parallelism,
        ..Default::default()
    });
    let complete = store.blobs().await?.collect::<io::Result<Vec<_>>>()?;
    let partial = store
        .partial_blobs()
        .await?
        .collect::<io::Result<Vec<_>>>()?;
    tx.send(ValidateProgress::Starting {
        total: complete.len() as u64,
    })
    .await?;
    let complete_result = futures_lite::stream::iter(complete)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::Entry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::EntryProgress { id, offset })?;
                }
                let expected_chunk_range =
                    ChunkRanges::from(..BaoTree::new(size, IROH_BLOCK_SIZE).chunks());
                let incomplete = actual_chunk_ranges != expected_chunk_range;
                let error = if incomplete {
                    Some(format!(
                        "expected chunk ranges {:?}, got chunk ranges {:?}",
                        expected_chunk_range, actual_chunk_ranges
                    ))
                } else {
                    None
                };
                tx.send(ValidateProgress::EntryDone { id, error }).await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok((hash, incomplete))
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let partial_result = futures_lite::stream::iter(partial)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::PartialEntry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::PartialEntryProgress { id, offset })?;
                }
                tx.send(ValidateProgress::PartialEntryDone {
                    id,
                    ranges: RangeSpec::new(&actual_chunk_ranges),
                })
                .await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok(())
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let mut to_downgrade = Vec::new();
    for item in complete_result {
        let (hash, incomplete) = item??;
        if incomplete {
            to_downgrade.push(hash);
        }
    }
    for item in partial_result {
        item??;
    }
    if repair {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "repair not implemented",
        ));
    }
    Ok(())
}

/// Configuration for the GC mark and sweep.
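///
/// A minimal construction sketch (the 5 minute period is an arbitrary example
/// value):
///
/// ```ignore
/// let config = GcConfig {
///     period: Duration::from_secs(300),
///     done_callback: None,
/// };
/// ```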
#[derive(derive_more::Debug)]
pub struct GcConfig {
    /// The period at which to execute the GC.
    pub period: Duration,
    /// An optional callback called every time a GC round finishes.
    #[debug("done_callback")]
    pub done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Implementation of the gc loop.
pub(super) async fn gc_run_loop<S, F, Fut, G, Gut>(
    store: &S,
    config: GcConfig,
    start_cb: F,
    protected_cb: G,
) where
    S: Store,
    F: Fn() -> Fut,
    Fut: Future<Output = io::Result<()>> + Send,
    G: Fn() -> Gut,
    Gut: Future<Output = BTreeSet<Hash>> + Send,
{
    tracing::info!("Starting GC task with interval {:?}", config.period);
    let mut live = BTreeSet::new();
    'outer: loop {
        if let Err(cause) = start_cb().await {
            tracing::debug!("unable to notify the db of GC start: {cause}. Shutting down GC loop.");
            break;
        }
        // do delay before the two phases of GC
        tokio::time::sleep(config.period).await;
        tracing::debug!("Starting GC");
        live.clear();

        let p = protected_cb().await;
        live.extend(p);

        tracing::debug!("Starting GC mark phase");
        let live_ref = &mut live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_mark_task(store, live_ref, &co).await {
                co.yield_(GcMarkEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcMarkEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcMarkEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcMarkEvent::Error(err) => {
                    tracing::error!("Fatal error during GC mark {}", err);
                    continue 'outer;
                }
            }
        }
        drop(stream);

        tracing::debug!("Starting GC sweep phase");
        let live_ref = &live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_sweep_task(store, live_ref, &co).await {
                co.yield_(GcSweepEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcSweepEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcSweepEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcSweepEvent::Error(err) => {
                    tracing::error!("Fatal error during GC sweep {}", err);
                    continue 'outer;
                }
            }
        }
        if let Some(ref cb) = config.done_callback {
            cb();
        }
    }
}

/// Implementation of the gc mark phase.
pub(super) async fn gc_mark_task<'a>(
    store: &'a impl Store,
    live: &'a mut BTreeSet<Hash>,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    macro_rules! debug {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    let mut roots = BTreeSet::new();
    debug!("traversing tags");
    for item in store.tags().await? {
        let (name, haf) = item?;
        debug!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    debug!("traversing temp roots");
    for haf in store.temp_tags() {
        debug!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    for HashAndFormat { hash, format } in roots {
        // we need to do this for all formats except raw
        if live.insert(hash) && !format.is_raw() {
            let Some(entry) = store.get(&hash).await? else {
                warn!("gc: {} not found", hash);
                continue;
            };
            if !entry.is_complete() {
                warn!("gc: {} is partial", hash);
                continue;
            }
            let Ok(reader) = entry.data_reader().await else {
                warn!("gc: {} creating data reader failed", hash);
                continue;
            };
            let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
                warn!("gc: {} parse failed", hash);
                continue;
            };
            debug!("parsed collection {} {:?}", hash, count);
            loop {
                let item = match stream.next().await {
                    Ok(Some(item)) => item,
                    Ok(None) => break,
                    Err(_err) => {
                        warn!("gc: {} parse failed", hash);
                        break;
                    }
                };
                // if format != raw we would have to recurse here by adding this to current
                live.insert(item);
            }
        }
    }
    debug!("gc mark done. found {} live blobs", live.len());
    Ok(())
}

async fn gc_sweep_task(
    store: &impl Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

/// An event related to GC
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom event (info)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// An event related to GC
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// Progress messages for an import operation
///
/// An import operation involves computing the outboard of a file, and then
/// either copying or moving the file into the database.
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Found a path
    ///
    /// This will be the first message for an id
    Found { id: u64, name: String },
    /// Progress when copying the file to the store
    ///
    /// This will be omitted if the store can use the file in place
    ///
    /// There will be multiple of these messages for an id
    CopyProgress { id: u64, offset: u64 },
    /// Determined the size
    ///
    /// This will come after `Found` and zero or more `CopyProgress` messages.
    /// For unstable files, determining the size will only be done once the file
    /// is fully copied.
    Size { id: u64, size: u64 },
    /// Progress when computing the outboard
    ///
    /// There will be multiple of these messages for an id
    OutboardProgress { id: u64, offset: u64 },
    /// Done computing the outboard
    ///
    /// This comes after `Size` and zero or more `OutboardProgress` messages
    OutboardDone { id: u64, hash: Hash },
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}
/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the data out to the target file. Also, a disk based implementation
/// might choose to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
    /// The hash refers to any blob and will be exported to a single file.
    #[default]
    Blob,
    /// The hash refers to a [`crate::format::collection::Collection`] blob
    /// and all children of the collection shall be exported to one file per child.
    ///
    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
    /// collection metadata, all other children of the collection will be exported to
    /// a file each, with their collection name treated as a relative path to the export
    /// destination path.
    ///
    /// If the blob cannot be parsed as a collection, the operation will fail.
    Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Starting to export to a file
    ///
    /// This will be the first message for an id
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// Progress when copying the file to the target
    ///
    /// This will be omitted if the store can move the file or use copy on write
    ///
    /// There will be multiple of these messages for an id
    Progress { id: u64, offset: u64 },
    /// Done exporting
    Done { id: u64 },
}

/// Level for generic validation messages
#[derive(
    Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
)]
pub enum ReportLevel {
    /// Very unimportant info messages
    Trace,
    /// Info messages
    Info,
    /// Warnings, something is not quite right
    Warn,
    /// Errors, something is very wrong
    Error,
}

/// Progress updates for the consistency check operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ConsistencyCheckProgress {
    /// Consistency check started
    Start,
    /// Consistency check update
    Update {
        /// The message
        message: String,
        /// The entry this message is about, if any
        entry: Option<Hash>,
        /// The level of the message
        level: ReportLevel,
    },
    /// Consistency check ended
    Done,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Progress updates for the validate operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// started validating
    Starting {
        /// The total number of entries to validate
        total: u64,
    },
    /// We started validating a complete entry
    Entry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be a URL or something else that uniquely identifies the entry.
        path: Option<String>,
        /// The size of the entry, in bytes.
        size: u64,
    },
    /// We got progress validating item `id`.
    EntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    EntryDone {
        /// The unique id of the entry.
        id: u64,
        /// An error if we failed to validate the entry.
        error: Option<String>,
    },
    /// We started validating a partial entry
    PartialEntry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be a URL or something else that uniquely identifies the entry.
        path: Option<String>,
        /// The best known size of the entry, in bytes.
        size: u64,
    },
    /// We got progress validating item `id`.
    PartialEntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    PartialEntryDone {
        /// The unique id of the entry.
        id: u64,
        /// Available ranges.
        ranges: RangeSpec,
    },
    /// We are done with the whole operation.
    AllDone,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Database events
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// A GC was started
    GcStarted,
    /// A GC was completed
    GcCompleted,
}