iroh_blobs/store/traits.rs
//! Traits for in-memory or persistent maps of blobs with bao encoded outboards.
use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Duration};

pub use bao_tree;
use bao_tree::{
    io::fsm::{BaoContentItem, Outboard},
    BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
pub use range_collections;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;

use crate::{
    hashseq::parse_hash_seq,
    protocol::RangeSpec,
    util::{
        local_pool::{self, LocalPool},
        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
        Tag,
    },
    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};

/// A fallible but owned iterator over the entries in a store.
pub type DbIter<T> = Box<dyn Iterator<Item = io::Result<T>> + Send + Sync + 'static>;

/// Export progress callback
pub type ExportProgressCb = Box<dyn Fn(u64) -> io::Result<()> + Send + Sync + 'static>;

/// The availability status of an entry in a store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is completely available.
    Complete,
    /// The entry is partially available.
    Partial,
    /// The entry is not in the store.
    NotFound,
}

/// The size of a bao blob
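///
/// # Example
///
/// A minimal sketch of constructing and inspecting a size, assuming the type
/// is re-exported under `iroh_blobs::store` (not compiled here):
///
/// ```ignore
/// use iroh_blobs::store::BaoBlobSize;
///
/// // A size reported by a remote side that we have not verified yet.
/// let size = BaoBlobSize::new(1024, false);
/// assert_eq!(size, BaoBlobSize::Unverified(1024));
/// // `value` returns the number regardless of verification status.
/// assert_eq!(size.value(), 1024);
/// ```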
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum BaoBlobSize {
    /// A remote side told us the size, but we have insufficient data to verify it.
    Unverified(u64),
    /// We have verified the size.
    Verified(u64),
}

impl BaoBlobSize {
    /// Create a new `BaoBlobSize` with the given size and verification status.
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            BaoBlobSize::Verified(size)
        } else {
            BaoBlobSize::Unverified(size)
        }
    }

    /// Get just the value, no matter if it is verified or not.
    pub fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(size) => *size,
            BaoBlobSize::Verified(size) => *size,
        }
    }
}

/// An entry for one hash in a bao map
///
/// The entry has the ability to provide you with an (outboard, data)
/// reader pair. Creating the readers is async and may fail. The futures that
/// create the readers must be `Send`, but the readers themselves don't have to
/// be.
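///
/// # Example
///
/// A sketch of opening a reader and reading a slice of the data, assuming a
/// `store` implementing [`Map`] and a known `hash` (not compiled here):
///
/// ```ignore
/// use iroh_io::AsyncSliceReader;
///
/// if let Some(entry) = store.get(&hash).await? {
///     // Creating the reader is where the actual (fallible) io happens.
///     let mut data = entry.data_reader().await?;
///     // Read the first 1024 bytes of the blob.
///     let bytes = data.read_at(0, 1024).await?;
/// }
/// ```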
pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
    /// The hash of the entry.
    fn hash(&self) -> Hash;
    /// The size of the entry.
    fn size(&self) -> BaoBlobSize;
    /// Returns `true` if the entry is complete.
    ///
    /// Note that this does not actually verify if the bytes on disk are complete,
    /// it only checks if the entry was marked as complete in the store.
    fn is_complete(&self) -> bool;
    /// A future that resolves to a reader that can be used to read the outboard
    fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
    /// A future that resolves to a reader that can be used to read the data
    fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
///
/// This is the readonly view. To allow updates, a concrete implementation must
/// also implement [`MapMut`].
///
/// Entries are *not* guaranteed to be complete for all implementations.
/// They are also not guaranteed to be immutable, since this could be the
/// readonly view of a mutable store.
pub trait Map: Clone + Send + Sync + 'static {
    /// The entry type. An entry is a cheaply cloneable handle that can be used
    /// to open readers for both the data and the outboard.
    type Entry: MapEntry;
    /// Get an entry for a hash.
    ///
    /// This can also be used for a membership test by just checking if there
    /// is an entry. Creating an entry should be cheap, any expensive ops should
    /// be deferred to the creation of the actual readers.
    ///
    /// It is not guaranteed that the entry is complete.
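    ///
    /// # Example
    ///
    /// A sketch of using `get` as a membership test (not compiled here):
    ///
    /// ```ignore
    /// let present = store.get(&hash).await?.is_some();
    /// ```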
    fn get(&self, hash: &Hash) -> impl Future<Output = io::Result<Option<Self::Entry>>> + Send;
}

/// A partial entry
pub trait MapEntryMut: MapEntry {
    /// Get a batch writer
    fn batch_writer(&self) -> impl Future<Output = io::Result<impl BaoBatchWriter>> + Send;
}

/// An async batch interface for writing bao content items to a pair of data and
/// outboard.
///
/// Details like the chunk group size and the actual storage location are left
/// to the implementation.
pub trait BaoBatchWriter {
    /// Write a batch of bao content items to the underlying storage.
    ///
    /// The batch is guaranteed to be sorted as data is received from the network.
    /// So leaves will be sorted by offset, and parents will be sorted by pre order
    /// traversal offset. There is no guarantee that they will be consecutive
    /// though.
    ///
    /// The size is the total size of the blob that the remote side told us.
    /// It is not guaranteed to be correct, but it is guaranteed to be
    /// consistent with all data in the batch. The size therefore represents
    /// an upper bound on the maximum offset of all leaf items.
    /// So it is guaranteed that `leaf.offset + leaf.size <= size` for all
    /// leaf items in the batch.
    ///
    /// Batches should not become too large. Typically, a batch is just a few
    /// parent nodes and a leaf.
    ///
    /// Batch is a vec so it can be moved into a task, which is unfortunately
    /// necessary in typical io code.
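    ///
    /// # Example
    ///
    /// A sketch of writing a single leaf, assuming the `Leaf` type from
    /// `bao_tree` and a `writer` implementing this trait (not compiled here):
    ///
    /// ```ignore
    /// use bao_tree::io::{fsm::BaoContentItem, Leaf};
    /// use bytes::Bytes;
    ///
    /// let leaf = Leaf {
    ///     offset: 0,
    ///     data: Bytes::from(vec![0u8; 1024]),
    /// };
    /// // 1024 is the total size the remote side claimed for the blob.
    /// writer.write_batch(1024, vec![BaoContentItem::Leaf(leaf)]).await?;
    /// writer.sync().await?;
    /// ```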
    fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<BaoContentItem>,
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the written data to permanent storage, if applicable.
    /// E.g. for a file based implementation, this would call sync_data
    /// on all files.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

/// Implement BaoBatchWriter for mutable references
impl<W: BaoBatchWriter> BaoBatchWriter for &mut W {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        (**self).write_batch(size, batch).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

/// A wrapper around a batch writer that calls a progress callback for one leaf
/// per batch.
#[derive(Debug)]
pub(crate) struct FallibleProgressBatchWriter<W, F>(W, F);

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
    FallibleProgressBatchWriter<W, F>
{
    /// Create a new `FallibleProgressBatchWriter` from an inner writer and a progress callback.
    ///
    /// The `on_write` callback is called for each write, with the `offset` as the first and
    /// the length of the data as the second parameter. `on_write` must return an `io::Result`.
    /// If `on_write` returns an error, the download is aborted.
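    ///
    /// # Example
    ///
    /// A sketch of wrapping a hypothetical `inner` writer so that each batch
    /// reports progress (not compiled here):
    ///
    /// ```ignore
    /// let mut writer = FallibleProgressBatchWriter::new(inner, |offset, len| {
    ///     tracing::debug!("wrote {} bytes at offset {}", len, offset);
    ///     Ok(())
    /// });
    /// ```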
    pub fn new(inner: W, on_write: F) -> Self {
        Self(inner, on_write)
    }
}

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static> BaoBatchWriter
    for FallibleProgressBatchWriter<W, F>
{
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        // find the offset and length of the first (usually only) leaf
        let chunk = batch
            .iter()
            .filter_map(|item| {
                if let BaoContentItem::Leaf(leaf) = item {
                    Some((leaf.offset, leaf.data.len()))
                } else {
                    None
                }
            })
            .next();
        self.0.write_batch(size, batch).await?;
        // call the progress callback
        if let Some((offset, len)) = chunk {
            (self.1)(offset, len)?;
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.0.sync().await
    }
}

/// A mutable bao map.
///
/// This extends the readonly [`Map`] trait with methods to create and modify entries.
pub trait MapMut: Map {
    /// An entry that is possibly writable
    type EntryMut: MapEntryMut;

    /// Get an existing entry as an EntryMut.
    ///
    /// For implementations where EntryMut and Entry are the same type, this is just an alias for
    /// `get`.
    fn get_mut(
        &self,
        hash: &Hash,
    ) -> impl Future<Output = io::Result<Option<Self::EntryMut>>> + Send;

    /// Get an existing partial entry, or create a new one.
    ///
    /// We need to know the size of the partial entry. This might produce an
    /// error, e.g. if there is not enough space on disk.
    fn get_or_create(
        &self,
        hash: Hash,
        size: u64,
    ) -> impl Future<Output = io::Result<Self::EntryMut>> + Send;

    /// Find out if the data behind a `hash` is complete, partial, or not present.
    ///
    /// Note that this does not actually verify the on-disk data, but only checks in which section
    /// of the store the entry is present.
    fn entry_status(&self, hash: &Hash) -> impl Future<Output = io::Result<EntryStatus>> + Send;

    /// Sync version of `entry_status`, for the doc sync engine until we can get rid of it.
    ///
    /// Don't count on this to be efficient.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus>;

    /// Upgrade a partial entry to a complete entry.
    fn insert_complete(&self, entry: Self::EntryMut)
        -> impl Future<Output = io::Result<()>> + Send;
}

/// Extension of [`Map`] to add misc methods used by the rpc calls.
pub trait ReadableStore: Map {
    /// List all blobs in the database. This includes both raw blobs that have
    /// been imported, and hash sequences that have been created internally.
    fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
    /// List all tags (collections or other explicitly added things) in the database.
    fn tags(&self) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;

    /// Temp tags
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;

    /// Perform a consistency check on the database.
    fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// List partial blobs in the database.
    fn partial_blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;

    /// This trait method extracts a file to a local path.
    ///
    /// `hash` is the hash of the file,
    /// `target` is the path to the target file,
    /// `mode` is a hint how the file should be exported, and
    /// `progress` is a callback that is called with the total number of bytes that have been written.
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> impl Future<Output = io::Result<()>> + Send;
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
    /// This trait method imports a file from a local path.
    ///
    /// `data` is the path to the file.
    /// `mode` is a hint how the file should be imported.
    /// `progress` is a sender that provides a way for the importer to send progress messages
    /// when importing large files. This also serves as a way to cancel the import. If the
    /// consumer of the progress messages is dropped, subsequent attempts to send progress
    /// will fail.
    ///
    /// Returns the hash of the imported file. The reason to have this method is that some database
    /// implementations might be able to import a file without copying it.
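    ///
    /// # Example
    ///
    /// A sketch of importing a file by reference, assuming an
    /// `IgnoreProgressSender` that discards progress messages
    /// (not compiled here):
    ///
    /// ```ignore
    /// use iroh_blobs::{store::ImportMode, util::progress::IgnoreProgressSender, BlobFormat};
    ///
    /// let (tag, size) = store
    ///     .import_file(
    ///         "/tmp/some_file".into(),
    ///         ImportMode::TryReference,
    ///         BlobFormat::Raw,
    ///         IgnoreProgressSender::default(),
    ///     )
    ///     .await?;
    /// ```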
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from memory.
    ///
    /// It is a special case of `import` that does not use the file system.
    fn import_bytes(
        &self,
        bytes: Bytes,
        format: BlobFormat,
    ) -> impl Future<Output = io::Result<TempTag>> + Send;

    /// Import data from a stream of bytes.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from an async byte reader.
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send {
        let stream = tokio_util::io::ReaderStream::new(data);
        self.import_stream(stream, format, progress)
    }

    /// Set a tag
    fn set_tag(
        &self,
        name: Tag,
        hash: Option<HashAndFormat>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Create a new tag
    fn create_tag(&self, hash: HashAndFormat) -> impl Future<Output = io::Result<Tag>> + Send;

    /// Create a temporary pin for this store
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;

    /// Start the GC loop
    ///
    /// The gc task will shut down when the returned future is dropped.
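    ///
    /// # Example
    ///
    /// A sketch of running GC in a background task with an empty protected
    /// set, assuming a cloneable `store` and a `config` (not compiled here):
    ///
    /// ```ignore
    /// use std::collections::BTreeSet;
    ///
    /// let gc = tokio::spawn(async move {
    ///     store.gc_run(config, || async { BTreeSet::new() }).await;
    /// });
    /// // Aborting (dropping) the task shuts the GC loop down.
    /// gc.abort();
    /// ```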
    fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G) -> impl Future<Output = ()>
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send;

    /// Physically delete the given hashes from the store.
    fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

    /// Shutdown the store.
    fn shutdown(&self) -> impl Future<Output = ()> + Send;

    /// Sync the store.
    fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;

    /// Validate the database
    ///
    /// This will check that the file and outboard content is correct for all complete
    /// entries, and output valid ranges for all partial entries.
    ///
    /// It will not check the internal consistency of the database.
    fn validate(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ValidateProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send {
        validate_impl(self, repair, tx)
    }
}

async fn validate_impl(
    store: &impl Store,
    repair: bool,
    tx: BoxedProgressSender<ValidateProgress>,
) -> io::Result<()> {
    use futures_buffered::BufferedStreamExt;

    let validate_parallelism: usize = num_cpus::get();
    let lp = LocalPool::new(local_pool::Config {
        threads: validate_parallelism,
        ..Default::default()
    });
    let complete = store.blobs().await?.collect::<io::Result<Vec<_>>>()?;
    let partial = store
        .partial_blobs()
        .await?
        .collect::<io::Result<Vec<_>>>()?;
    tx.send(ValidateProgress::Starting {
        total: complete.len() as u64,
    })
    .await?;
    let complete_result = futures_lite::stream::iter(complete)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::Entry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::EntryProgress { id, offset })?;
                }
                let expected_chunk_range =
                    ChunkRanges::from(..BaoTree::new(size, IROH_BLOCK_SIZE).chunks());
                let complete = actual_chunk_ranges == expected_chunk_range;
                let error = if complete {
                    None
                } else {
                    Some(format!(
                        "expected chunk ranges {:?}, got chunk ranges {:?}",
                        expected_chunk_range, actual_chunk_ranges
                    ))
                };
                tx.send(ValidateProgress::EntryDone { id, error }).await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok((hash, !complete))
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let partial_result = futures_lite::stream::iter(partial)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::PartialEntry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::PartialEntryProgress { id, offset })?;
                }
                tx.send(ValidateProgress::PartialEntryDone {
                    id,
                    ranges: RangeSpec::new(&actual_chunk_ranges),
                })
                .await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok(())
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let mut to_downgrade = Vec::new();
    for item in complete_result {
        let (hash, incomplete) = item??;
        if incomplete {
            to_downgrade.push(hash);
        }
    }
    for item in partial_result {
        item??;
    }
    if repair {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "repair not implemented",
        ));
    }
    Ok(())
}

/// Configuration for the GC mark and sweep.
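///
/// # Example
///
/// A sketch of a config that runs GC every five minutes (not compiled here):
///
/// ```ignore
/// use std::time::Duration;
///
/// let config = GcConfig {
///     period: Duration::from_secs(300),
///     done_callback: Some(Box::new(|| tracing::debug!("gc round done"))),
/// };
/// ```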
#[derive(derive_more::Debug)]
pub struct GcConfig {
    /// The period at which to execute the GC.
    pub period: Duration,
    /// An optional callback called every time a GC round finishes.
    #[debug("done_callback")]
    pub done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Implementation of the gc loop.
pub(super) async fn gc_run_loop<S, F, Fut, G, Gut>(
    store: &S,
    config: GcConfig,
    start_cb: F,
    protected_cb: G,
) where
    S: Store,
    F: Fn() -> Fut,
    Fut: Future<Output = io::Result<()>> + Send,
    G: Fn() -> Gut,
    Gut: Future<Output = BTreeSet<Hash>> + Send,
{
    tracing::info!("Starting GC task with interval {:?}", config.period);
    let mut live = BTreeSet::new();
    'outer: loop {
        if let Err(cause) = start_cb().await {
            tracing::debug!("unable to notify the db of GC start: {cause}. Shutting down GC loop.");
            break;
        }
        // delay before the two phases of GC
        tokio::time::sleep(config.period).await;
        tracing::debug!("Starting GC");
        live.clear();

        let p = protected_cb().await;
        live.extend(p);

        tracing::debug!("Starting GC mark phase");
        let live_ref = &mut live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_mark_task(store, live_ref, &co).await {
                co.yield_(GcMarkEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcMarkEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcMarkEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcMarkEvent::Error(err) => {
                    tracing::error!("Fatal error during GC mark {}", err);
                    continue 'outer;
                }
            }
        }
        drop(stream);

        tracing::debug!("Starting GC sweep phase");
        let live_ref = &live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_sweep_task(store, live_ref, &co).await {
                co.yield_(GcSweepEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcSweepEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcSweepEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcSweepEvent::Error(err) => {
                    tracing::error!("Fatal error during GC sweep {}", err);
                    continue 'outer;
                }
            }
        }
        if let Some(ref cb) = config.done_callback {
            cb();
        }
    }
}

/// Implementation of the gc mark phase.
pub(super) async fn gc_mark_task<'a>(
    store: &'a impl Store,
    live: &'a mut BTreeSet<Hash>,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    macro_rules! debug {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    let mut roots = BTreeSet::new();
    debug!("traversing tags");
    for item in store.tags().await? {
        let (name, haf) = item?;
        debug!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    debug!("traversing temp roots");
    for haf in store.temp_tags() {
        debug!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    for HashAndFormat { hash, format } in roots {
        // for all formats except raw we need to traverse the children
        if live.insert(hash) && !format.is_raw() {
            let Some(entry) = store.get(&hash).await? else {
                warn!("gc: {} not found", hash);
                continue;
            };
            if !entry.is_complete() {
                warn!("gc: {} is partial", hash);
                continue;
            }
            let Ok(reader) = entry.data_reader().await else {
                warn!("gc: {} creating data reader failed", hash);
                continue;
            };
            let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
                warn!("gc: {} parse failed", hash);
                continue;
            };
            debug!("parsed collection {} {:?}", hash, count);
            loop {
                let item = match stream.next().await {
                    Ok(Some(item)) => item,
                    Ok(None) => break,
                    Err(_err) => {
                        warn!("gc: {} parse failed", hash);
                        break;
                    }
                };
                // if format != raw we would have to recurse here by adding this to current
                live.insert(item);
            }
        }
    }
    debug!("gc mark done. found {} live blobs", live.len());
    Ok(())
}

async fn gc_sweep_task(
    store: &impl Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!("deleted {} blobs", count)))
        .await;
    Ok(())
}

/// An event related to GC
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// An event related to GC
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// Progress messages for an import operation
///
/// An import operation involves computing the outboard of a file, and then
/// either copying or moving the file into the database.
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Found a path
    ///
    /// This will be the first message for an id
    Found { id: u64, name: String },
    /// Progress when copying the file to the store
    ///
    /// This will be omitted if the store can use the file in place
    ///
    /// There will be multiple of these messages for an id
    CopyProgress { id: u64, offset: u64 },
    /// Determined the size
    ///
    /// This will come after `Found` and zero or more `CopyProgress` messages.
    /// For unstable files, determining the size will only be done once the file
    /// is fully copied.
    Size { id: u64, size: u64 },
    /// Progress when computing the outboard
    ///
    /// There will be multiple of these messages for an id
    OutboardProgress { id: u64, offset: u64 },
    /// Done computing the outboard
    ///
    /// This comes after `Size` and zero or more `OutboardProgress` messages
    OutboardDone { id: u64, hash: Hash },
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the data out to a file. Also, a disk based implementation might
/// choose to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
    /// The hash refers to any blob and will be exported to a single file.
    #[default]
    Blob,
    /// The hash refers to a [`crate::format::collection::Collection`] blob
    /// and all children of the collection shall be exported to one file per child.
    ///
    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
    /// collection metadata, all other children of the collection will be exported to
    /// a file each, with their collection name treated as a relative path to the export
    /// destination path.
    ///
    /// If the blob cannot be parsed as a collection, the operation will fail.
    Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Starting to export to a file
    ///
    /// This will be the first message for an id
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// Progress when copying the file to the target
    ///
    /// This will be omitted if the store can move the file or use copy on write
    ///
    /// There will be multiple of these messages for an id
    Progress { id: u64, offset: u64 },
    /// Done exporting
    Done { id: u64 },
}

/// Level for generic validation messages
#[derive(
    Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
)]
pub enum ReportLevel {
    /// Very unimportant info messages
    Trace,
    /// Info messages
    Info,
    /// Warnings, something is not quite right
    Warn,
    /// Errors, something is very wrong
    Error,
}

/// Progress updates for the consistency check operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ConsistencyCheckProgress {
    /// Consistency check started
    Start,
    /// Consistency check update
    Update {
        /// The message
        message: String,
        /// The entry this message is about, if any
        entry: Option<Hash>,
        /// The level of the message
        level: ReportLevel,
    },
    /// Consistency check ended
    Done,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Progress updates for the validate operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// Started validating
    Starting {
        /// The total number of entries to validate
        total: u64,
    },
    /// We started validating a complete entry
    Entry {
        /// A new unique id for this entry
        id: u64,
        /// The hash of the entry
        hash: Hash,
        /// Location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be an url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    EntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    EntryDone {
        /// The unique id of the entry.
        id: u64,
        /// An error if we failed to validate the entry.
        error: Option<String>,
    },
    /// We started validating a partial entry
    PartialEntry {
        /// A new unique id for this entry
        id: u64,
        /// The hash of the entry
        hash: Hash,
        /// Location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be an url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The best known size of the entry, in bytes.
        size: u64,
    },
    /// We got progress ingesting item `id`.
    PartialEntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    PartialEntryDone {
        /// The unique id of the entry.
        id: u64,
        /// Available ranges.
        ranges: RangeSpec,
    },
    /// We are done with the whole operation.
    AllDone,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Database events
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// A GC was started
    GcStarted,
    /// A GC was completed
    GcCompleted,
}