1use std::thread;
12
13use quanta::{Clock, Instant};
14
15use crossbeam::channel::{unbounded, Receiver, Sender};
16
17use ncomm_core::{Executor, ExecutorState, Node};
18
19use crate::{insert_into, NodeWrapper, SimpleExecutor};
20
/// Executor that runs its nodes on several threads at once.
///
/// Nodes registered under `thread_id` are stored in `backing` and updated by the
/// thread that calls into this executor. Nodes registered under any other
/// thread id are owned by a [`SimpleExecutor`], which is moved onto a freshly
/// spawned thread for the duration of each `start` / `update_*` call and moved
/// back when that thread is joined.
pub struct ThreadedExecutor<NID: PartialEq + Send, TID: PartialEq + Send> {
    /// Executors for the non-main threads, each paired with its thread id.
    executors: Vec<(SimpleExecutor<NID>, TID)>,
    /// Nodes updated on the calling thread. The update loops treat the last
    /// element as the next node to run and compare its `priority` against the
    /// microseconds elapsed since `start_instant`.
    backing: Vec<NodeWrapper<NID>>,
    /// Thread id whose nodes are stored in `backing`.
    thread_id: TID,
    /// Clock used for every time reading taken by this executor.
    clock: Clock,
    /// Instant at which the main-thread nodes were last started.
    start_instant: Instant,
    /// Lifecycle state of the main-thread part of this executor.
    state: ExecutorState,
    /// Channel polled (without blocking) for interrupt requests.
    interrupt: Receiver<bool>,
    /// Senders used to forward a received interrupt value to the executors in
    /// `executors`; one sender is created alongside every executor.
    interrupt_propagators: Vec<Sender<bool>>,
    /// Most recent value received on `interrupt`; reset to `false` on start.
    interrupted: bool,
}
46
47impl<NID: PartialEq + Send, TID: PartialEq + Send> ThreadedExecutor<NID, TID> {
48 pub fn new(interrupt: Receiver<bool>, main_thread_id: TID) -> Self {
50 let clock = Clock::new();
51 let now = clock.now();
52
53 Self {
54 executors: Vec::new(),
55 backing: Vec::new(),
56 thread_id: main_thread_id,
57 clock,
58 state: ExecutorState::Stopped,
59 start_instant: now,
60 interrupt,
61 interrupt_propagators: Vec::new(),
62 interrupted: false,
63 }
64 }
65
66 #[allow(clippy::type_complexity)]
68 pub fn new_with(
69 interrupt: Receiver<bool>,
70 main_thread_id: TID,
71 mut nodes: Vec<(Vec<Box<dyn Node<NID>>>, TID)>,
72 ) -> Self {
73 let mut backing = Vec::new();
74 if let Some(idx) = nodes.iter().position(|(_, tid)| tid.eq(&main_thread_id)) {
75 let (mut node_list, _) = nodes.remove(idx);
76 for node in node_list.drain(..) {
77 backing.push(NodeWrapper { priority: 0, node });
78 }
79 }
80
81 let mut executors = Vec::new();
82 let mut interrupt_propagators = Vec::new();
83 for (node_list, thread_id) in nodes.drain(..) {
84 let (tx, rx) = unbounded();
85 interrupt_propagators.push(tx);
86 executors.push((SimpleExecutor::new_with(rx, node_list), thread_id));
87 }
88
89 let clock = Clock::new();
90 let now = clock.now();
91
92 Self {
93 executors,
94 backing,
95 thread_id: main_thread_id,
96 clock,
97 start_instant: now,
98 state: ExecutorState::Stopped,
99 interrupt,
100 interrupt_propagators,
101 interrupted: false,
102 }
103 }
104
105 fn start_self(&mut self) {
106 for node_wrapper in self.backing.iter_mut() {
107 node_wrapper.priority = 0;
108 node_wrapper.node.start();
109 }
110
111 self.interrupted = false;
112 self.state = ExecutorState::Started;
113 self.start_instant = self.clock.now();
114 }
115}
116
117impl<NID: PartialEq + Send + 'static, TID: PartialEq + Send + 'static> Executor<NID>
118 for ThreadedExecutor<NID, TID>
119{
120 type Context = TID;
121
122 fn start(&mut self) {
123 let mut handles = Vec::new();
124 for (mut executor, tid) in self.executors.drain(..) {
125 handles.push(thread::spawn(move || {
126 executor.start();
127 (executor, tid)
128 }));
129 }
130
131 self.start_self();
132
133 for handle in handles {
134 self.executors.push(handle.join().unwrap());
135 }
136 }
137
138 fn update_for_ms(&mut self, ms: u128) {
139 let mut handles = Vec::new();
141 for (mut executor, tid) in self.executors.drain(..) {
142 handles.push(thread::spawn(move || {
143 executor.update_for_ms(ms);
144 (executor, tid)
145 }));
146 }
147
148 self.start_self();
150
151 self.state = ExecutorState::Running;
153 while self
154 .clock
155 .now()
156 .duration_since(self.start_instant)
157 .as_millis()
158 < ms
159 && !self.check_interrupt()
160 {
161 if self.backing.last().is_some()
162 && self
163 .clock
164 .now()
165 .duration_since(self.start_instant)
166 .as_micros()
167 >= self.backing.last().unwrap().priority
168 {
169 let mut node_wrapper = self.backing.pop().unwrap();
170 node_wrapper.node.update();
171 node_wrapper.priority += node_wrapper.node.get_update_delay_us();
172 insert_into(&mut self.backing, node_wrapper);
173 }
174 }
175
176 for node_wrapper in self.backing.iter_mut() {
178 node_wrapper.priority = 0;
179 node_wrapper.node.shutdown();
180 }
181 self.state = ExecutorState::Stopped;
182
183 for handle in handles {
184 self.executors.push(handle.join().unwrap());
185 }
186 }
187
188 fn update_loop(&mut self) {
189 let mut handles = Vec::new();
191 for (mut executor, tid) in self.executors.drain(..) {
192 handles.push(thread::spawn(move || {
193 executor.update_loop();
194 (executor, tid)
195 }));
196 }
197
198 self.start_self();
200
201 self.state = ExecutorState::Running;
203 while !self.check_interrupt() {
204 if self.backing.last().is_some()
205 && self
206 .clock
207 .now()
208 .duration_since(self.start_instant)
209 .as_micros()
210 >= self.backing.last().unwrap().priority
211 {
212 let mut node_wrapper = self.backing.pop().unwrap();
213 node_wrapper.node.update();
214 node_wrapper.priority += node_wrapper.node.get_update_delay_us();
215 insert_into(&mut self.backing, node_wrapper);
216 }
217 }
218
219 for node_wrapper in self.backing.iter_mut() {
221 node_wrapper.priority = 0;
222 node_wrapper.node.shutdown();
223 }
224 self.state = ExecutorState::Stopped;
225
226 for handle in handles {
227 self.executors.push(handle.join().unwrap());
228 }
229 }
230
231 fn check_interrupt(&mut self) -> bool {
232 if let Ok(interrupt) = self.interrupt.try_recv() {
233 self.interrupted = interrupt;
234 for tx in self.interrupt_propagators.iter_mut() {
235 tx.send(interrupt).unwrap();
236 }
237 }
238
239 self.interrupted
240 }
241
242 fn add_node(&mut self, node: Box<dyn Node<NID>>) {
243 if let Some(idx) = self
244 .backing
245 .iter()
246 .position(|node_wrapper| node_wrapper.node.get_id().eq(&node.get_id()))
247 {
248 self.backing.remove(idx);
249 }
250
251 if self.state == ExecutorState::Stopped {
252 self.backing.push(NodeWrapper { priority: 0, node });
253 } else if self.state == ExecutorState::Started {
254 insert_into(
255 &mut self.backing,
256 NodeWrapper {
257 priority: self
258 .clock
259 .now()
260 .duration_since(self.start_instant)
261 .as_micros(),
262 node,
263 },
264 );
265 }
266 }
267
268 fn add_node_with_context(&mut self, node: Box<dyn Node<NID>>, _ctx: Self::Context) {
269 if _ctx == self.thread_id {
270 self.add_node(node);
271 } else if let Some((executor, _)) = self.executors.iter_mut().find(|(_, tid)| tid.eq(&_ctx))
272 {
273 executor.add_node(node);
274 } else {
275 let (tx, rx) = unbounded();
276 self.interrupt_propagators.push(tx);
277 self.executors
278 .push((SimpleExecutor::new_with(rx, vec![node]), _ctx));
279 }
280 }
281
282 fn remove_node(&mut self, id: &NID) -> Option<Box<dyn Node<NID>>> {
283 if let Some(idx) = self
284 .backing
285 .iter()
286 .position(|node_wrapper| node_wrapper.node.get_id().eq(id))
287 {
288 return Some(self.backing.remove(idx).destroy());
289 }
290
291 let mut found_node = None;
292 let mut delete_executor = None;
293 for (idx, (executor, _)) in self.executors.iter_mut().enumerate() {
294 if let Some(node) = executor.remove_node(id) {
295 found_node = Some(node);
296 if executor.backing.is_empty() {
297 delete_executor = Some(idx);
298 }
299 }
300 }
301
302 if let Some(idx) = delete_executor {
303 self.executors.remove(idx);
304 }
305
306 found_node
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    use std::{any::Any, time::Duration};

    /// Lifecycle stage most recently entered by a [`SimpleNode`].
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum State {
        Stopped,
        Started,
        Updating,
    }

    /// Minimal node that records its lifecycle stage and counts its updates.
    pub struct SimpleNode {
        id: u8,
        pub update_delay: u128,
        pub num: u8,
        state: State,
    }

    impl SimpleNode {
        pub fn new(id: u8, update_delay: u128) -> Self {
            Self {
                id,
                update_delay,
                num: 0,
                state: State::Stopped,
            }
        }
    }

    impl Node<u8> for SimpleNode {
        fn get_id(&self) -> u8 {
            self.id
        }

        fn start(&mut self) {
            self.state = State::Started;
        }

        fn update(&mut self) {
            self.state = State::Updating;
            self.num = self.num.wrapping_add(1);
        }

        fn shutdown(&mut self) {
            self.state = State::Stopped;
        }

        fn get_update_delay_us(&self) -> u128 {
            self.update_delay
        }
    }

    /// Update delays (in microseconds) used by the structural tests.
    const STAGGERED: [u128; 3] = [10_000, 100_000, 110_000];
    /// Update delays (in microseconds) used by the timing tests.
    const UNIFORM: [u128; 3] = [10_000, 10_000, 10_000];

    /// Boxes a fresh [`SimpleNode`] as a trait object.
    fn node(id: u8, update_delay: u128) -> Box<dyn Node<u8>> {
        Box::new(SimpleNode::new(id, update_delay))
    }

    /// Nodes 0, 1 and 2 assigned to threads 0, 1 and 2 with the given delays.
    #[allow(clippy::type_complexity)]
    fn one_node_per_thread(delays: [u128; 3]) -> Vec<(Vec<Box<dyn Node<u8>>>, i32)> {
        vec![
            (vec![node(0, delays[0])], 0),
            (vec![node(1, delays[1])], 1),
            (vec![node(2, delays[2])], 2),
        ]
    }

    /// Views a boxed node created by these tests as the [`SimpleNode`] it holds.
    // The reference to the box itself is what gets erased to `dyn Any`, so the
    // parameter has to stay a `&Box<..>`.
    #[allow(clippy::borrowed_box)]
    fn as_simple_node(node: &Box<dyn Node<u8>>) -> &SimpleNode {
        let erased: &dyn Any = node;
        // NOTE(review): the erased value is the `Box<dyn Node<u8>>` itself, so
        // this is an unchecked reinterpretation as `Box<SimpleNode>` rather
        // than a checked downcast. Every node in these tests is a `SimpleNode`,
        // but the soundness of the cast itself should be confirmed.
        let concrete: &Box<SimpleNode> = unsafe { erased.downcast_ref_unchecked() };
        &**concrete
    }

    /// Asserts that a node has been shut down after roughly ten updates.
    #[allow(clippy::borrowed_box)]
    fn assert_stopped_after_updates(priority: u128, node: &Box<dyn Node<u8>>) {
        assert!(priority == 0);
        let simple = as_simple_node(node);
        assert_eq!(simple.state, State::Stopped);
        assert!([8, 9, 10, 11, 12].contains(&simple.num));
    }

    #[test]
    fn test_start() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));
        let original_start_instant = executor.start_instant;

        executor.start();

        let worker_nodes = executor
            .executors
            .iter()
            .flat_map(|(worker, _)| worker.backing.iter());
        for wrapper in executor.backing.iter().chain(worker_nodes) {
            assert_eq!(wrapper.priority, 0);
            assert_eq!(as_simple_node(&wrapper.node).state, State::Started);
        }

        assert!(!executor.interrupted);
        assert_eq!(executor.state, ExecutorState::Started);
        assert!(executor.start_instant > original_start_instant);
    }

    #[test]
    fn test_check_interrupt() {
        let (tx, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        tx.send(true).unwrap();

        assert!(executor.check_interrupt());
        for (worker, _) in executor.executors.iter_mut() {
            assert!(worker.check_interrupt());
        }
    }

    #[test]
    fn test_add_node() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        executor.add_node(node(3, 22_000));

        assert_eq!(executor.backing.len(), 2);
    }

    #[test]
    fn test_add_node_with_context_backing() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        executor.add_node_with_context(node(3, 10_000), 0);

        assert_eq!(executor.backing.len(), 2);
    }

    #[test]
    fn test_add_node_with_context_other_executor() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        executor.add_node_with_context(node(3, 10_000), 1);

        let (worker, _) = executor
            .executors
            .iter()
            .find(|(_, tid)| tid.eq(&1))
            .unwrap();
        assert_eq!(worker.backing.len(), 2);
    }

    #[test]
    fn test_add_node_same_id() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        executor.add_node(node(0, 100_000));

        assert_eq!(executor.backing.len(), 1);
        assert_eq!(executor.backing[0].node.get_update_delay_us(), 100_000);
    }

    #[test]
    fn test_remove_node_backing() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        executor.remove_node(&0);

        assert_eq!(executor.backing.len(), 0);
    }

    #[test]
    fn test_remove_node_executors_no_remove_executor() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(
            rx,
            0,
            vec![
                (vec![node(0, 10_000)], 0),
                (vec![node(1, 100_000), node(2, 111_111)], 1),
                (vec![node(3, 110_000)], 2),
            ],
        );

        executor.remove_node(&1);

        assert_eq!(executor.executors[0].0.backing.len(), 1);
    }

    #[test]
    fn test_remove_node_executors_remove_executor() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(STAGGERED));

        executor.remove_node(&1);

        assert_eq!(executor.executors.len(), 1);
    }

    #[test]
    fn test_update_ms() {
        let (_, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(UNIFORM));

        let start = executor.clock.now();
        executor.update_for_ms(100);
        let end = executor.clock.now();

        let worker_nodes = executor
            .executors
            .iter()
            .flat_map(|(worker, _)| worker.backing.iter());
        for wrapper in executor.backing.iter().chain(worker_nodes) {
            assert_stopped_after_updates(wrapper.priority, &wrapper.node);
        }

        let elapsed = end - start;
        assert!(Duration::from_millis(95) < elapsed);
        assert!(elapsed < Duration::from_millis(150));
    }

    #[test]
    fn test_update_loop() {
        let (tx, rx) = unbounded();
        let mut executor = ThreadedExecutor::new_with(rx, 0, one_node_per_thread(UNIFORM));

        // Run the loop on another thread so this one can raise the interrupt.
        let handle = thread::spawn(move || {
            executor.update_loop();
            executor
        });

        thread::sleep(Duration::from_millis(100));
        tx.send(true).unwrap();

        let executor = handle.join().unwrap();

        let worker_nodes = executor
            .executors
            .iter()
            .flat_map(|(worker, _)| worker.backing.iter());
        for wrapper in executor.backing.iter().chain(worker_nodes) {
            assert_stopped_after_updates(wrapper.priority, &wrapper.node);
        }
    }
}