use crate::node::mempool::MempoolManager;
use crate::payment::covenant::{CovenantEngine, CovenantProof};
use crate::payment::processor::PaymentError;
use crate::rpc::errors::STORAGE_NOT_AVAILABLE_MSG;
use crate::storage::Storage;
use crate::utils::current_timestamp;
use blvm_protocol::payment::PaymentOutput;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info};
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum TransactionPriority {
Low = 1,
Normal = 2,
High = 3,
Urgent = 4,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchConfig {
pub max_batch_size: usize,
pub target_fee_rate: u64,
pub max_wait_time: u64,
pub min_fee_rate: u64,
}
impl Default for BatchConfig {
fn default() -> Self {
Self {
max_batch_size: 100,
target_fee_rate: 1, max_wait_time: 3600, min_fee_rate: 1, }
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PendingTransaction {
pub tx_id: String,
pub outputs: Vec<PaymentOutput>,
pub priority: TransactionPriority,
pub created_at: u64,
pub deadline: Option<u64>, }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransactionBatch {
pub batch_id: String,
pub transactions: Vec<PendingTransaction>,
pub covenant_template: Option<CovenantProof>,
pub target_fee_rate: u64,
pub created_at: u64,
pub ready_to_broadcast: bool,
pub broadcast_at: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CongestionMetrics {
pub mempool_size: usize,
pub avg_fee_rate: u64,
pub median_fee_rate: u64,
pub estimated_blocks: u32,
pub collected_at: u64,
}
pub struct CongestionManager {
covenant_engine: Arc<CovenantEngine>,
mempool_manager: Option<Arc<MempoolManager>>,
storage: Option<Arc<Storage>>,
batches: HashMap<String, TransactionBatch>,
config: BatchConfig,
}
impl CongestionManager {
pub fn new(
covenant_engine: Arc<CovenantEngine>,
mempool_manager: Option<Arc<MempoolManager>>,
storage: Option<Arc<Storage>>,
config: BatchConfig,
) -> Self {
let manager = Self {
covenant_engine,
mempool_manager,
storage,
batches: HashMap::new(),
config,
};
if let Err(e) = manager.load_all_batches() {
tracing::warn!("Failed to load batches from storage: {}", e);
}
manager
}
fn load_all_batches(&self) -> Result<(), PaymentError> {
let storage = self
.storage
.as_ref()
.ok_or_else(|| PaymentError::ProcessingError(STORAGE_NOT_AVAILABLE_MSG.to_string()))?;
let batches_tree = storage.open_tree("batches").map_err(|e| {
PaymentError::ProcessingError(format!("Failed to open batches tree: {}", e))
})?;
let _count = batches_tree.len().unwrap_or(0);
tracing::info!("Found {} batches in storage", _count);
Ok(())
}
fn save_batch(&self, batch: &TransactionBatch) -> Result<(), PaymentError> {
if let Some(storage) = &self.storage {
let batches_tree = storage.open_tree("batches").map_err(|e| {
PaymentError::ProcessingError(format!("Failed to open batches tree: {}", e))
})?;
let key = batch.batch_id.as_bytes();
let value = bincode::serialize(batch).map_err(|e| {
PaymentError::ProcessingError(format!("Failed to serialize batch: {}", e))
})?;
batches_tree.insert(key, &value).map_err(|e| {
PaymentError::ProcessingError(format!("Failed to save batch: {}", e))
})?;
}
Ok(())
}
pub fn create_batch(&mut self, batch_id: &str, target_fee_rate: Option<u64>) -> String {
let target = target_fee_rate.unwrap_or(self.config.target_fee_rate);
let created_at = crate::utils::current_timestamp();
let batch = TransactionBatch {
batch_id: batch_id.to_string(),
transactions: Vec::new(),
covenant_template: None,
target_fee_rate: target,
created_at,
ready_to_broadcast: false,
broadcast_at: None,
};
self.batches.insert(batch_id.to_string(), batch);
info!(
"Created transaction batch: {} with target fee rate: {}",
batch_id, target
);
batch_id.to_string()
}
pub fn add_to_batch(
&mut self,
batch_id: &str,
tx: PendingTransaction,
) -> Result<(), PaymentError> {
let (batch_clone, tx_count) = {
let batch = self.batches.get_mut(batch_id).ok_or_else(|| {
PaymentError::ProcessingError(format!("Batch {} not found", batch_id))
})?;
if batch.transactions.len() >= self.config.max_batch_size {
return Err(PaymentError::ProcessingError(format!(
"Batch {} is full: {} transactions",
batch_id, self.config.max_batch_size
)));
}
batch.transactions.push(tx);
let batch_clone = batch.clone();
let tx_count = batch.transactions.len();
(batch_clone, tx_count)
};
if let Err(e) = self.save_batch(&batch_clone) {
tracing::warn!("Failed to save batch {} to storage: {}", batch_id, e);
}
if tx_count >= self.config.max_batch_size {
self.update_batch_covenant(batch_id)?;
}
Ok(())
}
pub fn check_congestion(&self) -> Result<CongestionMetrics, PaymentError> {
let mempool_manager = self.mempool_manager.as_ref().ok_or_else(|| {
PaymentError::ProcessingError("Mempool manager not available".to_string())
})?;
let mempool_size = mempool_manager.size();
let transactions = mempool_manager.get_transactions();
let avg_fee_rate = if transactions.is_empty() {
self.config.min_fee_rate
} else {
let base_fee = self.config.min_fee_rate;
let congestion_multiplier = (mempool_size as f64 / 1000.0).min(10.0) as u64;
base_fee + congestion_multiplier
};
let median_fee_rate = avg_fee_rate;
let estimated_blocks = if avg_fee_rate >= self.config.target_fee_rate {
1 } else {
(self.config.target_fee_rate / avg_fee_rate.max(1)) as u32
};
Ok(CongestionMetrics {
mempool_size,
avg_fee_rate,
median_fee_rate,
estimated_blocks,
collected_at: current_timestamp(),
})
}
pub fn should_broadcast(&self, batch_id: &str) -> Result<bool, PaymentError> {
let batch = self.batches.get(batch_id).ok_or_else(|| {
PaymentError::ProcessingError(format!("Batch {} not found", batch_id))
})?;
if batch.transactions.is_empty() {
return Ok(false);
}
if batch.transactions.len() >= self.config.max_batch_size {
return Ok(true);
}
if batch
.transactions
.iter()
.any(|tx| tx.priority == TransactionPriority::Urgent)
{
return Ok(true);
}
let now = current_timestamp();
if now - batch.created_at >= self.config.max_wait_time {
return Ok(true);
}
if batch.transactions.iter().any(|tx| {
if let Some(deadline) = tx.deadline {
now >= deadline
} else {
false
}
}) {
return Ok(true);
}
let metrics = self.check_congestion()?;
if metrics.avg_fee_rate <= batch.target_fee_rate {
return Ok(true);
}
Ok(false)
}
pub fn broadcast_batch(&mut self, batch_id: &str) -> Result<CovenantProof, PaymentError> {
#[cfg(not(feature = "ctv"))]
{
return Err(PaymentError::FeatureNotEnabled(
"Congestion control requires CTV feature".to_string(),
));
}
#[cfg(feature = "ctv")]
{
let (needs_covenant, tx_count, target_fee) = {
let batch = self.batches.get_mut(batch_id).ok_or_else(|| {
PaymentError::ProcessingError(format!("Batch {} not found", batch_id))
})?;
if batch.transactions.is_empty() {
return Err(PaymentError::ProcessingError("Batch is empty".to_string()));
}
(
batch.covenant_template.is_none(),
batch.transactions.len(),
batch.target_fee_rate,
)
};
if needs_covenant {
self.update_batch_covenant(batch_id)?;
}
let covenant = {
let batch = self.batches.get_mut(batch_id).ok_or_else(|| {
PaymentError::ProcessingError(format!("Batch {} not found", batch_id))
})?;
let covenant = batch.covenant_template.as_ref().ok_or_else(|| {
PaymentError::ProcessingError("Batch covenant not created".to_string())
})?;
batch.ready_to_broadcast = true;
batch.broadcast_at = Some(current_timestamp());
covenant.clone()
};
info!(
"Batch {} ready to broadcast with {} transactions at fee rate {} sat/vbyte",
batch_id, tx_count, target_fee
);
Ok(covenant)
}
}
pub fn adjust_fee_rate(&mut self, batch_id: &str) -> Result<u64, PaymentError> {
let metrics = self.check_congestion()?;
let batch = self.batches.get_mut(batch_id).ok_or_else(|| {
PaymentError::ProcessingError(format!("Batch {} not found", batch_id))
})?;
let new_fee_rate = if metrics.avg_fee_rate > batch.target_fee_rate {
metrics.avg_fee_rate.max(self.config.min_fee_rate)
} else {
batch.target_fee_rate
};
batch.target_fee_rate = new_fee_rate;
debug!(
"Adjusted fee rate for batch {} to {} sat/vbyte",
batch_id, new_fee_rate
);
Ok(new_fee_rate)
}
pub fn get_batch(&self, batch_id: &str) -> Option<&TransactionBatch> {
self.batches.get(batch_id)
}
pub fn list_batches(&self) -> Vec<&TransactionBatch> {
self.batches.values().collect()
}
fn update_batch_covenant(&mut self, batch_id: &str) -> Result<(), PaymentError> {
let batch = self.batches.get_mut(batch_id).ok_or_else(|| {
PaymentError::ProcessingError(format!("Batch {} not found", batch_id))
})?;
let mut all_outputs = Vec::new();
for tx in &batch.transactions {
all_outputs.extend(tx.outputs.clone());
}
let covenant = self.covenant_engine.create_payment_covenant(
&format!("{}_batch", batch_id),
&all_outputs,
None,
)?;
batch.covenant_template = Some(covenant);
Ok(())
}
}