kaccy_bitcoin/
mempool.rs

1//! Mempool monitoring and block reorganization detection
2//!
3//! Provides:
4//! - Real-time unconfirmed transaction detection
5//! - Transaction tracking through mempool
6//! - Block reorganization detection and handling
7
8use bitcoin::{BlockHash, Txid};
9use serde::{Deserialize, Serialize};
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use tokio::sync::{RwLock, broadcast};
13
14use crate::client::BitcoinClient;
15use crate::error::{BitcoinError, Result};
16
17/// Mempool transaction information
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct MempoolTransaction {
20    pub txid: String,
21    pub addresses: Vec<String>,
22    pub amount_sats: i64,
23    pub fee_sats: u64,
24    pub size: u64,
25    pub time: i64,
26    pub first_seen: i64,
27}
28
29/// Mempool entry from Bitcoin Core
30#[derive(Debug, Clone, Serialize)]
31pub struct MempoolEntry {
32    pub txid: String,
33    pub vsize: u64,
34    pub weight: u64,
35    pub fee: u64,
36    pub time: i64,
37    pub descendant_count: u64,
38    pub descendant_size: u64,
39    pub descendant_fees: u64,
40    pub ancestor_count: u64,
41    pub ancestor_size: u64,
42    pub ancestor_fees: u64,
43}
44
45/// Mempool event types
46#[derive(Debug, Clone)]
47pub enum MempoolEvent {
48    /// New transaction detected in mempool matching watched addresses
49    TransactionDetected {
50        txid: String,
51        address: String,
52        amount_sats: i64,
53        fee_sats: u64,
54    },
55    /// Transaction confirmed (moved from mempool to block)
56    TransactionConfirmed {
57        txid: String,
58        block_hash: String,
59        confirmations: u32,
60    },
61    /// Transaction removed from mempool (replaced or expired)
62    TransactionRemoved { txid: String, reason: RemovalReason },
63    /// Block reorganization detected
64    Reorganization {
65        old_tip: String,
66        new_tip: String,
67        depth: u32,
68        affected_txids: Vec<String>,
69    },
70}
71
72/// Reason for transaction removal from mempool
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub enum RemovalReason {
75    /// Transaction was included in a block
76    Confirmed,
77    /// Transaction was replaced (RBF)
78    Replaced { replacement_txid: String },
79    /// Transaction expired from mempool
80    Expired,
81    /// Transaction was double-spent
82    DoubleSpent,
83    /// Unknown reason
84    Unknown,
85}
86
87/// Configuration for mempool monitor
88#[derive(Debug, Clone)]
89pub struct MempoolMonitorConfig {
90    /// Polling interval in seconds
91    pub poll_interval_secs: u64,
92    /// Maximum transactions to track
93    pub max_tracked_transactions: usize,
94    /// Enable block reorganization detection
95    pub detect_reorgs: bool,
96    /// Minimum reorg depth to report
97    pub min_reorg_depth: u32,
98}
99
100impl Default for MempoolMonitorConfig {
101    fn default() -> Self {
102        Self {
103            poll_interval_secs: 10,
104            max_tracked_transactions: 10000,
105            detect_reorgs: true,
106            min_reorg_depth: 1,
107        }
108    }
109}
110
111/// Block chain tip tracker for reorg detection
112#[derive(Debug, Clone)]
113struct ChainTip {
114    hash: BlockHash,
115    height: u64,
116    #[allow(dead_code)]
117    time: i64,
118}
119
120/// Mempool monitor for tracking unconfirmed transactions
121pub struct MempoolMonitor {
122    client: Arc<BitcoinClient>,
123    config: MempoolMonitorConfig,
124    /// Addresses being watched
125    watched_addresses: Arc<RwLock<HashSet<String>>>,
126    /// Transactions we're tracking (txid -> first seen time)
127    tracked_transactions: Arc<RwLock<HashMap<String, i64>>>,
128    /// Last known mempool txids (for detecting removed transactions)
129    #[allow(dead_code)]
130    last_mempool_txids: Arc<RwLock<HashSet<String>>>,
131    /// Last known chain tip
132    last_chain_tip: Arc<RwLock<Option<ChainTip>>>,
133    /// Event broadcast channel
134    event_tx: broadcast::Sender<MempoolEvent>,
135}
136
137impl MempoolMonitor {
138    /// Create a new mempool monitor
139    pub fn new(client: Arc<BitcoinClient>, config: MempoolMonitorConfig) -> Self {
140        let (event_tx, _) = broadcast::channel(1000);
141
142        Self {
143            client,
144            config,
145            watched_addresses: Arc::new(RwLock::new(HashSet::new())),
146            tracked_transactions: Arc::new(RwLock::new(HashMap::new())),
147            last_mempool_txids: Arc::new(RwLock::new(HashSet::new())),
148            last_chain_tip: Arc::new(RwLock::new(None)),
149            event_tx,
150        }
151    }
152
153    /// Subscribe to mempool events
154    pub fn subscribe(&self) -> broadcast::Receiver<MempoolEvent> {
155        self.event_tx.subscribe()
156    }
157
158    /// Add an address to watch
159    pub async fn watch_address(&self, address: &str) {
160        let mut addresses = self.watched_addresses.write().await;
161        addresses.insert(address.to_string());
162        tracing::debug!(
163            address = address,
164            "Watching address for mempool transactions"
165        );
166    }
167
168    /// Remove an address from watch list
169    pub async fn unwatch_address(&self, address: &str) {
170        let mut addresses = self.watched_addresses.write().await;
171        addresses.remove(address);
172    }
173
174    /// Track a specific transaction
175    pub async fn track_transaction(&self, txid: &str) {
176        let mut tracked = self.tracked_transactions.write().await;
177        if tracked.len() < self.config.max_tracked_transactions {
178            tracked.insert(txid.to_string(), chrono::Utc::now().timestamp());
179            tracing::debug!(txid = txid, "Tracking transaction");
180        }
181    }
182
183    /// Stop tracking a transaction
184    pub async fn untrack_transaction(&self, txid: &str) {
185        let mut tracked = self.tracked_transactions.write().await;
186        tracked.remove(txid);
187    }
188
189    /// Get current mempool statistics
190    pub fn get_mempool_stats(&self) -> Result<MempoolStats> {
191        let info = self.client.get_mempool_info()?;
192
193        Ok(MempoolStats {
194            size: info.size as u64,
195            bytes: info.bytes as u64,
196            usage: info.usage as u64,
197            max_mempool: info.max_mempool as u64,
198            mempool_min_fee: info.mempool_min_fee.to_sat() as f64 / 1000.0,
199            min_relay_fee: info.min_relay_tx_fee.to_sat() as f64 / 1000.0,
200        })
201    }
202
203    /// Check if a transaction is in the mempool
204    pub fn is_in_mempool(&self, txid: &str) -> Result<bool> {
205        let txid_parsed: Txid = txid
206            .parse()
207            .map_err(|e| BitcoinError::InvalidTransaction(format!("Invalid txid: {}", e)))?;
208
209        // Try to get raw mempool entry
210        match self.client.get_raw_transaction(&txid_parsed) {
211            Ok(tx_info) => Ok(tx_info.confirmations.is_none() || tx_info.confirmations == Some(0)),
212            Err(BitcoinError::Rpc(_)) => Ok(false),
213            Err(e) => Err(e),
214        }
215    }
216
217    /// Poll mempool for changes
218    pub async fn poll(&self) -> Result<Vec<MempoolEvent>> {
219        let mut events = Vec::new();
220
221        // Check for chain reorgs first
222        if self.config.detect_reorgs {
223            if let Some(reorg_event) = self.check_for_reorg().await? {
224                events.push(reorg_event);
225            }
226        }
227
228        // Get watched addresses
229        let watched = self.watched_addresses.read().await.clone();
230        if watched.is_empty() {
231            return Ok(events);
232        }
233
234        // Check for new transactions to watched addresses
235        // Use list_since_block with 0 confirmations to get mempool transactions
236        let blockchain_info = self.client.get_blockchain_info()?;
237        let since_result = self
238            .client
239            .list_since_block(Some(&blockchain_info.best_block_hash), Some(0))?;
240
241        for tx in since_result.transactions {
242            if let Some(ref addr) = tx.address {
243                if watched.contains(addr) && tx.confirmations == 0 {
244                    // New unconfirmed transaction to watched address
245                    let event = MempoolEvent::TransactionDetected {
246                        txid: tx.txid.to_string(),
247                        address: addr.clone(),
248                        amount_sats: tx.amount,
249                        fee_sats: 0, // Would need additional lookup
250                    };
251                    events.push(event.clone());
252
253                    // Track this transaction
254                    self.track_transaction(&tx.txid.to_string()).await;
255                }
256            }
257        }
258
259        // Broadcast events
260        for event in &events {
261            let _ = self.event_tx.send(event.clone());
262        }
263
264        Ok(events)
265    }
266
267    /// Check for block reorganization
268    async fn check_for_reorg(&self) -> Result<Option<MempoolEvent>> {
269        let blockchain_info = self.client.get_blockchain_info()?;
270        let current_tip = ChainTip {
271            hash: blockchain_info.best_block_hash,
272            height: blockchain_info.blocks,
273            time: chrono::Utc::now().timestamp(),
274        };
275
276        let mut last_tip_guard = self.last_chain_tip.write().await;
277
278        // Take the previous tip to avoid borrow conflicts
279        let previous_tip = last_tip_guard.take();
280
281        let result = if let Some(prev) = previous_tip {
282            // Check if we're on a different chain
283            if prev.hash != current_tip.hash && current_tip.height <= prev.height {
284                let depth = (prev.height - current_tip.height + 1) as u32;
285
286                if depth >= self.config.min_reorg_depth {
287                    // Get affected transactions from our tracked list
288                    let tracked = self.tracked_transactions.read().await;
289                    let affected_txids: Vec<String> = tracked.keys().cloned().collect();
290
291                    tracing::warn!(
292                        old_tip = %prev.hash,
293                        new_tip = %current_tip.hash,
294                        depth = depth,
295                        "Block reorganization detected"
296                    );
297
298                    Some(MempoolEvent::Reorganization {
299                        old_tip: prev.hash.to_string(),
300                        new_tip: blockchain_info.best_block_hash.to_string(),
301                        depth,
302                        affected_txids,
303                    })
304                } else {
305                    None
306                }
307            } else {
308                None
309            }
310        } else {
311            None
312        };
313
314        *last_tip_guard = Some(current_tip);
315        Ok(result)
316    }
317
318    /// Start the background monitoring task
319    pub async fn run(&self, mut shutdown: tokio::sync::watch::Receiver<bool>) {
320        let poll_interval = std::time::Duration::from_secs(self.config.poll_interval_secs);
321
322        tracing::info!(
323            poll_interval_secs = self.config.poll_interval_secs,
324            "Mempool monitor started"
325        );
326
327        loop {
328            tokio::select! {
329                _ = tokio::time::sleep(poll_interval) => {
330                    if let Err(e) = self.poll().await {
331                        tracing::warn!(error = %e, "Mempool poll failed");
332                    }
333                }
334                _ = shutdown.changed() => {
335                    if *shutdown.borrow() {
336                        tracing::info!("Mempool monitor shutting down");
337                        break;
338                    }
339                }
340            }
341        }
342    }
343}
344
345/// Mempool statistics
346#[derive(Debug, Clone, Serialize)]
347pub struct MempoolStats {
348    /// Number of transactions in mempool
349    pub size: u64,
350    /// Total size in bytes
351    pub bytes: u64,
352    /// Memory usage
353    pub usage: u64,
354    /// Maximum mempool size
355    pub max_mempool: u64,
356    /// Minimum fee rate (sat/vB)
357    pub mempool_min_fee: f64,
358    /// Minimum relay fee (sat/vB)
359    pub min_relay_fee: f64,
360}
361
362/// Block reorganization tracker
363pub struct ReorgTracker {
364    /// Recent block hashes by height
365    block_history: HashMap<u64, BlockHash>,
366    /// Maximum history depth
367    max_depth: usize,
368    /// Confirmed transactions that might be affected by reorg
369    confirmed_transactions: HashMap<String, u64>, // txid -> confirmed_height
370}
371
372impl ReorgTracker {
373    /// Create a new reorg tracker
374    pub fn new(max_depth: usize) -> Self {
375        Self {
376            block_history: HashMap::new(),
377            max_depth,
378            confirmed_transactions: HashMap::new(),
379        }
380    }
381
382    /// Record a block at a specific height
383    pub fn record_block(&mut self, height: u64, hash: BlockHash) {
384        self.block_history.insert(height, hash);
385
386        // Prune old blocks beyond max_depth
387        let min_height = height.saturating_sub(self.max_depth as u64);
388        self.block_history.retain(|h, _| *h >= min_height);
389    }
390
391    /// Record a confirmed transaction
392    pub fn record_confirmation(&mut self, txid: &str, height: u64) {
393        self.confirmed_transactions.insert(txid.to_string(), height);
394
395        // Prune old confirmations
396        let min_height = height.saturating_sub(self.max_depth as u64);
397        self.confirmed_transactions.retain(|_, h| *h >= min_height);
398    }
399
400    /// Check if a block hash matches our recorded hash for that height
401    pub fn verify_block(&self, height: u64, hash: &BlockHash) -> bool {
402        self.block_history.get(&height).is_none_or(|h| h == hash)
403    }
404
405    /// Get transactions that would be affected by a reorg to a specific height
406    pub fn get_affected_transactions(&self, reorg_height: u64) -> Vec<String> {
407        self.confirmed_transactions
408            .iter()
409            .filter(|(_, h)| **h >= reorg_height)
410            .map(|(txid, _)| txid.clone())
411            .collect()
412    }
413
414    /// Detect if a reorganization occurred
415    pub fn detect_reorg(
416        &self,
417        client: &BitcoinClient,
418        current_height: u64,
419    ) -> Result<Option<ReorgInfo>> {
420        // Check our recorded blocks against the chain
421        for (height, expected_hash) in &self.block_history {
422            if *height <= current_height {
423                let actual_hash = client.get_block_hash(*height)?;
424                if actual_hash != *expected_hash {
425                    // Found a divergence point
426                    let depth = (current_height - height + 1) as u32;
427                    let affected = self.get_affected_transactions(*height);
428
429                    return Ok(Some(ReorgInfo {
430                        divergence_height: *height,
431                        depth,
432                        expected_hash: expected_hash.to_string(),
433                        actual_hash: actual_hash.to_string(),
434                        affected_transactions: affected,
435                    }));
436                }
437            }
438        }
439
440        Ok(None)
441    }
442}
443
444/// Reorganization information
445#[derive(Debug, Clone, Serialize)]
446pub struct ReorgInfo {
447    /// Height where chain diverged
448    pub divergence_height: u64,
449    /// Depth of reorganization
450    pub depth: u32,
451    /// Expected block hash at divergence
452    pub expected_hash: String,
453    /// Actual block hash at divergence
454    pub actual_hash: String,
455    /// Transactions that may have been unconfirmed
456    pub affected_transactions: Vec<String>,
457}
458
459/// Unconfirmed transaction watcher for specific addresses
460pub struct AddressWatcher {
461    client: Arc<BitcoinClient>,
462    addresses: HashSet<String>,
463    /// Last known unconfirmed amounts by address
464    last_unconfirmed: HashMap<String, u64>,
465}
466
467impl AddressWatcher {
468    /// Create a new address watcher
469    pub fn new(client: Arc<BitcoinClient>) -> Self {
470        Self {
471            client,
472            addresses: HashSet::new(),
473            last_unconfirmed: HashMap::new(),
474        }
475    }
476
477    /// Add an address to watch
478    pub fn watch(&mut self, address: &str) {
479        self.addresses.insert(address.to_string());
480    }
481
482    /// Remove an address from watch
483    pub fn unwatch(&mut self, address: &str) {
484        self.addresses.remove(address);
485        self.last_unconfirmed.remove(address);
486    }
487
488    /// Check for new unconfirmed payments to watched addresses
489    pub fn check_unconfirmed(&mut self) -> Result<Vec<UnconfirmedPayment>> {
490        let mut new_payments = Vec::new();
491
492        for address in &self.addresses {
493            let addr: bitcoin::Address<bitcoin::address::NetworkUnchecked> = address
494                .parse()
495                .map_err(|e| BitcoinError::InvalidAddress(format!("{:?}", e)))?;
496
497            let checked_addr = addr.assume_checked();
498
499            // Get unconfirmed amount (0 confirmations)
500            let unconfirmed = self
501                .client
502                .get_received_by_address(&checked_addr, Some(0))?
503                .to_sat();
504
505            // Get confirmed amount
506            let confirmed = self
507                .client
508                .get_received_by_address(&checked_addr, Some(1))?
509                .to_sat();
510
511            // Unconfirmed balance is total minus confirmed
512            let unconfirmed_balance = unconfirmed.saturating_sub(confirmed);
513
514            // Check if this is new unconfirmed balance
515            let last = self.last_unconfirmed.get(address).copied().unwrap_or(0);
516            if unconfirmed_balance > last {
517                let new_amount = unconfirmed_balance - last;
518                new_payments.push(UnconfirmedPayment {
519                    address: address.clone(),
520                    amount_sats: new_amount,
521                    detected_at: chrono::Utc::now().timestamp(),
522                });
523
524                tracing::info!(
525                    address = address,
526                    amount_sats = new_amount,
527                    "New unconfirmed payment detected"
528                );
529            }
530
531            self.last_unconfirmed
532                .insert(address.clone(), unconfirmed_balance);
533        }
534
535        Ok(new_payments)
536    }
537
538    /// Get list of watched addresses
539    pub fn watched_addresses(&self) -> Vec<String> {
540        self.addresses.iter().cloned().collect()
541    }
542}
543
544/// Unconfirmed payment notification
545#[derive(Debug, Clone, Serialize)]
546pub struct UnconfirmedPayment {
547    pub address: String,
548    pub amount_sats: u64,
549    pub detected_at: i64,
550}