ncomm_executors/
simple_executor.rs

1//!
2//! The Simple Executor
3//!
4//! The simple executor is the most simple and easy to understand
5//! executor in the NComm system.  Basically, the simple executor is
6//! a singular thread that stores each node in a sorted vector and
7//! pops off the highest priority Node, executes its update method
8//! and then inserts it into the sorted vector with an updated priority.
9//!
10//! In practice, I would say it is unlikely for the simple executor
11//! to find a lot of use out in the wild but it is probably the best
12//! executor for single threaded execution.
13//!
14
15use std::any::Any;
16
17use crossbeam::channel::Receiver;
18
19use quanta::{Clock, Instant};
20
21use ncomm_core::{Executor, ExecutorState, Node};
22
23use crate::{insert_into, NodeWrapper};
24
25/// Simple Executor
26///
27/// This simple executor stores Nodes in a sorted vector where the
28/// priority is higher the closer to the current timestamp the Node's
29/// next update is.
30///
31/// Note: The Simple Executor can be interrupted by sending a true value
32/// over the mpsc channel whose receiving end is owned by the SimpleExecutor
33///
34/// Addendum: The Simple Executor will also busy wait between node executions
35/// so do not expect the SimpleExecutor to yield CPU time to other processes while
36/// it is running.
37pub struct SimpleExecutor<ID: PartialEq> {
38    /// The sorted backing vector for the executor
39    pub(crate) backing: Vec<NodeWrapper<ID>>,
40    /// The quanta high-precision clock backing the SimplExecutor
41    clock: Clock,
42    /// The current state of the executor
43    state: ExecutorState,
44    /// The Instant the executor was started
45    start_instant: Instant,
46    /// The Interrupt receiver channel
47    interrupt: Receiver<bool>,
48    /// Whether or not the executor has been interrupted
49    interrupted: bool,
50}
51
52impl<ID: PartialEq> SimpleExecutor<ID> {
53    /// Create a new Simple Executor without any Nodes
54    pub fn new(interrupt: Receiver<bool>) -> Self {
55        let clock = Clock::new();
56        let now = clock.now();
57
58        Self {
59            backing: Vec::new(),
60            clock,
61            start_instant: now,
62            state: ExecutorState::Stopped,
63            interrupt,
64            interrupted: false,
65        }
66    }
67
68    /// Creates a new Simple Executor with a number of Nodes
69    pub fn new_with(interrupt: Receiver<bool>, mut nodes: Vec<Box<dyn Node<ID>>>) -> Self {
70        let mut backing = Vec::new();
71        for node in nodes.drain(..) {
72            backing.push(NodeWrapper { priority: 0, node });
73        }
74
75        let clock = Clock::new();
76        let now = clock.now();
77
78        Self {
79            backing,
80            clock,
81            start_instant: now,
82            state: ExecutorState::Stopped,
83            interrupt,
84            interrupted: false,
85        }
86    }
87}
88
89impl<ID: PartialEq> Executor<ID> for SimpleExecutor<ID> {
90    /// Context doesn't really apply to SimpleExecutors
91    type Context = Box<dyn Any>;
92
93    /// For each node in the simple executor we should reset their priority to 0
94    /// and start the node.  We should also set the start_instant to the current time.
95    ///
96    /// Note: this method should not be called individually as it will always be
97    /// called during the `update_for_ms` and `update_loop` methods so running
98    /// it here is completely redundant.
99    fn start(&mut self) {
100        for node_wrapper in self.backing.iter_mut() {
101            node_wrapper.priority = 0;
102            node_wrapper.node.start();
103        }
104
105        self.interrupted = false;
106        self.state = ExecutorState::Started;
107        self.start_instant = self.clock.now();
108    }
109
110    /// Start the executor and run the executor for a given number of milliseconds before
111    /// stopping the executor.  An interrupt will also stop the executor early.
112    ///
113    /// Note: if there are no Nodes currently in the executor it will busy wait until the
114    /// time has passed or an interrupt occurs
115    fn update_for_ms(&mut self, ms: u128) {
116        // Start the Executor
117        self.start();
118
119        // Run the Executor
120        self.state = ExecutorState::Running;
121        while self
122            .clock
123            .now()
124            .duration_since(self.start_instant)
125            .as_millis()
126            < ms
127            && !self.check_interrupt()
128        {
129            if self.backing.last().is_some()
130                && self
131                    .clock
132                    .now()
133                    .duration_since(self.start_instant)
134                    .as_micros()
135                    >= self.backing.last().unwrap().priority
136            {
137                let mut node_wrapper = self.backing.pop().unwrap();
138                node_wrapper.node.update();
139                node_wrapper.priority += node_wrapper.node.get_update_delay_us();
140                insert_into(&mut self.backing, node_wrapper);
141            }
142        }
143
144        // Stop the Executor
145        for node_wrapper in self.backing.iter_mut() {
146            node_wrapper.priority = 0;
147            node_wrapper.node.shutdown();
148        }
149        self.state = ExecutorState::Stopped;
150    }
151
152    /// Start the executor and run until an interrupt is received.
153    ///
154    /// Note: if there are no Nodes currently in the executor it will busy wait until it
155    /// receives an interrupt
156    fn update_loop(&mut self) {
157        // Start the Executor
158        self.start();
159
160        // Run the Executor
161        self.state = ExecutorState::Running;
162        while !self.check_interrupt() {
163            if self.backing.last().is_some()
164                && self
165                    .clock
166                    .now()
167                    .duration_since(self.start_instant)
168                    .as_micros()
169                    >= self.backing.last().unwrap().priority
170            {
171                let mut node_wrapper = self.backing.pop().unwrap();
172                node_wrapper.node.update();
173                node_wrapper.priority += node_wrapper.node.get_update_delay_us();
174                insert_into(&mut self.backing, node_wrapper);
175            }
176        }
177
178        // Stop the Executor
179        for node_wrapper in self.backing.iter_mut() {
180            node_wrapper.priority = 0;
181            node_wrapper.node.shutdown();
182        }
183        self.state = ExecutorState::Stopped;
184    }
185
186    /// Check the interrupt receiver for an interrupt.  If an interrupt
187    /// signal was sent over the channel then this node should report that
188    /// it was interrupted.
189    fn check_interrupt(&mut self) -> bool {
190        if let Ok(interrupt) = self.interrupt.try_recv() {
191            self.interrupted = interrupt;
192        }
193        self.interrupted
194    }
195
196    /// Add a node to the Simple Executor.
197    ///
198    /// Note: Nodes can only be added to the executor when it is not running.
199    ///
200    /// Additionally, only 1 node can exist per id so additional nodes added with
201    /// the same id will replace the previous node of a given id.
202    fn add_node(&mut self, node: Box<dyn Node<ID>>) {
203        if let Some(idx) = self
204            .backing
205            .iter()
206            .position(|node_wrapper| node_wrapper.node.get_id().eq(&node.get_id()))
207        {
208            self.backing.remove(idx);
209        }
210
211        if self.state == ExecutorState::Stopped {
212            self.backing.push(NodeWrapper { priority: 0, node });
213        } else if self.state == ExecutorState::Started {
214            insert_into(
215                &mut self.backing,
216                NodeWrapper {
217                    priority: self
218                        .clock
219                        .now()
220                        .duration_since(self.start_instant)
221                        .as_micros(),
222                    node,
223                },
224            );
225        }
226    }
227
228    /// Remove a node from the Simple Executor.
229    ///
230    /// Note: Nodes can only be removed from the executor when it is not running.
231    fn remove_node(&mut self, id: &ID) -> Option<Box<dyn Node<ID>>> {
232        if self.state != ExecutorState::Running {
233            let idx = self
234                .backing
235                .iter()
236                .position(|node_wrapper| node_wrapper.node.get_id().eq(id));
237            if let Some(idx) = idx {
238                Some(self.backing.remove(idx).destroy())
239            } else {
240                None
241            }
242        } else {
243            None
244        }
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251
252    use std::{any::Any, thread, time::Duration};
253
254    use crossbeam::channel::unbounded;
255
256    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
257    enum State {
258        Stopped,
259        Started,
260        Updating,
261    }
262
263    pub struct SimpleNode {
264        id: u8,
265        pub update_delay: u128,
266        pub num: u8,
267        state: State,
268    }
269
270    impl SimpleNode {
271        pub fn new(id: u8, update_delay: u128) -> Self {
272            Self {
273                id,
274                update_delay,
275                num: 0,
276                state: State::Stopped,
277            }
278        }
279    }
280
281    impl Node<u8> for SimpleNode {
282        fn get_id(&self) -> u8 {
283            self.id
284        }
285        fn start(&mut self) {
286            self.state = State::Started;
287        }
288
289        fn update(&mut self) {
290            self.state = State::Updating;
291            self.num = self.num.wrapping_add(1);
292        }
293
294        fn shutdown(&mut self) {
295            self.state = State::Stopped;
296        }
297
298        fn get_update_delay_us(&self) -> u128 {
299            self.update_delay
300        }
301    }
302
303    #[test]
304    /// Start should set the priority of all nodes to 0, start all nodes, set its
305    /// interrupted value to false, enter the ExecutorState::Started state and set its
306    /// start instant
307    fn test_simple_executor_start() {
308        let (_, rx) = unbounded();
309
310        let mut executor = SimpleExecutor::new_with(
311            rx,
312            vec![
313                Box::new(SimpleNode::new(0, 100_000)),
314                Box::new(SimpleNode::new(1, 250_000)),
315            ],
316        );
317        let original_start_instant = executor.start_instant;
318
319        executor.start();
320
321        for node_wrapper in executor.backing.iter() {
322            assert_eq!(node_wrapper.priority, 0);
323            let simple_node: &dyn Any = &node_wrapper.node;
324            let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
325            assert_eq!(simple_node.state, State::Started);
326        }
327        assert!(!executor.interrupted);
328        assert_eq!(executor.state, ExecutorState::Started);
329        assert!(executor.start_instant > original_start_instant);
330    }
331
332    #[test]
333    fn test_update_for_ms() {
334        let (_, rx) = unbounded();
335
336        let mut executor = SimpleExecutor::new_with(
337            rx,
338            vec![
339                Box::new(SimpleNode::new(0, 10_000)),
340                Box::new(SimpleNode::new(1, 25_000)),
341            ],
342        );
343
344        let start = executor.clock.now();
345        executor.update_for_ms(100);
346        let end = executor.clock.now();
347
348        // Check the nodes were started and updated
349        for node_wrapper in executor.backing.iter() {
350            // Priority should have been reset to 0
351            assert!(node_wrapper.priority == 0);
352            let simple_node: &dyn Any = &node_wrapper.node;
353            let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
354            assert_eq!(simple_node.state, State::Stopped);
355            // Check the node has been updated a valid number of times
356            assert!([9, 10, 11, 3, 4, 5].contains(&simple_node.num));
357        }
358
359        assert!(Duration::from_millis(95) < end - start);
360        assert!(end - start < Duration::from_millis(105));
361    }
362
363    #[test]
364    fn test_check_interrupt() {
365        let (tx, rx) = unbounded();
366
367        let mut executor = SimpleExecutor::new_with(
368            rx,
369            vec![
370                Box::new(SimpleNode::new(0, 110_000)),
371                Box::new(SimpleNode::new(1, 25_000)),
372            ],
373        );
374
375        tx.send(true).unwrap();
376
377        assert!(executor.check_interrupt());
378    }
379
380    #[test]
381    fn test_add_node_stopped() {
382        let (_, rx) = unbounded();
383
384        let mut executor = SimpleExecutor::new_with(
385            rx,
386            vec![
387                Box::new(SimpleNode::new(0, 10_000)),
388                Box::new(SimpleNode::new(1, 25_000)),
389            ],
390        );
391
392        executor.add_node(Box::new(SimpleNode::new(2, 1_000)));
393
394        assert_eq!(executor.backing.len(), 3);
395    }
396
397    #[test]
398    fn test_add_node_same_id() {
399        let (_, rx) = unbounded();
400
401        let mut executor = SimpleExecutor::new_with(
402            rx,
403            vec![
404                Box::new(SimpleNode::new(0, 10_000)),
405                Box::new(SimpleNode::new(1, 25_000)),
406            ],
407        );
408
409        executor.add_node(Box::new(SimpleNode::new(0, 1_000)));
410
411        assert_eq!(executor.backing.len(), 2);
412        let zero_id = executor
413            .backing
414            .iter()
415            .find(|node_wrapper| node_wrapper.node.get_id().eq(&0))
416            .unwrap();
417        assert_eq!(zero_id.node.get_update_delay_us(), 1_000);
418    }
419
420    #[test]
421    fn test_remove_node() {
422        let (_, rx) = unbounded();
423
424        let mut executor = SimpleExecutor::new_with(
425            rx,
426            vec![
427                Box::new(SimpleNode::new(0, 10_000)),
428                Box::new(SimpleNode::new(1, 25_000)),
429            ],
430        );
431
432        executor.remove_node(&0);
433
434        assert_eq!(executor.backing.len(), 1);
435        assert_eq!(executor.backing[0].node.get_id(), 1);
436    }
437
438    #[test]
439    fn test_update_loop() {
440        let (tx, rx) = unbounded();
441
442        let mut executor = SimpleExecutor::new_with(
443            rx,
444            vec![
445                Box::new(SimpleNode::new(0, 10_000)),
446                Box::new(SimpleNode::new(1, 25_000)),
447            ],
448        );
449
450        let handle = thread::spawn(move || {
451            executor.update_loop();
452            executor
453        });
454
455        thread::sleep(Duration::from_millis(100));
456        tx.send(true).unwrap();
457
458        let executor = handle.join().unwrap();
459        for node_wrapper in executor.backing.iter() {
460            assert_eq!(node_wrapper.priority, 0);
461            let simple_node: &dyn Any = &node_wrapper.node;
462            let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
463            assert_eq!(simple_node.state, State::Stopped);
464            assert!([3, 4, 5, 9, 10, 11].contains(&simple_node.num));
465        }
466
467        assert!(executor.interrupted);
468        assert_eq!(executor.state, ExecutorState::Stopped);
469    }
470}