// iroh_blobs/api/proto.rs

1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2//! The protocol that a store implementation needs to implement.
3//!
4//! A store needs to handle [`Request`]s. It is fine to just return an error for some
5//! commands. E.g. an immutable store can just return an error for import commands.
6//!
7//! Each command consists of a serializable request message and channels for updates
8//! and responses. The enum containing the full requests is [`Command`]. These are the
9//! commands you will have to handle in a store actor handler.
10//!
11//! This crate provides a file system based store implementation, [`crate::store::fs::FsStore`],
12//! as well as a mutable in-memory store and an immutable in-memory store.
13//!
14//! The file system store is quite complex and optimized, so to get started take a look at
15//! the much simpler memory store.
16use std::{
17    fmt::{self, Debug},
18    io,
19    num::NonZeroU64,
20    ops::{Bound, RangeBounds},
21    path::PathBuf,
22    pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27    io::{mixed::EncodedItem, BaoContentItem, Leaf},
28    ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32    channel::{mpsc, oneshot},
33    rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
/// Trait for command messages that refer to a single hash.
///
/// Lets generic code extract the hash from a message, e.g. to produce a
/// short hash for log output.
#[allow(dead_code)]
pub(crate) trait HashSpecific {
    /// The hash this command operates on.
    fn hash(&self) -> Hash;

    /// Shortened form of [`Self::hash`] for concise log output.
    fn hash_short(&self) -> ArrayString<10> {
        self.hash().fmt_short()
    }
}

impl HashSpecific for ImportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ObserveMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportRangesMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportPathMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

/// A pinned, boxed stream of byte chunks, where each chunk (or the stream
/// itself) may fail with an I/O error.
pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;

impl HashSpecific for CreateTagMsg {
    // For tag creation the relevant hash is the one the new tag will point to.
    fn hash(&self) -> crate::Hash {
        self.inner.value.hash
    }
}
89
/// The full set of requests a store implementation must handle.
///
/// Each variant carries a serializable request payload. The `#[rpc(...)]`
/// attribute declares the channel types used for responses (`tx`) and, where
/// present, for client-side updates (`rx`). The macro-generated message enum
/// containing the full commands (request plus channels) is [`Command`].
#[rpc_requests(message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
    #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
    ListBlobs(ListRequest),
    #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
    Batch(BatchRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteBlobs(BlobDeleteRequest),
    #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
    ImportBao(ImportBaoRequest),
    #[rpc(tx = mpsc::Sender<EncodedItem>)]
    ExportBao(ExportBaoRequest),
    #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
    ExportRanges(ExportRangesRequest),
    #[rpc(tx = mpsc::Sender<Bitfield>)]
    Observe(ObserveRequest),
    #[rpc(tx = oneshot::Sender<BlobStatus>)]
    BlobStatus(BlobStatusRequest),
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportBytes(ImportBytesRequest),
    #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
    ImportByteStream(ImportByteStreamRequest),
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportPath(ImportPathRequest),
    #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
    ExportPath(ExportPathRequest),
    #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
    ListTags(ListTagsRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SetTag(SetTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteTags(DeleteTagsRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    RenameTag(RenameTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
    CreateTag(CreateTagRequest),
    #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
    ListTempTags(ListTempTagsRequest),
    #[rpc(tx = oneshot::Sender<TempTag>)]
    CreateTempTag(CreateTempTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SyncDb(SyncDbRequest),
    #[rpc(tx = oneshot::Sender<()>)]
    WaitIdle(WaitIdleRequest),
    #[rpc(tx = oneshot::Sender<()>)]
    Shutdown(ShutdownRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    ClearProtected(ClearProtectedRequest),
}
140
/// Wait until the store is idle.
///
/// NOTE(review): presumably resolves once all in-flight commands have
/// settled — confirm against the store implementations.
#[derive(Debug, Serialize, Deserialize)]
pub struct WaitIdleRequest;

/// Sync the store's database.
///
/// NOTE(review): presumably flushes pending writes to durable storage —
/// confirm against the store implementations.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncDbRequest;

/// Shut down the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ShutdownRequest;

/// Clear protected blobs.
///
/// NOTE(review): exact protection semantics are store-defined — confirm.
#[derive(Debug, Serialize, Deserialize)]
pub struct ClearProtectedRequest;

/// Query the [`BlobStatus`] of a single blob.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobStatusRequest {
    /// The hash of the blob to query.
    pub hash: Hash,
}

/// List all blobs in the store; responses stream back one hash at a time.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest;

/// Open a batch; the response is the new [`Scope`], and the update channel
/// carries [`BatchResponse`] messages from the client.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchRequest;

/// Updates sent by the client during a batch.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchResponse {
    /// Drop the given content from the batch.
    Drop(HashAndFormat),
    /// Keep-alive message for the batch.
    Ping,
}

/// Options for force deletion of blobs.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobDeleteRequest {
    /// The hashes of the blobs to delete.
    pub hashes: Vec<Hash>,
    /// Whether to delete even protected blobs.
    /// NOTE(review): exact force semantics are store-defined — confirm.
    pub force: bool,
}

/// Import the given bytes.
#[derive(Serialize, Deserialize)]
pub struct ImportBytesRequest {
    /// The data to import.
    pub data: Bytes,
    /// Format of the data (raw blob or hash seq).
    pub format: BlobFormat,
    /// Scope in which the resulting temp tag lives.
    pub scope: Scope,
}
184
185impl fmt::Debug for ImportBytesRequest {
186    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
187        f.debug_struct("ImportBytes")
188            .field("data", &self.data.len())
189            .field("format", &self.format)
190            .field("scope", &self.scope)
191            .finish()
192    }
193}
194
/// Import a file from a local path.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportPathRequest {
    /// Path of the file to import.
    pub path: PathBuf,
    /// Whether to copy the file or try to reference it in place.
    pub mode: ImportMode,
    /// Format of the data (raw blob or hash seq).
    pub format: BlobFormat,
    /// Scope in which the resulting temp tag lives.
    pub scope: Scope,
}

/// Import bao encoded data for the given hash with the iroh block size.
///
/// The result is just a single item, indicating if a write error occurred.
/// To observe the incoming data more granularly, use the `Observe` command
/// concurrently.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportBaoRequest {
    /// The hash of the data being imported.
    pub hash: Hash,
    /// Total size of the blob in bytes. Non-zero by construction.
    pub size: NonZeroU64,
}

/// Observe the local bitfield of the given hash.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObserveRequest {
    /// The hash whose bitfield to observe.
    pub hash: Hash,
}

/// Export the given ranges in bao format, with the iroh block size.
///
/// The returned stream should be verified by the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportBaoRequest {
    /// The hash of the blob to export.
    pub hash: Hash,
    /// The chunk ranges to export.
    pub ranges: ChunkRanges,
}

/// Export the given ranges as chunks, without validation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportRangesRequest {
    /// The hash of the blob to export.
    pub hash: Hash,
    /// The byte ranges to export.
    pub ranges: RangeSet2<u64>,
}
235
/// Export a file to a target path.
///
/// For an incomplete file, the size might be truncated and gaps will be filled
/// with zeros. If possible, a store implementation should try to write as a
/// sparse file.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportPathRequest {
    /// The hash of the blob to export.
    pub hash: Hash,
    /// Export mode; see [`ExportMode`].
    pub mode: ExportMode,
    /// Target path to write the file to.
    pub target: PathBuf,
}

/// Import data from a stream of bytes.
///
/// The data itself arrives as [`ImportByteStreamUpdate`] messages on the
/// update channel.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportByteStreamRequest {
    /// Format of the data (raw blob or hash seq).
    pub format: BlobFormat,
    /// Scope in which the resulting temp tag lives.
    pub scope: Scope,
}

/// Updates for a byte stream import.
#[derive(Debug, Serialize, Deserialize)]
pub enum ImportByteStreamUpdate {
    /// The next chunk of data.
    Bytes(Bytes),
    /// The stream is finished.
    Done,
}
260
/// Options for a list operation.
///
/// `raw` and `hash_seq` filter by the format of the tag target; `from`/`to`
/// bound the tag name range. Prefer the constructors on this type over
/// filling in fields by hand.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListTagsRequest {
    /// List tags to hash seqs
    pub hash_seq: bool,
    /// List tags to raw blobs
    pub raw: bool,
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}
273
274impl ListTagsRequest {
275    /// List a range of tags
276    pub fn range<R, E>(range: R) -> Self
277    where
278        R: RangeBounds<E>,
279        E: AsRef<[u8]>,
280    {
281        let (from, to) = tags_from_range(range);
282        Self {
283            from,
284            to,
285            raw: true,
286            hash_seq: true,
287        }
288    }
289
290    /// List tags with a prefix
291    pub fn prefix(prefix: &[u8]) -> Self {
292        let from = Tag::from(prefix);
293        let to = from.next_prefix();
294        Self {
295            raw: true,
296            hash_seq: true,
297            from: Some(from),
298            to,
299        }
300    }
301
302    /// List a single tag
303    pub fn single(name: &[u8]) -> Self {
304        let from = Tag::from(name);
305        Self {
306            to: Some(from.successor()),
307            from: Some(from),
308            raw: true,
309            hash_seq: true,
310        }
311    }
312
313    /// List all tags
314    pub fn all() -> Self {
315        Self {
316            raw: true,
317            hash_seq: true,
318            from: None,
319            to: None,
320        }
321    }
322
323    /// List raw tags
324    pub fn raw() -> Self {
325        Self {
326            raw: true,
327            hash_seq: false,
328            from: None,
329            to: None,
330        }
331    }
332
333    /// List hash seq tags
334    pub fn hash_seq() -> Self {
335        Self {
336            raw: false,
337            hash_seq: true,
338            from: None,
339            to: None,
340        }
341    }
342}
343
/// Information about a tag.
///
/// A tag is a named pointer to content, i.e. to a [`HashAndFormat`].
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TagInfo {
    /// Name of the tag
    pub name: Tag,
    /// Format of the data
    pub format: BlobFormat,
    /// Hash of the data
    pub hash: Hash,
}
354
355impl From<TagInfo> for HashAndFormat {
356    fn from(tag_info: TagInfo) -> Self {
357        HashAndFormat {
358            hash: tag_info.hash,
359            format: tag_info.format,
360        }
361    }
362}
363
364impl TagInfo {
365    /// Create a new tag info.
366    pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
367        let name = name.as_ref();
368        let value = value.into();
369        Self {
370            name: Tag::from(name),
371            hash: value.hash,
372            format: value.format,
373        }
374    }
375
376    /// Get the hash and format of the tag.
377    pub fn hash_and_format(&self) -> HashAndFormat {
378        HashAndFormat {
379            hash: self.hash,
380            format: self.format,
381        }
382    }
383}
384
385pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
386where
387    R: RangeBounds<E>,
388    E: AsRef<[u8]>,
389{
390    let from = match range.start_bound() {
391        Bound::Included(start) => Some(Tag::from(start.as_ref())),
392        Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
393        Bound::Unbounded => None,
394    };
395    let to = match range.end_bound() {
396        Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
397        Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
398        Bound::Unbounded => None,
399    };
400    (from, to)
401}
402
/// Create a temp tag for the given content in the given scope.
// NOTE(review): the original doc comment here ("List all temp tags") was a
// copy-paste from ListTempTagsRequest.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTempTagRequest {
    /// Scope the temp tag belongs to.
    pub scope: Scope,
    /// The content the temp tag points to.
    pub value: HashAndFormat,
}

/// List all temp tags
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTempTagsRequest;

/// Rename a tag atomically
#[derive(Debug, Serialize, Deserialize)]
pub struct RenameTagRequest {
    /// Old tag name
    pub from: Tag,
    /// New tag name
    pub to: Tag,
}

/// Options for a delete operation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteTagsRequest {
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}
431
432impl DeleteTagsRequest {
433    /// Delete a single tag
434    pub fn single(name: &[u8]) -> Self {
435        let name = Tag::from(name);
436        Self {
437            to: Some(name.successor()),
438            from: Some(name),
439        }
440    }
441
442    /// Delete a range of tags
443    pub fn range<R, E>(range: R) -> Self
444    where
445        R: RangeBounds<E>,
446        E: AsRef<[u8]>,
447    {
448        let (from, to) = tags_from_range(range);
449        Self { from, to }
450    }
451
452    /// Delete tags with a prefix
453    pub fn prefix(prefix: &[u8]) -> Self {
454        let from = Tag::from(prefix);
455        let to = from.next_prefix();
456        Self {
457            from: Some(from),
458            to,
459        }
460    }
461}
462
/// Options for creating a tag or setting it to a new value.
#[derive(Debug, Serialize, Deserialize)]
pub struct SetTagRequest {
    /// Name of the tag to set.
    pub name: Tag,
    /// Content the tag should point to.
    pub value: HashAndFormat,
}

/// Options for creating a tag.
///
/// The chosen [`Tag`] name is returned in the response (see the `CreateTag`
/// variant of [`Request`]).
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTagRequest {
    /// Content the new tag should point to.
    pub value: HashAndFormat,
}

/// Debug tool to exit the process in the middle of a write transaction, for testing.
///
/// NOTE(review): not a variant of [`Request`] in this file — confirm it is
/// wired up elsewhere or intentionally unused here.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessExitRequest {
    /// Process exit code to terminate with.
    pub code: i32,
}
481
/// Progress events for importing from any local source.
///
/// For sources with known size such as blobs or files, you will get the events
/// in the following order:
///
/// Size -> CopyProgress(*n) -> CopyDone -> OutboardProgress(*n) -> Done
///
/// For sources with unknown size such as streams, you will get the events
/// in the following order:
///
/// CopyProgress(*n) -> Size -> CopyDone -> OutboardProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an `Error` event.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddProgressItem {
    /// Progress copying the file into the data directory.
    ///
    /// On most modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// The number is the *byte offset* of the copy process.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// Size of the file or stream has been determined, in bytes.
    ///
    /// For some input such as blobs or files, the size is immediately known.
    /// For other inputs such as streams, the size is determined by reading
    /// the stream to the end.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// The copy part of the import operation is done.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    CopyDone,
    /// Progress computing the outboard and root hash of the imported data.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    OutboardProgress(u64),
    /// The import is done. Once you get this event the data is available
    /// and protected in the store via the temp tag.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done(TempTag),
    /// The import failed with an error. Partial data will be deleted.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
}
544
545impl From<io::Error> for AddProgressItem {
546    fn from(e: io::Error) -> Self {
547        Self::Error(e)
548    }
549}
550
/// Items produced while exporting byte ranges of a blob
/// (the `ExportRanges` command).
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportRangesItem {
    /// The size of the file being exported, in bytes.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// A range of the file being exported.
    Data(Leaf),
    /// Error while exporting the ranges.
    Error(super::Error),
}
563
564impl From<super::Error> for ExportRangesItem {
565    fn from(e: super::Error) -> Self {
566        Self::Error(e)
567    }
568}
569
570impl From<Leaf> for ExportRangesItem {
571    fn from(leaf: Leaf) -> Self {
572        Self::Data(leaf)
573    }
574}
575
/// Progress events for exporting to a local file.
///
/// Exporting does not involve outboard computation, so the events are simpler
/// than [`AddProgressItem`].
///
/// Size -> CopyProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an `Error` event.
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportProgressItem {
    /// The size of the file being exported, in bytes.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// Progress copying the file to the target directory.
    ///
    /// On many modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// NOTE(review): presumably the byte offset of the copy, as for
    /// [`AddProgressItem::CopyProgress`] — confirm.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// The export is done. Once you get this event the data is available.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done,
    /// The export failed with an error.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(super::Error),
}
616
617impl From<super::Error> for ExportProgressItem {
618    fn from(e: super::Error) -> Self {
619        Self::Error(e)
620    }
621}
622
/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}

/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation has no files to
/// reference, so it will always copy. Also, a disk based implementation might
/// choose to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}
671
/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    /// The blob is not stored at all.
    NotFound,
    /// The blob is only stored partially.
    Partial {
        /// The size of the currently stored partial blob.
        ///
        /// `None` when the size is not (yet) known.
        size: Option<u64>,
    },
    /// The blob is stored completely.
    Complete {
        /// The size of the blob.
        size: u64,
    },
}

/// A scope for a write operation.
///
/// Scope `0` is the special global scope. The `Batch` command responds with
/// a fresh [`Scope`] for the batch.
#[derive(
    Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
)]
pub struct Scope(pub(crate) u64);

impl Scope {
    /// The global scope, used for operations that are not bound to a batch.
    pub const GLOBAL: Self = Self(0);
}
698
699impl std::fmt::Debug for Scope {
700    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
701        if self.0 == 0 {
702            write!(f, "Global")
703        } else {
704            f.debug_tuple("Scope").field(&self.0).finish()
705        }
706    }
707}