1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus};
10use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
11use fedimint_core::core::ModuleInstanceId;
12use fedimint_core::core::backup::SignedBackupRequest;
13use fedimint_core::endpoint_constants::{
14 ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
15 AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
16 BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CONFIG_GEN_PEERS_ENDPOINT,
17 FEDIMINTD_VERSION_ENDPOINT, GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT,
18 RECOVER_ENDPOINT, RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT,
19 SESSION_COUNT_ENDPOINT, SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT,
20 SET_LOCAL_PARAMS_ENDPOINT, SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
21 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
22 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
23 VERIFY_CONFIG_HASH_ENDPOINT,
24};
25use fedimint_core::invite_code::InviteCode;
26use fedimint_core::module::audit::AuditSummary;
27use fedimint_core::module::registry::ModuleDecoderRegistry;
28use fedimint_core::module::{
29 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
30};
31use fedimint_core::net::api_announcement::{
32 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
33};
34use fedimint_core::session_outcome::{
35 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
36};
37use fedimint_core::task::{MaybeSend, MaybeSync};
38use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
39use fedimint_core::util::SafeUrl;
40use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
41use fedimint_logging::LOG_CLIENT_NET_API;
42use futures::future::join_all;
43use itertools::Itertools;
44use rand::seq::SliceRandom;
45use serde_json::Value;
46use tokio::sync::OnceCell;
47use tracing::debug;
48
49use super::super::{
50 DynModuleApi, GuardianConfigBackup, IGlobalFederationApi, IRawFederationApi, StatusResponse,
51};
52use crate::api::{
53 FederationApiExt, FederationError, FederationResult, PeerResult,
54 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
55};
56use crate::query::FilterMapThreshold;
57
58pub trait GlobalFederationApiWithCacheExt
61where
62 Self: Sized,
63{
64 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
65}
66
67impl<T> GlobalFederationApiWithCacheExt for T
68where
69 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
70{
71 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
72 GlobalFederationApiWithCache::new(self)
73 }
74}
75
76#[derive(Debug)]
81pub struct GlobalFederationApiWithCache<T> {
82 pub(crate) inner: T,
83 pub(crate) await_session_lru:
93 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
94
95 pub(crate) get_session_status_lru:
103 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
104}
105
106impl<T> GlobalFederationApiWithCache<T> {
107 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
108 Self {
109 inner,
110 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
111 NonZeroUsize::new(512).expect("is non-zero"),
112 ))),
113 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
114 NonZeroUsize::new(512).expect("is non-zero"),
115 ))),
116 }
117 }
118}
119
120impl<T> GlobalFederationApiWithCache<T>
121where
122 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
123{
124 pub(crate) async fn await_block_raw(
125 &self,
126 block_index: u64,
127 decoders: &ModuleDecoderRegistry,
128 ) -> anyhow::Result<SessionOutcome> {
129 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
130 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
131 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
132 ApiRequestErased::new(block_index),
133 )
134 .await?
135 .try_into_inner(decoders)
136 .map_err(|e| anyhow!(e.to_string()))
137 }
138
139 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
140 let mut peers = self.all_peers().iter().copied().collect_vec();
141 peers.shuffle(&mut rand::thread_rng());
142 peers.into_iter()
143 }
144
145 pub(crate) async fn get_session_status_raw_v2(
146 &self,
147 block_index: u64,
148 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
149 decoders: &ModuleDecoderRegistry,
150 ) -> anyhow::Result<SessionStatus> {
151 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
152 let params = ApiRequestErased::new(block_index);
153 let mut last_error = None;
154 for peer_id in self.select_peers_for_status() {
156 match self
157 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
158 SESSION_STATUS_V2_ENDPOINT.to_string(),
159 params.clone(),
160 peer_id,
161 )
162 .await
163 .map_err(anyhow::Error::from)
164 .and_then(|s| Ok(s.try_into_inner(decoders)?))
165 {
166 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
167 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
168 return Ok(SessionStatus::Complete(
170 signed_session_outcome.session_outcome,
171 ));
172 }
173 last_error = Some(format_err!("Invalid signature"));
174 }
175 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
176 return self.get_session_status_raw(block_index, decoders).await;
178 }
179 Err(err) => {
180 last_error = Some(err);
181 }
182 }
183 assert!(last_error.is_some());
185 }
186 Err(last_error.expect("must have at least one peer"))
187 }
188
189 pub(crate) async fn get_session_status_raw(
190 &self,
191 block_index: u64,
192 decoders: &ModuleDecoderRegistry,
193 ) -> anyhow::Result<SessionStatus> {
194 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
195 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
196 SESSION_STATUS_ENDPOINT.to_string(),
197 ApiRequestErased::new(block_index),
198 )
199 .await?
200 .try_into_inner(&decoders.clone().with_fallback())
201 .map_err(|e| anyhow!(e))
202 }
203}
204
205#[apply(async_trait_maybe_send!)]
206impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
207where
208 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
209{
210 fn all_peers(&self) -> &BTreeSet<PeerId> {
211 self.inner.all_peers()
212 }
213
214 fn self_peer(&self) -> Option<PeerId> {
215 self.inner.self_peer()
216 }
217
218 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
219 self.inner.with_module(id)
220 }
221
222 async fn request_raw(
224 &self,
225 peer_id: PeerId,
226 method: &str,
227 params: &ApiRequestErased,
228 ) -> PeerResult<Value> {
229 self.inner.request_raw(peer_id, method, params).await
230 }
231}
232
233#[apply(async_trait_maybe_send!)]
234impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
235where
236 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
237{
238 async fn await_block(
239 &self,
240 session_idx: u64,
241 decoders: &ModuleDecoderRegistry,
242 ) -> anyhow::Result<SessionOutcome> {
243 let mut lru_lock = self.await_session_lru.lock().await;
244
245 let entry_arc = lru_lock
246 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
247 .clone();
248
249 drop(lru_lock);
251
252 entry_arc
253 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
254 .await
255 .cloned()
256 }
257
258 async fn get_session_status(
259 &self,
260 session_idx: u64,
261 decoders: &ModuleDecoderRegistry,
262 core_api_version: ApiVersion,
263 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
264 ) -> anyhow::Result<SessionStatus> {
265 let mut lru_lock = self.get_session_status_lru.lock().await;
266
267 let entry_arc = lru_lock
268 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
269 .clone();
270
271 drop(lru_lock);
273
274 enum NoCacheErr {
275 Initial,
276 Pending(Vec<AcceptedItem>),
277 Err(anyhow::Error),
278 }
279 match entry_arc
280 .get_or_try_init(|| async {
281 let session_status =
282 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
283 self.get_session_status_raw(session_idx, decoders).await
284 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
285 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
286 .await
287 } else {
288 self.get_session_status_raw(session_idx, decoders).await
289 };
290 match session_status {
291 Err(e) => Err(NoCacheErr::Err(e)),
292 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
293 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
294 Ok(SessionStatus::Complete(s)) => Ok(s),
296 }
297 })
298 .await
299 .cloned()
300 {
301 Ok(s) => Ok(SessionStatus::Complete(s)),
302 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
303 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
304 Err(NoCacheErr::Err(e)) => Err(e),
305 }
306 }
307
308 async fn submit_transaction(
309 &self,
310 tx: Transaction,
311 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
312 self.request_current_consensus_retry(
313 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
314 ApiRequestErased::new(SerdeTransaction::from(&tx)),
315 )
316 .await
317 }
318
319 async fn session_count(&self) -> FederationResult<u64> {
320 self.request_current_consensus(
321 SESSION_COUNT_ENDPOINT.to_owned(),
322 ApiRequestErased::default(),
323 )
324 .await
325 }
326
327 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
328 self.request_current_consensus_retry(
329 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
330 ApiRequestErased::new(txid),
331 )
332 .await
333 }
334
335 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
336 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
337 .await
338 }
339
340 async fn download_backup(
341 &self,
342 id: &secp256k1::PublicKey,
343 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
344 self.request_with_strategy(
345 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
346 RECOVER_ENDPOINT.to_owned(),
347 ApiRequestErased::new(id),
348 )
349 .await
350 }
351
352 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
353 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
354 .await
355 }
356
357 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
358 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
359 .await
360 }
361
362 async fn set_local_params(
363 &self,
364 name: String,
365 federation_name: Option<String>,
366 auth: ApiAuth,
367 ) -> FederationResult<String> {
368 self.request_admin(
369 SET_LOCAL_PARAMS_ENDPOINT,
370 ApiRequestErased::new(SetLocalParamsRequest {
371 name,
372 federation_name,
373 }),
374 auth,
375 )
376 .await
377 }
378
379 async fn add_peer_connection_info(
380 &self,
381 info: String,
382 auth: ApiAuth,
383 ) -> FederationResult<String> {
384 self.request_admin(
385 ADD_PEER_SETUP_CODE_ENDPOINT,
386 ApiRequestErased::new(info),
387 auth,
388 )
389 .await
390 }
391
392 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
393 self.request_admin(
394 RESET_PEER_SETUP_CODES_ENDPOINT,
395 ApiRequestErased::default(),
396 auth,
397 )
398 .await
399 }
400
401 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
402 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
403 .await
404 }
405
406 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
407 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
408 .await
409 }
410
411 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
412 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
413 .await
414 }
415
416 async fn get_verify_config_hash(
417 &self,
418 auth: ApiAuth,
419 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
420 self.request_admin(
421 VERIFY_CONFIG_HASH_ENDPOINT,
422 ApiRequestErased::default(),
423 auth,
424 )
425 .await
426 }
427
428 async fn verified_configs(
429 &self,
430 auth: ApiAuth,
431 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
432 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
433 .await
434 }
435
436 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
437 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
438 .await
439 }
440
441 async fn status(&self) -> FederationResult<StatusResponse> {
442 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
443 .await
444 }
445
446 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
447 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
448 .await
449 }
450
451 async fn guardian_config_backup(
452 &self,
453 auth: ApiAuth,
454 ) -> FederationResult<GuardianConfigBackup> {
455 self.request_admin(
456 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
457 ApiRequestErased::default(),
458 auth,
459 )
460 .await
461 }
462
463 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
464 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
465 .await
466 }
467
468 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
469 self.request_admin(
470 RESTART_FEDERATION_SETUP_ENDPOINT,
471 ApiRequestErased::default(),
472 auth,
473 )
474 .await
475 }
476
477 async fn submit_api_announcement(
478 &self,
479 announcement_peer_id: PeerId,
480 announcement: SignedApiAnnouncement,
481 ) -> FederationResult<()> {
482 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
483 let announcement_inner = announcement.clone();
484 async move {
485 (
486 peer_id,
487 self.request_single_peer::<()>(
488 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
489 ApiRequestErased::new(SignedApiAnnouncementSubmission {
490 signed_api_announcement: announcement_inner,
491 peer_id: announcement_peer_id,
492 }),
493 peer_id,
494 )
495 .await,
496 )
497 }
498 }))
499 .await
500 .into_iter()
501 .filter_map(|(peer_id, result)| match result {
502 Ok(()) => None,
503 Err(e) => Some((peer_id, e)),
504 })
505 .collect::<BTreeMap<_, _>>();
506
507 if peer_errors.is_empty() {
508 Ok(())
509 } else {
510 Err(FederationError {
511 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
512 params: serde_json::to_value(announcement).expect("can be serialized"),
513 general: None,
514 peer_errors,
515 })
516 }
517 }
518
519 async fn api_announcements(
520 &self,
521 guardian: PeerId,
522 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
523 self.request_single_peer(
524 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
525 ApiRequestErased::default(),
526 guardian,
527 )
528 .await
529 }
530
531 async fn sign_api_announcement(
532 &self,
533 api_url: SafeUrl,
534 auth: ApiAuth,
535 ) -> FederationResult<SignedApiAnnouncement> {
536 self.request_admin(
537 SIGN_API_ANNOUNCEMENT_ENDPOINT,
538 ApiRequestErased::new(api_url),
539 auth,
540 )
541 .await
542 }
543
544 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
545 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
546 .await
547 }
548
549 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
550 self.request_admin(
551 BACKUP_STATISTICS_ENDPOINT,
552 ApiRequestErased::default(),
553 auth,
554 )
555 .await
556 }
557
558 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
559 self.request_single_peer(
560 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
561 ApiRequestErased::default(),
562 peer_id,
563 )
564 .await
565 }
566
567 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
568 self.request_single_peer(
569 INVITE_CODE_ENDPOINT.to_owned(),
570 ApiRequestErased::default(),
571 guardian,
572 )
573 .await
574 }
575}