use crate::engine::*;
use crate::network::router::{Router, RouterBuilder};
use crate::network::routing::{route_all, Network};
use crate::network::server::{Server, ServerBuilder};
use crate::network::tcp::*;
use crate::spsc::Producer;
use crate::start;
use crate::worker::Advancer;
use csv::ReaderBuilder;
use std::collections::HashMap;
use std::error::Error;
use std::time::Instant;
mod router;
pub mod routing;
mod server;
mod tcp;
const Q_SIZE: usize = 1 << 14;
pub enum NetworkEvent {
Flow(tcp::Flow),
Packet(tcp::Packet),
}
impl std::fmt::Debug for NetworkEvent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
NetworkEvent::Flow(_) => "Flow",
NetworkEvent::Packet(packet) => {
if packet.is_ack {
"Ack"
} else {
"Packet"
}
}
})
}
}
type ModelEvent = crate::engine::Event<u64, NetworkEvent>;
#[derive(Debug)]
pub enum Device {
Router,
Server,
}
pub trait Connectable {
fn id(&self) -> usize;
fn flavor(&self) -> Device;
fn connect(&mut self, other: impl Connectable);
fn back_connect(
&mut self,
other: impl Connectable,
tx_queue: Producer<ModelEvent>,
) -> Producer<ModelEvent>;
}
pub fn build_network(
_n_racks: usize,
time_limit: u64,
n_cpus: usize,
) -> Result<(), Box<dyn Error>> {
println!("Setup...");
let (net, n_hosts) = routing::build_clos(3, 9);
let n_links: u64 = (&net).iter().map(|(_, v)| v.len() as u64).sum();
let mut world = World::new_from_network(net, n_hosts);
let mut flows = Vec::new();
let flow_rdr = ReaderBuilder::new()
.has_headers(false)
.delimiter(b' ')
.from_path("/home/nibr/opera-sim/Figure7_datamining/3to1_clos/traffic_gen/flows_25percLoad_10sec_648hosts_3to1.htsim")
.expect("File open failed");
for (flow_id, try_line) in flow_rdr.into_records().enumerate() {
let line = try_line?;
let src = line[0].parse::<usize>()? + 1;
let dst = line[1].parse::<usize>()? + 1;
let size_byte = line[2].parse::<u64>()?;
let time = line[3].parse::<u64>()?;
if time > time_limit {
break;
}
let flow = Flow::new(flow_id, src, dst, size_byte);
flows.push((time, flow));
}
world.add_flows(flows);
println!("Run...");
let start = Instant::now();
let counts = world.start(n_cpus, time_limit);
let duration = start.elapsed();
let n_actors = counts.len();
let n_cpus = std::cmp::min(n_cpus, n_actors);
let sum_count = counts.iter().sum::<u64>();
let ns_per_count: f64 = if sum_count > 0 {
1000. * duration.as_nanos() as f64 / sum_count as f64
} else {
0.
};
let gbps = (n_links * 8 * time_limit) as f64 / 1e9 / duration.as_secs_f64();
println!(
"= {} in {:.3}s. {} actors, {} hosts, {} cores",
sum_count,
duration.as_secs_f32(),
n_actors,
n_hosts,
n_cpus,
);
println!(
" {:.3}M count/sec, {:.3}M /actors, {:.3}M /cpu",
(1e6 / ns_per_count as f64),
(1e6 / (ns_per_count * n_actors as f64)),
(1e6 / (ns_per_count * n_cpus as f64)),
);
println!(
" {:.1} ns/count, {:.1} ns/actor, {:.1} ns/cpu",
ns_per_count / 1000. as f64,
ns_per_count * n_actors as f64 / 1000.,
ns_per_count * n_cpus as f64 / 1000.
);
println!(
" {:.3} gbps, {:.3} gbps/actor ({} links total)",
gbps,
(gbps / n_actors as f64),
n_links
);
println!("done");
Ok(())
}
#[derive(Debug)]
struct World {
routers: Vec<Router>,
servers: Vec<Server>,
chans: HashMap<usize, Producer<ModelEvent>>,
}
impl World {
pub fn new_from_network(network: Network, n_hosts: usize) -> World {
let mut server_builders: Vec<ServerBuilder> = Vec::new();
let mut router_builders: Vec<RouterBuilder> = Vec::new();
for id in 1..n_hosts + 1 {
server_builders.push(ServerBuilder::new(id));
}
for id in n_hosts + 1..network.len() + 1 {
let mut rb = RouterBuilder::new(id);
for &n in &network[&id] {
if n >= id {
continue;
}
if n <= n_hosts {
server_builders.get_mut(n - 1).unwrap().connect(&mut rb);
} else {
router_builders
.get_mut(n - n_hosts - 1)
.unwrap()
.connect(&mut rb);
}
}
router_builders.push(rb);
}
println!(" Routing...");
router_builders
.iter_mut()
.map(|r| {
let routes = route_all(&network, r.id);
r.install_routes(routes);
})
.for_each(drop);
let mut chans = HashMap::new();
println!(" Build servers...");
let mut servers = vec![];
for mut b in server_builders {
chans.insert(b.id, b.connect_world());
servers.push(b.build());
}
println!(" Build routers...");
let mut routers = vec![];
for mut rb in router_builders {
chans.insert(rb.id, rb.connect_world());
routers.push(rb.build());
}
World {
servers,
routers,
chans,
}
}
pub fn _new(n_racks: usize) -> World {
let servers_per_rack = n_racks - 1;
let mut next_id = 1;
let mut network = Network::new();
let mut rack_builders: Vec<RouterBuilder> = Vec::new();
for _ in 0..n_racks {
let mut r = RouterBuilder::new(next_id);
network.insert(next_id, vec![]);
for rack2 in rack_builders.iter_mut() {
network.get_mut(&next_id).unwrap().push(rack2.id());
network.get_mut(&rack2.id()).unwrap().push(next_id);
(&mut r).connect(rack2);
}
next_id += 1;
rack_builders.push(r);
}
let mut server_builders = Vec::new();
for rack_ix in 0..n_racks {
for _ in 0..servers_per_rack {
let mut s = ServerBuilder::new(next_id);
network.insert(next_id, vec![]);
let rack = rack_builders.get_mut(rack_ix).unwrap();
network.get_mut(&next_id).unwrap().push(rack.id());
network.get_mut(&rack.id()).unwrap().push(next_id);
(&mut s).connect(rack);
next_id += 1;
server_builders.push(s);
}
}
for r in rack_builders.iter_mut() {
let rack_id = r.id();
let routes = route_all(&network, rack_id);
r.install_routes(routes);
}
let mut chans = HashMap::new();
let mut servers = vec![];
for mut b in server_builders {
chans.insert(b.id, b.connect_world());
servers.push(b.build());
}
let mut routers = vec![];
for mut rb in rack_builders {
chans.insert(rb.id, rb.connect_world());
routers.push(rb.build());
}
World {
routers,
servers,
chans,
}
}
pub fn add_flows(&mut self, flows: Vec<(u64, Flow)>) {
println!(" Init flows...");
for (time, f) in flows {
self.chans[&f.src]
.push(Event {
src: 0,
time,
event_type: EventType::ModelEvent(NetworkEvent::Flow(f)),
})
.unwrap();
}
}
pub fn start(mut self, num_cpus: usize, done: u64) -> Vec<u64> {
for (_, c) in self.chans.iter_mut() {
c.push(Event {
time: done,
src: 0,
event_type: EventType::Close,
})
.unwrap();
}
let mut actors: Vec<Box<dyn Advancer<u64, u64> + Send>> = Vec::new();
for s in self.servers {
actors.push(Box::new(s));
}
for r in self.routers {
actors.push(Box::new(r));
}
start(num_cpus, actors)
}
}