iroh_blobs/
net_protocol.rs

1//! Adaptation of `iroh-blobs` as an `iroh` protocol.
2
3// TODO: reduce API surface and add documentation
4#![allow(missing_docs)]
5
6use std::{
7    collections::BTreeSet,
8    fmt::Debug,
9    ops::{Deref, DerefMut},
10    sync::Arc,
11};
12
13use anyhow::{bail, Result};
14use futures_lite::future::Boxed as BoxedFuture;
15use futures_util::future::BoxFuture;
16use iroh::{endpoint::Connecting, protocol::ProtocolHandler, Endpoint, NodeAddr};
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::{
21    downloader::Downloader,
22    provider::EventSender,
23    store::GcConfig,
24    util::{
25        local_pool::{self, LocalPool, LocalPoolHandle},
26        SetTagOption,
27    },
28    BlobFormat, Hash,
29};
30
/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
///
/// The callback receives the current protected set and may insert additional
/// hashes into it; all inserted hashes survive the next gc sweep.
pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
33
/// The state of the gc loop.
///
/// One-way state machine: `Initial` -> `Started` (see `Blobs::start_gc`).
#[derive(derive_more::Debug)]
enum GcState {
    // Gc loop is not yet running. Other protocols can add protect callbacks
    Initial(#[debug(skip)] Vec<ProtectCb>),
    // Gc loop is running. No more protect callbacks can be added.
    // NOTE(review): the `Run` handle is held so the spawned gc task stays
    // referenced — presumably dropping it would detach/cancel the task;
    // confirm `local_pool::Run` drop semantics.
    Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
}
42
43impl Default for GcState {
44    fn default() -> Self {
45        Self::Initial(Vec::new())
46    }
47}
48
/// The local pool runtime used by blobs: either a handle to a pool owned
/// elsewhere, or a pool owned (and kept alive) by this value.
#[derive(Debug)]
enum Rt {
    // Handle to an externally owned local pool.
    Handle(LocalPoolHandle),
    // Pool owned by this `Rt`; dropping the `Rt` drops the pool.
    Owned(LocalPool),
}
54
55impl Deref for Rt {
56    type Target = LocalPoolHandle;
57
58    fn deref(&self) -> &Self::Target {
59        match self {
60            Self::Handle(ref handle) => handle,
61            Self::Owned(ref pool) => pool.handle(),
62        }
63    }
64}
65
/// Shared state behind the [`Blobs`] handle.
#[derive(Debug)]
pub(crate) struct BlobsInner<S> {
    // Local pool (or handle to one) used to spawn blob-related tasks.
    rt: Rt,
    pub(crate) store: S,
    // Sender for provider events emitted while serving connections.
    events: EventSender,
    pub(crate) downloader: Downloader,
    pub(crate) endpoint: Endpoint,
    // Guards the gc state machine. A std Mutex is sufficient: the guard is
    // never held across an await (see `add_protected` / `start_gc`).
    gc_state: std::sync::Mutex<GcState>,
    #[cfg(feature = "rpc")]
    pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
}
77
78impl<S> BlobsInner<S> {
79    pub(crate) fn rt(&self) -> &LocalPoolHandle {
80        &self.rt
81    }
82}
83
/// Blobs protocol handler.
///
/// Cheaply cloneable: all state lives behind `Arc`s shared between clones.
#[derive(Debug, Clone)]
pub struct Blobs<S> {
    pub(crate) inner: Arc<BlobsInner<S>>,
    // Lazily initialized rpc handler, shared across clones.
    #[cfg(feature = "rpc")]
    pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}
90
/// Keeps track of all the currently active batch operations of the blobs api.
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
pub(crate) struct BlobBatches {
    /// Currently active batches
    batches: std::collections::BTreeMap<BatchId, BlobBatch>,
    /// Used to generate new batch ids: the next id to hand out (see `create`).
    max: u64,
}
100
/// A single batch of blob operations
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
struct BlobBatch {
    /// The tags in this batch.
    ///
    /// Multiple temp tags per content are kept so each `store` needs a
    /// matching `remove_one` before the content becomes unprotected.
    tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
}
108
#[cfg(feature = "rpc")]
impl BlobBatches {
    /// Create a new unique batch id.
    pub fn create(&mut self) -> BatchId {
        let id = BatchId(self.max);
        self.max += 1;
        id
    }

    /// Store a temp tag in a batch identified by a batch id.
    pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
        let batch_entry = self.batches.entry(batch).or_default();
        let tags = batch_entry.tags.entry(tt.hash_and_format()).or_default();
        tags.push(tt);
    }

    /// Remove a tag from a batch.
    ///
    /// Errors if the batch or the content is unknown.
    pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
        let Some(entry) = self.batches.get_mut(&batch) else {
            // this can happen if we try to upgrade a tag from an expired batch
            anyhow::bail!("tag not found in batch");
        };
        let Some(tags) = entry.tags.get_mut(content) else {
            // content was never stored (or already fully removed) in this batch
            anyhow::bail!("tag not found in batch");
        };
        tags.pop();
        if tags.is_empty() {
            entry.tags.remove(content);
        }
        Ok(())
    }

    /// Remove an entire batch.
    pub fn remove(&mut self, batch: BatchId) {
        self.batches.remove(&batch);
    }
}
144
/// Builder for the Blobs protocol handler
#[derive(Debug)]
pub struct Builder<S> {
    store: S,
    // Optional custom event sender; `EventSender::default()` is used at build
    // time when unset.
    events: Option<EventSender>,
    // Optional external local pool handle; `build` creates an owned pool when
    // unset.
    rt: Option<LocalPoolHandle>,
}
152
impl<S: crate::store::Store> Builder<S> {
    /// Set the event sender for the blobs protocol.
    pub fn events(mut self, value: EventSender) -> Self {
        self.events = Some(value);
        self
    }

    /// Set a custom `LocalPoolHandle` to use.
    ///
    /// If not set, the handler creates and owns a fresh `LocalPool`.
    pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
        self.rt = Some(rt);
        self
    }

    /// Build the Blobs protocol handler.
    /// You need to provide the endpoint.
    pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
        // Use the caller-provided pool handle if any, otherwise own a new pool.
        let rt = self
            .rt
            .map(Rt::Handle)
            .unwrap_or_else(|| Rt::Owned(LocalPool::default()));
        // Note: `Rt` is not `Clone` — `rt.clone()` resolves through `Deref`
        // and clones the underlying `LocalPoolHandle`.
        let downloader = Downloader::new(self.store.clone(), endpoint.clone(), rt.clone());
        Blobs::new(
            self.store,
            rt,
            self.events.unwrap_or_default(),
            downloader,
            endpoint.clone(),
        )
    }
}
183
impl<S> Blobs<S> {
    /// Create a new Blobs protocol handler builder, given a store.
    ///
    /// Events and the local pool can be customized on the returned [`Builder`]
    /// before calling [`Builder::build`].
    pub fn builder(store: S) -> Builder<S> {
        Builder {
            store,
            events: None,
            rt: None,
        }
    }
}
194
impl Blobs<crate::store::mem::Store> {
    /// Create a new memory-backed Blobs protocol handler.
    ///
    /// See [`Blobs::persistent`] for an on-disk store instead.
    pub fn memory() -> Builder<crate::store::mem::Store> {
        Self::builder(crate::store::mem::Store::new())
    }
}
201
202impl Blobs<crate::store::fs::Store> {
203    /// Load a persistent Blobs protocol handler from a path.
204    pub async fn persistent(
205        path: impl AsRef<std::path::Path>,
206    ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
207        Ok(Self::builder(crate::store::fs::Store::load(path).await?))
208    }
209}
210
impl<S: crate::store::Store> Blobs<S> {
    /// Assemble the shared inner state and wrap it in an `Arc`.
    fn new(
        store: S,
        rt: Rt,
        events: EventSender,
        downloader: Downloader,
        endpoint: Endpoint,
    ) -> Self {
        Self {
            inner: Arc::new(BlobsInner {
                rt,
                store,
                events,
                downloader,
                endpoint,
                #[cfg(feature = "rpc")]
                batches: Default::default(),
                gc_state: Default::default(),
            }),
            #[cfg(feature = "rpc")]
            rpc_handler: Default::default(),
        }
    }

    /// The blob store backing this handler.
    pub fn store(&self) -> &S {
        &self.inner.store
    }

    /// The provider event sender.
    pub fn events(&self) -> &EventSender {
        &self.inner.events
    }

    /// Handle to the local pool used for spawning tasks.
    pub fn rt(&self) -> &LocalPoolHandle {
        self.inner.rt()
    }

    /// The downloader used to fetch blobs from remote nodes.
    pub fn downloader(&self) -> &Downloader {
        &self.inner.downloader
    }

    /// The iroh endpoint this handler is bound to.
    pub fn endpoint(&self) -> &Endpoint {
        &self.inner.endpoint
    }

    /// Add a callback that will be called before the garbage collector runs.
    ///
    /// This can only be called before the garbage collector has started, otherwise it will return an error.
    pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
        // std Mutex is fine here: the guard is never held across an await.
        let mut state = self.inner.gc_state.lock().unwrap();
        match &mut *state {
            GcState::Initial(cbs) => {
                cbs.push(cb);
            }
            GcState::Started(_) => {
                anyhow::bail!("cannot add protected blobs after gc has started");
            }
        }
        Ok(())
    }

    /// Start garbage collection with the given settings.
    ///
    /// Errors if gc is already running. The protect callbacks registered via
    /// [`Self::add_protected`] are frozen at this point; later registrations
    /// will fail.
    pub fn start_gc(&self, config: GcConfig) -> Result<()> {
        let mut state = self.inner.gc_state.lock().unwrap();
        // Take the registered callbacks out of `Initial`. This transition is
        // one-way: once `Started`, gc can never be started again.
        let protected = match state.deref_mut() {
            GcState::Initial(items) => std::mem::take(items),
            GcState::Started(_) => bail!("gc already started"),
        };
        let protected = Arc::new(protected);
        // Factory invoked by the gc loop: runs every protect callback and
        // unions the reported hashes into one protected set per sweep.
        let protected_cb = move || {
            let protected = protected.clone();
            async move {
                let mut set = BTreeSet::new();
                for cb in protected.iter() {
                    cb(&mut set).await;
                }
                set
            }
        };
        let store = self.store().clone();
        let run = self
            .rt()
            .spawn(move || async move { store.gc_run(config, protected_cb).await });
        // Keep the run handle alive inside the state.
        // NOTE(review): presumably dropping it would detach/cancel the gc
        // task — confirm `local_pool::Run` drop semantics.
        *state = GcState::Started(Some(run));
        Ok(())
    }
}
297
impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
    /// Accept an incoming connection and serve blob requests on it.
    fn accept(&self, conn: Connecting) -> BoxedFuture<Result<()>> {
        // Clone everything the handler needs so the returned future is
        // 'static and independent of `self`.
        let db = self.store().clone();
        let events = self.events().clone();
        let rt = self.rt().clone();

        Box::pin(async move {
            crate::provider::handle_connection(conn.await?, db, events, rt).await;
            Ok(())
        })
    }

    /// Shut down the underlying store.
    fn shutdown(&self) -> BoxedFuture<()> {
        let store = self.store().clone();
        Box::pin(async move {
            store.shutdown().await;
        })
    }
}
317
/// A request to the node to download and share the data specified by the hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDownloadRequest {
    /// This mandatory field contains the hash of the data to download and share.
    pub hash: Hash,
    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
    /// well.
    pub format: BlobFormat,
    /// This mandatory field specifies the nodes to download the data from.
    ///
    /// If set to more than a single node, they will all be tried. If `mode` is set to
    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
    /// if the concurrency limits permit.
    pub nodes: Vec<NodeAddr>,
    /// Optional tag to tag the data with.
    pub tag: SetTagOption,
    /// Whether to directly start the download or add it to the download queue
    /// (see [`DownloadMode`]).
    pub mode: DownloadMode,
}
338
/// Set the mode for whether to directly start the download or add it to the download queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownloadMode {
    /// Start the download right away.
    ///
    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
    /// concurrency.
    Direct,
    /// Queue the download.
    ///
    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
    Queued,
}
352
353/// Newtype for a batch id
354#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
355pub struct BatchId(pub u64);