use std::collections::{BTreeMap, HashSet};
use std::fmt::{self, Formatter};
use std::future::{Future, pending};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use anyhow::{Context as _, anyhow, bail, format_err};
use async_stream::try_stream;
use bitcoin::key::Secp256k1;
use bitcoin::key::rand::thread_rng;
use bitcoin::secp256k1::{self, PublicKey};
use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
use fedimint_api_client::api::net::Connector;
use fedimint_api_client::api::{
    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
};
use fedimint_client_module::module::recovery::RecoveryProgress;
use fedimint_client_module::module::{
    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
    IClientModule, IdxRange, OutPointRange,
};
use fedimint_client_module::oplog::IOperationLog;
use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
use fedimint_client_module::transaction::{
    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
    TxSubmissionStatesSM,
};
use fedimint_client_module::{
    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
};
use fedimint_core::config::{
    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
};
use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
use fedimint_core::db::{
    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
};
use fedimint_core::encoding::{Decodable, Encodable};
use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
use fedimint_core::envs::is_running_in_test_env;
use fedimint_core::invite_code::InviteCode;
use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
use fedimint_core::module::{
    ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
    SupportedCoreApiVersions, SupportedModuleApiVersions,
};
use fedimint_core::net::api_announcement::SignedApiAnnouncement;
use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
use fedimint_core::transaction::Transaction;
use fedimint_core::util::backoff_util::custom_backoff;
use fedimint_core::util::{
    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
};
use fedimint_core::{
    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
    maybe_add_send_sync, runtime,
};
use fedimint_derive_secret::DerivableSecret;
use fedimint_eventlog::{
    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
    EventPersistence, PersistedLogEntry,
};
use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
use futures::stream::FuturesUnordered;
use futures::{Stream, StreamExt as _};
use global_ctx::ModuleGlobalClientContext;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, watch};
use tokio_stream::wrappers::WatchStream;
use tracing::{debug, info, warn};

use crate::ClientBuilder;
use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
use crate::backup::Metadata;
use crate::db::{
    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
    get_decoded_client_secret, verify_client_db_integrity_dbtx,
};
use crate::meta::MetaService;
use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
use crate::oplog::OperationLog;
use crate::sm::executor::{
    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
};

pub(crate) mod builder;
pub(crate) mod global_ctx;
pub(crate) mod handle;

const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
    &[ApiVersion { major: 0, minor: 0 }];

pub struct Client {
    final_client: FinalClientIface,
    config: tokio::sync::RwLock<ClientConfig>,
    api_secret: Option<String>,
    decoders: ModuleDecoderRegistry,
    db: Database,
    federation_id: FederationId,
    federation_config_meta: BTreeMap<String, String>,
    primary_module_instance: ModuleInstanceId,
    pub(crate) modules: ClientModuleRegistry,
    module_inits: ClientModuleInitRegistry,
    executor: Executor,
    pub(crate) api: DynGlobalApi,
    root_secret: DerivableSecret,
    operation_log: OperationLog,
    secp_ctx: Secp256k1<secp256k1::All>,
    meta_service: Arc<MetaService>,
    connector: Connector,

    task_group: TaskGroup,

    client_recovery_progress_receiver:
        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,

    log_ordering_wakeup_tx: watch::Sender<()>,
    log_event_added_rx: watch::Receiver<()>,
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    request_hook: ApiRequestHook,
    iroh_enable_dht: bool,
    iroh_enable_next: bool,
}

#[derive(Debug, Serialize, Deserialize)]
struct ListOperationsParams {
    limit: Option<usize>,
    last_seen: Option<ChronologicalOperationLogKey>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetOperationIdRequest {
    operation_id: OperationId,
}

impl Client {
    pub async fn builder() -> anyhow::Result<ClientBuilder> {
        Ok(ClientBuilder::new())
    }

    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
        self.api.as_ref()
    }

    pub fn api_clone(&self) -> DynGlobalApi {
        self.api.clone()
    }

    pub fn task_group(&self) -> &TaskGroup {
        &self.task_group
    }

    #[doc(hidden)]
    pub fn executor(&self) -> &Executor {
        &self.executor
    }

    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&ClientConfigKey).await
    }

    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&PendingClientConfigKey).await
    }

    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.get_value(&ApiSecretKey).await
    }

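    /// Example (untested sketch): round-tripping a raw 64-byte root secret
    /// through the client database. `db` is assumed to be an open [`Database`]
    /// handle provided by the caller; the exact secret type is up to the
    /// application.
    ///
    /// ```ignore
    /// let secret: [u8; 64] = PlainRootSecretStrategy::random(&mut thread_rng());
    /// Client::store_encodable_client_secret(&db, secret).await?;
    /// // Later, e.g. on restart, the same bytes can be decoded back:
    /// let restored: [u8; 64] = Client::load_decodable_client_secret(&db).await?;
    /// assert_eq!(secret, restored);
    /// ```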
    pub async fn store_encodable_client_secret<T: Encodable>(
        db: &Database,
        secret: T,
    ) -> anyhow::Result<()> {
        let mut dbtx = db.begin_transaction().await;

        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
            bail!("Encoded client secret already exists, cannot overwrite")
        }

        let encoded_secret = T::consensus_encode_to_vec(&secret);
        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
            .await;
        dbtx.commit_tx().await;
        Ok(())
    }

    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
            bail!("Encoded client secret not present in DB")
        };

        Ok(secret)
    }

    pub async fn load_decodable_client_secret_opt<T: Decodable>(
        db: &Database,
    ) -> anyhow::Result<Option<T>> {
        let mut dbtx = db.begin_transaction_nc().await;

        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;

        Ok(match client_secret {
            Some(client_secret) => Some(
                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
            ),
            None => None,
        })
    }

    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
            Ok(secret) => secret,
            _ => {
                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
                Self::store_encodable_client_secret(db, secret)
                    .await
                    .expect("Storing client secret must work");
                secret
            }
        };
        Ok(client_secret)
    }

    pub async fn is_initialized(db: &Database) -> bool {
        let mut dbtx = db.begin_transaction_nc().await;
        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
            .await
            .expect("Unrecoverable error occurred while reading an entry from the database")
            .is_some()
    }

    pub fn start_executor(self: &Arc<Self>) {
        debug!(
            target: LOG_CLIENT,
            "Starting fedimint client executor",
        );
        self.executor.start_executor(self.context_gen());
    }

    pub fn federation_id(&self) -> FederationId {
        self.federation_id
    }

    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
        let client_inner = Arc::downgrade(self);
        Arc::new(move |module_instance, operation| {
            ModuleGlobalClientContext {
                client: client_inner
                    .clone()
                    .upgrade()
                    .expect("ModuleGlobalContextGen called after client was dropped"),
                module_instance_id: module_instance,
                operation,
            }
            .into()
        })
    }

    pub async fn config(&self) -> ClientConfig {
        self.config.read().await.clone()
    }

    pub fn api_secret(&self) -> &Option<String> {
        &self.api_secret
    }

    pub fn decoders(&self) -> &ModuleDecoderRegistry {
        &self.decoders
    }

    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        self.try_get_module(instance)
            .expect("Module instance not found")
    }

    fn try_get_module(
        &self,
        instance: ModuleInstanceId,
    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
        Some(self.modules.get(instance)?.as_ref())
    }

    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
        self.modules.get(instance).is_some()
    }

    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
        let mut in_amount = Amount::ZERO;
        let mut out_amount = Amount::ZERO;
        let mut fee_amount = Amount::ZERO;

        for input in builder.inputs() {
            let module = self.get_module(input.input.module_instance_id());

            let item_fee = module.input_fee(input.amount, &input.input).expect(
                "We only build transactions with input versions that are supported by the module",
            );

            in_amount += input.amount;
            fee_amount += item_fee;
        }

        for output in builder.outputs() {
            let module = self.get_module(output.output.module_instance_id());

            let item_fee = module.output_fee(output.amount, &output.output).expect(
                "We only build transactions with output versions that are supported by the module",
            );

            out_amount += output.amount;
            fee_amount += item_fee;
        }

        (in_amount, out_amount + fee_amount)
    }

    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
    }

    pub fn get_config_meta(&self, key: &str) -> Option<String> {
        self.federation_config_meta.get(key).cloned()
    }

    pub(crate) fn root_secret(&self) -> DerivableSecret {
        self.root_secret.clone()
    }

    pub async fn add_state_machines(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        states: Vec<DynState>,
    ) -> AddStateMachinesResult {
        self.executor.add_state_machines_dbtx(dbtx, states).await
    }

    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
        let active_states = self.executor.get_active_states().await;
        let mut active_operations = HashSet::with_capacity(active_states.len());
        let mut dbtx = self.db().begin_transaction_nc().await;
        for (state, _) in active_states {
            let operation_id = state.operation_id();
            if dbtx
                .get_value(&OperationLogKey { operation_id })
                .await
                .is_some()
            {
                active_operations.insert(operation_id);
            }
        }
        active_operations
    }

    pub fn operation_log(&self) -> &OperationLog {
        &self.operation_log
    }

    pub fn meta_service(&self) -> &Arc<MetaService> {
        &self.meta_service
    }

    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
        let meta_service = self.meta_service();
        let ts = meta_service
            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
            .await
            .and_then(|v| v.value)?;
        Some(UNIX_EPOCH + Duration::from_secs(ts))
    }

    async fn finalize_transaction(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        mut partial_transaction: TransactionBuilder,
    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
        let primary_module = self
            .primary_module()
            .ok_or(anyhow!("No primary module available"))?;

        let (added_input_bundle, change_outputs) = primary_module
            .create_final_inputs_and_outputs(
                self.primary_module_instance,
                dbtx,
                operation_id,
                input_amount,
                output_amount,
            )
            .await?;

        let change_range = Range {
            start: partial_transaction.outputs().count() as u64,
            end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
        };

        partial_transaction = partial_transaction
            .with_inputs(added_input_bundle)
            .with_outputs(change_outputs);

        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);

        assert!(input_amount >= output_amount, "Transaction is underfunded");

        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());

        Ok((tx, states, change_range))
    }

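    /// Example (untested sketch) of how a caller typically drives this method:
    /// build a [`TransactionBuilder`], pick an operation id, and persist a
    /// serializable meta object derived from the resulting change range.
    /// `MyOperationMeta` and the way the operation id is chosen are
    /// placeholders, not part of this crate.
    ///
    /// ```ignore
    /// let operation_id = OperationId::new_random();
    /// let tx_builder = TransactionBuilder::new(); // inputs/outputs added by the module
    /// let out_point_range = client
    ///     .finalize_and_submit_transaction(
    ///         operation_id,
    ///         "my-module-op",
    ///         |out_point_range| MyOperationMeta { out_point_range },
    ///         tx_builder,
    ///     )
    ///     .await?;
    /// ```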
    pub async fn finalize_and_submit_transaction<F, M>(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: F,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange>
    where
        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
        M: serde::Serialize + MaybeSend,
    {
        let operation_type = operation_type.to_owned();

        let autocommit_res = self
            .db
            .autocommit(
                |dbtx, _| {
                    let operation_type = operation_type.clone();
                    let tx_builder = tx_builder.clone();
                    let operation_meta_gen = operation_meta_gen.clone();
                    Box::pin(async move {
                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
                            bail!("There already exists an operation with id {operation_id:?}")
                        }

                        let out_point_range = self
                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
                            .await?;

                        self.operation_log()
                            .add_operation_log_entry_dbtx(
                                dbtx,
                                operation_id,
                                &operation_type,
                                operation_meta_gen(out_point_range),
                            )
                            .await;

                        Ok(out_point_range)
                    })
                },
                Some(100),
            )
            .await;

        match autocommit_res {
            Ok(txid) => Ok(txid),
            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
            Err(AutocommitError::CommitFailed {
                attempts,
                last_error,
            }) => panic!(
                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
            ),
        }
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        let (transaction, mut states, change_range) = self
            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
            .await?;

        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
            let inputs = transaction
                .inputs
                .iter()
                .map(DynInput::module_instance_id)
                .collect::<Vec<_>>();
            let outputs = transaction
                .outputs
                .iter()
                .map(DynOutput::module_instance_id)
                .collect::<Vec<_>>();
            warn!(
                target: LOG_CLIENT_NET_API,
                size=%transaction.consensus_encode_to_vec().len(),
                ?inputs,
                ?outputs,
                "Transaction too large",
            );
            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
            bail!(
                "The generated transaction would be rejected by the federation for being too large."
            );
        }

        let txid = transaction.tx_hash();

        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction, "Finalized and submitting transaction");

        let tx_submission_sm = DynState::from_typed(
            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
            TxSubmissionStatesSM {
                operation_id,
                state: TxSubmissionStates::Created(transaction),
            },
        );
        states.push(tx_submission_sm);

        self.executor.add_state_machines_dbtx(dbtx, states).await?;

        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
            .await;

        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
    }

    async fn transaction_update_stream(
        &self,
        operation_id: OperationId,
    ) -> BoxStream<'static, TxSubmissionStatesSM> {
        self.executor
            .notifier()
            .module_notifier::<TxSubmissionStatesSM>(
                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
                self.final_client.clone(),
            )
            .subscribe(operation_id)
            .await
    }

    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
        let mut dbtx = self.db().begin_transaction_nc().await;

        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
    }

    pub async fn operation_exists_dbtx(
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
    ) -> bool {
        let active_state_exists = dbtx
            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
            .await
            .next()
            .await
            .is_some();

        let inactive_state_exists = dbtx
            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
            .await
            .next()
            .await
            .is_some();

        active_state_exists || inactive_state_exists
    }

    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
        self.db
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
            .await
            .next()
            .await
            .is_some()
    }

    pub async fn await_primary_module_output(
        &self,
        operation_id: OperationId,
        out_point: OutPoint,
    ) -> anyhow::Result<()> {
        self.primary_module()
            .ok_or(anyhow!("Primary module not available"))?
            .await_primary_module_output(operation_id, out_point)
            .await
    }

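    /// Example (untested sketch): fetching the first instance of a module by
    /// its client type. `DummyClientModule` stands in for any concrete
    /// [`ClientModule`] implementation registered with this client.
    ///
    /// ```ignore
    /// let dummy = client.get_first_module::<DummyClientModule>()?;
    /// println!("using module instance {}", dummy.id);
    /// ```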
    pub fn get_first_module<M: ClientModule>(
        &'_ self,
    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
        let module_kind = M::kind();
        let id = self
            .get_first_instance(&module_kind)
            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
        let module: &M = self
            .try_get_module(id)
            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
            .as_any()
            .downcast_ref::<M>()
            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
        let (db, _) = self.db().with_prefix_module_id(id);
        Ok(ClientModuleInstance {
            id,
            db,
            api: self.api().with_module(id),
            module,
        })
    }

    pub fn get_module_client_dyn(
        &self,
        instance_id: ModuleInstanceId,
    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
        self.try_get_module(instance_id)
            .ok_or(anyhow!("Unknown module instance {}", instance_id))
    }

    pub fn db(&self) -> &Database {
        &self.db
    }

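    /// Example (untested sketch): observing the submission state machine of a
    /// previously submitted transaction. Only obtaining the handle is shown
    /// here; how the returned `TransactionUpdates` is consumed is defined in
    /// `fedimint_client_module`.
    ///
    /// ```ignore
    /// let updates = client.transaction_updates(operation_id).await;
    /// // `updates.update_stream` yields `TxSubmissionStatesSM` items that can be
    /// // inspected until the transaction is accepted or rejected.
    /// ```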
    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        TransactionUpdates {
            update_stream: self.transaction_update_stream(operation_id).await,
        }
    }

    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
        if self
            .modules
            .get_with_kind(self.primary_module_instance)
            .is_some_and(|(kind, _)| kind == module_kind)
        {
            return Some(self.primary_module_instance);
        }

        self.modules
            .iter_modules()
            .find(|(_, kind, _module)| *kind == module_kind)
            .map(|(instance_id, _, _)| instance_id)
    }

    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
        get_decoded_client_secret::<T>(self.db()).await
    }

    pub async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        for out_point in outputs {
            self.await_primary_module_output(operation_id, out_point)
                .await?;
        }

        Ok(())
    }

    pub async fn get_config_json(&self) -> JsonClientConfig {
        self.config().await.to_json()
    }

    pub fn primary_module(&self) -> Option<&DynClientModule> {
        self.modules.get(self.primary_module_instance)
    }

    pub async fn get_balance(&self) -> Option<Amount> {
        Some(
            self.primary_module()?
                .get_balance(
                    self.primary_module_instance,
                    &mut self.db().begin_transaction_nc().await,
                )
                .await,
        )
    }

    #[doc(hidden)]
    pub async fn get_balance_err(&self) -> anyhow::Result<Amount> {
        self.get_balance()
            .await
            .ok_or_else(|| anyhow!("Primary module not available"))
    }

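    /// Example (untested sketch): printing the balance of the primary module
    /// every time it changes. The stream starts with the current balance.
    ///
    /// ```ignore
    /// let mut balances = client.subscribe_balance_changes().await;
    /// while let Some(balance) = balances.next().await {
    ///     println!("primary module balance: {balance}");
    /// }
    /// ```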
    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
        let primary_module_things = if let Some(primary_module) = self.primary_module() {
            let balance_changes = primary_module.subscribe_balance_changes().await;
            let initial_balance = self.get_balance().await.expect("Primary is present");

            Some((primary_module.clone(), balance_changes, initial_balance))
        } else {
            None
        };
        let db = self.db().clone();
        let primary_module_instance = self.primary_module_instance;

        Box::pin(async_stream::stream! {
            let Some((primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
                pending().await
            };

            yield initial_balance;
            let mut prev_balance = initial_balance;
            while let Some(()) = balance_changes.next().await {
                let mut dbtx = db.begin_transaction_nc().await;
                let balance = primary_module
                    .get_balance(primary_module_instance, &mut dbtx)
                    .await;

                if balance != prev_balance {
                    prev_balance = balance;
                    yield balance;
                }
            }
        })
    }

    pub async fn fetch_common_api_versions_from_all_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
        db: Database,
        num_responses_sender: watch::Sender<usize>,
    ) {
        let mut backoff =
            custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);

        async fn make_request(
            delay: Duration,
            peer_id: PeerId,
            api: &DynGlobalApi,
        ) -> (
            PeerId,
            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
        ) {
            runtime::sleep(delay).await;
            (
                peer_id,
                api.request_single_peer::<SupportedApiVersionsSummary>(
                    VERSION_ENDPOINT.to_owned(),
                    ApiRequestErased::default(),
                    peer_id,
                )
                .await,
            )
        }

        let mut requests = FuturesUnordered::new();

        for peer_id in num_peers.peer_ids() {
            requests.push(make_request(Duration::ZERO, peer_id, &api));
        }

        let mut num_responses = 0;

        while let Some((peer_id, response)) = requests.next().await {
            let retry = match response {
                Err(err) => {
                    let has_previous_response = db
                        .begin_transaction_nc()
                        .await
                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                        .await
                        .is_some();
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        %has_previous_response,
                        "Failed to refresh API versions of a peer"
                    );

                    !has_previous_response
                }
                Ok(o) => {
                    let mut dbtx = db.begin_transaction().await;
                    dbtx.insert_entry(
                        &PeerLastApiVersionsSummaryKey(peer_id),
                        &PeerLastApiVersionsSummary(o),
                    )
                    .await;
                    dbtx.commit_tx().await;
                    false
                }
            };

            if retry {
                requests.push(make_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            } else {
                num_responses += 1;
                num_responses_sender.send_replace(num_responses);
            }
        }
    }

    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
        num_peers: NumPeers,
        api: DynGlobalApi,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut backoff =
            custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);

        async fn make_request(
            delay: Duration,
            peer_id: PeerId,
            api: &DynGlobalApi,
        ) -> (
            PeerId,
            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
        ) {
            runtime::sleep(delay).await;
            (
                peer_id,
                api.request_single_peer::<SupportedApiVersionsSummary>(
                    VERSION_ENDPOINT.to_owned(),
                    ApiRequestErased::default(),
                    peer_id,
                )
                .await,
            )
        }

        let mut requests = FuturesUnordered::new();

        for peer_id in num_peers.peer_ids() {
            requests.push(make_request(Duration::ZERO, peer_id, &api));
        }

        let mut successful_responses = BTreeMap::new();

        while successful_responses.len() < num_peers.threshold()
            && let Some((peer_id, response)) = requests.next().await
        {
            let retry = match response {
                Err(err) => {
                    debug!(
                        target: LOG_CLIENT,
                        %peer_id,
                        err = %err.fmt_compact(),
                        "Failed to fetch API versions from peer"
                    );
                    true
                }
                Ok(response) => {
                    successful_responses.insert(peer_id, response);
                    false
                }
            };

            if retry {
                requests.push(make_request(
                    backoff.next().expect("Keeps retrying"),
                    peer_id,
                    &api,
                ));
            }
        }

        successful_responses
    }

    pub async fn fetch_common_api_versions(
        config: &ClientConfig,
        api: &DynGlobalApi,
    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
        debug!(
            target: LOG_CLIENT,
            "Fetching common api versions"
        );

        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        let peer_api_version_sets =
            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;

        Ok(peer_api_version_sets)
    }

    pub async fn write_api_version_cache(
        dbtx: &mut DatabaseTransaction<'_>,
        api_version_set: ApiVersionSet,
    ) {
        debug!(
            target: LOG_CLIENT,
            value = ?api_version_set,
            "Writing API version set to cache"
        );

        dbtx.insert_entry(
            &CachedApiVersionSetKey,
            &CachedApiVersionSet(api_version_set),
        )
        .await;
    }

    pub async fn store_prefetched_api_versions(
        db: &Database,
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
    ) {
        debug!(
            target: LOG_CLIENT,
            "Storing {} prefetched peer API version responses and calculating common version set",
            peer_api_versions.len()
        );

        let mut dbtx = db.begin_transaction().await;
        let client_supported_versions =
            Self::supported_api_versions_summary_static(config, client_module_init);
        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
            &client_supported_versions,
            peer_api_versions,
        ) {
            Ok(common_api_versions) => {
                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
            }
            Err(err) => {
                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
            }
        }

        for (peer_id, peer_api_versions) in peer_api_versions {
            dbtx.insert_entry(
                &PeerLastApiVersionsSummaryKey(*peer_id),
                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
            )
            .await;
        }
        dbtx.commit_tx().await;
        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
    }

    pub fn supported_api_versions_summary_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
    ) -> SupportedApiVersionsSummary {
        SupportedApiVersionsSummary {
            core: SupportedCoreApiVersions {
                core_consensus: config.global.consensus_version,
                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
                    .expect("must not have conflicting versions"),
            },
            modules: config
                .modules
                .iter()
                .filter_map(|(&module_instance_id, module_config)| {
                    client_module_init
                        .get(module_config.kind())
                        .map(|module_init| {
                            (
                                module_instance_id,
                                SupportedModuleApiVersions {
                                    core_consensus: config.global.consensus_version,
                                    module_consensus: module_config.version,
                                    api: module_init.supported_api_versions(),
                                },
                            )
                        })
                })
                .collect(),
        }
    }

    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
        Self::load_and_refresh_common_api_version_static(
            &self.config().await,
            &self.module_inits,
            &self.api,
            &self.db,
            &self.task_group,
        )
        .await
    }

    async fn load_and_refresh_common_api_version_static(
        config: &ClientConfig,
        module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: &TaskGroup,
    ) -> anyhow::Result<ApiVersionSet> {
        if let Some(v) = db
            .begin_transaction_nc()
            .await
            .get_value(&CachedApiVersionSetKey)
            .await
        {
            debug!(
                target: LOG_CLIENT,
                "Found existing cached common api versions"
            );
            let config = config.clone();
            let client_module_init = module_init.clone();
            let api = api.clone();
            let db = db.clone();
            let task_group = task_group.clone();
            task_group
                .clone()
                .spawn_cancellable("refresh_common_api_version_static", async move {
                    if let Err(error) = Self::refresh_common_api_version_static(
                        &config,
                        &client_module_init,
                        &api,
                        &db,
                        task_group,
                        false,
                    )
                    .await
                    {
                        warn!(
                            target: LOG_CLIENT,
                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
                        );
                    }
                });

            return Ok(v.0);
        }

        info!(
            target: LOG_CLIENT,
            "Fetching initial API versions"
        );
        Self::refresh_common_api_version_static(
            config,
            module_init,
            api,
            db,
            task_group.clone(),
            true,
        )
        .await
    }

    async fn refresh_common_api_version_static(
        config: &ClientConfig,
        client_module_init: &ClientModuleInitRegistry,
        api: &DynGlobalApi,
        db: &Database,
        task_group: TaskGroup,
        block_until_ok: bool,
    ) -> anyhow::Result<ApiVersionSet> {
        debug!(
            target: LOG_CLIENT,
            "Refreshing common api versions"
        );

        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
        let num_peers = NumPeers::from(config.global.api_endpoints.len());

        task_group.spawn_cancellable("refresh peers api versions", {
            Client::fetch_common_api_versions_from_all_peers(
                num_peers,
                api.clone(),
                db.clone(),
                num_responses_sender,
            )
        });

        let common_api_versions = loop {
            let _: Result<_, Elapsed> = runtime::timeout(
                Duration::from_secs(30),
                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
            )
            .await;

            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;

            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
                &Self::supported_api_versions_summary_static(config, client_module_init),
                &peer_api_version_sets,
            ) {
                Ok(o) => break o,
                Err(err) if block_until_ok => {
                    warn!(
                        target: LOG_CLIENT,
                        err = %err.fmt_compact_anyhow(),
                        "Failed to discover API version to use. Retrying..."
                    );
                    continue;
                }
                Err(e) => return Err(e),
            }
        };

        debug!(
            target: LOG_CLIENT,
            value = ?common_api_versions,
            "Updating the cached common api versions"
        );
        let mut dbtx = db.begin_transaction().await;
        let _ = dbtx
            .insert_entry(
                &CachedApiVersionSetKey,
                &CachedApiVersionSet(common_api_versions.clone()),
            )
            .await;

        dbtx.commit_tx().await;

        Ok(common_api_versions)
    }

    pub async fn get_metadata(&self) -> Metadata {
        self.db
            .begin_transaction_nc()
            .await
            .get_value(&ClientMetadataKey)
            .await
            .unwrap_or_else(|| {
                warn!(
                    target: LOG_CLIENT,
                    "Missing existing metadata. This key should have been set on Client init"
                );
                Metadata::empty()
            })
    }

    pub async fn set_metadata(&self, metadata: &Metadata) {
        self.db
            .autocommit::<_, _, anyhow::Error>(
                |dbtx, _| {
                    Box::pin(async {
                        Self::set_metadata_dbtx(dbtx, metadata).await;
                        Ok(())
                    })
                },
                None,
            )
            .await
            .expect("Failed to autocommit metadata");
    }

    pub fn has_pending_recoveries(&self) -> bool {
        !self
            .client_recovery_progress_receiver
            .borrow()
            .iter()
            .all(|(_id, progress)| progress.is_done())
    }

    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
        recovery_receiver
            .wait_for(|in_progress| {
                in_progress
                    .iter()
                    .all(|(_id, progress)| progress.is_done())
            })
            .await
            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;

        Ok(())
    }

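    /// Example (untested sketch): logging recovery progress per module while
    /// waiting for all module recoveries to finish.
    ///
    /// ```ignore
    /// let mut progress = client.subscribe_to_recovery_progress();
    /// while let Some((module_id, p)) = progress.next().await {
    ///     println!("module {module_id}: {}/{}", p.complete, p.total);
    /// }
    /// client.wait_for_all_recoveries().await?;
    /// ```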
    pub fn subscribe_to_recovery_progress(
        &self,
    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
        WatchStream::new(self.client_recovery_progress_receiver.clone())
            .flat_map(futures::stream::iter)
    }

    pub async fn wait_for_module_kind_recovery(
        &self,
        module_kind: ModuleKind,
    ) -> anyhow::Result<()> {
        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
        let config = self.config().await;
        recovery_receiver
            .wait_for(|in_progress| {
                !in_progress
                    .iter()
                    .filter(|(module_instance_id, _progress)| {
                        config.modules[module_instance_id].kind == module_kind
                    })
                    .any(|(_id, progress)| !progress.is_done())
            })
            .await
            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;

        Ok(())
    }

    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
        loop {
            if self.executor.get_active_states().await.is_empty() {
                break;
            }
            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }

    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
    }

    fn spawn_module_recoveries_task(
        &self,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        let db = self.db.clone();
        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
        self.task_group
            .spawn("module recoveries", |_task_handle| async {
                Self::run_module_recoveries_task(
                    db,
                    log_ordering_wakeup_tx,
                    recovery_sender,
                    module_recoveries,
                    module_recovery_progress_receivers,
                )
                .await;
            });
    }

    async fn run_module_recoveries_task(
        db: Database,
        log_ordering_wakeup_tx: watch::Sender<()>,
        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
        module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        >,
        module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        >,
    ) {
        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Starting module recoveries");
        let mut completed_stream = Vec::new();
        let progress_stream = futures::stream::FuturesUnordered::new();

        for (module_instance_id, f) in module_recoveries {
            completed_stream.push(futures::stream::once(Box::pin(async move {
                match f.await {
                    Ok(()) => (module_instance_id, None),
                    Err(err) => {
                        warn!(
                            target: LOG_CLIENT,
                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
                        );
                        futures::future::pending::<()>().await;
                        unreachable!()
                    }
                }
            })));
        }

        for (module_instance_id, rx) in module_recovery_progress_receivers {
            progress_stream.push(
                tokio_stream::wrappers::WatchStream::new(rx)
                    .fuse()
                    .map(move |progress| (module_instance_id, Some(progress))),
            );
        }

        let mut futures = futures::stream::select(
            futures::stream::select_all(progress_stream),
            futures::stream::select_all(completed_stream),
        );

        while let Some((module_instance_id, progress)) = futures.next().await {
            let mut dbtx = db.begin_transaction().await;

            let prev_progress = *recovery_sender
                .borrow()
                .get(&module_instance_id)
                .expect("existing progress must be present");

            let progress = if prev_progress.is_done() {
                prev_progress
            } else if let Some(progress) = progress {
                progress
            } else {
                prev_progress.to_complete()
            };

            if !prev_progress.is_done() && progress.is_done() {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery complete"
                );
                dbtx.log_event(
                    log_ordering_wakeup_tx.clone(),
                    None,
                    ModuleRecoveryCompleted {
                        module_id: module_instance_id,
                    },
                )
                .await;
            } else {
                info!(
                    target: LOG_CLIENT,
                    module_instance_id,
                    progress = format!("{}/{}", progress.complete, progress.total),
                    "Recovery progress"
                );
            }

            dbtx.insert_entry(
                &ClientModuleRecovery { module_instance_id },
                &ClientModuleRecoveryState { progress },
            )
            .await;
            dbtx.commit_tx().await;

            recovery_sender.send_modify(|v| {
                v.insert(module_instance_id, progress);
            });
        }
        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
    }

    async fn load_peers_last_api_versions(
        db: &Database,
        num_peers: NumPeers,
    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
        let mut peer_api_version_sets = BTreeMap::new();

        let mut dbtx = db.begin_transaction_nc().await;
        for peer_id in num_peers.peer_ids() {
            if let Some(v) = dbtx
                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
                .await
            {
                peer_api_version_sets.insert(peer_id, v.0);
            }
        }
        drop(dbtx);
        peer_api_version_sets
    }

    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
        self.db()
            .begin_transaction_nc()
            .await
            .find_by_prefix(&ApiAnnouncementPrefix)
            .await
            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
            .collect()
            .await
    }

    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
        get_api_urls(&self.db, &self.config().await).await
    }

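    /// Example (untested sketch): producing an invite code that points at
    /// guardian 0, if that peer's API URL is known to this client.
    ///
    /// ```ignore
    /// if let Some(invite) = client.invite_code(PeerId::from(0)).await {
    ///     println!("{invite}");
    /// }
    /// ```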
    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        self.get_peer_urls()
            .await
            .into_iter()
            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
            .map(|peer_url| {
                InviteCode::new(
                    peer_url.clone(),
                    peer,
                    self.federation_id(),
                    self.api_secret.clone(),
                )
            })
    }

    pub async fn get_guardian_public_keys_blocking(
        &self,
    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
        self.db
            .autocommit(
                |dbtx, _| {
                    Box::pin(async move {
                        let config = self.config().await;

                        let guardian_pub_keys = self
                            .get_or_backfill_broadcast_public_keys(dbtx, config)
                            .await;

                        Result::<_, ()>::Ok(guardian_pub_keys)
                    })
                },
                None,
            )
            .await
            .expect("Will retry forever")
    }

    async fn get_or_backfill_broadcast_public_keys(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        config: ClientConfig,
    ) -> BTreeMap<PeerId, PublicKey> {
        match config.global.broadcast_public_keys {
            Some(guardian_pub_keys) => guardian_pub_keys,
            _ => {
                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;

                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
                *(self.config.write().await) = new_config;
                guardian_pub_keys
            }
        }
    }

    async fn fetch_session_count(&self) -> FederationResult<u64> {
        self.api.session_count().await
    }

    async fn fetch_and_update_config(
        &self,
        config: ClientConfig,
    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
        let fetched_config = retry(
            "Fetching guardian public keys",
            backoff_util::background_backoff(),
            || async {
                Ok(self
                    .api
                    .request_current_consensus::<ClientConfig>(
                        CLIENT_CONFIG_ENDPOINT.to_owned(),
                        ApiRequestErased::default(),
                    )
                    .await?)
            },
        )
        .await
        .expect("Will never return on error");

        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
            warn!(
                target: LOG_CLIENT,
                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
            );
            pending::<()>().await;
            unreachable!("Pending will never return");
        };

        let new_config = ClientConfig {
            global: GlobalClientConfig {
                broadcast_public_keys: Some(guardian_pub_keys.clone()),
                ..config.global
            },
            modules: config.modules,
        };
        (guardian_pub_keys, new_config)
    }

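    /// Example (untested sketch): driving one of the string-dispatched RPC
    /// methods below and collecting its streamed JSON results.
    ///
    /// ```ignore
    /// let mut responses =
    ///     client.handle_global_rpc("get_balance".to_owned(), serde_json::Value::Null);
    /// while let Some(value) = responses.next().await {
    ///     println!("{}", value?);
    /// }
    /// ```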
    pub fn handle_global_rpc(
        &self,
        method: String,
        params: serde_json::Value,
    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
        Box::pin(try_stream! {
            match method.as_str() {
                "get_balance" => {
                    let balance = self.get_balance().await.unwrap_or_default();
                    yield serde_json::to_value(balance)?;
                }
                "subscribe_balance_changes" => {
                    let mut stream = self.subscribe_balance_changes().await;
                    while let Some(balance) = stream.next().await {
                        yield serde_json::to_value(balance)?;
                    }
                }
                "get_config" => {
                    let config = self.config().await;
                    yield serde_json::to_value(config)?;
                }
                "get_federation_id" => {
                    let federation_id = self.federation_id();
                    yield serde_json::to_value(federation_id)?;
                }
                "get_invite_code" => {
                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
                    let invite_code = self.invite_code(req.peer).await;
                    yield serde_json::to_value(invite_code)?;
                }
                "get_operation" => {
                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
                    let operation = self.operation_log().get_operation(req.operation_id).await;
                    yield serde_json::to_value(operation)?;
                }
                "list_operations" => {
                    let req: ListOperationsParams = serde_json::from_value(params)?;
                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
                        usize::MAX
                    } else {
                        req.limit.unwrap_or(usize::MAX)
                    };
                    let operations = self.operation_log()
                        .paginate_operations_rev(limit, req.last_seen)
                        .await;
                    yield serde_json::to_value(operations)?;
                }
                "session_count" => {
                    let count = self.fetch_session_count().await?;
                    yield serde_json::to_value(count)?;
                }
                "has_pending_recoveries" => {
                    let has_pending = self.has_pending_recoveries();
                    yield serde_json::to_value(has_pending)?;
                }
                "wait_for_all_recoveries" => {
                    self.wait_for_all_recoveries().await?;
                    yield serde_json::Value::Null;
                }
                "subscribe_to_recovery_progress" => {
                    let mut stream = self.subscribe_to_recovery_progress();
                    while let Some((module_id, progress)) = stream.next().await {
                        yield serde_json::json!({
                            "module_id": module_id,
                            "progress": progress
                        });
                    }
                }
                "backup_to_federation" => {
                    let metadata = if params.is_null() {
                        Metadata::from_json_serialized(serde_json::json!({}))
                    } else {
                        Metadata::from_json_serialized(params)
                    };
                    self.backup_to_federation(metadata).await?;
                    yield serde_json::Value::Null;
                }
                _ => {
                    Err(anyhow::format_err!("Unknown method: {}", method))?;
                    unreachable!()
                },
            }
        })
    }

    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
    where
        E: Event + Send,
    {
        let mut dbtx = self.db.begin_transaction().await;
        self.log_event_dbtx(&mut dbtx, module_id, event).await;
        dbtx.commit_tx().await;
    }

    pub async fn log_event_dbtx<E, Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        module_id: Option<ModuleInstanceId>,
        event: E,
    ) where
        E: Event + Send,
        Cap: Send,
    {
        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
            .await;
    }

    pub async fn log_event_raw_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        kind: EventKind,
        module: Option<(ModuleKind, ModuleInstanceId)>,
        payload: Vec<u8>,
        persist: EventPersistence,
    ) where
        Cap: Send,
    {
        let module_id = module.as_ref().map(|m| m.1);
        let module_kind = module.map(|m| m.0);
        dbtx.log_event_raw(
            self.log_ordering_wakeup_tx.clone(),
            kind,
            module_kind,
            module_id,
            payload,
            persist,
        )
        .await;
    }

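    /// Example (untested sketch): processing event log entries from a
    /// caller-defined position key. `MyEventPositionKey` is a placeholder for
    /// a database key type with `Value = EventLogId` registered by the caller.
    ///
    /// ```ignore
    /// client
    ///     .handle_events(&MyEventPositionKey, |_dbtx, event| async move {
    ///         // react to `event` here
    ///         Ok(())
    ///     })
    ///     .await?;
    /// ```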
    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K: DatabaseRecord<Value = EventLogId>,
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_events(
            self.db.clone(),
            pos_key,
            self.log_event_added_rx.clone(),
            call_fn,
        )
        .await
    }

    pub async fn handle_trimable_events<F, R, K>(
        &self,
        pos_key: &K,
        call_fn: F,
    ) -> anyhow::Result<()>
    where
        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
        K: DatabaseRecord<Value = EventLogTrimableId>,
        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
        R: Future<Output = anyhow::Result<()>>,
    {
        fedimint_eventlog::handle_trimable_events(
            self.db.clone(),
            pos_key,
            self.log_event_added_rx.clone(),
            call_fn,
        )
        .await
    }

    pub async fn get_event_log(
        &self,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }

    pub async fn get_event_log_trimable(
        &self,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry> {
        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
            .await
    }

    pub async fn get_event_log_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log(pos, limit).await
    }

    pub async fn get_event_log_trimable_dbtx<Cap>(
        &self,
        dbtx: &mut DatabaseTransaction<'_, Cap>,
        pos: Option<EventLogTrimableId>,
        limit: u64,
    ) -> Vec<PersistedLogEntry>
    where
        Cap: Send,
    {
        dbtx.get_event_log_trimable(pos, limit).await
    }

    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }

    pub fn iroh_enable_dht(&self) -> bool {
        self.iroh_enable_dht
    }

    pub(crate) async fn run_core_migrations(
        db_no_decoders: &Database,
    ) -> Result<(), anyhow::Error> {
        let mut dbtx = db_no_decoders.begin_transaction().await;
        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
            .await?;
        if is_running_in_test_env() {
            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
        }
        dbtx.commit_tx_result().await?;
        Ok(())
    }
}

#[apply(async_trait_maybe_send!)]
impl ClientContextIface for Client {
    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
        Client::get_module(self, instance)
    }

    fn api_clone(&self) -> DynGlobalApi {
        Client::api_clone(self)
    }

    fn decoders(&self) -> &ModuleDecoderRegistry {
        Client::decoders(self)
    }

    async fn finalize_and_submit_transaction(
        &self,
        operation_id: OperationId,
        operation_type: &str,
        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction(
            self,
            operation_id,
            operation_type,
            &operation_meta_gen,
            tx_builder,
        )
        .await
    }

    async fn finalize_and_submit_transaction_inner(
        &self,
        dbtx: &mut DatabaseTransaction<'_>,
        operation_id: OperationId,
        tx_builder: TransactionBuilder,
    ) -> anyhow::Result<OutPointRange> {
        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
    }

    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
        Client::transaction_updates(self, operation_id).await
    }

    async fn await_primary_module_outputs(
        &self,
        operation_id: OperationId,
        outputs: Vec<OutPoint>,
    ) -> anyhow::Result<()> {
        Client::await_primary_module_outputs(self, operation_id, outputs).await
    }

    fn operation_log(&self) -> &dyn IOperationLog {
        Client::operation_log(self)
    }

    async fn has_active_states(&self, operation_id: OperationId) -> bool {
        Client::has_active_states(self, operation_id).await
    }

    async fn operation_exists(&self, operation_id: OperationId) -> bool {
        Client::operation_exists(self, operation_id).await
    }

    async fn config(&self) -> ClientConfig {
        Client::config(self).await
    }

    fn db(&self) -> &Database {
        Client::db(self)
    }

    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
        Client::executor(self)
    }

    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
        Client::invite_code(self, peer).await
    }

    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
        Client::get_internal_payment_markers(self)
    }

    async fn log_event_json(
        &self,
        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
        module_kind: Option<ModuleKind>,
        module_id: ModuleInstanceId,
        kind: EventKind,
        payload: serde_json::Value,
        persist: EventPersistence,
    ) {
        dbtx.ensure_global()
            .expect("Must be called with global dbtx");
        self.log_event_raw_dbtx(
            dbtx,
            kind,
            module_kind.map(|kind| (kind, module_id)),
            serde_json::to_vec(&payload).expect("Serialization can't fail"),
            persist,
        )
        .await;
    }

    async fn read_operation_active_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }

    async fn read_operation_inactive_states<'dbtx>(
        &self,
        operation_id: OperationId,
        module_id: ModuleInstanceId,
        dbtx: &'dbtx mut DatabaseTransaction<'_>,
    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
    {
        Box::pin(
            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
                operation_id,
                module_instance: module_id,
            })
            .await
            .map(move |(k, v)| (k.0, v)),
        )
    }
}

impl fmt::Debug for Client {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "Client")
    }
}

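/// Example (untested sketch): building a decoder registry for the module
/// kinds found in a client config, using an already populated
/// `ClientModuleInitRegistry` (here called `module_inits`).
///
/// ```ignore
/// let decoders = client_decoders(
///     &module_inits,
///     config
///         .modules
///         .iter()
///         .map(|(instance_id, module_config)| (*instance_id, module_config.kind())),
/// );
/// ```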
pub fn client_decoders<'a>(
    registry: &ModuleInitRegistry<DynClientModuleInit>,
    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
) -> ModuleDecoderRegistry {
    let mut modules = BTreeMap::new();
    for (id, kind) in module_kinds {
        let Some(init) = registry.get(kind) else {
            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
            continue;
        };

        modules.insert(
            id,
            (
                kind.clone(),
                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
            ),
        );
    }
    ModuleDecoderRegistry::from(modules)
}