1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::panic;
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24 GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{
31 FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
32};
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::audit::AuditSummary;
35use fedimint_core::module::registry::ModuleDecoderRegistry;
36use fedimint_core::module::{
37 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
38};
39use fedimint_core::net::api_announcement::SignedApiAnnouncement;
40use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
41use fedimint_core::task::{MaybeSend, MaybeSync};
42use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
43use fedimint_core::util::backoff_util::api_networking_backoff;
44use fedimint_core::util::{FmtCompact as _, SafeUrl};
45use fedimint_core::{
46 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
47};
48use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
49use futures::channel::oneshot;
50use futures::future::pending;
51use futures::stream::FuturesUnordered;
52use futures::{Future, StreamExt};
53use global_api::with_cache::GlobalFederationApiWithCache;
54use jsonrpsee_core::DeserializeOwned;
55use jsonrpsee_core::client::ClientT;
56pub use jsonrpsee_core::client::Error as JsonRpcClientError;
57use jsonrpsee_types::ErrorCode;
58#[cfg(target_family = "wasm")]
59use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
60#[cfg(not(target_family = "wasm"))]
61use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
62#[cfg(not(target_family = "wasm"))]
63use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
64use serde::{Deserialize, Serialize};
65use serde_json::Value;
66#[cfg(not(target_family = "wasm"))]
67use tokio_rustls::rustls::RootCertStore;
68#[cfg(all(feature = "tor", not(target_family = "wasm")))]
69use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
70use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
71
72use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
73
74pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
75
76pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
77 ApiVersion { major: 0, minor: 1 };
78
79pub type PeerResult<T> = Result<T, PeerError>;
80pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
81pub type FederationResult<T> = Result<T, FederationError>;
82pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
83
84pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
85
86#[cfg(not(target_family = "wasm"))]
87fn install_crypto_provider() {
88 let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
89}
90
91#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
95pub struct ApiVersionSet {
96 pub core: ApiVersion,
97 pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
98}
99
100#[apply(async_trait_maybe_send!)]
102pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
103 fn all_peers(&self) -> &BTreeSet<PeerId>;
111
112 fn self_peer(&self) -> Option<PeerId>;
117
118 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
119
120 async fn request_raw(
122 &self,
123 peer_id: PeerId,
124 method: &str,
125 params: &ApiRequestErased,
126 ) -> PeerResult<Value>;
127}
128
129#[apply(async_trait_maybe_send!)]
132pub trait FederationApiExt: IRawFederationApi {
133 async fn request_single_peer<Ret>(
134 &self,
135 method: String,
136 params: ApiRequestErased,
137 peer: PeerId,
138 ) -> PeerResult<Ret>
139 where
140 Ret: DeserializeOwned,
141 {
142 self.request_raw(peer, &method, ¶ms)
143 .await
144 .and_then(|v| {
145 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
146 })
147 }
148
149 async fn request_single_peer_federation<FedRet>(
150 &self,
151 method: String,
152 params: ApiRequestErased,
153 peer_id: PeerId,
154 ) -> FederationResult<FedRet>
155 where
156 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
157 {
158 self.request_raw(peer_id, &method, ¶ms)
159 .await
160 .and_then(|v| {
161 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
162 })
163 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
164 }
165
166 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
169 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
170 &self,
171 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
172 method: String,
173 params: ApiRequestErased,
174 ) -> FederationResult<FR> {
175 #[cfg(not(target_family = "wasm"))]
179 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
180 #[cfg(target_family = "wasm")]
181 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
182
183 for peer in self.all_peers() {
184 futures.push(Box::pin({
185 let method = &method;
186 let params = ¶ms;
187 async move {
188 let result = self
189 .request_single_peer(method.clone(), params.clone(), *peer)
190 .await;
191
192 (*peer, result)
193 }
194 }));
195 }
196
197 let mut peer_errors = BTreeMap::new();
198 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
199
200 loop {
201 let (peer, result) = futures
202 .next()
203 .await
204 .expect("Query strategy ran out of peers to query without returning a result");
205
206 match result {
207 Ok(response) => match strategy.process(peer, response) {
208 QueryStep::Retry(peers) => {
209 for peer in peers {
210 futures.push(Box::pin({
211 let method = &method;
212 let params = ¶ms;
213 async move {
214 let result = self
215 .request_single_peer(method.clone(), params.clone(), peer)
216 .await;
217
218 (peer, result)
219 }
220 }));
221 }
222 }
223 QueryStep::Success(response) => return Ok(response),
224 QueryStep::Failure(e) => {
225 peer_errors.insert(peer, e);
226 }
227 QueryStep::Continue => {}
228 },
229 Err(e) => {
230 e.report_if_unusual(peer, "RequestWithStrategy");
231 peer_errors.insert(peer, e);
232 }
233 }
234
235 if peer_errors.len() == peer_error_threshold {
236 return Err(FederationError::peer_errors(
237 method.clone(),
238 params.params.clone(),
239 peer_errors,
240 ));
241 }
242 }
243 }
244
245 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
246 &self,
247 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
248 method: String,
249 params: ApiRequestErased,
250 ) -> FR {
251 #[cfg(not(target_family = "wasm"))]
255 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
256 #[cfg(target_family = "wasm")]
257 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
258
259 for peer in self.all_peers() {
260 futures.push(Box::pin({
261 let method = &method;
262 let params = ¶ms;
263 async move {
264 let response = util::retry(
265 format!("api-request-{method}-{peer}"),
266 api_networking_backoff(),
267 || async {
268 self.request_single_peer(method.clone(), params.clone(), *peer)
269 .await
270 .inspect_err(|e| {
271 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
272 })
273 .map_err(|e| anyhow!(e.to_string()))
274 },
275 )
276 .await
277 .expect("Number of retries has no limit");
278
279 (*peer, response)
280 }
281 }));
282 }
283
284 loop {
285 let (peer, response) = match futures.next().await {
286 Some(t) => t,
287 None => pending().await,
288 };
289
290 match strategy.process(peer, response) {
291 QueryStep::Retry(peers) => {
292 for peer in peers {
293 futures.push(Box::pin({
294 let method = &method;
295 let params = ¶ms;
296 async move {
297 let response = util::retry(
298 format!("api-request-{method}-{peer}"),
299 api_networking_backoff(),
300 || async {
301 self.request_single_peer(
302 method.clone(),
303 params.clone(),
304 peer,
305 )
306 .await
307 .inspect_err(|err| {
308 if err.is_unusual() {
309 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
310 }
311 })
312 .map_err(|e| anyhow!(e.to_string()))
313 },
314 )
315 .await
316 .expect("Number of retries has no limit");
317
318 (peer, response)
319 }
320 }));
321 }
322 }
323 QueryStep::Success(response) => return response,
324 QueryStep::Failure(e) => {
325 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
326 }
327 QueryStep::Continue => {}
328 }
329 }
330 }
331
332 async fn request_current_consensus<Ret>(
333 &self,
334 method: String,
335 params: ApiRequestErased,
336 ) -> FederationResult<Ret>
337 where
338 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
339 {
340 self.request_with_strategy(
341 ThresholdConsensus::new(self.all_peers().to_num_peers()),
342 method,
343 params,
344 )
345 .await
346 }
347
348 async fn request_current_consensus_retry<Ret>(
349 &self,
350 method: String,
351 params: ApiRequestErased,
352 ) -> Ret
353 where
354 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
355 {
356 self.request_with_strategy_retry(
357 ThresholdConsensus::new(self.all_peers().to_num_peers()),
358 method,
359 params,
360 )
361 .await
362 }
363
364 async fn request_admin<Ret>(
365 &self,
366 method: &str,
367 params: ApiRequestErased,
368 auth: ApiAuth,
369 ) -> FederationResult<Ret>
370 where
371 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
372 {
373 let Some(self_peer_id) = self.self_peer() else {
374 return Err(FederationError::general(
375 method,
376 params,
377 anyhow::format_err!("Admin peer_id not set"),
378 ));
379 };
380
381 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
382 .await
383 }
384
385 async fn request_admin_no_auth<Ret>(
386 &self,
387 method: &str,
388 params: ApiRequestErased,
389 ) -> FederationResult<Ret>
390 where
391 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
392 {
393 let Some(self_peer_id) = self.self_peer() else {
394 return Err(FederationError::general(
395 method,
396 params,
397 anyhow::format_err!("Admin peer_id not set"),
398 ));
399 };
400
401 self.request_single_peer_federation(method.into(), params, self_peer_id)
402 .await
403 }
404}
405
406#[apply(async_trait_maybe_send!)]
407impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
408
409pub trait IModuleFederationApi: IRawFederationApi {}
411
412dyn_newtype_define! {
413 #[derive(Clone)]
414 pub DynModuleApi(Arc<IModuleFederationApi>)
415}
416
417dyn_newtype_define! {
418 #[derive(Clone)]
419 pub DynGlobalApi(Arc<IGlobalFederationApi>)
420}
421
422impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
423 fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
424 self.inner.as_ref()
425 }
426}
427
428impl DynGlobalApi {
429 pub async fn new_admin(
430 peer: PeerId,
431 url: SafeUrl,
432 api_secret: &Option<String>,
433 ) -> anyhow::Result<DynGlobalApi> {
434 Ok(GlobalFederationApiWithCache::new(
435 ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
436 .await?,
437 )
438 .into())
439 }
440
441 pub async fn from_setup_endpoint(
445 url: SafeUrl,
446 api_secret: &Option<String>,
447 ) -> anyhow::Result<Self> {
448 Self::new_admin(PeerId::from(1024), url, api_secret).await
452 }
453
454 pub async fn from_endpoints(
455 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
456 api_secret: &Option<String>,
457 ) -> anyhow::Result<Self> {
458 Ok(GlobalFederationApiWithCache::new(
459 ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
460 )
461 .into())
462 }
463}
464
465#[apply(async_trait_maybe_send!)]
467pub trait IGlobalFederationApi: IRawFederationApi {
468 async fn submit_transaction(
469 &self,
470 tx: Transaction,
471 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
472
473 async fn await_block(
474 &self,
475 block_index: u64,
476 decoders: &ModuleDecoderRegistry,
477 ) -> anyhow::Result<SessionOutcome>;
478
479 async fn get_session_status(
480 &self,
481 block_index: u64,
482 decoders: &ModuleDecoderRegistry,
483 core_api_version: ApiVersion,
484 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
485 ) -> anyhow::Result<SessionStatus>;
486
487 async fn session_count(&self) -> FederationResult<u64>;
488
489 async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
490
491 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
492
493 async fn download_backup(
494 &self,
495 id: &secp256k1::PublicKey,
496 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
497
498 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
502
503 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
504
505 async fn set_local_params(
506 &self,
507 name: String,
508 federation_name: Option<String>,
509 auth: ApiAuth,
510 ) -> FederationResult<String>;
511
512 async fn add_peer_connection_info(
513 &self,
514 info: String,
515 auth: ApiAuth,
516 ) -> FederationResult<String>;
517
518 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
520
521 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
523
524 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
532
533 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
538
539 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
543
544 async fn get_verify_config_hash(
547 &self,
548 auth: ApiAuth,
549 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
550
551 async fn verified_configs(
554 &self,
555 auth: ApiAuth,
556 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
557
558 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
564
565 async fn status(&self) -> FederationResult<StatusResponse>;
567
568 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
570
571 async fn guardian_config_backup(&self, auth: ApiAuth)
573 -> FederationResult<GuardianConfigBackup>;
574
575 async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
577
578 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
579
580 async fn submit_api_announcement(
582 &self,
583 peer_id: PeerId,
584 announcement: SignedApiAnnouncement,
585 ) -> FederationResult<()>;
586
587 async fn api_announcements(
588 &self,
589 guardian: PeerId,
590 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
591
592 async fn sign_api_announcement(
593 &self,
594 api_url: SafeUrl,
595 auth: ApiAuth,
596 ) -> FederationResult<SignedApiAnnouncement>;
597
598 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
599
600 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
602
603 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
605
606 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
609}
610
611pub fn deserialize_outcome<R>(
612 outcome: &SerdeOutputOutcome,
613 module_decoder: &Decoder,
614) -> OutputOutcomeResult<R>
615where
616 R: OutputOutcome + MaybeSend,
617{
618 let dyn_outcome = outcome
619 .try_into_inner_known_module_kind(module_decoder)
620 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
621
622 let source_instance = dyn_outcome.module_instance_id();
623
624 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
625 let target_type = std::any::type_name::<R>();
626 OutputOutcomeError::ResponseDeserialization(anyhow!(
627 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
628 ))
629 })
630}
631
632#[derive(Debug, Clone)]
633pub struct WebsocketConnector {
634 peers: BTreeMap<PeerId, SafeUrl>,
635 api_secret: Option<String>,
636
637 pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
643}
644
645impl WebsocketConnector {
646 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
647 let mut s = Self::new_no_overrides(peers, api_secret);
648
649 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
650 s = s.with_connection_override(k, v);
651 }
652
653 Ok(s)
654 }
655 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
656 self.connection_overrides.insert(peer_id, url);
657 self
658 }
659 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
660 Self {
661 peers,
662 api_secret,
663 connection_overrides: BTreeMap::default(),
664 }
665 }
666}
667
668#[async_trait]
669impl IClientConnector for WebsocketConnector {
670 fn peers(&self) -> BTreeSet<PeerId> {
671 self.peers.keys().copied().collect()
672 }
673
674 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
675 let api_endpoint = match self.connection_overrides.get(&peer_id) {
676 Some(url) => {
677 trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
678 url
679 }
680 None => self.peers.get(&peer_id).ok_or_else(|| {
681 PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
682 })?,
683 };
684
685 #[cfg(not(target_family = "wasm"))]
686 let mut client = {
687 install_crypto_provider();
688 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
689 let mut root_certs = RootCertStore::empty();
690 root_certs.extend(webpki_roots);
691
692 let tls_cfg = CustomCertStore::builder()
693 .with_root_certificates(root_certs)
694 .with_no_client_auth();
695
696 WsClientBuilder::default()
697 .max_concurrent_requests(u16::MAX as usize)
698 .with_custom_cert_store(tls_cfg)
699 };
700
701 #[cfg(target_family = "wasm")]
702 let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
703
704 if let Some(api_secret) = &self.api_secret {
705 #[cfg(not(target_family = "wasm"))]
706 {
707 let mut headers = HeaderMap::new();
710
711 let auth = base64::engine::general_purpose::STANDARD
712 .encode(format!("fedimint:{api_secret}"));
713
714 headers.insert(
715 "Authorization",
716 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
717 );
718
719 client = client.set_headers(headers);
720 }
721 #[cfg(target_family = "wasm")]
722 {
723 let mut url = api_endpoint.clone();
726 url.set_username("fedimint")
727 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
728 url.set_password(Some(&api_secret))
729 .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
730
731 let client = client
732 .build(url.as_str())
733 .await
734 .map_err(|err| PeerError::InternalClientError(err.into()))?;
735
736 return Ok(client.into_dyn());
737 }
738 }
739
740 let client = client
741 .build(api_endpoint.as_str())
742 .await
743 .map_err(|err| PeerError::InternalClientError(err.into()))?;
744
745 Ok(client.into_dyn())
746 }
747}
748
749#[cfg(all(feature = "tor", not(target_family = "wasm")))]
750#[derive(Debug, Clone)]
751pub struct TorConnector {
752 peers: BTreeMap<PeerId, SafeUrl>,
753 api_secret: Option<String>,
754}
755
756#[cfg(all(feature = "tor", not(target_family = "wasm")))]
757impl TorConnector {
758 pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
759 Self { peers, api_secret }
760 }
761}
762
763#[cfg(all(feature = "tor", not(target_family = "wasm")))]
764#[async_trait]
765impl IClientConnector for TorConnector {
766 fn peers(&self) -> BTreeSet<PeerId> {
767 self.peers.keys().copied().collect()
768 }
769
770 #[allow(clippy::too_many_lines)]
771 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
772 let api_endpoint = self
773 .peers
774 .get(&peer_id)
775 .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;
776
777 install_crypto_provider();
778
779 let tor_config = TorClientConfig::default();
780 let tor_client = TorClient::create_bootstrapped(tor_config)
781 .await
782 .map_err(|err| PeerError::InternalClientError(err.into()))?
783 .isolated_client();
784
785 debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
786
787 let addr = (
790 api_endpoint
791 .host_str()
792 .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
793 api_endpoint
794 .port_or_known_default()
795 .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
796 );
797 let tor_addr = TorAddr::from(addr).map_err(|e| {
798 PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
799 })?;
800
801 let tor_addr_clone = tor_addr.clone();
802
803 debug!(
804 ?tor_addr,
805 ?addr,
806 "Successfully created `TorAddr` for given address (i.e. host and port)"
807 );
808
809 let anonymized_stream = if api_endpoint.is_onion_address() {
812 let mut stream_prefs = arti_client::StreamPrefs::default();
813 stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
814
815 let anonymized_stream = tor_client
816 .connect_with_prefs(tor_addr, &stream_prefs)
817 .await
818 .map_err(|e| PeerError::Connection(e.into()))?;
819
820 debug!(
821 ?tor_addr_clone,
822 "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
823 );
824 anonymized_stream
825 } else {
826 let anonymized_stream = tor_client
827 .connect(tor_addr)
828 .await
829 .map_err(|e| PeerError::Connection(e.into()))?;
830
831 debug!(
832 ?tor_addr_clone,
833 "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
834 );
835 anonymized_stream
836 };
837
838 let is_tls = match api_endpoint.scheme() {
839 "wss" => true,
840 "ws" => false,
841 unexpected_scheme => {
842 return Err(PeerError::InvalidEndpoint(anyhow!(
843 "Unsupported scheme: {unexpected_scheme}"
844 )));
845 }
846 };
847
848 let tls_connector = if is_tls {
849 let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
850 let mut root_certs = RootCertStore::empty();
851 root_certs.extend(webpki_roots);
852
853 let tls_config = TlsClientConfig::builder()
854 .with_root_certificates(root_certs)
855 .with_no_client_auth();
856 let tls_connector = TlsConnector::from(Arc::new(tls_config));
857 Some(tls_connector)
858 } else {
859 None
860 };
861
862 let mut ws_client_builder =
863 WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
864
865 if let Some(api_secret) = &self.api_secret {
866 let mut headers = HeaderMap::new();
869
870 let auth =
871 base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
872
873 headers.insert(
874 "Authorization",
875 HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
876 );
877
878 ws_client_builder = ws_client_builder.set_headers(headers);
879 }
880
881 match tls_connector {
882 None => {
883 let client = ws_client_builder
884 .build_with_stream(api_endpoint.as_str(), anonymized_stream)
885 .await
886 .map_err(|e| PeerError::Connection(e.into()))?;
887
888 Ok(client.into_dyn())
889 }
890 Some(tls_connector) => {
891 let host = api_endpoint
892 .host_str()
893 .map(ToOwned::to_owned)
894 .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;
895
896 let server_name = rustls_pki_types::ServerName::try_from(host)
899 .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;
900
901 let anonymized_tls_stream = tls_connector
902 .connect(server_name, anonymized_stream)
903 .await
904 .map_err(|e| PeerError::Connection(e.into()))?;
905
906 let client = ws_client_builder
907 .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
908 .await
909 .map_err(|e| PeerError::Connection(e.into()))?;
910
911 Ok(client.into_dyn())
912 }
913 }
914 }
915}
916
917fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
918 match jsonrpc_error {
919 JsonRpcClientError::Call(error_object) => {
920 let error = anyhow!(error_object.message().to_owned());
921 match ErrorCode::from(error_object.code()) {
922 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
923 PeerError::InvalidRequest(error)
924 }
925 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
926 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
927 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
928 PeerError::ServerError(error)
929 }
930 }
931 }
932 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
933 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
934 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
935 JsonRpcClientError::InvalidSubscriptionId => {
936 PeerError::Transport(anyhow!("Invalid subscription id"))
937 }
938 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
939 PeerError::InvalidRequest(anyhow!(invalid_request_id))
940 }
941 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
942 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
943 JsonRpcClientError::HttpNotImplemented => {
944 PeerError::ServerError(anyhow!("Http not implemented"))
945 }
946 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
947 PeerError::InvalidRequest(anyhow!(empty_batch_request))
948 }
949 JsonRpcClientError::RegisterMethod(register_method_error) => {
950 PeerError::InvalidResponse(anyhow!(register_method_error))
951 }
952 }
953}
954
955#[async_trait]
956impl IClientConnection for WsClient {
957 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
958 let method = match method {
959 ApiMethod::Core(method) => method,
960 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
961 };
962
963 Ok(ClientT::request(self, &method, [request.to_json()])
964 .await
965 .map_err(jsonrpc_error_to_peer_error)?)
966 }
967
968 async fn await_disconnection(&self) {
969 self.on_disconnect().await;
970 }
971}
972
973pub type DynClientConnector = Arc<dyn IClientConnector>;
974
975#[async_trait]
978pub trait IClientConnector: Send + Sync + 'static {
979 fn peers(&self) -> BTreeSet<PeerId>;
980
981 async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
982
983 fn into_dyn(self) -> DynClientConnector
984 where
985 Self: Sized,
986 {
987 Arc::new(self)
988 }
989}
990
991pub type DynClientConnection = Arc<dyn IClientConnection>;
992
993#[async_trait]
994pub trait IClientConnection: Debug + Send + Sync + 'static {
995 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
996
997 async fn await_disconnection(&self);
998
999 fn into_dyn(self) -> DynClientConnection
1000 where
1001 Self: Sized,
1002 {
1003 Arc::new(self)
1004 }
1005}
1006
1007#[derive(Clone, Debug)]
1008pub struct ReconnectFederationApi {
1009 peers: BTreeSet<PeerId>,
1010 admin_id: Option<PeerId>,
1011 module_id: Option<ModuleInstanceId>,
1012 connections: ReconnectClientConnections,
1013}
1014
1015impl ReconnectFederationApi {
1016 fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1017 Self {
1018 peers: connector.peers(),
1019 admin_id,
1020 module_id: None,
1021 connections: ReconnectClientConnections::new(connector),
1022 }
1023 }
1024
1025 pub async fn new_admin(
1026 peer: PeerId,
1027 url: SafeUrl,
1028 api_secret: &Option<String>,
1029 ) -> anyhow::Result<Self> {
1030 Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1031 }
1032
1033 pub async fn from_endpoints(
1034 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1035 api_secret: &Option<String>,
1036 admin_id: Option<PeerId>,
1037 ) -> anyhow::Result<Self> {
1038 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1039
1040 let scheme = peers
1041 .values()
1042 .next()
1043 .expect("Federation api has been initialized with no peers")
1044 .scheme();
1045
1046 let connector = match scheme {
1047 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1048 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1049 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1050 "iroh" => {
1051 let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1052 .ok()
1053 .and_then(|dns| dns.parse().ok());
1054 iroh::IrohConnector::new(peers, iroh_dns).await?.into_dyn()
1055 }
1056 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1057 };
1058
1059 Ok(ReconnectFederationApi::new(&connector, admin_id))
1060 }
1061}
1062
1063impl IModuleFederationApi for ReconnectFederationApi {}
1064
1065#[apply(async_trait_maybe_send!)]
1066impl IRawFederationApi for ReconnectFederationApi {
1067 fn all_peers(&self) -> &BTreeSet<PeerId> {
1068 &self.peers
1069 }
1070
1071 fn self_peer(&self) -> Option<PeerId> {
1072 self.admin_id
1073 }
1074
1075 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1076 ReconnectFederationApi {
1077 peers: self.peers.clone(),
1078 admin_id: self.admin_id,
1079 module_id: Some(id),
1080 connections: self.connections.clone(),
1081 }
1082 .into()
1083 }
1084
1085 #[instrument(
1086 target = LOG_NET_API,
1087 skip_all,
1088 fields(
1089 peer_id = %peer_id,
1090 method = %method,
1091 params = %params.params,
1092 )
1093 )]
1094 async fn request_raw(
1095 &self,
1096 peer_id: PeerId,
1097 method: &str,
1098 params: &ApiRequestErased,
1099 ) -> PeerResult<Value> {
1100 let method = match self.module_id {
1101 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1102 None => ApiMethod::Core(method.to_string()),
1103 };
1104
1105 self.connections
1106 .request(peer_id, method, params.clone())
1107 .await
1108 }
1109}
1110
1111#[derive(Clone, Debug)]
1112pub struct ReconnectClientConnections {
1113 connections: BTreeMap<PeerId, ClientConnection>,
1114}
1115
1116impl ReconnectClientConnections {
1117 pub fn new(connector: &DynClientConnector) -> Self {
1118 ReconnectClientConnections {
1119 connections: connector
1120 .peers()
1121 .into_iter()
1122 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1123 .collect(),
1124 }
1125 }
1126
1127 async fn request(
1128 &self,
1129 peer: PeerId,
1130 method: ApiMethod,
1131 request: ApiRequestErased,
1132 ) -> PeerResult<Value> {
1133 trace!(target: LOG_NET_API, %method, "Api request");
1134 let res = self
1135 .connections
1136 .get(&peer)
1137 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1138 .connection()
1139 .await
1140 .context("Failed to connect to peer")
1141 .map_err(PeerError::Connection)?
1142 .request(method.clone(), request)
1143 .await;
1144
1145 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1146
1147 res
1148 }
1149}
1150
1151#[derive(Clone, Debug)]
1152struct ClientConnection {
1153 sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1154}
1155
1156impl ClientConnection {
1157 fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1158 let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1159
1160 fedimint_core::task::spawn(
1161 "peer-api-connection",
1162 async move {
1163 let mut backoff = api_networking_backoff();
1164
1165 while let Ok(sender) = receiver.recv().await {
1166 let mut senders = vec![sender];
1167
1168 while let Ok(sender) = receiver.try_recv() {
1171 senders.push(sender);
1172 }
1173
1174 match connector.connect(peer).await {
1175 Ok(connection) => {
1176 trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1177
1178 for sender in senders {
1179 sender.send(connection.clone()).ok();
1180 }
1181
1182 loop {
1183 tokio::select! {
1184 sender = receiver.recv() => {
1185 match sender.ok() {
1186 Some(sender) => sender.send(connection.clone()).ok(),
1187 None => break,
1188 };
1189 }
1190 () = connection.await_disconnection() => break,
1191 }
1192 }
1193
1194 trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1195
1196 backoff = api_networking_backoff();
1197 }
1198 Err(e) => {
1199 trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1200
1201 fedimint_core::task::sleep(
1202 backoff.next().expect("No limit to the number of retries"),
1203 )
1204 .await;
1205 }
1206 }
1207 }
1208
1209 trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1210 }
1211 .instrument(trace_span!("peer-api-connection", ?peer)),
1212 );
1213
1214 ClientConnection { sender }
1215 }
1216
1217 async fn connection(&self) -> Option<DynClientConnection> {
1218 let (sender, receiver) = oneshot::channel();
1219
1220 self.sender
1221 .send(sender)
1222 .await
1223 .inspect_err(|err| {
1224 warn!(
1225 target: LOG_CLIENT_NET_API,
1226 err = %err.fmt_compact(),
1227 "Api connection request channel closed unexpectedly"
1228 );
1229 })
1230 .ok()?;
1231
1232 receiver.await.ok()
1233 }
1234}
1235
1236#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1238pub struct LegacyFederationStatus {
1239 pub session_count: u64,
1240 pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1241 pub peers_online: u64,
1242 pub peers_offline: u64,
1243 pub peers_flagged: u64,
1246 pub scheduled_shutdown: Option<u64>,
1247}
1248
1249#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1250pub struct LegacyPeerStatus {
1251 pub last_contribution: Option<u64>,
1252 pub connection_status: LegacyP2PConnectionStatus,
1253 pub flagged: bool,
1256}
1257
1258#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1259#[serde(rename_all = "snake_case")]
1260pub enum LegacyP2PConnectionStatus {
1261 #[default]
1262 Disconnected,
1263 Connected,
1264}
1265
1266#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1267pub struct StatusResponse {
1268 pub server: ServerStatusLegacy,
1269 pub federation: Option<LegacyFederationStatus>,
1270}
1271
1272#[cfg(test)]
1273mod tests;