#![allow(unused)]
#![allow(missing_docs)]
use std::net::SocketAddr;
use crate::*;
/// Configures `env_logger` for test runs with styled output always enabled.
///
/// The result of `try_init` is deliberately discarded, so a failed
/// initialisation is ignored instead of being reported to the caller.
pub fn init_logger() {
    let mut builder = env_logger::builder();
    builder
        .write_style(env_logger::WriteStyle::Always)
        .is_test(true);
    // Intentionally ignore the outcome; this helper never surfaces init errors.
    let _ = builder.try_init();
}
/// Returns a random payload whose length is drawn from
/// `1..=(1024 * max_fragments - 1)` bytes.
///
/// NOTE(review): 1024 is presumably the per-fragment payload size, which would
/// make the result fit in at most `max_fragments` fragments; confirm against
/// the fragmentation code.
///
/// # Panics
///
/// Panics if `max_fragments` is zero or if `1024 * max_fragments` does not fit
/// in a `u32`. Previously these cases hit an arithmetic overflow (a panic in
/// debug builds, a silently wrapped modulus in release builds).
pub fn random_payload_max_frags(max_fragments: u32) -> Vec<u8> {
    let span = max_fragments
        .checked_mul(1024)
        .and_then(|bytes| bytes.checked_sub(1))
        .expect("max_fragments must be non-zero and 1024 * max_fragments must fit in u32");
    // `% span` yields 0..span, so adding 1 gives a length in 1..=span (never empty).
    let size = 1 + rand::random::<u32>() % span;
    random_payload(size)
}
/// Produces a vector of exactly `size` bytes, each obtained from
/// `rand::random::<u8>()`.
pub fn random_payload(size: u32) -> Vec<u8> {
    (0..size).map(|_| rand::random::<u8>()).collect()
}
/// Packet counters maintained by the test harnesses in this file.
///
/// The harnesses add to these fields at the end of each `advance` call, so the
/// values accumulate across calls. `Default` starts every counter at zero.
#[derive(Debug, Default)]
pub struct TransmissionStats {
/// Packets counted as sent by the server, including ones the harness
/// deliberately drops before they reach the jitter pipe.
pub server_sent: u32,
/// Packets taken out of the client-to-server jitter pipe and handed to the server.
pub server_received: u32,
/// Packets counted as sent by the client.
pub client_sent: u32,
/// Packets taken out of the server-to-client jitter pipe and handed to the client.
pub client_received: u32,
}
/// Test harness that wires two `Pickleback` endpoints together through a pair
/// of jitter pipes, so packets produced by one side are fed to the other.
pub struct MessageTestHarness {
/// Endpoint acting as the server.
pub server: Pickleback,
/// Endpoint acting as the client.
pub client: Pickleback,
/// Carries packets sent by the server until the client takes them.
pub server_jitter_pipe: JitterPipe<BufHandle>,
/// Carries packets sent by the client until the server takes them.
pub client_jitter_pipe: JitterPipe<BufHandle>,
/// When set, positions (0-based, within a single `advance` call) of server
/// outbound packets that are not inserted into `server_jitter_pipe`.
server_drop_indices: Option<Vec<u32>>,
/// Cumulative send/receive counters, updated by `advance`.
pub stats: TransmissionStats,
/// Buffer pool created via `BufPool::default()`; not read by any method
/// visible in this part of the file.
pool: BufPool,
}
impl MessageTestHarness {
pub fn new(config: JitterPipeConfig) -> Self {
let server_jitter_pipe = JitterPipe::<BufHandle>::new(config.clone());
let client_jitter_pipe = JitterPipe::<BufHandle>::new(config);
let mut server = Pickleback::default();
let mut client = Pickleback::default();
server.set_xor_salt(Some(0));
client.set_xor_salt(Some(0));
Self {
server,
client,
server_jitter_pipe,
client_jitter_pipe,
server_drop_indices: None,
stats: TransmissionStats::default(),
pool: BufPool::default(),
}
}
pub fn collect_client_acks(&mut self, channel: u8) -> Vec<MessageId> {
self.client.drain_message_acks(channel).collect::<Vec<_>>()
}
pub fn collect_server_acks(&mut self, channel: u8) -> Vec<MessageId> {
self.server.drain_message_acks(channel).collect::<Vec<_>>()
}
pub fn collect_client_messages(&mut self, channel: u8) -> Vec<ReceivedMessage> {
self.client
.drain_received_messages(channel)
.collect::<Vec<_>>()
}
pub fn collect_server_messages(&mut self, channel: u8) -> Vec<ReceivedMessage> {
self.server
.drain_received_messages(channel)
.collect::<Vec<_>>()
}
pub fn advance_with_server_outbound_drops(
&mut self,
dt: f64,
drop_indices: Vec<u32>,
) -> &TransmissionStats {
self.server_drop_indices = Some(drop_indices);
self.advance(dt);
self.server_drop_indices = None;
&self.stats
}
pub fn advance(&mut self, dt: f64) -> &TransmissionStats {
debug!(
"🟡 server.update({dt}) --> {} ----",
self.server.time() + dt
);
self.server.update(dt);
debug!(
"🟠client.update({dt}) --> {} ----",
self.server.time() + dt
);
self.client.update(dt);
let empty = Vec::new();
let server_drop_indices = self.server_drop_indices.as_ref().unwrap_or(&empty);
trace!("🟡 server -> compose and send packets");
let server_sent = self
.server
.drain_packets_to_send()
.fold(0_u32, |acc, packet| {
if !server_drop_indices.contains(&acc) {
self.server_jitter_pipe.insert(packet);
}
acc + 1
});
trace!("🟠client -> process incoming packets");
let mut client_received = 0;
while let Some(p) = self.server_jitter_pipe.take_next() {
client_received += 1;
let packet_len = p.len();
let mut reader = Cursor::new(p.as_slice());
match read_packet(&mut reader).unwrap() {
ProtocolPacket::Messages(mut m) => {
self.client.process_incoming_packet(&m.header, &mut reader)
}
_ => panic!("Invalid proto msg"),
};
}
trace!("🟠client -> compose and send packets");
let client_sent = self.client.drain_packets_to_send().fold(0, |acc, packet| {
self.client_jitter_pipe.insert(packet);
acc + 1
});
trace!("🟡 server -> process incoming packets");
let mut server_received = 0;
while let Some(p) = self.client_jitter_pipe.take_next() {
server_received += 1;
let packet_len = p.len();
let mut reader = Cursor::new(p.as_slice());
match read_packet(&mut reader).unwrap() {
ProtocolPacket::Messages(mut m) => {
self.server.process_incoming_packet(&m.header, &mut reader)
}
_ => panic!("Invalid proto msg"),
};
}
self.stats.server_received += server_received;
self.stats.server_sent += server_sent;
self.stats.client_received += client_received;
self.stats.client_sent += client_sent;
&self.stats
}
}
/// Test harness that connects a `PicklebackServer` and a `PicklebackClient`
/// through a pair of jitter pipes carrying addressed packets.
pub struct ProtocolTestHarness {
/// The server under test.
pub server: PicklebackServer,
/// The client under test.
pub client: PicklebackClient,
/// Carries packets sent by the server until the client takes them.
pub server_jitter_pipe: JitterPipe<AddressedPacket>,
/// Carries packets sent by the client until the server takes them.
pub client_jitter_pipe: JitterPipe<AddressedPacket>,
/// When set, positions (0-based, within a single `advance` call) of server
/// outbound packets that are not inserted into `server_jitter_pipe`.
server_drop_indices: Option<Vec<u32>>,
/// Cumulative send/receive counters, updated by `advance`.
pub stats: TransmissionStats,
}
impl ProtocolTestHarness {
    /// Creates a server and a client from a default `PicklebackConfig`, both
    /// starting at time `0.0`, and has the client call `connect("127.0.0.1:0")`.
    ///
    /// Each direction gets its own `JitterPipe` built from `jitter_config`.
    pub fn new(jitter_config: JitterPipeConfig) -> Self {
        let server_jitter_pipe = JitterPipe::new(jitter_config.clone());
        let client_jitter_pipe = JitterPipe::new(jitter_config);
        let time = 0.0;
        let config = PicklebackConfig::default();
        let server = PicklebackServer::new(time, &config);
        let mut client = PicklebackClient::new(time, &config);
        client.connect("127.0.0.1:0");
        Self {
            server,
            client,
            server_jitter_pipe,
            client_jitter_pipe,
            server_drop_indices: None,
            stats: TransmissionStats::default(),
        }
    }

    /// Runs one `advance(dt)` step during which the server's outbound packets
    /// at the given 0-based positions are dropped instead of being queued for
    /// the client.
    ///
    /// The drop list only applies to this call; it is cleared before returning.
    pub fn advance_with_server_outbound_drops(
        &mut self,
        dt: f64,
        drop_indices: Vec<u32>,
    ) -> &TransmissionStats {
        self.server_drop_indices = Some(drop_indices);
        self.advance(dt);
        self.server_drop_indices = None;
        &self.stats
    }

    /// Advances server and client by `dt` and exchanges packets once in each
    /// direction: server -> client first, then client -> server.
    ///
    /// Server packets whose position is in the active drop list are not put
    /// into the pipe but still count towards `server_sent`. Returns the
    /// cumulative stats after adding this step's counts.
    pub fn advance(&mut self, dt: f64) -> &TransmissionStats {
        trace!("🟡 server.update({dt}) --> {} ----", self.server.time + dt);
        self.server.update(dt);
        // NOTE(review): this logs the *server's* clock for the client update.
        // It probably should use the client's own time, but no client time
        // accessor is visible from here; confirm and switch if one exists.
        trace!("🟠client.update({dt}) --> {} ----", self.server.time + dt);
        self.client.update(dt);

        let drop_indices: &[u32] = self.server_drop_indices.as_deref().unwrap_or(&[]);

        trace!("🟡 server -> compose and send packets");
        let mut server_sent = 0;
        self.server
            .visit_packets_to_send(|address: SocketAddr, packet: BufHandle| {
                // The running count doubles as the packet's position in this step.
                if !drop_indices.contains(&server_sent) {
                    self.server_jitter_pipe
                        .insert(AddressedPacket { address, packet });
                }
                server_sent += 1;
            });

        trace!("🟠client -> process incoming packets");
        let mut client_received = 0;
        while let Some(p) = self.server_jitter_pipe.take_next() {
            client_received += 1;
            self.client.receive(p.packet.as_slice(), p.address);
        }

        trace!("🟠client -> compose and send packets");
        // Count client packets in their own counter. The previous code bumped
        // `server_sent` here, leaving `client_sent` permanently at zero.
        let mut client_sent = 0;
        self.client
            .visit_packets_to_send(|address: SocketAddr, packet: BufHandle| {
                self.client_jitter_pipe
                    .insert(AddressedPacket { address, packet });
                client_sent += 1;
            });

        trace!("🟡 server -> process incoming packets");
        let mut server_received = 0;
        while let Some(p) = self.client_jitter_pipe.take_next() {
            server_received += 1;
            self.server.receive(p.packet.as_slice(), p.address);
        }

        self.stats.server_received += server_received;
        self.stats.server_sent += server_sent;
        self.stats.client_received += client_received;
        self.stats.client_sent += client_sent;
        &self.stats
    }
}