use crate::io::Io;
use crate::media::TaggedFrame;
use crate::MemMod;
use crossbeam_channel::TryRecvError;
use ratman_netmod::Endpoint;
use std::collections::{BTreeMap, VecDeque};
/// A simulated shared medium: frames received from any attached interface are
/// buffered and later sent out to the attached interfaces.
///
/// Note: the derived `Default` yields `latency == 0`, a value that
/// `with_latency` refuses. Prefer `with_latency` for construction.
#[derive(Default)]
pub struct BroadcastMedium {
/// Number of completed calls to `tick`.
ticks: u128,
/// Value handed to `TaggedFrame::new` for every frame accepted from an interface.
latency: u32,
/// Tag that the next interface created by `make_netmod` is registered under;
/// incremented after each registration.
last_tag: u32,
/// Frames accepted from interfaces that have not been sent out yet, oldest first.
buffer: VecDeque<TaggedFrame>,
/// Medium-side `Io` handle of every attached interface, keyed by its tag.
interfaces: BTreeMap<u32, Io>,
}
impl BroadcastMedium {
    /// Creates an empty medium that tags every accepted frame with `latency`.
    ///
    /// # Panics
    ///
    /// Panics if `latency` is zero.
    pub fn with_latency(latency: u32) -> Self {
        assert_ne!(
            latency, 0,
            "Cannot create a BroadcastMedium with latency == 0."
        );
        Self {
            latency,
            ..Default::default()
        }
    }

    /// Attaches a new endpoint to this medium and returns it.
    ///
    /// A fresh `MemMod` is linked to one half of a new `Io` pair; the other
    /// half is stored in this medium under the next free tag.
    pub fn make_netmod(&mut self) -> impl Endpoint {
        let mut mm = MemMod::new();
        let (mm_io, my_io) = Io::make_pair();
        mm.link_raw(mm_io);
        self.interfaces.insert(self.last_tag, my_io);
        self.last_tag += 1;
        mm
    }

    /// Returns the number of ticks completed so far.
    pub fn ticks(&self) -> u128 {
        self.ticks
    }

    /// Advances the simulation by one tick.
    ///
    /// Each tick performs, in order:
    ///
    /// 1. At most one pending frame is taken from every attached interface
    ///    and appended to the buffer.
    /// 2. The `ttl` of every buffered frame is lowered by one.
    /// 3. Every frame at the head of the buffer whose `ttl` has reached zero
    ///    is removed and sent to every attached interface. No interface is
    ///    filtered out, so the originating interface receives it as well.
    /// 4. Interfaces whose channel was found disconnected while receiving or
    ///    sending are detached from the medium.
    /// 5. The tick counter is incremented.
    pub fn tick(&mut self) {
        let mut disconnected: Vec<u32> = Vec::new();

        // Accept at most one frame per interface per tick.
        for (tag, io) in &mut self.interfaces {
            match io.inc.try_recv() {
                Ok(frame) => self
                    .buffer
                    .push_back(TaggedFrame::new(*tag, self.latency, frame)),
                Err(TryRecvError::Empty) => (),
                Err(TryRecvError::Disconnected) => disconnected.push(*tag),
            }
        }

        // Age every buffered frame. The guard keeps a frame that is already at
        // zero (possible when the medium was built via `Default`, which leaves
        // `latency` at 0) from underflowing; such a frame is simply due now.
        for pending in self.buffer.iter_mut() {
            if pending.ttl > 0 {
                pending.ttl -= 1;
            }
        }

        // Frames are appended in arrival order and all receive the same
        // latency, so the expired frames always form a prefix of the buffer.
        // Draining that prefix never releases a frame that is not yet due.
        while matches!(self.buffer.front(), Some(head) if head.ttl == 0) {
            if let Some(expired) = self.buffer.pop_front() {
                for (tag, io) in &self.interfaces {
                    if io.out.send(expired.frame.clone()).is_err() {
                        disconnected.push(*tag);
                    }
                }
            }
        }

        // A tag may have been recorded more than once; removing a key that is
        // already gone is a no-op, so no de-duplication is required.
        for tag in &disconnected {
            self.interfaces.remove(tag);
        }

        self.ticks += 1;
    }
}