iroh_blobs/
net_protocol.rs

//! Adaptation of `iroh-blobs` as an `iroh` protocol.

// TODO: reduce API surface and add documentation
#![allow(missing_docs)]

use std::{
    collections::BTreeSet,
    fmt::Debug,
    ops::{Deref, DerefMut},
    sync::Arc,
};

use anyhow::{bail, Result};
use futures_lite::future::Boxed as BoxedFuture;
use futures_util::future::BoxFuture;
use iroh::{endpoint::Connection, protocol::ProtocolHandler, Endpoint, NodeAddr};
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::{
    downloader::{ConcurrencyLimits, Downloader, RetryConfig},
    provider::EventSender,
    store::GcConfig,
    util::{
        local_pool::{self, LocalPool, LocalPoolHandle},
        SetTagOption,
    },
    BlobFormat, Hash,
};

/// A callback through which blobs asks other protocols for the set of hashes
/// that should not be garbage collected.
pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;

/// The state of the gc loop.
#[derive(derive_more::Debug)]
enum GcState {
    // Gc loop is not yet running. Other protocols can add protect callbacks
    Initial(#[debug(skip)] Vec<ProtectCb>),
    // Gc loop is running. No more protect callbacks can be added.
    Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
}

impl Default for GcState {
    fn default() -> Self {
        Self::Initial(Vec::new())
    }
}

#[derive(Debug)]
enum Rt {
    Handle(LocalPoolHandle),
    Owned(LocalPool),
}

impl Deref for Rt {
    type Target = LocalPoolHandle;

    fn deref(&self) -> &Self::Target {
        match self {
            Self::Handle(ref handle) => handle,
            Self::Owned(ref pool) => pool.handle(),
        }
    }
}

#[derive(Debug)]
pub(crate) struct BlobsInner<S> {
    rt: Rt,
    pub(crate) store: S,
    events: EventSender,
    pub(crate) downloader: Downloader,
    pub(crate) endpoint: Endpoint,
    gc_state: std::sync::Mutex<GcState>,
    #[cfg(feature = "rpc")]
    pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
}

impl<S> BlobsInner<S> {
    pub(crate) fn rt(&self) -> &LocalPoolHandle {
        &self.rt
    }
}

#[derive(Debug, Clone)]
pub struct Blobs<S> {
    pub(crate) inner: Arc<BlobsInner<S>>,
    #[cfg(feature = "rpc")]
    pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
}

/// Keeps track of all the currently active batch operations of the blobs api.
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
pub(crate) struct BlobBatches {
    /// Currently active batches
    batches: std::collections::BTreeMap<BatchId, BlobBatch>,
    /// Used to generate new batch ids.
    max: u64,
}

/// A single batch of blob operations
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
struct BlobBatch {
    /// The tags in this batch.
    tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
}

#[cfg(feature = "rpc")]
impl BlobBatches {
    /// Create a new unique batch id.
    pub fn create(&mut self) -> BatchId {
        let id = self.max;
        self.max += 1;
        BatchId(id)
    }

    /// Store a temp tag in a batch identified by a batch id.
    pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
        let entry = self.batches.entry(batch).or_default();
        entry.tags.entry(tt.hash_and_format()).or_default().push(tt);
    }

    /// Remove a tag from a batch.
    pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
        if let Some(batch) = self.batches.get_mut(&batch) {
            if let Some(tags) = batch.tags.get_mut(content) {
                tags.pop();
                if tags.is_empty() {
                    batch.tags.remove(content);
                }
                return Ok(());
            }
        }
        // this can happen if we try to upgrade a tag from an expired batch
        anyhow::bail!("tag not found in batch");
    }

    /// Remove an entire batch.
    pub fn remove(&mut self, batch: BatchId) {
        self.batches.remove(&batch);
    }
}
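
// A minimal sketch of the intended crate-internal flow. `temp_tag: TempTag`
// and its `HashAndFormat` value `content` are placeholders for illustration:
//
//     let mut batches = BlobBatches::default();
//     let id = batches.create();
//     batches.store(id, temp_tag);
//     // upgrading a temp tag to a persistent tag removes it from the batch
//     batches.remove_one(id, &content)?;
//     // dropping the whole batch releases all remaining temp tags
//     batches.remove(id);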

/// Builder for the Blobs protocol handler
#[derive(Debug)]
pub struct Builder<S> {
    store: S,
    events: Option<EventSender>,
    rt: Option<LocalPoolHandle>,
    concurrency_limits: Option<ConcurrencyLimits>,
    retry_config: Option<RetryConfig>,
}

impl<S: crate::store::Store> Builder<S> {
    /// Set the event sender for the blobs protocol.
    pub fn events(mut self, value: EventSender) -> Self {
        self.events = Some(value);
        self
    }

    /// Set a custom [`LocalPoolHandle`] to use.
    pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
        self.rt = Some(rt);
        self
    }

    /// Set custom [`ConcurrencyLimits`] to use.
    pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self {
        self.concurrency_limits = Some(concurrency_limits);
        self
    }

    /// Set a custom [`RetryConfig`] to use.
    pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
        self.retry_config = Some(retry_config);
        self
    }

    /// Build the Blobs protocol handler.
    ///
    /// You need to provide the endpoint.
    pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
        let rt = self
            .rt
            .map(Rt::Handle)
            .unwrap_or_else(|| Rt::Owned(LocalPool::default()));
        let downloader = Downloader::with_config(
            self.store.clone(),
            endpoint.clone(),
            rt.clone(),
            self.concurrency_limits.unwrap_or_default(),
            self.retry_config.unwrap_or_default(),
        );
        Blobs::new(
            self.store,
            rt,
            self.events.unwrap_or_default(),
            downloader,
            endpoint.clone(),
        )
    }
}

impl<S> Blobs<S> {
    /// Create a new Blobs protocol handler builder, given a store.
    pub fn builder(store: S) -> Builder<S> {
        Builder {
            store,
            events: None,
            rt: None,
            concurrency_limits: None,
            retry_config: None,
        }
    }
}

impl Blobs<crate::store::mem::Store> {
    /// Create a new memory-backed Blobs protocol handler.
    pub fn memory() -> Builder<crate::store::mem::Store> {
        Self::builder(crate::store::mem::Store::new())
    }
}

impl Blobs<crate::store::fs::Store> {
    /// Load a persistent Blobs protocol handler from a path.
    pub async fn persistent(
        path: impl AsRef<std::path::Path>,
    ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
        Ok(Self::builder(crate::store::fs::Store::load(path).await?))
    }
}
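
// A minimal usage sketch (not part of this module): `endpoint` is assumed to
// be an already-constructed `iroh::Endpoint`, and registering the handler
// under the blobs ALPN is left to the caller's router setup.
//
//     let blobs = Blobs::memory().build(&endpoint);
//     // or: Blobs::persistent("/path/to/blobs").await?.build(&endpoint);
//     // then register `blobs` as the `ProtocolHandler` for
//     // `crate::protocol::ALPN` on the application's accept loop.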

impl<S: crate::store::Store> Blobs<S> {
    fn new(
        store: S,
        rt: Rt,
        events: EventSender,
        downloader: Downloader,
        endpoint: Endpoint,
    ) -> Self {
        Self {
            inner: Arc::new(BlobsInner {
                rt,
                store,
                events,
                downloader,
                endpoint,
                #[cfg(feature = "rpc")]
                batches: Default::default(),
                gc_state: Default::default(),
            }),
            #[cfg(feature = "rpc")]
            rpc_handler: Default::default(),
        }
    }

    /// Returns the store this protocol handler uses.
    pub fn store(&self) -> &S {
        &self.inner.store
    }

    /// Returns the event sender for provider events.
    pub fn events(&self) -> &EventSender {
        &self.inner.events
    }

    /// Returns a handle to the local pool used to spawn tasks.
    pub fn rt(&self) -> &LocalPoolHandle {
        self.inner.rt()
    }

    /// Returns the downloader.
    pub fn downloader(&self) -> &Downloader {
        &self.inner.downloader
    }

    /// Returns the endpoint this protocol handler is bound to.
    pub fn endpoint(&self) -> &Endpoint {
        &self.inner.endpoint
    }

    /// Add a callback that will be called before the garbage collector runs.
    ///
    /// This can only be called before the garbage collector has started, otherwise it will return an error.
    pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
        let mut state = self.inner.gc_state.lock().unwrap();
        match &mut *state {
            GcState::Initial(cbs) => {
                cbs.push(cb);
            }
            GcState::Started(_) => {
                anyhow::bail!("cannot add protected blobs after gc has started");
            }
        }
        Ok(())
    }

    /// Start garbage collection with the given settings.
    pub fn start_gc(&self, config: GcConfig) -> Result<()> {
        let mut state = self.inner.gc_state.lock().unwrap();
        let protected = match state.deref_mut() {
            GcState::Initial(items) => std::mem::take(items),
            GcState::Started(_) => bail!("gc already started"),
        };
        let protected = Arc::new(protected);
        let protected_cb = move || {
            let protected = protected.clone();
            async move {
                let mut set = BTreeSet::new();
                for cb in protected.iter() {
                    cb(&mut set).await;
                }
                set
            }
        };
        let store = self.store().clone();
        let run = self
            .rt()
            .spawn(move || async move { store.gc_run(config, protected_cb).await });
        *state = GcState::Started(Some(run));
        Ok(())
    }
}
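
// A minimal sketch of wiring GC protection before starting the loop. The
// `blobs` handle, `gc_config: GcConfig`, and `my_hashes: Vec<Hash>` are
// assumptions made for illustration only:
//
//     blobs.add_protected(Box::new(move |set| {
//         let hashes = my_hashes.clone();
//         Box::pin(async move {
//             set.extend(hashes);
//         })
//     }))?;
//     blobs.start_gc(gc_config)?;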

impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
    fn accept(&self, conn: Connection) -> BoxedFuture<Result<()>> {
        let db = self.store().clone();
        let events = self.events().clone();
        let rt = self.rt().clone();

        Box::pin(async move {
            crate::provider::handle_connection(conn, db, events, rt).await;
            Ok(())
        })
    }

    fn shutdown(&self) -> BoxedFuture<()> {
        let store = self.store().clone();
        Box::pin(async move {
            store.shutdown().await;
        })
    }
}

/// A request to the node to download and share the data specified by the hash.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BlobDownloadRequest {
    /// This mandatory field contains the hash of the data to download and share.
    pub hash: Hash,
    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
    /// well.
    pub format: BlobFormat,
    /// This mandatory field specifies the nodes to download the data from.
    ///
    /// If set to more than a single node, they will all be tried. If `mode` is set to
    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
    /// if the concurrency limits permit.
    pub nodes: Vec<NodeAddr>,
    /// Optional tag to tag the data with.
    pub tag: SetTagOption,
    /// Whether to directly start the download or add it to the download queue.
    pub mode: DownloadMode,
}
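
// A minimal sketch of filling in a download request. `hash` and `node_addr`
// are placeholders for illustration; `BlobFormat::Raw` and
// `SetTagOption::Auto` are the crate's own variants:
//
//     let request = BlobDownloadRequest {
//         hash,
//         format: BlobFormat::Raw,
//         nodes: vec![node_addr],
//         tag: SetTagOption::Auto,
//         mode: DownloadMode::Queued,
//     };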

/// Set the mode for whether to directly start the download or add it to the download queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DownloadMode {
    /// Start the download right away.
    ///
    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
    /// concurrency.
    Direct,
    /// Queue the download.
    ///
    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
    Queued,
}

/// Newtype for a batch id
#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
pub struct BatchId(pub u64);