fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10    GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::ModuleInstanceId;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::endpoint_constants::{
16    ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
17    AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
18    BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
19    CONFIG_GEN_PEERS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT,
20    GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
21    RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
22    SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
23    SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
24    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
25    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26    VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::audit::AuditSummary;
30use fedimint_core::module::registry::ModuleDecoderRegistry;
31use fedimint_core::module::{
32    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
33};
34use fedimint_core::net::api_announcement::{
35    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
36};
37use fedimint_core::session_outcome::{
38    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
39};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
44use fedimint_logging::LOG_CLIENT_NET_API;
45use futures::future::join_all;
46use itertools::Itertools;
47use rand::seq::SliceRandom;
48use serde_json::Value;
49use tokio::sync::OnceCell;
50use tracing::{debug, trace};
51
52use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
53use crate::api::{
54    DynClientConnector, FederationApiExt, FederationError, FederationResult, PeerResult,
55    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
56};
57use crate::query::FilterMapThreshold;
58
59/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
60/// a [`GlobalFederationApiWithCache`]
61pub trait GlobalFederationApiWithCacheExt
62where
63    Self: Sized,
64{
65    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
66}
67
68impl<T> GlobalFederationApiWithCacheExt for T
69where
70    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
71{
72    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
73        GlobalFederationApiWithCache::new(self)
74    }
75}
76
77/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
78/// a tiny bit of caching.
79///
80/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
81#[derive(Debug)]
82pub struct GlobalFederationApiWithCache<T> {
83    pub(crate) inner: T,
84    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache.
85    ///
86    /// This is mostly to avoid multiple client module recovery processes
87    /// re-requesting same blocks and putting burden on the federation.
88    ///
89    /// The LRU can be be fairly small, as if the modules are
90    /// (near-)bottlenecked on fetching blocks they will naturally
91    /// synchronize, or split into a handful of groups. And if they are not,
92    /// no LRU here is going to help them.
93    pub(crate) await_session_lru:
94        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
95
96    /// Like [`Self::await_session_lru`], but for
97    /// [`IGlobalFederationApi::get_session_status`].
98    ///
99    /// In theory these two LRUs have the same content, but one is locked by
100    /// potentially long-blocking operation, while the other non-blocking one.
101    /// Given how tiny they are, it's not worth complicating things to unify
102    /// them.
103    pub(crate) get_session_status_lru:
104        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
105}
106
107impl<T> GlobalFederationApiWithCache<T> {
108    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
109        Self {
110            inner,
111            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112                NonZeroUsize::new(512).expect("is non-zero"),
113            ))),
114            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
115                NonZeroUsize::new(512).expect("is non-zero"),
116            ))),
117        }
118    }
119}
120
121impl<T> GlobalFederationApiWithCache<T>
122where
123    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
124{
125    pub(crate) async fn await_block_raw(
126        &self,
127        block_index: u64,
128        decoders: &ModuleDecoderRegistry,
129    ) -> anyhow::Result<SessionOutcome> {
130        if block_index % 100 == 0 {
131            debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
132        } else {
133            trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
134        }
135        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
136            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
137            ApiRequestErased::new(block_index),
138        )
139        .await?
140        .try_into_inner(decoders)
141        .map_err(|e| anyhow!(e.to_string()))
142    }
143
144    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
145        let mut peers = self.all_peers().iter().copied().collect_vec();
146        peers.shuffle(&mut rand::thread_rng());
147        peers.into_iter()
148    }
149
150    pub(crate) async fn get_session_status_raw_v2(
151        &self,
152        block_index: u64,
153        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
154        decoders: &ModuleDecoderRegistry,
155    ) -> anyhow::Result<SessionStatus> {
156        if block_index % 100 == 0 {
157            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
158        } else {
159            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
160        }
161        let params = ApiRequestErased::new(block_index);
162        let mut last_error = None;
163        // fetch serially
164        for peer_id in self.select_peers_for_status() {
165            match self
166                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
167                    SESSION_STATUS_V2_ENDPOINT.to_string(),
168                    params.clone(),
169                    peer_id,
170                )
171                .await
172                .map_err(anyhow::Error::from)
173                .and_then(|s| Ok(s.try_into_inner(decoders)?))
174            {
175                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
176                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
177                        // early return
178                        return Ok(SessionStatus::Complete(
179                            signed_session_outcome.session_outcome,
180                        ));
181                    }
182                    last_error = Some(format_err!("Invalid signature"));
183                }
184                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
185                    // no signature: use fallback method
186                    return self.get_session_status_raw(block_index, decoders).await;
187                }
188                Err(err) => {
189                    last_error = Some(err);
190                }
191            }
192            // if we loop then we must have last_error
193            assert!(last_error.is_some());
194        }
195        Err(last_error.expect("must have at least one peer"))
196    }
197
198    pub(crate) async fn get_session_status_raw(
199        &self,
200        block_index: u64,
201        decoders: &ModuleDecoderRegistry,
202    ) -> anyhow::Result<SessionStatus> {
203        if block_index % 100 == 0 {
204            debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
205        } else {
206            trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
207        }
208        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
209            SESSION_STATUS_ENDPOINT.to_string(),
210            ApiRequestErased::new(block_index),
211        )
212        .await?
213        .try_into_inner(&decoders.clone().with_fallback())
214        .map_err(|e| anyhow!(e))
215    }
216}
217
218#[apply(async_trait_maybe_send!)]
219impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
220where
221    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
222{
223    fn all_peers(&self) -> &BTreeSet<PeerId> {
224        self.inner.all_peers()
225    }
226
227    fn self_peer(&self) -> Option<PeerId> {
228        self.inner.self_peer()
229    }
230
231    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
232        self.inner.with_module(id)
233    }
234
235    /// Make request to a specific federation peer by `peer_id`
236    async fn request_raw(
237        &self,
238        peer_id: PeerId,
239        method: &str,
240        params: &ApiRequestErased,
241    ) -> PeerResult<Value> {
242        self.inner.request_raw(peer_id, method, params).await
243    }
244
245    fn connector(&self) -> &DynClientConnector {
246        self.inner.connector()
247    }
248}
249
250#[apply(async_trait_maybe_send!)]
251impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
252where
253    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
254{
255    async fn await_block(
256        &self,
257        session_idx: u64,
258        decoders: &ModuleDecoderRegistry,
259    ) -> anyhow::Result<SessionOutcome> {
260        let mut lru_lock = self.await_session_lru.lock().await;
261
262        let entry_arc = lru_lock
263            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
264            .clone();
265
266        // we drop the lru lock so requests for other `session_idx` can work in parallel
267        drop(lru_lock);
268
269        entry_arc
270            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
271            .await
272            .cloned()
273    }
274
275    async fn get_session_status(
276        &self,
277        session_idx: u64,
278        decoders: &ModuleDecoderRegistry,
279        core_api_version: ApiVersion,
280        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
281    ) -> anyhow::Result<SessionStatus> {
282        let mut lru_lock = self.get_session_status_lru.lock().await;
283
284        let entry_arc = lru_lock
285            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
286            .clone();
287
288        // we drop the lru lock so requests for other `session_idx` can work in parallel
289        drop(lru_lock);
290
291        enum NoCacheErr {
292            Initial,
293            Pending(Vec<AcceptedItem>),
294            Err(anyhow::Error),
295        }
296        match entry_arc
297            .get_or_try_init(|| async {
298                let session_status =
299                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
300                        self.get_session_status_raw(session_idx, decoders).await
301                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
302                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
303                            .await
304                    } else {
305                        self.get_session_status_raw(session_idx, decoders).await
306                    };
307                match session_status {
308                    Err(e) => Err(NoCacheErr::Err(e)),
309                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
310                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
311                    // only status we can cache (hance outer Ok)
312                    Ok(SessionStatus::Complete(s)) => Ok(s),
313                }
314            })
315            .await
316            .cloned()
317        {
318            Ok(s) => Ok(SessionStatus::Complete(s)),
319            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
320            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
321            Err(NoCacheErr::Err(e)) => Err(e),
322        }
323    }
324
325    async fn submit_transaction(
326        &self,
327        tx: Transaction,
328    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
329        self.request_current_consensus_retry(
330            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
331            ApiRequestErased::new(SerdeTransaction::from(&tx)),
332        )
333        .await
334    }
335
336    async fn session_count(&self) -> FederationResult<u64> {
337        self.request_current_consensus(
338            SESSION_COUNT_ENDPOINT.to_owned(),
339            ApiRequestErased::default(),
340        )
341        .await
342    }
343
344    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
345        self.request_current_consensus_retry(
346            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
347            ApiRequestErased::new(txid),
348        )
349        .await
350    }
351
352    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
353        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
354            .await
355    }
356
357    async fn download_backup(
358        &self,
359        id: &secp256k1::PublicKey,
360    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
361        self.request_with_strategy(
362            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
363            RECOVER_ENDPOINT.to_owned(),
364            ApiRequestErased::new(id),
365        )
366        .await
367    }
368
369    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
370        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
371            .await
372    }
373
374    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
375        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
376            .await
377    }
378
379    async fn set_local_params(
380        &self,
381        name: String,
382        federation_name: Option<String>,
383        disable_base_fees: Option<bool>,
384        auth: ApiAuth,
385    ) -> FederationResult<String> {
386        self.request_admin(
387            SET_LOCAL_PARAMS_ENDPOINT,
388            ApiRequestErased::new(SetLocalParamsRequest {
389                name,
390                federation_name,
391                disable_base_fees,
392            }),
393            auth,
394        )
395        .await
396    }
397
398    async fn add_peer_connection_info(
399        &self,
400        info: String,
401        auth: ApiAuth,
402    ) -> FederationResult<String> {
403        self.request_admin(
404            ADD_PEER_SETUP_CODE_ENDPOINT,
405            ApiRequestErased::new(info),
406            auth,
407        )
408        .await
409    }
410
411    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
412        self.request_admin(
413            RESET_PEER_SETUP_CODES_ENDPOINT,
414            ApiRequestErased::default(),
415            auth,
416        )
417        .await
418    }
419
420    async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
421        self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
422            .await
423    }
424
425    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
426        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
427            .await
428    }
429
430    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
431        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
432            .await
433    }
434
435    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
436        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
437            .await
438    }
439
440    async fn get_verify_config_hash(
441        &self,
442        auth: ApiAuth,
443    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
444        self.request_admin(
445            VERIFY_CONFIG_HASH_ENDPOINT,
446            ApiRequestErased::default(),
447            auth,
448        )
449        .await
450    }
451
452    async fn verified_configs(
453        &self,
454        auth: ApiAuth,
455    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
456        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
457            .await
458    }
459
460    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
461        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
462            .await
463    }
464
465    async fn status(&self) -> FederationResult<StatusResponse> {
466        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
467            .await
468    }
469
470    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
471        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
472            .await
473    }
474
475    async fn guardian_config_backup(
476        &self,
477        auth: ApiAuth,
478    ) -> FederationResult<GuardianConfigBackup> {
479        self.request_admin(
480            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
481            ApiRequestErased::default(),
482            auth,
483        )
484        .await
485    }
486
487    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
488        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
489            .await
490    }
491
492    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
493        self.request_admin(
494            RESTART_FEDERATION_SETUP_ENDPOINT,
495            ApiRequestErased::default(),
496            auth,
497        )
498        .await
499    }
500
501    async fn submit_api_announcement(
502        &self,
503        announcement_peer_id: PeerId,
504        announcement: SignedApiAnnouncement,
505    ) -> FederationResult<()> {
506        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
507            let announcement_inner = announcement.clone();
508            async move {
509                (
510                    peer_id,
511                    self.request_single_peer::<()>(
512                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
513                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
514                            signed_api_announcement: announcement_inner,
515                            peer_id: announcement_peer_id,
516                        }),
517                        peer_id,
518                    )
519                    .await,
520                )
521            }
522        }))
523        .await
524        .into_iter()
525        .filter_map(|(peer_id, result)| match result {
526            Ok(()) => None,
527            Err(e) => Some((peer_id, e)),
528        })
529        .collect::<BTreeMap<_, _>>();
530
531        if peer_errors.is_empty() {
532            Ok(())
533        } else {
534            Err(FederationError {
535                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
536                params: serde_json::to_value(announcement).expect("can be serialized"),
537                general: None,
538                peer_errors,
539            })
540        }
541    }
542
543    async fn api_announcements(
544        &self,
545        guardian: PeerId,
546    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
547        self.request_single_peer(
548            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
549            ApiRequestErased::default(),
550            guardian,
551        )
552        .await
553    }
554
555    async fn sign_api_announcement(
556        &self,
557        api_url: SafeUrl,
558        auth: ApiAuth,
559    ) -> FederationResult<SignedApiAnnouncement> {
560        self.request_admin(
561            SIGN_API_ANNOUNCEMENT_ENDPOINT,
562            ApiRequestErased::new(api_url),
563            auth,
564        )
565        .await
566    }
567
568    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
569        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
570            .await
571    }
572
573    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
574        self.request_admin(
575            BACKUP_STATISTICS_ENDPOINT,
576            ApiRequestErased::default(),
577            auth,
578        )
579        .await
580    }
581
582    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
583        self.request_single_peer(
584            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
585            ApiRequestErased::default(),
586            peer_id,
587        )
588        .await
589    }
590
591    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
592        self.request_single_peer(
593            INVITE_CODE_ENDPOINT.to_owned(),
594            ApiRequestErased::default(),
595            guardian,
596        )
597        .await
598    }
599
600    async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
601        self.request_admin(
602            CHANGE_PASSWORD_ENDPOINT,
603            ApiRequestErased::new(new_password),
604            auth,
605        )
606        .await
607    }
608}