Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_reorg.rs

//! TaskReorg -- processes deactivated headers from reorg events.
//!
//! Translated from wallet-toolbox/src/monitor/tasks/TaskReorg.ts (89 lines).
//!
//! When chain reorgs are detected, deactivated block headers are queued.
//! This task processes aged headers by finding affected ProvenTx records
//! and attempting to re-fetch their merkle proofs.
9use std::sync::Arc;
10
11use async_trait::async_trait;
12
13use crate::error::WalletError;
14use crate::monitor::helpers::now_msecs;
15use crate::monitor::{DeactivatedHeader, ONE_MINUTE, ONE_SECOND};
16use crate::services::traits::WalletServices;
17use crate::storage::find_args::{FindProvenTxsArgs, ProvenTxPartial};
18use crate::storage::manager::WalletStorageManager;
19use crate::storage::traits::reader::StorageReader;
20use crate::storage::traits::reader_writer::StorageReaderWriter;
21use crate::tables::ProvenTx;
22
23use super::super::task_trait::WalletMonitorTask;
24
/// Background task that handles chain reorg processing.
///
/// Processes deactivated block headers by finding ProvenTx records that
/// reference those blocks and attempting to get updated merkle proofs.
/// Uses an aging pattern to delay processing, allowing the chain to stabilize.
pub struct TaskReorg {
    /// Storage manager for persistence operations.
    storage: WalletStorageManager,
    /// Network services used to re-fetch merkle proofs.
    services: Arc<dyn WalletServices>,
    /// How long a deactivated header must sit in the shared queue before it
    /// becomes eligible for processing (default 10 minutes). Lets the chain
    /// stabilize before proofs are re-fetched.
    aged_msecs: u64,
    /// Maximum total attempts per deactivated header (default 3). `trigger`
    /// re-queues a header with `tries + 1` until this bound is reached.
    max_retries: u32,
    /// Last time this task ran (epoch ms); 0 until the first run.
    last_run_msecs: u64,
    /// Minimum interval between runs (default 30 seconds).
    trigger_msecs: u64,
    /// Shared queue of deactivated headers, populated by the Monitor.
    /// A tokio (async) mutex because `run_task` pushes retries from an
    /// async context while the synchronous `trigger` uses `try_lock`.
    deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
    /// Headers staged for processing in the current run; `trigger` moves
    /// aged headers here from the shared queue, `run_task` drains it.
    process: Vec<DeactivatedHeader>,
}
48
49impl TaskReorg {
50    /// Create a new TaskReorg with default intervals.
51    pub fn new(
52        storage: WalletStorageManager,
53        services: Arc<dyn WalletServices>,
54        deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
55    ) -> Self {
56        Self {
57            storage,
58            services,
59            aged_msecs: ONE_MINUTE * 10,
60            max_retries: 3,
61            last_run_msecs: 0,
62            trigger_msecs: ONE_SECOND * 30,
63            deactivated_headers,
64            process: Vec::new(),
65        }
66    }
67
68    /// Create with custom intervals.
69    pub fn with_intervals(
70        storage: WalletStorageManager,
71        services: Arc<dyn WalletServices>,
72        deactivated_headers: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>>,
73        aged_msecs: u64,
74        max_retries: u32,
75        trigger_msecs: u64,
76    ) -> Self {
77        Self {
78            storage,
79            services,
80            aged_msecs,
81            max_retries,
82            last_run_msecs: 0,
83            trigger_msecs,
84            deactivated_headers,
85            process: Vec::new(),
86        }
87    }
88}
89
90#[async_trait]
91impl WalletMonitorTask for TaskReorg {
92    fn name(&self) -> &str {
93        "Reorg"
94    }
95
96    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
97        if now_msecs_since_epoch <= self.last_run_msecs + self.trigger_msecs {
98            return false;
99        }
100
101        // Shift aged deactivated headers from the shared queue to the process list
102        let cutoff = now_msecs_since_epoch.saturating_sub(self.aged_msecs);
103        if let Ok(mut queue) = self.deactivated_headers.try_lock() {
104            let mut remaining = Vec::new();
105            for header in queue.drain(..) {
106                if header.when_msecs < cutoff {
107                    self.process.push(header);
108                } else {
109                    remaining.push(header);
110                }
111            }
112            *queue = remaining;
113        }
114
115        if !self.process.is_empty() {
116            self.last_run_msecs = now_msecs_since_epoch;
117            true
118        } else {
119            false
120        }
121    }
122
123    async fn run_task(&mut self) -> Result<String, WalletError> {
124        let mut log = String::new();
125
126        while let Some(header) = self.process.pop() {
127            log.push_str(&format!(
128                "  processing deactivated header height={} hash={} tries={}\n",
129                header.header.height, header.header.hash, header.tries
130            ));
131
132            // Find ProvenTx records with this block_hash
133            let find_args = FindProvenTxsArgs {
134                partial: ProvenTxPartial {
135                    block_hash: Some(header.header.hash.clone()),
136                    ..Default::default()
137                },
138                since: None,
139                paged: None,
140            };
141
142            let proven_txs = match self.storage.find_proven_txs(&find_args).await {
143                Ok(txs) => txs,
144                Err(e) => {
145                    log.push_str(&format!("    error finding proven txs: {}\n", e));
146                    continue;
147                }
148            };
149
150            if proven_txs.is_empty() {
151                log.push_str("    no matching proven_txs records\n");
152                continue;
153            }
154
155            log.push_str(&format!(
156                "    found {} proven_txs records for block {}\n",
157                proven_txs.len(),
158                header.header.hash
159            ));
160
161            let mut unavailable_count = 0;
162
163            for ptx in &proven_txs {
164                // Try to get an updated merkle proof
165                let gmpr = self.services.get_merkle_path(&ptx.txid, false).await;
166
167                if let (Some(merkle_path), Some(new_header)) = (&gmpr.merkle_path, &gmpr.header) {
168                    // Got updated proof -- update the ProvenTx record
169                    let now = chrono::Utc::now().naive_utc();
170                    let _updated_ptx = ProvenTx {
171                        created_at: ptx.created_at,
172                        updated_at: now,
173                        proven_tx_id: ptx.proven_tx_id,
174                        txid: ptx.txid.clone(),
175                        height: new_header.height as i32,
176                        index: 0,
177                        merkle_path: merkle_path.clone(),
178                        raw_tx: ptx.raw_tx.clone(),
179                        block_hash: new_header.hash.clone(),
180                        merkle_root: new_header.merkle_root.clone(),
181                    };
182
183                    let update = crate::storage::find_args::ProvenTxPartial {
184                        proven_tx_id: Some(ptx.proven_tx_id),
185                        height: Some(new_header.height as i32),
186                        block_hash: Some(new_header.hash.clone()),
187                        ..Default::default()
188                    };
189                    match self
190                        .storage
191                        .update_proven_tx(ptx.proven_tx_id, &update)
192                        .await
193                    {
194                        Ok(_) => {
195                            log.push_str(&format!(
196                                "    updated proof for txid {} to height {}\n",
197                                ptx.txid, new_header.height
198                            ));
199                        }
200                        Err(e) => {
201                            log.push_str(&format!(
202                                "    error updating proof for txid {}: {}\n",
203                                ptx.txid, e
204                            ));
205                            unavailable_count += 1;
206                        }
207                    }
208                } else {
209                    log.push_str(&format!(
210                        "    no updated proof available for txid {}\n",
211                        ptx.txid
212                    ));
213                    unavailable_count += 1;
214                }
215            }
216
217            // If some proofs were unavailable, retry if under max_retries
218            if unavailable_count > 0 {
219                if header.tries + 1 >= self.max_retries {
220                    log.push_str(&format!(
221                        "    maximum retries {} exceeded\n",
222                        self.max_retries
223                    ));
224                } else {
225                    log.push_str("    retrying...\n");
226                    // Push back to the shared queue for another attempt
227                    if let Ok(mut queue) = self.deactivated_headers.try_lock() {
228                        queue.push(DeactivatedHeader {
229                            header: header.header.clone(),
230                            when_msecs: now_msecs(),
231                            tries: header.tries + 1,
232                        });
233                    }
234                }
235            }
236        }
237
238        Ok(log)
239    }
240}
241
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
245
#[cfg(test)]
mod tests {
    use super::*;
    use crate::services::types::BlockHeader;

    /// Sanity-check the constants behind the defaults used by `TaskReorg::new`
    /// (10-minute aging window, 30-second trigger interval).
    #[test]
    fn test_default_intervals() {
        assert_eq!(ONE_MINUTE * 10, 600_000);
        assert_eq!(ONE_SECOND * 30, 30_000);
    }

    /// Exercises the aging-cutoff arithmetic that `trigger` relies on:
    /// a header is eligible when `when_msecs < now - aged_msecs`, and
    /// `saturating_sub` keeps the cutoff at 0 when the clock is earlier
    /// than the aging window (so nothing underflows).
    ///
    /// Replaces a previous tautological test (`assert_eq!("Reorg", "Reorg")`)
    /// that asserted nothing about the code under test.
    #[test]
    fn test_aging_cutoff() {
        let aged_msecs: u64 = ONE_MINUTE * 10;
        let now: u64 = 1_000_000_000;
        let cutoff = now.saturating_sub(aged_msecs);

        // A very old header is past the cutoff and would be staged.
        assert!(1000 < cutoff);
        // A header stamped "now" stays queued.
        assert!(!(now < cutoff));
        // Underflow safety: clock earlier than the window yields cutoff 0,
        // so no header can appear aged.
        assert_eq!(100u64.saturating_sub(aged_msecs), 0);
    }

    #[tokio::test]
    async fn test_deactivated_header_aging() {
        let queue: Arc<tokio::sync::Mutex<Vec<DeactivatedHeader>>> =
            Arc::new(tokio::sync::Mutex::new(Vec::new()));

        let header = BlockHeader {
            version: 1,
            previous_hash: "0000".to_string(),
            merkle_root: "abcd".to_string(),
            time: 1234567890,
            bits: 0x1d00ffff,
            nonce: 42,
            height: 100,
            hash: "blockhash".to_string(),
        };

        // Add a header with an old timestamp (should be considered aged)
        {
            let mut q = queue.lock().await;
            q.push(DeactivatedHeader {
                when_msecs: 1000, // very old
                tries: 0,
                header: header.clone(),
            });
        }

        // Verify it's in the queue and untouched
        let q = queue.lock().await;
        assert_eq!(q.len(), 1);
        assert_eq!(q[0].tries, 0);
        // With a realistic "now", this header is past the aging cutoff and
        // would be moved to the process list by `trigger`.
        let cutoff = 10_000_000u64.saturating_sub(ONE_MINUTE * 10);
        assert!(q[0].when_msecs < cutoff);
    }
}
293}