iroh_bytes/store/traits.rs
//! Traits for in-memory or persistent maps of blobs with bao encoded outboards.
use std::{collections::BTreeSet, future::Future, io, path::PathBuf};

use bao_tree::{
    io::fsm::{BaoContentItem, Outboard},
    BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_base::rpc::RpcError;
use iroh_io::AsyncSliceReader;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
use tokio_util::task::LocalPoolHandle;

use crate::{
    hashseq::parse_hash_seq,
    protocol::RangeSpec,
    util::{
        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
        Tag,
    },
    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};

pub use bao_tree;
pub use range_collections;

/// A fallible but owned iterator over the entries in a store.
pub type DbIter<T> = Box<dyn Iterator<Item = io::Result<T>> + Send + Sync + 'static>;

/// Export progress callback
pub type ExportProgressCb = Box<dyn Fn(u64) -> io::Result<()> + Send + Sync + 'static>;

/// The availability status of an entry in a store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is completely available.
    Complete,
    /// The entry is partially available.
    Partial,
    /// The entry is not in the store.
    NotFound,
}

/// The size of a bao file
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum BaoBlobSize {
    /// A remote side told us the size, but we have insufficient data to verify it.
    Unverified(u64),
    /// We have verified the size.
    Verified(u64),
}

impl BaoBlobSize {
    /// Create a new `BaoBlobSize` with the given size and verification status.
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            BaoBlobSize::Verified(size)
        } else {
            BaoBlobSize::Unverified(size)
        }
    }

    /// Get just the value, no matter if it is verified or not.
    pub fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(size) => *size,
            BaoBlobSize::Verified(size) => *size,
        }
    }
}

/// An entry for one hash in a bao map
///
/// The entry has the ability to provide you with an (outboard, data)
/// reader pair. Creating the reader is async and may fail. The futures that
/// create the readers must be `Send`, but the readers themselves don't have to
/// be.
pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
    /// The hash of the entry.
    fn hash(&self) -> Hash;
    /// The size of the entry.
    fn size(&self) -> BaoBlobSize;
    /// Returns `true` if the entry is complete.
    ///
    /// Note that this does not actually verify if the bytes on disk are complete,
    /// it only checks if the entry was marked as complete in the store.
    fn is_complete(&self) -> bool;
    /// A future that resolves to a reader that can be used to read the outboard
    fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
    /// A future that resolves to a reader that can be used to read the data
    fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
///
/// This is the readonly view. To allow updates, a concrete implementation must
/// also implement [`MapMut`].
///
/// Entries are *not* guaranteed to be complete for all implementations.
/// They are also not guaranteed to be immutable, since this could be the
/// readonly view of a mutable store.
pub trait Map: Clone + Send + Sync + 'static {
    /// The entry type. An entry is a cheaply cloneable handle that can be used
    /// to open readers for both the data and the outboard
    type Entry: MapEntry;
    /// Get an entry for a hash.
    ///
    /// This can also be used for a membership test by just checking if there
    /// is an entry. Creating an entry should be cheap, any expensive ops should
    /// be deferred to the creation of the actual readers.
    ///
    /// It is not guaranteed that the entry is complete.
    fn get(&self, hash: &Hash) -> impl Future<Output = io::Result<Option<Self::Entry>>> + Send;
}
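
// A minimal sketch (editorial illustration, not part of the original API) of how
// `Map::get` doubles as a membership test, as described in the docs above. The
// function name is hypothetical.
#[allow(dead_code)]
async fn contains_sketch(map: &impl Map, hash: &Hash) -> io::Result<bool> {
    // An entry handle is cheap to create; here we only care whether one exists.
    Ok(map.get(hash).await?.is_some())
}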

/// A partial entry
pub trait MapEntryMut: MapEntry {
    /// Get a batch writer
    fn batch_writer(&self) -> impl Future<Output = io::Result<impl BaoBatchWriter>> + Send;
}

/// An async batch interface for writing bao content items to a pair of data and
/// outboard.
///
/// Details like the chunk group size and the actual storage location are left
/// to the implementation.
pub trait BaoBatchWriter {
    /// Write a batch of bao content items to the underlying storage.
    ///
    /// The batch is guaranteed to be sorted as data is received from the network.
    /// So leaves will be sorted by offset, and parents will be sorted by pre-order
    /// traversal offset. There is no guarantee that they will be consecutive
    /// though.
    ///
    /// The size is the total size of the blob that the remote side told us.
    /// It is not guaranteed to be correct, but it is guaranteed to be
    /// consistent with all data in the batch. The size therefore represents
    /// an upper bound on the maximum offset of all leaf items.
    /// So it is guaranteed that `leaf.offset + leaf.data.len() <= size` for all
    /// leaf items in the batch.
    ///
    /// Batches should not become too large. Typically, a batch is just a few
    /// parent nodes and a leaf.
    ///
    /// Batch is a vec so it can be moved into a task, which is unfortunately
    /// necessary in typical io code.
    fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<BaoContentItem>,
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the written data to permanent storage, if applicable.
    /// E.g. for a file based implementation, this would call sync_data
    /// on all files.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}
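
// A minimal sketch (editorial illustration, not part of the original API) of a
// `BaoBatchWriter` implementation. It discards all data and only checks the size
// invariant documented on `write_batch`; the type name is hypothetical.
#[allow(dead_code)]
#[derive(Debug, Default)]
struct DiscardingBatchWriter;

impl BaoBatchWriter for DiscardingBatchWriter {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        for item in &batch {
            if let BaoContentItem::Leaf(leaf) = item {
                // Per the trait docs, every leaf must end at or before `size`.
                debug_assert!(leaf.offset + leaf.data.len() as u64 <= size);
            }
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        // Nothing is persisted, so there is nothing to sync.
        Ok(())
    }
}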

/// Implement BaoBatchWriter for mutable references
impl<W: BaoBatchWriter> BaoBatchWriter for &mut W {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        (**self).write_batch(size, batch).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

/// A wrapper around a batch writer that calls a progress callback for one leaf
/// per batch.
#[derive(Debug)]
pub(crate) struct FallibleProgressBatchWriter<W, F>(W, F);

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
    FallibleProgressBatchWriter<W, F>
{
    /// Create a new `FallibleProgressBatchWriter` from an inner writer and a progress callback
    ///
    /// The `on_write` function is called for each write, with the `offset` as the first and the
    /// length of the data as the second param. `on_write` must return an `io::Result`.
    /// If `on_write` returns an error, the download is aborted.
    pub fn new(inner: W, on_write: F) -> Self {
        Self(inner, on_write)
    }
}

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static> BaoBatchWriter
    for FallibleProgressBatchWriter<W, F>
{
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        // find the offset and length of the first (usually only) chunk
        let chunk = batch
            .iter()
            .filter_map(|item| {
                if let BaoContentItem::Leaf(leaf) = item {
                    Some((leaf.offset, leaf.data.len()))
                } else {
                    None
                }
            })
            .next();
        self.0.write_batch(size, batch).await?;
        // call the progress callback
        if let Some((offset, len)) = chunk {
            (self.1)(offset, len)?;
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.0.sync().await
    }
}

/// A mutable bao map.
///
/// This extends the readonly [`Map`] trait with methods to create and modify entries.
pub trait MapMut: Map {
    /// An entry that is possibly writable
    type EntryMut: MapEntryMut;

    /// Get an existing entry as an EntryMut.
    ///
    /// For implementations where EntryMut and Entry are the same type, this is just an alias for
    /// `get`.
    fn get_mut(
        &self,
        hash: &Hash,
    ) -> impl Future<Output = io::Result<Option<Self::EntryMut>>> + Send;

    /// Get an existing partial entry, or create a new one.
    ///
    /// We need to know the size of the partial entry. This might produce an
    /// error e.g. if there is not enough space on disk.
    fn get_or_create(
        &self,
        hash: Hash,
        size: u64,
    ) -> impl Future<Output = io::Result<Self::EntryMut>> + Send;

    /// Find out if the data behind a `hash` is complete, partial, or not present.
    ///
    /// Note that this does not actually verify the on-disk data, but only checks in which section
    /// of the store the entry is present.
    fn entry_status(&self, hash: &Hash) -> impl Future<Output = io::Result<EntryStatus>> + Send;

    /// Sync version of `entry_status`, for the doc sync engine until we can get rid of it.
    ///
    /// Don't count on this to be efficient.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus>;

    /// Upgrade a partial entry to a complete entry.
    fn insert_complete(&self, entry: Self::EntryMut)
        -> impl Future<Output = io::Result<()>> + Send;
}
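
// A hedged sketch (editorial illustration, not part of the original API) of the
// typical write path through a mutable store: create or reopen a partial entry,
// stream batches into its batch writer, sync, then promote the entry to complete.
// The function name and the shape of the `batches` parameter are hypothetical, and
// the sketch assumes the batches cover the complete, verified blob.
#[allow(dead_code)]
async fn ingest_sketch(
    store: &impl MapMut,
    hash: Hash,
    size: u64,
    batches: Vec<Vec<BaoContentItem>>,
) -> io::Result<()> {
    // Get or create the partial entry for this hash.
    let entry = store.get_or_create(hash, size).await?;
    {
        let mut writer = entry.batch_writer().await?;
        for batch in batches {
            writer.write_batch(size, batch).await?;
        }
        // Make sure everything is durable before marking the entry as complete.
        writer.sync().await?;
    }
    store.insert_complete(entry).await?;
    Ok(())
}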

/// Extension of [`Map`] to add misc methods used by the rpc calls.
pub trait ReadableStore: Map {
    /// List all blobs in the database. This includes both raw blobs that have
    /// been imported, and hash sequences that have been created internally.
    fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
    /// List all tags (collections or other explicitly added things) in the database
    fn tags(&self) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;

    /// Temp tags
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;

    /// Perform a consistency check on the database
    fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// List partial blobs in the database
    fn partial_blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;

    /// This trait method extracts a file to a local path.
    ///
    /// `hash` is the hash of the file
    /// `target` is the path to the target file
    /// `mode` is a hint how the file should be exported.
    /// `progress` is a callback that is called with the total number of bytes that have been written
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> impl Future<Output = io::Result<()>> + Send;
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut {
    /// This trait method imports a file from a local path.
    ///
    /// `data` is the path to the file.
    /// `mode` is a hint how the file should be imported.
    /// `progress` is a sender that provides a way for the importer to send progress messages
    /// when importing large files. This also serves as a way to cancel the import. If the
    /// consumer of the progress messages is dropped, subsequent attempts to send progress
    /// will fail.
    ///
    /// Returns the hash of the imported file. The reason to have this method is that some database
    /// implementations might be able to import a file without copying it.
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from memory.
    ///
    /// It is a special case of `import` that does not use the file system.
    fn import_bytes(
        &self,
        bytes: Bytes,
        format: BlobFormat,
    ) -> impl Future<Output = io::Result<TempTag>> + Send;

    /// Import data from a stream of bytes.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from an async byte reader.
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send {
        let stream = tokio_util::io::ReaderStream::new(data);
        self.import_stream(stream, format, progress)
    }

    /// Set a tag
    fn set_tag(
        &self,
        name: Tag,
        hash: Option<HashAndFormat>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Create a new tag
    fn create_tag(&self, hash: HashAndFormat) -> impl Future<Output = io::Result<Tag>> + Send;

    /// Create a temporary pin for this store
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;

    /// Notify the store that a new gc phase is about to start.
    ///
    /// This should not fail unless the store is shut down or otherwise in a
    /// bad state. The gc task will shut itself down if this fails.
    fn gc_start(&self) -> impl Future<Output = io::Result<()>> + Send;

    /// Traverse all roots recursively and mark them as live.
    ///
    /// Poll this stream to completion to perform a full gc mark phase.
    ///
    /// Not polling this stream to completion is dangerous, since it might lead
    /// to some live data being missed.
    ///
    /// The implementation of this method should do the minimum amount of work
    /// to determine the live set. Actual deletion of garbage should be done
    /// in the gc_sweep phase.
    ///
    /// A sketch of driving a full gc cycle is given after this trait definition.
    fn gc_mark(&self, live: &mut BTreeSet<Hash>) -> impl Stream<Item = GcMarkEvent> + Unpin {
        Gen::new(|co| async move {
            if let Err(e) = gc_mark_task(self, live, &co).await {
                co.yield_(GcMarkEvent::Error(e)).await;
            }
        })
    }

    /// Remove all blobs that are not marked as live.
    ///
    /// Poll this stream to completion to perform a full gc sweep. Not polling this stream
    /// to completion just means that some garbage will remain in the database.
    ///
    /// Sweeping might take long, but it can safely be done in the background.
    fn gc_sweep(&self, live: &BTreeSet<Hash>) -> impl Stream<Item = GcSweepEvent> + Unpin {
        Gen::new(|co| async move {
            if let Err(e) = gc_sweep_task(self, live, &co).await {
                co.yield_(GcSweepEvent::Error(e)).await;
            }
        })
    }

    /// Physically delete the given hashes from the store.
    fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

    /// Shutdown the store.
    fn shutdown(&self) -> impl Future<Output = ()> + Send;

    /// Validate the database
    ///
    /// This will check that the file and outboard content is correct for all complete
    /// entries, and output valid ranges for all partial entries.
    ///
    /// It will not check the internal consistency of the database.
    fn validate(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ValidateProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send {
        validate_impl(self, repair, tx)
    }
}
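
// A hedged sketch (editorial illustration, not part of the original API) of one way
// a caller might drive a full gc cycle: announce the phase, poll the mark stream to
// completion to collect the live set, then poll the sweep stream. The function name
// is hypothetical.
#[allow(dead_code)]
async fn gc_cycle_sketch(store: &impl Store) -> io::Result<()> {
    store.gc_start().await?;
    let mut live = BTreeSet::new();
    {
        // The mark stream must be polled to completion, otherwise live data may be missed.
        let mut mark = store.gc_mark(&mut live);
        while let Some(event) = mark.next().await {
            if let GcMarkEvent::Error(e) = event {
                return Err(io::Error::new(io::ErrorKind::Other, e.to_string()));
            }
        }
    }
    // Sweeping can safely run in the background; here it is simply drained.
    let mut sweep = store.gc_sweep(&live);
    while let Some(event) = sweep.next().await {
        if let GcSweepEvent::Error(e) = event {
            return Err(io::Error::new(io::ErrorKind::Other, e.to_string()));
        }
    }
    Ok(())
}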

async fn validate_impl(
    store: &impl Store,
    repair: bool,
    tx: BoxedProgressSender<ValidateProgress>,
) -> io::Result<()> {
    use futures_buffered::BufferedStreamExt;

    let validate_parallelism: usize = num_cpus::get();
    let lp = LocalPoolHandle::new(validate_parallelism);
    let complete = store.blobs().await?.collect::<io::Result<Vec<_>>>()?;
    let partial = store
        .partial_blobs()
        .await?
        .collect::<io::Result<Vec<_>>>()?;
    tx.send(ValidateProgress::Starting {
        total: complete.len() as u64,
    })
    .await?;
    let complete_result = futures_lite::stream::iter(complete)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn_pinned(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::Entry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::EntryProgress { id, offset })?;
                }
                let expected_chunk_range =
                    ChunkRanges::from(..BaoTree::new(size, IROH_BLOCK_SIZE).chunks());
                let complete = actual_chunk_ranges == expected_chunk_range;
                let error = if complete {
                    None
                } else {
                    Some(format!(
                        "expected chunk ranges {:?}, got chunk ranges {:?}",
                        expected_chunk_range, actual_chunk_ranges
                    ))
                };
                tx.send(ValidateProgress::EntryDone { id, error }).await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok((hash, !complete))
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let partial_result = futures_lite::stream::iter(partial)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn_pinned(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::PartialEntry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::PartialEntryProgress { id, offset })?;
                }
                tx.send(ValidateProgress::PartialEntryDone {
                    id,
                    ranges: RangeSpec::new(&actual_chunk_ranges),
                })
                .await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok(())
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    // complete entries whose on-disk data turned out to be incomplete
    let mut to_downgrade = Vec::new();
    for item in complete_result {
        let (hash, incomplete) = item??;
        if incomplete {
            to_downgrade.push(hash);
        }
    }
    for item in partial_result {
        item??;
    }
    if repair {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "repair not implemented",
        ));
    }
    Ok(())
}

/// Implementation of the gc method.
async fn gc_mark_task<'a>(
    store: &'a impl Store,
    live: &'a mut BTreeSet<Hash>,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    macro_rules! debug {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    let mut roots = BTreeSet::new();
    debug!("traversing tags");
    for item in store.tags().await? {
        let (name, haf) = item?;
        debug!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    debug!("traversing temp roots");
    for haf in store.temp_tags() {
        debug!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    for HashAndFormat { hash, format } in roots {
        // we need to do this for all formats except raw
        if live.insert(hash) && !format.is_raw() {
            let Some(entry) = store.get(&hash).await? else {
                warn!("gc: {} not found", hash);
                continue;
            };
            if !entry.is_complete() {
                warn!("gc: {} is partial", hash);
                continue;
            }
            let Ok(reader) = entry.data_reader().await else {
                warn!("gc: {} creating data reader failed", hash);
                continue;
            };
            let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
                warn!("gc: {} parse failed", hash);
                continue;
            };
            debug!("parsed collection {} {:?}", hash, count);
            loop {
                let item = match stream.next().await {
                    Ok(Some(item)) => item,
                    Ok(None) => break,
                    Err(_err) => {
                        warn!("gc: {} parse failed", hash);
                        break;
                    }
                };
                // if format != raw we would have to recurse here by adding this to current
                live.insert(item);
            }
        }
    }
    debug!("gc mark done. found {} live blobs", live.len());
    Ok(())
}

async fn gc_sweep_task<'a>(
    store: &'a impl Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

/// An event related to GC
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom event (info)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// An event related to GC
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// Progress messages for an import operation
///
/// An import operation involves computing the outboard of a file, and then
/// either copying or moving the file into the database.
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Found a path
    ///
    /// This will be the first message for an id
    Found { id: u64, name: String },
    /// Progress when copying the file to the store
    ///
    /// This will be omitted if the store can use the file in place
    ///
    /// There will be multiple of these messages for an id
    CopyProgress { id: u64, offset: u64 },
    /// Determined the size
    ///
    /// This will come after `Found` and zero or more `CopyProgress` messages.
    /// For unstable files, determining the size will only be done once the file
    /// is fully copied.
    Size { id: u64, size: u64 },
    /// Progress when computing the outboard
    ///
    /// There will be multiple of these messages for an id
    OutboardProgress { id: u64, offset: u64 },
    /// Done computing the outboard
    ///
    /// This comes after `Size` and zero or more `OutboardProgress` messages
    OutboardDone { id: u64, hash: Hash },
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the data out to the target path. Also, a disk based implementation might
/// choose to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
    /// The hash refers to any blob and will be exported to a single file.
    #[default]
    Blob,
    /// The hash refers to a [`crate::format::collection::Collection`] blob
    /// and all children of the collection shall be exported to one file per child.
    ///
    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
    /// collection metadata, all other children of the collection will be exported to
    /// a file each, with their collection name treated as a relative path to the export
    /// destination path.
    ///
    /// If the blob cannot be parsed as a collection, the operation will fail.
    Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Starting to export to a file
    ///
    /// This will be the first message for an id
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// Progress when copying the file to the target
    ///
    /// This will be omitted if the store can move the file or use copy on write
    ///
    /// There will be multiple of these messages for an id
    Progress { id: u64, offset: u64 },
    /// Done exporting
    Done { id: u64 },
}

/// Level for generic validation messages
#[derive(
    Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
)]
pub enum ReportLevel {
    /// Very unimportant info messages
    Trace,
    /// Info messages
    Info,
    /// Warnings, something is not quite right
    Warn,
    /// Errors, something is very wrong
    Error,
}

/// Progress updates for the consistency check operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ConsistencyCheckProgress {
    /// Consistency check started
    Start,
    /// Consistency check update
    Update {
        /// The message
        message: String,
        /// The entry this message is about, if any
        entry: Option<Hash>,
        /// The level of the message
        level: ReportLevel,
    },
    /// Consistency check ended
    Done,
    /// We got an error and need to abort.
    Abort(RpcError),
}

/// Progress updates for the validate operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// Started validating
    Starting {
        /// The total number of entries to validate
        total: u64,
    },
    /// We started validating a complete entry
    Entry {
        /// A new unique id for this entry
        id: u64,
        /// The hash of the entry
        hash: Hash,
        /// Location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be an url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    EntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    EntryDone {
        /// The unique id of the entry.
        id: u64,
        /// An error if we failed to validate the entry.
        error: Option<String>,
    },
    /// We started validating a partial entry
    PartialEntry {
        /// A new unique id for this entry
        id: u64,
        /// The hash of the entry
        hash: Hash,
        /// Location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be an url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The best known size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    PartialEntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    PartialEntryDone {
        /// The unique id of the entry.
        id: u64,
        /// Available ranges.
        ranges: RangeSpec,
    },
    /// We are done with the whole operation.
    AllDone,
    /// We got an error and need to abort.
    Abort(RpcError),
}

/// Database events
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// A GC was started
    GcStarted,
    /// A GC was completed
    GcCompleted,
}