anysystem/
system.rs

1//! System implementation.
2
3use std::cell::{Ref, RefCell, RefMut};
4use std::collections::HashMap;
5use std::path::Path;
6use std::rc::Rc;
7
8use rand::distributions::uniform::{SampleRange, SampleUniform};
9
10use simcore::handler::EventCancellationPolicy;
11use simcore::{cast, Simulation};
12
13use crate::events::MessageReceived;
14use crate::logger::{LogEntry, Logger};
15use crate::{EventLogEntry, Message, Network, Node, Process};
16
/// Models a distributed system consisting of multiple nodes connected via a network.
///
/// Owns the underlying [`Simulation`] and keeps shared handles to the network,
/// the nodes and the logger.
pub struct System {
    // Underlying simulation.
    sim: Simulation,
    // Network handle; a clone of it is passed to every node created by `add_node`.
    net: Rc<RefCell<Network>>,
    // Nodes indexed by node name.
    nodes: HashMap<String, Rc<RefCell<Node>>>,
    // Maps each process name to the node hosting that process.
    proc_nodes: HashMap<String, Rc<RefCell<Node>>>,
    // Logger handle; clones of it are passed to the network and to every node.
    logger: Rc<RefCell<Logger>>,
}
25
26impl System {
27    /// Creates a system with specified random seed.
28    pub fn new(seed: u64) -> Self {
29        let logger = Rc::new(RefCell::new(Logger::new()));
30        let mut sim = Simulation::new(seed);
31        let net = Rc::new(RefCell::new(Network::new(sim.create_context("net"), logger.clone())));
32        Self {
33            sim,
34            net,
35            nodes: HashMap::new(),
36            proc_nodes: HashMap::new(),
37            logger,
38        }
39    }
40
41    /// Creates a system with logging events to file.
42    pub fn with_log_file(seed: u64, log_path: &Path) -> Self {
43        let logger = Rc::new(RefCell::new(Logger::with_log_file(log_path)));
44        let mut sim = Simulation::new(seed);
45        let net = Rc::new(RefCell::new(Network::new(sim.create_context("net"), logger.clone())));
46        Self {
47            sim,
48            net,
49            nodes: HashMap::new(),
50            proc_nodes: HashMap::new(),
51            logger,
52        }
53    }
54
    /// Returns a mutable reference to the system logger.
    ///
    /// # Panics
    ///
    /// Panics if the logger is currently borrowed elsewhere, because the value
    /// is obtained via [`RefCell::borrow_mut`].
    pub fn logger(&self) -> RefMut<Logger> {
        self.logger.borrow_mut()
    }
59
60    // Network ---------------------------------------------------------------------------------------------------------
61
    /// Returns a mutable reference to the network.
    ///
    /// # Panics
    ///
    /// Panics if the network is currently borrowed elsewhere, because the value
    /// is obtained via [`RefCell::borrow_mut`].
    pub fn network(&self) -> RefMut<Network> {
        self.net.borrow_mut()
    }
66
67    // Nodes -----------------------------------------------------------------------------------------------------------
68
69    /// Returns a list of node names.
70    pub fn nodes(&self) -> Vec<String> {
71        self.nodes.keys().cloned().collect()
72    }
73
74    /// Adds a node to the system.
75    ///
76    /// Note that node names must be unique.
77    pub fn add_node(&mut self, name: &str) {
78        let node = Rc::new(RefCell::new(Node::new(
79            name.to_string(),
80            self.net.clone(),
81            self.sim.create_context(name),
82            self.logger.clone(),
83        )));
84        let node_id = self.sim.add_handler(name, node.clone());
85        assert!(
86            self.nodes.insert(name.to_string(), node).is_none(),
87            "Node with name {} already exists, node names must be unique",
88            name
89        );
90        self.net.borrow_mut().add_node(name.to_string(), node_id);
91        self.logger.borrow_mut().log(LogEntry::NodeStarted {
92            time: self.sim.time(),
93            node: name.to_string(),
94            node_id,
95        });
96    }
97
    /// Sets the local clock skew of the node.
    ///
    /// # Panics
    ///
    /// Panics if there is no node with the given name.
    pub fn set_node_clock_skew(&mut self, node: &str, clock_skew: f64) {
        self.nodes[node].borrow_mut().set_clock_skew(clock_skew);
    }
102
103    /// Crashes the specified node.
104    ///
105    /// All pending events created by the node will be discarded.
106    /// The undelivered messages sent by the node will be dropped.
107    /// All pending and future events destined to the node will be discarded.
108    ///
109    /// Processes running on the node are not cleared to allow working
110    /// with processes after the crash (i.e. examine event log).
111    pub fn crash_node(&mut self, node_name: &str) {
112        let node = self.nodes.get(node_name).unwrap();
113        node.borrow_mut().crash();
114
115        self.logger.borrow_mut().log(LogEntry::NodeCrashed {
116            time: self.sim.time(),
117            node: node_name.to_string(),
118        });
119
120        // cancel pending events (i.e. undelivered messages) from the crashed node
121        let node_id = self.sim.lookup_id(node_name);
122        let cancelled = self.sim.cancel_and_get_events(|e| e.src == node_id);
123        for event in cancelled {
124            cast!(match event.data {
125                MessageReceived {
126                    id,
127                    msg,
128                    src,
129                    src_node,
130                    dst,
131                    dst_node,
132                } => {
133                    self.logger.borrow_mut().log(LogEntry::MessageDropped {
134                        time: self.sim.time(),
135                        msg_id: id.to_string(),
136                        msg,
137                        src_proc: src,
138                        src_node,
139                        dst_proc: dst,
140                        dst_node,
141                    });
142                }
143            })
144        }
145
146        // remove the handler to discard all pending and future events sent to this node
147        self.sim.remove_handler(node_name, EventCancellationPolicy::Incoming);
148    }
149
150    /// Recovers the previously crashed node.
151    ///
152    /// Processes running on the node before the crash are cleared.
153    /// The delivery of events to the node is enabled.
154    pub fn recover_node(&mut self, node_name: &str) {
155        assert!(
156            self.node_is_crashed(node_name),
157            "Node is not crashed to be eligible for recovery"
158        );
159        let node = self.nodes.get(node_name).unwrap();
160        node.borrow_mut().recover();
161        self.sim.add_handler(node_name, node.clone());
162
163        // remove previous process-node mappings to enable recreating these processes
164        self.proc_nodes.retain(|_, node| node.borrow().name != node_name);
165
166        self.logger.borrow_mut().log(LogEntry::NodeRecovered {
167            time: self.sim.time(),
168            node: node_name.to_string(),
169        });
170    }
171
172    /// Returns an immutable reference to the node.
173    pub fn get_node(&self, name: &str) -> Option<Ref<Node>> {
174        self.nodes.get(name).map(|res| res.borrow())
175    }
176
177    /// Returns a mutable reference to the node.
178    pub fn get_mut_node(&self, name: &str) -> Option<RefMut<Node>> {
179        self.nodes.get(name).map(|res| res.borrow_mut())
180    }
181
    /// Checks if the node is crashed.
    ///
    /// # Panics
    ///
    /// Panics if there is no node with the given name.
    pub fn node_is_crashed(&self, node: &str) -> bool {
        self.nodes.get(node).unwrap().borrow().is_crashed()
    }
186
187    // Processes -------------------------------------------------------------------------------------------------------
188
189    /// Adds a process executing on the node.
190    ///
191    /// Note that process names should be globally unique.
192    pub fn add_process(&mut self, name: &str, proc: Box<dyn Process>, node: &str) {
193        self.nodes[node].borrow_mut().add_process(name, proc);
194        self.net
195            .borrow_mut()
196            .set_proc_location(name.to_string(), node.to_string());
197        assert!(
198            self.proc_nodes
199                .insert(name.to_string(), self.nodes[node].clone())
200                .is_none(),
201            "Process with name {} already exists, process names must be unique",
202            name
203        );
204        self.logger.borrow_mut().log(LogEntry::ProcessStarted {
205            time: self.sim.time(),
206            node: node.to_string(),
207            proc: name.to_string(),
208        });
209    }
210
211    /// Returns the names of all processes in the system.
212    pub fn process_names(&self) -> Vec<String> {
213        self.proc_nodes.keys().cloned().collect()
214    }
215
216    /// Sends a local message to the process.
217    pub fn send_local_message(&mut self, proc: &str, msg: Message) {
218        let mut node = self.proc_nodes[proc].borrow_mut();
219        assert!(
220            !node.is_crashed(),
221            "Cannot send local message to process {} on crashed node {}",
222            proc,
223            node.name
224        );
225        node.send_local_message(proc.to_string(), msg);
226    }
227
228    /// Reads and returns the local messages produced by the process.
229    pub fn read_local_messages(&mut self, proc: &str) -> Vec<Message> {
230        self.proc_nodes[proc]
231            .borrow_mut()
232            .read_local_messages(proc)
233            .unwrap_or_default()
234    }
235
    /// Returns a copy of the local messages produced by the process.
    ///
    /// In contrast to [`Self::read_local_messages`], this method does not drain the process outbox.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn local_outbox(&self, proc: &str) -> Vec<Message> {
        self.proc_nodes[proc].borrow().local_outbox(proc)
    }
242
    /// Returns the event log for the process, as reported by the node hosting it.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn event_log(&self, proc: &str) -> Vec<EventLogEntry> {
        self.proc_nodes[proc].borrow().event_log(proc)
    }
247
    /// Returns the maximum size of process inner data observed so far.
    ///
    /// The hosting node is borrowed mutably to obtain the value.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn max_size(&mut self, proc: &str) -> u64 {
        self.proc_nodes[proc].borrow_mut().max_size(proc)
    }
252
    /// Returns the number of messages sent by the process, as reported by the node hosting it.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn sent_message_count(&self, proc: &str) -> u64 {
        self.proc_nodes[proc].borrow().sent_message_count(proc)
    }
257
    /// Returns the number of messages received by the process, as reported by the node hosting it.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn received_message_count(&self, proc: &str) -> u64 {
        self.proc_nodes[proc].borrow().received_message_count(proc)
    }
262
    /// Returns the name of the node hosting the process.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn proc_node_name(&self, proc: &str) -> String {
        self.proc_nodes[proc].borrow().name().to_owned()
    }
267
    /// Checks if the node hosting the process is crashed.
    ///
    /// # Panics
    ///
    /// Panics if there is no process with the given name.
    pub fn proc_node_is_crashed(&self, proc: &str) -> bool {
        self.proc_nodes[proc].borrow().is_crashed()
    }
272
273    // Simulation ------------------------------------------------------------------------------------------------------
274
    /// Returns an immutable reference to the inner simulation.
    pub fn sim(&self) -> &Simulation {
        &self.sim
    }
279
    /// Returns the current simulation time, as reported by the inner simulation.
    pub fn time(&self) -> f64 {
        self.sim.time()
    }
284
    /// Performs a single step through the simulation.
    ///
    /// Returns the result of the inner simulation's `step`. Other methods of this
    /// type treat `false` as "nothing left to process" and stop stepping.
    pub fn step(&mut self) -> bool {
        self.sim.step()
    }
289
    /// Performs the specified number of steps through the simulation.
    ///
    /// Returns the result of the inner simulation's `steps`.
    pub fn steps(&mut self, step_count: u64) -> bool {
        self.sim.steps(step_count)
    }
294
    /// Steps through the simulation until there are no pending events left.
    ///
    /// Delegates to the inner simulation's `step_until_no_events`.
    pub fn step_until_no_events(&mut self) {
        self.sim.step_until_no_events()
    }
299
    /// Steps through the simulation with a limit on the duration.
    ///
    /// Returns the result of the inner simulation's `step_for_duration`.
    pub fn step_for_duration(&mut self, duration: f64) -> bool {
        self.sim.step_for_duration(duration)
    }
304
305    /// Steps through the simulation until the process produces local message(s)
306    /// or there are no pending events left.
307    ///
308    /// Returns the read local messages if any and error otherwise.
309    pub fn step_until_local_message(&mut self, proc: &str) -> Result<Vec<Message>, &str> {
310        let node = self.proc_nodes[proc].clone();
311        loop {
312            if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
313                return Ok(messages);
314            }
315            if !self.step() {
316                return Err("No messages");
317            }
318        }
319    }
320
321    /// Similar to [`Self::step_until_local_message`] but with additional limit
322    /// on the number of steps through the simulations.
323    pub fn step_until_local_message_max_steps(&mut self, proc: &str, max_steps: u32) -> Result<Vec<Message>, &str> {
324        let mut steps = 0;
325        let node = self.proc_nodes[proc].clone();
326        if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
327            return Ok(messages);
328        }
329        while steps < max_steps {
330            if !self.step() {
331                break;
332            }
333            if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
334                return Ok(messages);
335            }
336            steps += 1;
337        }
338        Err("No messages")
339    }
340
341    /// Similar to [`Self::step_until_local_message`] but with additional limit
342    /// on the duration of waiting for local messages.
343    pub fn step_until_local_message_timeout(&mut self, proc: &str, timeout: f64) -> Result<Vec<Message>, &str> {
344        let end_time = self.time() + timeout;
345        let node = self.proc_nodes[proc].clone();
346        while self.time() < end_time {
347            if let Some(messages) = node.borrow_mut().read_local_messages(proc) {
348                return Ok(messages);
349            }
350            if !self.step() {
351                break;
352            }
353        }
354        Err("No messages")
355    }
356
    /// Returns a random number in the specified range
    /// using the simulation-wide random number generator.
    ///
    /// Delegates to the inner simulation's `gen_range`; `range` can be any type
    /// implementing [`SampleRange`] for the sampled type `T`.
    pub fn gen_range<T, R>(&mut self, range: R) -> T
    where
        T: SampleUniform,
        R: SampleRange<T>,
    {
        self.sim.gen_range(range)
    }
366
    /// Returns a random alphanumeric string of specified length
    /// using the simulation-wide random number generator.
    ///
    /// Delegates to the inner simulation's `random_string`.
    pub fn random_string(&mut self, len: usize) -> String {
        self.sim.random_string(len)
    }
372}