use crate::{
base_node::{
comms_interface::{InboundNodeCommsHandlers, LocalNodeCommsInterface, OutboundNodeCommsInterface},
service::service::{BaseNodeService, BaseNodeServiceConfig, BaseNodeStreams},
StateMachineHandle,
},
blocks::NewBlock,
chain_storage::{async_db::AsyncBlockchainDb, BlockchainBackend},
consensus::ConsensusManager,
mempool::Mempool,
proto as shared_protos,
proto::base_node as proto,
};
use futures::{channel::mpsc, future, Future, Stream, StreamExt};
use log::*;
use std::{convert::TryFrom, sync::Arc};
use tari_comms_dht::Dht;
use tari_p2p::{
comms_connector::{PeerMessage, SubscriptionFactory},
domain_message::DomainMessage,
services::utils::{map_decode, ok_or_skip_result},
tari_message::TariMessageType,
};
use tari_service_framework::{
reply_channel,
ServiceInitializationError,
ServiceInitializer,
ServiceInitializerContext,
};
use tokio::sync::broadcast;
const LOG_TARGET: &str = "c::bn::service::initializer";
const SUBSCRIPTION_LABEL: &str = "Base Node";
pub struct BaseNodeServiceInitializer<T> {
inbound_message_subscription_factory: Arc<SubscriptionFactory>,
blockchain_db: AsyncBlockchainDb<T>,
mempool: Mempool,
consensus_manager: ConsensusManager,
config: BaseNodeServiceConfig,
}
impl<T> BaseNodeServiceInitializer<T>
where T: BlockchainBackend
{
pub fn new(
inbound_message_subscription_factory: Arc<SubscriptionFactory>,
blockchain_db: AsyncBlockchainDb<T>,
mempool: Mempool,
consensus_manager: ConsensusManager,
config: BaseNodeServiceConfig,
) -> Self
{
Self {
inbound_message_subscription_factory,
blockchain_db,
mempool,
consensus_manager,
config,
}
}
fn inbound_request_stream(&self) -> impl Stream<Item = DomainMessage<proto::BaseNodeServiceRequest>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::BaseNodeRequest, SUBSCRIPTION_LABEL)
.map(map_decode::<proto::BaseNodeServiceRequest>)
.filter_map(ok_or_skip_result)
}
fn inbound_response_stream(&self) -> impl Stream<Item = DomainMessage<proto::BaseNodeServiceResponse>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::BaseNodeResponse, SUBSCRIPTION_LABEL)
.map(map_decode::<proto::BaseNodeServiceResponse>)
.filter_map(ok_or_skip_result)
}
fn inbound_block_stream(&self) -> impl Stream<Item = DomainMessage<NewBlock>> {
self.inbound_message_subscription_factory
.get_subscription(TariMessageType::NewBlock, SUBSCRIPTION_LABEL)
.filter_map(extract_block)
}
}
async fn extract_block(msg: Arc<PeerMessage>) -> Option<DomainMessage<NewBlock>> {
match msg.decode_message::<shared_protos::core::NewBlock>() {
Err(e) => {
warn!(
target: LOG_TARGET,
"Could not decode inbound block message. {}",
e.to_string()
);
None
},
Ok(new_block) => {
let block = match NewBlock::try_from(new_block) {
Err(e) => {
let origin = &msg.source_peer.node_id;
warn!(
target: LOG_TARGET,
"Inbound block message from {} was ill-formed. {}", origin, e
);
return None;
},
Ok(b) => b,
};
Some(DomainMessage {
source_peer: msg.source_peer.clone(),
dht_header: msg.dht_header.clone(),
authenticated_origin: msg.authenticated_origin.clone(),
inner: block,
})
},
}
}
impl<T> ServiceInitializer for BaseNodeServiceInitializer<T>
where T: BlockchainBackend + 'static
{
type Future = impl Future<Output = Result<(), ServiceInitializationError>>;
fn initialize(&mut self, context: ServiceInitializerContext) -> Self::Future {
let inbound_request_stream = self.inbound_request_stream();
let inbound_response_stream = self.inbound_response_stream();
let inbound_block_stream = self.inbound_block_stream();
let (outbound_request_sender_service, outbound_request_stream) = reply_channel::unbounded();
let (outbound_block_sender_service, outbound_block_stream) = mpsc::unbounded();
let (local_request_sender_service, local_request_stream) = reply_channel::unbounded();
let (local_block_sender_service, local_block_stream) = reply_channel::unbounded();
let outbound_nci =
OutboundNodeCommsInterface::new(outbound_request_sender_service, outbound_block_sender_service);
let (block_event_sender, _) = broadcast::channel(50);
let local_nci = LocalNodeCommsInterface::new(
local_request_sender_service,
local_block_sender_service,
block_event_sender.clone(),
);
let inbound_nch = InboundNodeCommsHandlers::new(
block_event_sender,
self.blockchain_db.clone(),
self.mempool.clone(),
self.consensus_manager.clone(),
outbound_nci.clone(),
);
let config = self.config;
context.register_handle(outbound_nci);
context.register_handle(local_nci);
context.spawn_when_ready(move |handles| async move {
let dht = handles.expect_handle::<Dht>();
let outbound_message_service = dht.outbound_requester();
let state_machine = handles.expect_handle::<StateMachineHandle>();
let streams = BaseNodeStreams {
outbound_request_stream,
outbound_block_stream,
inbound_request_stream,
inbound_response_stream,
inbound_block_stream,
local_request_stream,
local_block_stream,
};
let service =
BaseNodeService::new(outbound_message_service, inbound_nch, config, state_machine).start(streams);
futures::pin_mut!(service);
future::select(service, handles.get_shutdown_signal()).await;
info!(target: LOG_TARGET, "Base Node Service shutdown");
});
future::ready(Ok(()))
}
}