bsv_wallet_toolbox/monitor/tasks/
task_check_for_proofs.rs1use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use crate::error::WalletError;
15use crate::monitor::helpers::get_proofs;
16use crate::monitor::{AsyncCallback, ONE_HOUR};
17use crate::services::traits::WalletServices;
18use crate::status::ProvenTxReqStatus;
19use crate::storage::find_args::{FindProvenTxReqsArgs, Paged, ProvenTxReqPartial};
20use crate::storage::manager::WalletStorageManager;
21use crate::storage::traits::reader::StorageReader;
22use crate::types::Chain;
23
24use super::super::task_trait::WalletMonitorTask;
25
26pub struct TaskCheckForProofs {
32 storage: WalletStorageManager,
34 services: Arc<dyn WalletServices>,
36 trigger_msecs: u64,
38 last_run_msecs: u64,
40 check_now: Arc<AtomicBool>,
43 on_tx_proven: Option<AsyncCallback<String>>,
45 chain: Chain,
47 unproven_attempts_limit: u32,
49 last_new_header_height: Arc<AtomicU32>,
52}
53
54impl TaskCheckForProofs {
55 pub fn new(
57 storage: WalletStorageManager,
58 services: Arc<dyn WalletServices>,
59 chain: Chain,
60 check_now: Arc<AtomicBool>,
61 unproven_attempts_limit: u32,
62 on_tx_proven: Option<AsyncCallback<String>>,
63 last_new_header_height: Arc<AtomicU32>,
64 ) -> Self {
65 Self {
66 storage,
67 services,
68 trigger_msecs: 2 * ONE_HOUR,
69 last_run_msecs: 0,
70 check_now,
71 on_tx_proven,
72 chain,
73 unproven_attempts_limit,
74 last_new_header_height,
75 }
76 }
77
78 pub fn with_trigger_msecs(
80 storage: WalletStorageManager,
81 services: Arc<dyn WalletServices>,
82 chain: Chain,
83 check_now: Arc<AtomicBool>,
84 trigger_msecs: u64,
85 unproven_attempts_limit: u32,
86 on_tx_proven: Option<AsyncCallback<String>>,
87 last_new_header_height: Arc<AtomicU32>,
88 ) -> Self {
89 Self {
90 storage,
91 services,
92 trigger_msecs,
93 last_run_msecs: 0,
94 check_now,
95 on_tx_proven,
96 chain,
97 unproven_attempts_limit,
98 last_new_header_height,
99 }
100 }
101}
102
103#[async_trait]
104impl WalletMonitorTask for TaskCheckForProofs {
105 fn name(&self) -> &str {
106 "CheckForProofs"
107 }
108
109 fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
110 let check_now = self.check_now.load(Ordering::SeqCst);
112
113 let timer_expired = self.trigger_msecs > 0
115 && now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs;
116
117 if check_now || timer_expired {
118 self.last_run_msecs = now_msecs_since_epoch;
119 true
120 } else {
121 false
122 }
123 }
124
125 async fn run_task(&mut self) -> Result<String, WalletError> {
126 let mut log = String::new();
127 let counts_as_attempt = self.check_now.load(Ordering::SeqCst);
128
129 self.check_now.store(false, Ordering::SeqCst);
131
132 let raw_height = self.last_new_header_height.load(Ordering::SeqCst);
135 let max_acceptable_height = if raw_height == u32::MAX {
136 return Ok(log);
137 } else {
138 Some(raw_height)
139 };
140
141 let limit = 100i64;
142 let mut offset = 0i64;
143
144 let statuses = vec![
146 ProvenTxReqStatus::Callback,
147 ProvenTxReqStatus::Unmined,
148 ProvenTxReqStatus::Sending,
149 ProvenTxReqStatus::Unknown,
150 ProvenTxReqStatus::Unconfirmed,
151 ];
152
153 loop {
154 let args = FindProvenTxReqsArgs {
155 partial: ProvenTxReqPartial::default(),
156 statuses: Some(statuses.clone()),
157 paged: Some(Paged { limit, offset }),
158 since: None,
159 };
160
161 let reqs = self.storage.find_proven_tx_reqs(&args).await?;
162 let count = reqs.len();
163
164 if reqs.is_empty() {
165 break;
166 }
167
168 log.push_str(&format!(
169 "{} reqs with status 'callback', 'unmined', 'sending', 'unknown', or 'unconfirmed'\n",
170 reqs.len()
171 ));
172
173 let r = get_proofs(
174 &self.storage,
175 self.services.as_ref(),
176 &reqs,
177 &self.chain,
178 self.unproven_attempts_limit,
179 counts_as_attempt,
180 max_acceptable_height,
181 )
182 .await?;
183
184 log.push_str(&r.log);
185
186 if let Some(ref cb) = self.on_tx_proven {
188 for proven_req in &r.proven {
189 cb(proven_req.txid.clone()).await;
190 }
191 }
192
193 if (count as i64) < limit {
194 break;
195 }
196 offset += limit;
197 }
198
199 Ok(log)
200 }
201}
202
203#[cfg(test)]
208mod tests {
209 use super::*;
210
211 #[test]
212 fn test_check_now_triggers_run() {
213 let check_now = Arc::new(AtomicBool::new(true));
215 assert!(check_now.load(Ordering::SeqCst));
216
217 check_now.store(false, Ordering::SeqCst);
219 assert!(!check_now.load(Ordering::SeqCst));
220 }
221
222 #[test]
223 fn test_default_trigger_interval() {
224 assert_eq!(2 * ONE_HOUR, 7_200_000);
226 }
227
228 #[test]
229 fn test_task_name() {
230 assert_eq!("CheckForProofs", "CheckForProofs");
231 }
232}