fedimint_api_client/api/
mod.rs

1use core::panic;
2use std::collections::{BTreeMap, BTreeSet, HashMap};
3use std::fmt::Debug;
4use std::iter::once;
5use std::pin::Pin;
6use std::result;
7use std::sync::Arc;
8
9use anyhow::{Context, anyhow};
10#[cfg(all(feature = "tor", not(target_family = "wasm")))]
11use arti_client::{TorAddr, TorClient, TorClientConfig};
12use async_channel::bounded;
13use async_trait::async_trait;
14use base64::Engine as _;
15use bitcoin::hashes::sha256;
16use bitcoin::secp256k1;
17pub use error::{FederationError, OutputOutcomeError, PeerError};
18use fedimint_core::admin_client::{PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus};
19use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
20use fedimint_core::core::backup::SignedBackupRequest;
21use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
22use fedimint_core::encoding::{Decodable, Encodable};
23use fedimint_core::envs::{FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env};
24use fedimint_core::invite_code::InviteCode;
25use fedimint_core::module::audit::AuditSummary;
26use fedimint_core::module::registry::ModuleDecoderRegistry;
27use fedimint_core::module::{
28    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
29};
30use fedimint_core::net::api_announcement::SignedApiAnnouncement;
31use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
32use fedimint_core::task::{MaybeSend, MaybeSync};
33use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
34use fedimint_core::util::backoff_util::api_networking_backoff;
35use fedimint_core::util::{FmtCompact as _, SafeUrl};
36use fedimint_core::{
37    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
38};
39use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
40use futures::channel::oneshot;
41use futures::future::pending;
42use futures::stream::FuturesUnordered;
43use futures::{Future, StreamExt};
44use global_api::with_cache::GlobalFederationApiWithCache;
45use jsonrpsee_core::DeserializeOwned;
46use jsonrpsee_core::client::ClientT;
47pub use jsonrpsee_core::client::Error as JsonRpcClientError;
48use jsonrpsee_types::ErrorCode;
49#[cfg(target_family = "wasm")]
50use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
51#[cfg(not(target_family = "wasm"))]
52use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
53#[cfg(not(target_family = "wasm"))]
54use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
55use serde::{Deserialize, Serialize};
56use serde_json::Value;
57#[cfg(not(target_family = "wasm"))]
58use tokio_rustls::rustls::RootCertStore;
59#[cfg(all(feature = "tor", not(target_family = "wasm")))]
60use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
61use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
62
63use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
64mod error;
65pub mod global_api;
66pub mod net;
67
68pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
69
70pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
71    ApiVersion { major: 0, minor: 1 };
72
73pub type PeerResult<T> = Result<T, PeerError>;
74pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
75pub type FederationResult<T> = Result<T, FederationError>;
76pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
77
78pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
79
80/// Set of api versions for each component (core + modules)
81///
82/// E.g. result of federated common api versions discovery.
83#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
84pub struct ApiVersionSet {
85    pub core: ApiVersion,
86    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
87}
88
89/// An API (module or global) that can query a federation
90#[apply(async_trait_maybe_send!)]
91pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
92    /// List of all federation peers for the purpose of iterating each peer
93    /// in the federation.
94    ///
95    /// The underlying implementation is responsible for knowing how many
96    /// and `PeerId`s of each. The caller of this interface most probably
97    /// have some idea as well, but passing this set across every
98    /// API call to the federation would be inconvenient.
99    fn all_peers(&self) -> &BTreeSet<PeerId>;
100
101    /// PeerId of the Guardian node, if set
102    ///
103    /// This is for using Client in a "Admin" mode, making authenticated
104    /// calls to own `fedimintd` instance.
105    fn self_peer(&self) -> Option<PeerId>;
106
107    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
108
109    /// Make request to a specific federation peer by `peer_id`
110    async fn request_raw(
111        &self,
112        peer_id: PeerId,
113        method: &str,
114        params: &ApiRequestErased,
115    ) -> PeerResult<Value>;
116}
117
118/// An extension trait allowing to making federation-wide API call on top
119/// [`IRawFederationApi`].
120#[apply(async_trait_maybe_send!)]
121pub trait FederationApiExt: IRawFederationApi {
122    async fn request_single_peer<Ret>(
123        &self,
124        method: String,
125        params: ApiRequestErased,
126        peer: PeerId,
127    ) -> PeerResult<Ret>
128    where
129        Ret: DeserializeOwned,
130    {
131        self.request_raw(peer, &method, &params)
132            .await
133            .and_then(|v| {
134                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
135            })
136    }
137
138    async fn request_single_peer_federation<FedRet>(
139        &self,
140        method: String,
141        params: ApiRequestErased,
142        peer_id: PeerId,
143    ) -> FederationResult<FedRet>
144    where
145        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
146    {
147        self.request_raw(peer_id, &method, &params)
148            .await
149            .and_then(|v| {
150                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
151            })
152            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
153    }
154
155    /// Make an aggregate request to federation, using `strategy` to logically
156    /// merge the responses.
157    #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
158    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
159        &self,
160        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
161        method: String,
162        params: ApiRequestErased,
163    ) -> FederationResult<FR> {
164        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
165        // completed results from it and we don't do any `await`s when
166        // processing them, it should be totally OK.
167        #[cfg(not(target_family = "wasm"))]
168        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
169        #[cfg(target_family = "wasm")]
170        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
171
172        for peer in self.all_peers() {
173            futures.push(Box::pin({
174                let method = &method;
175                let params = &params;
176                async move {
177                    let result = self
178                        .request_single_peer(method.clone(), params.clone(), *peer)
179                        .await;
180
181                    (*peer, result)
182                }
183            }));
184        }
185
186        let mut peer_errors = BTreeMap::new();
187        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
188
189        loop {
190            let (peer, result) = futures
191                .next()
192                .await
193                .expect("Query strategy ran out of peers to query without returning a result");
194
195            match result {
196                Ok(response) => match strategy.process(peer, response) {
197                    QueryStep::Retry(peers) => {
198                        for peer in peers {
199                            futures.push(Box::pin({
200                                let method = &method;
201                                let params = &params;
202                                async move {
203                                    let result = self
204                                        .request_single_peer(method.clone(), params.clone(), peer)
205                                        .await;
206
207                                    (peer, result)
208                                }
209                            }));
210                        }
211                    }
212                    QueryStep::Success(response) => return Ok(response),
213                    QueryStep::Failure(e) => {
214                        peer_errors.insert(peer, e);
215                    }
216                    QueryStep::Continue => {}
217                },
218                Err(e) => {
219                    e.report_if_unusual(peer, "RequestWithStrategy");
220                    peer_errors.insert(peer, e);
221                }
222            }
223
224            if peer_errors.len() == peer_error_threshold {
225                return Err(FederationError::peer_errors(
226                    method.clone(),
227                    params.params.clone(),
228                    peer_errors,
229                ));
230            }
231        }
232    }
233
234    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
235        &self,
236        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
237        method: String,
238        params: ApiRequestErased,
239    ) -> FR {
240        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
241        // completed results from it and we don't do any `await`s when
242        // processing them, it should be totally OK.
243        #[cfg(not(target_family = "wasm"))]
244        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
245        #[cfg(target_family = "wasm")]
246        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
247
248        for peer in self.all_peers() {
249            futures.push(Box::pin({
250                let method = &method;
251                let params = &params;
252                async move {
253                    let response = util::retry(
254                        "api-request-{method}-{peer}",
255                        api_networking_backoff(),
256                        || async {
257                            self.request_single_peer(method.clone(), params.clone(), *peer)
258                                .await
259                                .inspect_err(|e| {
260                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
261                                })
262                                .map_err(|e| anyhow!(e.to_string()))
263                        },
264                    )
265                    .await
266                    .expect("Number of retries has no limit");
267
268                    (*peer, response)
269                }
270            }));
271        }
272
273        loop {
274            let (peer, response) = match futures.next().await {
275                Some(t) => t,
276                None => pending().await,
277            };
278
279            match strategy.process(peer, response) {
280                QueryStep::Retry(peers) => {
281                    for peer in peers {
282                        futures.push(Box::pin({
283                            let method = &method;
284                            let params = &params;
285                            async move {
286                                let response = util::retry(
287                                    "api-request-{method}-{peer}",
288                                    api_networking_backoff(),
289                                    || async {
290                                        self.request_single_peer(
291                                            method.clone(),
292                                            params.clone(),
293                                            peer,
294                                        )
295                                        .await
296                                        .inspect_err(|err| {
297                                            if err.is_unusual() {
298                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
299                                            }
300                                        })
301                                        .map_err(|e| anyhow!(e.to_string()))
302                                    },
303                                )
304                                .await
305                                .expect("Number of retries has no limit");
306
307                                (peer, response)
308                            }
309                        }));
310                    }
311                }
312                QueryStep::Success(response) => return response,
313                QueryStep::Failure(e) => {
314                    warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
315                }
316                QueryStep::Continue => {}
317            }
318        }
319    }
320
321    async fn request_current_consensus<Ret>(
322        &self,
323        method: String,
324        params: ApiRequestErased,
325    ) -> FederationResult<Ret>
326    where
327        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
328    {
329        self.request_with_strategy(
330            ThresholdConsensus::new(self.all_peers().to_num_peers()),
331            method,
332            params,
333        )
334        .await
335    }
336
337    async fn request_current_consensus_retry<Ret>(
338        &self,
339        method: String,
340        params: ApiRequestErased,
341    ) -> Ret
342    where
343        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
344    {
345        self.request_with_strategy_retry(
346            ThresholdConsensus::new(self.all_peers().to_num_peers()),
347            method,
348            params,
349        )
350        .await
351    }
352
353    async fn request_admin<Ret>(
354        &self,
355        method: &str,
356        params: ApiRequestErased,
357        auth: ApiAuth,
358    ) -> FederationResult<Ret>
359    where
360        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
361    {
362        let Some(self_peer_id) = self.self_peer() else {
363            return Err(FederationError::general(
364                method,
365                params,
366                anyhow::format_err!("Admin peer_id not set"),
367            ));
368        };
369
370        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
371            .await
372    }
373
374    async fn request_admin_no_auth<Ret>(
375        &self,
376        method: &str,
377        params: ApiRequestErased,
378    ) -> FederationResult<Ret>
379    where
380        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
381    {
382        let Some(self_peer_id) = self.self_peer() else {
383            return Err(FederationError::general(
384                method,
385                params,
386                anyhow::format_err!("Admin peer_id not set"),
387            ));
388        };
389
390        self.request_single_peer_federation(method.into(), params, self_peer_id)
391            .await
392    }
393}
394
395#[apply(async_trait_maybe_send!)]
396impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
397
398/// Trait marker for the module (non-global) endpoints
399pub trait IModuleFederationApi: IRawFederationApi {}
400
401dyn_newtype_define! {
402    #[derive(Clone)]
403    pub DynModuleApi(Arc<IModuleFederationApi>)
404}
405
406dyn_newtype_define! {
407    #[derive(Clone)]
408    pub DynGlobalApi(Arc<IGlobalFederationApi>)
409}
410
411impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
412    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
413        self.inner.as_ref()
414    }
415}
416
417impl DynGlobalApi {
418    pub async fn new_admin(
419        peer: PeerId,
420        url: SafeUrl,
421        api_secret: &Option<String>,
422    ) -> anyhow::Result<DynGlobalApi> {
423        Ok(GlobalFederationApiWithCache::new(
424            ReconnectFederationApi::from_endpoints(once((peer, url)), api_secret, Some(peer))
425                .await?,
426        )
427        .into())
428    }
429
430    // FIXME: (@leonardo) Should we have the option to do DKG and config related
431    // actions through Tor ? Should we add the `Connector` choice to
432    // ConfigParams then ?
433    pub async fn from_setup_endpoint(
434        url: SafeUrl,
435        api_secret: &Option<String>,
436    ) -> anyhow::Result<Self> {
437        // PeerIds are used only for informational purposes, but just in case, make a
438        // big number so it stands out
439
440        Self::new_admin(PeerId::from(1024), url, api_secret).await
441    }
442
443    pub async fn from_endpoints(
444        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
445        api_secret: &Option<String>,
446    ) -> anyhow::Result<Self> {
447        Ok(GlobalFederationApiWithCache::new(
448            ReconnectFederationApi::from_endpoints(peers, api_secret, None).await?,
449        )
450        .into())
451    }
452}
453
454/// The API for the global (non-module) endpoints
455#[apply(async_trait_maybe_send!)]
456pub trait IGlobalFederationApi: IRawFederationApi {
457    async fn submit_transaction(
458        &self,
459        tx: Transaction,
460    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
461
462    async fn await_block(
463        &self,
464        block_index: u64,
465        decoders: &ModuleDecoderRegistry,
466    ) -> anyhow::Result<SessionOutcome>;
467
468    async fn get_session_status(
469        &self,
470        block_index: u64,
471        decoders: &ModuleDecoderRegistry,
472        core_api_version: ApiVersion,
473        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
474    ) -> anyhow::Result<SessionStatus>;
475
476    async fn session_count(&self) -> FederationResult<u64>;
477
478    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
479
480    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
481
482    async fn download_backup(
483        &self,
484        id: &secp256k1::PublicKey,
485    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
486
487    /// Sets the password used to decrypt the configs and authenticate
488    ///
489    /// Must be called first before any other calls to the API
490    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
491
492    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
493
494    async fn set_local_params(
495        &self,
496        name: String,
497        federation_name: Option<String>,
498        auth: ApiAuth,
499    ) -> FederationResult<String>;
500
501    async fn add_peer_connection_info(
502        &self,
503        info: String,
504        auth: ApiAuth,
505    ) -> FederationResult<String>;
506
507    /// Reset the peer setup codes during the federation setup process
508    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
509
510    /// During config gen, used for an API-to-API call that adds a peer's server
511    /// connection info to the leader.
512    ///
513    /// Note this call will fail until the leader has their API running and has
514    /// `set_server_connections` so clients should retry.
515    ///
516    /// This call is not authenticated because it's guardian-to-guardian
517    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
518
519    /// During config gen, gets all the server connections we've received from
520    /// peers using `add_config_gen_peer`
521    ///
522    /// Could be called on the leader, so it's not authenticated
523    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
524
525    /// Runs DKG, can only be called once after configs have been generated in
526    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
527    /// error and config gen must be restarted.
528    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
529
530    /// After DKG, returns the hash of the consensus config tweaked with our id.
531    /// We need to share this with all other peers to complete verification.
532    async fn get_verify_config_hash(
533        &self,
534        auth: ApiAuth,
535    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
536
537    /// Updates local state and notify leader that we have verified configs.
538    /// This allows for a synchronization point, before we start consensus.
539    async fn verified_configs(
540        &self,
541        auth: ApiAuth,
542    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
543
544    /// Reads the configs from the disk, starts the consensus server, and shuts
545    /// down the config gen API to start the Fedimint API
546    ///
547    /// Clients may receive an error due to forced shutdown, should call the
548    /// `server_status` to see if consensus has started.
549    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
550
551    /// Returns the status of the server
552    async fn status(&self) -> FederationResult<StatusResponse>;
553
554    /// Show an audit across all modules
555    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
556
557    /// Download the guardian config to back it up
558    async fn guardian_config_backup(&self, auth: ApiAuth)
559    -> FederationResult<GuardianConfigBackup>;
560
561    /// Check auth credentials
562    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
563
564    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
565
566    /// Publish our signed API announcement to other guardians
567    async fn submit_api_announcement(
568        &self,
569        peer_id: PeerId,
570        announcement: SignedApiAnnouncement,
571    ) -> FederationResult<()>;
572
573    async fn api_announcements(
574        &self,
575        guardian: PeerId,
576    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
577
578    async fn sign_api_announcement(
579        &self,
580        api_url: SafeUrl,
581        auth: ApiAuth,
582    ) -> FederationResult<SignedApiAnnouncement>;
583
584    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
585
586    /// Returns the fedimintd version a peer is running
587    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
588
589    /// Fetch the backup statistics from the federation (admin endpoint)
590    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
591
592    /// Get the invite code for the federation guardian.
593    /// For instance, useful after DKG
594    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
595}
596
597pub fn deserialize_outcome<R>(
598    outcome: &SerdeOutputOutcome,
599    module_decoder: &Decoder,
600) -> OutputOutcomeResult<R>
601where
602    R: OutputOutcome + MaybeSend,
603{
604    let dyn_outcome = outcome
605        .try_into_inner_known_module_kind(module_decoder)
606        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
607
608    let source_instance = dyn_outcome.module_instance_id();
609
610    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
611        let target_type = std::any::type_name::<R>();
612        OutputOutcomeError::ResponseDeserialization(anyhow!(
613            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
614        ))
615    })
616}
617
618#[derive(Debug, Clone)]
619pub struct WebsocketConnector {
620    peers: BTreeMap<PeerId, SafeUrl>,
621    api_secret: Option<String>,
622
623    /// List of overrides to use when attempting to connect to given
624    /// `PeerId`
625    ///
626    /// This is useful for testing, or forcing non-default network
627    /// connectivity.
628    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
629}
630
631impl WebsocketConnector {
632    fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
633        let mut s = Self::new_no_overrides(peers, api_secret);
634
635        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
636            s = s.with_connection_override(k, v);
637        }
638
639        Ok(s)
640    }
641    pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
642        self.connection_overrides.insert(peer_id, url);
643        self
644    }
645    pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
646        Self {
647            peers,
648            api_secret,
649            connection_overrides: BTreeMap::default(),
650        }
651    }
652}
653
654#[async_trait]
655impl IClientConnector for WebsocketConnector {
656    fn peers(&self) -> BTreeSet<PeerId> {
657        self.peers.keys().copied().collect()
658    }
659
660    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
661        let api_endpoint = match self.connection_overrides.get(&peer_id) {
662            Some(url) => {
663                trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
664                url
665            }
666            None => self.peers.get(&peer_id).ok_or_else(|| {
667                PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
668            })?,
669        };
670
671        #[cfg(not(target_family = "wasm"))]
672        let mut client = {
673            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
674            let mut root_certs = RootCertStore::empty();
675            root_certs.extend(webpki_roots);
676
677            let tls_cfg = CustomCertStore::builder()
678                .with_root_certificates(root_certs)
679                .with_no_client_auth();
680
681            WsClientBuilder::default()
682                .max_concurrent_requests(u16::MAX as usize)
683                .with_custom_cert_store(tls_cfg)
684        };
685
686        #[cfg(target_family = "wasm")]
687        let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
688
689        if let Some(api_secret) = &self.api_secret {
690            #[cfg(not(target_family = "wasm"))]
691            {
692                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
693                // but we can set up the headers manually
694                let mut headers = HeaderMap::new();
695
696                let auth = base64::engine::general_purpose::STANDARD
697                    .encode(format!("fedimint:{api_secret}"));
698
699                headers.insert(
700                    "Authorization",
701                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
702                );
703
704                client = client.set_headers(headers);
705            }
706            #[cfg(target_family = "wasm")]
707            {
708                // on wasm, url will be handled by the browser, which should take care of
709                // `user:pass@...`
710                let mut url = api_endpoint.clone();
711                url.set_username("fedimint")
712                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
713                url.set_password(Some(&api_secret))
714                    .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
715
716                let client = client
717                    .build(url.as_str())
718                    .await
719                    .map_err(|err| PeerError::InternalClientError(err.into()))?;
720
721                return Ok(client.into_dyn());
722            }
723        }
724
725        let client = client
726            .build(api_endpoint.as_str())
727            .await
728            .map_err(|err| PeerError::InternalClientError(err.into()))?;
729
730        Ok(client.into_dyn())
731    }
732}
733
734#[cfg(all(feature = "tor", not(target_family = "wasm")))]
735#[derive(Debug, Clone)]
736pub struct TorConnector {
737    peers: BTreeMap<PeerId, SafeUrl>,
738    api_secret: Option<String>,
739}
740
741#[cfg(all(feature = "tor", not(target_family = "wasm")))]
742impl TorConnector {
743    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
744        Self { peers, api_secret }
745    }
746}
747
748#[cfg(all(feature = "tor", not(target_family = "wasm")))]
749#[async_trait]
750impl IClientConnector for TorConnector {
751    fn peers(&self) -> BTreeSet<PeerId> {
752        self.peers.keys().copied().collect()
753    }
754
755    #[allow(clippy::too_many_lines)]
756    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
757        let api_endpoint = self
758            .peers
759            .get(&peer_id)
760            .ok_or_else(|| PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}")))?;
761
762        let tor_config = TorClientConfig::default();
763        let tor_client = TorClient::create_bootstrapped(tor_config)
764            .await
765            .map_err(|err| PeerError::InternalClientError(err.into()))?
766            .isolated_client();
767
768        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");
769
770        // TODO: (@leonardo) should we implement our `IntoTorAddr` for `SafeUrl`
771        // instead?
772        let addr = (
773            api_endpoint
774                .host_str()
775                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?,
776            api_endpoint
777                .port_or_known_default()
778                .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?,
779        );
780        let tor_addr = TorAddr::from(addr).map_err(|e| {
781            PeerError::InvalidEndpoint(anyhow!("Invalid endpoint addr: {addr:?}: {e:#}"))
782        })?;
783
784        let tor_addr_clone = tor_addr.clone();
785
786        debug!(
787            ?tor_addr,
788            ?addr,
789            "Successfully created `TorAddr` for given address (i.e. host and port)"
790        );
791
792        // TODO: It can be updated to use `is_onion_address()` implementation,
793        // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
794        let anonymized_stream = if api_endpoint.is_onion_address() {
795            let mut stream_prefs = arti_client::StreamPrefs::default();
796            stream_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));
797
798            let anonymized_stream = tor_client
799                .connect_with_prefs(tor_addr, &stream_prefs)
800                .await
801                .map_err(|e| PeerError::Connection(e.into()))?;
802
803            debug!(
804                ?tor_addr_clone,
805                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
806            );
807            anonymized_stream
808        } else {
809            let anonymized_stream = tor_client
810                .connect(tor_addr)
811                .await
812                .map_err(|e| PeerError::Connection(e.into()))?;
813
814            debug!(
815                ?tor_addr_clone,
816                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
817            );
818            anonymized_stream
819        };
820
821        let is_tls = match api_endpoint.scheme() {
822            "wss" => true,
823            "ws" => false,
824            unexpected_scheme => {
825                return Err(PeerError::InvalidEndpoint(anyhow!(
826                    "Unsupported scheme: {unexpected_scheme}"
827                )));
828            }
829        };
830
831        let tls_connector = if is_tls {
832            let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
833            let mut root_certs = RootCertStore::empty();
834            root_certs.extend(webpki_roots);
835
836            let tls_config = TlsClientConfig::builder()
837                .with_root_certificates(root_certs)
838                .with_no_client_auth();
839            let tls_connector = TlsConnector::from(Arc::new(tls_config));
840            Some(tls_connector)
841        } else {
842            None
843        };
844
845        let mut ws_client_builder =
846            WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
847
848        if let Some(api_secret) = &self.api_secret {
849            // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
850            // but we can set up the headers manually
851            let mut headers = HeaderMap::new();
852
853            let auth =
854                base64::engine::general_purpose::STANDARD.encode(format!("fedimint:{api_secret}"));
855
856            headers.insert(
857                "Authorization",
858                HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
859            );
860
861            ws_client_builder = ws_client_builder.set_headers(headers);
862        }
863
864        match tls_connector {
865            None => {
866                let client = ws_client_builder
867                    .build_with_stream(api_endpoint.as_str(), anonymized_stream)
868                    .await
869                    .map_err(|e| PeerError::Connection(e.into()))?;
870
871                Ok(client.into_dyn())
872            }
873            Some(tls_connector) => {
874                let host = api_endpoint
875                    .host_str()
876                    .map(ToOwned::to_owned)
877                    .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;
878
879                // FIXME: (@leonardo) Is this leaking any data ? Should investigate it further
880                // if it's really needed.
881                let server_name = rustls_pki_types::ServerName::try_from(host)
882                    .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;
883
884                let anonymized_tls_stream = tls_connector
885                    .connect(server_name, anonymized_stream)
886                    .await
887                    .map_err(|e| PeerError::Connection(e.into()))?;
888
889                let client = ws_client_builder
890                    .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
891                    .await
892                    .map_err(|e| PeerError::Connection(e.into()))?;
893
894                Ok(client.into_dyn())
895            }
896        }
897    }
898}
899
900fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
901    match jsonrpc_error {
902        JsonRpcClientError::Call(error_object) => {
903            let error = anyhow!(error_object.message().to_owned());
904            match ErrorCode::from(error_object.code()) {
905                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
906                    PeerError::InvalidRequest(error)
907                }
908                ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
909                ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
910                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
911                    PeerError::ServerError(error)
912                }
913            }
914        }
915        JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
916        JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
917        JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
918        JsonRpcClientError::InvalidSubscriptionId => {
919            PeerError::Transport(anyhow!("Invalid subscription id"))
920        }
921        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
922            PeerError::InvalidRequest(anyhow!(invalid_request_id))
923        }
924        JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
925        JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
926        JsonRpcClientError::HttpNotImplemented => {
927            PeerError::ServerError(anyhow!("Http not implemented"))
928        }
929        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
930            PeerError::InvalidRequest(anyhow!(empty_batch_request))
931        }
932        JsonRpcClientError::RegisterMethod(register_method_error) => {
933            PeerError::InvalidResponse(anyhow!(register_method_error))
934        }
935    }
936}
937
938#[async_trait]
939impl IClientConnection for WsClient {
940    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
941        let method = match method {
942            ApiMethod::Core(method) => method,
943            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
944        };
945
946        Ok(ClientT::request(self, &method, [request.to_json()])
947            .await
948            .map_err(jsonrpc_error_to_peer_error)?)
949    }
950
951    async fn await_disconnection(&self) {
952        self.on_disconnect().await;
953    }
954}
955
956pub type DynClientConnector = Arc<dyn IClientConnector>;
957
958/// Allows to connect to peers. Connections are request based and should be
959/// authenticated and encrypted for production deployments.
960#[async_trait]
961pub trait IClientConnector: Send + Sync + 'static {
962    fn peers(&self) -> BTreeSet<PeerId>;
963
964    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
965
966    fn into_dyn(self) -> DynClientConnector
967    where
968        Self: Sized,
969    {
970        Arc::new(self)
971    }
972}
973
974pub type DynClientConnection = Arc<dyn IClientConnection>;
975
976#[async_trait]
977pub trait IClientConnection: Debug + Send + Sync + 'static {
978    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
979
980    async fn await_disconnection(&self);
981
982    fn into_dyn(self) -> DynClientConnection
983    where
984        Self: Sized,
985    {
986        Arc::new(self)
987    }
988}
989
990#[derive(Clone, Debug)]
991pub struct ReconnectFederationApi {
992    peers: BTreeSet<PeerId>,
993    admin_id: Option<PeerId>,
994    module_id: Option<ModuleInstanceId>,
995    connections: ReconnectClientConnections,
996}
997
998impl ReconnectFederationApi {
999    fn new(connector: &DynClientConnector, admin_id: Option<PeerId>) -> Self {
1000        Self {
1001            peers: connector.peers(),
1002            admin_id,
1003            module_id: None,
1004            connections: ReconnectClientConnections::new(connector),
1005        }
1006    }
1007
1008    pub async fn new_admin(
1009        peer: PeerId,
1010        url: SafeUrl,
1011        api_secret: &Option<String>,
1012    ) -> anyhow::Result<Self> {
1013        Self::from_endpoints(once((peer, url)), api_secret, Some(peer)).await
1014    }
1015
1016    pub async fn from_endpoints(
1017        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1018        api_secret: &Option<String>,
1019        admin_id: Option<PeerId>,
1020    ) -> anyhow::Result<Self> {
1021        let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1022
1023        let scheme = peers
1024            .values()
1025            .next()
1026            .expect("Federation api has been initialized with no peers")
1027            .scheme();
1028
1029        let connector = match scheme {
1030            "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1031            #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1032            "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1033            "iroh" => iroh::IrohConnector::new(peers).await?.into_dyn(),
1034            scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1035        };
1036
1037        Ok(ReconnectFederationApi::new(&connector, admin_id))
1038    }
1039}
1040
1041impl IModuleFederationApi for ReconnectFederationApi {}
1042
1043#[apply(async_trait_maybe_send!)]
1044impl IRawFederationApi for ReconnectFederationApi {
1045    fn all_peers(&self) -> &BTreeSet<PeerId> {
1046        &self.peers
1047    }
1048
1049    fn self_peer(&self) -> Option<PeerId> {
1050        self.admin_id
1051    }
1052
1053    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1054        ReconnectFederationApi {
1055            peers: self.peers.clone(),
1056            admin_id: self.admin_id,
1057            module_id: Some(id),
1058            connections: self.connections.clone(),
1059        }
1060        .into()
1061    }
1062
1063    #[instrument(
1064        target = LOG_NET_API,
1065        skip_all,
1066        fields(
1067            peer_id = %peer_id,
1068            method = %method,
1069            params = %params.params,
1070        )
1071    )]
1072    async fn request_raw(
1073        &self,
1074        peer_id: PeerId,
1075        method: &str,
1076        params: &ApiRequestErased,
1077    ) -> PeerResult<Value> {
1078        let method = match self.module_id {
1079            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1080            None => ApiMethod::Core(method.to_string()),
1081        };
1082
1083        self.connections
1084            .request(peer_id, method, params.clone())
1085            .await
1086    }
1087}
1088
1089#[derive(Clone, Debug)]
1090pub struct ReconnectClientConnections {
1091    connections: BTreeMap<PeerId, ClientConnection>,
1092}
1093
1094impl ReconnectClientConnections {
1095    pub fn new(connector: &DynClientConnector) -> Self {
1096        ReconnectClientConnections {
1097            connections: connector
1098                .peers()
1099                .into_iter()
1100                .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1101                .collect(),
1102        }
1103    }
1104
1105    async fn request(
1106        &self,
1107        peer: PeerId,
1108        method: ApiMethod,
1109        request: ApiRequestErased,
1110    ) -> PeerResult<Value> {
1111        trace!(target: LOG_NET_API, %method, "Api request");
1112        let res = self
1113            .connections
1114            .get(&peer)
1115            .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1116            .connection()
1117            .await
1118            .context("Failed to connect to peer")
1119            .map_err(PeerError::Connection)?
1120            .request(method.clone(), request)
1121            .await;
1122
1123        trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1124
1125        res
1126    }
1127}
1128
1129#[derive(Clone, Debug)]
1130struct ClientConnection {
1131    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1132}
1133
1134impl ClientConnection {
1135    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1136        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1137
1138        fedimint_core::task::spawn(
1139            "peer-api-connection",
1140            async move {
1141                let mut backoff = api_networking_backoff();
1142
1143                while let Ok(sender) = receiver.recv().await {
1144                    let mut senders = vec![sender];
1145
1146                    // Drain the queue, so we everyone that already joined fail or succeed
1147                    // together.
1148                    while let Ok(sender) = receiver.try_recv() {
1149                        senders.push(sender);
1150                    }
1151
1152                    match connector.connect(peer).await {
1153                        Ok(connection) => {
1154                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1155
1156                            for sender in senders {
1157                                sender.send(connection.clone()).ok();
1158                            }
1159
1160                            loop {
1161                                tokio::select! {
1162                                    sender = receiver.recv() => {
1163                                        match sender.ok() {
1164                                            Some(sender) => sender.send(connection.clone()).ok(),
1165                                            None => break,
1166                                        };
1167                                    }
1168                                    () = connection.await_disconnection() => break,
1169                                }
1170                            }
1171
1172                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1173
1174                            backoff = api_networking_backoff();
1175                        }
1176                        Err(e) => {
1177                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1178
1179                            fedimint_core::task::sleep(
1180                                backoff.next().expect("No limit to the number of retries"),
1181                            )
1182                            .await;
1183                        }
1184                    }
1185                }
1186
1187                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1188            }
1189            .instrument(trace_span!("peer-api-connection", ?peer)),
1190        );
1191
1192        ClientConnection { sender }
1193    }
1194
1195    async fn connection(&self) -> Option<DynClientConnection> {
1196        let (sender, receiver) = oneshot::channel();
1197
1198        self.sender
1199            .send(sender)
1200            .await
1201            .inspect_err(|err| {
1202                warn!(
1203                    target: LOG_CLIENT_NET_API,
1204                    err = %err.fmt_compact(),
1205                    "Api connection request channel closed unexpectedly"
1206                );
1207            })
1208            .ok()?;
1209
1210        receiver.await.ok()
1211    }
1212}
1213
1214mod iroh {
1215    use std::collections::{BTreeMap, BTreeSet};
1216    use std::str::FromStr;
1217
1218    use anyhow::Context;
1219    use async_trait::async_trait;
1220    use fedimint_core::PeerId;
1221    use fedimint_core::envs::parse_kv_list_from_env;
1222    use fedimint_core::module::{
1223        ApiError, ApiMethod, ApiRequestErased, FEDIMINT_API_ALPN, IrohApiRequest,
1224    };
1225    use fedimint_core::util::SafeUrl;
1226    use fedimint_logging::LOG_NET_IROH;
1227    use iroh::endpoint::Connection;
1228    use iroh::{Endpoint, NodeAddr, NodeId, PublicKey};
1229    use iroh_base::ticket::NodeTicket;
1230    use serde_json::Value;
1231    use tracing::{debug, trace, warn};
1232
1233    use super::{DynClientConnection, IClientConnection, IClientConnector, PeerError, PeerResult};
1234
1235    #[derive(Debug, Clone)]
1236    pub struct IrohConnector {
1237        node_ids: BTreeMap<PeerId, NodeId>,
1238        endpoint: Endpoint,
1239
1240        /// List of overrides to use when attempting to connect to given
1241        /// `NodeId`
1242        ///
1243        /// This is useful for testing, or forcing non-default network
1244        /// connectivity.
1245        pub connection_overrides: BTreeMap<NodeId, NodeAddr>,
1246    }
1247
1248    impl IrohConnector {
1249        pub async fn new(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1250            const FM_IROH_CONNECT_OVERRIDES_ENV: &str = "FM_IROH_CONNECT_OVERRIDES";
1251            warn!(target: LOG_NET_IROH, "Iroh support is experimental");
1252            let mut s = Self::new_no_overrides(peers).await?;
1253
1254            for (k, v) in parse_kv_list_from_env::<_, NodeTicket>(FM_IROH_CONNECT_OVERRIDES_ENV)? {
1255                s = s.with_connection_override(k, v.into());
1256            }
1257
1258            Ok(s)
1259        }
1260
1261        pub async fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>) -> anyhow::Result<Self> {
1262            let node_ids = peers
1263                .into_iter()
1264                .map(|(peer, url)| {
1265                    let host = url.host_str().context("Url is missing host")?;
1266
1267                    let node_id = PublicKey::from_str(host).context("Failed to parse node id")?;
1268
1269                    Ok((peer, node_id))
1270                })
1271                .collect::<anyhow::Result<BTreeMap<PeerId, NodeId>>>()?;
1272
1273            let builder = Endpoint::builder().discovery_n0();
1274            #[cfg(not(target_family = "wasm"))]
1275            let builder = builder.discovery_dht();
1276            let endpoint = builder.bind().await?;
1277            debug!(
1278                target: LOG_NET_IROH,
1279                node_id = %endpoint.node_id(),
1280                node_id_pkarr = %z32::encode(endpoint.node_id().as_bytes()),
1281                "Iroh api client endpoint"
1282            );
1283
1284            Ok(Self {
1285                node_ids,
1286                endpoint,
1287                connection_overrides: BTreeMap::new(),
1288            })
1289        }
1290
1291        pub fn with_connection_override(mut self, node: NodeId, addr: NodeAddr) -> Self {
1292            self.connection_overrides.insert(node, addr);
1293            self
1294        }
1295    }
1296
1297    #[async_trait]
1298    impl IClientConnector for IrohConnector {
1299        fn peers(&self) -> BTreeSet<PeerId> {
1300            self.node_ids.keys().copied().collect()
1301        }
1302
1303        async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
1304            let node_id = *self
1305                .node_ids
1306                .get(&peer_id)
1307                .ok_or(PeerError::InvalidPeerId { peer_id })?;
1308
1309            let connection = match self.connection_overrides.get(&node_id) {
1310                Some(node_addr) => {
1311                    trace!(target: LOG_NET_IROH, %node_id, "Using a connectivity override for connection");
1312                    self.endpoint
1313                        .connect(node_addr.clone(), FEDIMINT_API_ALPN)
1314                        .await
1315                }
1316                None => self.endpoint.connect(node_id, FEDIMINT_API_ALPN).await,
1317            }.map_err(PeerError::Connection)?;
1318
1319            Ok(connection.into_dyn())
1320        }
1321    }
1322
1323    #[async_trait]
1324    impl IClientConnection for Connection {
1325        async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1326            let json = serde_json::to_vec(&IrohApiRequest { method, request })
1327                .expect("Serialization to vec can't fail");
1328
1329            let (mut sink, mut stream) = self
1330                .open_bi()
1331                .await
1332                .map_err(|e| PeerError::Transport(e.into()))?;
1333
1334            sink.write_all(&json)
1335                .await
1336                .map_err(|e| PeerError::Transport(e.into()))?;
1337
1338            sink.finish().map_err(|e| PeerError::Transport(e.into()))?;
1339
1340            let response = stream
1341                .read_to_end(1_000_000)
1342                .await
1343                .map_err(|e| PeerError::Transport(e.into()))?;
1344
1345            // TODO: We should not be serializing Results on the wire
1346            let response = serde_json::from_slice::<Result<Value, ApiError>>(&response)
1347                .map_err(|e| PeerError::InvalidResponse(e.into()))?;
1348
1349            response.map_err(|e| PeerError::InvalidResponse(anyhow::anyhow!("Api Error: {:?}", e)))
1350        }
1351
1352        async fn await_disconnection(&self) {
1353            self.closed().await;
1354        }
1355    }
1356}
1357
1358/// The status of a server, including how it views its peers
1359#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1360pub struct LegacyFederationStatus {
1361    pub session_count: u64,
1362    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1363    pub peers_online: u64,
1364    pub peers_offline: u64,
1365    /// This should always be 0 if everything is okay, so a monitoring tool
1366    /// should generate an alert if this is not the case.
1367    pub peers_flagged: u64,
1368    pub scheduled_shutdown: Option<u64>,
1369}
1370
1371#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1372pub struct LegacyPeerStatus {
1373    pub last_contribution: Option<u64>,
1374    pub connection_status: LegacyP2PConnectionStatus,
1375    /// Indicates that this peer needs attention from the operator since
1376    /// it has not contributed to the consensus in a long time
1377    pub flagged: bool,
1378}
1379
1380#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1381#[serde(rename_all = "snake_case")]
1382pub enum LegacyP2PConnectionStatus {
1383    #[default]
1384    Disconnected,
1385    Connected,
1386}
1387
1388#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1389pub struct StatusResponse {
1390    pub server: ServerStatusLegacy,
1391    pub federation: Option<LegacyFederationStatus>,
1392}
1393
1394/// Archive of all the guardian config files that can be used to recover a lost
1395/// guardian node.
1396#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
1397pub struct GuardianConfigBackup {
1398    #[serde(with = "fedimint_core::hex::serde")]
1399    pub tar_archive_bytes: Vec<u8>,
1400}
1401
1402#[cfg(test)]
1403mod tests {
1404    use std::str::FromStr as _;
1405
1406    use fedimint_core::config::FederationId;
1407    use fedimint_core::invite_code::InviteCode;
1408
1409    use super::*;
1410
1411    #[test]
1412    fn converts_invite_code() {
1413        let connect = InviteCode::new(
1414            "ws://test1".parse().unwrap(),
1415            PeerId::from(1),
1416            FederationId::dummy(),
1417            Some("api_secret".into()),
1418        );
1419
1420        let bech32 = connect.to_string();
1421        let connect_parsed = InviteCode::from_str(&bech32).expect("parses");
1422        assert_eq!(connect, connect_parsed);
1423
1424        let json = serde_json::to_string(&connect).unwrap();
1425        let connect_as_string: String = serde_json::from_str(&json).unwrap();
1426        assert_eq!(connect_as_string, bech32);
1427        let connect_parsed_json: InviteCode = serde_json::from_str(&json).unwrap();
1428        assert_eq!(connect_parsed_json, connect_parsed);
1429    }
1430
1431    #[test]
1432    fn creates_essential_guardians_invite_code() {
1433        let mut peer_to_url_map = BTreeMap::new();
1434        peer_to_url_map.insert(PeerId::from(0), "ws://test1".parse().expect("URL fail"));
1435        peer_to_url_map.insert(PeerId::from(1), "ws://test2".parse().expect("URL fail"));
1436        peer_to_url_map.insert(PeerId::from(2), "ws://test3".parse().expect("URL fail"));
1437        peer_to_url_map.insert(PeerId::from(3), "ws://test4".parse().expect("URL fail"));
1438        let max_size = peer_to_url_map.to_num_peers().max_evil() + 1;
1439
1440        let code =
1441            InviteCode::new_with_essential_num_guardians(&peer_to_url_map, FederationId::dummy());
1442
1443        assert_eq!(FederationId::dummy(), code.federation_id());
1444
1445        let expected_map: BTreeMap<PeerId, SafeUrl> =
1446            peer_to_url_map.into_iter().take(max_size).collect();
1447        assert_eq!(expected_map, code.peers());
1448    }
1449}