use core::panic;
use std::net::{Ipv4Addr, SocketAddr};
use peerlink::Handle;
use peerlink::{Command, Config, DisconnectReason, Event, PeerId};
mod common;
use common::Message;
#[test]
fn interleaved() {
let Scaffold {
server,
client,
server_addr,
..
} = start_server_client(8100);
let (client_peer, server_peer) = connect(&client, &server, server_addr);
for nonce in 0..1000 {
message(&client, server_peer, Message::Ping(nonce));
expect_ping(server.receive_blocking().unwrap(), nonce);
message(&server, client_peer, Message::Pong(nonce));
expect_pong(client.receive_blocking().unwrap(), nonce);
}
}
#[test]
fn stuff_messages() {
let Scaffold {
server,
client,
server_addr,
..
} = start_server_client(8101);
let (_client_peer, server_peer) = connect(&client, &server, server_addr);
for i in 0..1000 {
message(&client, server_peer, Message::Ping(i));
}
client.send(Command::Disconnect(server_peer)).unwrap();
for i in 0..1000 {
expect_ping(server.receive_blocking().unwrap(), i);
}
match server.receive_blocking().unwrap() {
peerlink::Event::Disconnected {
reason: DisconnectReason::Left,
..
} => {}
event => {
panic!("did not get disconnect; event: {:?}", event);
}
}
}
#[test]
fn batches() {
let Scaffold {
server,
client,
server_addr,
..
} = start_server_client(8102);
let (client_peer, server_peer) = connect(&client, &server, server_addr);
for i in 0..1000 {
message(&client, server_peer, Message::Ping(i));
message(&server, client_peer, Message::Ping(i));
}
for i in 0..1000 {
expect_ping(server.receive_blocking().unwrap(), i);
expect_ping(client.receive_blocking().unwrap(), i);
}
}
#[test]
fn many_to_one_interleaved() {
let _ = env_logger::builder().is_test(true).try_init();
let server_addr = (Ipv4Addr::LOCALHOST, 8103);
let server_config = Config {
bind_addr: vec![server_addr.into()],
..Default::default()
};
let server = peerlink::run(server_config).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let n_clients = 5;
let clients: Vec<_> = (0..n_clients)
.enumerate()
.map(|(i, _)| {
let client = peerlink::run(Config::default()).unwrap();
let (client_peer, _) = connect(&client, &server, server_addr.into());
assert_eq!(i as u64, client_peer.inner());
client
})
.collect();
for nonce in 0..100 {
for (i, c) in clients.iter().enumerate() {
message(c, PeerId::set_raw(0), Message::Ping(nonce));
let ping_from_peer = expect_ping(server.receive_blocking().unwrap(), nonce);
assert_eq!(ping_from_peer, PeerId::set_raw(i as u64));
message(&server, ping_from_peer, Message::Pong(nonce));
expect_pong(c.receive_blocking().unwrap(), nonce);
}
}
}
#[test]
fn many_to_one_bulk() {
let _ = env_logger::builder().is_test(true).try_init();
let server_addr = (Ipv4Addr::LOCALHOST, 8104);
let server_config = Config {
bind_addr: vec![server_addr.into()],
..Default::default()
};
let server = peerlink::run(server_config).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let n_clients = 5;
let n_pings = 100;
let clients: Vec<_> = (0..n_clients)
.enumerate()
.map(|(i, _)| {
let client = peerlink::run(Config::default()).unwrap();
let (client_peer, _) = connect(&client, &server, server_addr.into());
assert_eq!(i as u64, client_peer.inner());
client
})
.collect();
for nonce in 0..n_pings {
for c in &clients {
message(c, PeerId::set_raw(0), Message::Ping(nonce));
}
}
for _ in 0..(clients.len() * n_pings as usize) {
let (peer, nonce) = read_ping(server.receive_blocking().unwrap());
message(&server, peer, Message::Pong(nonce));
}
assert_eq!(server.receiver().len(), 0);
for nonce in 0..100 {
for c in &clients {
expect_pong(c.receive_blocking().unwrap(), nonce);
}
}
for c in &clients {
assert_eq!(c.receiver().len(), 0);
}
}
#[test]
fn very_large() {
let Scaffold {
server,
client,
server_addr,
..
} = start_server_client(8105);
let (client_peer, _) = connect(&client, &server, server_addr);
let ten_mb = vec![1; 1024 * 1024 * 10];
let message = Message::Data(ten_mb);
client.send(Command::Message(client_peer, message)).unwrap();
match server.receive_blocking().unwrap() {
Event::Message {
message: Message::Data(data),
size,
..
} if data.len() == 1024 * 1024 * 10
&& data.iter().all(|x| *x == 1)
&& size == (1024 * 1024 * 10 + 4 + 4) => {}
_ => panic!("bad data received"),
}
}
#[test]
fn peer_id_increments() {
let _ = env_logger::builder().is_test(true).try_init();
let server_addr = (Ipv4Addr::LOCALHOST, 8106);
let server_config = Config {
bind_addr: vec![server_addr.into()],
..Default::default()
};
let server = peerlink::run(server_config).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
let n_clients = 5;
for i in 0..n_clients {
let client = peerlink::run(Config::default()).unwrap();
let (client_peer, _) = connect(&client, &server, server_addr.into());
assert_eq!(i as u64, client_peer.inner());
let client_that_left = disconnect(&client, &server, PeerId::set_raw(0));
assert_eq!(client_that_left, client_peer);
}
}
#[allow(dead_code)]
struct Scaffold {
server: Handle<Message>,
client: Handle<Message>,
server_addr: SocketAddr,
}
fn start_server_client(server_port: u16) -> Scaffold {
let _ = env_logger::builder().is_test(true).try_init();
let server_addr = (Ipv4Addr::LOCALHOST, server_port);
let server_config = Config {
bind_addr: vec![server_addr.into()],
..Default::default()
};
let server_handle = peerlink::run(server_config).unwrap();
let client_handle = peerlink::run(Config::default()).unwrap();
Scaffold {
server: server_handle,
client: client_handle,
server_addr: server_addr.into(),
}
}
fn connect(
client: &Handle<Message>,
server: &Handle<Message>,
server_addr: SocketAddr,
) -> (PeerId, PeerId) {
client.send(Command::connect(server_addr)).unwrap();
let client_peer = match server.receive_blocking().unwrap() {
peerlink::Event::ConnectedFrom { peer, .. } => {
println!("server: client has connected");
peer
}
_ => panic!(),
};
let server_peer = match client.receive_blocking().unwrap() {
peerlink::Event::ConnectedTo {
result: Ok(peer), ..
} => {
println!("client: connected to server");
peer
}
_ => panic!(),
};
(client_peer, server_peer)
}
fn disconnect(client: &Handle<Message>, server: &Handle<Message>, server_peer: PeerId) -> PeerId {
client.send(Command::Disconnect(server_peer)).unwrap();
let client_that_left = match server.receive_blocking().unwrap() {
peerlink::Event::Disconnected {
peer,
reason: DisconnectReason::Left,
} => {
println!("server: client has disconnected");
peer
}
_ => panic!(),
};
match client.receive_blocking().unwrap() {
peerlink::Event::Disconnected {
peer,
reason: DisconnectReason::Requested,
} if peer == server_peer => {
println!("client: disconnected from server");
}
_ => panic!(),
};
client_that_left
}
fn message(handle: &Handle<Message>, peer: PeerId, message: Message) {
handle.send(Command::Message(peer, message)).unwrap();
}
fn expect_ping(event: Event<Message>, nonce: u64) -> PeerId {
match event {
peerlink::Event::Message {
message: Message::Ping(p),
peer,
size,
} if nonce == p && size == 12 => peer,
event => {
panic!("expected Ping({nonce}) but got {:?}", event);
}
}
}
fn expect_pong(event: Event<Message>, nonce: u64) -> PeerId {
match event {
peerlink::Event::Message {
message: Message::Pong(p),
peer,
size,
} if nonce == p && size == 12 => peer,
event => {
panic!("expected Pong({nonce}) but got {:?}", event);
}
}
}
fn read_ping(event: Event<Message>) -> (PeerId, u64) {
match event {
peerlink::Event::Message {
message: Message::Ping(p),
peer,
size,
} if size == 12 => (peer, p),
event => {
panic!("expected Ping(_) but got {:?}", event);
}
}
}