use crate::{
mempool::{
service::{MempoolRequest, MempoolResponse, MempoolServiceError},
StatsResponse,
TxStorageResponse,
},
transactions::{transaction::Transaction, types::Signature},
};
use futures::channel::mpsc::UnboundedSender;
use log::*;
use tari_comms::peer_manager::NodeId;
use tari_service_framework::{reply_channel::SenderService, Service};
pub const LOG_TARGET: &str = "c::mp::service::outbound_interface";
#[derive(Clone)]
pub struct OutboundMempoolServiceInterface {
request_sender: SenderService<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>,
tx_sender: UnboundedSender<(Transaction, Vec<NodeId>)>,
}
impl OutboundMempoolServiceInterface {
pub fn new(
request_sender: SenderService<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>,
tx_sender: UnboundedSender<(Transaction, Vec<NodeId>)>,
) -> Self
{
Self {
request_sender,
tx_sender,
}
}
pub async fn get_stats(&mut self) -> Result<StatsResponse, MempoolServiceError> {
if let MempoolResponse::Stats(stats) = self.request_sender.call(MempoolRequest::GetStats).await?? {
trace!(target: LOG_TARGET, "Mempool stats requested: {:?}", stats,);
Ok(stats)
} else {
Err(MempoolServiceError::UnexpectedApiResponse)
}
}
pub async fn propagate_tx(
&mut self,
transaction: Transaction,
exclude_peers: Vec<NodeId>,
) -> Result<(), MempoolServiceError>
{
self.tx_sender
.unbounded_send((transaction, exclude_peers))
.or_else(|e| {
{
error!(target: LOG_TARGET, "Could not broadcast transaction. {:?}", e);
Err(e)
}
.map_err(|_| MempoolServiceError::BroadcastFailed)
})
}
pub async fn get_tx_state_by_excess_sig(
&mut self,
excess_sig: Signature,
) -> Result<TxStorageResponse, MempoolServiceError>
{
if let MempoolResponse::TxStorage(tx_storage_response) = self
.request_sender
.call(MempoolRequest::GetTxStateByExcessSig(excess_sig))
.await??
{
Ok(tx_storage_response)
} else {
Err(MempoolServiceError::UnexpectedApiResponse)
}
}
}