Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_send_waiting.rs

1//! TaskSendWaiting -- broadcasts pending unsent/sending ProvenTxReqs to the network.
2//!
3//! Translated from wallet-toolbox/src/monitor/tasks/TaskSendWaiting.ts (133 lines).
4//!
5//! Finds ProvenTxReqs with status "unsent" (and optionally "sending") that
6//! have aged past `aged_msecs`, then calls attempt_to_post_reqs_to_network.
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11
12use crate::error::WalletError;
13use crate::monitor::helpers::attempt_to_post_reqs_to_network;
14use crate::monitor::{AsyncCallback, ONE_MINUTE, ONE_SECOND};
15use crate::services::traits::WalletServices;
16use crate::status::ProvenTxReqStatus;
17use crate::storage::find_args::{FindProvenTxReqsArgs, Paged, ProvenTxReqPartial};
18use crate::storage::manager::WalletStorageManager;
19use crate::storage::traits::reader::StorageReader;
20use crate::types::Chain;
21
22use super::super::task_trait::WalletMonitorTask;
23
24/// Background task that broadcasts pending transactions to the network.
25///
26/// Periodically queries for ProvenTxReqs with status "unsent" (and optionally
27/// "sending" for retry), filters by age, and posts them via services.post_beef().
28pub struct TaskSendWaiting {
29    /// Storage manager for persistence operations.
30    storage: WalletStorageManager,
31    /// Network services for broadcasting.
32    services: Arc<dyn WalletServices>,
33    /// How often to trigger (default 8 seconds).
34    trigger_msecs: u64,
35    /// Only process reqs older than this (default 7 seconds).
36    aged_msecs: u64,
37    /// How often to include "sending" status reqs (default 5 minutes).
38    sending_msecs: u64,
39    /// Last time this task ran (epoch ms).
40    last_run_msecs: u64,
41    /// Last time "sending" reqs were included (epoch ms).
42    last_sending_run_msecs: Option<u64>,
43    /// Whether to include "sending" status in next run.
44    include_sending: bool,
45    /// Optional callback when a transaction is broadcasted.
46    on_tx_broadcasted: Option<AsyncCallback<String>>,
47    /// Chain (main/test) -- reserved for chain-specific broadcast logic.
48    _chain: Chain,
49}
50
51impl TaskSendWaiting {
52    /// Create a new TaskSendWaiting with default intervals.
53    pub fn new(
54        storage: WalletStorageManager,
55        services: Arc<dyn WalletServices>,
56        chain: Chain,
57        on_tx_broadcasted: Option<AsyncCallback<String>>,
58    ) -> Self {
59        Self {
60            storage,
61            services,
62            trigger_msecs: ONE_SECOND * 8,
63            aged_msecs: ONE_SECOND * 7,
64            sending_msecs: ONE_MINUTE * 5,
65            last_run_msecs: 0,
66            last_sending_run_msecs: None,
67            include_sending: true,
68            on_tx_broadcasted,
69            _chain: chain,
70        }
71    }
72
73    /// Create with custom intervals.
74    pub fn with_intervals(
75        storage: WalletStorageManager,
76        services: Arc<dyn WalletServices>,
77        chain: Chain,
78        trigger_msecs: u64,
79        aged_msecs: u64,
80        sending_msecs: u64,
81        on_tx_broadcasted: Option<AsyncCallback<String>>,
82    ) -> Self {
83        Self {
84            storage,
85            services,
86            trigger_msecs,
87            aged_msecs,
88            sending_msecs,
89            last_run_msecs: 0,
90            last_sending_run_msecs: None,
91            include_sending: true,
92            on_tx_broadcasted,
93            _chain: chain,
94        }
95    }
96}
97
98#[async_trait]
99impl WalletMonitorTask for TaskSendWaiting {
100    fn name(&self) -> &str {
101        "SendWaiting"
102    }
103
104    fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
105        // Determine whether to include "sending" status reqs this cycle
106        self.include_sending = match self.last_sending_run_msecs {
107            None => true,
108            Some(last) => now_msecs_since_epoch > last + self.sending_msecs,
109        };
110        if self.include_sending {
111            self.last_sending_run_msecs = Some(now_msecs_since_epoch);
112        }
113
114        // Check if trigger interval has elapsed
115        if now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs {
116            self.last_run_msecs = now_msecs_since_epoch;
117            true
118        } else {
119            false
120        }
121    }
122
123    async fn run_task(&mut self) -> Result<String, WalletError> {
124        let mut log = String::new();
125        let limit = 100i64;
126        let mut offset = 0i64;
127
128        // Calculate the age cutoff
129        let aged_limit =
130            chrono::Utc::now().naive_utc() - chrono::Duration::milliseconds(self.aged_msecs as i64);
131
132        // Build status filter
133        let statuses = if self.include_sending {
134            vec![ProvenTxReqStatus::Unsent, ProvenTxReqStatus::Sending]
135        } else {
136            vec![ProvenTxReqStatus::Unsent]
137        };
138
139        loop {
140            let args = FindProvenTxReqsArgs {
141                partial: ProvenTxReqPartial::default(),
142                statuses: Some(statuses.clone()),
143                paged: Some(Paged { limit, offset }),
144                since: None,
145            };
146
147            let reqs = self.storage.find_proven_tx_reqs(&args).await?;
148            let count = reqs.len();
149
150            if reqs.is_empty() {
151                break;
152            }
153
154            log.push_str(&format!(
155                "{} reqs with status {}\n",
156                reqs.len(),
157                statuses
158                    .iter()
159                    .map(|s| s.to_string())
160                    .collect::<Vec<_>>()
161                    .join(" or ")
162            ));
163
164            // Filter by age: only process reqs updated before the cutoff
165            let aged_reqs: Vec<_> = reqs
166                .into_iter()
167                .filter(|req| req.updated_at < aged_limit)
168                .collect();
169
170            log.push_str(&format!(
171                "  Of those, {} aged past {} ms.\n",
172                aged_reqs.len(),
173                self.aged_msecs
174            ));
175
176            if !aged_reqs.is_empty() {
177                let post_result = attempt_to_post_reqs_to_network(
178                    &self.storage,
179                    self.services.as_ref(),
180                    &aged_reqs,
181                )
182                .await?;
183                log.push_str(&post_result.log);
184
185                // Fire callback for each successfully posted req
186                if let Some(ref cb) = self.on_tx_broadcasted {
187                    for detail in &post_result.details {
188                        if detail.status == crate::monitor::helpers::PostReqStatus::Success {
189                            cb(detail.txid.clone()).await;
190                        }
191                    }
192                }
193            }
194
195            if (count as i64) < limit {
196                break;
197            }
198            offset += limit;
199        }
200
201        Ok(log)
202    }
203}
204
205// ---------------------------------------------------------------------------
206// Tests
207// ---------------------------------------------------------------------------
208
#[cfg(test)]
mod tests {
    use super::*;

    /// The interval expressions used for the default schedule evaluate to the
    /// expected millisecond counts.
    #[test]
    fn test_default_intervals() {
        let trigger_default = ONE_SECOND * 8;
        let aged_default = ONE_SECOND * 7;
        let sending_default = ONE_MINUTE * 5;

        assert_eq!(trigger_default, 8000);
        assert_eq!(aged_default, 7000);
        assert_eq!(sending_default, 300_000);
    }

    /// Pins the task-name literal. It compares the literal with itself rather
    /// than calling `name()`, because no task instance is constructed here.
    #[test]
    fn test_task_name() {
        let expected_name = "SendWaiting";
        assert_eq!("SendWaiting", expected_name);
    }
}