1use std::collections::{BTreeMap, BTreeSet};
2use std::pin::Pin;
3use std::str::FromStr;
4
5use anyhow::Context;
6use async_trait::async_trait;
7use fedimint_core::PeerId;
8use fedimint_core::envs::parse_kv_list_from_env;
9use fedimint_core::iroh_prod::FM_DNS_PKARR_RELAY_PROD;
10use fedimint_core::module::{
11 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
12};
13use fedimint_core::util::{FmtCompact as _, SafeUrl};
14use fedimint_logging::LOG_NET_IROH;
15use futures::Future;
16use futures::stream::{FuturesUnordered, StreamExt};
17use iroh::discovery::pkarr::{PkarrPublisher, PkarrResolver};
18use iroh::endpoint::Connection;
19use iroh::{Endpoint, NodeAddr, NodeId, PublicKey, SecretKey};
20use iroh_base::ticket::NodeTicket;
21use serde_json::Value;
22use tracing::{debug, trace, warn};
23use url::Url;
24
25use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
26
27#[derive(Debug, Clone)]
28pub struct IrohConnector {
29 node_ids: BTreeMap<PeerId, NodeId>,
30 endpoint_stable: Endpoint,
31 endpoint_next: iroh_next::Endpoint,
32
33 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
39}
40
41impl IrohConnector {
42 pub async fn new(
43 peers: BTreeMap<PeerId, SafeUrl>,
44 iroh_dns: Option<SafeUrl>,
45 ) -> anyhow::Result<Self> {
46 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
47 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
48 let mut s = Self::new_no_overrides(peers, iroh_dns).await?;
49
50 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
51 s = s.with_connection_override(k, v.into());
52 }
53
54 Ok(s)
55 }
56
57 pub async fn new_no_overrides(
58 peers: BTreeMap<PeerId, SafeUrl>,
59 iroh_dns: Option<SafeUrl>,
60 ) -> anyhow::Result<Self> {
61 let iroh_dns_servers: Vec<_> = iroh_dns.map_or_else(
62 || {
63 FM_DNS_PKARR_RELAY_PROD
64 .into_iter()
65 .map(|url| Url::parse(url).expect("Hardcoded, can't fail"))
66 .collect()
67 },
68 |url| vec![url.to_unsafe()],
69 );
70 let node_ids = peers
71 .into_iter()
72 .map(|(peer, url)| {
73 let host = url.host_str().context("Url is missing host")?;
74
75 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
76
77 Ok((peer, node_id))
78 })
79 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
80
81 let endpoint_stable = {
82 let mut builder = Endpoint::builder();
83
84 for iroh_dns in iroh_dns_servers {
85 builder = builder
86 .add_discovery({
87 let iroh_dns = iroh_dns.clone();
88 move |sk: &SecretKey| Some(PkarrPublisher::new(sk.clone(), iroh_dns))
89 })
90 .add_discovery(|_| Some(PkarrResolver::new(iroh_dns)));
91 }
92
93 #[cfg(not(target_family = "wasm"))]
94 let builder = builder.discovery_dht();
95 let endpoint = builder.bind().await?;
96 debug!(
97 target: LOG_NET_IROH,
98 node_id = %endpoint.node_id(),
99 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
100 "Iroh api client endpoint (stable)"
101 );
102 endpoint
103 };
104 let endpoint_next = {
105 let builder = iroh_next::Endpoint::builder().discovery_n0();
106 #[cfg(not(target_family = "wasm"))]
107 let builder = builder.discovery_dht();
108 let endpoint = builder.bind().await?;
109 debug!(
110 target: LOG_NET_IROH,
111 node_id = %endpoint.node_id(),
112 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
113 "Iroh api client endpoint (next)"
114 );
115 endpoint
116 };
117
118 Ok(Self {
119 node_ids,
120 endpoint_stable,
121 endpoint_next,
122 connection_overrides: BTreeMap::new(),
123 })
124 }
125
126 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
127 self.connection_overrides.insert(node, addr);
128 self
129 }
130}
131
132#[async_trait]
133impl IClientConnector for IrohConnector {
134 fn peers(&self) -> BTreeSet<PeerId> {
135 self.node_ids.keys().copied().collect()
136 }
137
138 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
139 let node_id = *self
140 .node_ids
141 .get(&peer_id)
142 .ok_or(PeerError::InvalidPeerId { peer_id })?;
143
144 let mut futures = FuturesUnordered::<
145 Pin<Box<dyn Future<Output = PeerResult<DynClientConnection>> + Send>>,
146 >::new();
147 let connection_override = self.connection_overrides.get(&node_id).cloned();
148 let endpoint_stable = self.endpoint_stable.clone();
149 let endpoint_next = self.endpoint_next.clone();
150
151 futures.push(Box::pin({
152 let connection_override = connection_override.clone();
153 async move {
154 match connection_override {
155 Some(node_addr) => {
156 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
157 endpoint_stable
158 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
159 .await
160 }
161 None => endpoint_stable.connect(node_id, FEDIMINT_API_ALPN).await,
162 }.map_err(PeerError::Connection)
163 .map(super::IClientConnection::into_dyn)
164 }
165 }));
166
167 futures.push(Box::pin(async move {
168 match connection_override {
169 Some(node_addr) => {
170 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
171 endpoint_next
172 .connect(node_addr_stable_to_next(&node_addr), FEDIMINT_API_ALPN)
173 .await
174 }
175 None => endpoint_next.connect(
176 iroh_next::NodeId::from_bytes(node_id.as_bytes()).expect("Can't fail"),
177 FEDIMINT_API_ALPN
178 ).await,
179 }
180 .map_err(Into::into)
181 .map_err(PeerError::Connection)
182 .map(super::IClientConnection::into_dyn)
183 }));
184
185 let mut prev_err = None;
188
189 while let Some(result) = futures.next().await {
191 match result {
192 Ok(connection) => return Ok(connection),
193 Err(err) => {
194 warn!(
195 target: LOG_NET_IROH,
196 err = %err.fmt_compact(),
197 "Join error in iroh connection task"
198 );
199 prev_err = Some(err);
200 }
201 }
202 }
203
204 Err(prev_err.unwrap_or_else(|| {
205 PeerError::ServerError(anyhow::anyhow!("Both iroh connection attempts failed"))
206 }))
207 }
208}
209
210fn node_addr_stable_to_next(stable: &iroh::NodeAddr) -> iroh_next::NodeAddr {
211 iroh_next::NodeAddr {
212 node_id: iroh_next::NodeId::from_bytes(stable.node_id.as_bytes()).expect("Can't fail"),
213 relay_url: stable
214 .relay_url
215 .as_ref()
216 .map(|u| iroh_next::RelayUrl::from_str(&u.to_string()).expect("Can't fail")),
217 direct_addresses: stable.direct_addresses.clone(),
218 }
219}
220#[async_trait]
221impl IClientConnection for Connection {
222 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
223 let json = serde_json::to_vec(&IrohApiRequest { method, request })
224 .expect("Serialization to vec can't fail");
225
226 let (mut sink, mut stream) = self
227 .open_bi()
228 .await
229 .map_err(|e| PeerError::Transport(e.into()))?;
230
231 sink.write_all(&json)
232 .await
233 .map_err(|e| PeerError::Transport(e.into()))?;
234
235 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
236
237 let response = stream
238 .read_to_end(1_000_000)
239 .await
240 .map_err(|e| PeerError::Transport(e.into()))?;
241
242 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
244 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
245
246 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
247 }
248
249 async fn await_disconnection(&self) {
250 self.closed().await;
251 }
252}
253
254#[async_trait]
255impl IClientConnection for iroh_next::endpoint::Connection {
256 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
257 let json = serde_json::to_vec(&IrohApiRequest { method, request })
258 .expect("Serialization to vec can't fail");
259
260 let (mut sink, mut stream) = self
261 .open_bi()
262 .await
263 .map_err(|e| PeerError::Transport(e.into()))?;
264
265 sink.write_all(&json)
266 .await
267 .map_err(|e| PeerError::Transport(e.into()))?;
268
269 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
270
271 let response = stream
272 .read_to_end(1_000_000)
273 .await
274 .map_err(|e| PeerError::Transport(e.into()))?;
275
276 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
278 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
279
280 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
281 }
282
283 async fn await_disconnection(&self) {
284 self.closed().await;
285 }
286}