mod comms_node;
pub use comms_node::{CommsNode, UnspawnedCommsNode};
mod shutdown;
pub use shutdown::CommsShutdown;
mod error;
pub use error::CommsBuilderError;
mod consts;
#[cfg(test)]
mod tests;
use std::{sync::Arc, time::Duration};
use tari_shutdown::ShutdownSignal;
use tokio::sync::{broadcast, mpsc};
use crate::{
backoff::{Backoff, BoxedBackoff, ConstantBackoff},
connection_manager::{ConnectionManagerConfig, ConnectionManagerRequester},
connectivity::{ConnectivityConfig, ConnectivityRequester},
multiaddr::Multiaddr,
net_address::MultiaddrRange,
peer_manager::{NodeIdentity, PeerManager},
peer_validator::PeerValidatorConfig,
protocol::{NodeNetworkInfo, ProtocolExtensions},
tor,
types::{CommsDatabase, TransportProtocol},
};
/// Builder that collects the configuration and dependencies needed to construct an `UnspawnedCommsNode`.
///
/// A node identity, a shutdown signal and peer storage must all be supplied before calling `build`;
/// every other setting starts from the values in the `Default` implementation.
pub struct CommsBuilder {
/// Backing store handed to `PeerManager::new` during `build`. Required.
peer_storage: Option<CommsDatabase>,
/// Identity of the node being built. Required.
node_identity: Option<Arc<NodeIdentity>>,
/// Backoff applied to dialing; defaults to a constant 500ms backoff.
dial_backoff: BoxedBackoff,
/// Optional tor hidden service controller. Defaults to `None`; nothing in this file sets it.
hidden_service_ctl: Option<tor::HiddenServiceController>,
/// Configuration forwarded to the connection manager.
connection_manager_config: ConnectionManagerConfig,
/// Configuration forwarded to the connectivity manager.
connectivity_config: ConnectivityConfig,
/// Shutdown signal for the comms node. Required.
shutdown_signal: Option<ShutdownSignal>,
/// Value passed to `with_minimize_connections`; the same value is also stored in `connectivity_config`.
maintain_n_closest_connections_only: Option<usize>,
/// Transport protocols passed to `PeerManager::new`; defaults to `TransportProtocol::get_all()`.
transport_protocols: Vec<TransportProtocol>,
}
impl Default for CommsBuilder {
fn default() -> Self {
Self {
peer_storage: None,
node_identity: None,
dial_backoff: Box::new(ConstantBackoff::new(Duration::from_millis(500))),
hidden_service_ctl: None,
connection_manager_config: ConnectionManagerConfig::default(),
connectivity_config: ConnectivityConfig::default(),
shutdown_signal: None,
maintain_n_closest_connections_only: None,
transport_protocols: TransportProtocol::get_all(),
}
}
}
impl CommsBuilder {
    /// Creates a builder initialised with the values from the `Default` implementation.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the identity of the node being built.
    ///
    /// Required: `build` fails with `CommsBuilderError::NodeIdentityNotSet` when no identity has been set.
    pub fn with_node_identity(mut self, node_identity: Arc<NodeIdentity>) -> Self {
        self.node_identity = Some(node_identity);
        self
    }

    /// Sets `connection_pool_refresh_interval` on the connectivity configuration.
    pub fn with_connection_pool_refresh_interval(mut self, interval: Duration) -> Self {
        self.connectivity_config.connection_pool_refresh_interval = interval;
        self
    }

    /// Sets `max_seed_peer_age` on the connectivity configuration.
    pub fn with_max_seed_peer_age(mut self, max_age: Duration) -> Self {
        self.connectivity_config.max_seed_peer_age = max_age;
        self
    }

    /// Returns a clone of the configured node identity handle, or `None` if it has not been set.
    pub fn node_identity(&self) -> Option<Arc<NodeIdentity>> {
        self.node_identity.clone()
    }

    /// Sets the shutdown signal handed to the comms node.
    ///
    /// Required: `build` fails with `CommsBuilderError::ShutdownSignalNotSet` when no signal has been set.
    pub fn with_shutdown_signal(mut self, shutdown_signal: ShutdownSignal) -> Self {
        self.shutdown_signal = Some(shutdown_signal);
        self
    }

    /// Sets the user agent in the connection manager's network info.
    ///
    /// The value is converted with `ToString::to_string`. The `?Sized` bound lets callers pass unsized
    /// values directly (for example a string literal as `&str`) as well as references to sized values
    /// such as `&String`.
    pub fn with_user_agent<T: ToString + ?Sized>(mut self, user_agent: &T) -> Self {
        self.connection_manager_config.network_info.user_agent = user_agent.to_string();
        self
    }

    /// Sets `network_wire_byte` in the connection manager's network info.
    pub fn with_network_byte(mut self, network_byte: u8) -> Self {
        self.connection_manager_config.network_info.network_wire_byte = network_byte;
        self
    }

    /// Replaces the connection manager's entire network info.
    ///
    /// This overwrites any user agent, network byte or node version that was set earlier on this builder.
    pub fn with_node_info(mut self, node_info: NodeNetworkInfo) -> Self {
        self.connection_manager_config.network_info = node_info;
        self
    }

    /// Sets the major and minor version in the connection manager's network info.
    pub fn with_node_version(mut self, major_version: u8, minor_version: u8) -> Self {
        self.connection_manager_config.network_info.major_version = major_version;
        self.connection_manager_config.network_info.minor_version = minor_version;
        self
    }

    /// Enables `allow_test_addresses` in the peer validation configuration.
    ///
    /// When compiled without debug assertions, a warning is logged because test addresses are not
    /// intended for production nodes.
    pub fn allow_test_addresses(mut self) -> Self {
        #[cfg(not(debug_assertions))]
        log::warn!(
            target: "comms::builder",
            "Test addresses are enabled! This is invalid and potentially insecure when running a production node."
        );
        self.connection_manager_config
            .peer_validation_config
            .allow_test_addresses = true;
        self
    }

    /// Replaces the peer validation configuration.
    ///
    /// When compiled without debug assertions, a warning is logged if `config` enables test addresses.
    pub fn with_peer_validator_config(mut self, config: PeerValidatorConfig) -> Self {
        #[cfg(not(debug_assertions))]
        if config.allow_test_addresses {
            log::warn!(
                target: "comms::builder",
                "Test addresses are enabled! This is invalid and potentially insecure when running a production node."
            );
        }
        self.connection_manager_config.peer_validation_config = config;
        self
    }

    /// Returns the peer validation configuration currently held by the builder.
    pub fn peer_validator_config(&self) -> &PeerValidatorConfig {
        &self.connection_manager_config.peer_validation_config
    }

    /// Sets the listener address in the connection manager configuration.
    pub fn with_listener_address(mut self, listener_address: Multiaddr) -> Self {
        self.connection_manager_config.listener_address = listener_address;
        self
    }

    /// Sets the auxiliary TCP listener address in the connection manager configuration.
    pub fn with_auxiliary_tcp_listener_address(mut self, listener_address: Multiaddr) -> Self {
        self.connection_manager_config.auxiliary_tcp_listener_address = Some(listener_address);
        self
    }

    /// Sets `liveness_max_sessions` in the connection manager configuration.
    pub fn with_listener_liveness_max_sessions(mut self, max_sessions: usize) -> Self {
        self.connection_manager_config.liveness_max_sessions = max_sessions;
        self
    }

    /// Replaces the list of address ranges stored as `excluded_dial_addresses`.
    pub fn with_excluded_dial_addresses(mut self, excluded_addresses: Vec<MultiaddrRange>) -> Self {
        self.connection_manager_config.excluded_dial_addresses = excluded_addresses;
        self
    }

    /// Replaces the list of CIDRs stored as `liveness_cidr_allowlist`.
    pub fn with_listener_liveness_allowlist_cidrs(mut self, cidrs: Vec<cidr::AnyIpCidr>) -> Self {
        self.connection_manager_config.liveness_cidr_allowlist = cidrs;
        self
    }

    /// Sets `max_simultaneous_inbound_connects` in the connection manager configuration.
    pub fn with_max_simultaneous_inbound_connects(mut self, max_simultaneous_inbound_connects: usize) -> Self {
        self.connection_manager_config.max_simultaneous_inbound_connects = max_simultaneous_inbound_connects;
        self
    }

    /// Sets `max_dial_attempts` in the connection manager configuration.
    pub fn with_max_dial_attempts(mut self, max_dial_attempts: usize) -> Self {
        self.connection_manager_config.max_dial_attempts = max_dial_attempts;
        self
    }

    /// Sets `min_connectivity` in the connectivity configuration.
    pub fn with_min_connectivity(mut self, min_connectivity: usize) -> Self {
        self.connectivity_config.min_connectivity = min_connectivity;
        self
    }

    /// Sets `is_connection_reaping_enabled` to `false` in the connectivity configuration.
    pub fn disable_connection_reaping(mut self) -> Self {
        self.connectivity_config.is_connection_reaping_enabled = false;
        self
    }

    /// Sets the peer storage used to construct the peer manager.
    ///
    /// Required: `build` fails with `CommsBuilderError::PeerStorageNotProvided` when no storage has been set.
    pub fn with_peer_storage(mut self, peer_storage: CommsDatabase) -> Self {
        self.peer_storage = Some(peer_storage);
        self
    }

    /// Replaces the dial backoff with `backoff`, boxed.
    pub fn with_dial_backoff<T>(mut self, backoff: T) -> Self
    where
        T: Backoff + Send + Sync + 'static,
    {
        self.dial_backoff = Box::new(backoff);
        self
    }

    /// Stores `check_interval` as the connection manager's `self_liveness_self_check_interval`.
    // NOTE(review): presumably `None` disables the self liveness check - confirm against the connection manager.
    pub fn set_self_liveness_check(mut self, check_interval: Option<Duration>) -> Self {
        self.connection_manager_config.self_liveness_self_check_interval = check_interval;
        self
    }

    /// Records the `maintain_n_closest_connections_only` setting on both the builder and the connectivity
    /// configuration.
    ///
    /// When `connections` is `Some(n)`, `reaper_min_connection_threshold` is also set to `n`; when it is
    /// `None`, the threshold is left untouched.
    pub fn with_minimize_connections(mut self, connections: Option<usize>) -> Self {
        self.maintain_n_closest_connections_only = connections;
        self.connectivity_config.maintain_n_closest_connections_only = connections;
        if let Some(val) = connections {
            self.connectivity_config.reaper_min_connection_threshold = val;
        }
        self
    }

    /// Replaces the list of transport protocols passed to the peer manager.
    pub fn with_transport_protocols(mut self, protocols: Vec<TransportProtocol>) -> Self {
        self.transport_protocols = protocols;
        self
    }

    /// Builds the peer manager from the configured peer storage and transport protocols.
    ///
    /// The storage is moved out of the builder, so it is no longer available afterwards.
    ///
    /// # Errors
    ///
    /// * `CommsBuilderError::PeerStorageNotProvided` if no peer storage was set.
    /// * `CommsBuilderError::PeerManagerError` if `PeerManager::new` fails.
    fn make_peer_manager(&mut self) -> Result<Arc<PeerManager>, CommsBuilderError> {
        let storage = self
            .peer_storage
            .take()
            .ok_or(CommsBuilderError::PeerStorageNotProvided)?;
        PeerManager::new(storage, self.transport_protocols.clone())
            .map(Arc::new)
            .map_err(CommsBuilderError::PeerManagerError)
    }

    /// Consumes the builder and assembles an `UnspawnedCommsNode`.
    ///
    /// Creates the peer manager plus the request and event channels for the connection manager and the
    /// connectivity manager. The remaining builder state is moved into the returned node.
    ///
    /// # Errors
    ///
    /// Checked in this order:
    ///
    /// * `CommsBuilderError::NodeIdentityNotSet` if no node identity was set.
    /// * `CommsBuilderError::ShutdownSignalNotSet` if no shutdown signal was set.
    /// * Any error returned while creating the peer manager (missing peer storage or a `PeerManager::new`
    ///   failure).
    pub fn build(mut self) -> Result<UnspawnedCommsNode, CommsBuilderError> {
        let node_identity = self.node_identity.take().ok_or(CommsBuilderError::NodeIdentityNotSet)?;
        let shutdown_signal = self
            .shutdown_signal
            .take()
            .ok_or(CommsBuilderError::ShutdownSignalNotSet)?;

        let peer_manager = self.make_peer_manager()?;

        // Connection manager: request channel plus event broadcast. The initial broadcast receiver is
        // dropped immediately; only the sender is handed to the requester.
        let (conn_man_tx, connection_manager_request_rx) =
            mpsc::channel(consts::CONNECTION_MANAGER_REQUEST_BUFFER_SIZE);
        let (connection_manager_event_tx, _) = broadcast::channel(consts::CONNECTION_MANAGER_EVENTS_BUFFER_SIZE);
        let connection_manager_requester = ConnectionManagerRequester::new(conn_man_tx, connection_manager_event_tx);

        // Connectivity manager: same arrangement as above.
        let (connectivity_tx, connectivity_rx) = mpsc::channel(consts::CONNECTIVITY_MANAGER_REQUEST_BUFFER_SIZE);
        let (event_tx, _) = broadcast::channel(consts::CONNECTIVITY_MANAGER_EVENTS_BUFFER_SIZE);
        let connectivity_requester = ConnectivityRequester::new(connectivity_tx, event_tx);

        Ok(UnspawnedCommsNode {
            protocols: Default::default(),
            node_identity,
            connection_manager_requester,
            connection_manager_request_rx,
            shutdown_signal,
            builder: self,
            connectivity_requester,
            connectivity_rx,
            peer_manager,
            protocol_extensions: ProtocolExtensions::new(),
        })
    }
}