use crate::test_utils::cluster_listener::TestClusterListener;
use crate::{
Cluster, ClusterListener, ClusterLog, Connector, CustomSystemService, NetworkInterface,
NodeResolving,
};
use actix::prelude::*;
use actix_broker::BrokerSubscribe;
use actix_rt;
use log::*;
use port_scanner::{local_port_available, request_open_port};
use rayon::prelude::*;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use tokio::time::{Duration, sleep};
type SocketTestClusterListener = TestClusterListener<Arc<Mutex<Vec<SocketAddr>>>>;
impl Actor for SocketTestClusterListener {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
self.subscribe_system_async::<ClusterLog>(ctx);
}
}
impl Handler<ClusterLog> for SocketTestClusterListener {
type Result = ();
fn handle(&mut self, msg: ClusterLog, _ctx: &mut Context<Self>) -> Self::Result {
match msg {
ClusterLog::NewMember(node) => {
debug!("new member {}", node.socket_addr.to_string());
(*(self.content.as_ref().unwrap().lock().unwrap())).push(node.socket_addr);
}
ClusterLog::MemberLeft(_addr) => {}
}
}
}
struct OwnListenerAskingGossip {
pub asking: SocketAddr,
pub addrs: Arc<Mutex<Vec<Addr<NetworkInterface>>>>,
}
impl ClusterListener for OwnListenerAskingGossip {}
impl Supervised for OwnListenerAskingGossip {}
impl Actor for OwnListenerAskingGossip {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
self.subscribe_system_async::<ClusterLog>(ctx);
}
}
impl Handler<ClusterLog> for OwnListenerAskingGossip {
type Result = ();
fn handle(&mut self, msg: ClusterLog, ctx: &mut Context<Self>) -> Self::Result {
match msg {
ClusterLog::NewMember(_node) => {
Connector::from_custom_registry()
.send(NodeResolving {
addrs: vec![self.asking.clone()],
})
.into_actor(self)
.map(
|res: Result<Result<Vec<Addr<NetworkInterface>>, ()>, MailboxError>,
act,
_ctx| match res.unwrap() {
Ok(addrs) => {
(*(act.addrs.lock().unwrap())).push(addrs.get(0).unwrap().clone())
}
Err(_) => (),
},
)
.wait(ctx);
}
ClusterLog::MemberLeft(addr) => {
Connector::from_custom_registry()
.send(NodeResolving { addrs: vec![addr] })
.into_actor(self)
.map(
|res: Result<Result<Vec<Addr<NetworkInterface>>, ()>, MailboxError>,
_act,
_ctx| match res.unwrap() {
Ok(addrs) => assert_eq!(addrs.len(), 0),
Err(_) => (),
},
)
.wait(ctx);
}
}
}
}
#[actix_rt::test]
async fn cluster_binds_port() {
let port = request_open_port().unwrap_or(8000);
let _listener = Cluster::bind(format!("127.0.0.1:{}", port));
assert_eq!(local_port_available(port), false);
}
#[actix_rt::test]
async fn cluster_adds_node_and_from_stream() {
let _cluster = Cluster::new("127.0.0.1:1992".parse().unwrap(), vec![]);
let connections = Arc::new(Mutex::new(vec![]));
let own_listener = SocketTestClusterListener::new_with_content(Arc::clone(&connections));
let _addr = own_listener.start();
let _network_interface = NetworkInterface::new(
"127.0.0.1:1993".parse().unwrap(),
"127.0.0.1:1992".parse().unwrap(),
true,
)
.start();
sleep(Duration::from_secs(1)).await;
assert!(
connections
.lock()
.unwrap()
.contains(&("127.0.0.1:1993".parse().unwrap()))
);
assert!(
connections
.lock()
.unwrap()
.contains(&("127.0.0.1:1992".parse().unwrap()))
);
}
#[actix_rt::test]
async fn gossip_adds_member_and_resolves_it() {
let local_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let other_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let _cluster = Cluster::new_with_connection_protocol(
local_ip.clone(),
vec![],
crate::ConnectionProtocol::Gossip,
);
let addrs = Arc::new(Mutex::new(vec![]));
let _own_listener = OwnListenerAskingGossip {
asking: other_ip.clone(),
addrs: Arc::clone(&addrs),
}
.start();
let _network_interface = NetworkInterface::new(other_ip, local_ip, true).start();
sleep(Duration::from_secs(1)).await;
assert_eq!(addrs.lock().unwrap().len(), 2);
}
#[actix_rt::test]
async fn gossip_removes_member() {
let local_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let other_ip: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let _cluster = Cluster::new_with_connection_protocol(
local_ip.clone(),
vec![],
crate::ConnectionProtocol::Gossip,
);
let addrs = Arc::new(Mutex::new(vec![]));
let _own_listener = OwnListenerAskingGossip {
asking: other_ip.clone(),
addrs: Arc::clone(&addrs),
}
.start();
let _network_interface = NetworkInterface::new(other_ip, local_ip, true).start();
sleep(Duration::from_secs(1)).await;
assert_eq!(addrs.lock().unwrap().len(), 2);
}
struct TestParams {
ip: SocketAddr,
seeds: Vec<SocketAddr>,
start: u64,
delay: u64,
end: u64,
expect: usize,
}
#[test]
#[ignore] fn gossip_adds_member_and_introduces_other_members() {
let ip1: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let ip2: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let ip3: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let arr = [
TestParams {
ip: ip1.clone(),
seeds: vec![],
start: 0,
delay: 0,
end: 2000,
expect: 0,
},
TestParams {
ip: ip2.clone(),
seeds: vec![ip1.clone()],
start: 1000,
delay: 1000,
end: 0,
expect: 2,
},
TestParams {
ip: ip3.clone(),
seeds: vec![ip1.clone()],
start: 1000,
delay: 1000,
end: 0,
expect: 2,
},
];
arr.par_iter()
.for_each(|p| build_cluster(p.ip, p.seeds.clone(), p.start, p.delay, p.end, p.expect));
}
#[actix_rt::main]
async fn build_cluster(
own_ip: SocketAddr,
other_ip: Vec<SocketAddr>,
start: u64,
delay: u64,
end: u64,
expect: usize,
) {
sleep(Duration::from_millis(start)).await;
let _cluster = Cluster::new(own_ip, other_ip);
let addrs = Arc::new(Mutex::new(vec![]));
let _cluster_listener = SocketTestClusterListener::new_with_content(Arc::clone(&addrs)).start();
sleep(Duration::from_millis(delay)).await;
assert_eq!((*(addrs.lock().unwrap())).len(), expect);
sleep(Duration::from_millis(end)).await;
}
#[test]
#[ignore] fn cluster_reconnects_to_left_node() {
env_logger::init();
let ip1: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let ip2: SocketAddr = format!("127.0.0.1:{}", request_open_port().unwrap_or(8000))
.parse()
.unwrap();
let variables = vec![
(ip1, vec![], false),
(ip2, vec![ip1], false),
(ip2, vec![ip1], true),
];
let results: Vec<bool> = variables
.par_iter()
.map(|(own_ip, seed_node, delay)| {
build_cluster_and_maybe_leave(own_ip.clone(), seed_node.clone(), *delay)
})
.collect();
assert_eq!(results, vec![true, true, true]);
}
#[actix_rt::main]
async fn build_cluster_and_maybe_leave(
own_ip: SocketAddr,
seed_nodes: Vec<SocketAddr>,
delay: bool,
) -> bool {
if delay {
sleep(Duration::from_secs(2)).await;
}
let _cluster = Cluster::new(own_ip, seed_nodes.clone());
sleep(Duration::from_secs(1)).await;
if seed_nodes.len() > 0 && !delay {
System::current().stop();
return true;
}
sleep(Duration::from_secs(2)).await;
true
}