iroh_blobs/
net_protocol.rs1use std::{fmt::Debug, future::Future, sync::Arc};
40
41use iroh::{
42 endpoint::Connection,
43 protocol::{AcceptError, ProtocolHandler},
44 Endpoint, Watcher,
45};
46use tokio::sync::mpsc;
47use tracing::error;
48
49use crate::{
50 api::Store,
51 provider::{Event, EventSender},
52 ticket::BlobTicket,
53 HashAndFormat,
54};
55
56#[derive(Debug)]
57pub(crate) struct BlobsInner {
58 pub(crate) store: Store,
59 pub(crate) endpoint: Endpoint,
60 pub(crate) events: EventSender,
61}
62
63#[derive(Debug, Clone)]
65pub struct Blobs {
66 pub(crate) inner: Arc<BlobsInner>,
67}
68
69impl Blobs {
70 pub fn new(store: &Store, endpoint: Endpoint, events: Option<mpsc::Sender<Event>>) -> Self {
71 Self {
72 inner: Arc::new(BlobsInner {
73 store: store.clone(),
74 endpoint,
75 events: EventSender::new(events),
76 }),
77 }
78 }
79
80 pub fn store(&self) -> &Store {
81 &self.inner.store
82 }
83
84 pub fn endpoint(&self) -> &Endpoint {
85 &self.inner.endpoint
86 }
87
88 pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
93 let content = content.into();
94 let addr = self.inner.endpoint.node_addr().initialized().await?;
95 let ticket = BlobTicket::new(addr, content.hash, content.format);
96 Ok(ticket)
97 }
98}
99
100impl ProtocolHandler for Blobs {
101 fn accept(
102 &self,
103 conn: Connection,
104 ) -> impl Future<Output = std::result::Result<(), AcceptError>> + Send {
105 let store = self.store().clone();
106 let events = self.inner.events.clone();
107
108 Box::pin(async move {
109 crate::provider::handle_connection(conn, store, events).await;
110 Ok(())
111 })
112 }
113
114 fn shutdown(&self) -> impl Future<Output = ()> + Send {
115 let store = self.store().clone();
116 Box::pin(async move {
117 if let Err(cause) = store.shutdown().await {
118 error!("error shutting down store: {:?}", cause);
119 }
120 })
121 }
122}