use crate::control::{
NodeJoinRequest,
NodeReq
};
use crate::message::{
Payload,
PingOrPong
};
use crate::peer::Clock;
use crate::store::{
Config,
Conspirator
};
use super::*;
/// Builds a fresh `Node` for tests on top of an in-memory `Store`.
///
/// A random node ID is generated and handed to `Node::init` together with
/// `name`. The returned tuple contains the node, that ID, and the `AnyEvent`
/// and `PktTo` receivers produced by `Node::init`.
///
/// # Panics
///
/// Panics if the in-memory store cannot be created or if `Node::init` fails.
fn create_node(name: String)
    -> (Node, NodeID, Receiver<AnyEvent>, Receiver<PktTo>)
{
    let node_id = NodeIDArg::random().get();
    let store = Store::create_in_memory().unwrap();
    let (node, event_rx, pkt_rx) = Node::init(node_id, name, store).unwrap();
    (node, node_id, event_rx, pkt_rx)
}
/// Sends one request to `node` and returns the reply.
///
/// `R::make` turns `request_content` into a request plus a receiver for its
/// reply. `request_variant` wraps the request into a `NodeReq`, which is passed
/// to `Node::handle_request`; the reply is then fetched with `try_recv`.
///
/// # Panics
///
/// Panics if `try_recv` returns an error once `handle_request` has returned.
fn query_node<R: Requester>(
    node: &mut Node,
    request_variant: fn(R) -> NodeReq,
    request_content: R::Content)
    -> R::Reply
{
    let (request, mut reply_rx) = R::make(request_content);
    let wrapped = request_variant(request);
    node.handle_request(wrapped);
    reply_rx
        .try_recv()
        .unwrap_or_else(|err| panic!("no response for query: {err}"))
}
/// Checks that handling a join request makes the node emit a ping to the peer.
///
/// After the join request is accepted, the first packet read from the node's
/// outbound receiver must be addressed to the peer's address, carry this
/// node's ID as source and the peer's ID as destination, and be a
/// `PingOrPong::Ping`.
#[test_log::test(tokio::test(start_paused = true))]
async fn joining_other() {
    // Bind the dispatcher receiver (as the other tests in this file do) rather
    // than matching it with `_`, which would drop it at the end of this
    // statement and close the node's event channel for the rest of the test.
    let (mut node, node_id, _dispatcher_rx, mut pkt_rx) =
        create_node("TestNode".to_string());

    let join_id = NodeIDArg::random().get();
    let join_addr = dummy_address(0);
    let req = NodeJoinRequest {
        id: join_id.into(),
        name: "JoinNode".to_string(),
        addr: join_addr
    };
    assert!(
        query_node(&mut node, NodeReq::Join, req),
        "join request failed"
    );

    // NOTE(review): `timeout_100ms` presumably bounds the wait for the packet
    // to 100 ms — confirm against the helper's definition.
    let (addr, pkt) = timeout_100ms(pkt_rx.recv()).await.unwrap();
    assert_eq!(addr, join_addr);
    assert_eq!(pkt.src, node_id);
    assert_eq!(pkt.dst, join_id);
    assert_eq!(pkt.png, PingOrPong::Ping);
}
/// Joins node B from node A, relays packets between the two until neither
/// outbound receiver yields another packet, and then checks that B returns a
/// conspirator record for A carrying A's ID and name.
#[test_log::test]
fn op_sync() {
    let (mut node_a, id_a, _events_a, mut rx_a) = create_node("A".to_string());
    let (mut node_b, id_b, _events_b, mut rx_b) = create_node("B".to_string());
    let a_addr = dummy_address(0);
    let b_addr = dummy_address(1);

    // Ask A to join B.
    let join_b = NodeJoinRequest {
        id: id_b.into(),
        name: "B".to_string(),
        addr: b_addr
    };
    let joined = query_node(&mut node_a, NodeReq::Join, join_b);
    assert!(joined, "join request failed");

    // Hand-deliver packets between the nodes. B's outbound receiver is polled
    // first on every iteration; the loop ends once both receivers are empty.
    loop {
        if let Ok((_dest, pkt)) = rx_b.try_recv() {
            println!("B → A: {pkt:?}");
            node_a.handle_pkt(b_addr, pkt);
        } else if let Ok((_dest, pkt)) = rx_a.try_recv() {
            println!("A → B: {pkt:?}");
            node_b.handle_pkt(a_addr, pkt);
        } else {
            break;
        }
    }

    // B must now be able to return A's record.
    let record_of_a = query_node(&mut node_b, NodeReq::GetConspirator, id_a.into());
    assert_eq!(record_of_a.id.get(), id_a);
    assert_eq!(record_of_a.name, "A");
}
/// Exercises stopping a node, inspecting its store, and reloading it.
///
/// Steps, in order:
/// 1. A joins B and packets are relayed until both outbound receivers are empty.
/// 2. A leaves and is stopped; its store must hold A's own record with a
///    non-null clock.
/// 3. A is reloaded from that store and must still return a record for B.
/// 4. A disables B and sends B an empty payload; packets are relayed again.
/// 5. B leaves and is stopped; its own record must be inactive, and
///    `Node::load` on B's store must yield `None`.
/// 6. A disables itself, leaves and is stopped; the same checks as for B apply.
#[test_log::test]
fn persist() {
    let (mut node_a, id_a, _events_a, mut rx_a) = create_node("A".to_string());
    let (mut node_b, id_b, _events_b, mut rx_b) = create_node("B".to_string());
    let a_addr = dummy_address(0);
    let b_addr = dummy_address(1);

    // Step 1: A joins B.
    let join_b = NodeJoinRequest {
        id: id_b.into(),
        name: "B".to_string(),
        addr: b_addr
    };
    let joined = query_node(&mut node_a, NodeReq::Join, join_b);
    assert!(joined, "join request failed");

    // Relay packets, polling B's outbound receiver first on every iteration.
    loop {
        if let Ok((_dest, pkt)) = rx_b.try_recv() {
            println!("B → A: {pkt:?}");
            node_a.handle_pkt(b_addr, pkt);
        } else if let Ok((_dest, pkt)) = rx_a.try_recv() {
            println!("A → B: {pkt:?}");
            node_b.handle_pkt(a_addr, pkt);
        } else {
            break;
        }
    }

    // Step 2: stop A and inspect what ended up in its store.
    assert!(node_a.leave());
    let store_a = node_a.stop();
    let stored_a: Conspirator = store_a.get(id_a).unwrap().unwrap();
    assert_eq!(stored_a.id, id_a);
    assert_eq!(stored_a.name, "A");
    assert_ne!(stored_a.clock, Clock::null_clock());

    // Step 3: reload A from its store with fresh channels.
    let (event_tx_a, _event_rx_a) = new_channel::<AnyEvent>();
    let (pkt_tx_a, mut rx_a) = new_channel::<PktTo>();
    let mut node_a = Node::load(store_a, pkt_tx_a, event_tx_a)
        .unwrap()
        .unwrap();
    let record_of_b = query_node(&mut node_a, NodeReq::GetConspirator, id_b.into());
    assert_eq!(record_of_b.id.get(), id_b);
    assert_eq!(record_of_b.name, "B");

    // Step 4: A disables B, then sends B an empty payload.
    let disabled_b = query_node(&mut node_a, NodeReq::Disable, id_b.into());
    assert!(disabled_b, "A failed to disable B");
    let sync_trigger = (
        id_b.into(),
        Payload::None,
        Some(b_addr)
    );
    let triggered = query_node(&mut node_a, NodeReq::SendPayload, sync_trigger);
    assert!(triggered, "A failed to trigger op sync");

    // Relay packets again; this time A's outbound receiver is polled first.
    loop {
        if let Ok((_dest, pkt)) = rx_a.try_recv() {
            println!("A → B: {pkt:?}");
            node_b.handle_pkt(a_addr, pkt);
        } else if let Ok((_dest, pkt)) = rx_b.try_recv() {
            println!("B → A: {pkt:?}");
            node_a.handle_pkt(b_addr, pkt);
        } else {
            break;
        }
    }

    // Step 5: B's own record must be inactive and B must not load again.
    assert!(node_b.leave());
    let store_b = node_b.stop();
    let stored_b: Conspirator = store_b.get(id_b).unwrap().unwrap();
    assert_eq!(stored_b.id, id_b);
    assert!(!stored_b.active);
    assert_eq!(stored_b.name, "B");
    assert_ne!(stored_b.clock, Clock::null_clock());
    let (event_tx_b, _event_rx_b) = new_channel::<AnyEvent>();
    let (pkt_tx_b, _pkt_rx_b) = new_channel::<PktTo>();
    assert!(Node::load(store_b, pkt_tx_b, event_tx_b).unwrap().is_none());

    // Step 6: A disables itself; the same checks apply to A's store.
    let disabled_self = query_node(&mut node_a, NodeReq::Disable, id_a.into());
    assert!(disabled_self, "A failed to disable itself");
    assert!(node_a.leave());
    let store_a = node_a.stop();
    let stored_a: Conspirator = store_a.get(id_a).unwrap().unwrap();
    assert_eq!(stored_a.id, id_a);
    assert!(!stored_a.active);
    assert_eq!(stored_a.name, "A");
    assert_ne!(stored_a.clock, Clock::null_clock());
    let (event_tx_final, _event_rx_final) = new_channel::<AnyEvent>();
    let (pkt_tx_final, _pkt_rx_final) = new_channel::<PktTo>();
    assert!(Node::load(store_a, pkt_tx_final, event_tx_final).unwrap().is_none());
}
/// Verifies that `Node::load` returns an error once the stored `Config::ID`
/// entry has been overwritten with the string `"corrupt Node ID"`.
#[test_log::test]
fn corrupt_db() {
    let (fresh_node, _id, _events, _packets) = create_node("Node".to_string());
    let left = fresh_node.leave();
    assert!(left);

    // Stop the node to get its store back, then overwrite the ID entry.
    let store = fresh_node.stop();
    store.write_config(Config::ID, "corrupt Node ID").unwrap();

    let (event_tx, _event_rx) = new_channel::<AnyEvent>();
    let (pkt_tx, _pkt_rx) = new_channel::<PktTo>();
    // `unwrap_err` panics if loading unexpectedly succeeds.
    let _load_err = Node::load(store, pkt_tx, event_tx).unwrap_err();
}