use bitcoin::Txid;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{RwLock, broadcast};
use crate::client::BitcoinClient;
use crate::error::{BitcoinError, Result};
use crate::tx_parser::{ParsedTransaction, TransactionParser};
/// Lifecycle state of a transaction monitored by [`RbfTracker`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RbfStatus {
/// Had no confirmations when tracking began and the parser did not flag it as RBF.
NonReplaceable,
/// Had no confirmations when tracking began and the parser flagged it as RBF.
Replaceable,
/// The tracker recorded the transaction as superseded, or it could no
/// longer be fetched from the node.
Replaced,
/// At least one confirmation has been observed.
Confirmed,
}
/// Record describing one tracked transaction being superseded by another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RbfReplacement {
/// Txid (string form) of the transaction that was replaced.
pub original_txid: String,
/// Txid (string form) of the transaction that replaced it.
pub replacement_txid: String,
/// Fee recorded for the original transaction, if it was known.
pub old_fee_sats: Option<u64>,
/// Fee reported for the replacement transaction, if it was known.
pub new_fee_sats: Option<u64>,
/// New fee minus old fee, clamped at zero; `None` unless both fees are known.
pub fee_increase_sats: Option<u64>,
/// UTC time at which this record was created.
pub detected_at: chrono::DateTime<chrono::Utc>,
}
/// Snapshot of a transaction being monitored by [`RbfTracker`].
#[derive(Debug, Clone)]
pub struct TrackedRbfTransaction {
/// Txid in string form; also the key under which the record is stored.
pub txid: String,
/// Outpoints spent by the transaction, each formatted as `"<prev_txid>:<prev_vout>"`.
pub input_outpoints: Vec<String>,
/// Current lifecycle state.
pub status: RbfStatus,
/// Fee as reported by the transaction parser, if available.
pub fee_sats: Option<u64>,
/// Optional caller-supplied order identifier associated with the transaction.
pub order_id: Option<String>,
/// UTC time at which tracking started.
pub first_seen: chrono::DateTime<chrono::Utc>,
/// UTC time of the latest status change (equal to `first_seen` at creation).
pub status_changed: chrono::DateTime<chrono::Utc>,
}
/// Notifications broadcast by [`RbfTracker`] to its subscribers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RbfEvent {
/// A tracked transaction was recorded as replaced by another one.
TransactionReplaced(RbfReplacement),
/// A tracked transaction was seen with at least one confirmation.
TransactionConfirmed {
/// Txid of the confirmed transaction.
txid: String,
/// Confirmation count reported by the node at the time of the check.
confirmations: u32,
},
/// Fetching a tracked transaction failed with an RPC error, which the
/// tracker treats as the transaction having left the mempool.
TransactionDropped {
/// Txid of the transaction that could no longer be fetched.
txid: String,
/// Human-readable explanation of why it is considered dropped.
reason: String,
},
}
/// Tracks unconfirmed transactions and reports confirmations, drops and
/// replacements through a broadcast channel.
pub struct RbfTracker {
// Client used to fetch raw transactions when re-checking tracked entries.
client: Arc<BitcoinClient>,
// Parser used to load fee, inputs and RBF flag for a txid.
parser: TransactionParser,
// Tracked records keyed by txid string.
tracked: Arc<RwLock<HashMap<String, TrackedRbfTransaction>>>,
// Maps a spent outpoint ("<prev_txid>:<prev_vout>") to the txid string of
// the tracked transaction that most recently registered it.
outpoint_index: Arc<RwLock<HashMap<String, String>>>,
// Sending half of the event channel; receivers are created on demand.
event_tx: broadcast::Sender<RbfEvent>,
}
impl RbfTracker {
/// Creates a tracker with no tracked transactions.
///
/// The tracker shares `client` with an internally constructed
/// [`TransactionParser`] and opens an event channel with capacity 100.
pub fn new(client: Arc<BitcoinClient>) -> Self {
    // The initial receiver is discarded on purpose; consumers obtain their
    // own receiver through `subscribe`.
    let (event_tx, _) = broadcast::channel(100);
    let parser = TransactionParser::new(Arc::clone(&client));
    let tracked = Arc::new(RwLock::new(HashMap::new()));
    let outpoint_index = Arc::new(RwLock::new(HashMap::new()));

    Self {
        client,
        parser,
        tracked,
        outpoint_index,
        event_tx,
    }
}
/// Returns a new receiver attached to this tracker's event channel.
///
/// Each call creates an independent receiver from the internal broadcast
/// sender.
pub fn subscribe(&self) -> broadcast::Receiver<RbfEvent> {
self.event_tx.subscribe()
}
/// Starts (or restarts) tracking of `txid`.
///
/// The transaction is loaded through the parser, classified as confirmed,
/// replaceable or non-replaceable, stored in the tracked map, and each of
/// its spent outpoints is registered in the outpoint index. An existing
/// record or index entry under the same key is overwritten.
///
/// # Errors
///
/// Returns whatever error the parser produces for `txid`; in that case no
/// state is modified.
pub async fn track_transaction(
    &self,
    txid: &Txid,
    order_id: Option<String>,
) -> Result<TrackedRbfTransaction> {
    // NOTE(review): this call is not awaited; if the parser performs
    // blocking RPC it will stall the async executor — confirm.
    let parsed = self.parser.parse_transaction(txid)?;
    let txid_str = txid.to_string();

    // Confirmation takes precedence over the RBF flag.
    let status = match (parsed.confirmations > 0, parsed.is_rbf) {
        (true, _) => RbfStatus::Confirmed,
        (false, true) => RbfStatus::Replaceable,
        (false, false) => RbfStatus::NonReplaceable,
    };

    let input_outpoints: Vec<String> = parsed
        .inputs
        .iter()
        .map(|input| format!("{}:{}", input.prev_txid, input.prev_vout))
        .collect();

    let seen_at = chrono::Utc::now();
    let entry = TrackedRbfTransaction {
        txid: txid_str.clone(),
        input_outpoints,
        status,
        fee_sats: parsed.fee_sats,
        order_id,
        first_seen: seen_at,
        status_changed: seen_at,
    };

    // Each lock is taken and released on its own; the two maps are never
    // locked at the same time.
    self.tracked
        .write()
        .await
        .insert(txid_str.clone(), entry.clone());

    {
        let mut index = self.outpoint_index.write().await;
        index.extend(
            entry
                .input_outpoints
                .iter()
                .map(|outpoint| (outpoint.clone(), txid_str.clone())),
        );
    }

    tracing::info!(
        txid = %txid,
        status = ?status,
        is_rbf = parsed.is_rbf,
        "Tracking transaction for RBF"
    );

    Ok(entry)
}
/// Re-checks a single tracked transaction against the node.
///
/// Untracked transactions and those already `Confirmed` or `Replaced` are
/// skipped. If the node reports at least one confirmation, the record is
/// marked `Confirmed` and a `TransactionConfirmed` event is sent. If the
/// lookup fails with an RPC error, handling is delegated to
/// `detect_replacement`.
///
/// # Errors
///
/// Any non-RPC error from the client is returned unchanged.
pub async fn check_replacement(&self, txid: &Txid) -> Result<Option<RbfReplacement>> {
    let txid_str = txid.to_string();

    // Clone the record so the read lock is released before any further work.
    let snapshot = self.tracked.read().await.get(&txid_str).cloned();
    let tracked = match snapshot {
        Some(entry)
            if !matches!(entry.status, RbfStatus::Confirmed | RbfStatus::Replaced) =>
        {
            entry
        }
        _ => return Ok(None),
    };

    let raw_tx = match self.client.get_raw_transaction(txid) {
        Ok(raw_tx) => raw_tx,
        // NOTE(review): every `Rpc` error is treated as "transaction is
        // gone"; presumably this also covers transient node failures —
        // confirm whether a narrower check is possible.
        Err(BitcoinError::Rpc(_)) => return self.detect_replacement(&tracked).await,
        Err(other) => return Err(other),
    };

    let confirmations = raw_tx.confirmations.unwrap_or(0);
    if confirmations > 0 {
        self.update_status(&txid_str, RbfStatus::Confirmed).await;
        // A send error only means there are no subscribers; ignore it.
        let _ = self.event_tx.send(RbfEvent::TransactionConfirmed {
            txid: txid_str,
            confirmations,
        });
    }

    Ok(None)
}
/// Handles a tracked transaction that could no longer be fetched.
///
/// No lookup of the replacing transaction is implemented: the record is
/// marked `Replaced`, a `TransactionDropped` event is sent, and the result
/// is always `Ok(None)`.
async fn detect_replacement(
    &self,
    tracked: &TrackedRbfTransaction,
) -> Result<Option<RbfReplacement>> {
    // TODO: inspect `tracked.input_outpoints` to find the transaction that
    // now spends them and report it as the replacement. The previous body
    // iterated over the outpoints without doing anything.
    self.update_status(&tracked.txid, RbfStatus::Replaced).await;

    // A send error only means there are no subscribers; ignore it.
    let _ = self.event_tx.send(RbfEvent::TransactionDropped {
        txid: tracked.txid.clone(),
        reason: "Transaction no longer in mempool".to_string(),
    });

    Ok(None)
}
/// Sets the status of the record stored under `txid` and stamps the change
/// time. Does nothing when `txid` is not tracked.
async fn update_status(&self, txid: &str, new_status: RbfStatus) {
    // The write guard lives for the duration of the `if let` body.
    if let Some(entry) = self.tracked.write().await.get_mut(txid) {
        entry.status = new_status;
        entry.status_changed = chrono::Utc::now();
    }
}
/// Returns the current status of `txid`, or `None` when it is not tracked.
pub async fn get_status(&self, txid: &str) -> Option<RbfStatus> {
    self.tracked
        .read()
        .await
        .get(txid)
        .map(|entry| entry.status)
}
/// Stops tracking `txid` and releases the outpoint index entries it owns.
///
/// Does nothing when `txid` is not tracked. An index entry is removed only
/// if it still maps to `txid`: tracking a later transaction that spends the
/// same outpoint overwrites the entry, and that transaction's entry must
/// survive the removal of this one.
pub async fn untrack(&self, txid: &str) {
    // Release the tracked-map lock before touching the index.
    let removed = self.tracked.write().await.remove(txid);

    if let Some(entry) = removed {
        let mut index = self.outpoint_index.write().await;
        for outpoint in &entry.input_outpoints {
            let owned_by_txid = index.get(outpoint).map(String::as_str) == Some(txid);
            if owned_by_txid {
                index.remove(outpoint);
            }
        }
    }
}
pub async fn check_all(&self) -> Vec<RbfEvent> {
let txids: Vec<String> = {
let tracked_map = self.tracked.read().await;
tracked_map.keys().cloned().collect()
};
let mut events = Vec::new();
for txid_str in txids {
if let Ok(txid) = txid_str.parse::<Txid>() {
if let Ok(Some(replacement)) = self.check_replacement(&txid).await {
events.push(RbfEvent::TransactionReplaced(replacement));
}
}
}
events
}
/// Looks for a tracked transaction that spends one of `new_tx`'s inputs.
///
/// Inputs are examined in order. Returns the txid registered for the first
/// outpoint whose index entry differs from `new_tx`'s own txid, or `None`
/// when no input collides with another tracked transaction.
pub async fn check_conflict(&self, new_tx: &ParsedTransaction) -> Option<String> {
    let index = self.outpoint_index.read().await;

    new_tx.inputs.iter().find_map(|input| {
        let key = format!("{}:{}", input.prev_txid, input.prev_vout);
        index
            .get(&key)
            // An entry pointing at the transaction itself is not a conflict.
            .filter(|owner| *owner != &new_tx.txid)
            .cloned()
    })
}
/// Records that `replacement_txid` has superseded the tracked
/// `original_txid`.
///
/// The replacement is tracked (inheriting the original's order id), the
/// original is marked `Replaced`, and a `TransactionReplaced` event is
/// sent. The fee increase is the new fee minus the old fee, clamped at
/// zero, and is `None` unless both fees are known.
///
/// # Errors
///
/// * `BitcoinError::TransactionNotFound` when `original_txid` is not tracked.
/// * Any error from tracking the replacement. In both cases the original
///   record is left unchanged and no event is sent.
pub async fn handle_replacement(
    &self,
    original_txid: &str,
    replacement_txid: &Txid,
) -> Result<RbfReplacement> {
    let original = self
        .tracked
        .read()
        .await
        .get(original_txid)
        .cloned()
        .ok_or_else(|| BitcoinError::TransactionNotFound(original_txid.to_string()))?;

    // Track the replacement before touching the original: this is the only
    // fallible step, so a failure cannot leave the original marked
    // `Replaced` without a successor. The returned record already carries
    // the replacement's fee, so the transaction is parsed only once.
    let replacement = self
        .track_transaction(replacement_txid, original.order_id.clone())
        .await?;

    let fee_increase = match (original.fee_sats, replacement.fee_sats) {
        (Some(old), Some(new)) => Some(new.saturating_sub(old)),
        _ => None,
    };

    let rbf_replacement = RbfReplacement {
        original_txid: original_txid.to_string(),
        replacement_txid: replacement_txid.to_string(),
        old_fee_sats: original.fee_sats,
        new_fee_sats: replacement.fee_sats,
        fee_increase_sats: fee_increase,
        detected_at: chrono::Utc::now(),
    };

    // If both ids name the same transaction, keep the record that was just
    // written instead of immediately flagging it as replaced.
    if replacement.txid.as_str() != original_txid {
        self.update_status(original_txid, RbfStatus::Replaced).await;
    }

    // A send error only means there are no subscribers; ignore it.
    let _ = self
        .event_tx
        .send(RbfEvent::TransactionReplaced(rbf_replacement.clone()));

    tracing::warn!(
        original_txid = original_txid,
        replacement_txid = %replacement_txid,
        fee_increase = ?fee_increase,
        "Transaction replaced via RBF"
    );

    Ok(rbf_replacement)
}
}
/// Tunable parameters for RBF monitoring.
///
/// NOTE(review): nothing in this file reads these fields; the descriptions
/// below are inferred from the field names — confirm against the consumers.
#[derive(Debug, Clone)]
pub struct RbfConfig {
    /// Presumably the minimum fee bump, in percent, for a replacement.
    pub min_fee_bump_percent: u32,
    /// Presumably the delay, in seconds, between tracker sweeps.
    pub check_interval_secs: u64,
    /// Presumably the age, in seconds, after which a record is abandoned.
    pub max_track_age_secs: u64,
}

impl Default for RbfConfig {
    /// Defaults: 10 percent bump, 60 second interval, 7 day maximum age.
    fn default() -> Self {
        const SECS_PER_DAY: u64 = 86_400;
        const MAX_TRACK_AGE_DAYS: u64 = 7;

        Self {
            min_fee_bump_percent: 10,
            check_interval_secs: 60,
            max_track_age_secs: SECS_PER_DAY * MAX_TRACK_AGE_DAYS,
        }
    }
}
/// Builder that gathers what is needed to price a replacement transaction.
///
/// Only fee estimation is implemented; recorded outputs are stored but not
/// consumed by any method of this type.
pub struct RbfTransactionBuilder {
    // Transaction being replaced, once supplied through `replace`.
    original: Option<ParsedTransaction>,
    // Written by `add_output` but never read yet, hence the suppression.
    #[allow(dead_code)]
    outputs: Vec<(String, u64)>,
    // Desired fee rate; multiplied by the original's vsize to get a fee.
    target_fee_rate: f64,
}

impl RbfTransactionBuilder {
    /// Creates an empty builder with a target fee rate of `10.0`.
    pub fn new() -> Self {
        Self {
            original: None,
            outputs: Vec::new(),
            target_fee_rate: 10.0,
        }
    }

    /// Sets the transaction that is to be replaced.
    pub fn replace(mut self, tx: ParsedTransaction) -> Self {
        self.original = Some(tx);
        self
    }

    /// Sets the target fee rate used by [`Self::calculate_replacement_fee`].
    pub fn fee_rate(mut self, sat_per_vb: f64) -> Self {
        self.target_fee_rate = sat_per_vb;
        self
    }

    /// Appends an `(address, amount_sats)` output to the builder.
    pub fn add_output(mut self, address: String, amount_sats: u64) -> Self {
        self.outputs.push((address, amount_sats));
        self
    }

    /// Computes the fee the replacement should pay.
    ///
    /// The result is the larger of:
    /// * the original fee plus the original's vsize (presumably modelling a
    ///   1 sat/vB incremental relay fee — confirm), and
    /// * the target fee rate multiplied by the original's vsize, truncated
    ///   to an integer.
    ///
    /// Returns `None` when no original transaction was supplied or its fee
    /// is unknown.
    pub fn calculate_replacement_fee(&self) -> Option<u64> {
        let original = self.original.as_ref()?;
        let original_fee = original.fee_sats?;

        // Saturate rather than overflow: plain `+` panics in debug builds
        // and wraps to a tiny fee in release builds.
        let min_fee = original_fee.saturating_add(original.vsize);
        // Float-to-int `as` truncates and saturates (NaN and negatives
        // become 0), so the minimum above always acts as the floor.
        let target_fee = (self.target_fee_rate * original.vsize as f64) as u64;

        Some(min_fee.max(target_fee))
    }
}

impl Default for RbfTransactionBuilder {
    /// Equivalent to [`RbfTransactionBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration exposes the documented bump and interval.
    #[test]
    fn test_rbf_config_defaults() {
        let RbfConfig {
            min_fee_bump_percent,
            check_interval_secs,
            ..
        } = RbfConfig::default();

        assert_eq!(min_fee_bump_percent, 10);
        assert_eq!(check_interval_secs, 60);
    }

    /// Distinct status variants never compare equal.
    #[test]
    fn test_rbf_status() {
        assert!(RbfStatus::Replaceable != RbfStatus::Confirmed);
        assert!(RbfStatus::NonReplaceable != RbfStatus::Replaced);
    }
}