1use std::collections::BTreeMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::time::Duration;
6
7use anyhow::{Context as _, anyhow, bail, ensure};
8use bitcoin::key::Secp256k1;
9use fedimint_api_client::api::global_api::with_cache::GlobalFederationApiWithCacheExt as _;
10use fedimint_api_client::api::global_api::with_request_hook::{
11 ApiRequestHook, RawFederationApiWithRequestHookExt as _,
12};
13use fedimint_api_client::api::net::Connector;
14use fedimint_api_client::api::{
15 ApiVersionSet, DynClientConnector, DynGlobalApi, FederationApiExt as _, ReconnectFederationApi,
16 make_admin_connector, make_connector,
17};
18use fedimint_client_module::api::ClientRawFederationApiExt as _;
19use fedimint_client_module::meta::LegacyMetaSource;
20use fedimint_client_module::module::init::ClientModuleInit;
21use fedimint_client_module::module::recovery::RecoveryProgress;
22use fedimint_client_module::module::{ClientModuleRegistry, FinalClientIface};
23use fedimint_client_module::secret::{DeriveableSecretClientExt as _, get_default_client_secret};
24use fedimint_client_module::transaction::{
25 TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext, tx_submission_sm_decoder,
26};
27use fedimint_client_module::{AdminCreds, ModuleRecoveryStarted};
28use fedimint_core::config::{ClientConfig, FederationId, ModuleInitRegistry};
29use fedimint_core::core::{ModuleInstanceId, ModuleKind};
30use fedimint_core::db::{
31 Database, IDatabaseTransactionOpsCoreTyped as _, verify_module_db_integrity_dbtx,
32};
33use fedimint_core::endpoint_constants::CLIENT_CONFIG_ENDPOINT;
34use fedimint_core::envs::is_running_in_test_env;
35use fedimint_core::invite_code::InviteCode;
36use fedimint_core::module::registry::ModuleDecoderRegistry;
37use fedimint_core::module::{ApiRequestErased, ApiVersion, SupportedApiVersionsSummary};
38use fedimint_core::task::TaskGroup;
39use fedimint_core::task::jit::{Jit, JitTry, JitTryAnyhow};
40use fedimint_core::util::{FmtCompact as _, FmtCompactAnyhow as _};
41use fedimint_core::{NumPeers, PeerId, fedimint_build_code_version_env, maybe_add_send};
42use fedimint_derive_secret::DerivableSecret;
43use fedimint_eventlog::{
44 DBTransactionEventLogExt as _, EventLogEntry, run_event_log_ordering_task,
45};
46use fedimint_logging::LOG_CLIENT;
47use tokio::sync::{broadcast, watch};
48use tracing::{debug, trace, warn};
49
50use super::handle::ClientHandle;
51use super::{Client, client_decoders};
52use crate::api_announcements::{
53 PeersSignedApiAnnouncements, fetch_api_announcements_from_at_least_num_of_peers, get_api_urls,
54 run_api_announcement_refresh_task, store_api_announcements_updates_from_peers,
55};
56use crate::backup::{ClientBackup, Metadata};
57use crate::db::{
58 self, ApiSecretKey, ClientInitStateKey, ClientMetadataKey, ClientModuleRecovery,
59 ClientModuleRecoveryState, ClientPreRootSecretHashKey, InitMode, InitState,
60 PendingClientConfigKey, apply_migrations_client_module_dbtx,
61};
62use crate::meta::MetaService;
63use crate::module_init::ClientModuleInitRegistry;
64use crate::oplog::OperationLog;
65use crate::sm::executor::Executor;
66use crate::sm::notifier::Notifier;
67
/// Secret material handed to the client when joining, recovering, opening or
/// downloading a backup.
///
/// The variant decides how the wrapped `DerivableSecret` is turned into the
/// client's pre-root secret (see `RootSecret::to_inner`).
#[derive(Clone)]
pub enum RootSecret {
    /// The wrapped secret is passed through `get_default_client_secret`
    /// together with the federation id before being used.
    StandardDoubleDerive(DerivableSecret),
    /// The wrapped secret is used as-is, without any additional derivation
    /// step at this level.
    Custom(DerivableSecret),
}
102
103impl RootSecret {
104 fn to_inner(&self, federation_id: FederationId) -> DerivableSecret {
105 match self {
106 RootSecret::StandardDoubleDerive(derivable_secret) => {
107 get_default_client_secret(derivable_secret, &federation_id)
108 }
109 RootSecret::Custom(derivable_secret) => derivable_secret.clone(),
110 }
111 }
112}
113
/// Collects everything needed to construct a `Client`: module initializers,
/// primary module selection, connectivity options and hooks.
pub struct ClientBuilder {
    /// Initializers for the module kinds this client is able to instantiate.
    module_inits: ClientModuleInitRegistry,
    /// Explicitly chosen primary module instance, if any.
    primary_module_instance: Option<ModuleInstanceId>,
    /// Module kind used to look up the primary module instance in the config
    /// when no explicit instance id was given.
    primary_module_kind: Option<ModuleKind>,
    /// Admin credentials; when set, the API connection is made to the peer
    /// named in the credentials.
    admin_creds: Option<AdminCreds>,
    /// Meta service handed to the built client.
    meta_service: Arc<crate::meta::MetaService>,
    /// Connector used to download the config from an invite code; also
    /// stored on the built client.
    connector: Connector,
    /// When `true`, the executor is not started after the client is built.
    stopped: bool,
    /// Sender half of the transient event log broadcast channel.
    log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
    /// Hook applied to the federation API when it is constructed.
    request_hook: ApiRequestHook,
    /// Existing connector that is reused instead of creating a new one when
    /// its number of peers matches the number of known peer URLs.
    reuse_connector: Option<DynClientConnector>,
    /// Forwarded to connector construction.
    /// NOTE(review): presumably toggles iroh DHT discovery — confirm.
    iroh_enable_dht: bool,
    /// Forwarded to connector construction.
    /// NOTE(review): presumably toggles the "next" iroh stack — confirm.
    iroh_enable_next: bool,
}
129
130impl ClientBuilder {
131 pub(crate) fn new() -> Self {
132 trace!(
133 target: LOG_CLIENT,
134 version = %fedimint_build_code_version_env!(),
135 "Initializing fedimint client",
136 );
137 let meta_service = MetaService::new(LegacyMetaSource::default());
138 let (log_event_added_transient_tx, _log_event_added_transient_rx) =
139 broadcast::channel(1024);
140 ClientBuilder {
141 module_inits: ModuleInitRegistry::new(),
142 primary_module_instance: None,
143 primary_module_kind: None,
144 connector: Connector::default(),
145 admin_creds: None,
146 stopped: false,
147 meta_service,
148 log_event_added_transient_tx,
149 request_hook: Arc::new(|api| api),
150 reuse_connector: None,
151 iroh_enable_dht: true,
152 iroh_enable_next: true,
153 }
154 }
155
156 pub(crate) fn from_existing(client: &Client) -> Self {
157 ClientBuilder {
158 module_inits: client.module_inits.clone(),
159 primary_module_instance: Some(client.primary_module_instance),
160 primary_module_kind: None,
161 admin_creds: None,
162 stopped: false,
163 meta_service: client.meta_service.clone(),
165 connector: client.connector,
166 log_event_added_transient_tx: client.log_event_added_transient_tx.clone(),
167 request_hook: client.request_hook.clone(),
168 reuse_connector: Some(client.api.connector().clone()),
169 iroh_enable_dht: client.iroh_enable_dht,
170 iroh_enable_next: client.iroh_enable_next,
171 }
172 }
173
    /// Replaces the builder's whole registry of module initializers with
    /// `module_inits`.
    pub fn with_module_inits(&mut self, module_inits: ClientModuleInitRegistry) {
        self.module_inits = module_inits;
    }
185
    /// Adds a single module initializer to the builder's registry via
    /// `attach`.
    pub fn with_module<M: ClientModuleInit>(&mut self, module_init: M) {
        self.module_inits.attach(module_init);
    }
197
    /// Marks the builder so that the executor is not started once the client
    /// has been built.
    pub fn stopped(&mut self) {
        self.stopped = true;
    }
201
202 pub fn with_api_request_hook(mut self, hook: ApiRequestHook) -> Self {
211 self.request_hook = hook;
212 self
213 }
214
    /// Deprecated alias that forwards to
    /// `with_primary_module_instance_id`.
    ///
    /// # Panics
    ///
    /// Panics under the same condition as `with_primary_module_instance_id`.
    #[deprecated(
        since = "0.6.0",
        note = "Use `with_primary_module_kind` instead, as the instance id can't be known upfront. If you *really* need the old behavior you can use `with_primary_module_instance_id`."
    )]
    pub fn with_primary_module(&mut self, primary_module_instance: ModuleInstanceId) {
        self.with_primary_module_instance_id(primary_module_instance);
    }
228
229 pub fn with_primary_module_instance_id(&mut self, primary_module_instance: ModuleInstanceId) {
244 let was_replaced = self
245 .primary_module_instance
246 .replace(primary_module_instance)
247 .is_some();
248 assert!(
249 !was_replaced,
250 "Only one primary module can be given to the builder."
251 );
252 }
253
254 pub fn with_primary_module_kind(&mut self, primary_module_kind: ModuleKind) {
261 let was_replaced = self
262 .primary_module_kind
263 .replace(primary_module_kind)
264 .is_some();
265 assert!(
266 !was_replaced,
267 "Only one primary module kind can be given to the builder."
268 );
269 }
270
    /// Replaces the meta service that will be handed to the built client.
    pub fn with_meta_service(&mut self, meta_service: Arc<MetaService>) {
        self.meta_service = meta_service;
    }
274
275 pub fn with_iroh_enable_dht(mut self, iroh_enable_dht: bool) -> Self {
278 self.iroh_enable_dht = iroh_enable_dht;
279 self
280 }
281
282 pub fn with_iroh_enable_next(mut self, iroh_enable_next: bool) -> Self {
285 self.iroh_enable_next = iroh_enable_next;
286 self
287 }
288
289 async fn migrate_module_dbs(&self, db: &Database) -> anyhow::Result<()> {
296 if let Ok(client_config) = self.load_existing_config(db).await {
300 for (module_id, module_cfg) in client_config.modules {
301 let kind = module_cfg.kind.clone();
302 let Some(init) = self.module_inits.get(&kind) else {
303 continue;
305 };
306
307 let mut dbtx = db.begin_transaction().await;
308 apply_migrations_client_module_dbtx(
309 &mut dbtx.to_ref_nc(),
310 kind.to_string(),
311 init.get_database_migrations(),
312 module_id,
313 )
314 .await?;
315 if let Some(used_db_prefixes) = init.used_db_prefixes()
316 && is_running_in_test_env()
317 {
318 verify_module_db_integrity_dbtx(
319 &mut dbtx.to_ref_nc(),
320 module_id,
321 kind,
322 &used_db_prefixes,
323 )
324 .await;
325 }
326 dbtx.commit_tx_result().await?;
327 }
328 }
329
330 Ok(())
331 }
332
333 pub async fn load_existing_config(&self, db: &Database) -> anyhow::Result<ClientConfig> {
334 let Some(config) = Client::get_config_from_db(db).await else {
335 bail!("Client database not initialized")
336 };
337
338 Ok(config)
339 }
340
    /// Stores admin credentials on the builder; when present, the client's
    /// API is built as an admin connection to the peer named in `creds`.
    pub fn set_admin_creds(&mut self, creds: AdminCreds) {
        self.admin_creds = Some(creds);
    }
344
    /// Replaces the connector used by this builder.
    pub fn with_connector(&mut self, connector: Connector) {
        self.connector = connector;
    }
348
    /// Selects `Connector::tor()` as the connector.
    ///
    /// Only available when the `tor` feature is enabled.
    #[cfg(feature = "tor")]
    pub fn with_tor_connector(&mut self) {
        self.with_connector(Connector::tor());
    }
353
354 #[allow(clippy::too_many_arguments)]
355 async fn init(
356 self,
357 db_no_decoders: Database,
358 pre_root_secret: DerivableSecret,
359 config: ClientConfig,
360 api_secret: Option<String>,
361 init_mode: InitMode,
362 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
363 preview_prefetch_api_version_set: Option<
364 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
365 >,
366 ) -> anyhow::Result<ClientHandle> {
367 if Client::is_initialized(&db_no_decoders).await {
368 bail!("Client database already initialized")
369 }
370
371 Client::run_core_migrations(&db_no_decoders).await?;
372
373 {
376 debug!(target: LOG_CLIENT, "Initializing client database");
377 let mut dbtx = db_no_decoders.begin_transaction().await;
378 dbtx.insert_new_entry(&crate::db::ClientConfigKey, &config)
380 .await;
381 dbtx.insert_entry(
382 &ClientPreRootSecretHashKey,
383 &pre_root_secret.derive_pre_root_secret_hash(),
384 )
385 .await;
386
387 if let Some(api_secret) = api_secret.as_ref() {
388 dbtx.insert_new_entry(&ApiSecretKey, api_secret).await;
389 }
390
391 let init_state = InitState::Pending(init_mode);
392 dbtx.insert_entry(&ClientInitStateKey, &init_state).await;
393
394 let metadata = init_state
395 .does_require_recovery()
396 .flatten()
397 .map_or(Metadata::empty(), |s| s.metadata);
398
399 dbtx.insert_new_entry(&ClientMetadataKey, &metadata).await;
400
401 dbtx.commit_tx_result().await?;
402 }
403
404 let stopped = self.stopped;
405 self.build(
406 db_no_decoders,
407 pre_root_secret,
408 config,
409 api_secret,
410 stopped,
411 preview_prefetch_api_announcements,
412 preview_prefetch_api_version_set,
413 )
414 .await
415 }
416
417 pub async fn preview(self, invite_code: &InviteCode) -> anyhow::Result<ClientPreview> {
418 let (config, api) = self
419 .connector
420 .download_from_invite_code(invite_code, self.iroh_enable_dht, self.iroh_enable_next)
421 .await?;
422
423 let prefetch_api_announcements =
424 config
425 .global
426 .broadcast_public_keys
427 .clone()
428 .map(|guardian_pub_keys| {
429 Jit::new({
430 let api = api.clone();
431 move || async move {
432 fetch_api_announcements_from_at_least_num_of_peers(
436 1,
437 &api,
438 &guardian_pub_keys,
439 Duration::from_millis(20),
442 )
443 .await
444 }
445 })
446 });
447
448 self.preview_inner(
449 config,
450 invite_code.api_secret(),
451 Some(api),
452 prefetch_api_announcements,
453 )
454 .await
455 }
456
    /// Creates a `ClientPreview` from a config the caller already holds.
    ///
    /// Forwards to `preview_inner` without any API announcement prefetch.
    /// When `reuse_api` is given, its connector is remembered for reuse and an
    /// API version prefetch is set up.
    pub async fn preview_with_existing_config(
        self,
        config: ClientConfig,
        api_secret: Option<String>,
        reuse_api: Option<DynGlobalApi>,
    ) -> anyhow::Result<ClientPreview> {
        self.preview_inner(config, api_secret, reuse_api, None)
            .await
    }
470
471 async fn preview_inner(
472 mut self,
473 config: ClientConfig,
474 api_secret: Option<String>,
475 reuse_api: Option<DynGlobalApi>,
476 prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
477 ) -> anyhow::Result<ClientPreview> {
478 let preview_prefetch_api_version_set = if let Some(api) = reuse_api {
479 self.reuse_connector = Some(api.connector().clone());
480
481 Some(JitTry::new_try({
482 let config = config.clone();
483 || async move { Client::fetch_common_api_versions(&config, &api).await }
484 }))
485 } else {
486 None
487 };
488 Ok(ClientPreview {
489 inner: self,
490 config,
491 api_secret,
492 prefetch_api_announcements,
493 preview_prefetch_api_version_set,
494 })
495 }
496
497 pub async fn open(
498 self,
499 db_no_decoders: Database,
500 pre_root_secret: RootSecret,
501 ) -> anyhow::Result<ClientHandle> {
502 Client::run_core_migrations(&db_no_decoders).await?;
503
504 Self::migrate_pending_config_if_present(&db_no_decoders).await;
506
507 let Some(config) = Client::get_config_from_db(&db_no_decoders).await else {
508 bail!("Client database not initialized")
509 };
510
511 let pre_root_secret = pre_root_secret.to_inner(config.calculate_federation_id());
512
513 match db_no_decoders
514 .begin_transaction_nc()
515 .await
516 .get_value(&ClientPreRootSecretHashKey)
517 .await
518 {
519 Some(secret_hash) => {
520 ensure!(
521 pre_root_secret.derive_pre_root_secret_hash() == secret_hash,
522 "Secret hash does not match. Incorrect secret"
523 );
524 }
525 _ => {
526 debug!(target: LOG_CLIENT, "Backfilling secret hash");
527 let mut dbtx = db_no_decoders.begin_transaction().await;
529 dbtx.insert_entry(
530 &ClientPreRootSecretHashKey,
531 &pre_root_secret.derive_pre_root_secret_hash(),
532 )
533 .await;
534 dbtx.commit_tx().await;
535 }
536 }
537
538 let api_secret = Client::get_api_secret_from_db(&db_no_decoders).await;
539 let stopped = self.stopped;
540 let request_hook = self.request_hook.clone();
541
542 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
543 let client = self
544 .build_stopped(
545 db_no_decoders,
546 pre_root_secret,
547 &config,
548 api_secret,
549 log_event_added_transient_tx,
550 request_hook,
551 None,
552 None,
553 )
554 .await?;
555 if !stopped {
556 client.as_inner().start_executor();
557 }
558 Ok(client)
559 }
560
561 #[allow(clippy::too_many_arguments)]
563 pub(crate) async fn build(
564 self,
565 db_no_decoders: Database,
566 pre_root_secret: DerivableSecret,
567 config: ClientConfig,
568 api_secret: Option<String>,
569 stopped: bool,
570 preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
571 preview_prefetch_api_version_set: Option<
572 JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
573 >,
574 ) -> anyhow::Result<ClientHandle> {
575 let log_event_added_transient_tx = self.log_event_added_transient_tx.clone();
576 let request_hook = self.request_hook.clone();
577 let client = self
578 .build_stopped(
579 db_no_decoders,
580 pre_root_secret,
581 &config,
582 api_secret,
583 log_event_added_transient_tx,
584 request_hook,
585 preview_prefetch_api_announcements,
586 preview_prefetch_api_version_set,
587 )
588 .await?;
589 if !stopped {
590 client.as_inner().start_executor();
591 }
592
593 Ok(client)
594 }
595
    /// Builds the client and all of its background tasks, but does not start
    /// the executor (callers do that when the client is not "stopped").
    ///
    /// In order, this function:
    /// 1. re-decodes the config with the known module decoders and attaches
    ///    the decoders to the database,
    /// 2. creates the federation API (admin or regular),
    /// 3. migrates the module databases,
    /// 4. stores prefetched API announcements / API versions, if provided,
    /// 5. determines the API versions to use,
    /// 6. for every configured module either prepares a recovery future or
    ///    initializes the module,
    /// 7. builds the executor, the `Client` and spawns the background tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if re-decoding the config fails, if the admin
    /// credentials do not match any peer, if creating a connector fails, if a
    /// module database migration fails, if storing prefetched API
    /// announcements fails, if a module fails to initialize, if the selected
    /// primary module does not support being primary, or if no primary
    /// module could be determined.
    #[allow(clippy::too_many_arguments)]
    async fn build_stopped(
        self,
        db_no_decoders: Database,
        pre_root_secret: DerivableSecret,
        config: &ClientConfig,
        api_secret: Option<String>,
        log_event_added_transient_tx: broadcast::Sender<EventLogEntry>,
        request_hook: ApiRequestHook,
        preview_prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
        preview_prefetch_api_version_set: Option<
            JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>,
        >,
    ) -> anyhow::Result<ClientHandle> {
        debug!(
            target: LOG_CLIENT,
            version = %fedimint_build_code_version_env!(),
            "Building fedimint client",
        );
        // The receiving/sending halves that are not stored on the client are
        // handed to `run_event_log_ordering_task` further below.
        let (log_event_added_tx, log_event_added_rx) = watch::channel(());
        let (log_ordering_wakeup_tx, log_ordering_wakeup_rx) = watch::channel(());

        // Re-decode the config using the decoders of the modules this builder
        // knows about, and attach the same decoders to the database.
        let decoders = self.decoders(config);
        let config = Self::config_decoded(config, &decoders)?;
        let fed_id = config.calculate_federation_id();
        let db = db_no_decoders.with_decoders(decoders.clone());
        let connector = self.connector;
        let peer_urls = get_api_urls(&db, &config).await;
        let api = match self.admin_creds.as_ref() {
            // With admin credentials, connect to the single peer they name.
            Some(admin_creds) => {
                let connector = make_admin_connector(
                    admin_creds.peer_id,
                    peer_urls
                        .into_iter()
                        .find_map(|(peer, api_url)| {
                            (admin_creds.peer_id == peer).then_some(api_url)
                        })
                        .context("Admin creds should match a peer")?,
                    &api_secret,
                    self.iroh_enable_dht,
                    self.iroh_enable_next,
                )
                .await?;
                ReconnectFederationApi::new_admin(connector, admin_creds.peer_id)
                    .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
                    .with_request_hook(&request_hook)
                    .with_cache()
                    .into()
            }
            None => {
                // Reuse a previously created connector only if it covers the
                // same number of peers as the currently known peer URLs.
                let connector = if let Some(connector) = self.reuse_connector.clone()
                    && connector.peers().len() == peer_urls.len()
                {
                    connector
                } else {
                    make_connector(
                        peer_urls,
                        &api_secret,
                        self.iroh_enable_dht,
                        self.iroh_enable_next,
                    )
                    .await?
                };
                ReconnectFederationApi::new(connector, None)
                    .with_client_ext(db.clone(), log_ordering_wakeup_tx.clone())
                    .with_request_hook(&request_hook)
                    .with_cache()
                    .into()
            }
        };

        let task_group = TaskGroup::new();

        // Module databases are migrated before any module is initialized.
        self.migrate_module_dbs(&db).await?;

        let init_state = Self::load_init_state(&db).await;

        // An explicit instance id wins; otherwise pick the first configured
        // module of the requested kind. May still be `None` at this point.
        let mut primary_module_instance = self.primary_module_instance.or_else(|| {
            let primary_module_kind = self.primary_module_kind?;
            config
                .modules
                .iter()
                .find_map(|(module_instance_id, module_config)| {
                    (module_config.kind() == &primary_module_kind).then_some(*module_instance_id)
                })
        });

        let notifier = Notifier::new();

        // Persist API announcements prefetched during the preview, if any.
        if let Some(p) = preview_prefetch_api_announcements {
            let announcements = p.get().await;

            store_api_announcements_updates_from_peers(&db, announcements).await?
        }

        // Persist API versions prefetched during the preview, if any. A
        // failed prefetch is only logged and does not abort the build.
        if let Some(preview_prefetch_api_version_set) = preview_prefetch_api_version_set {
            match preview_prefetch_api_version_set.get_try().await {
                Ok(peer_api_versions) => {
                    Client::store_prefetched_api_versions(
                        &db,
                        &config,
                        &self.module_inits,
                        peer_api_versions,
                    )
                    .await;
                }
                Err(err) => {
                    debug!(target: LOG_CLIENT, err = %err.fmt_compact(), "Prefetching api version negotiation failed");
                }
            }
        }

        // If version discovery fails, fall back to core version 0.0 with no
        // module versions; every module is then skipped in the loop below
        // because it has no entry in `common_api_versions.modules`.
        let common_api_versions = Client::load_and_refresh_common_api_version_static(
            &config,
            &self.module_inits,
            &api,
            &db,
            &task_group,
        )
        .await
        .inspect_err(|err| {
            warn!(target: LOG_CLIENT, err = %err.fmt_compact_anyhow(), "Failed to discover API version to use.");
        })
        .unwrap_or(ApiVersionSet {
            core: ApiVersion::new(0, 0),
            modules: BTreeMap::new(),
        });

        debug!(target: LOG_CLIENT, ?common_api_versions, "Completed api version negotiation");

        // Spawns a background task that refreshes the client config.
        Self::load_and_refresh_client_config_static(&config, &api, &db, &task_group);

        // Recovery futures and their progress receivers, keyed by module
        // instance; filled only for modules that still need recovery.
        let mut module_recoveries: BTreeMap<
            ModuleInstanceId,
            Pin<Box<maybe_add_send!(dyn Future<Output = anyhow::Result<()>>)>>,
        > = BTreeMap::new();
        let mut module_recovery_progress_receivers: BTreeMap<
            ModuleInstanceId,
            watch::Receiver<RecoveryProgress>,
        > = BTreeMap::new();

        let final_client = FinalClientIface::default();

        let root_secret = Self::federation_root_secret(&pre_root_secret, &config);

        let modules = {
            let mut modules = ClientModuleRegistry::default();
            for (module_instance_id, module_config) in config.modules.clone() {
                let kind = module_config.kind().clone();
                let Some(module_init) = self.module_inits.get(&kind).cloned() else {
                    debug!(
                        target: LOG_CLIENT,
                        kind=%kind,
                        instance_id=%module_instance_id,
                        "Module kind of instance not found in module gens, skipping");
                    continue;
                };

                let Some(&api_version) = common_api_versions.modules.get(&module_instance_id)
                else {
                    warn!(
                        target: LOG_CLIENT,
                        kind=%kind,
                        instance_id=%module_instance_id,
                        "Module kind of instance has incompatible api version, skipping"
                    );
                    continue;
                };

                // Creates (without running) the recovery future for this
                // module together with the receiver for its progress updates.
                let start_module_recover_fn =
                    |snapshot: Option<ClientBackup>, progress: RecoveryProgress| {
                        let module_config = module_config.clone();
                        let num_peers = NumPeers::from(config.global.api_endpoints.len());
                        let db = db.clone();
                        let kind = kind.clone();
                        let notifier = notifier.clone();
                        let api = api.clone();
                        let root_secret = root_secret.clone();
                        let admin_auth = self.admin_creds.as_ref().map(|creds| creds.auth.clone());
                        let final_client = final_client.clone();
                        let (progress_tx, progress_rx) = tokio::sync::watch::channel(progress);
                        let task_group = task_group.clone();
                        let module_init = module_init.clone();
                        (
                            Box::pin(async move {
                                module_init
                                    .recover(
                                        final_client.clone(),
                                        fed_id,
                                        num_peers,
                                        module_config.clone(),
                                        db.clone(),
                                        module_instance_id,
                                        common_api_versions.core,
                                        api_version,
                                        root_secret.derive_module_secret(module_instance_id),
                                        notifier.clone(),
                                        api.clone(),
                                        admin_auth,
                                        snapshot.as_ref().and_then(|s| s.modules.get(&module_instance_id)),
                                        progress_tx,
                                        task_group,
                                    )
                                    .await
                                    .inspect_err(|err| {
                                        warn!(
                                            target: LOG_CLIENT,
                                            module_id = module_instance_id, %kind, err = %err.fmt_compact_anyhow(), "Module failed to recover"
                                        );
                                    })
                            }),
                            progress_rx,
                        )
                    };

                // Decide whether this module needs recovery: resume from the
                // stored progress, start from scratch, or skip if done.
                let recovery = match init_state.does_require_recovery() {
                    Some(snapshot) => {
                        match db
                            .begin_transaction_nc()
                            .await
                            .get_value(&ClientModuleRecovery { module_instance_id })
                            .await
                        {
                            Some(module_recovery_state) => {
                                if module_recovery_state.is_done() {
                                    debug!(
                                        id = %module_instance_id,
                                        %kind, "Module recovery already complete"
                                    );
                                    None
                                } else {
                                    debug!(
                                        id = %module_instance_id,
                                        %kind,
                                        progress = %module_recovery_state.progress,
                                        "Starting module recovery with an existing progress"
                                    );
                                    Some(start_module_recover_fn(
                                        snapshot,
                                        module_recovery_state.progress,
                                    ))
                                }
                            }
                            _ => {
                                // No recovery state stored yet: log the start
                                // event and record the initial progress in
                                // the same transaction.
                                let progress = RecoveryProgress::none();
                                let mut dbtx = db.begin_transaction().await;
                                dbtx.log_event(
                                    log_ordering_wakeup_tx.clone(),
                                    None,
                                    ModuleRecoveryStarted::new(module_instance_id),
                                )
                                .await;
                                dbtx.insert_entry(
                                    &ClientModuleRecovery { module_instance_id },
                                    &ClientModuleRecoveryState { progress },
                                )
                                .await;

                                dbtx.commit_tx().await;

                                debug!(
                                    id = %module_instance_id,
                                    %kind, "Starting new module recovery"
                                );
                                Some(start_module_recover_fn(snapshot, progress))
                            }
                        }
                    }
                    _ => None,
                };

                match recovery {
                    // Modules under recovery are not initialized here.
                    Some((recovery, recovery_progress_rx)) => {
                        module_recoveries.insert(module_instance_id, recovery);
                        module_recovery_progress_receivers
                            .insert(module_instance_id, recovery_progress_rx);
                    }
                    _ => {
                        let module = module_init
                            .init(
                                final_client.clone(),
                                fed_id,
                                config.global.api_endpoints.len(),
                                module_config,
                                db.clone(),
                                module_instance_id,
                                common_api_versions.core,
                                api_version,
                                root_secret.derive_module_secret(module_instance_id),
                                notifier.clone(),
                                api.clone(),
                                self.admin_creds.as_ref().map(|cred| cred.auth.clone()),
                                task_group.clone(),
                            )
                            .await?;

                        // Without an explicit choice, the first initialized
                        // module that supports it becomes primary. An
                        // explicit choice that cannot be primary is an error.
                        // Only modules initialized here (not ones under
                        // recovery) take part in this check.
                        if primary_module_instance.is_none() && module.supports_being_primary() {
                            primary_module_instance = Some(module_instance_id);
                        } else if primary_module_instance == Some(module_instance_id)
                            && !module.supports_being_primary()
                        {
                            bail!(
                                "Module instance {module_instance_id} of kind {kind} does not support being a primary module"
                            );
                        }

                        modules.register_module(module_instance_id, kind, module);
                    }
                }
            }
            modules
        };

        // A pending init with nothing left to recover is marked complete.
        if init_state.is_pending() && module_recoveries.is_empty() {
            let mut dbtx = db.begin_transaction().await;
            dbtx.insert_entry(&ClientInitStateKey, &init_state.into_complete())
                .await;
            dbtx.commit_tx().await;
        }

        let executor = {
            let mut executor_builder = Executor::builder();
            executor_builder
                .with_module(TRANSACTION_SUBMISSION_MODULE_INSTANCE, TxSubmissionContext);

            for (module_instance_id, _, module) in modules.iter_modules() {
                executor_builder.with_module_dyn(module.context(module_instance_id));
            }

            // Modules under recovery have no context yet; the executor is
            // only told that their instance ids are valid.
            for module_instance_id in module_recoveries.keys() {
                executor_builder.with_valid_module_id(*module_instance_id);
            }

            executor_builder.build(
                db.clone(),
                notifier,
                task_group.clone(),
                log_ordering_wakeup_tx.clone(),
            )
        };

        // Seed the aggregated recovery progress channel with the current
        // value of every per-module progress receiver.
        let recovery_receiver_init_val = module_recovery_progress_receivers
            .iter()
            .map(|(module_instance_id, rx)| (*module_instance_id, *rx.borrow()))
            .collect::<BTreeMap<_, _>>();
        let (client_recovery_progress_sender, client_recovery_progress_receiver) =
            watch::channel(recovery_receiver_init_val);

        let client_inner = Arc::new(Client {
            final_client: final_client.clone(),
            config: tokio::sync::RwLock::new(config.clone()),
            api_secret,
            decoders,
            db: db.clone(),
            federation_id: fed_id,
            federation_config_meta: config.global.meta,
            primary_module_instance: primary_module_instance
                .ok_or(anyhow!("No primary module set or found"))?,
            modules,
            module_inits: self.module_inits.clone(),
            log_ordering_wakeup_tx,
            log_event_added_rx,
            log_event_added_transient_tx: log_event_added_transient_tx.clone(),
            request_hook,
            executor,
            api,
            secp_ctx: Secp256k1::new(),
            root_secret,
            task_group,
            operation_log: OperationLog::new(db.clone()),
            client_recovery_progress_receiver,
            meta_service: self.meta_service,
            connector,
            iroh_enable_dht: self.iroh_enable_dht,
            iroh_enable_next: self.iroh_enable_next,
        });
        // Background tasks below are spawned on the client's own task group.
        client_inner
            .task_group
            .spawn_cancellable("MetaService::update_continuously", {
                let client_inner = client_inner.clone();
                async move {
                    client_inner
                        .meta_service
                        .update_continuously(&client_inner)
                        .await;
                }
            });

        client_inner.task_group.spawn_cancellable(
            "update-api-announcements",
            run_api_announcement_refresh_task(client_inner.clone()),
        );

        client_inner.task_group.spawn_cancellable(
            "event log ordering task",
            run_event_log_ordering_task(
                db.clone(),
                log_ordering_wakeup_rx,
                log_event_added_tx,
                log_event_added_transient_tx,
            ),
        );
        // Modules receive only a weak reference to the client.
        let client_iface = std::sync::Arc::<Client>::downgrade(&client_inner);

        let client_arc = ClientHandle::new(client_inner);

        for (_, _, module) in client_arc.modules.iter_modules() {
            module.start().await;
        }

        // The weak client reference is published after all modules have been
        // started.
        final_client.set(client_iface.clone());

        if !module_recoveries.is_empty() {
            client_arc.spawn_module_recoveries_task(
                client_recovery_progress_sender,
                module_recoveries,
                module_recovery_progress_receivers,
            );
        }

        Ok(client_arc)
    }
1035
1036 async fn load_init_state(db: &Database) -> InitState {
1037 let mut dbtx = db.begin_transaction_nc().await;
1038 dbtx.get_value(&ClientInitStateKey)
1039 .await
1040 .unwrap_or_else(|| {
1041 warn!(
1044 target: LOG_CLIENT,
1045 "Client missing ClientRequiresRecovery: assuming complete"
1046 );
1047 db::InitState::Complete(db::InitModeComplete::Fresh)
1048 })
1049 }
1050
1051 fn decoders(&self, config: &ClientConfig) -> ModuleDecoderRegistry {
1052 let mut decoders = client_decoders(
1053 &self.module_inits,
1054 config
1055 .modules
1056 .iter()
1057 .map(|(module_instance, module_config)| (*module_instance, module_config.kind())),
1058 );
1059
1060 decoders.register_module(
1061 TRANSACTION_SUBMISSION_MODULE_INSTANCE,
1062 ModuleKind::from_static_str("tx_submission"),
1063 tx_submission_sm_decoder(),
1064 );
1065
1066 decoders
1067 }
1068
    /// Returns a copy of `config` re-decoded with the given `decoders`.
    ///
    /// # Errors
    ///
    /// Returns the `DecodeError` produced by `redecode_raw`.
    fn config_decoded(
        config: &ClientConfig,
        decoders: &ModuleDecoderRegistry,
    ) -> Result<ClientConfig, fedimint_core::encoding::DecodeError> {
        config.clone().redecode_raw(decoders)
    }
1075
    /// Derives the federation-specific root secret from `pre_root_secret`,
    /// keyed by the federation id calculated from `config.global`.
    fn federation_root_secret(
        pre_root_secret: &DerivableSecret,
        config: &ClientConfig,
    ) -> DerivableSecret {
        pre_root_secret.federation_key(&config.global.calculate_federation_id())
    }
1085
    /// Subscribes a new receiver to the builder's transient event log
    /// broadcast channel.
    pub fn get_event_log_transient_receiver(&self) -> broadcast::Receiver<EventLogEntry> {
        self.log_event_added_transient_tx.subscribe()
    }
1090
1091 async fn migrate_pending_config_if_present(db: &Database) {
1095 if let Some(pending_config) = Client::get_pending_config_from_db(db).await {
1096 debug!(target: LOG_CLIENT, "Found pending client config, migrating to current config");
1097
1098 let mut dbtx = db.begin_transaction().await;
1099 dbtx.insert_entry(&crate::db::ClientConfigKey, &pending_config)
1101 .await;
1102 dbtx.remove_entry(&PendingClientConfigKey).await;
1104 dbtx.commit_tx().await;
1105
1106 debug!(target: LOG_CLIENT, "Successfully migrated pending config to current config");
1107 }
1108 }
1109
1110 fn load_and_refresh_client_config_static(
1113 config: &ClientConfig,
1114 api: &DynGlobalApi,
1115 db: &Database,
1116 task_group: &TaskGroup,
1117 ) {
1118 let config = config.clone();
1119 let api = api.clone();
1120 let db = db.clone();
1121 let task_group = task_group.clone();
1122
1123 task_group.spawn_cancellable("refresh_client_config_static", async move {
1125 Self::refresh_client_config_static(&config, &api, &db).await;
1126 });
1127 }
1128
1129 async fn refresh_client_config_static(
1131 config: &ClientConfig,
1132 api: &DynGlobalApi,
1133 db: &Database,
1134 ) {
1135 if let Err(error) = Self::refresh_client_config_static_try(config, api, db).await {
1136 warn!(
1137 target: LOG_CLIENT,
1138 err = %error.fmt_compact_anyhow(), "Failed to refresh client config"
1139 );
1140 }
1141 }
1142
1143 fn validate_config_update(
1145 current_config: &ClientConfig,
1146 new_config: &ClientConfig,
1147 ) -> anyhow::Result<()> {
1148 if current_config.global != new_config.global {
1150 bail!("Global configuration changes are not allowed in config updates");
1151 }
1152
1153 for (module_id, current_module_config) in ¤t_config.modules {
1155 match new_config.modules.get(module_id) {
1156 Some(new_module_config) => {
1157 if current_module_config != new_module_config {
1158 bail!(
1159 "Module {} configuration changes are not allowed, only additions are permitted",
1160 module_id
1161 );
1162 }
1163 }
1164 None => {
1165 bail!(
1166 "Module {} was removed in new config, only additions are allowed",
1167 module_id
1168 );
1169 }
1170 }
1171 }
1172
1173 Ok(())
1174 }
1175
1176 async fn refresh_client_config_static_try(
1178 current_config: &ClientConfig,
1179 api: &DynGlobalApi,
1180 db: &Database,
1181 ) -> anyhow::Result<()> {
1182 debug!(target: LOG_CLIENT, "Refreshing client config");
1183
1184 let fetched_config = api
1186 .request_current_consensus::<ClientConfig>(
1187 CLIENT_CONFIG_ENDPOINT.to_owned(),
1188 ApiRequestErased::default(),
1189 )
1190 .await?;
1191
1192 Self::validate_config_update(current_config, &fetched_config)?;
1194
1195 if current_config != &fetched_config {
1197 debug!(target: LOG_CLIENT, "Detected federation config change, saving as pending config");
1198
1199 let mut dbtx = db.begin_transaction().await;
1200 dbtx.insert_entry(&PendingClientConfigKey, &fetched_config)
1201 .await;
1202 dbtx.commit_tx().await;
1203 } else {
1204 debug!(target: LOG_CLIENT, "No federation config changes detected");
1205 }
1206
1207 Ok(())
1208 }
1209}
1210
/// A federation config bundled with the builder that obtained it, from which
/// a client can be joined or recovered, or a backup downloaded.
pub struct ClientPreview {
    /// Builder that will be used to initialize the client.
    inner: ClientBuilder,
    /// Federation config the client will be initialized with.
    config: ClientConfig,
    /// Optional API secret passed on when connecting to the federation.
    api_secret: Option<String>,
    /// API announcements fetch set up during preview, if any.
    prefetch_api_announcements: Option<Jit<Vec<PeersSignedApiAnnouncements>>>,
    /// API version fetch set up during preview, if any.
    preview_prefetch_api_version_set:
        Option<JitTryAnyhow<BTreeMap<PeerId, SupportedApiVersionsSummary>>>,
}
1219
1220impl ClientPreview {
    /// Returns the federation config held by this preview.
    pub fn config(&self) -> &ClientConfig {
        &self.config
    }
1225
1226 pub async fn join(
1303 self,
1304 db_no_decoders: Database,
1305 pre_root_secret: RootSecret,
1306 ) -> anyhow::Result<ClientHandle> {
1307 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1308
1309 let client = self
1310 .inner
1311 .init(
1312 db_no_decoders,
1313 pre_root_secret,
1314 self.config,
1315 self.api_secret,
1316 InitMode::Fresh,
1317 self.prefetch_api_announcements,
1318 self.preview_prefetch_api_version_set,
1319 )
1320 .await?;
1321
1322 Ok(client)
1323 }
1324
1325 pub async fn recover(
1337 self,
1338 db_no_decoders: Database,
1339 pre_root_secret: RootSecret,
1340 backup: Option<ClientBackup>,
1341 ) -> anyhow::Result<ClientHandle> {
1342 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1343
1344 let client = self
1345 .inner
1346 .init(
1347 db_no_decoders,
1348 pre_root_secret,
1349 self.config,
1350 self.api_secret,
1351 InitMode::Recover {
1352 snapshot: backup.clone(),
1353 },
1354 self.prefetch_api_announcements,
1355 self.preview_prefetch_api_version_set,
1356 )
1357 .await?;
1358
1359 Ok(client)
1360 }
1361
1362 pub async fn download_backup_from_federation(
1364 &self,
1365 pre_root_secret: RootSecret,
1366 ) -> anyhow::Result<Option<ClientBackup>> {
1367 let pre_root_secret = pre_root_secret.to_inner(self.config.calculate_federation_id());
1368 let api = DynGlobalApi::from_endpoints(
1369 self.config
1371 .global
1372 .api_endpoints
1373 .iter()
1374 .map(|(peer_id, peer_url)| (*peer_id, peer_url.url.clone())),
1375 &self.api_secret,
1376 self.inner.iroh_enable_dht,
1377 self.inner.iroh_enable_next,
1378 )
1379 .await?;
1380
1381 Client::download_backup_from_federation_static(
1382 &api,
1383 &ClientBuilder::federation_root_secret(&pre_root_secret, &self.config),
1384 &self.inner.decoders(&self.config),
1385 )
1386 .await
1387 }
1388}