1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::pin::Pin;
3use std::str::FromStr;
4use std::sync::Arc;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use fedimint_core::PeerId;
9use fedimint_core::envs::parse_kv_list_from_env;
10use fedimint_core::iroh_prod::FM_IROH_DNS_FEDIMINT_PROD;
11use fedimint_core::module::{
12 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
13};
14use fedimint_core::task::spawn;
15use fedimint_core::util::{FmtCompact as _, SafeUrl};
16use fedimint_logging::LOG_NET_IROH;
17use futures::Future;
18use futures::stream::{FuturesUnordered, StreamExt};
19use iroh::discovery::pkarr::PkarrResolver;
20use iroh::endpoint::Connection;
21use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
22use iroh_base::ticket::NodeTicket;
23use iroh_next::Watcher as _;
24use serde_json::Value;
25use tokio::sync::OnceCell;
26use tracing::{debug, trace, warn};
27use url::Url;
28
29use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
30
31#[derive(Debug, Clone)]
32pub struct IrohConnector {
33 node_ids: BTreeMap<PeerId, NodeId>,
34 endpoint_stable: Endpoint,
35 endpoint_next: Option<iroh_next::Endpoint>,
36
37 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
43
44 connections_stable: Arc<tokio::sync::Mutex<HashMap<NodeId, Arc<OnceCell<Connection>>>>>,
46
47 connections_next: Arc<
49 tokio::sync::Mutex<
50 HashMap<iroh_next::NodeId, Arc<OnceCell<iroh_next::endpoint::Connection>>>,
51 >,
52 >,
53}
54
55impl IrohConnector {
56 #[cfg(not(target_family = "wasm"))]
57 fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
58 if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
59 #[allow(clippy::let_underscore_future)]
60 let _ = spawn("iroh connection (stable)", async move {
61 if let Ok(conn_type) = conn_type_watcher.get() {
62 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
63 }
64 while let Ok(event) = conn_type_watcher.updated().await {
65 debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
66 }
67 });
68 }
69 }
70
71 #[cfg(not(target_family = "wasm"))]
72 fn spawn_connection_monitoring_next(
73 endpoint: &iroh_next::Endpoint,
74 node_addr: &iroh_next::NodeAddr,
75 ) {
76 if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
77 let node_id = node_addr.node_id;
78 #[allow(clippy::let_underscore_future)]
79 let _ = spawn("iroh connection (next)", async move {
80 if let Ok(conn_type) = conn_type_watcher.get() {
81 debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
82 }
83 while let Ok(event) = conn_type_watcher.updated().await {
84 debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
85 }
86 });
87 }
88 }
89
90 pub async fn new(
91 peers: BTreeMap<PeerId, SafeUrl>,
92 iroh_dns: Option<SafeUrl>,
93 iroh_enable_dht: bool,
94 iroh_enable_next: bool,
95 ) -> anyhow::Result<Self> {
96 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
97 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
98 let mut s =
99 Self::new_no_overrides(peers, iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
100
101 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
102 s = s.with_connection_override(k, v.into());
103 }
104
105 Ok(s)
106 }
107
108 pub async fn new_no_overrides(
109 peers: BTreeMap<PeerId, SafeUrl>,
110 iroh_dns: Option<SafeUrl>,
111 iroh_enable_dht: bool,
112 iroh_enable_next: bool,
113 ) -> anyhow::Result<Self> {
114 let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
115 || {
116 FM_IROH_DNS_FEDIMINT_PROD
117 .into_iter()
118 .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
119 .collect()
120 },
121 |url| vec![url.to_unsafe()],
122 );
123 let node_ids = peers
124 .into_iter()
125 .map(|(peer, url)| {
126 let host = url.host_str().context("Url is missing host")?;
127
128 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
129
130 Ok((peer, node_id))
131 })
132 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
133
134 let endpoint_stable = Box::pin({
135 let iroh_dns_servers = iroh_dns_servers.clone();
136 async {
137 let mut builder = Endpoint::builder();
138
139 for iroh_dns in iroh_dns_servers {
140 builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
141 }
142
143 let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
145
146 #[cfg(not(target_family = "wasm"))]
147 if iroh_enable_dht {
148 builder = builder.discovery_dht();
149 }
150
151 {
153 #[cfg(target_family = "wasm")]
154 {
155 builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
156 }
157
158 #[cfg(not(target_family = "wasm"))]
159 {
160 builder = builder.add_discovery(move |_| {
161 Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
162 });
163 }
164 }
165
166 let endpoint = builder.bind().await?;
167 debug!(
168 target: LOG_NET_IROH,
169 node_id = %endpoint.node_id(),
170 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
171 "Iroh api client endpoint (stable)"
172 );
173 Ok::<_, anyhow::Error>(endpoint)
174 }
175 });
176 let endpoint_next = Box::pin(async {
177 let mut builder = iroh_next::Endpoint::builder();
178
179 for iroh_dns in iroh_dns_servers {
180 builder = builder.add_discovery(
181 iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
182 );
183 }
184
185 let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
187
188 #[cfg(not(target_family = "wasm"))]
189 if iroh_enable_dht {
190 builder = builder.discovery_dht();
191 }
192
193 {
195 #[cfg(target_family = "wasm")]
197 {
198 builder =
199 builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
200 }
201 #[cfg(not(target_family = "wasm"))]
203 {
204 builder =
205 builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
206 }
207 }
208
209 let endpoint = builder.bind().await?;
210 debug!(
211 target: LOG_NET_IROH,
212 node_id = %endpoint.node_id(),
213 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
214 "Iroh api client endpoint (next)"
215 );
216 Ok(endpoint)
217 });
218
219 let (endpoint_stable, endpoint_next) = if iroh_enable_next {
220 let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
221 (s, Some(n))
222 } else {
223 (endpoint_stable.await?, None)
224 };
225
226 Ok(Self {
227 node_ids,
228 endpoint_stable,
229 endpoint_next,
230 connection_overrides: BTreeMap::new(),
231 connections_stable: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
232 connections_next: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
233 })
234 }
235
236 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
237 self.connection_overrides.insert(node, addr);
238 self
239 }
240
241 async fn get_or_create_connection_stable(
242 &self,
243 node_id: NodeId,
244 node_addr: Option<NodeAddr>,
245 ) -> PeerResult<Connection> {
246 let mut pool_lock = self.connections_stable.lock().await;
247
248 let entry_arc = pool_lock
249 .entry(node_id)
250 .and_modify(|entry_arc| {
251 if let Some(existing_conn) = entry_arc.get()
253 && existing_conn.close_reason().is_some() {
254 trace!(target: LOG_NET_IROH, %node_id, "Existing stable connection is disconnected, removing from pool");
255 *entry_arc = Arc::new(OnceCell::new());
256 }
257 })
258 .or_insert_with(|| Arc::new(OnceCell::new()))
259 .clone();
260
261 drop(pool_lock);
263
264 let conn = entry_arc
265 .get_or_try_init(|| async {
266 trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
267 let conn = match node_addr.clone() {
268 Some(node_addr) => {
269 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
270 let conn = self.endpoint_stable
271 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
272 .await;
273
274 #[cfg(not(target_family = "wasm"))]
275 if conn.is_ok() {
276 Self::spawn_connection_monitoring_stable(&self.endpoint_stable, node_id);
277 }
278 conn
279 }
280 None => self.endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
281 }.map_err(PeerError::Connection)?;
282
283 Ok(conn)
284 })
285 .await?;
286
287 trace!(target: LOG_NET_IROH, %node_id, "Using stable connection");
288 Ok(conn.clone())
289 }
290
291 async fn get_or_create_connection_next(
292 &self,
293 endpoint_next: &iroh_next::Endpoint,
294 node_id: NodeId,
295 node_addr: Option<NodeAddr>,
296 ) -> PeerResult<iroh_next::endpoint::Connection> {
297 let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
298
299 let mut pool_lock = self.connections_next.lock().await;
300
301 let entry_arc = pool_lock
302 .entry(next_node_id)
303 .and_modify(|entry_arc| {
304 if let Some(existing_conn) = entry_arc.get()
306 && existing_conn.close_reason().is_some() {
307 trace!(target: LOG_NET_IROH, %node_id, "Existing next connection is disconnected, removing from pool");
308 *entry_arc = Arc::new(OnceCell::new());
309 }
310 })
311 .or_insert_with(|| Arc::new(OnceCell::new()))
312 .clone();
313
314 drop(pool_lock);
316
317 let endpoint_next = endpoint_next.clone();
318 let conn = entry_arc
319 .get_or_try_init(|| async move {
320 trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
321 let conn = match node_addr.clone() {
322 Some(node_addr) => {
323 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
324 let node_addr = node_addr_stable_to_next(&node_addr);
325 let conn = endpoint_next
326 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
327 .await;
328
329 #[cfg(not(target_family = "wasm"))]
330 if conn.is_ok() {
331 Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
332 }
333
334 conn
335 }
336 None => endpoint_next.connect(
337 next_node_id,
338 FEDIMINT_API_ALPN
339 ).await,
340 }
341 .map_err(Into::into)
342 .map_err(PeerError::Connection)?;
343
344 Ok(conn)
345 })
346 .await?;
347
348 trace!(target: LOG_NET_IROH, %node_id, "Using next connection");
349 Ok(conn.clone())
350 }
351}
352
353#[async_trait]
354impl IClientConnector for IrohConnector {
355 fn peers(&self) -> BTreeSet<PeerId> {
356 self.node_ids.keys().copied().collect()
357 }
358
359 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
360 let node_id = *self
361 .node_ids
362 .get(&peer_id)
363 .ok_or(PeerError::InvalidPeerId { peer_id })?;
364
365 let mut futures = FuturesUnordered::<
366 Pin<Box<dyn Future<Output = (PeerResult<DynClientConnection>, &'static str)> + Send>>,
367 >::new();
368 let connection_override = self.connection_overrides.get(&node_id).cloned();
369
370 let self_clone = self.clone();
372 futures.push(Box::pin({
373 let connection_override = connection_override.clone();
374 async move {
375 (
376 self_clone
377 .get_or_create_connection_stable(node_id, connection_override)
378 .await
379 .map(super::IClientConnection::into_dyn),
380 "stable",
381 )
382 }
383 }));
384
385 if let Some(endpoint_next) = &self.endpoint_next {
387 let self_clone = self.clone();
388 let endpoint_next = endpoint_next.clone();
389 futures.push(Box::pin(async move {
390 (
391 self_clone
392 .get_or_create_connection_next(&endpoint_next, node_id, connection_override)
393 .await
394 .map(super::IClientConnection::into_dyn),
395 "next",
396 )
397 }));
398 }
399
400 let mut prev_err = None;
403
404 while let Some((result, iroh_stack)) = futures.next().await {
406 match result {
407 Ok(connection) => return Ok(connection),
408 Err(err) => {
409 warn!(
410 target: LOG_NET_IROH,
411 err = %err.fmt_compact(),
412 %iroh_stack,
413 "Join error in iroh connection task"
414 );
415 prev_err = Some(err);
416 }
417 }
418 }
419
420 Err(prev_err.unwrap_or_else(|| {
421 PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
422 }))
423 }
424}
425
426fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
427 iroh_next::NodeAddr {
428 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
429 relay_url: stable
430 .relay_url
431 .as_ref()
432 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
433 direct_addresses: stable.direct_addresses.clone(),
434 }
435}
436#[async_trait]
437impl IClientConnection for Connection {
438 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
439 let json = serde_json::to_vec(&IrohApiRequest { method, request })
440 .expect("Serialization to vec can't fail");
441
442 let (mut sink, mut stream) = self
443 .open_bi()
444 .await
445 .map_err(|e| PeerError::Transport(e.into()))?;
446
447 sink.write_all(&json)
448 .await
449 .map_err(|e| PeerError::Transport(e.into()))?;
450
451 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
452
453 let response = stream
454 .read_to_end(1_000_000)
455 .await
456 .map_err(|e| PeerError::Transport(e.into()))?;
457
458 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
460 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
461
462 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
463 }
464
465 async fn await_disconnection(&self) {
466 self.closed().await;
467 }
468}
469
470#[async_trait]
471impl IClientConnection for iroh_next::endpoint::Connection {
472 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
473 let json = serde_json::to_vec(&IrohApiRequest { method, request })
474 .expect("Serialization to vec can't fail");
475
476 let (mut sink, mut stream) = self
477 .open_bi()
478 .await
479 .map_err(|e| PeerError::Transport(e.into()))?;
480
481 sink.write_all(&json)
482 .await
483 .map_err(|e| PeerError::Transport(e.into()))?;
484
485 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
486
487 let response = stream
488 .read_to_end(1_000_000)
489 .await
490 .map_err(|e| PeerError::Transport(e.into()))?;
491
492 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
494 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
495
496 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
497 }
498
499 async fn await_disconnection(&self) {
500 self.closed().await;
501 }
502}