use std::{
net::{SocketAddr, UdpSocket},
time::{Duration, Instant},
};
use crossbeam::channel::{Receiver, RecvTimeoutError};
use crate::common::timed_event_queue::TimedEventQueue;
/// Decides, per outgoing packet, whether and when the packet is put on the wire.
///
/// The `Send` bound is part of the trait, so every implementation can be moved
/// to another thread (for example boxed and handed to `simulator_thread`).
pub trait NetworkSimulator: Send {
/// Evaluates a packet of `size` bytes addressed to `to`.
///
/// As used by `simulator_thread`: `Some(delay)` schedules the packet to be
/// sent once `delay` has passed, `None` means the packet is never sent.
fn simulate(&mut self, to: SocketAddr, size: usize) -> Option<Duration>;
}
/// Commands consumed by `simulator_thread` from its command channel.
pub(crate) enum SimulatorThreadCmd {
/// Replaces the simulator used for all packets submitted afterwards.
ChangeSimulator(Box<dyn NetworkSimulator>),
/// Submits a packet (destination address, payload) to the current simulator.
Send(SocketAddr, Vec<u8>),
/// Makes the thread leave its loop and return.
Shutdown,
}
/// Key under which an entry is stored in the thread's timed event queue.
#[derive(Debug, PartialEq, Eq, Hash)]
enum EventKey {
/// A scheduled send; the number comes from a counter that `simulator_thread`
/// increments after every packet it queues.
Send(u64),
}
/// Payload stored in the thread's timed event queue.
#[derive(Debug, PartialEq, Eq)]
enum Event {
/// Send the given bytes to the given address when the entry becomes due.
Send(SocketAddr, Vec<u8>),
}
/// Body of the network-simulator thread.
///
/// The loop alternates between two jobs:
///
/// * sending queued packets on `socket` once their scheduled instant is reached;
/// * handling commands from `cmds`: swapping the active simulator, submitting
///   a new packet to it, or shutting down.
///
/// A submitted packet is queued only if the simulator returns `Some(delay)`;
/// a `None` result means the packet is dropped. A packet whose deadline cannot
/// be represented (`Instant::now() + delay` overflows) is dropped as well,
/// because it could never become due.
///
/// The function returns on `SimulatorThreadCmd::Shutdown` or when every sender
/// of `cmds` has been dropped. Packets still queued at that point are not sent.
///
/// # Panics
///
/// Panics if the socket reports sending fewer bytes than the datagram holds,
/// or if the event queue reports a deadline but yields no event.
pub(crate) fn simulator_thread(
    cmds: Receiver<SimulatorThreadCmd>,
    socket: UdpSocket,
    mut simulator: Box<dyn NetworkSimulator>,
) {
    let mut timed_events: TimedEventQueue<EventKey, Event> = TimedEventQueue::new();
    let mut send_key_counter: u64 = 0;

    loop {
        let cmd = if let Some(deadline) = timed_events.next() {
            // Use the same "due" condition as `recv_deadline` (now >= deadline) so an
            // event whose deadline equals the current instant is sent right away
            // instead of causing an immediate timeout and another loop iteration.
            if Instant::now() >= deadline {
                let (_key, event) = timed_events
                    .pop()
                    .expect("event queue reported a deadline but had no event to pop");
                match event {
                    Event::Send(socket_addr, packet) => {
                        match socket.send_to(&packet, socket_addr) {
                            Ok(size) => assert_eq!(size, packet.len()),
                            Err(_err) => {
                                // There is no caller to report to from this thread.
                                // The packet is simply not delivered, which for the
                                // receiver is the same as a datagram lost in transit.
                                // Panicking here would stop all later simulated traffic.
                            }
                        }
                    }
                }
                continue;
            }

            // Nothing is due yet: wait for a command, but wake up in time for the
            // next scheduled event.
            match cmds.recv_deadline(deadline) {
                Ok(cmd) => cmd,
                Err(RecvTimeoutError::Timeout) => continue,
                Err(RecvTimeoutError::Disconnected) => break,
            }
        } else {
            // The queue is empty, so only a command can wake the thread.
            match cmds.recv() {
                Ok(cmd) => cmd,
                Err(_) => break,
            }
        };

        match cmd {
            SimulatorThreadCmd::ChangeSimulator(new_simulator) => {
                simulator = new_simulator;
            }
            SimulatorThreadCmd::Send(socket_addr, packet) => {
                // `checked_add` keeps an extreme delay from panicking the thread;
                // an unrepresentable deadline is handled like a dropped packet.
                let due = simulator
                    .simulate(socket_addr, packet.len())
                    .and_then(|delay| Instant::now().checked_add(delay));
                if let Some(due) = due {
                    timed_events.push(
                        EventKey::Send(send_key_counter),
                        due,
                        Event::Send(socket_addr, packet),
                    );
                    send_key_counter += 1;
                }
            }
            SimulatorThreadCmd::Shutdown => break,
        }
    }
}