1use std::cell::{Ref, RefCell, RefMut};
4use std::collections::HashMap;
5use std::path::Path;
6use std::rc::Rc;
7
8use rand::distributions::uniform::{SampleRange, SampleUniform};
9
10use simcore::handler::EventCancellationPolicy;
11use simcore::{cast, Simulation};
12
13use crate::events::MessageReceived;
14use crate::logger::{LogEntry, Logger};
15use crate::{EventLogEntry, Message, Network, Node, Process};
16
17pub struct System {
19 sim: Simulation,
20 net: Rc<RefCell<Network>>,
21 nodes: HashMap<String, Rc<RefCell<Node>>>,
22 proc_nodes: HashMap<String, Rc<RefCell<Node>>>,
23 logger: Rc<RefCell<Logger>>,
24}
25
26impl System {
27 pub fn new(seed: u64) -> Self {
29 let logger = Rc::new(RefCell::new(Logger::new()));
30 let mut sim = Simulation::new(seed);
31 let net = Rc::new(RefCell::new(Network::new(sim.create_context("net"), logger.clone())));
32 Self {
33 sim,
34 net,
35 nodes: HashMap::new(),
36 proc_nodes: HashMap::new(),
37 logger,
38 }
39 }
40
41 pub fn with_log_file(seed: u64, log_path: &Path) -> Self {
43 let logger = Rc::new(RefCell::new(Logger::with_log_file(log_path)));
44 let mut sim = Simulation::new(seed);
45 let net = Rc::new(RefCell::new(Network::new(sim.create_context("net"), logger.clone())));
46 Self {
47 sim,
48 net,
49 nodes: HashMap::new(),
50 proc_nodes: HashMap::new(),
51 logger,
52 }
53 }
54
55 pub fn logger(&self) -> RefMut<Logger> {
57 self.logger.borrow_mut()
58 }
59
60 pub fn network(&self) -> RefMut<Network> {
64 self.net.borrow_mut()
65 }
66
67 pub fn nodes(&self) -> Vec<String> {
71 self.nodes.keys().cloned().collect()
72 }
73
74 pub fn add_node(&mut self, name: &str) {
78 let node = Rc::new(RefCell::new(Node::new(
79 name.to_string(),
80 self.net.clone(),
81 self.sim.create_context(name),
82 self.logger.clone(),
83 )));
84 let node_id = self.sim.add_handler(name, node.clone());
85 assert!(
86 self.nodes.insert(name.to_string(), node).is_none(),
87 "Node with name {} already exists, node names must be unique",
88 name
89 );
90 self.net.borrow_mut().add_node(name.to_string(), node_id);
91 self.logger.borrow_mut().log(LogEntry::NodeStarted {
92 time: self.sim.time(),
93 node: name.to_string(),
94 node_id,
95 });
96 }
97
98 pub fn set_node_clock_skew(&mut self, node: &str, clock_skew: f64) {
100 self.nodes[node].borrow_mut().set_clock_skew(clock_skew);
101 }
102
103 pub fn crash_node(&mut self, node_name: &str) {
112 let node = self.nodes.get(node_name).unwrap();
113 node.borrow_mut().crash();
114
115 self.logger.borrow_mut().log(LogEntry::NodeCrashed {
116 time: self.sim.time(),
117 node: node_name.to_string(),
118 });
119
120 let node_id = self.sim.lookup_id(node_name);
122 let cancelled = self.sim.cancel_and_get_events(|e| e.src == node_id);
123 for event in cancelled {
124 cast!(match event.data {
125 MessageReceived {
126 id,
127 msg,
128 src,
129 src_node,
130 dst,
131 dst_node,
132 } => {
133 self.logger.borrow_mut().log(LogEntry::MessageDropped {
134 time: self.sim.time(),
135 msg_id: id.to_string(),
136 msg,
137 src_proc: src,
138 src_node,
139 dst_proc: dst,
140 dst_node,
141 });
142 }
143 })
144 }
145
146 self.sim.remove_handler(node_name, EventCancellationPolicy::Incoming);
148 }
149
150 pub fn recover_node(&mut self, node_name: &str) {
155 assert!(
156 self.node_is_crashed(node_name),
157 "Node is not crashed to be eligible for recovery"
158 );
159 let node = self.nodes.get(node_name).unwrap();
160 node.borrow_mut().recover();
161 self.sim.add_handler(node_name, node.clone());
162
163 self.proc_nodes.retain(|_, node| node.borrow().name != node_name);
165
166 self.logger.borrow_mut().log(LogEntry::NodeRecovered {
167 time: self.sim.time(),
168 node: node_name.to_string(),
169 });
170 }
171
172 pub fn get_node(&self, name: &str) -> Option<Ref<Node>> {
174 self.nodes.get(name).map(|res| res.borrow())
175 }
176
177 pub fn get_mut_node(&self, name: &str) -> Option<RefMut<Node>> {
179 self.nodes.get(name).map(|res| res.borrow_mut())
180 }
181
182 pub fn node_is_crashed(&self, node: &str) -> bool {
184 self.nodes.get(node).unwrap().borrow().is_crashed()
185 }
186
187 pub fn add_process(&mut self, name: &str, proc: Box<dyn Process>, node: &str) {
193 self.nodes[node].borrow_mut().add_process(name, proc);
194 self.net
195 .borrow_mut()
196 .set_proc_location(name.to_string(), node.to_string());
197 assert!(
198 self.proc_nodes
199 .insert(name.to_string(), self.nodes[node].clone())
200 .is_none(),
201 "Process with name {} already exists, process names must be unique",
202 name
203 );
204 self.logger.borrow_mut().log(LogEntry::ProcessStarted {
205 time: self.sim.time(),
206 node: node.to_string(),
207 proc: name.to_string(),
208 });
209 }
210
211 pub fn process_names(&self) -> Vec<String> {
213 self.proc_nodes.keys().cloned().collect()
214 }
215
216 pub fn send_local_message(&mut self, proc: &str, msg: Message) {
218 let mut node = self.proc_nodes[proc].borrow_mut();
219 assert!(
220 !node.is_crashed(),
221 "Cannot send local message to process {} on crashed node {}",
222 proc,
223 node.name
224 );
225 node.send_local_message(proc.to_string(), msg);
226 }
227
228 pub fn read_local_messages(&mut self, proc: &str) -> Vec<Message> {
230 self.proc_nodes[proc]
231 .borrow_mut()
232 .read_local_messages(proc)
233 .unwrap_or_default()
234 }
235
236 pub fn local_outbox(&self, proc: &str) -> Vec<Message> {
240 self.proc_nodes[proc].borrow().local_outbox(proc)
241 }
242
243 pub fn event_log(&self, proc: &str) -> Vec<EventLogEntry> {
245 self.proc_nodes[proc].borrow().event_log(proc)
246 }
247
248 pub fn max_size(&mut self, proc: &str) -> u64 {
250 self.proc_nodes[proc].borrow_mut().max_size(proc)
251 }
252
253 pub fn sent_message_count(&self, proc: &str) -> u64 {
255 self.proc_nodes[proc].borrow().sent_message_count(proc)
256 }
257
258 pub fn received_message_count(&self, proc: &str) -> u64 {
260 self.proc_nodes[proc].borrow().received_message_count(proc)
261 }
262
263 pub fn proc_node_name(&self, proc: &str) -> String {
265 self.proc_nodes[proc].borrow().name().to_owned()
266 }
267
268 pub fn proc_node_is_crashed(&self, proc: &str) -> bool {
270 self.proc_nodes[proc].borrow().is_crashed()
271 }
272
273 pub fn sim(&self) -> &Simulation {
277 &self.sim
278 }
279
280 pub fn time(&self) -> f64 {
282 self.sim.time()
283 }
284
285 pub fn step(&mut self) -> bool {
287 self.sim.step()
288 }
289
290 pub fn steps(&mut self, step_count: u64) -> bool {
292 self.sim.steps(step_count)
293 }
294
295 pub fn step_until_no_events(&mut self) {
297 self.sim.step_until_no_events()
298 }
299
300 pub fn step_for_duration(&mut self, duration: f64) -> bool {
302 self.sim.step_for_duration(duration)
303 }
304
305 pub fn step_until_local_message(&mut self, proc: &str) -> Result<Vec<Message>, &str> {
310 let node = self.proc_nodes[proc].clone();
311 loop {
312 if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
313 return Ok(messages);
314 }
315 if !self.step() {
316 return Err("No messages");
317 }
318 }
319 }
320
321 pub fn step_until_local_message_max_steps(&mut self, proc: &str, max_steps: u32) -> Result<Vec<Message>, &str> {
324 let mut steps = 0;
325 let node = self.proc_nodes[proc].clone();
326 if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
327 return Ok(messages);
328 }
329 while steps < max_steps {
330 if !self.step() {
331 break;
332 }
333 if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
334 return Ok(messages);
335 }
336 steps += 1;
337 }
338 Err("No messages")
339 }
340
341 pub fn step_until_local_message_timeout(&mut self, proc: &str, timeout: f64) -> Result<Vec<Message>, &str> {
344 let end_time = self.time() + timeout;
345 let node = self.proc_nodes[proc].clone();
346 while self.time() < end_time {
347 if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
348 return Ok(messages);
349 }
350 if !self.step() {
351 break;
352 }
353 }
354 Err("No messages")
355 }
356
357 pub fn gen_range<T, R>(&mut self, range: R) -> T
360 where
361 T: SampleUniform,
362 R: SampleRange<T>,
363 {
364 self.sim.gen_range(range)
365 }
366
367 pub fn random_string(&mut self, len: usize) -> String {
370 self.sim.random_string(len)
371 }
372}