fedimint_api_client/api/global_api/
with_cache.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::ModuleInstanceId;
12use fedimint_core::core::backup::SignedBackupRequest;
13use fedimint_core::endpoint_constants::{
14    ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
15    AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
16    BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT,
17    FEDIMINTD_VERSION_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
18    RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
19    SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
20    SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21    SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22    SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
23    VERIFY_CONFIG_HASH_ENDPOINT,
24};
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29    ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
30};
31use fedimint_core::net::api_announcement::{
32    SignedApiAnnouncement, SignedApiAnnouncementSubmission,
33};
34use fedimint_core::session_outcome::{
35    AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
36};
37use fedimint_core::task::{MaybeSend, MaybeSync};
38use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
39use fedimint_core::util::SafeUrl;
40use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
41use fedimint_logging::LOG_CLIENT_NET_API;
42use futures::future::join_all;
43use itertools::Itertools;
44use rand::seq::SliceRandom;
45use serde_json::Value;
46use tokio::sync::OnceCell;
47use tracing::debug;
48
49use super::super::{
50    DynModuleApi, GuardianConfigBackup, IGlobalFederationApi, IRawFederationApi, StatusResponse,
51};
52use crate::api::{
53    FederationApiExt, FederationError, FederationResult, PeerResult,
54    VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
55};
56use crate::query::FilterMapThreshold;
57
58/// Convenience extension trait used for wrapping [`IRawFederationApi`] in
59/// a [`GlobalFederationApiWithCache`]
60pub trait GlobalFederationApiWithCacheExt
61where
62    Self: Sized,
63{
64    fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71    fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72        GlobalFederationApiWithCache::new(self)
73    }
74}
75
/// [`IGlobalFederationApi`] wrapping some `T: IRawFederationApi` and adding
/// a tiny bit of caching.
///
/// Use [`GlobalFederationApiWithCacheExt::with_cache`] to create.
#[derive(Debug)]
pub struct GlobalFederationApiWithCache<T> {
    /// The wrapped API that requests are delegated to.
    pub(crate) inner: T,
    /// Small LRU used as [`IGlobalFederationApi::await_block`] cache, keyed
    /// by session index.
    ///
    /// This is mostly to avoid multiple client module recovery processes
    /// re-requesting same blocks and putting burden on the federation.
    ///
    /// The LRU can be fairly small, as if the modules are
    /// (near-)bottlenecked on fetching blocks they will naturally
    /// synchronize, or split into a handful of groups. And if they are not,
    /// no LRU here is going to help them.
    pub(crate) await_session_lru:
        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,

    /// Like [`Self::await_session_lru`], but for
    /// [`IGlobalFederationApi::get_session_status`].
    ///
    /// In theory these two LRUs have the same content, but one is used by a
    /// potentially long-blocking operation, while the other by a
    /// non-blocking one. Given how tiny they are, it's not worth
    /// complicating things to unify them.
    pub(crate) get_session_status_lru:
        Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
}
105
106impl<T> GlobalFederationApiWithCache<T> {
107    pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
108        Self {
109            inner,
110            await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111                NonZeroUsize::new(512).expect("is non-zero"),
112            ))),
113            get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
114                NonZeroUsize::new(512).expect("is non-zero"),
115            ))),
116        }
117    }
118}
119
120impl<T> GlobalFederationApiWithCache<T>
121where
122    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
123{
124    pub(crate) async fn await_block_raw(
125        &self,
126        block_index: u64,
127        decoders: &ModuleDecoderRegistry,
128    ) -> anyhow::Result<SessionOutcome> {
129        debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
130        self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
131            AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
132            ApiRequestErased::new(block_index),
133        )
134        .await?
135        .try_into_inner(decoders)
136        .map_err(|e| anyhow!(e.to_string()))
137    }
138
139    pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
140        let mut peers = self.all_peers().iter().copied().collect_vec();
141        peers.shuffle(&mut rand::thread_rng());
142        peers.into_iter()
143    }
144
145    pub(crate) async fn get_session_status_raw_v2(
146        &self,
147        block_index: u64,
148        broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
149        decoders: &ModuleDecoderRegistry,
150    ) -> anyhow::Result<SessionStatus> {
151        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
152        let params = ApiRequestErased::new(block_index);
153        let mut last_error = None;
154        // fetch serially
155        for peer_id in self.select_peers_for_status() {
156            match self
157                .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
158                    SESSION_STATUS_V2_ENDPOINT.to_string(),
159                    params.clone(),
160                    peer_id,
161                )
162                .await
163                .map_err(anyhow::Error::from)
164                .and_then(|s| Ok(s.try_into_inner(decoders)?))
165            {
166                Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
167                    if signed_session_outcome.verify(broadcast_public_keys, block_index) {
168                        // early return
169                        return Ok(SessionStatus::Complete(
170                            signed_session_outcome.session_outcome,
171                        ));
172                    }
173                    last_error = Some(format_err!("Invalid signature"));
174                }
175                Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
176                    // no signature: use fallback method
177                    return self.get_session_status_raw(block_index, decoders).await;
178                }
179                Err(err) => {
180                    last_error = Some(err);
181                }
182            }
183            // if we loop then we must have last_error
184            assert!(last_error.is_some());
185        }
186        Err(last_error.expect("must have at least one peer"))
187    }
188
189    pub(crate) async fn get_session_status_raw(
190        &self,
191        block_index: u64,
192        decoders: &ModuleDecoderRegistry,
193    ) -> anyhow::Result<SessionStatus> {
194        debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
195        self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
196            SESSION_STATUS_ENDPOINT.to_string(),
197            ApiRequestErased::new(block_index),
198        )
199        .await?
200        .try_into_inner(&decoders.clone().with_fallback())
201        .map_err(|e| anyhow!(e))
202    }
203}
204
205#[apply(async_trait_maybe_send!)]
206impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
207where
208    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
209{
210    fn all_peers(&self) -> &BTreeSet<PeerId> {
211        self.inner.all_peers()
212    }
213
214    fn self_peer(&self) -> Option<PeerId> {
215        self.inner.self_peer()
216    }
217
218    fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
219        self.inner.with_module(id)
220    }
221
222    /// Make request to a specific federation peer by `peer_id`
223    async fn request_raw(
224        &self,
225        peer_id: PeerId,
226        method: &str,
227        params: &ApiRequestErased,
228    ) -> PeerResult<Value> {
229        self.inner.request_raw(peer_id, method, params).await
230    }
231}
232
233#[apply(async_trait_maybe_send!)]
234impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
235where
236    T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
237{
238    async fn await_block(
239        &self,
240        session_idx: u64,
241        decoders: &ModuleDecoderRegistry,
242    ) -> anyhow::Result<SessionOutcome> {
243        let mut lru_lock = self.await_session_lru.lock().await;
244
245        let entry_arc = lru_lock
246            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
247            .clone();
248
249        // we drop the lru lock so requests for other `session_idx` can work in parallel
250        drop(lru_lock);
251
252        entry_arc
253            .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
254            .await
255            .cloned()
256    }
257
258    async fn get_session_status(
259        &self,
260        session_idx: u64,
261        decoders: &ModuleDecoderRegistry,
262        core_api_version: ApiVersion,
263        broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
264    ) -> anyhow::Result<SessionStatus> {
265        let mut lru_lock = self.get_session_status_lru.lock().await;
266
267        let entry_arc = lru_lock
268            .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
269            .clone();
270
271        // we drop the lru lock so requests for other `session_idx` can work in parallel
272        drop(lru_lock);
273
274        enum NoCacheErr {
275            Initial,
276            Pending(Vec<AcceptedItem>),
277            Err(anyhow::Error),
278        }
279        match entry_arc
280            .get_or_try_init(|| async {
281                let session_status =
282                    if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
283                        self.get_session_status_raw(session_idx, decoders).await
284                    } else if let Some(broadcast_public_keys) = broadcast_public_keys {
285                        self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
286                            .await
287                    } else {
288                        self.get_session_status_raw(session_idx, decoders).await
289                    };
290                match session_status {
291                    Err(e) => Err(NoCacheErr::Err(e)),
292                    Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
293                    Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
294                    // only status we can cache (hance outer Ok)
295                    Ok(SessionStatus::Complete(s)) => Ok(s),
296                }
297            })
298            .await
299            .cloned()
300        {
301            Ok(s) => Ok(SessionStatus::Complete(s)),
302            Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
303            Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
304            Err(NoCacheErr::Err(e)) => Err(e),
305        }
306    }
307
308    async fn submit_transaction(
309        &self,
310        tx: Transaction,
311    ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
312        self.request_current_consensus_retry(
313            SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
314            ApiRequestErased::new(SerdeTransaction::from(&tx)),
315        )
316        .await
317    }
318
319    async fn session_count(&self) -> FederationResult<u64> {
320        self.request_current_consensus(
321            SESSION_COUNT_ENDPOINT.to_owned(),
322            ApiRequestErased::default(),
323        )
324        .await
325    }
326
327    async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
328        self.request_current_consensus_retry(
329            AWAIT_TRANSACTION_ENDPOINT.to_owned(),
330            ApiRequestErased::new(txid),
331        )
332        .await
333    }
334
335    async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
336        self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
337            .await
338    }
339
340    async fn download_backup(
341        &self,
342        id: &secp256k1::PublicKey,
343    ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
344        self.request_with_strategy(
345            FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
346            RECOVER_ENDPOINT.to_owned(),
347            ApiRequestErased::new(id),
348        )
349        .await
350    }
351
352    async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
353        self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
354            .await
355    }
356
357    async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
358        self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
359            .await
360    }
361
362    async fn set_local_params(
363        &self,
364        name: String,
365        federation_name: Option<String>,
366        auth: ApiAuth,
367    ) -> FederationResult<String> {
368        self.request_admin(
369            SET_LOCAL_PARAMS_ENDPOINT,
370            ApiRequestErased::new(SetLocalParamsRequest {
371                name,
372                federation_name,
373            }),
374            auth,
375        )
376        .await
377    }
378
379    async fn add_peer_connection_info(
380        &self,
381        info: String,
382        auth: ApiAuth,
383    ) -> FederationResult<String> {
384        self.request_admin(
385            ADD_PEER_SETUP_CODE_ENDPOINT,
386            ApiRequestErased::new(info),
387            auth,
388        )
389        .await
390    }
391
392    async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
393        self.request_admin(
394            RESET_PEER_SETUP_CODES_ENDPOINT,
395            ApiRequestErased::default(),
396            auth,
397        )
398        .await
399    }
400
401    async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
402        self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
403            .await
404    }
405
406    async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
407        self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
408            .await
409    }
410
411    async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
412        self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
413            .await
414    }
415
416    async fn get_verify_config_hash(
417        &self,
418        auth: ApiAuth,
419    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
420        self.request_admin(
421            VERIFY_CONFIG_HASH_ENDPOINT,
422            ApiRequestErased::default(),
423            auth,
424        )
425        .await
426    }
427
428    async fn verified_configs(
429        &self,
430        auth: ApiAuth,
431    ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
432        self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
433            .await
434    }
435
436    async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
437        self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
438            .await
439    }
440
441    async fn status(&self) -> FederationResult<StatusResponse> {
442        self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
443            .await
444    }
445
446    async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
447        self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
448            .await
449    }
450
451    async fn guardian_config_backup(
452        &self,
453        auth: ApiAuth,
454    ) -> FederationResult<GuardianConfigBackup> {
455        self.request_admin(
456            GUARDIAN_CONFIG_BACKUP_ENDPOINT,
457            ApiRequestErased::default(),
458            auth,
459        )
460        .await
461    }
462
463    async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
464        self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
465            .await
466    }
467
468    async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
469        self.request_admin(
470            RESTART_FEDERATION_SETUP_ENDPOINT,
471            ApiRequestErased::default(),
472            auth,
473        )
474        .await
475    }
476
477    async fn submit_api_announcement(
478        &self,
479        announcement_peer_id: PeerId,
480        announcement: SignedApiAnnouncement,
481    ) -> FederationResult<()> {
482        let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
483            let announcement_inner = announcement.clone();
484            async move {
485                (
486                    peer_id,
487                    self.request_single_peer::<()>(
488                        SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
489                        ApiRequestErased::new(SignedApiAnnouncementSubmission {
490                            signed_api_announcement: announcement_inner,
491                            peer_id: announcement_peer_id,
492                        }),
493                        peer_id,
494                    )
495                    .await,
496                )
497            }
498        }))
499        .await
500        .into_iter()
501        .filter_map(|(peer_id, result)| match result {
502            Ok(()) => None,
503            Err(e) => Some((peer_id, e)),
504        })
505        .collect::<BTreeMap<_, _>>();
506
507        if peer_errors.is_empty() {
508            Ok(())
509        } else {
510            Err(FederationError {
511                method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
512                params: serde_json::to_value(announcement).expect("can be serialized"),
513                general: None,
514                peer_errors,
515            })
516        }
517    }
518
519    async fn api_announcements(
520        &self,
521        guardian: PeerId,
522    ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
523        self.request_single_peer(
524            API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
525            ApiRequestErased::default(),
526            guardian,
527        )
528        .await
529    }
530
531    async fn sign_api_announcement(
532        &self,
533        api_url: SafeUrl,
534        auth: ApiAuth,
535    ) -> FederationResult<SignedApiAnnouncement> {
536        self.request_admin(
537            SIGN_API_ANNOUNCEMENT_ENDPOINT,
538            ApiRequestErased::new(api_url),
539            auth,
540        )
541        .await
542    }
543
544    async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
545        self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
546            .await
547    }
548
549    async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
550        self.request_admin(
551            BACKUP_STATISTICS_ENDPOINT,
552            ApiRequestErased::default(),
553            auth,
554        )
555        .await
556    }
557
558    async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
559        self.request_single_peer(
560            FEDIMINTD_VERSION_ENDPOINT.to_owned(),
561            ApiRequestErased::default(),
562            peer_id,
563        )
564        .await
565    }
566
567    async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
568        self.request_single_peer(
569            INVITE_CODE_ENDPOINT.to_owned(),
570            ApiRequestErased::default(),
571            guardian,
572        )
573        .await
574    }
575}