use std::{sync::Arc, time::Duration};
use tari_shutdown::ShutdownSignal;
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{broadcast, mpsc},
};
use crate::{
backoff::ConstantBackoff,
connection_manager::{ConnectionManager, ConnectionManagerConfig, ConnectionManagerRequester},
multiplexing::Substream,
peer_manager::{NodeIdentity, PeerFeatures, PeerManager},
peer_validator::PeerValidatorConfig,
protocol::Protocols,
transports::Transport,
};
/// Settings used by [`build_connection_manager`] to assemble a connection
/// manager for a test node.
#[derive(Debug, Clone)]
pub struct TestNodeConfig {
    /// Duration handed to `ConstantBackoff::new` when the connection manager
    /// is built.
    pub dial_backoff_duration: Duration,
    /// Configuration forwarded unchanged to `ConnectionManager::new`.
    pub connection_manager_config: ConnectionManagerConfig,
    /// Node identity passed to `ConnectionManager::new`; held behind an `Arc`
    /// so it can be shared cheaply.
    pub node_identity: Arc<NodeIdentity>,
}
impl Default for TestNodeConfig {
    /// Builds a configuration suitable for tests.
    ///
    /// The result has:
    /// - a freshly generated random [`NodeIdentity`] created with the
    ///   `/memory/0` address and the `COMMUNICATION_NODE` peer features,
    /// - a connection manager config that listens on `/memory/0` and whose
    ///   peer validation config has `allow_test_addresses` enabled (all other
    ///   settings come from the respective `Default` impls),
    /// - a dial backoff of 200 milliseconds.
    ///
    /// # Panics
    ///
    /// Panics if the literal `/memory/0` cannot be parsed into the address
    /// type expected by the callee.
    fn default() -> Self {
        // The same address literal is used for the identity and the listener.
        const MEMORY_ADDRESS: &str = "/memory/0";

        let identity = NodeIdentity::random(
            &mut rand::rng(),
            MEMORY_ADDRESS.parse().unwrap(),
            PeerFeatures::COMMUNICATION_NODE,
        );

        let peer_validation_config = PeerValidatorConfig {
            allow_test_addresses: true,
            ..Default::default()
        };
        let connection_manager_config = ConnectionManagerConfig {
            peer_validation_config,
            listener_address: MEMORY_ADDRESS.parse().unwrap(),
            ..Default::default()
        };

        Self {
            dial_backoff_duration: Duration::from_millis(200),
            connection_manager_config,
            node_identity: Arc::new(identity),
        }
    }
}
pub fn build_connection_manager<TTransport>(
config: TestNodeConfig,
transport: TTransport,
peer_manager: Arc<PeerManager>,
protocols: Protocols<Substream>,
shutdown: ShutdownSignal,
) -> ConnectionManagerRequester
where
TTransport: Transport + Unpin + Send + Sync + Clone + 'static,
TTransport::Output: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
{
let (request_tx, request_rx) = mpsc::channel(10);
let (event_tx, _) = broadcast::channel(100);
let requester = ConnectionManagerRequester::new(request_tx, event_tx.clone());
let mut connection_manager = ConnectionManager::new(
config.connection_manager_config,
transport,
ConstantBackoff::new(config.dial_backoff_duration),
request_rx,
config.node_identity,
peer_manager,
event_tx,
shutdown,
);
connection_manager.add_protocols(protocols);
connection_manager.spawn();
requester
}