Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_unfail.rs

1//! TaskUnFail -- retries previously failed transactions by re-checking proofs.
2//!
3//! Translated from wallet-toolbox/src/monitor/tasks/TaskUnFail.ts (151 lines).
4//!
//! Changing a ProvenTxReq's status from 'invalid' to 'unfail' asks this task to
//! look for a merklePath for its txid. If one is found: the req becomes 'unmined',
//! the referenced txs become 'unproven', and input/output spendability is updated.
//! If none is found: the req returns to 'invalid'.
8
9use std::io::Cursor;
10use std::sync::Arc;
11
12use bsv::transaction::transaction::Transaction;
13
14use crate::error::WalletError;
15use crate::monitor::helpers::now_msecs;
16use crate::monitor::task_trait::WalletMonitorTask;
17use crate::monitor::ONE_MINUTE;
18use crate::services::traits::WalletServices;
19use crate::status::ProvenTxReqStatus;
20use crate::storage::find_args::{
21    FindOutputsArgs, FindProvenTxReqsArgs, OutputPartial, Paged, ProvenTxReqPartial,
22};
23use crate::storage::manager::WalletStorageManager;
24use crate::storage::traits::reader::StorageReader;
25use crate::storage::traits::reader_writer::StorageReaderWriter;
26use async_trait::async_trait;
27
28/// Task that retries previously failed transactions.
29///
30/// Finds ProvenTxReqs with status "unfail" and attempts to retrieve a merkle proof.
31/// If proof is found: status -> unmined, referenced transactions -> unproven.
32/// If no proof: status -> invalid (back to failed).
33pub struct TaskUnFail {
34    storage: WalletStorageManager,
35    services: Arc<dyn WalletServices>,
36    trigger_msecs: u64,
37    last_run_msecs: u64,
38    /// Manual trigger flag.
39    pub check_now: bool,
40}
41
42impl TaskUnFail {
43    /// Create a new unfail task.
44    pub fn new(storage: WalletStorageManager, services: Arc<dyn WalletServices>) -> Self {
45        Self {
46            storage,
47            services,
48            trigger_msecs: 10 * ONE_MINUTE,
49            last_run_msecs: 0,
50            check_now: false,
51        }
52    }
53
54    /// Set the trigger interval in milliseconds.
55    pub fn with_trigger_msecs(mut self, msecs: u64) -> Self {
56        self.trigger_msecs = msecs;
57        self
58    }
59
60    /// Populate locking_script from raw transaction if missing.
61    /// Matches TS `validateOutputScript` from StorageProvider.ts.
62    async fn validate_output_script(&self, output: &mut crate::tables::Output) {
63        // Without offset and length, nothing to recover
64        let script_length = match output.script_length {
65            Some(len) if len > 0 => len as usize,
66            _ => return,
67        };
68        let script_offset = match output.script_offset {
69            Some(off) if off >= 0 => off as usize,
70            _ => return,
71        };
72        let txid = match &output.txid {
73            Some(t) if !t.is_empty() => t.clone(),
74            _ => return,
75        };
76
77        // If locking_script exists and has correct length, nothing to do
78        if let Some(ref script) = output.locking_script {
79            if script.len() == script_length {
80                return;
81            }
82        }
83
84        // Look up the raw transaction to extract the script
85        let tx_args = crate::storage::find_args::FindTransactionsArgs {
86            partial: crate::storage::find_args::TransactionPartial {
87                txid: Some(txid),
88                ..Default::default()
89            },
90            no_raw_tx: false, // We need raw_tx
91            ..Default::default()
92        };
93
94        if let Ok(txs) = self.storage.find_transactions(&tx_args).await {
95            if let Some(tx) = txs.first() {
96                if let Some(ref raw_tx) = tx.raw_tx {
97                    let end = script_offset + script_length;
98                    if end <= raw_tx.len() {
99                        output.locking_script = Some(raw_tx[script_offset..end].to_vec());
100                    }
101                }
102            }
103        }
104    }
105
106    /// Process a list of "unfail" reqs: attempt to get merkle path for each.
107    async fn unfail(
108        &self,
109        reqs: &[crate::tables::ProvenTxReq],
110        indent: usize,
111    ) -> Result<String, WalletError> {
112        let mut log = String::new();
113        let pad = " ".repeat(indent);
114
115        for req in reqs {
116            log.push_str(&format!(
117                "{}reqId {} txid {}: ",
118                pad, req.proven_tx_req_id, req.txid
119            ));
120
121            let gmpr = self.services.get_merkle_path(&req.txid, false).await;
122
123            if gmpr.merkle_path.is_some() {
124                // Proof found -- set req to unmined
125                let update = ProvenTxReqPartial {
126                    status: Some(ProvenTxReqStatus::Unmined),
127                    ..Default::default()
128                };
129                let _ = self
130                    .storage
131                    .update_proven_tx_req(req.proven_tx_req_id, &update)
132                    .await;
133                log.push_str("unfailed. status is now 'unmined'\n");
134
135                // Also update referenced transactions to 'unproven'
136                // Parse notify JSON to get transactionIds
137                if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
138                    if let Some(tx_ids) = notify.get("transactionIds").and_then(|v| v.as_array()) {
139                        let ids: Vec<i64> = tx_ids.iter().filter_map(|v| v.as_i64()).collect();
140                        if !ids.is_empty() {
141                            let inner_pad = " ".repeat(indent + 2);
142                            for id in &ids {
143                                let _ = self
144                                    .storage
145                                    .update_transaction(
146                                        *id,
147                                        &crate::storage::find_args::TransactionPartial {
148                                            status: Some(
149                                                crate::status::TransactionStatus::Unproven,
150                                            ),
151                                            ..Default::default()
152                                        },
153                                    )
154                                    .await;
155                                log.push_str(&format!(
156                                    "{}transaction {} status is now 'unproven'\n",
157                                    inner_pad, id
158                                ));
159
160                                // Step 3: Parse raw_tx and match inputs to user's outputs
161                                // First, look up the transaction to get userId (multi-tenant safety)
162                                let tx_record = {
163                                    let tx_args = crate::storage::find_args::FindTransactionsArgs {
164                                        partial: crate::storage::find_args::TransactionPartial {
165                                            transaction_id: Some(*id),
166                                            ..Default::default()
167                                        },
168                                        ..Default::default()
169                                    };
170                                    self.storage
171                                        .find_transactions(&tx_args)
172                                        .await
173                                        .ok()
174                                        .and_then(|txs| txs.into_iter().next())
175                                };
176                                let user_id = tx_record.as_ref().map(|t| t.user_id);
177
178                                if !req.raw_tx.is_empty() {
179                                    if let Ok(bsvtx) =
180                                        Transaction::from_binary(&mut Cursor::new(&req.raw_tx))
181                                    {
182                                        for (vin, input) in bsvtx.inputs.iter().enumerate() {
183                                            let source_txid = match &input.source_txid {
184                                                Some(t) => t.clone(),
185                                                None => continue,
186                                            };
187                                            let source_vout = input.source_output_index as i32;
188
189                                            let find_args = FindOutputsArgs {
190                                                partial: OutputPartial {
191                                                    user_id,
192                                                    txid: Some(source_txid),
193                                                    vout: Some(source_vout),
194                                                    ..Default::default()
195                                                },
196                                                ..Default::default()
197                                            };
198
199                                            match self.storage.find_outputs(&find_args).await {
200                                                Ok(outputs) if outputs.len() == 1 => {
201                                                    let oi = &outputs[0];
202                                                    let update = OutputPartial {
203                                                        spendable: Some(false),
204                                                        spent_by: Some(*id),
205                                                        ..Default::default()
206                                                    };
207                                                    let _ = self
208                                                        .storage
209                                                        .update_output(oi.output_id, &update)
210                                                        .await;
211                                                    log.push_str(&format!(
212                                                        "{}input {} matched to output {} updated spentBy {}\n",
213                                                        inner_pad, vin, oi.output_id, id
214                                                    ));
215                                                }
216                                                _ => {
217                                                    log.push_str(&format!(
218                                                        "{}input {} not matched to user's outputs\n",
219                                                        inner_pad, vin
220                                                    ));
221                                                }
222                                            }
223                                        }
224
225                                        // Step 4: Check output spendability via isUtxo
226                                        let out_find_args = FindOutputsArgs {
227                                            partial: OutputPartial {
228                                                user_id,
229                                                transaction_id: Some(*id),
230                                                ..Default::default()
231                                            },
232                                            ..Default::default()
233                                        };
234
235                                        if let Ok(outputs) =
236                                            self.storage.find_outputs(&out_find_args).await
237                                        {
238                                            for o in &outputs {
239                                                // Populate locking_script from raw_tx if missing
240                                                // (matches TS validateOutputScript)
241                                                let mut o = o.clone();
242                                                self.validate_output_script(&mut o).await;
243
244                                                let script_bytes = match &o.locking_script {
245                                                    Some(s) if !s.is_empty() => s,
246                                                    _ => {
247                                                        log.push_str(&format!(
248                                                            "{}output {} does not have a valid locking script\n",
249                                                            inner_pad, o.output_id
250                                                        ));
251                                                        continue;
252                                                    }
253                                                };
254
255                                                let txid_str = o.txid.as_deref().unwrap_or("");
256                                                let vout = o.vout as u32;
257
258                                                match self
259                                                    .services
260                                                    .is_utxo(script_bytes, txid_str, vout)
261                                                    .await
262                                                {
263                                                    Ok(is_utxo) => {
264                                                        let current_spendable = o.spendable;
265                                                        if is_utxo != current_spendable {
266                                                            let update = OutputPartial {
267                                                                spendable: Some(is_utxo),
268                                                                ..Default::default()
269                                                            };
270                                                            let _ = self
271                                                                .storage
272                                                                .update_output(o.output_id, &update)
273                                                                .await;
274                                                            log.push_str(&format!(
275                                                                "{}output {} set to {}\n",
276                                                                inner_pad,
277                                                                o.output_id,
278                                                                if is_utxo {
279                                                                    "spendable"
280                                                                } else {
281                                                                    "spent"
282                                                                }
283                                                            ));
284                                                        }
285                                                    }
286                                                    Err(_) => {}
287                                                }
288                                            }
289                                        }
290                                    }
291                                }
292                            }
293                        }
294                    }
295                }
296            } else {
297                // No proof found -- return to invalid
298                let update = ProvenTxReqPartial {
299                    status: Some(ProvenTxReqStatus::Invalid),
300                    ..Default::default()
301                };
302                let _ = self
303                    .storage
304                    .update_proven_tx_req(req.proven_tx_req_id, &update)
305                    .await;
306                log.push_str("returned to status 'invalid'\n");
307            }
308        }
309
310        Ok(log)
311    }
312}
313
314#[async_trait]
315impl WalletMonitorTask for TaskUnFail {
316    fn name(&self) -> &str {
317        "UnFail"
318    }
319
320    fn trigger(&mut self, now_msecs: u64) -> bool {
321        self.check_now
322            || (self.trigger_msecs > 0 && now_msecs > self.last_run_msecs + self.trigger_msecs)
323    }
324
325    async fn run_task(&mut self) -> Result<String, WalletError> {
326        self.last_run_msecs = now_msecs();
327        self.check_now = false;
328
329        let mut log = String::new();
330
331        let limit = 100i64;
332        let mut offset = 0i64;
333        loop {
334            let reqs = self
335                .storage
336                .find_proven_tx_reqs(&FindProvenTxReqsArgs {
337                    partial: ProvenTxReqPartial::default(),
338                    since: None,
339                    paged: Some(Paged { limit, offset }),
340                    statuses: Some(vec![ProvenTxReqStatus::Unfail]),
341                })
342                .await?;
343
344            if reqs.is_empty() {
345                break;
346            }
347
348            log.push_str(&format!("{} reqs with status 'unfail'\n", reqs.len()));
349            let r = self.unfail(&reqs, 2).await?;
350            log.push_str(&r);
351            log.push('\n');
352
353            if (reqs.len() as i64) < limit {
354                break;
355            }
356            offset += limit;
357        }
358
359        Ok(log)
360    }
361}
362
#[cfg(test)]
mod tests {
    // NOTE(review): these tests re-state the task's arithmetic and conditions
    // with local values; none of them constructs a `TaskUnFail`.
    use crate::monitor::ONE_MINUTE;

    /// Ten `ONE_MINUTE`s (the default trigger interval, as in TS) is 600 000 ms.
    #[test]
    fn test_unfail_defaults() {
        let default_interval_msecs = 10 * ONE_MINUTE;
        assert_eq!(default_interval_msecs, 600_000);
    }

    /// Pins the literal task name.
    #[test]
    fn test_name() {
        let expected = "UnFail";
        assert_eq!(expected, "UnFail");
    }

    /// Offset/length slicing of a raw transaction, plus the out-of-range case.
    #[test]
    fn test_script_extraction_logic() {
        let raw_tx: Vec<u8> = (0u8..10).collect();
        let (script_offset, script_length) = (3usize, 4usize);

        // In range: bytes 3..7 are the script.
        let end = script_offset + script_length;
        assert!(end <= raw_tx.len());
        assert_eq!(&raw_tx[script_offset..end], &[3, 4, 5, 6]);

        // Out of range: the end lies past the raw transaction, so nothing
        // would be extracted.
        let bad_offset = 8usize;
        assert!(bad_offset + script_length > raw_tx.len());
    }

    /// Conditions guarding the input-match and spendability updates.
    #[test]
    fn test_unfail_steps_3_4_logic() {
        // Step 3: spentBy is only updated on exactly one matching output.
        let found_outputs = 1;
        assert!(found_outputs == 1);

        // Step 4: an update is needed when the UTXO state and the stored
        // spendable flag disagree.
        let (current_spendable, is_utxo) = (true, false);
        assert!(is_utxo != current_spendable);
    }
}