fedimint_api_client/api/
iroh.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap};
2use std::pin::Pin;
3use std::str::FromStr;
4use std::sync::Arc;
5
6use anyhow::Context;
7use async_trait::async_trait;
8use fedimint_core::PeerId;
9use fedimint_core::envs::parse_kv_list_from_env;
10use fedimint_core::iroh_prod::FM_IROH_DNS_FEDIMINT_PROD;
11use fedimint_core::module::{
12    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
13};
14use fedimint_core::task::spawn;
15use fedimint_core::util::{FmtCompact as _, SafeUrl};
16use fedimint_logging::LOG_NET_IROH;
17use futures::Future;
18use futures::stream::{FuturesUnordered, StreamExt};
19use iroh::discovery::pkarr::PkarrResolver;
20use iroh::endpoint::Connection;
21use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
22use iroh_base::ticket::NodeTicket;
23use iroh_next::Watcher as _;
24use serde_json::Value;
25use tokio::sync::OnceCell;
26use tracing::{debug, trace, warn};
27use url::Url;
28
29use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
30
31#[derive(Debug, Clone)]
32pub struct IrohConnector {
33    node_ids: BTreeMap<PeerId, NodeId>,
34    endpoint_stable: Endpoint,
35    endpoint_next: Option<iroh_next::Endpoint>,
36
37    /// List of overrides to use when attempting to connect to given
38    /// `NodeId`
39    ///
40    /// This is useful for testing, or forcing non-default network
41    /// connectivity.
42    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
43
44    /// Connection pool for stable endpoint connections
45    connections_stable: Arc<tokio::sync::Mutex<HashMap<NodeId, Arc<OnceCell<Connection>>>>>,
46
47    /// Connection pool for next endpoint connections  
48    connections_next: Arc<
49        tokio::sync::Mutex<
50            HashMap<iroh_next::NodeId, Arc<OnceCell<iroh_next::endpoint::Connection>>>,
51        >,
52    >,
53}
54
55impl IrohConnector {
56    #[cfg(not(target_family = "wasm"))]
57    fn spawn_connection_monitoring_stable(endpoint: &Endpoint, node_id: NodeId) {
58        if let Ok(mut conn_type_watcher) = endpoint.conn_type(node_id) {
59            #[allow(clippy::let_underscore_future)]
60            let _ = spawn("iroh connection (stable)", async move {
61                if let Ok(conn_type) = conn_type_watcher.get() {
62                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
63                }
64                while let Ok(event) = conn_type_watcher.updated().await {
65                    debug!(target: LOG_NET_IROH, %node_id, type = %event, "Connection type (changed)");
66                }
67            });
68        }
69    }
70
71    #[cfg(not(target_family = "wasm"))]
72    fn spawn_connection_monitoring_next(
73        endpoint: &iroh_next::Endpoint,
74        node_addr: &iroh_next::NodeAddr,
75    ) {
76        if let Some(mut conn_type_watcher) = endpoint.conn_type(node_addr.node_id) {
77            let node_id = node_addr.node_id;
78            #[allow(clippy::let_underscore_future)]
79            let _ = spawn("iroh connection (next)", async move {
80                if let Ok(conn_type) = conn_type_watcher.get() {
81                    debug!(target: LOG_NET_IROH, %node_id, type = %conn_type, "Connection type (initial)");
82                }
83                while let Ok(event) = conn_type_watcher.updated().await {
84                    debug!(target: LOG_NET_IROH, node_id = %node_id, %event, "Connection type changed");
85                }
86            });
87        }
88    }
89
90    pub async fn new(
91        peers: BTreeMap<PeerId, SafeUrl>,
92        iroh_dns: Option<SafeUrl>,
93        iroh_enable_dht: bool,
94        iroh_enable_next: bool,
95    ) -> anyhow::Result<Self> {
96        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
97        warn!(target: LOG_NET_IROH, "Iroh support is experimental");
98        let mut s =
99            Self::new_no_overrides(peers, iroh_dns, iroh_enable_dht, iroh_enable_next).await?;
100
101        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
102            s = s.with_connection_override(k, v.into());
103        }
104
105        Ok(s)
106    }
107
108    pub async fn new_no_overrides(
109        peers: BTreeMap<PeerId, SafeUrl>,
110        iroh_dns: Option<SafeUrl>,
111        iroh_enable_dht: bool,
112        iroh_enable_next: bool,
113    ) -> anyhow::Result<Self> {
114        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
115            || {
116                FM_IROH_DNS_FEDIMINT_PROD
117                    .into_iter()
118                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
119                    .collect()
120            },
121            |url| vec![url.to_unsafe()],
122        );
123        let node_ids = peers
124            .into_iter()
125            .map(|(peer, url)| {
126                let host = url.host_str().context("Url is missing host")?;
127
128                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
129
130                Ok((peer, node_id))
131            })
132            .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
133
134        let endpoint_stable = Box::pin({
135            let iroh_dns_servers = iroh_dns_servers.clone();
136            async {
137                let mut builder = Endpoint::builder();
138
139                for iroh_dns in iroh_dns_servers {
140                    builder = builder.add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
141                }
142
143                // As a client, we don't need to register on any relays
144                let mut builder = builder.relay_mode(iroh::RelayMode::Disabled);
145
146                #[cfg(not(target_family = "wasm"))]
147                if iroh_enable_dht {
148                    builder = builder.discovery_dht();
149                }
150
151                // instead of `.discovery_n0`, which brings publisher we don't want
152                {
153                    #[cfg(target_family = "wasm")]
154                    {
155                        builder = builder.add_discovery(move |_| Some(PkarrResolver::n0_dns()));
156                    }
157
158                    #[cfg(not(target_family = "wasm"))]
159                    {
160                        builder = builder.add_discovery(move |_| {
161                            Some(iroh::discovery::dns::DnsDiscovery::n0_dns())
162                        });
163                    }
164                }
165
166                let endpoint = builder.bind().await?;
167                debug!(
168                    target: LOG_NET_IROH,
169                    node_id = %endpoint.node_id(),
170                    node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
171                    "Iroh api client endpoint (stable)"
172                );
173                Ok::<_, anyhow::Error>(endpoint)
174            }
175        });
176        let endpoint_next = Box::pin(async {
177            let mut builder = iroh_next::Endpoint::builder();
178
179            for iroh_dns in iroh_dns_servers {
180                builder = builder.add_discovery(
181                    iroh_next::discovery::pkarr::PkarrResolver::builder(iroh_dns).build(),
182                );
183            }
184
185            // As a client, we don't need to register on any relays
186            let mut builder = builder.relay_mode(iroh_next::RelayMode::Disabled);
187
188            #[cfg(not(target_family = "wasm"))]
189            if iroh_enable_dht {
190                builder = builder.discovery_dht();
191            }
192
193            // instead of `.discovery_n0`, which brings publisher we don't want
194            {
195                // Resolve using HTTPS requests to our DNS server's /pkarr path in browsers
196                #[cfg(target_family = "wasm")]
197                {
198                    builder =
199                        builder.add_discovery(iroh_next::discovery::pkarr::PkarrResolver::n0_dns());
200                }
201                // Resolve using DNS queries outside browsers.
202                #[cfg(not(target_family = "wasm"))]
203                {
204                    builder =
205                        builder.add_discovery(iroh_next::discovery::dns::DnsDiscovery::n0_dns());
206                }
207            }
208
209            let endpoint = builder.bind().await?;
210            debug!(
211                target: LOG_NET_IROH,
212                node_id = %endpoint.node_id(),
213                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
214                "Iroh api client endpoint (next)"
215            );
216            Ok(endpoint)
217        });
218
219        let (endpoint_stable, endpoint_next) = if iroh_enable_next {
220            let (s, n) = tokio::try_join!(endpoint_stable, endpoint_next)?;
221            (s, Some(n))
222        } else {
223            (endpoint_stable.await?, None)
224        };
225
226        Ok(Self {
227            node_ids,
228            endpoint_stable,
229            endpoint_next,
230            connection_overrides: BTreeMap::new(),
231            connections_stable: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
232            connections_next: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
233        })
234    }
235
236    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
237        self.connection_overrides.insert(node, addr);
238        self
239    }
240
241    async fn get_or_create_connection_stable(
242        &self,
243        node_id: NodeId,
244        node_addr: Option<NodeAddr>,
245    ) -> PeerResult<Connection> {
246        let mut pool_lock = self.connections_stable.lock().await;
247
248        let entry_arc = pool_lock
249            .entry(node_id)
250            .and_modify(|entry_arc| {
251                // Check if existing connection is disconnected and remove it
252                if let Some(existing_conn) = entry_arc.get()
253                    && existing_conn.close_reason().is_some() {
254                        trace!(target: LOG_NET_IROH, %node_id, "Existing stable connection is disconnected, removing from pool");
255                        *entry_arc = Arc::new(OnceCell::new());
256                    }
257            })
258            .or_insert_with(|| Arc::new(OnceCell::new()))
259            .clone();
260
261        // Drop the pool lock so other connections can work in parallel
262        drop(pool_lock);
263
264        let conn = entry_arc
265            .get_or_try_init(|| async {
266                trace!(target: LOG_NET_IROH, %node_id, "Creating new stable connection");
267                let conn = match node_addr.clone() {
268                    Some(node_addr) => {
269                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
270                        let conn = self.endpoint_stable
271                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
272                            .await;
273
274                        #[cfg(not(target_family = "wasm"))]
275                        if conn.is_ok() {
276                            Self::spawn_connection_monitoring_stable(&self.endpoint_stable, node_id);
277                        }
278                        conn
279                    }
280                    None => self.endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
281                }.map_err(PeerError::Connection)?;
282
283                Ok(conn)
284            })
285            .await?;
286
287        trace!(target: LOG_NET_IROH, %node_id, "Using stable connection");
288        Ok(conn.clone())
289    }
290
291    async fn get_or_create_connection_next(
292        &self,
293        endpoint_next: &iroh_next::Endpoint,
294        node_id: NodeId,
295        node_addr: Option<NodeAddr>,
296    ) -> PeerResult<iroh_next::endpoint::Connection> {
297        let next_node_id = iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail");
298
299        let mut pool_lock = self.connections_next.lock().await;
300
301        let entry_arc = pool_lock
302            .entry(next_node_id)
303            .and_modify(|entry_arc| {
304                // Check if existing connection is disconnected and remove it
305                if let Some(existing_conn) = entry_arc.get()
306                    && existing_conn.close_reason().is_some() {
307                        trace!(target: LOG_NET_IROH, %node_id, "Existing next connection is disconnected, removing from pool");
308                        *entry_arc = Arc::new(OnceCell::new());
309                    }
310            })
311            .or_insert_with(|| Arc::new(OnceCell::new()))
312            .clone();
313
314        // Drop the pool lock so other connections can work in parallel
315        drop(pool_lock);
316
317        let endpoint_next = endpoint_next.clone();
318        let conn = entry_arc
319            .get_or_try_init(|| async move {
320                trace!(target: LOG_NET_IROH, %node_id, "Creating new next connection");
321                let conn = match node_addr.clone() {
322                    Some(node_addr) => {
323                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
324                        let node_addr = node_addr_stable_to_next(&node_addr);
325                        let conn = endpoint_next
326                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
327                            .await;
328
329                        #[cfg(not(target_family = "wasm"))]
330                        if conn.is_ok() {
331                            Self::spawn_connection_monitoring_next(&endpoint_next, &node_addr);
332                        }
333
334                        conn
335                    }
336                    None => endpoint_next.connect(
337                        next_node_id,
338                        FEDIMINT_API_ALPN
339                    ).await,
340                }
341                .map_err(Into::into)
342                .map_err(PeerError::Connection)?;
343
344                Ok(conn)
345            })
346            .await?;
347
348        trace!(target: LOG_NET_IROH, %node_id, "Using next connection");
349        Ok(conn.clone())
350    }
351}
352
353#[async_trait]
354impl IClientConnector for IrohConnector {
355    fn peers(&self) -> BTreeSet<PeerId> {
356        self.node_ids.keys().copied().collect()
357    }
358
359    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
360        let node_id = *self
361            .node_ids
362            .get(&peer_id)
363            .ok_or(PeerError::InvalidPeerId { peer_id })?;
364
365        let mut futures = FuturesUnordered::<
366            Pin<Box<dyn Future<Output = (PeerResult<DynClientConnection>, &'static str)> + Send>>,
367        >::new();
368        let connection_override = self.connection_overrides.get(&node_id).cloned();
369
370        // Use connection pool for stable endpoint
371        let self_clone = self.clone();
372        futures.push(Box::pin({
373            let connection_override = connection_override.clone();
374            async move {
375                (
376                    self_clone
377                        .get_or_create_connection_stable(node_id, connection_override)
378                        .await
379                        .map(super::IClientConnection::into_dyn),
380                    "stable",
381                )
382            }
383        }));
384
385        // Use connection pool for next endpoint if available
386        if let Some(endpoint_next) = &self.endpoint_next {
387            let self_clone = self.clone();
388            let endpoint_next = endpoint_next.clone();
389            futures.push(Box::pin(async move {
390                (
391                    self_clone
392                        .get_or_create_connection_next(&endpoint_next, node_id, connection_override)
393                        .await
394                        .map(super::IClientConnection::into_dyn),
395                    "next",
396                )
397            }));
398        }
399
400        // Remember last error, so we have something to return if
401        // neither connection works.
402        let mut prev_err = None;
403
404        // Loop until first success, or running out of connections.
405        while let Some((result, iroh_stack)) = futures.next().await {
406            match result {
407                Ok(connection) => return Ok(connection),
408                Err(err) => {
409                    warn!(
410                        target: LOG_NET_IROH,
411                        err = %err.fmt_compact(),
412                        %iroh_stack,
413                        "Join error in iroh connection task"
414                    );
415                    prev_err = Some(err);
416                }
417            }
418        }
419
420        Err(prev_err.unwrap_or_else(|| {
421            PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
422        }))
423    }
424}
425
426fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
427    iroh_next::NodeAddr {
428        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
429        relay_url: stable
430            .relay_url
431            .as_ref()
432            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
433        direct_addresses: stable.direct_addresses.clone(),
434    }
435}
436#[async_trait]
437impl IClientConnection for Connection {
438    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
439        let json = serde_json::to_vec(&IrohApiRequest { method, request })
440            .expect("Serialization to vec can't fail");
441
442        let (mut sink, mut stream) = self
443            .open_bi()
444            .await
445            .map_err(|e| PeerError::Transport(e.into()))?;
446
447        sink.write_all(&json)
448            .await
449            .map_err(|e| PeerError::Transport(e.into()))?;
450
451        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
452
453        let response = stream
454            .read_to_end(1_000_000)
455            .await
456            .map_err(|e| PeerError::Transport(e.into()))?;
457
458        // TODO: We should not be serializing Results on the wire
459        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
460            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
461
462        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
463    }
464
465    async fn await_disconnection(&self) {
466        self.closed().await;
467    }
468}
469
470#[async_trait]
471impl IClientConnection for iroh_next::endpoint::Connection {
472    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
473        let json = serde_json::to_vec(&IrohApiRequest { method, request })
474            .expect("Serialization to vec can't fail");
475
476        let (mut sink, mut stream) = self
477            .open_bi()
478            .await
479            .map_err(|e| PeerError::Transport(e.into()))?;
480
481        sink.write_all(&json)
482            .await
483            .map_err(|e| PeerError::Transport(e.into()))?;
484
485        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
486
487        let response = stream
488            .read_to_end(1_000_000)
489            .await
490            .map_err(|e| PeerError::Transport(e.into()))?;
491
492        // TODO: We should not be serializing Results on the wire
493        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
494            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
495
496        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
497    }
498
499    async fn await_disconnection(&self) {
500        self.closed().await;
501    }
502}