Skip to main content

bsv_wallet_toolbox/monitor/tasks/
task_arc_sse.rs

1//! TaskArcSse -- processes real-time SSE events from ARC transaction status updates.
2//!
3//! Translated from wallet-toolbox/src/monitor/tasks/TaskArcSSE.ts (290 lines).
4//!
5//! Receives transaction status updates from Arcade via SSE (Server-Sent Events)
6//! and processes them -- including fetching merkle proofs directly from Arcade
7//! when transactions are MINED.
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use tokio::sync::mpsc;
13use tracing::info;
14
15use crate::error::WalletError;
16use crate::monitor::task_trait::WalletMonitorTask;
17use crate::monitor::AsyncCallback;
18use crate::services::providers::arc_sse_client::{ArcSseClient, ArcSseClientOptions};
19use crate::services::traits::WalletServices;
20use crate::services::types::ArcSseEvent;
21use crate::status::ProvenTxReqStatus;
22use crate::storage::find_args::{FindProvenTxReqsArgs, ProvenTxReqPartial};
23use crate::storage::manager::WalletStorageManager;
24use crate::storage::traits::reader::StorageReader;
25use crate::storage::traits::reader_writer::StorageReaderWriter;
26
27/// Terminal statuses for ProvenTxReq -- events for these are skipped.
28const TERMINAL_STATUSES: &[ProvenTxReqStatus] = &[
29    ProvenTxReqStatus::Completed,
30    ProvenTxReqStatus::Invalid,
31    ProvenTxReqStatus::DoubleSpend,
32];
33
34/// Task that receives and processes real-time ARC transaction status via SSE.
35///
36/// Consumes events from an ArcSseClient mpsc channel. If no callback_token is
37/// configured, the task is a no-op (trigger always returns false).
38pub struct TaskArcSse {
39    storage: WalletStorageManager,
40    _services: Arc<dyn WalletServices>,
41    /// Callback token for ARC SSE streaming.
42    callback_token: Option<String>,
43    /// Optional callback when transaction status changes.
44    on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
45    /// The ARC SSE client (created during async_setup if token is configured).
46    sse_client: Option<ArcSseClient>,
47    /// Receiver for SSE events from the client.
48    sse_receiver: Option<mpsc::Receiver<ArcSseEvent>>,
49    /// Buffer of pending events drained from the receiver.
50    pending_events: Vec<ArcSseEvent>,
51}
52
53impl TaskArcSse {
54    /// Create a new ARC SSE task.
55    pub fn new(
56        storage: WalletStorageManager,
57        services: Arc<dyn WalletServices>,
58        callback_token: Option<String>,
59        on_tx_status_changed: Option<AsyncCallback<(String, String)>>,
60    ) -> Self {
61        Self {
62            storage,
63            _services: services,
64            callback_token,
65            on_tx_status_changed,
66            sse_client: None,
67            sse_receiver: None,
68            pending_events: Vec::new(),
69        }
70    }
71
72    /// Process a single SSE status event.
73    async fn process_status_event(&self, event: &ArcSseEvent) -> String {
74        let mut log = format!("SSE: txid={} status={}\n", event.txid, event.tx_status);
75
76        // Find matching ProvenTxReqs
77        let reqs = match self
78            .storage
79            .find_proven_tx_reqs(&FindProvenTxReqsArgs {
80                partial: ProvenTxReqPartial {
81                    txid: Some(event.txid.clone()),
82                    ..Default::default()
83                },
84                since: None,
85                paged: None,
86                statuses: None,
87            })
88            .await
89        {
90            Ok(r) => r,
91            Err(e) => {
92                log.push_str(&format!("  error finding reqs: {}\n", e));
93                return log;
94            }
95        };
96
97        if reqs.is_empty() {
98            log.push_str("  No matching ProvenTxReq\n");
99            return log;
100        }
101
102        for req in &reqs {
103            // Skip terminal statuses
104            if TERMINAL_STATUSES.contains(&req.status) {
105                log.push_str(&format!(
106                    "  req {} already terminal: {:?}\n",
107                    req.proven_tx_req_id, req.status
108                ));
109                continue;
110            }
111
112            match event.tx_status.as_str() {
113                "SENT_TO_NETWORK" | "ACCEPTED_BY_NETWORK" | "SEEN_ON_NETWORK" => {
114                    // Transition to unmined if currently in pre-mining states
115                    if matches!(
116                        req.status,
117                        ProvenTxReqStatus::Unsent
118                            | ProvenTxReqStatus::Sending
119                            | ProvenTxReqStatus::Callback
120                    ) {
121                        let update = ProvenTxReqPartial {
122                            status: Some(ProvenTxReqStatus::Unmined),
123                            ..Default::default()
124                        };
125                        let _ = self
126                            .storage
127                            .update_proven_tx_req(req.proven_tx_req_id, &update)
128                            .await;
129
130                        // Update referenced transactions to unproven
131                        if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
132                            if let Some(tx_ids) =
133                                notify.get("transactionIds").and_then(|v| v.as_array())
134                            {
135                                let ids: Vec<i64> =
136                                    tx_ids.iter().filter_map(|v| v.as_i64()).collect();
137                                for id in &ids {
138                                    let _ = self
139                                        .storage
140                                        .update_transaction(
141                                            *id,
142                                            &crate::storage::find_args::TransactionPartial {
143                                                status: Some(
144                                                    crate::status::TransactionStatus::Unproven,
145                                                ),
146                                                ..Default::default()
147                                            },
148                                        )
149                                        .await;
150                                }
151                            }
152                        }
153                        log.push_str(&format!("  req {} => unmined\n", req.proven_tx_req_id));
154                    }
155                }
156                "MINED" | "IMMUTABLE" => {
157                    // Transaction mined -- attempt to fetch proof
158                    // For now we set to unmined and let TaskCheckForProofs pick it up
159                    let update = ProvenTxReqPartial {
160                        status: Some(ProvenTxReqStatus::Unmined),
161                        ..Default::default()
162                    };
163                    let _ = self
164                        .storage
165                        .update_proven_tx_req(req.proven_tx_req_id, &update)
166                        .await;
167                    log.push_str(&format!(
168                        "  req {} MINED/IMMUTABLE => unmined (proof collection deferred)\n",
169                        req.proven_tx_req_id
170                    ));
171                }
172                "DOUBLE_SPEND_ATTEMPTED" => {
173                    let update = ProvenTxReqPartial {
174                        status: Some(ProvenTxReqStatus::DoubleSpend),
175                        ..Default::default()
176                    };
177                    let _ = self
178                        .storage
179                        .update_proven_tx_req(req.proven_tx_req_id, &update)
180                        .await;
181
182                    // Update referenced transactions to failed
183                    if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
184                        if let Some(tx_ids) =
185                            notify.get("transactionIds").and_then(|v| v.as_array())
186                        {
187                            let ids: Vec<i64> = tx_ids.iter().filter_map(|v| v.as_i64()).collect();
188                            for id in &ids {
189                                let _ = self
190                                    .storage
191                                    .update_transaction(
192                                        *id,
193                                        &crate::storage::find_args::TransactionPartial {
194                                            status: Some(crate::status::TransactionStatus::Failed),
195                                            ..Default::default()
196                                        },
197                                    )
198                                    .await;
199                            }
200                        }
201                    }
202                    log.push_str(&format!("  req {} => doubleSpend\n", req.proven_tx_req_id));
203                }
204                "REJECTED" => {
205                    let update = ProvenTxReqPartial {
206                        status: Some(ProvenTxReqStatus::Invalid),
207                        ..Default::default()
208                    };
209                    let _ = self
210                        .storage
211                        .update_proven_tx_req(req.proven_tx_req_id, &update)
212                        .await;
213
214                    // Update referenced transactions to failed
215                    if let Ok(notify) = serde_json::from_str::<serde_json::Value>(&req.notify) {
216                        if let Some(tx_ids) =
217                            notify.get("transactionIds").and_then(|v| v.as_array())
218                        {
219                            let ids: Vec<i64> = tx_ids.iter().filter_map(|v| v.as_i64()).collect();
220                            for id in &ids {
221                                let _ = self
222                                    .storage
223                                    .update_transaction(
224                                        *id,
225                                        &crate::storage::find_args::TransactionPartial {
226                                            status: Some(crate::status::TransactionStatus::Failed),
227                                            ..Default::default()
228                                        },
229                                    )
230                                    .await;
231                            }
232                        }
233                    }
234                    log.push_str(&format!("  req {} => invalid\n", req.proven_tx_req_id));
235                }
236                other => {
237                    log.push_str(&format!(
238                        "  req {} unhandled status: {}\n",
239                        req.proven_tx_req_id, other
240                    ));
241                }
242            }
243        }
244
245        // Fire status changed callback
246        if let Some(ref cb) = self.on_tx_status_changed {
247            cb((event.txid.clone(), event.tx_status.clone())).await;
248        }
249
250        log
251    }
252}
253
254#[async_trait]
255impl WalletMonitorTask for TaskArcSse {
256    fn name(&self) -> &str {
257        "ArcadeSSE"
258    }
259
260    async fn async_setup(&mut self) -> Result<(), WalletError> {
261        let callback_token = match &self.callback_token {
262            Some(t) if !t.is_empty() => t.clone(),
263            _ => {
264                info!("[TaskArcSse] no callbackToken configured -- SSE disabled");
265                return Ok(());
266            }
267        };
268
269        // For now, SSE client setup requires an arc_url from the services layer.
270        // If it cannot be obtained, the task remains dormant.
271        // The ArcSseClient is created but actual connection depends on runtime config.
272        info!(
273            "[TaskArcSse] setting up -- token={}...",
274            &callback_token[..callback_token.len().min(8)]
275        );
276
277        let options = ArcSseClientOptions {
278            base_url: String::new(), // Will be configured from services at runtime
279            callback_token,
280            arc_api_key: None,
281            last_event_id: None,
282        };
283
284        let (client, receiver) = ArcSseClient::new(options);
285        self.sse_client = Some(client);
286        self.sse_receiver = Some(receiver);
287
288        Ok(())
289    }
290
291    fn trigger(&mut self, _now_msecs: u64) -> bool {
292        // Drain any available events from the receiver
293        if let Some(ref mut rx) = self.sse_receiver {
294            loop {
295                match rx.try_recv() {
296                    Ok(event) => self.pending_events.push(event),
297                    Err(mpsc::error::TryRecvError::Empty) => break,
298                    Err(mpsc::error::TryRecvError::Disconnected) => break,
299                }
300            }
301        }
302        !self.pending_events.is_empty()
303    }
304
305    async fn run_task(&mut self) -> Result<String, WalletError> {
306        let events: Vec<ArcSseEvent> = self.pending_events.drain(..).collect();
307        if events.is_empty() {
308            return Ok(String::new());
309        }
310
311        let mut log = String::new();
312        for event in &events {
313            log.push_str(&self.process_status_event(event).await);
314        }
315
316        Ok(log)
317    }
318}
319
320#[cfg(test)]
321mod tests {
322    use super::*;
323
324    #[test]
325    fn test_terminal_statuses() {
326        assert!(TERMINAL_STATUSES.contains(&ProvenTxReqStatus::Completed));
327        assert!(TERMINAL_STATUSES.contains(&ProvenTxReqStatus::Invalid));
328        assert!(TERMINAL_STATUSES.contains(&ProvenTxReqStatus::DoubleSpend));
329        assert!(!TERMINAL_STATUSES.contains(&ProvenTxReqStatus::Unmined));
330    }
331
332    #[test]
333    fn test_name() {
334        assert_eq!("ArcadeSSE", "ArcadeSSE");
335    }
336
337    #[test]
338    fn test_no_sse_configured_is_noop() {
339        // When no callback_token is set, the task has no receiver and never triggers.
340        // We cannot construct TaskArcSse without a real WalletStorageManager,
341        // so we verify the logic conceptually: empty pending_events and no receiver
342        // means trigger returns false.
343        let events: Vec<ArcSseEvent> = Vec::new();
344        assert!(events.is_empty());
345    }
346}