bsv_wallet_toolbox/monitor/tasks/
task_arc_sse.rs1use std::sync::Arc;
10
11use async_trait::async_trait;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::error::WalletError;
16use crate::monitor::task_trait::WalletMonitorTask;
17use crate::monitor::AsyncCallback;
18use crate::services::providers::arc_sse_client::{ArcSseClient, ArcSseClientOptions};
19use crate::services::traits::WalletServices;
20use crate::services::types::ArcSseEvent;
21use crate::status::ProvenTxReqStatus;
22use crate::storage::find_args::{FindProvenTxReqsArgs, ProvenTxReqPartial};
23use crate::storage::manager::WalletStorageManager;
24use crate::storage::traits::reader::StorageReader;
25use crate::storage::traits::reader_writer::StorageReaderWriter;
26
27const TERMINAL_STATUSES: &[ProvenTxReqStatus] = &[
29 ProvenTxReqStatus::Completed,
30 ProvenTxReqStatus::Invalid,
31 ProvenTxReqStatus::DoubleSpend,
32];
33
34pub struct TaskArcSse {
39 storage: WalletStorageManager,
40 _services: Arc<dyn WalletServices>,
41 callback_token: Option<String>,
43 on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
45 sse_client: Option<ArcSseClient>,
47 sse_receiver: Option<mpsc::Receiver<ArcSseEvent>>,
49 pending_events: Vec<ArcSseEvent>,
51}
52
53impl TaskArcSse {
54 pub fn new(
56 storage: WalletStorageManager,
57 services: Arc<dyn WalletServices>,
58 callback_token: Option<String>,
59 on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
60 ) -> Self {
61 Self {
62 storage,
63 _services: services,
64 callback_token,
65 on_tx_status_changed,
66 sse_client: None,
67 sse_receiver: None,
68 pending_events: Vec::new(),
69 }
70 }
71
72 async fn process_status_event(&self, event: &ArcSseEvent) -> String {
74 let mut log = format!("SSE: txid={} status={}\n", event.txid, event.tx_status);
75
76 let reqs = match self
78 .storage
79 .find_proven_tx_reqs(&FindProvenTxReqsArgs {
80 partial: ProvenTxReqPartial {
81 txid: Some(event.txid.clone()),
82 ..Default::default()
83 },
84 since: None,
85 paged: None,
86 statuses: None,
87 })
88 .await
89 {
90 Ok(r) => r,
91 Err(e) => {
92 log.push_str(&format!(" error finding reqs: {}\n", e));
93 return log;
94 }
95 };
96
97 if reqs.is_empty() {
98 log.push_str(" No matching ProvenTxReq\n");
99 return log;
100 }
101
102 for req in &reqs {
103 if TERMINAL_STATUSES.contains(&req.status) {
105 log.push_str(&format!(
106 " req {} already terminal: {:?}\n",
107 req.proven_tx_req_id, req.status
108 ));
109 continue;
110 }
111
112 match event.tx_status.as_str() {
113 "SENT_TO_NETWORK" | "ACCEPTED_BY_NETWORK" | "SEEN_ON_NETWORK" => {
114 if matches!(
116 req.status,
117 ProvenTxReqStatus::Unsent
118 | ProvenTxReqStatus::Sending
119 | ProvenTxReqStatus::Callback
120 ) {
121 let update = ProvenTxReqPartial {
122 status: Some(ProvenTxReqStatus::Unmined),
123 ..Default::default()
124 };
125 let _ = self
126 .storage
127 .update_proven_tx_req(req.proven_tx_req_id, &update)
128 .await;
129
130 if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
132 if let Some(tx_ids) =
133 notify.get("transactionIds").and_then(|v| v.as_array())
134 {
135 let ids: Vec<i64> =
136 tx_ids.iter().filter_map(|v| v.as_i64()).collect();
137 for id in &ids {
138 let _ = self
139 .storage
140 .update_transaction(
141 *id,
142 &crate::storage::find_args::TransactionPartial {
143 status: Some(
144 crate::status::TransactionStatus::Unproven,
145 ),
146 ..Default::default()
147 },
148 )
149 .await;
150 }
151 }
152 }
153 log.push_str(&format!(" req {} => unmined\n", req.proven_tx_req_id));
154 }
155 }
156 "MINED" | "IMMUTABLE" => {
157 let update = ProvenTxReqPartial {
160 status: Some(ProvenTxReqStatus::Unmined),
161 ..Default::default()
162 };
163 let _ = self
164 .storage
165 .update_proven_tx_req(req.proven_tx_req_id, &update)
166 .await;
167 log.push_str(&format!(
168 " req {} MINED/IMMUTABLE => unmined (proof collection deferred)\n",
169 req.proven_tx_req_id
170 ));
171 }
172 "DOUBLE_SPEND_ATTEMPTED" => {
173 let update = ProvenTxReqPartial {
174 status: Some(ProvenTxReqStatus::DoubleSpend),
175 ..Default::default()
176 };
177 let _ = self
178 .storage
179 .update_proven_tx_req(req.proven_tx_req_id, &update)
180 .await;
181
182 if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
184 if let Some(tx_ids) =
185 notify.get("transactionIds").and_then(|v| v.as_array())
186 {
187 let ids: Vec<i64> = tx_ids.iter().filter_map(|v| v.as_i64()).collect();
188 for id in &ids {
189 let _ = self
190 .storage
191 .update_transaction(
192 *id,
193 &crate::storage::find_args::TransactionPartial {
194 status: Some(crate::status::TransactionStatus::Failed),
195 ..Default::default()
196 },
197 )
198 .await;
199 }
200 }
201 }
202 log.push_str(&format!(" req {} => doubleSpend\n", req.proven_tx_req_id));
203 }
204 "REJECTED" => {
205 let update = ProvenTxReqPartial {
206 status: Some(ProvenTxReqStatus::Invalid),
207 ..Default::default()
208 };
209 let _ = self
210 .storage
211 .update_proven_tx_req(req.proven_tx_req_id, &update)
212 .await;
213
214 if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
216 if let Some(tx_ids) =
217 notify.get("transactionIds").and_then(|v| v.as_array())
218 {
219 let ids: Vec<i64> = tx_ids.iter().filter_map(|v| v.as_i64()).collect();
220 for id in &ids {
221 let _ = self
222 .storage
223 .update_transaction(
224 *id,
225 &crate::storage::find_args::TransactionPartial {
226 status: Some(crate::status::TransactionStatus::Failed),
227 ..Default::default()
228 },
229 )
230 .await;
231 }
232 }
233 }
234 log.push_str(&format!(" req {} => invalid\n", req.proven_tx_req_id));
235 }
236 other => {
237 log.push_str(&format!(
238 " req {} unhandled status: {}\n",
239 req.proven_tx_req_id, other
240 ));
241 }
242 }
243 }
244
245 if let Some(ref cb) = self.on_tx_status_changed {
247 cb((event.txid.clone(), event.tx_status.clone())).await;
248 }
249
250 log
251 }
252}
253
254#[async_trait]
255impl WalletMonitorTask for TaskArcSse {
256 fn name(&self) -> &str {
257 "ArcadeSSE"
258 }
259
260 async fn async_setup(&mut self) -> Result<(), WalletError> {
261 let callback_token = match &self.callback_token {
262 Some(t) if !t.is_empty() => t.clone(),
263 _ => {
264 info!("[TaskArcSse] no callbackToken configured -- SSE disabled");
265 return Ok(());
266 }
267 };
268
269 info!(
273 "[TaskArcSse] setting up -- token={}...",
274 &callback_token[..callback_token.len().min(8)]
275 );
276
277 let options = ArcSseClientOptions {
278 base_url: String::new(), callback_token,
280 arc_api_key: None,
281 last_event_id: None,
282 };
283
284 let (client, receiver) = ArcSseClient::new(options);
285 self.sse_client = Some(client);
286 self.sse_receiver = Some(receiver);
287
288 Ok(())
289 }
290
291 fn trigger(&mut self, _now_msecs: u64) -> bool {
292 if let Some(ref mut rx) = self.sse_receiver {
294 loop {
295 match rx.try_recv() {
296 Ok(event) => self.pending_events.push(event),
297 Err(mpsc::error::TryRecvError::Empty) => break,
298 Err(mpsc::error::TryRecvError::Disconnected) => break,
299 }
300 }
301 }
302 !self.pending_events.is_empty()
303 }
304
305 async fn run_task(&mut self) -> Result<String, WalletError> {
306 let events: Vec<ArcSseEvent> = self.pending_events.drain(..).collect();
307 if events.is_empty() {
308 return Ok(String::new());
309 }
310
311 let mut log = String::new();
312 for event in &events {
313 log.push_str(&self.process_status_event(event).await);
314 }
315
316 Ok(log)
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;

    /// The three final request states are listed as terminal; `Unmined` is
    /// not.
    #[test]
    fn test_terminal_statuses() {
        let expected_terminal = &[
            ProvenTxReqStatus::Completed,
            ProvenTxReqStatus::Invalid,
            ProvenTxReqStatus::DoubleSpend,
        ];
        for status in expected_terminal {
            assert!(TERMINAL_STATUSES.contains(status));
        }
        assert!(!TERMINAL_STATUSES.contains(&ProvenTxReqStatus::Unmined));
    }

    /// Pins the expected task name literal.
    ///
    /// NOTE(review): this compares two literals and does not call
    /// `TaskArcSse::name`; it should be replaced once a task can be
    /// constructed in tests.
    #[test]
    fn test_name() {
        let expected = "ArcadeSSE";
        assert_eq!(expected, "ArcadeSSE");
    }

    /// A freshly created event list is empty.
    ///
    /// NOTE(review): this does not exercise `run_task`; it only checks an
    /// empty `Vec`.
    #[test]
    fn test_no_sse_configured_is_noop() {
        let pending = Vec::<ArcSseEvent>::new();
        assert!(pending.is_empty());
    }
}