1use core::panic;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::fmt::Debug;
4use std::iter::once;
5use std::pin::Pin;
6use std::result;
7use std::sync::Arc;
8
9use anyhow::{Context, anyhow};
10#[cfg(all(feature = "tor", not(target_family = "wasm")))]
11use arti_client::{TorAddr, TorClient, TorClientConfig};
12use async_channel::bounded;
13use async_trait::async_trait;
14use base64::Engine as _;
15use bitcoin::hashes::sha256;
16use bitcoin::secp256k1;
17pub use error::{FederationError, OutputOutcomeError, PeerError};
18use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
19use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
20use fedimint_core::core::backup::SignedBackupRequest;
21use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
24use fedimint_core::module::audit::AuditSummary;
25use fedimint_core::module::registry::ModuleDecoderRegistry;
26use fedimint_core::module::{
27 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
28};
29use fedimint_core::net::api_announcement::SignedApiAnnouncement;
30use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
31use fedimint_core::task::{MaybeSend, MaybeSync};
32use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
33use fedimint_core::util::backoff_util::api_networking_backoff;
34use fedimint_core::util::{FmtCompact as _, SafeUrl};
35use fedimint_core::{
36 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
37};
38use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
39use futures::channel::oneshot;
40use futures::future::pending;
41use futures::stream::FuturesUnordered;
42use futures::{Future, StreamExt};
43use global_api::with_cache::GlobalFederationApiWithCache;
44use jsonrpsee_core::DeserializeOwned;
45use jsonrpsee_core::client::ClientT;
46pub use jsonrpsee_core::client::Error as JsonRpcClientError;
47use jsonrpsee_types::ErrorCode;
48#[cfg(target_family = "wasm")]
49use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
50#[cfg(not(target_family = "wasm"))]
51use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
52#[cfg(not(target_family = "wasm"))]
53use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
54use serde::{Deserialize, Serialize};
55use serde_json::Value;
56#[cfg(not(target_family = "wasm"))]
57use tokio_rustls::rustls::RootCertStore;
58#[cfg(all(feature = "tor", not(target_family = "wasm")))]
59use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
60use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
61
62use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
63mod error;
64pub mod global_api;
65pub mod net;
66
67pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
68
69pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
70 ApiVersion { major: 0, minor: 1 };
71
/// Result of a request to a single peer.
pub type PeerResult<T> = Result<T, PeerError>;
/// Result of a raw JSON-RPC client call.
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
/// Result of a request whose failure is reported as a [`FederationError`].
pub type FederationResult<T> = Result<T, FederationError>;
/// Serde-transportable encoding of a dynamically typed output outcome.
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result of decoding an output outcome (see [`deserialize_outcome`]).
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
78
/// A set of API versions: one for the core API and one per module instance.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// API version of the core (non-module) API.
    pub core: ApiVersion,
    /// API version of each module, keyed by module instance id.
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
87
/// Lowest-level federation API: knows the peers and can send one raw JSON
/// request to one of them. [`FederationApiExt`] builds typed and multi-peer
/// helpers on top of this trait.
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Returns the set of peers this API can address.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Returns the peer id used for admin requests, if one is set
    /// (`FederationApiExt::request_admin` fails when this is `None`).
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle bound to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to `peer_id` and returns the undecoded
    /// JSON response.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
116
117#[apply(async_trait_maybe_send!)]
120pub trait FederationApiExt: IRawFederationApi {
121 async fn request_single_peer<Ret>(
122 &self,
123 method: String,
124 params: ApiRequestErased,
125 peer: PeerId,
126 ) -> PeerResult<Ret>
127 where
128 Ret: DeserializeOwned,
129 {
130 self.request_raw(peer, &method, ¶ms)
131 .await
132 .and_then(|v| {
133 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
134 })
135 }
136
137 async fn request_single_peer_federation<FedRet>(
138 &self,
139 method: String,
140 params: ApiRequestErased,
141 peer_id: PeerId,
142 ) -> FederationResult<FedRet>
143 where
144 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
145 {
146 self.request_raw(peer_id, &method, ¶ms)
147 .await
148 .and_then(|v| {
149 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
150 })
151 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
152 }
153
154 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
157 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
158 &self,
159 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
160 method: String,
161 params: ApiRequestErased,
162 ) -> FederationResult<FR> {
163 #[cfg(not(target_family = "wasm"))]
167 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
168 #[cfg(target_family = "wasm")]
169 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
170
171 for peer in self.all_peers() {
172 futures.push(Box::pin({
173 let method = &method;
174 let params = ¶ms;
175 async move {
176 let result = self
177 .request_single_peer(method.clone(), params.clone(), *peer)
178 .await;
179
180 (*peer, result)
181 }
182 }));
183 }
184
185 let mut peer_errors = BTreeMap::new();
186 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
187
188 loop {
189 let (peer, result) = futures
190 .next()
191 .await
192 .expect("Query strategy ran out of peers to query without returning a result");
193
194 match result {
195 Ok(response) => match strategy.process(peer, response) {
196 QueryStep::Retry(peers) => {
197 for peer in peers {
198 futures.push(Box::pin({
199 let method = &method;
200 let params = ¶ms;
201 async move {
202 let result = self
203 .request_single_peer(method.clone(), params.clone(), peer)
204 .await;
205
206 (peer, result)
207 }
208 }));
209 }
210 }
211 QueryStep::Success(response) => return Ok(response),
212 QueryStep::Failure(e) => {
213 peer_errors.insert(peer, e);
214 }
215 QueryStep::Continue => {}
216 },
217 Err(e) => {
218 e.report_if_unusual(peer, "RequestWithStrategy");
219 peer_errors.insert(peer, e);
220 }
221 }
222
223 if peer_errors.len() == peer_error_threshold {
224 return Err(FederationError::peer_errors(
225 method.clone(),
226 params.params.clone(),
227 peer_errors,
228 ));
229 }
230 }
231 }
232
233 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
234 &self,
235 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
236 method: String,
237 params: ApiRequestErased,
238 ) -> FR {
239 #[cfg(not(target_family = "wasm"))]
243 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
244 #[cfg(target_family = "wasm")]
245 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
246
247 for peer in self.all_peers() {
248 futures.push(Box::pin({
249 let method = &method;
250 let params = ¶ms;
251 async move {
252 let response = util::retry(
253 "api-request-{method}-{peer}",
254 api_networking_backoff(),
255 || async {
256 self.request_single_peer(method.clone(), params.clone(), *peer)
257 .await
258 .inspect_err(|e| {
259 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
260 })
261 .map_err(|e| anyhow!(e.to_string()))
262 },
263 )
264 .await
265 .expect("Number of retries has no limit");
266
267 (*peer, response)
268 }
269 }));
270 }
271
272 loop {
273 let (peer, response) = match futures.next().await {
274 Some(t) => t,
275 None => pending().await,
276 };
277
278 match strategy.process(peer, response) {
279 QueryStep::Retry(peers) => {
280 for peer in peers {
281 futures.push(Box::pin({
282 let method = &method;
283 let params = ¶ms;
284 async move {
285 let response = util::retry(
286 "api-request-{method}-{peer}",
287 api_networking_backoff(),
288 || async {
289 self.request_single_peer(
290 method.clone(),
291 params.clone(),
292 peer,
293 )
294 .await
295 .inspect_err(|err| {
296 if err.is_unusual() {
297 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
298 }
299 })
300 .map_err(|e| anyhow!(e.to_string()))
301 },
302 )
303 .await
304 .expect("Number of retries has no limit");
305
306 (peer, response)
307 }
308 }));
309 }
310 }
311 QueryStep::Success(response) => return response,
312 QueryStep::Failure(e) => {
313 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
314 }
315 QueryStep::Continue => {}
316 }
317 }
318 }
319
320 async fn request_current_consensus<Ret>(
321 &self,
322 method: String,
323 params: ApiRequestErased,
324 ) -> FederationResult<Ret>
325 where
326 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
327 {
328 self.request_with_strategy(
329 ThresholdConsensus::new(self.all_peers().to_num_peers()),
330 method,
331 params,
332 )
333 .await
334 }
335
336 async fn request_current_consensus_retry<Ret>(
337 &self,
338 method: String,
339 params: ApiRequestErased,
340 ) -> Ret
341 where
342 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
343 {
344 self.request_with_strategy_retry(
345 ThresholdConsensus::new(self.all_peers().to_num_peers()),
346 method,
347 params,
348 )
349 .await
350 }
351
352 async fn request_admin<Ret>(
353 &self,
354 method: &str,
355 params: ApiRequestErased,
356 auth: ApiAuth,
357 ) -> FederationResult<Ret>
358 where
359 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
360 {
361 let Some(self_peer_id) = self.self_peer() else {
362 return Err(FederationError::general(
363 method,
364 params,
365 anyhow::format_err!("Admin peer_id not set"),
366 ));
367 };
368
369 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
370 .await
371 }
372
373 async fn request_admin_no_auth<Ret>(
374 &self,
375 method: &str,
376 params: ApiRequestErased,
377 ) -> FederationResult<Ret>
378 where
379 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
380 {
381 let Some(self_peer_id) = self.self_peer() else {
382 return Err(FederationError::general(
383 method,
384 params,
385 anyhow::format_err!("Admin peer_id not set"),
386 ));
387 };
388
389 self.request_single_peer_federation(method.into(), params, self_peer_id)
390 .await
391 }
392}
393
// Blanket impl: every `IRawFederationApi` (including unsized `dyn` types)
// gets the provided `FederationApiExt` methods.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
396
/// Marker trait for raw federation APIs used as module APIs (the trait object
/// behind [`DynModuleApi`]); it adds no methods of its own.
pub trait IModuleFederationApi: IRawFederationApi {}
399
// Cloneable newtype around `Arc<dyn IModuleFederationApi>`, generated by
// `dyn_newtype_define!`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
404
// Cloneable newtype around `Arc<dyn IGlobalFederationApi>`, generated by
// `dyn_newtype_define!`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
409
410impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
411 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
412 self.inner.as_ref()
413 }
414}
415
416impl DynGlobalApi {
417 pub async fn new_admin(
418 peer: PeerId,
419 url: SafeUrl,
420 api_secret: &Option<String>,
421 ) -> anyhow::Result<DynGlobalApi> {
422 Ok(GlobalFederationApiWithCache::new(
423 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
424 .await?,
425 )
426 .into())
427 }
428
429 pub async fn from_setup_endpoint(
433 url: SafeUrl,
434 api_secret: &Option<String>,
435 ) -> anyhow::Result<Self> {
436 Self::new_admin(PeerId::from(1024), url, api_secret).await
440 }
441
442 pub async fn from_endpoints(
443 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
444 api_secret: &Option<String>,
445 ) -> anyhow::Result<Self> {
446 Ok(GlobalFederationApiWithCache::new(
447 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
448 )
449 .into())
450 }
451}
452
/// Typed interface for the federation's global (non-module) API calls.
///
/// NOTE(review): the implementations are outside this view; the comments
/// below restate only what the signatures show.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx`; the outcome is returned in its serde module encoding.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders`.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`, decoded with
    /// `decoders`.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    async fn session_count(&self) -> FederationResult<u64>;

    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Returns, per peer, the backup snapshot stored for `id` (if any).
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    // --- Calls that take an `ApiAuth` -----------------------------------

    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn status(&self) -> FederationResult<StatusResponse>;

    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    // --- API announcements ----------------------------------------------

    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Returns the signed API announcements, keyed by peer, as reported by
    /// `guardian`.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the version string reported by `peer_id`.
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
}
591
592pub fn deserialize_outcome<R>(
593 outcome: &SerdeOutputOutcome,
594 module_decoder: &Decoder,
595) -> OutputOutcomeResult<R>
596where
597 R: OutputOutcome + MaybeSend,
598{
599 let dyn_outcome = outcome
600 .try_into_inner_known_module_kind(module_decoder)
601 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
602
603 let source_instance = dyn_outcome.module_instance_id();
604
605 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
606 let target_type = std::any::type_name::<R>();
607 OutputOutcomeError::ResponseDeserialization(anyhow!(
608 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
609 ))
610 })
611}
612
/// Connector that reaches peers over websockets.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// API endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret; when set it is sent as credentials on connect.
    api_secret: Option<String>,

    /// Per-peer URL that `connect` uses instead of the entry in `peers`.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
}
625
626impl WebsocketConnector {
627 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
628 let mut s = Self::new_no_overrides(peers, api_secret);
629
630 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
631 s = s.with_connection_override(k, v);
632 }
633
634 Ok(s)
635 }
636 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
637 self.connection_overrides.insert(peer_id, url);
638 self
639 }
640 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
641 Self {
642 peers,
643 api_secret,
644 connection_overrides: BTreeMap::default(),
645 }
646 }
647}
648
649#[async_trait]
650impl IClientConnector for WebsocketConnector {
651 fn peers(&self) -> BTreeSet<PeerId> {
652 self.peers.keys().copied().collect()
653 }
654
655 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
656 let api_endpoint = match self.connection_overrides.get(&peer_id) {
657 Some(url) => {
658 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
659 url
660 }
661 None => self.peers.get(&peer_id).ok_or_else(|| {
662 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
663 })?,
664 };
665
666 #[cfg(not(target_family = "wasm"))]
667 let mut client = {
668 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
669 let mut root_certs = RootCertStore::empty();
670 root_certs.extend(webpki_roots);
671
672 let tls_cfg = CustomCertStore::builder()
673 .with_root_certificates(root_certs)
674 .with_no_client_auth();
675
676 WsClientBuilder::default()
677 .max_concurrent_requests(u16::MAX as usize)
678 .with_custom_cert_store(tls_cfg)
679 };
680
681 #[cfg(target_family = "wasm")]
682 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
683
684 if let Some(api_secret) = &self.api_secret {
685 #[cfg(not(target_family = "wasm"))]
686 {
687 let mut headers = HeaderMap::new();
690
691 let auth = base64::engine::general_purpose::STANDARD
692 .encode(format!("fedimint:{api_secret}"));
693
694 headers.insert(
695 "Authorization",
696 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
697 );
698
699 client = client.set_headers(headers);
700 }
701 #[cfg(target_family = "wasm")]
702 {
703 let mut url = api_endpoint.clone();
706 url.set_username("fedimint")
707 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
708 url.set_password(Some(&api_secret))
709 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
710
711 let client = client
712 .build(url.as_str())
713 .await
714 .map_err(|err| PeerError::InternalClientError(err.into()))?;
715
716 return Ok(client.into_dyn());
717 }
718 }
719
720 let client = client
721 .build(api_endpoint.as_str())
722 .await
723 .map_err(|err| PeerError::InternalClientError(err.into()))?;
724
725 Ok(client.into_dyn())
726 }
727}
728
/// Connector that reaches peers through Tor. Only compiled with the `tor`
/// feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// API endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret; when set it is sent as a basic-auth header.
    api_secret: Option<String>,
}
735
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for `peers`, optionally authenticating with
    /// `api_secret`.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
742
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Ids of all peers this connector has an endpoint for.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.iter().map(|(peer_id, _)| *peer_id).collect()
    }

    /// Connects to `peer_id` through Tor: bootstraps an isolated Tor client,
    /// opens an anonymized stream to the endpoint's host and port, wraps it
    /// in TLS for `wss` endpoints, and runs the websocket client on top.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let Some(api_endpoint) = self.peers.get(&peer_id) else {
            return Err(PeerError::InternalClientError(anyhow!(
                "Invalid peer_id: {peer_id}"
            )));
        };

        let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        let endpoint_host = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let endpoint_port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (endpoint_host, endpoint_port);

        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is consumed by the connect call below; keep a copy so it
        // can still be logged afterwards.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion services have to be explicitly enabled in the stream
            // preferences.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let onion_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            onion_stream
        } else {
            let clearnet_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            clearnet_stream
        };

        // Only `wss` endpoints get a TLS layer; any scheme other than
        // `ws`/`wss` is rejected.
        let tls_connector = match api_endpoint.scheme() {
            "wss" => {
                let mut root_certs = RootCertStore::empty();
                root_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

                let tls_config = TlsClientConfig::builder()
                    .with_root_certificates(root_certs)
                    .with_no_client_auth();

                Some(TlsConnector::from(Arc::new(tls_config)))
            }
            "ws" => None,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            let credentials =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            let mut headers = HeaderMap::new();
            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {credentials}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        let Some(tls_connector) = tls_connector else {
            // Plain `ws`: run the websocket directly on the Tor stream.
            let ws_client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            return Ok(ws_client.into_dyn());
        };

        let host = api_endpoint
            .host_str()
            .map(str::to_owned)
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

        let server_name = rustls_pki_types::ServerName::try_from(host)
            .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

        let anonymized_tls_stream = tls_connector
            .connect(server_name, anonymized_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        let ws_client = ws_client_builder
            .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        Ok(ws_client.into_dyn())
    }
}
894
895fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
896 match jsonrpc_error {
897 JsonRpcClientError::Call(error_object) => {
898 let error = anyhow!(error_object.message().to_owned());
899 match ErrorCode::from(error_object.code()) {
900 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
901 PeerError::InvalidRequest(error)
902 }
903 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
904 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
905 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
906 PeerError::ServerError(error)
907 }
908 }
909 }
910 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
911 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
912 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
913 JsonRpcClientError::InvalidSubscriptionId => {
914 PeerError::Transport(anyhow!("Invalid subscription id"))
915 }
916 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
917 PeerError::InvalidRequest(anyhow!(invalid_request_id))
918 }
919 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
920 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
921 JsonRpcClientError::HttpNotImplemented => {
922 PeerError::ServerError(anyhow!("Http not implemented"))
923 }
924 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
925 PeerError::InvalidRequest(anyhow!(empty_batch_request))
926 }
927 JsonRpcClientError::RegisterMethod(register_method_error) => {
928 PeerError::InvalidResponse(anyhow!(register_method_error))
929 }
930 }
931}
932
933#[async_trait]
934impl IClientConnection for WsClient {
935 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
936 let method = match method {
937 ApiMethod::Core(method) => method,
938 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
939 };
940
941 Ok(ClientT::request(self, &method, [request.to_json()])
942 .await
943 .map_err(jsonrpc_error_to_peer_error)?)
944 }
945
946 async fn await_disconnection(&self) {
947 self.on_disconnect().await;
948 }
949}
950
/// Shared, type-erased [`IClientConnector`].
pub type DynClientConnector = Arc<dyn IClientConnector>;

/// Knows a set of peers and how to open a connection to each of them.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static {
    /// Ids of all peers this connector can connect to.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Opens a new connection to `peer`.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps `self` in an `Arc` to obtain a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
968
/// Shared, type-erased [`IClientConnection`].
pub type DynClientConnection = Arc<dyn IClientConnection>;

/// An established connection to a single peer.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for `method` and returns the raw JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Resolves once the connection has been lost.
    async fn await_disconnection(&self);

    /// Wraps `self` in an `Arc` to obtain a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
984
/// [`IRawFederationApi`] implementation backed by per-peer connections held
/// in [`ReconnectClientConnections`].
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Ids of all peers of the federation.
    peers: BTreeSet<PeerId>,
    /// Peer id reported by `self_peer`, i.e. the target of admin requests.
    admin_id: Option<PeerId>,
    /// When set, requests are sent as module requests for this instance;
    /// otherwise as core requests.
    module_id: Option<ModuleInstanceId>,
    /// One connection handle per peer.
    connections: ReconnectClientConnections,
}
992
993impl ReconnectFederationApi {
994 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
995 Self {
996 peers: connector.peers(),
997 admin_id,
998 module_id: None,
999 connections: ReconnectClientConnections::new(connector),
1000 }
1001 }
1002
1003 pub async fn new_admin(
1004 peer: PeerId,
1005 url: SafeUrl,
1006 api_secret: &Option<String>,
1007 ) -> anyhow::Result<Self> {
1008 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1009 }
1010
1011 pub async fn from_endpoints(
1012 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1013 api_secret: &Option<String>,
1014 admin_id: Option<PeerId>,
1015 ) -> anyhow::Result<Self> {
1016 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1017
1018 let scheme = peers
1019 .values()
1020 .next()
1021 .expect("Federation api has been initialized with no peers")
1022 .scheme();
1023
1024 let connector = match scheme {
1025 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1026 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1027 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1028 "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1029 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1030 };
1031
1032 Ok(ReconnectFederationApi::new(&connector, admin_id))
1033 }
1034}
1035
1036impl IModuleFederationApi for ReconnectFederationApi {}
1037
1038#[apply(async_trait_maybe_send!)]
1039impl IRawFederationApi for ReconnectFederationApi {
1040 fn all_peers(&self) -> &BTreeSet<PeerId> {
1041 &self.peers
1042 }
1043
1044 fn self_peer(&self) -> Option<PeerId> {
1045 self.admin_id
1046 }
1047
1048 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1049 ReconnectFederationApi {
1050 peers: self.peers.clone(),
1051 admin_id: self.admin_id,
1052 module_id: Some(id),
1053 connections: self.connections.clone(),
1054 }
1055 .into()
1056 }
1057
1058 async fn request_raw(
1059 &self,
1060 peer_id: PeerId,
1061 method: &str,
1062 params: &ApiRequestErased,
1063 ) -> PeerResult<Value> {
1064 let method = match self.module_id {
1065 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1066 None => ApiMethod::Core(method.to_string()),
1067 };
1068
1069 self.connections
1070 .request(peer_id, method, params.clone())
1071 .await
1072 }
1073}
1074
/// One [`ClientConnection`] handle per peer.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// Connection handle of every known peer.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1079
1080impl ReconnectClientConnections {
1081 pub fn new(connector: &DynClientConnector) -> Self {
1082 ReconnectClientConnections {
1083 connections: connector
1084 .peers()
1085 .into_iter()
1086 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1087 .collect(),
1088 }
1089 }
1090
1091 async fn request(
1092 &self,
1093 peer: PeerId,
1094 method: ApiMethod,
1095 request: ApiRequestErased,
1096 ) -> PeerResult<Value> {
1097 trace!(target: LOG_NET_API, %method, "Api request");
1098 let res = self
1099 .connections
1100 .get(&peer)
1101 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1102 .connection()
1103 .await
1104 .context("Failed to connect to peer")
1105 .map_err(PeerError::Connection)?
1106 .request(method.clone(), request)
1107 .await;
1108
1109 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1110
1111 res
1112 }
1113}
1114
/// Handle to the background task that owns the connection to one peer.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Channel for asking the task for the current connection; each message
    /// is a oneshot sender on which the task replies.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}
1119
impl ClientConnection {
    /// Spawns the background task that connects to `peer` on demand and
    /// returns a handle for requesting the connection from it.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Stay idle until somebody asks for a connection; the loop
                // ends once receiving from the request channel fails.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Collect every request already queued so that a single
                    // connection attempt answers all of them.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            for sender in senders {
                                // A requester that already went away is ignored.
                                sender.send(connection.clone()).ok();
                            }

                            // Hand the live connection to new requesters
                            // until it disconnects or the channel closes.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // A successful connection resets the backoff.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // `senders` is dropped without a reply, so the
                            // waiting requesters get `None` from `connection`.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }

    /// Asks the background task for the current connection.
    ///
    /// Returns `None` when the task drops the reply channel without
    /// answering, which happens when its connection attempt fails.
    ///
    /// # Panics
    ///
    /// Panics if the request channel to the background task is closed.
    async fn connection(&self) -> Option<DynClientConnection> {
        let (sender, receiver) = oneshot::channel();

        self.sender
            .send(sender)
            .await
            .expect("Api connection request channel closed unexpectedly");

        receiver.await.ok()
    }
}
1192
1193mod iroh {
1194 use std::collections::{BTreeMap, BTreeSet};
1195 use std::str::FromStr;
1196
1197 use anyhow::Context;
1198 use async_trait::async_trait;
1199 use fedimint_core::PeerId;
1200 use fedimint_core::envs::parse_kv_list_from_env;
1201 use fedimint_core::module::{
1202 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
1203 };
1204 use fedimint_core::util::SafeUrl;
1205 use fedimint_logging::LOG_NET_IROH;
1206 use iroh::endpoint::Connection;
1207 use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
1208 use iroh_base::ticket::NodeTicket;
1209 use serde_json::Value;
1210 use tracing::{debug, trace, warn};
1211
1212 use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
1213
1214 #[derive(Debug, Clone)]
1215 pub struct IrohConnector {
1216 node_ids: BTreeMap<PeerId, NodeId>,
1217 endpoint: Endpoint,
1218
1219 pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
1225 }
1226
1227 impl IrohConnector {
1228 pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1229 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
1230 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
1231 let mut s = Self::new_no_overrides(peers).await?;
1232
1233 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
1234 s = s.with_connection_override(k, v.into());
1235 }
1236
1237 Ok(s)
1238 }
1239
1240 pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1241 let node_ids = peers
1242 .into_iter()
1243 .map(|(peer, url)| {
1244 let host = url.host_str().context("Url is missing host")?;
1245
1246 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
1247
1248 Ok((peer, node_id))
1249 })
1250 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
1251
1252 let builder = Endpoint::builder().discovery_n0();
1253 #[cfg(not(target_family = "wasm"))]
1254 let builder = builder.discovery_dht();
1255 let endpoint = builder.bind().await?;
1256 debug!(
1257 target: LOG_NET_IROH,
1258 node_id = %endpoint.node_id(),
1259 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
1260 "Iroh api client endpoint"
1261 );
1262
1263 Ok(Self {
1264 node_ids,
1265 endpoint,
1266 connection_overrides: BTreeMap::new(),
1267 })
1268 }
1269
1270 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
1271 self.connection_overrides.insert(node, addr);
1272 self
1273 }
1274 }
1275
1276 #[async_trait]
1277 impl IClientConnector for IrohConnector {
1278 fn peers(&self) -> BTreeSet<PeerId> {
1279 self.node_ids.keys().copied().collect()
1280 }
1281
1282 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
1283 let node_id = *self
1284 .node_ids
1285 .get(&peer_id)
1286 .ok_or(PeerError::InvalidPeerId { peer_id })?;
1287
1288 let connection = match self.connection_overrides.get(&node_id) {
1289 Some(node_addr) => {
1290 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
1291 self.endpoint
1292 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
1293 .await
1294 }
1295 None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
1296 }.map_err(PeerError::Connection)?;
1297
1298 Ok(connection.into_dyn())
1299 }
1300 }
1301
1302 #[async_trait]
1303 impl IClientConnection for Connection {
1304 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1305 let json = serde_json::to_vec(&IrohApiRequest { method, request })
1306 .expect("Serialization to vec can't fail");
1307
1308 let (mut sink, mut stream) = self
1309 .open_bi()
1310 .await
1311 .map_err(|e| PeerError::Transport(e.into()))?;
1312
1313 sink.write_all(&json)
1314 .await
1315 .map_err(|e| PeerError::Transport(e.into()))?;
1316
1317 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
1318
1319 let response = stream
1320 .read_to_end(1_000_000)
1321 .await
1322 .map_err(|e| PeerError::Transport(e.into()))?;
1323
1324 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
1326 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
1327
1328 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
1329 }
1330
1331 async fn await_disconnection(&self) {
1332 self.closed().await;
1333 }
1334 }
1335}
1336
/// Federation-wide status as carried in the optional `federation` field of
/// `StatusResponse`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    // NOTE(review): presumably the number of sessions so far; confirm against the server.
    pub session_count: u64,
    /// Status entry for each peer, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    // NOTE(review): presumably the count of peers currently connected; confirm.
    pub peers_online: u64,
    // NOTE(review): presumably the count of peers currently disconnected; confirm.
    pub peers_offline: u64,
    // NOTE(review): presumably the count of peers whose `flagged` bit is set; confirm.
    pub peers_flagged: u64,
    // NOTE(review): the meaning and unit of the value are not visible here; confirm.
    pub scheduled_shutdown: Option<u64>,
}
1349
/// Status of a single peer, stored per peer id in
/// `LegacyFederationStatus::status_by_peer`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    // NOTE(review): the meaning and unit of the value are not visible here; `None` presumably
    // means no contribution was seen. Confirm against the server.
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected; defaults to `Disconnected`.
    pub connection_status: LegacyP2PConnectionStatus,
    // NOTE(review): the condition that sets this flag is not visible here; confirm.
    pub flagged: bool,
}
1358
/// Connection state of a peer.
///
/// Variants are (de)serialized in snake case, i.e. as `disconnected` and
/// `connected`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    /// No connection to the peer; this is the default value.
    #[default]
    Disconnected,
    /// A connection to the peer is established.
    Connected,
}
1366
/// Status report combining the server's own status with an optional
/// federation-wide status.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation-wide status, if present.
    // NOTE(review): the condition under which this is `None` is not visible here; confirm.
    pub federation: Option<LegacyFederationStatus>,
}
1372
/// Backup of a guardian's configuration.
// NOTE(review): going by the field name the payload is a tar archive; its contents are not
// visible here. Confirm against the producer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
    /// Raw archive bytes, (de)serialized through `fedimint_core::hex::serde`.
    #[serde(with = "fedimint_core::hex::serde")]
    pub tar_archive_bytes: Vec<u8>,
}
1380
1381#[cfg(test)]
1382mod tests {
1383 use std::str::FromStr as _;
1384
1385 use fedimint_core::config::FederationId;
1386 use fedimint_core::invite_code::InviteCode;
1387
1388 use super::*;
1389
1390 #[test]
1391 fn converts_invite_code() {
1392 let connect = InviteCode::new(
1393 "ws://test1".parse().unwrap(),
1394 PeerId::from(1),
1395 FederationId::dummy(),
1396 Some("api_secret".into()),
1397 );
1398
1399 let bech32 = connect.to_string();
1400 let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
1401 assert_eq!(connect, connect_parsed);
1402
1403 let json = serde_json::to_string(&connect).unwrap();
1404 let connect_as_string: String = serde_json::from_str(&json).unwrap();
1405 assert_eq!(connect_as_string, bech32);
1406 let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
1407 assert_eq!(connect_parsed_json, connect_parsed);
1408 }
1409
1410 #[test]
1411 fn creates_essential_guardians_invite_code() {
1412 let mut peer_to_url_map = BTreeMap::new();
1413 peer_to_url_map.insert(PeerId::from(0), "ws://test1".parse().expect("URL fail"));
1414 peer_to_url_map.insert(PeerId::from(1), "ws://test2".parse().expect("URL fail"));
1415 peer_to_url_map.insert(PeerId::from(2), "ws://test3".parse().expect("URL fail"));
1416 peer_to_url_map.insert(PeerId::from(3), "ws://test4".parse().expect("URL fail"));
1417 let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;
1418
1419 let code =
1420 InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
1421
1422 assert_eq!(FederationId::dummy(), code.federation_id());
1423
1424 let expected_map: BTreeMap<PeerId, SafeUrl> =
1425 peer_to_url_map.into_iter().take(max_size).collect();
1426 assert_eq!(expected_map, code.peers());
1427 }
1428}