iroh_blobs/api/
proto.rs

1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2//! The protocol that a store implementation needs to implement.
3//!
4//! A store needs to handle [`Request`]s. It is fine to just return an error for some
5//! commands. E.g. an immutable store can just return an error for import commands.
6//!
7//! Each command consists of a serializable request message and channels for updates
8//! and responses. The enum containing the full requests is [`Command`]. These are the
9//! commands you will have to handle in a store actor handler.
10//!
11//! This crate provides a file system based store implementation, [`crate::store::fs::FsStore`],
12//! as well as a mutable in-memory store and an immutable in-memory store.
13//!
14//! The file system store is quite complex and optimized, so to get started take a look at
15//! the much simpler memory store.
16use std::{
17    fmt::{self, Debug},
18    io,
19    num::NonZeroU64,
20    ops::{Bound, RangeBounds},
21    path::PathBuf,
22    pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27    io::{mixed::EncodedItem, BaoContentItem, Leaf},
28    ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32    channel::{mpsc, oneshot},
33    rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
/// Helper trait for command messages that target a single hash.
///
/// Lets generic code (e.g. routing or logging) extract the hash a message
/// refers to without matching on the concrete message type.
pub(crate) trait HashSpecific {
    /// The hash this command operates on.
    fn hash(&self) -> Hash;

    /// Shortened form of the hash (as produced by `fmt_short`), handy for logs.
    fn hash_short(&self) -> ArrayString<10> {
        self.hash().fmt_short()
    }
}
50
impl HashSpecific for ImportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ObserveMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportRangesMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

impl HashSpecific for ExportPathMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}

/// A stream of byte chunks where each read may fail with an I/O error.
pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;

impl HashSpecific for CreateTagMsg {
    fn hash(&self) -> crate::Hash {
        // A tag points at a hash; the command is specific to that target hash.
        self.inner.value.hash
    }
}
88
/// The full set of requests a store implementation must handle.
///
/// Each variant pairs a serializable request type with the channels used for
/// updates (`rx`) and responses (`tx`), as declared by the `#[rpc(...)]`
/// attributes. The macro generates the [`Command`] message enum and per-request
/// `*Msg` types from this definition.
#[rpc_requests(message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
    // Stream the hashes of all blobs in the store.
    #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
    ListBlobs(ListRequest),
    // Open a batch; responds with the new scope, updates arrive as BatchResponse.
    #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
    Batch(BatchRequest),
    // Delete one or more blobs, optionally even if protected (force).
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteBlobs(BlobDeleteRequest),
    // Import bao-encoded content; the data arrives via the update channel.
    #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
    ImportBao(ImportBaoRequest),
    // Stream bao-encoded content for the requested ranges.
    #[rpc(tx = mpsc::Sender<EncodedItem>)]
    ExportBao(ExportBaoRequest),
    // Stream raw (unvalidated) data for the requested ranges.
    #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
    ExportRanges(ExportRangesRequest),
    // Stream bitfield updates for a hash.
    #[rpc(tx = mpsc::Sender<Bitfield>)]
    Observe(ObserveRequest),
    // Query completeness/size status of a single blob.
    #[rpc(tx = oneshot::Sender<BlobStatus>)]
    BlobStatus(BlobStatusRequest),
    // Import in-memory bytes, streaming progress events back.
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportBytes(ImportBytesRequest),
    // Import a byte stream; data arrives via the update channel.
    #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
    ImportByteStream(ImportByteStreamRequest),
    // Import a file from a local path, streaming progress events back.
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportPath(ImportPathRequest),
    // Export a blob to a local path, streaming progress events back.
    #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
    ExportPath(ExportPathRequest),
    // List tags, optionally filtered by kind and name range.
    #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
    ListTags(ListTagsRequest),
    // Set a tag to a value, creating it if needed.
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SetTag(SetTagRequest),
    // Delete a range of tags.
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteTags(DeleteTagsRequest),
    // Atomically rename a tag.
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    RenameTag(RenameTagRequest),
    // Create a new tag with a generated name; responds with that name.
    #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
    CreateTag(CreateTagRequest),
    // List all temp tags.
    #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
    ListTempTags(ListTempTagsRequest),
    // Create a temp tag for a hash and format.
    #[rpc(tx = oneshot::Sender<TempTag>)]
    CreateTempTag(CreateTempTagRequest),
    // Sync the database to disk.
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SyncDb(SyncDbRequest),
    // Shut down the store.
    #[rpc(tx = oneshot::Sender<()>)]
    Shutdown(ShutdownRequest),
    // Clear the set of protected hashes.
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    ClearProtected(ClearProtectedRequest),
}
137
/// Sync the database to disk.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncDbRequest;

/// Shut down the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ShutdownRequest;

/// Clear the set of protected hashes.
///
/// NOTE(review): what exactly counts as "protected" is store-specific —
/// confirm against the store implementations.
#[derive(Debug, Serialize, Deserialize)]
pub struct ClearProtectedRequest;

/// Query the [`BlobStatus`] of a single blob.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobStatusRequest {
    /// The hash to query.
    pub hash: Hash,
}

/// List the hashes of all blobs in the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest;

/// Open a new batch; the response is the [`Scope`] of the batch.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchRequest;

/// Update messages sent by the client while a batch is open.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchResponse {
    /// Drop interest in the given hash and format
    /// (presumably releasing its temp tag — confirm against the store impl).
    Drop(HashAndFormat),
    /// Keep-alive message for the batch.
    Ping,
}
170
/// Import the given bytes.
#[derive(Serialize, Deserialize)]
pub struct ImportBytesRequest {
    /// The data to import.
    pub data: Bytes,
    /// Format of the data.
    pub format: BlobFormat,
    /// Scope (batch) this import belongs to.
    pub scope: Scope,
}

impl fmt::Debug for ImportBytesRequest {
    // Manual impl so debug output shows the data *length* instead of
    // dumping the full byte content.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ImportBytes")
            .field("data", &self.data.len())
            .field("format", &self.format)
            .field("scope", &self.scope)
            .finish()
    }
}

/// Import a file from a local path.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportPathRequest {
    /// Path of the file to import.
    pub path: PathBuf,
    /// Whether to copy the file or try to reference it in place.
    pub mode: ImportMode,
    /// Format of the data.
    pub format: BlobFormat,
    /// Scope (batch) this import belongs to.
    pub scope: Scope,
}
196
/// Import bao encoded data for the given hash with the iroh block size.
///
/// The result is just a single item, indicating if a write error occurred.
/// To observe the incoming data more granularly, use the `Observe` command
/// concurrently.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportBaoRequest {
    /// Hash the imported data belongs to.
    pub hash: Hash,
    /// Total size of the blob in bytes; must be non-zero.
    pub size: NonZeroU64,
}

/// Observe the local bitfield of the given hash.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObserveRequest {
    /// The hash to observe.
    pub hash: Hash,
}

/// Export the given ranges in bao format, with the iroh block size.
///
/// The returned stream should be verified by the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportBaoRequest {
    /// Hash of the blob to export.
    pub hash: Hash,
    /// Chunk ranges to export.
    pub ranges: ChunkRanges,
}

/// Export the given ranges as chunks, without validation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportRangesRequest {
    /// Hash of the blob to export.
    pub hash: Hash,
    /// Ranges to export (in bytes — NOTE(review): confirm the unit).
    pub ranges: RangeSet2<u64>,
}
229
230/// Export a file to a target path.
231///
232/// For an incomplete file, the size might be truncated and gaps will be filled
233/// with zeros. If possible, a store implementation should try to write as a
234/// sparse file.
235
236#[derive(Debug, Serialize, Deserialize)]
237pub struct ExportPathRequest {
238    pub hash: Hash,
239    pub mode: ExportMode,
240    pub target: PathBuf,
241}
242
243#[derive(Debug, Serialize, Deserialize)]
244pub struct ImportByteStreamRequest {
245    pub format: BlobFormat,
246    pub scope: Scope,
247}
248
249#[derive(Debug, Serialize, Deserialize)]
250pub enum ImportByteStreamUpdate {
251    Bytes(Bytes),
252    Done,
253}
254
/// Options for a list operation.
///
/// `from` and `to` select the half-open name range `[from, to)`;
/// `raw` and `hash_seq` filter by the kind of data the tag points at.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListTagsRequest {
    /// List tags to hash seqs
    pub hash_seq: bool,
    /// List tags to raw blobs
    pub raw: bool,
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}
267
268impl ListTagsRequest {
269    /// List a range of tags
270    pub fn range<R, E>(range: R) -> Self
271    where
272        R: RangeBounds<E>,
273        E: AsRef<[u8]>,
274    {
275        let (from, to) = tags_from_range(range);
276        Self {
277            from,
278            to,
279            raw: true,
280            hash_seq: true,
281        }
282    }
283
284    /// List tags with a prefix
285    pub fn prefix(prefix: &[u8]) -> Self {
286        let from = Tag::from(prefix);
287        let to = from.next_prefix();
288        Self {
289            raw: true,
290            hash_seq: true,
291            from: Some(from),
292            to,
293        }
294    }
295
296    /// List a single tag
297    pub fn single(name: &[u8]) -> Self {
298        let from = Tag::from(name);
299        Self {
300            to: Some(from.successor()),
301            from: Some(from),
302            raw: true,
303            hash_seq: true,
304        }
305    }
306
307    /// List all tags
308    pub fn all() -> Self {
309        Self {
310            raw: true,
311            hash_seq: true,
312            from: None,
313            to: None,
314        }
315    }
316
317    /// List raw tags
318    pub fn raw() -> Self {
319        Self {
320            raw: true,
321            hash_seq: false,
322            from: None,
323            to: None,
324        }
325    }
326
327    /// List hash seq tags
328    pub fn hash_seq() -> Self {
329        Self {
330            raw: false,
331            hash_seq: true,
332            from: None,
333            to: None,
334        }
335    }
336}
337
/// Information about a tag.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TagInfo {
    /// Name of the tag
    pub name: Tag,
    /// Format of the data
    pub format: BlobFormat,
    /// Hash of the data
    pub hash: Hash,
}
348
349impl From<TagInfo> for HashAndFormat {
350    fn from(tag_info: TagInfo) -> Self {
351        HashAndFormat {
352            hash: tag_info.hash,
353            format: tag_info.format,
354        }
355    }
356}
357
358impl TagInfo {
359    /// Create a new tag info.
360    pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
361        let name = name.as_ref();
362        let value = value.into();
363        Self {
364            name: Tag::from(name),
365            hash: value.hash,
366            format: value.format,
367        }
368    }
369
370    /// Get the hash and format of the tag.
371    pub fn hash_and_format(&self) -> HashAndFormat {
372        HashAndFormat {
373            hash: self.hash,
374            format: self.format,
375        }
376    }
377}
378
379pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
380where
381    R: RangeBounds<E>,
382    E: AsRef<[u8]>,
383{
384    let from = match range.start_bound() {
385        Bound::Included(start) => Some(Tag::from(start.as_ref())),
386        Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
387        Bound::Unbounded => None,
388    };
389    let to = match range.end_bound() {
390        Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
391        Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
392        Bound::Unbounded => None,
393    };
394    (from, to)
395}
396
/// Create a temp tag for the given hash and format within a scope.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTempTagRequest {
    /// Scope (batch) the temp tag belongs to.
    pub scope: Scope,
    /// Hash and format the temp tag points at.
    pub value: HashAndFormat,
}

/// List all temp tags
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTempTagsRequest;
407
/// Rename a tag atomically
#[derive(Debug, Serialize, Deserialize)]
pub struct RenameTagRequest {
    /// Old tag name
    pub from: Tag,
    /// New tag name
    pub to: Tag,
}

/// Options for a delete operation.
///
/// Deletes all tags with names in the half-open range `[from, to)`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteTagsRequest {
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}
425
426impl DeleteTagsRequest {
427    /// Delete a single tag
428    pub fn single(name: &[u8]) -> Self {
429        let name = Tag::from(name);
430        Self {
431            to: Some(name.successor()),
432            from: Some(name),
433        }
434    }
435
436    /// Delete a range of tags
437    pub fn range<R, E>(range: R) -> Self
438    where
439        R: RangeBounds<E>,
440        E: AsRef<[u8]>,
441    {
442        let (from, to) = tags_from_range(range);
443        Self { from, to }
444    }
445
446    /// Delete tags with a prefix
447    pub fn prefix(prefix: &[u8]) -> Self {
448        let from = Tag::from(prefix);
449        let to = from.next_prefix();
450        Self {
451            from: Some(from),
452            to,
453        }
454    }
455}
456
/// Options for creating a tag or setting it to a new value.
#[derive(Debug, Serialize, Deserialize)]
pub struct SetTagRequest {
    /// Name of the tag.
    pub name: Tag,
    /// Hash and format the tag should point at.
    pub value: HashAndFormat,
}

/// Options for creating a tag
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTagRequest {
    /// Hash and format the new tag should point at.
    pub value: HashAndFormat,
}

/// Debug tool to exit the process in the middle of a write transaction, for testing.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessExitRequest {
    /// Process exit code to terminate with.
    pub code: i32,
}
475
/// Progress events for importing from any local source.
///
/// For sources with known size such as blobs or files, you will get the events
/// in the following order:
///
/// Size -> CopyProgress(*n) -> CopyDone -> OutboardProgress(*n) -> Done
///
/// For sources with unknown size such as streams, you will get the events
/// in the following order:
///
/// CopyProgress(*n) -> Size -> CopyDone -> OutboardProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an `Error` event.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddProgressItem {
    /// Progress copying the file into the data directory.
    ///
    /// On most modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// The number is the *byte offset* of the copy process.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// Size of the file or stream has been determined.
    ///
    /// For some input such as blobs or files, the size is immediately known.
    /// For other inputs such as streams, the size is determined by reading
    /// the stream to the end.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// The copy part of the import operation is done.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    CopyDone,
    /// Progress computing the outboard and root hash of the imported data.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    OutboardProgress(u64),
    /// The import is done. Once you get this event the data is available
    /// and protected in the store via the temp tag.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done(TempTag),
    /// The import failed with an error. Partial data will be deleted.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
}

impl From<io::Error> for AddProgressItem {
    // Convenience conversion so I/O errors can be turned directly into the
    // terminal `Error` event.
    fn from(e: io::Error) -> Self {
        Self::Error(e)
    }
}
544
/// Items produced by an `ExportRanges` operation.
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportRangesItem {
    /// The size of the file being exported.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// A range of the file being exported.
    Data(Leaf),
    /// Error while exporting the ranges.
    Error(super::Error),
}

impl From<super::Error> for ExportRangesItem {
    // Convenience conversion so errors can be sent directly as items.
    fn from(e: super::Error) -> Self {
        Self::Error(e)
    }
}

impl From<Leaf> for ExportRangesItem {
    // Convenience conversion so data leaves can be sent directly as items.
    fn from(leaf: Leaf) -> Self {
        Self::Data(leaf)
    }
}
569
/// Progress events for exporting to a local file.
///
/// Exporting does not involve outboard computation, so the events are simpler
/// than [`AddProgressItem`].
///
/// Size -> CopyProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an `Error` event.
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportProgressItem {
    /// The size of the file being exported.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// Progress copying the file to the target directory.
    ///
    /// On many modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// The export is done. Once you get this event the data is available.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done,
    /// The export failed with an error.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(super::Error),
}

impl From<super::Error> for ExportProgressItem {
    // Convenience conversion so errors can be sent directly as progress items.
    fn from(e: super::Error) -> Self {
        Self::Error(e)
    }
}
616
/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}
640
/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation cannot reference
/// files on disk. Also, a disk based implementation might choose to copy small
/// files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}
665
/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    /// The blob is not stored at all.
    NotFound,
    /// The blob is only stored partially.
    Partial {
        /// The size of the currently stored partial blob.
        ///
        /// NOTE(review): presumably `None` when the size is not yet known —
        /// confirm against the store implementations.
        size: Option<u64>,
    },
    /// The blob is stored completely.
    Complete {
        /// The size of the blob.
        size: u64,
    },
}
682
683/// A scope for a write operation.
684#[derive(
685    Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
686)]
687pub struct Scope(pub(crate) u64);
688
689impl Scope {
690    pub const GLOBAL: Self = Self(0);
691}
692
693impl std::fmt::Debug for Scope {
694    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
695        if self.0 == 0 {
696            write!(f, "Global")
697        } else {
698            f.debug_tuple("Scope").field(&self.0).finish()
699        }
700    }
701}