iroh_bytes/store/traits.rs

//! Traits for in-memory or persistent maps of blobs with bao encoded outboards.
use std::{collections::BTreeSet, future::Future, io, path::PathBuf};

use bao_tree::{
    io::fsm::{BaoContentItem, Outboard},
    BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_base::rpc::RpcError;
use iroh_io::AsyncSliceReader;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
use tokio_util::task::LocalPoolHandle;

use crate::{
    hashseq::parse_hash_seq,
    protocol::RangeSpec,
    util::{
        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
        Tag,
    },
    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};

pub use bao_tree;
pub use range_collections;

/// A fallible but owned iterator over the entries in a store.
pub type DbIter<T> = Box<dyn Iterator<Item = io::Result<T>> + Send + Sync + 'static>;

/// Export progress callback
pub type ExportProgressCb = Box<dyn Fn(u64) -> io::Result<()> + Send + Sync + 'static>;

/// The availability status of an entry in a store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is completely available.
    Complete,
    /// The entry is partially available.
    Partial,
    /// The entry is not in the store.
    NotFound,
}

/// The size of a bao file
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum BaoBlobSize {
    /// A remote side told us the size, but we have insufficient data to verify it.
    Unverified(u64),
    /// We have verified the size.
    Verified(u64),
}

impl BaoBlobSize {
    /// Create a new `BaoBlobSize` with the given size and verification status.
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            BaoBlobSize::Verified(size)
        } else {
            BaoBlobSize::Unverified(size)
        }
    }

    /// Get just the value, no matter if it is verified or not.
    pub fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(size) => *size,
            BaoBlobSize::Verified(size) => *size,
        }
    }
}

/// An entry for one hash in a bao map
///
/// The entry has the ability to provide you with an (outboard, data)
/// reader pair. Creating the reader is async and may fail. The futures that
/// create the readers must be `Send`, but the readers themselves don't have to
/// be.
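///
/// # Example
///
/// A sketch of reading the full contents of an entry. The `read_all` helper is
/// hypothetical and assumes the `read_at` method of [`iroh_io::AsyncSliceReader`]:
///
/// ```ignore
/// async fn read_all(entry: &impl MapEntry) -> io::Result<Bytes> {
///     let size = entry.size().value();
///     let mut reader = entry.data_reader().await?;
///     reader.read_at(0, size as usize).await
/// }
/// ```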
pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
    /// The hash of the entry.
    fn hash(&self) -> Hash;
    /// The size of the entry.
    fn size(&self) -> BaoBlobSize;
    /// Returns `true` if the entry is complete.
    ///
    /// Note that this does not actually verify if the bytes on disk are complete,
    /// it only checks if the entry was marked as complete in the store.
    fn is_complete(&self) -> bool;
    /// A future that resolves to a reader that can be used to read the outboard
    fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
    /// A future that resolves to a reader that can be used to read the data
    fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
///
/// This is the readonly view. To allow updates, a concrete implementation must
/// also implement [`MapMut`].
///
/// Entries are *not* guaranteed to be complete for all implementations.
/// They are also not guaranteed to be immutable, since this could be the
/// readonly view of a mutable store.
pub trait Map: Clone + Send + Sync + 'static {
    /// The entry type. An entry is a cheaply cloneable handle that can be used
    /// to open readers for both the data and the outboard
    type Entry: MapEntry;
    /// Get an entry for a hash.
    ///
    /// This can also be used for a membership test by just checking if there
    /// is an entry. Creating an entry should be cheap; any expensive ops should
    /// be deferred to the creation of the actual readers.
    ///
    /// It is not guaranteed that the entry is complete.
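    ///
    /// # Example
    ///
    /// A sketch of using `get` as a membership test (the `contains` helper is
    /// hypothetical, not part of this trait):
    ///
    /// ```ignore
    /// async fn contains(store: &impl Map, hash: &Hash) -> io::Result<bool> {
    ///     Ok(store.get(hash).await?.is_some())
    /// }
    /// ```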
    fn get(&self, hash: &Hash) -> impl Future<Output = io::Result<Option<Self::Entry>>> + Send;
}

/// A partial entry
pub trait MapEntryMut: MapEntry {
    /// Get a batch writer
    fn batch_writer(&self) -> impl Future<Output = io::Result<impl BaoBatchWriter>> + Send;
}

/// An async batch interface for writing bao content items to a pair of data and
/// outboard.
///
/// Details like the chunk group size and the actual storage location are left
/// to the implementation.
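///
/// # Example
///
/// A sketch of driving a batch writer: write each batch as it arrives, then
/// sync once at the end (the `batches` input is hypothetical):
///
/// ```ignore
/// async fn write_all(
///     mut writer: impl BaoBatchWriter,
///     size: u64,
///     batches: Vec<Vec<BaoContentItem>>,
/// ) -> io::Result<()> {
///     for batch in batches {
///         writer.write_batch(size, batch).await?;
///     }
///     writer.sync().await
/// }
/// ```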
pub trait BaoBatchWriter {
    /// Write a batch of bao content items to the underlying storage.
    ///
    /// The batch is guaranteed to be sorted as data is received from the network.
    /// So leaves will be sorted by offset, and parents will be sorted by pre order
    /// traversal offset. There is no guarantee that they will be consecutive
    /// though.
    ///
    /// The size is the total size of the blob that the remote side told us.
    /// It is not guaranteed to be correct, but it is guaranteed to be
    /// consistent with all data in the batch. The size therefore represents
    /// an upper bound on the maximum offset of all leaf items.
    /// So it is guaranteed that `leaf.offset + leaf.size <= size` for all
    /// leaf items in the batch.
    ///
    /// Batches should not become too large. Typically, a batch is just a few
    /// parent nodes and a leaf.
    ///
    /// Batch is a vec so it can be moved into a task, which is unfortunately
    /// necessary in typical io code.
    fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<BaoContentItem>,
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the written data to permanent storage, if applicable.
    /// E.g. for a file based implementation, this would call sync_data
    /// on all files.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

/// Implement BaoBatchWriter for mutable references
impl<W: BaoBatchWriter> BaoBatchWriter for &mut W {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        (**self).write_batch(size, batch).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

/// A wrapper around a batch writer that calls a progress callback for one leaf
/// per batch.
#[derive(Debug)]
pub(crate) struct FallibleProgressBatchWriter<W, F>(W, F);

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
    FallibleProgressBatchWriter<W, F>
{
    /// Create a new `FallibleProgressBatchWriter` from an inner writer and a progress callback
    ///
    /// The `on_write` function is called for each write, with the `offset` as the first and the
    /// length of the data as the second param. `on_write` must return an `io::Result`.
    /// If `on_write` returns an error, the download is aborted.
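    ///
    /// A sketch of an `on_write` callback that forwards the offset to some
    /// progress channel (the `progress` sender here is hypothetical):
    ///
    /// ```ignore
    /// let writer = FallibleProgressBatchWriter::new(inner, move |offset, _len| {
    ///     progress
    ///         .try_send(offset)
    ///         .map_err(|_| io::Error::from(io::ErrorKind::BrokenPipe))
    /// });
    /// ```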
    pub fn new(inner: W, on_write: F) -> Self {
        Self(inner, on_write)
    }
}

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static> BaoBatchWriter
    for FallibleProgressBatchWriter<W, F>
{
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        // find the offset and length of the first (usually only) chunk
        let chunk = batch
            .iter()
            .filter_map(|item| {
                if let BaoContentItem::Leaf(leaf) = item {
                    Some((leaf.offset, leaf.data.len()))
                } else {
                    None
                }
            })
            .next();
        self.0.write_batch(size, batch).await?;
        // call the progress callback
        if let Some((offset, len)) = chunk {
            (self.1)(offset, len)?;
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.0.sync().await
    }
}

/// A mutable bao map.
///
/// This extends the readonly [`Map`] trait with methods to create and modify entries.
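///
/// # Example
///
/// A sketch of the typical write path: get or create a partial entry, write
/// verified content into it, then promote it to a complete entry (the
/// `ingest` helper and its inputs are hypothetical):
///
/// ```ignore
/// async fn ingest(
///     store: &impl MapMut,
///     hash: Hash,
///     size: u64,
///     batch: Vec<BaoContentItem>,
/// ) -> io::Result<()> {
///     let entry = store.get_or_create(hash, size).await?;
///     let mut writer = entry.batch_writer().await?;
///     writer.write_batch(size, batch).await?;
///     writer.sync().await?;
///     drop(writer);
///     store.insert_complete(entry).await
/// }
/// ```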
pub trait MapMut: Map {
    /// An entry that is possibly writable
    type EntryMut: MapEntryMut;

    /// Get an existing entry as an EntryMut.
    ///
    /// For implementations where EntryMut and Entry are the same type, this is just an alias for
    /// `get`.
    fn get_mut(
        &self,
        hash: &Hash,
    ) -> impl Future<Output = io::Result<Option<Self::EntryMut>>> + Send;

    /// Get an existing partial entry, or create a new one.
    ///
    /// We need to know the size of the partial entry. This might produce an
    /// error e.g. if there is not enough space on disk.
    fn get_or_create(
        &self,
        hash: Hash,
        size: u64,
    ) -> impl Future<Output = io::Result<Self::EntryMut>> + Send;

    /// Find out if the data behind a `hash` is complete, partial, or not present.
    ///
    /// Note that this does not actually verify the on-disk data, but only checks in which section
    /// of the store the entry is present.
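    ///
    /// # Example
    ///
    /// A sketch of dispatching on the status of an entry:
    ///
    /// ```ignore
    /// match store.entry_status(&hash).await? {
    ///     EntryStatus::Complete => { /* all data is present */ }
    ///     EntryStatus::Partial => { /* some data is present, fetch the rest */ }
    ///     EntryStatus::NotFound => { /* fetch everything */ }
    /// }
    /// ```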
    fn entry_status(&self, hash: &Hash) -> impl Future<Output = io::Result<EntryStatus>> + Send;

    /// Sync version of `entry_status`, for the doc sync engine until we can get rid of it.
    ///
    /// Don't count on this to be efficient.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus>;

    /// Upgrade a partial entry to a complete entry.
    fn insert_complete(&self, entry: Self::EntryMut)
        -> impl Future<Output = io::Result<()>> + Send;
}

/// Extension of [`Map`] to add misc methods used by the rpc calls.
pub trait ReadableStore: Map {
    /// list all blobs in the database. This includes both raw blobs that have
    /// been imported, and hash sequences that have been created internally.
    fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
    /// list all tags (collections or other explicitly added things) in the database
    fn tags(&self) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;

    /// Temp tags
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;

    /// Perform a consistency check on the database
    fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// list partial blobs in the database
    fn partial_blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;

    /// This trait method extracts a file to a local path.
    ///
    /// `hash` is the hash of the file
    /// `target` is the path to the target file
    /// `mode` is a hint how the file should be exported.
    /// `progress` is a callback that is called with the total number of bytes that have been written
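    ///
    /// # Example
    ///
    /// A sketch of exporting a blob to a file while printing progress (the
    /// target path is just an example):
    ///
    /// ```ignore
    /// store
    ///     .export(
    ///         hash,
    ///         PathBuf::from("/tmp/blob.bin"),
    ///         ExportMode::Copy,
    ///         Box::new(|offset| {
    ///             println!("exported {offset} bytes");
    ///             Ok(())
    ///         }),
    ///     )
    ///     .await?;
    /// ```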
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> impl Future<Output = io::Result<()>> + Send;
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut {
    /// This trait method imports a file from a local path.
    ///
    /// `data` is the path to the file.
    /// `mode` is a hint how the file should be imported.
    /// `progress` is a sender that provides a way for the importer to send progress messages
    /// when importing large files. This also serves as a way to cancel the import. If the
    /// consumer of the progress messages is dropped, subsequent attempts to send progress
    /// will fail.
    ///
    /// Returns the hash of the imported file. The reason to have this method is that some database
    /// implementations might be able to import a file without copying it.
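    ///
    /// # Example
    ///
    /// A sketch of importing a file by reference (the path is just an example,
    /// and `progress` stands for any suitable progress sender):
    ///
    /// ```ignore
    /// let (tag, size) = store
    ///     .import_file(
    ///         PathBuf::from("/tmp/big-file.bin"),
    ///         ImportMode::TryReference,
    ///         BlobFormat::Raw,
    ///         progress,
    ///     )
    ///     .await?;
    /// println!("imported {size} bytes, pinned as {}", tag.hash());
    /// ```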
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from memory.
    ///
    /// It is a special case of `import` that does not use the file system.
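    ///
    /// A sketch of importing a small in-memory blob:
    ///
    /// ```ignore
    /// let tag = store.import_bytes(Bytes::from("hello"), BlobFormat::Raw).await?;
    /// ```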
    fn import_bytes(
        &self,
        bytes: Bytes,
        format: BlobFormat,
    ) -> impl Future<Output = io::Result<TempTag>> + Send;

    /// Import data from a stream of bytes.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from an async byte reader.
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send {
        let stream = tokio_util::io::ReaderStream::new(data);
        self.import_stream(stream, format, progress)
    }

    /// Set a tag
    fn set_tag(
        &self,
        name: Tag,
        hash: Option<HashAndFormat>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Create a new tag
    fn create_tag(&self, hash: HashAndFormat) -> impl Future<Output = io::Result<Tag>> + Send;

    /// Create a temporary pin for this store
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;

    /// Notify the store that a new gc phase is about to start.
    ///
    /// This should not fail unless the store is shut down or otherwise in a
    /// bad state. The gc task will shut itself down if this fails.
    fn gc_start(&self) -> impl Future<Output = io::Result<()>> + Send;

    /// Traverse all roots recursively and mark them as live.
    ///
    /// Poll this stream to completion to perform a full gc mark phase.
    ///
    /// Not polling this stream to completion is dangerous, since it might lead
    /// to some live data being missed.
    ///
    /// The implementation of this method should do the minimum amount of work
    /// to determine the live set. Actual deletion of garbage should be done
    /// in the gc_sweep phase.
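    ///
    /// # Example
    ///
    /// A sketch of a full gc cycle, marking first and sweeping afterwards
    /// (events are only logged here):
    ///
    /// ```ignore
    /// store.gc_start().await?;
    /// let mut live = BTreeSet::new();
    /// {
    ///     let mut mark = store.gc_mark(&mut live);
    ///     while let Some(event) = mark.next().await {
    ///         tracing::debug!("mark: {:?}", event);
    ///     }
    /// }
    /// let mut sweep = store.gc_sweep(&live);
    /// while let Some(event) = sweep.next().await {
    ///     tracing::debug!("sweep: {:?}", event);
    /// }
    /// ```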
    fn gc_mark(&self, live: &mut BTreeSet<Hash>) -> impl Stream<Item = GcMarkEvent> + Unpin {
        Gen::new(|co| async move {
            if let Err(e) = gc_mark_task(self, live, &co).await {
                co.yield_(GcMarkEvent::Error(e)).await;
            }
        })
    }

    /// Remove all blobs that are not marked as live.
    ///
    /// Poll this stream to completion to perform a full gc sweep. Not polling this stream
    /// to completion just means that some garbage will remain in the database.
    ///
    /// Sweeping might take long, but it can safely be done in the background.
    fn gc_sweep(&self, live: &BTreeSet<Hash>) -> impl Stream<Item = GcSweepEvent> + Unpin {
        Gen::new(|co| async move {
            if let Err(e) = gc_sweep_task(self, live, &co).await {
                co.yield_(GcSweepEvent::Error(e)).await;
            }
        })
    }

    /// physically delete the given hashes from the store.
    fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

    /// Shutdown the store.
    fn shutdown(&self) -> impl Future<Output = ()> + Send;

    /// Validate the database
    ///
    /// This will check that the file and outboard content is correct for all complete
    /// entries, and output valid ranges for all partial entries.
    ///
    /// It will not check the internal consistency of the database.
    fn validate(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ValidateProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send {
        validate_impl(self, repair, tx)
    }
}

async fn validate_impl(
    store: &impl Store,
    repair: bool,
    tx: BoxedProgressSender<ValidateProgress>,
) -> io::Result<()> {
    use futures_buffered::BufferedStreamExt;

    let validate_parallelism: usize = num_cpus::get();
    let lp = LocalPoolHandle::new(validate_parallelism);
    let complete = store.blobs().await?.collect::<io::Result<Vec<_>>>()?;
    let partial = store
        .partial_blobs()
        .await?
        .collect::<io::Result<Vec<_>>>()?;
    tx.send(ValidateProgress::Starting {
        total: complete.len() as u64,
    })
    .await?;
    let complete_result = futures_lite::stream::iter(complete)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn_pinned(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::Entry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::EntryProgress { id, offset })?;
                }
                let expected_chunk_range =
                    ChunkRanges::from(..BaoTree::new(size, IROH_BLOCK_SIZE).chunks());
                // the entry is complete if the valid ranges cover the full expected range
                let complete = actual_chunk_ranges == expected_chunk_range;
                let error = if complete {
                    None
                } else {
                    Some(format!(
                        "expected chunk ranges {:?}, got chunk ranges {:?}",
                        expected_chunk_range, actual_chunk_ranges
                    ))
                };
                tx.send(ValidateProgress::EntryDone { id, error }).await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok((hash, !complete))
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let partial_result = futures_lite::stream::iter(partial)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn_pinned(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::PartialEntry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::PartialEntryProgress { id, offset })?;
                }
                tx.send(ValidateProgress::PartialEntryDone {
                    id,
                    ranges: RangeSpec::new(&actual_chunk_ranges),
                })
                .await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok(())
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let mut to_downgrade = Vec::new();
    for item in complete_result {
        let (hash, incomplete) = item??;
        if incomplete {
            to_downgrade.push(hash);
        }
    }
    for item in partial_result {
        item??;
    }
    if repair {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "repair not implemented",
        ));
    }
    Ok(())
}

/// Implementation of the gc mark phase.
async fn gc_mark_task<'a>(
    store: &'a impl Store,
    live: &'a mut BTreeSet<Hash>,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    macro_rules! debug {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    let mut roots = BTreeSet::new();
    debug!("traversing tags");
    for item in store.tags().await? {
        let (name, haf) = item?;
        debug!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    debug!("traversing temp roots");
    for haf in store.temp_tags() {
        debug!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    for HashAndFormat { hash, format } in roots {
        // we need to do this for all formats except raw
        if live.insert(hash) && !format.is_raw() {
            let Some(entry) = store.get(&hash).await? else {
                warn!("gc: {} not found", hash);
                continue;
            };
            if !entry.is_complete() {
                warn!("gc: {} is partial", hash);
                continue;
            }
            let Ok(reader) = entry.data_reader().await else {
                warn!("gc: {} creating data reader failed", hash);
                continue;
            };
            let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
                warn!("gc: {} parse failed", hash);
                continue;
            };
            debug!("parsed collection {} {:?}", hash, count);
            loop {
                let item = match stream.next().await {
                    Ok(Some(item)) => item,
                    Ok(None) => break,
                    Err(_err) => {
                        warn!("gc: {} parse failed", hash);
                        break;
                    }
                };
                // if format != raw we would have to recurse here by adding this to current
                live.insert(item);
            }
        }
    }
    debug!("gc mark done. found {} live blobs", live.len());
    Ok(())
}

async fn gc_sweep_task<'a>(
    store: &'a impl Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

/// An event related to GC
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom event (info)
    CustomDebug(String),
    /// A custom non-critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// An event related to GC
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non-critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// Progress messages for an import operation
///
/// An import operation involves computing the outboard of a file, and then
/// either copying or moving the file into the database.
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Found a path
    ///
    /// This will be the first message for an id
    Found { id: u64, name: String },
    /// Progress when copying the file to the store
    ///
    /// This will be omitted if the store can use the file in place
    ///
    /// There will be multiple of these messages for an id
    CopyProgress { id: u64, offset: u64 },
    /// Determined the size
    ///
    /// This will come after `Found` and zero or more `CopyProgress` messages.
    /// For unstable files, determining the size will only be done once the file
    /// is fully copied.
    Size { id: u64, size: u64 },
    /// Progress when computing the outboard
    ///
    /// There will be multiple of these messages for an id
    OutboardProgress { id: u64, offset: u64 },
    /// Done computing the outboard
    ///
    /// This comes after `Size` and zero or more `OutboardProgress` messages
    OutboardDone { id: u64, hash: Hash },
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the data out to a file. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
    /// The hash refers to any blob and will be exported to a single file.
    #[default]
    Blob,
    /// The hash refers to a [`crate::format::collection::Collection`] blob
    /// and all children of the collection shall be exported to one file per child.
    ///
    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
    /// collection metadata, all other children of the collection will be exported to
    /// a file each, with their collection name treated as a relative path to the export
    /// destination path.
    ///
    /// If the blob cannot be parsed as a collection, the operation will fail.
    Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Starting to export to a file
    ///
    /// This will be the first message for an id
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// Progress when copying the file to the target
    ///
    /// This will be omitted if the store can move the file or use copy on write
    ///
    /// There will be multiple of these messages for an id
    Progress { id: u64, offset: u64 },
    /// Done exporting
    Done { id: u64 },
}

/// Level for generic validation messages
#[derive(
    Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
)]
pub enum ReportLevel {
    /// Very unimportant info messages
    Trace,
    /// Info messages
    Info,
    /// Warnings, something is not quite right
    Warn,
    /// Errors, something is very wrong
    Error,
}

/// Progress updates for the consistency check operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ConsistencyCheckProgress {
    /// Consistency check started
    Start,
    /// Consistency check update
    Update {
        /// The message
        message: String,
        /// The entry this message is about, if any
        entry: Option<Hash>,
        /// The level of the message
        level: ReportLevel,
    },
    /// Consistency check ended
    Done,
    /// We got an error and need to abort.
    Abort(RpcError),
}

/// Progress updates for the validate operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// started validating
    Starting {
        /// The total number of entries to validate
        total: u64,
    },
    /// We started validating a complete entry
    Entry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be a url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    EntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    EntryDone {
        /// The unique id of the entry.
        id: u64,
        /// An error if we failed to validate the entry.
        error: Option<String>,
    },
    /// We started validating a partial entry
    PartialEntry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be a url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The best known size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    PartialEntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    PartialEntryDone {
        /// The unique id of the entry.
        id: u64,
        /// Available ranges.
        ranges: RangeSpec,
    },
    /// We are done with the whole operation.
    AllDone,
    /// We got an error and need to abort.
    Abort(RpcError),
}

/// Database events
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// A GC was started
    GcStarted,
    /// A GC was completed
    GcCompleted,
}