use std::{iter, sync::Arc};
use log::*;
use tari_shutdown::ShutdownSignal;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{broadcast, mpsc, watch},
};
use super::{CommsBuilderError, CommsShutdown};
use crate::{
CommsBuilder,
Substream,
connection_manager::{
ConnectionManager,
ConnectionManagerEvent,
ConnectionManagerRequest,
ConnectionManagerRequester,
SelfLivenessCheck,
SelfLivenessStatus,
},
connectivity::{ConnectivityEventRx, ConnectivityManager, ConnectivityRequest, ConnectivityRequester},
multiaddr::Multiaddr,
peer_manager::{NodeIdentity, PeerManager},
protocol::{
ProtocolExtension,
ProtocolExtensionContext,
ProtocolExtensions,
ProtocolId,
ProtocolNotificationTx,
Protocols,
},
tor,
transports::Transport,
};
// Log target passed to every `debug!`/`trace!`/`info!` call made while spawning the node.
const LOG_TARGET: &str = "comms::node";
/// The comms components that have been constructed but not yet started.
///
/// The value is consumed by `spawn_with_transport`, which spawns the connectivity and connection managers
/// and hands back a `CommsNode` handle.
pub struct UnspawnedCommsNode {
/// Identity of this node; shared with both managers and with the resulting `CommsNode`.
pub(super) node_identity: Arc<NodeIdentity>,
/// Builder that supplies the dial backoff and the connection manager / connectivity configuration at spawn.
pub(super) builder: CommsBuilder,
/// Receiving half of the connection manager request channel; handed to `ConnectionManager::new`.
pub(super) connection_manager_request_rx: mpsc::Receiver<ConnectionManagerRequest>,
/// Requester handle for the connection manager; also the source of its event publisher.
pub(super) connection_manager_requester: ConnectionManagerRequester,
/// Requester handle for the connectivity manager; also the source of its event publisher.
pub(super) connectivity_requester: ConnectivityRequester,
/// Receiving half of the connectivity request channel; becomes the connectivity manager's `request_rx`.
pub(super) connectivity_rx: mpsc::Receiver<ConnectivityRequest>,
/// Shared peer manager; cloned into both managers and the protocol extension context.
pub(super) peer_manager: Arc<PeerManager>,
/// Protocol extensions that are installed when the node is spawned.
pub(super) protocol_extensions: ProtocolExtensions,
/// Protocols registered directly via `add_protocol`; added to the connection manager at spawn.
pub(super) protocols: Protocols<Substream>,
/// Shutdown signal cloned into every component started at spawn.
pub(super) shutdown_signal: ShutdownSignal,
}
impl UnspawnedCommsNode {
#[cfg(feature = "rpc")]
pub fn add_rpc_server<T: ProtocolExtension + 'static>(mut self, rpc: T) -> Self {
self.protocol_extensions.add(rpc);
self
}
pub fn add_protocol_extensions(mut self, extensions: ProtocolExtensions) -> Self {
self.protocol_extensions.extend(extensions);
self
}
pub fn add_protocol_extension<T: ProtocolExtension + 'static>(mut self, extension: T) -> Self {
self.protocol_extensions.add(extension);
self
}
pub fn add_protocol<I: AsRef<[ProtocolId]>>(
mut self,
protocol: I,
notifier: &ProtocolNotificationTx<Substream>,
) -> Self {
self.protocols.add(protocol, notifier);
self
}
pub fn with_listener_address(mut self, listener_address: Multiaddr) -> Self {
self.builder = self.builder.with_listener_address(listener_address);
self
}
pub fn with_hidden_service_controller(mut self, hidden_service_ctl: tor::HiddenServiceController) -> Self {
self.builder.hidden_service_ctl = Some(hidden_service_ctl);
self
}
#[allow(clippy::too_many_lines)]
pub async fn spawn_with_transport<TTransport>(self, transport: TTransport) -> Result<CommsNode, CommsBuilderError>
where
TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
TTransport::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let UnspawnedCommsNode {
builder,
connection_manager_request_rx,
connection_manager_requester,
connectivity_requester,
connectivity_rx,
node_identity,
shutdown_signal,
peer_manager,
protocol_extensions,
protocols,
} = self;
let CommsBuilder {
dial_backoff,
connection_manager_config,
connectivity_config,
..
} = builder;
let connectivity_manager = ConnectivityManager {
config: connectivity_config,
request_rx: connectivity_rx,
event_tx: connectivity_requester.get_event_publisher(),
connection_manager: connection_manager_requester.clone(),
node_identity: node_identity.clone(),
peer_manager: peer_manager.clone(),
shutdown_signal: shutdown_signal.clone(),
};
let mut ext_context = ProtocolExtensionContext::new(
connectivity_requester.clone(),
peer_manager.clone(),
shutdown_signal.clone(),
);
debug!(
target: LOG_TARGET,
"Installing {} protocol extension(s)",
protocol_extensions.len()
);
protocol_extensions.install_all(&mut ext_context)?;
let mut connection_manager = ConnectionManager::new(
connection_manager_config.clone(),
transport.clone(),
dial_backoff,
connection_manager_request_rx,
node_identity.clone(),
peer_manager.clone(),
connection_manager_requester.get_event_publisher(),
shutdown_signal.clone(),
);
ext_context.register_complete_signal(connection_manager.complete_signal());
connection_manager.add_protocols(ext_context.take_protocols().expect("Protocols already taken"));
connection_manager.add_protocols(protocols);
connectivity_manager.spawn();
connection_manager.spawn();
trace!(target: LOG_TARGET, "Hello from comms!");
info!(
target: LOG_TARGET,
"Your node's public key is '{}'",
node_identity.public_key()
);
info!(
target: LOG_TARGET,
"Your node's network ID is '{}'",
node_identity.node_id()
);
info!(
target: LOG_TARGET,
"Your node's public addresses are '{}'",
node_identity
.public_addresses()
.iter()
.map(|a| a.to_string())
.collect::<Vec<_>>()
.join(", ")
);
let public_addresses = node_identity.public_addresses();
let liveness_watch = if public_addresses.is_empty() {
watch::channel(SelfLivenessStatus::Disabled).1
} else {
connection_manager_config
.self_liveness_self_check_interval
.map(|interval| {
SelfLivenessCheck::spawn(transport, public_addresses, interval, shutdown_signal.clone())
})
.unwrap_or_else(|| watch::channel(SelfLivenessStatus::Disabled).1)
};
Ok(CommsNode {
shutdown_signal,
connection_manager_requester,
connectivity_requester,
node_identity,
peer_manager,
liveness_watch,
complete_signals: ext_context.drain_complete_signals(),
})
}
pub fn peer_manager(&self) -> Arc<PeerManager> {
Arc::clone(&self.peer_manager)
}
pub fn node_identity(&self) -> Arc<NodeIdentity> {
Arc::clone(&self.node_identity)
}
pub fn connectivity(&self) -> ConnectivityRequester {
self.connectivity_requester.clone()
}
pub fn shutdown_signal(&self) -> ShutdownSignal {
self.shutdown_signal.clone()
}
}
/// Cloneable handle to a spawned comms node.
///
/// Gives access to the requesters, identity, peer manager and liveness status, and can be turned into a
/// `CommsShutdown` via `wait_until_shutdown`.
#[derive(Clone)]
pub struct CommsNode {
/// Shutdown signal that was shared with the components started at spawn.
shutdown_signal: ShutdownSignal,
/// Requester for the connection manager; also used to subscribe to its events.
connection_manager_requester: ConnectionManagerRequester,
/// Requester for the connectivity manager; also used to subscribe to its events.
connectivity_requester: ConnectivityRequester,
/// Identity of this node.
node_identity: Arc<NodeIdentity>,
/// Shared peer manager.
peer_manager: Arc<PeerManager>,
/// Watch receiver carrying the latest self-liveness status.
liveness_watch: watch::Receiver<SelfLivenessStatus>,
/// Completion signals drained from the protocol extension context at spawn.
complete_signals: Vec<ShutdownSignal>,
}
impl CommsNode {
pub fn subscribe_connection_manager_events(&self) -> broadcast::Receiver<Arc<ConnectionManagerEvent>> {
self.connection_manager_requester.get_event_subscription()
}
pub fn connection_manager_requester(&mut self) -> &mut ConnectionManagerRequester {
&mut self.connection_manager_requester
}
pub fn subscribe_connectivity_events(&self) -> ConnectivityEventRx {
self.connectivity_requester.get_event_subscription()
}
pub fn peer_manager(&self) -> Arc<PeerManager> {
Arc::clone(&self.peer_manager)
}
pub fn node_identity(&self) -> Arc<NodeIdentity> {
Arc::clone(&self.node_identity)
}
pub fn node_identity_ref(&self) -> &NodeIdentity {
&self.node_identity
}
pub fn liveness_status(&self) -> SelfLivenessStatus {
*self.liveness_watch.borrow()
}
pub fn connectivity(&self) -> ConnectivityRequester {
self.connectivity_requester.clone()
}
pub fn shutdown_signal(&self) -> ShutdownSignal {
self.shutdown_signal.clone()
}
pub fn wait_until_shutdown(self) -> CommsShutdown {
CommsShutdown::new(iter::once(self.shutdown_signal).chain(self.complete_signals))
}
}