fedimint_api_client/api/
iroh.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11    ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::util::{FmtCompact as _, SafeUrl};
14use fedimint_logging::LOG_NET_IROH;
15use futures::Future;
16use futures::stream::{FuturesUnordered, StreamExt};
17use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
18use iroh::endpoint::Connection;
19use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
20use iroh_base::ticket::NodeTicket;
21use serde_json::Value;
22use tracing::{debug, trace, warn};
23use url::Url;
24
25use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
26
27#[derive(Debug, Clone)]
28pub struct IrohConnector {
29    node_ids: BTreeMap<PeerId, NodeId>,
30    endpoint_stable: Endpoint,
31    endpoint_next: iroh_next::Endpoint,
32
33    /// List of overrides to use when attempting to connect to given
34    /// `NodeId`
35    ///
36    /// This is useful for testing, or forcing non-default network
37    /// connectivity.
38    pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
39}
40
41impl IrohConnector {
42    pub async fn new(
43        peers: BTreeMap<PeerId, SafeUrl>,
44        iroh_dns: Option<SafeUrl>,
45    ) -> anyhow::Result<Self> {
46        const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
47        warn!(target: LOG_NET_IROH, "Iroh support is experimental");
48        let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
49
50        for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
51            s = s.with_connection_override(k, v.into());
52        }
53
54        Ok(s)
55    }
56
57    pub async fn new_no_overrides(
58        peers: BTreeMap<PeerId, SafeUrl>,
59        iroh_dns: Option<SafeUrl>,
60    ) -> anyhow::Result<Self> {
61        let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
62            || {
63                FM_DNS_PKARR_RELAY_PROD
64                    .into_iter()
65                    .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
66                    .collect()
67            },
68            |url| vec![url.to_unsafe()],
69        );
70        let node_ids = peers
71            .into_iter()
72            .map(|(peer, url)| {
73                let host = url.host_str().context("Url is missing host")?;
74
75                let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
76
77                Ok((peer, node_id))
78            })
79            .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
80
81        let endpoint_stable = {
82            let mut builder = Endpoint::builder();
83
84            for iroh_dns in iroh_dns_servers {
85                builder = builder
86                    .add_discovery({
87                        let iroh_dns = iroh_dns.clone();
88                        move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
89                    })
90                    .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
91            }
92
93            #[cfg(not(target_family = "wasm"))]
94            let builder = builder.discovery_dht().discovery_n0();
95
96            let endpoint = builder.discovery_n0().bind().await?;
97            debug!(
98                target: LOG_NET_IROH,
99                node_id = %endpoint.node_id(),
100                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
101                "Iroh api client endpoint (stable)"
102            );
103            endpoint
104        };
105        let endpoint_next = {
106            let builder = iroh_next::Endpoint::builder().discovery_n0();
107            #[cfg(not(target_family = "wasm"))]
108            let builder = builder.discovery_dht();
109            let endpoint = builder.bind().await?;
110            debug!(
111                target: LOG_NET_IROH,
112                node_id = %endpoint.node_id(),
113                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
114                "Iroh api client endpoint (next)"
115            );
116            endpoint
117        };
118
119        Ok(Self {
120            node_ids,
121            endpoint_stable,
122            endpoint_next,
123            connection_overrides: BTreeMap::new(),
124        })
125    }
126
127    pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
128        self.connection_overrides.insert(node, addr);
129        self
130    }
131}
132
133#[async_trait]
134impl IClientConnector for IrohConnector {
135    fn peers(&self) -> BTreeSet<PeerId> {
136        self.node_ids.keys().copied().collect()
137    }
138
139    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
140        let node_id = *self
141            .node_ids
142            .get(&peer_id)
143            .ok_or(PeerError::InvalidPeerId { peer_id })?;
144
145        let mut futures = FuturesUnordered::<
146            Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
147        >::new();
148        let connection_override = self.connection_overrides.get(&node_id).cloned();
149        let endpoint_stable = self.endpoint_stable.clone();
150        let endpoint_next = self.endpoint_next.clone();
151
152        futures.push(Box::pin({
153            let connection_override = connection_override.clone();
154            async move {
155                match connection_override {
156                    Some(node_addr) => {
157                        trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
158                        endpoint_stable
159                            .connect(node_addr.clone(), FEDIMINT_API_ALPN)
160                            .await
161                    }
162                    None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
163                }.map_err(PeerError::Connection)
164                .map(super::IClientConnection::into_dyn)
165            }
166        }));
167
168        futures.push(Box::pin(async move {
169            match connection_override {
170                Some(node_addr) => {
171                    trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
172                    endpoint_next
173                        .connect(node_addr_stable_to_next(&node_addr), FEDIMINT_API_ALPN)
174                        .await
175                }
176                None => endpoint_next.connect(
177                        iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
178                        FEDIMINT_API_ALPN
179                    ).await,
180                }
181                .map_err(Into::into)
182                .map_err(PeerError::Connection)
183                .map(super::IClientConnection::into_dyn)
184        }));
185
186        // Remember last error, so we have something to return if
187        // neither connection works.
188        let mut prev_err = None;
189
190        // Loop until first success, or running out of connections.
191        while let Some(result) = futures.next().await {
192            match result {
193                Ok(connection) => return Ok(connection),
194                Err(err) => {
195                    warn!(
196                        target: LOG_NET_IROH,
197                        err = %err.fmt_compact(),
198                        "Join error in iroh connection task"
199                    );
200                    prev_err = Some(err);
201                }
202            }
203        }
204
205        Err(prev_err.unwrap_or_else(|| {
206            PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
207        }))
208    }
209}
210
211fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
212    iroh_next::NodeAddr {
213        node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
214        relay_url: stable
215            .relay_url
216            .as_ref()
217            .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
218        direct_addresses: stable.direct_addresses.clone(),
219    }
220}
221#[async_trait]
222impl IClientConnection for Connection {
223    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
224        let json = serde_json::to_vec(&IrohApiRequest { method, request })
225            .expect("Serialization to vec can't fail");
226
227        let (mut sink, mut stream) = self
228            .open_bi()
229            .await
230            .map_err(|e| PeerError::Transport(e.into()))?;
231
232        sink.write_all(&json)
233            .await
234            .map_err(|e| PeerError::Transport(e.into()))?;
235
236        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
237
238        let response = stream
239            .read_to_end(1_000_000)
240            .await
241            .map_err(|e| PeerError::Transport(e.into()))?;
242
243        // TODO: We should not be serializing Results on the wire
244        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
245            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
246
247        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
248    }
249
250    async fn await_disconnection(&self) {
251        self.closed().await;
252    }
253}
254
255#[async_trait]
256impl IClientConnection for iroh_next::endpoint::Connection {
257    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
258        let json = serde_json::to_vec(&IrohApiRequest { method, request })
259            .expect("Serialization to vec can't fail");
260
261        let (mut sink, mut stream) = self
262            .open_bi()
263            .await
264            .map_err(|e| PeerError::Transport(e.into()))?;
265
266        sink.write_all(&json)
267            .await
268            .map_err(|e| PeerError::Transport(e.into()))?;
269
270        sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
271
272        let response = stream
273            .read_to_end(1_000_000)
274            .await
275            .map_err(|e| PeerError::Transport(e.into()))?;
276
277        // TODO: We should not be serializing Results on the wire
278        let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
279            .map_err(|e| PeerError::InvalidResponse(e.into()))?;
280
281        response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
282    }
283
284    async fn await_disconnection(&self) {
285        self.closed().await;
286    }
287}