rustywallet_mempool/
websocket.rs

1//! WebSocket support for real-time mempool data.
2//!
3//! This module provides WebSocket connectivity for receiving
4//! real-time updates from mempool.space.
5
6use std::collections::HashSet;
7use std::sync::Arc;
8
9use serde::{Deserialize, Serialize};
10use tokio::sync::{broadcast, RwLock};
11
12use crate::types::FeeEstimates;
13
14/// WebSocket endpoint URLs.
15pub const MAINNET_WS_URL: &str = "wss://mempool.space/api/v1/ws";
16/// Testnet WebSocket URL.
17pub const TESTNET_WS_URL: &str = "wss://mempool.space/testnet/api/v1/ws";
18/// Signet WebSocket URL.
19pub const SIGNET_WS_URL: &str = "wss://mempool.space/signet/api/v1/ws";
20
21/// WebSocket event types.
22#[derive(Debug, Clone)]
23pub enum WsEvent {
24    /// New block mined
25    Block(BlockEvent),
26    /// Mempool update
27    MempoolInfo(MempoolInfoEvent),
28    /// Fee rate update
29    Fees(FeeEstimates),
30    /// Address transaction detected
31    AddressTx(AddressTxEvent),
32    /// Transaction confirmed
33    TxConfirmed(TxConfirmedEvent),
34    /// Connection status changed
35    ConnectionStatus(WsConnectionStatus),
36}
37
38/// Block event data.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BlockEvent {
41    /// Block height
42    pub height: u64,
43    /// Block hash
44    pub hash: String,
45    /// Block timestamp
46    pub timestamp: u64,
47    /// Number of transactions
48    pub tx_count: u32,
49    /// Block size in bytes
50    pub size: u32,
51    /// Block weight
52    pub weight: u32,
53    /// Total fees in satoshis
54    pub total_fees: u64,
55    /// Median fee rate
56    pub median_fee: f64,
57}
58
59/// Mempool info event.
60#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct MempoolInfoEvent {
62    /// Number of transactions in mempool
63    pub count: u64,
64    /// Total size in virtual bytes
65    pub vsize: u64,
66    /// Total fees in satoshis
67    pub total_fee: u64,
68    /// Memory usage in bytes
69    pub usage: u64,
70}
71
72/// Address transaction event.
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct AddressTxEvent {
75    /// The address
76    pub address: String,
77    /// Transaction ID
78    pub txid: String,
79    /// Value change in satoshis (positive = received, negative = sent)
80    pub value: i64,
81    /// Whether confirmed
82    pub confirmed: bool,
83}
84
85/// Transaction confirmed event.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct TxConfirmedEvent {
88    /// Transaction ID
89    pub txid: String,
90    /// Block height
91    pub block_height: u64,
92    /// Block hash
93    pub block_hash: String,
94}
95
96/// WebSocket connection status.
97#[derive(Debug, Clone, Copy, PartialEq, Eq)]
98pub enum WsConnectionStatus {
99    /// Connected to server
100    Connected,
101    /// Disconnected from server
102    Disconnected,
103    /// Reconnecting to server
104    Reconnecting,
105    /// Connection error
106    Error,
107}
108
109/// WebSocket subscription configuration.
110#[derive(Debug, Clone, Default)]
111pub struct WsSubscription {
112    /// Subscribe to new blocks
113    pub blocks: bool,
114    /// Subscribe to mempool info updates
115    pub mempool_info: bool,
116    /// Subscribe to fee updates
117    pub fees: bool,
118    /// Addresses to track
119    pub addresses: HashSet<String>,
120    /// Transactions to track
121    pub transactions: HashSet<String>,
122}
123
124impl WsSubscription {
125    /// Create a new empty subscription.
126    pub fn new() -> Self {
127        Self::default()
128    }
129
130    /// Subscribe to new blocks.
131    pub fn with_blocks(mut self) -> Self {
132        self.blocks = true;
133        self
134    }
135
136    /// Subscribe to mempool info.
137    pub fn with_mempool_info(mut self) -> Self {
138        self.mempool_info = true;
139        self
140    }
141
142    /// Subscribe to fee updates.
143    pub fn with_fees(mut self) -> Self {
144        self.fees = true;
145        self
146    }
147
148    /// Track an address.
149    pub fn track_address(mut self, address: impl Into<String>) -> Self {
150        self.addresses.insert(address.into());
151        self
152    }
153
154    /// Track multiple addresses.
155    pub fn track_addresses(mut self, addresses: impl IntoIterator<Item = impl Into<String>>) -> Self {
156        self.addresses.extend(addresses.into_iter().map(|a| a.into()));
157        self
158    }
159
160    /// Track a transaction.
161    pub fn track_transaction(mut self, txid: impl Into<String>) -> Self {
162        self.transactions.insert(txid.into());
163        self
164    }
165
166    /// Check if any subscriptions are active.
167    pub fn has_subscriptions(&self) -> bool {
168        self.blocks || self.mempool_info || self.fees || 
169        !self.addresses.is_empty() || !self.transactions.is_empty()
170    }
171}
172
173/// WebSocket client state (simulated - actual WebSocket requires additional deps).
174pub struct WsClientState {
175    /// Current subscription
176    pub subscription: WsSubscription,
177    /// Connection status
178    pub status: WsConnectionStatus,
179    /// Event broadcaster
180    event_tx: broadcast::Sender<WsEvent>,
181}
182
183impl WsClientState {
184    /// Create new client state.
185    pub fn new() -> Self {
186        let (event_tx, _) = broadcast::channel(1000);
187        Self {
188            subscription: WsSubscription::new(),
189            status: WsConnectionStatus::Disconnected,
190            event_tx,
191        }
192    }
193
194    /// Subscribe to events.
195    pub fn subscribe(&self) -> broadcast::Receiver<WsEvent> {
196        self.event_tx.subscribe()
197    }
198
199    /// Broadcast an event.
200    pub fn broadcast(&self, event: WsEvent) {
201        let _ = self.event_tx.send(event);
202    }
203}
204
205impl Default for WsClientState {
206    fn default() -> Self {
207        Self::new()
208    }
209}
210
211/// WebSocket client for real-time mempool data.
212///
213/// Note: This is a high-level API. Actual WebSocket connectivity
214/// requires the `tokio-tungstenite` crate which can be added as
215/// an optional dependency.
216pub struct MempoolWsClient {
217    ws_url: String,
218    state: Arc<RwLock<WsClientState>>,
219}
220
221impl MempoolWsClient {
222    /// Create a new WebSocket client for mainnet.
223    pub fn new() -> Self {
224        Self::with_url(MAINNET_WS_URL)
225    }
226
227    /// Create a new WebSocket client for testnet.
228    pub fn testnet() -> Self {
229        Self::with_url(TESTNET_WS_URL)
230    }
231
232    /// Create a new WebSocket client for signet.
233    pub fn signet() -> Self {
234        Self::with_url(SIGNET_WS_URL)
235    }
236
237    /// Create a new WebSocket client with custom URL.
238    pub fn with_url(url: &str) -> Self {
239        Self {
240            ws_url: url.to_string(),
241            state: Arc::new(RwLock::new(WsClientState::new())),
242        }
243    }
244
245    /// Get the WebSocket URL.
246    pub fn url(&self) -> &str {
247        &self.ws_url
248    }
249
250    /// Subscribe to events.
251    pub async fn subscribe(&self) -> broadcast::Receiver<WsEvent> {
252        self.state.read().await.subscribe()
253    }
254
255    /// Get current connection status.
256    pub async fn status(&self) -> WsConnectionStatus {
257        self.state.read().await.status
258    }
259
260    /// Update subscription configuration.
261    pub async fn set_subscription(&self, subscription: WsSubscription) {
262        let mut state = self.state.write().await;
263        state.subscription = subscription;
264    }
265
266    /// Get current subscription.
267    pub async fn get_subscription(&self) -> WsSubscription {
268        self.state.read().await.subscription.clone()
269    }
270
271    /// Track an address for transactions.
272    pub async fn track_address(&self, address: impl Into<String>) {
273        let mut state = self.state.write().await;
274        state.subscription.addresses.insert(address.into());
275    }
276
277    /// Untrack an address.
278    pub async fn untrack_address(&self, address: &str) {
279        let mut state = self.state.write().await;
280        state.subscription.addresses.remove(address);
281    }
282
283    /// Track a transaction for confirmation.
284    pub async fn track_transaction(&self, txid: impl Into<String>) {
285        let mut state = self.state.write().await;
286        state.subscription.transactions.insert(txid.into());
287    }
288
289    /// Untrack a transaction.
290    pub async fn untrack_transaction(&self, txid: &str) {
291        let mut state = self.state.write().await;
292        state.subscription.transactions.remove(txid);
293    }
294
295    /// Simulate receiving a block event (for testing).
296    #[cfg(test)]
297    pub async fn simulate_block(&self, event: BlockEvent) {
298        let state = self.state.read().await;
299        state.broadcast(WsEvent::Block(event));
300    }
301
302    /// Simulate receiving a fee update (for testing).
303    #[cfg(test)]
304    pub async fn simulate_fees(&self, fees: FeeEstimates) {
305        let state = self.state.read().await;
306        state.broadcast(WsEvent::Fees(fees));
307    }
308}
309
310impl Default for MempoolWsClient {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
316/// Builder for WebSocket subscriptions.
317pub struct WsSubscriptionBuilder {
318    subscription: WsSubscription,
319}
320
321impl WsSubscriptionBuilder {
322    /// Create a new builder.
323    pub fn new() -> Self {
324        Self {
325            subscription: WsSubscription::new(),
326        }
327    }
328
329    /// Subscribe to new blocks.
330    pub fn blocks(mut self) -> Self {
331        self.subscription.blocks = true;
332        self
333    }
334
335    /// Subscribe to mempool info.
336    pub fn mempool_info(mut self) -> Self {
337        self.subscription.mempool_info = true;
338        self
339    }
340
341    /// Subscribe to fee updates.
342    pub fn fees(mut self) -> Self {
343        self.subscription.fees = true;
344        self
345    }
346
347    /// Track an address.
348    pub fn address(mut self, address: impl Into<String>) -> Self {
349        self.subscription.addresses.insert(address.into());
350        self
351    }
352
353    /// Track a transaction.
354    pub fn transaction(mut self, txid: impl Into<String>) -> Self {
355        self.subscription.transactions.insert(txid.into());
356        self
357    }
358
359    /// Build the subscription.
360    pub fn build(self) -> WsSubscription {
361        self.subscription
362    }
363}
364
365impl Default for WsSubscriptionBuilder {
366    fn default() -> Self {
367        Self::new()
368    }
369}
370
371#[cfg(test)]
372mod tests {
373    use super::*;
374
375    #[test]
376    fn test_ws_subscription() {
377        let sub = WsSubscription::new()
378            .with_blocks()
379            .with_fees()
380            .track_address("addr1");
381        
382        assert!(sub.blocks);
383        assert!(sub.fees);
384        assert!(!sub.mempool_info);
385        assert!(sub.addresses.contains("addr1"));
386        assert!(sub.has_subscriptions());
387    }
388
389    #[test]
390    fn test_ws_subscription_builder() {
391        let sub = WsSubscriptionBuilder::new()
392            .blocks()
393            .fees()
394            .address("addr1")
395            .transaction("txid1")
396            .build();
397        
398        assert!(sub.blocks);
399        assert!(sub.fees);
400        assert!(sub.addresses.contains("addr1"));
401        assert!(sub.transactions.contains("txid1"));
402    }
403
404    #[test]
405    fn test_ws_connection_status() {
406        assert_eq!(WsConnectionStatus::Connected, WsConnectionStatus::Connected);
407        assert_ne!(WsConnectionStatus::Connected, WsConnectionStatus::Disconnected);
408    }
409
410    #[test]
411    fn test_block_event() {
412        let event = BlockEvent {
413            height: 800000,
414            hash: "abc123".to_string(),
415            timestamp: 1234567890,
416            tx_count: 1000,
417            size: 1000000,
418            weight: 4000000,
419            total_fees: 50000000,
420            median_fee: 10.5,
421        };
422        
423        assert_eq!(event.height, 800000);
424        assert_eq!(event.tx_count, 1000);
425    }
426
427    #[tokio::test]
428    async fn test_ws_client() {
429        let client = MempoolWsClient::new();
430        assert_eq!(client.url(), MAINNET_WS_URL);
431        assert_eq!(client.status().await, WsConnectionStatus::Disconnected);
432    }
433}