1use std::collections::{BTreeMap, BTreeSet};
2use std::fmt::Debug;
3use std::num::NonZeroUsize;
4use std::sync::Arc;
5
6use anyhow::{anyhow, format_err};
7use bitcoin::hashes::sha256;
8use bitcoin::secp256k1;
9use fedimint_core::admin_client::{
10 GuardianConfigBackup, PeerServerParamsLegacy, SetLocalParamsRequest, SetupStatus,
11};
12use fedimint_core::backup::{BackupStatistics, ClientBackupSnapshot};
13use fedimint_core::core::ModuleInstanceId;
14use fedimint_core::core::backup::SignedBackupRequest;
15use fedimint_core::endpoint_constants::{
16 ADD_CONFIG_GEN_PEER_ENDPOINT, ADD_PEER_SETUP_CODE_ENDPOINT, API_ANNOUNCEMENTS_ENDPOINT,
17 AUDIT_ENDPOINT, AUTH_ENDPOINT, AWAIT_SESSION_OUTCOME_ENDPOINT, AWAIT_TRANSACTION_ENDPOINT,
18 BACKUP_ENDPOINT, BACKUP_STATISTICS_ENDPOINT, CHANGE_PASSWORD_ENDPOINT,
19 CONFIG_GEN_PEERS_ENDPOINT, FEDIMINTD_VERSION_ENDPOINT, GET_SETUP_CODE_ENDPOINT,
20 GUARDIAN_CONFIG_BACKUP_ENDPOINT, INVITE_CODE_ENDPOINT, RECOVER_ENDPOINT,
21 RESET_PEER_SETUP_CODES_ENDPOINT, RESTART_FEDERATION_SETUP_ENDPOINT, SESSION_COUNT_ENDPOINT,
22 SESSION_STATUS_ENDPOINT, SESSION_STATUS_V2_ENDPOINT, SET_LOCAL_PARAMS_ENDPOINT,
23 SET_PASSWORD_ENDPOINT, SETUP_STATUS_ENDPOINT, SHUTDOWN_ENDPOINT,
24 SIGN_API_ANNOUNCEMENT_ENDPOINT, START_CONSENSUS_ENDPOINT, START_DKG_ENDPOINT, STATUS_ENDPOINT,
25 SUBMIT_API_ANNOUNCEMENT_ENDPOINT, SUBMIT_TRANSACTION_ENDPOINT, VERIFIED_CONFIGS_ENDPOINT,
26 VERIFY_CONFIG_HASH_ENDPOINT,
27};
28use fedimint_core::invite_code::InviteCode;
29use fedimint_core::module::audit::AuditSummary;
30use fedimint_core::module::registry::ModuleDecoderRegistry;
31use fedimint_core::module::{
32 ApiAuth, ApiRequestErased, ApiVersion, SerdeModuleEncoding, SerdeModuleEncodingBase64,
33};
34use fedimint_core::net::api_announcement::{
35 SignedApiAnnouncement, SignedApiAnnouncementSubmission,
36};
37use fedimint_core::session_outcome::{
38 AcceptedItem, SessionOutcome, SessionStatus, SessionStatusV2,
39};
40use fedimint_core::task::{MaybeSend, MaybeSync};
41use fedimint_core::transaction::{SerdeTransaction, Transaction, TransactionSubmissionOutcome};
42use fedimint_core::util::SafeUrl;
43use fedimint_core::{NumPeersExt, PeerId, TransactionId, apply, async_trait_maybe_send};
44use fedimint_logging::LOG_CLIENT_NET_API;
45use futures::future::join_all;
46use itertools::Itertools;
47use rand::seq::SliceRandom;
48use serde_json::Value;
49use tokio::sync::OnceCell;
50use tracing::{debug, trace};
51
52use super::super::{DynModuleApi, IGlobalFederationApi, IRawFederationApi, StatusResponse};
53use crate::api::{
54 DynClientConnector, FederationApiExt, FederationError, FederationResult, PeerResult,
55 VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2,
56};
57use crate::query::FilterMapThreshold;
58
59pub trait GlobalFederationApiWithCacheExt
62where
63 Self: Sized,
64{
65 fn with_cache(self) -> GlobalFederationApiWithCache<Self>;
66}
67
68impl<T> GlobalFederationApiWithCacheExt for T
69where
70 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
71{
72 fn with_cache(self) -> GlobalFederationApiWithCache<T> {
73 GlobalFederationApiWithCache::new(self)
74 }
75}
76
77#[derive(Debug)]
82pub struct GlobalFederationApiWithCache<T> {
83 pub(crate) inner: T,
84 pub(crate) await_session_lru:
94 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
95
96 pub(crate) get_session_status_lru:
104 Arc<tokio::sync::Mutex<lru::LruCache<u64, Arc<OnceCell<SessionOutcome>>>>>,
105}
106
107impl<T> GlobalFederationApiWithCache<T> {
108 pub fn new(inner: T) -> GlobalFederationApiWithCache<T> {
109 Self {
110 inner,
111 await_session_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
112 NonZeroUsize::new(512).expect("is non-zero"),
113 ))),
114 get_session_status_lru: Arc::new(tokio::sync::Mutex::new(lru::LruCache::new(
115 NonZeroUsize::new(512).expect("is non-zero"),
116 ))),
117 }
118 }
119}
120
121impl<T> GlobalFederationApiWithCache<T>
122where
123 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
124{
125 pub(crate) async fn await_block_raw(
126 &self,
127 block_index: u64,
128 decoders: &ModuleDecoderRegistry,
129 ) -> anyhow::Result<SessionOutcome> {
130 if block_index % 100 == 0 {
131 debug!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
132 } else {
133 trace!(target: LOG_CLIENT_NET_API, block_index, "Awaiting block's outcome from Federation");
134 }
135 self.request_current_consensus::<SerdeModuleEncoding<SessionOutcome>>(
136 AWAIT_SESSION_OUTCOME_ENDPOINT.to_string(),
137 ApiRequestErased::new(block_index),
138 )
139 .await?
140 .try_into_inner(decoders)
141 .map_err(|e| anyhow!(e.to_string()))
142 }
143
144 pub(crate) fn select_peers_for_status(&self) -> impl Iterator<Item = PeerId> + '_ {
145 let mut peers = self.all_peers().iter().copied().collect_vec();
146 peers.shuffle(&mut rand::thread_rng());
147 peers.into_iter()
148 }
149
150 pub(crate) async fn get_session_status_raw_v2(
151 &self,
152 block_index: u64,
153 broadcast_public_keys: &BTreeMap<PeerId, secp256k1::PublicKey>,
154 decoders: &ModuleDecoderRegistry,
155 ) -> anyhow::Result<SessionStatus> {
156 if block_index % 100 == 0 {
157 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
158 } else {
159 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v2");
160 }
161 let params = ApiRequestErased::new(block_index);
162 let mut last_error = None;
163 for peer_id in self.select_peers_for_status() {
165 match self
166 .request_single_peer_federation::<SerdeModuleEncodingBase64<SessionStatusV2>>(
167 SESSION_STATUS_V2_ENDPOINT.to_string(),
168 params.clone(),
169 peer_id,
170 )
171 .await
172 .map_err(anyhow::Error::from)
173 .and_then(|s| Ok(s.try_into_inner(decoders)?))
174 {
175 Ok(SessionStatusV2::Complete(signed_session_outcome)) => {
176 if signed_session_outcome.verify(broadcast_public_keys, block_index) {
177 return Ok(SessionStatus::Complete(
179 signed_session_outcome.session_outcome,
180 ));
181 }
182 last_error = Some(format_err!("Invalid signature"));
183 }
184 Ok(SessionStatusV2::Initial | SessionStatusV2::Pending(..)) => {
185 return self.get_session_status_raw(block_index, decoders).await;
187 }
188 Err(err) => {
189 last_error = Some(err);
190 }
191 }
192 assert!(last_error.is_some());
194 }
195 Err(last_error.expect("must have at least one peer"))
196 }
197
198 pub(crate) async fn get_session_status_raw(
199 &self,
200 block_index: u64,
201 decoders: &ModuleDecoderRegistry,
202 ) -> anyhow::Result<SessionStatus> {
203 if block_index % 100 == 0 {
204 debug!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
205 } else {
206 trace!(target: LOG_CLIENT_NET_API, block_index, "Get session status raw v1");
207 }
208 self.request_current_consensus::<SerdeModuleEncoding<SessionStatus>>(
209 SESSION_STATUS_ENDPOINT.to_string(),
210 ApiRequestErased::new(block_index),
211 )
212 .await?
213 .try_into_inner(&decoders.clone().with_fallback())
214 .map_err(|e| anyhow!(e))
215 }
216}
217
218#[apply(async_trait_maybe_send!)]
219impl<T> IRawFederationApi for GlobalFederationApiWithCache<T>
220where
221 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
222{
223 fn all_peers(&self) -> &BTreeSet<PeerId> {
224 self.inner.all_peers()
225 }
226
227 fn self_peer(&self) -> Option<PeerId> {
228 self.inner.self_peer()
229 }
230
231 fn with_module(&self, id: ModuleInstanceId) -> DynModuleApi {
232 self.inner.with_module(id)
233 }
234
235 async fn request_raw(
237 &self,
238 peer_id: PeerId,
239 method: &str,
240 params: &ApiRequestErased,
241 ) -> PeerResult<Value> {
242 self.inner.request_raw(peer_id, method, params).await
243 }
244
245 fn connector(&self) -> &DynClientConnector {
246 self.inner.connector()
247 }
248}
249
250#[apply(async_trait_maybe_send!)]
251impl<T> IGlobalFederationApi for GlobalFederationApiWithCache<T>
252where
253 T: IRawFederationApi + MaybeSend + MaybeSync + 'static,
254{
255 async fn await_block(
256 &self,
257 session_idx: u64,
258 decoders: &ModuleDecoderRegistry,
259 ) -> anyhow::Result<SessionOutcome> {
260 let mut lru_lock = self.await_session_lru.lock().await;
261
262 let entry_arc = lru_lock
263 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
264 .clone();
265
266 drop(lru_lock);
268
269 entry_arc
270 .get_or_try_init(|| self.await_block_raw(session_idx, decoders))
271 .await
272 .cloned()
273 }
274
275 async fn get_session_status(
276 &self,
277 session_idx: u64,
278 decoders: &ModuleDecoderRegistry,
279 core_api_version: ApiVersion,
280 broadcast_public_keys: Option<&BTreeMap<PeerId, secp256k1::PublicKey>>,
281 ) -> anyhow::Result<SessionStatus> {
282 let mut lru_lock = self.get_session_status_lru.lock().await;
283
284 let entry_arc = lru_lock
285 .get_or_insert(session_idx, || Arc::new(OnceCell::new()))
286 .clone();
287
288 drop(lru_lock);
290
291 enum NoCacheErr {
292 Initial,
293 Pending(Vec<AcceptedItem>),
294 Err(anyhow::Error),
295 }
296 match entry_arc
297 .get_or_try_init(|| async {
298 let session_status =
299 if core_api_version < VERSION_THAT_INTRODUCED_GET_SESSION_STATUS_V2 {
300 self.get_session_status_raw(session_idx, decoders).await
301 } else if let Some(broadcast_public_keys) = broadcast_public_keys {
302 self.get_session_status_raw_v2(session_idx, broadcast_public_keys, decoders)
303 .await
304 } else {
305 self.get_session_status_raw(session_idx, decoders).await
306 };
307 match session_status {
308 Err(e) => Err(NoCacheErr::Err(e)),
309 Ok(SessionStatus::Initial) => Err(NoCacheErr::Initial),
310 Ok(SessionStatus::Pending(s)) => Err(NoCacheErr::Pending(s)),
311 Ok(SessionStatus::Complete(s)) => Ok(s),
313 }
314 })
315 .await
316 .cloned()
317 {
318 Ok(s) => Ok(SessionStatus::Complete(s)),
319 Err(NoCacheErr::Initial) => Ok(SessionStatus::Initial),
320 Err(NoCacheErr::Pending(s)) => Ok(SessionStatus::Pending(s)),
321 Err(NoCacheErr::Err(e)) => Err(e),
322 }
323 }
324
325 async fn submit_transaction(
326 &self,
327 tx: Transaction,
328 ) -> SerdeModuleEncoding<TransactionSubmissionOutcome> {
329 self.request_current_consensus_retry(
330 SUBMIT_TRANSACTION_ENDPOINT.to_owned(),
331 ApiRequestErased::new(SerdeTransaction::from(&tx)),
332 )
333 .await
334 }
335
336 async fn session_count(&self) -> FederationResult<u64> {
337 self.request_current_consensus(
338 SESSION_COUNT_ENDPOINT.to_owned(),
339 ApiRequestErased::default(),
340 )
341 .await
342 }
343
344 async fn await_transaction(&self, txid: TransactionId) -> TransactionId {
345 self.request_current_consensus_retry(
346 AWAIT_TRANSACTION_ENDPOINT.to_owned(),
347 ApiRequestErased::new(txid),
348 )
349 .await
350 }
351
352 async fn upload_backup(&self, request: &SignedBackupRequest) -> FederationResult<()> {
353 self.request_current_consensus(BACKUP_ENDPOINT.to_owned(), ApiRequestErased::new(request))
354 .await
355 }
356
357 async fn download_backup(
358 &self,
359 id: &secp256k1::PublicKey,
360 ) -> FederationResult<BTreeMap<PeerId, Option<ClientBackupSnapshot>>> {
361 self.request_with_strategy(
362 FilterMapThreshold::new(|_, snapshot| Ok(snapshot), self.all_peers().to_num_peers()),
363 RECOVER_ENDPOINT.to_owned(),
364 ApiRequestErased::new(id),
365 )
366 .await
367 }
368
369 async fn set_password(&self, auth: ApiAuth) -> FederationResult<()> {
370 self.request_admin(SET_PASSWORD_ENDPOINT, ApiRequestErased::default(), auth)
371 .await
372 }
373
374 async fn setup_status(&self, auth: ApiAuth) -> FederationResult<SetupStatus> {
375 self.request_admin(SETUP_STATUS_ENDPOINT, ApiRequestErased::default(), auth)
376 .await
377 }
378
379 async fn set_local_params(
380 &self,
381 name: String,
382 federation_name: Option<String>,
383 disable_base_fees: Option<bool>,
384 auth: ApiAuth,
385 ) -> FederationResult<String> {
386 self.request_admin(
387 SET_LOCAL_PARAMS_ENDPOINT,
388 ApiRequestErased::new(SetLocalParamsRequest {
389 name,
390 federation_name,
391 disable_base_fees,
392 }),
393 auth,
394 )
395 .await
396 }
397
398 async fn add_peer_connection_info(
399 &self,
400 info: String,
401 auth: ApiAuth,
402 ) -> FederationResult<String> {
403 self.request_admin(
404 ADD_PEER_SETUP_CODE_ENDPOINT,
405 ApiRequestErased::new(info),
406 auth,
407 )
408 .await
409 }
410
411 async fn reset_peer_setup_codes(&self, auth: ApiAuth) -> FederationResult<()> {
412 self.request_admin(
413 RESET_PEER_SETUP_CODES_ENDPOINT,
414 ApiRequestErased::default(),
415 auth,
416 )
417 .await
418 }
419
420 async fn get_setup_code(&self, auth: ApiAuth) -> FederationResult<Option<String>> {
421 self.request_admin(GET_SETUP_CODE_ENDPOINT, ApiRequestErased::default(), auth)
422 .await
423 }
424
425 async fn add_config_gen_peer(&self, peer: PeerServerParamsLegacy) -> FederationResult<()> {
426 self.request_admin_no_auth(ADD_CONFIG_GEN_PEER_ENDPOINT, ApiRequestErased::new(peer))
427 .await
428 }
429
430 async fn get_config_gen_peers(&self) -> FederationResult<Vec<PeerServerParamsLegacy>> {
431 self.request_admin_no_auth(CONFIG_GEN_PEERS_ENDPOINT, ApiRequestErased::default())
432 .await
433 }
434
435 async fn start_dkg(&self, auth: ApiAuth) -> FederationResult<()> {
436 self.request_admin(START_DKG_ENDPOINT, ApiRequestErased::default(), auth)
437 .await
438 }
439
440 async fn get_verify_config_hash(
441 &self,
442 auth: ApiAuth,
443 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
444 self.request_admin(
445 VERIFY_CONFIG_HASH_ENDPOINT,
446 ApiRequestErased::default(),
447 auth,
448 )
449 .await
450 }
451
452 async fn verified_configs(
453 &self,
454 auth: ApiAuth,
455 ) -> FederationResult<BTreeMap<PeerId, sha256::Hash>> {
456 self.request_admin(VERIFIED_CONFIGS_ENDPOINT, ApiRequestErased::default(), auth)
457 .await
458 }
459
460 async fn start_consensus(&self, auth: ApiAuth) -> FederationResult<()> {
461 self.request_admin(START_CONSENSUS_ENDPOINT, ApiRequestErased::default(), auth)
462 .await
463 }
464
465 async fn status(&self) -> FederationResult<StatusResponse> {
466 self.request_admin_no_auth(STATUS_ENDPOINT, ApiRequestErased::default())
467 .await
468 }
469
470 async fn audit(&self, auth: ApiAuth) -> FederationResult<AuditSummary> {
471 self.request_admin(AUDIT_ENDPOINT, ApiRequestErased::default(), auth)
472 .await
473 }
474
475 async fn guardian_config_backup(
476 &self,
477 auth: ApiAuth,
478 ) -> FederationResult<GuardianConfigBackup> {
479 self.request_admin(
480 GUARDIAN_CONFIG_BACKUP_ENDPOINT,
481 ApiRequestErased::default(),
482 auth,
483 )
484 .await
485 }
486
487 async fn auth(&self, auth: ApiAuth) -> FederationResult<()> {
488 self.request_admin(AUTH_ENDPOINT, ApiRequestErased::default(), auth)
489 .await
490 }
491
492 async fn restart_federation_setup(&self, auth: ApiAuth) -> FederationResult<()> {
493 self.request_admin(
494 RESTART_FEDERATION_SETUP_ENDPOINT,
495 ApiRequestErased::default(),
496 auth,
497 )
498 .await
499 }
500
501 async fn submit_api_announcement(
502 &self,
503 announcement_peer_id: PeerId,
504 announcement: SignedApiAnnouncement,
505 ) -> FederationResult<()> {
506 let peer_errors = join_all(self.all_peers().iter().map(|&peer_id| {
507 let announcement_inner = announcement.clone();
508 async move {
509 (
510 peer_id,
511 self.request_single_peer::<()>(
512 SUBMIT_API_ANNOUNCEMENT_ENDPOINT.into(),
513 ApiRequestErased::new(SignedApiAnnouncementSubmission {
514 signed_api_announcement: announcement_inner,
515 peer_id: announcement_peer_id,
516 }),
517 peer_id,
518 )
519 .await,
520 )
521 }
522 }))
523 .await
524 .into_iter()
525 .filter_map(|(peer_id, result)| match result {
526 Ok(()) => None,
527 Err(e) => Some((peer_id, e)),
528 })
529 .collect::<BTreeMap<_, _>>();
530
531 if peer_errors.is_empty() {
532 Ok(())
533 } else {
534 Err(FederationError {
535 method: SUBMIT_API_ANNOUNCEMENT_ENDPOINT.to_string(),
536 params: serde_json::to_value(announcement).expect("can be serialized"),
537 general: None,
538 peer_errors,
539 })
540 }
541 }
542
543 async fn api_announcements(
544 &self,
545 guardian: PeerId,
546 ) -> PeerResult<BTreeMap<PeerId, SignedApiAnnouncement>> {
547 self.request_single_peer(
548 API_ANNOUNCEMENTS_ENDPOINT.to_owned(),
549 ApiRequestErased::default(),
550 guardian,
551 )
552 .await
553 }
554
555 async fn sign_api_announcement(
556 &self,
557 api_url: SafeUrl,
558 auth: ApiAuth,
559 ) -> FederationResult<SignedApiAnnouncement> {
560 self.request_admin(
561 SIGN_API_ANNOUNCEMENT_ENDPOINT,
562 ApiRequestErased::new(api_url),
563 auth,
564 )
565 .await
566 }
567
568 async fn shutdown(&self, session: Option<u64>, auth: ApiAuth) -> FederationResult<()> {
569 self.request_admin(SHUTDOWN_ENDPOINT, ApiRequestErased::new(session), auth)
570 .await
571 }
572
573 async fn backup_statistics(&self, auth: ApiAuth) -> FederationResult<BackupStatistics> {
574 self.request_admin(
575 BACKUP_STATISTICS_ENDPOINT,
576 ApiRequestErased::default(),
577 auth,
578 )
579 .await
580 }
581
582 async fn fedimintd_version(&self, peer_id: PeerId) -> PeerResult<String> {
583 self.request_single_peer(
584 FEDIMINTD_VERSION_ENDPOINT.to_owned(),
585 ApiRequestErased::default(),
586 peer_id,
587 )
588 .await
589 }
590
591 async fn get_invite_code(&self, guardian: PeerId) -> PeerResult<InviteCode> {
592 self.request_single_peer(
593 INVITE_CODE_ENDPOINT.to_owned(),
594 ApiRequestErased::default(),
595 guardian,
596 )
597 .await
598 }
599
600 async fn change_password(&self, auth: ApiAuth, new_password: &str) -> FederationResult<()> {
601 self.request_admin(
602 CHANGE_PASSWORD_ENDPOINT,
603 ApiRequestErased::new(new_password),
604 auth,
605 )
606 .await
607 }
608}