#![allow(clippy::suspicious_else_formatting)]
#![allow(clippy::needless_return)]
#![allow(clippy::new_without_default)]
#![allow(clippy::comparison_chain)] #![allow(clippy::single_match)]
#![allow(clippy::let_and_return)]
#![allow(clippy::len_without_is_empty)]
#![allow(clippy::needless_range_loop)]
#![allow(clippy::collapsible_else_if)]
#![allow(clippy::match_ref_pats)]
#![allow(clippy::tabs_in_doc_comments)]
#![allow(clippy::type_complexity)]
#![allow(clippy::option_map_unit_fn)]
#![warn(clippy::cargo)]
#![allow(clippy::cargo_common_metadata)]
pub use quantifiable_derive::Quantifiable;
#[allow(clippy::manual_map)]
#[allow(clippy::match_single_binding)]
pub mod config_parser;
pub mod topology;
pub mod traffic;
pub mod pattern;
pub mod router;
pub mod routing;
pub mod event;
pub mod matrix;
pub mod output;
pub mod quantify;
pub mod policies;
pub mod experiments;
pub mod config;
pub mod error;
pub mod measures;
pub mod allocator;
pub mod packet;
use std::rc::Rc;
use std::boxed::Box;
use std::cell::{RefCell};
use std::env;
use std::fs::{self,File};
use std::io::prelude::*;
use std::io::{stdout};
use std::collections::{VecDeque,BTreeMap};
use std::ops::DerefMut;
use std::path::{Path};
use std::mem::{size_of};
use std::fmt::Debug;
use std::cmp::Ordering;
use rand::{rngs::StdRng,SeedableRng};
use config_parser::{ConfigurationValue,Expr};
use topology::{Topology,new_topology,TopologyBuilderArgument,Location,
multistage::{Stage,StageBuilderArgument}};
use traffic::{Traffic,new_traffic,TrafficBuilderArgument,TrafficError};
use router::{Router,new_router,RouterBuilderArgument};
use routing::{RoutingInfo,Routing,new_routing,RoutingBuilderArgument};
use event::{EventQueue,Event};
use quantify::Quantifiable;
use experiments::{Experiment,Action,ExperimentOptions};
use policies::{VirtualChannelPolicy,VCPolicyBuilderArgument};
use pattern::{Pattern,PatternBuilderArgument};
use config::flatten_configuration_value;
use measures::{Statistics,ServerStatistics};
use error::{Error,SourceLocation};
use allocator::{Allocator,AllocatorBuilderArgument};
pub use packet::{Phit,Packet,Message,PacketExtraInfo,PacketRef};
/// An endpoint attached to a router port. It stores the messages generated by the traffic,
/// splits them into packets and phits, sends the phits to its router and consumes the phits that arrive to it.
#[derive(Quantifiable)]
pub struct Server
{
/// Index of the server. It is the value given to the traffic in `try_consume`.
index: usize,
/// The port this server is attached to, as returned by `Topology::server_neighbour`.
/// The second component is used as an index into `Simulation::link_classes`.
port: (Location,usize),
/// Status of the router port as seen from this server; queried with `can_transmit` before sending each phit.
router_status: Box<dyn router::StatusAtEmissor+'static>,
/// Generated messages waiting to be split into packets.
stored_messages: VecDeque<Rc<Message>>,
/// Packets of the message being sent that are waiting to be split into phits.
stored_packets: VecDeque<PacketRef>,
/// Phits of the packet being sent that are waiting to be transmitted.
stored_phits: VecDeque<Rc<Phit>>,
/// Virtual channel selected for the packet currently being transmitted. `None` when no packet is in transit.
outcoming_virtual_channel: Option<usize>,
/// For each partially consumed message (keyed by its address), the number of its phits consumed so far.
/// The entry is removed once the whole message has been consumed.
consumed_phits: BTreeMap<*const Message,usize>,
/// Statistics local to this server.
statistics: ServerStatistics,
}
impl Server
{
	/// Consumes a phit that has arrived at this server.
	///
	/// Both the server statistics and the global `statistics` are updated. The number of consumed
	/// phits of the message is tracked in `self.consumed_phits`; when it reaches the message size
	/// the message is handed to `traffic`. When `phit` is the last of its packet the packet is
	/// tracked as consumed and destroyed.
	///
	/// # Panics
	/// Panics if the traffic refuses a completed message or if a message is completed by a phit
	/// that is not the end of its packet.
	fn consume(&mut self, phit:Rc<Phit>, traffic:&mut dyn Traffic, statistics:&mut Statistics, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>)
	{
		self.statistics.track_consumed_phit(cycle);
		statistics.track_consumed_phit(cycle);
		let message = phit.packet.message.clone();
		// Messages are identified by their address.
		let key = message.as_ref() as *const Message;
		// Phits of this message consumed so far, including the current one.
		let seen = self.consumed_phits.get(&key).map_or(1, |previous| previous+1);
		if seen != message.size
		{
			// Still waiting for more phits of the message.
			self.consumed_phits.insert(key,seen);
		}
		else
		{
			// The whole message has been consumed.
			self.statistics.track_consumed_message(cycle);
			statistics.track_consumed_message(cycle);
			let delay = cycle-message.creation_cycle;
			self.statistics.track_message_delay(delay,cycle);
			statistics.track_message_delay(delay,cycle);
			self.consumed_phits.remove(&key);
			let accepted = traffic.try_consume(self.index,message,cycle,topology,rng);
			assert!(accepted,"The traffic could not consume its own message.");
			assert!(phit.is_end(),"message was consumed by a non-ending phit.");
		}
		if !phit.is_end()
		{
			return;
		}
		// The tail of a packet: account for the packet and release it.
		statistics.track_consumed_packet(cycle,&phit.packet);
		if seen < phit.packet.size
		{
			println!("phit tail has been consuming without having consumed a whole packet.");
		}
		phit.packet.destroy();
	}
}
/// The simulated network: a topology together with the routers and servers placed on it.
pub struct Network
{
/// The topology describing how routers and servers are connected.
pub topology: Box<dyn Topology>,
/// The routers of the network; `Simulation::new` builds one per router of the topology.
pub routers: Vec<Rc<RefCell<dyn Router>>>,
/// The servers of the network; `Simulation::new` builds one per server of the topology.
pub servers: Vec<Server>,
}
impl Quantifiable for Network
{
	/// Adds up the memory reported by the topology, the routers, the servers, and everything
	/// they currently hold: the phits inside routers (plus their packet, counted at the ending
	/// phit), and the phits, packets and messages stored in the servers. Each partially consumed
	/// message tracked by a server counts as `size_of::<Message>()`.
	fn total_memory(&self) -> usize
	{
		let mut bytes = size_of::<Box<dyn Topology>>() + self.topology.total_memory() + self.routers.total_memory() + self.servers.total_memory();
		for router in &self.routers
		{
			bytes += router.as_ref().total_memory();
			let borrowed = router.borrow();
			for phit in borrowed.iter_phits()
			{
				bytes += phit.as_ref().total_memory();
				if !phit.is_end()
				{
					continue;
				}
				// Count each packet once, at its last phit.
				bytes += phit.packet.as_ref().total_memory();
			}
		}
		for server in &self.servers
		{
			let in_phits: usize = server.stored_phits.iter().map(|phit| phit.as_ref().total_memory()).sum();
			let in_packets: usize = server.stored_packets.iter().map(|packet| packet.as_ref().total_memory()).sum();
			let in_messages: usize = server.stored_messages.iter().map(|message| message.as_ref().total_memory()).sum();
			// One `Message` per entry of partially consumed messages.
			let in_consumption = size_of::<Message>() * server.consumed_phits.len();
			bytes += in_phits;
			bytes += in_packets;
			bytes += in_messages;
			bytes += in_consumption;
		}
		bytes
	}
	/// Not implemented for the network.
	fn print_memory_breakdown(&self)
	{
		unimplemented!();
	}
	/// Not implemented for the network.
	fn forecast_total_memory(&self) -> usize
	{
		unimplemented!();
	}
}
impl Network
{
	/// Jain index (as computed by `measures::jain`) of the phits created by each server
	/// in the current measurement.
	fn jain_server_created_phits(&self) -> f64
	{
		measures::jain(self.servers.iter().map(|s|s.statistics.current_measurement.created_phits as f64))
	}
	/// Jain index (as computed by `measures::jain`) of the phits consumed by each server
	/// in the current measurement.
	fn jain_server_consumed_phits(&self) -> f64
	{
		measures::jain(self.servers.iter().map(|s|s.statistics.current_measurement.consumed_phits as f64))
	}
	/// Jain index of the created phits for each temporal sample.
	///
	/// The result has one entry per sample index up to the longest `temporal_statistics`
	/// among the servers. A server lacking a sample at some index contributes 0 to it.
	fn temporal_jain_server_created_phits(&self) -> Vec<f64>
	{
		let limit:usize = self.servers.iter().map(|s|s.statistics.temporal_statistics.len()).max().unwrap_or(0);
		(0..limit).map(|index|{
			measures::jain(self.servers.iter().map(|s|
				s.statistics.temporal_statistics.get(index).map(|m|m.created_phits as f64).unwrap_or(0f64)
			))
		}).collect()
	}
	/// Jain index of the consumed phits for each temporal sample.
	///
	/// The result has one entry per sample index up to the longest `temporal_statistics`
	/// among the servers. A server lacking a sample at some index contributes 0 to it.
	fn temporal_jain_server_consumed_phits(&self) -> Vec<f64>
	{
		let limit:usize = self.servers.iter().map(|s|s.statistics.temporal_statistics.len()).max().unwrap_or(0);
		(0..limit).map(|index|{
			measures::jain(self.servers.iter().map(|s|
				// This is the consumption index, so it must read the consumed phits of the sample.
				s.statistics.temporal_statistics.get(index).map(|m|m.consumed_phits as f64).unwrap_or(0f64)
			))
		}).collect()
	}
}
/// Properties shared by all the links of a given class.
pub struct LinkClass
{
/// Delay given to the event queue when a phit is sent through a link of this class.
pub delay: usize,
}
impl LinkClass
{
	/// Builds a `LinkClass` from a `LinkClass` configuration object.
	///
	/// The field `delay` is mandatory. The field `transference_speed` is accepted but ignored.
	///
	/// # Panics
	/// Panics when `delay` is missing or is not a number.
	fn new(cv:&ConfigurationValue) -> LinkClass
	{
		let mut parsed_delay: Option<usize> = None;
		match_object_panic!(cv,"LinkClass",value,
			"delay" => parsed_delay=Some(value.as_f64().expect("bad value for delay") as usize),
			"transference_speed" => (),
		);
		LinkClass{
			delay: parsed_delay.expect("There were no delay"),
		}
	}
}
/// The whole state of a simulation: its configuration, the network and the components driving it.
pub struct Simulation<'a>
{
/// A copy of the configuration value the simulation was built from.
#[allow(dead_code)]
pub configuration: ConfigurationValue,
/// The value of `random_seed` in the configuration.
#[allow(dead_code)]
pub seed: usize,
/// The random number generator, seeded from `seed` and shared by all the components.
pub rng: RefCell<StdRng>,
/// Cycle at which `run` resets the statistics.
pub warmup: usize,
/// Number of cycles to simulate after the warmup.
pub measured: usize,
/// The network being simulated.
pub network: Network,
/// The traffic generating and consuming messages at the servers.
pub traffic: Box<dyn Traffic>,
/// Maximum number of phits of a packet. Larger messages are split into several packets.
pub maximum_packet_size: usize,
/// The routing algorithm.
pub routing: Box<dyn Routing>,
/// The properties of each class of link, indexed by link class.
pub link_classes: Vec<LinkClass>,
/// Maximum number of messages a server may keep stored; generations beyond it are tracked as missed.
pub server_queue_size: usize,
/// The queue of pending events.
pub event_queue: EventQueue,
/// The current cycle.
pub cycle:usize,
/// The global statistics.
pub statistics: Statistics,
/// The value of `launch_configurations` in the configuration, an empty vector if absent.
#[allow(dead_code)]
pub launch_configurations: Vec<ConfigurationValue>,
/// The registered builders for user-defined components.
pub plugs: &'a Plugs,
}
impl<'a> Simulation<'a>
{
/// Builds a simulation from the `Configuration` object `cv`, using `plugs` to build the components.
///
/// The fields `random_seed`, `warmup`, `measured`, `topology`, `traffic`, `maximum_packet_size`,
/// `router`, `routing` and `link_classes` are mandatory. `server_queue_size` defaults to 20,
/// `statistics_temporal_step` to 0 and the remaining statistics fields and `launch_configurations` to empty lists.
///
/// # Panics
/// Panics when a mandatory field is missing, when a field has an unexpected type,
/// when `server_queue_size` is 0, or when a server is not attached to a router port.
fn new(cv: &ConfigurationValue, plugs:&'a Plugs) -> Simulation<'a>
{
// Values gathered from the configuration. Mandatory ones start as `None` and are checked afterwards.
let mut seed: Option<usize> = None;
let mut topology =None;
let mut traffic =None;
let mut router_cfg: Option<&ConfigurationValue> =None;
let mut warmup = None;
let mut measured = None;
let mut maximum_packet_size=None;
let mut routing=None;
let mut link_classes = None;
let mut statistics_temporal_step = 0;
let mut launch_configurations: Vec<ConfigurationValue> = vec![];
let mut statistics_server_percentiles: Vec<u8> = vec![];
let mut statistics_packet_percentiles: Vec<u8> = vec![];
let mut statistics_packet_definitions:Vec< (Vec<Expr>,Vec<Expr>) > = vec![];
let mut server_queue_size = None;
// NOTE(review): the handling of unknown fields is up to `match_object_panic!`, defined elsewhere; presumably it panics.
match_object_panic!(cv,"Configuration",value,
"random_seed" => seed=Some(value.as_f64().expect("bad value for random_seed") as usize),
"warmup" => warmup=Some(value.as_f64().expect("bad value for warmup") as usize),
"measured" => measured=Some(value.as_f64().expect("bad value for measured") as usize),
"topology" => topology=Some(value),
"traffic" => traffic=Some(value),
"maximum_packet_size" => maximum_packet_size=Some(value.as_f64().expect("bad value for maximum_packet_size") as usize),
"server_queue_size" => server_queue_size=Some(value.as_f64().expect("bad value for server_queue_size") as usize),
"router" => router_cfg=Some(value),
// The routing is built right away; it is initialized later, once the topology exists.
"routing" => routing=Some(new_routing(RoutingBuilderArgument{cv:value,plugs})),
"link_classes" => link_classes = Some(value.as_array().expect("bad value for link_classes").iter()
.map(LinkClass::new).collect()),
"statistics_temporal_step" => statistics_temporal_step=value.as_f64().expect("bad value for statistics_temporal_step") as usize,
"launch_configurations" => launch_configurations = value.as_array().expect("bad value for launch_configurations").clone(),
// Percentiles are given as numbers and rounded to the nearest integer.
"statistics_server_percentiles" => statistics_server_percentiles = value
.as_array().expect("bad value for statistics_server_percentiles").iter()
.map(|v|v.as_f64().expect("bad value in statistics_server_percentiles").round() as u8).collect(),
"statistics_packet_percentiles" => statistics_packet_percentiles = value
.as_array().expect("bad value for statistics_packet_percentiles").iter()
.map(|v|v.as_f64().expect("bad value in statistics_packet_percentiles").round() as u8).collect(),
// An array of definitions, each being a pair `[keys,values]` of arrays of expressions.
"statistics_packet_definitions" => match value
{
&ConfigurationValue::Array(ref l) => statistics_packet_definitions=l.iter().map(|definition|match definition {
&ConfigurationValue::Array(ref dl) => {
if dl.len()!=2
{
panic!("Each definition of statistics_packet_definitions must be composed of [keys,values]");
}
let keys = match dl[0]
{
ConfigurationValue::Array(ref lx) => lx.iter().map(|x|match x{
ConfigurationValue::Expression(expr) => expr.clone(),
_ => panic!("bad value for statistics_packet_definitions"),
}).collect(),
_ => panic!("bad value for statistics_packet_definitions"),
};
let values = match dl[1]
{
ConfigurationValue::Array(ref lx) => lx.iter().map(|x|match x{
ConfigurationValue::Expression(expr) => expr.clone(),
_ => panic!("bad value for statistics_packet_definitions"),
}).collect(),
_ => panic!("bad value for statistics_packet_definitions"),
};
(keys,values)
},
_ => panic!("bad value for statistics_packet_definitions"),
}).collect(),
_ => panic!("bad value for statistics_packet_definitions"),
}
);
// Check the mandatory fields and apply the defaults.
let seed=seed.expect("There were no random_seed");
let warmup=warmup.expect("There were no warmup");
let measured=measured.expect("There were no measured");
let topology=topology.expect("There were no topology");
let traffic=traffic.expect("There were no traffic");
let maximum_packet_size=maximum_packet_size.expect("There were no maximum_packet_size");
let server_queue_size = server_queue_size.unwrap_or(20);
assert!(server_queue_size>0, "we need space in the servers to store generated messages.");
let router_cfg=router_cfg.expect("There were no router");
let mut routing=routing.expect("There were no routing");
let link_classes:Vec<LinkClass>=link_classes.expect("There were no link_classes");
// A single generator is shared by every component built below.
// NOTE(review): presumably the order of the following constructions determines the random stream each component sees; keep it stable.
let rng=RefCell::new(StdRng::seed_from_u64(seed as u64));
let topology=new_topology(TopologyBuilderArgument{
cv:topology,
plugs,
rng:&rng,
});
topology.check_adjacency_consistency(Some(link_classes.len()));
routing.initialize(topology.as_ref(),&rng);
let num_routers=topology.num_routers();
let num_servers=topology.num_servers();
// All routers are built from the same `router` configuration.
let routers: Vec<Rc<RefCell<dyn Router>>>=(0..num_routers).map(|index|new_router(router::RouterBuilderArgument{
router_index:index,
cv:router_cfg,
plugs,
topology:topology.as_ref(),
maximum_packet_size,
statistics_temporal_step,
rng:&rng,
})).collect();
// Each server gets the status of the router port it is attached to.
let servers=(0..num_servers).map(|index|{
let port=topology.server_neighbour(index);
let router_status=match port.0
{
Location::RouterPort{
router_index,
router_port
} => {
let router=routers[router_index].borrow();
router.build_emissor_status(router_port,&*topology)
}
_ => panic!("Server is not connected to router"),
};
Server{
index,
port,
router_status,
stored_messages:VecDeque::new(),
stored_packets:VecDeque::new(),
stored_phits:VecDeque::new(),
outcoming_virtual_channel: None,
consumed_phits: BTreeMap::new(),
statistics: ServerStatistics::new(statistics_temporal_step),
}
}).collect();
let traffic=new_traffic(TrafficBuilderArgument{
cv:traffic,
plugs,
topology:topology.as_ref(),
rng:&rng,
});
let statistics=Statistics::new(statistics_temporal_step,statistics_server_percentiles,statistics_packet_percentiles,statistics_packet_definitions,topology.as_ref());
Simulation{
configuration: cv.clone(),
seed,
rng,
warmup,
measured,
network: Network{
topology,
routers,
servers,
},
traffic,
maximum_packet_size,
routing,
link_classes,
server_queue_size,
event_queue: EventQueue::new(1000),
cycle:0,
statistics,
launch_configurations,
plugs,
}
}
/// Runs the simulation until `warmup+measured` cycles have elapsed or the traffic is finished.
///
/// A memory report and the statistics header are printed first. The statistics of the
/// simulation and of the routing are reset when the cycle reaches `warmup`.
fn run(&mut self)
{
	self.print_memory_breakdown();
	self.statistics.print_header();
	loop
	{
		if self.cycle >= self.warmup+self.measured
		{
			break;
		}
		self.advance();
		let warmup_just_ended = self.cycle==self.warmup;
		if warmup_just_ended
		{
			// Discard what was measured during the warmup.
			self.statistics.reset(self.cycle,&mut self.network);
			self.routing.reset_statistics(self.cycle);
		}
		if self.traffic.is_finished()
		{
			println!("Traffic consumed before cycle {}",self.cycle);
			break;
		}
	}
}
/// Simulates a single cycle.
///
/// The cycle is made of the following phases:
/// 1. The events at the begin of the cycle are processed: phits arriving at routers or servers and acknowledgements.
/// 2. The events at the end of the cycle are processed: generic events, whose processing may enqueue new events.
/// 3. Each server may generate a message, split messages into packets and packets into phits, and send one phit to its router.
/// 4. The event queue and the cycle counter are advanced. The statistics are printed every 1000 cycles.
///
/// # Panics
/// Panics if a phit reaches a server other than its destination or no location at all, if an
/// event appears in the wrong half of the cycle, if the traffic generates a message to the same
/// server or outside the network, or if a server is not attached to a router port.
fn advance(&mut self)
{
	// Phase 1: events at the begin of the cycle.
	let mut ievent=0;
	loop
	{
		let event=if let Some(event) = self.event_queue.access_begin(ievent)
		{
			event.clone()
		}
		else
		{
			break;
		};
		match event
		{
			Event::PhitToLocation{
				ref phit,
				ref previous,
				ref new,
			} =>
			{
				match new
				{
					&Location::RouterPort{router_index:router,router_port:port} =>
					{
						self.statistics.link_statistics[router][port].phit_arrivals+=1;
						// The extra information of the packet is only recorded when some packet statistic has been defined.
						if phit.is_begin() && !self.statistics.packet_defined_statistics_definitions.is_empty()
						{
							let mut be = phit.packet.extra.borrow_mut();
							if be.is_none()
							{
								*be=Some(PacketExtraInfo::default());
							}
							let extra = be.as_mut().unwrap();
							let (_,link_class) = self.network.topology.neighbour(router,port);
							extra.link_classes.push(link_class);
							extra.entry_virtual_channels.push(*phit.virtual_channel.borrow());
							extra.cycle_per_hop.push(self.cycle);
						}
						let mut brouter=self.network.routers[router].borrow_mut();
						brouter.insert(phit.clone(),port,&self.rng);
						// Schedule the router for the end of this cycle unless it is already scheduled.
						if brouter.pending_events()==0
						{
							brouter.add_pending_event();
							self.event_queue.enqueue_end(Event::Generic(brouter.as_eventful().upgrade().expect("missing router")),0);
						}
						match previous
						{
							// The packet enters the network: its routing information is initialized.
							&Location::ServerPort(_server_index) => if phit.is_begin()
							{
								*phit.packet.cycle_into_network.borrow_mut() = self.cycle;
								self.routing.initialize_routing_info(&phit.packet.routing_info, self.network.topology.as_ref(), router, phit.packet.message.destination,&self.rng);
							},
							// A hop between routers: its routing information is updated.
							&Location::RouterPort{..} =>
							{
								self.statistics.track_phit_hop(phit,self.cycle);
								if phit.is_begin()
								{
									phit.packet.routing_info.borrow_mut().hops+=1;
									self.routing.update_routing_info(&phit.packet.routing_info, self.network.topology.as_ref(), router, port, phit.packet.message.destination,&self.rng);
								}
							},
							_ => (),
						};
					},
					&Location::ServerPort(server) =>
					{
						if server!=phit.packet.message.destination
						{
							panic!("Packet reached wrong server, {} instead of {}!\n",server,phit.packet.message.destination);
						}
						self.network.servers[server].consume(phit.clone(),self.traffic.deref_mut(),&mut self.statistics,self.cycle,self.network.topology.as_ref(),&self.rng);
					}
					&Location::None => panic!("Phit went nowhere previous={:?}",previous),
				};
			},
			Event::Acknowledge{
				location,
				message: ack_message,
			} => match location
			{
				Location::RouterPort{
					router_index,
					router_port,
				} =>
				{
					let mut brouter=self.network.routers[router_index].borrow_mut();
					brouter.acknowledge(router_port,ack_message);
					// Schedule the router for the end of this cycle unless it is already scheduled.
					if brouter.pending_events()==0
					{
						brouter.add_pending_event();
						self.event_queue.enqueue_end(Event::Generic(brouter.as_eventful().upgrade().expect("missing router")),0);
					}
				},
				Location::ServerPort(server) => self.network.servers[server].router_status.acknowledge(ack_message),
				_ => (),
			},
			Event::Generic(ref _element) => unimplemented!(),
		};
		ievent+=1;
	}
	// Phase 2: events at the end of the cycle. Only generic events are allowed here.
	ievent=0;
	loop
	{
		let event=if let Some(event) = self.event_queue.access_end(ievent)
		{
			event.clone()
		}
		else
		{
			break;
		};
		match event
		{
			Event::PhitToLocation{
				..
			} => panic!("Phits should not arrive at the end of a cycle"),
			Event::Acknowledge{
				..
			} => panic!("Phit Acknowledgements should not arrive at the end of a cycle"),
			Event::Generic(ref element) =>
			{
				let new_events=element.borrow_mut().process(self);
				for ge in new_events.into_iter()
				{
					self.event_queue.enqueue(ge);
				}
			},
		};
		ievent+=1;
	}
	// Phase 3: generation and injection at the servers.
	let num_servers=self.network.servers.len();
	for (iserver,server) in self.network.servers.iter_mut().enumerate()
	{
		if let (Location::RouterPort{router_index: index,router_port: port},link_class)=server.port
		{
			if self.traffic.should_generate(iserver,self.cycle,&self.rng)
			{
				if server.stored_messages.len()<self.server_queue_size {
					match self.traffic.generate_message(iserver,self.cycle,self.network.topology.as_ref(),&self.rng)
					{
						Ok(message) =>
						{
							if message.destination>=num_servers
							{
								panic!("Message sent to outside the network unexpectedly. destination={destination}",destination=message.destination);
							}
							if message.destination==iserver
							{
								panic!("Generated message to self unexpectedly.");
							}
							server.stored_messages.push_back(message);
						},
						// These errors are deliberately ignored: no message is stored.
						Err(TrafficError::OriginOutsideTraffic) => (),
						Err(TrafficError::SelfMessage) => (),
					};
				} else {
					// The queue of the server is full, so the generation is lost.
					server.statistics.track_missed_generation(self.cycle);
				}
			}
			// Split the next message into packets of at most `maximum_packet_size` phits.
			if server.stored_packets.is_empty() && !server.stored_messages.is_empty()
			{
				let message=server.stored_messages.pop_front().expect("There are not messages in queue");
				let mut size=message.size;
				while size>0
				{
					let ps=if size>self.maximum_packet_size
					{
						self.maximum_packet_size
					}
					else
					{
						size
					};
					server.stored_packets.push_back(Packet{
						size:ps,
						routing_info: RefCell::new(RoutingInfo::new()),
						message:message.clone(),
						index:0,
						cycle_into_network:RefCell::new(0),
						extra: RefCell::new(None),
					}.into_ref());
					size-=ps;
				}
			}
			// Split the next packet into phits.
			if server.stored_phits.is_empty() && !server.stored_packets.is_empty()
			{
				let packet=server.stored_packets.pop_front().expect("There are not packets in queue");
				for index in 0..packet.size
				{
					server.stored_phits.push_back(Rc::new(Phit{
						packet:packet.clone(),
						index,
						virtual_channel: RefCell::new(None),
					}));
				}
			}
			// Try to send the first stored phit.
			if !server.stored_phits.is_empty()
			{
				let phit=server.stored_phits.front().expect("There are not phits");
				if server.outcoming_virtual_channel.is_none()
				{
					// A new packet: select the first virtual channel through which it can be transmitted.
					assert!(phit.is_begin(),"Not VC assigned for server--router while transmitting a middle phit.");
					let status = &server.router_status;
					for vc in 0..status.num_virtual_channels()
					{
						if status.can_transmit(phit,vc)
						{
							server.outcoming_virtual_channel = Some(vc);
							break;
						}
					}
				}
				if let Some(vc) = server.outcoming_virtual_channel
				{
					if server.router_status.can_transmit(phit,vc)
					{
						let phit=server.stored_phits.pop_front().expect("There are not phits");
						*phit.virtual_channel.borrow_mut() = Some(vc);
						if phit.is_end()
						{
							// The packet is complete; the next packet selects its own virtual channel.
							server.outcoming_virtual_channel = None;
						}
						let event=Event::PhitToLocation{
							phit,
							previous: Location::ServerPort(iserver),
							new: Location::RouterPort{router_index:index,router_port:port},
						};
						self.statistics.track_created_phit(self.cycle);
						server.statistics.track_created_phit(self.cycle);
						self.event_queue.enqueue_begin(event,self.link_classes[link_class].delay);
						// Notify on the same channel that `can_transmit` checked, so that the
						// status of the channel actually used by the phit is the one updated.
						server.router_status.notify_outcoming_phit(vc,self.cycle);
					}
				}
			}
		}
		else
		{
			panic!("Where goes this port?");
		}
	}
	// Phase 4: move on to the next cycle.
	self.event_queue.advance();
	self.cycle+=1;
	if self.cycle%1000==0
	{
		self.statistics.print(self.cycle,&self.network);
	}
}
/// Writes a `Result` configuration object with the measurements of the simulation into `output`.
///
/// Besides the global averages it includes, when available or requested:
/// the statistics of the routing and the routers, the resource usage of the process as reported by `procfs`,
/// the temporal statistics (when `temporal_step` is positive), the server and packet percentiles,
/// and the packet statistics defined by the user.
///
/// # Panics
/// Panics if there are no link statistics, if the process information can be opened but not read,
/// or if writing into `output` fails.
fn write_result(&self,output:&mut dyn Write)
{
	let measurement = &self.statistics.current_measurement;
	// Number of cycles covered by the current measurement.
	let cycles=self.cycle-measurement.begin_cycle;
	let num_servers=self.network.servers.len();
	let injected_load=measurement.created_phits as f64/cycles as f64/num_servers as f64;
	let accepted_load=measurement.consumed_phits as f64/cycles as f64/num_servers as f64;
	let average_message_delay=measurement.total_message_delay as f64/measurement.consumed_messages as f64;
	let average_packet_network_delay=measurement.total_packet_network_delay as f64/measurement.consumed_packets as f64;
	let jscp=self.network.jain_server_consumed_phits();
	let jsgp=self.network.jain_server_created_phits();
	let average_packet_hops=measurement.total_packet_hops as f64 / measurement.consumed_packets as f64;
	let total_packet_per_hop_count=measurement.total_packet_per_hop_count.iter().map(|&count|ConfigurationValue::Number(count as f64)).collect();
	let total_arrivals:usize = (0..self.network.topology.num_routers()).map(|i|(0..self.network.topology.degree(i)).map(|j|self.statistics.link_statistics[i][j].phit_arrivals).sum::<usize>()).sum();
	let total_links: usize = (0..self.network.topology.num_routers()).map(|i|self.network.topology.degree(i)).sum();
	let average_link_utilization = total_arrivals as f64 / cycles as f64 / total_links as f64;
	let maximum_arrivals:usize = self.statistics.link_statistics.iter().map(|rls|rls.iter().map(|ls|ls.phit_arrivals).max().unwrap()).max().unwrap();
	let maximum_link_utilization = maximum_arrivals as f64 / cycles as f64;
	let server_average_cycle_last_created_phit : f64 = (self.network.servers.iter().map(|s|s.statistics.cycle_last_created_phit).sum::<usize>() as f64)/(self.network.servers.len() as f64);
	let server_average_cycle_last_consumed_message : f64 = (self.network.servers.iter().map(|s|s.statistics.cycle_last_consumed_message).sum::<usize>() as f64)/(self.network.servers.len() as f64);
	let server_average_missed_generations : f64 = (self.network.servers.iter().map(|s|s.statistics.current_measurement.missed_generations).sum::<usize>() as f64)/(self.network.servers.len() as f64);
	let servers_with_missed_generations : usize = self.network.servers.iter().map(|s|if s.statistics.current_measurement.missed_generations > 0 {1} else {0}).sum::<usize>();
	let virtual_channel_usage: Vec<_> =measurement.virtual_channel_usage.iter().map(|&count|
		ConfigurationValue::Number(count as f64 / cycles as f64 / total_links as f64)
	).collect();
	let git_id=get_git_id();
	let version_number = get_version_number();
	let mut result_content = vec![
		(String::from("cycle"),ConfigurationValue::Number(self.cycle as f64)),
		(String::from("injected_load"),ConfigurationValue::Number(injected_load)),
		(String::from("accepted_load"),ConfigurationValue::Number(accepted_load)),
		(String::from("average_message_delay"),ConfigurationValue::Number(average_message_delay)),
		(String::from("average_packet_network_delay"),ConfigurationValue::Number(average_packet_network_delay)),
		(String::from("server_generation_jain_index"),ConfigurationValue::Number(jsgp)),
		(String::from("server_consumption_jain_index"),ConfigurationValue::Number(jscp)),
		(String::from("average_packet_hops"),ConfigurationValue::Number(average_packet_hops)),
		(String::from("total_packet_per_hop_count"),ConfigurationValue::Array(total_packet_per_hop_count)),
		(String::from("average_link_utilization"),ConfigurationValue::Number(average_link_utilization)),
		(String::from("maximum_link_utilization"),ConfigurationValue::Number(maximum_link_utilization)),
		(String::from("server_average_cycle_last_created_phit"),ConfigurationValue::Number(server_average_cycle_last_created_phit)),
		(String::from("server_average_cycle_last_consumed_message"),ConfigurationValue::Number(server_average_cycle_last_consumed_message)),
		(String::from("server_average_missed_generations"),ConfigurationValue::Number(server_average_missed_generations)),
		(String::from("servers_with_missed_generations"),ConfigurationValue::Number(servers_with_missed_generations as f64)),
		(String::from("virtual_channel_usage"),ConfigurationValue::Array(virtual_channel_usage)),
		(String::from("git_id"),ConfigurationValue::Literal(git_id.to_string())),
		(String::from("version_number"),ConfigurationValue::Literal(version_number.to_string())),
	];
	if let Some(content)=self.routing.statistics(self.cycle)
	{
		result_content.push((String::from("routing_statistics"),content));
	}
	// Every router folds its own statistics into the aggregate.
	if let Some(content) = self.network.routers.iter().enumerate().fold(None,|maybe_stat,(index,router)|router.borrow().aggregate_statistics(maybe_stat,index,self.network.routers.len(),self.cycle))
	{
		result_content.push((String::from("router_aggregated_statistics"),content));
	}
	// Resource usage of the process, only when the process information is available.
	if let Ok(linux_process) = procfs::process::Process::myself()
	{
		let status = linux_process.status().expect("failed to get status of the self process");
		if let Some(peak_memory)=status.vmhwm
		{
			result_content.push((String::from("linux_high_water_mark"),ConfigurationValue::Number(peak_memory as f64)));
		}
		let stat = linux_process.stat().expect("failed to get stat of the self process");
		let tps = procfs::ticks_per_second().expect("could not get the number of ticks per second.") as f64;
		result_content.push((String::from("user_time"),ConfigurationValue::Number(stat.utime as f64/tps)));
		result_content.push((String::from("system_time"),ConfigurationValue::Number(stat.stime as f64/tps)));
	}
	if self.statistics.temporal_step > 0
	{
		// One entry per temporal sample; loads are normalized by the length of the step.
		let step = self.statistics.temporal_step;
		let samples = self.statistics.temporal_statistics.len();
		let mut injected_load_collect = Vec::with_capacity(samples);
		let mut accepted_load_collect = Vec::with_capacity(samples);
		let mut average_message_delay_collect = Vec::with_capacity(samples);
		let mut average_packet_network_delay_collect = Vec::with_capacity(samples);
		let mut average_packet_hops_collect = Vec::with_capacity(samples);
		let mut virtual_channel_usage_collect = Vec::with_capacity(samples);
		for measurement in self.statistics.temporal_statistics.iter()
		{
			let injected_load=measurement.created_phits as f64/step as f64/num_servers as f64;
			injected_load_collect.push(ConfigurationValue::Number(injected_load));
			let accepted_load=measurement.consumed_phits as f64/step as f64/num_servers as f64;
			accepted_load_collect.push(ConfigurationValue::Number(accepted_load));
			let average_message_delay=measurement.total_message_delay as f64/measurement.consumed_messages as f64;
			average_message_delay_collect.push(ConfigurationValue::Number(average_message_delay));
			// Packet network delay averaged over packets, the same definition used for the global value.
			let average_packet_network_delay=measurement.total_packet_network_delay as f64/measurement.consumed_packets as f64;
			average_packet_network_delay_collect.push(ConfigurationValue::Number(average_packet_network_delay));
			let average_packet_hops=measurement.total_packet_hops as f64 / measurement.consumed_packets as f64;
			average_packet_hops_collect.push(ConfigurationValue::Number(average_packet_hops));
			let virtual_channel_usage: Vec<_> =measurement.virtual_channel_usage.iter().map(|&count|
				ConfigurationValue::Number(count as f64 / step as f64 / total_links as f64)
			).collect();
			virtual_channel_usage_collect.push(ConfigurationValue::Array(virtual_channel_usage));
		};
		let jscp_collect = self.network.temporal_jain_server_consumed_phits()
			.into_iter()
			.map(ConfigurationValue::Number)
			.collect();
		let jsgp_collect = self.network.temporal_jain_server_created_phits()
			.into_iter()
			.map(ConfigurationValue::Number)
			.collect();
		let temporal_content = vec![
			(String::from("injected_load"),ConfigurationValue::Array(injected_load_collect)),
			(String::from("accepted_load"),ConfigurationValue::Array(accepted_load_collect)),
			(String::from("average_message_delay"),ConfigurationValue::Array(average_message_delay_collect)),
			(String::from("average_packet_network_delay"),ConfigurationValue::Array(average_packet_network_delay_collect)),
			(String::from("server_generation_jain_index"),ConfigurationValue::Array(jsgp_collect)),
			(String::from("server_consumption_jain_index"),ConfigurationValue::Array(jscp_collect)),
			(String::from("average_packet_hops"),ConfigurationValue::Array(average_packet_hops_collect)),
			(String::from("virtual_channel_usage"),ConfigurationValue::Array(virtual_channel_usage_collect)),
		];
		result_content.push((String::from("temporal_statistics"),ConfigurationValue::Object(String::from("TemporalStatistics"),temporal_content)));
	}
	if !self.statistics.server_percentiles.is_empty()
	{
		// Each magnitude is sorted independently, so a percentile entry does not describe a single server.
		let mut servers_injected_load : Vec<f64> = self.network.servers.iter().map(|s|s.statistics.current_measurement.created_phits as f64/cycles as f64).collect();
		let mut servers_accepted_load : Vec<f64> = self.network.servers.iter().map(|s|s.statistics.current_measurement.consumed_phits as f64/cycles as f64).collect();
		let mut servers_average_message_delay : Vec<f64> = self.network.servers.iter().map(|s|s.statistics.current_measurement.total_message_delay as f64/s.statistics.current_measurement.consumed_messages as f64).collect();
		let mut servers_cycle_last_created_phit : Vec<usize> = self.network.servers.iter().map(|s|s.statistics.cycle_last_created_phit).collect();
		let mut servers_cycle_last_consumed_message : Vec<usize> = self.network.servers.iter().map(|s|s.statistics.cycle_last_consumed_message).collect();
		let mut servers_missed_generations : Vec<usize> = self.network.servers.iter().map(|s|s.statistics.current_measurement.missed_generations).collect();
		servers_injected_load.sort_by(|a,b|a.partial_cmp(b).unwrap_or(Ordering::Less));
		servers_accepted_load.sort_by(|a,b|a.partial_cmp(b).unwrap_or(Ordering::Less));
		servers_average_message_delay.sort_by(|a,b|a.partial_cmp(b).unwrap_or(Ordering::Less));
		servers_cycle_last_created_phit.sort_unstable();
		servers_cycle_last_consumed_message.sort_unstable();
		servers_missed_generations.sort_unstable();
		for &percentile in self.statistics.server_percentiles.iter()
		{
			// Clamp so that the percentile 100 selects the last element.
			let mut index:usize = num_servers * usize::from(percentile) /100;
			if index >= num_servers
			{
				index = num_servers -1;
			}
			let server_content = vec![
				(String::from("injected_load"),ConfigurationValue::Number(servers_injected_load[index])),
				(String::from("accepted_load"),ConfigurationValue::Number(servers_accepted_load[index])),
				(String::from("average_message_delay"),ConfigurationValue::Number(servers_average_message_delay[index])),
				(String::from("cycle_last_created_phit"),ConfigurationValue::Number(servers_cycle_last_created_phit[index] as f64)),
				(String::from("cycle_last_consumed_message"),ConfigurationValue::Number(servers_cycle_last_consumed_message[index] as f64)),
				(String::from("missed_generations"),ConfigurationValue::Number(servers_missed_generations[index] as f64)),
			];
			result_content.push((format!("server_percentile{}",percentile),ConfigurationValue::Object(String::from("ServerStatistics"),server_content)));
		}
	}
	if !self.statistics.packet_percentiles.is_empty()
	{
		let mut packets_delay : Vec<usize> = self.statistics.packet_statistics.iter().map(|ps|ps.delay).collect();
		let mut packets_hops : Vec<usize> = self.statistics.packet_statistics.iter().map(|ps|ps.hops).collect();
		let mut packets_consumed_cycle: Vec<usize> = self.statistics.packet_statistics.iter().map(|ps|ps.consumed_cycle).collect();
		packets_delay.sort_unstable();
		packets_hops.sort_unstable();
		packets_consumed_cycle.sort_unstable();
		let num_packets = packets_delay.len();
		for &percentile in self.statistics.packet_percentiles.iter()
		{
			// Clamp so that the percentile 100 selects the last element.
			let mut index:usize = num_packets * usize::from(percentile) /100;
			if index >= num_packets
			{
				index = num_packets -1;
			}
			let packet_content = vec![
				(String::from("delay"),ConfigurationValue::Number(packets_delay[index] as f64)),
				(String::from("hops"),ConfigurationValue::Number(packets_hops[index] as f64)),
				(String::from("consumed_cycle"),ConfigurationValue::Number(packets_consumed_cycle[index] as f64)),
			];
			result_content.push((format!("packet_percentile{}",percentile),ConfigurationValue::Object(String::from("PacketStatistics"),packet_content)));
		}
	}
	if !self.statistics.packet_defined_statistics_measurement.is_empty()
	{
		let mut pds_content=vec![];
		for definition_measurement in self.statistics.packet_defined_statistics_measurement.iter()
		{
			let mut dm_list = vec![];
			for (key,val,count) in definition_measurement
			{
				// Each stored value is divided by the count of the bin to report its average.
				let fcount = *count as f32;
				let averages = ConfigurationValue::Array( val.iter().map(|v|ConfigurationValue::Number(f64::from(v/fcount))).collect() );
				let dm_content: Vec<(String,ConfigurationValue)> = vec![
					(String::from("key"),ConfigurationValue::Array(key.to_vec())),
					(String::from("average"),averages),
					(String::from("count"),ConfigurationValue::Number(*count as f64)),
				];
				dm_list.push( ConfigurationValue::Object(String::from("PacketBin"),dm_content) );
			}
			pds_content.push(ConfigurationValue::Array(dm_list));
		}
		result_content.push( (String::from("packet_defined_statistics"),ConfigurationValue::Array(pds_content)) );
	}
	let result=ConfigurationValue::Object(String::from("Result"),result_content);
	writeln!(output,"{}",result).unwrap();
}
}
impl<'a> Quantifiable for Simulation<'a>
{
	/// Not implemented for the simulation.
	fn total_memory(&self) -> usize
	{
		unimplemented!();
	}
	/// Prints a memory report to standard output: the static size of the main types followed
	/// by the total memory of the network, the traffic and the event queue.
	fn print_memory_breakdown(&self)
	{
		// Static sizes, in the order they are reported.
		let type_sizes = [
			("self",size_of::<Self>()),
			("phit",size_of::<Phit>()),
			("packet",size_of::<Packet>()),
			("message",size_of::<Message>()),
			("server",size_of::<Server>()),
			("event",size_of::<Event>()),
		];
		println!("\nBegin memory report");
		for (name,size) in type_sizes.iter()
		{
			println!("{} : {}",name,size);
		}
		// Each total is computed right before being printed.
		let network_total = quantify::human_bytes(self.network.total_memory());
		println!("network total : {}",network_total);
		let traffic_total = quantify::human_bytes(self.traffic.total_memory());
		println!("traffic total : {}",traffic_total);
		let event_queue_total = quantify::human_bytes(self.event_queue.total_memory());
		println!("event_queue total : {}",event_queue_total);
		println!("End memory report\n");
	}
	/// Not implemented for the simulation.
	fn forecast_total_memory(&self) -> usize
	{
		unimplemented!();
	}
}
/// Collection of named builder functions for the pluggable component kinds.
///
/// Each map associates a `String` key with a plain function pointer that builds one
/// component from its builder argument. The `Default` value has every map empty.
/// Entries are registered with the `add_*` methods.
#[derive(Default)]
pub struct Plugs
{
	/// Builders of `Router`s.
	routers: BTreeMap<String, fn(RouterBuilderArgument) -> Rc<RefCell<dyn Router>> >,
	/// Builders of `Topology`s.
	topologies: BTreeMap<String, fn(TopologyBuilderArgument) -> Box<dyn Topology> >,
	/// Builders of `Stage`s.
	stages: BTreeMap<String, fn(StageBuilderArgument) -> Box<dyn Stage> >,
	/// Builders of `Routing`s.
	routings: BTreeMap<String,fn(RoutingBuilderArgument) -> Box<dyn Routing>>,
	/// Builders of `Traffic`s.
	traffics: BTreeMap<String,fn(TrafficBuilderArgument) -> Box<dyn Traffic> >,
	/// Builders of `Pattern`s.
	patterns: BTreeMap<String, fn(PatternBuilderArgument) -> Box<dyn Pattern> >,
	/// Builders of `VirtualChannelPolicy`s.
	policies: BTreeMap<String, fn(VCPolicyBuilderArgument) -> Box<dyn VirtualChannelPolicy> >,
	/// Builders of `Allocator`s.
	allocators: BTreeMap<String, fn(AllocatorBuilderArgument) -> Box<dyn Allocator> >,
}
impl Plugs
{
	/// Registers `builder` as the router builder for `key`, replacing any previous entry with that key.
	pub fn add_router(&mut self, key:String, builder:fn(RouterBuilderArgument) -> Rc<RefCell<dyn Router>>)
	{
		self.routers.insert(key,builder);
	}
	/// Registers `builder` as the topology builder for `key`, replacing any previous entry with that key.
	pub fn add_topology(&mut self, key:String, builder:fn(TopologyBuilderArgument) -> Box<dyn Topology>)
	{
		self.topologies.insert(key,builder);
	}
	/// Registers `builder` as the stage builder for `key`, replacing any previous entry with that key.
	pub fn add_stage(&mut self, key:String, builder:fn(StageBuilderArgument) -> Box<dyn Stage>)
	{
		self.stages.insert(key,builder);
	}
	/// Registers `builder` as the traffic builder for `key`, replacing any previous entry with that key.
	pub fn add_traffic(&mut self, key:String, builder:fn(TrafficBuilderArgument) -> Box<dyn Traffic>)
	{
		self.traffics.insert(key,builder);
	}
	/// Registers `builder` as the routing builder for `key`, replacing any previous entry with that key.
	pub fn add_routing(&mut self, key:String, builder:fn(RoutingBuilderArgument) -> Box<dyn Routing>)
	{
		self.routings.insert(key,builder);
	}
	/// Registers `builder` as the virtual channel policy builder for `key`, replacing any previous entry with that key.
	pub fn add_policy(&mut self, key:String, builder: fn(VCPolicyBuilderArgument) -> Box<dyn VirtualChannelPolicy>)
	{
		self.policies.insert(key,builder);
	}
	/// Registers `builder` as the pattern builder for `key`, replacing any previous entry with that key.
	pub fn add_pattern(&mut self, key:String, builder: fn(PatternBuilderArgument) -> Box<dyn Pattern>)
	{
		self.patterns.insert(key,builder);
	}
	/// Registers `builder` as the allocator builder for `key`, replacing any previous entry with that key.
	///
	/// Counterpart of the other `add_*` methods for the `allocators` map, which otherwise
	/// could not be filled through the public interface.
	pub fn add_allocator(&mut self, key:String, builder: fn(AllocatorBuilderArgument) -> Box<dyn Allocator>)
	{
		self.allocators.insert(key,builder);
	}
}
impl Debug for Plugs
{
fn fmt(&self,f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error>
{
write!(f,"{};",self.routers.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
write!(f,"{};",self.topologies.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
write!(f,"{};",self.stages.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
write!(f,"{};",self.routings.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
write!(f,"{};",self.traffics.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
write!(f,"{};",self.patterns.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
write!(f,"{};",self.policies.keys().map(|s|s.to_string()).collect::<Vec<String>>().join(","))?;
Ok(())
}
}
pub fn file_main(file:&mut File, plugs:&Plugs, mut results_file:Option<File>,free_args:&[String]) -> Result<(),Error>
{
let mut contents = String::new();
file.read_to_string(&mut contents).expect("something went wrong reading the file");
let mut rewrites: Vec< (Expr,ConfigurationValue) > = vec![];
for arg in free_args
{
if let Some( (left,right) ) = arg.split_once('=')
{
let left_expr = match config_parser::parse_expression(left).expect("error parsing a free argument")
{
config_parser::Token::Expression(expr) => expr,
x => panic!("the left of free argument is not an expression ({:?}), which it should be.",x),
};
let right_value = match config_parser::parse(right).expect("error parsing a free argument")
{
config_parser::Token::Value(value) => value,
x => panic!("the right of free argument is not a value ({:?}), which it should be.",x),
};
rewrites.push( (left_expr,right_value) );
} else {
println!("WARNING: ignoring argument {}",arg);
}
}
match config_parser::parse(&contents)
{
Err(x) => println!("error parsing configuration file: {:?}",x),
Ok(mut x) =>
{
println!("parsed correctly: {:?}",x);
match x
{
config_parser::Token::Value(ref mut value) =>
{
for (path_expr,new_value) in rewrites
{
config::rewrite_pair_value(value,&path_expr,new_value);
}
let flat=flatten_configuration_value(value);
if let ConfigurationValue::Experiments(ref experiments)=flat
{
for (i,experiment) in experiments.iter().enumerate()
{
println!("experiment {} of {} is {:?}",i,experiments.len(),experiment);
let mut simulation=Simulation::new(experiment,plugs);
simulation.run();
match results_file
{
Some(ref mut f) => simulation.write_result(f),
None => simulation.write_result(&mut stdout()),
};
}
}
else
{
panic!("there are not experiments");
}
},
_ => panic!("Not a value"),
};
},
};
Ok(())
}
/// Executes `action` over the experiment stored in the directory `path`.
///
/// When `path` does not exist it is first created with `fs::create_dir`. An `Experiment` is
/// then built from `binary`, `path`, `plugs` and `options`, and asked to execute `action`.
///
/// # Errors
/// Returns the error of `Experiment::execute_action`, extended with a message naming the action.
///
/// # Panics
/// Panics if the directory has to be created and the creation fails.
pub fn directory_main(path:&Path, binary:&str, plugs:&Plugs, action:Action, options: ExperimentOptions) -> Result<(),Error>
{
	let folder_is_missing = !path.exists();
	if folder_is_missing
	{
		println!("Folder {:?} does not exists; creating it.",path);
		fs::create_dir(path).expect("Something went wrong when creating the main path.");
	}
	let mut experiment = Experiment::new(Path::new(binary),path,plugs,options);
	match experiment.execute_action(action)
	{
		Ok(done) => Ok(done),
		Err(error) => Err(error.with_message(format!("Execution of the action {action} failed."))),
	}
}
/// Returns the text of the file `generated_git_id` found in the build `OUT_DIR`.
///
/// The file is embedded at compile time with `include_str!`, so no file access happens at runtime.
pub fn get_git_id() -> &'static str
{
	const GENERATED_GIT_ID: &str = include_str!(concat!(env!("OUT_DIR"), "/generated_git_id"));
	GENERATED_GIT_ID
}
/// Returns the value `CARGO_PKG_VERSION` had at compile time,
/// or `"?"` when that environment variable was not defined during the build.
pub fn get_version_number() -> &'static str
{
	option_env!("CARGO_PKG_VERSION").unwrap_or("?")
}
/// Builds the `getopts::Options` with the default command-line options.
///
/// Options taking a value are registered first, then the plain flags, in the order listed below.
pub fn terminal_default_options() -> getopts::Options
{
	// (short name, long name, description, hint) of every option that takes a value.
	const VALUED_OPTIONS: &[(&str,&str,&str,&str)] = &[
		("a","action","selected action to execute (for directory experiment)","METHOD"),
		("r","results","file in which to write the simulation results (for file experiment)","FILE"),
		("s","start_index","experiment index in which to start processing","INDEX"),
		("e","end_index","experiment index in which to end processing","INDEX"),
		("x","special","some special execution","SPECIAL_VALUE"),
		("","special_args","arguments for special execution","SPECIAL_VALUE"),
		("f","source","copy matching results from another path experiment","PATH"),
		("w","where","select the subset of indices for which the configuration expression evaluates to true","EXPRESION"),
		("m","message","write a message into the journal file","TEXT"),
		("i","interactive","whether to ask for confirmation","BOOLEAN"),
		("","use_csv","Use a CSV file as a source for the generations of outputs.","FILE"),
	];
	// (short name, long name, description) of every flag without value.
	const FLAGS: &[(&str,&str,&str)] = &[
		("h","help","show this help"),
		("","foreign","Assume to be working with foreign data. Many checks are relaxed."),
	];
	let mut opts = getopts::Options::new();
	for &(short_name,long_name,description,hint) in VALUED_OPTIONS.iter()
	{
		opts.optopt(short_name,long_name,description,hint);
	}
	for &(short_name,long_name,description) in FLAGS.iter()
	{
		opts.optflag(short_name,long_name,description);
	}
	opts
}
pub fn terminal_main_normal_opts(args:&[String], plugs:&Plugs, option_matches:getopts::Matches) -> Result<(),Error>
{
let action=if option_matches.opt_present("action")
{
use std::str::FromStr;
Action::from_str(&option_matches.opt_str("action").unwrap())?
}
else
{
Action::LocalAndOutput
};
let path=Path::new(&option_matches.free[0]);
if path.is_dir() || (!path.exists() && match action {Action::Shell=>true,_=>false} )
{
if option_matches.free.len()>1
{
println!("WARNING: there are {} excess free arguments. This first fre argument is the path the rest is ignored.",option_matches.free.len());
println!("non-ignored arg {} is {}",0,option_matches.free[0]);
for (i,free_arg) in option_matches.free.iter().enumerate().skip(1)
{
println!("ignored arg {} is {}",i,free_arg);
}
}
let mut options= ExperimentOptions::default();
if option_matches.opt_present("source")
{
options.external_source = Some(Path::new(&option_matches.opt_str("source").unwrap()).to_path_buf());
}
if option_matches.opt_present("start_index")
{
options.start_index = Some(option_matches.opt_str("start_index").unwrap().parse::<usize>().expect("non-usize received from --start_index"));
}
if option_matches.opt_present("end_index")
{
options.end_index = Some(option_matches.opt_str("end_index").unwrap().parse::<usize>().expect("non-usize received from --end_index"));
}
if option_matches.opt_present("where")
{
let expr = match config_parser::parse_expression(&option_matches.opt_str("where").unwrap()).expect("error parsing the where clause")
{
config_parser::Token::Expression(expr) => expr,
x =>
{
eprintln!("The where clause is not an expression ({:?}), which it should be.",x);
std::process::exit(-1);
}
};
options.where_clause = Some(expr);
}
if option_matches.opt_present("message")
{
options.message = Some(option_matches.opt_str("message").unwrap());
}
if option_matches.opt_present("interactive")
{
let s = option_matches.opt_str("interactive").unwrap();
options.interactive = match s.as_ref()
{
"" | "true" | "yes" | "y" => Some(true),
"false" | "no" | "n" => Some(false),
"none" => None,
_ =>
{
eprintln!("--interactive={s} is not a valid option.");
std::process::exit(-1);
}
};
}
if option_matches.opt_present("foreign")
{
options.foreign=true;
}
if option_matches.opt_present("use_csv")
{
options.use_csv = Some(Path::new(&option_matches.opt_str("use_csv").unwrap()).to_path_buf());
}
return directory_main(&path,&args[0],&plugs,action,options);
}
else
{
let mut f = File::open(&path).map_err(|err|error!(could_not_open_file,path.to_path_buf(),err).with_message("could not open configuration file.".to_string()))?;
let results_file= if option_matches.opt_present("results")
{
Some(File::create(option_matches.opt_str("results").unwrap()).expect("Could not create results file"))
}
else
{
None
};
let free_args=&option_matches.free[1..];
return file_main(&mut f,&plugs,results_file,free_args);
}
}
/// Builds a topology and writes its adjacencies into a file.
///
/// `args` must parse into an `Export` object with the fields
/// * `topology`: the configuration of the topology to build (required),
/// * `seed`: a number used to seed the random number generator (42 if missing),
/// * `format`: a number forwarded to `write_adjacencies_to_file` (0 if missing),
/// * `filename`: a literal with the path of the file to create (required).
///
/// # Panics
/// Panics if `args` cannot be parsed, is not an `Export` object, contains an unknown or
/// badly typed field, misses `topology` or `filename`, or if the file cannot be created
/// or written.
pub fn special_export(args: &str, plugs:&Plugs)
{
	let export_cfg = match config_parser::parse(args)
	{
		Err(x) => panic!("Error parsing topology to export ({:?})",x),
		Ok(config_parser::Token::Value(value)) => value,
		Ok(_) => panic!("Not a value"),
	};
	let (object_name,fields) = match export_cfg
	{
		ConfigurationValue::Object(ref cv_name, ref cv_pairs) => (cv_name,cv_pairs),
		_ => panic!("Trying to create a Export from a non-Object"),
	};
	if object_name!="Export"
	{
		panic!("A Export must be created from a `Export` object not `{}`",object_name);
	}
	let mut topology = None;
	let mut seed = None;
	let mut format = None;
	let mut filename = None;
	for (name,value) in fields.iter()
	{
		match name.as_str()
		{
			"topology" => topology=Some(value),
			"seed" => match value
			{
				ConfigurationValue::Number(f) => seed=Some(*f as usize),
				_ => panic!("bad value for seed"),
			},
			"format" => match value
			{
				ConfigurationValue::Number(f) => format=Some(*f as usize),
				_ => panic!("bad value for format"),
			},
			"filename" => match value
			{
				ConfigurationValue::Literal(s) => filename=Some(s.to_string()),
				_ => panic!("bad value for filename"),
			},
			_ => panic!("Nothing to do with field {} in Export",name),
		}
	}
	let seed=seed.unwrap_or(42);
	let topology_cfg=topology.expect("There were no topology.");
	let format=format.unwrap_or(0);
	let filename=filename.expect("There were no filename.");
	// The native-endian bytes of the seed fill the start of the 32-byte RNG seed; the rest stays zero.
	let seed_bytes = seed.to_ne_bytes();
	let mut std_rng_seed = [0u8;32];
	std_rng_seed[..seed_bytes.len()].copy_from_slice(&seed_bytes);
	let rng=RefCell::new(StdRng::from_seed(std_rng_seed));
	let topology = topology::new_topology(topology::TopologyBuilderArgument{cv:&topology_cfg,plugs,rng:&rng});
	let mut topology_file=File::create(&filename).expect("Could not create topology file");
	topology.write_adjacencies_to_file(&mut topology_file,format).expect("Failed writing topology to file");
}
// Placeholder unit-test module: its single test exercises no code of this crate.
#[cfg(test)]
mod tests {
/// Trivial arithmetic check that only confirms the test harness runs.
#[test]
fn it_works() {
assert_eq!(2 + 2, 4);
}
}