extern crate self as simul;
pub mod agent;
pub mod experiment;
pub mod message;
pub use agent::*;
pub use message::*;
use log::{debug, info};
use std::collections::HashMap;
pub type DiscreteTime = u64;
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub enum SimulationMode {
Constructed,
Running,
Completed,
Failed,
}
#[derive(Clone, Debug)]
pub struct Simulation {
agents: Vec<SimulationAgent>,
time: DiscreteTime,
halt_check: fn(&Simulation) -> bool,
enable_queue_depth_metric: bool,
enable_agent_asleep_cycles_metric: bool,
mode: SimulationMode,
agent_name_handle_map: HashMap<String, usize>,
}
#[derive(Clone, Debug)]
pub struct SimulationParameters {
pub agent_initializers: Vec<AgentInitializer>,
pub halt_check: fn(&Simulation) -> bool,
pub starting_time: DiscreteTime,
pub enable_queue_depth_metrics: bool,
pub enable_agent_asleep_cycles_metric: bool,
}
impl Default for SimulationParameters {
fn default() -> Self {
Self {
agent_initializers: vec![],
halt_check: |_| true,
starting_time: 0,
enable_queue_depth_metrics: false,
enable_agent_asleep_cycles_metric: false,
}
}
}
impl Simulation {
#[must_use]
pub fn new(parameters: SimulationParameters) -> Self {
let agent_name_handle_map: HashMap<String, usize> = parameters
.agent_initializers
.iter()
.enumerate()
.map(|(i, agent_initializer)| (agent_initializer.options.name.clone(), i))
.collect();
let agents: Vec<SimulationAgent> = parameters
.agent_initializers
.into_iter()
.map(|agent_initializer| SimulationAgent {
agent: agent_initializer.agent,
name: agent_initializer.options.name,
metadata: AgentMetadata::default(),
state: AgentState {
mode: agent_initializer.options.initial_mode,
wake_mode: agent_initializer.options.wake_mode,
queue: agent_initializer.options.initial_queue,
consumed: vec![],
produced: vec![],
},
})
.collect();
Self {
mode: SimulationMode::Constructed,
agents,
halt_check: parameters.halt_check,
time: parameters.starting_time,
enable_queue_depth_metric: parameters.enable_queue_depth_metrics,
enable_agent_asleep_cycles_metric: parameters.enable_agent_asleep_cycles_metric,
agent_name_handle_map,
}
}
#[must_use]
pub fn consumed_for_agent(&self, name: &str) -> Option<&[Message]> {
Some(&self.find_by_name(name)?.state.consumed)
}
#[must_use]
pub fn find_by_name(&self, name: &str) -> Option<&SimulationAgent> {
self.agent_name_handle_map
.get(name)
.map(|id| self.agents.get(*id))?
}
pub fn find_by_name_mut(&mut self, name: &str) -> Option<&mut SimulationAgent> {
self.agent_name_handle_map
.get(name)
.map(|id| self.agents.get_mut(*id))?
}
#[must_use]
pub fn produced_for_agent(&self, name: &str) -> Option<&[Message]> {
Some(&self.find_by_name(name)?.state.produced)
}
#[must_use]
pub fn queue_depth_metrics(&self, name: &str) -> Option<&[usize]> {
Some(&self.find_by_name(name)?.metadata.queue_depth_metrics)
}
#[must_use]
pub fn asleep_cycle_count(&self, name: &str) -> Option<DiscreteTime> {
Some(self.find_by_name(name)?.metadata.asleep_cycle_count)
}
pub fn run(&mut self) {
self.mode = SimulationMode::Running;
let mut command_buffer: Vec<AgentCommand> = vec![];
while !(self.halt_check)(self) {
debug!("Running next tick of simulation at time {}", self.time);
self.wakeup_agents_scheduled_to_wakeup_now();
for agent_handle in 0..self.agents.len() {
let agent = &mut self.agents[agent_handle];
let queued_msg = agent.state.queue.pop_front();
if self.enable_queue_depth_metric {
agent
.metadata
.queue_depth_metrics
.push(agent.state.queue.len());
}
let mut agent_commands: Vec<AgentCommandType> = vec![];
let mut ctx = AgentContext {
handle: agent_handle,
name: &agent.name,
time: self.time,
commands: &mut agent_commands,
state: &agent.state,
message_processing_status: MessageProcessingStatus::NoError,
};
match agent.state.mode {
AgentMode::Proactive => agent.agent.on_tick(&mut ctx),
AgentMode::Reactive => {
if let Some(msg) = queued_msg {
agent.agent.on_message(&mut ctx, &msg);
match ctx.message_processing_status {
MessageProcessingStatus::InProgress => {
agent.state.queue.push_front(msg);
}
MessageProcessingStatus::NoError => {
agent.state.consumed.push(Message {
completed_time: Some(self.time),
..msg
});
}
}
}
}
AgentMode::AsleepUntil(_) => {
if self.enable_agent_asleep_cycles_metric {
agent.metadata.asleep_cycle_count += 1;
}
}
AgentMode::Dead => {}
}
command_buffer.extend(agent_commands.into_iter().map(|command_type| {
AgentCommand {
ty: command_type,
agent_handle,
}
}));
}
self.process_command_buffer(&mut command_buffer);
debug!("Finished this tick; incrementing time.");
self.time += 1;
}
self.mode = SimulationMode::Completed;
self.emit_completed_simulation_debug_logging();
}
#[must_use]
pub fn calc_avg_wait_statistics(&self) -> HashMap<String, f64> {
let mut data = HashMap::new();
for agent in self
.agents
.iter()
.filter(|agent| !agent.state.consumed.is_empty())
{
let mut sum_of_times: f64 = 0f64;
for completed in &agent.state.consumed {
sum_of_times += completed.completed_time.unwrap_or(completed.queued_time) as f64
- completed.queued_time as f64;
}
data.insert(
agent.name.clone(),
sum_of_times / agent.state.consumed.len() as f64,
);
}
data
}
#[must_use]
pub fn calc_queue_len_statistics(&self) -> HashMap<String, usize> {
let mut data = HashMap::new();
for agent in &self.agents {
data.insert(agent.name.clone(), agent.state.queue.len());
}
data
}
#[must_use]
pub fn calc_consumed_len_statistics(&self) -> HashMap<String, usize> {
let mut data = HashMap::new();
for agent in &self.agents {
data.insert(agent.name.clone(), agent.state.consumed.len());
}
data
}
#[must_use]
pub fn calc_produced_len_statistics(&self) -> HashMap<String, usize> {
let mut data = HashMap::new();
for agent in &self.agents {
data.insert(agent.name.clone(), agent.state.produced.len());
}
data
}
unsafe fn agent_by_handle_mut_unchecked(&mut self, handle: usize) -> &mut SimulationAgent {
unsafe { self.agents.get_unchecked_mut(handle) }
}
fn emit_completed_simulation_debug_logging(&self) {
let queue_len_stats = self.calc_queue_len_statistics();
let consumed_len_stats = self.calc_consumed_len_statistics();
let avg_wait_stats = self.calc_avg_wait_statistics();
let produced_len_stats = self.calc_produced_len_statistics();
debug!("Queues: {queue_len_stats:?}");
debug!("Consumed: {consumed_len_stats:?}");
debug!("Produced: {produced_len_stats:?}");
debug!("Average processing time: {avg_wait_stats:?}");
}
fn process_command_buffer(&mut self, command_buffer: &mut Vec<AgentCommand>) {
while let Some(command) = command_buffer.pop() {
match command.ty {
AgentCommandType::SendMessage(message) => {
if let Some(receiver) = self.find_by_name_mut(&message.destination) {
receiver.state.queue.push_back(message.clone());
}
let commanding_agent =
unsafe { self.agent_by_handle_mut_unchecked(command.agent_handle) };
commanding_agent.state.produced.push(message);
}
AgentCommandType::HaltSimulation(reason) => {
info!("Received a halt interrupt: {reason:?}");
self.mode = SimulationMode::Completed;
}
AgentCommandType::Sleep(ticks) => {
let sleep_until = self.time + ticks;
let commanding_agent =
unsafe { self.agent_by_handle_mut_unchecked(command.agent_handle) };
commanding_agent.state.mode = AgentMode::AsleepUntil(sleep_until);
}
}
}
}
fn wakeup_agents_scheduled_to_wakeup_now(&mut self) {
for agent in &mut self.agents {
if let AgentMode::AsleepUntil(wakeup_at) = agent.state.mode {
if self.time >= wakeup_at {
agent.state.mode = agent.state.wake_mode;
}
}
}
}
pub fn find_agent<P>(&self, predicate: P) -> Option<&SimulationAgent>
where
P: FnMut(&&SimulationAgent) -> bool,
{
self.agents.iter().find(predicate)
}
pub fn all_agents<P>(&self, predicate: P) -> bool
where
P: FnMut(&SimulationAgent) -> bool,
{
self.agents.iter().all(predicate)
}
#[must_use]
pub fn agents(&self) -> &[SimulationAgent] {
self.agents.iter().as_slice()
}
#[must_use]
pub const fn time(&self) -> DiscreteTime {
self.time
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand_distr::Poisson;
fn init() {
let _ = env_logger::builder().is_test(true).try_init();
}
#[test]
fn basic_periodic_test() {
init();
let mut simulation = Simulation::new(SimulationParameters {
agent_initializers: vec![
periodic_producer("producer".to_string(), 1, "consumer".to_string()),
periodic_consumer("consumer".to_string(), 1),
],
halt_check: |s: &Simulation| s.time == 5,
..Default::default()
});
simulation.run();
let produced_stats = simulation.calc_produced_len_statistics();
assert_eq!(produced_stats.get("producer"), Some(&5));
assert_eq!(produced_stats.get("consumer"), Some(&0));
let consumed_stats = simulation.calc_consumed_len_statistics();
assert_eq!(consumed_stats.get("producer"), Some(&0));
assert_eq!(consumed_stats.get("consumer"), Some(&4));
}
#[test]
fn starbucks_clerk() -> Result<(), Box<dyn std::error::Error>> {
#[derive(Debug, Clone)]
struct Clerk {}
impl Agent for Clerk {
fn on_message(&mut self, ctx: &mut AgentContext, msg: &Message) {
debug!("{} looking for a customer.", ctx.name);
if let Some(last) = ctx.state.consumed.last() {
if last.completed_time.is_some_and(|t| t + 60 > ctx.time) {
debug!("Sorry, we're still serving the last customer.");
}
}
if let Some(_msg) = ctx.state.queue.front() {
if msg.queued_time + 100 > ctx.time {
debug!("Still making your coffee, sorry!");
ctx.set_processing_status(MessageProcessingStatus::InProgress);
}
debug!("Serviced a customer!");
}
}
}
init();
let mut simulation = Simulation::new(SimulationParameters {
starting_time: 1,
enable_queue_depth_metrics: false,
enable_agent_asleep_cycles_metric: false,
halt_check: |s: &Simulation| s.time > 500,
agent_initializers: vec![
poisson_distributed_producer(
"Starbucks Customers".to_string(),
Poisson::new(80.0_f64)?,
"Starbucks Clerk".to_string(),
),
AgentInitializer {
agent: Box::new(Clerk {}),
options: AgentOptions::defaults_with_name("Starbucks Clerk".to_string()),
},
],
});
simulation.run();
assert!(Some(simulation).is_some());
Ok(())
}
}