bsv_wallet_toolbox/monitor/tasks/
task_reorg.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12
13use crate::error::WalletError;
14use crate::monitor::helpers::now_msecs;
15use crate::monitor::{DeactivatedHeader, ONE_MINUTE, ONE_SECOND};
16use crate::services::traits::WalletServices;
17use crate::storage::find_args::{FindProvenTxsArgs, ProvenTxPartial};
18use crate::storage::manager::WalletStorageManager;
19use crate::storage::traits::reader::StorageReader;
20use crate::storage::traits::reader_writer::StorageReaderWriter;
21use crate::tables::ProvenTx;
22
23use super::super::task_trait::WalletMonitorTask;
24
25pub struct TaskReorg {
31 storage: WalletStorageManager,
33 services: Arc<dyn WalletServices>,
35 aged_msecs: u64,
37 max_retries: u32,
39 last_run_msecs: u64,
41 trigger_msecs: u64,
43 deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
45 process: Vec<DeactivatedHeader>,
47}
48
49impl TaskReorg {
50 pub fn new(
52 storage: WalletStorageManager,
53 services: Arc<dyn WalletServices>,
54 deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
55 ) -> Self {
56 Self {
57 storage,
58 services,
59 aged_msecs: ONE_MINUTE * 10,
60 max_retries: 3,
61 last_run_msecs: 0,
62 trigger_msecs: ONE_SECOND * 30,
63 deactivated_headers,
64 process: Vec::new(),
65 }
66 }
67
68 pub fn with_intervals(
70 storage: WalletStorageManager,
71 services: Arc<dyn WalletServices>,
72 deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
73 aged_msecs: u64,
74 max_retries: u32,
75 trigger_msecs: u64,
76 ) -> Self {
77 Self {
78 storage,
79 services,
80 aged_msecs,
81 max_retries,
82 last_run_msecs: 0,
83 trigger_msecs,
84 deactivated_headers,
85 process: Vec::new(),
86 }
87 }
88}
89
90#[async_trait]
91impl WalletMonitorTask for TaskReorg {
92 fn name(&self) -> &str {
93 "Reorg"
94 }
95
96 fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
97 if now_msecs_since_epoch <= self.last_run_msecs + self.trigger_msecs {
98 return false;
99 }
100
101 let cutoff = now_msecs_since_epoch.saturating_sub(self.aged_msecs);
103 if let Ok(mut queue) = self.deactivated_headers.try_lock() {
104 let mut remaining = Vec::new();
105 for header in queue.drain(..) {
106 if header.when_msecs < cutoff {
107 self.process.push(header);
108 } else {
109 remaining.push(header);
110 }
111 }
112 *queue = remaining;
113 }
114
115 if !self.process.is_empty() {
116 self.last_run_msecs = now_msecs_since_epoch;
117 true
118 } else {
119 false
120 }
121 }
122
123 async fn run_task(&mut self) -> Result<String, WalletError> {
124 let mut log = String::new();
125
126 while let Some(header) = self.process.pop() {
127 log.push_str(&format!(
128 " processing deactivated header height={} hash={} tries={}\n",
129 header.header.height, header.header.hash, header.tries
130 ));
131
132 let find_args = FindProvenTxsArgs {
134 partial: ProvenTxPartial {
135 block_hash: Some(header.header.hash.clone()),
136 ..Default::default()
137 },
138 since: None,
139 paged: None,
140 };
141
142 let proven_txs = match self.storage.find_proven_txs(&find_args).await {
143 Ok(txs) => txs,
144 Err(e) => {
145 log.push_str(&format!(" error finding proven txs: {}\n", e));
146 continue;
147 }
148 };
149
150 if proven_txs.is_empty() {
151 log.push_str(" no matching proven_txs records\n");
152 continue;
153 }
154
155 log.push_str(&format!(
156 " found {} proven_txs records for block {}\n",
157 proven_txs.len(),
158 header.header.hash
159 ));
160
161 let mut unavailable_count = 0;
162
163 for ptx in &proven_txs {
164 let gmpr = self.services.get_merkle_path(&ptx.txid, false).await;
166
167 if let (Some(merkle_path), Some(new_header)) = (&gmpr.merkle_path, &gmpr.header) {
168 let now = chrono::Utc::now().naive_utc();
170 let _updated_ptx = ProvenTx {
171 created_at: ptx.created_at,
172 updated_at: now,
173 proven_tx_id: ptx.proven_tx_id,
174 txid: ptx.txid.clone(),
175 height: new_header.height as i32,
176 index: 0,
177 merkle_path: merkle_path.clone(),
178 raw_tx: ptx.raw_tx.clone(),
179 block_hash: new_header.hash.clone(),
180 merkle_root: new_header.merkle_root.clone(),
181 };
182
183 let update = crate::storage::find_args::ProvenTxPartial {
184 proven_tx_id: Some(ptx.proven_tx_id),
185 height: Some(new_header.height as i32),
186 block_hash: Some(new_header.hash.clone()),
187 ..Default::default()
188 };
189 match self
190 .storage
191 .update_proven_tx(ptx.proven_tx_id, &update)
192 .await
193 {
194 Ok(_) => {
195 log.push_str(&format!(
196 " updated proof for txid {} to height {}\n",
197 ptx.txid, new_header.height
198 ));
199 }
200 Err(e) => {
201 log.push_str(&format!(
202 " error updating proof for txid {}: {}\n",
203 ptx.txid, e
204 ));
205 unavailable_count += 1;
206 }
207 }
208 } else {
209 log.push_str(&format!(
210 " no updated proof available for txid {}\n",
211 ptx.txid
212 ));
213 unavailable_count += 1;
214 }
215 }
216
217 if unavailable_count > 0 {
219 if header.tries + 1 >= self.max_retries {
220 log.push_str(&format!(
221 " maximum retries {} exceeded\n",
222 self.max_retries
223 ));
224 } else {
225 log.push_str(" retrying...\n");
226 if let Ok(mut queue) = self.deactivated_headers.try_lock() {
228 queue.push(DeactivatedHeader {
229 header: header.header.clone(),
230 when_msecs: now_msecs(),
231 tries: header.tries + 1,
232 });
233 }
234 }
235 }
236 }
237
238 Ok(log)
239 }
240}
241
#[cfg(test)]
mod tests {
    use super::*;
    use crate::services::types::BlockHeader;

    /// Pins the millisecond values behind the default aging delay
    /// (ten minutes) and trigger interval (thirty seconds).
    #[test]
    fn test_default_intervals() {
        let ten_minutes = ONE_MINUTE * 10;
        let thirty_seconds = ONE_SECOND * 30;
        assert_eq!(ten_minutes, 600_000);
        assert_eq!(thirty_seconds, 30_000);
    }

    /// NOTE(review): this compares a literal with itself and never calls
    /// `TaskReorg::name`; presumably a placeholder until a task can be
    /// constructed in tests.
    #[test]
    fn test_task_name() {
        let expected = "Reorg";
        assert_eq!("Reorg", expected);
    }

    /// Pushes one header into a shared queue and reads it back.
    ///
    /// NOTE(review): despite the name, no aging cutoff is evaluated here;
    /// only the queue round-trip is checked.
    #[tokio::test]
    async fn test_deactivated_header_aging() {
        let shared: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> = Arc::default();

        let sample = BlockHeader {
            version: 1,
            previous_hash: "0000".to_string(),
            merkle_root: "abcd".to_string(),
            time: 1234567890,
            bits: 0x1d00ffff,
            nonce: 42,
            height: 100,
            hash: "blockhash".to_string(),
        };

        // The temporary guard is released at the end of this statement, so
        // the second `lock` below cannot deadlock.
        shared.lock().await.push(DeactivatedHeader {
            header: sample,
            when_msecs: 1000,
            tries: 0,
        });

        let pending = shared.lock().await;
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].tries, 0);
    }
}