1use std::collections::{BinaryHeap, HashMap, HashSet};
8
/// A pending occurrence in the simulation, addressed to a single process.
///
/// An event either carries a message to deliver or, when `msg` is `None`,
/// represents a plain wake-up with no payload.
struct Event<T> {
    /// Simulated time at which the event fires.
    timestamp: u64,
    /// Id of the process the event is addressed to.
    dest: u64,
    /// Payload to hand to the destination, if any.
    msg: Option<T>,
}
15
16impl<T> Event<T> {
17 pub fn new(timestamp: u64, dest: u64, msg: Option<T>) -> Self {
18 Self { timestamp, dest, msg }
19 }
20}
21
22impl<T> PartialEq for Event<T> {
23 fn eq(&self, other: &Self) -> bool {
24 self.timestamp == other.timestamp && self.dest == other.dest
25 }
26}
27
28impl<T> Eq for Event<T> {}
29
30impl<T> PartialOrd for Event<T> {
31 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
32 Some(self.cmp(other))
33 }
34}
35
36impl<T> Ord for Event<T> {
37 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
38 other.timestamp.cmp(&self.timestamp).then_with(|| other.dest.cmp(&self.dest))
40 }
41}
42
/// The reason a process is being resumed.
///
/// The standard traits are derived so values can be logged, cloned and
/// compared in tests; each derive only applies when `T` supports it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Resumption<T> {
    /// First resumption of a process, issued once when the simulation starts.
    Initial,
    /// A previously requested timeout has elapsed.
    Scheduled,
    /// A message addressed to the process has arrived.
    Message(T),
}
49
/// What a process asks for when it yields control back to the simulation.
///
/// The enum is plain data, so the standard value traits are derived to allow
/// logging, copying and comparison in tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Suspension {
    /// Resume the process again after the given delay, relative to the
    /// current simulated time.
    Timeout(u64),
    /// Nothing to schedule; the process waits for a message.
    Idle,
    /// Stop the simulation loop.
    Halt,
}
56
57pub trait Process<T> {
59 fn resume(&mut self, resumption: Resumption<T>, env: &mut Environment<T>) -> Suspension;
60}
61
62pub type BoxedProcess<T> = Box<dyn Process<T>>;
63
64#[derive(Default)]
66pub struct Environment<T> {
67 now: u64,
68 broadcast_delay: u64,
69 event_queue: BinaryHeap<Event<T>>,
70 process_ids: HashSet<u64>,
71}
72
73impl<T: Clone> Environment<T> {
74 pub fn new(delay: u64) -> Self {
75 Self::with_start_time(delay, 0)
76 }
77
78 pub fn with_start_time(delay: u64, start_time: u64) -> Self {
79 Self { now: start_time, broadcast_delay: delay, event_queue: BinaryHeap::new(), process_ids: HashSet::new() }
80 }
81
82 pub fn now(&self) -> u64 {
83 self.now
84 }
85
86 pub fn send(&mut self, delay: u64, dest: u64, msg: T) {
87 self.event_queue.push(Event::new(self.now + delay, dest, Some(msg)))
88 }
89
90 pub fn timeout(&mut self, timeout: u64, dest: u64) {
91 self.event_queue.push(Event::new(self.now + timeout, dest, None))
92 }
93
94 pub fn broadcast(&mut self, _sender: u64, msg: T) {
95 for &id in self.process_ids.iter() {
96 self.event_queue.push(Event::new(self.now + self.broadcast_delay, id, Some(msg.clone())));
97 }
98 }
99
100 fn next_event(&mut self) -> Event<T> {
101 let event = self.event_queue.pop().unwrap();
102 self.now = event.timestamp;
103 event
104 }
105}
106
107#[derive(Default)]
109pub struct Simulation<T> {
110 env: Environment<T>,
111 processes: HashMap<u64, BoxedProcess<T>>,
112}
113
114impl<T: Clone> Simulation<T> {
115 pub fn new(delay: u64) -> Self {
116 Self { env: Environment::new(delay), processes: HashMap::new() }
117 }
118
119 pub fn with_start_time(delay: u64, start_time: u64) -> Self {
120 Self { env: Environment::with_start_time(delay, start_time), processes: HashMap::new() }
121 }
122
123 pub fn register(&mut self, id: u64, process: BoxedProcess<T>) {
124 self.processes.insert(id, process);
125 self.env.process_ids.insert(id);
126 }
127
128 pub fn step(&mut self) -> bool {
129 let event = self.env.next_event();
130 let process = self.processes.get_mut(&event.dest).unwrap();
131 let op = if let Some(msg) = event.msg { Resumption::Message(msg) } else { Resumption::Scheduled };
132 match process.resume(op, &mut self.env) {
133 Suspension::Timeout(timeout) => {
134 self.env.timeout(timeout, event.dest);
135 true
136 }
137 Suspension::Idle => true,
138 Suspension::Halt => false,
139 }
140 }
141
142 pub fn run(&mut self, until: u64) {
143 for (&id, process) in self.processes.iter_mut() {
144 match process.resume(Resumption::Initial, &mut self.env) {
145 Suspension::Timeout(timeout) => self.env.timeout(timeout, id),
146 Suspension::Idle => {}
147 Suspension::Halt => panic!("not expecting halt on startup"),
148 }
149 }
150
151 while self.step() {
152 if self.env.now() > until {
153 break;
154 }
155 }
156 self.processes.clear();
157 }
158}