Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_check_for_proofs.rs

1//! TaskCheckForProofs -- collects merkle proofs for unmined transactions.
2//!
3//! Translated from wallet-toolbox/src/monitor/tasks/TaskCheckForProofs.ts (243 lines).
4//!
5//! Normally triggered by the check_now flag (set when a new block header is detected).
6//! Queries for ProvenTxReqs with status "unmined"/"callback"/"sending"/"unknown"/"unconfirmed"
7//! and attempts to retrieve merkle proofs via services.get_merkle_path().
8
9use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use crate::error::WalletError;
15use crate::monitor::helpers::get_proofs;
16use crate::monitor::{AsyncCallback, ONE_HOUR};
17use crate::services::traits::WalletServices;
18use crate::status::ProvenTxReqStatus;
19use crate::storage::find_args::{FindProvenTxReqsArgs, Paged, ProvenTxReqPartial};
20use crate::storage::manager::WalletStorageManager;
21use crate::storage::traits::reader::StorageReader;
22use crate::types::Chain;
23
24use super::super::task_trait::WalletMonitorTask;
25
26/// Background task that collects merkle proofs for broadcast transactions.
27///
28/// Triggered by the shared `check_now` flag (set when new block headers arrive),
29/// or by a periodic timer. Queries for unconfirmed ProvenTxReqs and attempts
30/// to retrieve merkle proofs from services.
31pub struct TaskCheckForProofs {
32    /// Storage manager for persistence operations.
33    storage: WalletStorageManager,
34    /// Network services for proof retrieval.
35    services: Arc<dyn WalletServices>,
36    /// Periodic trigger interval (default 2 hours).
37    trigger_msecs: u64,
38    /// Last time this task ran (epoch ms).
39    last_run_msecs: u64,
40    /// Shared flag: set to true by Monitor.process_new_block_header().
41    /// When true, triggers immediate proof checking.
42    check_now: Arc<AtomicBool>,
43    /// Optional callback when a transaction is proven.
44    on_tx_proven: Option<AsyncCallback<String>>,
45    /// Chain (main/test).
46    chain: Chain,
47    /// Max unproven attempts before giving up.
48    unproven_attempts_limit: u32,
49    /// Shared last header height (AtomicU32, u32::MAX means no height known).
50    /// Updated by the Monitor when new block headers arrive.
51    last_new_header_height: Arc<AtomicU32>,
52}
53
54impl TaskCheckForProofs {
55    /// Create a new TaskCheckForProofs with default intervals.
56    pub fn new(
57        storage: WalletStorageManager,
58        services: Arc<dyn WalletServices>,
59        chain: Chain,
60        check_now: Arc<AtomicBool>,
61        unproven_attempts_limit: u32,
62        on_tx_proven: Option<AsyncCallback<String>>,
63        last_new_header_height: Arc<AtomicU32>,
64    ) -> Self {
65        Self {
66            storage,
67            services,
68            trigger_msecs: 2 * ONE_HOUR,
69            last_run_msecs: 0,
70            check_now,
71            on_tx_proven,
72            chain,
73            unproven_attempts_limit,
74            last_new_header_height,
75        }
76    }
77
78    /// Create with a custom trigger interval.
79    pub fn with_trigger_msecs(
80        storage: WalletStorageManager,
81        services: Arc<dyn WalletServices>,
82        chain: Chain,
83        check_now: Arc<AtomicBool>,
84        trigger_msecs: u64,
85        unproven_attempts_limit: u32,
86        on_tx_proven: Option<AsyncCallback<String>>,
87        last_new_header_height: Arc<AtomicU32>,
88    ) -> Self {
89        Self {
90            storage,
91            services,
92            trigger_msecs,
93            last_run_msecs: 0,
94            check_now,
95            on_tx_proven,
96            chain,
97            unproven_attempts_limit,
98            last_new_header_height,
99        }
100    }
101}
102
103#[async_trait]
104impl WalletMonitorTask for TaskCheckForProofs {
105    fn name(&self) -> &str {
106        "CheckForProofs"
107    }
108
109    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
110        // Run if check_now flag is set (new block header received)
111        let check_now = self.check_now.load(Ordering::SeqCst);
112
113        // Also run if periodic timer has elapsed
114        let timer_expired = self.trigger_msecs > 0
115            && now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs;
116
117        if check_now || timer_expired {
118            self.last_run_msecs = now_msecs_since_epoch;
119            true
120        } else {
121            false
122        }
123    }
124
125    async fn run_task(&mut self) -> Result<String, WalletError> {
126        let mut log = String::new();
127        let counts_as_attempt = self.check_now.load(Ordering::SeqCst);
128
129        // Reset the check_now flag
130        self.check_now.store(false, Ordering::SeqCst);
131
132        // Read the shared last header height; u32::MAX is the sentinel for "unknown".
133        // Match TS behavior: skip proof checking entirely when no header height is known.
134        let raw_height = self.last_new_header_height.load(Ordering::SeqCst);
135        let max_acceptable_height = if raw_height == u32::MAX {
136            return Ok(log);
137        } else {
138            Some(raw_height)
139        };
140
141        let limit = 100i64;
142        let mut offset = 0i64;
143
144        // Query for reqs needing proof collection
145        let statuses = vec![
146            ProvenTxReqStatus::Callback,
147            ProvenTxReqStatus::Unmined,
148            ProvenTxReqStatus::Sending,
149            ProvenTxReqStatus::Unknown,
150            ProvenTxReqStatus::Unconfirmed,
151        ];
152
153        loop {
154            let args = FindProvenTxReqsArgs {
155                partial: ProvenTxReqPartial::default(),
156                statuses: Some(statuses.clone()),
157                paged: Some(Paged { limit, offset }),
158                since: None,
159            };
160
161            let reqs = self.storage.find_proven_tx_reqs(&args).await?;
162            let count = reqs.len();
163
164            if reqs.is_empty() {
165                break;
166            }
167
168            log.push_str(&format!(
169                "{} reqs with status 'callback', 'unmined', 'sending', 'unknown', or 'unconfirmed'\n",
170                reqs.len()
171            ));
172
173            let r = get_proofs(
174                &self.storage,
175                self.services.as_ref(),
176                &reqs,
177                &self.chain,
178                self.unproven_attempts_limit,
179                counts_as_attempt,
180                max_acceptable_height,
181            )
182            .await?;
183
184            log.push_str(&r.log);
185
186            // Fire callback for each proven tx
187            if let Some(ref cb) = self.on_tx_proven {
188                for proven_req in &r.proven {
189                    cb(proven_req.txid.clone()).await;
190                }
191            }
192
193            if (count as i64) < limit {
194                break;
195            }
196            offset += limit;
197        }
198
199        Ok(log)
200    }
201}
202
203// ---------------------------------------------------------------------------
204// Tests
205// ---------------------------------------------------------------------------
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210
211    #[test]
212    fn test_check_now_triggers_run() {
213        // When check_now is true, trigger should return true
214        let check_now = Arc::new(AtomicBool::new(true));
215        assert!(check_now.load(Ordering::SeqCst));
216
217        // After resetting, check_now should be false
218        check_now.store(false, Ordering::SeqCst);
219        assert!(!check_now.load(Ordering::SeqCst));
220    }
221
222    #[test]
223    fn test_default_trigger_interval() {
224        // Default trigger is 2 hours = 7200000 ms
225        assert_eq!(2 * ONE_HOUR, 7_200_000);
226    }
227
228    #[test]
229    fn test_task_name() {
230        assert_eq!("CheckForProofs", "CheckForProofs");
231    }
232}