use crate::spsc;
use crate::spsc::*;
use std::collections::HashMap;
use rand::seq::SliceRandom;
use rand::thread_rng;
use crate::engine::*;
use crate::network::{Connectable, Device, ModelEvent, NetworkEvent, Q_SIZE};
use crate::worker::{ActorState, Advancer};
/// Mutable description of a router that is still being wired into the
/// network; `build` turns it into a runnable `Router`.
#[derive(Debug)]
pub struct RouterBuilder {
/// Node id of this router; used as the `src` of every event it emits.
pub id: usize,
/// Delay added to the timestamp of emitted events (set to 500 by `new`).
latency_ns: u64,
/// Per-byte transmission cost (set to 1 by `new`).
ns_per_byte: u64,
/// Maps a neighbor's node id to its local link index.
id_to_ix: HashMap<usize, usize>,
/// Inverse of `id_to_ix`: node id of the neighbor behind each link index.
ix_to_id: Vec<usize>,
/// Link index that the next connected neighbor will receive.
next_ix: usize,
/// Routing table filled by `install_routes`: one list of candidate
/// next-hop link indices per destination.
route: Vec<Vec<usize>>,
/// Receiving ends of the queues on which neighbors send events to us.
in_queues: Vec<Consumer<ModelEvent>>,
/// Sending ends of the queues on which we send events to neighbors.
out_queues: Vec<Producer<ModelEvent>>,
}
impl Connectable for &mut RouterBuilder {
    /// Node id of the router under construction.
    fn id(&self) -> usize {
        self.id
    }

    /// A router builder always reports the `Router` device kind.
    fn flavor(&self) -> Device {
        Device::Router
    }

    /// Initiates a bidirectional link with `other`.
    ///
    /// A fresh queue is created for traffic flowing from `other` to this
    /// router; its producer end is handed to `other` through `back_connect`,
    /// which answers with the producer for the opposite direction.
    fn connect(&mut self, mut other: impl Connectable) {
        let peer_id = other.id();
        let (peer_to_us_tx, peer_to_us_rx) = spsc::new(Q_SIZE);

        // Register the peer before calling back into it, exactly one link
        // index per neighbor.
        self.id_to_ix.insert(peer_id, self.next_ix);
        self.ix_to_id.push(peer_id);

        let us_to_peer_tx = other.back_connect(&mut **self, peer_to_us_tx);
        self.out_queues.push(us_to_peer_tx);
        self.in_queues.push(peer_to_us_rx);
        self.next_ix += 1;
    }

    /// Completes a link that `other` initiated.
    ///
    /// `tx_queue` is the producer this router uses to reach `other`; the
    /// returned producer is the one `other` must use to reach this router.
    fn back_connect(
        &mut self,
        other: impl Connectable,
        tx_queue: Producer<ModelEvent>,
    ) -> Producer<ModelEvent> {
        let peer_id = other.id();
        self.id_to_ix.insert(peer_id, self.next_ix);
        self.ix_to_id.push(peer_id);
        self.out_queues.push(tx_queue);

        let (peer_to_us_tx, peer_to_us_rx) = spsc::new(Q_SIZE);
        self.in_queues.push(peer_to_us_rx);
        self.next_ix += 1;

        peer_to_us_tx
    }
}
impl RouterBuilder {
    /// Creates a builder for router `id` with no links and an empty routing
    /// table. Latency is initialised to 500 ns and the transmission cost to
    /// 1 ns per byte.
    pub fn new(id: usize) -> RouterBuilder {
        RouterBuilder {
            id,
            latency_ns: 500,
            ns_per_byte: 1,
            id_to_ix: HashMap::new(),
            ix_to_id: Vec::new(),
            next_ix: 0,
            in_queues: Vec::new(),
            out_queues: Vec::new(),
            route: Vec::new(),
        }
    }

    /// Attaches the "world" pseudo-node (id 0) as an input-only source and
    /// returns the producer used to inject events into this router.
    ///
    /// NOTE(review): unlike `connect`/`back_connect`, this adds no out queue
    /// and does not advance `next_ix`, so in/out link indices only stay
    /// aligned if this is called after every regular link has been
    /// connected — presumably intentional, confirm against the callers.
    pub fn connect_world(&mut self) -> Producer<ModelEvent> {
        self.id_to_ix.insert(0, self.next_ix);
        let (prod, cons) = spsc::new(Q_SIZE);
        self.in_queues.push(cons);
        self.ix_to_id.push(0);
        prod
    }

    /// Installs the routing table from `routes`, which maps each destination
    /// node id to the node ids of the acceptable next hops.
    ///
    /// The resulting table is indexed directly by destination id: slot 0
    /// (the world pseudo-node) and the slot of this router itself are empty,
    /// every other slot holds the next hops translated to local link indices.
    ///
    /// # Panics
    ///
    /// Panics if `routes` lacks an entry for a destination in
    /// `1..=routes.len()` (other than this router), or if a next hop is not
    /// a connected neighbor.
    pub fn install_routes(&mut self, routes: HashMap<usize, Vec<usize>>) {
        let mut table: Vec<Vec<usize>> = Vec::with_capacity(routes.len() + 1);
        // Destination 0 is never routed to.
        table.push(Vec::new());

        for dst_id in 1..=routes.len() {
            if dst_id == self.id {
                // Keep the table aligned with destination ids: without this
                // placeholder every destination above our own id would be
                // stored one slot too low, while lookups use `route[dst]`.
                table.push(Vec::new());
                continue;
            }
            let next_hop_ids = routes.get(&dst_id).unwrap_or_else(|| {
                panic!(
                    "Router #{}: no route entry for destination #{}",
                    self.id, dst_id
                )
            });
            let next_hop_ixs: Vec<usize> = next_hop_ids
                .iter()
                .map(|hop_id| {
                    *self.id_to_ix.get(hop_id).unwrap_or_else(|| {
                        panic!(
                            "Router #{}: next hop #{} is not a connected neighbor",
                            self.id, hop_id
                        )
                    })
                })
                .collect();
            table.push(next_hop_ixs);
        }

        self.route = table;
    }

    /// Consumes the builder and produces the runnable `Router`.
    ///
    /// Every outgoing queue is primed with a `Null` event stamped with the
    /// router latency, and every per-link output time starts at 0.
    ///
    /// # Panics
    ///
    /// Panics if the priming event cannot be pushed onto an outgoing queue.
    pub fn build(self) -> Router {
        let merger = Merger::new(self.in_queues, self.id, self.ix_to_id.clone());

        let mut out_queues = self.out_queues;
        for out_queue in out_queues.iter_mut() {
            out_queue
                .push(Event {
                    event_type: EventType::Null,
                    src: self.id,
                    time: self.latency_ns,
                })
                .unwrap();
        }
        let out_times = vec![0; out_queues.len()];

        Router {
            id: self.id,
            latency_ns: self.latency_ns,
            ns_per_byte: self.ns_per_byte,
            merger,
            ix_to_id: self.ix_to_id,
            out_queues,
            out_times,
            route: self.route,
            count: 0,
        }
    }
}
/// Runnable router actor produced by `RouterBuilder::build`.
#[derive(Debug)]
pub struct Router {
/// Node id of this router; used as the `src` of every event it emits.
pub id: usize,
/// Delay added to the timestamp of every emitted event.
latency_ns: u64,
/// Multiplied by a packet's size in bytes to obtain its transmission time.
ns_per_byte: u64,
/// Node id of the neighbor behind each link index (used in log output).
ix_to_id: Vec<usize>,
/// Source of incoming events; built from the input queues in `build`.
merger: Merger<u64, NetworkEvent>,
/// Sending ends of the queues towards each neighbor, by link index.
out_queues: Vec<Producer<ModelEvent>>,
/// Per link: end time of the last transmission, or the time of the last
/// stall notification if that is later.
out_times: Vec<u64>,
/// Candidate next-hop link indices, looked up by packet destination.
route: Vec<Vec<usize>>,
/// Number of model events handled so far.
count: u64,
}
impl Router {
    /// Runs the router until `advance` stops reporting `Continue`, logging
    /// the start and the end, and returns the number of model events handled.
    pub fn start(&mut self) -> u64 {
        println!("Router {} start", self.id);
        loop {
            match self.advance() {
                ActorState::Continue(_) => continue,
                _ => break,
            }
        }
        println!("Router {} done", self.id);
        self.count
    }
}
impl Advancer<u64, u64> for Router {
    /// Processes incoming events until the router has to yield or stop.
    ///
    /// * `Close` is forwarded to every neighbor and ends the router.
    /// * `Stalled` sends a `Null` event to every link whose output time is
    ///   behind the stall time, then yields with `Continue(time)`.
    /// * `Null` is ignored.
    /// * A packet is forwarded on a randomly chosen next hop for its
    ///   destination, unless that link's backlog exceeds the buffer limit,
    ///   in which case the packet is dropped.
    ///
    /// Returns `Done(count)` after a `Close`, when the merger yields no more
    /// events, or when a packet cannot be pushed onto its outgoing queue.
    ///
    /// # Panics
    ///
    /// Panics if a `Close`/`Null` event cannot be pushed, if a `Flow` event
    /// arrives, or if no next hop is installed for a packet's destination.
    fn advance(&mut self) -> ActorState<u64, u64> {
        // Egress buffer limit; presumably 1000 packets of 1500 bytes each.
        const BACKLOG_LIMIT_BYTES: u64 = 1000 * 1500;

        let max_backlog_ns = BACKLOG_LIMIT_BYTES * self.ns_per_byte;
        let mut rng = thread_rng();

        while let Some(event) = self.merger.next() {
            match event.event_type {
                EventType::Close => {
                    let close_time = event.time + self.latency_ns;
                    for out_queue in self.out_queues.iter_mut() {
                        out_queue
                            .push(Event {
                                event_type: EventType::Close,
                                src: self.id,
                                time: close_time,
                            })
                            .unwrap();
                    }
                    break;
                }
                EventType::Stalled => {
                    let null_time = event.time + self.latency_ns;
                    let links = self.out_times.iter_mut().zip(self.out_queues.iter_mut());
                    for (out_time, out_queue) in links {
                        // Only links that have nothing scheduled past the
                        // stall time need a Null to let neighbors advance.
                        if *out_time < event.time {
                            out_queue
                                .push(Event {
                                    event_type: EventType::Null,
                                    src: self.id,
                                    time: null_time,
                                })
                                .unwrap();
                            *out_time = event.time;
                        }
                    }
                    return ActorState::Continue(event.time);
                }
                EventType::Null => {}
                EventType::ModelEvent(model_event) => {
                    self.count += 1;
                    match model_event {
                        NetworkEvent::Flow(_flow) => unreachable!(),
                        NetworkEvent::Packet(packet) => {
                            let next_hop_ix: usize = *self.route[packet.dst]
                                .choose(&mut rng)
                                .expect("no next hop installed for packet destination");

                            // `out_times` is when the link becomes free, so
                            // the backlog is how far it lies beyond "now".
                            // Drop the packet when that backlog exceeds the
                            // buffer; an idle link must never drop.
                            let link_free_at = self.out_times[next_hop_ix];
                            if link_free_at > event.time + max_backlog_ns {
                                continue;
                            }

                            // Transmission starts once the packet has arrived
                            // and the link is free.
                            let cur_time = std::cmp::max(event.time, link_free_at);
                            let tx_end = cur_time + self.ns_per_byte * packet.size_byte;
                            let rx_end = tx_end + self.latency_ns;

                            if let Err(e) = self.out_queues[next_hop_ix].push(Event {
                                event_type: EventType::ModelEvent(NetworkEvent::Packet(packet)),
                                src: self.id,
                                time: rx_end,
                            }) {
                                println!(
                                    "@{} Router #{} push error to #{}: {:?}",
                                    event.time, self.id, self.ix_to_id[next_hop_ix], e
                                );
                                break;
                            }
                            self.out_times[next_hop_ix] = tx_end;
                        }
                    }
                }
            }
        }

        ActorState::Done(self.count)
    }
}