use std::{convert::TryFrom, sync::Arc};
use futures::{Stream, pin_mut, stream::StreamExt};
use log::*;
use tari_comms::peer_manager::NodeId;
use tari_comms_dht::{
domain_message::OutboundDomainMessage,
envelope::NodeDestination,
outbound::{DhtOutboundError, OutboundEncryption, OutboundMessageRequester},
};
use tari_p2p::{domain_message::DomainMessage, tari_message::TariMessageType};
use tari_service_framework::{reply_channel, reply_channel::RequestContext};
use tari_transaction_components::transaction_components::Transaction;
use tari_utilities::hex::Hex;
use tokio::{sync::mpsc, task};
use crate::{
base_node::comms_interface::{BlockEvent, BlockEventReceiver},
mempool::service::{
MempoolRequest,
MempoolResponse,
error::MempoolServiceError,
inbound_handlers::MempoolInboundHandlers,
},
proto,
};
const LOG_TARGET: &str = "c::mempool::service::service";
pub struct MempoolStreams<STxIn, SLocalReq> {
pub outbound_tx_stream: mpsc::UnboundedReceiver<(Arc<Transaction>, Vec<NodeId>)>,
pub inbound_transaction_stream: STxIn,
pub local_request_stream: SLocalReq,
pub block_event_stream: BlockEventReceiver,
pub request_receiver: reply_channel::TryReceiver<MempoolRequest, MempoolResponse, MempoolServiceError>,
}
pub struct MempoolService {
outbound_message_service: OutboundMessageRequester,
inbound_handlers: MempoolInboundHandlers,
}
impl MempoolService {
pub fn new(outbound_message_service: OutboundMessageRequester, inbound_handlers: MempoolInboundHandlers) -> Self {
Self {
outbound_message_service,
inbound_handlers,
}
}
pub async fn start<STxIn, SLocalReq>(
mut self,
streams: MempoolStreams<STxIn, SLocalReq>,
) -> Result<(), MempoolServiceError>
where
STxIn: Stream<Item = DomainMessage<Transaction>>,
SLocalReq: Stream<Item = RequestContext<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>>,
{
let mut outbound_tx_stream = streams.outbound_tx_stream;
let inbound_transaction_stream = streams.inbound_transaction_stream.fuse();
pin_mut!(inbound_transaction_stream);
let local_request_stream = streams.local_request_stream.fuse();
pin_mut!(local_request_stream);
let mut block_event_stream = streams.block_event_stream;
let mut request_receiver = streams.request_receiver;
loop {
tokio::select! {
Some(request) = request_receiver.next() => {
let (request, reply) = request.split();
let _result = reply.send(self.handle_request(request).await);
},
Some((txn, excluded_peers)) = outbound_tx_stream.recv() => {
let _res = self.handle_outbound_tx(txn, excluded_peers).await.map_err(|e|
error!(target: LOG_TARGET, "Error sending outbound tx message: {e}")
);
},
Some(transaction_msg) = inbound_transaction_stream.next() => self.handle_incoming_tx(transaction_msg),
Some(local_request_context) = local_request_stream.next() => {
self.spawn_handle_local_request(local_request_context);
},
block_event = block_event_stream.recv() => {
if let Ok(block_event) = block_event {
self.spawn_handle_block_event(block_event);
}
},
else => {
info!(target: LOG_TARGET, "Mempool service shutting down");
break;
}
}
}
Ok(())
}
async fn handle_request(&mut self, request: MempoolRequest) -> Result<MempoolResponse, MempoolServiceError> {
self.inbound_handlers.handle_request(request).await
}
fn spawn_handle_local_request(
&self,
request_context: RequestContext<MempoolRequest, Result<MempoolResponse, MempoolServiceError>>,
) {
let mut inbound_handlers = self.inbound_handlers.clone();
task::spawn(async move {
let (request, reply_tx) = request_context.split();
let result = reply_tx.send(inbound_handlers.handle_request(request).await);
if let Err(res) = result {
error!(
target: LOG_TARGET,
"MempoolService failed to send reply to local request {:?}",
res.map(|r| r.to_string()).map_err(|e| e.to_string())
);
}
});
}
fn spawn_handle_block_event(&self, block_event: Arc<BlockEvent>) {
let mut inbound_handlers = self.inbound_handlers.clone();
task::spawn(async move {
let result = inbound_handlers.handle_block_event(&block_event).await;
if let Err(e) = result {
error!(target: LOG_TARGET, "Failed to handle base node block event: {e}");
}
});
}
fn handle_incoming_tx(&self, domain_transaction_msg: DomainMessage<Transaction>) {
let DomainMessage::<_> { source_peer, inner, .. } = domain_transaction_msg;
debug!(
"New transaction received: {}, from: {}",
inner
.first_kernel_excess_sig()
.map(|s| s.get_signature().to_hex())
.unwrap_or_else(|| "No kernels!".to_string()),
source_peer.public_key,
);
trace!(
target: LOG_TARGET,
"New transaction: {}, from: {}",
inner,
source_peer.public_key
);
let mut inbound_handlers = self.inbound_handlers.clone();
task::spawn(async move {
let result = inbound_handlers
.handle_transaction(inner, Some(source_peer.node_id))
.await;
if let Err(e) = result {
error!(
target: LOG_TARGET,
"Failed to handle incoming transaction message: {e:?}"
);
}
});
}
async fn handle_outbound_tx(
&mut self,
tx: Arc<Transaction>,
exclude_peers: Vec<NodeId>,
) -> Result<(), MempoolServiceError> {
let result = self
.outbound_message_service
.flood(
NodeDestination::Unknown,
OutboundEncryption::ClearText,
exclude_peers,
OutboundDomainMessage::new(
&TariMessageType::NewTransaction,
proto::types::Transaction::try_from(tx.clone()).map_err(MempoolServiceError::ConversionError)?,
),
format!(
"Outbound mempool tx: {}",
tx.first_kernel_excess_sig()
.map(|s| s.get_signature().to_hex())
.unwrap_or_else(|| "No kernels!".to_string())
),
)
.await;
match result {
Ok(_) => Ok(()),
Err(DhtOutboundError::NoMessagesQueued) => Ok(()),
Err(e) => {
error!(target: LOG_TARGET, "Handle outbound tx failure. {e:?}");
Err(MempoolServiceError::OutboundMessageService(e.to_string()))
},
}
}
}