simul/
lib.rs

1//! `simul` is a discrete-event simulation library for running high-level
2//! simulations of real-world problems and for running simulated experiments.
3//!
4//! `simul` is a *discrete-event simulator* using *incremental time
5//! progression*, with [M/M/c queues](https://en.wikipedia.org/wiki/M/M/c_queue)
6//! for interactions between agents. It also supports some forms of
7//! experimentation and simulated annealing to replicate a simulation many
8//! times, varying the simulation parameters.
9//!
10//! Use-cases:
11//! - [Discrete-event simulation](https://en.wikipedia.org/wiki/Discrete-event_simulation)
12//! - [Complex adaptive systems](https://authors.library.caltech.edu/60491/1/MGM%20113.pdf)
13//! - [Simulated annealing](https://en.wikipedia.org/wiki/Simulated_annealing)
14//! - [Job-shop scheduling](https://en.wikipedia.org/wiki/Job-shop_scheduling)
15//! - [Birth-death processes](https://en.wikipedia.org/wiki/Birth%E2%80%93death_process)
16//! - [Computer experiments](https://en.wikipedia.org/wiki/Computer_experiment)
17//! - Other: simulating logistics, operations research problems, running
18//!   experiments to approximate a global optimum, simulating queueing systems,
19//!   distributed systems, performance engineering/analysis, and so on.
20//!
21
22extern crate self as simul;
23pub mod agent;
24pub mod experiment;
25pub mod message;
26
27pub use agent::*;
28pub use message::*;
29
30use log::{debug, info};
31use std::collections::HashMap;
32
33/// `DiscreteTime` is a Simulation's internal representation of time.
34pub type DiscreteTime = u64;
35
36/// The current mode of a Simulation.
37#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
38pub enum SimulationMode {
39    /// The Simulation has only been constructed.
40    Constructed,
41    /// The Simulation is actively simulating.
42    Running,
43    /// The Simulation successfully reached the halt condition.
44    Completed,
45    /// The Simulation catastrophically crashed.
46    Failed,
47}
48
49/// A Simulation struct is responsible to hold all the state for a simulation
50/// and coordinates the actions and interactions of the agents.
51///
52/// A Simulation has its own concept of time, which is implemented as discrete
53/// ticks of the u64 field `time`. Every tick is modeled as an instantaneous
54/// point in time at which interactions can occur. The Simulation engine uses a
55/// concept of `Messages` to communicate between agents. Agents can receive
56/// messages and send messages to other Agents.
57#[derive(Clone, Debug)]
58pub struct Simulation {
59    /// The agents within the simulation, e.g. adaptive agents.
60    agents: Vec<SimulationAgent>,
61
62    /// The current discrete time of the Simulation.
63    time: DiscreteTime,
64
65    /// A halt check function: given the state of the Simulation determine halt or not.
66    halt_check: fn(&Simulation) -> bool,
67
68    /// Whether to record metrics on queue depths. Takes space.
69    enable_queue_depth_metric: bool,
70
71    /// Records a metric on the number of cycles an agent was asleep for.
72    enable_agent_asleep_cycles_metric: bool,
73
74    /// The mode of the Simulation.
75    mode: SimulationMode,
76
77    /// Maps from an Agent's id to its index, a handle for indexing the Agent.
78    agent_name_handle_map: HashMap<String, usize>,
79}
80
81/// The parameters to create a Simulation.
82#[derive(Clone, Debug)]
83pub struct SimulationParameters {
84    /// The agents within the simulation, e.g. adaptive agents.
85    /// See here: <https://authors.library.caltech.edu/60491/1/MGM%20113.pdf>
86    pub agent_initializers: Vec<AgentInitializer>,
87
88    /// Given the state of the Simulation a function that determines if the Simulation is complete.
89    pub halt_check: fn(&Simulation) -> bool,
90
91    /// The discrete time at which the simulation should begin.
92    /// For the vast majority of simulations, 0 is the correct default.
93    pub starting_time: DiscreteTime,
94
95    /// Whether to record metrics on queue depths at every tick of the simulation.
96    pub enable_queue_depth_metrics: bool,
97
98    /// Records a metric on the number of cycles an agent was asleep for.
99    pub enable_agent_asleep_cycles_metric: bool,
100}
101
102impl Default for SimulationParameters {
103    fn default() -> Self {
104        Self {
105            agent_initializers: vec![],
106            halt_check: |_| true,
107            starting_time: 0,
108            enable_queue_depth_metrics: false,
109            enable_agent_asleep_cycles_metric: false,
110        }
111    }
112}
113
114impl Simulation {
115    #[must_use]
116    pub fn new(parameters: SimulationParameters) -> Self {
117        // TODO(jmqd): Add the handle id to the agents here, use instead of mapping.
118        let agent_name_handle_map: HashMap<String, usize> = parameters
119            .agent_initializers
120            .iter()
121            .enumerate()
122            .map(|(i, agent_initializer)| (agent_initializer.options.name.clone(), i))
123            .collect();
124
125        let agents: Vec<SimulationAgent> = parameters
126            .agent_initializers
127            .into_iter()
128            .map(|agent_initializer| SimulationAgent {
129                agent: agent_initializer.agent,
130                name: agent_initializer.options.name,
131                metadata: AgentMetadata::default(),
132                state: AgentState {
133                    mode: agent_initializer.options.initial_mode,
134                    wake_mode: agent_initializer.options.wake_mode,
135                    queue: agent_initializer.options.initial_queue,
136                    consumed: vec![],
137                    produced: vec![],
138                },
139            })
140            .collect();
141
142        Self {
143            mode: SimulationMode::Constructed,
144            agents,
145            halt_check: parameters.halt_check,
146            time: parameters.starting_time,
147            enable_queue_depth_metric: parameters.enable_queue_depth_metrics,
148            enable_agent_asleep_cycles_metric: parameters.enable_agent_asleep_cycles_metric,
149            agent_name_handle_map,
150        }
151    }
152
153    /// Returns the consumed messages for a given Agent during the Simulation.
154    #[must_use]
155    pub fn consumed_for_agent(&self, name: &str) -> Option<&[Message]> {
156        Some(&self.find_by_name(name)?.state.consumed)
157    }
158
159    /// Returns a `SimulationAgent` by name.
160    #[must_use]
161    pub fn find_by_name(&self, name: &str) -> Option<&SimulationAgent> {
162        self.agent_name_handle_map
163            .get(name)
164            .map(|id| self.agents.get(*id))?
165    }
166
167    /// Returns a `SimulationAgent` by name.
168    pub fn find_by_name_mut(&mut self, name: &str) -> Option<&mut SimulationAgent> {
169        self.agent_name_handle_map
170            .get(name)
171            .map(|id| self.agents.get_mut(*id))?
172    }
173
174    /// Returns the produced messages for a given Agent during the Simulation.
175    #[must_use]
176    pub fn produced_for_agent(&self, name: &str) -> Option<&[Message]> {
177        Some(&self.find_by_name(name)?.state.produced)
178    }
179
180    /// Returns the queue depth timeseries for a given Agent during the Simulation.
181    #[must_use]
182    pub fn queue_depth_metrics(&self, name: &str) -> Option<&[usize]> {
183        Some(&self.find_by_name(name)?.metadata.queue_depth_metrics)
184    }
185
186    /// Returns the asleep cycle count for a given Agent during the Simulation.
187    #[must_use]
188    pub fn asleep_cycle_count(&self, name: &str) -> Option<DiscreteTime> {
189        Some(self.find_by_name(name)?.metadata.asleep_cycle_count)
190    }
191
192    /// Runs the simulation. This should only be called after adding all the beginning state.
193    pub fn run(&mut self) {
194        self.mode = SimulationMode::Running;
195        let mut command_buffer: Vec<AgentCommand> = vec![];
196
197        while !(self.halt_check)(self) {
198            debug!("Running next tick of simulation at time {}", self.time);
199            self.wakeup_agents_scheduled_to_wakeup_now();
200
201            for agent_handle in 0..self.agents.len() {
202                let agent = &mut self.agents[agent_handle];
203                let queued_msg = agent.state.queue.pop_front();
204
205                if self.enable_queue_depth_metric {
206                    agent
207                        .metadata
208                        .queue_depth_metrics
209                        .push(agent.state.queue.len());
210                }
211
212                let mut agent_commands: Vec<AgentCommandType> = vec![];
213
214                let mut ctx = AgentContext {
215                    handle: agent_handle,
216                    name: &agent.name,
217                    time: self.time,
218                    commands: &mut agent_commands,
219                    state: &agent.state,
220                    message_processing_status: MessageProcessingStatus::NoError,
221                };
222
223                match agent.state.mode {
224                    AgentMode::Proactive => agent.agent.on_tick(&mut ctx),
225                    AgentMode::Reactive => {
226                        if let Some(msg) = queued_msg {
227                            // TODO(jmqd): agent.agent is not pretty; fix this composition naming.
228                            agent.agent.on_message(&mut ctx, &msg);
229
230                            match ctx.message_processing_status {
231                                MessageProcessingStatus::InProgress => {
232                                    agent.state.queue.push_front(msg);
233                                }
234                                MessageProcessingStatus::NoError => {
235                                    agent.state.consumed.push(Message {
236                                        completed_time: Some(self.time),
237                                        ..msg
238                                    });
239                                }
240                            }
241                        }
242                    }
243                    AgentMode::AsleepUntil(_) => {
244                        if self.enable_agent_asleep_cycles_metric {
245                            agent.metadata.asleep_cycle_count += 1;
246                        }
247                    }
248                    AgentMode::Dead => {}
249                }
250
251                command_buffer.extend(agent_commands.into_iter().map(|command_type| {
252                    AgentCommand {
253                        ty: command_type,
254                        agent_handle,
255                    }
256                }));
257            }
258
259            // Consume all the new messages in the bus and deliver to agents.
260            self.process_command_buffer(&mut command_buffer);
261
262            debug!("Finished this tick; incrementing time.");
263            self.time += 1;
264        }
265
266        self.mode = SimulationMode::Completed;
267        self.emit_completed_simulation_debug_logging();
268    }
269
270    /// A helper to calculate the average waiting time to process items.
271    /// Note: This function will likely go away; it is an artifact of prototyping.
272    #[must_use]
273    pub fn calc_avg_wait_statistics(&self) -> HashMap<String, f64> {
274        let mut data = HashMap::new();
275        for agent in self
276            .agents
277            .iter()
278            .filter(|agent| !agent.state.consumed.is_empty())
279        {
280            let mut sum_of_times: f64 = 0f64;
281            for completed in &agent.state.consumed {
282                sum_of_times += completed.completed_time.unwrap_or(completed.queued_time) as f64
283                    - completed.queued_time as f64;
284            }
285
286            data.insert(
287                agent.name.clone(),
288                sum_of_times / agent.state.consumed.len() as f64,
289            );
290        }
291
292        data
293    }
294
295    /// Calculates the statistics of queue lengths.
296    /// Mostly useful for checking which agents still have queues of work after halting.
297    #[must_use]
298    pub fn calc_queue_len_statistics(&self) -> HashMap<String, usize> {
299        let mut data = HashMap::new();
300
301        for agent in &self.agents {
302            data.insert(agent.name.clone(), agent.state.queue.len());
303        }
304
305        data
306    }
307
308    /// Calculates the length of the consumed messages for each Agent.
309    #[must_use]
310    pub fn calc_consumed_len_statistics(&self) -> HashMap<String, usize> {
311        let mut data = HashMap::new();
312
313        for agent in &self.agents {
314            data.insert(agent.name.clone(), agent.state.consumed.len());
315        }
316
317        data
318    }
319
320    /// Calculates the length of the produced messages for each Agent.
321    #[must_use]
322    pub fn calc_produced_len_statistics(&self) -> HashMap<String, usize> {
323        let mut data = HashMap::new();
324
325        for agent in &self.agents {
326            data.insert(agent.name.clone(), agent.state.produced.len());
327        }
328
329        data
330    }
331
332    /// SAFETY: The caller must ensure that `handle` is within the bounds of `self.agents`.
333    unsafe fn agent_by_handle_mut_unchecked(&mut self, handle: usize) -> &mut SimulationAgent {
334        unsafe { self.agents.get_unchecked_mut(handle) }
335    }
336
337    /// Emits debug logging w/ analytical stats.
338    fn emit_completed_simulation_debug_logging(&self) {
339        let queue_len_stats = self.calc_queue_len_statistics();
340        let consumed_len_stats = self.calc_consumed_len_statistics();
341        let avg_wait_stats = self.calc_avg_wait_statistics();
342        let produced_len_stats = self.calc_produced_len_statistics();
343
344        debug!("Queues: {queue_len_stats:?}");
345        debug!("Consumed: {consumed_len_stats:?}");
346        debug!("Produced: {produced_len_stats:?}");
347        debug!("Average processing time: {avg_wait_stats:?}");
348    }
349
350    /// Consume a `message_bus` of messages and disperse those messages to the agents.
351    /// If there are any interrupts, process those immediately.
352    fn process_command_buffer(&mut self, command_buffer: &mut Vec<AgentCommand>) {
353        while let Some(command) = command_buffer.pop() {
354            match command.ty {
355                AgentCommandType::SendMessage(message) => {
356                    if let Some(receiver) = self.find_by_name_mut(&message.destination) {
357                        receiver.state.queue.push_back(message.clone());
358                    }
359
360                    let commanding_agent =
361                        unsafe { self.agent_by_handle_mut_unchecked(command.agent_handle) };
362
363                    commanding_agent.state.produced.push(message);
364                }
365
366                AgentCommandType::HaltSimulation(reason) => {
367                    info!("Received a halt interrupt: {reason:?}");
368                    self.mode = SimulationMode::Completed;
369                }
370
371                AgentCommandType::Sleep(ticks) => {
372                    let sleep_until = self.time + ticks;
373                    let commanding_agent =
374                        unsafe { self.agent_by_handle_mut_unchecked(command.agent_handle) };
375
376                    commanding_agent.state.mode = AgentMode::AsleepUntil(sleep_until);
377                }
378            }
379        }
380    }
381
382    /// An internal function used to wakeup sleeping Agents due to wake.
383    fn wakeup_agents_scheduled_to_wakeup_now(&mut self) {
384        for agent in &mut self.agents {
385            if let AgentMode::AsleepUntil(wakeup_at) = agent.state.mode {
386                if self.time >= wakeup_at {
387                    agent.state.mode = agent.state.wake_mode;
388                }
389            }
390        }
391    }
392
393    /// Searches for an agent in the Simulation matching the given predicate.
394    pub fn find_agent<P>(&self, predicate: P) -> Option<&SimulationAgent>
395    where
396        P: FnMut(&&SimulationAgent) -> bool,
397    {
398        self.agents.iter().find(predicate)
399    }
400
401    /// Checks whether all agents match the given predicate.
402    pub fn all_agents<P>(&self, predicate: P) -> bool
403    where
404        P: FnMut(&SimulationAgent) -> bool,
405    {
406        self.agents.iter().all(predicate)
407    }
408
409    /// Returns a slice of the Agents in the Simulation.
410    #[must_use]
411    pub fn agents(&self) -> &[SimulationAgent] {
412        self.agents.iter().as_slice()
413    }
414
415    /// Returns the current `DiscreteTime` tick for the Simulation.
416    #[must_use]
417    pub const fn time(&self) -> DiscreteTime {
418        self.time
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use super::*;
425    use rand_distr::Poisson;
426
427    fn init() {
428        let _ = env_logger::builder().is_test(true).try_init();
429    }
430
431    #[test]
432    fn basic_periodic_test() {
433        init();
434        let mut simulation = Simulation::new(SimulationParameters {
435            agent_initializers: vec![
436                periodic_producer("producer".to_string(), 1, "consumer".to_string()),
437                periodic_consumer("consumer".to_string(), 1),
438            ],
439            halt_check: |s: &Simulation| s.time == 5,
440            ..Default::default()
441        });
442        simulation.run();
443        let produced_stats = simulation.calc_produced_len_statistics();
444        assert_eq!(produced_stats.get("producer"), Some(&5));
445        assert_eq!(produced_stats.get("consumer"), Some(&0));
446
447        let consumed_stats = simulation.calc_consumed_len_statistics();
448        assert_eq!(consumed_stats.get("producer"), Some(&0));
449        assert_eq!(consumed_stats.get("consumer"), Some(&4));
450    }
451
452    #[test]
453    fn starbucks_clerk() -> Result<(), Box<dyn std::error::Error>> {
454        #[derive(Debug, Clone)]
455        struct Clerk {}
456
457        impl Agent for Clerk {
458            fn on_message(&mut self, ctx: &mut AgentContext, msg: &Message) {
459                debug!("{} looking for a customer.", ctx.name);
460                if let Some(last) = ctx.state.consumed.last() {
461                    if last.completed_time.is_some_and(|t| t + 60 > ctx.time) {
462                        debug!("Sorry, we're still serving the last customer.");
463                    }
464                }
465
466                if let Some(_msg) = ctx.state.queue.front() {
467                    if msg.queued_time + 100 > ctx.time {
468                        debug!("Still making your coffee, sorry!");
469                        ctx.set_processing_status(MessageProcessingStatus::InProgress);
470                    }
471
472                    debug!("Serviced a customer!");
473                }
474            }
475        }
476
477        init();
478
479        let mut simulation = Simulation::new(SimulationParameters {
480            starting_time: 1,
481            enable_queue_depth_metrics: false,
482            enable_agent_asleep_cycles_metric: false,
483            halt_check: |s: &Simulation| s.time > 500,
484            agent_initializers: vec![
485                poisson_distributed_producer(
486                    "Starbucks Customers".to_string(),
487                    Poisson::new(80.0_f64)?,
488                    "Starbucks Clerk".to_string(),
489                ),
490                AgentInitializer {
491                    agent: Box::new(Clerk {}),
492                    options: AgentOptions::defaults_with_name("Starbucks Clerk".to_string()),
493                },
494            ],
495        });
496
497        simulation.run();
498        assert!(Some(simulation).is_some());
499        Ok(())
500    }
501}