caminos-lib 0.5.2

A modular interconnection network simulator.
Documentation

/*!

A Traffic defines the way load is generated by the servers.

see [`new_traffic`](fn.new_traffic.html) for documentation on the configuration syntax of predefined traffics.

*/

use std::boxed::Box;
use std::cell::{RefCell};
use std::rc::Rc;
use std::collections::{BTreeSet,BTreeMap,VecDeque};
//use std::mem::{size_of};
use std::fmt::Debug;

use ::rand::{Rng,rngs::StdRng};

use crate::match_object_panic;
use crate::config_parser::ConfigurationValue;
use crate::{Message,Plugs};
use crate::pattern::{Pattern,new_pattern,PatternBuilderArgument};
use crate::topology::Topology;
use quantifiable_derive::Quantifiable;//the derive macro
use crate::quantify::Quantifiable;

///Possible errors when trying to generate a message with a `Traffic`.
#[derive(Debug)]
pub enum TrafficError
{
	///The traffic tried to send a message outside the network range.
	OriginOutsideTraffic,
	///A server has generated a message to itself. Not necessarily an error.
	SelfMessage,
}

#[derive(Debug)]
pub enum ServerTrafficState
{
	///The server is currently generating traffic.
	Generating,
	///The server is currently waiting to receive some message from others.
	///If the server is known to not going to generate any more traffic it should be a `FinishedGenerating` state instead.
	WaitingData,
	///The server is not going to generate traffic nor change state until the `cycle`.
	WaitingCycle{cycle:usize},
	///The server is not generating traffic for some other reasons.
	UnspecifiedWait,
	///This server will not generate more traffic, but perhaps it will consume.
	FinishedGenerating,
	///This server has nothing else to do within this `Traffic`.
	Finished,
}

///A traffic to be offered to a network. Each server may generate and consume messages.
///Each should call `should_generate` every cycle unless it is unable to store more messages.
pub trait Traffic : Quantifiable + Debug
{
	///Returns a new message following the indications of the traffic.
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>;
	///Get its probability of generating per cycle
	fn probability_per_cycle(&self, server:usize) -> f32;
	///If the message was generated by the traffic updates itself and returns true
	///The argument `server` is the one consuming the message.
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool;
	///Indicates if the traffic is not going to generate any more messages.
	///Should be true if and only if the state of all servers is `Finished`.
	fn is_finished(&self) -> bool;
	///Returns true if a server should generate a message this cycle
	///Should coincide with having the `Genrating` state for deterministic traffics.
	fn should_generate(&self, server:usize, _cycle:usize, rng: &RefCell<StdRng>) -> bool
	{
		let p=self.probability_per_cycle(server);
		let r=rng.borrow_mut().gen_range(0f32..1f32);
		r<p
	}
	///Indicates the state of the server within the traffic.
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState;
}

#[derive(Debug)]
pub struct TrafficBuilderArgument<'a>
{
	///A ConfigurationValue::Object defining the traffic.
	pub cv: &'a ConfigurationValue,
	///The user defined plugs. In case the traffic needs to create elements.
	pub plugs: &'a Plugs,
	///The topology of the network that is gonna to receive the traffic.
	pub topology: &'a dyn Topology,
	///The random number generator to use.
	pub rng: &'a RefCell<StdRng>,
}

/**Build a new traffic.

## Base traffics.

### Homogeneous traffic
Is a traffic where all servers behave equally and uniform in time. Some `pattern` is generated
by `servers` number of involved servers along the whole simulation. Each server tries to use its link toward the network a `load`
fraction of the cycles. The generated messages has a size in phits of `message_size`. The generation is the typical Bernoulli process.
```ignore
HomogeneousTraffic{
	pattern:Uniform,
	servers:1000,
	load: 0.9,
	message_size: 16,
}
```

### Burst
In the Burst traffic each of the involved `servers` has a initial list of `messages_per_server` messages to emit. When all the messages
are consumed the simulation is requested to end.
```ignore
Burst{
	pattern:Uniform,
	servers:1000,
	messages_per_server:200,
	message_size: 16,
}
```

### Reactive

A Reactive traffic is composed of an `action_traffic` generated normally, whose packets, when consumed create a response by the `reaction_traffic`.
If both subtraffics are requesting to end and there is no pending message the reactive traffic also requests to end.
```ignore
Reactive{
	action_traffic:HomogeneousTraffic{...},
	reaction_traffic:HomogeneousTraffic{...},
}
```

## Operations

### TrafficSum

Generates several traffic at once, if the total load allows it.
```ignore
TrafficSum{
	list: [HomogeneousTraffic{...},... ],
}
```

### ShiftedTraffic

A ShiftedTraffic shifts a given traffic a certain amount of servers. Yu should really check if some pattern transformation fit your purpose, since it will be simpler.
```ignore
ShiftedTraffic{
	traffic: HomogeneousTraffic{...},
	shift: 50,
}
```

### ProductTraffic

A ProductTraffic divides the servers into blocks. Each group generates traffic following the `block_traffic`, but instead of having the destination in the same block it is selected a destination by using the `global_pattern` of the block. Blocks of interest are
* The servers attached to a router. Then if the global_pattern is a permutation, all the servers will comunicate with servers attached to the same router. This can stress the network a lot more than a permutation of servers.
* All servers in a group of a dragonfly. If the global_pattern is a permutation, there is only a global link between groups, and Shortest routing is used, then all the packets generated in a group will try by the same global link. Other global links being unused.
Note there is also a product at pattern level, which may be easier to use.

```ignore
ProductTraffic{
	block_size: 10,
	block_traffic: HomogeneousTraffic{...},
	global_pattern: RandomPermutation,
}
```

### SubRangeTraffic

A SubRangeTraffic makes servers outise the range to not generate traffic.
```ignore
SubRangeTraffic{
	start: 100,
	end: 200,
	traffic: HomogeneousTraffic{...},
}
```

### TimeSequenced

Defines a sequence of traffics with the given finalization times.

TimeSequenced{
	traffics: [HomogeneousTraffic{...}, HomogeneousTraffic{...}],
	times: [2000, 15000],
}

### Sequence

Defines a sequence of traffics. When one is completed the next starts.

Sequence{
	traffics: [Burst{...}, Burst{...}],
}


*/
pub fn new_traffic(arg:TrafficBuilderArgument) -> Box<dyn Traffic>
{
	if let &ConfigurationValue::Object(ref cv_name, ref _cv_pairs)=arg.cv
	{
		if let Some(builder) = arg.plugs.traffics.get(cv_name)
		{
			return builder(arg);
		}
		match cv_name.as_ref()
		{
			"HomogeneousTraffic" => Box::new(Homogeneous::new(arg)),
			"TrafficSum" => Box::new(Sum::new(arg)),
			"ShiftedTraffic" => Box::new(Shifted::new(arg)),
			"ProductTraffic" => Box::new(ProductTraffic::new(arg)),
			"SubRangeTraffic" => Box::new(SubRangeTraffic::new(arg)),
			"Burst" => Box::new(Burst::new(arg)),
			"MultimodalBurst" => Box::new(MultimodalBurst::new(arg)),
			"Reactive" => Box::new(Reactive::new(arg)),
			"TimeSequenced" => Box::new(TimeSequenced::new(arg)),
			"Sequence" => Box::new(Sequence::new(arg)),
			"BoundedDifference" => Box::new(BoundedDifference::new(arg)),
			_ => panic!("Unknown traffic {}",cv_name),
		}
	}
	else
	{
		panic!("Trying to create a traffic from a non-Object");
	}
}

///Traffic in which all messages have same size, follow the same pattern, and there is no change with time.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct Homogeneous
{
	///Number of servers applying this traffic.
	servers: usize,
	///The pattern of the communication.
	pattern: Box<dyn Pattern>,
	///The size of each sent message.
	message_size: usize,
	///The load offered to the network. Proportion of the cycles that should be injecting phits.
	load: f32,
	///Set of generated messages.
	generated_messages: BTreeSet<*const Message>,
}

impl Traffic for Homogeneous
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin>=self.servers
		{
			//panic!("origin {} does not belong to the traffic",origin);
			return Err(TrafficError::OriginOutsideTraffic);
		}
		let destination=self.pattern.get_destination(origin,topology,rng);
		if origin==destination
		{
			return Err(TrafficError::SelfMessage);
		}
		let message=Rc::new(Message{
			origin,
			destination,
			size:self.message_size,
			creation_cycle: cycle,
		});
		self.generated_messages.insert(message.as_ref() as *const Message);
		Ok(message)
	}
	fn probability_per_cycle(&self, _server:usize) -> f32
	{
		let r=self.load/self.message_size as f32;
		//println!("load={} r={} size={}",self.load,r,self.message_size);
		if r>1.0
		{
			1.0
		}
		else
		{
			r
		}
	}
	fn try_consume(&mut self, _server:usize, message: Rc<Message>, _cycle:usize, _topology:&dyn Topology, _rng: &RefCell<StdRng>) -> bool
	{
		let message_ptr=message.as_ref() as *const Message;
		self.generated_messages.remove(&message_ptr)
	}
	fn is_finished(&self) -> bool
	{
		false
	}
	fn server_state(&self, _server:usize, _cycle:usize) -> ServerTrafficState
	{
		ServerTrafficState::Generating
	}
}

impl Homogeneous
{
	pub fn new(arg:TrafficBuilderArgument) -> Homogeneous
	{
		let mut servers=None;
		let mut load=None;
		let mut pattern=None;
		let mut message_size=None;
		match_object_panic!(arg.cv,"HomogeneousTraffic",value,
			"pattern" => pattern=Some(new_pattern(PatternBuilderArgument{cv:value,plugs:arg.plugs})),
			"servers" => servers=Some(value.as_f64().expect("bad value for servers") as usize),
			"load" => load=Some(value.as_f64().expect("bad value for load") as f32),
			"message_size" => message_size=Some(value.as_f64().expect("bad value for message_size") as usize),
		);
		let servers=servers.expect("There were no servers");
		let message_size=message_size.expect("There were no message_size");
		let load=load.expect("There were no load");
		let mut pattern=pattern.expect("There were no pattern");
		let topo_servers=arg.topology.num_servers();
		if servers != topo_servers
		{
			println!("WARNING: Generating traffic over {} servers when the topology has {} servers.",servers,topo_servers);
		}
		pattern.initialize(servers, servers, arg.topology, arg.rng);
		Homogeneous{
			servers,
			pattern,
			message_size,
			load,
			generated_messages: BTreeSet::new(),
		}
	}
}

///Traffic which is the sum of a list of oter traffics.
///While it will clearly work when the sum of the generation rates is at most 1, it should behave nicely enough otherwise.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct Sum
{
	///List of traffic summands
	list: Vec<Box<dyn Traffic>>,
}

//impl Quantifiable for Sum
//{
//	fn total_memory(&self) -> usize
//	{
//		unimplemented!();
//	}
//	fn print_memory_breakdown(&self)
//	{
//		unimplemented!();
//	}
//	fn forecast_total_memory(&self) -> usize
//	{
//		unimplemented!();
//	}
//}

impl Traffic for Sum
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		let probs:Vec<f32> =self.list.iter().map(|t|t.probability_per_cycle(origin)).collect();
		//let mut r=rng.borrow_mut().gen_range(0f32,probs.iter().sum());//rand-0.4
		let mut r=rng.borrow_mut().gen_range(0f32..probs.iter().sum());//rand-0.8
		for i in 0..self.list.len()
		{
			if r<probs[i]
			{
				return self.list[i].generate_message(origin,cycle,topology,rng);
			}
			else
			{
				r-=probs[i];
			}
		}
		panic!("failed probability");
	}
	//fn should_generate(&self, rng: &RefCell<StdRng>) -> bool
	//{
	//	let r=rng.borrow_mut().gen_range(0f32,1f32);
	//	r<=self.list.iter().map(|t|t.probability_per_cycle()).sum()
	//}
	fn probability_per_cycle(&self,server:usize) -> f32
	{
		self.list.iter().map(|t|t.probability_per_cycle(server)).sum()
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		for traffic in self.list.iter_mut()
		{
			if traffic.try_consume(server,message.clone(),cycle,topology,rng)
			{
				return true;
			}
		}
		return false;
	}
	fn is_finished(&self) -> bool
	{
		for traffic in self.list.iter()
		{
			if !traffic.is_finished()
			{
				return false;
			}
		}
		return true;
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		use ServerTrafficState::*;
		//let states = self.list.iter().map(|t|t.server_state(server,cycle)).collect();
		let mut state = Finished;
		for traffic in self.list.iter()
		{
			match traffic.server_state(server,cycle)
			{
				Finished => (),
				Generating => return Generating,
				FinishedGenerating => state = FinishedGenerating,
				_ => state = UnspecifiedWait,
			}
		}
		state
	}
}

impl Sum
{
	pub fn new(arg:TrafficBuilderArgument) -> Sum
	{
		let mut list=None;
		match_object_panic!(arg.cv,"TrafficSum",value,
			"list" => list = Some(value.as_array().expect("bad value for list").iter()
				.map(|v|new_traffic(TrafficBuilderArgument{cv:v,..arg})).collect()),
		);
		let list=list.expect("There were no list");
		Sum{
			list
		}
	}
}

///Traffic which is another shifted by some amount of servers
///First check whether a transformation at the `Pattern` level is enough.
///The server `index+shit` will be seen as just `index` by the inner traffic.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct Shifted
{
	///The amount of the shift in servers
	shift: usize,
	///The traffic that is being shifted
	traffic: Box<dyn Traffic>,
	///Set of generated messages.
	generated_messages: BTreeMap<*const Message,Rc<Message>>,
}

//impl Quantifiable for Shifted
//{
//	fn total_memory(&self) -> usize
//	{
//		unimplemented!();
//	}
//	fn print_memory_breakdown(&self)
//	{
//		unimplemented!();
//	}
//	fn forecast_total_memory(&self) -> usize
//	{
//		unimplemented!();
//	}
//}

impl Traffic for Shifted
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin<self.shift
		{
			return Err(TrafficError::OriginOutsideTraffic);
		}
		//let mut message=self.traffic.generate_message(origin-self.shift,rng)?;
		//message.origin=origin;
		//message.destination+=self.shift;
		//Ok(message)
		let inner_message=self.traffic.generate_message(origin-self.shift,cycle,topology,rng)?;
		let outer_message=Rc::new(Message{
			origin,
			destination:inner_message.destination+self.shift,
			size:inner_message.size,
			creation_cycle: cycle,
		});
		self.generated_messages.insert(outer_message.as_ref() as *const Message,inner_message);
		Ok(outer_message)
	}
	fn probability_per_cycle(&self,server:usize) -> f32
	{
		self.traffic.probability_per_cycle(server-self.shift)
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		let message_ptr=message.as_ref() as *const Message;
		let outer_message=match self.generated_messages.remove(&message_ptr)
		{
			None => return false,
			Some(m) => m,
		};
		if !self.traffic.try_consume(server,outer_message,cycle,topology,rng)
		{
			panic!("Shifted traffic consumed a message but its child did not.");
		}
		true
	}
	fn is_finished(&self) -> bool
	{
		self.traffic.is_finished()
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		self.traffic.server_state(server-self.shift,cycle)
	}
}

impl Shifted
{
	pub fn new(arg:TrafficBuilderArgument) -> Shifted
	{
		let mut shift=None;
		let mut traffic=None;
		match_object_panic!(arg.cv,"ShiftedTraffic",value,
			"traffic" => traffic=Some(new_traffic(TrafficBuilderArgument{cv:value,..arg})),
			"shift" => shift=Some(value.as_f64().expect("bad value for shift") as usize),
		);
		let shift=shift.expect("There were no shift");
		let traffic=traffic.expect("There were no traffic");
		Shifted{
			shift,
			traffic,
			generated_messages: BTreeMap::new(),
		}
	}
}

///Divides the network in blocks and use a `Traffic` inside blocks applying a global `Pattern` among blocks.
///First check whether a transformation at the `Pattern` level is enough; specially see the `Product` pattern.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct ProductTraffic
{
	block_size: usize,
	block_traffic: Box<dyn Traffic>,
	global_pattern: Box<dyn Pattern>,
	// ///The amount of the shift in servers
	// shift: usize,
	// ///The traffic that is being shifted
	// traffic: Box<dyn Traffic>,
	///Set of generated messages.
	generated_messages: BTreeMap<*const Message,Rc<Message>>,
}

impl Traffic for ProductTraffic
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		let local=origin % self.block_size;
		let global=origin / self.block_size;
		//let local_dest=self.block_pattern.get_destination(local,topology,rng);
		let global_dest=self.global_pattern.get_destination(global,topology,rng);
		//global_dest*self.block_size+local_dest
		let inner_message=self.block_traffic.generate_message(local,cycle,topology,rng)?;
		let outer_message=Rc::new(Message{
			origin,
			destination:global_dest*self.block_size+inner_message.destination,
			size:inner_message.size,
			creation_cycle: cycle,
		});
		self.generated_messages.insert(outer_message.as_ref() as *const Message,inner_message);
		Ok(outer_message)
	}
	fn probability_per_cycle(&self,server:usize) -> f32
	{
		let local=server % self.block_size;
		self.block_traffic.probability_per_cycle(local)
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		let message_ptr=message.as_ref() as *const Message;
		let outer_message=match self.generated_messages.remove(&message_ptr)
		{
			None => return false,
			Some(m) => m,
		};
		if !self.block_traffic.try_consume(server,outer_message,cycle,topology,rng)
		{
			panic!("ProductTraffic traffic consumed a message but its child did not.");
		}
		true
	}
	fn is_finished(&self) -> bool
	{
		self.block_traffic.is_finished()
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		let local=server % self.block_size;
		self.block_traffic.server_state(local,cycle)
	}
}

impl ProductTraffic
{
	pub fn new(arg:TrafficBuilderArgument) -> ProductTraffic
	{
		let mut block_size=None;
		let mut block_traffic=None;
		let mut global_pattern=None;
		match_object_panic!(arg.cv,"ProductTraffic",value,
			"block_traffic" => block_traffic=Some(new_traffic(TrafficBuilderArgument{cv:value,..arg})),
			"global_pattern" => global_pattern=Some(new_pattern(PatternBuilderArgument{cv:value,plugs:arg.plugs})),
			"block_size" => block_size=Some(value.as_f64().expect("bad value for block_size") as usize),
		);
		let block_size=block_size.expect("There were no block_size");
		let block_traffic=block_traffic.expect("There were no block_traffic");
		let mut global_pattern=global_pattern.expect("There were no global_pattern");
		let global_size=arg.topology.num_servers()/block_size;
		global_pattern.initialize(global_size,global_size,arg.topology,arg.rng);
		ProductTraffic{
			block_size,
			block_traffic,
			global_pattern,
			generated_messages: BTreeMap::new(),
		}
	}
}

///Only allow servers in range will generate messages. The messages can go out of the given range.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct SubRangeTraffic
{
	///The first element actually in the traffic.
	start: usize,
	///The next to the last element actually in the traffic.
	end: usize,
	///The traffic that is being filtered.
	traffic: Box<dyn Traffic>,
	// /Set of generated messages.
	//generated_messages: BTreeMap<*const Message,Rc<Message>>,
}

impl Traffic for SubRangeTraffic
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin<self.start || origin>=self.end
		{
			return Err(TrafficError::OriginOutsideTraffic);
		}
		self.traffic.generate_message(origin,cycle,topology,rng)
	}
	fn probability_per_cycle(&self,server:usize) -> f32
	{
		self.traffic.probability_per_cycle(server)
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		self.traffic.try_consume(server,message,cycle,topology,rng)
	}
	fn is_finished(&self) -> bool
	{
		self.traffic.is_finished()
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		self.traffic.server_state(server,cycle)
	}
}

impl SubRangeTraffic
{
	pub fn new(arg:TrafficBuilderArgument) -> SubRangeTraffic
	{
		let mut start=None;
		let mut end=None;
		let mut traffic=None;
		match_object_panic!(arg.cv,"SubRangeTraffic",value,
			"traffic" => traffic=Some(new_traffic(TrafficBuilderArgument{cv:value,..arg})),
			"start" => start=Some(value.as_f64().expect("bad value for start") as usize),
			"end" => end=Some(value.as_f64().expect("bad value for end") as usize),
		);
		let start=start.expect("There were no start");
		let end=end.expect("There were no end");
		let traffic=traffic.expect("There were no traffic");
		SubRangeTraffic{
			start,
			end,
			traffic,
			//generated_messages: BTreeMap::new(),
		}
	}
}

///Initialize an amount of messages to send from each server.
///The traffic will be considered complete when all servers have generated their messages and all of them have been consumed.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct Burst
{
	///Number of servers applying this traffic.
	servers: usize,
	///The pattern of the communication.
	pattern: Box<dyn Pattern>,
	///The size of each sent message.
	message_size: usize,
	///The number of messages each server has pending to sent.
	pending_messages: Vec<usize>,
	///Set of generated messages.
	generated_messages: BTreeSet<*const Message>,
}

impl Traffic for Burst
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin>=self.servers
		{
			//panic!("origin {} does not belong to the traffic",origin);
			return Err(TrafficError::OriginOutsideTraffic);
		}
		self.pending_messages[origin]-=1;
		let destination=self.pattern.get_destination(origin,topology,rng);
		if origin==destination
		{
			return Err(TrafficError::SelfMessage);
		}
		let message=Rc::new(Message{
			origin,
			destination,
			size:self.message_size,
			creation_cycle: cycle,
		});
		self.generated_messages.insert(message.as_ref() as *const Message);
		Ok(message)
	}
	fn probability_per_cycle(&self, server:usize) -> f32
	{
		if self.pending_messages[server]>0
		{
			1.0
		}
		else
		{
			0.0
		}
	}
	fn try_consume(&mut self, _server:usize, message: Rc<Message>, _cycle:usize, _topology:&dyn Topology, _rng: &RefCell<StdRng>) -> bool
	{
		let message_ptr=message.as_ref() as *const Message;
		self.generated_messages.remove(&message_ptr)
	}
	fn is_finished(&self) -> bool
	{
		if !self.generated_messages.is_empty()
		{
			return false;
		}
		for &pm in self.pending_messages.iter()
		{
			if pm>0
			{
				return false;
			}
		}
		true
	}
	fn server_state(&self, server:usize, _cycle:usize) -> ServerTrafficState
	{
		if self.pending_messages[server]>0 {
			ServerTrafficState::Generating
		} else {
			//We do not know whether someone is sending us data.
			//if self.is_finished() { ServerTrafficState::Finished } else { ServerTrafficState::UnspecifiedWait }
			// Sometimes it could be Finished, but it is not worth computing...
			ServerTrafficState::FinishedGenerating
		}
	}
}

impl Burst
{
	pub fn new(arg:TrafficBuilderArgument) -> Burst
	{
		let mut servers=None;
		let mut messages_per_server=None;
		let mut pattern=None;
		let mut message_size=None;
		match_object_panic!(arg.cv,"Burst",value,
			"pattern" => pattern=Some(new_pattern(PatternBuilderArgument{cv:value,plugs:arg.plugs})),
			"servers" => servers=Some(value.as_f64().expect("bad value for servers") as usize),
			"messages_per_server" => messages_per_server=Some(value.as_f64().expect("bad value for messages_per_server") as usize),
			"message_size" => message_size=Some(value.as_f64().expect("bad value for message_size") as usize),
		);
		let servers=servers.expect("There were no servers");
		let message_size=message_size.expect("There were no message_size");
		let messages_per_server=messages_per_server.expect("There were no messages_per_server");
		let mut pattern=pattern.expect("There were no pattern");
		pattern.initialize(servers, servers, arg.topology, arg.rng);
		Burst{
			servers,
			pattern,
			message_size,
			pending_messages:vec![messages_per_server;servers],
			generated_messages: BTreeSet::new(),
		}
	}
}


///Has a major traffic `action_traffic` generated normally. When a message from this `action_traffic` is consumed, the `reaction_traffic` is requested for a message. This reaction message will be generated by the server that consumed the action message. The destination of the reaction message is independent of the origin of the action message.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct Reactive
{
	action_traffic: Box<dyn Traffic>,
	reaction_traffic: Box<dyn Traffic>,
	pending_messages: Vec<VecDeque<Rc<Message>>>,
}


//impl Quantifiable for Reactive
//{
//	fn total_memory(&self) -> usize
//	{
//		unimplemented!();
//	}
//	fn print_memory_breakdown(&self)
//	{
//		unimplemented!();
//	}
//	fn forecast_total_memory(&self) -> usize
//	{
//		unimplemented!();
//	}
//}
impl Traffic for Reactive
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin<self.pending_messages.len()
		{
			if let Some(message)=self.pending_messages[origin].pop_front()
			{
				return Ok(message);
			}
		}
		return self.action_traffic.generate_message(origin,cycle,topology,rng);
	}
	fn probability_per_cycle(&self, server:usize) -> f32
	{
		if server<self.pending_messages.len() && !self.pending_messages[server].is_empty()
		{
			return 1.0;
		}
		return self.action_traffic.probability_per_cycle(server);
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		if self.action_traffic.try_consume(server,message.clone(),cycle,topology,rng)
		{
			if self.reaction_traffic.should_generate(message.origin,cycle,rng)
			{
				match self.reaction_traffic.generate_message(message.origin,cycle,topology,rng)
				{
					Ok(response_message) =>
					{
						if self.pending_messages.len()<message.origin+1
						{
							self.pending_messages.resize(message.origin+1,VecDeque::new());
						}
						self.pending_messages[message.origin].push_back(response_message);
					},
					//Err(TrafficError::OriginOutsideTraffic) => (),
					Err(error) => panic!("An error happened when generating response traffic: {:?}",error),
				};
			}
			return true;
		}
		self.reaction_traffic.try_consume(server,message,cycle,topology,rng)
	}
	fn is_finished(&self) -> bool
	{
		if !self.action_traffic.is_finished() || !self.reaction_traffic.is_finished()
		{
			return false;
		}
		for pm in self.pending_messages.iter()
		{
			if !pm.is_empty()
			{
				return false;
			}
		}
		return true;
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		use ServerTrafficState::*;
		let action_state = self.action_traffic.server_state(server,cycle);
		if let Finished = action_state
		{
			return Finished
		}
		let reaction_state = self.reaction_traffic.server_state(server,cycle);
		if let Finished = reaction_state
		{
			return Finished
		}
		if self.is_finished() { Finished } else { UnspecifiedWait }
	}
}

impl Reactive
{
	pub fn new(arg:TrafficBuilderArgument) -> Reactive
	{
		let mut action_traffic=None;
		let mut reaction_traffic=None;
		match_object_panic!(arg.cv,"Reactive",value,
			"action_traffic" => action_traffic=Some(new_traffic(TrafficBuilderArgument{cv:value,..arg})),
			"reaction_traffic" => reaction_traffic=Some(new_traffic(TrafficBuilderArgument{cv:value,..arg})),
		);
		let action_traffic=action_traffic.expect("There were no action_traffic");
		let reaction_traffic=reaction_traffic.expect("There were no reaction_traffic");
		Reactive{
			action_traffic,
			reaction_traffic,
			pending_messages:vec![],
		}
	}
}



///Selects the traffic from a sequence depending on current cycle
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct TimeSequenced
{
	///List of applicable traffics.
	traffics: Vec<Box<dyn Traffic>>,
	///End time of each traffic. Counting from the end of the previous one.
	times: Vec<usize>,
}

impl Traffic for TimeSequenced
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		let mut offset = cycle;
		let mut traffic_index = 0;
		while traffic_index<self.traffics.len() && offset >= self.times[traffic_index]
		{
			offset -= self.times[traffic_index];
			traffic_index += 1;
		}
		assert!(traffic_index<self.traffics.len());
		self.traffics[traffic_index].generate_message(origin,cycle,topology,rng)
	}
	fn probability_per_cycle(&self,_server:usize) -> f32
	{
		//Can we do better here?
		1.0
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		for traffic in self.traffics.iter_mut()
		{
			if traffic.try_consume(server,message.clone(),cycle,topology,rng)
			{
				return true;
			}
		}
		return false;
	}
	fn is_finished(&self) -> bool
	{
		//This is a bit silly for a time sequence
		for traffic in self.traffics.iter()
		{
			if !traffic.is_finished()
			{
				return false;
			}
		}
		return true;
	}
	fn should_generate(&self, server:usize, cycle:usize, rng: &RefCell<StdRng>) -> bool
	{
		let mut offset = cycle;
		let mut traffic_index = 0;
		while traffic_index<self.traffics.len() && offset >= self.times[traffic_index]
		{
			offset -= self.times[traffic_index];
			traffic_index += 1;
		}
		if traffic_index<self.traffics.len(){
			self.traffics[traffic_index].should_generate(server,cycle,rng)
		} else {
			false
		}
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		let mut offset = cycle;
		let mut traffic_index = 0;
		while traffic_index<self.traffics.len() && offset >= self.times[traffic_index]
		{
			offset -= self.times[traffic_index];
			traffic_index += 1;
		}
		if traffic_index == self.traffics.len()
		{
			return ServerTrafficState::Finished;
		}
		let state = self.traffics[traffic_index].server_state(server,cycle);
		if let ServerTrafficState::Finished = state {
			ServerTrafficState::WaitingCycle { cycle:self.times[traffic_index] }
		} else {
			state
		}
	}
}

impl TimeSequenced
{
	pub fn new(arg:TrafficBuilderArgument) -> TimeSequenced
	{
		let mut traffics=None;
		let mut times=None;
		match_object_panic!(arg.cv,"TimeSequenced",value,
			"traffics" => traffics = Some(value.as_array().expect("bad value for traffics").iter()
				.map(|v|new_traffic(TrafficBuilderArgument{cv:v,..arg})).collect()),
			"times" => times = Some(value.as_array()
				.expect("bad value for times").iter()
				.map(|v|v.as_f64().expect("bad value in times") as usize).collect()),
		);
		let traffics=traffics.expect("There were no traffics");
		let times=times.expect("There were no times");
		TimeSequenced{
			traffics,
			times,
		}
	}
}

///A sequence of traffics. When a traffic declares itself to be finished moves to the next.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct Sequence
{
	///List of applicable traffics.
	traffics: Vec<Box<dyn Traffic>>,
	//How many times to apply the whole traffic period. default to 1.
	//period_limit: usize,
	///The traffic which is currently in use.
	current_traffic: usize,
	//The period number, starting at 0. The whole traffic finishes before `current_period` reaching `period_limit`.
	//current_period: usize,
}

impl Traffic for Sequence
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		while self.traffics[self.current_traffic].is_finished()
		{
			self.current_traffic += 1;
			//self.current_traffic = (self.current_traffic + 1) % self.traffics.len();
		}
		assert!(self.current_traffic<self.traffics.len());
		self.traffics[self.current_traffic].generate_message(origin,cycle,topology,rng)
	}
	fn probability_per_cycle(&self,server:usize) -> f32
	{
		self.traffics[self.current_traffic].probability_per_cycle(server)
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> bool
	{
		for traffic in self.traffics.iter_mut()
		{
			if traffic.try_consume(server,message.clone(),cycle,topology,rng)
			{
				while self.current_traffic < self.traffics.len() && self.traffics[self.current_traffic].is_finished()
				{
					//self.current_traffic = (self.current_traffic + 1) % self.traffics.len();
					self.current_traffic += 1;
				}
				return true;
			}
		}
		return false;
	}
	fn is_finished(&self) -> bool
	{
		//return current_period == period_limit;
		return self.current_traffic>=self.traffics.len() || (self.current_traffic==self.traffics.len()-1 && self.traffics[self.current_traffic].is_finished())
	}
	fn should_generate(&self, server:usize, cycle:usize, rng: &RefCell<StdRng>) -> bool
	{
		if self.current_traffic>=self.traffics.len()
		{
			false
		} else {
			self.traffics[self.current_traffic].should_generate(server,cycle,rng)
		}
	}
	fn server_state(&self, server:usize, cycle:usize) -> ServerTrafficState
	{
		use ServerTrafficState::*;
		if self.current_traffic>=self.traffics.len()
		{
			Finished
		} else {
			let state = self.traffics[self.current_traffic].server_state(server,cycle);
			if let Finished=state{
				UnspecifiedWait
			} else {
				state
			}
			//In the last traffic we could try to check for FinishedGenerating
		}
	}
}

impl Sequence
{
	pub fn new(arg:TrafficBuilderArgument) -> Sequence
	{
		let mut traffics_args=None;
		let mut period_number=1usize;
		match_object_panic!(arg.cv,"Sequence",value,
			"traffics" => traffics_args = Some(value.as_array().expect("bad value for traffics")),
			"period_number" => period_number=value.as_f64().expect("bad value for period_number") as usize,
		);
		let traffics_args=traffics_args.expect("There were no traffics");
		let traffics = (0..period_number).flat_map(|_ip| traffics_args.iter().map(|v|new_traffic(TrafficBuilderArgument{cv:v,..arg})).collect::<Vec<_>>() ).collect();
		Sequence{
			traffics,
			current_traffic:0,
			//current_period:0,
		}
	}
}

/// Like the `Burst` pattern, but generating messages from different patterns and with different message sizes.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct MultimodalBurst
{
	///Number of servers applying this traffic.
	servers: usize,
	/// For each kind of message `provenance` we have
	/// `(pattern,total_messages,message_size,step_size)`
	/// a Pattern deciding the destination of the message
	/// a usize with the total number of messages of this kind that each server must generate
	/// a usize with the size of each message size.
	/// a usize with the number of messages to send of this kind before switching to the next one.
	provenance: Vec< (Box<dyn Pattern>,usize,usize,usize) >,
	///For each server and kind we track `pending[server][kind]=(total_remaining,step_remaining)`.
	///where `total_remaining` is the total number of messages of this kind that this server has yet to send.
	///and `step_remaining` is the number of messages that the server will send before switch to the next kind.
	pending: Vec<Vec<(usize,usize)>>,
	///For each server we track which provenance kind is the next one.
	///If for the annotated provenance there is not anything else to send then use the next one.
	next_provenance: Vec<usize>,
	///Set of generated messages.
	generated_messages: BTreeSet<*const Message>,
}

impl Traffic for MultimodalBurst
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin>=self.servers
		{
			//panic!("origin {} does not belong to the traffic",origin);
			return Err(TrafficError::OriginOutsideTraffic);
		}
		let pending = &mut self.pending[origin];
		// Determine the kind to use.
		let mut provenance_index = self.next_provenance[origin];
		loop
		{
			let (ref mut total_remaining, ref mut step_remaining) = pending[provenance_index];
			if *total_remaining > 0
			{
				*step_remaining -=1;
				*total_remaining -=1;
				if *step_remaining == 0
				{
					//When the whole step is performed advance `next_provenance`.
					let (ref _pattern, _total_messages, _message_size, step_size) = self.provenance[provenance_index];
					*step_remaining = step_size;
					self.next_provenance[origin] = (provenance_index+1) % pending.len();
				}
				break;
			}
			provenance_index = (provenance_index+1) % pending.len();
		}
		// Build the message
		let (ref pattern,_total_messages,message_size,_step_size) = self.provenance[provenance_index];
		let destination=pattern.get_destination(origin,topology,rng);
		if origin==destination
		{
			return Err(TrafficError::SelfMessage);
		}
		let message = Rc::new(Message{
			origin,
			destination,
			size:message_size,
			creation_cycle: cycle,
		});
		self.generated_messages.insert(message.as_ref() as *const Message);
		Ok(message)
	}
	fn probability_per_cycle(&self, server:usize) -> f32
	{
		for (total_remaining,_step_remaining) in self.pending[server].iter()
		{
			if *total_remaining > 0
			{
				return 1.0;
			}
		}
		0.0
	}
	fn try_consume(&mut self, _server:usize, message: Rc<Message>, _cycle:usize, _topology:&dyn Topology, _rng: &RefCell<StdRng>) -> bool
	{
		let message_ptr=message.as_ref() as *const Message;
		self.generated_messages.remove(&message_ptr)
	}
	fn is_finished(&self) -> bool
	{
		if !self.generated_messages.is_empty()
		{
			return false;
		}
		for server_pending in self.pending.iter()
		{
			for (total_remaining, _step_remaining) in server_pending.iter()
			{
				if *total_remaining > 0
				{
					return false;
				}
			}
		}
		true
	}
	fn server_state(&self, server:usize, _cycle:usize) -> ServerTrafficState
	{
		if self.pending[server].iter().any(|(total_remaining,_step_remaining)| *total_remaining > 0 ) {
			ServerTrafficState::Generating
		} else {
			//We do not know whether someone is sending us data.
			//if self.is_finished() { ServerTrafficState::Finished } else { ServerTrafficState::UnspecifiedWait }
			// Sometimes it could be Finished, but it is not worth computing...
			ServerTrafficState::FinishedGenerating
		}
	}
}

impl MultimodalBurst
{
	pub fn new(arg:TrafficBuilderArgument) -> MultimodalBurst
	{
		let mut servers=None;
		let mut provenance : Option<Vec<(_,_,_,_)>> = None;
		match_object_panic!(arg.cv,"MultimodalBurst",value,
			"servers" => servers=Some(value.as_f64().expect("bad value for servers") as usize),
			"provenance" => match value
			{
				&ConfigurationValue::Array(ref a) => provenance=Some(a.iter().map(|pcv|{
					let mut messages_per_server=None;
					let mut pattern=None;
					let mut message_size=None;
					let mut step_size=None;
					match_object_panic!(pcv,"Provenance",pvalue,
						"pattern" => pattern=Some(new_pattern(PatternBuilderArgument{cv:pvalue,plugs:arg.plugs})),
						"messages_per_server" | "total_messages" =>
							messages_per_server=Some(pvalue.as_f64().expect("bad value for messages_per_server") as usize),
						"message_size" => message_size=Some(pvalue.as_f64().expect("bad value for message_size") as usize),
						"step_size" => step_size=Some(pvalue.as_f64().expect("bad value for step_size") as usize),
					);
					let pattern=pattern.expect("There were no pattern");
					let messages_per_server=messages_per_server.expect("There were no messages_per_server");
					let message_size=message_size.expect("There were no message_size");
					let step_size=step_size.expect("There were no step_size");
					(pattern,messages_per_server,message_size,step_size)
				}).collect()),
				_ => panic!("bad value for provenance"),
			}
		);
		let servers=servers.expect("There were no servers");
		let mut provenance=provenance.expect("There were no provenance");
		for (pattern,_total_messages,_message_size,_step_size) in provenance.iter_mut()
		{
			pattern.initialize(servers, servers, arg.topology, arg.rng);
		}
		let each_pending = provenance.iter().map(|(_pattern,total_messages,_message_size,step_size)|(*total_messages,*step_size)).collect();
		MultimodalBurst{
			servers,
			provenance,
			//pending_messages:vec![messages_per_server;servers],
			//pending:vec![vec![(0,?);provenance.len()];servers],
			pending: vec![each_pending;servers],
			next_provenance:vec![0;servers],
			generated_messages: BTreeSet::new(),
		}
	}
}


///In this traffic each server has a limited amount of data that can send over the amount it has received.
///For example, with `bound=1` after a server sends a message it must wait to receive one.
///And if received `x` messages then it may generate `x+bound` before having to wait.
///All messages have same size, follow the same pattern.
#[derive(Quantifiable)]
#[derive(Debug)]
pub struct BoundedDifference
{
	///Number of servers applying this traffic.
	servers: usize,
	///The pattern of the communication.
	pattern: Box<dyn Pattern>,
	///The size of each sent message.
	message_size: usize,
	///The load offered to the network. Proportion of the cycles that should be injecting phits.
	load: f32,
	///The number of messages each server may generate over the amount it has received.
	bound: usize,
	///Set of generated messages.
	generated_messages: BTreeSet<*const Message>,
	///The number of messages each server is currently allowed to generate until they consume more.
	///It is initialized to `bound`.
	allowance: Vec<usize>,
}

impl Traffic for BoundedDifference
{
	fn generate_message(&mut self, origin:usize, cycle:usize, topology:&dyn Topology, rng: &RefCell<StdRng>) -> Result<Rc<Message>,TrafficError>
	{
		if origin>=self.servers
		{
			//panic!("origin {} does not belong to the traffic",origin);
			return Err(TrafficError::OriginOutsideTraffic);
		}
		assert!(self.allowance[origin]>0,"Origin {} has no allowance to send more messages.",origin);
		let destination=self.pattern.get_destination(origin,topology,rng);
		if origin==destination
		{
			return Err(TrafficError::SelfMessage);
		}
		self.allowance[origin]-=1;
		let message=Rc::new(Message{
			origin,
			destination,
			size:self.message_size,
			creation_cycle: cycle,
		});
		self.generated_messages.insert(message.as_ref() as *const Message);
		Ok(message)
	}
	fn probability_per_cycle(&self, server:usize) -> f32
	{
		if self.allowance[server]>0
		{
			let r=self.load/self.message_size as f32;
			//println!("load={} r={} size={}",self.load,r,self.message_size);
			if r>1.0
			{
				1.0
			}
			else
			{
				r
			}
		} else { 0f32 }
	}
	fn try_consume(&mut self, server:usize, message: Rc<Message>, _cycle:usize, _topology:&dyn Topology, _rng: &RefCell<StdRng>) -> bool
	{
		let message_ptr=message.as_ref() as *const Message;
		self.allowance[server]+=1;
		self.generated_messages.remove(&message_ptr)
	}
	fn is_finished(&self) -> bool
	{
		false
	}
	fn server_state(&self, server:usize, _cycle:usize) -> ServerTrafficState
	{
		if self.allowance[server]>0 {
			ServerTrafficState::Generating
		} else {
			ServerTrafficState::WaitingData
		}
	}
}

impl BoundedDifference
{
	pub fn new(arg:TrafficBuilderArgument) -> BoundedDifference
	{
		let mut servers=None;
		let mut load=None;
		let mut pattern=None;
		let mut message_size=None;
		let mut bound=None;
		match_object_panic!(arg.cv,"BoundedDifference",value,
			"pattern" => pattern=Some(new_pattern(PatternBuilderArgument{cv:value,plugs:arg.plugs})),
			"servers" => servers=Some(value.as_f64().expect("bad value for servers") as usize),
			"load" => load=Some(value.as_f64().expect("bad value for load") as f32),
			"message_size" => message_size=Some(value.as_f64().expect("bad value for message_size") as usize),
			"bound" => bound=Some(value.as_f64().expect("bad value for bound") as usize),
		);
		let servers=servers.expect("There were no servers");
		let message_size=message_size.expect("There were no message_size");
		let bound=bound.expect("There were no bound");
		let load=load.expect("There were no load");
		let mut pattern=pattern.expect("There were no pattern");
		let topo_servers=arg.topology.num_servers();
		if servers != topo_servers
		{
			println!("WARNING: Generating traffic over {} servers when the topology has {} servers.",servers,topo_servers);
		}
		pattern.initialize(servers, servers, arg.topology, arg.rng);
		BoundedDifference{
			servers,
			pattern,
			message_size,
			load,
			bound,
			generated_messages: BTreeSet::new(),
			allowance: vec![bound;servers],
		}
	}
}