use std::sync::Arc;
use log::*;
use tari_comms::peer_manager::NodeId;
use tari_transaction_components::transaction_components::Transaction;
use tari_utilities::hex::Hex;
#[cfg(feature = "metrics")]
use crate::mempool::metrics;
use crate::{
base_node::comms_interface::{BlockEvent, BlockEvent::AddBlockErrored},
chain_storage::BlockAddResult,
mempool::{
Mempool,
TxStorageResponse,
service::{MempoolRequest, MempoolResponse, MempoolServiceError, OutboundMempoolServiceInterface},
},
};
pub const LOG_TARGET: &str = "c::mp::service::inbound_handlers";
#[derive(Clone)]
pub struct MempoolInboundHandlers {
mempool: Mempool,
outbound_service: OutboundMempoolServiceInterface,
}
impl MempoolInboundHandlers {
pub fn new(mempool: Mempool, outbound_service: OutboundMempoolServiceInterface) -> Self {
Self {
mempool,
outbound_service,
}
}
pub async fn handle_request(&mut self, request: MempoolRequest) -> Result<MempoolResponse, MempoolServiceError> {
trace!(target: LOG_TARGET, "Handling remote request: {request}");
use MempoolRequest::{
FilterOutputsInMempool,
GetFeePerGramStats,
GetState,
GetStats,
GetTxStateByExcessSig,
SubmitTransaction,
};
match request {
GetStats => Ok(MempoolResponse::Stats(self.mempool.stats().await?)),
GetState => Ok(MempoolResponse::State(self.mempool.state().await?)),
GetTxStateByExcessSig(excess_sig) => Ok(MempoolResponse::TxStorage(
self.mempool.has_tx_with_excess_sig(excess_sig).await?,
)),
SubmitTransaction(tx) => {
let first_tx_kernel_excess_sig = tx
.first_kernel_excess_sig()
.ok_or(MempoolServiceError::TransactionNoKernels)?
.get_signature()
.to_hex();
debug!(
target: LOG_TARGET,
"Transaction ({first_tx_kernel_excess_sig}) submitted using request."
);
Ok(MempoolResponse::TxStorage(self.submit_transaction(tx, None).await?))
},
GetFeePerGramStats { count, tip_height } => {
let stats = self.mempool.get_fee_per_gram_stats(count, tip_height).await?;
Ok(MempoolResponse::FeePerGramStats { response: stats })
},
FilterOutputsInMempool(hashes) => Ok(MempoolResponse::FilteredOutputs(
self.mempool.filter_outputs_in_mempool(hashes).await?,
)),
}
}
pub async fn handle_transaction(
&mut self,
tx: Transaction,
source_peer: Option<NodeId>,
) -> Result<(), MempoolServiceError> {
let first_tx_kernel_excess_sig = tx
.first_kernel_excess_sig()
.ok_or(MempoolServiceError::TransactionNoKernels)?
.get_signature()
.to_hex();
debug!(
target: LOG_TARGET,
"Transaction ({}) received from {}.",
first_tx_kernel_excess_sig,
source_peer
.as_ref()
.map(|p| format!("remote peer: {p}"))
.unwrap_or_else(|| "local services".to_string())
);
self.submit_transaction(tx, source_peer).await?;
Ok(())
}
async fn submit_transaction(
&mut self,
tx: Transaction,
source_peer: Option<NodeId>,
) -> Result<TxStorageResponse, MempoolServiceError> {
trace!(target: LOG_TARGET, "submit_transaction: {tx}");
let tx = Arc::new(tx);
let tx_storage = self.mempool.has_transaction(tx.clone()).await?;
let kernel_excess_sig = tx
.first_kernel_excess_sig()
.ok_or(MempoolServiceError::TransactionNoKernels)?
.get_signature()
.to_hex();
if tx_storage.is_stored() {
debug!(
target: LOG_TARGET,
"Mempool already has transaction: {kernel_excess_sig}"
);
return Ok(tx_storage);
}
match self.mempool.insert(tx.clone()).await {
Ok(tx_storage) => {
#[cfg(feature = "metrics")]
if tx_storage.is_stored() {
metrics::inbound_transactions().inc();
} else {
metrics::rejected_inbound_transactions().inc();
}
self.update_pool_size_metrics().await;
debug!(
target: LOG_TARGET,
"Transaction inserted into mempool: {kernel_excess_sig}, pool: {tx_storage}"
);
if matches!(tx_storage, TxStorageResponse::UnconfirmedPool) {
debug!(
target: LOG_TARGET,
"Propagate transaction ({kernel_excess_sig}) to network."
);
self.outbound_service
.propagate_tx(tx, source_peer.into_iter().collect())
.await?;
}
Ok(tx_storage)
},
Err(e) => Err(MempoolServiceError::MempoolError(e)),
}
}
#[allow(clippy::cast_possible_wrap)]
async fn update_pool_size_metrics(&self) {
#[cfg(feature = "metrics")]
if let Ok(stats) = self.mempool.stats().await {
metrics::unconfirmed_pool_size().set(stats.unconfirmed_txs as i64);
metrics::reorg_pool_size().set(stats.reorg_txs as i64);
}
}
pub async fn handle_block_event(&mut self, block_event: &BlockEvent) -> Result<(), MempoolServiceError> {
use BlockEvent::{AddBlockValidationFailed, BlockSyncComplete, BlockSyncRewind, ValidBlockAdded};
match block_event {
ValidBlockAdded(block, BlockAddResult::Ok(_)) => {
self.mempool.process_published_block(block.clone()).await?;
},
ValidBlockAdded(_, BlockAddResult::ChainReorg { added, removed }) => {
self.mempool
.process_reorg(
removed.iter().map(|b| b.to_arc_block()).collect(),
added.iter().map(|b| b.to_arc_block()).collect(),
)
.await?;
},
ValidBlockAdded(_, _) => {},
BlockSyncRewind(_) => {},
BlockSyncComplete(_, _) => {
self.mempool.process_sync().await?;
},
AddBlockValidationFailed {
block: failed_block,
source_peer,
} => {
if source_peer.is_none() {
self.mempool
.clear_transactions_for_failed_block(failed_block.clone())
.await?;
}
},
AddBlockErrored { .. } => {},
}
self.update_pool_size_metrics().await;
Ok(())
}
}