Skip to main content

bsv_wallet_toolbox/monitor/
mod.rs

1//! Monitor module -- background task scheduler for transaction lifecycle management.
2//!
3//! Translated from wallet-toolbox/src/monitor/Monitor.ts.
4//! Provides the Monitor struct, MonitorBuilder, WalletMonitorTask trait,
5//! and supporting types for automatic transaction broadcasting, proof collection,
6//! chain reorg detection, and data cleanup.
7
8/// Shared helper functions for monitor tasks.
9pub mod helpers;
10/// The WalletMonitorTask trait for implementing custom tasks.
11pub mod task_trait;
12/// Built-in monitor task implementations.
13pub mod tasks;
14
15use std::future::Future;
16use std::pin::Pin;
17use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
18use std::sync::Arc;
19
20use tracing::{error, info, warn};
21
22use crate::error::{WalletError, WalletResult};
23use crate::services::traits::WalletServices;
24use crate::services::types::BlockHeader;
25use crate::storage::find_args::PurgeParams;
26use crate::storage::manager::WalletStorageManager;
27use crate::types::Chain;
28
29use self::helpers::{log_event, now_msecs};
30use self::task_trait::WalletMonitorTask;
31
32// ---------------------------------------------------------------------------
33// Time constants (in milliseconds)
34// ---------------------------------------------------------------------------
35
36/// One second in milliseconds.
37pub const ONE_SECOND: u64 = 1000;
38/// One minute in milliseconds.
39pub const ONE_MINUTE: u64 = 60 * ONE_SECOND;
40/// One hour in milliseconds.
41pub const ONE_HOUR: u64 = 60 * ONE_MINUTE;
42/// One day in milliseconds.
43pub const ONE_DAY: u64 = 24 * ONE_HOUR;
44/// One week in milliseconds.
45pub const ONE_WEEK: u64 = 7 * ONE_DAY;
46
47// ---------------------------------------------------------------------------
48// Callback type aliases
49// ---------------------------------------------------------------------------
50
51/// Type alias for async callback closures used by the Monitor.
52pub type AsyncCallback<T> =
53    Arc<dyn Fn(T) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
54
55// ---------------------------------------------------------------------------
56// MonitorOptions
57// ---------------------------------------------------------------------------
58
59/// Configuration options for the Monitor.
60///
61/// Matches the TS `MonitorOptions` interface from Monitor.ts.
62pub struct MonitorOptions {
63    /// Which chain this monitor operates on.
64    pub chain: Chain,
65
66    /// How many msecs to wait after each task run iteration.
67    pub task_run_wait_msecs: u64,
68
69    /// How many msecs before a transaction is considered abandoned.
70    pub abandoned_msecs: u64,
71
72    /// How many msecs to wait between merkle proof service requests.
73    pub msecs_wait_per_merkle_proof_service_req: u64,
74
75    /// Max unproven attempts before giving up (testnet).
76    pub unproven_attempts_limit_test: u32,
77
78    /// Max unproven attempts before giving up (mainnet).
79    pub unproven_attempts_limit_main: u32,
80
81    /// Stable callback token for ARC SSE event streaming.
82    pub callback_token: Option<String>,
83
84    /// Hook called when a transaction is broadcasted.
85    pub on_tx_broadcasted: Option<AsyncCallback<String>>,
86
87    /// Hook called when a transaction is proven.
88    pub on_tx_proven: Option<AsyncCallback<String>>,
89
90    /// Hook called when a transaction status changes.
91    pub on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
92}
93
94impl Default for MonitorOptions {
95    fn default() -> Self {
96        Self {
97            chain: Chain::Main,
98            task_run_wait_msecs: 5000,
99            abandoned_msecs: ONE_MINUTE * 5,
100            msecs_wait_per_merkle_proof_service_req: 500,
101            unproven_attempts_limit_test: 10,
102            unproven_attempts_limit_main: 144,
103            callback_token: None,
104            on_tx_broadcasted: None,
105            on_tx_proven: None,
106            on_tx_status_changed: None,
107        }
108    }
109}
110
111// ---------------------------------------------------------------------------
112// DeactivatedHeader
113// ---------------------------------------------------------------------------
114
115/// A deactivated block header from a reorg event, queued for aging before processing.
116#[derive(Debug, Clone)]
117pub struct DeactivatedHeader {
118    /// Timestamp (epoch ms) when the deactivation was received.
119    /// Used to control aging of notification before pursuing updated proof data.
120    pub when_msecs: u64,
121    /// Number of attempts made to process the header.
122    /// Supports returning deactivation notification to the queue if proof data
123    /// is not yet available.
124    pub tries: u32,
125    /// The deactivated block header.
126    pub header: BlockHeader,
127}
128
129// ---------------------------------------------------------------------------
130// Monitor struct
131// ---------------------------------------------------------------------------
132
133/// Background task scheduler that manages transaction lifecycle.
134///
135/// Automatically handles broadcasting pending transactions, collecting merkle proofs,
136/// detecting chain reorgs, failing stale transactions, and cleaning up old data.
137///
138/// Translated from TS `Monitor` class in wallet-toolbox/src/monitor/Monitor.ts.
139///
140/// NOTE: Tasks run sequentially within each polling iteration, matching the TS pattern.
141/// Concurrent task execution is a future optimization.
142pub struct Monitor {
143    /// Configuration options.
144    pub options: MonitorOptions,
145
146    /// Storage manager for persistence operations.
147    pub storage: WalletStorageManager,
148
149    /// Network services for broadcasting and proof retrieval.
150    pub services: Arc<dyn WalletServices>,
151
152    /// The chain this monitor is configured for.
153    pub chain: Chain,
154
155    /// Scheduled tasks -- run by the polling loop and also by `run_task`.
156    tasks: Vec<Box<dyn WalletMonitorTask>>,
157
158    /// Other tasks -- only run by `run_task`, not by the scheduler.
159    other_tasks: Vec<Box<dyn WalletMonitorTask>>,
160
161    /// Flag indicating whether the polling loop is running.
162    running: Arc<AtomicBool>,
163
164    /// Flag to nudge proof checking (set by processNewBlockHeader).
165    /// Shared with TaskCheckForProofs.
166    pub check_now: Arc<AtomicBool>,
167
168    /// Shared last header height for max acceptable height guard.
169    /// u32::MAX is the sentinel for "no height known".
170    /// Shared with TaskCheckForProofs and updated in process_new_block_header.
171    pub last_new_header_height: Arc<AtomicU32>,
172
173    /// The last new block header received.
174    pub last_new_header: Option<BlockHeader>,
175
176    /// When the last new header was received (epoch ms).
177    pub last_new_header_when: Option<u64>,
178
179    /// Queue of deactivated headers from reorg events, awaiting processing.
180    pub deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
181
182    /// Handle to the spawned polling loop task.
183    join_handle: Option<tokio::task::JoinHandle<()>>,
184
185    /// Whether async_setup needs to run on first iteration.
186    run_async_setup: bool,
187}
188
189/// Default purge parameters matching the TS Monitor.defaultPurgeParams.
190pub fn default_purge_params() -> PurgeParams {
191    PurgeParams {
192        purge_spent: false,
193        purge_completed: false,
194        purge_failed: true,
195        purge_spent_age: 2 * ONE_WEEK,
196        purge_completed_age: 2 * ONE_WEEK,
197        purge_failed_age: 5 * ONE_DAY,
198    }
199}
200
201impl Monitor {
202    /// Returns a new MonitorBuilder for fluent construction.
203    pub fn builder() -> MonitorBuilder {
204        MonitorBuilder::new()
205    }
206
207    /// Start the polling loop in a background tokio task.
208    ///
209    /// Validates that the monitor is not already running, then spawns the loop.
210    pub fn start_tasks(&mut self) -> WalletResult<()> {
211        if self.running.load(Ordering::SeqCst) {
212            return Err(WalletError::BadRequest(
213                "monitor tasks are already running".to_string(),
214            ));
215        }
216
217        self.running.store(true, Ordering::SeqCst);
218        self.run_async_setup = true;
219
220        // Move task state into the spawned future.
221        // We need to take ownership of tasks for the spawn.
222        let running = self.running.clone();
223        let _check_now = self.check_now.clone();
224        let _deactivated_headers = self.deactivated_headers.clone();
225        let task_run_wait_msecs = self.options.task_run_wait_msecs;
226
227        // Take tasks out of self for the spawned future.
228        // They will be returned when stop_tasks is called.
229        let mut tasks: Vec<Box<dyn WalletMonitorTask>> = std::mem::take(&mut self.tasks);
230        let storage = WalletStorageManager::new(
231            self.storage.auth_id().to_string(),
232            self.storage.active().cloned(),
233            self.storage.backups().to_vec(),
234        );
235
236        let handle = tokio::spawn(async move {
237            // First iteration: run async_setup on all tasks
238            for task in tasks.iter_mut() {
239                if !running.load(Ordering::SeqCst) {
240                    break;
241                }
242                if let Err(e) = task.async_setup().await {
243                    let details = format!("monitor task {} asyncSetup error: {}", task.name(), e);
244                    warn!("{}", details);
245                    let _ = log_event(&storage, "error0", &details).await;
246                }
247            }
248
249            // Main polling loop
250            while running.load(Ordering::SeqCst) {
251                let now = now_msecs();
252
253                // Collect indices of triggered tasks
254                let mut triggered_indices = Vec::new();
255                for (i, task) in tasks.iter_mut().enumerate() {
256                    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
257                        task.trigger(now)
258                    })) {
259                        Ok(should_run) => {
260                            if should_run {
261                                triggered_indices.push(i);
262                            }
263                        }
264                        Err(_) => {
265                            let details = format!("monitor task {} trigger panicked", task.name());
266                            error!("{}", details);
267                            let _ = log_event(&storage, "error0", &details).await;
268                        }
269                    }
270                }
271
272                // Run triggered tasks sequentially
273                for idx in triggered_indices {
274                    if !running.load(Ordering::SeqCst) {
275                        break;
276                    }
277                    let task = &mut tasks[idx];
278                    match task.run_task().await {
279                        Ok(log) => {
280                            if !log.is_empty() {
281                                info!("Task {} {}", task.name(), &log[..log.len().min(1024)]);
282                                let _ = log_event(&storage, task.name(), &log).await;
283                            }
284                        }
285                        Err(e) => {
286                            let details =
287                                format!("monitor task {} runTask error: {}", task.name(), e);
288                            error!("{}", details);
289                            let _ = log_event(&storage, "error1", &details).await;
290                        }
291                    }
292                }
293
294                // Sleep before next iteration
295                tokio::time::sleep(tokio::time::Duration::from_millis(task_run_wait_msecs)).await;
296            }
297
298            info!("Monitor polling loop stopped");
299        });
300
301        self.join_handle = Some(handle);
302        Ok(())
303    }
304
305    /// Stop the polling loop and wait for it to complete.
306    pub async fn stop_tasks(&mut self) {
307        self.running.store(false, Ordering::SeqCst);
308        if let Some(handle) = self.join_handle.take() {
309            let _ = handle.await;
310        }
311    }
312
313    /// Destroy the monitor: stops tasks and cleans up subscriptions.
314    pub async fn destroy(&mut self) {
315        self.stop_tasks().await;
316        // Future: unsubscribe from chaintracks events if subscribed.
317    }
318
319    /// Run one iteration of the polling loop (for testing).
320    ///
321    /// Runs async_setup on first call, then triggers and runs tasks once.
322    pub async fn run_once(&mut self) -> WalletResult<()> {
323        if self.run_async_setup {
324            for task in self.tasks.iter_mut() {
325                if let Err(e) = task.async_setup().await {
326                    let details = format!("monitor task {} asyncSetup error: {}", task.name(), e);
327                    warn!("{}", details);
328                    let _ = log_event(&self.storage, "error0", &details).await;
329                }
330            }
331            self.run_async_setup = false;
332        }
333
334        let now = now_msecs();
335
336        // Collect triggered tasks
337        let mut triggered_indices = Vec::new();
338        for (i, task) in self.tasks.iter_mut().enumerate() {
339            if task.trigger(now) {
340                triggered_indices.push(i);
341            }
342        }
343
344        // Run triggered tasks
345        for idx in triggered_indices {
346            let task = &mut self.tasks[idx];
347            match task.run_task().await {
348                Ok(log) => {
349                    if !log.is_empty() {
350                        info!("Task {} {}", task.name(), &log[..log.len().min(1024)]);
351                        let _ = log_event(&self.storage, task.name(), &log).await;
352                    }
353                }
354                Err(e) => {
355                    let details = format!("monitor task {} runTask error: {}", task.name(), e);
356                    error!("{}", details);
357                    let _ = log_event(&self.storage, "error1", &details).await;
358                }
359            }
360        }
361
362        Ok(())
363    }
364
365    /// Run a specific task by name (from _tasks or _otherTasks).
366    pub async fn run_task(&mut self, name: &str) -> WalletResult<String> {
367        // Search in scheduled tasks first
368        for task in self.tasks.iter_mut() {
369            if task.name() == name {
370                task.async_setup().await?;
371                return task.run_task().await;
372            }
373        }
374        // Search in other tasks
375        for task in self.other_tasks.iter_mut() {
376            if task.name() == name {
377                task.async_setup().await?;
378                return task.run_task().await;
379            }
380        }
381        Err(WalletError::InvalidParameter {
382            parameter: "name".to_string(),
383            must_be: format!("an existing task name, '{}' not found", name),
384        })
385    }
386
387    /// Process a new block header event received from Chaintracks.
388    ///
389    /// Stores the header and nudges the proof checker to try again.
390    pub fn process_new_block_header(&mut self, header: BlockHeader) {
391        // Update the shared last header height for the max acceptable height guard.
392        self.last_new_header_height
393            .store(header.height, Ordering::SeqCst);
394        self.last_new_header = Some(header);
395        self.last_new_header_when = Some(now_msecs());
396        // Nudge the proof checker to try again.
397        self.check_now.store(true, Ordering::SeqCst);
398    }
399
400    /// Process a reorg event received from Chaintracks.
401    ///
402    /// Reorgs can move recent transactions to new blocks at new index positions.
403    /// Affected transaction proofs become invalid and must be updated.
404    pub async fn process_reorg(
405        &self,
406        _depth: u32,
407        _old_tip: &BlockHeader,
408        _new_tip: &BlockHeader,
409        deactivated: Option<&[BlockHeader]>,
410    ) {
411        if let Some(headers) = deactivated {
412            let mut queue = self.deactivated_headers.lock().await;
413            for header in headers {
414                queue.push(DeactivatedHeader {
415                    when_msecs: now_msecs(),
416                    tries: 0,
417                    header: header.clone(),
418                });
419            }
420        }
421    }
422
423    /// Handler for new header events from Chaintracks.
424    ///
425    /// To minimize reorg processing, new headers are aged before processing
426    /// via TaskNewHeader. Therefore this handler is intentionally a no-op.
427    pub fn process_header(&self, _header: &BlockHeader) {
428        // Intentional no-op -- headers aged via TaskNewHeader polling.
429    }
430
431    // -----------------------------------------------------------------------
432    // Callback helpers
433    // -----------------------------------------------------------------------
434
435    /// Call the on_tx_broadcasted callback if registered.
436    pub async fn call_on_broadcasted_transaction(&self, broadcast_result: &str) {
437        if let Some(ref cb) = self.options.on_tx_broadcasted {
438            cb(broadcast_result.to_string()).await;
439        }
440    }
441
442    /// Call the on_tx_proven callback if registered.
443    pub async fn call_on_proven_transaction(&self, tx_status: &str) {
444        if let Some(ref cb) = self.options.on_tx_proven {
445            cb(tx_status.to_string()).await;
446        }
447    }
448
449    /// Call the on_tx_status_changed callback if registered.
450    pub async fn call_on_transaction_status_changed(&self, txid: &str, new_status: &str) {
451        if let Some(ref cb) = self.options.on_tx_status_changed {
452            cb((txid.to_string(), new_status.to_string())).await;
453        }
454    }
455
456    /// Returns whether the monitor is currently running.
457    pub fn is_running(&self) -> bool {
458        self.running.load(Ordering::SeqCst)
459    }
460
461    /// Add a task to the scheduled task list.
462    pub fn add_task(&mut self, task: Box<dyn WalletMonitorTask>) -> WalletResult<()> {
463        let name = task.name().to_string();
464        if self.tasks.iter().any(|t| t.name() == name) {
465            return Err(WalletError::BadRequest(format!(
466                "task {} has already been added",
467                name
468            )));
469        }
470        self.tasks.push(task);
471        Ok(())
472    }
473
474    /// Remove a task from the scheduled task list by name.
475    pub fn remove_task(&mut self, name: &str) {
476        self.tasks.retain(|t| t.name() != name);
477    }
478}
479
480// ---------------------------------------------------------------------------
481// MonitorBuilder
482// ---------------------------------------------------------------------------
483
484/// Fluent builder for constructing a Monitor instance.
485///
486/// Required fields: chain, storage, services.
487/// Optional: task presets, individual tasks, callback hooks.
488///
489/// # Example
490///
491/// ```no_run
492/// # use std::sync::Arc;
493/// # fn example(
494/// #     storage: bsv_wallet_toolbox::storage::manager::WalletStorageManager,
495/// #     services: Arc<dyn bsv_wallet_toolbox::services::traits::WalletServices>,
496/// # ) -> bsv_wallet_toolbox::WalletResult<()> {
497/// use bsv_wallet_toolbox::monitor::Monitor;
498/// use bsv_wallet_toolbox::types::Chain;
499///
500/// let monitor = Monitor::builder()
501///     .chain(Chain::Test)
502///     .storage(storage)
503///     .services(services)
504///     .default_tasks()
505///     .build()?;
506/// # Ok(())
507/// # }
508/// ```
509pub struct MonitorBuilder {
510    chain: Option<Chain>,
511    storage: Option<WalletStorageManager>,
512    services: Option<Arc<dyn WalletServices>>,
513    options: MonitorOptions,
514    default_tasks: bool,
515    multi_user_tasks: bool,
516    extra_tasks: Vec<Box<dyn WalletMonitorTask>>,
517    removed_task_names: Vec<String>,
518}
519
520impl MonitorBuilder {
521    fn new() -> Self {
522        Self {
523            chain: None,
524            storage: None,
525            services: None,
526            options: MonitorOptions::default(),
527            default_tasks: false,
528            multi_user_tasks: false,
529            extra_tasks: Vec::new(),
530            removed_task_names: Vec::new(),
531        }
532    }
533
534    /// Set the chain (required).
535    pub fn chain(mut self, chain: Chain) -> Self {
536        self.chain = Some(chain);
537        self
538    }
539
540    /// Set the storage manager (required).
541    pub fn storage(mut self, storage: WalletStorageManager) -> Self {
542        self.storage = Some(storage);
543        self
544    }
545
546    /// Set the services provider (required).
547    pub fn services(mut self, services: Arc<dyn WalletServices>) -> Self {
548        self.services = Some(services);
549        self
550    }
551
552    /// Set the task run wait interval in milliseconds.
553    pub fn task_run_wait_msecs(mut self, msecs: u64) -> Self {
554        self.options.task_run_wait_msecs = msecs;
555        self
556    }
557
558    /// Set the abandoned transaction threshold in milliseconds.
559    pub fn abandoned_msecs(mut self, msecs: u64) -> Self {
560        self.options.abandoned_msecs = msecs;
561        self
562    }
563
564    /// Set the callback token for ARC SSE streaming.
565    pub fn callback_token(mut self, token: String) -> Self {
566        self.options.callback_token = Some(token);
567        self
568    }
569
570    /// Use default tasks preset (single-user with sync).
571    ///
572    /// Default tasks include: TaskClock, TaskNewHeader, TaskMonitorCallHistory,
573    /// TaskSendWaiting, TaskCheckForProofs, TaskCheckNoSends, TaskFailAbandoned,
574    /// TaskUnFail, TaskReviewStatus, TaskReorg, TaskArcadeSSE, and
575    /// TaskMineBlock (mock chain only).
576    pub fn default_tasks(mut self) -> Self {
577        self.default_tasks = true;
578        self
579    }
580
581    /// Use multi-user tasks preset (without sync).
582    ///
583    /// Multi-user tasks include: TaskClock, TaskNewHeader, TaskMonitorCallHistory,
584    /// TaskSendWaiting, TaskCheckForProofs, TaskCheckNoSends, TaskFailAbandoned,
585    /// TaskUnFail, TaskReviewStatus, TaskReorg, and TaskMineBlock (mock chain only).
586    pub fn multi_user_tasks(mut self) -> Self {
587        self.multi_user_tasks = true;
588        self
589    }
590
591    /// Add an additional task to the scheduled task list.
592    pub fn add_task(mut self, task: Box<dyn WalletMonitorTask>) -> Self {
593        self.extra_tasks.push(task);
594        self
595    }
596
597    /// Remove a task by name from the preset task list.
598    pub fn remove_task(mut self, name: &str) -> Self {
599        self.removed_task_names.push(name.to_string());
600        self
601    }
602
603    /// Set the on_tx_broadcasted callback.
604    pub fn on_tx_broadcasted(mut self, cb: AsyncCallback<String>) -> Self {
605        self.options.on_tx_broadcasted = Some(cb);
606        self
607    }
608
609    /// Set the on_tx_proven callback.
610    pub fn on_tx_proven(mut self, cb: AsyncCallback<String>) -> Self {
611        self.options.on_tx_proven = Some(cb);
612        self
613    }
614
615    /// Set the on_tx_status_changed callback.
616    pub fn on_tx_status_changed(mut self, cb: AsyncCallback<(String, String)>) -> Self {
617        self.options.on_tx_status_changed = Some(cb);
618        self
619    }
620
621    /// Build the Monitor instance.
622    ///
623    /// Validates that required fields (chain, storage, services) are set.
624    /// Constructs tasks based on the selected preset.
625    pub fn build(mut self) -> WalletResult<Monitor> {
626        let chain = self
627            .chain
628            .ok_or_else(|| WalletError::MissingParameter("chain".to_string()))?;
629        let storage = self
630            .storage
631            .ok_or_else(|| WalletError::MissingParameter("storage".to_string()))?;
632        let services = self
633            .services
634            .ok_or_else(|| WalletError::MissingParameter("services".to_string()))?;
635
636        self.options.chain = chain.clone();
637
638        // Shared state for task coordination
639        let check_now = Arc::new(AtomicBool::new(false));
640        let last_new_header_height = Arc::new(AtomicU32::new(u32::MAX));
641        let deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
642            Arc::new(tokio::sync::Mutex::new(Vec::new()));
643
644        // Helper: create a new WalletStorageManager sharing the same providers
645        let make_storage = |s: &WalletStorageManager| -> WalletStorageManager {
646            WalletStorageManager::new(
647                s.auth_id().to_string(),
648                s.active().cloned(),
649                s.backups().to_vec(),
650            )
651        };
652
653        // Unproven attempt limits based on chain
654        let unproven_limit = match chain {
655            Chain::Test => self.options.unproven_attempts_limit_test,
656            _ => self.options.unproven_attempts_limit_main,
657        };
658
659        // Build task list based on preset.
660        let mut tasks: Vec<Box<dyn WalletMonitorTask>> = Vec::new();
661
662        if self.default_tasks || self.multi_user_tasks {
663            // -- Plan 03 tasks --
664            tasks.push(Box::new(tasks::task_clock::TaskClock::new()));
665            tasks.push(Box::new(
666                tasks::task_monitor_call_history::TaskMonitorCallHistory::new(services.clone()),
667            ));
668
669            // -- Plan 02 tasks --
670            tasks.push(Box::new(tasks::task_new_header::TaskNewHeader::new(
671                make_storage(&storage),
672                services.clone(),
673                check_now.clone(),
674            )));
675            tasks.push(Box::new(tasks::task_send_waiting::TaskSendWaiting::new(
676                make_storage(&storage),
677                services.clone(),
678                chain.clone(),
679                self.options.on_tx_broadcasted.clone(),
680            )));
681            tasks.push(Box::new(
682                tasks::task_check_for_proofs::TaskCheckForProofs::new(
683                    make_storage(&storage),
684                    services.clone(),
685                    chain.clone(),
686                    check_now.clone(),
687                    unproven_limit,
688                    self.options.on_tx_proven.clone(),
689                    last_new_header_height.clone(),
690                ),
691            ));
692            tasks.push(Box::new(tasks::task_check_no_sends::TaskCheckNoSends::new(
693                make_storage(&storage),
694                services.clone(),
695                chain.clone(),
696                unproven_limit,
697                last_new_header_height.clone(),
698            )));
699            tasks.push(Box::new(
700                tasks::task_fail_abandoned::TaskFailAbandoned::new(
701                    make_storage(&storage),
702                    self.options.abandoned_msecs,
703                ),
704            ));
705            tasks.push(Box::new(tasks::task_unfail::TaskUnFail::new(
706                make_storage(&storage),
707                services.clone(),
708            )));
709            tasks.push(Box::new(tasks::task_review_status::TaskReviewStatus::new(
710                make_storage(&storage),
711            )));
712            tasks.push(Box::new(tasks::task_reorg::TaskReorg::new(
713                make_storage(&storage),
714                services.clone(),
715                deactivated_headers.clone(),
716            )));
717
718            // Default tasks preset includes ARC SSE and SyncWhenIdle
719            if self.default_tasks {
720                tasks.push(Box::new(tasks::task_arc_sse::TaskArcSse::new(
721                    make_storage(&storage),
722                    services.clone(),
723                    self.options.callback_token.clone(),
724                    self.options.on_tx_status_changed.clone(),
725                )));
726                tasks.push(Box::new(tasks::task_sync_when_idle::TaskSyncWhenIdle::new()));
727            }
728
729            // TaskPurge: present in presets with long interval (TS comments it out in default)
730            tasks.push(Box::new(tasks::task_purge::TaskPurge::new(
731                make_storage(&storage),
732                default_purge_params(),
733            )));
734        }
735
736        // Remove tasks by name
737        for name in &self.removed_task_names {
738            tasks.retain(|t| t.name() != name.as_str());
739        }
740
741        // Add extra tasks
742        tasks.append(&mut self.extra_tasks);
743
744        Ok(Monitor {
745            options: self.options,
746            storage,
747            services,
748            chain,
749            tasks,
750            other_tasks: Vec::new(),
751            running: Arc::new(AtomicBool::new(false)),
752            check_now,
753            last_new_header_height,
754            last_new_header: None,
755            last_new_header_when: None,
756            deactivated_headers,
757            join_handle: None,
758            run_async_setup: true,
759        })
760    }
761}
762
763// ---------------------------------------------------------------------------
764// Tests
765// ---------------------------------------------------------------------------
766
767#[cfg(test)]
768mod tests {
769    use super::*;
770    use crate::services::types::BlockHeader;
771
772    // A minimal mock WalletServices for testing
773    struct MockServices {
774        chain: Chain,
775    }
776
777    #[async_trait::async_trait]
778    impl WalletServices for MockServices {
779        fn chain(&self) -> Chain {
780            self.chain.clone()
781        }
782        async fn get_chain_tracker(
783            &self,
784        ) -> WalletResult<Box<dyn bsv::transaction::chain_tracker::ChainTracker>> {
785            Err(WalletError::NotImplemented("mock".into()))
786        }
787        async fn get_merkle_path(
788            &self,
789            _txid: &str,
790            _use_next: bool,
791        ) -> crate::services::types::GetMerklePathResult {
792            crate::services::types::GetMerklePathResult::default()
793        }
794        async fn get_raw_tx(
795            &self,
796            _txid: &str,
797            _use_next: bool,
798        ) -> crate::services::types::GetRawTxResult {
799            crate::services::types::GetRawTxResult::default()
800        }
801        async fn post_beef(
802            &self,
803            _beef: &[u8],
804            _txids: &[String],
805        ) -> Vec<crate::services::types::PostBeefResult> {
806            vec![]
807        }
808        async fn get_utxo_status(
809            &self,
810            _output: &str,
811            _output_format: Option<crate::services::types::GetUtxoStatusOutputFormat>,
812            _outpoint: Option<&str>,
813            _use_next: bool,
814        ) -> crate::services::types::GetUtxoStatusResult {
815            crate::services::types::GetUtxoStatusResult {
816                name: "mock".to_string(),
817                status: "error".to_string(),
818                error: Some("mock".to_string()),
819                is_utxo: None,
820                details: vec![],
821            }
822        }
823        async fn get_status_for_txids(
824            &self,
825            _txids: &[String],
826            _use_next: bool,
827        ) -> crate::services::types::GetStatusForTxidsResult {
828            crate::services::types::GetStatusForTxidsResult {
829                name: "mock".to_string(),
830                status: "error".to_string(),
831                error: Some("mock".to_string()),
832                results: vec![],
833            }
834        }
835        async fn get_script_hash_history(
836            &self,
837            _hash: &str,
838            _use_next: bool,
839        ) -> crate::services::types::GetScriptHashHistoryResult {
840            crate::services::types::GetScriptHashHistoryResult {
841                name: "mock".to_string(),
842                status: "error".to_string(),
843                error: Some("mock".to_string()),
844                history: vec![],
845            }
846        }
847        async fn hash_to_header(&self, _hash: &str) -> WalletResult<BlockHeader> {
848            Err(WalletError::NotImplemented("mock".into()))
849        }
850        async fn get_header_for_height(&self, _height: u32) -> WalletResult<Vec<u8>> {
851            Err(WalletError::NotImplemented("mock".into()))
852        }
853        async fn get_height(&self) -> WalletResult<u32> {
854            Ok(800000)
855        }
856        async fn n_lock_time_is_final(
857            &self,
858            _input: crate::services::types::NLockTimeInput,
859        ) -> WalletResult<bool> {
860            Ok(true)
861        }
862        async fn get_bsv_exchange_rate(
863            &self,
864        ) -> WalletResult<crate::services::types::BsvExchangeRate> {
865            Err(WalletError::NotImplemented("mock".into()))
866        }
867        async fn get_fiat_exchange_rate(
868            &self,
869            _currency: &str,
870            _base: Option<&str>,
871        ) -> WalletResult<f64> {
872            Ok(1.0)
873        }
874        async fn get_fiat_exchange_rates(
875            &self,
876            _target: &[String],
877        ) -> WalletResult<crate::services::types::FiatExchangeRates> {
878            Err(WalletError::NotImplemented("mock".into()))
879        }
880        fn get_services_call_history(
881            &self,
882            _reset: bool,
883        ) -> crate::services::types::ServicesCallHistory {
884            crate::services::types::ServicesCallHistory { services: vec![] }
885        }
886        async fn get_beef_for_txid(&self, _txid: &str) -> WalletResult<bsv::transaction::Beef> {
887            Err(WalletError::NotImplemented("mock".into()))
888        }
889        fn hash_output_script(&self, _script: &[u8]) -> String {
890            String::new()
891        }
892        async fn is_utxo(
893            &self,
894            _locking_script: &[u8],
895            _txid: &str,
896            _vout: u32,
897        ) -> WalletResult<bool> {
898            Ok(false)
899        }
900    }
901
902    // We cannot easily construct a real WalletStorageManager in unit tests
903    // (it requires a real StorageProvider), so we test what we can without one.
904
905    #[test]
906    fn test_monitor_builder_validates_required_fields() {
907        // Missing chain
908        let result = MonitorBuilder::new().build();
909        assert!(result.is_err());
910        match result {
911            Err(e) => assert!(
912                e.to_string().contains("chain"),
913                "Expected chain error, got: {}",
914                e
915            ),
916            Ok(_) => panic!("Expected error for missing chain"),
917        }
918
919        // Missing storage (chain set but no storage)
920        let result = MonitorBuilder::new().chain(Chain::Test).build();
921        assert!(result.is_err());
922        match result {
923            Err(e) => assert!(
924                e.to_string().contains("storage"),
925                "Expected storage error, got: {}",
926                e
927            ),
928            Ok(_) => panic!("Expected error for missing storage"),
929        }
930    }
931
932    #[test]
933    fn test_time_constants() {
934        assert_eq!(ONE_SECOND, 1000);
935        assert_eq!(ONE_MINUTE, 60_000);
936        assert_eq!(ONE_HOUR, 3_600_000);
937        assert_eq!(ONE_DAY, 86_400_000);
938        assert_eq!(ONE_WEEK, 604_800_000);
939    }
940
941    #[test]
942    fn test_default_purge_params() {
943        let params = default_purge_params();
944        assert!(!params.purge_spent);
945        assert!(!params.purge_completed);
946        assert!(params.purge_failed);
947        assert_eq!(params.purge_spent_age, 2 * ONE_WEEK);
948        assert_eq!(params.purge_completed_age, 2 * ONE_WEEK);
949        assert_eq!(params.purge_failed_age, 5 * ONE_DAY);
950    }
951
952    #[test]
953    fn test_deactivated_header() {
954        let header = BlockHeader {
955            version: 1,
956            previous_hash: "0000".to_string(),
957            merkle_root: "abcd".to_string(),
958            time: 1234567890,
959            bits: 0x1d00ffff,
960            nonce: 42,
961            height: 100,
962            hash: "blockhash".to_string(),
963        };
964        let dh = DeactivatedHeader {
965            when_msecs: 1000,
966            tries: 0,
967            header: header.clone(),
968        };
969        assert_eq!(dh.when_msecs, 1000);
970        assert_eq!(dh.tries, 0);
971        assert_eq!(dh.header.height, 100);
972    }
973
974    #[test]
975    fn test_now_msecs_returns_reasonable_value() {
976        let now = now_msecs();
977        // Should be after 2020-01-01 in milliseconds
978        assert!(now > 1_577_836_800_000);
979    }
980
981    #[tokio::test]
982    async fn test_process_reorg_adds_deactivated_headers() {
983        // We need a Monitor to test process_reorg.
984        // Create one with mock services -- requires a storage manager though.
985        // Instead, test the DeactivatedHeader queue logic directly.
986        let queue: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
987            Arc::new(tokio::sync::Mutex::new(Vec::new()));
988
989        let header = BlockHeader {
990            version: 1,
991            previous_hash: "0000".to_string(),
992            merkle_root: "abcd".to_string(),
993            time: 1234567890,
994            bits: 0x1d00ffff,
995            nonce: 42,
996            height: 100,
997            hash: "blockhash".to_string(),
998        };
999
1000        // Simulate process_reorg logic
1001        {
1002            let mut q = queue.lock().await;
1003            q.push(DeactivatedHeader {
1004                when_msecs: now_msecs(),
1005                tries: 0,
1006                header: header.clone(),
1007            });
1008        }
1009
1010        let q = queue.lock().await;
1011        assert_eq!(q.len(), 1);
1012        assert_eq!(q[0].header.height, 100);
1013        assert_eq!(q[0].tries, 0);
1014    }
1015}