iroh_blobs/
net_protocol.rs

1//! Adaptation of `iroh-blobs` as an [`iroh`] [`ProtocolHandler`].
2//!
3//! This is the easiest way to share data from a [`crate::api::Store`] over iroh connections.
4//!
5//! # Example
6//!
7//! ```rust
8//! # async fn example() -> anyhow::Result<()> {
9//! use iroh::{protocol::Router, Endpoint};
10//! use iroh_blobs::{net_protocol::Blobs, store};
11//!
12//! // create a store
13//! let store = store::fs::FsStore::load("blobs").await?;
14//!
15//! // add some data
16//! let t = store.add_slice(b"hello world").await?;
17//!
18//! // create an iroh endpoint
19//! let endpoint = Endpoint::builder().discovery_n0().bind().await?;
20//!
21//! // create a blobs protocol handler
22//! let blobs = Blobs::new(&store, endpoint.clone(), None);
23//!
24//! // create a router and add the blobs protocol handler
25//! let router = Router::builder(endpoint)
26//!     .accept(iroh_blobs::ALPN, blobs.clone())
27//!     .spawn();
28//!
29//! // this data is now globally available using the ticket
30//! let ticket = blobs.ticket(t).await?;
31//! println!("ticket: {}", ticket);
32//!
33//! // wait for control-c to exit
34//! tokio::signal::ctrl_c().await?;
35//! #   Ok(())
36//! # }
37//! ```
38
39use std::{fmt::Debug, future::Future, sync::Arc};
40
41use iroh::{
42    endpoint::Connection,
43    protocol::{AcceptError, ProtocolHandler},
44    Endpoint, Watcher,
45};
46use tokio::sync::mpsc;
47use tracing::error;
48
49use crate::{
50    api::Store,
51    provider::{Event, EventSender},
52    ticket::BlobTicket,
53    HashAndFormat,
54};
55
56#[derive(Debug)]
57pub(crate) struct BlobsInner {
58    pub(crate) store: Store,
59    pub(crate) endpoint: Endpoint,
60    pub(crate) events: EventSender,
61}
62
63/// A protocol handler for the blobs protocol.
64#[derive(Debug, Clone)]
65pub struct Blobs {
66    pub(crate) inner: Arc<BlobsInner>,
67}
68
69impl Blobs {
70    pub fn new(store: &Store, endpoint: Endpoint, events: Option<mpsc::Sender<Event>>) -> Self {
71        Self {
72            inner: Arc::new(BlobsInner {
73                store: store.clone(),
74                endpoint,
75                events: EventSender::new(events),
76            }),
77        }
78    }
79
80    pub fn store(&self) -> &Store {
81        &self.inner.store
82    }
83
84    pub fn endpoint(&self) -> &Endpoint {
85        &self.inner.endpoint
86    }
87
88    /// Create a ticket for content on this node.
89    ///
90    /// Note that this does not check whether the content is partially or fully available. It is
91    /// just a convenience method to create a ticket from content and the address of this node.
92    pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
93        let content = content.into();
94        let addr = self.inner.endpoint.node_addr().initialized().await?;
95        let ticket = BlobTicket::new(addr, content.hash, content.format);
96        Ok(ticket)
97    }
98}
99
100impl ProtocolHandler for Blobs {
101    fn accept(
102        &self,
103        conn: Connection,
104    ) -> impl Future<Output = std::result::Result<(), AcceptError>> + Send {
105        let store = self.store().clone();
106        let events = self.inner.events.clone();
107
108        Box::pin(async move {
109            crate::provider::handle_connection(conn, store, events).await;
110            Ok(())
111        })
112    }
113
114    fn shutdown(&self) -> impl Future<Output = ()> + Send {
115        let store = self.store().clone();
116        Box::pin(async move {
117            if let Err(cause) = store.shutdown().await {
118                error!("error shutting down store: {:?}", cause);
119            }
120        })
121    }
122}