iroh_blobs/
net_protocol.rs

1//! Adaptation of `iroh-blobs` as an `iroh` protocol.
2
3// TODO: reduce API surface and add documentation
4#![allow(missing_docs)]
5
6use std::{
7    collections::BTreeSet,
8    fmt::Debug,
9    ops::{Deref, DerefMut},
10    sync::Arc,
11};
12
13use anyhow::{bail, Result};
14use futures_lite::future::Boxed as BoxedFuture;
15use futures_util::future::BoxFuture;
16use iroh::{endpoint::Connection, protocol::ProtocolHandler, Endpoint, NodeAddr};
17use serde::{Deserialize, Serialize};
18use tracing::debug;
19
20use crate::{
21    downloader::{self, ConcurrencyLimits, Downloader, RetryConfig},
22    metrics::Metrics,
23    provider::EventSender,
24    store::GcConfig,
25    util::{
26        local_pool::{self, LocalPool, LocalPoolHandle},
27        SetTagOption,
28    },
29    BlobFormat, Hash,
30};
31
32/// A callback that blobs can ask about a set of hashes that should not be garbage collected.
33pub type ProtectCb = Box<dyn Fn(&mut BTreeSet<Hash>) -> BoxFuture<()> + Send + Sync>;
34
35/// The state of the gc loop.
36#[derive(derive_more::Debug)]
37enum GcState {
38    // Gc loop is not yet running. Other protocols can add protect callbacks
39    Initial(#[debug(skip)] Vec<ProtectCb>),
40    // Gc loop is running. No more protect callbacks can be added.
41    Started(#[allow(dead_code)] Option<local_pool::Run<()>>),
42}
43
44impl Default for GcState {
45    fn default() -> Self {
46        Self::Initial(Vec::new())
47    }
48}
49
50#[derive(Debug)]
51enum Rt {
52    Handle(LocalPoolHandle),
53    Owned(LocalPool),
54}
55
56impl Deref for Rt {
57    type Target = LocalPoolHandle;
58
59    fn deref(&self) -> &Self::Target {
60        match self {
61            Self::Handle(ref handle) => handle,
62            Self::Owned(ref pool) => pool.handle(),
63        }
64    }
65}
66
67#[derive(Debug)]
68pub(crate) struct BlobsInner<S> {
69    rt: Rt,
70    pub(crate) store: S,
71    events: EventSender,
72    pub(crate) downloader: Downloader,
73    pub(crate) endpoint: Endpoint,
74    gc_state: std::sync::Mutex<GcState>,
75    #[cfg(feature = "rpc")]
76    pub(crate) batches: tokio::sync::Mutex<BlobBatches>,
77}
78
79impl<S> BlobsInner<S> {
80    pub(crate) fn rt(&self) -> &LocalPoolHandle {
81        &self.rt
82    }
83}
84
85#[derive(Debug, Clone)]
86pub struct Blobs<S> {
87    pub(crate) inner: Arc<BlobsInner<S>>,
88    #[cfg(feature = "rpc")]
89    pub(crate) rpc_handler: Arc<std::sync::OnceLock<crate::rpc::RpcHandler>>,
90}
91
/// Bookkeeping for every batch operation of the blobs api that is currently active.
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
pub(crate) struct BlobBatches {
    /// The active batches, keyed by their id.
    batches: std::collections::BTreeMap<BatchId, BlobBatch>,
    /// Counter from which the next batch id is taken.
    max: u64,
}
101
/// One batch of blob operations.
#[cfg(feature = "rpc")]
#[derive(Debug, Default)]
struct BlobBatch {
    /// The temp tags held by this batch, grouped by the content they refer to.
    tags: std::collections::BTreeMap<crate::HashAndFormat, Vec<crate::TempTag>>,
}
109
#[cfg(feature = "rpc")]
impl BlobBatches {
    /// Hand out a batch id that has not been used before.
    ///
    /// Ids are taken from an internal counter that is incremented on every call.
    pub fn create(&mut self) -> BatchId {
        let fresh = BatchId(self.max);
        self.max += 1;
        fresh
    }

    /// Attach a temp tag to the batch with the given id.
    ///
    /// The batch entry is created on the fly if it does not exist yet.
    pub fn store(&mut self, batch: BatchId, tt: crate::TempTag) {
        self.batches
            .entry(batch)
            .or_default()
            .tags
            .entry(tt.hash_and_format())
            .or_default()
            .push(tt);
    }

    /// Drop a single temp tag for `content` from the given batch.
    ///
    /// # Errors
    ///
    /// Fails if the batch does not exist or holds no tag for `content`. This can happen when
    /// trying to upgrade a tag from an expired batch.
    pub fn remove_one(&mut self, batch: BatchId, content: &crate::HashAndFormat) -> Result<()> {
        let tags_by_content = match self.batches.get_mut(&batch) {
            Some(entry) => &mut entry.tags,
            None => anyhow::bail!("tag not found in batch"),
        };
        let remaining = match tags_by_content.get_mut(content) {
            Some(tags) => {
                tags.pop();
                tags.len()
            }
            None => anyhow::bail!("tag not found in batch"),
        };
        // Do not keep empty tag lists around.
        if remaining == 0 {
            tags_by_content.remove(content);
        }
        Ok(())
    }

    /// Forget a batch together with all temp tags it still holds.
    pub fn remove(&mut self, batch: BatchId) {
        self.batches.remove(&batch);
    }
}
145
146/// Builder for the Blobs protocol handler
147#[derive(Debug)]
148pub struct Builder<S> {
149    store: S,
150    events: Option<EventSender>,
151    downloader_config: Option<crate::downloader::Config>,
152    rt: Option<LocalPoolHandle>,
153}
154
155impl<S: crate::store::Store> Builder<S> {
156    /// Set the event sender for the blobs protocol.
157    pub fn events(mut self, value: EventSender) -> Self {
158        self.events = Some(value);
159        self
160    }
161
162    /// Set a custom [`LocalPoolHandle`] to use.
163    pub fn local_pool(mut self, rt: LocalPoolHandle) -> Self {
164        self.rt = Some(rt);
165        self
166    }
167
168    /// Set custom downloader config
169    pub fn downloader_config(mut self, downloader_config: downloader::Config) -> Self {
170        self.downloader_config = Some(downloader_config);
171        self
172    }
173
174    /// Set custom [`ConcurrencyLimits`] to use.
175    pub fn concurrency_limits(mut self, concurrency_limits: ConcurrencyLimits) -> Self {
176        let downloader_config = self.downloader_config.get_or_insert_with(Default::default);
177        downloader_config.concurrency = concurrency_limits;
178        self
179    }
180
181    /// Set a custom [`RetryConfig`] to use.
182    pub fn retry_config(mut self, retry_config: RetryConfig) -> Self {
183        let downloader_config = self.downloader_config.get_or_insert_with(Default::default);
184        downloader_config.retry = retry_config;
185        self
186    }
187
188    /// Build the Blobs protocol handler.
189    /// You need to provide a the endpoint.
190    pub fn build(self, endpoint: &Endpoint) -> Blobs<S> {
191        let rt = self
192            .rt
193            .map(Rt::Handle)
194            .unwrap_or_else(|| Rt::Owned(LocalPool::default()));
195        let downloader_config = self.downloader_config.unwrap_or_default();
196        let downloader = Downloader::with_config(
197            self.store.clone(),
198            endpoint.clone(),
199            rt.clone(),
200            downloader_config,
201        );
202        Blobs::new(
203            self.store,
204            rt,
205            self.events.unwrap_or_default(),
206            downloader,
207            endpoint.clone(),
208        )
209    }
210}
211
212impl<S> Blobs<S> {
213    /// Create a new Blobs protocol handler builder, given a store.
214    pub fn builder(store: S) -> Builder<S> {
215        Builder {
216            store,
217            events: None,
218            downloader_config: None,
219            rt: None,
220        }
221    }
222}
223
224impl Blobs<crate::store::mem::Store> {
225    /// Create a new memory-backed Blobs protocol handler.
226    pub fn memory() -> Builder<crate::store::mem::Store> {
227        Self::builder(crate::store::mem::Store::new())
228    }
229}
230
231impl Blobs<crate::store::fs::Store> {
232    /// Load a persistent Blobs protocol handler from a path.
233    pub async fn persistent(
234        path: impl AsRef<std::path::Path>,
235    ) -> anyhow::Result<Builder<crate::store::fs::Store>> {
236        Ok(Self::builder(crate::store::fs::Store::load(path).await?))
237    }
238}
239
240impl<S: crate::store::Store> Blobs<S> {
241    fn new(
242        store: S,
243        rt: Rt,
244        events: EventSender,
245        downloader: Downloader,
246        endpoint: Endpoint,
247    ) -> Self {
248        Self {
249            inner: Arc::new(BlobsInner {
250                rt,
251                store,
252                events,
253                downloader,
254                endpoint,
255                #[cfg(feature = "rpc")]
256                batches: Default::default(),
257                gc_state: Default::default(),
258            }),
259            #[cfg(feature = "rpc")]
260            rpc_handler: Default::default(),
261        }
262    }
263
264    pub fn store(&self) -> &S {
265        &self.inner.store
266    }
267
268    pub fn metrics(&self) -> &Arc<Metrics> {
269        self.downloader().metrics()
270    }
271
272    pub fn events(&self) -> &EventSender {
273        &self.inner.events
274    }
275
276    pub fn rt(&self) -> &LocalPoolHandle {
277        self.inner.rt()
278    }
279
280    pub fn downloader(&self) -> &Downloader {
281        &self.inner.downloader
282    }
283
284    pub fn endpoint(&self) -> &Endpoint {
285        &self.inner.endpoint
286    }
287
288    /// Add a callback that will be called before the garbage collector runs.
289    ///
290    /// This can only be called before the garbage collector has started, otherwise it will return an error.
291    pub fn add_protected(&self, cb: ProtectCb) -> Result<()> {
292        let mut state = self.inner.gc_state.lock().unwrap();
293        match &mut *state {
294            GcState::Initial(cbs) => {
295                cbs.push(cb);
296            }
297            GcState::Started(_) => {
298                anyhow::bail!("cannot add protected blobs after gc has started");
299            }
300        }
301        Ok(())
302    }
303
304    /// Start garbage collection with the given settings.
305    pub fn start_gc(&self, config: GcConfig) -> Result<()> {
306        let mut state = self.inner.gc_state.lock().unwrap();
307        let protected = match state.deref_mut() {
308            GcState::Initial(items) => std::mem::take(items),
309            GcState::Started(_) => bail!("gc already started"),
310        };
311        let protected = Arc::new(protected);
312        let protected_cb = move || {
313            let protected = protected.clone();
314            async move {
315                let mut set = BTreeSet::new();
316                for cb in protected.iter() {
317                    cb(&mut set).await;
318                }
319                set
320            }
321        };
322        let store = self.store().clone();
323        let run = self
324            .rt()
325            .spawn(move || async move { store.gc_run(config, protected_cb).await });
326        *state = GcState::Started(Some(run));
327        Ok(())
328    }
329}
330
331impl<S: crate::store::Store> ProtocolHandler for Blobs<S> {
332    fn accept(&self, conn: Connection) -> BoxedFuture<Result<()>> {
333        let db = self.store().clone();
334        let events = self.events().clone();
335        let rt = self.rt().clone();
336
337        Box::pin(async move {
338            crate::provider::handle_connection(conn, db, events, rt).await;
339            Ok(())
340        })
341    }
342
343    fn shutdown(&self) -> BoxedFuture<()> {
344        let store = self.store().clone();
345        Box::pin(async move {
346            store.shutdown().await;
347        })
348    }
349}
350
351/// A request to the node to download and share the data specified by the hash.
352#[derive(Debug, Clone, Serialize, Deserialize)]
353pub struct BlobDownloadRequest {
354    /// This mandatory field contains the hash of the data to download and share.
355    pub hash: Hash,
356    /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as
357    /// well.
358    pub format: BlobFormat,
359    /// This mandatory field specifies the nodes to download the data from.
360    ///
361    /// If set to more than a single node, they will all be tried. If `mode` is set to
362    /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds.
363    /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel,
364    /// if the concurrency limits permit.
365    pub nodes: Vec<NodeAddr>,
366    /// Optional tag to tag the data with.
367    pub tag: SetTagOption,
368    /// Whether to directly start the download or add it to the download queue.
369    pub mode: DownloadMode,
370}
371
372/// Set the mode for whether to directly start the download or add it to the download queue.
373#[derive(Debug, Clone, Serialize, Deserialize)]
374pub enum DownloadMode {
375    /// Start the download right away.
376    ///
377    /// No concurrency limits or queuing will be applied. It is up to the user to manage download
378    /// concurrency.
379    Direct,
380    /// Queue the download.
381    ///
382    /// The download queue will be processed in-order, while respecting the downloader concurrency limits.
383    Queued,
384}
385
386/// Newtype for a batch id
387#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
388pub struct BatchId(pub u64);