1use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use tracing::warn;
13
14use bsv::wallet::interfaces::{
15 AbortActionArgs, AbortActionResult, ListActionsArgs, ListActionsResult, ListCertificatesArgs,
16 ListCertificatesResult, ListOutputsArgs, ListOutputsResult, RelinquishCertificateArgs,
17 RelinquishOutputArgs,
18};
19
20use crate::error::{WalletError, WalletResult};
21use crate::services::traits::WalletServices;
22use crate::status::TransactionStatus;
23use crate::storage::action_types::{
24 StorageCreateActionArgs, StorageCreateActionResult, StorageInternalizeActionArgs,
25 StorageInternalizeActionResult, StorageProcessActionArgs, StorageProcessActionResult,
26};
27use crate::storage::find_args::{
28 FindCertificateFieldsArgs, FindCertificatesArgs, FindOutputBasketsArgs, FindOutputsArgs,
29 FindProvenTxReqsArgs, FindProvenTxsArgs, FindTransactionsArgs, OutputPartial, ProvenTxPartial,
30 ProvenTxReqPartial, PurgeParams, TransactionPartial,
31};
32use crate::storage::sync::request_args::{RequestSyncChunkArgs, SyncChunkOffset};
33use crate::storage::sync::sync_map::SyncMap;
34use crate::storage::sync::{ProcessSyncChunkResult, SyncChunk};
35use crate::storage::traits::wallet_provider::WalletStorageProvider;
36use crate::tables::SyncState;
37use crate::tables::{
38 Certificate, MonitorEvent, Output, OutputBasket, ProvenTx, ProvenTxReq, Settings, Transaction,
39 User,
40};
41use crate::wallet::types::{
42 AdminStatsResult, AuthId, ReproveHeaderResult, ReproveProvenResult, ReproveProvenUpdate,
43 WalletStorageInfo,
44};
45
46pub fn make_request_sync_chunk_args(
60 sync_state: &SyncState,
61 identity_key: &str,
62 writer_storage_identity_key: &str,
63) -> WalletResult<RequestSyncChunkArgs> {
64 let sync_map: SyncMap = if sync_state.sync_map.is_empty() || sync_state.sync_map == "{}" {
68 SyncMap::new()
69 } else {
70 serde_json::from_str(&sync_state.sync_map).map_err(|e| {
71 WalletError::Internal(format!(
72 "make_request_sync_chunk_args: failed to parse sync_map JSON: {e}"
73 ))
74 })?
75 };
76
77 let offsets = vec![
81 SyncChunkOffset {
82 name: sync_map.proven_tx.entity_name.clone(),
83 offset: sync_map.proven_tx.count,
84 },
85 SyncChunkOffset {
86 name: sync_map.output_basket.entity_name.clone(),
87 offset: sync_map.output_basket.count,
88 },
89 SyncChunkOffset {
90 name: sync_map.transaction.entity_name.clone(),
91 offset: sync_map.transaction.count,
92 },
93 SyncChunkOffset {
94 name: sync_map.output.entity_name.clone(),
95 offset: sync_map.output.count,
96 },
97 SyncChunkOffset {
98 name: sync_map.tx_label.entity_name.clone(),
99 offset: sync_map.tx_label.count,
100 },
101 SyncChunkOffset {
102 name: sync_map.tx_label_map.entity_name.clone(),
103 offset: sync_map.tx_label_map.count,
104 },
105 SyncChunkOffset {
106 name: sync_map.output_tag.entity_name.clone(),
107 offset: sync_map.output_tag.count,
108 },
109 SyncChunkOffset {
110 name: sync_map.output_tag_map.entity_name.clone(),
111 offset: sync_map.output_tag_map.count,
112 },
113 SyncChunkOffset {
114 name: sync_map.certificate.entity_name.clone(),
115 offset: sync_map.certificate.count,
116 },
117 SyncChunkOffset {
118 name: sync_map.certificate_field.entity_name.clone(),
119 offset: sync_map.certificate_field.count,
120 },
121 SyncChunkOffset {
122 name: sync_map.commission.entity_name.clone(),
123 offset: sync_map.commission.count,
124 },
125 SyncChunkOffset {
126 name: sync_map.proven_tx_req.entity_name.clone(),
127 offset: sync_map.proven_tx_req.count,
128 },
129 ];
130
131 Ok(RequestSyncChunkArgs {
132 from_storage_identity_key: sync_state.storage_identity_key.clone(),
134 to_storage_identity_key: writer_storage_identity_key.to_string(),
136 identity_key: identity_key.to_string(),
137 since: sync_state.when,
138 max_rough_size: 10_000_000,
139 max_items: 1000,
140 offsets,
141 })
142}
143
144struct ManagedStorage {
152 storage: Arc<dyn WalletStorageProvider>,
154 is_storage_provider: bool,
156 is_available: AtomicBool,
158 settings: tokio::sync::Mutex<Option<Settings>>,
160 user: tokio::sync::Mutex<Option<User>>,
162}
163
164impl ManagedStorage {
165 fn new(storage: Arc<dyn WalletStorageProvider>) -> Self {
166 let is_sp = storage.is_storage_provider();
167 Self {
168 storage,
169 is_storage_provider: is_sp,
170 is_available: AtomicBool::new(false),
171 settings: tokio::sync::Mutex::new(None),
172 user: tokio::sync::Mutex::new(None),
173 }
174 }
175
176 async fn get_settings_cached(&self) -> Option<Settings> {
178 self.settings.lock().await.clone()
179 }
180
181 async fn get_user_cached(&self) -> Option<User> {
183 self.user.lock().await.clone()
184 }
185}
186
187struct ManagerState {
195 stores: Vec<ManagedStorage>,
197 active_index: Option<usize>,
199 backup_indices: Vec<usize>,
201 conflicting_active_indices: Vec<usize>,
203}
204
205impl ManagerState {
206 fn new(stores: Vec<ManagedStorage>) -> Self {
207 Self {
208 stores,
209 active_index: None,
210 backup_indices: Vec::new(),
211 conflicting_active_indices: Vec::new(),
212 }
213 }
214
215 fn require_active(&self) -> WalletResult<usize> {
217 self.active_index.ok_or_else(|| {
218 WalletError::InvalidOperation(
219 "WalletStorageManager not yet available — call make_available() first".to_string(),
220 )
221 })
222 }
223}
224
225pub struct WalletStorageManager {
245 state: tokio::sync::Mutex<ManagerState>,
247 identity_key: String,
249 initial_active: Option<Arc<dyn WalletStorageProvider>>,
252 initial_backups: Vec<Arc<dyn WalletStorageProvider>>,
255 pub(crate) reader_lock: tokio::sync::Mutex<()>,
257 pub(crate) writer_lock: tokio::sync::Mutex<()>,
259 pub(crate) sync_lock: tokio::sync::Mutex<()>,
261 pub(crate) sp_lock: tokio::sync::Mutex<()>,
263 is_available_flag: AtomicBool,
265 services: tokio::sync::Mutex<Option<Arc<dyn WalletServices>>>,
267}
268
269impl WalletStorageManager {
270 pub fn new(
279 identity_key: String,
280 active: Option<Arc<dyn WalletStorageProvider>>,
281 backups: Vec<Arc<dyn WalletStorageProvider>>,
282 ) -> Self {
283 let initial_active = active.clone();
284 let initial_backups = backups.clone();
285
286 let mut stores = Vec::with_capacity(active.is_some() as usize + backups.len());
287 if let Some(a) = active {
288 stores.push(ManagedStorage::new(a));
289 }
290 for b in backups {
291 stores.push(ManagedStorage::new(b));
292 }
293
294 Self {
295 state: tokio::sync::Mutex::new(ManagerState::new(stores)),
296 identity_key,
297 initial_active,
298 initial_backups,
299 reader_lock: tokio::sync::Mutex::new(()),
300 writer_lock: tokio::sync::Mutex::new(()),
301 sync_lock: tokio::sync::Mutex::new(()),
302 sp_lock: tokio::sync::Mutex::new(()),
303 is_available_flag: AtomicBool::new(false),
304 services: tokio::sync::Mutex::new(None),
305 }
306 }
307
308 pub fn get_user_identity_key(&self) -> &str {
314 &self.identity_key
315 }
316
317 pub fn auth_id(&self) -> &str {
319 &self.identity_key
320 }
321
322 pub fn is_available(&self) -> bool {
324 self.is_available_flag.load(Ordering::Acquire)
325 }
326
327 pub fn is_storage_provider(&self) -> bool {
331 false
332 }
333
334 pub fn active(&self) -> Option<&Arc<dyn WalletStorageProvider>> {
340 self.initial_active.as_ref()
341 }
342
343 pub fn backups(&self) -> &[Arc<dyn WalletStorageProvider>] {
347 &self.initial_backups
348 }
349
350 pub fn backup(&self) -> Option<&Arc<dyn WalletStorageProvider>> {
354 self.initial_backups.first()
355 }
356
357 pub fn has_backup(&self) -> bool {
359 !self.initial_backups.is_empty()
360 }
361
362 pub async fn can_make_available(&self) -> bool {
368 let state = self.state.lock().await;
369 !state.stores.is_empty()
370 }
371
372 pub async fn get_active_store(&self) -> WalletResult<String> {
374 let state = self.state.lock().await;
375 let idx = state.require_active()?;
376 let settings = state.stores[idx]
377 .get_settings_cached()
378 .await
379 .ok_or_else(|| {
380 WalletError::InvalidOperation("Active settings not cached".to_string())
381 })?;
382 Ok(settings.storage_identity_key)
383 }
384
385 pub async fn get_active_store_name(&self) -> WalletResult<String> {
387 let state = self.state.lock().await;
388 let idx = state.require_active()?;
389 let settings = state.stores[idx]
390 .get_settings_cached()
391 .await
392 .ok_or_else(|| {
393 WalletError::InvalidOperation("Active settings not cached".to_string())
394 })?;
395 Ok(settings.storage_name)
396 }
397
398 pub async fn get_active_settings(&self) -> WalletResult<Settings> {
400 let state = self.state.lock().await;
401 let idx = state.require_active()?;
402 state.stores[idx]
403 .get_settings_cached()
404 .await
405 .ok_or_else(|| WalletError::InvalidOperation("Active settings not cached".to_string()))
406 }
407
408 pub async fn get_settings(&self) -> WalletResult<Settings> {
410 self.get_active_settings().await
411 }
412
413 pub async fn get_active_user(&self) -> WalletResult<User> {
415 let state = self.state.lock().await;
416 let idx = state.require_active()?;
417 state.stores[idx]
418 .get_user_cached()
419 .await
420 .ok_or_else(|| WalletError::InvalidOperation("Active user not cached".to_string()))
421 }
422
423 pub async fn get_backup_stores(&self) -> Vec<String> {
425 let state = self.state.lock().await;
426 let mut result = Vec::with_capacity(state.backup_indices.len());
427 for &idx in &state.backup_indices {
428 if let Some(settings) = state.stores[idx].get_settings_cached().await {
429 result.push(settings.storage_identity_key);
430 }
431 }
432 result
433 }
434
435 pub async fn get_conflicting_stores(&self) -> Vec<String> {
437 let state = self.state.lock().await;
438 let mut result = Vec::with_capacity(state.conflicting_active_indices.len());
439 for &idx in &state.conflicting_active_indices {
440 if let Some(settings) = state.stores[idx].get_settings_cached().await {
441 result.push(settings.storage_identity_key);
442 }
443 }
444 result
445 }
446
447 pub async fn get_all_stores(&self) -> Vec<String> {
452 let state = self.state.lock().await;
453 let mut result = Vec::with_capacity(state.stores.len());
454 for store in &state.stores {
455 if let Some(settings) = store.get_settings_cached().await {
456 result.push(settings.storage_identity_key);
457 } else {
458 result.push(String::new());
459 }
460 }
461 result
462 }
463
464 pub async fn is_active_storage_provider(&self) -> WalletResult<bool> {
466 let state = self.state.lock().await;
467 let idx = state.require_active()?;
468 Ok(state.stores[idx].is_storage_provider)
469 }
470
471 pub async fn is_active_enabled(&self) -> bool {
474 let state = self.state.lock().await;
475 let Some(idx) = state.active_index else {
476 return false;
477 };
478 let settings = match state.stores[idx].get_settings_cached().await {
479 Some(s) => s,
480 None => return false,
481 };
482 let user = match state.stores[idx].get_user_cached().await {
483 Some(u) => u,
484 None => return false,
485 };
486 state.conflicting_active_indices.is_empty()
487 && settings.storage_identity_key == user.active_storage
488 }
489
490 pub async fn get_auth(&self, must_be_active: bool) -> WalletResult<AuthId> {
495 if must_be_active && !self.is_available() {
496 return Err(WalletError::NotActive(
497 "Storage manager is not yet available — call make_available() first".to_string(),
498 ));
499 }
500
501 let state = self.state.lock().await;
502 let user_id = if let Some(idx) = state.active_index {
503 state.stores[idx].get_user_cached().await.map(|u| u.user_id)
504 } else {
505 None
506 };
507
508 if must_be_active {
509 let Some(idx) = state.active_index else {
510 return Err(WalletError::NotActive(
511 "No active storage — call make_available() first".to_string(),
512 ));
513 };
514 let settings = state.stores[idx]
515 .get_settings_cached()
516 .await
517 .ok_or_else(|| WalletError::NotActive("Active settings not cached".to_string()))?;
518 let user = state.stores[idx]
519 .get_user_cached()
520 .await
521 .ok_or_else(|| WalletError::NotActive("Active user not cached".to_string()))?;
522 if !state.conflicting_active_indices.is_empty()
523 || settings.storage_identity_key != user.active_storage
524 {
525 return Err(WalletError::NotActive(
526 "Active storage is not enabled — conflicting stores or mismatched active_storage".to_string(),
527 ));
528 }
529 }
530
531 Ok(AuthId {
532 identity_key: self.identity_key.clone(),
533 user_id,
534 is_active: Some(self.is_available()),
535 })
536 }
537
538 pub async fn get_user_id(&self) -> WalletResult<i64> {
540 let auth = self.get_auth(false).await?;
541 auth.user_id.ok_or_else(|| {
542 WalletError::InvalidOperation(
543 "user_id not available — call make_available() first".to_string(),
544 )
545 })
546 }
547
548 pub async fn get_active(&self) -> WalletResult<Arc<dyn WalletStorageProvider>> {
556 let state = self.state.lock().await;
557 let idx = state.require_active()?;
558 Ok(state.stores[idx].storage.clone())
559 }
560
561 pub async fn make_available(&self) -> WalletResult<Settings> {
570 if self.is_available() {
572 return self.get_active_settings().await;
573 }
574
575 let _reader_guard = self.reader_lock.lock().await;
577
578 if self.is_available() {
580 return self.get_active_settings().await;
581 }
582
583 self.do_make_available().await
584 }
585
586 async fn do_make_available(&self) -> WalletResult<Settings> {
591 let providers: Vec<Arc<dyn WalletStorageProvider>> = {
595 let state = self.state.lock().await;
596 if state.stores.is_empty() {
597 return Err(WalletError::InvalidParameter {
598 parameter: "stores".to_string(),
599 must_be: "non-empty — provide at least one storage provider".to_string(),
600 });
601 }
602 state.stores.iter().map(|s| s.storage.clone()).collect()
603 };
604
605 let mut init_results: Vec<(Settings, User)> = Vec::with_capacity(providers.len());
608 for provider in &providers {
609 let settings = provider.make_available().await?;
610 let (user, _) = provider.find_or_insert_user(&self.identity_key).await?;
611 init_results.push((settings, user));
612 }
613
614 let mut state = self.state.lock().await;
616 for (i, (settings, user)) in init_results.into_iter().enumerate() {
617 *state.stores[i].settings.lock().await = Some(settings);
618 *state.stores[i].user.lock().await = Some(user);
619 state.stores[i].is_available.store(true, Ordering::Release);
620 }
621
622 let num_stores = state.stores.len();
624 state.active_index = Some(0);
625 state.backup_indices.clear();
626 state.conflicting_active_indices.clear();
627
628 for i in 1..num_stores {
629 let store_sik = {
630 let s = state.stores[i].settings.lock().await;
631 s.as_ref()
632 .map(|s| s.storage_identity_key.clone())
633 .unwrap_or_default()
634 };
635 let store_user_active = {
636 let u = state.stores[i].user.lock().await;
637 u.as_ref()
638 .map(|u| u.active_storage.clone())
639 .unwrap_or_default()
640 };
641
642 let current_active_enabled = {
644 let active_idx = state.active_index.unwrap();
645 let active_sik = {
646 let s = state.stores[active_idx].settings.lock().await;
647 s.as_ref()
648 .map(|s| s.storage_identity_key.clone())
649 .unwrap_or_default()
650 };
651 let active_user_active = {
652 let u = state.stores[active_idx].user.lock().await;
653 u.as_ref()
654 .map(|u| u.active_storage.clone())
655 .unwrap_or_default()
656 };
657 active_sik == active_user_active
658 };
659
660 if store_user_active == store_sik && !current_active_enabled {
661 let old_active = state.active_index.unwrap();
663 state.backup_indices.push(old_active);
664 state.active_index = Some(i);
665 } else {
666 state.backup_indices.push(i);
667 }
668 }
669
670 let active_sik = {
672 let active_idx = state.active_index.unwrap();
673 let s = state.stores[active_idx].settings.lock().await;
674 s.as_ref()
675 .map(|s| s.storage_identity_key.clone())
676 .unwrap_or_default()
677 };
678
679 let old_backups = std::mem::take(&mut state.backup_indices);
680 for backup_idx in old_backups {
681 let backup_user_active = {
682 let u = state.stores[backup_idx].user.lock().await;
683 u.as_ref()
684 .map(|u| u.active_storage.clone())
685 .unwrap_or_default()
686 };
687 if backup_user_active != active_sik {
688 state.conflicting_active_indices.push(backup_idx);
689 } else {
690 state.backup_indices.push(backup_idx);
691 }
692 }
693
694 self.is_available_flag.store(true, Ordering::Release);
696
697 let active_idx = state.active_index.unwrap();
699 let settings = state.stores[active_idx]
700 .settings
701 .lock()
702 .await
703 .clone()
704 .unwrap();
705 Ok(settings)
706 }
707
708 pub async fn acquire_reader(&self) -> WalletResult<tokio::sync::MutexGuard<'_, ()>> {
718 if !self.is_available() {
719 self.make_available().await?;
722 }
723 Ok(self.reader_lock.lock().await)
724 }
725
726 pub async fn acquire_writer(
730 &self,
731 ) -> WalletResult<(
732 tokio::sync::MutexGuard<'_, ()>,
733 tokio::sync::MutexGuard<'_, ()>,
734 )> {
735 if !self.is_available() {
736 self.make_available().await?;
737 }
738 let reader = self.reader_lock.lock().await;
739 let writer = self.writer_lock.lock().await;
740 Ok((reader, writer))
741 }
742
743 pub async fn acquire_sync(
747 &self,
748 ) -> WalletResult<(
749 tokio::sync::MutexGuard<'_, ()>,
750 tokio::sync::MutexGuard<'_, ()>,
751 tokio::sync::MutexGuard<'_, ()>,
752 )> {
753 if !self.is_available() {
754 self.make_available().await?;
755 }
756 let reader = self.reader_lock.lock().await;
757 let writer = self.writer_lock.lock().await;
758 let sync = self.sync_lock.lock().await;
759 Ok((reader, writer, sync))
760 }
761
762 pub async fn acquire_storage_provider(
766 &self,
767 ) -> WalletResult<(
768 tokio::sync::MutexGuard<'_, ()>,
769 tokio::sync::MutexGuard<'_, ()>,
770 tokio::sync::MutexGuard<'_, ()>,
771 tokio::sync::MutexGuard<'_, ()>,
772 )> {
773 if !self.is_available() {
774 self.make_available().await?;
775 }
776 let reader = self.reader_lock.lock().await;
777 let writer = self.writer_lock.lock().await;
778 let sync = self.sync_lock.lock().await;
779 let sp = self.sp_lock.lock().await;
780 Ok((reader, writer, sync, sp))
781 }
782
783 pub async fn destroy(&self) -> WalletResult<()> {
789 let state = self.state.lock().await;
790 for store in &state.stores {
791 store.storage.destroy().await?;
792 }
793 Ok(())
794 }
795
796 pub async fn get_services_ref(&self) -> WalletResult<Arc<dyn WalletServices>> {
798 self.services.lock().await.clone().ok_or_else(|| {
799 WalletError::NotImplemented("services not configured on manager".to_string())
800 })
801 }
802
803 pub async fn set_services(&self, services: Arc<dyn WalletServices>) {
805 *self.services.lock().await = Some(services);
806 }
807
808 pub async fn list_actions(
817 &self,
818 auth: &AuthId,
819 args: &ListActionsArgs,
820 ) -> WalletResult<ListActionsResult> {
821 let _rg = self.acquire_reader().await?;
822 let active = self.get_active().await?;
823 active.list_actions(auth, args).await
824 }
825
826 pub async fn list_outputs(
828 &self,
829 auth: &AuthId,
830 args: &ListOutputsArgs,
831 ) -> WalletResult<ListOutputsResult> {
832 let _rg = self.acquire_reader().await?;
833 let active = self.get_active().await?;
834 active.list_outputs(auth, args).await
835 }
836
837 pub async fn list_certificates(
839 &self,
840 auth: &AuthId,
841 args: &ListCertificatesArgs,
842 ) -> WalletResult<ListCertificatesResult> {
843 let _rg = self.acquire_reader().await?;
844 let active = self.get_active().await?;
845 active.list_certificates(auth, args).await
846 }
847
848 pub async fn find_certificates_auth(
850 &self,
851 auth: &AuthId,
852 args: &FindCertificatesArgs,
853 ) -> WalletResult<Vec<Certificate>> {
854 let _rg = self.acquire_reader().await?;
855 let active = self.get_active().await?;
856 active.find_certificates_auth(auth, args).await
857 }
858
859 pub async fn find_output_baskets_auth(
861 &self,
862 auth: &AuthId,
863 args: &FindOutputBasketsArgs,
864 ) -> WalletResult<Vec<OutputBasket>> {
865 let _rg = self.acquire_reader().await?;
866 let active = self.get_active().await?;
867 active.find_output_baskets_auth(auth, args).await
868 }
869
870 pub async fn find_outputs_auth(
872 &self,
873 auth: &AuthId,
874 args: &FindOutputsArgs,
875 ) -> WalletResult<Vec<Output>> {
876 let _rg = self.acquire_reader().await?;
877 let active = self.get_active().await?;
878 active.find_outputs_auth(auth, args).await
879 }
880
881 pub async fn find_proven_tx_reqs(
883 &self,
884 args: &FindProvenTxReqsArgs,
885 ) -> WalletResult<Vec<ProvenTxReq>> {
886 let _rg = self.acquire_reader().await?;
887 let active = self.get_active().await?;
888 active.find_proven_tx_reqs(args).await
889 }
890
891 pub async fn abort_action(
901 &self,
902 auth: &AuthId,
903 args: &AbortActionArgs,
904 ) -> WalletResult<AbortActionResult> {
905 let (_rg, _wg) = self.acquire_writer().await?;
906 let active = self.get_active().await?;
907 active.abort_action(auth, args).await
908 }
909
910 pub async fn create_action(
912 &self,
913 auth: &AuthId,
914 args: &StorageCreateActionArgs,
915 ) -> WalletResult<StorageCreateActionResult> {
916 let (_rg, _wg) = self.acquire_writer().await?;
917 let active = self.get_active().await?;
918 active.create_action(auth, args).await
919 }
920
921 pub async fn process_action(
923 &self,
924 auth: &AuthId,
925 args: &StorageProcessActionArgs,
926 ) -> WalletResult<StorageProcessActionResult> {
927 let (_rg, _wg) = self.acquire_writer().await?;
928 let active = self.get_active().await?;
929 active.process_action(auth, args).await
930 }
931
932 pub async fn internalize_action(
934 &self,
935 auth: &AuthId,
936 args: &StorageInternalizeActionArgs,
937 services: &dyn WalletServices,
938 ) -> WalletResult<StorageInternalizeActionResult> {
939 let (_rg, _wg) = self.acquire_writer().await?;
940 let active = self.get_active().await?;
941 active.internalize_action(auth, args, services).await
942 }
943
944 pub async fn insert_certificate_auth(
946 &self,
947 auth: &AuthId,
948 certificate: &Certificate,
949 ) -> WalletResult<i64> {
950 let (_rg, _wg) = self.acquire_writer().await?;
951 let active = self.get_active().await?;
952 active.insert_certificate_auth(auth, certificate).await
953 }
954
955 pub async fn relinquish_certificate(
957 &self,
958 auth: &AuthId,
959 args: &RelinquishCertificateArgs,
960 ) -> WalletResult<i64> {
961 let (_rg, _wg) = self.acquire_writer().await?;
962 let active = self.get_active().await?;
963 active.relinquish_certificate(auth, args).await
964 }
965
966 pub async fn relinquish_output(
968 &self,
969 auth: &AuthId,
970 args: &RelinquishOutputArgs,
971 ) -> WalletResult<i64> {
972 let (_rg, _wg) = self.acquire_writer().await?;
973 let active = self.get_active().await?;
974 active.relinquish_output(auth, args).await
975 }
976
977 pub async fn find_or_insert_user(&self, identity_key: &str) -> WalletResult<(User, bool)> {
982 let active = self.get_active().await?;
983 let result = active.find_or_insert_user(identity_key).await?;
984
985 if let Ok(cached_auth) = self.get_auth(false).await {
989 if let Some(cached_id) = cached_auth.user_id {
990 if result.0.user_id != cached_id {
991 return Err(WalletError::Internal(
992 "find_or_insert_user: returned user_id does not match cached user_id — identity consistency violation".to_string(),
993 ));
994 }
995 }
996 }
997
998 Ok(result)
999 }
1000
1001 pub async fn get_sync_chunk(&self, args: &RequestSyncChunkArgs) -> WalletResult<SyncChunk> {
1002 let active = self.get_active().await?;
1003 active.get_sync_chunk(args).await
1004 }
1005
1006 pub async fn process_sync_chunk(
1007 &self,
1008 args: &RequestSyncChunkArgs,
1009 chunk: &SyncChunk,
1010 ) -> WalletResult<ProcessSyncChunkResult> {
1011 let active = self.get_active().await?;
1012 active.process_sync_chunk(args, chunk).await
1013 }
1014
1015 pub async fn find_user_by_identity_key(&self, key: &str) -> WalletResult<Option<User>> {
1020 let active = self.get_active().await?;
1021 active.find_user_by_identity_key(key).await
1022 }
1023
1024 pub async fn find_certificates_storage(
1025 &self,
1026 args: &FindCertificatesArgs,
1027 ) -> WalletResult<Vec<Certificate>> {
1028 let active = self.get_active().await?;
1029 active.find_certificates_storage(args).await
1030 }
1031
1032 pub async fn find_certificate_fields(
1033 &self,
1034 args: &FindCertificateFieldsArgs,
1035 ) -> WalletResult<Vec<crate::tables::CertificateField>> {
1036 let active = self.get_active().await?;
1037 active.find_certificate_fields(args).await
1038 }
1039
1040 pub async fn find_outputs_storage(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
1041 let active = self.get_active().await?;
1042 active.find_outputs_storage(args).await
1043 }
1044
1045 pub async fn find_outputs(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
1047 self.find_outputs_storage(args).await
1048 }
1049
1050 pub async fn update_output(&self, id: i64, update: &OutputPartial) -> WalletResult<i64> {
1052 let active = self.get_active().await?;
1053 active.update_output(id, update).await
1054 }
1055
1056 pub async fn insert_certificate_storage(&self, cert: &Certificate) -> WalletResult<i64> {
1057 let active = self.get_active().await?;
1058 active.insert_certificate_storage(cert).await
1059 }
1060
1061 pub async fn insert_certificate_field_storage(
1062 &self,
1063 field: &crate::tables::CertificateField,
1064 ) -> WalletResult<()> {
1065 let active = self.get_active().await?;
1066 active.insert_certificate_field_storage(field).await
1067 }
1068
1069 pub async fn find_proven_txs(&self, args: &FindProvenTxsArgs) -> WalletResult<Vec<ProvenTx>> {
1078 let active = self.get_active().await?;
1079 active.find_proven_txs(args).await
1080 }
1081
1082 pub async fn find_transactions(
1083 &self,
1084 args: &FindTransactionsArgs,
1085 ) -> WalletResult<Vec<Transaction>> {
1086 let active = self.get_active().await?;
1087 active.find_transactions(args).await
1088 }
1089
1090 pub async fn update_proven_tx_req(
1091 &self,
1092 id: i64,
1093 update: &ProvenTxReqPartial,
1094 ) -> WalletResult<i64> {
1095 let active = self.get_active().await?;
1096 active.update_proven_tx_req(id, update).await
1097 }
1098
1099 pub async fn update_proven_tx(&self, id: i64, update: &ProvenTxPartial) -> WalletResult<i64> {
1100 let active = self.get_active().await?;
1101 active.update_proven_tx(id, update).await
1102 }
1103
1104 pub async fn update_transaction(
1105 &self,
1106 id: i64,
1107 update: &TransactionPartial,
1108 ) -> WalletResult<i64> {
1109 let active = self.get_active().await?;
1110 active.update_transaction(id, update).await
1111 }
1112
1113 pub async fn update_transaction_status(
1114 &self,
1115 txid: &str,
1116 new_status: TransactionStatus,
1117 ) -> WalletResult<()> {
1118 let active = self.get_active().await?;
1119 active.update_transaction_status(txid, new_status).await
1120 }
1121
1122 pub async fn update_proven_tx_req_with_new_proven_tx(
1123 &self,
1124 req_id: i64,
1125 proven_tx: &ProvenTx,
1126 ) -> WalletResult<i64> {
1127 let active = self.get_active().await?;
1128 active
1129 .update_proven_tx_req_with_new_proven_tx(req_id, proven_tx)
1130 .await
1131 }
1132
1133 pub async fn insert_monitor_event(&self, event: &MonitorEvent) -> WalletResult<i64> {
1134 let active = self.get_active().await?;
1135 active.insert_monitor_event(event).await
1136 }
1137
1138 pub async fn admin_stats(&self, auth_id: &str) -> WalletResult<AdminStatsResult> {
1139 let active = self.get_active().await?;
1140 active.admin_stats(auth_id).await
1141 }
1142
1143 pub async fn purge_data(&self, params: &PurgeParams) -> WalletResult<String> {
1144 let active = self.get_active().await?;
1145 active.purge_data(params).await
1146 }
1147
1148 pub async fn review_status(&self, aged_limit: chrono::NaiveDateTime) -> WalletResult<String> {
1149 let active = self.get_active().await?;
1150 active.review_status(aged_limit).await
1151 }
1152
1153 pub async fn get_storage_identity_key(&self) -> WalletResult<String> {
1154 let active = self.get_active().await?;
1155 active.get_storage_identity_key().await
1156 }
1157
1158 pub async fn get_stores(&self) -> Vec<WalletStorageInfo> {
1167 let state = self.state.lock().await;
1168 let mut result = Vec::with_capacity(state.stores.len());
1169
1170 let is_active_enabled = {
1171 if let Some(idx) = state.active_index {
1172 let sik = state.stores[idx]
1173 .settings
1174 .lock()
1175 .await
1176 .as_ref()
1177 .map(|s| s.storage_identity_key.clone())
1178 .unwrap_or_default();
1179 let user_active = state.stores[idx]
1180 .user
1181 .lock()
1182 .await
1183 .as_ref()
1184 .map(|u| u.active_storage.clone())
1185 .unwrap_or_default();
1186 state.conflicting_active_indices.is_empty() && sik == user_active
1187 } else {
1188 false
1189 }
1190 };
1191
1192 for (i, store) in state.stores.iter().enumerate() {
1193 let settings_guard = store.settings.lock().await;
1194 let user_guard = store.user.lock().await;
1195
1196 let storage_identity_key = settings_guard
1197 .as_ref()
1198 .map(|s| s.storage_identity_key.clone())
1199 .unwrap_or_default();
1200 let storage_name = settings_guard
1201 .as_ref()
1202 .map(|s| s.storage_name.clone())
1203 .unwrap_or_default();
1204 let user_id = user_guard.as_ref().map(|u| u.user_id);
1205 let endpoint_url = store.storage.get_endpoint_url();
1206
1207 let is_active = state.active_index == Some(i);
1208 let is_backup = state.backup_indices.contains(&i);
1209 let is_conflicting = state.conflicting_active_indices.contains(&i);
1210 let is_enabled = is_active && is_active_enabled;
1212
1213 result.push(WalletStorageInfo {
1214 storage_identity_key,
1215 storage_name,
1216 user_id,
1217 is_active,
1218 is_enabled,
1219 is_backup,
1220 is_conflicting,
1221 endpoint_url,
1222 });
1223 }
1224
1225 result
1226 }
1227
1228 pub async fn get_store_endpoint_url(&self, storage_identity_key: &str) -> Option<String> {
1233 let state = self.state.lock().await;
1234 for store in &state.stores {
1235 let sik = store
1236 .settings
1237 .lock()
1238 .await
1239 .as_ref()
1240 .map(|s| s.storage_identity_key.clone())
1241 .unwrap_or_default();
1242 if sik == storage_identity_key {
1243 return store.storage.get_endpoint_url();
1244 }
1245 }
1246 None
1247 }
1248
1249 pub async fn reprove_proven(
1264 &self,
1265 ptx: &ProvenTx,
1266 no_update: bool,
1267 ) -> WalletResult<ReproveProvenResult> {
1268 let services = self.get_services_ref().await?;
1269
1270 let merkle_result = services.get_merkle_path(&ptx.txid, false).await;
1272
1273 let (merkle_path_bytes, header) = match (merkle_result.merkle_path, merkle_result.header) {
1275 (Some(mp), Some(hdr)) => (mp, hdr),
1276 _ => {
1277 return Ok(ReproveProvenResult {
1278 updated: None,
1279 unchanged: false,
1280 unavailable: true,
1281 });
1282 }
1283 };
1284
1285 let height = header.height as i32;
1287 let block_hash = header.hash.clone();
1288 let merkle_root = header.merkle_root.clone();
1289
1290 if ptx.height == height
1291 && ptx.block_hash == block_hash
1292 && ptx.merkle_root == merkle_root
1293 && ptx.merkle_path == merkle_path_bytes
1294 {
1295 return Ok(ReproveProvenResult {
1296 updated: None,
1297 unchanged: true,
1298 unavailable: false,
1299 });
1300 }
1301
1302 let mut updated_ptx = ptx.clone();
1304 updated_ptx.height = height;
1305 updated_ptx.block_hash = block_hash.clone();
1306 updated_ptx.merkle_root = merkle_root.clone();
1307 updated_ptx.merkle_path = merkle_path_bytes;
1308 let log_update = format!(
1314 "reproved txid={} old_block={} new_block={} height={}",
1315 ptx.txid, ptx.block_hash, block_hash, height
1316 );
1317
1318 if !no_update {
1320 let active = self.get_active().await?;
1321 let partial = ProvenTxPartial {
1322 height: Some(updated_ptx.height),
1323 block_hash: Some(updated_ptx.block_hash.clone()),
1324 ..Default::default()
1325 };
1326 active.update_proven_tx(ptx.proven_tx_id, &partial).await?;
1327 }
1328
1329 Ok(ReproveProvenResult {
1330 updated: Some(ReproveProvenUpdate {
1331 update: updated_ptx,
1332 log_update,
1333 }),
1334 unchanged: false,
1335 unavailable: false,
1336 })
1337 }
1338
1339 pub async fn reprove_header(
1348 &self,
1349 deactivated_hash: &str,
1350 ) -> WalletResult<ReproveHeaderResult> {
1351 let _guards = self.acquire_storage_provider().await?;
1353
1354 let proven_txs = {
1355 let active = self.get_active().await?;
1356 active
1357 .find_proven_txs(&FindProvenTxsArgs {
1358 partial: ProvenTxPartial {
1359 block_hash: Some(deactivated_hash.to_string()),
1360 ..Default::default()
1361 },
1362 ..Default::default()
1363 })
1364 .await?
1365 };
1366
1367 let mut updated_vec = Vec::new();
1368 let mut unchanged_vec = Vec::new();
1369 let mut unavailable_vec = Vec::new();
1370
1371 let mut updates_to_persist: Vec<(i64, ProvenTxPartial, ProvenTx)> = Vec::new();
1372
1373 for ptx in &proven_txs {
1374 let result = self.reprove_proven(ptx, true).await?;
1376
1377 if result.unavailable {
1378 unavailable_vec.push(ptx.clone());
1379 } else if result.unchanged {
1380 unchanged_vec.push(ptx.clone());
1381 } else if let Some(ru) = result.updated {
1382 let partial = ProvenTxPartial {
1383 height: Some(ru.update.height),
1384 block_hash: Some(ru.update.block_hash.clone()),
1385 ..Default::default()
1386 };
1387 updates_to_persist.push((ptx.proven_tx_id, partial, ru.update));
1388 }
1389 }
1390
1391 if !updates_to_persist.is_empty() {
1393 let active = self.get_active().await?;
1394 for (id, partial, updated_ptx) in updates_to_persist {
1395 active.update_proven_tx(id, &partial).await?;
1396 updated_vec.push(updated_ptx);
1397 }
1398 }
1399
1400 Ok(ReproveHeaderResult {
1401 updated: updated_vec,
1402 unchanged: unchanged_vec,
1403 unavailable: unavailable_vec,
1404 })
1405 }
1406
1407 pub async fn sync_to_writer(
1426 &self,
1427 writer: &dyn WalletStorageProvider,
1428 reader: &dyn WalletStorageProvider,
1429 writer_settings: &Settings,
1430 reader_settings: &Settings,
1431 prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1432 ) -> WalletResult<(i64, i64, String)> {
1433 let auth = self.get_auth(false).await?;
1434 let mut total_inserts: i64 = 0;
1435 let mut total_updates: i64 = 0;
1436 let mut log = String::new();
1437
1438 let mut i = 0usize;
1439 loop {
1440 let (ss, _) = writer
1442 .find_or_insert_sync_state_auth(
1443 &auth,
1444 &reader_settings.storage_identity_key,
1445 &reader_settings.storage_name,
1446 )
1447 .await?;
1448
1449 let args = make_request_sync_chunk_args(
1450 &ss,
1451 &self.identity_key,
1452 &writer_settings.storage_identity_key,
1453 )?;
1454
1455 let chunk = reader.get_sync_chunk(&args).await?;
1456 let r = writer.process_sync_chunk(&args, &chunk).await?;
1457
1458 total_inserts += r.inserts;
1459 total_updates += r.updates;
1460
1461 if let Some(cb) = prog_log {
1462 let msg = format!(
1463 "sync chunk {} inserts={} updates={}",
1464 i, r.inserts, r.updates
1465 );
1466 let s = cb(&msg);
1467 log.push_str(&s);
1468 log.push('\n');
1469 }
1470
1471 if r.done {
1472 break;
1473 }
1474 i += 1;
1475 }
1476
1477 Ok((total_inserts, total_updates, log))
1478 }
1479
1480 pub async fn sync_from_reader(
1490 &self,
1491 reader_identity_key: &str,
1492 writer: &dyn WalletStorageProvider,
1493 reader: &dyn WalletStorageProvider,
1494 writer_settings: &Settings,
1495 reader_settings: &Settings,
1496 prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1497 ) -> WalletResult<(i64, i64, String)> {
1498 if reader_identity_key != self.identity_key {
1499 return Err(WalletError::Unauthorized(
1500 "sync_from_reader: reader identity key does not match manager identity key"
1501 .to_string(),
1502 ));
1503 }
1504 let auth = self.get_auth(false).await?;
1505 let mut total_inserts: i64 = 0;
1506 let mut total_updates: i64 = 0;
1507 let mut log = String::new();
1508
1509 let writer_active_storage = {
1511 let state = self.state.lock().await;
1512 if let Some(idx) = state.active_index {
1513 state.stores[idx]
1514 .get_user_cached()
1515 .await
1516 .map(|u| u.active_storage)
1517 } else {
1518 None
1519 }
1520 };
1521
1522 let mut i = 0usize;
1523 loop {
1524 let (ss, _) = writer
1525 .find_or_insert_sync_state_auth(
1526 &auth,
1527 &reader_settings.storage_identity_key,
1528 &reader_settings.storage_name,
1529 )
1530 .await?;
1531
1532 let args = make_request_sync_chunk_args(
1533 &ss,
1534 &self.identity_key,
1535 &writer_settings.storage_identity_key,
1536 )?;
1537
1538 let mut chunk = reader.get_sync_chunk(&args).await?;
1539
1540 if let (Some(ref mut chunk_user), Some(ref writer_as)) =
1543 (&mut chunk.user, &writer_active_storage)
1544 {
1545 chunk_user.active_storage = writer_as.clone();
1546 }
1547
1548 let r = writer.process_sync_chunk(&args, &chunk).await?;
1549
1550 total_inserts += r.inserts;
1551 total_updates += r.updates;
1552
1553 if let Some(cb) = prog_log {
1554 let msg = format!(
1555 "sync chunk {} inserts={} updates={}",
1556 i, r.inserts, r.updates
1557 );
1558 let s = cb(&msg);
1559 log.push_str(&s);
1560 log.push('\n');
1561 }
1562
1563 if r.done {
1564 break;
1565 }
1566 i += 1;
1567 }
1568
1569 Ok((total_inserts, total_updates, log))
1570 }
1571
1572 pub async fn update_backups(
1580 &self,
1581 prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1582 ) -> WalletResult<(i64, i64, String)> {
1583 let _guards = self.acquire_sync().await?;
1585
1586 let (active_arc, active_settings, backup_arcs_and_settings) = {
1587 let state = self.state.lock().await;
1588 let idx = state.require_active()?;
1589 let active_arc = state.stores[idx].storage.clone();
1590 let active_settings =
1591 state.stores[idx]
1592 .get_settings_cached()
1593 .await
1594 .ok_or_else(|| {
1595 WalletError::InvalidOperation("Active settings not cached".to_string())
1596 })?;
1597
1598 let mut backups = Vec::with_capacity(state.backup_indices.len());
1599 for &bi in &state.backup_indices {
1600 let backup_arc = state.stores[bi].storage.clone();
1601 let backup_settings = state.stores[bi].get_settings_cached().await;
1602 backups.push((backup_arc, backup_settings));
1603 }
1604 (active_arc, active_settings, backups)
1605 };
1606
1607 let mut total_inserts: i64 = 0;
1608 let mut total_updates: i64 = 0;
1609 let mut full_log = String::new();
1610
1611 for (backup_arc, maybe_backup_settings) in &backup_arcs_and_settings {
1612 let backup_settings = match maybe_backup_settings {
1613 Some(s) => s.clone(),
1614 None => {
1615 warn!("update_backups: backup has no cached settings, skipping");
1616 continue;
1617 }
1618 };
1619
1620 match self
1621 .sync_to_writer(
1622 backup_arc.as_ref(),
1623 active_arc.as_ref(),
1624 &backup_settings,
1625 &active_settings,
1626 prog_log,
1627 )
1628 .await
1629 {
1630 Ok((ins, upd, log)) => {
1631 total_inserts += ins;
1632 total_updates += upd;
1633 full_log.push_str(&log);
1634 }
1635 Err(e) => {
1636 warn!(
1638 "update_backups: sync to backup '{}' failed: {e}",
1639 backup_settings.storage_identity_key
1640 );
1641 }
1642 }
1643 }
1644
1645 Ok((total_inserts, total_updates, full_log))
1646 }
1647
1648 pub async fn set_active(
1663 &self,
1664 storage_identity_key: &str,
1665 prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1666 ) -> WalletResult<String> {
1667 if !self.is_available() {
1669 self.make_available().await?;
1670 }
1671
1672 let new_active_idx = {
1674 let state = self.state.lock().await;
1675 let mut found = None;
1676 for (i, store) in state.stores.iter().enumerate() {
1677 let sik = store
1678 .get_settings_cached()
1679 .await
1680 .map(|s| s.storage_identity_key)
1681 .unwrap_or_default();
1682 if sik == storage_identity_key {
1683 found = Some(i);
1684 break;
1685 }
1686 }
1687 found
1688 };
1689
1690 let new_active_idx = new_active_idx.ok_or_else(|| WalletError::InvalidParameter {
1691 parameter: "storage_identity_key".to_string(),
1692 must_be: format!(
1693 "registered with this WalletStorageManager. {} does not match any managed store.",
1694 storage_identity_key
1695 ),
1696 })?;
1697
1698 let current_active_sik = self.get_active_store().await?;
1700 if current_active_sik == storage_identity_key && self.is_active_enabled().await {
1701 return Ok(format!("{} unchanged\n", storage_identity_key));
1702 }
1703
1704 let _guards = self.acquire_sync().await?;
1708
1709 let mut log = String::new();
1710
1711 let had_conflicts;
1714 let (new_active_arc, new_active_settings) = {
1715 let state = self.state.lock().await;
1716 let arc = state.stores[new_active_idx].storage.clone();
1717 let settings = state.stores[new_active_idx]
1718 .get_settings_cached()
1719 .await
1720 .ok_or_else(|| {
1721 WalletError::InvalidOperation(
1722 "set_active: new active settings not cached".to_string(),
1723 )
1724 })?;
1725 (arc, settings)
1726 };
1727
1728 let conflict_sources = {
1729 let state = self.state.lock().await;
1730 if state.conflicting_active_indices.is_empty() {
1731 had_conflicts = false;
1732 Vec::new()
1733 } else {
1734 had_conflicts = true;
1735 let mut sources = state.conflicting_active_indices.clone();
1737 if let Some(ai) = state.active_index {
1738 sources.push(ai);
1739 }
1740 sources.retain(|&i| i != new_active_idx);
1742
1743 let mut result = Vec::with_capacity(sources.len());
1745 for idx in sources {
1746 let arc = state.stores[idx].storage.clone();
1747 let settings = state.stores[idx].get_settings_cached().await;
1748 result.push((arc, settings));
1749 }
1750 result
1751 }
1752 };
1753
1754 for (conflict_arc, maybe_conflict_settings) in &conflict_sources {
1756 let conflict_settings = match maybe_conflict_settings {
1757 Some(s) => s.clone(),
1758 None => {
1759 warn!("set_active: conflict source has no cached settings, skipping");
1760 continue;
1761 }
1762 };
1763 let (ins, upd, chunk_log) = self
1765 .sync_to_writer(
1766 new_active_arc.as_ref(),
1767 conflict_arc.as_ref(),
1768 &new_active_settings,
1769 &conflict_settings,
1770 prog_log,
1771 )
1772 .await?;
1773 if let Some(cb) = prog_log {
1774 let msg = format!(
1775 "set_active: merged conflict {} inserts={} updates={}",
1776 conflict_settings.storage_identity_key, ins, upd
1777 );
1778 let s = cb(&msg);
1779 log.push_str(&s);
1780 log.push('\n');
1781 }
1782 log.push_str(&chunk_log);
1783 }
1784
1785 let (backup_source_arc, backup_source_settings, backup_source_user_id) = {
1789 let state = self.state.lock().await;
1790 if had_conflicts {
1791 let user_id = state.stores[new_active_idx]
1793 .get_user_cached()
1794 .await
1795 .map(|u| u.user_id)
1796 .ok_or_else(|| {
1797 WalletError::InvalidOperation(
1798 "set_active: new active user not cached".to_string(),
1799 )
1800 })?;
1801 (new_active_arc.clone(), new_active_settings.clone(), user_id)
1802 } else {
1803 let ai = state.require_active()?;
1805 let arc = state.stores[ai].storage.clone();
1806 let settings = state.stores[ai]
1807 .get_settings_cached()
1808 .await
1809 .ok_or_else(|| {
1810 WalletError::InvalidOperation(
1811 "set_active: current active settings not cached".to_string(),
1812 )
1813 })?;
1814 let user_id = state.stores[ai]
1815 .get_user_cached()
1816 .await
1817 .map(|u| u.user_id)
1818 .ok_or_else(|| {
1819 WalletError::InvalidOperation(
1820 "set_active: current active user not cached".to_string(),
1821 )
1822 })?;
1823 (arc, settings, user_id)
1824 }
1825 };
1826
1827 let auth = AuthId {
1830 identity_key: self.identity_key.clone(),
1831 user_id: Some(backup_source_user_id),
1832 is_active: Some(false),
1833 };
1834 backup_source_arc
1835 .set_active(&auth, storage_identity_key)
1836 .await?;
1837
1838 if let Some(cb) = prog_log {
1839 let msg = format!(
1840 "set_active: provider-level set_active on {} complete",
1841 backup_source_settings.storage_identity_key
1842 );
1843 let s = cb(&msg);
1844 log.push_str(&s);
1845 log.push('\n');
1846 }
1847
1848 {
1851 let state = self.state.lock().await;
1852 for store in &state.stores {
1853 let mut user_guard = store.user.lock().await;
1854 if let Some(ref mut u) = *user_guard {
1855 u.active_storage = storage_identity_key.to_string();
1856 }
1857 }
1858 }
1859
1860 let backup_source_sik = backup_source_settings.storage_identity_key.clone();
1862 let propagation_targets = {
1863 let state = self.state.lock().await;
1864 let mut targets = Vec::new();
1865 for store in &state.stores {
1866 let sik = store
1867 .get_settings_cached()
1868 .await
1869 .map(|s| s.storage_identity_key)
1870 .unwrap_or_default();
1871 if sik != backup_source_sik {
1872 let arc = store.storage.clone();
1873 let settings = store.get_settings_cached().await;
1874 targets.push((arc, settings));
1875 }
1876 }
1877 targets
1878 };
1879
1880 for (store_arc, maybe_store_settings) in &propagation_targets {
1881 let store_settings = match maybe_store_settings {
1882 Some(s) => s.clone(),
1883 None => {
1884 warn!("set_active: propagation target has no cached settings, skipping");
1885 continue;
1886 }
1887 };
1888 let (ins, upd, chunk_log) = self
1889 .sync_to_writer(
1890 store_arc.as_ref(),
1891 backup_source_arc.as_ref(),
1892 &store_settings,
1893 &backup_source_settings,
1894 prog_log,
1895 )
1896 .await?;
1897 if let Some(cb) = prog_log {
1898 let msg = format!(
1899 "set_active: propagated to {} inserts={} updates={}",
1900 store_settings.storage_identity_key, ins, upd
1901 );
1902 let s = cb(&msg);
1903 log.push_str(&s);
1904 log.push('\n');
1905 }
1906 log.push_str(&chunk_log);
1907 }
1908
1909 self.is_available_flag.store(false, Ordering::Release);
1913 self.do_make_available().await?;
1914
1915 if let Some(cb) = prog_log {
1916 let msg = format!(
1917 "set_active: complete, new active is {}",
1918 storage_identity_key
1919 );
1920 let s = cb(&msg);
1921 log.push_str(&s);
1922 log.push('\n');
1923 }
1924
1925 Ok(log)
1926 }
1927
1928 pub async fn add_wallet_storage_provider(
1936 &self,
1937 provider: Arc<dyn WalletStorageProvider>,
1938 ) -> WalletResult<()> {
1939 let _guards = self.acquire_storage_provider().await?;
1941
1942 {
1943 let mut state = self.state.lock().await;
1944 state.stores.push(ManagedStorage::new(provider));
1945 self.is_available_flag.store(false, Ordering::Release);
1947 }
1948
1949 self.do_make_available().await?;
1954
1955 Ok(())
1956 }
1957
1958 pub async fn find_output_baskets(
1966 &self,
1967 args: &FindOutputBasketsArgs,
1968 ) -> WalletResult<Vec<OutputBasket>> {
1969 let active = self.get_active().await?;
1970 active.find_output_baskets(args).await
1971 }
1972
1973 pub async fn insert_output_basket(&self, basket: &OutputBasket) -> WalletResult<i64> {
1975 let active = self.get_active().await?;
1976 active.insert_output_basket(basket).await
1977 }
1978
1979 pub async fn insert_transaction(&self, tx: &Transaction) -> WalletResult<i64> {
1981 let active = self.get_active().await?;
1982 active.insert_transaction(tx).await
1983 }
1984
1985 pub async fn insert_output(&self, output: &Output) -> WalletResult<i64> {
1987 let active = self.get_active().await?;
1988 active.insert_output(output).await
1989 }
1990}