extern crate pickleback;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use log::*;
use pickleback::prelude::*;
use pickleback::testing::*;
/// Number of random payloads each soak test in this file generates and sends.
const NUM_TEST_MSGS: usize = 100000;
/// Additional loop iterations the lossy-network test keeps running after it has
/// stepped past the last payload, so that outstanding deliveries and acks can
/// still arrive before the final assertions.
const NUM_EXTRA_ITERATIONS: usize = 100;
/// Argument handed to `random_payload_max_frags` for every generated payload.
// NOTE(review): presumably the upper bound on fragments per message — confirm
// against `pickleback::testing::random_payload_max_frags`.
const MAX_FRAGMENTS: u32 = 10;
/// Soak test: stream `NUM_TEST_MSGS` random payloads from server to client on
/// channel 0 through a `JitterPipeConfig::disabled()` harness.
///
/// Every tick a random batch of 0..=6 messages is sent, the harness is advanced
/// by `0.03`, and the test then asserts that:
///
/// * the client received exactly as many messages as were sent this tick,
/// * every received message carries an id the server handed out, and its
///   payload is byte-identical to the payload sent under that id,
/// * for every received message, the oldest not-yet-checked sent id is among
///   the acks the server drained this tick.
///
/// Once every payload has been sent, the test finally asserts that nothing is
/// left undelivered or unchecked.
#[test]
fn soak_message_transmission() {
    init_logger();
    let channel = 0;
    let mut harness = MessageTestHarness::new(JitterPipeConfig::disabled());
    let test_msgs = (0..NUM_TEST_MSGS)
        .map(|_| random_payload_max_frags(MAX_FRAGMENTS))
        .collect::<Vec<_>>();
    let mut send_msgs_iter = test_msgs.iter();

    // Keyed by message id and borrowing the payload from `test_msgs`: ids are
    // the unique handle, so two byte-identical random payloads can no longer
    // overwrite each other, and no payload has to be cloned.
    let mut in_flight_msgs = HashMap::<MessageId, &[u8]>::new();
    let mut sent_ids = VecDeque::new();

    // Run until the iterator has no unsent payloads left. (`test_msgs` itself is
    // never drained, so it cannot serve as the loop condition.)
    while !send_msgs_iter.as_slice().is_empty() {
        // A batch of zero is allowed: it produces an idle tick in which nothing
        // may be received.
        let num_to_send = rand::random::<usize>() % 7;
        let mut num_sent = 0;
        for msg in send_msgs_iter.by_ref().take(num_to_send) {
            let msg_id = harness.server.send_message(channel, msg.as_ref()).unwrap();
            in_flight_msgs.insert(msg_id, msg.as_slice());
            sent_ids.push_back(msg_id);
            num_sent += 1;
        }

        harness.advance(0.03);

        let client_received_messages = harness
            .client
            .drain_received_messages(channel)
            .collect::<Vec<_>>();
        let acks = harness
            .server
            .drain_message_acks(channel)
            .collect::<HashSet<_>>();

        assert_eq!(
            client_received_messages.len(),
            num_sent,
            "didn't receive all the messages sent this tick"
        );

        for recv_msg in client_received_messages.iter() {
            let rec = recv_msg.payload_to_owned();
            // Removing the entry keeps the map bounded to what is genuinely in
            // flight and turns a second delivery of the same id into a failure.
            let Some(expected_payload) = in_flight_msgs.remove(&recv_msg.id()) else {
                panic!("id not found in in-flight msgs for {recv_msg:?}, payload: {rec:?}");
            };
            assert_eq!(
                expected_payload,
                rec.as_slice(),
                "payload doesn't match what was sent for {recv_msg:?}"
            );
            let oldest_sent_id = sent_ids
                .pop_front()
                .expect("received more messages than were sent");
            assert!(acks.contains(&oldest_sent_id), "ack mismatch");
        }
    }

    assert!(
        in_flight_msgs.is_empty(),
        "{} sent messages were never received",
        in_flight_msgs.len()
    );
    assert!(
        sent_ids.is_empty(),
        "{} sent messages were never checked for an ack",
        sent_ids.len()
    );
}
/// Soak test: push `NUM_TEST_MSGS` random payloads from server to client on
/// channel 1 through a `JitterPipeConfig::very_very_bad()` harness.
///
/// Each loop iteration makes at most one send attempt and then advances the
/// harness by `0.051`. A `TooManyPending` backpressure error leaves the cursor
/// where it is, so the same payload is retried on the next iteration; any other
/// send error fails the test. After the last payload the loop keeps ticking for
/// `NUM_EXTRA_ITERATIONS` more iterations.
///
/// Final checks: every sent id was acked, the client received as many messages
/// as were sent, and the observed drop / duplicate ratios are compared against
/// the configured `drop_chance` / `duplicate_chance` with a `0.5` argument to
/// `assert_float_relative_eq!`.
// NOTE(review): channel 1 is presumably the reliable channel (per the test
// name) and 0.5 presumably a relative tolerance — confirm against the harness.
#[test]
fn soak_reliable_message_transmission_with_terrible_network() {
    init_logger();
    let channel = 1;
    let mut harness = MessageTestHarness::new(JitterPipeConfig::very_very_bad());

    let payloads: Vec<_> = (0..NUM_TEST_MSGS)
        .map(|_| random_payload_max_frags(MAX_FRAGMENTS))
        .collect();

    let mut awaiting_ack = Vec::new();
    let mut delivered = Vec::new();

    let iteration_limit = NUM_TEST_MSGS + NUM_EXTRA_ITERATIONS;
    let mut i = 0;
    while i < iteration_limit {
        match payloads.get(i) {
            // Past the last payload: nothing to send, just burn one of the
            // extra iterations.
            None => i += 1,
            Some(msg) => {
                let size = msg.len();
                match harness.server.send_message(channel, msg.as_ref()) {
                    // Cursor is not advanced, so this payload is retried.
                    Err(PicklebackError::Backpressure(Backpressure::TooManyPending)) => {
                        warn!("Backpressure");
                    }
                    Err(e) => panic!("Unhandled send error {e:?}"),
                    Ok(msg_id) => {
                        trace!("💌💌 Sending message {i}/{NUM_TEST_MSGS}, size {size}, msg_id: {msg_id:?}");
                        awaiting_ack.push(msg_id);
                        i += 1;
                    }
                }
            }
        }

        harness.advance(0.051);

        // Forget every id the server has now seen acknowledged.
        let newly_acked = harness.collect_server_acks(channel);
        if !newly_acked.is_empty() {
            awaiting_ack.retain(|pending| !newly_acked.contains(pending));
        }

        delivered.extend(harness.client.drain_received_messages(channel));
    }

    assert_eq!(
        Vec::<MessageId>::new(),
        awaiting_ack,
        "server is missing acks for these messages"
    );
    assert_eq!(delivered.len(), payloads.len());

    // Compare what the link actually did with what it was configured to do.
    let sent_by_server = harness.server.stats().packets_sent;
    let seen_by_client = harness.client.stats().packets_received;
    let packets_missing = sent_by_server - seen_by_client;
    let observed_packet_loss = packets_missing as f32 / sent_by_server as f32;
    let configured_packet_loss = harness.server_jitter_pipe.config_mut().drop_chance;
    println!("Observed packet loss: {observed_packet_loss} configured: {configured_packet_loss}");
    assert_float_relative_eq!(observed_packet_loss, configured_packet_loss, 0.5);

    let duplicates_seen = harness.client.stats().packets_duplicate;
    let observed_dupe = duplicates_seen as f32 / sent_by_server as f32;
    let configured_dupe = harness.server_jitter_pipe.config_mut().duplicate_chance;
    println!("Observed duplicate rate: {observed_dupe} configured: {configured_dupe}");
    assert_float_relative_eq!(observed_dupe, configured_dupe, 0.5);

    // Diagnostic dump; visible with `--nocapture`.
    println!("Server Stats: {:?}", harness.server.stats());
    println!("Server Packet loss: {}", harness.server.packet_loss());
    println!("Server RTT: {}", harness.server.rtt());
    println!("Client Stats: {:?}", harness.client.stats());
    println!("Client Packet loss: {}", harness.client.packet_loss());
    println!("Client RTT: {}", harness.client.rtt());
    println!("Test harness transmission stats: {:?}", harness.stats);
}