fedimint_api_client/api/
iroh.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::util::{FmtCompact as _, SafeUrl};
14use fedimint_logging::LOG_NET_IROH;
15use futures::Future;
16use futures::stream::{FuturesUnordered, StreamExt};
17use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
18use iroh::endpoint::Connection;
19use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
20use iroh_base::ticket::NodeTicket;
21use serde_json::Value;
22use tracing::{debug, trace, warn};
23use url::Url;
24
25use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
26
27#[derive(Debug, Clone)]
28pub struct IrohConnector {
29    node_ids: BTreeMap<PeerId, NodeId>,
30    endpoint_stable: Endpoint,
31    endpoint_next: iroh_next::Endpoint,
32
33    /// List of overrides to use when attempting to connect to given
34    /// `NodeId`
35    ///
36    /// This is useful for testing, or forcing non-default network
37    /// connectivity.
38    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
39}
40
41impl IrohConnector {
42    pub async fn new(
43        peers: BTreeMap<PeerId, SafeUrl>,
44        iroh_dns: Option<SafeUrl>,
45    ) -> anyhow::Result<Self> {
46        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
47        warn!(target: LOG_NET_IROH, "Iroh support is experimental");
48        let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
49
50        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
51            s = s.with_connection_override(k, v.into());
52        }
53
54        Ok(s)
55    }
56
57    pub async fn new_no_overrides(
58        peers: BTreeMap<PeerId, SafeUrl>,
59        iroh_dns: Option<SafeUrl>,
60    ) -> anyhow::Result<Self> {
61        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
62            || {
63                FM_DNS_PKARR_RELAY_PROD
64                    .into_iter()
65                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
66                    .collect()
67            },
68            |url| vec![url.to_unsafe()],
69        );
70        let node_ids = peers
71            .into_iter()
72            .map(|(peer, url)| {
73                let host = url.host_str().context("Url is missing host")?;
74
75                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
76
77                Ok((peer, node_id))
78            })
79            .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
80
81        let endpoint_stable = {
82            let mut builder = Endpoint::builder();
83
84            for iroh_dns in iroh_dns_servers {
85                builder = builder
86                    .add_discovery({
87                        let iroh_dns = iroh_dns.clone();
88                        move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
89                    })
90                    .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
91            }
92
93            #[cfg(not(target_family = "wasm"))]
94            let builder = builder.discovery_dht();
95            let endpoint = builder.bind().await?;
96            debug!(
97                target: LOG_NET_IROH,
98                node_id = %endpoint.node_id(),
99                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
100                "Iroh api client endpoint (stable)"
101            );
102            endpoint
103        };
104        let endpoint_next = {
105            let builder = iroh_next::Endpoint::builder().discovery_n0();
106            #[cfg(not(target_family = "wasm"))]
107            let builder = builder.discovery_dht();
108            let endpoint = builder.bind().await?;
109            debug!(
110                target: LOG_NET_IROH,
111                node_id = %endpoint.node_id(),
112                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
113                "Iroh api client endpoint (next)"
114            );
115            endpoint
116        };
117
118        Ok(Self {
119            node_ids,
120            endpoint_stable,
121            endpoint_next,
122            connection_overrides: BTreeMap::new(),
123        })
124    }
125
126    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
127        self.connection_overrides.insert(node, addr);
128        self
129    }
130}
131
132#[async_trait]
133impl IClientConnector for IrohConnector {
134    fn peers(&self) -> BTreeSet<PeerId> {
135        self.node_ids.keys().copied().collect()
136    }
137
138    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
139        let node_id = *self
140            .node_ids
141            .get(&peer_id)
142            .ok_or(PeerError::InvalidPeerId { peer_id })?;
143
144        let mut futures = FuturesUnordered::<
145            Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
146        >::new();
147        let connection_override = self.connection_overrides.get(&node_id).cloned();
148        let endpoint_stable = self.endpoint_stable.clone();
149        let endpoint_next = self.endpoint_next.clone();
150
151        futures.push(Box::pin({
152            let connection_override = connection_override.clone();
153            async move {
154                match connection_override {
155                    Some(node_addr) => {
156                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
157                        endpoint_stable
158                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
159                            .await
160                    }
161                    None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
162                }.map_err(PeerError::Connection)
163                .map(super::IClientConnection::into_dyn)
164            }
165        }));
166
167        futures.push(Box::pin(async move {
168            match connection_override {
169                Some(node_addr) => {
170                    trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
171                    endpoint_next
172                        .connect(node_addr_stable_to_next(&node_addr), FEDIMINT_API_ALPN)
173                        .await
174                }
175                None => endpoint_next.connect(
176                        iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
177                        FEDIMINT_API_ALPN
178                    ).await,
179                }
180                .map_err(Into::into)
181                .map_err(PeerError::Connection)
182                .map(super::IClientConnection::into_dyn)
183        }));
184
185        // Remember last error, so we have something to return if
186        // neither connection works.
187        let mut prev_err = None;
188
189        // Loop until first success, or running out of connections.
190        while let Some(result) = futures.next().await {
191            match result {
192                Ok(connection) => return Ok(connection),
193                Err(err) => {
194                    warn!(
195                        target: LOG_NET_IROH,
196                        err = %err.fmt_compact(),
197                        "Join error in iroh connection task"
198                    );
199                    prev_err = Some(err);
200                }
201            }
202        }
203
204        Err(prev_err.unwrap_or_else(|| {
205            PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
206        }))
207    }
208}
209
210fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
211    iroh_next::NodeAddr {
212        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
213        relay_url: stable
214            .relay_url
215            .as_ref()
216            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
217        direct_addresses: stable.direct_addresses.clone(),
218    }
219}
220#[async_trait]
221impl IClientConnection for Connection {
222    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
223        let json = serde_json::to_vec(&IrohApiRequest { method, request })
224            .expect("Serialization to vec can't fail");
225
226        let (mut sink, mut stream) = self
227            .open_bi()
228            .await
229            .map_err(|e| PeerError::Transport(e.into()))?;
230
231        sink.write_all(&json)
232            .await
233            .map_err(|e| PeerError::Transport(e.into()))?;
234
235        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
236
237        let response = stream
238            .read_to_end(1_000_000)
239            .await
240            .map_err(|e| PeerError::Transport(e.into()))?;
241
242        // TODO: We should not be serializing Results on the wire
243        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
244            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
245
246        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
247    }
248
249    async fn await_disconnection(&self) {
250        self.closed().await;
251    }
252}
253
254#[async_trait]
255impl IClientConnection for iroh_next::endpoint::Connection {
256    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
257        let json = serde_json::to_vec(&IrohApiRequest { method, request })
258            .expect("Serialization to vec can't fail");
259
260        let (mut sink, mut stream) = self
261            .open_bi()
262            .await
263            .map_err(|e| PeerError::Transport(e.into()))?;
264
265        sink.write_all(&json)
266            .await
267            .map_err(|e| PeerError::Transport(e.into()))?;
268
269        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
270
271        let response = stream
272            .read_to_end(1_000_000)
273            .await
274            .map_err(|e| PeerError::Transport(e.into()))?;
275
276        // TODO: We should not be serializing Results on the wire
277        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
278            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
279
280        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
281    }
282
283    async fn await_disconnection(&self) {
284        self.closed().await;
285    }
286}