Skip to main content

bsv_wallet_toolbox/storage/
manager.rs

1//! WalletStorageManager -- multi-provider storage coordinator with hierarchical locking.
2//!
3//! Implements the TS-parity manager architecture:
4//! - Multiple providers wrapped in `ManagedStorage` with cached settings/user
5//! - `make_available()` partitions stores into active/backups/conflicting_actives
6//! - Four-level hierarchical lock system (reader < writer < sync < storage_provider)
7//! - All writes route to active only; backups sync via `update_backups()` (Plan 02)
8
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11
12use tracing::warn;
13
14use bsv::wallet::interfaces::{
15    AbortActionArgs, AbortActionResult, ListActionsArgs, ListActionsResult, ListCertificatesArgs,
16    ListCertificatesResult, ListOutputsArgs, ListOutputsResult, RelinquishCertificateArgs,
17    RelinquishOutputArgs,
18};
19
20use crate::error::{WalletError, WalletResult};
21use crate::services::traits::WalletServices;
22use crate::status::TransactionStatus;
23use crate::storage::action_types::{
24    StorageCreateActionArgs, StorageCreateActionResult, StorageInternalizeActionArgs,
25    StorageInternalizeActionResult, StorageProcessActionArgs, StorageProcessActionResult,
26};
27use crate::storage::find_args::{
28    FindCertificateFieldsArgs, FindCertificatesArgs, FindOutputBasketsArgs, FindOutputsArgs,
29    FindProvenTxReqsArgs, FindProvenTxsArgs, FindTransactionsArgs, OutputPartial, ProvenTxPartial,
30    ProvenTxReqPartial, PurgeParams, TransactionPartial,
31};
32use crate::storage::sync::request_args::{RequestSyncChunkArgs, SyncChunkOffset};
33use crate::storage::sync::sync_map::SyncMap;
34use crate::storage::sync::{ProcessSyncChunkResult, SyncChunk};
35use crate::storage::traits::wallet_provider::WalletStorageProvider;
36use crate::tables::SyncState;
37use crate::tables::{
38    Certificate, MonitorEvent, Output, OutputBasket, ProvenTx, ProvenTxReq, Settings, Transaction,
39    User,
40};
41use crate::wallet::types::{
42    AdminStatsResult, AuthId, ReproveHeaderResult, ReproveProvenResult, ReproveProvenUpdate,
43    WalletStorageInfo,
44};
45
46// ---------------------------------------------------------------------------
47// make_request_sync_chunk_args -- free helper for sync loop
48// ---------------------------------------------------------------------------
49
50/// Build `RequestSyncChunkArgs` from an existing `SyncState`.
51///
52/// The `SyncState.sync_map` JSON is deserialized to extract per-entity
53/// offsets (count field) and the `when` timestamp becomes the `since` filter.
54///
55/// # Arguments
56/// * `sync_state` - The sync state record for this storage pair.
57/// * `identity_key` - The wallet owner's identity key.
58/// * `writer_storage_identity_key` - The destination storage's identity key.
59pub fn make_request_sync_chunk_args(
60    sync_state: &SyncState,
61    identity_key: &str,
62    writer_storage_identity_key: &str,
63) -> WalletResult<RequestSyncChunkArgs> {
64    // Parse the stored JSON sync_map to extract per-entity offsets.
65    // A newly-inserted SyncState has sync_map = "{}" which cannot deserialize
66    // into SyncMap (all fields required). Fall back to a fresh SyncMap in that case.
67    let sync_map: SyncMap = if sync_state.sync_map.is_empty() || sync_state.sync_map == "{}" {
68        SyncMap::new()
69    } else {
70        serde_json::from_str(&sync_state.sync_map).map_err(|e| {
71            WalletError::Internal(format!(
72                "make_request_sync_chunk_args: failed to parse sync_map JSON: {e}"
73            ))
74        })?
75    };
76
77    // Build offsets from each EntitySyncMap's count field.
78    // The `count` tracks cumulative items received in the current sync window
79    // — this IS the pagination offset, matching TS `ess.count`.
80    let offsets = vec![
81        SyncChunkOffset {
82            name: sync_map.proven_tx.entity_name.clone(),
83            offset: sync_map.proven_tx.count,
84        },
85        SyncChunkOffset {
86            name: sync_map.output_basket.entity_name.clone(),
87            offset: sync_map.output_basket.count,
88        },
89        SyncChunkOffset {
90            name: sync_map.transaction.entity_name.clone(),
91            offset: sync_map.transaction.count,
92        },
93        SyncChunkOffset {
94            name: sync_map.output.entity_name.clone(),
95            offset: sync_map.output.count,
96        },
97        SyncChunkOffset {
98            name: sync_map.tx_label.entity_name.clone(),
99            offset: sync_map.tx_label.count,
100        },
101        SyncChunkOffset {
102            name: sync_map.tx_label_map.entity_name.clone(),
103            offset: sync_map.tx_label_map.count,
104        },
105        SyncChunkOffset {
106            name: sync_map.output_tag.entity_name.clone(),
107            offset: sync_map.output_tag.count,
108        },
109        SyncChunkOffset {
110            name: sync_map.output_tag_map.entity_name.clone(),
111            offset: sync_map.output_tag_map.count,
112        },
113        SyncChunkOffset {
114            name: sync_map.certificate.entity_name.clone(),
115            offset: sync_map.certificate.count,
116        },
117        SyncChunkOffset {
118            name: sync_map.certificate_field.entity_name.clone(),
119            offset: sync_map.certificate_field.count,
120        },
121        SyncChunkOffset {
122            name: sync_map.commission.entity_name.clone(),
123            offset: sync_map.commission.count,
124        },
125        SyncChunkOffset {
126            name: sync_map.proven_tx_req.entity_name.clone(),
127            offset: sync_map.proven_tx_req.count,
128        },
129    ];
130
131    Ok(RequestSyncChunkArgs {
132        // from = the reader (sync_state records the reader's identity key)
133        from_storage_identity_key: sync_state.storage_identity_key.clone(),
134        // to = the writer we are syncing into
135        to_storage_identity_key: writer_storage_identity_key.to_string(),
136        identity_key: identity_key.to_string(),
137        since: sync_state.when,
138        max_rough_size: 10_000_000,
139        max_items: 1000,
140        offsets,
141    })
142}
143
144// ---------------------------------------------------------------------------
145// ManagedStorage -- wraps a single WalletStorageProvider with cached state
146// ---------------------------------------------------------------------------
147
148/// Per-provider wrapper that caches settings and user after `make_available`.
149///
150/// Private to this module — only `WalletStorageManager` creates these.
151struct ManagedStorage {
152    /// The underlying storage provider.
153    storage: Arc<dyn WalletStorageProvider>,
154    /// Set once at construction from `storage.is_storage_provider()`.
155    is_storage_provider: bool,
156    /// Becomes true after `make_available()` succeeds for this store.
157    is_available: AtomicBool,
158    /// Cached `Settings` populated during `make_available`.
159    settings: tokio::sync::Mutex<Option<Settings>>,
160    /// Cached `User` populated during `make_available`.
161    user: tokio::sync::Mutex<Option<User>>,
162}
163
164impl ManagedStorage {
165    fn new(storage: Arc<dyn WalletStorageProvider>) -> Self {
166        let is_sp = storage.is_storage_provider();
167        Self {
168            storage,
169            is_storage_provider: is_sp,
170            is_available: AtomicBool::new(false),
171            settings: tokio::sync::Mutex::new(None),
172            user: tokio::sync::Mutex::new(None),
173        }
174    }
175
176    /// Get a clone of the cached settings, if available.
177    async fn get_settings_cached(&self) -> Option<Settings> {
178        self.settings.lock().await.clone()
179    }
180
181    /// Get a clone of the cached user, if available.
182    async fn get_user_cached(&self) -> Option<User> {
183        self.user.lock().await.clone()
184    }
185}
186
187// ---------------------------------------------------------------------------
188// ManagerState -- partition state for active/backup/conflicting stores
189// ---------------------------------------------------------------------------
190
191/// Guards the partition state: which store is active, which are backups, etc.
192///
193/// Protected by a `tokio::sync::Mutex` inside `WalletStorageManager`.
194struct ManagerState {
195    /// All providers, with active (if any) first at construction.
196    stores: Vec<ManagedStorage>,
197    /// Index into `stores` for the currently-active provider.
198    active_index: Option<usize>,
199    /// Indices of backup stores (after active is determined).
200    backup_indices: Vec<usize>,
201    /// Indices of stores whose `user.active_storage` doesn't match active.
202    conflicting_active_indices: Vec<usize>,
203}
204
205impl ManagerState {
206    fn new(stores: Vec<ManagedStorage>) -> Self {
207        Self {
208            stores,
209            active_index: None,
210            backup_indices: Vec::new(),
211            conflicting_active_indices: Vec::new(),
212        }
213    }
214
215    /// Returns the active index, or an error if not yet partitioned.
216    fn require_active(&self) -> WalletResult<usize> {
217        self.active_index.ok_or_else(|| {
218            WalletError::InvalidOperation(
219                "WalletStorageManager not yet available — call make_available() first".to_string(),
220            )
221        })
222    }
223}
224
225// ---------------------------------------------------------------------------
226// WalletStorageManager -- public struct
227// ---------------------------------------------------------------------------
228
/// Multi-provider storage manager with hierarchical locking.
///
/// Wraps one or more `WalletStorageProvider` implementations. On first use
/// (or an explicit `make_available()` call) the providers are partitioned into
/// active / backups / conflicting_actives based on stored user preferences.
///
/// # Lock hierarchy
///
/// Four `tokio::sync::Mutex<()>` must be acquired in strict ascending order:
/// reader < writer < sync < storage_provider.
///
/// - `acquire_reader` — read-only operations
/// - `acquire_writer` — mutations (create/process/abort action, etc.)
/// - `acquire_sync` — sync loop operations
/// - `acquire_storage_provider` — storage-provider-level operations
///
/// Each `acquire_*` helper takes every lock up to and including its own level,
/// so a higher-level caller excludes all lower-level callers.
pub struct WalletStorageManager {
    /// Partition state — holds all ManagedStorage instances.
    state: tokio::sync::Mutex<ManagerState>,
    /// Wallet owner's identity key (compressed public key hex).
    identity_key: String,
    /// Original active provider (the first store passed to the constructor).
    /// Kept so synchronous (non-async) callers can reach it without locking
    /// `state`, including before `make_available()` completes.
    initial_active: Option<Arc<dyn WalletStorageProvider>>,
    /// Original backup providers (the remaining stores passed to the constructor).
    /// Kept for the same synchronous, lock-free access as `initial_active`.
    initial_backups: Vec<Arc<dyn WalletStorageProvider>>,
    /// Level-1 lock (outermost). All operations acquire this.
    pub(crate) reader_lock: tokio::sync::Mutex<()>,
    /// Level-2 lock. Writers + sync + sp acquire this.
    pub(crate) writer_lock: tokio::sync::Mutex<()>,
    /// Level-3 lock. Sync + sp acquire this.
    pub(crate) sync_lock: tokio::sync::Mutex<()>,
    /// Level-4 lock (innermost). Only sp-level operations acquire this.
    pub(crate) sp_lock: tokio::sync::Mutex<()>,
    /// True once `make_available()` completes successfully.
    is_available_flag: AtomicBool,
    /// Optional wallet services reference; `None` at construction.
    services: tokio::sync::Mutex<Option<Arc<dyn WalletServices>>>,
}
268
269impl WalletStorageManager {
270    // -----------------------------------------------------------------------
271    // Constructor
272    // -----------------------------------------------------------------------
273
274    /// Create a new manager from an optional active provider and zero or more backups.
275    ///
276    /// Providers are stored in order: active first (if Some), then backups.
277    /// No partitioning happens at construction — call `make_available()` to initialize.
278    pub fn new(
279        identity_key: String,
280        active: Option<Arc<dyn WalletStorageProvider>>,
281        backups: Vec<Arc<dyn WalletStorageProvider>>,
282    ) -> Self {
283        let initial_active = active.clone();
284        let initial_backups = backups.clone();
285
286        let mut stores = Vec::with_capacity(active.is_some() as usize + backups.len());
287        if let Some(a) = active {
288            stores.push(ManagedStorage::new(a));
289        }
290        for b in backups {
291            stores.push(ManagedStorage::new(b));
292        }
293
294        Self {
295            state: tokio::sync::Mutex::new(ManagerState::new(stores)),
296            identity_key,
297            initial_active,
298            initial_backups,
299            reader_lock: tokio::sync::Mutex::new(()),
300            writer_lock: tokio::sync::Mutex::new(()),
301            sync_lock: tokio::sync::Mutex::new(()),
302            sp_lock: tokio::sync::Mutex::new(()),
303            is_available_flag: AtomicBool::new(false),
304            services: tokio::sync::Mutex::new(None),
305        }
306    }
307
308    // -----------------------------------------------------------------------
309    // Zero-lock sync accessors
310    // -----------------------------------------------------------------------
311
312    /// Returns the wallet owner's identity key.
313    pub fn get_user_identity_key(&self) -> &str {
314        &self.identity_key
315    }
316
317    /// Alias for `get_user_identity_key()`.
318    pub fn auth_id(&self) -> &str {
319        &self.identity_key
320    }
321
    /// Returns `true` after `make_available()` has successfully completed.
    ///
    /// Lock-free: a single `Acquire` load of the availability flag.
    pub fn is_available(&self) -> bool {
        self.is_available_flag.load(Ordering::Acquire)
    }
326
    /// The manager itself is NOT a storage provider — always returns `false`.
    ///
    /// Matches TS `WalletStorageManager.isStorageProvider = false`.
    /// Whether the *active store* is a storage provider is a separate question,
    /// answered by `is_active_storage_provider()`.
    pub fn is_storage_provider(&self) -> bool {
        false
    }
333
    /// Returns the initial active provider (the `active` argument passed to the constructor).
    ///
    /// This is a synchronous (non-async), lock-free accessor. It returns the
    /// provider as passed at construction, which is not necessarily the store
    /// that `make_available()` later selects as active via its partition logic.
    /// Primarily used for backward-compatibility in sync contexts (Wallet::new, signer setup).
    ///
    /// Returns `None` when the manager was constructed without an active provider.
    pub fn active(&self) -> Option<&Arc<dyn WalletStorageProvider>> {
        self.initial_active.as_ref()
    }
342
343    /// Returns the initial backup providers (the remaining providers from the constructor).
344    ///
345    /// Sync accessor for backward-compatibility.
346    pub fn backups(&self) -> &[Arc<dyn WalletStorageProvider>] {
347        &self.initial_backups
348    }
349
350    /// Returns the first backup provider if exactly one backup was provided.
351    ///
352    /// Backward-compatibility alias for the old `backup()` method.
353    pub fn backup(&self) -> Option<&Arc<dyn WalletStorageProvider>> {
354        self.initial_backups.first()
355    }
356
357    /// Returns true if at least one backup provider was provided.
358    pub fn has_backup(&self) -> bool {
359        !self.initial_backups.is_empty()
360    }
361
362    // -----------------------------------------------------------------------
363    // Async state accessors (lock state mutex)
364    // -----------------------------------------------------------------------
365
366    /// Returns `true` if there is at least one provider registered.
367    pub async fn can_make_available(&self) -> bool {
368        let state = self.state.lock().await;
369        !state.stores.is_empty()
370    }
371
372    /// Returns the active store's `settings.storage_identity_key`.
373    pub async fn get_active_store(&self) -> WalletResult<String> {
374        let state = self.state.lock().await;
375        let idx = state.require_active()?;
376        let settings = state.stores[idx]
377            .get_settings_cached()
378            .await
379            .ok_or_else(|| {
380                WalletError::InvalidOperation("Active settings not cached".to_string())
381            })?;
382        Ok(settings.storage_identity_key)
383    }
384
385    /// Returns the active store's `settings.storage_name`.
386    pub async fn get_active_store_name(&self) -> WalletResult<String> {
387        let state = self.state.lock().await;
388        let idx = state.require_active()?;
389        let settings = state.stores[idx]
390            .get_settings_cached()
391            .await
392            .ok_or_else(|| {
393                WalletError::InvalidOperation("Active settings not cached".to_string())
394            })?;
395        Ok(settings.storage_name)
396    }
397
398    /// Returns a clone of the active store's cached `Settings`.
399    pub async fn get_active_settings(&self) -> WalletResult<Settings> {
400        let state = self.state.lock().await;
401        let idx = state.require_active()?;
402        state.stores[idx]
403            .get_settings_cached()
404            .await
405            .ok_or_else(|| WalletError::InvalidOperation("Active settings not cached".to_string()))
406    }
407
    /// Alias for `get_active_settings()` (TS parity name: `getSettings`).
    ///
    /// Returns exactly what `get_active_settings()` returns, including its errors.
    pub async fn get_settings(&self) -> WalletResult<Settings> {
        self.get_active_settings().await
    }
412
413    /// Returns a clone of the active store's cached `User`.
414    pub async fn get_active_user(&self) -> WalletResult<User> {
415        let state = self.state.lock().await;
416        let idx = state.require_active()?;
417        state.stores[idx]
418            .get_user_cached()
419            .await
420            .ok_or_else(|| WalletError::InvalidOperation("Active user not cached".to_string()))
421    }
422
423    /// Returns `storage_identity_key` strings for all backup stores.
424    pub async fn get_backup_stores(&self) -> Vec<String> {
425        let state = self.state.lock().await;
426        let mut result = Vec::with_capacity(state.backup_indices.len());
427        for &idx in &state.backup_indices {
428            if let Some(settings) = state.stores[idx].get_settings_cached().await {
429                result.push(settings.storage_identity_key);
430            }
431        }
432        result
433    }
434
435    /// Returns `storage_identity_key` strings for all conflicting active stores.
436    pub async fn get_conflicting_stores(&self) -> Vec<String> {
437        let state = self.state.lock().await;
438        let mut result = Vec::with_capacity(state.conflicting_active_indices.len());
439        for &idx in &state.conflicting_active_indices {
440            if let Some(settings) = state.stores[idx].get_settings_cached().await {
441                result.push(settings.storage_identity_key);
442            }
443        }
444        result
445    }
446
447    /// Returns `storage_identity_key` strings for ALL stores.
448    ///
449    /// Before `make_available()`, returns empty strings as placeholders so callers
450    /// can check the count without waiting for initialization.
451    pub async fn get_all_stores(&self) -> Vec<String> {
452        let state = self.state.lock().await;
453        let mut result = Vec::with_capacity(state.stores.len());
454        for store in &state.stores {
455            if let Some(settings) = store.get_settings_cached().await {
456                result.push(settings.storage_identity_key);
457            } else {
458                result.push(String::new());
459            }
460        }
461        result
462    }
463
464    /// Returns `true` if the active store's `is_storage_provider` flag is set.
465    pub async fn is_active_storage_provider(&self) -> WalletResult<bool> {
466        let state = self.state.lock().await;
467        let idx = state.require_active()?;
468        Ok(state.stores[idx].is_storage_provider)
469    }
470
471    /// Returns true if active.settings.storage_identity_key == active.user.active_storage
472    /// AND conflicting_active_indices is empty.
473    pub async fn is_active_enabled(&self) -> bool {
474        let state = self.state.lock().await;
475        let Some(idx) = state.active_index else {
476            return false;
477        };
478        let settings = match state.stores[idx].get_settings_cached().await {
479            Some(s) => s,
480            None => return false,
481        };
482        let user = match state.stores[idx].get_user_cached().await {
483            Some(u) => u,
484            None => return false,
485        };
486        state.conflicting_active_indices.is_empty()
487            && settings.storage_identity_key == user.active_storage
488    }
489
490    /// Builds an `AuthId` for the manager's identity key.
491    ///
492    /// If `must_be_active` is true and the manager is not yet available,
493    /// returns `WERR_NOT_ACTIVE`.
494    pub async fn get_auth(&self, must_be_active: bool) -> WalletResult<AuthId> {
495        if must_be_active && !self.is_available() {
496            return Err(WalletError::NotActive(
497                "Storage manager is not yet available — call make_available() first".to_string(),
498            ));
499        }
500
501        let state = self.state.lock().await;
502        let user_id = if let Some(idx) = state.active_index {
503            state.stores[idx].get_user_cached().await.map(|u| u.user_id)
504        } else {
505            None
506        };
507
508        if must_be_active {
509            let Some(idx) = state.active_index else {
510                return Err(WalletError::NotActive(
511                    "No active storage — call make_available() first".to_string(),
512                ));
513            };
514            let settings = state.stores[idx]
515                .get_settings_cached()
516                .await
517                .ok_or_else(|| WalletError::NotActive("Active settings not cached".to_string()))?;
518            let user = state.stores[idx]
519                .get_user_cached()
520                .await
521                .ok_or_else(|| WalletError::NotActive("Active user not cached".to_string()))?;
522            if !state.conflicting_active_indices.is_empty()
523                || settings.storage_identity_key != user.active_storage
524            {
525                return Err(WalletError::NotActive(
526                    "Active storage is not enabled — conflicting stores or mismatched active_storage".to_string(),
527                ));
528            }
529        }
530
531        Ok(AuthId {
532            identity_key: self.identity_key.clone(),
533            user_id,
534            is_active: Some(self.is_available()),
535        })
536    }
537
538    /// Returns the user_id from the active store's cached user.
539    pub async fn get_user_id(&self) -> WalletResult<i64> {
540        let auth = self.get_auth(false).await?;
541        auth.user_id.ok_or_else(|| {
542            WalletError::InvalidOperation(
543                "user_id not available — call make_available() first".to_string(),
544            )
545        })
546    }
547
548    // -----------------------------------------------------------------------
549    // get_active — returns the active WalletStorageProvider
550    // -----------------------------------------------------------------------
551
552    /// Returns a clone of the active `Arc<dyn WalletStorageProvider>`.
553    ///
554    /// Errors if `make_available()` has not been called.
555    pub async fn get_active(&self) -> WalletResult<Arc<dyn WalletStorageProvider>> {
556        let state = self.state.lock().await;
557        let idx = state.require_active()?;
558        Ok(state.stores[idx].storage.clone())
559    }
560
561    // -----------------------------------------------------------------------
562    // makeAvailable — initialize and partition stores
563    // -----------------------------------------------------------------------
564
565    /// Initialize all stores and partition them into active/backups/conflicting.
566    ///
567    /// Idempotent — returns cached active settings on subsequent calls.
568    /// Uses `reader_lock` as the idempotency guard to prevent concurrent re-init.
569    pub async fn make_available(&self) -> WalletResult<Settings> {
570        // Fast path: already available
571        if self.is_available() {
572            return self.get_active_settings().await;
573        }
574
575        // Acquire reader lock to serialize concurrent makeAvailable calls (Pitfall 3)
576        let _reader_guard = self.reader_lock.lock().await;
577
578        // Re-check after acquiring lock (another task may have finished first)
579        if self.is_available() {
580            return self.get_active_settings().await;
581        }
582
583        self.do_make_available().await
584    }
585
586    /// Internal make_available that runs under the reader lock.
587    ///
588    /// Separated from `make_available()` so `acquire_reader()` can avoid
589    /// re-acquiring the reader_lock that it is about to return to the caller.
590    async fn do_make_available(&self) -> WalletResult<Settings> {
591        // Collect storage Arcs under a brief state lock, then release so that
592        // network I/O (make_available, find_or_insert_user) does not block
593        // concurrent operations on the manager.
594        let providers: Vec<Arc<dyn WalletStorageProvider>> = {
595            let state = self.state.lock().await;
596            if state.stores.is_empty() {
597                return Err(WalletError::InvalidParameter {
598                    parameter: "stores".to_string(),
599                    must_be: "non-empty — provide at least one storage provider".to_string(),
600                });
601            }
602            state.stores.iter().map(|s| s.storage.clone()).collect()
603        };
604
605        // Step 1: Call make_available() + find_or_insert_user() on each store
606        // without holding the state lock.
607        let mut init_results: Vec<(Settings, User)> = Vec::with_capacity(providers.len());
608        for provider in &providers {
609            let settings = provider.make_available().await?;
610            let (user, _) = provider.find_or_insert_user(&self.identity_key).await?;
611            init_results.push((settings, user));
612        }
613
614        // Re-acquire the state lock to write results and partition.
615        let mut state = self.state.lock().await;
616        for (i, (settings, user)) in init_results.into_iter().enumerate() {
617            *state.stores[i].settings.lock().await = Some(settings);
618            *state.stores[i].user.lock().await = Some(user);
619            state.stores[i].is_available.store(true, Ordering::Release);
620        }
621
622        // Step 2: Partition stores — first store is default active
623        let num_stores = state.stores.len();
624        state.active_index = Some(0);
625        state.backup_indices.clear();
626        state.conflicting_active_indices.clear();
627
628        for i in 1..num_stores {
629            let store_sik = {
630                let s = state.stores[i].settings.lock().await;
631                s.as_ref()
632                    .map(|s| s.storage_identity_key.clone())
633                    .unwrap_or_default()
634            };
635            let store_user_active = {
636                let u = state.stores[i].user.lock().await;
637                u.as_ref()
638                    .map(|u| u.active_storage.clone())
639                    .unwrap_or_default()
640            };
641
642            // Check if current active is enabled (settings.sik == user.active_storage)
643            let current_active_enabled = {
644                let active_idx = state.active_index.unwrap();
645                let active_sik = {
646                    let s = state.stores[active_idx].settings.lock().await;
647                    s.as_ref()
648                        .map(|s| s.storage_identity_key.clone())
649                        .unwrap_or_default()
650                };
651                let active_user_active = {
652                    let u = state.stores[active_idx].user.lock().await;
653                    u.as_ref()
654                        .map(|u| u.active_storage.clone())
655                        .unwrap_or_default()
656                };
657                active_sik == active_user_active
658            };
659
660            if store_user_active == store_sik && !current_active_enabled {
661                // Swap: old active becomes backup, this store becomes new active
662                let old_active = state.active_index.unwrap();
663                state.backup_indices.push(old_active);
664                state.active_index = Some(i);
665            } else {
666                state.backup_indices.push(i);
667            }
668        }
669
670        // Step 3: Second pass — partition backups into plain backups vs conflicting_actives
671        let active_sik = {
672            let active_idx = state.active_index.unwrap();
673            let s = state.stores[active_idx].settings.lock().await;
674            s.as_ref()
675                .map(|s| s.storage_identity_key.clone())
676                .unwrap_or_default()
677        };
678
679        let old_backups = std::mem::take(&mut state.backup_indices);
680        for backup_idx in old_backups {
681            let backup_user_active = {
682                let u = state.stores[backup_idx].user.lock().await;
683                u.as_ref()
684                    .map(|u| u.active_storage.clone())
685                    .unwrap_or_default()
686            };
687            if backup_user_active != active_sik {
688                state.conflicting_active_indices.push(backup_idx);
689            } else {
690                state.backup_indices.push(backup_idx);
691            }
692        }
693
694        // Step 4: Mark manager as available
695        self.is_available_flag.store(true, Ordering::Release);
696
697        // Return active settings
698        let active_idx = state.active_index.unwrap();
699        let settings = state.stores[active_idx]
700            .settings
701            .lock()
702            .await
703            .clone()
704            .unwrap();
705        Ok(settings)
706    }
707
708    // -----------------------------------------------------------------------
709    // Hierarchical lock acquire helpers
710    // -----------------------------------------------------------------------
711
712    /// Acquire the reader lock (level 1).
713    ///
714    /// Auto-initializes via `make_available()` if not yet available.
715    /// The reader lock is acquired AFTER initialization so that initialization
716    /// itself can serialize via the same reader_lock.
717    pub async fn acquire_reader(&self) -> WalletResult<tokio::sync::MutexGuard<'_, ()>> {
718        if !self.is_available() {
719            // make_available() acquires+releases reader_lock internally for idempotency.
720            // After it returns, is_available is true and reader_lock is free.
721            self.make_available().await?;
722        }
723        Ok(self.reader_lock.lock().await)
724    }
725
726    /// Acquire the reader + writer locks (levels 1 and 2).
727    ///
728    /// Auto-initializes via `make_available()` if not yet available.
729    pub async fn acquire_writer(
730        &self,
731    ) -> WalletResult<(
732        tokio::sync::MutexGuard<'_, ()>,
733        tokio::sync::MutexGuard<'_, ()>,
734    )> {
735        if !self.is_available() {
736            self.make_available().await?;
737        }
738        let reader = self.reader_lock.lock().await;
739        let writer = self.writer_lock.lock().await;
740        Ok((reader, writer))
741    }
742
743    /// Acquire reader + writer + sync locks (levels 1, 2, and 3).
744    ///
745    /// Auto-initializes via `make_available()` if not yet available.
746    pub async fn acquire_sync(
747        &self,
748    ) -> WalletResult<(
749        tokio::sync::MutexGuard<'_, ()>,
750        tokio::sync::MutexGuard<'_, ()>,
751        tokio::sync::MutexGuard<'_, ()>,
752    )> {
753        if !self.is_available() {
754            self.make_available().await?;
755        }
756        let reader = self.reader_lock.lock().await;
757        let writer = self.writer_lock.lock().await;
758        let sync = self.sync_lock.lock().await;
759        Ok((reader, writer, sync))
760    }
761
762    /// Acquire all four locks (levels 1, 2, 3, and 4).
763    ///
764    /// Auto-initializes via `make_available()` if not yet available.
765    pub async fn acquire_storage_provider(
766        &self,
767    ) -> WalletResult<(
768        tokio::sync::MutexGuard<'_, ()>,
769        tokio::sync::MutexGuard<'_, ()>,
770        tokio::sync::MutexGuard<'_, ()>,
771        tokio::sync::MutexGuard<'_, ()>,
772    )> {
773        if !self.is_available() {
774            self.make_available().await?;
775        }
776        let reader = self.reader_lock.lock().await;
777        let writer = self.writer_lock.lock().await;
778        let sync = self.sync_lock.lock().await;
779        let sp = self.sp_lock.lock().await;
780        Ok((reader, writer, sync, sp))
781    }
782
783    // -----------------------------------------------------------------------
784    // Lifecycle methods
785    // -----------------------------------------------------------------------
786
787    /// Destroy all stores, cleaning up all resources.
788    pub async fn destroy(&self) -> WalletResult<()> {
789        let state = self.state.lock().await;
790        for store in &state.stores {
791            store.storage.destroy().await?;
792        }
793        Ok(())
794    }
795
796    /// Get wallet services.
797    pub async fn get_services_ref(&self) -> WalletResult<Arc<dyn WalletServices>> {
798        self.services.lock().await.clone().ok_or_else(|| {
799            WalletError::NotImplemented("services not configured on manager".to_string())
800        })
801    }
802
803    /// Set wallet services.
804    pub async fn set_services(&self, services: Arc<dyn WalletServices>) {
805        *self.services.lock().await = Some(services);
806    }
807
808    // -----------------------------------------------------------------------
809    // WalletStorageProvider delegation — read operations
810    //
811    // All read operations acquire the reader lock (level 1) before delegating
812    // to the active provider. This matches the TS `runAsReader` pattern.
813    // -----------------------------------------------------------------------
814
815    /// List wallet actions (transactions) with filtering and pagination.
816    pub async fn list_actions(
817        &self,
818        auth: &AuthId,
819        args: &ListActionsArgs,
820    ) -> WalletResult<ListActionsResult> {
821        let _rg = self.acquire_reader().await?;
822        let active = self.get_active().await?;
823        active.list_actions(auth, args).await
824    }
825
826    /// List outputs with basket/tag filtering.
827    pub async fn list_outputs(
828        &self,
829        auth: &AuthId,
830        args: &ListOutputsArgs,
831    ) -> WalletResult<ListOutputsResult> {
832        let _rg = self.acquire_reader().await?;
833        let active = self.get_active().await?;
834        active.list_outputs(auth, args).await
835    }
836
837    /// List certificates with certifier/type filtering.
838    pub async fn list_certificates(
839        &self,
840        auth: &AuthId,
841        args: &ListCertificatesArgs,
842    ) -> WalletResult<ListCertificatesResult> {
843        let _rg = self.acquire_reader().await?;
844        let active = self.get_active().await?;
845        active.list_certificates(auth, args).await
846    }
847
848    /// Find certificates scoped to the given auth identity.
849    pub async fn find_certificates_auth(
850        &self,
851        auth: &AuthId,
852        args: &FindCertificatesArgs,
853    ) -> WalletResult<Vec<Certificate>> {
854        let _rg = self.acquire_reader().await?;
855        let active = self.get_active().await?;
856        active.find_certificates_auth(auth, args).await
857    }
858
859    /// Find output baskets scoped to the given auth identity.
860    pub async fn find_output_baskets_auth(
861        &self,
862        auth: &AuthId,
863        args: &FindOutputBasketsArgs,
864    ) -> WalletResult<Vec<OutputBasket>> {
865        let _rg = self.acquire_reader().await?;
866        let active = self.get_active().await?;
867        active.find_output_baskets_auth(auth, args).await
868    }
869
870    /// Find outputs scoped to the given auth identity.
871    pub async fn find_outputs_auth(
872        &self,
873        auth: &AuthId,
874        args: &FindOutputsArgs,
875    ) -> WalletResult<Vec<Output>> {
876        let _rg = self.acquire_reader().await?;
877        let active = self.get_active().await?;
878        active.find_outputs_auth(auth, args).await
879    }
880
881    /// Find proven transaction requests.
882    pub async fn find_proven_tx_reqs(
883        &self,
884        args: &FindProvenTxReqsArgs,
885    ) -> WalletResult<Vec<ProvenTxReq>> {
886        let _rg = self.acquire_reader().await?;
887        let active = self.get_active().await?;
888        active.find_proven_tx_reqs(args).await
889    }
890
891    // -----------------------------------------------------------------------
892    // WalletStorageProvider delegation — write operations
893    //
894    // All write operations acquire reader + writer locks (levels 1 and 2)
895    // before delegating to the active provider. This matches the TS
896    // `runAsWriter` pattern.
897    // -----------------------------------------------------------------------
898
899    /// Abort (cancel) a transaction by reference.
900    pub async fn abort_action(
901        &self,
902        auth: &AuthId,
903        args: &AbortActionArgs,
904    ) -> WalletResult<AbortActionResult> {
905        let (_rg, _wg) = self.acquire_writer().await?;
906        let active = self.get_active().await?;
907        active.abort_action(auth, args).await
908    }
909
910    /// Create a new transaction in storage.
911    pub async fn create_action(
912        &self,
913        auth: &AuthId,
914        args: &StorageCreateActionArgs,
915    ) -> WalletResult<StorageCreateActionResult> {
916        let (_rg, _wg) = self.acquire_writer().await?;
917        let active = self.get_active().await?;
918        active.create_action(auth, args).await
919    }
920
921    /// Process (commit) a signed transaction to storage.
922    pub async fn process_action(
923        &self,
924        auth: &AuthId,
925        args: &StorageProcessActionArgs,
926    ) -> WalletResult<StorageProcessActionResult> {
927        let (_rg, _wg) = self.acquire_writer().await?;
928        let active = self.get_active().await?;
929        active.process_action(auth, args).await
930    }
931
932    /// Internalize outputs from an external transaction.
933    pub async fn internalize_action(
934        &self,
935        auth: &AuthId,
936        args: &StorageInternalizeActionArgs,
937        services: &dyn WalletServices,
938    ) -> WalletResult<StorageInternalizeActionResult> {
939        let (_rg, _wg) = self.acquire_writer().await?;
940        let active = self.get_active().await?;
941        active.internalize_action(auth, args, services).await
942    }
943
944    /// Insert a certificate scoped to the given auth identity.
945    pub async fn insert_certificate_auth(
946        &self,
947        auth: &AuthId,
948        certificate: &Certificate,
949    ) -> WalletResult<i64> {
950        let (_rg, _wg) = self.acquire_writer().await?;
951        let active = self.get_active().await?;
952        active.insert_certificate_auth(auth, certificate).await
953    }
954
955    /// Relinquish (soft-delete) a certificate by marking it deleted.
956    pub async fn relinquish_certificate(
957        &self,
958        auth: &AuthId,
959        args: &RelinquishCertificateArgs,
960    ) -> WalletResult<i64> {
961        let (_rg, _wg) = self.acquire_writer().await?;
962        let active = self.get_active().await?;
963        active.relinquish_certificate(auth, args).await
964    }
965
966    /// Relinquish (remove from basket) an output by outpoint.
967    pub async fn relinquish_output(
968        &self,
969        auth: &AuthId,
970        args: &RelinquishOutputArgs,
971    ) -> WalletResult<i64> {
972        let (_rg, _wg) = self.acquire_writer().await?;
973        let active = self.get_active().await?;
974        active.relinquish_output(auth, args).await
975    }
976
977    // -----------------------------------------------------------------------
978    // Sync-level delegation
979    // -----------------------------------------------------------------------
980
981    pub async fn find_or_insert_user(&self, identity_key: &str) -> WalletResult<(User, bool)> {
982        let active = self.get_active().await?;
983        let result = active.find_or_insert_user(identity_key).await?;
984
985        // Consistency guard: if the manager already has a cached user_id for this identity,
986        // verify it matches the returned record. A mismatch indicates a storage corruption
987        // or identity key collision that must not silently proceed.
988        if let Ok(cached_auth) = self.get_auth(false).await {
989            if let Some(cached_id) = cached_auth.user_id {
990                if result.0.user_id != cached_id {
991                    return Err(WalletError::Internal(
992                        "find_or_insert_user: returned user_id does not match cached user_id — identity consistency violation".to_string(),
993                    ));
994                }
995            }
996        }
997
998        Ok(result)
999    }
1000
1001    pub async fn get_sync_chunk(&self, args: &RequestSyncChunkArgs) -> WalletResult<SyncChunk> {
1002        let active = self.get_active().await?;
1003        active.get_sync_chunk(args).await
1004    }
1005
1006    pub async fn process_sync_chunk(
1007        &self,
1008        args: &RequestSyncChunkArgs,
1009        chunk: &SyncChunk,
1010    ) -> WalletResult<ProcessSyncChunkResult> {
1011        let active = self.get_active().await?;
1012        active.process_sync_chunk(args, chunk).await
1013    }
1014
1015    // -----------------------------------------------------------------------
1016    // Low-level CRUD operations (certificates, signer, beef helper use these)
1017    // -----------------------------------------------------------------------
1018
1019    pub async fn find_user_by_identity_key(&self, key: &str) -> WalletResult<Option<User>> {
1020        let active = self.get_active().await?;
1021        active.find_user_by_identity_key(key).await
1022    }
1023
1024    pub async fn find_certificates_storage(
1025        &self,
1026        args: &FindCertificatesArgs,
1027    ) -> WalletResult<Vec<Certificate>> {
1028        let active = self.get_active().await?;
1029        active.find_certificates_storage(args).await
1030    }
1031
1032    pub async fn find_certificate_fields(
1033        &self,
1034        args: &FindCertificateFieldsArgs,
1035    ) -> WalletResult<Vec<crate::tables::CertificateField>> {
1036        let active = self.get_active().await?;
1037        active.find_certificate_fields(args).await
1038    }
1039
1040    pub async fn find_outputs_storage(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
1041        let active = self.get_active().await?;
1042        active.find_outputs_storage(args).await
1043    }
1044
    /// Find outputs (signer pipeline alias).
    ///
    /// Pure alias: calls `find_outputs_storage` with the same arguments and
    /// returns its result unchanged.
    pub async fn find_outputs(&self, args: &FindOutputsArgs) -> WalletResult<Vec<Output>> {
        self.find_outputs_storage(args).await
    }
1049
1050    /// Update an output by ID.
1051    pub async fn update_output(&self, id: i64, update: &OutputPartial) -> WalletResult<i64> {
1052        let active = self.get_active().await?;
1053        active.update_output(id, update).await
1054    }
1055
1056    pub async fn insert_certificate_storage(&self, cert: &Certificate) -> WalletResult<i64> {
1057        let active = self.get_active().await?;
1058        active.insert_certificate_storage(cert).await
1059    }
1060
1061    pub async fn insert_certificate_field_storage(
1062        &self,
1063        field: &crate::tables::CertificateField,
1064    ) -> WalletResult<()> {
1065        let active = self.get_active().await?;
1066        active.insert_certificate_field_storage(field).await
1067    }
1068
1069    // -----------------------------------------------------------------------
1070    // Internal storage operations (monitor, signer, beef helper use these)
1071    //
1072    // These delegate to the active WalletStorageProvider's internal methods.
1073    // The active provider (SqliteStorage) implements these via the blanket impl.
1074    // StorageClient returns NotImplemented for these methods.
1075    // -----------------------------------------------------------------------
1076
1077    pub async fn find_proven_txs(&self, args: &FindProvenTxsArgs) -> WalletResult<Vec<ProvenTx>> {
1078        let active = self.get_active().await?;
1079        active.find_proven_txs(args).await
1080    }
1081
1082    pub async fn find_transactions(
1083        &self,
1084        args: &FindTransactionsArgs,
1085    ) -> WalletResult<Vec<Transaction>> {
1086        let active = self.get_active().await?;
1087        active.find_transactions(args).await
1088    }
1089
1090    pub async fn update_proven_tx_req(
1091        &self,
1092        id: i64,
1093        update: &ProvenTxReqPartial,
1094    ) -> WalletResult<i64> {
1095        let active = self.get_active().await?;
1096        active.update_proven_tx_req(id, update).await
1097    }
1098
1099    pub async fn update_proven_tx(&self, id: i64, update: &ProvenTxPartial) -> WalletResult<i64> {
1100        let active = self.get_active().await?;
1101        active.update_proven_tx(id, update).await
1102    }
1103
1104    pub async fn update_transaction(
1105        &self,
1106        id: i64,
1107        update: &TransactionPartial,
1108    ) -> WalletResult<i64> {
1109        let active = self.get_active().await?;
1110        active.update_transaction(id, update).await
1111    }
1112
1113    pub async fn update_transaction_status(
1114        &self,
1115        txid: &str,
1116        new_status: TransactionStatus,
1117    ) -> WalletResult<()> {
1118        let active = self.get_active().await?;
1119        active.update_transaction_status(txid, new_status).await
1120    }
1121
1122    pub async fn update_proven_tx_req_with_new_proven_tx(
1123        &self,
1124        req_id: i64,
1125        proven_tx: &ProvenTx,
1126    ) -> WalletResult<i64> {
1127        let active = self.get_active().await?;
1128        active
1129            .update_proven_tx_req_with_new_proven_tx(req_id, proven_tx)
1130            .await
1131    }
1132
1133    pub async fn insert_monitor_event(&self, event: &MonitorEvent) -> WalletResult<i64> {
1134        let active = self.get_active().await?;
1135        active.insert_monitor_event(event).await
1136    }
1137
1138    pub async fn admin_stats(&self, auth_id: &str) -> WalletResult<AdminStatsResult> {
1139        let active = self.get_active().await?;
1140        active.admin_stats(auth_id).await
1141    }
1142
1143    pub async fn purge_data(&self, params: &PurgeParams) -> WalletResult<String> {
1144        let active = self.get_active().await?;
1145        active.purge_data(params).await
1146    }
1147
1148    pub async fn review_status(&self, aged_limit: chrono::NaiveDateTime) -> WalletResult<String> {
1149        let active = self.get_active().await?;
1150        active.review_status(aged_limit).await
1151    }
1152
1153    pub async fn get_storage_identity_key(&self) -> WalletResult<String> {
1154        let active = self.get_active().await?;
1155        active.get_storage_identity_key().await
1156    }
1157
1158    // -----------------------------------------------------------------------
1159    // TS-parity getter methods
1160    // -----------------------------------------------------------------------
1161
1162    /// Returns metadata for all registered storage providers.
1163    ///
1164    /// Matches TS `getStores()`: iterates all stores and returns a `WalletStorageInfo`
1165    /// for each, including which is active, which are backups, and which are conflicting.
1166    pub async fn get_stores(&self) -> Vec<WalletStorageInfo> {
1167        let state = self.state.lock().await;
1168        let mut result = Vec::with_capacity(state.stores.len());
1169
1170        let is_active_enabled = {
1171            if let Some(idx) = state.active_index {
1172                let sik = state.stores[idx]
1173                    .settings
1174                    .lock()
1175                    .await
1176                    .as_ref()
1177                    .map(|s| s.storage_identity_key.clone())
1178                    .unwrap_or_default();
1179                let user_active = state.stores[idx]
1180                    .user
1181                    .lock()
1182                    .await
1183                    .as_ref()
1184                    .map(|u| u.active_storage.clone())
1185                    .unwrap_or_default();
1186                state.conflicting_active_indices.is_empty() && sik == user_active
1187            } else {
1188                false
1189            }
1190        };
1191
1192        for (i, store) in state.stores.iter().enumerate() {
1193            let settings_guard = store.settings.lock().await;
1194            let user_guard = store.user.lock().await;
1195
1196            let storage_identity_key = settings_guard
1197                .as_ref()
1198                .map(|s| s.storage_identity_key.clone())
1199                .unwrap_or_default();
1200            let storage_name = settings_guard
1201                .as_ref()
1202                .map(|s| s.storage_name.clone())
1203                .unwrap_or_default();
1204            let user_id = user_guard.as_ref().map(|u| u.user_id);
1205            let endpoint_url = store.storage.get_endpoint_url();
1206
1207            let is_active = state.active_index == Some(i);
1208            let is_backup = state.backup_indices.contains(&i);
1209            let is_conflicting = state.conflicting_active_indices.contains(&i);
1210            // A store is "enabled" only when it is the active store and active is enabled.
1211            let is_enabled = is_active && is_active_enabled;
1212
1213            result.push(WalletStorageInfo {
1214                storage_identity_key,
1215                storage_name,
1216                user_id,
1217                is_active,
1218                is_enabled,
1219                is_backup,
1220                is_conflicting,
1221                endpoint_url,
1222            });
1223        }
1224
1225        result
1226    }
1227
1228    /// Returns the remote endpoint URL for the store with the given identity key.
1229    ///
1230    /// Matches TS `getStoreEndpointURL(storageIdentityKey)`.
1231    /// Returns None for local SQLite stores and for unknown identity keys.
1232    pub async fn get_store_endpoint_url(&self, storage_identity_key: &str) -> Option<String> {
1233        let state = self.state.lock().await;
1234        for store in &state.stores {
1235            let sik = store
1236                .settings
1237                .lock()
1238                .await
1239                .as_ref()
1240                .map(|s| s.storage_identity_key.clone())
1241                .unwrap_or_default();
1242            if sik == storage_identity_key {
1243                return store.storage.get_endpoint_url();
1244            }
1245        }
1246        None
1247    }
1248
1249    // -----------------------------------------------------------------------
1250    // Reprove methods (chain reorganization proof re-validation)
1251    // -----------------------------------------------------------------------
1252
1253    /// Re-validate the merkle proof for a single ProvenTx record.
1254    ///
1255    /// Matches TS `reproveProven(ptx, noUpdate?)`:
1256    /// - Fetches a fresh merkle path from WalletServices.
1257    /// - Validates it against the current chain tracker.
1258    /// - If valid and `no_update` is false, persists the updated record to active storage.
1259    /// - Returns a `ReproveProvenResult` indicating updated/unchanged/unavailable.
1260    ///
1261    /// This method must be called while the StorageProvider lock is already held
1262    /// (the caller's responsibility — `reprove_header` acquires it before calling here).
1263    pub async fn reprove_proven(
1264        &self,
1265        ptx: &ProvenTx,
1266        no_update: bool,
1267    ) -> WalletResult<ReproveProvenResult> {
1268        let services = self.get_services_ref().await?;
1269
1270        // Attempt to get a fresh merkle proof for this transaction.
1271        let merkle_result = services.get_merkle_path(&ptx.txid, false).await;
1272
1273        // If no proof available, return unavailable.
1274        let (merkle_path_bytes, header) = match (merkle_result.merkle_path, merkle_result.header) {
1275            (Some(mp), Some(hdr)) => (mp, hdr),
1276            _ => {
1277                return Ok(ReproveProvenResult {
1278                    updated: None,
1279                    unchanged: false,
1280                    unavailable: true,
1281                });
1282            }
1283        };
1284
1285        // Check if proof data is unchanged.
1286        let height = header.height as i32;
1287        let block_hash = header.hash.clone();
1288        let merkle_root = header.merkle_root.clone();
1289
1290        if ptx.height == height
1291            && ptx.block_hash == block_hash
1292            && ptx.merkle_root == merkle_root
1293            && ptx.merkle_path == merkle_path_bytes
1294        {
1295            return Ok(ReproveProvenResult {
1296                updated: None,
1297                unchanged: true,
1298                unavailable: false,
1299            });
1300        }
1301
1302        // Build the updated record.
1303        let mut updated_ptx = ptx.clone();
1304        updated_ptx.height = height;
1305        updated_ptx.block_hash = block_hash.clone();
1306        updated_ptx.merkle_root = merkle_root.clone();
1307        updated_ptx.merkle_path = merkle_path_bytes;
1308        // index comes from the chain tracker header if available; fall back to existing.
1309        // (TS updates index via MerklePath.index — we leave it unchanged unless we have
1310        // the actual index from a decoded path, which requires the bsv library. Acceptable
1311        // for now: reprove_header primarily needs height + block_hash updated.)
1312
1313        let log_update = format!(
1314            "reproved txid={} old_block={} new_block={} height={}",
1315            ptx.txid, ptx.block_hash, block_hash, height
1316        );
1317
1318        // Persist update unless caller requested dry-run mode.
1319        if !no_update {
1320            let active = self.get_active().await?;
1321            let partial = ProvenTxPartial {
1322                height: Some(updated_ptx.height),
1323                block_hash: Some(updated_ptx.block_hash.clone()),
1324                ..Default::default()
1325            };
1326            active.update_proven_tx(ptx.proven_tx_id, &partial).await?;
1327        }
1328
1329        Ok(ReproveProvenResult {
1330            updated: Some(ReproveProvenUpdate {
1331                update: updated_ptx,
1332                log_update,
1333            }),
1334            unchanged: false,
1335            unavailable: false,
1336        })
1337    }
1338
1339    /// Re-prove all ProvenTx records associated with a deactivated block hash.
1340    ///
1341    /// Matches TS `reproveHeader(deactivatedHash)`:
1342    /// - Runs under StorageProvider lock (all four locks).
1343    /// - Finds all ProvenTx records with block_hash == deactivated_hash.
1344    /// - Calls `reprove_proven` for each (no_update=true for individual calls).
1345    /// - Bulk-updates all successfully reproved records in a single pass.
1346    /// - Returns `ReproveHeaderResult` with updated/unchanged/unavailable partitions.
1347    pub async fn reprove_header(
1348        &self,
1349        deactivated_hash: &str,
1350    ) -> WalletResult<ReproveHeaderResult> {
1351        // Acquire all four locks (StorageProvider level).
1352        let _guards = self.acquire_storage_provider().await?;
1353
1354        let proven_txs = {
1355            let active = self.get_active().await?;
1356            active
1357                .find_proven_txs(&FindProvenTxsArgs {
1358                    partial: ProvenTxPartial {
1359                        block_hash: Some(deactivated_hash.to_string()),
1360                        ..Default::default()
1361                    },
1362                    ..Default::default()
1363                })
1364                .await?
1365        };
1366
1367        let mut updated_vec = Vec::new();
1368        let mut unchanged_vec = Vec::new();
1369        let mut unavailable_vec = Vec::new();
1370
1371        let mut updates_to_persist: Vec<(i64, ProvenTxPartial, ProvenTx)> = Vec::new();
1372
1373        for ptx in &proven_txs {
1374            // Call reprove_proven in dry-run mode (no_update=true); we bulk-persist below.
1375            let result = self.reprove_proven(ptx, true).await?;
1376
1377            if result.unavailable {
1378                unavailable_vec.push(ptx.clone());
1379            } else if result.unchanged {
1380                unchanged_vec.push(ptx.clone());
1381            } else if let Some(ru) = result.updated {
1382                let partial = ProvenTxPartial {
1383                    height: Some(ru.update.height),
1384                    block_hash: Some(ru.update.block_hash.clone()),
1385                    ..Default::default()
1386                };
1387                updates_to_persist.push((ptx.proven_tx_id, partial, ru.update));
1388            }
1389        }
1390
1391        // Bulk-persist all updates.
1392        if !updates_to_persist.is_empty() {
1393            let active = self.get_active().await?;
1394            for (id, partial, updated_ptx) in updates_to_persist {
1395                active.update_proven_tx(id, &partial).await?;
1396                updated_vec.push(updated_ptx);
1397            }
1398        }
1399
1400        Ok(ReproveHeaderResult {
1401            updated: updated_vec,
1402            unchanged: unchanged_vec,
1403            unavailable: unavailable_vec,
1404        })
1405    }
1406
1407    // -----------------------------------------------------------------------
1408    // Sync loop methods (Plan 02)
1409    // -----------------------------------------------------------------------
1410
1411    /// Sync entities from `reader` into `writer` using chunked getSyncChunk/processSyncChunk.
1412    ///
1413    /// Loops until `processSyncChunk` returns `done=true`, accumulating insert/update counts.
1414    /// Caller must hold the sync lock (acquired by `update_backups` or `sync_from_reader`).
1415    ///
1416    /// # Arguments
1417    /// * `writer` - Storage receiving the sync data.
1418    /// * `reader` - Storage providing the sync data.
1419    /// * `writer_settings` - Cached settings for the writer (to_storage_identity_key).
1420    /// * `reader_settings` - Cached settings for the reader (from_storage_identity_key).
1421    /// * `prog_log` - Optional progress callback. If Some, called after each chunk with a
1422    ///   progress string. Returns the accumulated log of all callback return values.
1423    ///
1424    /// Returns `(total_inserts, total_updates, log_string)`.
1425    pub async fn sync_to_writer(
1426        &self,
1427        writer: &dyn WalletStorageProvider,
1428        reader: &dyn WalletStorageProvider,
1429        writer_settings: &Settings,
1430        reader_settings: &Settings,
1431        prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1432    ) -> WalletResult<(i64, i64, String)> {
1433        let auth = self.get_auth(false).await?;
1434        let mut total_inserts: i64 = 0;
1435        let mut total_updates: i64 = 0;
1436        let mut log = String::new();
1437
1438        let mut i = 0usize;
1439        loop {
1440            // Re-query sync state on every iteration — processSyncChunk updates it.
1441            let (ss, _) = writer
1442                .find_or_insert_sync_state_auth(
1443                    &auth,
1444                    &reader_settings.storage_identity_key,
1445                    &reader_settings.storage_name,
1446                )
1447                .await?;
1448
1449            let args = make_request_sync_chunk_args(
1450                &ss,
1451                &self.identity_key,
1452                &writer_settings.storage_identity_key,
1453            )?;
1454
1455            let chunk = reader.get_sync_chunk(&args).await?;
1456            let r = writer.process_sync_chunk(&args, &chunk).await?;
1457
1458            total_inserts += r.inserts;
1459            total_updates += r.updates;
1460
1461            if let Some(cb) = prog_log {
1462                let msg = format!(
1463                    "sync chunk {} inserts={} updates={}",
1464                    i, r.inserts, r.updates
1465                );
1466                let s = cb(&msg);
1467                log.push_str(&s);
1468                log.push('\n');
1469            }
1470
1471            if r.done {
1472                break;
1473            }
1474            i += 1;
1475        }
1476
1477        Ok((total_inserts, total_updates, log))
1478    }
1479
1480    /// Sync entities from `reader` into `writer` with active_storage override protection.
1481    ///
1482    /// Identical to `sync_to_writer` except that before calling `writer.process_sync_chunk`,
1483    /// the `chunk.user.active_storage` field is overridden with the writer's cached
1484    /// `user.active_storage`. This prevents the reader's active_storage from clobbering
1485    /// the writer's during sync (Pitfall 5 from the research).
1486    ///
1487    /// `reader_identity_key` must match this manager's identity key; otherwise returns
1488    /// `WERR_UNAUTHORIZED`. This mirrors the TS WalletStorageManager.syncFromReader check.
1489    pub async fn sync_from_reader(
1490        &self,
1491        reader_identity_key: &str,
1492        writer: &dyn WalletStorageProvider,
1493        reader: &dyn WalletStorageProvider,
1494        writer_settings: &Settings,
1495        reader_settings: &Settings,
1496        prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1497    ) -> WalletResult<(i64, i64, String)> {
1498        if reader_identity_key != self.identity_key {
1499            return Err(WalletError::Unauthorized(
1500                "sync_from_reader: reader identity key does not match manager identity key"
1501                    .to_string(),
1502            ));
1503        }
1504        let auth = self.get_auth(false).await?;
1505        let mut total_inserts: i64 = 0;
1506        let mut total_updates: i64 = 0;
1507        let mut log = String::new();
1508
1509        // Fetch writer's cached user.active_storage for the override below.
1510        let writer_active_storage = {
1511            let state = self.state.lock().await;
1512            if let Some(idx) = state.active_index {
1513                state.stores[idx]
1514                    .get_user_cached()
1515                    .await
1516                    .map(|u| u.active_storage)
1517            } else {
1518                None
1519            }
1520        };
1521
1522        let mut i = 0usize;
1523        loop {
1524            let (ss, _) = writer
1525                .find_or_insert_sync_state_auth(
1526                    &auth,
1527                    &reader_settings.storage_identity_key,
1528                    &reader_settings.storage_name,
1529                )
1530                .await?;
1531
1532            let args = make_request_sync_chunk_args(
1533                &ss,
1534                &self.identity_key,
1535                &writer_settings.storage_identity_key,
1536            )?;
1537
1538            let mut chunk = reader.get_sync_chunk(&args).await?;
1539
1540            // Pitfall 5: override chunk.user.active_storage with writer's cached value
1541            // to prevent the reader's active_storage from clobbering the writer's.
1542            if let (Some(ref mut chunk_user), Some(ref writer_as)) =
1543                (&mut chunk.user, &writer_active_storage)
1544            {
1545                chunk_user.active_storage = writer_as.clone();
1546            }
1547
1548            let r = writer.process_sync_chunk(&args, &chunk).await?;
1549
1550            total_inserts += r.inserts;
1551            total_updates += r.updates;
1552
1553            if let Some(cb) = prog_log {
1554                let msg = format!(
1555                    "sync chunk {} inserts={} updates={}",
1556                    i, r.inserts, r.updates
1557                );
1558                let s = cb(&msg);
1559                log.push_str(&s);
1560                log.push('\n');
1561            }
1562
1563            if r.done {
1564                break;
1565            }
1566            i += 1;
1567        }
1568
1569        Ok((total_inserts, total_updates, log))
1570    }
1571
1572    /// Sync all backup stores from the active store.
1573    ///
1574    /// Acquires sync-level locks, iterates `backup_indices`, and calls
1575    /// `sync_to_writer(backup, active, ...)` for each backup. Per-backup errors
1576    /// are logged as warnings and do not block other backups.
1577    ///
1578    /// Returns `(total_inserts, total_updates, accumulated_log)`.
1579    pub async fn update_backups(
1580        &self,
1581        prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1582    ) -> WalletResult<(i64, i64, String)> {
1583        // Acquire sync-level lock (reader + writer + sync).
1584        let _guards = self.acquire_sync().await?;
1585
1586        let (active_arc, active_settings, backup_arcs_and_settings) = {
1587            let state = self.state.lock().await;
1588            let idx = state.require_active()?;
1589            let active_arc = state.stores[idx].storage.clone();
1590            let active_settings =
1591                state.stores[idx]
1592                    .get_settings_cached()
1593                    .await
1594                    .ok_or_else(|| {
1595                        WalletError::InvalidOperation("Active settings not cached".to_string())
1596                    })?;
1597
1598            let mut backups = Vec::with_capacity(state.backup_indices.len());
1599            for &bi in &state.backup_indices {
1600                let backup_arc = state.stores[bi].storage.clone();
1601                let backup_settings = state.stores[bi].get_settings_cached().await;
1602                backups.push((backup_arc, backup_settings));
1603            }
1604            (active_arc, active_settings, backups)
1605        };
1606
1607        let mut total_inserts: i64 = 0;
1608        let mut total_updates: i64 = 0;
1609        let mut full_log = String::new();
1610
1611        for (backup_arc, maybe_backup_settings) in &backup_arcs_and_settings {
1612            let backup_settings = match maybe_backup_settings {
1613                Some(s) => s.clone(),
1614                None => {
1615                    warn!("update_backups: backup has no cached settings, skipping");
1616                    continue;
1617                }
1618            };
1619
1620            match self
1621                .sync_to_writer(
1622                    backup_arc.as_ref(),
1623                    active_arc.as_ref(),
1624                    &backup_settings,
1625                    &active_settings,
1626                    prog_log,
1627                )
1628                .await
1629            {
1630                Ok((ins, upd, log)) => {
1631                    total_inserts += ins;
1632                    total_updates += upd;
1633                    full_log.push_str(&log);
1634                }
1635                Err(e) => {
1636                    // Per-backup failure must not block other backups (TS parity).
1637                    warn!(
1638                        "update_backups: sync to backup '{}' failed: {e}",
1639                        backup_settings.storage_identity_key
1640                    );
1641                }
1642            }
1643        }
1644
1645        Ok((total_inserts, total_updates, full_log))
1646    }
1647
1648    /// Switch the active store to the one identified by `storage_identity_key`.
1649    ///
1650    /// Implements the TS `setActive(storageIdentityKey, progLog?)` algorithm (8 steps):
1651    ///
1652    /// 1. Validate the target store is registered.
1653    /// 2. Early-return "unchanged" if already active and enabled.
1654    /// 3. Acquire sync lock.
1655    /// 4. Merge any conflicting-active stores into the new active via `sync_to_writer`.
1656    /// 5. Determine backup_source (new_active if conflicts existed, else current_active).
1657    /// 6. Call provider-level `set_active` on backup_source to persist `user.active_storage`.
1658    /// 7. Propagate state to all other stores via `sync_to_writer`.
1659    /// 8. Reset `is_available` and re-partition via `do_make_available`.
1660    ///
1661    /// Returns accumulated progress log string on success.
1662    pub async fn set_active(
1663        &self,
1664        storage_identity_key: &str,
1665        prog_log: Option<&(dyn Fn(&str) -> String + Send + Sync)>,
1666    ) -> WalletResult<String> {
1667        // Step 0: Auto-init if needed
1668        if !self.is_available() {
1669            self.make_available().await?;
1670        }
1671
1672        // Step 1: Validate target store exists — find its index
1673        let new_active_idx = {
1674            let state = self.state.lock().await;
1675            let mut found = None;
1676            for (i, store) in state.stores.iter().enumerate() {
1677                let sik = store
1678                    .get_settings_cached()
1679                    .await
1680                    .map(|s| s.storage_identity_key)
1681                    .unwrap_or_default();
1682                if sik == storage_identity_key {
1683                    found = Some(i);
1684                    break;
1685                }
1686            }
1687            found
1688        };
1689
1690        let new_active_idx = new_active_idx.ok_or_else(|| WalletError::InvalidParameter {
1691            parameter: "storage_identity_key".to_string(),
1692            must_be: format!(
1693                "registered with this WalletStorageManager. {} does not match any managed store.",
1694                storage_identity_key
1695            ),
1696        })?;
1697
1698        // Step 2: Early return if already active and enabled
1699        let current_active_sik = self.get_active_store().await?;
1700        if current_active_sik == storage_identity_key && self.is_active_enabled().await {
1701            return Ok(format!("{} unchanged\n", storage_identity_key));
1702        }
1703
1704        // Step 3: Acquire sync lock (reader + writer + sync)
1705        // IMPORTANT: acquire_sync already holds reader_lock — must use do_make_available()
1706        // at re-partition step, NOT make_available().
1707        let _guards = self.acquire_sync().await?;
1708
1709        let mut log = String::new();
1710
1711        // Step 4: Conflict resolution — merge each conflicting active into the new active
1712        // Clone Arcs and Settings before releasing state lock (never hold state lock across async I/O)
1713        let had_conflicts;
1714        let (new_active_arc, new_active_settings) = {
1715            let state = self.state.lock().await;
1716            let arc = state.stores[new_active_idx].storage.clone();
1717            let settings = state.stores[new_active_idx]
1718                .get_settings_cached()
1719                .await
1720                .ok_or_else(|| {
1721                    WalletError::InvalidOperation(
1722                        "set_active: new active settings not cached".to_string(),
1723                    )
1724                })?;
1725            (arc, settings)
1726        };
1727
1728        let conflict_sources = {
1729            let state = self.state.lock().await;
1730            if state.conflicting_active_indices.is_empty() {
1731                had_conflicts = false;
1732                Vec::new()
1733            } else {
1734                had_conflicts = true;
1735                // Build list: all conflicting_active_indices + active_index, minus new_active_idx
1736                let mut sources = state.conflicting_active_indices.clone();
1737                if let Some(ai) = state.active_index {
1738                    sources.push(ai);
1739                }
1740                // Remove the new_active_idx from this list (it's the destination)
1741                sources.retain(|&i| i != new_active_idx);
1742
1743                // Clone arcs and settings for each conflict source
1744                let mut result = Vec::with_capacity(sources.len());
1745                for idx in sources {
1746                    let arc = state.stores[idx].storage.clone();
1747                    let settings = state.stores[idx].get_settings_cached().await;
1748                    result.push((arc, settings));
1749                }
1750                result
1751            }
1752        };
1753
1754        // Now do the async sync_to_writer calls outside the state lock
1755        for (conflict_arc, maybe_conflict_settings) in &conflict_sources {
1756            let conflict_settings = match maybe_conflict_settings {
1757                Some(s) => s.clone(),
1758                None => {
1759                    warn!("set_active: conflict source has no cached settings, skipping");
1760                    continue;
1761                }
1762            };
1763            // Merge conflict into new_active (new_active is the writer/destination)
1764            let (ins, upd, chunk_log) = self
1765                .sync_to_writer(
1766                    new_active_arc.as_ref(),
1767                    conflict_arc.as_ref(),
1768                    &new_active_settings,
1769                    &conflict_settings,
1770                    prog_log,
1771                )
1772                .await?;
1773            if let Some(cb) = prog_log {
1774                let msg = format!(
1775                    "set_active: merged conflict {} inserts={} updates={}",
1776                    conflict_settings.storage_identity_key, ins, upd
1777                );
1778                let s = cb(&msg);
1779                log.push_str(&s);
1780                log.push('\n');
1781            }
1782            log.push_str(&chunk_log);
1783        }
1784
1785        // Step 5: Determine backup_source
1786        // If conflicts existed → backup_source = new_active
1787        // Else → backup_source = current_active
1788        let (backup_source_arc, backup_source_settings, backup_source_user_id) = {
1789            let state = self.state.lock().await;
1790            if had_conflicts {
1791                // backup_source is the new active
1792                let user_id = state.stores[new_active_idx]
1793                    .get_user_cached()
1794                    .await
1795                    .map(|u| u.user_id)
1796                    .ok_or_else(|| {
1797                        WalletError::InvalidOperation(
1798                            "set_active: new active user not cached".to_string(),
1799                        )
1800                    })?;
1801                (new_active_arc.clone(), new_active_settings.clone(), user_id)
1802            } else {
1803                // backup_source is the current active
1804                let ai = state.require_active()?;
1805                let arc = state.stores[ai].storage.clone();
1806                let settings = state.stores[ai]
1807                    .get_settings_cached()
1808                    .await
1809                    .ok_or_else(|| {
1810                        WalletError::InvalidOperation(
1811                            "set_active: current active settings not cached".to_string(),
1812                        )
1813                    })?;
1814                let user_id = state.stores[ai]
1815                    .get_user_cached()
1816                    .await
1817                    .map(|u| u.user_id)
1818                    .ok_or_else(|| {
1819                        WalletError::InvalidOperation(
1820                            "set_active: current active user not cached".to_string(),
1821                        )
1822                    })?;
1823                (arc, settings, user_id)
1824            }
1825        };
1826
1827        // Step 6: Provider-level set_active on backup_source
1828        // This persists user.active_storage = storage_identity_key in the backup_source DB
1829        let auth = AuthId {
1830            identity_key: self.identity_key.clone(),
1831            user_id: Some(backup_source_user_id),
1832            is_active: Some(false),
1833        };
1834        backup_source_arc
1835            .set_active(&auth, storage_identity_key)
1836            .await?;
1837
1838        if let Some(cb) = prog_log {
1839            let msg = format!(
1840                "set_active: provider-level set_active on {} complete",
1841                backup_source_settings.storage_identity_key
1842            );
1843            let s = cb(&msg);
1844            log.push_str(&s);
1845            log.push('\n');
1846        }
1847
1848        // Step 7: Propagate state to all other stores via sync_to_writer
1849        // First update in-memory user.active_storage cache for all stores
1850        {
1851            let state = self.state.lock().await;
1852            for store in &state.stores {
1853                let mut user_guard = store.user.lock().await;
1854                if let Some(ref mut u) = *user_guard {
1855                    u.active_storage = storage_identity_key.to_string();
1856                }
1857            }
1858        }
1859
1860        // Collect list of (store_arc, store_settings) for stores != backup_source
1861        let backup_source_sik = backup_source_settings.storage_identity_key.clone();
1862        let propagation_targets = {
1863            let state = self.state.lock().await;
1864            let mut targets = Vec::new();
1865            for store in &state.stores {
1866                let sik = store
1867                    .get_settings_cached()
1868                    .await
1869                    .map(|s| s.storage_identity_key)
1870                    .unwrap_or_default();
1871                if sik != backup_source_sik {
1872                    let arc = store.storage.clone();
1873                    let settings = store.get_settings_cached().await;
1874                    targets.push((arc, settings));
1875                }
1876            }
1877            targets
1878        };
1879
1880        for (store_arc, maybe_store_settings) in &propagation_targets {
1881            let store_settings = match maybe_store_settings {
1882                Some(s) => s.clone(),
1883                None => {
1884                    warn!("set_active: propagation target has no cached settings, skipping");
1885                    continue;
1886                }
1887            };
1888            let (ins, upd, chunk_log) = self
1889                .sync_to_writer(
1890                    store_arc.as_ref(),
1891                    backup_source_arc.as_ref(),
1892                    &store_settings,
1893                    &backup_source_settings,
1894                    prog_log,
1895                )
1896                .await?;
1897            if let Some(cb) = prog_log {
1898                let msg = format!(
1899                    "set_active: propagated to {} inserts={} updates={}",
1900                    store_settings.storage_identity_key, ins, upd
1901                );
1902                let s = cb(&msg);
1903                log.push_str(&s);
1904                log.push('\n');
1905            }
1906            log.push_str(&chunk_log);
1907        }
1908
1909        // Step 8: Re-partition — reset flag and re-run do_make_available
1910        // MUST use do_make_available(), NOT make_available(): acquire_sync holds reader_lock
1911        // and make_available() would try to acquire it again → deadlock.
1912        self.is_available_flag.store(false, Ordering::Release);
1913        self.do_make_available().await?;
1914
1915        if let Some(cb) = prog_log {
1916            let msg = format!(
1917                "set_active: complete, new active is {}",
1918                storage_identity_key
1919            );
1920            let s = cb(&msg);
1921            log.push_str(&s);
1922            log.push('\n');
1923        }
1924
1925        Ok(log)
1926    }
1927
1928    /// Add a new storage provider at runtime.
1929    ///
1930    /// Acquires all four locks (storage-provider level), appends the provider
1931    /// to the `stores` vec, resets `is_available`, and re-runs `make_available()`
1932    /// to re-partition the updated store list.
1933    ///
1934    /// Matches TS `addWalletStorageProvider(storage)`.
1935    pub async fn add_wallet_storage_provider(
1936        &self,
1937        provider: Arc<dyn WalletStorageProvider>,
1938    ) -> WalletResult<()> {
1939        // Acquire all four locks (storage-provider level).
1940        let _guards = self.acquire_storage_provider().await?;
1941
1942        {
1943            let mut state = self.state.lock().await;
1944            state.stores.push(ManagedStorage::new(provider));
1945            // Reset is_available so make_available() will re-run the full partition.
1946            self.is_available_flag.store(false, Ordering::Release);
1947        }
1948
1949        // Re-run make_available() to initialize the new store and re-partition.
1950        // We must NOT call self.make_available() here because it would try to
1951        // acquire reader_lock which is already held by acquire_storage_provider.
1952        // Instead call do_make_available() which runs without acquiring reader_lock.
1953        self.do_make_available().await?;
1954
1955        Ok(())
1956    }
1957
1958    // -----------------------------------------------------------------------
1959    // Low-level seeding helpers (used by tests and setup code)
1960    //
1961    // These bypass auth and write directly to the active provider.
1962    // -----------------------------------------------------------------------
1963
1964    /// Find or create an output basket by user ID and name.
1965    pub async fn find_output_baskets(
1966        &self,
1967        args: &FindOutputBasketsArgs,
1968    ) -> WalletResult<Vec<OutputBasket>> {
1969        let active = self.get_active().await?;
1970        active.find_output_baskets(args).await
1971    }
1972
1973    /// Insert an output basket directly (no auth check).
1974    pub async fn insert_output_basket(&self, basket: &OutputBasket) -> WalletResult<i64> {
1975        let active = self.get_active().await?;
1976        active.insert_output_basket(basket).await
1977    }
1978
1979    /// Insert a transaction directly (no auth check).
1980    pub async fn insert_transaction(&self, tx: &Transaction) -> WalletResult<i64> {
1981        let active = self.get_active().await?;
1982        active.insert_transaction(tx).await
1983    }
1984
1985    /// Insert an output directly (no auth check).
1986    pub async fn insert_output(&self, output: &Output) -> WalletResult<i64> {
1987        let active = self.get_active().await?;
1988        active.insert_output(output).await
1989    }
1990}