bsv_wallet_toolbox/monitor/tasks/
task_send_waiting.rs1use std::sync::Arc;
9
10use async_trait::async_trait;
11
12use crate::error::WalletError;
13use crate::monitor::helpers::attempt_to_post_reqs_to_network;
14use crate::monitor::{AsyncCallback, ONE_MINUTE, ONE_SECOND};
15use crate::services::traits::WalletServices;
16use crate::status::ProvenTxReqStatus;
17use crate::storage::find_args::{FindProvenTxReqsArgs, Paged, ProvenTxReqPartial};
18use crate::storage::manager::WalletStorageManager;
19use crate::storage::traits::reader::StorageReader;
20use crate::types::Chain;
21
22use super::super::task_trait::WalletMonitorTask;
23
24pub struct TaskSendWaiting {
29 storage: WalletStorageManager,
31 services: Arc<dyn WalletServices>,
33 trigger_msecs: u64,
35 aged_msecs: u64,
37 sending_msecs: u64,
39 last_run_msecs: u64,
41 last_sending_run_msecs: Option<u64>,
43 include_sending: bool,
45 on_tx_broadcasted: Option<AsyncCallback<String>>,
47 _chain: Chain,
49}
50
51impl TaskSendWaiting {
52 pub fn new(
54 storage: WalletStorageManager,
55 services: Arc<dyn WalletServices>,
56 chain: Chain,
57 on_tx_broadcasted: Option<AsyncCallback<String>>,
58 ) -> Self {
59 Self {
60 storage,
61 services,
62 trigger_msecs: ONE_SECOND * 8,
63 aged_msecs: ONE_SECOND * 7,
64 sending_msecs: ONE_MINUTE * 5,
65 last_run_msecs: 0,
66 last_sending_run_msecs: None,
67 include_sending: true,
68 on_tx_broadcasted,
69 _chain: chain,
70 }
71 }
72
73 pub fn with_intervals(
75 storage: WalletStorageManager,
76 services: Arc<dyn WalletServices>,
77 chain: Chain,
78 trigger_msecs: u64,
79 aged_msecs: u64,
80 sending_msecs: u64,
81 on_tx_broadcasted: Option<AsyncCallback<String>>,
82 ) -> Self {
83 Self {
84 storage,
85 services,
86 trigger_msecs,
87 aged_msecs,
88 sending_msecs,
89 last_run_msecs: 0,
90 last_sending_run_msecs: None,
91 include_sending: true,
92 on_tx_broadcasted,
93 _chain: chain,
94 }
95 }
96}
97
98#[async_trait]
99impl WalletMonitorTask for TaskSendWaiting {
100 fn name(&self) -> &str {
101 "SendWaiting"
102 }
103
104 fn trigger(&mut self, now_msecs_since_epoch: u64) -> bool {
105 self.include_sending = match self.last_sending_run_msecs {
107 None => true,
108 Some(last) => now_msecs_since_epoch > last + self.sending_msecs,
109 };
110 if self.include_sending {
111 self.last_sending_run_msecs = Some(now_msecs_since_epoch);
112 }
113
114 if now_msecs_since_epoch > self.last_run_msecs + self.trigger_msecs {
116 self.last_run_msecs = now_msecs_since_epoch;
117 true
118 } else {
119 false
120 }
121 }
122
123 async fn run_task(&mut self) -> Result<String, WalletError> {
124 let mut log = String::new();
125 let limit = 100i64;
126 let mut offset = 0i64;
127
128 let aged_limit =
130 chrono::Utc::now().naive_utc() - chrono::Duration::milliseconds(self.aged_msecs as i64);
131
132 let statuses = if self.include_sending {
134 vec![ProvenTxReqStatus::Unsent, ProvenTxReqStatus::Sending]
135 } else {
136 vec![ProvenTxReqStatus::Unsent]
137 };
138
139 loop {
140 let args = FindProvenTxReqsArgs {
141 partial: ProvenTxReqPartial::default(),
142 statuses: Some(statuses.clone()),
143 paged: Some(Paged { limit, offset }),
144 since: None,
145 };
146
147 let reqs = self.storage.find_proven_tx_reqs(&args).await?;
148 let count = reqs.len();
149
150 if reqs.is_empty() {
151 break;
152 }
153
154 log.push_str(&format!(
155 "{} reqs with status {}\n",
156 reqs.len(),
157 statuses
158 .iter()
159 .map(|s| s.to_string())
160 .collect::<Vec<_>>()
161 .join(" or ")
162 ));
163
164 let aged_reqs: Vec<_> = reqs
166 .into_iter()
167 .filter(|req| req.updated_at < aged_limit)
168 .collect();
169
170 log.push_str(&format!(
171 " Of those, {} aged past {} ms.\n",
172 aged_reqs.len(),
173 self.aged_msecs
174 ));
175
176 if !aged_reqs.is_empty() {
177 let post_result = attempt_to_post_reqs_to_network(
178 &self.storage,
179 self.services.as_ref(),
180 &aged_reqs,
181 )
182 .await?;
183 log.push_str(&post_result.log);
184
185 if let Some(ref cb) = self.on_tx_broadcasted {
187 for detail in &post_result.details {
188 if detail.status == crate::monitor::helpers::PostReqStatus::Success {
189 cb(detail.txid.clone()).await;
190 }
191 }
192 }
193 }
194
195 if (count as i64) < limit {
196 break;
197 }
198 offset += limit;
199 }
200
201 Ok(log)
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the millisecond values of the interval expressions used as
    /// defaults by `TaskSendWaiting::new`.
    #[test]
    fn test_default_intervals() {
        let trigger_default = ONE_SECOND * 8;
        let aged_default = ONE_SECOND * 7;
        let sending_default = ONE_MINUTE * 5;

        assert_eq!(trigger_default, 8000);
        assert_eq!(aged_default, 7000);
        assert_eq!(sending_default, 300_000);
    }

    /// Placeholder for the task name check.
    ///
    /// NOTE(review): this compares a literal with itself and does not call
    /// `name()`; exercising the real method needs a constructed task.
    #[test]
    fn test_task_name() {
        let expected = "SendWaiting";
        assert_eq!("SendWaiting", expected);
    }
}