use super::{placeholder::PlaceholderService, CommsBuilderError, CommsShutdown};
use crate::{
backoff::BoxedBackoff,
bounded_executor::BoundedExecutor,
builder::consts,
connection_manager::{ConnectionManager, ConnectionManagerEvent, ConnectionManagerRequester},
connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequester},
message::InboundMessage,
multiaddr::Multiaddr,
multiplexing::Substream,
peer_manager::{NodeIdentity, PeerManager},
pipeline,
protocol::{messaging, messaging::MessagingProtocol, ProtocolNotificationTx, Protocols},
runtime,
runtime::task,
tor,
transports::Transport,
};
use futures::{channel::mpsc, AsyncRead, AsyncWrite, StreamExt};
use log::*;
use std::{fmt, sync::Arc, time::Duration};
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{sync::broadcast, time};
use tower::Service;
/// Log target passed to the `info!` statements in this module.
const LOG_TARGET: &str = "comms::node";
/// A fully assembled comms node whose components have not been started yet.
///
/// `spawn` consumes this value, starts the connectivity manager, the connection manager and (when a
/// messaging pipeline is configured) the messaging protocol and its pipelines as tasks, and returns a
/// `CommsNode`.
///
/// The pipeline type parameters default to placeholder types; `with_messaging_pipeline` replaces them
/// with the types of the supplied pipeline configuration.
pub struct BuiltCommsNode<
TTransport,
TInPipe = PlaceholderService<InboundMessage, (), ()>,
TOutPipe = PlaceholderService<(), (), ()>,
TOutReq = (),
> {
/// Connection manager; `spawn` hands it the protocols and runs it as a task.
pub connection_manager: ConnectionManager<TTransport, BoxedBackoff>,
/// Connection manager requester; a clone is given to the messaging protocol and the original is
/// moved into the `CommsNode`.
pub connection_manager_requester: ConnectionManagerRequester,
/// Broadcast sender from which connection manager event receivers are subscribed.
pub connection_manager_event_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
/// Connectivity manager; `spawn` runs the result of `create()` as a task.
pub connectivity_manager: ConnectivityManager,
/// Connectivity requester, moved into the `CommsNode`.
pub connectivity_requester: ConnectivityRequester,
/// Optional messaging pipeline configuration. When `None`, `spawn` starts neither the messaging
/// protocol nor the inbound/outbound pipelines.
pub messaging_pipeline: Option<pipeline::Config<TInPipe, TOutPipe, TOutReq>>,
/// Identity of this node; its public key, node id and public address are logged by `spawn`.
pub node_identity: Arc<NodeIdentity>,
/// Optional Tor hidden service, passed through to the `CommsNode`.
pub hidden_service: Option<tor::HiddenService>,
/// Shared peer manager, passed through to the `CommsNode`.
pub peer_manager: Arc<PeerManager>,
/// Protocols given to the connection manager by `spawn`; the messaging protocol is added to them
/// first when a messaging pipeline is configured.
pub protocols: Protocols<Substream>,
/// Shutdown handle; signals obtained from it are given to the messaging protocol and the inbound
/// pipeline.
pub shutdown: Shutdown,
}
impl<TTransport, TInPipe, TOutPipe, TOutReq> BuiltCommsNode<TTransport, TInPipe, TOutPipe, TOutReq>
where
    TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
    TTransport::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
    TOutPipe: Service<TOutReq, Response = ()> + Clone + Send + 'static,
    TOutPipe::Error: fmt::Debug + Send,
    TOutPipe::Future: Send + 'static,
    TInPipe: Service<InboundMessage> + Clone + Send + 'static,
    TInPipe::Error: fmt::Debug + Send,
    TInPipe::Future: Send + 'static,
    TOutReq: Send + 'static,
{
    /// Installs `messaging_pipeline` as this node's messaging pipeline configuration.
    ///
    /// The returned node is re-typed for the inbound service `I`, the outbound service `O` and the
    /// outbound request type `R` of the new configuration. Every other component is moved across
    /// unchanged; any previously set pipeline configuration is discarded.
    pub fn with_messaging_pipeline<I, O, R>(
        self,
        messaging_pipeline: pipeline::Config<I, O, R>,
    ) -> BuiltCommsNode<TTransport, I, O, R>
    where
        O: Service<R, Response = ()> + Clone + Send + 'static,
        O::Error: fmt::Debug + Send,
        O::Future: Send + 'static,
        I: Service<InboundMessage> + Clone + Send + 'static,
        I::Error: fmt::Debug + Send,
        I::Future: Send + 'static,
    {
        // Take every component except the old pipeline configuration out of `self`.
        let BuiltCommsNode {
            connection_manager,
            connection_manager_requester,
            connection_manager_event_tx,
            connectivity_manager,
            connectivity_requester,
            node_identity,
            hidden_service,
            peer_manager,
            protocols,
            shutdown,
            ..
        } = self;

        BuiltCommsNode {
            connection_manager,
            connection_manager_requester,
            connection_manager_event_tx,
            connectivity_manager,
            connectivity_requester,
            messaging_pipeline: Some(messaging_pipeline),
            node_identity,
            hidden_service,
            peer_manager,
            protocols,
            shutdown,
        }
    }

    /// Reads connection manager events until the listener reports success or failure.
    ///
    /// Returns the address carried by the first `Listening` event. Events other than `Listening`
    /// and `ListenFailed` are skipped.
    ///
    /// # Errors
    ///
    /// * `ConnectionManagerEventStreamTimeout` if no event arrives within 10 seconds of waiting for
    ///   the next one (the timeout applies to each event individually).
    /// * `ConnectionManagerEventStreamClosed` if the event stream ends.
    /// * `ConnectionManagerEventStreamLagged` if the stream yields an error item.
    /// * The converted listener error if a `ListenFailed` event is received.
    async fn wait_listening(
        mut events: broadcast::Receiver<Arc<ConnectionManagerEvent>>,
    ) -> Result<Multiaddr, CommsBuilderError> {
        loop {
            let next_item = time::timeout(Duration::from_secs(10), events.next()).await;
            let event = match next_item {
                Err(_) => return Err(CommsBuilderError::ConnectionManagerEventStreamTimeout),
                Ok(None) => return Err(CommsBuilderError::ConnectionManagerEventStreamClosed),
                Ok(Some(Err(_))) => return Err(CommsBuilderError::ConnectionManagerEventStreamLagged),
                Ok(Some(Ok(event))) => event,
            };

            if let ConnectionManagerEvent::Listening(addr) = &*event {
                return Ok(addr.clone());
            }
            if let ConnectionManagerEvent::ListenFailed(err) = &*event {
                return Err(err.clone().into());
            }
        }
    }

    /// Starts all components of the node as tasks and waits for the listener to come up.
    ///
    /// In order, this spawns the connectivity manager, then (only when a messaging pipeline is
    /// configured) the messaging protocol, the inbound pipeline and the outbound pipeline, and
    /// finally the connection manager. It then waits for the connection manager to report its
    /// listening address and returns a `CommsNode` holding the remaining handles.
    ///
    /// # Errors
    ///
    /// Returns whatever `wait_listening` returns when the listening address cannot be obtained.
    /// Tasks that were already spawned are not stopped by this function in that case.
    pub async fn spawn(self) -> Result<CommsNode, CommsBuilderError> {
        let BuiltCommsNode {
            mut connection_manager,
            connection_manager_requester,
            connection_manager_event_tx,
            connectivity_manager,
            connectivity_requester,
            messaging_pipeline,
            node_identity,
            shutdown,
            peer_manager,
            mut protocols,
            hidden_service,
        } = self;

        info!(target: LOG_TARGET, "Hello from comms!");
        info!(target: LOG_TARGET, "Your node's public key is '{}'", node_identity.public_key());
        info!(target: LOG_TARGET, "Your node's network ID is '{}'", node_identity.node_id());
        info!(target: LOG_TARGET, "Your node's public address is '{}'", node_identity.public_address());

        // Subscribed before the connection manager task is spawned: a broadcast receiver only
        // observes events sent after it was created.
        let events_stream = connection_manager_event_tx.subscribe();
        let mut complete_signals = vec![connection_manager.complete_signal()];

        task::spawn(connectivity_manager.create().run());

        let messaging_event_tx = match messaging_pipeline {
            Some(pipeline_config) => {
                let (messaging_protocol, notifier, request_tx, inbound_rx, event_sender) =
                    initialize_messaging(connection_manager_requester.clone(), shutdown.to_signal());

                // Register the messaging protocol before the protocols are given to the connection manager.
                protocols.add(&[messaging::MESSAGING_PROTOCOL.clone()], notifier);
                complete_signals.push(messaging_protocol.complete_signal());
                task::spawn(messaging_protocol.run());

                let executor =
                    BoundedExecutor::new(runtime::current(), pipeline_config.max_concurrent_inbound_tasks);
                let inbound_pipeline =
                    pipeline::Inbound::new(executor, inbound_rx, pipeline_config.inbound, shutdown.to_signal());
                task::spawn(inbound_pipeline.run());

                let outbound_pipeline =
                    pipeline::Outbound::new(runtime::current(), pipeline_config.outbound, request_tx);
                task::spawn(outbound_pipeline.run());

                Some(event_sender)
            },
            None => None,
        };

        connection_manager.set_protocols(protocols);
        task::spawn(connection_manager.run());

        let listening_addr = Self::wait_listening(events_stream).await?;

        // Without a messaging pipeline there is no messaging event sender, so the sender of a fresh
        // capacity-1 channel is stored instead; nothing in this file sends on it.
        let messaging_event_tx = messaging_event_tx.unwrap_or_else(|| broadcast::channel(1).0);

        Ok(CommsNode {
            shutdown,
            connection_manager_event_tx,
            connection_manager_requester,
            connectivity_requester,
            node_identity,
            peer_manager,
            messaging_event_tx,
            listening_addr,
            hidden_service,
            complete_signals,
        })
    }

    /// Returns a new `Arc` pointing at the shared peer manager.
    pub fn peer_manager(&self) -> Arc<PeerManager> {
        self.peer_manager.clone()
    }

    /// Returns a new `Arc` pointing at this node's identity.
    pub fn node_identity(&self) -> Arc<NodeIdentity> {
        self.node_identity.clone()
    }

    /// Returns a clone of the connection manager requester.
    pub fn connection_manager_requester(&self) -> ConnectionManagerRequester {
        let requester = &self.connection_manager_requester;
        requester.clone()
    }

    /// Returns a clone of the connectivity requester.
    pub fn connectivity(&self) -> ConnectivityRequester {
        let requester = &self.connectivity_requester;
        requester.clone()
    }

    /// Returns a new signal obtained from this node's shutdown handle.
    pub fn shutdown_signal(&self) -> ShutdownSignal {
        self.shutdown.to_signal()
    }
}
/// Handle to a started comms node, as returned by `BuiltCommsNode::spawn`.
///
/// Gives access to the requesters, event subscriptions and identity of the node, and allows it to
/// be shut down.
pub struct CommsNode {
/// Shutdown handle; triggered by `shutdown` and the source of `shutdown_signal`.
shutdown: Shutdown,
/// Broadcast sender from which connection manager event receivers are subscribed.
connection_manager_event_tx: broadcast::Sender<Arc<ConnectionManagerEvent>>,
/// Connection manager requester; cloned out by `connection_manager`.
connection_manager_requester: ConnectionManagerRequester,
/// Connectivity requester; cloned out by `connectivity` and used to subscribe to connectivity events.
connectivity_requester: ConnectivityRequester,
/// Identity of this node.
node_identity: Arc<NodeIdentity>,
/// Shared peer manager.
peer_manager: Arc<PeerManager>,
/// Sender from which messaging event receivers are subscribed. When the node was spawned without a
/// messaging pipeline this is the sender of a fresh capacity-1 channel.
messaging_event_tx: messaging::MessagingEventSender,
/// Address taken from the connection manager's `Listening` event during `spawn`.
listening_addr: Multiaddr,
/// Optional Tor hidden service carried over from the `BuiltCommsNode`.
hidden_service: Option<tor::HiddenService>,
/// Signals collected in `spawn` from the connection manager and, if started, the messaging
/// protocol; handed to `CommsShutdown` by `shutdown`.
/// NOTE(review): presumably each resolves once its component has finished — confirm.
complete_signals: Vec<ShutdownSignal>,
}
impl CommsNode {
    /// Creates a new receiver for connection manager events.
    pub fn subscribe_connection_manager_events(&self) -> broadcast::Receiver<Arc<ConnectionManagerEvent>> {
        let sender = &self.connection_manager_event_tx;
        sender.subscribe()
    }

    /// Creates a new connectivity event stream via the connectivity requester.
    pub fn subscribe_connectivity_events(&self) -> ConnectivityEventRx {
        let requester = &self.connectivity_requester;
        requester.subscribe_event_stream()
    }

    /// Creates a new receiver for messaging events.
    pub fn subscribe_messaging_events(&self) -> messaging::MessagingEventReceiver {
        let sender = &self.messaging_event_tx;
        sender.subscribe()
    }

    /// Returns a new `Arc` pointing at the shared peer manager.
    pub fn peer_manager(&self) -> Arc<PeerManager> {
        self.peer_manager.clone()
    }

    /// Returns a new `Arc` pointing at this node's identity.
    pub fn node_identity(&self) -> Arc<NodeIdentity> {
        self.node_identity.clone()
    }

    /// Borrows this node's identity without touching the reference count.
    pub fn node_identity_ref(&self) -> &NodeIdentity {
        &*self.node_identity
    }

    /// Borrows the address the node reported it was listening on when it was spawned.
    pub fn listening_address(&self) -> &Multiaddr {
        &self.listening_addr
    }

    /// Borrows the Tor hidden service, if the node has one.
    pub fn hidden_service(&self) -> Option<&tor::HiddenService> {
        self.hidden_service.as_ref()
    }

    /// Returns a clone of the connection manager requester.
    pub fn connection_manager(&self) -> ConnectionManagerRequester {
        let requester = &self.connection_manager_requester;
        requester.clone()
    }

    /// Returns a clone of the connectivity requester.
    pub fn connectivity(&self) -> ConnectivityRequester {
        let requester = &self.connectivity_requester;
        requester.clone()
    }

    /// Returns a new signal obtained from this node's shutdown handle.
    pub fn shutdown_signal(&self) -> ShutdownSignal {
        self.shutdown.to_signal()
    }

    /// Triggers the shutdown signal and consumes the node.
    ///
    /// The returned `CommsShutdown` is built from the signals collected when the node was spawned.
    ///
    /// # Panics
    ///
    /// Panics with "Shutdown failed to trigger signal" if triggering the shutdown handle fails.
    pub fn shutdown(mut self) -> CommsShutdown {
        info!(target: LOG_TARGET, "Comms is shutting down");
        self.shutdown.trigger().expect("Shutdown failed to trigger signal");
        let signals = self.complete_signals;
        CommsShutdown::new(signals)
    }
}
/// Creates the channels used by the messaging protocol and constructs the protocol with them.
///
/// Channel capacities come from the constants in `consts`. The returned tuple contains, in order:
///
/// 1. the messaging protocol (not yet running),
/// 2. the sender half of the protocol notification channel,
/// 3. the sender half of the messaging request channel,
/// 4. the receiver half of the inbound message channel,
/// 5. the messaging event sender (a clone of it is held by the protocol).
fn initialize_messaging(
    connection_manager_requester: ConnectionManagerRequester,
    shutdown_signal: ShutdownSignal,
) -> (
    messaging::MessagingProtocol,
    ProtocolNotificationTx<Substream>,
    mpsc::Sender<messaging::MessagingRequest>,
    mpsc::Receiver<InboundMessage>,
    messaging::MessagingEventSender,
) {
    // Only the sender is kept; receivers are created later through `subscribe`.
    let events_tx = broadcast::channel(consts::MESSAGING_EVENTS_BUFFER_SIZE).0;
    let (inbound_tx, inbound_rx) = mpsc::channel(consts::INBOUND_MESSAGE_BUFFER_SIZE);
    let (request_tx, request_rx) = mpsc::channel(consts::MESSAGING_REQUEST_BUFFER_SIZE);
    let (notification_tx, notification_rx) = mpsc::channel(consts::MESSAGING_PROTOCOL_EVENTS_BUFFER_SIZE);

    let protocol_events_tx = events_tx.clone();
    let protocol = MessagingProtocol::new(
        Default::default(),
        connection_manager_requester,
        notification_rx,
        request_rx,
        protocol_events_tx,
        inbound_tx,
        shutdown_signal,
    );

    (protocol, notification_tx, request_tx, inbound_rx, events_tx)
}