1extern crate self as simul;
23pub mod agent;
24pub mod experiment;
25pub mod message;
26
27pub use agent::*;
28pub use message::*;
29
30use log::{debug, info};
31use std::collections::HashMap;
32
33pub type DiscreteTime = u64;
35
36#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
38pub enum SimulationMode {
39 Constructed,
41 Running,
43 Completed,
45 Failed,
47}
48
49#[derive(Clone, Debug)]
58pub struct Simulation {
59 agents: Vec<SimulationAgent>,
61
62 time: DiscreteTime,
64
65 halt_check: fn(&Simulation) -> bool,
67
68 enable_queue_depth_metric: bool,
70
71 enable_agent_asleep_cycles_metric: bool,
73
74 mode: SimulationMode,
76
77 agent_name_handle_map: HashMap<String, usize>,
79}
80
81#[derive(Clone, Debug)]
83pub struct SimulationParameters {
84 pub agent_initializers: Vec<AgentInitializer>,
87
88 pub halt_check: fn(&Simulation) -> bool,
90
91 pub starting_time: DiscreteTime,
94
95 pub enable_queue_depth_metrics: bool,
97
98 pub enable_agent_asleep_cycles_metric: bool,
100}
101
102impl Default for SimulationParameters {
103 fn default() -> Self {
104 Self {
105 agent_initializers: vec![],
106 halt_check: |_| true,
107 starting_time: 0,
108 enable_queue_depth_metrics: false,
109 enable_agent_asleep_cycles_metric: false,
110 }
111 }
112}
113
114impl Simulation {
115 #[must_use]
116 pub fn new(parameters: SimulationParameters) -> Self {
117 let agent_name_handle_map: HashMap<String, usize> = parameters
119 .agent_initializers
120 .iter()
121 .enumerate()
122 .map(|(i, agent_initializer)| (agent_initializer.options.name.clone(), i))
123 .collect();
124
125 let agents: Vec<SimulationAgent> = parameters
126 .agent_initializers
127 .into_iter()
128 .map(|agent_initializer| SimulationAgent {
129 agent: agent_initializer.agent,
130 name: agent_initializer.options.name,
131 metadata: AgentMetadata::default(),
132 state: AgentState {
133 mode: agent_initializer.options.initial_mode,
134 wake_mode: agent_initializer.options.wake_mode,
135 queue: agent_initializer.options.initial_queue,
136 consumed: vec![],
137 produced: vec![],
138 },
139 })
140 .collect();
141
142 Self {
143 mode: SimulationMode::Constructed,
144 agents,
145 halt_check: parameters.halt_check,
146 time: parameters.starting_time,
147 enable_queue_depth_metric: parameters.enable_queue_depth_metrics,
148 enable_agent_asleep_cycles_metric: parameters.enable_agent_asleep_cycles_metric,
149 agent_name_handle_map,
150 }
151 }
152
153 #[must_use]
155 pub fn consumed_for_agent(&self, name: &str) -> Option<&[Message]> {
156 Some(&self.find_by_name(name)?.state.consumed)
157 }
158
159 #[must_use]
161 pub fn find_by_name(&self, name: &str) -> Option<&SimulationAgent> {
162 self.agent_name_handle_map
163 .get(name)
164 .map(|id| self.agents.get(*id))?
165 }
166
167 pub fn find_by_name_mut(&mut self, name: &str) -> Option<&mut SimulationAgent> {
169 self.agent_name_handle_map
170 .get(name)
171 .map(|id| self.agents.get_mut(*id))?
172 }
173
174 #[must_use]
176 pub fn produced_for_agent(&self, name: &str) -> Option<&[Message]> {
177 Some(&self.find_by_name(name)?.state.produced)
178 }
179
180 #[must_use]
182 pub fn queue_depth_metrics(&self, name: &str) -> Option<&[usize]> {
183 Some(&self.find_by_name(name)?.metadata.queue_depth_metrics)
184 }
185
186 #[must_use]
188 pub fn asleep_cycle_count(&self, name: &str) -> Option<DiscreteTime> {
189 Some(self.find_by_name(name)?.metadata.asleep_cycle_count)
190 }
191
192 pub fn run(&mut self) {
194 self.mode = SimulationMode::Running;
195 let mut command_buffer: Vec<AgentCommand> = vec![];
196
197 while !(self.halt_check)(self) {
198 debug!("Running next tick of simulation at time {}", self.time);
199 self.wakeup_agents_scheduled_to_wakeup_now();
200
201 for agent_handle in 0..self.agents.len() {
202 let agent = &mut self.agents[agent_handle];
203 let queued_msg = agent.state.queue.pop_front();
204
205 if self.enable_queue_depth_metric {
206 agent
207 .metadata
208 .queue_depth_metrics
209 .push(agent.state.queue.len());
210 }
211
212 let mut agent_commands: Vec<AgentCommandType> = vec![];
213
214 let mut ctx = AgentContext {
215 handle: agent_handle,
216 name: &agent.name,
217 time: self.time,
218 commands: &mut agent_commands,
219 state: &agent.state,
220 message_processing_status: MessageProcessingStatus::NoError,
221 };
222
223 match agent.state.mode {
224 AgentMode::Proactive => agent.agent.on_tick(&mut ctx),
225 AgentMode::Reactive => {
226 if let Some(msg) = queued_msg {
227 agent.agent.on_message(&mut ctx, &msg);
229
230 match ctx.message_processing_status {
231 MessageProcessingStatus::InProgress => {
232 agent.state.queue.push_front(msg);
233 }
234 MessageProcessingStatus::NoError => {
235 agent.state.consumed.push(Message {
236 completed_time: Some(self.time),
237 ..msg
238 });
239 }
240 }
241 }
242 }
243 AgentMode::AsleepUntil(_) => {
244 if self.enable_agent_asleep_cycles_metric {
245 agent.metadata.asleep_cycle_count += 1;
246 }
247 }
248 AgentMode::Dead => {}
249 }
250
251 command_buffer.extend(agent_commands.into_iter().map(|command_type| {
252 AgentCommand {
253 ty: command_type,
254 agent_handle,
255 }
256 }));
257 }
258
259 self.process_command_buffer(&mut command_buffer);
261
262 debug!("Finished this tick; incrementing time.");
263 self.time += 1;
264 }
265
266 self.mode = SimulationMode::Completed;
267 self.emit_completed_simulation_debug_logging();
268 }
269
270 #[must_use]
273 pub fn calc_avg_wait_statistics(&self) -> HashMap<String, f64> {
274 let mut data = HashMap::new();
275 for agent in self
276 .agents
277 .iter()
278 .filter(|agent| !agent.state.consumed.is_empty())
279 {
280 let mut sum_of_times: f64 = 0f64;
281 for completed in &agent.state.consumed {
282 sum_of_times += completed.completed_time.unwrap_or(completed.queued_time) as f64
283 - completed.queued_time as f64;
284 }
285
286 data.insert(
287 agent.name.clone(),
288 sum_of_times / agent.state.consumed.len() as f64,
289 );
290 }
291
292 data
293 }
294
295 #[must_use]
298 pub fn calc_queue_len_statistics(&self) -> HashMap<String, usize> {
299 let mut data = HashMap::new();
300
301 for agent in &self.agents {
302 data.insert(agent.name.clone(), agent.state.queue.len());
303 }
304
305 data
306 }
307
308 #[must_use]
310 pub fn calc_consumed_len_statistics(&self) -> HashMap<String, usize> {
311 let mut data = HashMap::new();
312
313 for agent in &self.agents {
314 data.insert(agent.name.clone(), agent.state.consumed.len());
315 }
316
317 data
318 }
319
320 #[must_use]
322 pub fn calc_produced_len_statistics(&self) -> HashMap<String, usize> {
323 let mut data = HashMap::new();
324
325 for agent in &self.agents {
326 data.insert(agent.name.clone(), agent.state.produced.len());
327 }
328
329 data
330 }
331
332 unsafe fn agent_by_handle_mut_unchecked(&mut self, handle: usize) -> &mut SimulationAgent {
334 unsafe { self.agents.get_unchecked_mut(handle) }
335 }
336
337 fn emit_completed_simulation_debug_logging(&self) {
339 let queue_len_stats = self.calc_queue_len_statistics();
340 let consumed_len_stats = self.calc_consumed_len_statistics();
341 let avg_wait_stats = self.calc_avg_wait_statistics();
342 let produced_len_stats = self.calc_produced_len_statistics();
343
344 debug!("Queues: {queue_len_stats:?}");
345 debug!("Consumed: {consumed_len_stats:?}");
346 debug!("Produced: {produced_len_stats:?}");
347 debug!("Average processing time: {avg_wait_stats:?}");
348 }
349
350 fn process_command_buffer(&mut self, command_buffer: &mut Vec<AgentCommand>) {
353 while let Some(command) = command_buffer.pop() {
354 match command.ty {
355 AgentCommandType::SendMessage(message) => {
356 if let Some(receiver) = self.find_by_name_mut(&message.destination) {
357 receiver.state.queue.push_back(message.clone());
358 }
359
360 let commanding_agent =
361 unsafe { self.agent_by_handle_mut_unchecked(command.agent_handle) };
362
363 commanding_agent.state.produced.push(message);
364 }
365
366 AgentCommandType::HaltSimulation(reason) => {
367 info!("Received a halt interrupt: {reason:?}");
368 self.mode = SimulationMode::Completed;
369 }
370
371 AgentCommandType::Sleep(ticks) => {
372 let sleep_until = self.time + ticks;
373 let commanding_agent =
374 unsafe { self.agent_by_handle_mut_unchecked(command.agent_handle) };
375
376 commanding_agent.state.mode = AgentMode::AsleepUntil(sleep_until);
377 }
378 }
379 }
380 }
381
382 fn wakeup_agents_scheduled_to_wakeup_now(&mut self) {
384 for agent in &mut self.agents {
385 if let AgentMode::AsleepUntil(wakeup_at) = agent.state.mode {
386 if self.time >= wakeup_at {
387 agent.state.mode = agent.state.wake_mode;
388 }
389 }
390 }
391 }
392
393 pub fn find_agent<P>(&self, predicate: P) -> Option<&SimulationAgent>
395 where
396 P: FnMut(&&SimulationAgent) -> bool,
397 {
398 self.agents.iter().find(predicate)
399 }
400
401 pub fn all_agents<P>(&self, predicate: P) -> bool
403 where
404 P: FnMut(&SimulationAgent) -> bool,
405 {
406 self.agents.iter().all(predicate)
407 }
408
409 #[must_use]
411 pub fn agents(&self) -> &[SimulationAgent] {
412 self.agents.iter().as_slice()
413 }
414
415 #[must_use]
417 pub const fn time(&self) -> DiscreteTime {
418 self.time
419 }
420}
421
422#[cfg(test)]
423mod tests {
424 use super::*;
425 use rand_distr::Poisson;
426
427 fn init() {
428 let _ = env_logger::builder().is_test(true).try_init();
429 }
430
431 #[test]
432 fn basic_periodic_test() {
433 init();
434 let mut simulation = Simulation::new(SimulationParameters {
435 agent_initializers: vec![
436 periodic_producer("producer".to_string(), 1, "consumer".to_string()),
437 periodic_consumer("consumer".to_string(), 1),
438 ],
439 halt_check: |s: &Simulation| s.time == 5,
440 ..Default::default()
441 });
442 simulation.run();
443 let produced_stats = simulation.calc_produced_len_statistics();
444 assert_eq!(produced_stats.get("producer"), Some(&5));
445 assert_eq!(produced_stats.get("consumer"), Some(&0));
446
447 let consumed_stats = simulation.calc_consumed_len_statistics();
448 assert_eq!(consumed_stats.get("producer"), Some(&0));
449 assert_eq!(consumed_stats.get("consumer"), Some(&4));
450 }
451
452 #[test]
453 fn starbucks_clerk() -> Result<(), Box<dyn std::error::Error>> {
454 #[derive(Debug, Clone)]
455 struct Clerk {}
456
457 impl Agent for Clerk {
458 fn on_message(&mut self, ctx: &mut AgentContext, msg: &Message) {
459 debug!("{} looking for a customer.", ctx.name);
460 if let Some(last) = ctx.state.consumed.last() {
461 if last.completed_time.is_some_and(|t| t + 60 > ctx.time) {
462 debug!("Sorry, we're still serving the last customer.");
463 }
464 }
465
466 if let Some(_msg) = ctx.state.queue.front() {
467 if msg.queued_time + 100 > ctx.time {
468 debug!("Still making your coffee, sorry!");
469 ctx.set_processing_status(MessageProcessingStatus::InProgress);
470 }
471
472 debug!("Serviced a customer!");
473 }
474 }
475 }
476
477 init();
478
479 let mut simulation = Simulation::new(SimulationParameters {
480 starting_time: 1,
481 enable_queue_depth_metrics: false,
482 enable_agent_asleep_cycles_metric: false,
483 halt_check: |s: &Simulation| s.time > 500,
484 agent_initializers: vec![
485 poisson_distributed_producer(
486 "Starbucks Customers".to_string(),
487 Poisson::new(80.0_f64)?,
488 "Starbucks Clerk".to_string(),
489 ),
490 AgentInitializer {
491 agent: Box::new(Clerk {}),
492 options: AgentOptions::defaults_with_name("Starbucks Clerk".to_string()),
493 },
494 ],
495 });
496
497 simulation.run();
498 assert!(Some(simulation).is_some());
499 Ok(())
500 }
501}