1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::{fmt, panic};
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24 GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{
31 FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
32};
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::audit::AuditSummary;
35use fedimint_core::module::registry::ModuleDecoderRegistry;
36use fedimint_core::module::{
37 ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
38};
39use fedimint_core::net::api_announcement::SignedApiAnnouncement;
40#[cfg(not(target_family = "wasm"))]
41use fedimint_core::rustls::install_crypto_provider;
42use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
43use fedimint_core::task::{MaybeSend, MaybeSync};
44use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
45use fedimint_core::util::backoff_util::api_networking_backoff;
46use fedimint_core::util::{FmtCompact as _, SafeUrl};
47use fedimint_core::{
48 NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
49};
50use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
51use futures::channel::oneshot;
52use futures::future::pending;
53use futures::stream::FuturesUnordered;
54use futures::{Future, StreamExt};
55use global_api::with_cache::GlobalFederationApiWithCache;
56use jsonrpsee_core::DeserializeOwned;
57use jsonrpsee_core::client::ClientT;
58pub use jsonrpsee_core::client::Error as JsonRpcClientError;
59use jsonrpsee_types::ErrorCode;
60#[cfg(target_family = "wasm")]
61use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
62#[cfg(not(target_family = "wasm"))]
63use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
64#[cfg(not(target_family = "wasm"))]
65use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
66use serde::{Deserialize, Serialize};
67use serde_json::Value;
68use tokio::sync::OnceCell;
69#[cfg(not(target_family = "wasm"))]
70use tokio_rustls::rustls::RootCertStore;
71#[cfg(all(feature = "tor", not(target_family = "wasm")))]
72use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
73use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
74
75use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
76
/// Core API version constant built with `ApiVersion::new(0, 5)` (presumably
/// major `0`, minor `5`). Going by its name, this is the first version offering
/// the "v2" session-status endpoint; confirm against the server implementation.
pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);

/// Core API version `0.1`. Going by its name, this is the first version
/// offering the session-status endpoint; confirm against the server
/// implementation.
pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
    ApiVersion { major: 0, minor: 1 };

/// Result of a request to a single peer.
pub type PeerResult<T> = Result<T, PeerError>;
/// Result of a raw JSON-RPC client call.
pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
/// Result of a request addressed to the federation.
pub type FederationResult<T> = Result<T, FederationError>;
/// Serde-transportable encoding of a dynamically typed output outcome.
pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;

/// Result of decoding an output outcome, see [`deserialize_outcome`].
pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
88
/// A set of API versions: one for the core API and one per module instance.
#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
pub struct ApiVersionSet {
    /// Version of the core API.
    pub core: ApiVersion,
    /// Version of each module's API, keyed by module instance id.
    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
}
97
/// Untyped, per-peer access to a federation's API.
///
/// The typed helpers built on top of these primitives live in
/// [`FederationApiExt`].
#[apply(async_trait_maybe_send!)]
pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
    /// Ids of all peers this API handle knows about.
    fn all_peers(&self) -> &BTreeSet<PeerId>;

    /// The peer id this handle acts as, if any.
    ///
    /// `FederationApiExt::request_admin` sends admin requests to this peer and
    /// fails when it is `None`.
    fn self_peer(&self) -> Option<PeerId>;

    /// Returns an API handle for the module instance `id`.
    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;

    /// Sends `method` with `params` to `peer_id` and returns the response as
    /// undecoded JSON.
    async fn request_raw(
        &self,
        peer_id: PeerId,
        method: &str,
        params: &ApiRequestErased,
    ) -> PeerResult<Value>;

    /// The connector this handle was built with.
    fn connector(&self) -> &DynClientConnector;
}
128
129#[apply(async_trait_maybe_send!)]
132pub trait FederationApiExt: IRawFederationApi {
133 async fn request_single_peer<Ret>(
134 &self,
135 method: String,
136 params: ApiRequestErased,
137 peer: PeerId,
138 ) -> PeerResult<Ret>
139 where
140 Ret: DeserializeOwned,
141 {
142 self.request_raw(peer, &method, ¶ms)
143 .await
144 .and_then(|v| {
145 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
146 })
147 }
148
149 async fn request_single_peer_federation<FedRet>(
150 &self,
151 method: String,
152 params: ApiRequestErased,
153 peer_id: PeerId,
154 ) -> FederationResult<FedRet>
155 where
156 FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
157 {
158 self.request_raw(peer_id, &method, ¶ms)
159 .await
160 .and_then(|v| {
161 serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
162 })
163 .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
164 }
165
166 #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
169 async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
170 &self,
171 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
172 method: String,
173 params: ApiRequestErased,
174 ) -> FederationResult<FR> {
175 #[cfg(not(target_family = "wasm"))]
179 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
180 #[cfg(target_family = "wasm")]
181 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
182
183 for peer in self.all_peers() {
184 futures.push(Box::pin({
185 let method = &method;
186 let params = ¶ms;
187 async move {
188 let result = self
189 .request_single_peer(method.clone(), params.clone(), *peer)
190 .await;
191
192 (*peer, result)
193 }
194 }));
195 }
196
197 let mut peer_errors = BTreeMap::new();
198 let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
199
200 loop {
201 let (peer, result) = futures
202 .next()
203 .await
204 .expect("Query strategy ran out of peers to query without returning a result");
205
206 match result {
207 Ok(response) => match strategy.process(peer, response) {
208 QueryStep::Retry(peers) => {
209 for peer in peers {
210 futures.push(Box::pin({
211 let method = &method;
212 let params = ¶ms;
213 async move {
214 let result = self
215 .request_single_peer(method.clone(), params.clone(), peer)
216 .await;
217
218 (peer, result)
219 }
220 }));
221 }
222 }
223 QueryStep::Success(response) => return Ok(response),
224 QueryStep::Failure(e) => {
225 peer_errors.insert(peer, e);
226 }
227 QueryStep::Continue => {}
228 },
229 Err(e) => {
230 e.report_if_unusual(peer, "RequestWithStrategy");
231 peer_errors.insert(peer, e);
232 }
233 }
234
235 if peer_errors.len() == peer_error_threshold {
236 return Err(FederationError::peer_errors(
237 method.clone(),
238 params.params.clone(),
239 peer_errors,
240 ));
241 }
242 }
243 }
244
245 #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
246 async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
247 &self,
248 mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
249 method: String,
250 params: ApiRequestErased,
251 ) -> FR {
252 #[cfg(not(target_family = "wasm"))]
256 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
257 #[cfg(target_family = "wasm")]
258 let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
259
260 for peer in self.all_peers() {
261 futures.push(Box::pin({
262 let method = &method;
263 let params = ¶ms;
264 async move {
265 let response = util::retry(
266 format!("api-request-{method}-{peer}"),
267 api_networking_backoff(),
268 || async {
269 self.request_single_peer(method.clone(), params.clone(), *peer)
270 .await
271 .inspect_err(|e| {
272 e.report_if_unusual(*peer, "QueryWithStrategyRetry");
273 })
274 .map_err(|e| anyhow!(e.to_string()))
275 },
276 )
277 .await
278 .expect("Number of retries has no limit");
279
280 (*peer, response)
281 }
282 }));
283 }
284
285 loop {
286 let (peer, response) = match futures.next().await {
287 Some(t) => t,
288 None => pending().await,
289 };
290
291 match strategy.process(peer, response) {
292 QueryStep::Retry(peers) => {
293 for peer in peers {
294 futures.push(Box::pin({
295 let method = &method;
296 let params = ¶ms;
297 async move {
298 let response = util::retry(
299 format!("api-request-{method}-{peer}"),
300 api_networking_backoff(),
301 || async {
302 self.request_single_peer(
303 method.clone(),
304 params.clone(),
305 peer,
306 )
307 .await
308 .inspect_err(|err| {
309 if err.is_unusual() {
310 debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
311 }
312 })
313 .map_err(|e| anyhow!(e.to_string()))
314 },
315 )
316 .await
317 .expect("Number of retries has no limit");
318
319 (peer, response)
320 }
321 }));
322 }
323 }
324 QueryStep::Success(response) => return response,
325 QueryStep::Failure(e) => {
326 warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
327 }
328 QueryStep::Continue => {}
329 }
330 }
331 }
332
333 async fn request_current_consensus<Ret>(
334 &self,
335 method: String,
336 params: ApiRequestErased,
337 ) -> FederationResult<Ret>
338 where
339 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
340 {
341 self.request_with_strategy(
342 ThresholdConsensus::new(self.all_peers().to_num_peers()),
343 method,
344 params,
345 )
346 .await
347 }
348
349 async fn request_current_consensus_retry<Ret>(
350 &self,
351 method: String,
352 params: ApiRequestErased,
353 ) -> Ret
354 where
355 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
356 {
357 self.request_with_strategy_retry(
358 ThresholdConsensus::new(self.all_peers().to_num_peers()),
359 method,
360 params,
361 )
362 .await
363 }
364
365 async fn request_admin<Ret>(
366 &self,
367 method: &str,
368 params: ApiRequestErased,
369 auth: ApiAuth,
370 ) -> FederationResult<Ret>
371 where
372 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
373 {
374 let Some(self_peer_id) = self.self_peer() else {
375 return Err(FederationError::general(
376 method,
377 params,
378 anyhow::format_err!("Admin peer_id not set"),
379 ));
380 };
381
382 self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
383 .await
384 }
385
386 async fn request_admin_no_auth<Ret>(
387 &self,
388 method: &str,
389 params: ApiRequestErased,
390 ) -> FederationResult<Ret>
391 where
392 Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
393 {
394 let Some(self_peer_id) = self.self_peer() else {
395 return Err(FederationError::general(
396 method,
397 params,
398 anyhow::format_err!("Admin peer_id not set"),
399 ));
400 };
401
402 self.request_single_peer_federation(method.into(), params, self_peer_id)
403 .await
404 }
405}
406
/// Blanket implementation: every [`IRawFederationApi`] (sized or not) gets the
/// [`FederationApiExt`] helper methods.
#[apply(async_trait_maybe_send!)]
impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}

/// Marker trait (no methods of its own) for raw federation APIs that can be
/// wrapped in a [`DynModuleApi`].
pub trait IModuleFederationApi: IRawFederationApi {}
412
// Cloneable newtype around a shared `IModuleFederationApi` trait object.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynModuleApi(Arc<IModuleFederationApi>)
}

// Cloneable newtype around a shared `IGlobalFederationApi` trait object.
dyn_newtype_define! {
    #[derive(Clone)]
    pub DynGlobalApi(Arc<IGlobalFederationApi>)
}
422
/// Lets a [`DynGlobalApi`] be borrowed as the trait object it wraps.
impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.inner.as_ref()
    }
}
428
429impl DynGlobalApi {
430 pub async fn new_admin(
431 peer: PeerId,
432 url: SafeUrl,
433 api_secret: &Option<String>,
434 iroh_enable_dht: bool,
435 iroh_enable_next: bool,
436 ) -> anyhow::Result<DynGlobalApi> {
437 let connector =
438 make_admin_connector(peer, url, api_secret, iroh_enable_dht, iroh_enable_next).await?;
439 Ok(
440 GlobalFederationApiWithCache::new(ReconnectFederationApi::new(connector, Some(peer)))
441 .into(),
442 )
443 }
444
445 pub fn new(connector: DynClientConnector) -> anyhow::Result<Self> {
446 Ok(GlobalFederationApiWithCache::new(ReconnectFederationApi::new(connector, None)).into())
447 }
448 pub async fn from_setup_endpoint(
452 url: SafeUrl,
453 api_secret: &Option<String>,
454 iroh_enable_dht: bool,
455 iroh_enable_next: bool,
456 ) -> anyhow::Result<Self> {
457 Self::new_admin(
461 PeerId::from(1024),
462 url,
463 api_secret,
464 iroh_enable_dht,
465 iroh_enable_next,
466 )
467 .await
468 }
469
470 pub async fn from_endpoints(
471 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
472 api_secret: &Option<String>,
473 iroh_enable_dht: bool,
474 iroh_enable_next: bool,
475 ) -> anyhow::Result<Self> {
476 let connector =
477 make_connector(peers, api_secret, iroh_enable_dht, iroh_enable_next).await?;
478 Ok(GlobalFederationApiWithCache::new(ReconnectFederationApi::new(connector, None)).into())
479 }
480}
481
/// Typed interface to the federation's global (non-module) API endpoints.
///
/// The one-line descriptions below are derived from the method names and
/// signatures only; the exact semantics are defined by the implementations
/// and the server.
#[apply(async_trait_maybe_send!)]
pub trait IGlobalFederationApi: IRawFederationApi {
    /// Submits `tx` and returns the encoded submission outcome.
    async fn submit_transaction(
        &self,
        tx: Transaction,
    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;

    /// Returns the [`SessionOutcome`] for `block_index`, decoded with
    /// `decoders`.
    async fn await_block(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
    ) -> anyhow::Result<SessionOutcome>;

    /// Returns the [`SessionStatus`] for `block_index`, decoded with
    /// `decoders`.
    ///
    /// NOTE(review): `core_api_version` presumably selects between endpoint
    /// versions and `broadcast_public_keys` presumably enables signature
    /// checks; confirm against the implementation.
    async fn get_session_status(
        &self,
        block_index: u64,
        decoders: &ModuleDecoderRegistry,
        core_api_version: ApiVersion,
        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
    ) -> anyhow::Result<SessionStatus>;

    /// Returns the session count reported by the federation.
    async fn session_count(&self) -> FederationResult<u64>;

    /// Awaits the transaction `txid` and returns a transaction id.
    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;

    /// Uploads a signed backup request.
    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;

    /// Downloads the backup snapshot stored under `id`, one optional snapshot
    /// per peer.
    async fn download_backup(
        &self,
        id: &secp256k1::PublicKey,
    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;

    /// Sets the password, passing `auth`.
    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the guardian's [`SetupStatus`].
    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;

    /// Sets the local setup parameters and returns a string.
    /// NOTE(review): the returned string is presumably this guardian's setup
    /// code; confirm.
    async fn set_local_params(
        &self,
        name: String,
        federation_name: Option<String>,
        disable_base_fees: Option<bool>,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Adds a peer's connection info and returns a string.
    async fn add_peer_connection_info(
        &self,
        info: String,
        auth: ApiAuth,
    ) -> FederationResult<String>;

    /// Resets the peer setup codes.
    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the setup code, if one is available.
    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;

    /// Adds a config-gen peer (legacy parameter type).
    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;

    /// Returns the known config-gen peers (legacy parameter type).
    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;

    /// Starts distributed key generation.
    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns a config verification hash per peer.
    async fn get_verify_config_hash(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Returns a config hash per peer.
    /// NOTE(review): presumably marks the configs as verified; confirm.
    async fn verified_configs(
        &self,
        auth: ApiAuth,
    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;

    /// Starts consensus.
    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the server's [`StatusResponse`].
    async fn status(&self) -> FederationResult<StatusResponse>;

    /// Returns an [`AuditSummary`].
    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;

    /// Returns a [`GuardianConfigBackup`].
    async fn guardian_config_backup(&self, auth: ApiAuth)
    -> FederationResult<GuardianConfigBackup>;

    /// Issues the `auth` call with the given credentials.
    /// NOTE(review): presumably a credential check; confirm.
    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Restarts the federation setup.
    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;

    /// Submits a signed API announcement for `peer_id`.
    async fn submit_api_announcement(
        &self,
        peer_id: PeerId,
        announcement: SignedApiAnnouncement,
    ) -> FederationResult<()>;

    /// Asks `guardian` for the signed API announcements it knows, keyed by
    /// peer.
    async fn api_announcements(
        &self,
        guardian: PeerId,
    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;

    /// Requests a signed API announcement for `api_url`.
    async fn sign_api_announcement(
        &self,
        api_url: SafeUrl,
        auth: ApiAuth,
    ) -> FederationResult<SignedApiAnnouncement>;

    /// Requests a shutdown, optionally tied to a `session`.
    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;

    /// Returns the version string reported by `peer_id`.
    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;

    /// Returns [`BackupStatistics`].
    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;

    /// Asks `guardian` for an [`InviteCode`].
    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;

    /// Changes the password to `new_password`, passing `auth`.
    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
}
632
633pub fn deserialize_outcome<R>(
634 outcome: &SerdeOutputOutcome,
635 module_decoder: &Decoder,
636) -> OutputOutcomeResult<R>
637where
638 R: OutputOutcome + MaybeSend,
639{
640 let dyn_outcome = outcome
641 .try_into_inner_known_module_kind(module_decoder)
642 .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
643
644 let source_instance = dyn_outcome.module_instance_id();
645
646 dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
647 let target_type = std::any::type_name::<R>();
648 OutputOutcomeError::ResponseDeserialization(anyhow!(
649 "Could not downcast output outcome with instance id {source_instance} to {target_type}"
650 ))
651 })
652}
653
/// Connector that reaches peers over websockets and keeps one connection per
/// peer in a shared pool.
#[derive(Debug, Clone)]
pub struct WebsocketConnector {
    /// API endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret, sent as HTTP basic auth credentials when set.
    api_secret: Option<String>,

    /// Per-peer endpoint overrides; when present they are used instead of the
    /// URL in `peers` to establish the connection.
    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,

    /// Connection pool: one lazily initialised websocket client per peer.
    /// The outer `Arc` means clones of this connector share the same pool.
    #[allow(clippy::type_complexity)]
    connections: Arc<tokio::sync::Mutex<HashMap<PeerId, Arc<OnceCell<Arc<WsClient>>>>>>,
}
670
671impl WebsocketConnector {
672 fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
673 let mut s = Self::new_no_overrides(peers, api_secret);
674
675 for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
676 s = s.with_connection_override(k, v);
677 }
678
679 Ok(s)
680 }
681 pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
682 self.connection_overrides.insert(peer_id, url);
683 self
684 }
685 pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
686 Self {
687 peers,
688 api_secret,
689 connection_overrides: BTreeMap::default(),
690 connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
691 }
692 }
693
    /// Returns the pooled websocket connection for `peer_id`, creating it if
    /// the pool has none or the pooled one is no longer connected.
    ///
    /// # Errors
    ///
    /// * `PeerError::InternalClientError` if `peer_id` is unknown (and has no
    ///   override) or the websocket client cannot be built,
    /// * `PeerError::InvalidEndpoint` (wasm only) if the API secret cannot be
    ///   embedded into the URL.
    async fn get_or_create_connection(&self, peer_id: PeerId) -> PeerResult<Arc<WsClient>> {
        let mut pool_lock = self.connections.lock().await;

        // Fetch this peer's cell, replacing it with an empty one first if it
        // holds a connection that has since disconnected.
        let entry_arc = pool_lock
            .entry(peer_id)
            .and_modify(|entry_arc| {
                if let Some(existing_conn) = entry_arc.get()
                    && !existing_conn.is_connected() {
                    trace!(target: LOG_NET_WS, %peer_id, "Existing connection is disconnected, removing from pool");
                    *entry_arc = Arc::new(OnceCell::new());
                }
            })
            .or_insert_with(|| Arc::new(OnceCell::new()))
            .clone();

        // Release the pool lock before connecting so the connection attempt
        // below is not made while holding it.
        drop(pool_lock);

        let conn = entry_arc
            .get_or_try_init(|| async {
                trace!(target: LOG_NET_WS, %peer_id, "Creating new websocket connection");
                // An override takes precedence over the peer's regular URL.
                let api_endpoint = match self.connection_overrides.get(&peer_id) {
                    Some(url) => {
                        trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
                        url
                    }
                    None => self.peers.get(&peer_id).ok_or_else(|| {
                        PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
                    })?,
                };

                // Native targets: builder with the webpki root certificates.
                #[cfg(not(target_family = "wasm"))]
                let mut client = {
                    install_crypto_provider().await;
                    let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
                    let mut root_certs = RootCertStore::empty();
                    root_certs.extend(webpki_roots);

                    let tls_cfg = CustomCertStore::builder()
                        .with_root_certificates(root_certs)
                        .with_no_client_auth();

                    WsClientBuilder::default()
                        .max_concurrent_requests(u16::MAX as usize)
                        .with_custom_cert_store(tls_cfg)
                };

                #[cfg(target_family = "wasm")]
                let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

                if let Some(api_secret) = &self.api_secret {
                    // Native targets: send the secret as an HTTP basic auth
                    // header with the user name `fedimint`.
                    #[cfg(not(target_family = "wasm"))]
                    {
                        let mut headers = HeaderMap::new();

                        let auth = base64::engine::general_purpose::STANDARD
                            .encode(format!("fedimint:{api_secret}"));

                        headers.insert(
                            "Authorization",
                            HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
                        );

                        client = client.set_headers(headers);
                    }
                    // wasm: no headers are set here; the credentials are put
                    // into the URL instead and the client is built right away.
                    #[cfg(target_family = "wasm")]
                    {
                        let mut url = api_endpoint.clone();
                        url.set_username("fedimint")
                            .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
                        url.set_password(Some(&api_secret))
                            .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;

                        let client = client
                            .build(url.as_str())
                            .await
                            .map_err(|err| PeerError::InternalClientError(err.into()))?;

                        return Ok(Arc::new(client));
                    }
                }

                let client = client
                    .build(api_endpoint.as_str())
                    .await
                    .map_err(|err| PeerError::InternalClientError(err.into()))?;

                Ok(Arc::new(client))
            })
            .await?;

        trace!(target: LOG_NET_WS, %peer_id, "Using websocket connection");
        Ok(conn.clone())
    }
793}
794
795#[async_trait]
796impl IClientConnector for WebsocketConnector {
797 fn peers(&self) -> BTreeSet<PeerId> {
798 self.peers.keys().copied().collect()
799 }
800
801 async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
802 let client = self.get_or_create_connection(peer_id).await?;
803 Ok(client.into_dyn())
804 }
805}
806
/// Connector that reaches peers through Tor. Only compiled with the `tor`
/// feature on non-wasm targets.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Debug, Clone)]
pub struct TorConnector {
    /// API endpoint of every known peer.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional API secret, sent as HTTP basic auth credentials when set.
    api_secret: Option<String>,
}

#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Creates a connector for `peers`; no connection is opened here.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        Self { peers, api_secret }
    }
}
820
/// Tor-backed [`IClientConnector`]: every connection is a websocket client
/// running over an anonymized Tor stream.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Ids of all peers with a configured endpoint.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a new websocket connection to `peer_id` through Tor.
    ///
    /// # Errors
    ///
    /// * `PeerError::InternalClientError` if `peer_id` is unknown or the Tor
    ///   client cannot be bootstrapped,
    /// * `PeerError::InvalidEndpoint` if the endpoint has no host or port, is
    ///   not a valid Tor address or TLS server name, or uses a scheme other
    ///   than `ws`/`wss`,
    /// * `PeerError::Connection` if the Tor stream, the TLS handshake or the
    ///   websocket handshake fails.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let api_endpoint = self
            .peers
            .get(&peer_id)
            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;

        // NOTE(review): a Tor client is created and bootstrapped on every
        // call to `connect`; nothing is cached in `self`.
        let tor_config = TorClientConfig::default();
        let tor_client = TorClient::create_bootstrapped(tor_config)
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        // (host, port) pair; the port falls back to the scheme's known default.
        let addr = (
            api_endpoint
                .host_str()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
            api_endpoint
                .port_or_known_default()
                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
        );
        let tor_addr = TorAddr::from(addr).map_err(|e| {
            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
        })?;

        // `tor_addr` is moved into the connect call below; the clone is kept
        // only for the log messages emitted afterwards.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion addresses: explicitly enable connecting to onion services.
            let mut stream_prefs = arti_client::StreamPrefs::default();
            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let anonymized_stream = tor_client
                .connect_with_prefs(tor_addr, &stream_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        } else {
            let anonymized_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            anonymized_stream
        };

        // Only `ws` and `wss` are supported; `wss` adds TLS over the stream.
        let is_tls = match api_endpoint.scheme() {
            "wss" => true,
            "ws" => false,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        // TLS connector with the webpki root certificates, only for `wss`.
        let tls_connector = if is_tls {
            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
            let mut root_certs = RootCertStore::empty();
            root_certs.extend(webpki_roots);

            let tls_config = TlsClientConfig::builder()
                .with_root_certificates(root_certs)
                .with_no_client_auth();
            let tls_connector = TlsConnector::from(Arc::new(tls_config));
            Some(tls_connector)
        } else {
            None
        };

        let mut ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);

        // Send the API secret as an HTTP basic auth header with the user name
        // `fedimint`.
        if let Some(api_secret) = &self.api_secret {
            let mut headers = HeaderMap::new();

            let auth =
                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));

            headers.insert(
                "Authorization",
                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
            );

            ws_client_builder = ws_client_builder.set_headers(headers);
        }

        match tls_connector {
            // Plain `ws`: run the websocket directly over the Tor stream.
            None => {
                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
            // `wss`: wrap the Tor stream in TLS first, then run the websocket
            // over the TLS stream.
            Some(tls_connector) => {
                let host = api_endpoint
                    .host_str()
                    .map(ToOwned::to_owned)
                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

                let server_name = rustls_pki_types::ServerName::try_from(host)
                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

                let anonymized_tls_stream = tls_connector
                    .connect(server_name, anonymized_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                let client = ws_client_builder
                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
                    .await
                    .map_err(|e| PeerError::Connection(e.into()))?;

                Ok(client.into_dyn())
            }
        }
    }
}
972
973fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
974 match jsonrpc_error {
975 JsonRpcClientError::Call(error_object) => {
976 let error = anyhow!(error_object.message().to_owned());
977 match ErrorCode::from(error_object.code()) {
978 ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
979 PeerError::InvalidRequest(error)
980 }
981 ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
982 ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
983 ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
984 PeerError::ServerError(error)
985 }
986 }
987 }
988 JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
989 JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
990 JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
991 JsonRpcClientError::InvalidSubscriptionId => {
992 PeerError::Transport(anyhow!("Invalid subscription id"))
993 }
994 JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
995 PeerError::InvalidRequest(anyhow!(invalid_request_id))
996 }
997 JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
998 JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
999 JsonRpcClientError::HttpNotImplemented => {
1000 PeerError::ServerError(anyhow!("Http not implemented"))
1001 }
1002 JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
1003 PeerError::InvalidRequest(anyhow!(empty_batch_request))
1004 }
1005 JsonRpcClientError::RegisterMethod(register_method_error) => {
1006 PeerError::InvalidResponse(anyhow!(register_method_error))
1007 }
1008 }
1009}
1010
1011#[async_trait]
1012impl IClientConnection for WsClient {
1013 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1014 let method = match method {
1015 ApiMethod::Core(method) => method,
1016 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
1017 };
1018
1019 Ok(ClientT::request(self, &method, [request.to_json()])
1020 .await
1021 .map_err(jsonrpc_error_to_peer_error)?)
1022 }
1023
1024 async fn await_disconnection(&self) {
1025 self.on_disconnect().await;
1026 }
1027}
1028
1029#[async_trait]
1030impl IClientConnection for Arc<WsClient> {
1031 async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1032 let method = match method {
1033 ApiMethod::Core(method) => method,
1034 ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
1035 };
1036
1037 Ok(
1038 ClientT::request(self.as_ref(), &method, [request.to_json()])
1039 .await
1040 .map_err(jsonrpc_error_to_peer_error)?,
1041 )
1042 }
1043
1044 async fn await_disconnection(&self) {
1045 self.on_disconnect().await;
1046 }
1047}
1048
/// Shared, type-erased [`IClientConnector`].
pub type DynClientConnector = Arc<dyn IClientConnector>;

/// Something that knows a set of peers and can open a connection to each.
#[async_trait]
pub trait IClientConnector: Send + Sync + 'static + fmt::Debug {
    /// Ids of the peers this connector can connect to.
    fn peers(&self) -> BTreeSet<PeerId>;

    /// Opens (or returns) a connection to `peer`.
    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;

    /// Wraps the connector into a [`DynClientConnector`].
    fn into_dyn(self) -> DynClientConnector
    where
        Self: Sized,
    {
        Arc::new(self)
    }

    /// Returns `true` when the connector knows exactly one peer, which the
    /// default implementation treats as the sign of an admin connector.
    fn is_admin(&self) -> bool {
        self.peers().len() == 1
    }
}
1070
/// Shared, type-erased [`IClientConnection`].
pub type DynClientConnection = Arc<dyn IClientConnection>;

/// An established connection to a single peer.
#[async_trait]
pub trait IClientConnection: Debug + Send + Sync + 'static {
    /// Sends `request` for `method` and returns the undecoded JSON response.
    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;

    /// Completes when the connection has been lost.
    async fn await_disconnection(&self);

    /// Wraps the connection into a [`DynClientConnection`].
    fn into_dyn(self) -> DynClientConnection
    where
        Self: Sized,
    {
        Arc::new(self)
    }
}
1086
/// [`IRawFederationApi`] implementation that sends its requests through a
/// [`ReconnectClientConnections`] set built from a connector.
#[derive(Clone, Debug)]
pub struct ReconnectFederationApi {
    /// Connector the per-peer connections are created from.
    connector: DynClientConnector,
    /// Peers known to the connector at construction time.
    peers: BTreeSet<PeerId>,
    /// Peer id returned by `self_peer`; `None` for non-admin handles.
    admin_id: Option<PeerId>,
    /// When set, requests are sent as module requests for this instance;
    /// otherwise as core requests.
    module_id: Option<ModuleInstanceId>,
    /// Per-peer connections used to send requests.
    connections: ReconnectClientConnections,
}
1095
1096impl ReconnectFederationApi {
1097 pub fn new(connector: DynClientConnector, admin_peer_id: Option<PeerId>) -> Self {
1098 Self {
1099 peers: connector.peers(),
1100 admin_id: admin_peer_id,
1101 module_id: None,
1102 connections: ReconnectClientConnections::new(&connector),
1103 connector,
1104 }
1105 }
1106
1107 pub fn new_admin(connector: DynClientConnector, peer: PeerId) -> Self {
1108 Self::new(connector, Some(peer))
1109 }
1110}
1111
/// Marker impl; `with_module` relies on it to convert a
/// [`ReconnectFederationApi`] into a [`DynModuleApi`].
impl IModuleFederationApi for ReconnectFederationApi {}
1113
1114#[apply(async_trait_maybe_send!)]
1115impl IRawFederationApi for ReconnectFederationApi {
1116 fn all_peers(&self) -> &BTreeSet<PeerId> {
1117 &self.peers
1118 }
1119
1120 fn self_peer(&self) -> Option<PeerId> {
1121 self.admin_id
1122 }
1123
1124 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1125 ReconnectFederationApi {
1126 connector: self.connector.clone(),
1127 peers: self.peers.clone(),
1128 admin_id: self.admin_id,
1129 module_id: Some(id),
1130 connections: self.connections.clone(),
1131 }
1132 .into()
1133 }
1134
1135 #[instrument(
1136 target = LOG_NET_API,
1137 skip_all,
1138 fields(
1139 peer_id = %peer_id,
1140 method = %method,
1141 params = %params.params,
1142 )
1143 )]
1144 async fn request_raw(
1145 &self,
1146 peer_id: PeerId,
1147 method: &str,
1148 params: &ApiRequestErased,
1149 ) -> PeerResult<Value> {
1150 let method = match self.module_id {
1151 Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1152 None => ApiMethod::Core(method.to_string()),
1153 };
1154
1155 self.connections
1156 .request(peer_id, method, params.clone())
1157 .await
1158 }
1159 fn connector(&self) -> &DynClientConnector {
1160 &self.connector
1161 }
1162}
1163
/// Holds one `ClientConnection` handle per peer and routes requests to the
/// handle of the addressed peer.
#[derive(Clone, Debug)]
pub struct ReconnectClientConnections {
    /// Connection handle for each known peer, keyed by peer id.
    connections: BTreeMap<PeerId, ClientConnection>,
}
1168
1169impl ReconnectClientConnections {
1170 pub fn new(connector: &DynClientConnector) -> Self {
1171 ReconnectClientConnections {
1172 connections: connector
1173 .peers()
1174 .into_iter()
1175 .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1176 .collect(),
1177 }
1178 }
1179
1180 async fn request(
1181 &self,
1182 peer: PeerId,
1183 method: ApiMethod,
1184 request: ApiRequestErased,
1185 ) -> PeerResult<Value> {
1186 trace!(target: LOG_NET_API, %method, "Api request");
1187 let res = self
1188 .connections
1189 .get(&peer)
1190 .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1191 .connection()
1192 .await
1193 .context("Failed to connect to peer")
1194 .map_err(PeerError::Connection)?
1195 .request(method.clone(), request)
1196 .await;
1197
1198 trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1199
1200 res
1201 }
1202}
1203
/// Handle used to ask for a connection to one peer.
///
/// A request is made by sending a oneshot reply sender through `sender`; the
/// connection, if any, comes back on the matching oneshot receiver.
#[derive(Clone, Debug)]
struct ClientConnection {
    /// Channel carrying the reply senders of pending connection requests.
    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
}
1208
impl ClientConnection {
    /// Creates the handle for `peer` and spawns the background task that
    /// connects to it through `connector`.
    ///
    /// The task only attempts to connect after it has received a connection
    /// request. It stops once receiving from the request channel fails.
    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
        // Bounded queue (capacity 1024) of pending requests; each item is the
        // oneshot sender the requester expects its connection on.
        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);

        fedimint_core::task::spawn(
            "peer-api-connection",
            async move {
                let mut backoff = api_networking_backoff();

                // Wait for a first request before doing any connection work.
                while let Ok(sender) = receiver.recv().await {
                    let mut senders = vec![sender];

                    // Collect every request that is already queued so all of
                    // them are answered by the same connection attempt.
                    while let Ok(sender) = receiver.try_recv() {
                        senders.push(sender);
                    }

                    match connector.connect(peer).await {
                        Ok(connection) => {
                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");

                            // A failed send means the requester stopped
                            // waiting; that is ignored.
                            for sender in senders {
                                sender.send(connection.clone()).ok();
                            }

                            // Keep answering new requests with this connection
                            // until it disconnects or the request channel can
                            // no longer be received from.
                            loop {
                                tokio::select! {
                                    sender = receiver.recv() => {
                                        match sender.ok() {
                                            Some(sender) => sender.send(connection.clone()).ok(),
                                            None => break,
                                        };
                                    }
                                    () = connection.await_disconnection() => break,
                                }
                            }

                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");

                            // A successful connection starts the retry delays
                            // over from a fresh backoff.
                            backoff = api_networking_backoff();
                        }
                        Err(e) => {
                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");

                            // Dropping the reply senders without sending lets
                            // the waiting requesters observe the failure.
                            drop(senders);

                            // Wait out the next backoff delay before handling
                            // further requests.
                            fedimint_core::task::sleep(
                                backoff.next().expect("No limit to the number of retries"),
                            )
                            .await;
                        }
                    }
                }

                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
            }
            .instrument(trace_span!("peer-api-connection", ?peer)),
        );

        ClientConnection { sender }
    }

    /// Requests a connection from the background task.
    ///
    /// Returns `None` if the request could not be queued (a warning is logged
    /// in that case) or if no connection was sent back on the reply channel.
    async fn connection(&self) -> Option<DynClientConnection> {
        let (sender, receiver) = oneshot::channel();

        self.sender
            .send(sender)
            .await
            .inspect_err(|err| {
                warn!(
                    target: LOG_CLIENT_NET_API,
                    err = %err.fmt_compact(),
                    "Api connection request channel closed unexpectedly"
                );
            })
            .ok()?;

        // Fails (and yields `None`) when the reply sender was dropped without
        // a connection being sent.
        receiver.await.ok()
    }
}
1290
/// Federation-level status data, carried as the optional `federation` part of
/// a `StatusResponse`.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct LegacyFederationStatus {
    /// Session count. NOTE(review): which sessions are counted is not visible
    /// here — confirm against the producer of this struct.
    pub session_count: u64,
    /// Status of individual peers, keyed by peer id.
    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
    /// Number of peers reported as online.
    pub peers_online: u64,
    /// Number of peers reported as offline.
    pub peers_offline: u64,
    /// Number of peers reported as flagged.
    pub peers_flagged: u64,
    /// Scheduled shutdown value, if one is set. NOTE(review): the unit of the
    /// value (presumably a session index) is not visible here — confirm.
    pub scheduled_shutdown: Option<u64>,
}
1303
/// Status of a single peer, as stored per peer in
/// `LegacyFederationStatus::status_by_peer`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct LegacyPeerStatus {
    /// Last contribution of the peer, if any. NOTE(review): presumably a
    /// session index — confirm against the producer of this struct.
    pub last_contribution: Option<u64>,
    /// Whether the peer is connected; defaults to `Disconnected`.
    pub connection_status: LegacyP2PConnectionStatus,
    /// Whether the peer is flagged.
    pub flagged: bool,
}
1312
/// Connection state of a peer.
///
/// Serialized with snake_case variant names (`"disconnected"`,
/// `"connected"`).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum LegacyP2PConnectionStatus {
    /// The peer is not connected; this is the default value.
    #[default]
    Disconnected,
    /// The peer is connected.
    Connected,
}
1320
/// Status response combining the server status with optional federation
/// status.
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
pub struct StatusResponse {
    /// Status of the server itself.
    pub server: ServerStatusLegacy,
    /// Federation-level status; `None` when not provided.
    pub federation: Option<LegacyFederationStatus>,
}
1326
1327pub async fn make_admin_connector(
1328 admin_peer_id: PeerId,
1329 url: SafeUrl,
1330 api_secret: &Option<String>,
1331 iroh_enable_dht: bool,
1332 iroh_enable_next: bool,
1333) -> anyhow::Result<DynClientConnector> {
1334 make_connector(
1335 once((admin_peer_id, url)),
1336 api_secret,
1337 iroh_enable_dht,
1338 iroh_enable_next,
1339 )
1340 .await
1341}
1342
1343pub async fn make_connector(
1344 peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1345 api_secret: &Option<String>,
1346 iroh_enable_dht: bool,
1347 iroh_enable_next: bool,
1348) -> anyhow::Result<DynClientConnector> {
1349 let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1350
1351 let scheme = peers
1352 .values()
1353 .next()
1354 .expect("Federation api has been initialized with no peers")
1355 .scheme();
1356
1357 Ok(match scheme {
1358 "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1359 #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1360 "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1361 "iroh" => {
1362 let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1363 .ok()
1364 .and_then(|dns| dns.parse().ok());
1365 iroh::IrohConnector::new(peers, iroh_dns, iroh_enable_dht, iroh_enable_next)
1366 .await?
1367 .into_dyn()
1368 }
1369 scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1370 })
1371}
1372
1373#[cfg(test)]
1374mod tests;