iroh_blobs/
net_protocol.rs

1//! Adaptation of `iroh-blobs` as an [`iroh`] [`ProtocolHandler`].
2//!
3//! This is the easiest way to share data from a [`crate::api::Store`] over iroh connections.
4//!
5//! # Example
6//!
7//! ```rust
8//! # async fn example() -> anyhow::Result<()> {
9//! use iroh::{protocol::Router, Endpoint};
10//! use iroh_blobs::{store, BlobsProtocol};
11//!
12//! // create a store
13//! let store = store::fs::FsStore::load("blobs").await?;
14//!
15//! // add some data
16//! let t = store.add_slice(b"hello world").await?;
17//!
18//! // create an iroh endpoint
19//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
20//!
21//! // create a blobs protocol handler
22//! let blobs = BlobsProtocol::new(&store, endpoint.clone(), None);
23//!
24//! // create a router and add the blobs protocol handler
25//! let router = Router::builder(endpoint)
26//!     .accept(iroh_blobs::ALPN, blobs.clone())
27//!     .spawn();
28//!
29//! // this data is now globally available using the ticket
30//! let ticket = blobs.ticket(t).await?;
31//! println!("ticket: {}", ticket);
32//!
33//! // wait for control-c to exit
34//! tokio::signal::ctrl_c().await?;
35//! #   Ok(())
36//! # }
37//! ```
38
39use std::{fmt::Debug, future::Future, ops::Deref, sync::Arc};
40
41use iroh::{
42    endpoint::Connection,
43    protocol::{AcceptError, ProtocolHandler},
44    Endpoint, Watcher,
45};
46use tokio::sync::mpsc;
47use tracing::error;
48
49use crate::{
50    api::Store,
51    provider::{Event, EventSender},
52    ticket::BlobTicket,
53    HashAndFormat,
54};
55
56#[derive(Debug)]
57pub(crate) struct BlobsInner {
58    pub(crate) store: Store,
59    pub(crate) endpoint: Endpoint,
60    pub(crate) events: EventSender,
61}
62
63/// A protocol handler for the blobs protocol.
64#[derive(Debug, Clone)]
65pub struct BlobsProtocol {
66    pub(crate) inner: Arc<BlobsInner>,
67}
68
69impl Deref for BlobsProtocol {
70    type Target = Store;
71
72    fn deref(&self) -> &Self::Target {
73        &self.inner.store
74    }
75}
76
77impl BlobsProtocol {
78    pub fn new(store: &Store, endpoint: Endpoint, events: Option<mpsc::Sender<Event>>) -> Self {
79        Self {
80            inner: Arc::new(BlobsInner {
81                store: store.clone(),
82                endpoint,
83                events: EventSender::new(events),
84            }),
85        }
86    }
87
88    pub fn store(&self) -> &Store {
89        &self.inner.store
90    }
91
92    pub fn endpoint(&self) -> &Endpoint {
93        &self.inner.endpoint
94    }
95
96    /// Create a ticket for content on this node.
97    ///
98    /// Note that this does not check whether the content is partially or fully available. It is
99    /// just a convenience method to create a ticket from content and the address of this node.
100    pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
101        let content = content.into();
102        let addr = self.inner.endpoint.node_addr().initialized().await?;
103        let ticket = BlobTicket::new(addr, content.hash, content.format);
104        Ok(ticket)
105    }
106}
107
108impl ProtocolHandler for BlobsProtocol {
109    fn accept(
110        &self,
111        conn: Connection,
112    ) -> impl Future<Output = std::result::Result<(), AcceptError>> + Send {
113        let store = self.store().clone();
114        let events = self.inner.events.clone();
115
116        Box::pin(async move {
117            crate::provider::handle_connection(conn, store, events).await;
118            Ok(())
119        })
120    }
121
122    fn shutdown(&self) -> impl Future<Output = ()> + Send {
123        let store = self.store().clone();
124        Box::pin(async move {
125            if let Err(cause) = store.shutdown().await {
126                error!("error shutting down store: {:?}", cause);
127            }
128        })
129    }
130}