use futures::future;
use crate::control::DaemonReq;
use crate::message::Payload;
use crate::peer::Status;
use super::*;
/// Issues a fixed batch of control commands against a single daemon built by
/// `bare_daemon`, runs that daemon until it returns, and then validates the
/// response recorded for every command.
///
/// The batch first addresses members that were never joined, then joins a
/// member named "zwei", looks it up by id and by name, and finally asks the
/// daemon to stop.
#[test_log::test(tokio::test(start_paused = true))]
async fn daemon_queries() {
    let DaemonSetup {
        id: _,
        daemon,
        ctrl_tx,
        dispatch_rx,
        interest_rx: _,
        inbound_packet_sender,
        outbound_packet_receiver,
        router_ctrl_rx,
        router_monitor_tx,
    } = bare_daemon(String::from("eins"));

    // The id used by the "join member" case and the follow-up lookup by id.
    let example_id = NodeIDArg::random();

    let tests: [TestCase; 8] = [
        (
            "get fake member",
            Command::GetConspirator(NodeIDArg::random().into()),
            &check_err,
        ),
        (
            "message fake member",
            Command::SendPayload(NodeIDArg::random().into(), Payload::None, None),
            &check_false,
        ),
        (
            "add address for fake member",
            Command::AddAddress {
                arg: NodeIDArg::random().into(),
                addr: dummy_address(0),
            },
            &check_false,
        ),
        (
            "find fake member",
            Command::GetConspirator(String::from("zwei").into()),
            &check_err,
        ),
        (
            "join member",
            Command::Join {
                id: example_id,
                name: String::from("zwei"),
                addr: dummy_address(0),
            },
            &Response::is_ok,
        ),
        (
            "get joined member",
            Command::GetConspirator(example_id.into()),
            &|r| matches!(r, Response::GetConspirator(_)),
        ),
        (
            "find joined member",
            Command::GetConspirator(String::from("zwei").into()),
            &|r| matches!(r, Response::GetConspirator(_)),
        ),
        (
            "shutdown",
            Command::StopServer,
            &Response::is_ok,
        ),
    ];

    // Every request is issued here, in array order, before the daemon is run.
    // A copy of each command is kept next to its pending response.
    let pending = tests.map(|(label, command, predicate)| {
        let sent = command.clone();
        let reply = request(&ctrl_tx, command);
        (label, sent, reply, predicate)
    });

    // Release the channel endpoints this test does not use before running the
    // daemon. NOTE(review): presumably this keeps the daemon from waiting on
    // them -- confirm against the daemon's run loop.
    drop(dispatch_rx);
    drop(inbound_packet_sender);
    drop(outbound_packet_receiver);
    drop(router_ctrl_rx);
    drop(router_monitor_tx);

    let outcome = daemon.run().await;
    assert_eq!(Some(()), outcome);

    for entry in pending {
        check_result(entry).await;
    }
}
#[test_log::test(tokio::test(start_paused = true))]
async fn daemons_join() {
let names = vec![
"null".to_string(),
"eins".to_string(),
"zwei".to_string()
];
let (kits, addrs, observer) = setup_n(names.clone());
let mut ids = Vec::new();
let mut daemon_tasks = Vec::new();
let mut all_tasks = Vec::new();
let mut ctrl_txs = Vec::new();
let mut dispatch_rxs = Vec::new();
let mut interest_rxs = Vec::new();
all_tasks.push(observer);
for DaemonRouterKit {
id,
daemon,
router,
ctrl_tx,
dispatch_rx,
interest_rx,
tasks
} in kits {
ids.push(id);
daemon_tasks.push(task::spawn(daemon.run()));
all_tasks.push(task::spawn(router.run()));
all_tasks.extend(tasks);
ctrl_txs.push(ctrl_tx);
dispatch_rxs.push(dispatch_rx);
interest_rxs.push(interest_rx);
}
let join_1_cmd = Command::Join {
id: ids[1].into(),
name: names[1].clone(),
addr: addrs[1]
};
println!("0 joining 1");
timeout_100ms(request(&ctrl_txs[0], join_1_cmd.clone())).await;
sleep_ms(100).await;
println!("2 joining 1");
timeout_100ms(request(&ctrl_txs[2], join_1_cmd)).await;
sleep_ms(100).await;
let tests_1: [TestCase; 2] = [
(
"get node 0 from 1",
Command::GetConspirator(ids[0].into()),
&|r| {
matches!(r, Response::GetConspirator(_))
}
),
(
"find node 0 from 1",
Command::GetConspirator(names[0].clone().into()),
&|r| {
matches!(r, Response::GetConspirator(_))
}
),
];
let tests_2: [TestCase; 2] = [
(
"get node 0 from 2",
Command::GetConspirator(ids[0].into()),
&|r| {
matches!(r, Response::GetConspirator(_))
}
),
(
"find node 0 from 2",
Command::GetConspirator(names[0].clone().into()),
&|r| {
matches!(r, Response::GetConspirator(_))
}
),
];
let tests_0: [TestCase; 2] = [
(
"get node 2 from 0",
Command::GetConspirator(ids[2].into()),
&|r| {
matches!(r, Response::GetConspirator(_))
}
),
(
"find node 2 from 0",
Command::GetConspirator(names[2].clone().into()),
&|r| {
matches!(r, Response::GetConspirator(_))
}
),
];
for test in tests_1 {
sleep_ms(10).await;
check_result(make_result(&ctrl_txs[1], test)).await;
}
for test in tests_2 {
sleep_ms(10).await;
check_result(make_result(&ctrl_txs[2], test)).await;
}
for test in tests_0 {
sleep_ms(10).await;
check_result(make_result(&ctrl_txs[0], test)).await;
}
println!("0 quitting");
timeout_100ms(request(&ctrl_txs[0], Command::StopServer)).await;
sleep_ms(10).await;
let quit_tests: [TestCase; 2] = [
(
"get node 0 from 1 & 2, check that it quit",
Command::GetConspirator(ids[0].into()),
&|r| {
if let Response::GetConspirator(c) = r {
c.state == Status::Quit
}
else {false}
}
),
(
"try to ping node 0 from 1 & 2",
Command::SendPayload(ids[0].into(), Payload::None, None),
&|r| {
if let Response::SendPayload(b) = r {!b}
else {false}
}
)
];
for test in quit_tests {
sleep_ms(10).await;
check_result(make_result(&ctrl_txs[1], test.clone())).await;
check_result(make_result(&ctrl_txs[2], test)).await;
}
println!("1 quitting");
timeout_100ms(request(&ctrl_txs[1], Command::StopServer)).await;
sleep_ms(10).await;
println!("2 quitting");
timeout_100ms(request(&ctrl_txs[2], Command::StopServer)).await;
sleep_ms(10).await;
timeout_100ms(future::join_all(all_tasks)).await;
for daemon_task in daemon_tasks {
timeout_100ms(daemon_task).await.unwrap();
}
}
/// Exercises the `request` helper on its own: a `Command::StopServer` request
/// must show up on the channel as `DaemonReq::StopServer`, and replying `true`
/// to it must resolve the returned future to `Response::StopServer(true)`.
#[test_log::test(tokio::test(start_paused = true))]
async fn request_fn() {
    let (tx, mut rx) = new_channel();
    let pending_reply = request(&tx, Command::StopServer);

    // The request must already be queued at this point; anything else on the
    // channel (or nothing at all) fails the test.
    match rx.try_recv() {
        Ok(AnyRequest::Daemon(DaemonReq::StopServer(stop_req))) => {
            stop_req.reply(true);
        }
        _ => panic!("invalid request"),
    }

    let reply = pending_reply.timeout_ms(10).await.unwrap();
    assert!(matches!(reply, Response::StopServer(true)));
}
/// Starts one daemon/router pair with no peers, lets it run for a while, and
/// checks that a stop request shuts everything down: the daemon task and all
/// auxiliary tasks must finish within their timeouts without a join error.
#[test_log::test(tokio::test(start_paused = true))]
async fn run_empty() {
    let (mut kits, _addr, observer_task) = setup_n(vec![String::from("eins")]);

    // The receivers are bound to named placeholders (not `_`) so they live
    // until the end of the test.
    let DaemonRouterKit {
        id: _,
        daemon,
        router,
        ctrl_tx,
        dispatch_rx: _dispatch_rx,
        interest_rx: _interest_rx,
        tasks: mut aux_tasks,
    } = kits.pop().unwrap();

    let daemon_handle = task::spawn(daemon.run());
    let router_handle = task::spawn(router.run());
    aux_tasks.push(observer_task);
    aux_tasks.push(router_handle);

    sleep_ms(100).await;

    request(&ctrl_tx, Command::StopServer)
        .timeout_ms(100)
        .await
        .unwrap();

    // First unwrap: the timeout did not elapse. Second: the task joined cleanly.
    daemon_handle.timeout_ms(100).await.unwrap().unwrap();

    future::join_all(aux_tasks)
        .timeout_ms(100)
        .await
        .unwrap()
        .into_iter()
        .for_each(|joined| {
            joined.unwrap();
        });
}