fedimint_client/
client.rs

1use std::collections::{BTreeMap, HashSet};
2use std::fmt::{self, Formatter};
3use std::future::{Future, pending};
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::{Duration, SystemTime, UNIX_EPOCH};
8
9use anyhow::{Context as _, anyhow, bail, format_err};
10use async_stream::try_stream;
11use bitcoin::key::Secp256k1;
12use bitcoin::key::rand::thread_rng;
13use bitcoin::secp256k1::{self, PublicKey};
14use fedimint_api_client::api::global_api::with_request_hook::ApiRequestHook;
15use fedimint_api_client::api::net::Connector;
16use fedimint_api_client::api::{
17    ApiVersionSet, DynGlobalApi, FederationApiExt as _, FederationResult, IGlobalFederationApi,
18};
19use fedimint_client_module::module::recovery::RecoveryProgress;
20use fedimint_client_module::module::{
21    ClientContextIface, ClientModule, ClientModuleRegistry, DynClientModule, FinalClientIface,
22    IClientModule, IdxRange, OutPointRange,
23};
24use fedimint_client_module::oplog::IOperationLog;
25use fedimint_client_module::secret::{PlainRootSecretStrategy, RootSecretStrategy as _};
26use fedimint_client_module::sm::executor::{ActiveStateKey, IExecutor, InactiveStateKey};
27use fedimint_client_module::sm::{ActiveStateMeta, DynState, InactiveStateMeta};
28use fedimint_client_module::transaction::{
29    TRANSACTION_SUBMISSION_MODULE_INSTANCE, TransactionBuilder, TxSubmissionStates,
30    TxSubmissionStatesSM,
31};
32use fedimint_client_module::{
33    AddStateMachinesResult, ClientModuleInstance, GetInviteCodeRequest, ModuleGlobalContextGen,
34    ModuleRecoveryCompleted, TransactionUpdates, TxCreatedEvent,
35};
36use fedimint_core::config::{
37    ClientConfig, FederationId, GlobalClientConfig, JsonClientConfig, ModuleInitRegistry,
38};
39use fedimint_core::core::{DynInput, DynOutput, ModuleInstanceId, ModuleKind, OperationId};
40use fedimint_core::db::{
41    AutocommitError, Database, DatabaseKey, DatabaseRecord, DatabaseTransaction,
42    IDatabaseTransactionOpsCore as _, IDatabaseTransactionOpsCoreTyped as _, NonCommittable,
43};
44use fedimint_core::encoding::{Decodable, Encodable};
45use fedimint_core::endpoint_constants::{CLIENT_CONFIG_ENDPOINT, VERSION_ENDPOINT};
46use fedimint_core::envs::is_running_in_test_env;
47use fedimint_core::invite_code::InviteCode;
48use fedimint_core::module::registry::{ModuleDecoderRegistry, ModuleRegistry};
49use fedimint_core::module::{
50    ApiRequestErased, ApiVersion, MultiApiVersion, SupportedApiVersionsSummary,
51    SupportedCoreApiVersions, SupportedModuleApiVersions,
52};
53use fedimint_core::net::api_announcement::SignedApiAnnouncement;
54use fedimint_core::task::{Elapsed, MaybeSend, MaybeSync, TaskGroup};
55use fedimint_core::transaction::Transaction;
56use fedimint_core::util::backoff_util::custom_backoff;
57use fedimint_core::util::{
58    BoxStream, FmtCompact as _, FmtCompactAnyhow as _, SafeUrl, backoff_util, retry,
59};
60use fedimint_core::{
61    Amount, NumPeers, OutPoint, PeerId, apply, async_trait_maybe_send, maybe_add_send,
62    maybe_add_send_sync, runtime,
63};
64use fedimint_derive_secret::DerivableSecret;
65use fedimint_eventlog::{
66    DBTransactionEventLogExt as _, Event, EventKind, EventLogEntry, EventLogId, EventLogTrimableId,
67    EventPersistence, PersistedLogEntry,
68};
69use fedimint_logging::{LOG_CLIENT, LOG_CLIENT_NET_API, LOG_CLIENT_RECOVERY};
70use futures::stream::FuturesUnordered;
71use futures::{Stream, StreamExt as _};
72use global_ctx::ModuleGlobalClientContext;
73use serde::{Deserialize, Serialize};
74use tokio::sync::{broadcast, watch};
75use tokio_stream::wrappers::WatchStream;
76use tracing::{debug, info, warn};
77
78use crate::ClientBuilder;
79use crate::api_announcements::{ApiAnnouncementPrefix, get_api_urls};
80use crate::backup::Metadata;
81use crate::db::{
82    ApiSecretKey, CachedApiVersionSet, CachedApiVersionSetKey, ChronologicalOperationLogKey,
83    ClientConfigKey, ClientMetadataKey, ClientModuleRecovery, ClientModuleRecoveryState,
84    EncodedClientSecretKey, OperationLogKey, PeerLastApiVersionsSummary,
85    PeerLastApiVersionsSummaryKey, PendingClientConfigKey, apply_migrations_core_client_dbtx,
86    get_decoded_client_secret, verify_client_db_integrity_dbtx,
87};
88use crate::meta::MetaService;
89use crate::module_init::{ClientModuleInitRegistry, DynClientModuleInit, IClientModuleInit};
90use crate::oplog::OperationLog;
91use crate::sm::executor::{
92    ActiveModuleOperationStateKeyPrefix, ActiveOperationStateKeyPrefix, Executor,
93    InactiveModuleOperationStateKeyPrefix, InactiveOperationStateKeyPrefix,
94};
95
96pub(crate) mod builder;
97pub(crate) mod global_ctx;
98pub(crate) mod handle;
99
100/// List of core api versions supported by the implementation.
101/// Notably `major` version is the one being supported, and corresponding
102/// `minor` version is the one required (for given `major` version).
103const SUPPORTED_CORE_API_VERSIONS: &[fedimint_core::module::ApiVersion] =
104    &[ApiVersion { major: 0, minor: 0 }];
105
106/// Main client type
107///
108/// A handle and API to interacting with a single federation. End user
109/// applications that want to support interacting with multiple federations at
110/// the same time, will need to instantiate and manage multiple instances of
111/// this struct.
112///
113/// Under the hood it is starting and managing service tasks, state machines,
114/// database and other resources required.
115///
116/// This type is shared externally and internally, and
117/// [`crate::ClientHandle`] is responsible for external lifecycle management
118/// and resource freeing of the [`Client`].
119pub struct Client {
120    final_client: FinalClientIface,
121    config: tokio::sync::RwLock<ClientConfig>,
122    api_secret: Option<String>,
123    decoders: ModuleDecoderRegistry,
124    db: Database,
125    federation_id: FederationId,
126    federation_config_meta: BTreeMap<String, String>,
127    primary_module_instance: ModuleInstanceId,
128    pub(crate) modules: ClientModuleRegistry,
129    module_inits: ClientModuleInitRegistry,
130    executor: Executor,
131    pub(crate) api: DynGlobalApi,
132    root_secret: DerivableSecret,
133    operation_log: OperationLog,
134    secp_ctx: Secp256k1<secp256k1::All>,
135    meta_service: Arc<MetaService>,
136    connector: Connector,
137
138    task_group: TaskGroup,
139
140    /// Updates about client recovery progress
141    client_recovery_progress_receiver:
142        watch::Receiver<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
143
144    /// Internal client sender to wake up log ordering task every time a
145    /// (unuordered) log event is added.
146    log_ordering_wakeup_tx: watch::Sender<()>,
147    /// Receiver for events fired every time (ordered) log event is added.
148    log_event_added_rx: watch::Receiver<()>,
149    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
150    request_hook: ApiRequestHook,
151    iroh_enable_dht: bool,
152    iroh_enable_next: bool,
153}
154
155#[derive(Debug, Serialize, Deserialize)]
156struct ListOperationsParams {
157    limit: Option<usize>,
158    last_seen: Option<ChronologicalOperationLogKey>,
159}
160
161#[derive(Debug, Clone, Serialize, Deserialize)]
162pub struct GetOperationIdRequest {
163    operation_id: OperationId,
164}
165
166impl Client {
167    /// Initialize a client builder that can be configured to create a new
168    /// client.
169    pub async fn builder() -> anyhow::Result<ClientBuilder> {
170        Ok(ClientBuilder::new())
171    }
172
173    pub fn api(&self) -> &(dyn IGlobalFederationApi + 'static) {
174        self.api.as_ref()
175    }
176
177    pub fn api_clone(&self) -> DynGlobalApi {
178        self.api.clone()
179    }
180
181    /// Get the [`TaskGroup`] that is tied to Client's lifetime.
182    pub fn task_group(&self) -> &TaskGroup {
183        &self.task_group
184    }
185
186    /// Useful for our CLI tooling, not meant for external use
187    #[doc(hidden)]
188    pub fn executor(&self) -> &Executor {
189        &self.executor
190    }
191
192    pub async fn get_config_from_db(db: &Database) -> Option<ClientConfig> {
193        let mut dbtx = db.begin_transaction_nc().await;
194        dbtx.get_value(&ClientConfigKey).await
195    }
196
197    pub async fn get_pending_config_from_db(db: &Database) -> Option<ClientConfig> {
198        let mut dbtx = db.begin_transaction_nc().await;
199        dbtx.get_value(&PendingClientConfigKey).await
200    }
201
202    pub async fn get_api_secret_from_db(db: &Database) -> Option<String> {
203        let mut dbtx = db.begin_transaction_nc().await;
204        dbtx.get_value(&ApiSecretKey).await
205    }
206
207    pub async fn store_encodable_client_secret<T: Encodable>(
208        db: &Database,
209        secret: T,
210    ) -> anyhow::Result<()> {
211        let mut dbtx = db.begin_transaction().await;
212
213        // Don't overwrite an existing secret
214        if dbtx.get_value(&EncodedClientSecretKey).await.is_some() {
215            bail!("Encoded client secret already exists, cannot overwrite")
216        }
217
218        let encoded_secret = T::consensus_encode_to_vec(&secret);
219        dbtx.insert_entry(&EncodedClientSecretKey, &encoded_secret)
220            .await;
221        dbtx.commit_tx().await;
222        Ok(())
223    }
224
225    pub async fn load_decodable_client_secret<T: Decodable>(db: &Database) -> anyhow::Result<T> {
226        let Some(secret) = Self::load_decodable_client_secret_opt(db).await? else {
227            bail!("Encoded client secret not present in DB")
228        };
229
230        Ok(secret)
231    }
232    pub async fn load_decodable_client_secret_opt<T: Decodable>(
233        db: &Database,
234    ) -> anyhow::Result<Option<T>> {
235        let mut dbtx = db.begin_transaction_nc().await;
236
237        let client_secret = dbtx.get_value(&EncodedClientSecretKey).await;
238
239        Ok(match client_secret {
240            Some(client_secret) => Some(
241                T::consensus_decode_whole(&client_secret, &ModuleRegistry::default())
242                    .map_err(|e| anyhow!("Decoding failed: {e}"))?,
243            ),
244            None => None,
245        })
246    }
247
248    pub async fn load_or_generate_client_secret(db: &Database) -> anyhow::Result<[u8; 64]> {
249        let client_secret = match Self::load_decodable_client_secret::<[u8; 64]>(db).await {
250            Ok(secret) => secret,
251            _ => {
252                let secret = PlainRootSecretStrategy::random(&mut thread_rng());
253                Self::store_encodable_client_secret(db, secret)
254                    .await
255                    .expect("Storing client secret must work");
256                secret
257            }
258        };
259        Ok(client_secret)
260    }
261
262    pub async fn is_initialized(db: &Database) -> bool {
263        let mut dbtx = db.begin_transaction_nc().await;
264        dbtx.raw_get_bytes(&[ClientConfigKey::DB_PREFIX])
265            .await
266            .expect("Unrecoverable error occurred while reading and entry from the database")
267            .is_some()
268    }
269
270    pub fn start_executor(self: &Arc<Self>) {
271        debug!(
272            target: LOG_CLIENT,
273            "Starting fedimint client executor",
274        );
275        self.executor.start_executor(self.context_gen());
276    }
277
278    pub fn federation_id(&self) -> FederationId {
279        self.federation_id
280    }
281
282    fn context_gen(self: &Arc<Self>) -> ModuleGlobalContextGen {
283        let client_inner = Arc::downgrade(self);
284        Arc::new(move |module_instance, operation| {
285            ModuleGlobalClientContext {
286                client: client_inner
287                    .clone()
288                    .upgrade()
289                    .expect("ModuleGlobalContextGen called after client was dropped"),
290                module_instance_id: module_instance,
291                operation,
292            }
293            .into()
294        })
295    }
296
297    pub async fn config(&self) -> ClientConfig {
298        self.config.read().await.clone()
299    }
300
301    pub fn api_secret(&self) -> &Option<String> {
302        &self.api_secret
303    }
304
305    pub fn decoders(&self) -> &ModuleDecoderRegistry {
306        &self.decoders
307    }
308
309    /// Returns a reference to the module, panics if not found
310    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
311        self.try_get_module(instance)
312            .expect("Module instance not found")
313    }
314
315    fn try_get_module(
316        &self,
317        instance: ModuleInstanceId,
318    ) -> Option<&maybe_add_send_sync!(dyn IClientModule)> {
319        Some(self.modules.get(instance)?.as_ref())
320    }
321
322    pub fn has_module(&self, instance: ModuleInstanceId) -> bool {
323        self.modules.get(instance).is_some()
324    }
325
326    /// Returns the input amount and output amount of a transaction
327    ///
328    /// # Panics
329    /// If any of the input or output versions in the transaction builder are
330    /// unknown by the respective module.
331    fn transaction_builder_balance(&self, builder: &TransactionBuilder) -> (Amount, Amount) {
332        // FIXME: prevent overflows, currently not suitable for untrusted input
333        let mut in_amount = Amount::ZERO;
334        let mut out_amount = Amount::ZERO;
335        let mut fee_amount = Amount::ZERO;
336
337        for input in builder.inputs() {
338            let module = self.get_module(input.input.module_instance_id());
339
340            let item_fee = module.input_fee(input.amount, &input.input).expect(
341                "We only build transactions with input versions that are supported by the module",
342            );
343
344            in_amount += input.amount;
345            fee_amount += item_fee;
346        }
347
348        for output in builder.outputs() {
349            let module = self.get_module(output.output.module_instance_id());
350
351            let item_fee = module.output_fee(output.amount, &output.output).expect(
352                "We only build transactions with output versions that are supported by the module",
353            );
354
355            out_amount += output.amount;
356            fee_amount += item_fee;
357        }
358
359        (in_amount, out_amount + fee_amount)
360    }
361
362    pub fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
363        Ok((self.federation_id().to_fake_ln_pub_key(&self.secp_ctx)?, 0))
364    }
365
366    /// Get metadata value from the federation config itself
367    pub fn get_config_meta(&self, key: &str) -> Option<String> {
368        self.federation_config_meta.get(key).cloned()
369    }
370
371    pub(crate) fn root_secret(&self) -> DerivableSecret {
372        self.root_secret.clone()
373    }
374
375    pub async fn add_state_machines(
376        &self,
377        dbtx: &mut DatabaseTransaction<'_>,
378        states: Vec<DynState>,
379    ) -> AddStateMachinesResult {
380        self.executor.add_state_machines_dbtx(dbtx, states).await
381    }
382
383    // TODO: implement as part of [`OperationLog`]
384    pub async fn get_active_operations(&self) -> HashSet<OperationId> {
385        let active_states = self.executor.get_active_states().await;
386        let mut active_operations = HashSet::with_capacity(active_states.len());
387        let mut dbtx = self.db().begin_transaction_nc().await;
388        for (state, _) in active_states {
389            let operation_id = state.operation_id();
390            if dbtx
391                .get_value(&OperationLogKey { operation_id })
392                .await
393                .is_some()
394            {
395                active_operations.insert(operation_id);
396            }
397        }
398        active_operations
399    }
400
401    pub fn operation_log(&self) -> &OperationLog {
402        &self.operation_log
403    }
404
405    /// Get the meta manager to read meta fields.
406    pub fn meta_service(&self) -> &Arc<MetaService> {
407        &self.meta_service
408    }
409
410    /// Get the meta manager to read meta fields.
411    pub async fn get_meta_expiration_timestamp(&self) -> Option<SystemTime> {
412        let meta_service = self.meta_service();
413        let ts = meta_service
414            .get_field::<u64>(self.db(), "federation_expiry_timestamp")
415            .await
416            .and_then(|v| v.value)?;
417        Some(UNIX_EPOCH + Duration::from_secs(ts))
418    }
419
420    /// Adds funding to a transaction or removes over-funding via change.
421    async fn finalize_transaction(
422        &self,
423        dbtx: &mut DatabaseTransaction<'_>,
424        operation_id: OperationId,
425        mut partial_transaction: TransactionBuilder,
426    ) -> anyhow::Result<(Transaction, Vec<DynState>, Range<u64>)> {
427        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
428        let primary_module = self
429            .primary_module()
430            .ok_or(anyhow!("No primary module available"))?;
431
432        let (added_input_bundle, change_outputs) = primary_module
433            .create_final_inputs_and_outputs(
434                self.primary_module_instance,
435                dbtx,
436                operation_id,
437                input_amount,
438                output_amount,
439            )
440            .await?;
441
442        // This is the range of  outputs that will be added to the transaction
443        // in order to balance it. Notice that it may stay empty in case the transaction
444        // is already balanced.
445        let change_range = Range {
446            start: partial_transaction.outputs().count() as u64,
447            end: (partial_transaction.outputs().count() + change_outputs.outputs().len()) as u64,
448        };
449
450        partial_transaction = partial_transaction
451            .with_inputs(added_input_bundle)
452            .with_outputs(change_outputs);
453
454        let (input_amount, output_amount) = self.transaction_builder_balance(&partial_transaction);
455
456        assert!(input_amount >= output_amount, "Transaction is underfunded");
457
458        let (tx, states) = partial_transaction.build(&self.secp_ctx, thread_rng());
459
460        Ok((tx, states, change_range))
461    }
462
463    /// Add funding and/or change to the transaction builder as needed, finalize
464    /// the transaction and submit it to the federation.
465    ///
466    /// ## Errors
467    /// The function will return an error if the operation with given ID already
468    /// exists.
469    ///
470    /// ## Panics
471    /// The function will panic if the database transaction collides with
472    /// other and fails with others too often, this should not happen except for
473    /// excessively concurrent scenarios.
474    pub async fn finalize_and_submit_transaction<F, M>(
475        &self,
476        operation_id: OperationId,
477        operation_type: &str,
478        operation_meta_gen: F,
479        tx_builder: TransactionBuilder,
480    ) -> anyhow::Result<OutPointRange>
481    where
482        F: Fn(OutPointRange) -> M + Clone + MaybeSend + MaybeSync,
483        M: serde::Serialize + MaybeSend,
484    {
485        let operation_type = operation_type.to_owned();
486
487        let autocommit_res = self
488            .db
489            .autocommit(
490                |dbtx, _| {
491                    let operation_type = operation_type.clone();
492                    let tx_builder = tx_builder.clone();
493                    let operation_meta_gen = operation_meta_gen.clone();
494                    Box::pin(async move {
495                        if Client::operation_exists_dbtx(dbtx, operation_id).await {
496                            bail!("There already exists an operation with id {operation_id:?}")
497                        }
498
499                        let out_point_range = self
500                            .finalize_and_submit_transaction_inner(dbtx, operation_id, tx_builder)
501                            .await?;
502
503                        self.operation_log()
504                            .add_operation_log_entry_dbtx(
505                                dbtx,
506                                operation_id,
507                                &operation_type,
508                                operation_meta_gen(out_point_range),
509                            )
510                            .await;
511
512                        Ok(out_point_range)
513                    })
514                },
515                Some(100), // TODO: handle what happens after 100 retries
516            )
517            .await;
518
519        match autocommit_res {
520            Ok(txid) => Ok(txid),
521            Err(AutocommitError::ClosureError { error, .. }) => Err(error),
522            Err(AutocommitError::CommitFailed {
523                attempts,
524                last_error,
525            }) => panic!(
526                "Failed to commit tx submission dbtx after {attempts} attempts: {last_error}"
527            ),
528        }
529    }
530
531    async fn finalize_and_submit_transaction_inner(
532        &self,
533        dbtx: &mut DatabaseTransaction<'_>,
534        operation_id: OperationId,
535        tx_builder: TransactionBuilder,
536    ) -> anyhow::Result<OutPointRange> {
537        let (transaction, mut states, change_range) = self
538            .finalize_transaction(&mut dbtx.to_ref_nc(), operation_id, tx_builder)
539            .await?;
540
541        if transaction.consensus_encode_to_vec().len() > Transaction::MAX_TX_SIZE {
542            let inputs = transaction
543                .inputs
544                .iter()
545                .map(DynInput::module_instance_id)
546                .collect::<Vec<_>>();
547            let outputs = transaction
548                .outputs
549                .iter()
550                .map(DynOutput::module_instance_id)
551                .collect::<Vec<_>>();
552            warn!(
553                target: LOG_CLIENT_NET_API,
554                size=%transaction.consensus_encode_to_vec().len(),
555                ?inputs,
556                ?outputs,
557                "Transaction too large",
558            );
559            debug!(target: LOG_CLIENT_NET_API, ?transaction, "transaction details");
560            bail!(
561                "The generated transaction would be rejected by the federation for being too large."
562            );
563        }
564
565        let txid = transaction.tx_hash();
566
567        debug!(target: LOG_CLIENT_NET_API, %txid, ?transaction,  "Finalized and submitting transaction");
568
569        let tx_submission_sm = DynState::from_typed(
570            TRANSACTION_SUBMISSION_MODULE_INSTANCE,
571            TxSubmissionStatesSM {
572                operation_id,
573                state: TxSubmissionStates::Created(transaction),
574            },
575        );
576        states.push(tx_submission_sm);
577
578        self.executor.add_state_machines_dbtx(dbtx, states).await?;
579
580        self.log_event_dbtx(dbtx, None, TxCreatedEvent { txid, operation_id })
581            .await;
582
583        Ok(OutPointRange::new(txid, IdxRange::from(change_range)))
584    }
585
586    async fn transaction_update_stream(
587        &self,
588        operation_id: OperationId,
589    ) -> BoxStream<'static, TxSubmissionStatesSM> {
590        self.executor
591            .notifier()
592            .module_notifier::<TxSubmissionStatesSM>(
593                TRANSACTION_SUBMISSION_MODULE_INSTANCE,
594                self.final_client.clone(),
595            )
596            .subscribe(operation_id)
597            .await
598    }
599
600    pub async fn operation_exists(&self, operation_id: OperationId) -> bool {
601        let mut dbtx = self.db().begin_transaction_nc().await;
602
603        Client::operation_exists_dbtx(&mut dbtx, operation_id).await
604    }
605
606    pub async fn operation_exists_dbtx(
607        dbtx: &mut DatabaseTransaction<'_>,
608        operation_id: OperationId,
609    ) -> bool {
610        let active_state_exists = dbtx
611            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
612            .await
613            .next()
614            .await
615            .is_some();
616
617        let inactive_state_exists = dbtx
618            .find_by_prefix(&InactiveOperationStateKeyPrefix { operation_id })
619            .await
620            .next()
621            .await
622            .is_some();
623
624        active_state_exists || inactive_state_exists
625    }
626
627    pub async fn has_active_states(&self, operation_id: OperationId) -> bool {
628        self.db
629            .begin_transaction_nc()
630            .await
631            .find_by_prefix(&ActiveOperationStateKeyPrefix { operation_id })
632            .await
633            .next()
634            .await
635            .is_some()
636    }
637
638    /// Waits for an output from the primary module to reach its final
639    /// state.
640    pub async fn await_primary_module_output(
641        &self,
642        operation_id: OperationId,
643        out_point: OutPoint,
644    ) -> anyhow::Result<()> {
645        self.primary_module()
646            .ok_or(anyhow!("Primary module not available"))?
647            .await_primary_module_output(operation_id, out_point)
648            .await
649    }
650
651    /// Returns a reference to a typed module client instance by kind
652    pub fn get_first_module<M: ClientModule>(
653        &'_ self,
654    ) -> anyhow::Result<ClientModuleInstance<'_, M>> {
655        let module_kind = M::kind();
656        let id = self
657            .get_first_instance(&module_kind)
658            .ok_or_else(|| format_err!("No modules found of kind {module_kind}"))?;
659        let module: &M = self
660            .try_get_module(id)
661            .ok_or_else(|| format_err!("Unknown module instance {id}"))?
662            .as_any()
663            .downcast_ref::<M>()
664            .ok_or_else(|| format_err!("Module is not of type {}", std::any::type_name::<M>()))?;
665        let (db, _) = self.db().with_prefix_module_id(id);
666        Ok(ClientModuleInstance {
667            id,
668            db,
669            api: self.api().with_module(id),
670            module,
671        })
672    }
673
674    pub fn get_module_client_dyn(
675        &self,
676        instance_id: ModuleInstanceId,
677    ) -> anyhow::Result<&maybe_add_send_sync!(dyn IClientModule)> {
678        self.try_get_module(instance_id)
679            .ok_or(anyhow!("Unknown module instance {}", instance_id))
680    }
681
682    pub fn db(&self) -> &Database {
683        &self.db
684    }
685
686    /// Returns a stream of transaction updates for the given operation id that
687    /// can later be used to watch for a specific transaction being accepted.
688    pub async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
689        TransactionUpdates {
690            update_stream: self.transaction_update_stream(operation_id).await,
691        }
692    }
693
694    /// Returns the instance id of the first module of the given kind. The
695    /// primary module will always be returned before any other modules (which
696    /// themselves are ordered by their instance ID).
697    pub fn get_first_instance(&self, module_kind: &ModuleKind) -> Option<ModuleInstanceId> {
698        if self
699            .modules
700            .get_with_kind(self.primary_module_instance)
701            .is_some_and(|(kind, _)| kind == module_kind)
702        {
703            return Some(self.primary_module_instance);
704        }
705
706        self.modules
707            .iter_modules()
708            .find(|(_, kind, _module)| *kind == module_kind)
709            .map(|(instance_id, _, _)| instance_id)
710    }
711
712    /// Returns the data from which the client's root secret is derived (e.g.
713    /// BIP39 seed phrase struct).
714    pub async fn root_secret_encoding<T: Decodable>(&self) -> anyhow::Result<T> {
715        get_decoded_client_secret::<T>(self.db()).await
716    }
717
718    /// Waits for outputs from the primary module to reach its final
719    /// state.
720    pub async fn await_primary_module_outputs(
721        &self,
722        operation_id: OperationId,
723        outputs: Vec<OutPoint>,
724    ) -> anyhow::Result<()> {
725        for out_point in outputs {
726            self.await_primary_module_output(operation_id, out_point)
727                .await?;
728        }
729
730        Ok(())
731    }
732
733    /// Returns the config of the client in JSON format.
734    ///
735    /// Compared to the consensus module format where module configs are binary
736    /// encoded this format cannot be cryptographically verified but is easier
737    /// to consume and to some degree human-readable.
738    pub async fn get_config_json(&self) -> JsonClientConfig {
739        self.config().await.to_json()
740    }
741
742    /// Get the primary module
743    pub fn primary_module(&self) -> Option<&DynClientModule> {
744        self.modules.get(self.primary_module_instance)
745    }
746
747    /// Balance available to the client for spending
748    ///
749    /// Returns `None` if the primary module is not available
750    pub async fn get_balance(&self) -> Option<Amount> {
751        Some(
752            self.primary_module()?
753                .get_balance(
754                    self.primary_module_instance,
755                    &mut self.db().begin_transaction_nc().await,
756                )
757                .await,
758        )
759    }
760
761    // Ideally this would not be in the API, but there's a lot of places where this
762    // makes it easier.
763    #[doc(hidden)]
764    /// Like [`Self::get_balance`] but returns an error if primary module is not
765    /// available
766    pub async fn get_balance_err(&self) -> anyhow::Result<Amount> {
767        self.get_balance()
768            .await
769            .ok_or_else(|| anyhow!("Primary module not available"))
770    }
771
772    /// Returns a stream that yields the current client balance every time it
773    /// changes.
774    pub async fn subscribe_balance_changes(&self) -> BoxStream<'static, Amount> {
775        let primary_module_things = if let Some(primary_module) = self.primary_module() {
776            let balance_changes = primary_module.subscribe_balance_changes().await;
777            let initial_balance = self.get_balance().await.expect("Primary is present");
778
779            Some((primary_module.clone(), balance_changes, initial_balance))
780        } else {
781            None
782        };
783        let db = self.db().clone();
784        let primary_module_instance = self.primary_module_instance;
785
786        Box::pin(async_stream::stream! {
787            let Some((primary_module, mut balance_changes, initial_balance)) = primary_module_things else {
788                // If there is no primary module, there will not be one until client is
789                // restarted
790                pending().await
791            };
792
793
794            yield initial_balance;
795            let mut prev_balance = initial_balance;
796            while let Some(()) = balance_changes.next().await {
797                let mut dbtx = db.begin_transaction_nc().await;
798                let balance = primary_module
799                    .get_balance(primary_module_instance, &mut dbtx)
800                    .await;
801
802                // Deduplicate in case modules cannot always tell if the balance actually changed
803                if balance != prev_balance {
804                    prev_balance = balance;
805                    yield balance;
806                }
807            }
808        })
809    }
810
811    /// Query the federation for API version support and then calculate
812    /// the best API version to use (supported by most guardians).
813    pub async fn fetch_common_api_versions_from_all_peers(
814        num_peers: NumPeers,
815        api: DynGlobalApi,
816        db: Database,
817        num_responses_sender: watch::Sender<usize>,
818    ) {
819        // Keep trying, initially somewhat aggressively, but after a while retry very
820        // slowly, because chances for response are getting lower and lower.
821        let mut backoff =
822            custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
823
824        // Make a single request to a peer after a delay
825        //
826        // The delay is here to unify the type of a future both for initial request and
827        // possible retries.
828        async fn make_request(
829            delay: Duration,
830            peer_id: PeerId,
831            api: &DynGlobalApi,
832        ) -> (
833            PeerId,
834            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
835        ) {
836            runtime::sleep(delay).await;
837            (
838                peer_id,
839                api.request_single_peer::<SupportedApiVersionsSummary>(
840                    VERSION_ENDPOINT.to_owned(),
841                    ApiRequestErased::default(),
842                    peer_id,
843                )
844                .await,
845            )
846        }
847
848        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
849        // and make a single async db write operation, it should be OK.
850        let mut requests = FuturesUnordered::new();
851
852        for peer_id in num_peers.peer_ids() {
853            requests.push(make_request(Duration::ZERO, peer_id, &api));
854        }
855
856        let mut num_responses = 0;
857
858        while let Some((peer_id, response)) = requests.next().await {
859            let retry = match response {
860                Err(err) => {
861                    let has_previous_response = db
862                        .begin_transaction_nc()
863                        .await
864                        .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
865                        .await
866                        .is_some();
867                    debug!(
868                        target: LOG_CLIENT,
869                        %peer_id,
870                        err = %err.fmt_compact(),
871                        %has_previous_response,
872                        "Failed to refresh API versions of a peer"
873                    );
874
875                    !has_previous_response
876                }
877                Ok(o) => {
878                    // Save the response to the database right away, just to
879                    // not lose it
880                    let mut dbtx = db.begin_transaction().await;
881                    dbtx.insert_entry(
882                        &PeerLastApiVersionsSummaryKey(peer_id),
883                        &PeerLastApiVersionsSummary(o),
884                    )
885                    .await;
886                    dbtx.commit_tx().await;
887                    false
888                }
889            };
890
891            if retry {
892                requests.push(make_request(
893                    backoff.next().expect("Keeps retrying"),
894                    peer_id,
895                    &api,
896                ));
897            } else {
898                num_responses += 1;
899                num_responses_sender.send_replace(num_responses);
900            }
901        }
902    }
903
904    /// Fetch API versions from peers, retrying until we get threshold number of
905    /// successful responses. Returns the successful responses collected
906    /// from at least `num_peers.threshold()` peers.
907    pub async fn fetch_peers_api_versions_from_threshold_of_peers(
908        num_peers: NumPeers,
909        api: DynGlobalApi,
910    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
911        // Keep trying, initially somewhat aggressively, but after a while retry very
912        // slowly, because chances for response are getting lower and lower.
913        let mut backoff =
914            custom_backoff(Duration::from_millis(200), Duration::from_secs(600), None);
915
916        // Make a single request to a peer after a delay
917        //
918        // The delay is here to unify the type of a future both for initial request and
919        // possible retries.
920        async fn make_request(
921            delay: Duration,
922            peer_id: PeerId,
923            api: &DynGlobalApi,
924        ) -> (
925            PeerId,
926            Result<SupportedApiVersionsSummary, fedimint_api_client::api::PeerError>,
927        ) {
928            runtime::sleep(delay).await;
929            (
930                peer_id,
931                api.request_single_peer::<SupportedApiVersionsSummary>(
932                    VERSION_ENDPOINT.to_owned(),
933                    ApiRequestErased::default(),
934                    peer_id,
935                )
936                .await,
937            )
938        }
939
940        // NOTE: `FuturesUnordered` is a footgun, but since we only poll it for result
941        // and collect responses, it should be OK.
942        let mut requests = FuturesUnordered::new();
943
944        for peer_id in num_peers.peer_ids() {
945            requests.push(make_request(Duration::ZERO, peer_id, &api));
946        }
947
948        let mut successful_responses = BTreeMap::new();
949
950        while successful_responses.len() < num_peers.threshold()
951            && let Some((peer_id, response)) = requests.next().await
952        {
953            let retry = match response {
954                Err(err) => {
955                    debug!(
956                        target: LOG_CLIENT,
957                        %peer_id,
958                        err = %err.fmt_compact(),
959                        "Failed to fetch API versions from peer"
960                    );
961                    true
962                }
963                Ok(response) => {
964                    successful_responses.insert(peer_id, response);
965                    false
966                }
967            };
968
969            if retry {
970                requests.push(make_request(
971                    backoff.next().expect("Keeps retrying"),
972                    peer_id,
973                    &api,
974                ));
975            }
976        }
977
978        successful_responses
979    }
980
981    /// Fetch API versions from peers and discover common API versions to use.
982    pub async fn fetch_common_api_versions(
983        config: &ClientConfig,
984        api: &DynGlobalApi,
985    ) -> anyhow::Result<BTreeMap<PeerId, SupportedApiVersionsSummary>> {
986        debug!(
987            target: LOG_CLIENT,
988            "Fetching common api versions"
989        );
990
991        let num_peers = NumPeers::from(config.global.api_endpoints.len());
992
993        let peer_api_version_sets =
994            Self::fetch_peers_api_versions_from_threshold_of_peers(num_peers, api.clone()).await;
995
996        Ok(peer_api_version_sets)
997    }
998
999    /// Write API version set to database cache.
1000    /// Used when we have a pre-calculated API version set that should be stored
1001    /// for later use.
1002    pub async fn write_api_version_cache(
1003        dbtx: &mut DatabaseTransaction<'_>,
1004        api_version_set: ApiVersionSet,
1005    ) {
1006        debug!(
1007            target: LOG_CLIENT,
1008            value = ?api_version_set,
1009            "Writing API version set to cache"
1010        );
1011
1012        dbtx.insert_entry(
1013            &CachedApiVersionSetKey,
1014            &CachedApiVersionSet(api_version_set),
1015        )
1016        .await;
1017    }
1018
1019    /// Store prefetched peer API version responses and calculate/store common
1020    /// API version set. This processes the individual peer responses by
1021    /// storing them in the database and calculating the common API version
1022    /// set for caching.
1023    pub async fn store_prefetched_api_versions(
1024        db: &Database,
1025        config: &ClientConfig,
1026        client_module_init: &ClientModuleInitRegistry,
1027        peer_api_versions: &BTreeMap<PeerId, SupportedApiVersionsSummary>,
1028    ) {
1029        debug!(
1030            target: LOG_CLIENT,
1031            "Storing {} prefetched peer API version responses and calculating common version set",
1032            peer_api_versions.len()
1033        );
1034
1035        let mut dbtx = db.begin_transaction().await;
1036        // Calculate common API version set from individual responses
1037        let client_supported_versions =
1038            Self::supported_api_versions_summary_static(config, client_module_init);
1039        match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1040            &client_supported_versions,
1041            peer_api_versions,
1042        ) {
1043            Ok(common_api_versions) => {
1044                // Write the calculated common API version set to database cache
1045                Self::write_api_version_cache(&mut dbtx.to_ref_nc(), common_api_versions).await;
1046                debug!(target: LOG_CLIENT, "Calculated and stored common API version set");
1047            }
1048            Err(err) => {
1049                debug!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to calculate common API versions from prefetched data");
1050            }
1051        }
1052
1053        // Store individual peer responses to database
1054        for (peer_id, peer_api_versions) in peer_api_versions {
1055            dbtx.insert_entry(
1056                &PeerLastApiVersionsSummaryKey(*peer_id),
1057                &PeerLastApiVersionsSummary(peer_api_versions.clone()),
1058            )
1059            .await;
1060        }
1061        dbtx.commit_tx().await;
1062        debug!(target: LOG_CLIENT, "Stored individual peer API version responses");
1063    }
1064
1065    /// [`SupportedApiVersionsSummary`] that the client and its modules support
1066    pub fn supported_api_versions_summary_static(
1067        config: &ClientConfig,
1068        client_module_init: &ClientModuleInitRegistry,
1069    ) -> SupportedApiVersionsSummary {
1070        SupportedApiVersionsSummary {
1071            core: SupportedCoreApiVersions {
1072                core_consensus: config.global.consensus_version,
1073                api: MultiApiVersion::try_from_iter(SUPPORTED_CORE_API_VERSIONS.to_owned())
1074                    .expect("must not have conflicting versions"),
1075            },
1076            modules: config
1077                .modules
1078                .iter()
1079                .filter_map(|(&module_instance_id, module_config)| {
1080                    client_module_init
1081                        .get(module_config.kind())
1082                        .map(|module_init| {
1083                            (
1084                                module_instance_id,
1085                                SupportedModuleApiVersions {
1086                                    core_consensus: config.global.consensus_version,
1087                                    module_consensus: module_config.version,
1088                                    api: module_init.supported_api_versions(),
1089                                },
1090                            )
1091                        })
1092                })
1093                .collect(),
1094        }
1095    }
1096
1097    pub async fn load_and_refresh_common_api_version(&self) -> anyhow::Result<ApiVersionSet> {
1098        Self::load_and_refresh_common_api_version_static(
1099            &self.config().await,
1100            &self.module_inits,
1101            &self.api,
1102            &self.db,
1103            &self.task_group,
1104        )
1105        .await
1106    }
1107
1108    /// Load the common api versions to use from cache and start a background
1109    /// process to refresh them.
1110    ///
1111    /// This is a compromise, so we not have to wait for version discovery to
1112    /// complete every time a [`Client`] is being built.
1113    async fn load_and_refresh_common_api_version_static(
1114        config: &ClientConfig,
1115        module_init: &ClientModuleInitRegistry,
1116        api: &DynGlobalApi,
1117        db: &Database,
1118        task_group: &TaskGroup,
1119    ) -> anyhow::Result<ApiVersionSet> {
1120        if let Some(v) = db
1121            .begin_transaction_nc()
1122            .await
1123            .get_value(&CachedApiVersionSetKey)
1124            .await
1125        {
1126            debug!(
1127                target: LOG_CLIENT,
1128                "Found existing cached common api versions"
1129            );
1130            let config = config.clone();
1131            let client_module_init = module_init.clone();
1132            let api = api.clone();
1133            let db = db.clone();
1134            let task_group = task_group.clone();
1135            // Separate task group, because we actually don't want to be waiting for this to
1136            // finish, and it's just best effort.
1137            task_group
1138                .clone()
1139                .spawn_cancellable("refresh_common_api_version_static", async move {
1140                    if let Err(error) = Self::refresh_common_api_version_static(
1141                        &config,
1142                        &client_module_init,
1143                        &api,
1144                        &db,
1145                        task_group,
1146                        false,
1147                    )
1148                    .await
1149                    {
1150                        warn!(
1151                            target: LOG_CLIENT,
1152                            err = %error.fmt_compact_anyhow(), "Failed to discover common api versions"
1153                        );
1154                    }
1155                });
1156
1157            return Ok(v.0);
1158        }
1159
1160        info!(
1161            target: LOG_CLIENT,
1162            "Fetching initial API versions "
1163        );
1164        Self::refresh_common_api_version_static(
1165            config,
1166            module_init,
1167            api,
1168            db,
1169            task_group.clone(),
1170            true,
1171        )
1172        .await
1173    }
1174
1175    async fn refresh_common_api_version_static(
1176        config: &ClientConfig,
1177        client_module_init: &ClientModuleInitRegistry,
1178        api: &DynGlobalApi,
1179        db: &Database,
1180        task_group: TaskGroup,
1181        block_until_ok: bool,
1182    ) -> anyhow::Result<ApiVersionSet> {
1183        debug!(
1184            target: LOG_CLIENT,
1185            "Refreshing common api versions"
1186        );
1187
1188        let (num_responses_sender, mut num_responses_receiver) = tokio::sync::watch::channel(0);
1189        let num_peers = NumPeers::from(config.global.api_endpoints.len());
1190
1191        task_group.spawn_cancellable("refresh peers api versions", {
1192            Client::fetch_common_api_versions_from_all_peers(
1193                num_peers,
1194                api.clone(),
1195                db.clone(),
1196                num_responses_sender,
1197            )
1198        });
1199
1200        let common_api_versions = loop {
1201            // Wait to collect enough answers before calculating a set of common api
1202            // versions to use. Note that all peers individual responses from
1203            // previous attempts are still being used, and requests, or even
1204            // retries for response of peers are not actually cancelled, as they
1205            // are happening on a separate task. This is all just to bound the
1206            // time user can be waiting for the join operation to finish, at the
1207            // risk of picking wrong version in very rare circumstances.
1208            let _: Result<_, Elapsed> = runtime::timeout(
1209                Duration::from_secs(30),
1210                num_responses_receiver.wait_for(|num| num_peers.threshold() <= *num),
1211            )
1212            .await;
1213
1214            let peer_api_version_sets = Self::load_peers_last_api_versions(db, num_peers).await;
1215
1216            match fedimint_client_module::api_version_discovery::discover_common_api_versions_set(
1217                &Self::supported_api_versions_summary_static(config, client_module_init),
1218                &peer_api_version_sets,
1219            ) {
1220                Ok(o) => break o,
1221                Err(err) if block_until_ok => {
1222                    warn!(
1223                        target: LOG_CLIENT,
1224                        err = %err.fmt_compact_anyhow(),
1225                        "Failed to discover API version to use. Retrying..."
1226                    );
1227                    continue;
1228                }
1229                Err(e) => return Err(e),
1230            }
1231        };
1232
1233        debug!(
1234            target: LOG_CLIENT,
1235            value = ?common_api_versions,
1236            "Updating the cached common api versions"
1237        );
1238        let mut dbtx = db.begin_transaction().await;
1239        let _ = dbtx
1240            .insert_entry(
1241                &CachedApiVersionSetKey,
1242                &CachedApiVersionSet(common_api_versions.clone()),
1243            )
1244            .await;
1245
1246        dbtx.commit_tx().await;
1247
1248        Ok(common_api_versions)
1249    }
1250
1251    /// Get the client [`Metadata`]
1252    pub async fn get_metadata(&self) -> Metadata {
1253        self.db
1254            .begin_transaction_nc()
1255            .await
1256            .get_value(&ClientMetadataKey)
1257            .await
1258            .unwrap_or_else(|| {
1259                warn!(
1260                    target: LOG_CLIENT,
1261                    "Missing existing metadata. This key should have been set on Client init"
1262                );
1263                Metadata::empty()
1264            })
1265    }
1266
1267    /// Set the client [`Metadata`]
1268    pub async fn set_metadata(&self, metadata: &Metadata) {
1269        self.db
1270            .autocommit::<_, _, anyhow::Error>(
1271                |dbtx, _| {
1272                    Box::pin(async {
1273                        Self::set_metadata_dbtx(dbtx, metadata).await;
1274                        Ok(())
1275                    })
1276                },
1277                None,
1278            )
1279            .await
1280            .expect("Failed to autocommit metadata");
1281    }
1282
1283    pub fn has_pending_recoveries(&self) -> bool {
1284        !self
1285            .client_recovery_progress_receiver
1286            .borrow()
1287            .iter()
1288            .all(|(_id, progress)| progress.is_done())
1289    }
1290
1291    /// Wait for all module recoveries to finish
1292    ///
1293    /// This will block until the recovery task is done with recoveries.
1294    /// Returns success if all recovery tasks are complete (success case),
1295    /// or an error if some modules could not complete the recovery at the time.
1296    ///
1297    /// A bit of a heavy approach.
1298    pub async fn wait_for_all_recoveries(&self) -> anyhow::Result<()> {
1299        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1300        recovery_receiver
1301            .wait_for(|in_progress| {
1302                in_progress
1303                    .iter()
1304                    .all(|(_id, progress)| progress.is_done())
1305            })
1306            .await
1307            .context("Recovery task completed and update receiver disconnected, but some modules failed to recover")?;
1308
1309        Ok(())
1310    }
1311
1312    /// Subscribe to recover progress for all the modules.
1313    ///
1314    /// This stream can contain duplicate progress for a module.
1315    /// Don't use this stream for detecting completion of recovery.
1316    pub fn subscribe_to_recovery_progress(
1317        &self,
1318    ) -> impl Stream<Item = (ModuleInstanceId, RecoveryProgress)> + use<> {
1319        WatchStream::new(self.client_recovery_progress_receiver.clone())
1320            .flat_map(futures::stream::iter)
1321    }
1322
1323    pub async fn wait_for_module_kind_recovery(
1324        &self,
1325        module_kind: ModuleKind,
1326    ) -> anyhow::Result<()> {
1327        let mut recovery_receiver = self.client_recovery_progress_receiver.clone();
1328        let config = self.config().await;
1329        recovery_receiver
1330            .wait_for(|in_progress| {
1331                !in_progress
1332                    .iter()
1333                    .filter(|(module_instance_id, _progress)| {
1334                        config.modules[module_instance_id].kind == module_kind
1335                    })
1336                    .any(|(_id, progress)| !progress.is_done())
1337            })
1338            .await
1339            .context("Recovery task completed and update receiver disconnected, but the desired modules are still unavailable or failed to recover")?;
1340
1341        Ok(())
1342    }
1343
1344    pub async fn wait_for_all_active_state_machines(&self) -> anyhow::Result<()> {
1345        loop {
1346            if self.executor.get_active_states().await.is_empty() {
1347                break;
1348            }
1349            fedimint_core::runtime::sleep(Duration::from_millis(100)).await;
1350        }
1351        Ok(())
1352    }
1353
1354    /// Set the client [`Metadata`]
1355    pub async fn set_metadata_dbtx(dbtx: &mut DatabaseTransaction<'_>, metadata: &Metadata) {
1356        dbtx.insert_new_entry(&ClientMetadataKey, metadata).await;
1357    }
1358
1359    fn spawn_module_recoveries_task(
1360        &self,
1361        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1362        module_recoveries: BTreeMap<
1363            ModuleInstanceId,
1364            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1365        >,
1366        module_recovery_progress_receivers: BTreeMap<
1367            ModuleInstanceId,
1368            watch::Receiver<RecoveryProgress>,
1369        >,
1370    ) {
1371        let db = self.db.clone();
1372        let log_ordering_wakeup_tx = self.log_ordering_wakeup_tx.clone();
1373        self.task_group
1374            .spawn("module recoveries", |_task_handle| async {
1375                Self::run_module_recoveries_task(
1376                    db,
1377                    log_ordering_wakeup_tx,
1378                    recovery_sender,
1379                    module_recoveries,
1380                    module_recovery_progress_receivers,
1381                )
1382                .await;
1383            });
1384    }
1385
1386    async fn run_module_recoveries_task(
1387        db: Database,
1388        log_ordering_wakeup_tx: watch::Sender<()>,
1389        recovery_sender: watch::Sender<BTreeMap<ModuleInstanceId, RecoveryProgress>>,
1390        module_recoveries: BTreeMap<
1391            ModuleInstanceId,
1392            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
1393        >,
1394        module_recovery_progress_receivers: BTreeMap<
1395            ModuleInstanceId,
1396            watch::Receiver<RecoveryProgress>,
1397        >,
1398    ) {
1399        debug!(target: LOG_CLIENT_RECOVERY, num_modules=%module_recovery_progress_receivers.len(), "Staring module recoveries");
1400        let mut completed_stream = Vec::new();
1401        let progress_stream = futures::stream::FuturesUnordered::new();
1402
1403        for (module_instance_id, f) in module_recoveries {
1404            completed_stream.push(futures::stream::once(Box::pin(async move {
1405                match f.await {
1406                    Ok(()) => (module_instance_id, None),
1407                    Err(err) => {
1408                        warn!(
1409                            target: LOG_CLIENT,
1410                            err = %err.fmt_compact_anyhow(), module_instance_id, "Module recovery failed"
1411                        );
1412                        // a module recovery that failed reports and error and
1413                        // just never finishes, so we don't need a separate state
1414                        // for it
1415                        futures::future::pending::<()>().await;
1416                        unreachable!()
1417                    }
1418                }
1419            })));
1420        }
1421
1422        for (module_instance_id, rx) in module_recovery_progress_receivers {
1423            progress_stream.push(
1424                tokio_stream::wrappers::WatchStream::new(rx)
1425                    .fuse()
1426                    .map(move |progress| (module_instance_id, Some(progress))),
1427            );
1428        }
1429
1430        let mut futures = futures::stream::select(
1431            futures::stream::select_all(progress_stream),
1432            futures::stream::select_all(completed_stream),
1433        );
1434
1435        while let Some((module_instance_id, progress)) = futures.next().await {
1436            let mut dbtx = db.begin_transaction().await;
1437
1438            let prev_progress = *recovery_sender
1439                .borrow()
1440                .get(&module_instance_id)
1441                .expect("existing progress must be present");
1442
1443            let progress = if prev_progress.is_done() {
1444                // since updates might be out of order, once done, stick with it
1445                prev_progress
1446            } else if let Some(progress) = progress {
1447                progress
1448            } else {
1449                prev_progress.to_complete()
1450            };
1451
1452            if !prev_progress.is_done() && progress.is_done() {
1453                info!(
1454                    target: LOG_CLIENT,
1455                    module_instance_id,
1456                    progress = format!("{}/{}", progress.complete, progress.total),
1457                    "Recovery complete"
1458                );
1459                dbtx.log_event(
1460                    log_ordering_wakeup_tx.clone(),
1461                    None,
1462                    ModuleRecoveryCompleted {
1463                        module_id: module_instance_id,
1464                    },
1465                )
1466                .await;
1467            } else {
1468                info!(
1469                    target: LOG_CLIENT,
1470                    module_instance_id,
1471                    progress = format!("{}/{}", progress.complete, progress.total),
1472                    "Recovery progress"
1473                );
1474            }
1475
1476            dbtx.insert_entry(
1477                &ClientModuleRecovery { module_instance_id },
1478                &ClientModuleRecoveryState { progress },
1479            )
1480            .await;
1481            dbtx.commit_tx().await;
1482
1483            recovery_sender.send_modify(|v| {
1484                v.insert(module_instance_id, progress);
1485            });
1486        }
1487        debug!(target: LOG_CLIENT_RECOVERY, "Recovery executor stopped");
1488    }
1489
1490    async fn load_peers_last_api_versions(
1491        db: &Database,
1492        num_peers: NumPeers,
1493    ) -> BTreeMap<PeerId, SupportedApiVersionsSummary> {
1494        let mut peer_api_version_sets = BTreeMap::new();
1495
1496        let mut dbtx = db.begin_transaction_nc().await;
1497        for peer_id in num_peers.peer_ids() {
1498            if let Some(v) = dbtx
1499                .get_value(&PeerLastApiVersionsSummaryKey(peer_id))
1500                .await
1501            {
1502                peer_api_version_sets.insert(peer_id, v.0);
1503            }
1504        }
1505        drop(dbtx);
1506        peer_api_version_sets
1507    }
1508
1509    /// You likely want to use [`Client::get_peer_urls`]. This function returns
1510    /// only the announcements and doesn't use the config as fallback.
1511    pub async fn get_peer_url_announcements(&self) -> BTreeMap<PeerId, SignedApiAnnouncement> {
1512        self.db()
1513            .begin_transaction_nc()
1514            .await
1515            .find_by_prefix(&ApiAnnouncementPrefix)
1516            .await
1517            .map(|(announcement_key, announcement)| (announcement_key.0, announcement))
1518            .collect()
1519            .await
1520    }
1521
1522    /// Returns a list of guardian API URLs
1523    pub async fn get_peer_urls(&self) -> BTreeMap<PeerId, SafeUrl> {
1524        get_api_urls(&self.db, &self.config().await).await
1525    }
1526
1527    /// Create an invite code with the api endpoint of the given peer which can
1528    /// be used to download this client config
1529    pub async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1530        self.get_peer_urls()
1531            .await
1532            .into_iter()
1533            .find_map(|(peer_id, url)| (peer == peer_id).then_some(url))
1534            .map(|peer_url| {
1535                InviteCode::new(
1536                    peer_url.clone(),
1537                    peer,
1538                    self.federation_id(),
1539                    self.api_secret.clone(),
1540                )
1541            })
1542    }
1543
1544    /// Blocks till the client has synced the guardian public key set
1545    /// (introduced in version 0.4) and returns it. Once it has been fetched
1546    /// once this function is guaranteed to return immediately.
1547    pub async fn get_guardian_public_keys_blocking(
1548        &self,
1549    ) -> BTreeMap<PeerId, fedimint_core::secp256k1::PublicKey> {
1550        self.db
1551            .autocommit(
1552                |dbtx, _| {
1553                    Box::pin(async move {
1554                        let config = self.config().await;
1555
1556                        let guardian_pub_keys = self
1557                            .get_or_backfill_broadcast_public_keys(dbtx, config)
1558                            .await;
1559
1560                        Result::<_, ()>::Ok(guardian_pub_keys)
1561                    })
1562                },
1563                None,
1564            )
1565            .await
1566            .expect("Will retry forever")
1567    }
1568
1569    async fn get_or_backfill_broadcast_public_keys(
1570        &self,
1571        dbtx: &mut DatabaseTransaction<'_>,
1572        config: ClientConfig,
1573    ) -> BTreeMap<PeerId, PublicKey> {
1574        match config.global.broadcast_public_keys {
1575            Some(guardian_pub_keys) => guardian_pub_keys,
1576            _ => {
1577                let (guardian_pub_keys, new_config) = self.fetch_and_update_config(config).await;
1578
1579                dbtx.insert_entry(&ClientConfigKey, &new_config).await;
1580                *(self.config.write().await) = new_config;
1581                guardian_pub_keys
1582            }
1583        }
1584    }
1585
1586    async fn fetch_session_count(&self) -> FederationResult<u64> {
1587        self.api.session_count().await
1588    }
1589
1590    async fn fetch_and_update_config(
1591        &self,
1592        config: ClientConfig,
1593    ) -> (BTreeMap<PeerId, PublicKey>, ClientConfig) {
1594        let fetched_config = retry(
1595            "Fetching guardian public keys",
1596            backoff_util::background_backoff(),
1597            || async {
1598                Ok(self
1599                    .api
1600                    .request_current_consensus::<ClientConfig>(
1601                        CLIENT_CONFIG_ENDPOINT.to_owned(),
1602                        ApiRequestErased::default(),
1603                    )
1604                    .await?)
1605            },
1606        )
1607        .await
1608        .expect("Will never return on error");
1609
1610        let Some(guardian_pub_keys) = fetched_config.global.broadcast_public_keys else {
1611            warn!(
1612                target: LOG_CLIENT,
1613                "Guardian public keys not found in fetched config, server not updated to 0.4 yet"
1614            );
1615            pending::<()>().await;
1616            unreachable!("Pending will never return");
1617        };
1618
1619        let new_config = ClientConfig {
1620            global: GlobalClientConfig {
1621                broadcast_public_keys: Some(guardian_pub_keys.clone()),
1622                ..config.global
1623            },
1624            modules: config.modules,
1625        };
1626        (guardian_pub_keys, new_config)
1627    }
1628
1629    pub fn handle_global_rpc(
1630        &self,
1631        method: String,
1632        params: serde_json::Value,
1633    ) -> BoxStream<'_, anyhow::Result<serde_json::Value>> {
1634        Box::pin(try_stream! {
1635            match method.as_str() {
1636                "get_balance" => {
1637                    let balance = self.get_balance().await.unwrap_or_default();
1638                    yield serde_json::to_value(balance)?;
1639                }
1640                "subscribe_balance_changes" => {
1641                    let mut stream = self.subscribe_balance_changes().await;
1642                    while let Some(balance) = stream.next().await {
1643                        yield serde_json::to_value(balance)?;
1644                    }
1645                }
1646                "get_config" => {
1647                    let config = self.config().await;
1648                    yield serde_json::to_value(config)?;
1649                }
1650                "get_federation_id" => {
1651                    let federation_id = self.federation_id();
1652                    yield serde_json::to_value(federation_id)?;
1653                }
1654                "get_invite_code" => {
1655                    let req: GetInviteCodeRequest = serde_json::from_value(params)?;
1656                    let invite_code = self.invite_code(req.peer).await;
1657                    yield serde_json::to_value(invite_code)?;
1658                }
1659                "get_operation" => {
1660                    let req: GetOperationIdRequest = serde_json::from_value(params)?;
1661                    let operation = self.operation_log().get_operation(req.operation_id).await;
1662                    yield serde_json::to_value(operation)?;
1663                }
1664                "list_operations" => {
1665                    let req: ListOperationsParams = serde_json::from_value(params)?;
1666                    let limit = if req.limit.is_none() && req.last_seen.is_none() {
1667                        usize::MAX
1668                    } else {
1669                        req.limit.unwrap_or(usize::MAX)
1670                    };
1671                    let operations = self.operation_log()
1672                        .paginate_operations_rev(limit, req.last_seen)
1673                        .await;
1674                    yield serde_json::to_value(operations)?;
1675                }
1676                "session_count" => {
1677                    let count = self.fetch_session_count().await?;
1678                    yield serde_json::to_value(count)?;
1679                }
1680                "has_pending_recoveries" => {
1681                    let has_pending = self.has_pending_recoveries();
1682                    yield serde_json::to_value(has_pending)?;
1683                }
1684                "wait_for_all_recoveries" => {
1685                    self.wait_for_all_recoveries().await?;
1686                    yield serde_json::Value::Null;
1687                }
1688                "subscribe_to_recovery_progress" => {
1689                    let mut stream = self.subscribe_to_recovery_progress();
1690                    while let Some((module_id, progress)) = stream.next().await {
1691                        yield serde_json::json!({
1692                            "module_id": module_id,
1693                            "progress": progress
1694                        });
1695                    }
1696                }
1697                "backup_to_federation" => {
1698                    let metadata = if params.is_null() {
1699                        Metadata::from_json_serialized(serde_json::json!({}))
1700                    } else {
1701                        Metadata::from_json_serialized(params)
1702                    };
1703                    self.backup_to_federation(metadata).await?;
1704                    yield serde_json::Value::Null;
1705                }
1706                _ => {
1707                    Err(anyhow::format_err!("Unknown method: {}", method))?;
1708                    unreachable!()
1709                },
1710            }
1711        })
1712    }
1713
1714    pub async fn log_event<E>(&self, module_id: Option<ModuleInstanceId>, event: E)
1715    where
1716        E: Event + Send,
1717    {
1718        let mut dbtx = self.db.begin_transaction().await;
1719        self.log_event_dbtx(&mut dbtx, module_id, event).await;
1720        dbtx.commit_tx().await;
1721    }
1722
1723    pub async fn log_event_dbtx<E, Cap>(
1724        &self,
1725        dbtx: &mut DatabaseTransaction<'_, Cap>,
1726        module_id: Option<ModuleInstanceId>,
1727        event: E,
1728    ) where
1729        E: Event + Send,
1730        Cap: Send,
1731    {
1732        dbtx.log_event(self.log_ordering_wakeup_tx.clone(), module_id, event)
1733            .await;
1734    }
1735
1736    pub async fn log_event_raw_dbtx<Cap>(
1737        &self,
1738        dbtx: &mut DatabaseTransaction<'_, Cap>,
1739        kind: EventKind,
1740        module: Option<(ModuleKind, ModuleInstanceId)>,
1741        payload: Vec<u8>,
1742        persist: EventPersistence,
1743    ) where
1744        Cap: Send,
1745    {
1746        let module_id = module.as_ref().map(|m| m.1);
1747        let module_kind = module.map(|m| m.0);
1748        dbtx.log_event_raw(
1749            self.log_ordering_wakeup_tx.clone(),
1750            kind,
1751            module_kind,
1752            module_id,
1753            payload,
1754            persist,
1755        )
1756        .await;
1757    }
1758
1759    pub async fn handle_events<F, R, K>(&self, pos_key: &K, call_fn: F) -> anyhow::Result<()>
1760    where
1761        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1762        K: DatabaseRecord<Value = EventLogId>,
1763        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1764        R: Future<Output = anyhow::Result<()>>,
1765    {
1766        fedimint_eventlog::handle_events(
1767            self.db.clone(),
1768            pos_key,
1769            self.log_event_added_rx.clone(),
1770            call_fn,
1771        )
1772        .await
1773    }
1774
1775    pub async fn handle_trimable_events<F, R, K>(
1776        &self,
1777        pos_key: &K,
1778        call_fn: F,
1779    ) -> anyhow::Result<()>
1780    where
1781        K: DatabaseKey + DatabaseRecord + MaybeSend + MaybeSync,
1782        K: DatabaseRecord<Value = EventLogTrimableId>,
1783        F: Fn(&mut DatabaseTransaction<NonCommittable>, EventLogEntry) -> R,
1784        R: Future<Output = anyhow::Result<()>>,
1785    {
1786        fedimint_eventlog::handle_trimable_events(
1787            self.db.clone(),
1788            pos_key,
1789            self.log_event_added_rx.clone(),
1790            call_fn,
1791        )
1792        .await
1793    }
1794
1795    pub async fn get_event_log(
1796        &self,
1797        pos: Option<EventLogId>,
1798        limit: u64,
1799    ) -> Vec<PersistedLogEntry> {
1800        self.get_event_log_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1801            .await
1802    }
1803
1804    pub async fn get_event_log_trimable(
1805        &self,
1806        pos: Option<EventLogTrimableId>,
1807        limit: u64,
1808    ) -> Vec<PersistedLogEntry> {
1809        self.get_event_log_trimable_dbtx(&mut self.db.begin_transaction_nc().await, pos, limit)
1810            .await
1811    }
1812
1813    pub async fn get_event_log_dbtx<Cap>(
1814        &self,
1815        dbtx: &mut DatabaseTransaction<'_, Cap>,
1816        pos: Option<EventLogId>,
1817        limit: u64,
1818    ) -> Vec<PersistedLogEntry>
1819    where
1820        Cap: Send,
1821    {
1822        dbtx.get_event_log(pos, limit).await
1823    }
1824
1825    pub async fn get_event_log_trimable_dbtx<Cap>(
1826        &self,
1827        dbtx: &mut DatabaseTransaction<'_, Cap>,
1828        pos: Option<EventLogTrimableId>,
1829        limit: u64,
1830    ) -> Vec<PersistedLogEntry>
1831    where
1832        Cap: Send,
1833    {
1834        dbtx.get_event_log_trimable(pos, limit).await
1835    }
1836
1837    /// Register to receiver all new transient (unpersisted) events
1838    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
1839        self.log_event_added_transient_tx.subscribe()
1840    }
1841
1842    pub fn iroh_enable_dht(&self) -> bool {
1843        self.iroh_enable_dht
1844    }
1845
1846    pub(crate) async fn run_core_migrations(
1847        db_no_decoders: &Database,
1848    ) -> Result<(), anyhow::Error> {
1849        let mut dbtx = db_no_decoders.begin_transaction().await;
1850        apply_migrations_core_client_dbtx(&mut dbtx.to_ref_nc(), "fedimint-client".to_string())
1851            .await?;
1852        if is_running_in_test_env() {
1853            verify_client_db_integrity_dbtx(&mut dbtx.to_ref_nc()).await;
1854        }
1855        dbtx.commit_tx_result().await?;
1856        Ok(())
1857    }
1858}
1859
1860#[apply(async_trait_maybe_send!)]
1861impl ClientContextIface for Client {
1862    fn get_module(&self, instance: ModuleInstanceId) -> &maybe_add_send_sync!(dyn IClientModule) {
1863        Client::get_module(self, instance)
1864    }
1865
1866    fn api_clone(&self) -> DynGlobalApi {
1867        Client::api_clone(self)
1868    }
1869    fn decoders(&self) -> &ModuleDecoderRegistry {
1870        Client::decoders(self)
1871    }
1872
1873    async fn finalize_and_submit_transaction(
1874        &self,
1875        operation_id: OperationId,
1876        operation_type: &str,
1877        operation_meta_gen: Box<maybe_add_send_sync!(dyn Fn(OutPointRange) -> serde_json::Value)>,
1878        tx_builder: TransactionBuilder,
1879    ) -> anyhow::Result<OutPointRange> {
1880        Client::finalize_and_submit_transaction(
1881            self,
1882            operation_id,
1883            operation_type,
1884            // |out_point_range| operation_meta_gen(out_point_range),
1885            &operation_meta_gen,
1886            tx_builder,
1887        )
1888        .await
1889    }
1890
1891    async fn finalize_and_submit_transaction_inner(
1892        &self,
1893        dbtx: &mut DatabaseTransaction<'_>,
1894        operation_id: OperationId,
1895        tx_builder: TransactionBuilder,
1896    ) -> anyhow::Result<OutPointRange> {
1897        Client::finalize_and_submit_transaction_inner(self, dbtx, operation_id, tx_builder).await
1898    }
1899
1900    async fn transaction_updates(&self, operation_id: OperationId) -> TransactionUpdates {
1901        Client::transaction_updates(self, operation_id).await
1902    }
1903
1904    async fn await_primary_module_outputs(
1905        &self,
1906        operation_id: OperationId,
1907        // TODO: make `impl Iterator<Item = ...>`
1908        outputs: Vec<OutPoint>,
1909    ) -> anyhow::Result<()> {
1910        Client::await_primary_module_outputs(self, operation_id, outputs).await
1911    }
1912
1913    fn operation_log(&self) -> &dyn IOperationLog {
1914        Client::operation_log(self)
1915    }
1916
1917    async fn has_active_states(&self, operation_id: OperationId) -> bool {
1918        Client::has_active_states(self, operation_id).await
1919    }
1920
1921    async fn operation_exists(&self, operation_id: OperationId) -> bool {
1922        Client::operation_exists(self, operation_id).await
1923    }
1924
1925    async fn config(&self) -> ClientConfig {
1926        Client::config(self).await
1927    }
1928
1929    fn db(&self) -> &Database {
1930        Client::db(self)
1931    }
1932
1933    fn executor(&self) -> &(maybe_add_send_sync!(dyn IExecutor + 'static)) {
1934        Client::executor(self)
1935    }
1936
1937    async fn invite_code(&self, peer: PeerId) -> Option<InviteCode> {
1938        Client::invite_code(self, peer).await
1939    }
1940
1941    fn get_internal_payment_markers(&self) -> anyhow::Result<(PublicKey, u64)> {
1942        Client::get_internal_payment_markers(self)
1943    }
1944
1945    async fn log_event_json(
1946        &self,
1947        dbtx: &mut DatabaseTransaction<'_, NonCommittable>,
1948        module_kind: Option<ModuleKind>,
1949        module_id: ModuleInstanceId,
1950        kind: EventKind,
1951        payload: serde_json::Value,
1952        persist: EventPersistence,
1953    ) {
1954        dbtx.ensure_global()
1955            .expect("Must be called with global dbtx");
1956        self.log_event_raw_dbtx(
1957            dbtx,
1958            kind,
1959            module_kind.map(|kind| (kind, module_id)),
1960            serde_json::to_vec(&payload).expect("Serialization can't fail"),
1961            persist,
1962        )
1963        .await;
1964    }
1965
1966    async fn read_operation_active_states<'dbtx>(
1967        &self,
1968        operation_id: OperationId,
1969        module_id: ModuleInstanceId,
1970        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1971    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (ActiveStateKey, ActiveStateMeta)> + 'dbtx)>>
1972    {
1973        Box::pin(
1974            dbtx.find_by_prefix(&ActiveModuleOperationStateKeyPrefix {
1975                operation_id,
1976                module_instance: module_id,
1977            })
1978            .await
1979            .map(move |(k, v)| (k.0, v)),
1980        )
1981    }
1982    async fn read_operation_inactive_states<'dbtx>(
1983        &self,
1984        operation_id: OperationId,
1985        module_id: ModuleInstanceId,
1986        dbtx: &'dbtx mut DatabaseTransaction<'_>,
1987    ) -> Pin<Box<maybe_add_send!(dyn Stream<Item = (InactiveStateKey, InactiveStateMeta)> + 'dbtx)>>
1988    {
1989        Box::pin(
1990            dbtx.find_by_prefix(&InactiveModuleOperationStateKeyPrefix {
1991                operation_id,
1992                module_instance: module_id,
1993            })
1994            .await
1995            .map(move |(k, v)| (k.0, v)),
1996        )
1997    }
1998}
1999
2000// TODO: impl `Debug` for `Client` and derive here
2001impl fmt::Debug for Client {
2002    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2003        write!(f, "Client")
2004    }
2005}
2006
2007pub fn client_decoders<'a>(
2008    registry: &ModuleInitRegistry<DynClientModuleInit>,
2009    module_kinds: impl Iterator<Item = (ModuleInstanceId, &'a ModuleKind)>,
2010) -> ModuleDecoderRegistry {
2011    let mut modules = BTreeMap::new();
2012    for (id, kind) in module_kinds {
2013        let Some(init) = registry.get(kind) else {
2014            debug!("Detected configuration for unsupported module id: {id}, kind: {kind}");
2015            continue;
2016        };
2017
2018        modules.insert(
2019            id,
2020            (
2021                kind.clone(),
2022                IClientModuleInit::decoder(AsRef::<dyn IClientModuleInit + 'static>::as_ref(init)),
2023            ),
2024        );
2025    }
2026    ModuleDecoderRegistry::from(modules)
2027}