1use bitcoin::{BlockHash, Txid};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::{RwLock, broadcast};
13
14use crate::client::BitcoinClient;
15use crate::error::{BitcoinError, Result};
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct MempoolTransaction {
20 pub txid: String,
21 pub addresses: Vec<String>,
22 pub amount_sats: i64,
23 pub fee_sats: u64,
24 pub size: u64,
25 pub time: i64,
26 pub first_seen: i64,
27}
28
29#[derive(Debug, Clone, Serialize)]
31pub struct MempoolEntry {
32 pub txid: String,
33 pub vsize: u64,
34 pub weight: u64,
35 pub fee: u64,
36 pub time: i64,
37 pub descendant_count: u64,
38 pub descendant_size: u64,
39 pub descendant_fees: u64,
40 pub ancestor_count: u64,
41 pub ancestor_size: u64,
42 pub ancestor_fees: u64,
43}
44
45#[derive(Debug, Clone)]
47pub enum MempoolEvent {
48 TransactionDetected {
50 txid: String,
51 address: String,
52 amount_sats: i64,
53 fee_sats: u64,
54 },
55 TransactionConfirmed {
57 txid: String,
58 block_hash: String,
59 confirmations: u32,
60 },
61 TransactionRemoved { txid: String, reason: RemovalReason },
63 Reorganization {
65 old_tip: String,
66 new_tip: String,
67 depth: u32,
68 affected_txids: Vec<String>,
69 },
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub enum RemovalReason {
75 Confirmed,
77 Replaced { replacement_txid: String },
79 Expired,
81 DoubleSpent,
83 Unknown,
85}
86
/// Tunables for the mempool monitor.
#[derive(Debug, Clone)]
pub struct MempoolMonitorConfig {
    /// Seconds to wait between two polls.
    pub poll_interval_secs: u64,
    /// Upper bound on the number of transactions tracked at the same time.
    pub max_tracked_transactions: usize,
    /// Whether each poll also checks for a chain reorganization.
    pub detect_reorgs: bool,
    /// Smallest reorganization depth that is reported.
    pub min_reorg_depth: u32,
}

impl Default for MempoolMonitorConfig {
    /// Polls every 10 seconds, tracks up to 10 000 transactions and reports
    /// reorganizations of depth 1 or more.
    fn default() -> Self {
        MempoolMonitorConfig {
            detect_reorgs: true,
            min_reorg_depth: 1,
            poll_interval_secs: 10,
            max_tracked_transactions: 10_000,
        }
    }
}
110
111#[derive(Debug, Clone)]
113struct ChainTip {
114 hash: BlockHash,
115 height: u64,
116 #[allow(dead_code)]
117 time: i64,
118}
119
120pub struct MempoolMonitor {
122 client: Arc<BitcoinClient>,
123 config: MempoolMonitorConfig,
124 watched_addresses: Arc<RwLock<HashSet<String>>>,
126 tracked_transactions: Arc<RwLock<HashMap<String, i64>>>,
128 #[allow(dead_code)]
130 last_mempool_txids: Arc<RwLock<HashSet<String>>>,
131 last_chain_tip: Arc<RwLock<Option<ChainTip>>>,
133 event_tx: broadcast::Sender<MempoolEvent>,
135}
136
137impl MempoolMonitor {
138 pub fn new(client: Arc<BitcoinClient>, config: MempoolMonitorConfig) -> Self {
140 let (event_tx, _) = broadcast::channel(1000);
141
142 Self {
143 client,
144 config,
145 watched_addresses: Arc::new(RwLock::new(HashSet::new())),
146 tracked_transactions: Arc::new(RwLock::new(HashMap::new())),
147 last_mempool_txids: Arc::new(RwLock::new(HashSet::new())),
148 last_chain_tip: Arc::new(RwLock::new(None)),
149 event_tx,
150 }
151 }
152
153 pub fn subscribe(&self) -> broadcast::Receiver<MempoolEvent> {
155 self.event_tx.subscribe()
156 }
157
158 pub async fn watch_address(&self, address: &str) {
160 let mut addresses = self.watched_addresses.write().await;
161 addresses.insert(address.to_string());
162 tracing::debug!(
163 address = address,
164 "Watching address for mempool transactions"
165 );
166 }
167
168 pub async fn unwatch_address(&self, address: &str) {
170 let mut addresses = self.watched_addresses.write().await;
171 addresses.remove(address);
172 }
173
174 pub async fn track_transaction(&self, txid: &str) {
176 let mut tracked = self.tracked_transactions.write().await;
177 if tracked.len() < self.config.max_tracked_transactions {
178 tracked.insert(txid.to_string(), chrono::Utc::now().timestamp());
179 tracing::debug!(txid = txid, "Tracking transaction");
180 }
181 }
182
183 pub async fn untrack_transaction(&self, txid: &str) {
185 let mut tracked = self.tracked_transactions.write().await;
186 tracked.remove(txid);
187 }
188
189 pub fn get_mempool_stats(&self) -> Result<MempoolStats> {
191 let info = self.client.get_mempool_info()?;
192
193 Ok(MempoolStats {
194 size: info.size as u64,
195 bytes: info.bytes as u64,
196 usage: info.usage as u64,
197 max_mempool: info.max_mempool as u64,
198 mempool_min_fee: info.mempool_min_fee.to_sat() as f64 / 1000.0,
199 min_relay_fee: info.min_relay_tx_fee.to_sat() as f64 / 1000.0,
200 })
201 }
202
203 pub fn is_in_mempool(&self, txid: &str) -> Result<bool> {
205 let txid_parsed: Txid = txid
206 .parse()
207 .map_err(|e| BitcoinError::InvalidTransaction(format!("Invalid txid: {}", e)))?;
208
209 match self.client.get_raw_transaction(&txid_parsed) {
211 Ok(tx_info) => Ok(tx_info.confirmations.is_none() || tx_info.confirmations == Some(0)),
212 Err(BitcoinError::Rpc(_)) => Ok(false),
213 Err(e) => Err(e),
214 }
215 }
216
217 pub async fn poll(&self) -> Result<Vec<MempoolEvent>> {
219 let mut events = Vec::new();
220
221 if self.config.detect_reorgs {
223 if let Some(reorg_event) = self.check_for_reorg().await? {
224 events.push(reorg_event);
225 }
226 }
227
228 let watched = self.watched_addresses.read().await.clone();
230 if watched.is_empty() {
231 return Ok(events);
232 }
233
234 let blockchain_info = self.client.get_blockchain_info()?;
237 let since_result = self
238 .client
239 .list_since_block(Some(&blockchain_info.best_block_hash), Some(0))?;
240
241 for tx in since_result.transactions {
242 if let Some(ref addr) = tx.address {
243 if watched.contains(addr) && tx.confirmations == 0 {
244 let event = MempoolEvent::TransactionDetected {
246 txid: tx.txid.to_string(),
247 address: addr.clone(),
248 amount_sats: tx.amount,
249 fee_sats: 0, };
251 events.push(event.clone());
252
253 self.track_transaction(&tx.txid.to_string()).await;
255 }
256 }
257 }
258
259 for event in &events {
261 let _ = self.event_tx.send(event.clone());
262 }
263
264 Ok(events)
265 }
266
267 async fn check_for_reorg(&self) -> Result<Option<MempoolEvent>> {
269 let blockchain_info = self.client.get_blockchain_info()?;
270 let current_tip = ChainTip {
271 hash: blockchain_info.best_block_hash,
272 height: blockchain_info.blocks,
273 time: chrono::Utc::now().timestamp(),
274 };
275
276 let mut last_tip_guard = self.last_chain_tip.write().await;
277
278 let previous_tip = last_tip_guard.take();
280
281 let result = if let Some(prev) = previous_tip {
282 if prev.hash != current_tip.hash && current_tip.height <= prev.height {
284 let depth = (prev.height - current_tip.height + 1) as u32;
285
286 if depth >= self.config.min_reorg_depth {
287 let tracked = self.tracked_transactions.read().await;
289 let affected_txids: Vec<String> = tracked.keys().cloned().collect();
290
291 tracing::warn!(
292 old_tip = %prev.hash,
293 new_tip = %current_tip.hash,
294 depth = depth,
295 "Block reorganization detected"
296 );
297
298 Some(MempoolEvent::Reorganization {
299 old_tip: prev.hash.to_string(),
300 new_tip: blockchain_info.best_block_hash.to_string(),
301 depth,
302 affected_txids,
303 })
304 } else {
305 None
306 }
307 } else {
308 None
309 }
310 } else {
311 None
312 };
313
314 *last_tip_guard = Some(current_tip);
315 Ok(result)
316 }
317
318 pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
320 let poll_interval = std::time::Duration::from_secs(self.config.poll_interval_secs);
321
322 tracing::info!(
323 poll_interval_secs = self.config.poll_interval_secs,
324 "Mempool monitor started"
325 );
326
327 loop {
328 tokio::select! {
329 _ = tokio::time::sleep(poll_interval) => {
330 if let Err(e) = self.poll().await {
331 tracing::warn!(error = %e, "Mempool poll failed");
332 }
333 }
334 _ = shutdown.changed() => {
335 if *shutdown.borrow() {
336 tracing::info!("Mempool monitor shutting down");
337 break;
338 }
339 }
340 }
341 }
342 }
343}
344
345#[derive(Debug, Clone, Serialize)]
347pub struct MempoolStats {
348 pub size: u64,
350 pub bytes: u64,
352 pub usage: u64,
354 pub max_mempool: u64,
356 pub mempool_min_fee: f64,
358 pub min_relay_fee: f64,
360}
361
362pub struct ReorgTracker {
364 block_history: HashMap<u64, BlockHash>,
366 max_depth: usize,
368 confirmed_transactions: HashMap<String, u64>, }
371
372impl ReorgTracker {
373 pub fn new(max_depth: usize) -> Self {
375 Self {
376 block_history: HashMap::new(),
377 max_depth,
378 confirmed_transactions: HashMap::new(),
379 }
380 }
381
382 pub fn record_block(&mut self, height: u64, hash: BlockHash) {
384 self.block_history.insert(height, hash);
385
386 let min_height = height.saturating_sub(self.max_depth as u64);
388 self.block_history.retain(|h, _| *h >= min_height);
389 }
390
391 pub fn record_confirmation(&mut self, txid: &str, height: u64) {
393 self.confirmed_transactions.insert(txid.to_string(), height);
394
395 let min_height = height.saturating_sub(self.max_depth as u64);
397 self.confirmed_transactions.retain(|_, h| *h >= min_height);
398 }
399
400 pub fn verify_block(&self, height: u64, hash: &BlockHash) -> bool {
402 self.block_history.get(&height).is_none_or(|h| h == hash)
403 }
404
405 pub fn get_affected_transactions(&self, reorg_height: u64) -> Vec<String> {
407 self.confirmed_transactions
408 .iter()
409 .filter(|(_, h)| **h >= reorg_height)
410 .map(|(txid, _)| txid.clone())
411 .collect()
412 }
413
414 pub fn detect_reorg(
416 &self,
417 client: &BitcoinClient,
418 current_height: u64,
419 ) -> Result<Option<ReorgInfo>> {
420 for (height, expected_hash) in &self.block_history {
422 if *height <= current_height {
423 let actual_hash = client.get_block_hash(*height)?;
424 if actual_hash != *expected_hash {
425 let depth = (current_height - height + 1) as u32;
427 let affected = self.get_affected_transactions(*height);
428
429 return Ok(Some(ReorgInfo {
430 divergence_height: *height,
431 depth,
432 expected_hash: expected_hash.to_string(),
433 actual_hash: actual_hash.to_string(),
434 affected_transactions: affected,
435 }));
436 }
437 }
438 }
439
440 Ok(None)
441 }
442}
443
444#[derive(Debug, Clone, Serialize)]
446pub struct ReorgInfo {
447 pub divergence_height: u64,
449 pub depth: u32,
451 pub expected_hash: String,
453 pub actual_hash: String,
455 pub affected_transactions: Vec<String>,
457}
458
459pub struct AddressWatcher {
461 client: Arc<BitcoinClient>,
462 addresses: HashSet<String>,
463 last_unconfirmed: HashMap<String, u64>,
465}
466
467impl AddressWatcher {
468 pub fn new(client: Arc<BitcoinClient>) -> Self {
470 Self {
471 client,
472 addresses: HashSet::new(),
473 last_unconfirmed: HashMap::new(),
474 }
475 }
476
477 pub fn watch(&mut self, address: &str) {
479 self.addresses.insert(address.to_string());
480 }
481
482 pub fn unwatch(&mut self, address: &str) {
484 self.addresses.remove(address);
485 self.last_unconfirmed.remove(address);
486 }
487
488 pub fn check_unconfirmed(&mut self) -> Result<Vec<UnconfirmedPayment>> {
490 let mut new_payments = Vec::new();
491
492 for address in &self.addresses {
493 let addr: bitcoin::Address<bitcoin::address::NetworkUnchecked> = address
494 .parse()
495 .map_err(|e| BitcoinError::InvalidAddress(format!("{:?}", e)))?;
496
497 let checked_addr = addr.assume_checked();
498
499 let unconfirmed = self
501 .client
502 .get_received_by_address(&checked_addr, Some(0))?
503 .to_sat();
504
505 let confirmed = self
507 .client
508 .get_received_by_address(&checked_addr, Some(1))?
509 .to_sat();
510
511 let unconfirmed_balance = unconfirmed.saturating_sub(confirmed);
513
514 let last = self.last_unconfirmed.get(address).copied().unwrap_or(0);
516 if unconfirmed_balance > last {
517 let new_amount = unconfirmed_balance - last;
518 new_payments.push(UnconfirmedPayment {
519 address: address.clone(),
520 amount_sats: new_amount,
521 detected_at: chrono::Utc::now().timestamp(),
522 });
523
524 tracing::info!(
525 address = address,
526 amount_sats = new_amount,
527 "New unconfirmed payment detected"
528 );
529 }
530
531 self.last_unconfirmed
532 .insert(address.clone(), unconfirmed_balance);
533 }
534
535 Ok(new_payments)
536 }
537
538 pub fn watched_addresses(&self) -> Vec<String> {
540 self.addresses.iter().cloned().collect()
541 }
542}
543
544#[derive(Debug, Clone, Serialize)]
546pub struct UnconfirmedPayment {
547 pub address: String,
548 pub amount_sats: u64,
549 pub detected_at: i64,
550}