1use core::panic;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::fmt::Debug;
4use std::iter::once;
5use std::pin::Pin;
6use std::result;
7use std::sync::Arc;
8
9use anyhow::{Context, anyhow};
10#[cfg(all(feature = "tor", not(target_family = "wasm")))]
11use arti_client::{TorAddr, TorClient, TorClientConfig};
12use async_channel::bounded;
13use async_trait::async_trait;
14use base64::Engine as _;
15use bitcoin::hashes::sha256;
16use bitcoin::secp256k1;
17pub use error::{FederationError, OutputOutcomeError, PeerError};
18use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
19use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
20use fedimint_core::core::backup::SignedBackupRequest;
21use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
29};
30use fedimint_core::net::api_announcement::SignedApiAnnouncement;
31use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
32use fedimint_core::task::{MaybeSend, MaybeSync};
33use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
34use fedimint_core::util::backoff_util::api_networking_backoff;
35use fedimint_core::util::{FmtCompact as _, SafeUrl};
36use fedimint_core::{
37 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
38};
39use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
40use futures::channel::oneshot;
41use futures::future::pending;
42use futures::stream::FuturesUnordered;
43use futures::{Future, StreamExt};
44use global_api::with_cache::GlobalFederationApiWithCache;
45use jsonrpsee_core::DeserializeOwned;
46use jsonrpsee_core::client::ClientT;
47pub use jsonrpsee_core::client::Error as JsonRpcClientError;
48use jsonrpsee_types::ErrorCode;
49#[cfg(target_family = "wasm")]
50use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
51#[cfg(not(target_family = "wasm"))]
52use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
53#[cfg(not(target_family = "wasm"))]
54use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
55use serde::{Deserialize, Serialize};
56use serde_json::Value;
57#[cfg(not(target_family = "wasm"))]
58use tokio_rustls::rustls::RootCertStore;
59#[cfg(all(feature = "tor", not(target_family = "wasm")))]
60use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
61use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
62
63use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
64mod error;
65pub mod global_api;
66pub mod net;
67
68pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
69
70pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
71 ApiVersion { major: 0, minor: 1 };
72
73pub type PeerResult<T> = Result<T, PeerError>;
74pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
75pub type FederationResult<T> = Result<T, FederationError>;
76pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
77
78pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
79
/// A set of API versions: one for the core and one per module instance.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// API version of the core.
    pub core: ApiVersion,
    /// API version of each module, keyed by its [`ModuleInstanceId`].
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
88
/// Lowest-level federation API abstraction: knows the set of peers and can
/// send one raw JSON request to one peer.
///
/// All higher-level request helpers in [`FederationApiExt`] are built on top
/// of [`IRawFederationApi::request_raw`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Returns the ids of all peers this API can talk to.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// Returns the peer id used for admin requests, if one is set.
    ///
    /// [`FederationApiExt::request_admin`] fails when this returns `None`.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle bound to the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to `peer_id` and returns the raw JSON
    /// response, or the [`PeerError`] describing why the request failed.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;
}
117
118#[apply(async_trait_maybe_send!)]
121pub trait FederationApiExt: IRawFederationApi {
122 async fn request_single_peer<Ret>(
123 &self,
124 method: String,
125 params: ApiRequestErased,
126 peer: PeerId,
127 ) -> PeerResult<Ret>
128 where
129 Ret: DeserializeOwned,
130 {
131 self.request_raw(peer, &method, ¶ms)
132 .await
133 .and_then(|v| {
134 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
135 })
136 }
137
138 async fn request_single_peer_federation<FedRet>(
139 &self,
140 method: String,
141 params: ApiRequestErased,
142 peer_id: PeerId,
143 ) -> FederationResult<FedRet>
144 where
145 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
146 {
147 self.request_raw(peer_id, &method, ¶ms)
148 .await
149 .and_then(|v| {
150 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
151 })
152 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
153 }
154
155 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
158 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
159 &self,
160 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
161 method: String,
162 params: ApiRequestErased,
163 ) -> FederationResult<FR> {
164 #[cfg(not(target_family = "wasm"))]
168 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
169 #[cfg(target_family = "wasm")]
170 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
171
172 for peer in self.all_peers() {
173 futures.push(Box::pin({
174 let method = &method;
175 let params = ¶ms;
176 async move {
177 let result = self
178 .request_single_peer(method.clone(), params.clone(), *peer)
179 .await;
180
181 (*peer, result)
182 }
183 }));
184 }
185
186 let mut peer_errors = BTreeMap::new();
187 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
188
189 loop {
190 let (peer, result) = futures
191 .next()
192 .await
193 .expect("Query strategy ran out of peers to query without returning a result");
194
195 match result {
196 Ok(response) => match strategy.process(peer, response) {
197 QueryStep::Retry(peers) => {
198 for peer in peers {
199 futures.push(Box::pin({
200 let method = &method;
201 let params = ¶ms;
202 async move {
203 let result = self
204 .request_single_peer(method.clone(), params.clone(), peer)
205 .await;
206
207 (peer, result)
208 }
209 }));
210 }
211 }
212 QueryStep::Success(response) => return Ok(response),
213 QueryStep::Failure(e) => {
214 peer_errors.insert(peer, e);
215 }
216 QueryStep::Continue => {}
217 },
218 Err(e) => {
219 e.report_if_unusual(peer, "RequestWithStrategy");
220 peer_errors.insert(peer, e);
221 }
222 }
223
224 if peer_errors.len() == peer_error_threshold {
225 return Err(FederationError::peer_errors(
226 method.clone(),
227 params.params.clone(),
228 peer_errors,
229 ));
230 }
231 }
232 }
233
234 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
235 &self,
236 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
237 method: String,
238 params: ApiRequestErased,
239 ) -> FR {
240 #[cfg(not(target_family = "wasm"))]
244 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
245 #[cfg(target_family = "wasm")]
246 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
247
248 for peer in self.all_peers() {
249 futures.push(Box::pin({
250 let method = &method;
251 let params = ¶ms;
252 async move {
253 let response = util::retry(
254 "api-request-{method}-{peer}",
255 api_networking_backoff(),
256 || async {
257 self.request_single_peer(method.clone(), params.clone(), *peer)
258 .await
259 .inspect_err(|e| {
260 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
261 })
262 .map_err(|e| anyhow!(e.to_string()))
263 },
264 )
265 .await
266 .expect("Number of retries has no limit");
267
268 (*peer, response)
269 }
270 }));
271 }
272
273 loop {
274 let (peer, response) = match futures.next().await {
275 Some(t) => t,
276 None => pending().await,
277 };
278
279 match strategy.process(peer, response) {
280 QueryStep::Retry(peers) => {
281 for peer in peers {
282 futures.push(Box::pin({
283 let method = &method;
284 let params = ¶ms;
285 async move {
286 let response = util::retry(
287 "api-request-{method}-{peer}",
288 api_networking_backoff(),
289 || async {
290 self.request_single_peer(
291 method.clone(),
292 params.clone(),
293 peer,
294 )
295 .await
296 .inspect_err(|err| {
297 if err.is_unusual() {
298 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
299 }
300 })
301 .map_err(|e| anyhow!(e.to_string()))
302 },
303 )
304 .await
305 .expect("Number of retries has no limit");
306
307 (peer, response)
308 }
309 }));
310 }
311 }
312 QueryStep::Success(response) => return response,
313 QueryStep::Failure(e) => {
314 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
315 }
316 QueryStep::Continue => {}
317 }
318 }
319 }
320
321 async fn request_current_consensus<Ret>(
322 &self,
323 method: String,
324 params: ApiRequestErased,
325 ) -> FederationResult<Ret>
326 where
327 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
328 {
329 self.request_with_strategy(
330 ThresholdConsensus::new(self.all_peers().to_num_peers()),
331 method,
332 params,
333 )
334 .await
335 }
336
337 async fn request_current_consensus_retry<Ret>(
338 &self,
339 method: String,
340 params: ApiRequestErased,
341 ) -> Ret
342 where
343 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
344 {
345 self.request_with_strategy_retry(
346 ThresholdConsensus::new(self.all_peers().to_num_peers()),
347 method,
348 params,
349 )
350 .await
351 }
352
353 async fn request_admin<Ret>(
354 &self,
355 method: &str,
356 params: ApiRequestErased,
357 auth: ApiAuth,
358 ) -> FederationResult<Ret>
359 where
360 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
361 {
362 let Some(self_peer_id) = self.self_peer() else {
363 return Err(FederationError::general(
364 method,
365 params,
366 anyhow::format_err!("Admin peer_id not set"),
367 ));
368 };
369
370 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
371 .await
372 }
373
374 async fn request_admin_no_auth<Ret>(
375 &self,
376 method: &str,
377 params: ApiRequestErased,
378 ) -> FederationResult<Ret>
379 where
380 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
381 {
382 let Some(self_peer_id) = self.self_peer() else {
383 return Err(FederationError::general(
384 method,
385 params,
386 anyhow::format_err!("Admin peer_id not set"),
387 ));
388 };
389
390 self.request_single_peer_federation(method.into(), params, self_peer_id)
391 .await
392 }
393}
394
/// Blanket implementation: every [`IRawFederationApi`] implementor, sized or
/// not, gets the [`FederationApiExt`] helper methods.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
397
/// Marker trait for raw federation APIs that are handed out as module APIs
/// (see [`IRawFederationApi::with_module`]). It declares no methods of its
/// own.
pub trait IModuleFederationApi: IRawFederationApi {}

// Cloneable newtype around an `Arc` of a `dyn IModuleFederationApi`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}
405
// Cloneable newtype around an `Arc` of a `dyn IGlobalFederationApi`.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}

/// Borrows the wrapped global API as a trait object.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.inner.as_ref()
    }
}
416
417impl DynGlobalApi {
418 pub async fn new_admin(
419 peer: PeerId,
420 url: SafeUrl,
421 api_secret: &Option<String>,
422 ) -> anyhow::Result<DynGlobalApi> {
423 Ok(GlobalFederationApiWithCache::new(
424 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
425 .await?,
426 )
427 .into())
428 }
429
430 pub async fn from_setup_endpoint(
434 url: SafeUrl,
435 api_secret: &Option<String>,
436 ) -> anyhow::Result<Self> {
437 Self::new_admin(PeerId::from(1024), url, api_secret).await
441 }
442
443 pub async fn from_endpoints(
444 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
445 api_secret: &Option<String>,
446 ) -> anyhow::Result<Self> {
447 Ok(GlobalFederationApiWithCache::new(
448 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
449 )
450 .into())
451 }
452}
453
/// The API for the global (non-module) endpoints.
///
/// Only signatures are declared here. The per-method summaries below are
/// derived from the method names and signatures alone; consult the
/// implementor for the exact semantics of each endpoint.
///
/// Methods taking an [`ApiAuth`] argument are authenticated requests.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the still-encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders`.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`, decoded with
    /// `decoders`.
    ///
    /// NOTE(review): `core_api_version` and `broadcast_public_keys`
    /// presumably select and verify the endpoint variant — confirm against
    /// the implementor.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Awaits the transaction `txid`; resolves to a [`TransactionId`].
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads the signed backup `request`.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup identified by the public key `id`; the result
    /// holds one optional snapshot per peer.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Authenticated request to set the password.
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Authenticated request returning the [`SetupStatus`].
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Authenticated request to set the local parameters `name` and,
    /// optionally, `federation_name`; returns a string.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Authenticated request to add a peer's connection `info`; returns a
    /// string.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Authenticated request to reset the peer setup codes.
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Adds `peer` to the config-generation peers (legacy parameter type).
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// Returns the config-generation peers (legacy parameter type).
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Authenticated request to start DKG.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Authenticated request returning one config hash per peer, for
    /// verification.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Authenticated request returning one hash per peer for the verified
    /// configs.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Authenticated request to start consensus.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the [`StatusResponse`].
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Authenticated request returning an [`AuditSummary`].
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Authenticated request returning a [`GuardianConfigBackup`].
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Authenticated request with an empty success value.
    ///
    /// NOTE(review): presumably just checks that `auth` is accepted —
    /// confirm against the implementor.
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Authenticated request to restart the federation setup.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API `announcement` for `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Returns the signed API announcements, keyed by peer, as reported by
    /// the single peer `guardian`.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Authenticated request to sign an API announcement for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Authenticated shutdown request with an optional `session`.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the fedimintd version string reported by `peer_id`.
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Authenticated request returning [`BackupStatistics`].
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Returns the [`InviteCode`] reported by the single peer `guardian`.
    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
}
596
597pub fn deserialize_outcome<R>(
598 outcome: &SerdeOutputOutcome,
599 module_decoder: &Decoder,
600) -> OutputOutcomeResult<R>
601where
602 R: OutputOutcome + MaybeSend,
603{
604 let dyn_outcome = outcome
605 .try_into_inner_known_module_kind(module_decoder)
606 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
607
608 let source_instance = dyn_outcome.module_instance_id();
609
610 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
611 let target_type = std::any::type_name::<R>();
612 OutputOutcomeError::ResponseDeserialization(anyhow!(
613 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
614 ))
615 })
616}
617
/// Connector that opens websocket JSON-RPC connections to federation peers.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// API endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret; when set it is sent as basic-auth credentials
    /// with the user name `fedimint`.
    api_secret: Option<String>,

    /// Per-peer endpoint overrides. When an entry exists for a peer it is
    /// used for connecting instead of the URL in `peers`.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
}
630
631impl WebsocketConnector {
632 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
633 let mut s = Self::new_no_overrides(peers, api_secret);
634
635 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
636 s = s.with_connection_override(k, v);
637 }
638
639 Ok(s)
640 }
641 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
642 self.connection_overrides.insert(peer_id, url);
643 self
644 }
645 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
646 Self {
647 peers,
648 api_secret,
649 connection_overrides: BTreeMap::default(),
650 }
651 }
652}
653
#[async_trait]
impl IClientConnector for WebsocketConnector {
    /// Returns the ids of all peers in `self.peers`.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a websocket JSON-RPC connection to `peer_id`.
    ///
    /// # Errors
    ///
    /// Returns [`PeerError::InternalClientError`] if the peer is unknown or
    /// the client cannot be built, and (wasm only)
    /// [`PeerError::InvalidEndpoint`] if the credentials cannot be put into
    /// the URL.
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        // An override, when present, takes precedence over the regular
        // endpoint of the peer.
        let api_endpoint = match self.connection_overrides.get(&peer_id) {
            Some(url) => {
                trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
                url
            }
            None => self.peers.get(&peer_id).ok_or_else(|| {
                PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
            })?,
        };

        // Native targets: builder with the bundled webpki root certificates.
        #[cfg(not(target_family = "wasm"))]
        let mut client = {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_cfg = CustomCertStore::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();

            WsClientBuilder::default()
                .max_concurrent_requests(u16::MAX as usize)
                .with_custom_cert_store(tls_cfg)
        };

        // Wasm targets: plain builder, no custom certificate store is set.
        #[cfg(target_family = "wasm")]
        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            #[cfg(not(target_family = "wasm"))]
            {
                // Native: send the secret as an HTTP basic-auth header.
                let mut headers = HeaderMap::new();

                let auth = base64::engine::general_purpose::STANDARD
                    .encode(format!("fedimint:{api_secret}"));

                headers.insert(
                    "Authorization",
                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
                );

                client = client.set_headers(headers);
            }
            #[cfg(target_family = "wasm")]
            {
                // Wasm: put the credentials into the URL itself and return
                // from here, so the common build path below is skipped.
                let mut url = api_endpoint.clone();
                url.set_username("fedimint")
                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
                url.set_password(Some(&api_secret))
                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;

                let client = client
                    .build(url.as_str())
                    .await
                    .map_err(|err| PeerError::InternalClientError(err.into()))?;

                return Ok(client.into_dyn());
            }
        }

        let client = client
            .build(api_endpoint.as_str())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?;

        Ok(client.into_dyn())
    }
}
733
/// Connector that reaches federation peers through Tor.
///
/// Only compiled with the `tor` feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// API endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret, sent as basic-auth credentials when set.
    api_secret: Option<String>,
}

#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for `peers`, optionally authenticating with
    /// `api_secret`.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
747
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Returns the ids of all peers in `self.peers`.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a websocket JSON-RPC connection to `peer_id` over a Tor stream.
    ///
    /// Steps: bootstrap a Tor client, open an anonymized stream to the
    /// endpoint's host and port, optionally wrap it in TLS (`wss`), then
    /// build the websocket client on top of that stream.
    ///
    /// # Errors
    ///
    /// - [`PeerError::InternalClientError`]: unknown peer, or the Tor client
    ///   could not be created.
    /// - [`PeerError::InvalidEndpoint`]: missing host or port, invalid Tor
    ///   address, unsupported scheme, or invalid TLS server name.
    /// - [`PeerError::Connection`]: the Tor stream, the TLS handshake or the
    ///   websocket client failed.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let api_endpoint = self
            .peers
            .get(&peer_id)
            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;

        // A new Tor client is bootstrapped on every call to `connect`.
        let tor_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_config)
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        // (host, port) pair of the endpoint; the port falls back to the
        // known default of the URL scheme.
        let addr = (
            api_endpoint
                .host_str()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
            api_endpoint
                .port_or_known_default()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
        );
        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is moved into the connect call below; the clone is kept
        // only for the log messages emitted afterwards.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion addresses need onion-service connections enabled
            // explicitly in the stream preferences.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let anonymized_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        } else {
            let anonymized_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        };

        // Only websocket schemes are accepted; `wss` additionally uses TLS.
        let is_tls = match api_endpoint.scheme() {
            "wss" => true,
            "ws" => false,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        // TLS connector using the bundled webpki root certificates.
        let tls_connector = if is_tls {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();
            let tls_connector = TlsConnector::from(Arc::new(tls_config));
            Some(tls_connector)
        } else {
            None
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        if let Some(api_secret) = &self.api_secret {
            // Send the secret as an HTTP basic-auth header.
            let mut headers = HeaderMap::new();

            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        match tls_connector {
            None => {
                // Plain `ws`: run the websocket directly over the Tor stream.
                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
            Some(tls_connector) => {
                let host = api_endpoint
                    .host_str()
                    .map(ToOwned::to_owned)
                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

                // The endpoint host is the server name checked during the
                // TLS handshake.
                let server_name = rustls_pki_types::ServerName::try_from(host)
                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

                let anonymized_tls_stream = tls_connector
                    .connect(server_name, anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
        }
    }
}
899
900fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
901 match jsonrpc_error {
902 JsonRpcClientError::Call(error_object) => {
903 let error = anyhow!(error_object.message().to_owned());
904 match ErrorCode::from(error_object.code()) {
905 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
906 PeerError::InvalidRequest(error)
907 }
908 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
909 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
910 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
911 PeerError::ServerError(error)
912 }
913 }
914 }
915 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
916 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
917 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
918 JsonRpcClientError::InvalidSubscriptionId => {
919 PeerError::Transport(anyhow!("Invalid subscription id"))
920 }
921 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
922 PeerError::InvalidRequest(anyhow!(invalid_request_id))
923 }
924 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
925 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
926 JsonRpcClientError::HttpNotImplemented => {
927 PeerError::ServerError(anyhow!("Http not implemented"))
928 }
929 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
930 PeerError::InvalidRequest(anyhow!(empty_batch_request))
931 }
932 JsonRpcClientError::RegisterMethod(register_method_error) => {
933 PeerError::InvalidResponse(anyhow!(register_method_error))
934 }
935 }
936}
937
938#[async_trait]
939impl IClientConnection for WsClient {
940 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
941 let method = match method {
942 ApiMethod::Core(method) => method,
943 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
944 };
945
946 Ok(ClientT::request(self, &method, [request.to_json()])
947 .await
948 .map_err(jsonrpc_error_to_peer_error)?)
949 }
950
951 async fn await_disconnection(&self) {
952 self.on_disconnect().await;
953 }
954}
955
/// Shared, type-erased [`IClientConnector`].
pub type DynClientConnector = Arc<dyn IClientConnector>;

/// Knows a set of peers and how to open a connection to each of them.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static {
    /// Returns the ids of all peers this connector can connect to.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Opens a new connection to `peer`.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps `self` into a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
973
/// Shared, type-erased [`IClientConnection`].
pub type DynClientConnection = Arc<dyn IClientConnection>;

/// An established connection to a single peer.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for `method` over this connection and returns the raw
    /// JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Resolves once the connection has been lost.
    async fn await_disconnection(&self);

    /// Wraps `self` into a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
989
/// [`IRawFederationApi`] implementation backed by per-peer connections that
/// are (re-)established on demand.
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Ids of all peers, as reported by the connector at construction time.
    peers: BTreeSet<PeerId>,
    /// Peer id returned by `self_peer()`, i.e. the target of admin requests.
    admin_id: Option<PeerId>,
    /// When set, requests are sent as module requests for this module
    /// instance; otherwise as core requests.
    module_id: Option<ModuleInstanceId>,
    /// Connection handles, one per peer.
    connections: ReconnectClientConnections,
}
997
998impl ReconnectFederationApi {
999 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1000 Self {
1001 peers: connector.peers(),
1002 admin_id,
1003 module_id: None,
1004 connections: ReconnectClientConnections::new(connector),
1005 }
1006 }
1007
1008 pub async fn new_admin(
1009 peer: PeerId,
1010 url: SafeUrl,
1011 api_secret: &Option<String>,
1012 ) -> anyhow::Result<Self> {
1013 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1014 }
1015
1016 pub async fn from_endpoints(
1017 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1018 api_secret: &Option<String>,
1019 admin_id: Option<PeerId>,
1020 ) -> anyhow::Result<Self> {
1021 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1022
1023 let scheme = peers
1024 .values()
1025 .next()
1026 .expect("Federation api has been initialized with no peers")
1027 .scheme();
1028
1029 let connector = match scheme {
1030 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1031 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1032 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1033 "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1034 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1035 };
1036
1037 Ok(ReconnectFederationApi::new(&connector, admin_id))
1038 }
1039}
1040
/// Lets a [`ReconnectFederationApi`] be handed out as a module API
/// (see its `with_module`).
impl IModuleFederationApi for ReconnectFederationApi {}
1042
1043#[apply(async_trait_maybe_send!)]
1044impl IRawFederationApi for ReconnectFederationApi {
1045 fn all_peers(&self) -> &BTreeSet<PeerId> {
1046 &self.peers
1047 }
1048
1049 fn self_peer(&self) -> Option<PeerId> {
1050 self.admin_id
1051 }
1052
1053 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1054 ReconnectFederationApi {
1055 peers: self.peers.clone(),
1056 admin_id: self.admin_id,
1057 module_id: Some(id),
1058 connections: self.connections.clone(),
1059 }
1060 .into()
1061 }
1062
1063 #[instrument(
1064 target = LOG_NET_API,
1065 skip_all,
1066 fields(
1067 peer_id = %peer_id,
1068 method = %method,
1069 params = %params.params,
1070 )
1071 )]
1072 async fn request_raw(
1073 &self,
1074 peer_id: PeerId,
1075 method: &str,
1076 params: &ApiRequestErased,
1077 ) -> PeerResult<Value> {
1078 let method = match self.module_id {
1079 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1080 None => ApiMethod::Core(method.to_string()),
1081 };
1082
1083 self.connections
1084 .request(peer_id, method, params.clone())
1085 .await
1086 }
1087}
1088
/// One [`ClientConnection`] handle per peer.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// Connection handle of every peer known to the connector.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1093
1094impl ReconnectClientConnections {
1095 pub fn new(connector: &DynClientConnector) -> Self {
1096 ReconnectClientConnections {
1097 connections: connector
1098 .peers()
1099 .into_iter()
1100 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1101 .collect(),
1102 }
1103 }
1104
1105 async fn request(
1106 &self,
1107 peer: PeerId,
1108 method: ApiMethod,
1109 request: ApiRequestErased,
1110 ) -> PeerResult<Value> {
1111 trace!(target: LOG_NET_API, %method, "Api request");
1112 let res = self
1113 .connections
1114 .get(&peer)
1115 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1116 .connection()
1117 .await
1118 .context("Failed to connect to peer")
1119 .map_err(PeerError::Connection)?
1120 .request(method.clone(), request)
1121 .await;
1122
1123 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1124
1125 res
1126 }
1127}
1128
/// Handle to the background task that owns the connection to one peer.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Request channel to the background task: each message is a oneshot
    /// sender on which the task replies with a connection.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}
1133
1134impl ClientConnection {
    /// Spawns the background task managing the connection to `peer` and
    /// returns a handle to it.
    ///
    /// The task connects lazily: it only calls `connector.connect` once a
    /// connection has been requested. While connected, every request is
    /// answered with a clone of the same connection. The task ends when the
    /// request channel is closed.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Idle until someone asks for a connection.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Drain the requests already queued, so that all of them
                    // share the outcome of this one connection attempt.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            // A failed send only means the requester is gone.
                            for sender in senders {
                                sender.send(connection.clone()).ok();
                            }

                            // Serve further requests with the live connection
                            // until it disconnects or the request channel is
                            // closed.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // Start from a fresh backoff after a successful
                            // connection.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // The collected senders are dropped without a
                            // reply, so their requesters observe a failure.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }
1194
1195 async fn connection(&self) -> Option<DynClientConnection> {
1196 let (sender, receiver) = oneshot::channel();
1197
1198 self.sender
1199 .send(sender)
1200 .await
1201 .inspect_err(|err| {
1202 warn!(
1203 target: LOG_CLIENT_NET_API,
1204 err = %err.fmt_compact(),
1205 "Api connection request channel closed unexpectedly"
1206 );
1207 })
1208 .ok()?;
1209
1210 receiver.await.ok()
1211 }
1212}
1213
1214mod iroh {
1215 use std::collections::{BTreeMap, BTreeSet};
1216 use std::str::FromStr;
1217
1218 use anyhow::Context;
1219 use async_trait::async_trait;
1220 use fedimint_core::PeerId;
1221 use fedimint_core::envs::parse_kv_list_from_env;
1222 use fedimint_core::module::{
1223 ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
1224 };
1225 use fedimint_core::util::SafeUrl;
1226 use fedimint_logging::LOG_NET_IROH;
1227 use iroh::endpoint::Connection;
1228 use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
1229 use iroh_base::ticket::NodeTicket;
1230 use serde_json::Value;
1231 use tracing::{debug, trace, warn};
1232
1233 use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
1234
    /// Connector that dials federation peers through an iroh [`Endpoint`].
    #[derive(Debug, Clone)]
    pub struct IrohConnector {
        /// Iroh node id of every known peer, parsed from the host part of the
        /// peer's URL.
        node_ids: BTreeMap<PeerId, NodeId>,
        /// Local endpoint used to open outgoing connections.
        endpoint: Endpoint,

        /// Explicit addresses to dial for specific node ids.
        ///
        /// When an entry exists for a peer's node id, `connect` dials the
        /// stored [`NodeAddr`] instead of the bare node id.
        pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
    }
1247
1248 impl IrohConnector {
1249 pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1250 const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
1251 warn!(target: LOG_NET_IROH, "Iroh support is experimental");
1252 let mut s = Self::new_no_overrides(peers).await?;
1253
1254 for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
1255 s = s.with_connection_override(k, v.into());
1256 }
1257
1258 Ok(s)
1259 }
1260
1261 pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1262 let node_ids = peers
1263 .into_iter()
1264 .map(|(peer, url)| {
1265 let host = url.host_str().context("Url is missing host")?;
1266
1267 let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
1268
1269 Ok((peer, node_id))
1270 })
1271 .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
1272
1273 let builder = Endpoint::builder().discovery_n0();
1274 #[cfg(not(target_family = "wasm"))]
1275 let builder = builder.discovery_dht();
1276 let endpoint = builder.bind().await?;
1277 debug!(
1278 target: LOG_NET_IROH,
1279 node_id = %endpoint.node_id(),
1280 node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
1281 "Iroh api client endpoint"
1282 );
1283
1284 Ok(Self {
1285 node_ids,
1286 endpoint,
1287 connection_overrides: BTreeMap::new(),
1288 })
1289 }
1290
1291 pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
1292 self.connection_overrides.insert(node, addr);
1293 self
1294 }
1295 }
1296
1297 #[async_trait]
1298 impl IClientConnector for IrohConnector {
1299 fn peers(&self) -> BTreeSet<PeerId> {
1300 self.node_ids.keys().copied().collect()
1301 }
1302
1303 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
1304 let node_id = *self
1305 .node_ids
1306 .get(&peer_id)
1307 .ok_or(PeerError::InvalidPeerId { peer_id })?;
1308
1309 let connection = match self.connection_overrides.get(&node_id) {
1310 Some(node_addr) => {
1311 trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
1312 self.endpoint
1313 .connect(node_addr.clone(), FEDIMINT_API_ALPN)
1314 .await
1315 }
1316 None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
1317 }.map_err(PeerError::Connection)?;
1318
1319 Ok(connection.into_dyn())
1320 }
1321 }
1322
1323 #[async_trait]
1324 impl IClientConnection for Connection {
1325 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1326 let json = serde_json::to_vec(&IrohApiRequest { method, request })
1327 .expect("Serialization to vec can't fail");
1328
1329 let (mut sink, mut stream) = self
1330 .open_bi()
1331 .await
1332 .map_err(|e| PeerError::Transport(e.into()))?;
1333
1334 sink.write_all(&json)
1335 .await
1336 .map_err(|e| PeerError::Transport(e.into()))?;
1337
1338 sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
1339
1340 let response = stream
1341 .read_to_end(1_000_000)
1342 .await
1343 .map_err(|e| PeerError::Transport(e.into()))?;
1344
1345 let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
1347 .map_err(|e| PeerError::InvalidResponse(e.into()))?;
1348
1349 response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
1350 }
1351
1352 async fn await_disconnection(&self) {
1353 self.closed().await;
1354 }
1355 }
1356}
1357
/// Federation-wide status summary, carried in the optional `federation`
/// field of [`StatusResponse`].
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session counter. NOTE(review): presumably the number of consensus
    /// sessions so far; confirm against the producer of this value.
    pub session_count: u64,
    /// Status of each peer, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers counted as online.
    pub peers_online: u64,
    /// Number of peers counted as offline.
    pub peers_offline: u64,
    /// Number of peers counted as flagged.
    pub peers_flagged: u64,
    /// Scheduled shutdown, if any. NOTE(review): the meaning of the `u64`
    /// (session index vs. timestamp) is not visible here; confirm.
    pub scheduled_shutdown: Option<u64>,
}
1370
/// Status of a single peer, stored per peer id in
/// [`LegacyFederationStatus::status_by_peer`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// Last contribution of the peer, if any. NOTE(review): the unit of the
    /// `u64` is not visible here; confirm against the producer.
    pub last_contribution: Option<u64>,
    /// Whether the peer is currently connected.
    pub connection_status: LegacyP2PConnectionStatus,
    /// Whether the peer is flagged.
    pub flagged: bool,
}
1379
/// Connection state of a peer.
///
/// Variants are (de)serialized in `snake_case`; the default is
/// [`LegacyP2PConnectionStatus::Disconnected`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    /// The peer is not connected.
    #[default]
    Disconnected,
    /// The peer is connected.
    Connected,
}
1387
/// Status payload combining the server status with an optional federation
/// status.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation status; `None` when no federation status is available.
    pub federation: Option<LegacyFederationStatus>,
}
1393
/// Backup of a guardian's configuration, carried as raw archive bytes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GuardianConfigBackup {
    /// Bytes of the backup tar archive; (de)serialized through
    /// `fedimint_core::hex::serde`.
    #[serde(with = "fedimint_core::hex::serde")]
    pub tar_archive_bytes: Vec<u8>,
}
1401
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use fedimint_core::config::FederationId;
    use fedimint_core::invite_code::InviteCode;

    use super::*;

    /// An invite code must survive a round trip through its string form and
    /// through JSON, and its JSON form must be the string form.
    #[test]
    fn converts_invite_code() {
        let invite = InviteCode::new(
            "ws://test1".parse().unwrap(),
            PeerId::from(1),
            FederationId::dummy(),
            Some("api_secret".into()),
        );

        // String round trip.
        let encoded = invite.to_string();
        let decoded = InviteCode::from_str(&encoded).expect("parses");
        assert_eq!(invite, decoded);

        // JSON round trip: the JSON value is the encoded string itself.
        let json = serde_json::to_string(&invite).unwrap();
        let json_as_string: String = serde_json::from_str(&json).unwrap();
        assert_eq!(json_as_string, encoded);
        let decoded_from_json: InviteCode = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded_from_json, decoded);
    }

    /// An invite code built with the essential number of guardians contains
    /// exactly the first `max_evil + 1` peers.
    #[test]
    fn creates_essential_guardians_invite_code() {
        let mut peer_to_url_map = BTreeMap::new();
        for (index, url) in [
            (0, "ws://test1"),
            (1, "ws://test2"),
            (2, "ws://test3"),
            (3, "ws://test4"),
        ] {
            peer_to_url_map.insert(PeerId::from(index), url.parse().expect("URL fail"));
        }
        let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;

        let code =
            InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());

        assert_eq!(FederationId::dummy(), code.federation_id());

        let expected_map: BTreeMap<PeerId, SafeUrl> =
            peer_to_url_map.into_iter().take(max_size).collect();
        assert_eq!(expected_map, code.peers());
    }
}