iroh_blobs/api/
proto.rs

1#![cfg_attr(feature = "hide-proto-docs", doc(hidden))]
2//! The protocol that a store implementation needs to implement.
3//!
4//! A store needs to handle [`Request`]s. It is fine to just return an error for some
5//! commands. E.g. an immutable store can just return an error for import commands.
6//!
7//! Each command consists of a serializable request message and channels for updates
8//! and responses. The enum containing the full requests is [`Command`]. These are the
9//! commands you will have to handle in a store actor handler.
10//!
11//! This crate provides a file system based store implementation, [`crate::store::fs::FsStore`],
12//! as well as a mutable in-memory store and an immutable in-memory store.
13//!
14//! The file system store is quite complex and optimized, so to get started take a look at
15//! the much simpler memory store.
16use std::{
17    fmt::{self, Debug},
18    io,
19    num::NonZeroU64,
20    ops::{Bound, RangeBounds},
21    path::PathBuf,
22    pin::Pin,
23};
24
25use arrayvec::ArrayString;
26use bao_tree::{
27    io::{mixed::EncodedItem, BaoContentItem, Leaf},
28    ChunkRanges,
29};
30use bytes::Bytes;
31use irpc::{
32    channel::{mpsc, oneshot},
33    rpc_requests,
34};
35use n0_future::Stream;
36use range_collections::RangeSet2;
37use serde::{Deserialize, Serialize};
38pub(crate) mod bitfield;
39pub use bitfield::Bitfield;
40
41use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
42
/// Trait for command messages that concern a single hash.
///
/// Lets store internals route or log a command by the blob it touches
/// without matching on every message type.
pub(crate) trait HashSpecific {
    /// The hash this message is about.
    fn hash(&self) -> Hash;

    /// Short printable form of the hash (as produced by `Hash::fmt_short`),
    /// mainly useful for logging.
    fn hash_short(&self) -> ArrayString<10> {
        self.hash().fmt_short()
    }
}
50
// ImportBao targets the hash the bao data is being imported for.
impl HashSpecific for ImportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}
56
// Observe targets the hash whose bitfield is being observed.
impl HashSpecific for ObserveMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}
62
// ExportBao targets the hash being exported in bao format.
impl HashSpecific for ExportBaoMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}
68
// ExportRanges targets the hash whose byte ranges are being exported.
impl HashSpecific for ExportRangesMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}
74
// ExportPath targets the hash being exported to a file path.
impl HashSpecific for ExportPathMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.hash
    }
}
80
/// A pinned, boxed, `Send + Sync` stream of bytes, where each item may fail with an I/O error.
pub type BoxedByteStream = Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Sync + 'static>>;
82
// CreateTag targets the hash of the value the new tag will point to.
impl HashSpecific for CreateTagMsg {
    fn hash(&self) -> crate::Hash {
        self.inner.value.hash
    }
}
88
/// Marker type identifying the store RPC service to `irpc`.
#[derive(Debug, Clone)]
pub struct StoreService;
impl irpc::Service for StoreService {}
92
// The full set of commands a store implementation must handle.
//
// Each variant's `#[rpc(..)]` attribute declares the response channel (`tx`)
// and, where the caller streams data to the store, the update channel (`rx`).
// The macro generates the `Command` enum and per-request `*Msg` aliases.
// NOTE: variant order is part of the serialized wire format — do not reorder.
#[rpc_requests(StoreService, message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
    // -- blob commands --
    #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
    ListBlobs(ListRequest),
    #[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
    Batch(BatchRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteBlobs(BlobDeleteRequest),
    #[rpc(rx = mpsc::Receiver<BaoContentItem>, tx = oneshot::Sender<super::Result<()>>)]
    ImportBao(ImportBaoRequest),
    #[rpc(tx = mpsc::Sender<EncodedItem>)]
    ExportBao(ExportBaoRequest),
    #[rpc(tx = mpsc::Sender<ExportRangesItem>)]
    ExportRanges(ExportRangesRequest),
    #[rpc(tx = mpsc::Sender<Bitfield>)]
    Observe(ObserveRequest),
    #[rpc(tx = oneshot::Sender<BlobStatus>)]
    BlobStatus(BlobStatusRequest),
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportBytes(ImportBytesRequest),
    #[rpc(rx = mpsc::Receiver<ImportByteStreamUpdate>, tx = mpsc::Sender<AddProgressItem>)]
    ImportByteStream(ImportByteStreamRequest),
    #[rpc(tx = mpsc::Sender<AddProgressItem>)]
    ImportPath(ImportPathRequest),
    #[rpc(tx = mpsc::Sender<ExportProgressItem>)]
    ExportPath(ExportPathRequest),
    // -- tag commands --
    #[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
    ListTags(ListTagsRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SetTag(SetTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    DeleteTags(DeleteTagsRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    RenameTag(RenameTagRequest),
    #[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
    CreateTag(CreateTagRequest),
    // -- temp tag commands --
    #[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
    ListTempTags(ListTempTagsRequest),
    #[rpc(tx = oneshot::Sender<TempTag>)]
    CreateTempTag(CreateTempTagRequest),
    // -- maintenance commands --
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    SyncDb(SyncDbRequest),
    #[rpc(tx = oneshot::Sender<()>)]
    Shutdown(ShutdownRequest),
    #[rpc(tx = oneshot::Sender<super::Result<()>>)]
    ClearProtected(ClearProtectedRequest),
}
141
/// Request to sync the store's database.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncDbRequest;
144
/// Request to shut down the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ShutdownRequest;
147
/// Request to clear the set of protected blobs.
#[derive(Debug, Serialize, Deserialize)]
pub struct ClearProtectedRequest;
150
/// Query the [`BlobStatus`] of a single blob.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobStatusRequest {
    /// Hash of the blob to query.
    pub hash: Hash,
}
155
/// Request to list the hashes of all blobs in the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ListRequest;
158
/// Open a batch; the store responds with a fresh [`Scope`] and then accepts
/// [`BatchResponse`] updates for that scope.
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchRequest;
161
/// Updates sent by the client while a batch is open.
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchResponse {
    /// Drop the given content from the batch's scope.
    Drop(HashAndFormat),
    /// Ping message (NOTE(review): presumably a keep-alive — confirm with the store impl).
    Ping,
}
167
/// Options for force deletion of blobs.
#[derive(Debug, Serialize, Deserialize)]
pub struct BlobDeleteRequest {
    /// Hashes of the blobs to delete.
    pub hashes: Vec<Hash>,
    /// If true, perform a force delete (exact semantics are defined by the
    /// store implementation).
    pub force: bool,
}
174
/// Import the given bytes.
#[derive(Serialize, Deserialize)]
pub struct ImportBytesRequest {
    /// The raw data to import.
    pub data: Bytes,
    /// Format (raw blob or hash seq) of the data.
    pub format: BlobFormat,
    /// Scope the resulting temp tag belongs to.
    pub scope: Scope,
}
182
183impl fmt::Debug for ImportBytesRequest {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        f.debug_struct("ImportBytes")
186            .field("data", &self.data.len())
187            .field("format", &self.format)
188            .field("scope", &self.scope)
189            .finish()
190    }
191}
192
/// Import data from a file on the local file system.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportPathRequest {
    /// Path of the file to import.
    pub path: PathBuf,
    /// Whether to copy the file or try to reference it in place.
    pub mode: ImportMode,
    /// Format (raw blob or hash seq) of the data.
    pub format: BlobFormat,
    /// Scope the resulting temp tag belongs to.
    pub scope: Scope,
}
200
/// Import bao encoded data for the given hash with the iroh block size.
///
/// The result is just a single item, indicating if a write error occurred.
/// To observe the incoming data more granularly, use the `Observe` command
/// concurrently.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportBaoRequest {
    /// Root hash the incoming bao data must verify against.
    pub hash: Hash,
    /// Total size of the blob in bytes (non-zero by construction).
    pub size: NonZeroU64,
}
211
/// Observe the local bitfield of the given hash.
#[derive(Debug, Serialize, Deserialize)]
pub struct ObserveRequest {
    /// Hash of the blob whose bitfield should be observed.
    pub hash: Hash,
}
217
/// Export the given ranges in bao format, with the iroh block size.
///
/// The returned stream should be verified by the store.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportBaoRequest {
    /// Hash of the blob to export.
    pub hash: Hash,
    /// Chunk ranges to export.
    pub ranges: ChunkRanges,
}
226
/// Export the given ranges as chunks, without validation.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportRangesRequest {
    /// Hash of the blob to export.
    pub hash: Hash,
    /// Byte ranges to export.
    pub ranges: RangeSet2<u64>,
}
233
/// Export a file to a target path.
///
/// For an incomplete file, the size might be truncated and gaps will be filled
/// with zeros. If possible, a store implementation should try to write as a
/// sparse file.
#[derive(Debug, Serialize, Deserialize)]
pub struct ExportPathRequest {
    /// Hash of the blob to export.
    pub hash: Hash,
    /// Whether to copy the data or try to reference it in place.
    pub mode: ExportMode,
    /// Path of the target file.
    pub target: PathBuf,
}
246
/// Import data from a byte stream; the bytes arrive as
/// [`ImportByteStreamUpdate`] messages on the update channel.
#[derive(Debug, Serialize, Deserialize)]
pub struct ImportByteStreamRequest {
    /// Format (raw blob or hash seq) of the data.
    pub format: BlobFormat,
    /// Scope the resulting temp tag belongs to.
    pub scope: Scope,
}
252
/// Updates for an in-flight [`ImportByteStreamRequest`].
#[derive(Debug, Serialize, Deserialize)]
pub enum ImportByteStreamUpdate {
    /// The next chunk of data.
    Bytes(Bytes),
    /// The stream is finished; no more `Bytes` will follow.
    Done,
}
258
/// Options for listing tags: which formats to include and an optional
/// `[from, to)` name range.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListTagsRequest {
    /// List tags to hash seqs
    pub hash_seq: bool,
    /// List tags to raw blobs
    pub raw: bool,
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}
271
272impl ListTagsRequest {
273    /// List a range of tags
274    pub fn range<R, E>(range: R) -> Self
275    where
276        R: RangeBounds<E>,
277        E: AsRef<[u8]>,
278    {
279        let (from, to) = tags_from_range(range);
280        Self {
281            from,
282            to,
283            raw: true,
284            hash_seq: true,
285        }
286    }
287
288    /// List tags with a prefix
289    pub fn prefix(prefix: &[u8]) -> Self {
290        let from = Tag::from(prefix);
291        let to = from.next_prefix();
292        Self {
293            raw: true,
294            hash_seq: true,
295            from: Some(from),
296            to,
297        }
298    }
299
300    /// List a single tag
301    pub fn single(name: &[u8]) -> Self {
302        let from = Tag::from(name);
303        Self {
304            to: Some(from.successor()),
305            from: Some(from),
306            raw: true,
307            hash_seq: true,
308        }
309    }
310
311    /// List all tags
312    pub fn all() -> Self {
313        Self {
314            raw: true,
315            hash_seq: true,
316            from: None,
317            to: None,
318        }
319    }
320
321    /// List raw tags
322    pub fn raw() -> Self {
323        Self {
324            raw: true,
325            hash_seq: false,
326            from: None,
327            to: None,
328        }
329    }
330
331    /// List hash seq tags
332    pub fn hash_seq() -> Self {
333        Self {
334            raw: false,
335            hash_seq: true,
336            from: None,
337            to: None,
338        }
339    }
340}
341
/// Information about a tag: its name plus the hash and format it points to.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TagInfo {
    /// Name of the tag
    pub name: Tag,
    /// Format of the data
    pub format: BlobFormat,
    /// Hash of the data
    pub hash: Hash,
}
352
353impl From<TagInfo> for HashAndFormat {
354    fn from(tag_info: TagInfo) -> Self {
355        HashAndFormat {
356            hash: tag_info.hash,
357            format: tag_info.format,
358        }
359    }
360}
361
362impl TagInfo {
363    /// Create a new tag info.
364    pub fn new(name: impl AsRef<[u8]>, value: impl Into<HashAndFormat>) -> Self {
365        let name = name.as_ref();
366        let value = value.into();
367        Self {
368            name: Tag::from(name),
369            hash: value.hash,
370            format: value.format,
371        }
372    }
373
374    /// Get the hash and format of the tag.
375    pub fn hash_and_format(&self) -> HashAndFormat {
376        HashAndFormat {
377            hash: self.hash,
378            format: self.format,
379        }
380    }
381}
382
383pub(crate) fn tags_from_range<R, E>(range: R) -> (Option<Tag>, Option<Tag>)
384where
385    R: RangeBounds<E>,
386    E: AsRef<[u8]>,
387{
388    let from = match range.start_bound() {
389        Bound::Included(start) => Some(Tag::from(start.as_ref())),
390        Bound::Excluded(start) => Some(Tag::from(start.as_ref()).successor()),
391        Bound::Unbounded => None,
392    };
393    let to = match range.end_bound() {
394        Bound::Included(end) => Some(Tag::from(end.as_ref()).successor()),
395        Bound::Excluded(end) => Some(Tag::from(end.as_ref())),
396        Bound::Unbounded => None,
397    };
398    (from, to)
399}
400
/// Create a temp tag protecting the given content within a scope.
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTempTagRequest {
    /// Scope the temp tag belongs to.
    pub scope: Scope,
    /// Hash and format the temp tag protects.
    pub value: HashAndFormat,
}
407
/// List all temp tags
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTempTagsRequest;
411
/// Rename a tag atomically
#[derive(Debug, Serialize, Deserialize)]
pub struct RenameTagRequest {
    /// Old tag name
    pub from: Tag,
    /// New tag name
    pub to: Tag,
}
420
/// Options for a delete operation: delete all tags in the `[from, to)`
/// name range, where either end may be open.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteTagsRequest {
    /// Optional from tag (inclusive)
    pub from: Option<Tag>,
    /// Optional to tag (exclusive)
    pub to: Option<Tag>,
}
429
430impl DeleteTagsRequest {
431    /// Delete a single tag
432    pub fn single(name: &[u8]) -> Self {
433        let name = Tag::from(name);
434        Self {
435            to: Some(name.successor()),
436            from: Some(name),
437        }
438    }
439
440    /// Delete a range of tags
441    pub fn range<R, E>(range: R) -> Self
442    where
443        R: RangeBounds<E>,
444        E: AsRef<[u8]>,
445    {
446        let (from, to) = tags_from_range(range);
447        Self { from, to }
448    }
449
450    /// Delete tags with a prefix
451    pub fn prefix(prefix: &[u8]) -> Self {
452        let from = Tag::from(prefix);
453        let to = from.next_prefix();
454        Self {
455            from: Some(from),
456            to,
457        }
458    }
459}
460
/// Options for creating a tag or setting it to a new value.
#[derive(Debug, Serialize, Deserialize)]
pub struct SetTagRequest {
    /// Name of the tag to create or overwrite.
    pub name: Tag,
    /// Hash and format the tag should point to.
    pub value: HashAndFormat,
}
467
/// Options for creating a tag with a store-chosen name
/// (the store replies with the resulting [`Tag`]).
#[derive(Debug, Serialize, Deserialize)]
pub struct CreateTagRequest {
    /// Hash and format the new tag should point to.
    pub value: HashAndFormat,
}
473
/// Debug tool to exit the process in the middle of a write transaction, for testing.
#[derive(Debug, Serialize, Deserialize)]
pub struct ProcessExitRequest {
    /// Exit code to terminate the process with.
    pub code: i32,
}
479
/// Progress events for importing from any local source.
///
/// For sources with known size such as blobs or files, you will get the events
/// in the following order:
///
/// Size -> CopyProgress(*n) -> CopyDone -> OutboardProgress(*n) -> Done
///
/// For sources with unknown size such as streams, you will get the events
/// in the following order:
///
/// CopyProgress(*n) -> Size -> CopyDone -> OutboardProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an
/// [`AddProgressItem::Error`] event.
#[derive(Debug, Serialize, Deserialize)]
pub enum AddProgressItem {
    /// Progress copying the file into the data directory.
    ///
    /// On most modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// The number is the *byte offset* of the copy process.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// Size of the file or stream has been determined.
    ///
    /// For some input such as blobs or files, the size is immediately known.
    /// For other inputs such as streams, the size is determined by reading
    /// the stream to the end.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// The copy part of the import operation is done.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    CopyDone,
    /// Progress computing the outboard and root hash of the imported data.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    OutboardProgress(u64),
    /// The import is done. Once you get this event the data is available
    /// and protected in the store via the temp tag.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done(TempTag),
    /// The import failed with an error. Partial data will be deleted.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(#[serde(with = "crate::util::serde::io_error_serde")] io::Error),
}
542
// Allow `?`-style conversion of I/O errors into a terminal progress event.
impl From<io::Error> for AddProgressItem {
    fn from(e: io::Error) -> Self {
        Self::Error(e)
    }
}
548
/// Progress items streamed in response to an [`ExportRangesRequest`].
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportRangesItem {
    /// The size of the file being exported.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// A range of the file being exported.
    Data(Leaf),
    /// Error while exporting the ranges.
    Error(super::Error),
}
561
// Allow `?`-style conversion of API errors into a terminal item.
impl From<super::Error> for ExportRangesItem {
    fn from(e: super::Error) -> Self {
        Self::Error(e)
    }
}
567
// Convenience conversion for producers emitting raw data leaves.
impl From<Leaf> for ExportRangesItem {
    fn from(leaf: Leaf) -> Self {
        Self::Data(leaf)
    }
}
573
/// Progress events for exporting to a local file.
///
/// Exporting does not involve outboard computation, so the events are simpler
/// than [`AddProgressItem`].
///
/// Size -> CopyProgress(*n) -> Done
///
/// Errors can happen at any time, and will be reported as an
/// [`ExportProgressItem::Error`] event.
#[derive(Debug, Serialize, Deserialize)]
pub enum ExportProgressItem {
    /// The size of the file being exported.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these.
    Size(u64),
    /// Progress copying the file to the target directory.
    ///
    /// On many modern systems, copying will be done with copy on write,
    /// so copying will be instantaneous and you won't get any of these.
    ///
    /// This is an ephemeral progress event, so you can't rely on getting
    /// regular updates.
    CopyProgress(u64),
    /// The export is done. Once you get this event the data is available.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was successful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Done,
    /// The export failed with an error.
    ///
    /// This is a guaranteed progress event, so you can rely on getting exactly
    /// one of these if the operation was unsuccessful.
    ///
    /// This is one of the two possible final events. After this event, there
    /// won't be any more progress events.
    Error(super::Error),
}
614
// Allow `?`-style conversion of API errors into a terminal progress event.
impl From<super::Error> for ExportProgressItem {
    fn from(e: super::Error) -> Self {
        Self::Error(e)
    }
}
620
/// The import mode describes how files will be imported.
///
/// This is a hint to the import trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the file into memory. Also, a disk based implementation might choose
/// to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ImportMode {
    /// This mode will copy the file into the database before hashing.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been imported.
    #[default]
    Copy,
    /// This mode will try to reference the file in place and assume it is unchanged after import.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified after it has been imported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}
644
/// The export mode describes how files will be exported.
///
/// This is a hint to the export trait method. For some implementations, this
/// does not make any sense. E.g. an in memory implementation will always have
/// to copy the data out to the target file. Also, a disk based implementation
/// might choose to copy small files even if the mode is `TryReference`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize, Serialize)]
pub enum ExportMode {
    /// This mode will copy the file to the target directory.
    ///
    /// This is the safe default because the file can not be accidentally modified
    /// after it has been exported.
    #[default]
    Copy,
    /// This mode will try to move the file to the target directory and then reference it from
    /// the database.
    ///
    /// This has a large performance and storage benefit, but it is less safe since
    /// the file might be modified in the target directory after it has been exported.
    ///
    /// Stores are allowed to ignore this mode and always copy the file, e.g.
    /// if the file is very small or if the store does not support referencing files.
    TryReference,
}
669
/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
    /// The blob is not stored at all.
    NotFound,
    /// The blob is only stored partially.
    Partial {
        /// The size of the currently stored partial blob.
        ///
        /// NOTE(review): presumably `None` when the final size is not yet
        /// known — confirm against the store implementations.
        size: Option<u64>,
    },
    /// The blob is stored completely.
    Complete {
        /// The size of the blob.
        size: u64,
    },
}
686
/// A scope for a write operation.
///
/// Scope `0` is the global scope ([`Scope::GLOBAL`], also the `Default`);
/// other scope values are handed out by the store, e.g. in response to a
/// [`BatchRequest`].
#[derive(
    Serialize, Deserialize, Default, Clone, Copy, PartialEq, Eq, Hash, derive_more::Display,
)]
pub struct Scope(pub(crate) u64);
692
impl Scope {
    /// The global (default) scope, represented by the value `0`.
    pub const GLOBAL: Self = Self(0);
}
696
697impl std::fmt::Debug for Scope {
698    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
699        if self.0 == 0 {
700            write!(f, "Global")
701        } else {
702            f.debug_tuple("Scope").field(&self.0).finish()
703        }
704    }
705}