#![recursion_limit = "256"]
#[allow(dead_code)]
mod common;
use common::{node::*, test_peer::TestPeer};
use snarkos_node::{Client, Prover, Validator};
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_tcp::{ConnectError, P2P};
use snarkvm::prelude::{MainnetV0 as CurrentNetwork, store::helpers::memory::ConsensusMemory};
use pea2pea::Pea2Pea;
use std::{net::SocketAddr, time::Duration};
use tokio::time::sleep;
/// Minimal connection interface implemented in this file for the full node
/// types and for `TestPeer`, so that `assert_connect` can accept either kind
/// of participant as the initiator or the responder.
#[async_trait::async_trait]
trait Connect {
/// Returns the socket address of this participant's listener.
fn listening_addr(&self) -> SocketAddr;
/// Attempts to connect to `target`; a failed attempt is reported as a `ConnectError`.
async fn connect(&self, target: SocketAddr) -> Result<(), ConnectError>;
}
// Implements `Connect` for every listed node type, instantiated with
// `CurrentNetwork` and `ConsensusMemory<CurrentNetwork>`. Both generated
// methods forward to the value returned by the node's `tcp()` accessor.
//
// Usage: `impl_connect!(TypeA, TypeB, ...)` (comma-separated identifiers).
macro_rules! impl_connect {
    ($($node:ident),*) => {
        $(
            #[async_trait::async_trait]
            impl Connect for $node<CurrentNetwork, ConsensusMemory<CurrentNetwork>> {
                fn listening_addr(&self) -> SocketAddr {
                    // The tests need a bound listener, so a missing address is fatal.
                    let bound_addr = self.tcp().listening_addr();
                    bound_addr.expect("node listener should exist")
                }

                async fn connect(&self, target: SocketAddr) -> Result<(), ConnectError>
                where
                    Self: P2P,
                {
                    let tcp = self.tcp();
                    tcp.connect(target).await
                }
            }
        )*
    };
}

// Generate the impls for the three full-node types imported by this file.
impl_connect!(Client, Prover, Validator);
// `Connect` for the test peer: both methods delegate to the handle returned
// by `node()`.
#[async_trait::async_trait]
impl Connect for TestPeer
where
    Self: Pea2Pea,
{
    fn listening_addr(&self) -> SocketAddr {
        // The tests need a bound listener, so a missing address is fatal.
        let bound_addr = self.node().listening_addr();
        bound_addr.expect("node listener should exist")
    }

    async fn connect(&self, target: SocketAddr) -> Result<(), ConnectError> {
        let outcome = self.node().connect(target).await;
        // Convert the underlying error into the `ConnectError` the trait promises.
        outcome.map_err(Into::into)
    }
}
/// Connects `initiator` to the listening address of `responder`.
///
/// # Panics
///
/// Panics if the connection attempt returns an error (or if `responder`
/// cannot report a listening address).
async fn assert_connect<T, U>(initiator: T, responder: U)
where
    T: Connect,
    U: Connect,
{
    let target = responder.listening_addr();
    initiator.connect(target).await.unwrap();
}
// Generates handshake test cases between a full node and a test peer.
//
// Accepted forms:
// - `node, peer, is_initiator, [attr]`: emits a single `#[tokio::test]` named
//   after `peer`. It builds the node via `$crate::<node>()` and the peer via
//   `TestPeer::<peer>()`, then connects them with `assert_connect`; the node
//   is the initiator when `is_initiator` is true, otherwise the peer is.
// - `node -> peer [= attr], ...`: wraps one case per pair in
//   `mod handshake_initiator_side`, with the node initiating.
// - `node <- peer [= attr], ...`: wraps one case per pair in
//   `mod handshake_responder_side`, with the peer initiating.
//
// The optional `attr` is attached to the generated test function as an extra
// attribute.
macro_rules! test_handshake {
    ($node:ident, $peer:ident, $node_initiates:expr, $($extra_attr:meta)?) => {
        #[tokio::test]
        $(#[$extra_attr])?
        async fn $peer() {
            // Set up both participants (node first, then peer).
            let full_node = $crate::$node().await;
            let test_peer = $crate::common::test_peer::TestPeer::$peer().await;

            // The flag decides which side dials the other.
            if $node_initiates {
                $crate::assert_connect(full_node, test_peer).await;
            } else {
                $crate::assert_connect(test_peer, full_node).await;
            };
        }
    };

    // The full node initiates the connection.
    ($($node:ident -> $peer:ident $(= $extra_attr:meta)?),*) => {
        mod handshake_initiator_side {
            $(
                test_handshake!($node, $peer, true, $($extra_attr)?);
            )*
        }
    };

    // The full node receives the connection.
    ($($node:ident <- $peer:ident $(= $extra_attr:meta)?),*) => {
        mod handshake_responder_side {
            $(
                test_handshake!($node, $peer, false, $($extra_attr)?);
            )*
        }
    };
}
// Handshake cases in which the full node under test is built by `client()`.
mod client {
// The node initiates: it connects to a `client` test peer and to a `prover` test peer.
test_handshake! {
client -> client,
client -> prover
}
// The node responds: a `client` test peer and a `prover` test peer connect to it.
test_handshake! {
client <- client,
client <- prover
}
}
// Handshake cases in which the full node under test is built by `prover()`.
mod prover {
// The node initiates: it connects to a `client` test peer and to a `prover` test peer.
test_handshake! {
prover -> client,
prover -> prover
}
// The node responds: a `client` test peer and a `prover` test peer connect to it.
test_handshake! {
prover <- client,
prover <- prover
}
}
// Two validators dial each other concurrently. At most one of the two
// attempts may report success, and the TCP-level and router-level connection
// counts on both nodes must match the outcome (0 everywhere if both attempts
// failed, 1 everywhere otherwise).
#[tokio::test(flavor = "multi_thread")]
async fn simultaneous_connection_attempt() {
    // Start both validators and record their listening addresses.
    let node1 = validator().await;
    let addr1 = node1.listening_addr();
    let node2 = validator().await;
    let addr2 = node2.listening_addr();

    // Each spawned task yields `true` only if the router accepted the request
    // and the resulting connection task finished with `Ok`.
    let dialer1 = node1.clone();
    let attempt1 = tokio::spawn(async move {
        match dialer1.router().connect(addr2) {
            Ok(conn_task) => conn_task.await.unwrap().is_ok(),
            Err(_) => false,
        }
    });
    let dialer2 = node2.clone();
    let attempt2 = tokio::spawn(async move {
        match dialer2.router().connect(addr1) {
            Ok(conn_task) => conn_task.await.unwrap().is_ok(),
            Err(_) => false,
        }
    });

    // Wait for both attempts, then pause briefly before inspecting the nodes.
    let (outcome1, outcome2) = tokio::join!(attempt1, attempt2);
    sleep(Duration::from_millis(200)).await;

    // Tally the successful attempts; a failed join panics here.
    let successes = usize::from(outcome1.unwrap()) + usize::from(outcome2.unwrap());

    // Connection counts as seen by the TCP layer.
    let tcp_connected1 = node1.tcp().num_connected();
    let tcp_connected2 = node2.tcp().num_connected();
    // Connection counts as seen by the router.
    let router_connected1 = node1.router().number_of_connected_peers();
    let router_connected2 = node2.router().number_of_connected_peers();

    // Both attempts failing is tolerated; both succeeding is not.
    assert!(successes <= 1);
    match successes {
        0 => {
            assert_eq!(tcp_connected1, 0);
            assert_eq!(tcp_connected2, 0);
            assert_eq!(router_connected1, 0);
            assert_eq!(router_connected2, 0);
        }
        _ => {
            assert_eq!(tcp_connected1, 1);
            assert_eq!(tcp_connected2, 1);
            assert_eq!(router_connected1, 1);
            assert_eq!(router_connected2, 1);
        }
    }
}
// One client fires three concurrent connection attempts at the same target.
// Exactly one attempt must report success, and each node's router must end up
// with exactly one connected peer.
#[tokio::test(flavor = "multi_thread")]
async fn duplicate_connection_attempts() {
    // Start the two clients; only the second one's address is needed.
    let node1 = client().await;
    let node2 = client().await;
    let addr2 = node2.listening_addr();

    // Spawn three identical attempts from `node1` towards `node2`. Each task
    // yields `true` only if the router accepted the request and the resulting
    // connection task finished with `Ok`.
    let mut attempts = Vec::with_capacity(3);
    for _ in 0..3 {
        let dialer = node1.clone();
        attempts.push(tokio::spawn(async move {
            match dialer.router().connect(addr2) {
                Ok(conn_task) => conn_task.await.unwrap().is_ok(),
                Err(_) => false,
            }
        }));
    }

    // Wait for every attempt to finish, then pause briefly before inspecting.
    let mut outcomes = Vec::with_capacity(attempts.len());
    for attempt in attempts {
        outcomes.push(attempt.await);
    }
    sleep(Duration::from_millis(200)).await;

    // Tally the successful attempts; a failed join panics here.
    let successes = outcomes.into_iter().map(|outcome| outcome.unwrap()).filter(|&connected| connected).count();

    assert_eq!(successes, 1);
    assert_eq!(node1.router().number_of_connected_peers(), 1);
    assert_eq!(node2.router().number_of_connected_peers(), 1);
}