use crate::{
base_node::{comms_interface::LocalNodeCommsInterface, StateMachineHandle},
mempool::{
mempool::Mempool,
proto as mempool_proto,
service::{
inbound_handlers::MempoolInboundHandlers,
local_service::LocalMempoolService,
outbound_interface::OutboundMempoolServiceInterface,
service::{MempoolService, MempoolStreams},
MempoolHandle,
},
MempoolServiceConfig,
},
proto,
transactions::transaction::Transaction,
};
use futures::{channel::mpsc, future, Future, Stream, StreamExt};
use log::*;
use std::{convert::TryFrom, sync::Arc};
use tari_comms_dht::Dht;
use tari_p2p::{
comms_connector::{PeerMessage, SubscriptionFactory},
domain_message::DomainMessage,
services::utils::{map_decode, ok_or_skip_result},
tari_message::TariMessageType,
};
use tari_service_framework::{
reply_channel,
ServiceInitializationError,
ServiceInitializer,
ServiceInitializerContext,
};
use tokio::sync::broadcast;
const LOG_TARGET: &str = "c::bn::mempool_service::initializer";
const SUBSCRIPTION_LABEL: &str = "Mempool";
pub struct MempoolServiceInitializer {
inbound_message_subscription_factory: Arc<SubscriptionFactory>,
mempool: Mempool,
config: MempoolServiceConfig,
}
impl MempoolServiceInitializer {
pub fn new(
config: MempoolServiceConfig,
mempool: Mempool,
inbound_message_subscription_factory: Arc<SubscriptionFactory>,
) -> Self
{
Self {
inbound_message_subscription_factory,
mempool,
config,
}
}
fn inbound_request_stream(&self) -> impl Stream<Item = DomainMessage<mempool_proto::MempoolServiceRequest>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::MempoolRequest, SUBSCRIPTION_LABEL)
.map(map_decode::<mempool_proto::MempoolServiceRequest>)
.filter_map(ok_or_skip_result)
}
fn inbound_response_stream(&self) -> impl Stream<Item = DomainMessage<mempool_proto::MempoolServiceResponse>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::MempoolResponse, SUBSCRIPTION_LABEL)
.map(map_decode::<mempool_proto::MempoolServiceResponse>)
.filter_map(ok_or_skip_result)
}
fn inbound_transaction_stream(&self) -> impl Stream<Item = DomainMessage<Transaction>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::NewTransaction, SUBSCRIPTION_LABEL)
.filter_map(extract_transaction)
}
}
async fn extract_transaction(msg: Arc<PeerMessage>) -> Option<DomainMessage<Transaction>> {
match msg.decode_message::<proto::types::Transaction>() {
Err(e) => {
warn!(
target: LOG_TARGET,
"Could not decode inbound transaction message. {}",
e.to_string()
);
None
},
Ok(tx) => {
let tx = match Transaction::try_from(tx) {
Err(e) => {
warn!(
target: LOG_TARGET,
"Inbound transaction message from {} was ill-formed. {}", msg.source_peer.public_key, e
);
return None;
},
Ok(b) => b,
};
Some(DomainMessage {
source_peer: msg.source_peer.clone(),
dht_header: msg.dht_header.clone(),
authenticated_origin: msg.authenticated_origin.clone(),
inner: tx,
})
},
}
}
impl ServiceInitializer for MempoolServiceInitializer {
type Future = impl Future<Output = Result<(), ServiceInitializationError>>;
fn initialize(&mut self, context: ServiceInitializerContext) -> Self::Future {
let inbound_request_stream = self.inbound_request_stream();
let inbound_response_stream = self.inbound_response_stream();
let inbound_transaction_stream = self.inbound_transaction_stream();
let (request_sender, request_receiver) = reply_channel::unbounded();
let mempool_handle = MempoolHandle::new(request_sender);
context.register_handle(mempool_handle);
let (outbound_tx_sender, outbound_tx_stream) = mpsc::unbounded();
let (outbound_request_sender_service, outbound_request_stream) = reply_channel::unbounded();
let (local_request_sender_service, local_request_stream) = reply_channel::unbounded();
let (mempool_state_event_publisher, _) = broadcast::channel(100);
let outbound_mp_interface =
OutboundMempoolServiceInterface::new(outbound_request_sender_service, outbound_tx_sender);
let local_mp_interface =
LocalMempoolService::new(local_request_sender_service, mempool_state_event_publisher.clone());
let config = self.config;
let inbound_handlers = MempoolInboundHandlers::new(
mempool_state_event_publisher,
self.mempool.clone(),
outbound_mp_interface.clone(),
);
context.register_handle(outbound_mp_interface);
context.register_handle(local_mp_interface);
context.spawn_when_ready(move |handles| async move {
let outbound_message_service = handles.expect_handle::<Dht>().outbound_requester();
let state_machine = handles.expect_handle::<StateMachineHandle>();
let base_node = handles.expect_handle::<LocalNodeCommsInterface>();
let streams = MempoolStreams {
outbound_request_stream,
outbound_tx_stream,
inbound_request_stream,
inbound_response_stream,
inbound_transaction_stream,
local_request_stream,
block_event_stream: base_node.get_block_event_stream(),
request_receiver,
};
let service =
MempoolService::new(outbound_message_service, inbound_handlers, config, state_machine).start(streams);
futures::pin_mut!(service);
future::select(service, handles.get_shutdown_signal()).await;
info!(target: LOG_TARGET, "Mempool Service shutdown");
});
future::ready(Ok(()))
}
}