iroh_blobs/store/traits.rs

//! Traits for in-memory or persistent maps of blobs with bao encoded outboards.
use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Duration};

pub use bao_tree;
use bao_tree::{
    io::fsm::{BaoContentItem, Outboard},
    BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
pub use range_collections;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;

use crate::{
    hashseq::parse_hash_seq,
    protocol::RangeSpec,
    util::{
        local_pool::{self, LocalPool},
        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
        Tag,
    },
    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};

/// A fallible but owned iterator over the entries in a store.
pub type DbIter<T> = Box<dyn Iterator<Item = io::Result<T>> + Send + Sync + 'static>;

/// Export progress callback
pub type ExportProgressCb = Box<dyn Fn(u64) -> io::Result<()> + Send + Sync + 'static>;

/// The availability status of an entry in a store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is completely available.
    Complete,
    /// The entry is partially available.
    Partial,
    /// The entry is not in the store.
    NotFound,
}

/// The size of a bao file
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum BaoBlobSize {
    /// A remote side told us the size, but we have insufficient data to verify it.
    Unverified(u64),
    /// We have verified the size.
    Verified(u64),
}

impl BaoBlobSize {
    /// Create a new `BaoBlobSize` with the given size and verification status.
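    ///
    /// # Example
    ///
    /// A minimal sketch of the intended use:
    ///
    /// ```ignore
    /// // a size reported by a remote peer that we have not verified yet
    /// let size = BaoBlobSize::new(1024, false);
    /// assert_eq!(size, BaoBlobSize::Unverified(1024));
    /// // the raw value is accessible either way
    /// assert_eq!(size.value(), 1024);
    /// ```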
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            BaoBlobSize::Verified(size)
        } else {
            BaoBlobSize::Unverified(size)
        }
    }

    /// Get just the value, no matter if it is verified or not.
    pub fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(size) => *size,
            BaoBlobSize::Verified(size) => *size,
        }
    }
}

/// An entry for one hash in a bao map
///
/// The entry has the ability to provide you with an (outboard, data)
/// reader pair. Creating the reader is async and may fail. The futures that
/// create the readers must be `Send`, but the readers themselves don't have to
/// be.
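///
/// # Example
///
/// A minimal sketch of a generic helper built on this trait; `read_prefix`
/// is a hypothetical name, and `read_at` is the range based read from
/// [`AsyncSliceReader`]:
///
/// ```ignore
/// async fn read_prefix(entry: &impl MapEntry, len: usize) -> io::Result<Bytes> {
///     // opening the reader is async and may fail
///     let mut reader = entry.data_reader().await?;
///     reader.read_at(0, len).await
/// }
/// ```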
pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
    /// The hash of the entry.
    fn hash(&self) -> Hash;
    /// The size of the entry.
    fn size(&self) -> BaoBlobSize;
    /// Returns `true` if the entry is complete.
    ///
    /// Note that this does not actually verify if the bytes on disk are complete,
    /// it only checks if the entry was marked as complete in the store.
    fn is_complete(&self) -> bool;
    /// A future that resolves to a reader that can be used to read the outboard
    fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
    /// A future that resolves to a reader that can be used to read the data
    fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
///
/// This is the readonly view. To allow updates, a concrete implementation must
/// also implement [`MapMut`].
///
/// Entries are *not* guaranteed to be complete for all implementations.
/// They are also not guaranteed to be immutable, since this could be the
/// readonly view of a mutable store.
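///
/// # Example
///
/// Since [`Map::get`] returns `None` for missing entries, a membership test
/// is a one-liner (sketch, hypothetical helper name):
///
/// ```ignore
/// async fn contains(map: &impl Map, hash: &Hash) -> io::Result<bool> {
///     Ok(map.get(hash).await?.is_some())
/// }
/// ```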
pub trait Map: Clone + Send + Sync + 'static {
    /// The entry type. An entry is a cheaply cloneable handle that can be used
    /// to open readers for both the data and the outboard
    type Entry: MapEntry;
    /// Get an entry for a hash.
    ///
    /// This can also be used for a membership test by just checking if there
    /// is an entry. Creating an entry should be cheap, any expensive ops should
    /// be deferred to the creation of the actual readers.
    ///
    /// It is not guaranteed that the entry is complete.
    fn get(&self, hash: &Hash) -> impl Future<Output = io::Result<Option<Self::Entry>>> + Send;
}

/// A partial entry
pub trait MapEntryMut: MapEntry {
    /// Get a batch writer
    fn batch_writer(&self) -> impl Future<Output = io::Result<impl BaoBatchWriter>> + Send;
}

/// An async batch interface for writing bao content items to a pair of data and
/// outboard.
///
/// Details like the chunk group size and the actual storage location are left
/// to the implementation.
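///
/// # Example
///
/// A minimal sketch of a delegating wrapper, mirroring the blanket `&mut W`
/// impl below; `LoggingWriter` is hypothetical:
///
/// ```ignore
/// struct LoggingWriter<W>(W);
///
/// impl<W: BaoBatchWriter> BaoBatchWriter for LoggingWriter<W> {
///     async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
///         tracing::trace!("writing {} items for blob of size {}", batch.len(), size);
///         self.0.write_batch(size, batch).await
///     }
///
///     async fn sync(&mut self) -> io::Result<()> {
///         self.0.sync().await
///     }
/// }
/// ```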
pub trait BaoBatchWriter {
    /// Write a batch of bao content items to the underlying storage.
    ///
    /// The batch is guaranteed to be sorted as data is received from the network.
    /// So leaves will be sorted by offset, and parents will be sorted by pre order
    /// traversal offset. There is no guarantee that they will be consecutive
    /// though.
    ///
    /// The size is the total size of the blob that the remote side told us.
    /// It is not guaranteed to be correct, but it is guaranteed to be
    /// consistent with all data in the batch. The size therefore represents
    /// an upper bound on the maximum offset of all leaf items.
    /// So it is guaranteed that `leaf.offset + leaf.size <= size` for all
    /// leaf items in the batch.
    ///
    /// Batches should not become too large. Typically, a batch is just a few
    /// parent nodes and a leaf.
    ///
    /// Batch is a vec so it can be moved into a task, which is unfortunately
    /// necessary in typical io code.
    fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<BaoContentItem>,
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the written data to permanent storage, if applicable.
    /// E.g. for a file based implementation, this would call sync_data
    /// on all files.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

/// Implement BaoBatchWriter for mutable references
impl<W: BaoBatchWriter> BaoBatchWriter for &mut W {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        (**self).write_batch(size, batch).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

/// A wrapper around a batch writer that calls a progress callback for one leaf
/// per batch.
#[derive(Debug)]
pub(crate) struct FallibleProgressBatchWriter<W, F>(W, F);

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
    FallibleProgressBatchWriter<W, F>
{
    /// Create a new `FallibleProgressBatchWriter` from an inner writer and a progress callback
    ///
    /// The `on_write` function is called for each write, with the `offset` as the first and the
    /// length of the data as the second param. `on_write` must return an `io::Result`.
    /// If `on_write` returns an error, the download is aborted.
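    ///
    /// # Example
    ///
    /// A minimal sketch; `inner` is some existing [`BaoBatchWriter`]:
    ///
    /// ```ignore
    /// let writer = FallibleProgressBatchWriter::new(inner, |offset, len| {
    ///     tracing::trace!("wrote {} bytes at offset {}", len, offset);
    ///     Ok(())
    /// });
    /// ```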
    pub fn new(inner: W, on_write: F) -> Self {
        Self(inner, on_write)
    }
}

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static> BaoBatchWriter
    for FallibleProgressBatchWriter<W, F>
{
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        // find the offset and length of the first (usually only) chunk
        let chunk = batch
            .iter()
            .filter_map(|item| {
                if let BaoContentItem::Leaf(leaf) = item {
                    Some((leaf.offset, leaf.data.len()))
                } else {
                    None
                }
            })
            .next();
        self.0.write_batch(size, batch).await?;
        // call the progress callback
        if let Some((offset, len)) = chunk {
            (self.1)(offset, len)?;
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.0.sync().await
    }
}

/// A mutable bao map.
///
/// This extends the readonly [`Map`] trait with methods to create and modify entries.
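///
/// # Example
///
/// A rough sketch of the intended write flow, assuming the verified content
/// items have already been obtained elsewhere; `write_all` is hypothetical:
///
/// ```ignore
/// async fn write_all(
///     store: &impl MapMut,
///     hash: Hash,
///     size: u64,
///     batch: Vec<BaoContentItem>,
/// ) -> io::Result<()> {
///     // create (or resume) a partial entry for this hash
///     let entry = store.get_or_create(hash, size).await?;
///     // write the content items, then flush to permanent storage
///     let mut writer = entry.batch_writer().await?;
///     writer.write_batch(size, batch).await?;
///     writer.sync().await?;
///     drop(writer);
///     // mark the entry as complete once all chunks are present
///     store.insert_complete(entry).await?;
///     Ok(())
/// }
/// ```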
pub trait MapMut: Map {
    /// An entry that is possibly writable
    type EntryMut: MapEntryMut;

    /// Get an existing entry as an EntryMut.
    ///
    /// For implementations where EntryMut and Entry are the same type, this is just an alias for
    /// `get`.
    fn get_mut(
        &self,
        hash: &Hash,
    ) -> impl Future<Output = io::Result<Option<Self::EntryMut>>> + Send;

    /// Get an existing partial entry, or create a new one.
    ///
    /// We need to know the size of the partial entry. This might produce an
    /// error e.g. if there is not enough space on disk.
    fn get_or_create(
        &self,
        hash: Hash,
        size: u64,
    ) -> impl Future<Output = io::Result<Self::EntryMut>> + Send;
    /// Find out if the data behind a `hash` is complete, partial, or not present.
    ///
    /// Note that this does not actually verify the on-disk data, but only checks in which section
    /// of the store the entry is present.
    fn entry_status(&self, hash: &Hash) -> impl Future<Output = io::Result<EntryStatus>> + Send;

    /// Sync version of `entry_status`, for the doc sync engine until we can get rid of it.
    ///
    /// Don't count on this to be efficient.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus>;

    /// Upgrade a partial entry to a complete entry.
    fn insert_complete(&self, entry: Self::EntryMut)
        -> impl Future<Output = io::Result<()>> + Send;
}

/// Extension of [`Map`] to add misc methods used by the rpc calls.
pub trait ReadableStore: Map {
    /// list all blobs in the database. This includes both raw blobs that have
    /// been imported, and hash sequences that have been created internally.
    fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
    /// list all tags (collections or other explicitly added things) in the database
    fn tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;

    /// Temp tags
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;

    /// Perform a consistency check on the database
    fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// list partial blobs in the database
    fn partial_blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;

    /// This trait method extracts a file to a local path.
    ///
    /// `hash` is the hash of the file
    /// `target` is the path to the target file
    /// `mode` is a hint for how the file should be exported.
    /// `progress` is a callback that is called with the total number of bytes that have been written
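    ///
    /// # Example
    ///
    /// A minimal sketch of a call site; the hash and target path are assumed:
    ///
    /// ```ignore
    /// let progress = Box::new(|offset| {
    ///     tracing::trace!("exported {} bytes", offset);
    ///     Ok(())
    /// });
    /// store
    ///     .export(hash, "/tmp/out.bin".into(), ExportMode::Copy, progress)
    ///     .await?;
    /// ```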
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> impl Future<Output = io::Result<()>> + Send;
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
    /// This trait method imports a file from a local path.
    ///
    /// `data` is the path to the file.
    /// `mode` is a hint for how the file should be imported.
    /// `progress` is a sender that provides a way for the importer to send progress messages
    /// when importing large files. This also serves as a way to cancel the import. If the
    /// consumer of the progress messages is dropped, subsequent attempts to send progress
    /// will fail.
    ///
    /// Returns the hash of the imported file. The reason to have this method is that some database
    /// implementations might be able to import a file without copying it.
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from memory.
    ///
    /// It is a special case of `import` that does not use the file system.
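    ///
    /// # Example
    ///
    /// A minimal sketch; the returned [`TempTag`] acts as a temporary pin
    /// that protects the blob from garbage collection while it is alive:
    ///
    /// ```ignore
    /// let tag = store
    ///     .import_bytes(Bytes::from_static(b"hello"), BlobFormat::Raw)
    ///     .await?;
    /// println!("imported blob {}", tag.hash());
    /// ```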
    fn import_bytes(
        &self,
        bytes: Bytes,
        format: BlobFormat,
    ) -> impl Future<Output = io::Result<TempTag>> + Send;

    /// Import data from a stream of bytes.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from an async byte reader.
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send {
        let stream = tokio_util::io::ReaderStream::new(data);
        self.import_stream(stream, format, progress)
    }

    /// Set a tag
    fn set_tag(
        &self,
        name: Tag,
        hash: HashAndFormat,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Rename a tag
    fn rename_tag(&self, from: Tag, to: Tag) -> impl Future<Output = io::Result<()>> + Send;

    /// Delete a single tag
    fn delete_tag(&self, name: Tag) -> impl Future<Output = io::Result<()>> + Send {
        self.delete_tags(Some(name.clone()), Some(name.successor()))
    }

    /// Bulk delete tags
    fn delete_tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Create a new tag
    fn create_tag(&self, hash: HashAndFormat) -> impl Future<Output = io::Result<Tag>> + Send;

    /// Create a temporary pin for this store
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;

    /// Start the GC loop
    ///
    /// The GC task will shut down when the returned future is dropped.
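    ///
    /// # Example
    ///
    /// A rough sketch of driving the loop; `config` is a [`GcConfig`] and
    /// nothing beyond tags and temp tags is protected:
    ///
    /// ```ignore
    /// store.gc_run(config, || async { BTreeSet::new() }).await;
    /// ```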
    fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G) -> impl Future<Output = ()>
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send;

    /// Physically delete the given hashes from the store.
    fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

    /// Shutdown the store.
    fn shutdown(&self) -> impl Future<Output = ()> + Send;

    /// Sync the store.
    fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;

    /// Validate the database
    ///
    /// This will check that the file and outboard content is correct for all complete
    /// entries, and output valid ranges for all partial entries.
    ///
    /// It will not check the internal consistency of the database.
    fn validate(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ValidateProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send {
        validate_impl(self, repair, tx)
    }
}
407
408async fn validate_impl(
409    store: &impl Store,
410    repair: bool,
411    tx: BoxedProgressSender<ValidateProgress>,
412) -> io::Result<()> {
413    use futures_buffered::BufferedStreamExt;
414
415    let validate_parallelism: usize = num_cpus::get();
416    let lp = LocalPool::new(local_pool::Config {
417        threads: validate_parallelism,
418        ..Default::default()
419    });
420    let complete = store.blobs().await?.collect::<io::Result<Vec<_>>>()?;
421    let partial = store
422        .partial_blobs()
423        .await?
424        .collect::<io::Result<Vec<_>>>()?;
425    tx.send(ValidateProgress::Starting {
426        total: complete.len() as u64,
427    })
428    .await?;
429    let complete_result = futures_lite::stream::iter(complete)
430        .map(|hash| {
431            let store = store.clone();
432            let tx = tx.clone();
433            lp.spawn(move || async move {
434                let entry = store
435                    .get(&hash)
436                    .await?
437                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
438                let size = entry.size().value();
439                let outboard = entry.outboard().await?;
440                let data = entry.data_reader().await?;
441                let chunk_ranges = ChunkRanges::all();
442                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
443                let id = tx.new_id();
444                tx.send(ValidateProgress::Entry {
445                    id,
446                    hash,
447                    path: None,
448                    size,
449                })
450                .await?;
451                let mut actual_chunk_ranges = ChunkRanges::empty();
452                while let Some(item) = ranges.next().await {
453                    let item = item?;
454                    let offset = item.start.to_bytes();
455                    actual_chunk_ranges |= ChunkRanges::from(item);
456                    tx.try_send(ValidateProgress::EntryProgress { id, offset })?;
457                }
458                let expected_chunk_range =
459                    ChunkRanges::from(..BaoTree::new(size, IROH_BLOCK_SIZE).chunks());
                let complete = actual_chunk_ranges == expected_chunk_range;
                let error = if complete {
                    None
                } else {
                    Some(format!(
                        "expected chunk ranges {:?}, got chunk ranges {:?}",
                        expected_chunk_range, actual_chunk_ranges
                    ))
                };
                tx.send(ValidateProgress::EntryDone { id, error }).await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok((hash, complete))
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let partial_result = futures_lite::stream::iter(partial)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::PartialEntry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::PartialEntryProgress { id, offset })?;
                }
                tx.send(ValidateProgress::PartialEntryDone {
                    id,
                    ranges: RangeSpec::new(&actual_chunk_ranges),
                })
                .await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok(())
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let mut to_downgrade = Vec::new();
    for item in complete_result {
        let (hash, complete) = item??;
        if !complete {
            to_downgrade.push(hash);
        }
    }
    for item in partial_result {
        item??;
    }
    if repair {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "repair not implemented",
        ));
    }
    Ok(())
}

/// Configuration for the GC mark and sweep.
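///
/// # Example
///
/// A minimal construction sketch:
///
/// ```ignore
/// let config = GcConfig {
///     period: Duration::from_secs(300),
///     done_callback: Some(Box::new(|| tracing::debug!("gc round done"))),
/// };
/// ```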
#[derive(derive_more::Debug)]
pub struct GcConfig {
    /// The period at which to execute the GC.
    pub period: Duration,
    /// An optional callback called every time a GC round finishes.
    #[debug("done_callback")]
    pub done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Implementation of the gc loop.
pub(super) async fn gc_run_loop<S, F, Fut, G, Gut>(
    store: &S,
    config: GcConfig,
    start_cb: F,
    protected_cb: G,
) where
    S: Store,
    F: Fn() -> Fut,
    Fut: Future<Output = io::Result<()>> + Send,
    G: Fn() -> Gut,
    Gut: Future<Output = BTreeSet<Hash>> + Send,
{
    tracing::info!("Starting GC task with interval {:?}", config.period);
    let mut live = BTreeSet::new();
    'outer: loop {
        if let Err(cause) = start_cb().await {
            tracing::debug!("unable to notify the db of GC start: {cause}. Shutting down GC loop.");
            break;
        }
        // delay before the two phases of GC
        tokio::time::sleep(config.period).await;
        tracing::debug!("Starting GC");
        live.clear();

        let p = protected_cb().await;
        live.extend(p);

        tracing::debug!("Starting GC mark phase");
        let live_ref = &mut live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_mark_task(store, live_ref, &co).await {
                co.yield_(GcMarkEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcMarkEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcMarkEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcMarkEvent::Error(err) => {
                    tracing::error!("Fatal error during GC mark {}", err);
                    continue 'outer;
                }
            }
        }
        drop(stream);

        tracing::debug!("Starting GC sweep phase");
        let live_ref = &live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_sweep_task(store, live_ref, &co).await {
                co.yield_(GcSweepEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcSweepEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcSweepEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcSweepEvent::Error(err) => {
                    tracing::error!("Fatal error during GC sweep {}", err);
                    continue 'outer;
                }
            }
        }
        if let Some(ref cb) = config.done_callback {
            cb();
        }
    }
}

/// Implementation of the gc mark phase.
pub(super) async fn gc_mark_task<'a>(
    store: &'a impl Store,
    live: &'a mut BTreeSet<Hash>,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    macro_rules! debug {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    let mut roots = BTreeSet::new();
    debug!("traversing tags");
    for item in store.tags(None, None).await? {
        let (name, haf) = item?;
        debug!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    debug!("traversing temp roots");
    for haf in store.temp_tags() {
        debug!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    for HashAndFormat { hash, format } in roots {
        // we need to do this for all formats except raw
        if live.insert(hash) && !format.is_raw() {
            let Some(entry) = store.get(&hash).await? else {
                warn!("gc: {} not found", hash);
                continue;
            };
            if !entry.is_complete() {
                warn!("gc: {} is partial", hash);
                continue;
            }
            let Ok(reader) = entry.data_reader().await else {
                warn!("gc: {} creating data reader failed", hash);
                continue;
            };
            let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
                warn!("gc: {} parse failed", hash);
                continue;
            };
            debug!("parsed collection {} {:?}", hash, count);
            loop {
                let item = match stream.next().await {
                    Ok(Some(item)) => item,
                    Ok(None) => break,
                    Err(_err) => {
                        warn!("gc: {} parse failed", hash);
                        break;
                    }
                };
                // if format != raw we would have to recurse here by adding this to current
                live.insert(item);
            }
        }
    }
    debug!("gc mark done. found {} live blobs", live.len());
    Ok(())
}

async fn gc_sweep_task(
    store: &impl Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

/// An event related to GC
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// An event related to GC
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// Progress messages for an import operation
///
/// An import operation involves computing the outboard of a file, and then
/// either copying or moving the file into the database.
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Found a path
    ///
    /// This will be the first message for an id
    Found { id: u64, name: String },
    /// Progress when copying the file to the store
    ///
    /// This will be omitted if the store can use the file in place
    ///
    /// There will be multiple of these messages for an id
    CopyProgress { id: u64, offset: u64 },
    /// Determined the size
    ///
    /// This will come after `Found` and zero or more `CopyProgress` messages.
    /// For unstable files, determining the size will only be done once the file
    /// is fully copied.
    Size { id: u64, size: u64 },
    /// Progress when computing the outboard
    ///
    /// There will be multiple of these messages for an id
    OutboardProgress { id: u64, offset: u64 },
    /// Done computing the outboard
    ///
    /// This comes after `Size` and zero or more `OutboardProgress` messages
    OutboardDone { id: u64, hash: Hash },
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the data out to the target file. Also, a disk based implementation
/// might choose to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
    /// The hash refers to any blob and will be exported to a single file.
    #[default]
    Blob,
    /// The hash refers to a [`crate::format::collection::Collection`] blob
    /// and all children of the collection shall be exported to one file per child.
    ///
    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
    /// collection metadata, all other children of the collection will be exported to
    /// a file each, with their collection name treated as a relative path to the export
    /// destination path.
    ///
    /// If the blob cannot be parsed as a collection, the operation will fail.
    Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Starting to export to a file
    ///
    /// This will be the first message for an id
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// Progress when copying the file to the target
    ///
    /// This will be omitted if the store can move the file or use copy on write
    ///
    /// There will be multiple of these messages for an id
    Progress { id: u64, offset: u64 },
    /// Done exporting
    Done { id: u64 },
}

/// Level for generic validation messages
#[derive(
    Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
)]
pub enum ReportLevel {
    /// Very unimportant info messages
    Trace,
    /// Info messages
    Info,
    /// Warnings, something is not quite right
    Warn,
    /// Errors, something is very wrong
    Error,
}

/// Progress updates for the consistency check operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ConsistencyCheckProgress {
    /// Consistency check started
    Start,
    /// Consistency check update
    Update {
        /// The message
        message: String,
        /// The entry this message is about, if any
        entry: Option<Hash>,
        /// The level of the message
        level: ReportLevel,
    },
    /// Consistency check ended
    Done,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Progress updates for the validate operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// started validating
    Starting {
        /// The total number of entries to validate
        total: u64,
    },
    /// We started validating a complete entry
    Entry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be a url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    EntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    EntryDone {
        /// The unique id of the entry.
        id: u64,
        /// An error if we failed to validate the entry.
        error: Option<String>,
    },
    /// We started validating a partial entry
    PartialEntry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be a url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The best known size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    PartialEntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    PartialEntryDone {
        /// The unique id of the entry.
        id: u64,
        /// Available ranges.
        ranges: RangeSpec,
    },
    /// We are done with the whole operation.
    AllDone,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Database events
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// A GC was started
    GcStarted,
    /// A GC was completed
    GcCompleted,
}