Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_fail_abandoned.rs

//! TaskFailAbandoned -- fails stuck unsigned/unprocessed transactions.
//!
//! Translated from wallet-toolbox/src/monitor/tasks/TaskFailAbandoned.ts (54 lines).
//!
//! Finds transactions with status "unsigned" or "unprocessed" that have not
//! been updated for longer than `abandoned_msecs`, and sets their status to "failed".
//! This releases locked UTXOs back to spendable status.
8
9use async_trait::async_trait;
10
11use crate::error::WalletError;
12use crate::monitor::ONE_MINUTE;
13use crate::status::TransactionStatus;
14use crate::storage::find_args::{FindTransactionsArgs, TransactionPartial};
15use crate::storage::manager::WalletStorageManager;
16use crate::storage::traits::reader::StorageReader;
17use crate::storage::traits::reader_writer::StorageReaderWriter;
18
19use super::super::task_trait::WalletMonitorTask;
20
21/// Background task that fails stuck transactions.
22///
23/// Transactions with status "unsigned" or "unprocessed" that have not been
24/// updated within abandoned_msecs are marked as "failed". This releases
25/// any locked UTXOs back to spendable status.
26pub struct TaskFailAbandoned {
27    /// Storage manager for persistence operations.
28    storage: WalletStorageManager,
29    /// How often to trigger (default 5 minutes, matching TS).
30    trigger_msecs: u64,
31    /// Last time this task ran (epoch ms).
32    last_run_msecs: u64,
33    /// How long before a transaction is considered abandoned (default 5 minutes).
34    abandoned_msecs: u64,
35}
36
37impl TaskFailAbandoned {
38    /// Create a new TaskFailAbandoned with default intervals.
39    pub fn new(storage: WalletStorageManager, abandoned_msecs: u64) -> Self {
40        Self {
41            storage,
42            trigger_msecs: ONE_MINUTE * 5,
43            last_run_msecs: 0,
44            abandoned_msecs,
45        }
46    }
47
48    /// Create with a custom trigger interval.
49    pub fn with_trigger_msecs(
50        storage: WalletStorageManager,
51        trigger_msecs: u64,
52        abandoned_msecs: u64,
53    ) -> Self {
54        Self {
55            storage,
56            trigger_msecs,
57            last_run_msecs: 0,
58            abandoned_msecs,
59        }
60    }
61}
62
63#[async_trait]
64impl WalletMonitorTask for TaskFailAbandoned {
65    fn name(&self) -> &str {
66        "FailAbandoned"
67    }
68
69    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
70        if now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs {
71            self.last_run_msecs = now_msecs_since_epoch;
72            true
73        } else {
74            false
75        }
76    }
77
78    async fn run_task(&mut self) -> Result<String, WalletError> {
79        let mut log = String::new();
80        let limit = 100i64;
81        let mut offset = 0i64;
82
83        let abandoned_cutoff = chrono::Utc::now().naive_utc()
84            - chrono::Duration::milliseconds(self.abandoned_msecs as i64);
85
86        loop {
87            let args = FindTransactionsArgs {
88                partial: TransactionPartial::default(),
89                status: Some(vec![
90                    TransactionStatus::Unprocessed,
91                    TransactionStatus::Unsigned,
92                ]),
93                paged: Some(crate::storage::find_args::Paged { limit, offset }),
94                since: None,
95                no_raw_tx: true,
96            };
97
98            let txs = self.storage.find_transactions(&args).await?;
99            let count = txs.len();
100
101            if txs.is_empty() {
102                break;
103            }
104
105            // Filter by age: only fail txs last updated before the cutoff
106            let abandoned_txs: Vec<_> = txs
107                .into_iter()
108                .filter(|tx| tx.updated_at < abandoned_cutoff)
109                .collect();
110
111            for tx in &abandoned_txs {
112                if let Some(ref txid) = tx.txid {
113                    match self
114                        .storage
115                        .update_transaction_status(txid, TransactionStatus::Failed)
116                        .await
117                    {
118                        Ok(_) => {
119                            log.push_str(&format!(
120                                "updated tx {} (id={}) status to 'failed'\n",
121                                txid, tx.transaction_id
122                            ));
123                        }
124                        Err(e) => {
125                            log.push_str(&format!(
126                                "error failing tx {} (id={}): {}\n",
127                                txid, tx.transaction_id, e
128                            ));
129                        }
130                    }
131                } else {
132                    // Transaction has no txid -- fail by transaction_id reference
133                    log.push_str(&format!("skipped tx id={} (no txid)\n", tx.transaction_id));
134                }
135            }
136
137            if (count as i64) < limit {
138                break;
139            }
140            offset += limit;
141        }
142
143        Ok(log)
144    }
145}
146
147// ---------------------------------------------------------------------------
148// Tests
149// ---------------------------------------------------------------------------
150
#[cfg(test)]
mod tests {
    use super::*;

    /// The default trigger interval is five minutes, i.e. 300_000 ms
    /// (the TS source spells it `1000 * 60 * 5`).
    #[test]
    fn test_default_intervals() {
        let default_trigger_msecs = ONE_MINUTE * 5;
        assert_eq!(default_trigger_msecs, 300_000);
    }

    /// Pins the expected task name literal.
    #[test]
    fn test_task_name() {
        let expected_name = "FailAbandoned";
        assert_eq!("FailAbandoned", expected_name);
    }

    /// A cutoff computed by subtracting the abandonment window lies in the past.
    #[test]
    fn test_abandoned_cutoff_calculation() {
        let now = chrono::Utc::now().naive_utc();
        let window = chrono::Duration::milliseconds(ONE_MINUTE as i64 * 5);
        let five_min_ago = now - window;
        assert!(five_min_ago < now);
    }
}