fedimint_api_client/api/
mod.rs

1mod error;
2pub mod global_api;
3mod iroh;
4pub mod net;
5
6use core::{fmt, panic};
7use std::collections::{BTreeMap, BTreeSet, HashMap};
8use std::fmt::Debug;
9use std::iter::once;
10use std::pin::Pin;
11use std::result;
12use std::sync::Arc;
13
14use anyhow::{Context, anyhow};
15#[cfg(all(feature = "tor", not(target_family = "wasm")))]
16use arti_client::{TorAddr, TorClient, TorClientConfig};
17use async_channel::bounded;
18use async_trait::async_trait;
19use base64::Engine as _;
20use bitcoin::hashes::sha256;
21use bitcoin::secp256k1;
22pub use error::{FederationError, OutputOutcomeError, PeerError};
23use fedimint_core::admin_client::{
24    GuardianConfigBackup, PeerServerParamsLegacy, ServerStatusLegacy, SetupStatus,
25};
26use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
27use fedimint_core::core::backup::SignedBackupRequest;
28use fedimint_core::core::{Decoder, DynOutputOutcome, ModuleInstanceId, OutputOutcome};
29use fedimint_core::encoding::{Decodable, Encodable};
30use fedimint_core::envs::{
31    FM_IROH_DNS_ENV, FM_WS_API_CONNECT_OVERRIDES_ENV, parse_kv_list_from_env,
32};
33use fedimint_core::invite_code::InviteCode;
34use fedimint_core::module::audit::AuditSummary;
35use fedimint_core::module::registry::ModuleDecoderRegistry;
36use fedimint_core::module::{
37    ApiAuth, ApiMethod, ApiRequestErased, ApiVersion, SerdeModuleEncoding,
38};
39use fedimint_core::net::api_announcement::SignedApiAnnouncement;
40#[cfg(not(target_family = "wasm"))]
41use fedimint_core::rustls::install_crypto_provider;
42use fedimint_core::session_outcome::{SessionOutcome, SessionStatus};
43use fedimint_core::task::{MaybeSend, MaybeSync};
44use fedimint_core::transaction::{Transaction, TransactionSubmissionOutcome};
45use fedimint_core::util::backoff_util::api_networking_backoff;
46use fedimint_core::util::{FmtCompact as _, SafeUrl};
47use fedimint_core::{
48    NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send, dyn_newtype_define, util,
49};
50use fedimint_logging::{LOG_CLIENT_NET_API, LOG_NET_API, LOG_NET_WS};
51use futures::channel::oneshot;
52use futures::future::pending;
53use futures::stream::FuturesUnordered;
54use futures::{Future, StreamExt};
55use global_api::with_cache::GlobalFederationApiWithCache;
56use jsonrpsee_core::DeserializeOwned;
57use jsonrpsee_core::client::ClientT;
58pub use jsonrpsee_core::client::Error as JsonRpcClientError;
59use jsonrpsee_types::ErrorCode;
60#[cfg(target_family = "wasm")]
61use jsonrpsee_wasm_client::{Client as WsClient, WasmClientBuilder as WsClientBuilder};
62#[cfg(not(target_family = "wasm"))]
63use jsonrpsee_ws_client::{CustomCertStore, HeaderMap, HeaderValue};
64#[cfg(not(target_family = "wasm"))]
65use jsonrpsee_ws_client::{WsClient, WsClientBuilder};
66use serde::{Deserialize, Serialize};
67use serde_json::Value;
68use tokio::sync::OnceCell;
69#[cfg(not(target_family = "wasm"))]
70use tokio_rustls::rustls::RootCertStore;
71#[cfg(all(feature = "tor", not(target_family = "wasm")))]
72use tokio_rustls::{TlsConnector, rustls::ClientConfig as TlsClientConfig};
73use tracing::{Instrument, debug, instrument, trace, trace_span, warn};
74
75use crate::query::{QueryStep, QueryStrategy, ThresholdConsensus};
76
77pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2: ApiVersion = ApiVersion::new(0, 5);
78
79pub const VERSION_THAT_INTRODUCED_GET_SESSION_STATUS: ApiVersion =
80    ApiVersion { major: 0, minor: 1 };
81
82pub type PeerResult<T> = Result<T, PeerError>;
83pub type JsonRpcResult<T> = Result<T, JsonRpcClientError>;
84pub type FederationResult<T> = Result<T, FederationError>;
85pub type SerdeOutputOutcome = SerdeModuleEncoding<DynOutputOutcome>;
86
87pub type OutputOutcomeResult<O> = result::Result<O, OutputOutcomeError>;
88
89/// Set of api versions for each component (core + modules)
90///
91/// E.g. result of federated common api versions discovery.
92#[derive(Debug, Clone, Serialize, Deserialize, Encodable, Decodable)]
93pub struct ApiVersionSet {
94    pub core: ApiVersion,
95    pub modules: BTreeMap<ModuleInstanceId, ApiVersion>,
96}
97
98/// An API (module or global) that can query a federation
99#[apply(async_trait_maybe_send!)]
100pub trait IRawFederationApi: Debug + MaybeSend + MaybeSync {
101    /// List of all federation peers for the purpose of iterating each peer
102    /// in the federation.
103    ///
104    /// The underlying implementation is responsible for knowing how many
105    /// and `PeerId`s of each. The caller of this interface most probably
106    /// have some idea as well, but passing this set across every
107    /// API call to the federation would be inconvenient.
108    fn all_peers(&self) -> &BTreeSet<PeerId>;
109
110    /// `PeerId` of the Guardian node, if set
111    ///
112    /// This is for using Client in a "Admin" mode, making authenticated
113    /// calls to own `fedimintd` instance.
114    fn self_peer(&self) -> Option<PeerId>;
115
116    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi;
117
118    /// Make request to a specific federation peer by `peer_id`
119    async fn request_raw(
120        &self,
121        peer_id: PeerId,
122        method: &str,
123        params: &ApiRequestErased,
124    ) -> PeerResult<Value>;
125
126    fn connector(&self) -> &DynClientConnector;
127}
128
129/// An extension trait allowing to making federation-wide API call on top
130/// [`IRawFederationApi`].
131#[apply(async_trait_maybe_send!)]
132pub trait FederationApiExt: IRawFederationApi {
133    async fn request_single_peer<Ret>(
134        &self,
135        method: String,
136        params: ApiRequestErased,
137        peer: PeerId,
138    ) -> PeerResult<Ret>
139    where
140        Ret: DeserializeOwned,
141    {
142        self.request_raw(peer, &method, &params)
143            .await
144            .and_then(|v| {
145                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
146            })
147    }
148
149    async fn request_single_peer_federation<FedRet>(
150        &self,
151        method: String,
152        params: ApiRequestErased,
153        peer_id: PeerId,
154    ) -> FederationResult<FedRet>
155    where
156        FedRet: serde::de::DeserializeOwned + Eq + Debug + Clone + MaybeSend,
157    {
158        self.request_raw(peer_id, &method, &params)
159            .await
160            .and_then(|v| {
161                serde_json::from_value(v).map_err(|e| PeerError::ResponseDeserialization(e.into()))
162            })
163            .map_err(|e| error::FederationError::new_one_peer(peer_id, method, params, e))
164    }
165
166    /// Make an aggregate request to federation, using `strategy` to logically
167    /// merge the responses.
168    #[instrument(target = LOG_NET_API, skip_all, fields(method=method))]
169    async fn request_with_strategy<PR: DeserializeOwned, FR: Debug>(
170        &self,
171        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
172        method: String,
173        params: ApiRequestErased,
174    ) -> FederationResult<FR> {
175        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
176        // completed results from it and we don't do any `await`s when
177        // processing them, it should be totally OK.
178        #[cfg(not(target_family = "wasm"))]
179        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
180        #[cfg(target_family = "wasm")]
181        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
182
183        for peer in self.all_peers() {
184            futures.push(Box::pin({
185                let method = &method;
186                let params = &params;
187                async move {
188                    let result = self
189                        .request_single_peer(method.clone(), params.clone(), *peer)
190                        .await;
191
192                    (*peer, result)
193                }
194            }));
195        }
196
197        let mut peer_errors = BTreeMap::new();
198        let peer_error_threshold = self.all_peers().to_num_peers().one_honest();
199
200        loop {
201            let (peer, result) = futures
202                .next()
203                .await
204                .expect("Query strategy ran out of peers to query without returning a result");
205
206            match result {
207                Ok(response) => match strategy.process(peer, response) {
208                    QueryStep::Retry(peers) => {
209                        for peer in peers {
210                            futures.push(Box::pin({
211                                let method = &method;
212                                let params = &params;
213                                async move {
214                                    let result = self
215                                        .request_single_peer(method.clone(), params.clone(), peer)
216                                        .await;
217
218                                    (peer, result)
219                                }
220                            }));
221                        }
222                    }
223                    QueryStep::Success(response) => return Ok(response),
224                    QueryStep::Failure(e) => {
225                        peer_errors.insert(peer, e);
226                    }
227                    QueryStep::Continue => {}
228                },
229                Err(e) => {
230                    e.report_if_unusual(peer, "RequestWithStrategy");
231                    peer_errors.insert(peer, e);
232                }
233            }
234
235            if peer_errors.len() == peer_error_threshold {
236                return Err(FederationError::peer_errors(
237                    method.clone(),
238                    params.params.clone(),
239                    peer_errors,
240                ));
241            }
242        }
243    }
244
245    #[instrument(target = LOG_CLIENT_NET_API, level = "debug", skip(self, strategy))]
246    async fn request_with_strategy_retry<PR: DeserializeOwned + MaybeSend, FR: Debug>(
247        &self,
248        mut strategy: impl QueryStrategy<PR, FR> + MaybeSend,
249        method: String,
250        params: ApiRequestErased,
251    ) -> FR {
252        // NOTE: `FuturesUnorderded` is a footgun, but all we do here is polling
253        // completed results from it and we don't do any `await`s when
254        // processing them, it should be totally OK.
255        #[cfg(not(target_family = "wasm"))]
256        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _> + Send>>>::new();
257        #[cfg(target_family = "wasm")]
258        let mut futures = FuturesUnordered::<Pin<Box<dyn Future<Output = _>>>>::new();
259
260        for peer in self.all_peers() {
261            futures.push(Box::pin({
262                let method = &method;
263                let params = &params;
264                async move {
265                    let response = util::retry(
266                        format!("api-request-{method}-{peer}"),
267                        api_networking_backoff(),
268                        || async {
269                            self.request_single_peer(method.clone(), params.clone(), *peer)
270                                .await
271                                .inspect_err(|e| {
272                                    e.report_if_unusual(*peer, "QueryWithStrategyRetry");
273                                })
274                                .map_err(|e| anyhow!(e.to_string()))
275                        },
276                    )
277                    .await
278                    .expect("Number of retries has no limit");
279
280                    (*peer, response)
281                }
282            }));
283        }
284
285        loop {
286            let (peer, response) = match futures.next().await {
287                Some(t) => t,
288                None => pending().await,
289            };
290
291            match strategy.process(peer, response) {
292                QueryStep::Retry(peers) => {
293                    for peer in peers {
294                        futures.push(Box::pin({
295                            let method = &method;
296                            let params = &params;
297                            async move {
298                                let response = util::retry(
299                                    format!("api-request-{method}-{peer}"),
300                                    api_networking_backoff(),
301                                    || async {
302                                        self.request_single_peer(
303                                            method.clone(),
304                                            params.clone(),
305                                            peer,
306                                        )
307                                        .await
308                                        .inspect_err(|err| {
309                                            if err.is_unusual() {
310                                                debug!(target: LOG_CLIENT_NET_API, err = %err.fmt_compact(), "Unusual peer error");
311                                            }
312                                        })
313                                        .map_err(|e| anyhow!(e.to_string()))
314                                    },
315                                )
316                                .await
317                                .expect("Number of retries has no limit");
318
319                                (peer, response)
320                            }
321                        }));
322                    }
323                }
324                QueryStep::Success(response) => return response,
325                QueryStep::Failure(e) => {
326                    warn!("Query strategy returned non-retryable failure for peer {peer}: {e}");
327                }
328                QueryStep::Continue => {}
329            }
330        }
331    }
332
333    async fn request_current_consensus<Ret>(
334        &self,
335        method: String,
336        params: ApiRequestErased,
337    ) -> FederationResult<Ret>
338    where
339        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
340    {
341        self.request_with_strategy(
342            ThresholdConsensus::new(self.all_peers().to_num_peers()),
343            method,
344            params,
345        )
346        .await
347    }
348
349    async fn request_current_consensus_retry<Ret>(
350        &self,
351        method: String,
352        params: ApiRequestErased,
353    ) -> Ret
354    where
355        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
356    {
357        self.request_with_strategy_retry(
358            ThresholdConsensus::new(self.all_peers().to_num_peers()),
359            method,
360            params,
361        )
362        .await
363    }
364
365    async fn request_admin<Ret>(
366        &self,
367        method: &str,
368        params: ApiRequestErased,
369        auth: ApiAuth,
370    ) -> FederationResult<Ret>
371    where
372        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
373    {
374        let Some(self_peer_id) = self.self_peer() else {
375            return Err(FederationError::general(
376                method,
377                params,
378                anyhow::format_err!("Admin peer_id not set"),
379            ));
380        };
381
382        self.request_single_peer_federation(method.into(), params.with_auth(auth), self_peer_id)
383            .await
384    }
385
386    async fn request_admin_no_auth<Ret>(
387        &self,
388        method: &str,
389        params: ApiRequestErased,
390    ) -> FederationResult<Ret>
391    where
392        Ret: DeserializeOwned + Eq + Debug + Clone + MaybeSend,
393    {
394        let Some(self_peer_id) = self.self_peer() else {
395            return Err(FederationError::general(
396                method,
397                params,
398                anyhow::format_err!("Admin peer_id not set"),
399            ));
400        };
401
402        self.request_single_peer_federation(method.into(), params, self_peer_id)
403            .await
404    }
405}
406
407#[apply(async_trait_maybe_send!)]
408impl<T: ?Sized> FederationApiExt for T where T: IRawFederationApi {}
409
410/// Trait marker for the module (non-global) endpoints
411pub trait IModuleFederationApi: IRawFederationApi {}
412
413dyn_newtype_define! {
414    #[derive(Clone)]
415    pub DynModuleApi(Arc<IModuleFederationApi>)
416}
417
418dyn_newtype_define! {
419    #[derive(Clone)]
420    pub DynGlobalApi(Arc<IGlobalFederationApi>)
421}
422
423impl AsRef<dyn IGlobalFederationApi + 'static> for DynGlobalApi {
424    fn as_ref(&self) -> &(dyn IGlobalFederationApi + 'static) {
425        self.inner.as_ref()
426    }
427}
428
429impl DynGlobalApi {
430    pub async fn new_admin(
431        peer: PeerId,
432        url: SafeUrl,
433        api_secret: &Option<String>,
434        iroh_enable_dht: bool,
435        iroh_enable_next: bool,
436    ) -> anyhow::Result<DynGlobalApi> {
437        let connector =
438            make_admin_connector(peer, url, api_secret, iroh_enable_dht, iroh_enable_next).await?;
439        Ok(
440            GlobalFederationApiWithCache::new(ReconnectFederationApi::new(connector, Some(peer)))
441                .into(),
442        )
443    }
444
445    pub fn new(connector: DynClientConnector) -> anyhow::Result<Self> {
446        Ok(GlobalFederationApiWithCache::new(ReconnectFederationApi::new(connector, None)).into())
447    }
448    // FIXME: (@leonardo) Should we have the option to do DKG and config related
449    // actions through Tor ? Should we add the `Connector` choice to
450    // ConfigParams then ?
451    pub async fn from_setup_endpoint(
452        url: SafeUrl,
453        api_secret: &Option<String>,
454        iroh_enable_dht: bool,
455        iroh_enable_next: bool,
456    ) -> anyhow::Result<Self> {
457        // PeerIds are used only for informational purposes, but just in case, make a
458        // big number so it stands out
459
460        Self::new_admin(
461            PeerId::from(1024),
462            url,
463            api_secret,
464            iroh_enable_dht,
465            iroh_enable_next,
466        )
467        .await
468    }
469
470    pub async fn from_endpoints(
471        peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
472        api_secret: &Option<String>,
473        iroh_enable_dht: bool,
474        iroh_enable_next: bool,
475    ) -> anyhow::Result<Self> {
476        let connector =
477            make_connector(peers, api_secret, iroh_enable_dht, iroh_enable_next).await?;
478        Ok(GlobalFederationApiWithCache::new(ReconnectFederationApi::new(connector, None)).into())
479    }
480}
481
482/// The API for the global (non-module) endpoints
483#[apply(async_trait_maybe_send!)]
484pub trait IGlobalFederationApi: IRawFederationApi {
485    async fn submit_transaction(
486        &self,
487        tx: Transaction,
488    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome>;
489
490    async fn await_block(
491        &self,
492        block_index: u64,
493        decoders: &ModuleDecoderRegistry,
494    ) -> anyhow::Result<SessionOutcome>;
495
496    async fn get_session_status(
497        &self,
498        block_index: u64,
499        decoders: &ModuleDecoderRegistry,
500        core_api_version: ApiVersion,
501        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
502    ) -> anyhow::Result<SessionStatus>;
503
504    async fn session_count(&self) -> FederationResult<u64>;
505
506    async fn await_transaction(&self, txid: TransactionId) -> TransactionId;
507
508    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()>;
509
510    async fn download_backup(
511        &self,
512        id: &secp256k1::PublicKey,
513    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>>;
514
515    /// Sets the password used to decrypt the configs and authenticate
516    ///
517    /// Must be called first before any other calls to the API
518    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()>;
519
520    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus>;
521
522    async fn set_local_params(
523        &self,
524        name: String,
525        federation_name: Option<String>,
526        disable_base_fees: Option<bool>,
527        auth: ApiAuth,
528    ) -> FederationResult<String>;
529
530    async fn add_peer_connection_info(
531        &self,
532        info: String,
533        auth: ApiAuth,
534    ) -> FederationResult<String>;
535
536    /// Reset the peer setup codes during the federation setup process
537    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()>;
538
539    /// Returns the setup code if `set_local_params` was already called
540    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>>;
541
542    /// During config gen, used for an API-to-API call that adds a peer's server
543    /// connection info to the leader.
544    ///
545    /// Note this call will fail until the leader has their API running and has
546    /// `set_server_connections` so clients should retry.
547    ///
548    /// This call is not authenticated because it's guardian-to-guardian
549    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()>;
550
551    /// During config gen, gets all the server connections we've received from
552    /// peers using `add_config_gen_peer`
553    ///
554    /// Could be called on the leader, so it's not authenticated
555    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>>;
556
557    /// Runs DKG, can only be called once after configs have been generated in
558    /// `get_consensus_config_gen_params`.  If DKG fails this returns a 500
559    /// error and config gen must be restarted.
560    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()>;
561
562    /// After DKG, returns the hash of the consensus config tweaked with our id.
563    /// We need to share this with all other peers to complete verification.
564    async fn get_verify_config_hash(
565        &self,
566        auth: ApiAuth,
567    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
568
569    /// Updates local state and notify leader that we have verified configs.
570    /// This allows for a synchronization point, before we start consensus.
571    async fn verified_configs(
572        &self,
573        auth: ApiAuth,
574    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>>;
575
576    /// Reads the configs from the disk, starts the consensus server, and shuts
577    /// down the config gen API to start the Fedimint API
578    ///
579    /// Clients may receive an error due to forced shutdown, should call the
580    /// `server_status` to see if consensus has started.
581    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()>;
582
583    /// Returns the status of the server
584    async fn status(&self) -> FederationResult<StatusResponse>;
585
586    /// Show an audit across all modules
587    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary>;
588
589    /// Download the guardian config to back it up
590    async fn guardian_config_backup(&self, auth: ApiAuth)
591    -> FederationResult<GuardianConfigBackup>;
592
593    /// Check auth credentials
594    async fn auth(&self, auth: ApiAuth) -> FederationResult<()>;
595
596    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()>;
597
598    /// Publish our signed API announcement to other guardians
599    async fn submit_api_announcement(
600        &self,
601        peer_id: PeerId,
602        announcement: SignedApiAnnouncement,
603    ) -> FederationResult<()>;
604
605    async fn api_announcements(
606        &self,
607        guardian: PeerId,
608    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>>;
609
610    async fn sign_api_announcement(
611        &self,
612        api_url: SafeUrl,
613        auth: ApiAuth,
614    ) -> FederationResult<SignedApiAnnouncement>;
615
616    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()>;
617
618    /// Returns the fedimintd version a peer is running
619    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String>;
620
621    /// Fetch the backup statistics from the federation (admin endpoint)
622    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics>;
623
624    /// Get the invite code for the federation guardian.
625    /// For instance, useful after DKG
626    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode>;
627
628    /// Change the password used to encrypt the configs and for guardian
629    /// authentication
630    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()>;
631}
632
633pub fn deserialize_outcome<R>(
634    outcome: &SerdeOutputOutcome,
635    module_decoder: &Decoder,
636) -> OutputOutcomeResult<R>
637where
638    R: OutputOutcome + MaybeSend,
639{
640    let dyn_outcome = outcome
641        .try_into_inner_known_module_kind(module_decoder)
642        .map_err(|e| OutputOutcomeError::ResponseDeserialization(e.into()))?;
643
644    let source_instance = dyn_outcome.module_instance_id();
645
646    dyn_outcome.as_any().downcast_ref().cloned().ok_or_else(|| {
647        let target_type = std::any::type_name::<R>();
648        OutputOutcomeError::ResponseDeserialization(anyhow!(
649            "Could not downcast output outcome with instance id {source_instance} to {target_type}"
650        ))
651    })
652}
653
654#[derive(Debug, Clone)]
655pub struct WebsocketConnector {
656    peers: BTreeMap<PeerId, SafeUrl>,
657    api_secret: Option<String>,
658
659    /// List of overrides to use when attempting to connect to given
660    /// `PeerId`
661    ///
662    /// This is useful for testing, or forcing non-default network
663    /// connectivity.
664    pub connection_overrides: BTreeMap<PeerId, SafeUrl>,
665
666    /// Connection pool for websocket connections
667    #[allow(clippy::type_complexity)]
668    connections: Arc<tokio::sync::Mutex<HashMap<PeerId, Arc<OnceCell<Arc<WsClient>>>>>>,
669}
670
671impl WebsocketConnector {
672    fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> anyhow::Result<Self> {
673        let mut s = Self::new_no_overrides(peers, api_secret);
674
675        for (k, v) in parse_kv_list_from_env::<_, SafeUrl>(FM_WS_API_CONNECT_OVERRIDES_ENV)? {
676            s = s.with_connection_override(k, v);
677        }
678
679        Ok(s)
680    }
681    pub fn with_connection_override(mut self, peer_id: PeerId, url: SafeUrl) -> Self {
682        self.connection_overrides.insert(peer_id, url);
683        self
684    }
685    pub fn new_no_overrides(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
686        Self {
687            peers,
688            api_secret,
689            connection_overrides: BTreeMap::default(),
690            connections: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
691        }
692    }
693
694    async fn get_or_create_connection(&self, peer_id: PeerId) -> PeerResult<Arc<WsClient>> {
695        let mut pool_lock = self.connections.lock().await;
696
697        let entry_arc = pool_lock
698            .entry(peer_id)
699            .and_modify(|entry_arc| {
700                // Check if existing connection is disconnected and remove it
701                if let Some(existing_conn) = entry_arc.get()
702                    && !existing_conn.is_connected() {
703                        trace!(target: LOG_NET_WS, %peer_id, "Existing connection is disconnected, removing from pool");
704                        *entry_arc = Arc::new(OnceCell::new());
705                    }
706            })
707            .or_insert_with(|| Arc::new(OnceCell::new()))
708            .clone();
709
710        // Drop the pool lock so other connections can work in parallel
711        drop(pool_lock);
712
713        let conn = entry_arc
714            .get_or_try_init(|| async {
715                trace!(target: LOG_NET_WS, %peer_id, "Creating new websocket connection");
716                let api_endpoint = match self.connection_overrides.get(&peer_id) {
717                    Some(url) => {
718                        trace!(target: LOG_NET_WS, %peer_id, "Using a connectivity override for connection");
719                        url
720                    }
721                    None => self.peers.get(&peer_id).ok_or_else(|| {
722                        PeerError::InternalClientError(anyhow!("Invalid peer_id: {peer_id}"))
723                    })?,
724                };
725
726                #[cfg(not(target_family = "wasm"))]
727                let mut client = {
728                    install_crypto_provider().await;
729                    let webpki_roots = webpki_roots::TLS_SERVER_ROOTS.iter().cloned();
730                    let mut root_certs = RootCertStore::empty();
731                    root_certs.extend(webpki_roots);
732
733                    let tls_cfg = CustomCertStore::builder()
734                        .with_root_certificates(root_certs)
735                        .with_no_client_auth();
736
737                    WsClientBuilder::default()
738                        .max_concurrent_requests(u16::MAX as usize)
739                        .with_custom_cert_store(tls_cfg)
740                };
741
742                #[cfg(target_family = "wasm")]
743                let client = WsClientBuilder::default().max_concurrent_requests(u16::MAX as usize);
744
745                if let Some(api_secret) = &self.api_secret {
746                    #[cfg(not(target_family = "wasm"))]
747                    {
748                        // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
749                        // but we can set up the headers manually
750                        let mut headers = HeaderMap::new();
751
752                        let auth = base64::engine::general_purpose::STANDARD
753                            .encode(format!("fedimint:{api_secret}"));
754
755                        headers.insert(
756                            "Authorization",
757                            HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
758                        );
759
760                        client = client.set_headers(headers);
761                    }
762                    #[cfg(target_family = "wasm")]
763                    {
764                        // on wasm, url will be handled by the browser, which should take care of
765                        // `user:pass@...`
766                        let mut url = api_endpoint.clone();
767                        url.set_username("fedimint")
768                            .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid username")))?;
769                        url.set_password(Some(&api_secret))
770                            .map_err(|_| PeerError::InvalidEndpoint(anyhow!("invalid secret")))?;
771
772                        let client = client
773                            .build(url.as_str())
774                            .await
775                            .map_err(|err| PeerError::InternalClientError(err.into()))?;
776
777                        return Ok(Arc::new(client));
778                    }
779                }
780
781                let client = client
782                    .build(api_endpoint.as_str())
783                    .await
784                    .map_err(|err| PeerError::InternalClientError(err.into()))?;
785
786                Ok(Arc::new(client))
787            })
788            .await?;
789
790        trace!(target: LOG_NET_WS, %peer_id, "Using websocket connection");
791        Ok(conn.clone())
792    }
793}
794
795#[async_trait]
796impl IClientConnector for WebsocketConnector {
797    fn peers(&self) -> BTreeSet<PeerId> {
798        self.peers.keys().copied().collect()
799    }
800
801    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
802        let client = self.get_or_create_connection(peer_id).await?;
803        Ok(client.into_dyn())
804    }
805}
806
/// Connector that stores the API endpoint of every peer together with an
/// optional API secret, for connecting to peers over Tor.
///
/// Only compiled for non-wasm targets with the `tor` feature enabled.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[derive(Clone, Debug)]
pub struct TorConnector {
    /// API endpoint of each peer, keyed by peer id.
    peers: BTreeMap<PeerId, SafeUrl>,
    /// Optional secret; when set, `connect` sends it as HTTP basic-auth
    /// credentials for the user `fedimint`.
    api_secret: Option<String>,
}
813
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
impl TorConnector {
    /// Builds a connector from the per-peer endpoints and the optional API
    /// secret. Nothing is validated or connected at construction time.
    pub fn new(peers: BTreeMap<PeerId, SafeUrl>, api_secret: Option<String>) -> Self {
        TorConnector { peers, api_secret }
    }
}
820
/// [`IClientConnector`] implementation that tunnels the websocket API
/// connection of a peer through a Tor stream.
#[cfg(all(feature = "tor", not(target_family = "wasm")))]
#[async_trait]
impl IClientConnector for TorConnector {
    /// Returns the id of every peer this connector holds an endpoint for.
    fn peers(&self) -> BTreeSet<PeerId> {
        self.peers.keys().copied().collect()
    }

    /// Opens a fresh connection to `peer_id` through Tor.
    ///
    /// The steps run in this order:
    /// 1. look up the endpoint of `peer_id`,
    /// 2. bootstrap a `TorClient` with the default config and take an
    ///    isolated client from it,
    /// 3. turn the endpoint host/port into a `TorAddr` and open a Tor stream
    ///    (with onion-service preferences for onion addresses),
    /// 4. for `wss` endpoints, wrap the stream in TLS using the `webpki_roots`
    ///    trust anchors,
    /// 5. build the websocket client on top of the stream, sending the API
    ///    secret (if any) as a basic-auth `Authorization` header.
    ///
    /// # Errors
    ///
    /// * `PeerError::InternalClientError` if `peer_id` is unknown or the Tor
    ///   client cannot be bootstrapped.
    /// * `PeerError::InvalidEndpoint` if the endpoint lacks a host or port,
    ///   cannot be converted into a `TorAddr`, has a scheme other than
    ///   `ws`/`wss`, or its host is not a valid TLS server name.
    /// * `PeerError::Connection` if the Tor stream, the TLS handshake or the
    ///   websocket client setup fails.
    #[allow(clippy::too_many_lines)]
    async fn connect(&self, peer_id: PeerId) -> PeerResult<DynClientConnection> {
        let Some(api_endpoint) = self.peers.get(&peer_id) else {
            return Err(PeerError::InternalClientError(anyhow!(
                "Invalid peer_id: {peer_id}"
            )));
        };

        let tor_client = TorClient::create_bootstrapped(TorClientConfig::default())
            .await
            .map_err(|err| PeerError::InternalClientError(err.into()))?
            .isolated_client();

        debug!("Successfully created and bootstrapped the `TorClient`, for given `TorConfig`.");

        // TODO: (@leonardo) should we implement our `IntoTorAddr` for `SafeUrl`
        // instead?
        let host_str = api_endpoint
            .host_str()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected host str")))?;
        let port = api_endpoint
            .port_or_known_default()
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Expected port number")))?;
        let addr = (host_str, port);

        let tor_addr = match TorAddr::from(addr) {
            Ok(tor_addr) => tor_addr,
            Err(e) => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Invalid endpoint addr: {addr:?}: {e:#}"
                )));
            }
        };

        // `tor_addr` is moved into the connect call below; this copy is kept
        // only so the address can still be logged afterwards.
        let tor_addr_clone = tor_addr.clone();

        debug!(
            ?tor_addr,
            ?addr,
            "Successfully created `TorAddr` for given address (i.e. host and port)"
        );

        // TODO: It can be updated to use `is_onion_address()` implementation,
        // once https://gitlab.torproject.org/tpo/core/arti/-/merge_requests/2214 lands.
        let anonymized_stream = if api_endpoint.is_onion_address() {
            // Onion services have to be enabled explicitly on the stream
            // preferences.
            let mut onion_prefs = arti_client::StreamPrefs::default();
            onion_prefs.connect_to_onion_services(arti_client::config::BoolOrAuto::Explicit(true));

            let onion_stream = tor_client
                .connect_with_prefs(tor_addr, &onion_prefs)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to onion address `TorAddr`, and established an anonymized `DataStream`"
            );
            onion_stream
        } else {
            let clearnet_stream = tor_client
                .connect(tor_addr)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            debug!(
                ?tor_addr_clone,
                "Successfully connected to `Hostname`or `Ip` `TorAddr`, and established an anonymized `DataStream`"
            );
            clearnet_stream
        };

        // `wss` gets a TLS connector, `ws` runs in plain text over the Tor
        // stream, anything else is rejected.
        // NOTE(review): `make_connector` selects this connector for the `tor`
        // scheme, while only `ws`/`wss` endpoints are accepted here — confirm
        // which URL form callers are expected to pass.
        let tls_connector = match api_endpoint.scheme() {
            "wss" => {
                let mut root_certs = RootCertStore::empty();
                root_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());

                let tls_config = TlsClientConfig::builder()
                    .with_root_certificates(root_certs)
                    .with_no_client_auth();

                Some(TlsConnector::from(Arc::new(tls_config)))
            }
            "ws" => None,
            unexpected_scheme => {
                return Err(PeerError::InvalidEndpoint(anyhow!(
                    "Unsupported scheme: {unexpected_scheme}"
                )));
            }
        };

        let ws_client_builder =
            WsClientBuilder::default().max_concurrent_requests(usize::from(u16::MAX));

        let ws_client_builder = match &self.api_secret {
            Some(api_secret) => {
                // on native platforms, jsonrpsee-client ignores `user:pass@...` in the Url,
                // but we can set up the headers manually
                let auth = base64::engine::general_purpose::STANDARD
                    .encode(format!("fedimint:{api_secret}"));

                let mut headers = HeaderMap::new();
                headers.insert(
                    "Authorization",
                    HeaderValue::from_str(&format!("Basic {auth}")).expect("Can't fail"),
                );

                ws_client_builder.set_headers(headers)
            }
            None => ws_client_builder,
        };

        // Plain `ws`: build the client directly on the Tor stream.
        let Some(tls_connector) = tls_connector else {
            let client = ws_client_builder
                .build_with_stream(api_endpoint.as_str(), anonymized_stream)
                .await
                .map_err(|e| PeerError::Connection(e.into()))?;

            return Ok(client.into_dyn());
        };

        let host = api_endpoint
            .host_str()
            .map(ToOwned::to_owned)
            .ok_or_else(|| PeerError::InvalidEndpoint(anyhow!("Invalid host str")))?;

        // FIXME: (@leonardo) Is this leaking any data ? Should investigate it further
        // if it's really needed.
        let server_name = rustls_pki_types::ServerName::try_from(host)
            .map_err(|e| PeerError::InvalidEndpoint(e.into()))?;

        let anonymized_tls_stream = tls_connector
            .connect(server_name, anonymized_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        let client = ws_client_builder
            .build_with_stream(api_endpoint.as_str(), anonymized_tls_stream)
            .await
            .map_err(|e| PeerError::Connection(e.into()))?;

        Ok(client.into_dyn())
    }
}
972
973fn jsonrpc_error_to_peer_error(jsonrpc_error: JsonRpcClientError) -> PeerError {
974    match jsonrpc_error {
975        JsonRpcClientError::Call(error_object) => {
976            let error = anyhow!(error_object.message().to_owned());
977            match ErrorCode::from(error_object.code()) {
978                ErrorCode::ParseError | ErrorCode::OversizedRequest | ErrorCode::InvalidRequest => {
979                    PeerError::InvalidRequest(error)
980                }
981                ErrorCode::MethodNotFound => PeerError::InvalidRpcId(error),
982                ErrorCode::InvalidParams => PeerError::InvalidRequest(error),
983                ErrorCode::InternalError | ErrorCode::ServerIsBusy | ErrorCode::ServerError(_) => {
984                    PeerError::ServerError(error)
985                }
986            }
987        }
988        JsonRpcClientError::Transport(error) => PeerError::Transport(anyhow!(error)),
989        JsonRpcClientError::RestartNeeded(arc) => PeerError::Transport(anyhow!(arc)),
990        JsonRpcClientError::ParseError(error) => PeerError::InvalidResponse(anyhow!(error)),
991        JsonRpcClientError::InvalidSubscriptionId => {
992            PeerError::Transport(anyhow!("Invalid subscription id"))
993        }
994        JsonRpcClientError::InvalidRequestId(invalid_request_id) => {
995            PeerError::InvalidRequest(anyhow!(invalid_request_id))
996        }
997        JsonRpcClientError::RequestTimeout => PeerError::Transport(anyhow!("Request timeout")),
998        JsonRpcClientError::Custom(e) => PeerError::Transport(anyhow!(e)),
999        JsonRpcClientError::HttpNotImplemented => {
1000            PeerError::ServerError(anyhow!("Http not implemented"))
1001        }
1002        JsonRpcClientError::EmptyBatchRequest(empty_batch_request) => {
1003            PeerError::InvalidRequest(anyhow!(empty_batch_request))
1004        }
1005        JsonRpcClientError::RegisterMethod(register_method_error) => {
1006            PeerError::InvalidResponse(anyhow!(register_method_error))
1007        }
1008    }
1009}
1010
1011#[async_trait]
1012impl IClientConnection for WsClient {
1013    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1014        let method = match method {
1015            ApiMethod::Core(method) => method,
1016            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
1017        };
1018
1019        Ok(ClientT::request(self, &method, [request.to_json()])
1020            .await
1021            .map_err(jsonrpc_error_to_peer_error)?)
1022    }
1023
1024    async fn await_disconnection(&self) {
1025        self.on_disconnect().await;
1026    }
1027}
1028
1029#[async_trait]
1030impl IClientConnection for Arc<WsClient> {
1031    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value> {
1032        let method = match method {
1033            ApiMethod::Core(method) => method,
1034            ApiMethod::Module(module_id, method) => format!("module_{module_id}_{method}"),
1035        };
1036
1037        Ok(
1038            ClientT::request(self.as_ref(), &method, [request.to_json()])
1039                .await
1040                .map_err(jsonrpc_error_to_peer_error)?,
1041        )
1042    }
1043
1044    async fn await_disconnection(&self) {
1045        self.on_disconnect().await;
1046    }
1047}
1048
1049pub type DynClientConnector = Arc<dyn IClientConnector>;
1050
1051/// Allows to connect to peers. Connections are request based and should be
1052/// authenticated and encrypted for production deployments.
1053#[async_trait]
1054pub trait IClientConnector: Send + Sync + 'static + fmt::Debug {
1055    fn peers(&self) -> BTreeSet<PeerId>;
1056
1057    async fn connect(&self, peer: PeerId) -> PeerResult<DynClientConnection>;
1058
1059    fn into_dyn(self) -> DynClientConnector
1060    where
1061        Self: Sized,
1062    {
1063        Arc::new(self)
1064    }
1065
1066    fn is_admin(&self) -> bool {
1067        self.peers().len() == 1
1068    }
1069}
1070
1071pub type DynClientConnection = Arc<dyn IClientConnection>;
1072
1073#[async_trait]
1074pub trait IClientConnection: Debug + Send + Sync + 'static {
1075    async fn request(&self, method: ApiMethod, request: ApiRequestErased) -> PeerResult<Value>;
1076
1077    async fn await_disconnection(&self);
1078
1079    fn into_dyn(self) -> DynClientConnection
1080    where
1081        Self: Sized,
1082    {
1083        Arc::new(self)
1084    }
1085}
1086
1087#[derive(Clone, Debug)]
1088pub struct ReconnectFederationApi {
1089    connector: DynClientConnector,
1090    peers: BTreeSet<PeerId>,
1091    admin_id: Option<PeerId>,
1092    module_id: Option<ModuleInstanceId>,
1093    connections: ReconnectClientConnections,
1094}
1095
1096impl ReconnectFederationApi {
1097    pub fn new(connector: DynClientConnector, admin_peer_id: Option<PeerId>) -> Self {
1098        Self {
1099            peers: connector.peers(),
1100            admin_id: admin_peer_id,
1101            module_id: None,
1102            connections: ReconnectClientConnections::new(&connector),
1103            connector,
1104        }
1105    }
1106
1107    pub fn new_admin(connector: DynClientConnector, peer: PeerId) -> Self {
1108        Self::new(connector, Some(peer))
1109    }
1110}
1111
1112impl IModuleFederationApi for ReconnectFederationApi {}
1113
1114#[apply(async_trait_maybe_send!)]
1115impl IRawFederationApi for ReconnectFederationApi {
1116    fn all_peers(&self) -> &BTreeSet<PeerId> {
1117        &self.peers
1118    }
1119
1120    fn self_peer(&self) -> Option<PeerId> {
1121        self.admin_id
1122    }
1123
1124    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
1125        ReconnectFederationApi {
1126            connector: self.connector.clone(),
1127            peers: self.peers.clone(),
1128            admin_id: self.admin_id,
1129            module_id: Some(id),
1130            connections: self.connections.clone(),
1131        }
1132        .into()
1133    }
1134
1135    #[instrument(
1136        target = LOG_NET_API,
1137        skip_all,
1138        fields(
1139            peer_id = %peer_id,
1140            method = %method,
1141            params = %params.params,
1142        )
1143    )]
1144    async fn request_raw(
1145        &self,
1146        peer_id: PeerId,
1147        method: &str,
1148        params: &ApiRequestErased,
1149    ) -> PeerResult<Value> {
1150        let method = match self.module_id {
1151            Some(module_id) => ApiMethod::Module(module_id, method.to_string()),
1152            None => ApiMethod::Core(method.to_string()),
1153        };
1154
1155        self.connections
1156            .request(peer_id, method, params.clone())
1157            .await
1158    }
1159    fn connector(&self) -> &DynClientConnector {
1160        &self.connector
1161    }
1162}
1163
1164#[derive(Clone, Debug)]
1165pub struct ReconnectClientConnections {
1166    connections: BTreeMap<PeerId, ClientConnection>,
1167}
1168
1169impl ReconnectClientConnections {
1170    pub fn new(connector: &DynClientConnector) -> Self {
1171        ReconnectClientConnections {
1172            connections: connector
1173                .peers()
1174                .into_iter()
1175                .map(|peer| (peer, ClientConnection::new(peer, connector.clone())))
1176                .collect(),
1177        }
1178    }
1179
1180    async fn request(
1181        &self,
1182        peer: PeerId,
1183        method: ApiMethod,
1184        request: ApiRequestErased,
1185    ) -> PeerResult<Value> {
1186        trace!(target: LOG_NET_API, %method, "Api request");
1187        let res = self
1188            .connections
1189            .get(&peer)
1190            .ok_or_else(|| PeerError::InvalidPeerId { peer_id: peer })?
1191            .connection()
1192            .await
1193            .context("Failed to connect to peer")
1194            .map_err(PeerError::Connection)?
1195            .request(method.clone(), request)
1196            .await;
1197
1198        trace!(target: LOG_NET_API, ?method, res_ok = res.is_ok(), "Api response");
1199
1200        res
1201    }
1202}
1203
1204#[derive(Clone, Debug)]
1205struct ClientConnection {
1206    sender: async_channel::Sender<oneshot::Sender<DynClientConnection>>,
1207}
1208
1209impl ClientConnection {
1210    fn new(peer: PeerId, connector: DynClientConnector) -> ClientConnection {
1211        let (sender, receiver) = bounded::<oneshot::Sender<DynClientConnection>>(1024);
1212
1213        fedimint_core::task::spawn(
1214            "peer-api-connection",
1215            async move {
1216                let mut backoff = api_networking_backoff();
1217
1218                while let Ok(sender) = receiver.recv().await {
1219                    let mut senders = vec![sender];
1220
1221                    // Drain the queue, so we everyone that already joined fail or succeed
1222                    // together.
1223                    while let Ok(sender) = receiver.try_recv() {
1224                        senders.push(sender);
1225                    }
1226
1227                    match connector.connect(peer).await {
1228                        Ok(connection) => {
1229                            trace!(target: LOG_CLIENT_NET_API, "Connected to peer api");
1230
1231                            for sender in senders {
1232                                sender.send(connection.clone()).ok();
1233                            }
1234
1235                            loop {
1236                                tokio::select! {
1237                                    sender = receiver.recv() => {
1238                                        match sender.ok() {
1239                                            Some(sender) => sender.send(connection.clone()).ok(),
1240                                            None => break,
1241                                        };
1242                                    }
1243                                    () = connection.await_disconnection() => break,
1244                                }
1245                            }
1246
1247                            trace!(target: LOG_CLIENT_NET_API, "Disconnected from peer api");
1248
1249                            backoff = api_networking_backoff();
1250                        }
1251                        Err(e) => {
1252                            trace!(target: LOG_CLIENT_NET_API, "Failed to connect to peer api {e}");
1253
1254                            drop(senders);
1255
1256                            fedimint_core::task::sleep(
1257                                backoff.next().expect("No limit to the number of retries"),
1258                            )
1259                            .await;
1260                        }
1261                    }
1262                }
1263
1264                trace!(target: LOG_CLIENT_NET_API, "Shutting down peer api connection task");
1265            }
1266            .instrument(trace_span!("peer-api-connection", ?peer)),
1267        );
1268
1269        ClientConnection { sender }
1270    }
1271
1272    async fn connection(&self) -> Option<DynClientConnection> {
1273        let (sender, receiver) = oneshot::channel();
1274
1275        self.sender
1276            .send(sender)
1277            .await
1278            .inspect_err(|err| {
1279                warn!(
1280                    target: LOG_CLIENT_NET_API,
1281                    err = %err.fmt_compact(),
1282                    "Api connection request channel closed unexpectedly"
1283                );
1284            })
1285            .ok()?;
1286
1287        receiver.await.ok()
1288    }
1289}
1290
1291/// The status of a server, including how it views its peers
1292#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
1293pub struct LegacyFederationStatus {
1294    pub session_count: u64,
1295    pub status_by_peer: HashMap<PeerId, LegacyPeerStatus>,
1296    pub peers_online: u64,
1297    pub peers_offline: u64,
1298    /// This should always be 0 if everything is okay, so a monitoring tool
1299    /// should generate an alert if this is not the case.
1300    pub peers_flagged: u64,
1301    pub scheduled_shutdown: Option<u64>,
1302}
1303
1304#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1305pub struct LegacyPeerStatus {
1306    pub last_contribution: Option<u64>,
1307    pub connection_status: LegacyP2PConnectionStatus,
1308    /// Indicates that this peer needs attention from the operator since
1309    /// it has not contributed to the consensus in a long time
1310    pub flagged: bool,
1311}
1312
1313#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
1314#[serde(rename_all = "snake_case")]
1315pub enum LegacyP2PConnectionStatus {
1316    #[default]
1317    Disconnected,
1318    Connected,
1319}
1320
1321#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
1322pub struct StatusResponse {
1323    pub server: ServerStatusLegacy,
1324    pub federation: Option<LegacyFederationStatus>,
1325}
1326
1327pub async fn make_admin_connector(
1328    admin_peer_id: PeerId,
1329    url: SafeUrl,
1330    api_secret: &Option<String>,
1331    iroh_enable_dht: bool,
1332    iroh_enable_next: bool,
1333) -> anyhow::Result<DynClientConnector> {
1334    make_connector(
1335        once((admin_peer_id, url)),
1336        api_secret,
1337        iroh_enable_dht,
1338        iroh_enable_next,
1339    )
1340    .await
1341}
1342
1343pub async fn make_connector(
1344    peers: impl IntoIterator<Item = (PeerId, SafeUrl)>,
1345    api_secret: &Option<String>,
1346    iroh_enable_dht: bool,
1347    iroh_enable_next: bool,
1348) -> anyhow::Result<DynClientConnector> {
1349    let peers = peers.into_iter().collect::<BTreeMap<PeerId, SafeUrl>>();
1350
1351    let scheme = peers
1352        .values()
1353        .next()
1354        .expect("Federation api has been initialized with no peers")
1355        .scheme();
1356
1357    Ok(match scheme {
1358        "ws" | "wss" => WebsocketConnector::new(peers, api_secret.clone())?.into_dyn(),
1359        #[cfg(all(feature = "tor", not(target_family = "wasm")))]
1360        "tor" => TorConnector::new(peers, api_secret.clone()).into_dyn(),
1361        "iroh" => {
1362            let iroh_dns = std::env::var(FM_IROH_DNS_ENV)
1363                .ok()
1364                .and_then(|dns| dns.parse().ok());
1365            iroh::IrohConnector::new(peers, iroh_dns, iroh_enable_dht, iroh_enable_next)
1366                .await?
1367                .into_dyn()
1368        }
1369        scheme => anyhow::bail!("Unsupported connector scheme: {scheme}"),
1370    })
1371}
1372
1373#[cfg(test)]
1374mod tests;