1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::util::{FmtCompact as _, SafeUrl};
14use fedimint_logging::LOG_NET_IROH;
15use futures::Future;
16use futures::stream::{FuturesUnordered, StreamExt};
17use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
18use iroh::endpoint::Connection;
19use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
20use iroh_base::ticket::NodeTicket;
21use serde_json::Value;
22use tracing::{debug, trace, warn};
23use url::Url;
24
25use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
26
27#[derive(Debug, Clone)]
28pub struct IrohConnector {
29 node_ids: BTreeMap<PeerId, NodeId>,
30 endpoint_stable: Endpoint,
31 endpoint_next: iroh_next::Endpoint,
32
33 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
39}
40
41impl IrohConnector {
42 pub async fn new(
43 peers: BTreeMap<PeerId, SafeUrl>,
44 iroh_dns: Option<SafeUrl>,
45 ) -> anyhow::Result<Self> {
46 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
47 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
48 let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
49
50 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
51 s = s.with_connection_override(k, v.into());
52 }
53
54 Ok(s)
55 }
56
57 pub async fn new_no_overrides(
58 peers: BTreeMap<PeerId, SafeUrl>,
59 iroh_dns: Option<SafeUrl>,
60 ) -> anyhow::Result<Self> {
61 let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
62 || {
63 FM_DNS_PKARR_RELAY_PROD
64 .into_iter()
65 .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
66 .collect()
67 },
68 |url| vec![url.to_unsafe()],
69 );
70 let node_ids = peers
71 .into_iter()
72 .map(|(peer, url)| {
73 let host = url.host_str().context("Url is missing host")?;
74
75 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
76
77 Ok((peer, node_id))
78 })
79 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
80
81 let endpoint_stable = {
82 let mut builder = Endpoint::builder();
83
84 for iroh_dns in iroh_dns_servers {
85 builder = builder
86 .add_discovery({
87 let iroh_dns = iroh_dns.clone();
88 move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
89 })
90 .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
91 }
92
93 #[cfg(not(target_family = "wasm"))]
94 let builder = builder.discovery_dht().discovery_n0();
95
96 let endpoint = builder.discovery_n0().bind().await?;
97 debug!(
98 target: LOG_NET_IROH,
99 node_id = %endpoint.node_id(),
100 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
101 "Iroh api client endpoint (stable)"
102 );
103 endpoint
104 };
105 let endpoint_next = {
106 let builder = iroh_next::Endpoint::builder().discovery_n0();
107 #[cfg(not(target_family = "wasm"))]
108 let builder = builder.discovery_dht();
109 let endpoint = builder.bind().await?;
110 debug!(
111 target: LOG_NET_IROH,
112 node_id = %endpoint.node_id(),
113 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
114 "Iroh api client endpoint (next)"
115 );
116 endpoint
117 };
118
119 Ok(Self {
120 node_ids,
121 endpoint_stable,
122 endpoint_next,
123 connection_overrides: BTreeMap::new(),
124 })
125 }
126
127 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
128 self.connection_overrides.insert(node, addr);
129 self
130 }
131}
132
133#[async_trait]
134impl IClientConnector for IrohConnector {
135 fn peers(&self) -> BTreeSet<PeerId> {
136 self.node_ids.keys().copied().collect()
137 }
138
139 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
140 let node_id = *self
141 .node_ids
142 .get(&peer_id)
143 .ok_or(PeerError::InvalidPeerId { peer_id })?;
144
145 let mut futures = FuturesUnordered::<
146 Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
147 >::new();
148 let connection_override = self.connection_overrides.get(&node_id).cloned();
149 let endpoint_stable = self.endpoint_stable.clone();
150 let endpoint_next = self.endpoint_next.clone();
151
152 futures.push(Box::pin({
153 let connection_override = connection_override.clone();
154 async move {
155 match connection_override {
156 Some(node_addr) => {
157 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
158 endpoint_stable
159 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
160 .await
161 }
162 None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
163 }.map_err(PeerError::Connection)
164 .map(super::IClientConnection::into_dyn)
165 }
166 }));
167
168 futures.push(Box::pin(async move {
169 match connection_override {
170 Some(node_addr) => {
171 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
172 endpoint_next
173 .connect(node_addr_stable_to_next(&node_addr), FEDIMINT_API_ALPN)
174 .await
175 }
176 None => endpoint_next.connect(
177 iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
178 FEDIMINT_API_ALPN
179 ).await,
180 }
181 .map_err(Into::into)
182 .map_err(PeerError::Connection)
183 .map(super::IClientConnection::into_dyn)
184 }));
185
186 let mut prev_err = None;
189
190 while let Some(result) = futures.next().await {
192 match result {
193 Ok(connection) => return Ok(connection),
194 Err(err) => {
195 warn!(
196 target: LOG_NET_IROH,
197 err = %err.fmt_compact(),
198 "Join error in iroh connection task"
199 );
200 prev_err = Some(err);
201 }
202 }
203 }
204
205 Err(prev_err.unwrap_or_else(|| {
206 PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
207 }))
208 }
209}
210
211fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
212 iroh_next::NodeAddr {
213 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
214 relay_url: stable
215 .relay_url
216 .as_ref()
217 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
218 direct_addresses: stable.direct_addresses.clone(),
219 }
220}
221#[async_trait]
222impl IClientConnection for Connection {
223 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
224 let json = serde_json::to_vec(&IrohApiRequest { method, request })
225 .expect("Serialization to vec can't fail");
226
227 let (mut sink, mut stream) = self
228 .open_bi()
229 .await
230 .map_err(|e| PeerError::Transport(e.into()))?;
231
232 sink.write_all(&json)
233 .await
234 .map_err(|e| PeerError::Transport(e.into()))?;
235
236 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
237
238 let response = stream
239 .read_to_end(1_000_000)
240 .await
241 .map_err(|e| PeerError::Transport(e.into()))?;
242
243 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
245 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
246
247 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
248 }
249
250 async fn await_disconnection(&self) {
251 self.closed().await;
252 }
253}
254
255#[async_trait]
256impl IClientConnection for iroh_next::endpoint::Connection {
257 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
258 let json = serde_json::to_vec(&IrohApiRequest { method, request })
259 .expect("Serialization to vec can't fail");
260
261 let (mut sink, mut stream) = self
262 .open_bi()
263 .await
264 .map_err(|e| PeerError::Transport(e.into()))?;
265
266 sink.write_all(&json)
267 .await
268 .map_err(|e| PeerError::Transport(e.into()))?;
269
270 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
271
272 let response = stream
273 .read_to_end(1_000_000)
274 .await
275 .map_err(|e| PeerError::Transport(e.into()))?;
276
277 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
279 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
280
281 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
282 }
283
284 async fn await_disconnection(&self) {
285 self.closed().await;
286 }
287}