// bsv_wallet_toolbox/monitor/tasks/task_fail_abandoned.rs
use async_trait::async_trait;
10
11use crate::error::WalletError;
12use crate::monitor::ONE_MINUTE;
13use crate::status::TransactionStatus;
14use crate::storage::find_args::{FindTransactionsArgs, TransactionPartial};
15use crate::storage::manager::WalletStorageManager;
16use crate::storage::traits::reader::StorageReader;
17use crate::storage::traits::reader_writer::StorageReaderWriter;
18
19use super::super::task_trait::WalletMonitorTask;
20
21pub struct TaskFailAbandoned {
27 storage: WalletStorageManager,
29 trigger_msecs: u64,
31 last_run_msecs: u64,
33 abandoned_msecs: u64,
35}
36
37impl TaskFailAbandoned {
38 pub fn new(storage: WalletStorageManager, abandoned_msecs: u64) -> Self {
40 Self {
41 storage,
42 trigger_msecs: ONE_MINUTE * 5,
43 last_run_msecs: 0,
44 abandoned_msecs,
45 }
46 }
47
48 pub fn with_trigger_msecs(
50 storage: WalletStorageManager,
51 trigger_msecs: u64,
52 abandoned_msecs: u64,
53 ) -> Self {
54 Self {
55 storage,
56 trigger_msecs,
57 last_run_msecs: 0,
58 abandoned_msecs,
59 }
60 }
61}
62
63#[async_trait]
64impl WalletMonitorTask for TaskFailAbandoned {
65 fn name(&self) -> &str {
66 "FailAbandoned"
67 }
68
69 fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
70 if now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs {
71 self.last_run_msecs = now_msecs_since_epoch;
72 true
73 } else {
74 false
75 }
76 }
77
78 async fn run_task(&mut self) -> Result<String, WalletError> {
79 let mut log = String::new();
80 let limit = 100i64;
81 let mut offset = 0i64;
82
83 let abandoned_cutoff = chrono::Utc::now().naive_utc()
84 - chrono::Duration::milliseconds(self.abandoned_msecs as i64);
85
86 loop {
87 let args = FindTransactionsArgs {
88 partial: TransactionPartial::default(),
89 status: Some(vec![
90 TransactionStatus::Unprocessed,
91 TransactionStatus::Unsigned,
92 ]),
93 paged: Some(crate::storage::find_args::Paged { limit, offset }),
94 since: None,
95 no_raw_tx: true,
96 };
97
98 let txs = self.storage.find_transactions(&args).await?;
99 let count = txs.len();
100
101 if txs.is_empty() {
102 break;
103 }
104
105 let abandoned_txs: Vec<_> = txs
107 .into_iter()
108 .filter(|tx| tx.updated_at < abandoned_cutoff)
109 .collect();
110
111 for tx in &abandoned_txs {
112 if let Some(ref txid) = tx.txid {
113 match self
114 .storage
115 .update_transaction_status(txid, TransactionStatus::Failed)
116 .await
117 {
118 Ok(_) => {
119 log.push_str(&format!(
120 "updated tx {} (id={}) status to 'failed'\n",
121 txid, tx.transaction_id
122 ));
123 }
124 Err(e) => {
125 log.push_str(&format!(
126 "error failing tx {} (id={}): {}\n",
127 txid, tx.transaction_id, e
128 ));
129 }
130 }
131 } else {
132 log.push_str(&format!("skipped tx id={} (no txid)\n", tx.transaction_id));
134 }
135 }
136
137 if (count as i64) < limit {
138 break;
139 }
140 offset += limit;
141 }
142
143 Ok(log)
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    // None of these tests construct a `TaskFailAbandoned`; they only check
    // constants and date arithmetic.

    /// Five minutes expressed via `ONE_MINUTE` equals 300 000 ms.
    #[test]
    fn test_default_intervals() {
        let five_minutes = ONE_MINUTE * 5;
        assert_eq!(five_minutes, 300_000);
    }

    /// Compares the expected task name literal with itself; it does not call
    /// `name()`.
    #[test]
    fn test_task_name() {
        let expected = "FailAbandoned";
        assert_eq!(expected, "FailAbandoned");
    }

    /// Subtracting a five-minute duration yields an earlier timestamp.
    #[test]
    fn test_abandoned_cutoff_calculation() {
        let now = chrono::Utc::now().naive_utc();
        let window = chrono::Duration::milliseconds(ONE_MINUTE as i64 * 5);
        let five_min_ago = now - window;
        assert!(five_min_ago < now);
    }
}