ncomm_executors/
threaded_executor.rs

1//!
2//! The Threaded Executor takes control of a given number of threads and sends a simple
3//! executor over each of the threads to execute Nodes.
4//!
5//! I would typically recommend utilizing the threadpool executor if each Node is equally
6//! important to execute.  If, however, there is a specific Node that should always
7//! execute at a given rate and needs to be on its own thread to accomplish that, the
8//! Threaded Executor may be the best choice.
9//!
10
11use std::thread;
12
13use quanta::{Clock, Instant};
14
15use crossbeam::channel::{unbounded, Receiver, Sender};
16
17use ncomm_core::{Executor, ExecutorState, Node};
18
19use crate::{insert_into, NodeWrapper, SimpleExecutor};
20
21/// Threaded Executor
22///
23/// The Threaded Executor stores nodes in a bunch of SimpleExecutors on
24/// given threads.  On the update loop each of hte SimpleExecutors execute
25/// their nodes in parallel
26pub struct ThreadedExecutor<NID: PartialEq + Send, TID: PartialEq + Send> {
27    /// The executors to run
28    executors: Vec<(SimpleExecutor<NID>, TID)>,
29    /// The backing for the main thread
30    backing: Vec<NodeWrapper<NID>>,
31    /// The thread id of the main thread
32    thread_id: TID,
33    /// The quanta high-prevision clock
34    clock: Clock,
35    /// The Instant the executor was started
36    start_instant: Instant,
37    /// The current state of the executor
38    state: ExecutorState,
39    /// The interrupt receiver channel
40    interrupt: Receiver<bool>,
41    /// The interrupt senders used to propagate the interrupt to other threads
42    interrupt_propagators: Vec<Sender<bool>>,
43    /// Whether or not the executor has been interrupted
44    interrupted: bool,
45}
46
47impl<NID: PartialEq + Send, TID: PartialEq + Send> ThreadedExecutor<NID, TID> {
48    /// Create a new Threaded Executor without any Nodes
49    pub fn new(interrupt: Receiver<bool>, main_thread_id: TID) -> Self {
50        let clock = Clock::new();
51        let now = clock.now();
52
53        Self {
54            executors: Vec::new(),
55            backing: Vec::new(),
56            thread_id: main_thread_id,
57            clock,
58            state: ExecutorState::Stopped,
59            start_instant: now,
60            interrupt,
61            interrupt_propagators: Vec::new(),
62            interrupted: false,
63        }
64    }
65
66    /// Creates a new Threaded executor with a given mapping for nodes
67    #[allow(clippy::type_complexity)]
68    pub fn new_with(
69        interrupt: Receiver<bool>,
70        main_thread_id: TID,
71        mut nodes: Vec<(Vec<Box<dyn Node<NID>>>, TID)>,
72    ) -> Self {
73        let mut backing = Vec::new();
74        if let Some(idx) = nodes.iter().position(|(_, tid)| tid.eq(&main_thread_id)) {
75            let (mut node_list, _) = nodes.remove(idx);
76            for node in node_list.drain(..) {
77                backing.push(NodeWrapper { priority: 0, node });
78            }
79        }
80
81        let mut executors = Vec::new();
82        let mut interrupt_propagators = Vec::new();
83        for (node_list, thread_id) in nodes.drain(..) {
84            let (tx, rx) = unbounded();
85            interrupt_propagators.push(tx);
86            executors.push((SimpleExecutor::new_with(rx, node_list), thread_id));
87        }
88
89        let clock = Clock::new();
90        let now = clock.now();
91
92        Self {
93            executors,
94            backing,
95            thread_id: main_thread_id,
96            clock,
97            start_instant: now,
98            state: ExecutorState::Stopped,
99            interrupt,
100            interrupt_propagators,
101            interrupted: false,
102        }
103    }
104
105    fn start_self(&mut self) {
106        for node_wrapper in self.backing.iter_mut() {
107            node_wrapper.priority = 0;
108            node_wrapper.node.start();
109        }
110
111        self.interrupted = false;
112        self.state = ExecutorState::Started;
113        self.start_instant = self.clock.now();
114    }
115}
116
117impl<NID: PartialEq + Send + 'static, TID: PartialEq + Send + 'static> Executor<NID>
118    for ThreadedExecutor<NID, TID>
119{
120    type Context = TID;
121
122    fn start(&mut self) {
123        let mut handles = Vec::new();
124        for (mut executor, tid) in self.executors.drain(..) {
125            handles.push(thread::spawn(move || {
126                executor.start();
127                (executor, tid)
128            }));
129        }
130
131        self.start_self();
132
133        for handle in handles {
134            self.executors.push(handle.join().unwrap());
135        }
136    }
137
138    fn update_for_ms(&mut self, ms: u128) {
139        // Dispatch the other threads
140        let mut handles = Vec::new();
141        for (mut executor, tid) in self.executors.drain(..) {
142            handles.push(thread::spawn(move || {
143                executor.update_for_ms(ms);
144                (executor, tid)
145            }));
146        }
147
148        // Start this exector
149        self.start_self();
150
151        // Run this executor
152        self.state = ExecutorState::Running;
153        while self
154            .clock
155            .now()
156            .duration_since(self.start_instant)
157            .as_millis()
158            < ms
159            && !self.check_interrupt()
160        {
161            if self.backing.last().is_some()
162                && self
163                    .clock
164                    .now()
165                    .duration_since(self.start_instant)
166                    .as_micros()
167                    >= self.backing.last().unwrap().priority
168            {
169                let mut node_wrapper = self.backing.pop().unwrap();
170                node_wrapper.node.update();
171                node_wrapper.priority += node_wrapper.node.get_update_delay_us();
172                insert_into(&mut self.backing, node_wrapper);
173            }
174        }
175
176        // Stop the Executor
177        for node_wrapper in self.backing.iter_mut() {
178            node_wrapper.priority = 0;
179            node_wrapper.node.shutdown();
180        }
181        self.state = ExecutorState::Stopped;
182
183        for handle in handles {
184            self.executors.push(handle.join().unwrap());
185        }
186    }
187
188    fn update_loop(&mut self) {
189        // Dispatch the other threads
190        let mut handles = Vec::new();
191        for (mut executor, tid) in self.executors.drain(..) {
192            handles.push(thread::spawn(move || {
193                executor.update_loop();
194                (executor, tid)
195            }));
196        }
197
198        // Start this executor
199        self.start_self();
200
201        // Run the executor
202        self.state = ExecutorState::Running;
203        while !self.check_interrupt() {
204            if self.backing.last().is_some()
205                && self
206                    .clock
207                    .now()
208                    .duration_since(self.start_instant)
209                    .as_micros()
210                    >= self.backing.last().unwrap().priority
211            {
212                let mut node_wrapper = self.backing.pop().unwrap();
213                node_wrapper.node.update();
214                node_wrapper.priority += node_wrapper.node.get_update_delay_us();
215                insert_into(&mut self.backing, node_wrapper);
216            }
217        }
218
219        // Stop this executor
220        for node_wrapper in self.backing.iter_mut() {
221            node_wrapper.priority = 0;
222            node_wrapper.node.shutdown();
223        }
224        self.state = ExecutorState::Stopped;
225
226        for handle in handles {
227            self.executors.push(handle.join().unwrap());
228        }
229    }
230
231    fn check_interrupt(&mut self) -> bool {
232        if let Ok(interrupt) = self.interrupt.try_recv() {
233            self.interrupted = interrupt;
234            for tx in self.interrupt_propagators.iter_mut() {
235                tx.send(interrupt).unwrap();
236            }
237        }
238
239        self.interrupted
240    }
241
242    fn add_node(&mut self, node: Box<dyn Node<NID>>) {
243        if let Some(idx) = self
244            .backing
245            .iter()
246            .position(|node_wrapper| node_wrapper.node.get_id().eq(&node.get_id()))
247        {
248            self.backing.remove(idx);
249        }
250
251        if self.state == ExecutorState::Stopped {
252            self.backing.push(NodeWrapper { priority: 0, node });
253        } else if self.state == ExecutorState::Started {
254            insert_into(
255                &mut self.backing,
256                NodeWrapper {
257                    priority: self
258                        .clock
259                        .now()
260                        .duration_since(self.start_instant)
261                        .as_micros(),
262                    node,
263                },
264            );
265        }
266    }
267
268    fn add_node_with_context(&mut self, node: Box<dyn Node<NID>>, _ctx: Self::Context) {
269        if _ctx == self.thread_id {
270            self.add_node(node);
271        } else if let Some((executor, _)) = self.executors.iter_mut().find(|(_, tid)| tid.eq(&_ctx))
272        {
273            executor.add_node(node);
274        } else {
275            let (tx, rx) = unbounded();
276            self.interrupt_propagators.push(tx);
277            self.executors
278                .push((SimpleExecutor::new_with(rx, vec![node]), _ctx));
279        }
280    }
281
282    fn remove_node(&mut self, id: &NID) -> Option<Box<dyn Node<NID>>> {
283        if let Some(idx) = self
284            .backing
285            .iter()
286            .position(|node_wrapper| node_wrapper.node.get_id().eq(id))
287        {
288            return Some(self.backing.remove(idx).destroy());
289        }
290
291        let mut found_node = None;
292        let mut delete_executor = None;
293        for (idx, (executor, _)) in self.executors.iter_mut().enumerate() {
294            if let Some(node) = executor.remove_node(id) {
295                found_node = Some(node);
296                if executor.backing.is_empty() {
297                    delete_executor = Some(idx);
298                }
299            }
300        }
301
302        if let Some(idx) = delete_executor {
303            self.executors.remove(idx);
304        }
305
306        found_node
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313
314    use std::{any::Any, time::Duration};
315
316    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
317    enum State {
318        Stopped,
319        Started,
320        Updating,
321    }
322
323    pub struct SimpleNode {
324        id: u8,
325        pub update_delay: u128,
326        pub num: u8,
327        state: State,
328    }
329
330    impl SimpleNode {
331        pub fn new(id: u8, update_delay: u128) -> Self {
332            Self {
333                id,
334                update_delay,
335                num: 0,
336                state: State::Stopped,
337            }
338        }
339    }
340
341    impl Node<u8> for SimpleNode {
342        fn get_id(&self) -> u8 {
343            self.id
344        }
345        fn start(&mut self) {
346            self.state = State::Started;
347        }
348
349        fn update(&mut self) {
350            self.state = State::Updating;
351            self.num = self.num.wrapping_add(1);
352        }
353
354        fn shutdown(&mut self) {
355            self.state = State::Stopped;
356        }
357
358        fn get_update_delay_us(&self) -> u128 {
359            self.update_delay
360        }
361    }
362
363    #[test]
364    fn test_start() {
365        let (_, rx) = unbounded();
366
367        let mut executor = ThreadedExecutor::new_with(
368            rx,
369            0,
370            vec![
371                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
372                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
373                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
374            ],
375        );
376        let original_start_instant = executor.start_instant;
377
378        executor.start();
379
380        for node_wrapper in executor.backing.iter() {
381            assert_eq!(node_wrapper.priority, 0);
382            let simple_node: &dyn Any = &node_wrapper.node;
383            let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
384            assert_eq!(simple_node.state, State::Started);
385        }
386
387        for executor in executor.executors.iter() {
388            for node_wrapper in executor.0.backing.iter() {
389                assert_eq!(node_wrapper.priority, 0);
390                let simple_node: &dyn Any = &node_wrapper.node;
391                let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
392                assert_eq!(simple_node.state, State::Started);
393            }
394        }
395
396        assert!(!executor.interrupted);
397        assert_eq!(executor.state, ExecutorState::Started);
398        assert!(executor.start_instant > original_start_instant);
399    }
400
401    #[test]
402    fn test_check_interrupt() {
403        let (tx, rx) = unbounded();
404
405        let mut executor = ThreadedExecutor::new_with(
406            rx,
407            0,
408            vec![
409                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
410                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
411                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
412            ],
413        );
414
415        tx.send(true).unwrap();
416
417        assert!(executor.check_interrupt());
418        for executor in executor.executors.iter_mut() {
419            assert!(executor.0.check_interrupt());
420        }
421    }
422
423    #[test]
424    fn test_add_node() {
425        let (_, rx) = unbounded();
426
427        let mut executor = ThreadedExecutor::new_with(
428            rx,
429            0,
430            vec![
431                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
432                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
433                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
434            ],
435        );
436
437        executor.add_node(Box::new(SimpleNode::new(3, 22_000)));
438
439        assert_eq!(executor.backing.len(), 2);
440    }
441
442    #[test]
443    fn test_add_node_with_context_backing() {
444        let (_, rx) = unbounded();
445
446        let mut executor = ThreadedExecutor::new_with(
447            rx,
448            0,
449            vec![
450                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
451                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
452                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
453            ],
454        );
455
456        executor.add_node_with_context(Box::new(SimpleNode::new(3, 10_000)), 0);
457        assert_eq!(executor.backing.len(), 2);
458    }
459
460    #[test]
461    fn test_add_node_with_context_other_executor() {
462        let (_, rx) = unbounded();
463
464        let mut executor = ThreadedExecutor::new_with(
465            rx,
466            0,
467            vec![
468                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
469                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
470                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
471            ],
472        );
473
474        executor.add_node_with_context(Box::new(SimpleNode::new(3, 10_000)), 1);
475        assert_eq!(
476            executor
477                .executors
478                .iter()
479                .find(|(_, tid)| tid.eq(&1))
480                .unwrap()
481                .0
482                .backing
483                .len(),
484            2
485        );
486    }
487
488    #[test]
489    fn test_add_node_same_id() {
490        let (_, rx) = unbounded();
491
492        let mut executor = ThreadedExecutor::new_with(
493            rx,
494            0,
495            vec![
496                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
497                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
498                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
499            ],
500        );
501
502        executor.add_node(Box::new(SimpleNode::new(0, 100_000)));
503        assert_eq!(executor.backing.len(), 1);
504        assert_eq!(executor.backing[0].node.get_update_delay_us(), 100_000);
505    }
506
507    #[test]
508    fn test_remove_node_backing() {
509        let (_, rx) = unbounded();
510
511        let mut executor = ThreadedExecutor::new_with(
512            rx,
513            0,
514            vec![
515                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
516                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
517                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
518            ],
519        );
520
521        executor.remove_node(&0);
522        assert_eq!(executor.backing.len(), 0);
523    }
524
525    #[test]
526    fn test_remove_node_executors_no_remove_executor() {
527        let (_, rx) = unbounded();
528
529        let mut executor = ThreadedExecutor::new_with(
530            rx,
531            0,
532            vec![
533                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
534                (
535                    vec![
536                        Box::new(SimpleNode::new(1, 100_000)),
537                        Box::new(SimpleNode::new(2, 111_111)),
538                    ],
539                    1,
540                ),
541                (vec![Box::new(SimpleNode::new(3, 110_000))], 2),
542            ],
543        );
544
545        executor.remove_node(&1);
546        assert_eq!(executor.executors[0].0.backing.len(), 1);
547    }
548
549    #[test]
550    fn test_remove_node_executors_remove_executor() {
551        let (_, rx) = unbounded();
552
553        let mut executor = ThreadedExecutor::new_with(
554            rx,
555            0,
556            vec![
557                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
558                (vec![Box::new(SimpleNode::new(1, 100_000))], 1),
559                (vec![Box::new(SimpleNode::new(2, 110_000))], 2),
560            ],
561        );
562
563        executor.remove_node(&1);
564        assert_eq!(executor.executors.len(), 1);
565    }
566
567    #[test]
568    fn test_update_ms() {
569        let (_, rx) = unbounded();
570
571        let mut executor = ThreadedExecutor::new_with(
572            rx,
573            0,
574            vec![
575                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
576                (vec![Box::new(SimpleNode::new(1, 10_000))], 1),
577                (vec![Box::new(SimpleNode::new(2, 10_000))], 2),
578            ],
579        );
580
581        let start = executor.clock.now();
582        executor.update_for_ms(100);
583        let end = executor.clock.now();
584
585        // Check that the nodes were  started and updated
586        for node_wrapper in executor.backing.iter() {
587            assert!(node_wrapper.priority == 0);
588            let simple_node: &dyn Any = &node_wrapper.node;
589            let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
590            assert_eq!(simple_node.state, State::Stopped);
591            assert!([8, 9, 10, 11, 12].contains(&simple_node.num));
592        }
593
594        for (executor, _) in executor.executors.iter() {
595            for node_wrapper in executor.backing.iter() {
596                assert!(node_wrapper.priority == 0);
597                let simple_node: &dyn Any = &node_wrapper.node;
598                let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
599                assert_eq!(simple_node.state, State::Stopped);
600                assert!([8, 9, 10, 11, 12].contains(&simple_node.num));
601            }
602        }
603
604        assert!(Duration::from_millis(95) < end - start);
605        assert!(end - start < Duration::from_millis(150));
606    }
607
608    #[test]
609    fn test_update_loop() {
610        let (tx, rx) = unbounded();
611
612        let mut executor = ThreadedExecutor::new_with(
613            rx,
614            0,
615            vec![
616                (vec![Box::new(SimpleNode::new(0, 10_000))], 0),
617                (vec![Box::new(SimpleNode::new(1, 10_000))], 1),
618                (vec![Box::new(SimpleNode::new(2, 10_000))], 2),
619            ],
620        );
621
622        let handle = thread::spawn(move || {
623            executor.update_loop();
624            executor
625        });
626
627        thread::sleep(Duration::from_millis(100));
628        tx.send(true).unwrap();
629
630        let executor = handle.join().unwrap();
631
632        // Check that the nodes were  started and updated
633        for node_wrapper in executor.backing.iter() {
634            assert!(node_wrapper.priority == 0);
635            let simple_node: &dyn Any = &node_wrapper.node;
636            let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
637            assert_eq!(simple_node.state, State::Stopped);
638            assert!([8, 9, 10, 11, 12].contains(&simple_node.num));
639        }
640
641        for (executor, _) in executor.executors.iter() {
642            for node_wrapper in executor.backing.iter() {
643                assert!(node_wrapper.priority == 0);
644                let simple_node: &dyn Any = &node_wrapper.node;
645                let simple_node: &Box<SimpleNode> = unsafe { simple_node.downcast_ref_unchecked() };
646                assert_eq!(simple_node.state, State::Stopped);
647                assert!([8, 9, 10, 11, 12].contains(&simple_node.num));
648            }
649        }
650    }
651}