iroh_blobs/store/traits.rs
//! Traits for in-memory or persistent maps of blobs with bao encoded outboards.
use std::{collections::BTreeSet, future::Future, io, path::PathBuf, time::Duration};

pub use bao_tree;
use bao_tree::{
    io::fsm::{BaoContentItem, Outboard},
    BaoTree, ChunkRanges,
};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_io::AsyncSliceReader;
pub use range_collections;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;

use crate::{
    hashseq::parse_hash_seq,
    protocol::RangeSpec,
    util::{
        local_pool::{self, LocalPool},
        progress::{BoxedProgressSender, IdGenerator, ProgressSender},
        Tag,
    },
    BlobFormat, Hash, HashAndFormat, TempTag, IROH_BLOCK_SIZE,
};

/// A fallible but owned iterator over the entries in a store.
pub type DbIter<T> = Box<dyn Iterator<Item = io::Result<T>> + Send + Sync + 'static>;

/// Export progress callback
pub type ExportProgressCb = Box<dyn Fn(u64) -> io::Result<()> + Send + Sync + 'static>;

/// The availability status of an entry in a store.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum EntryStatus {
    /// The entry is completely available.
    Complete,
    /// The entry is partially available.
    Partial,
    /// The entry is not in the store.
    NotFound,
}

/// The size of a bao file
#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq)]
pub enum BaoBlobSize {
    /// A remote side told us the size, but we have insufficient data to verify it.
    Unverified(u64),
    /// We have verified the size.
    Verified(u64),
}

impl BaoBlobSize {
    /// Create a new `BaoBlobSize` with the given size and verification status.
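    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// // a size reported by a remote peer that we have not verified yet
    /// let size = BaoBlobSize::new(1024, false);
    /// assert!(matches!(size, BaoBlobSize::Unverified(1024)));
    /// assert_eq!(size.value(), 1024);
    /// ```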
    pub fn new(size: u64, verified: bool) -> Self {
        if verified {
            BaoBlobSize::Verified(size)
        } else {
            BaoBlobSize::Unverified(size)
        }
    }

    /// Get just the value, no matter if it is verified or not.
    pub fn value(&self) -> u64 {
        match self {
            BaoBlobSize::Unverified(size) => *size,
            BaoBlobSize::Verified(size) => *size,
        }
    }
}

/// An entry for one hash in a bao map
///
/// The entry has the ability to provide you with an (outboard, data)
/// reader pair. Creating the reader is async and may fail. The futures that
/// create the readers must be `Send`, but the readers themselves don't have to
/// be.
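///
/// A usage sketch (not compiled as a doctest); `store` and `hash` are assumed to
/// exist, and `read_at` is assumed to be the [`AsyncSliceReader`] method for
/// reading a byte range:
///
/// ```ignore
/// if let Some(entry) = store.get(&hash).await? {
///     // open the data reader and fetch the first 16 KiB
///     let mut reader = entry.data_reader().await?;
///     let bytes = reader.read_at(0, 16 * 1024).await?;
///     println!("read {} bytes of {}", bytes.len(), entry.hash());
/// }
/// ```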
pub trait MapEntry: std::fmt::Debug + Clone + Send + Sync + 'static {
    /// The hash of the entry.
    fn hash(&self) -> Hash;
    /// The size of the entry.
    fn size(&self) -> BaoBlobSize;
    /// Returns `true` if the entry is complete.
    ///
    /// Note that this does not actually verify if the bytes on disk are complete,
    /// it only checks if the entry was marked as complete in the store.
    fn is_complete(&self) -> bool;
    /// A future that resolves to a reader that can be used to read the outboard
    fn outboard(&self) -> impl Future<Output = io::Result<impl Outboard>> + Send;
    /// A future that resolves to a reader that can be used to read the data
    fn data_reader(&self) -> impl Future<Output = io::Result<impl AsyncSliceReader>> + Send;
}

/// A generic map from hashes to bao blobs (blobs with bao outboards).
///
/// This is the readonly view. To allow updates, a concrete implementation must
/// also implement [`MapMut`].
///
/// Entries are *not* guaranteed to be complete for all implementations.
/// They are also not guaranteed to be immutable, since this could be the
/// readonly view of a mutable store.
pub trait Map: Clone + Send + Sync + 'static {
    /// The entry type. An entry is a cheaply cloneable handle that can be used
    /// to open readers for both the data and the outboard
    type Entry: MapEntry;
    /// Get an entry for a hash.
    ///
    /// This can also be used for a membership test by just checking if there
    /// is an entry. Creating an entry should be cheap, any expensive ops should
    /// be deferred to the creation of the actual readers.
    ///
    /// It is not guaranteed that the entry is complete.
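    ///
    /// A membership-test sketch (not compiled as a doctest); `store` and `hash`
    /// are assumed to exist:
    ///
    /// ```ignore
    /// let present = store.get(&hash).await?.is_some();
    /// ```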
    fn get(&self, hash: &Hash) -> impl Future<Output = io::Result<Option<Self::Entry>>> + Send;
}

/// A partial entry
pub trait MapEntryMut: MapEntry {
    /// Get a batch writer
    fn batch_writer(&self) -> impl Future<Output = io::Result<impl BaoBatchWriter>> + Send;
}

/// An async batch interface for writing bao content items to a pair of data and
/// outboard.
///
/// Details like the chunk group size and the actual storage location are left
/// to the implementation.
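///
/// A usage sketch (not compiled as a doctest); `entry` is assumed to be some
/// [`MapEntryMut`], and `total_size` and `batch` to come from the network layer:
///
/// ```ignore
/// // obtain a batch writer for a partial entry
/// let mut writer = entry.batch_writer().await?;
/// // write the parent nodes and leaves received for one chunk group ...
/// writer.write_batch(total_size, batch).await?;
/// // ... and flush them to permanent storage
/// writer.sync().await?;
/// ```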
pub trait BaoBatchWriter {
    /// Write a batch of bao content items to the underlying storage.
    ///
    /// The batch is guaranteed to be sorted as data is received from the network.
    /// So leaves will be sorted by offset, and parents will be sorted by pre order
    /// traversal offset. There is no guarantee that they will be consecutive
    /// though.
    ///
    /// The size is the total size of the blob that the remote side told us.
    /// It is not guaranteed to be correct, but it is guaranteed to be
    /// consistent with all data in the batch. The size therefore represents
    /// an upper bound on the maximum offset of all leaf items.
    /// So it is guaranteed that `leaf.offset + leaf.size <= size` for all
    /// leaf items in the batch.
    ///
    /// Batches should not become too large. Typically, a batch is just a few
    /// parent nodes and a leaf.
    ///
    /// Batch is a vec so it can be moved into a task, which is unfortunately
    /// necessary in typical io code.
    fn write_batch(
        &mut self,
        size: u64,
        batch: Vec<BaoContentItem>,
    ) -> impl Future<Output = io::Result<()>>;

    /// Sync the written data to permanent storage, if applicable.
    /// E.g. for a file based implementation, this would call sync_data
    /// on all files.
    fn sync(&mut self) -> impl Future<Output = io::Result<()>>;
}

/// Implement BaoBatchWriter for mutable references
impl<W: BaoBatchWriter> BaoBatchWriter for &mut W {
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        (**self).write_batch(size, batch).await
    }

    async fn sync(&mut self) -> io::Result<()> {
        (**self).sync().await
    }
}

/// A wrapper around a batch writer that calls a progress callback for one leaf
/// per batch.
#[derive(Debug)]
pub(crate) struct FallibleProgressBatchWriter<W, F>(W, F);

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static>
    FallibleProgressBatchWriter<W, F>
{
    /// Create a new `FallibleProgressBatchWriter` from an inner writer and a progress callback
    ///
    /// The `on_write` function is called for each write, with the `offset` as the first and the
    /// length of the data as the second param. `on_write` must return an `io::Result`.
    /// If `on_write` returns an error, the download is aborted.
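    ///
    /// A sketch (not compiled as a doctest); `inner` is assumed to be some
    /// [`BaoBatchWriter`]:
    ///
    /// ```ignore
    /// let writer = FallibleProgressBatchWriter::new(inner, |offset, len| {
    ///     tracing::trace!("wrote leaf at offset {offset}, {len} bytes");
    ///     Ok(())
    /// });
    /// ```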
    pub fn new(inner: W, on_write: F) -> Self {
        Self(inner, on_write)
    }
}

impl<W: BaoBatchWriter, F: Fn(u64, usize) -> io::Result<()> + 'static> BaoBatchWriter
    for FallibleProgressBatchWriter<W, F>
{
    async fn write_batch(&mut self, size: u64, batch: Vec<BaoContentItem>) -> io::Result<()> {
        // find the offset and length of the first (usually only) chunk
        let chunk = batch
            .iter()
            .filter_map(|item| {
                if let BaoContentItem::Leaf(leaf) = item {
                    Some((leaf.offset, leaf.data.len()))
                } else {
                    None
                }
            })
            .next();
        self.0.write_batch(size, batch).await?;
        // call the progress callback
        if let Some((offset, len)) = chunk {
            (self.1)(offset, len)?;
        }
        Ok(())
    }

    async fn sync(&mut self) -> io::Result<()> {
        self.0.sync().await
    }
}

/// A mutable bao map.
///
/// This extends the readonly [`Map`] trait with methods to create and modify entries.
pub trait MapMut: Map {
    /// An entry that is possibly writable
    type EntryMut: MapEntryMut;

    /// Get an existing entry as an EntryMut.
    ///
    /// For implementations where EntryMut and Entry are the same type, this is just an alias for
    /// `get`.
    fn get_mut(
        &self,
        hash: &Hash,
    ) -> impl Future<Output = io::Result<Option<Self::EntryMut>>> + Send;

    /// Get an existing partial entry, or create a new one.
    ///
    /// We need to know the size of the partial entry. This might produce an
    /// error e.g. if there is not enough space on disk.
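    ///
    /// A sketch (not compiled as a doctest) of resuming a download; `store`,
    /// `hash` and `size` are assumed to come from the caller:
    ///
    /// ```ignore
    /// // create (or resume) a partial entry for `hash` with the announced size
    /// let entry = store.get_or_create(hash, size).await?;
    /// let mut writer = entry.batch_writer().await?;
    /// // ... write verified batches, then mark the entry as complete
    /// store.insert_complete(entry).await?;
    /// ```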
    fn get_or_create(
        &self,
        hash: Hash,
        size: u64,
    ) -> impl Future<Output = io::Result<Self::EntryMut>> + Send;

    /// Find out if the data behind a `hash` is complete, partial, or not present.
    ///
    /// Note that this does not actually verify the on-disk data, but only checks in which section
    /// of the store the entry is present.
    fn entry_status(&self, hash: &Hash) -> impl Future<Output = io::Result<EntryStatus>> + Send;

    /// Sync version of `entry_status`, for the doc sync engine until we can get rid of it.
    ///
    /// Don't count on this to be efficient.
    fn entry_status_sync(&self, hash: &Hash) -> io::Result<EntryStatus>;

    /// Upgrade a partial entry to a complete entry.
    fn insert_complete(&self, entry: Self::EntryMut)
        -> impl Future<Output = io::Result<()>> + Send;
}

/// Extension of [`Map`] to add misc methods used by the rpc calls.
pub trait ReadableStore: Map {
    /// list all blobs in the database. This includes both raw blobs that have
    /// been imported, and hash sequences that have been created internally.
    fn blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;
    /// list all tags (collections or other explicitly added things) in the database
    fn tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> impl Future<Output = io::Result<DbIter<(Tag, HashAndFormat)>>> + Send;

    /// Temp tags
    fn temp_tags(&self) -> Box<dyn Iterator<Item = HashAndFormat> + Send + Sync + 'static>;

    /// Perform a consistency check on the database
    fn consistency_check(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ConsistencyCheckProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// list partial blobs in the database
    fn partial_blobs(&self) -> impl Future<Output = io::Result<DbIter<Hash>>> + Send;

    /// This trait method extracts a file to a local path.
    ///
    /// `hash` is the hash of the file
    /// `target` is the path to the target file
    /// `mode` is a hint how the file should be exported.
    /// `progress` is a callback that is called with the total number of bytes that have been written
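    ///
    /// A sketch (not compiled as a doctest); `store`, `hash` and `target_path` are
    /// assumed to come from the caller:
    ///
    /// ```ignore
    /// store
    ///     .export(
    ///         hash,
    ///         target_path,
    ///         ExportMode::Copy,
    ///         Box::new(|offset| {
    ///             tracing::debug!("exported {offset} bytes");
    ///             Ok(())
    ///         }),
    ///     )
    ///     .await?;
    /// ```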
    fn export(
        &self,
        hash: Hash,
        target: PathBuf,
        mode: ExportMode,
        progress: ExportProgressCb,
    ) -> impl Future<Output = io::Result<()>> + Send;
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
    /// This trait method imports a file from a local path.
    ///
    /// `data` is the path to the file.
    /// `mode` is a hint how the file should be imported.
    /// `progress` is a sender that provides a way for the importer to send progress messages
    /// when importing large files. This also serves as a way to cancel the import. If the
    /// consumer of the progress messages is dropped, subsequent attempts to send progress
    /// will fail.
    ///
    /// Returns the hash of the imported file. The reason to have this method is that some database
    /// implementations might be able to import a file without copying it.
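    ///
    /// A sketch (not compiled as a doctest); `store`, `path` and `progress` (some
    /// [`ProgressSender`] for [`ImportProgress`]) are assumed to come from the caller:
    ///
    /// ```ignore
    /// // reference the file in place if the store supports it
    /// let (tag, size) = store
    ///     .import_file(path, ImportMode::TryReference, BlobFormat::Raw, progress)
    ///     .await?;
    /// ```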
    fn import_file(
        &self,
        data: PathBuf,
        mode: ImportMode,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from memory.
    ///
    /// It is a special case of `import` that does not use the file system.
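    ///
    /// A sketch (not compiled as a doctest); it assumes the crate's usual `Tag` and
    /// `HashAndFormat` conversion helpers, and `store` is assumed to exist:
    ///
    /// ```ignore
    /// // add a small blob from memory and keep it alive with a named tag
    /// let tmp = store.import_bytes(Bytes::from("hello"), BlobFormat::Raw).await?;
    /// let hash = *tmp.hash();
    /// store.set_tag("my-blob".into(), HashAndFormat::raw(hash)).await?;
    /// ```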
    fn import_bytes(
        &self,
        bytes: Bytes,
        format: BlobFormat,
    ) -> impl Future<Output = io::Result<TempTag>> + Send;

    /// Import data from a stream of bytes.
    fn import_stream(
        &self,
        data: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send;

    /// Import data from an async byte reader.
    fn import_reader(
        &self,
        data: impl AsyncRead + Send + Unpin + 'static,
        format: BlobFormat,
        progress: impl ProgressSender<Msg = ImportProgress> + IdGenerator,
    ) -> impl Future<Output = io::Result<(TempTag, u64)>> + Send {
        let stream = tokio_util::io::ReaderStream::new(data);
        self.import_stream(stream, format, progress)
    }

    /// Set a tag
    fn set_tag(
        &self,
        name: Tag,
        hash: HashAndFormat,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Rename a tag
    fn rename_tag(&self, from: Tag, to: Tag) -> impl Future<Output = io::Result<()>> + Send;

    /// Delete a single tag
    fn delete_tag(&self, name: Tag) -> impl Future<Output = io::Result<()>> + Send {
        self.delete_tags(Some(name.clone()), Some(name.successor()))
    }

    /// Bulk delete tags
    fn delete_tags(
        &self,
        from: Option<Tag>,
        to: Option<Tag>,
    ) -> impl Future<Output = io::Result<()>> + Send;

    /// Create a new tag
    fn create_tag(&self, hash: HashAndFormat) -> impl Future<Output = io::Result<Tag>> + Send;

    /// Create a temporary pin for this store
    fn temp_tag(&self, value: HashAndFormat) -> TempTag;

    /// Start the GC loop
    ///
    /// The GC task will shut down when the returned future is dropped.
    fn gc_run<G, Gut>(&self, config: super::GcConfig, protected_cb: G) -> impl Future<Output = ()>
    where
        G: Fn() -> Gut,
        Gut: Future<Output = BTreeSet<Hash>> + Send;

    /// physically delete the given hashes from the store.
    fn delete(&self, hashes: Vec<Hash>) -> impl Future<Output = io::Result<()>> + Send;

    /// Shutdown the store.
    fn shutdown(&self) -> impl Future<Output = ()> + Send;

    /// Sync the store.
    fn sync(&self) -> impl Future<Output = io::Result<()>> + Send;

    /// Validate the database
    ///
    /// This will check that the file and outboard content is correct for all complete
    /// entries, and output valid ranges for all partial entries.
    ///
    /// It will not check the internal consistency of the database.
    fn validate(
        &self,
        repair: bool,
        tx: BoxedProgressSender<ValidateProgress>,
    ) -> impl Future<Output = io::Result<()>> + Send {
        validate_impl(self, repair, tx)
    }
}

async fn validate_impl(
    store: &impl Store,
    repair: bool,
    tx: BoxedProgressSender<ValidateProgress>,
) -> io::Result<()> {
    use futures_buffered::BufferedStreamExt;

    let validate_parallelism: usize = num_cpus::get();
    let lp = LocalPool::new(local_pool::Config {
        threads: validate_parallelism,
        ..Default::default()
    });
    let complete = store.blobs().await?.collect::<io::Result<Vec<_>>>()?;
    let partial = store
        .partial_blobs()
        .await?
        .collect::<io::Result<Vec<_>>>()?;
    tx.send(ValidateProgress::Starting {
        total: complete.len() as u64,
    })
    .await?;
    let complete_result = futures_lite::stream::iter(complete)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::Entry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::EntryProgress { id, offset })?;
                }
                let expected_chunk_range =
                    ChunkRanges::from(..BaoTree::new(size, IROH_BLOCK_SIZE).chunks());
                let complete = actual_chunk_ranges == expected_chunk_range;
                let error = if complete {
                    None
                } else {
                    Some(format!(
                        "expected chunk ranges {:?}, got chunk ranges {:?}",
                        expected_chunk_range, actual_chunk_ranges
                    ))
                };
                tx.send(ValidateProgress::EntryDone { id, error }).await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok((hash, complete))
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let partial_result = futures_lite::stream::iter(partial)
        .map(|hash| {
            let store = store.clone();
            let tx = tx.clone();
            lp.spawn(move || async move {
                let entry = store
                    .get(&hash)
                    .await?
                    .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "entry not found"))?;
                let size = entry.size().value();
                let outboard = entry.outboard().await?;
                let data = entry.data_reader().await?;
                let chunk_ranges = ChunkRanges::all();
                let mut ranges = bao_tree::io::fsm::valid_ranges(outboard, data, &chunk_ranges);
                let id = tx.new_id();
                tx.send(ValidateProgress::PartialEntry {
                    id,
                    hash,
                    path: None,
                    size,
                })
                .await?;
                let mut actual_chunk_ranges = ChunkRanges::empty();
                while let Some(item) = ranges.next().await {
                    let item = item?;
                    let offset = item.start.to_bytes();
                    actual_chunk_ranges |= ChunkRanges::from(item);
                    tx.try_send(ValidateProgress::PartialEntryProgress { id, offset })?;
                }
                tx.send(ValidateProgress::PartialEntryDone {
                    id,
                    ranges: RangeSpec::new(&actual_chunk_ranges),
                })
                .await?;
                drop(ranges);
                drop(entry);
                io::Result::Ok(())
            })
        })
        .buffered_unordered(validate_parallelism)
        .collect::<Vec<_>>()
        .await;
    let mut to_downgrade = Vec::new();
    for item in complete_result {
        let (hash, complete) = item??;
        if !complete {
            to_downgrade.push(hash);
        }
    }
    for item in partial_result {
        item??;
    }
    if repair {
        return Err(io::Error::new(
            io::ErrorKind::Other,
            "repair not implemented",
        ));
    }
    Ok(())
}

/// Configuration for the GC mark and sweep.
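///
/// A construction sketch (not compiled as a doctest); it assumes the store module
/// re-exports this type as the `GcConfig` expected by [`Store::gc_run`]:
///
/// ```ignore
/// let config = GcConfig {
///     period: Duration::from_secs(300),
///     done_callback: None,
/// };
/// // protect nothing beyond tags and temp tags; the callback is polled every round
/// store.gc_run(config, || async { BTreeSet::new() }).await;
/// ```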
#[derive(derive_more::Debug)]
pub struct GcConfig {
    /// The period at which to execute the GC.
    pub period: Duration,
    /// An optional callback called every time a GC round finishes.
    #[debug("done_callback")]
    pub done_callback: Option<Box<dyn Fn() + Send>>,
}

/// Implementation of the gc loop.
pub(super) async fn gc_run_loop<S, F, Fut, G, Gut>(
    store: &S,
    config: GcConfig,
    start_cb: F,
    protected_cb: G,
) where
    S: Store,
    F: Fn() -> Fut,
    Fut: Future<Output = io::Result<()>> + Send,
    G: Fn() -> Gut,
    Gut: Future<Output = BTreeSet<Hash>> + Send,
{
    tracing::info!("Starting GC task with interval {:?}", config.period);
    let mut live = BTreeSet::new();
    'outer: loop {
        if let Err(cause) = start_cb().await {
            tracing::debug!("unable to notify the db of GC start: {cause}. Shutting down GC loop.");
            break;
        }
        // do delay before the two phases of GC
        tokio::time::sleep(config.period).await;
        tracing::debug!("Starting GC");
        live.clear();

        let p = protected_cb().await;
        live.extend(p);

        tracing::debug!("Starting GC mark phase");
        let live_ref = &mut live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_mark_task(store, live_ref, &co).await {
                co.yield_(GcMarkEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcMarkEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcMarkEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcMarkEvent::Error(err) => {
                    tracing::error!("Fatal error during GC mark {}", err);
                    continue 'outer;
                }
            }
        }
        drop(stream);

        tracing::debug!("Starting GC sweep phase");
        let live_ref = &live;
        let mut stream = Gen::new(|co| async move {
            if let Err(e) = gc_sweep_task(store, live_ref, &co).await {
                co.yield_(GcSweepEvent::Error(e)).await;
            }
        });
        while let Some(item) = stream.next().await {
            match item {
                GcSweepEvent::CustomDebug(text) => {
                    tracing::debug!("{}", text);
                }
                GcSweepEvent::CustomWarning(text, _) => {
                    tracing::warn!("{}", text);
                }
                GcSweepEvent::Error(err) => {
616 tracing::error!("Fatal error during GC mark {}", err);
                    continue 'outer;
                }
            }
        }
        if let Some(ref cb) = config.done_callback {
            cb();
        }
    }
}

/// Implementation of the gc method.
pub(super) async fn gc_mark_task<'a>(
    store: &'a impl Store,
    live: &'a mut BTreeSet<Hash>,
    co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
    macro_rules! debug {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomDebug(format!($($arg)*))).await;
        };
    }
    macro_rules! warn {
        ($($arg:tt)*) => {
            co.yield_(GcMarkEvent::CustomWarning(format!($($arg)*), None)).await;
        };
    }
    let mut roots = BTreeSet::new();
    debug!("traversing tags");
    for item in store.tags(None, None).await? {
        let (name, haf) = item?;
        debug!("adding root {:?} {:?}", name, haf);
        roots.insert(haf);
    }
    debug!("traversing temp roots");
    for haf in store.temp_tags() {
        debug!("adding temp pin {:?}", haf);
        roots.insert(haf);
    }
    for HashAndFormat { hash, format } in roots {
        // we need to do this for all formats except raw
        if live.insert(hash) && !format.is_raw() {
            let Some(entry) = store.get(&hash).await? else {
                warn!("gc: {} not found", hash);
                continue;
            };
            if !entry.is_complete() {
                warn!("gc: {} is partial", hash);
                continue;
            }
            let Ok(reader) = entry.data_reader().await else {
                warn!("gc: {} creating data reader failed", hash);
                continue;
            };
            let Ok((mut stream, count)) = parse_hash_seq(reader).await else {
                warn!("gc: {} parse failed", hash);
                continue;
            };
            debug!("parsed collection {} {:?}", hash, count);
            loop {
                let item = match stream.next().await {
                    Ok(Some(item)) => item,
                    Ok(None) => break,
                    Err(_err) => {
                        warn!("gc: {} parse failed", hash);
                        break;
                    }
                };
                // if format != raw we would have to recurse here by adding this to current
                live.insert(item);
            }
        }
    }
    debug!("gc mark done. found {} live blobs", live.len());
    Ok(())
}

async fn gc_sweep_task(
    store: &impl Store,
    live: &BTreeSet<Hash>,
    co: &Co<GcSweepEvent>,
) -> anyhow::Result<()> {
    let blobs = store.blobs().await?.chain(store.partial_blobs().await?);
    let mut count = 0;
    let mut batch = Vec::new();
    for hash in blobs {
        let hash = hash?;
        if !live.contains(&hash) {
            batch.push(hash);
            count += 1;
        }
        if batch.len() >= 100 {
            store.delete(batch.clone()).await?;
            batch.clear();
        }
    }
    if !batch.is_empty() {
        store.delete(batch).await?;
    }
    co.yield_(GcSweepEvent::CustomDebug(format!(
        "deleted {} blobs",
        count
    )))
    .await;
    Ok(())
}

/// An event related to GC
#[derive(Debug)]
pub enum GcMarkEvent {
    /// A custom event (info)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// An event related to GC
#[derive(Debug)]
pub enum GcSweepEvent {
    /// A custom event (debug)
    CustomDebug(String),
    /// A custom non critical error
    CustomWarning(String, Option<anyhow::Error>),
    /// An unrecoverable error during GC
    Error(anyhow::Error),
}

/// Progress messages for an import operation
///
/// An import operation involves computing the outboard of a file, and then
/// either copying or moving the file into the database.
#[allow(missing_docs)]
#[derive(Debug)]
pub enum ImportProgress {
    /// Found a path
    ///
    /// This will be the first message for an id
    Found { id: u64, name: String },
    /// Progress when copying the file to the store
    ///
    /// This will be omitted if the store can use the file in place
    ///
    /// There will be multiple of these messages for an id
    CopyProgress { id: u64, offset: u64 },
    /// Determined the size
    ///
    /// This will come after `Found` and zero or more `CopyProgress` messages.
    /// For unstable files, determining the size will only be done once the file
    /// is fully copied.
    Size { id: u64, size: u64 },
    /// Progress when computing the outboard
    ///
    /// There will be multiple of these messages for an id
    OutboardProgress { id: u64, offset: u64 },
    /// Done computing the outboard
    ///
    /// This comes after `Size` and zero or more `OutboardProgress` messages
    OutboardDone { id: u64, hash: Hash },
}

/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation can not reference
/// files on disk, and a disk based implementation might choose to copy small
/// files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The expected format of a hash being exported.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub enum ExportFormat {
    /// The hash refers to any blob and will be exported to a single file.
    #[default]
    Blob,
    /// The hash refers to a [`crate::format::collection::Collection`] blob
    /// and all children of the collection shall be exported to one file per child.
    ///
    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
    /// collection metadata, all other children of the collection will be exported to
    /// a file each, with their collection name treated as a relative path to the export
    /// destination path.
    ///
    /// If the blob cannot be parsed as a collection, the operation will fail.
    Collection,
}

#[allow(missing_docs)]
#[derive(Debug)]
pub enum ExportProgress {
    /// Starting to export to a file
    ///
    /// This will be the first message for an id
    Start {
        id: u64,
        hash: Hash,
        path: PathBuf,
        stable: bool,
    },
    /// Progress when copying the file to the target
    ///
    /// This will be omitted if the store can move the file or use copy on write
    ///
    /// There will be multiple of these messages for an id
    Progress { id: u64, offset: u64 },
    /// Done exporting
    Done { id: u64 },
}

/// Level for generic validation messages
#[derive(
    Debug, Clone, Copy, derive_more::Display, Serialize, Deserialize, PartialOrd, Ord, PartialEq, Eq,
)]
pub enum ReportLevel {
    /// Very unimportant info messages
    Trace,
    /// Info messages
    Info,
    /// Warnings, something is not quite right
    Warn,
    /// Errors, something is very wrong
    Error,
}

/// Progress updates for the consistency check operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ConsistencyCheckProgress {
    /// Consistency check started
    Start,
    /// Consistency check update
    Update {
        /// The message
        message: String,
        /// The entry this message is about, if any
        entry: Option<Hash>,
        /// The level of the message
        level: ReportLevel,
    },
    /// Consistency check ended
    Done,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Progress updates for the validate operation
#[derive(Debug, Serialize, Deserialize)]
pub enum ValidateProgress {
    /// started validating
    Starting {
        /// The total number of entries to validate
        total: u64,
    },
    /// We started validating a complete entry
    Entry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be an url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The size of the entry, in bytes.
        size: u64,
    },
    /// We got progress validating entry `id`.
    EntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    EntryDone {
        /// The unique id of the entry.
        id: u64,
        /// An error if we failed to validate the entry.
        error: Option<String>,
    },
    /// We started validating a partial entry
    PartialEntry {
        /// a new unique id for this entry
        id: u64,
        /// the hash of the entry
        hash: Hash,
        /// location of the entry.
        ///
        /// In case of a file, this is the path to the file.
        /// Otherwise it might be an url or something else to uniquely identify the entry.
        path: Option<String>,
        /// The best known size of the entry, in bytes.
        size: u64,
    },
    /// We got progress validating partial entry `id`.
    PartialEntryProgress {
        /// The unique id of the entry.
        id: u64,
        /// The offset of the progress, in bytes.
        offset: u64,
    },
    /// We are done with `id`
    PartialEntryDone {
        /// The unique id of the entry.
        id: u64,
        /// Available ranges.
        ranges: RangeSpec,
    },
    /// We are done with the whole operation.
    AllDone,
    /// We got an error and need to abort.
    Abort(serde_error::Error),
}

/// Database events
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// A GC was started
    GcStarted,
    /// A GC was completed
    GcCompleted,
}