use std::boxed::Box;
use std::cell::{RefCell};
use std::rc::Rc;
use std::collections::{BTreeSet,BTreeMap,VecDeque};
use std::fmt::Debug;
use ::rand::{Rng,rngs::StdRng};
use crate::match_object_panic;
use crate::config_parser::ConfigurationValue;
use crate::{Message,Plugs};
use crate::pattern::{Pattern,new_pattern,PatternBuilderArgument};
use crate::topology::Topology;
use quantifiable_derive::Quantifiable;use crate::quantify::Quantifiable;
/// Reasons a traffic may give for not producing a message in `generate_message`.
#[derive(Debug)]
pub enum TrafficError {
    /// The requested origin is not among the servers the traffic generates for.
    OriginOutsideTraffic,
    /// The destination selected for the message was the origin itself.
    SelfMessage,
}
/// Coarse description of what a server is doing within a traffic, as returned by
/// `Traffic::server_state`.
#[derive(Debug)]
pub enum ServerTrafficState {
    /// The server may currently generate messages.
    Generating,
    /// The server is not generating for now. In this file it is reported by
    /// `BoundedDifference` for a server whose allowance is exhausted.
    WaitingData,
    /// The server is not generating; carries a cycle number.
    /// NOTE(review): presumably the cycle at which generation may resume — confirm with callers.
    WaitingCycle { cycle: usize },
    /// The server is not generating now and nothing is known about when it may resume.
    UnspecifiedWait,
    /// The server has no more messages to generate. The burst traffics report it for a
    /// server whose pending count reached zero.
    FinishedGenerating,
    /// The server has nothing left to do in the traffic.
    Finished,
}
/// Interface shared by every message generator in this file.
pub trait Traffic: Quantifiable + Debug {
    /// Create a message leaving `origin` at `cycle`.
    ///
    /// # Errors
    /// Implementations in this file return `TrafficError::OriginOutsideTraffic` when the
    /// origin is not handled by the traffic and `TrafficError::SelfMessage` when the chosen
    /// destination equals the origin.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError>;

    /// Probability that `server` generates a message in a cycle.
    fn probability_per_cycle(&self, server: usize) -> f32;

    /// Offer `message` to the traffic for consumption at `server`.
    ///
    /// Implementations in this file return `true` when the message is one they generated
    /// (and forget it), and `false` otherwise.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool;

    /// Whether the traffic has nothing left to do.
    fn is_finished(&self) -> bool;

    /// Decide randomly whether `server` generates a message this cycle.
    ///
    /// The default draws a uniform sample in `[0,1)` and compares it against
    /// `probability_per_cycle(server)`.
    fn should_generate(&self, server: usize, _cycle: usize, rng: &RefCell<StdRng>) -> bool {
        let threshold = self.probability_per_cycle(server);
        let sample = rng.borrow_mut().gen_range(0f32..1f32);
        sample < threshold
    }

    /// Report the state of `server` at `cycle`.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState;
}
/// Everything `new_traffic` and the individual constructors need to build a traffic.
#[derive(Debug)]
pub struct TrafficBuilderArgument<'a> {
    /// Configuration object describing the traffic to build.
    pub cv: &'a ConfigurationValue,
    /// User-provided extensions; `new_traffic` looks up custom builders in `plugs.traffics`.
    pub plugs: &'a Plugs,
    /// Topology handed to the patterns when they are initialized.
    pub topology: &'a dyn Topology,
    /// Random number generator handed to the patterns when they are initialized.
    pub rng: &'a RefCell<StdRng>,
}
/// Build the traffic described by `arg.cv`.
///
/// A builder registered in `arg.plugs.traffics` under the object's name takes precedence
/// over the traffics built into this file.
///
/// # Panics
/// Panics when `arg.cv` is not an `Object` or when its name matches no known traffic.
pub fn new_traffic(arg: TrafficBuilderArgument) -> Box<dyn Traffic> {
    let cv_name = match arg.cv {
        ConfigurationValue::Object(name, _pairs) => name,
        _ => panic!("Trying to create a traffic from a non-Object"),
    };
    // Plugged-in builders win over the built-in names.
    if let Some(builder) = arg.plugs.traffics.get(cv_name) {
        return builder(arg);
    }
    match cv_name.as_ref() {
        "HomogeneousTraffic" => Box::new(Homogeneous::new(arg)),
        "TrafficSum" => Box::new(Sum::new(arg)),
        "ShiftedTraffic" => Box::new(Shifted::new(arg)),
        "ProductTraffic" => Box::new(ProductTraffic::new(arg)),
        "SubRangeTraffic" => Box::new(SubRangeTraffic::new(arg)),
        "Burst" => Box::new(Burst::new(arg)),
        "MultimodalBurst" => Box::new(MultimodalBurst::new(arg)),
        "Reactive" => Box::new(Reactive::new(arg)),
        "TimeSequenced" => Box::new(TimeSequenced::new(arg)),
        "Sequence" => Box::new(Sequence::new(arg)),
        "BoundedDifference" => Box::new(BoundedDifference::new(arg)),
        _ => panic!("Unknown traffic {}", cv_name),
    }
}
/// Traffic in which every server generates messages with the same load, sizes and pattern.
#[derive(Quantifiable, Debug)]
pub struct Homogeneous {
    /// Number of servers generating; origins at or above this value are rejected.
    servers: usize,
    /// Pattern used to pick the destination of each message.
    pattern: Box<dyn Pattern>,
    /// Size given to every generated message.
    message_size: usize,
    /// Configured load; divided by `message_size` to obtain the per-cycle probability.
    load: f32,
    /// Addresses of the messages generated and not yet consumed.
    generated_messages: BTreeSet<*const Message>,
}
impl Traffic for Homogeneous {
    /// Create a message from `origin` to the destination chosen by the pattern and remember
    /// its address so that `try_consume` can recognise it.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        if origin >= self.servers {
            return Err(TrafficError::OriginOutsideTraffic);
        }
        let destination = self.pattern.get_destination(origin, topology, rng);
        if destination == origin {
            return Err(TrafficError::SelfMessage);
        }
        let created = Rc::new(Message {
            origin,
            destination,
            size: self.message_size,
            creation_cycle: cycle,
        });
        self.generated_messages.insert(&*created as *const Message);
        Ok(created)
    }

    /// `load / message_size`, capped at 1. The same value is returned for every server.
    fn probability_per_cycle(&self, _server: usize) -> f32 {
        let ratio = self.load / self.message_size as f32;
        if ratio > 1.0 {
            1.0
        } else {
            ratio
        }
    }

    /// Forget `message` if it was generated here; returns whether it was.
    fn try_consume(
        &mut self,
        _server: usize,
        message: Rc<Message>,
        _cycle: usize,
        _topology: &dyn Topology,
        _rng: &RefCell<StdRng>,
    ) -> bool {
        let key = &*message as *const Message;
        self.generated_messages.remove(&key)
    }

    /// This traffic never finishes.
    fn is_finished(&self) -> bool {
        false
    }

    /// Every server is always reported as generating.
    fn server_state(&self, _server: usize, _cycle: usize) -> ServerTrafficState {
        ServerTrafficState::Generating
    }
}
impl Homogeneous {
    /// Build a `Homogeneous` from a `HomogeneousTraffic` configuration object with the keys
    /// `pattern`, `servers`, `load` and `message_size`.
    ///
    /// Prints a warning when `servers` differs from the number of servers in the topology.
    ///
    /// # Panics
    /// Panics when any of the keys is missing or has a value of the wrong kind.
    pub fn new(arg: TrafficBuilderArgument) -> Homogeneous {
        let mut opt_servers = None;
        let mut opt_load = None;
        let mut opt_pattern = None;
        let mut opt_message_size = None;
        match_object_panic!(arg.cv, "HomogeneousTraffic", value,
            "pattern" => opt_pattern = Some(new_pattern(PatternBuilderArgument { cv: value, plugs: arg.plugs })),
            "servers" => opt_servers = Some(value.as_f64().expect("bad value for servers") as usize),
            "load" => opt_load = Some(value.as_f64().expect("bad value for load") as f32),
            "message_size" => opt_message_size = Some(value.as_f64().expect("bad value for message_size") as usize),
        );
        let servers = opt_servers.expect("There were no servers");
        let message_size = opt_message_size.expect("There were no message_size");
        let load = opt_load.expect("There were no load");
        let mut pattern = opt_pattern.expect("There were no pattern");
        let in_topology = arg.topology.num_servers();
        if servers != in_topology {
            println!("WARNING: Generating traffic over {} servers when the topology has {} servers.", servers, in_topology);
        }
        // The pattern maps the generating servers onto themselves.
        pattern.initialize(servers, servers, arg.topology, arg.rng);
        Homogeneous {
            servers,
            pattern,
            message_size,
            load,
            generated_messages: BTreeSet::new(),
        }
    }
}
/// Traffic made by superposing several traffics: their probabilities are added and each
/// generated message comes from one of them.
#[derive(Quantifiable, Debug)]
pub struct Sum {
    /// The traffics being combined.
    list: Vec<Box<dyn Traffic>>,
}
impl Traffic for Sum {
    /// Pick one of the combined traffics with probability proportional to its
    /// `probability_per_cycle(origin)` and let it generate the message.
    ///
    /// # Panics
    /// Panics when no combined traffic has a positive probability for `origin`.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        let probs: Vec<f32> = self.list.iter().map(|t| t.probability_per_cycle(origin)).collect();
        let total: f32 = probs.iter().sum();
        let mut r = rng.borrow_mut().gen_range(0f32..total);
        for (traffic, &p) in self.list.iter_mut().zip(probs.iter()) {
            if r < p {
                return traffic.generate_message(origin, cycle, topology, rng);
            }
            r -= p;
        }
        // Floating point rounding in the subtractions above can leave `r` marginally
        // outside the last interval. That sample belongs to the last traffic with a
        // positive probability, so use it instead of aborting the simulation.
        match probs.iter().rposition(|&p| p > 0.0) {
            Some(index) => self.list[index].generate_message(origin, cycle, topology, rng),
            None => panic!("failed probability"),
        }
    }

    /// Sum of the probabilities of the combined traffics.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        self.list.iter().map(|t| t.probability_per_cycle(server)).sum()
    }

    /// Offer the message to each combined traffic in order until one accepts it.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool {
        self.list
            .iter_mut()
            .any(|traffic| traffic.try_consume(server, message.clone(), cycle, topology, rng))
    }

    /// Finished when every combined traffic is finished.
    fn is_finished(&self) -> bool {
        self.list.iter().all(|traffic| traffic.is_finished())
    }

    /// Combine the states of the inner traffics, independently of their order:
    /// `Generating` if any is generating; otherwise `UnspecifiedWait` if any is in some
    /// waiting state; otherwise `FinishedGenerating` if any reports it; otherwise `Finished`.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState {
        use ServerTrafficState::*;
        let mut state = Finished;
        for traffic in self.list.iter() {
            match traffic.server_state(server, cycle) {
                Finished => (),
                Generating => return Generating,
                // Must not hide a wait seen in an earlier traffic.
                FinishedGenerating => {
                    if let Finished = state {
                        state = FinishedGenerating;
                    }
                }
                _ => state = UnspecifiedWait,
            }
        }
        state
    }
}
impl Sum
{
pub fn new(arg:TrafficBuilderArgument) -> Sum
{
let mut list=None;
match_object_panic!(arg.cv,"TrafficSum",value,
"list" => list = Some(value.as_array().expect("bad value for list").iter()
.map(|v|new_traffic(TrafficBuilderArgument{cv:v,..arg})).collect()),
);
let list=list.expect("There were no list");
Sum{
list
}
}
}
/// Traffic that applies an inner traffic to servers displaced by a fixed amount:
/// outer server `s` acts as inner server `s - shift`.
#[derive(Quantifiable, Debug)]
pub struct Shifted {
    /// Displacement added to inner destinations and subtracted from outer origins.
    shift: usize,
    /// The traffic being displaced.
    traffic: Box<dyn Traffic>,
    /// For each outer message handed out and not yet consumed (keyed by address), the
    /// inner message it was built from.
    generated_messages: BTreeMap<*const Message, Rc<Message>>,
}
impl Traffic for Shifted
{
fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
{
if origin<self.shift
{
return Err(TrafficError::OriginOutsideTraffic);
}
let inner_message=self.traffic.generate_message(origin-self.shift,cycle,topology,rng)?;
let outer_message=Rc::new(Message{
origin,
destination:inner_message.destination+self.shift,
size:inner_message.size,
creation_cycle: cycle,
});
self.generated_messages.insert(outer_message.as_ref() as *const Message,inner_message);
Ok(outer_message)
}
fn probability_per_cycle(&self,server:usize) -> f32
{
self.traffic.probability_per_cycle(server-self.shift)
}
fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
{
let message_ptr=message.as_ref() as *const Message;
let outer_message=match self.generated_messages.remove(&message_ptr)
{
None => return false,
Some(m) => m,
};
if !self.traffic.try_consume(server,outer_message,cycle,topology,rng)
{
panic!("Shifted traffic consumed a message but its child did not.");
}
true
}
fn is_finished(&self) -> bool
{
self.traffic.is_finished()
}
fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
{
self.traffic.server_state(server-self.shift,cycle)
}
}
impl Shifted {
    /// Build a `Shifted` from a `ShiftedTraffic` configuration object with the keys
    /// `traffic` and `shift`.
    ///
    /// # Panics
    /// Panics when a key is missing or has a value of the wrong kind.
    pub fn new(arg: TrafficBuilderArgument) -> Shifted {
        let mut opt_shift = None;
        let mut opt_traffic = None;
        match_object_panic!(arg.cv, "ShiftedTraffic", value,
            "traffic" => opt_traffic = Some(new_traffic(TrafficBuilderArgument { cv: value, ..arg })),
            "shift" => opt_shift = Some(value.as_f64().expect("bad value for shift") as usize),
        );
        let shift = opt_shift.expect("There were no shift");
        let traffic = opt_traffic.expect("There were no traffic");
        Shifted {
            shift,
            traffic,
            generated_messages: BTreeMap::new(),
        }
    }
}
/// Traffic over servers grouped in blocks: the position inside the destination block comes
/// from an inner traffic and the destination block comes from a pattern over blocks.
#[derive(Quantifiable, Debug)]
pub struct ProductTraffic {
    /// Number of servers per block.
    block_size: usize,
    /// Traffic evaluated on the position of the server inside its block.
    block_traffic: Box<dyn Traffic>,
    /// Pattern choosing the destination block from the origin block.
    global_pattern: Box<dyn Pattern>,
    /// For each outer message handed out and not yet consumed (keyed by address), the
    /// inner message it was built from.
    generated_messages: BTreeMap<*const Message, Rc<Message>>,
}
impl Traffic for ProductTraffic {
    /// Split `origin` into (block, position), choose the destination block with the global
    /// pattern and the destination position with the block traffic, and combine both.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        let position = origin % self.block_size;
        let block = origin / self.block_size;
        // The block destination is drawn before the inner traffic gets a chance to fail.
        let target_block = self.global_pattern.get_destination(block, topology, rng);
        let inner = self.block_traffic.generate_message(position, cycle, topology, rng)?;
        let destination = target_block * self.block_size + inner.destination;
        let outer = Rc::new(Message {
            origin,
            destination,
            size: inner.size,
            creation_cycle: cycle,
        });
        self.generated_messages.insert(&*outer as *const Message, inner);
        Ok(outer)
    }

    /// Probability of the block traffic for the position of `server` inside its block.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        self.block_traffic.probability_per_cycle(server % self.block_size)
    }

    /// If `message` is an outer message created here, hand the matching inner message to
    /// the block traffic.
    ///
    /// # Panics
    /// Panics when the block traffic does not accept its own message.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool {
        let key = &*message as *const Message;
        match self.generated_messages.remove(&key) {
            None => false,
            Some(inner) => {
                if !self.block_traffic.try_consume(server, inner, cycle, topology, rng) {
                    panic!("ProductTraffic traffic consumed a message but its child did not.");
                }
                true
            }
        }
    }

    /// Finished exactly when the block traffic is.
    fn is_finished(&self) -> bool {
        self.block_traffic.is_finished()
    }

    /// State of the block traffic for the position of `server` inside its block.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState {
        self.block_traffic.server_state(server % self.block_size, cycle)
    }
}
impl ProductTraffic {
    /// Build a `ProductTraffic` from a configuration object with the keys `block_traffic`,
    /// `global_pattern` and `block_size`.
    ///
    /// The global pattern is initialized over `num_servers / block_size` blocks.
    ///
    /// # Panics
    /// Panics when a key is missing or has a value of the wrong kind, and when
    /// `block_size` is zero (division by zero).
    pub fn new(arg: TrafficBuilderArgument) -> ProductTraffic {
        let mut opt_block_size = None;
        let mut opt_block_traffic = None;
        let mut opt_global_pattern = None;
        match_object_panic!(arg.cv, "ProductTraffic", value,
            "block_traffic" => opt_block_traffic = Some(new_traffic(TrafficBuilderArgument { cv: value, ..arg })),
            "global_pattern" => opt_global_pattern = Some(new_pattern(PatternBuilderArgument { cv: value, plugs: arg.plugs })),
            "block_size" => opt_block_size = Some(value.as_f64().expect("bad value for block_size") as usize),
        );
        let block_size = opt_block_size.expect("There were no block_size");
        let block_traffic = opt_block_traffic.expect("There were no block_traffic");
        let mut global_pattern = opt_global_pattern.expect("There were no global_pattern");
        let blocks = arg.topology.num_servers() / block_size;
        global_pattern.initialize(blocks, blocks, arg.topology, arg.rng);
        ProductTraffic {
            block_size,
            block_traffic,
            global_pattern,
            generated_messages: BTreeMap::new(),
        }
    }
}
/// Traffic that only lets origins in `start..end` generate messages with the inner traffic.
#[derive(Quantifiable, Debug)]
pub struct SubRangeTraffic {
    /// First origin allowed to generate.
    start: usize,
    /// One past the last origin allowed to generate.
    end: usize,
    /// Traffic everything is delegated to.
    traffic: Box<dyn Traffic>,
}
impl Traffic for SubRangeTraffic {
    /// Delegate to the inner traffic for origins in `start..end`.
    ///
    /// # Errors
    /// `TrafficError::OriginOutsideTraffic` for any other origin, plus any error returned
    /// by the inner traffic.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        let inside = self.start <= origin && origin < self.end;
        if !inside {
            return Err(TrafficError::OriginOutsideTraffic);
        }
        self.traffic.generate_message(origin, cycle, topology, rng)
    }

    /// Probability of the inner traffic; the range is not consulted here.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        self.traffic.probability_per_cycle(server)
    }

    /// Delegated to the inner traffic.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool {
        self.traffic.try_consume(server, message, cycle, topology, rng)
    }

    /// Delegated to the inner traffic.
    fn is_finished(&self) -> bool {
        self.traffic.is_finished()
    }

    /// Delegated to the inner traffic; the range is not consulted here.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState {
        self.traffic.server_state(server, cycle)
    }
}
impl SubRangeTraffic {
    /// Build a `SubRangeTraffic` from a configuration object with the keys `traffic`,
    /// `start` and `end`.
    ///
    /// # Panics
    /// Panics when a key is missing or has a value of the wrong kind.
    pub fn new(arg: TrafficBuilderArgument) -> SubRangeTraffic {
        let mut opt_start = None;
        let mut opt_end = None;
        let mut opt_traffic = None;
        match_object_panic!(arg.cv, "SubRangeTraffic", value,
            "traffic" => opt_traffic = Some(new_traffic(TrafficBuilderArgument { cv: value, ..arg })),
            "start" => opt_start = Some(value.as_f64().expect("bad value for start") as usize),
            "end" => opt_end = Some(value.as_f64().expect("bad value for end") as usize),
        );
        let start = opt_start.expect("There were no start");
        let end = opt_end.expect("There were no end");
        let traffic = opt_traffic.expect("There were no traffic");
        SubRangeTraffic { start, end, traffic }
    }
}
/// Traffic in which each server has a fixed number of messages to generate and generates
/// them as fast as it is allowed to.
#[derive(Quantifiable, Debug)]
pub struct Burst {
    /// Number of servers generating; origins at or above this value are rejected.
    servers: usize,
    /// Pattern used to pick the destination of each message.
    pattern: Box<dyn Pattern>,
    /// Size given to every generated message.
    message_size: usize,
    /// Messages each server still has to generate, indexed by server.
    pending_messages: Vec<usize>,
    /// Addresses of the messages generated and not yet consumed.
    generated_messages: BTreeSet<*const Message>,
}
impl Traffic for Burst {
    /// Spend one pending message of `origin` and create it.
    ///
    /// The pending count is decremented before the destination is known, so an attempt
    /// that ends in `SelfMessage` still uses up one pending message.
    ///
    /// # Errors
    /// `TrafficError::OriginOutsideTraffic` when `origin >= servers`;
    /// `TrafficError::SelfMessage` when the pattern returns the origin.
    ///
    /// # Panics
    /// Panics when `origin` has no pending messages. Without the check the counter would
    /// wrap around to `usize::MAX` in builds without overflow checks.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        if origin >= self.servers {
            return Err(TrafficError::OriginOutsideTraffic);
        }
        assert!(self.pending_messages[origin] > 0, "Origin {} has no pending messages to generate.", origin);
        self.pending_messages[origin] -= 1;
        let destination = self.pattern.get_destination(origin, topology, rng);
        if origin == destination {
            return Err(TrafficError::SelfMessage);
        }
        let message = Rc::new(Message {
            origin,
            destination,
            size: self.message_size,
            creation_cycle: cycle,
        });
        self.generated_messages.insert(&*message as *const Message);
        Ok(message)
    }

    /// 1 while `server` has pending messages, 0 otherwise.
    ///
    /// Servers outside the traffic get 0 instead of an out-of-bounds panic, matching the
    /// `OriginOutsideTraffic` handling in `generate_message`.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        match self.pending_messages.get(server) {
            Some(&pending) if pending > 0 => 1.0,
            _ => 0.0,
        }
    }

    /// Forget `message` if it was generated here; returns whether it was.
    fn try_consume(
        &mut self,
        _server: usize,
        message: Rc<Message>,
        _cycle: usize,
        _topology: &dyn Topology,
        _rng: &RefCell<StdRng>,
    ) -> bool {
        let message_ptr = &*message as *const Message;
        self.generated_messages.remove(&message_ptr)
    }

    /// Finished when every generated message was consumed and no server has pending ones.
    fn is_finished(&self) -> bool {
        self.generated_messages.is_empty() && self.pending_messages.iter().all(|&pm| pm == 0)
    }

    /// `Generating` while `server` has pending messages, `FinishedGenerating` otherwise
    /// (including servers outside the traffic, which never generate).
    fn server_state(&self, server: usize, _cycle: usize) -> ServerTrafficState {
        match self.pending_messages.get(server) {
            Some(&pending) if pending > 0 => ServerTrafficState::Generating,
            _ => ServerTrafficState::FinishedGenerating,
        }
    }
}
impl Burst {
    /// Build a `Burst` from a configuration object with the keys `pattern`, `servers`,
    /// `messages_per_server` and `message_size`.
    ///
    /// # Panics
    /// Panics when a key is missing or has a value of the wrong kind.
    pub fn new(arg: TrafficBuilderArgument) -> Burst {
        let mut opt_servers = None;
        let mut opt_messages_per_server = None;
        let mut opt_pattern = None;
        let mut opt_message_size = None;
        match_object_panic!(arg.cv, "Burst", value,
            "pattern" => opt_pattern = Some(new_pattern(PatternBuilderArgument { cv: value, plugs: arg.plugs })),
            "servers" => opt_servers = Some(value.as_f64().expect("bad value for servers") as usize),
            "messages_per_server" => opt_messages_per_server = Some(value.as_f64().expect("bad value for messages_per_server") as usize),
            "message_size" => opt_message_size = Some(value.as_f64().expect("bad value for message_size") as usize),
        );
        let servers = opt_servers.expect("There were no servers");
        let message_size = opt_message_size.expect("There were no message_size");
        let messages_per_server = opt_messages_per_server.expect("There were no messages_per_server");
        let mut pattern = opt_pattern.expect("There were no pattern");
        pattern.initialize(servers, servers, arg.topology, arg.rng);
        // Every server starts with the same number of messages to send.
        let pending_messages = vec![messages_per_server; servers];
        Burst {
            servers,
            pattern,
            message_size,
            pending_messages,
            generated_messages: BTreeSet::new(),
        }
    }
}
/// Traffic in which consuming a message of an action traffic may queue a message of a
/// reaction traffic.
#[derive(Quantifiable, Debug)]
pub struct Reactive {
    /// Traffic whose messages trigger reactions when consumed.
    action_traffic: Box<dyn Traffic>,
    /// Traffic that builds the reaction messages.
    reaction_traffic: Box<dyn Traffic>,
    /// Reaction messages already built and waiting to be handed out, indexed by server.
    /// Grown on demand.
    pending_messages: Vec<VecDeque<Rc<Message>>>,
}
impl Traffic for Reactive {
    /// Hand out the oldest queued reaction of `origin` if there is one; otherwise let the
    /// action traffic generate.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        let queued = match self.pending_messages.get_mut(origin) {
            Some(queue) => queue.pop_front(),
            None => None,
        };
        match queued {
            Some(message) => Ok(message),
            None => self.action_traffic.generate_message(origin, cycle, topology, rng),
        }
    }

    /// 1 when `server` has queued reactions, otherwise the probability of the action traffic.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        let has_queued = match self.pending_messages.get(server) {
            Some(queue) => !queue.is_empty(),
            None => false,
        };
        if has_queued {
            1.0
        } else {
            self.action_traffic.probability_per_cycle(server)
        }
    }

    /// Try the action traffic first. When it accepts the message, the reaction traffic may
    /// build a reaction for `message.origin`, which is queued for that server. Messages the
    /// action traffic rejects are offered to the reaction traffic.
    ///
    /// # Panics
    /// Panics when the reaction traffic decides to generate but fails to do so.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool {
        if !self.action_traffic.try_consume(server, message.clone(), cycle, topology, rng) {
            return self.reaction_traffic.try_consume(server, message, cycle, topology, rng);
        }
        let source = message.origin;
        if self.reaction_traffic.should_generate(source, cycle, rng) {
            let response_message = match self.reaction_traffic.generate_message(source, cycle, topology, rng) {
                Ok(response) => response,
                Err(error) => panic!("An error happened when generating response traffic: {:?}", error),
            };
            // Grow the table of queues so that `source` has a slot.
            if self.pending_messages.len() < source + 1 {
                self.pending_messages.resize(source + 1, VecDeque::new());
            }
            self.pending_messages[source].push_back(response_message);
        }
        true
    }

    /// Finished when both traffics are finished and no reaction is queued.
    fn is_finished(&self) -> bool {
        self.action_traffic.is_finished()
            && self.reaction_traffic.is_finished()
            && self.pending_messages.iter().all(|queue| queue.is_empty())
    }

    /// `Finished` when either inner traffic reports `Finished` for the server or when the
    /// whole traffic is finished; `UnspecifiedWait` otherwise.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState {
        use ServerTrafficState::*;
        if let Finished = self.action_traffic.server_state(server, cycle) {
            return Finished;
        }
        if let Finished = self.reaction_traffic.server_state(server, cycle) {
            return Finished;
        }
        match self.is_finished() {
            true => Finished,
            false => UnspecifiedWait,
        }
    }
}
impl Reactive {
    /// Build a `Reactive` from a configuration object with the keys `action_traffic` and
    /// `reaction_traffic`.
    ///
    /// # Panics
    /// Panics when a key is missing or an inner traffic cannot be built.
    pub fn new(arg: TrafficBuilderArgument) -> Reactive {
        let mut opt_action = None;
        let mut opt_reaction = None;
        match_object_panic!(arg.cv, "Reactive", value,
            "action_traffic" => opt_action = Some(new_traffic(TrafficBuilderArgument { cv: value, ..arg })),
            "reaction_traffic" => opt_reaction = Some(new_traffic(TrafficBuilderArgument { cv: value, ..arg })),
        );
        let action_traffic = opt_action.expect("There were no action_traffic");
        let reaction_traffic = opt_reaction.expect("There were no reaction_traffic");
        Reactive {
            action_traffic,
            reaction_traffic,
            pending_messages: Vec::new(),
        }
    }
}
/// Traffic that switches between several traffics at fixed times: the i-th traffic
/// generates during the i-th time slot.
#[derive(Quantifiable, Debug)]
pub struct TimeSequenced {
    /// The traffics, in the order in which they become active.
    traffics: Vec<Box<dyn Traffic>>,
    /// Length in cycles of the slot of each traffic.
    times: Vec<usize>,
}
/// Find the time slot that contains `cycle`.
///
/// `times[i]` is the length of slot `i` and only the first `slots` slots are considered.
/// Returns the index of the slot together with the cycle at which it starts; the index is
/// `slots` when `cycle` lies beyond the last slot.
fn time_sequenced_slot(times: &[usize], slots: usize, cycle: usize) -> (usize, usize) {
    let mut remaining = cycle;
    let mut index = 0;
    while index < slots && remaining >= times[index] {
        remaining -= times[index];
        index += 1;
    }
    (index, cycle - remaining)
}

impl Traffic for TimeSequenced {
    /// Let the traffic owning the slot of `cycle` generate the message.
    ///
    /// # Panics
    /// Panics when `cycle` is beyond the last slot.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        let (traffic_index, _slot_start) = time_sequenced_slot(&self.times, self.traffics.len(), cycle);
        assert!(
            traffic_index < self.traffics.len(),
            "TimeSequenced has no traffic scheduled at cycle {}",
            cycle
        );
        self.traffics[traffic_index].generate_message(origin, cycle, topology, rng)
    }

    /// Always 1; the real decision is taken in `should_generate`, which knows the cycle.
    fn probability_per_cycle(&self, _server: usize) -> f32 {
        1.0
    }

    /// Offer the message to each traffic in order until one accepts it.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool {
        self.traffics
            .iter_mut()
            .any(|traffic| traffic.try_consume(server, message.clone(), cycle, topology, rng))
    }

    /// Finished when every traffic is finished.
    fn is_finished(&self) -> bool {
        self.traffics.iter().all(|traffic| traffic.is_finished())
    }

    /// Ask the traffic owning the slot of `cycle`; `false` beyond the last slot.
    fn should_generate(&self, server: usize, cycle: usize, rng: &RefCell<StdRng>) -> bool {
        let (traffic_index, _slot_start) = time_sequenced_slot(&self.times, self.traffics.len(), cycle);
        match self.traffics.get(traffic_index) {
            Some(traffic) => traffic.should_generate(server, cycle, rng),
            None => false,
        }
    }

    /// State of the traffic owning the slot of `cycle`.
    ///
    /// Beyond the last slot the state is `Finished`. When the active traffic reports
    /// `Finished`, the server waits for the end of the current slot: the reported cycle is
    /// the absolute cycle at which the slot ends (its start plus its length), not merely
    /// the length of the slot.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState {
        let (traffic_index, slot_start) = time_sequenced_slot(&self.times, self.traffics.len(), cycle);
        if traffic_index == self.traffics.len() {
            return ServerTrafficState::Finished;
        }
        let state = self.traffics[traffic_index].server_state(server, cycle);
        if let ServerTrafficState::Finished = state {
            ServerTrafficState::WaitingCycle {
                cycle: slot_start + self.times[traffic_index],
            }
        } else {
            state
        }
    }
}
impl TimeSequenced
{
pub fn new(arg:TrafficBuilderArgument) -> TimeSequenced
{
let mut traffics=None;
let mut times=None;
match_object_panic!(arg.cv,"TimeSequenced",value,
"traffics" => traffics = Some(value.as_array().expect("bad value for traffics").iter()
.map(|v|new_traffic(TrafficBuilderArgument{cv:v,..arg})).collect()),
"times" => times = Some(value.as_array()
.expect("bad value for times").iter()
.map(|v|v.as_f64().expect("bad value in times") as usize).collect()),
);
let traffics=traffics.expect("There were no traffics");
let times=times.expect("There were no times");
TimeSequenced{
traffics,
times,
}
}
}
/// Traffic that runs several traffics one after another, moving to the next one when the
/// current one is finished.
#[derive(Quantifiable, Debug)]
pub struct Sequence {
    /// The traffics, in the order in which they run.
    traffics: Vec<Box<dyn Traffic>>,
    /// Index of the traffic currently generating; equals `traffics.len()` once all are done.
    current_traffic: usize,
}
impl Traffic for Sequence {
    /// Skip the traffics that are already finished and let the first unfinished one
    /// generate the message.
    ///
    /// # Panics
    /// Panics when every traffic in the sequence is finished. The skipping loop is bounded
    /// so that this case reaches the assertion instead of indexing past the end.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        while self.current_traffic < self.traffics.len()
            && self.traffics[self.current_traffic].is_finished()
        {
            self.current_traffic += 1;
        }
        assert!(
            self.current_traffic < self.traffics.len(),
            "Sequence has no unfinished traffic left to generate messages"
        );
        self.traffics[self.current_traffic].generate_message(origin, cycle, topology, rng)
    }

    /// Probability of the current traffic, or 0 once the sequence is exhausted.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        match self.traffics.get(self.current_traffic) {
            Some(traffic) => traffic.probability_per_cycle(server),
            None => 0.0,
        }
    }

    /// Offer the message to each traffic in order until one accepts it, then advance past
    /// any traffic that became finished.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> bool {
        let consumed = self
            .traffics
            .iter_mut()
            .any(|traffic| traffic.try_consume(server, message.clone(), cycle, topology, rng));
        if consumed {
            while self.current_traffic < self.traffics.len()
                && self.traffics[self.current_traffic].is_finished()
            {
                self.current_traffic += 1;
            }
        }
        consumed
    }

    /// Finished when the current traffic and every traffic after it are finished.
    ///
    /// This also covers trailing traffics that were finished before `current_traffic` was
    /// advanced past them.
    fn is_finished(&self) -> bool {
        self.traffics
            .iter()
            .skip(self.current_traffic)
            .all(|traffic| traffic.is_finished())
    }

    /// Ask the current traffic; `false` once the sequence is exhausted.
    fn should_generate(&self, server: usize, cycle: usize, rng: &RefCell<StdRng>) -> bool {
        match self.traffics.get(self.current_traffic) {
            Some(traffic) => traffic.should_generate(server, cycle, rng),
            None => false,
        }
    }

    /// State of the current traffic, with `Finished` turned into `UnspecifiedWait` because
    /// a later traffic may still follow. `Finished` once the sequence is exhausted.
    fn server_state(&self, server: usize, cycle: usize) -> ServerTrafficState {
        use ServerTrafficState::*;
        match self.traffics.get(self.current_traffic) {
            None => Finished,
            Some(traffic) => match traffic.server_state(server, cycle) {
                Finished => UnspecifiedWait,
                state => state,
            },
        }
    }
}
impl Sequence
{
pub fn new(arg:TrafficBuilderArgument) -> Sequence
{
let mut traffics_args=None;
let mut period_number=1usize;
match_object_panic!(arg.cv,"Sequence",value,
"traffics" => traffics_args = Some(value.as_array().expect("bad value for traffics")),
"period_number" => period_number=value.as_f64().expect("bad value for period_number") as usize,
);
let traffics_args=traffics_args.expect("There were no traffics");
let traffics = (0..period_number).flat_map(|_ip| traffics_args.iter().map(|v|new_traffic(TrafficBuilderArgument{cv:v,..arg})).collect::<Vec<_>>() ).collect();
Sequence{
traffics,
current_traffic:0,
}
}
}
/// Burst traffic with several kinds of messages: each server has, for every kind, a number
/// of messages to generate, and alternates between kinds in steps.
#[derive(Quantifiable, Debug)]
pub struct MultimodalBurst {
    /// Number of servers generating; origins at or above this value are rejected.
    servers: usize,
    /// One entry per kind of message: `(pattern, total_messages, message_size, step_size)`.
    provenance: Vec<(Box<dyn Pattern>, usize, usize, usize)>,
    /// For each server and kind: `(total_remaining, step_remaining)`.
    pending: Vec<Vec<(usize, usize)>>,
    /// For each server, the kind at which the next search for a message starts.
    next_provenance: Vec<usize>,
    /// Addresses of the messages generated and not yet consumed.
    generated_messages: BTreeSet<*const Message>,
}
impl Traffic for MultimodalBurst {
    /// Create the next message of `origin`.
    ///
    /// Starting at `next_provenance[origin]`, the first kind with messages remaining is
    /// used. Its counters are decremented; when its step is exhausted the step counter is
    /// reloaded and the next search starts at the following kind. As in `Burst`, the
    /// counters are spent before the destination is known.
    ///
    /// # Errors
    /// `TrafficError::OriginOutsideTraffic` when `origin >= servers`;
    /// `TrafficError::SelfMessage` when the pattern returns the origin.
    ///
    /// # Panics
    /// Panics when `origin` has no message left in any kind. The search visits each kind
    /// at most once, so this case no longer loops forever.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        if origin >= self.servers {
            return Err(TrafficError::OriginOutsideTraffic);
        }
        let pending = &mut self.pending[origin];
        let kinds = pending.len();
        let first = self.next_provenance[origin];
        let provenance_index = (0..kinds)
            .map(|delta| (first + delta) % kinds)
            .find(|&index| pending[index].0 > 0)
            .unwrap_or_else(|| panic!("Origin {} has no pending messages to generate.", origin));
        {
            let (total_remaining, step_remaining) = &mut pending[provenance_index];
            *step_remaining -= 1;
            *total_remaining -= 1;
            if *step_remaining == 0 {
                // Step done: reload it and start the next search at the following kind.
                *step_remaining = self.provenance[provenance_index].3;
                self.next_provenance[origin] = (provenance_index + 1) % kinds;
            }
        }
        let (pattern, _total_messages, message_size, _step_size) = &self.provenance[provenance_index];
        let destination = pattern.get_destination(origin, topology, rng);
        if origin == destination {
            return Err(TrafficError::SelfMessage);
        }
        let message = Rc::new(Message {
            origin,
            destination,
            size: *message_size,
            creation_cycle: cycle,
        });
        self.generated_messages.insert(&*message as *const Message);
        Ok(message)
    }

    /// 1 while `server` has messages remaining in some kind, 0 otherwise.
    ///
    /// Servers outside the traffic get 0 instead of an out-of-bounds panic, matching the
    /// `OriginOutsideTraffic` handling in `generate_message`.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        let has_pending = match self.pending.get(server) {
            Some(kinds) => kinds.iter().any(|(total_remaining, _step_remaining)| *total_remaining > 0),
            None => false,
        };
        if has_pending {
            1.0
        } else {
            0.0
        }
    }

    /// Forget `message` if it was generated here; returns whether it was.
    fn try_consume(
        &mut self,
        _server: usize,
        message: Rc<Message>,
        _cycle: usize,
        _topology: &dyn Topology,
        _rng: &RefCell<StdRng>,
    ) -> bool {
        let message_ptr = &*message as *const Message;
        self.generated_messages.remove(&message_ptr)
    }

    /// Finished when every generated message was consumed and no server has messages
    /// remaining in any kind.
    fn is_finished(&self) -> bool {
        self.generated_messages.is_empty()
            && self.pending.iter().all(|kinds| {
                kinds.iter().all(|(total_remaining, _step_remaining)| *total_remaining == 0)
            })
    }

    /// `Generating` while `server` has messages remaining in some kind,
    /// `FinishedGenerating` otherwise (including servers outside the traffic).
    fn server_state(&self, server: usize, _cycle: usize) -> ServerTrafficState {
        let has_pending = match self.pending.get(server) {
            Some(kinds) => kinds.iter().any(|(total_remaining, _step_remaining)| *total_remaining > 0),
            None => false,
        };
        if has_pending {
            ServerTrafficState::Generating
        } else {
            ServerTrafficState::FinishedGenerating
        }
    }
}
impl MultimodalBurst
{
pub fn new(arg:TrafficBuilderArgument) -> MultimodalBurst
{
let mut servers=None;
let mut provenance : Option<Vec<(_,_,_,_)>> = None;
match_object_panic!(arg.cv,"MultimodalBurst",value,
"servers" => servers=Some(value.as_f64().expect("bad value for servers") as usize),
"provenance" => match value
{
&ConfigurationValue::Array(ref a) => provenance=Some(a.iter().map(|pcv|{
let mut messages_per_server=None;
let mut pattern=None;
let mut message_size=None;
let mut step_size=None;
match_object_panic!(pcv,"Provenance",pvalue,
"pattern" => pattern=Some(new_pattern(PatternBuilderArgument{cv:pvalue,plugs:arg.plugs})),
"messages_per_server" | "total_messages" =>
messages_per_server=Some(pvalue.as_f64().expect("bad value for messages_per_server") as usize),
"message_size" => message_size=Some(pvalue.as_f64().expect("bad value for message_size") as usize),
"step_size" => step_size=Some(pvalue.as_f64().expect("bad value for step_size") as usize),
);
let pattern=pattern.expect("There were no pattern");
let messages_per_server=messages_per_server.expect("There were no messages_per_server");
let message_size=message_size.expect("There were no message_size");
let step_size=step_size.expect("There were no step_size");
(pattern,messages_per_server,message_size,step_size)
}).collect()),
_ => panic!("bad value for provenance"),
}
);
let servers=servers.expect("There were no servers");
let mut provenance=provenance.expect("There were no provenance");
for (pattern,_total_messages,_message_size,_step_size) in provenance.iter_mut()
{
pattern.initialize(servers, servers, arg.topology, arg.rng);
}
let each_pending = provenance.iter().map(|(_pattern,total_messages,_message_size,step_size)|(*total_messages,*step_size)).collect();
MultimodalBurst{
servers,
provenance,
pending: vec![each_pending;servers],
next_provenance:vec![0;servers],
generated_messages: BTreeSet::new(),
}
}
}
/// Traffic like `Homogeneous` in which each server holds an allowance: generating a
/// message spends one unit and consuming a message returns one.
#[derive(Quantifiable, Debug)]
pub struct BoundedDifference {
    /// Number of servers generating; origins at or above this value are rejected.
    servers: usize,
    /// Pattern used to pick the destination of each message.
    pattern: Box<dyn Pattern>,
    /// Size given to every generated message.
    message_size: usize,
    /// Configured load; divided by `message_size` to obtain the per-cycle probability.
    load: f32,
    /// Initial allowance of every server.
    bound: usize,
    /// Addresses of the messages generated and not yet consumed.
    generated_messages: BTreeSet<*const Message>,
    /// Current allowance of each server, indexed by server.
    allowance: Vec<usize>,
}
impl Traffic for BoundedDifference {
    /// Create a message from `origin`, spending one unit of its allowance.
    ///
    /// The allowance is only spent when a message is actually produced.
    ///
    /// # Errors
    /// `TrafficError::OriginOutsideTraffic` when `origin >= servers`;
    /// `TrafficError::SelfMessage` when the pattern returns the origin.
    ///
    /// # Panics
    /// Panics when `origin` has no allowance left.
    fn generate_message(
        &mut self,
        origin: usize,
        cycle: usize,
        topology: &dyn Topology,
        rng: &RefCell<StdRng>,
    ) -> Result<Rc<Message>, TrafficError> {
        if origin >= self.servers {
            return Err(TrafficError::OriginOutsideTraffic);
        }
        assert!(self.allowance[origin] > 0, "Origin {} has no allowance to send more messages.", origin);
        let destination = self.pattern.get_destination(origin, topology, rng);
        if origin == destination {
            return Err(TrafficError::SelfMessage);
        }
        self.allowance[origin] -= 1;
        let message = Rc::new(Message {
            origin,
            destination,
            size: self.message_size,
            creation_cycle: cycle,
        });
        self.generated_messages.insert(&*message as *const Message);
        Ok(message)
    }

    /// `load / message_size` capped at 1 while `server` has allowance, 0 otherwise.
    ///
    /// Servers outside the traffic get 0 instead of an out-of-bounds panic, matching the
    /// `OriginOutsideTraffic` handling in `generate_message`.
    fn probability_per_cycle(&self, server: usize) -> f32 {
        match self.allowance.get(server) {
            Some(&allowance) if allowance > 0 => {
                let r = self.load / self.message_size as f32;
                if r > 1.0 {
                    1.0
                } else {
                    r
                }
            }
            _ => 0f32,
        }
    }

    /// Forget `message` if it was generated here and, only in that case, return one unit of
    /// allowance to `server`. Returns whether the message belonged to this traffic.
    ///
    /// Messages of other traffics (for example siblings inside a `Sum`) must not raise the
    /// allowance, otherwise the bound would not hold.
    fn try_consume(
        &mut self,
        server: usize,
        message: Rc<Message>,
        _cycle: usize,
        _topology: &dyn Topology,
        _rng: &RefCell<StdRng>,
    ) -> bool {
        let message_ptr = &*message as *const Message;
        let consumed = self.generated_messages.remove(&message_ptr);
        if consumed {
            self.allowance[server] += 1;
        }
        consumed
    }

    /// This traffic never finishes.
    fn is_finished(&self) -> bool {
        false
    }

    /// `Generating` while `server` has allowance and `WaitingData` when it has none.
    /// Servers outside the traffic never generate and are reported as `FinishedGenerating`.
    fn server_state(&self, server: usize, _cycle: usize) -> ServerTrafficState {
        match self.allowance.get(server) {
            Some(&allowance) if allowance > 0 => ServerTrafficState::Generating,
            Some(_) => ServerTrafficState::WaitingData,
            None => ServerTrafficState::FinishedGenerating,
        }
    }
}
impl BoundedDifference {
    /// Build a `BoundedDifference` from a configuration object with the keys `pattern`,
    /// `servers`, `load`, `message_size` and `bound`.
    ///
    /// Prints a warning when `servers` differs from the number of servers in the topology.
    /// Every server starts with an allowance of `bound`.
    ///
    /// # Panics
    /// Panics when a key is missing or has a value of the wrong kind.
    pub fn new(arg: TrafficBuilderArgument) -> BoundedDifference {
        let mut opt_servers = None;
        let mut opt_load = None;
        let mut opt_pattern = None;
        let mut opt_message_size = None;
        let mut opt_bound = None;
        match_object_panic!(arg.cv, "BoundedDifference", value,
            "pattern" => opt_pattern = Some(new_pattern(PatternBuilderArgument { cv: value, plugs: arg.plugs })),
            "servers" => opt_servers = Some(value.as_f64().expect("bad value for servers") as usize),
            "load" => opt_load = Some(value.as_f64().expect("bad value for load") as f32),
            "message_size" => opt_message_size = Some(value.as_f64().expect("bad value for message_size") as usize),
            "bound" => opt_bound = Some(value.as_f64().expect("bad value for bound") as usize),
        );
        let servers = opt_servers.expect("There were no servers");
        let message_size = opt_message_size.expect("There were no message_size");
        let bound = opt_bound.expect("There were no bound");
        let load = opt_load.expect("There were no load");
        let mut pattern = opt_pattern.expect("There were no pattern");
        let in_topology = arg.topology.num_servers();
        if servers != in_topology {
            println!("WARNING: Generating traffic over {} servers when the topology has {} servers.", servers, in_topology);
        }
        pattern.initialize(servers, servers, arg.topology, arg.rng);
        let allowance = vec![bound; servers];
        BoundedDifference {
            servers,
            pattern,
            message_size,
            load,
            bound,
            generated_messages: BTreeSet::new(),
            allowance,
        }
    }
}