use std::collections::HashSet;
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, RwLock};

use crate::types::FeeEstimates;

/// mempool.space WebSocket endpoint for mainnet.
pub const MAINNET_WS_URL: &str = "wss://mempool.space/api/v1/ws";

/// mempool.space WebSocket endpoint for testnet.
pub const TESTNET_WS_URL: &str = "wss://mempool.space/testnet/api/v1/ws";

/// mempool.space WebSocket endpoint for signet.
pub const SIGNET_WS_URL: &str = "wss://mempool.space/signet/api/v1/ws";

21#[derive(Debug, Clone)]
23pub enum WsEvent {
24 Block(BlockEvent),
26 MempoolInfo(MempoolInfoEvent),
28 Fees(FeeEstimates),
30 AddressTx(AddressTxEvent),
32 TxConfirmed(TxConfirmedEvent),
34 ConnectionStatus(WsConnectionStatus),
36}
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct BlockEvent {
41 pub height: u64,
43 pub hash: String,
45 pub timestamp: u64,
47 pub tx_count: u32,
49 pub size: u32,
51 pub weight: u32,
53 pub total_fees: u64,
55 pub median_fee: f64,
57}
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
61pub struct MempoolInfoEvent {
62 pub count: u64,
64 pub vsize: u64,
66 pub total_fee: u64,
68 pub usage: u64,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct AddressTxEvent {
75 pub address: String,
77 pub txid: String,
79 pub value: i64,
81 pub confirmed: bool,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct TxConfirmedEvent {
88 pub txid: String,
90 pub block_height: u64,
92 pub block_hash: String,
94}
95
/// Connection state of the WebSocket client.
///
/// The type is `Copy` and comparable, so it can be returned by value and
/// checked with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WsConnectionStatus {
    /// The connection is established.
    Connected,
    /// There is no connection; this is the initial status of a new client state.
    Disconnected,
    /// A reconnection is in progress.
    Reconnecting,
    /// The connection is in an error state.
    Error,
}

/// Set of event streams a client wants to receive.
///
/// The default value has every flag off and both sets empty. The consuming
/// `with_*` / `track_*` methods allow a subscription to be assembled in a
/// single chained expression.
#[derive(Debug, Clone, Default)]
pub struct WsSubscription {
    /// Whether block events are requested.
    pub blocks: bool,
    /// Whether mempool info events are requested.
    pub mempool_info: bool,
    /// Whether fee events are requested.
    pub fees: bool,
    /// Addresses to track.
    pub addresses: HashSet<String>,
    /// Transaction ids to track.
    pub transactions: HashSet<String>,
}

impl WsSubscription {
    /// Creates an empty subscription, identical to [`WsSubscription::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the subscription with block events enabled.
    #[must_use]
    pub fn with_blocks(mut self) -> Self {
        self.blocks = true;
        self
    }

    /// Returns the subscription with mempool info events enabled.
    #[must_use]
    pub fn with_mempool_info(mut self) -> Self {
        self.mempool_info = true;
        self
    }

    /// Returns the subscription with fee events enabled.
    #[must_use]
    pub fn with_fees(mut self) -> Self {
        self.fees = true;
        self
    }

    /// Returns the subscription with `address` added to the tracked addresses.
    ///
    /// Adding an address that is already tracked has no effect.
    #[must_use]
    pub fn track_address(mut self, address: impl Into<String>) -> Self {
        self.addresses.insert(address.into());
        self
    }

    /// Returns the subscription with every item of `addresses` added to the
    /// tracked addresses. Duplicates collapse because the storage is a set.
    #[must_use]
    pub fn track_addresses(
        mut self,
        addresses: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        self.addresses
            .extend(addresses.into_iter().map(Into::<String>::into));
        self
    }

    /// Returns the subscription with `txid` added to the tracked transactions.
    #[must_use]
    pub fn track_transaction(mut self, txid: impl Into<String>) -> Self {
        self.transactions.insert(txid.into());
        self
    }

    /// Returns `true` if at least one flag is set or at least one address or
    /// transaction is tracked.
    pub fn has_subscriptions(&self) -> bool {
        self.blocks
            || self.mempool_info
            || self.fees
            || !self.addresses.is_empty()
            || !self.transactions.is_empty()
    }
}

173pub struct WsClientState {
175 pub subscription: WsSubscription,
177 pub status: WsConnectionStatus,
179 event_tx: broadcast::Sender<WsEvent>,
181}
182
183impl WsClientState {
184 pub fn new() -> Self {
186 let (event_tx, _) = broadcast::channel(1000);
187 Self {
188 subscription: WsSubscription::new(),
189 status: WsConnectionStatus::Disconnected,
190 event_tx,
191 }
192 }
193
194 pub fn subscribe(&self) -> broadcast::Receiver<WsEvent> {
196 self.event_tx.subscribe()
197 }
198
199 pub fn broadcast(&self, event: WsEvent) {
201 let _ = self.event_tx.send(event);
202 }
203}
204
205impl Default for WsClientState {
206 fn default() -> Self {
207 Self::new()
208 }
209}
210
211pub struct MempoolWsClient {
217 ws_url: String,
218 state: Arc<RwLock<WsClientState>>,
219}
220
221impl MempoolWsClient {
222 pub fn new() -> Self {
224 Self::with_url(MAINNET_WS_URL)
225 }
226
227 pub fn testnet() -> Self {
229 Self::with_url(TESTNET_WS_URL)
230 }
231
232 pub fn signet() -> Self {
234 Self::with_url(SIGNET_WS_URL)
235 }
236
237 pub fn with_url(url: &str) -> Self {
239 Self {
240 ws_url: url.to_string(),
241 state: Arc::new(RwLock::new(WsClientState::new())),
242 }
243 }
244
245 pub fn url(&self) -> &str {
247 &self.ws_url
248 }
249
250 pub async fn subscribe(&self) -> broadcast::Receiver<WsEvent> {
252 self.state.read().await.subscribe()
253 }
254
255 pub async fn status(&self) -> WsConnectionStatus {
257 self.state.read().await.status
258 }
259
260 pub async fn set_subscription(&self, subscription: WsSubscription) {
262 let mut state = self.state.write().await;
263 state.subscription = subscription;
264 }
265
266 pub async fn get_subscription(&self) -> WsSubscription {
268 self.state.read().await.subscription.clone()
269 }
270
271 pub async fn track_address(&self, address: impl Into<String>) {
273 let mut state = self.state.write().await;
274 state.subscription.addresses.insert(address.into());
275 }
276
277 pub async fn untrack_address(&self, address: &str) {
279 let mut state = self.state.write().await;
280 state.subscription.addresses.remove(address);
281 }
282
283 pub async fn track_transaction(&self, txid: impl Into<String>) {
285 let mut state = self.state.write().await;
286 state.subscription.transactions.insert(txid.into());
287 }
288
289 pub async fn untrack_transaction(&self, txid: &str) {
291 let mut state = self.state.write().await;
292 state.subscription.transactions.remove(txid);
293 }
294
295 #[cfg(test)]
297 pub async fn simulate_block(&self, event: BlockEvent) {
298 let state = self.state.read().await;
299 state.broadcast(WsEvent::Block(event));
300 }
301
302 #[cfg(test)]
304 pub async fn simulate_fees(&self, fees: FeeEstimates) {
305 let state = self.state.read().await;
306 state.broadcast(WsEvent::Fees(fees));
307 }
308}
309
310impl Default for MempoolWsClient {
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
316pub struct WsSubscriptionBuilder {
318 subscription: WsSubscription,
319}
320
321impl WsSubscriptionBuilder {
322 pub fn new() -> Self {
324 Self {
325 subscription: WsSubscription::new(),
326 }
327 }
328
329 pub fn blocks(mut self) -> Self {
331 self.subscription.blocks = true;
332 self
333 }
334
335 pub fn mempool_info(mut self) -> Self {
337 self.subscription.mempool_info = true;
338 self
339 }
340
341 pub fn fees(mut self) -> Self {
343 self.subscription.fees = true;
344 self
345 }
346
347 pub fn address(mut self, address: impl Into<String>) -> Self {
349 self.subscription.addresses.insert(address.into());
350 self
351 }
352
353 pub fn transaction(mut self, txid: impl Into<String>) -> Self {
355 self.subscription.transactions.insert(txid.into());
356 self
357 }
358
359 pub fn build(self) -> WsSubscription {
361 self.subscription
362 }
363}
364
365impl Default for WsSubscriptionBuilder {
366 fn default() -> Self {
367 Self::new()
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// Chained `WsSubscription` methods set exactly the requested options.
    #[test]
    fn test_ws_subscription() {
        let sub = WsSubscription::new()
            .with_blocks()
            .with_fees()
            .track_address("addr1");

        assert!(sub.blocks);
        assert!(sub.fees);
        assert!(!sub.mempool_info);
        assert!(sub.addresses.contains("addr1"));
        assert!(sub.has_subscriptions());
    }

    /// The builder produces a subscription carrying every requested option.
    #[test]
    fn test_ws_subscription_builder() {
        let sub = WsSubscriptionBuilder::new()
            .blocks()
            .fees()
            .address("addr1")
            .transaction("txid1")
            .build();

        assert!(sub.blocks);
        assert!(sub.fees);
        assert!(sub.addresses.contains("addr1"));
        assert!(sub.transactions.contains("txid1"));
    }

    /// Status values compare by variant.
    #[test]
    fn test_ws_connection_status() {
        assert_eq!(WsConnectionStatus::Connected, WsConnectionStatus::Connected);
        assert_ne!(WsConnectionStatus::Connected, WsConnectionStatus::Disconnected);
    }

    /// A `BlockEvent` can be constructed field by field and read back.
    #[test]
    fn test_block_event() {
        let event = BlockEvent {
            height: 800000,
            hash: "abc123".to_string(),
            timestamp: 1234567890,
            tx_count: 1000,
            size: 1000000,
            weight: 4000000,
            total_fees: 50000000,
            median_fee: 10.5,
        };

        assert_eq!(event.height, 800000);
        assert_eq!(event.tx_count, 1000);
    }

    /// A new client targets mainnet and starts out disconnected.
    #[tokio::test]
    async fn test_ws_client() {
        let client = MempoolWsClient::new();
        assert_eq!(client.url(), MAINNET_WS_URL);
        assert_eq!(client.status().await, WsConnectionStatus::Disconnected);
    }
}