1use std::any::Any;
16
17use crossbeam::channel::Receiver;
18
19use quanta::{Clock, Instant};
20
21use ncomm_core::{Executor, ExecutorState, Node};
22
23use crate::{insert_into, NodeWrapper};
24
/// Executor that keeps its nodes in a single vector and updates them from the
/// thread that calls `update_for_ms` or `update_loop`.
///
/// Each node is stored with a priority expressed in microseconds since
/// `start_instant`; a node is updated once the elapsed time reaches its
/// priority, after which the priority is advanced by the node's update delay.
pub struct SimpleExecutor<ID: PartialEq> {
    /// Nodes managed by this executor, each wrapped with its scheduling priority.
    /// The update loops always inspect the last element to decide what to run next.
    pub(crate) backing: Vec<NodeWrapper<ID>>,
    /// Clock used to read the current instant.
    clock: Clock,
    /// Current lifecycle state of the executor.
    state: ExecutorState,
    /// Instant against which node priorities are measured; reset by `start`.
    start_instant: Instant,
    /// Channel on which interrupt requests are received.
    interrupt: Receiver<bool>,
    /// Most recent value received on `interrupt` (`false` until one arrives).
    interrupted: bool,
}
51
52impl<ID: PartialEq> SimpleExecutor<ID> {
53 pub fn new(interrupt: Receiver<bool>) -> Self {
55 let clock = Clock::new();
56 let now = clock.now();
57
58 Self {
59 backing: Vec::new(),
60 clock,
61 start_instant: now,
62 state: ExecutorState::Stopped,
63 interrupt,
64 interrupted: false,
65 }
66 }
67
68 pub fn new_with(interrupt: Receiver<bool>, mut nodes: Vec<Box<dyn Node<ID>>>) -> Self {
70 let mut backing = Vec::new();
71 for node in nodes.drain(..) {
72 backing.push(NodeWrapper { priority: 0, node });
73 }
74
75 let clock = Clock::new();
76 let now = clock.now();
77
78 Self {
79 backing,
80 clock,
81 start_instant: now,
82 state: ExecutorState::Stopped,
83 interrupt,
84 interrupted: false,
85 }
86 }
87}
88
89impl<ID: PartialEq> Executor<ID> for SimpleExecutor<ID> {
90 type Context = Box<dyn Any>;
92
93 fn start(&mut self) {
100 for node_wrapper in self.backing.iter_mut() {
101 node_wrapper.priority = 0;
102 node_wrapper.node.start();
103 }
104
105 self.interrupted = false;
106 self.state = ExecutorState::Started;
107 self.start_instant = self.clock.now();
108 }
109
110 fn update_for_ms(&mut self, ms: u128) {
116 self.start();
118
119 self.state = ExecutorState::Running;
121 while self
122 .clock
123 .now()
124 .duration_since(self.start_instant)
125 .as_millis()
126 < ms
127 && !self.check_interrupt()
128 {
129 if self.backing.last().is_some()
130 && self
131 .clock
132 .now()
133 .duration_since(self.start_instant)
134 .as_micros()
135 >= self.backing.last().unwrap().priority
136 {
137 let mut node_wrapper = self.backing.pop().unwrap();
138 node_wrapper.node.update();
139 node_wrapper.priority += node_wrapper.node.get_update_delay_us();
140 insert_into(&mut self.backing, node_wrapper);
141 }
142 }
143
144 for node_wrapper in self.backing.iter_mut() {
146 node_wrapper.priority = 0;
147 node_wrapper.node.shutdown();
148 }
149 self.state = ExecutorState::Stopped;
150 }
151
152 fn update_loop(&mut self) {
157 self.start();
159
160 self.state = ExecutorState::Running;
162 while !self.check_interrupt() {
163 if self.backing.last().is_some()
164 && self
165 .clock
166 .now()
167 .duration_since(self.start_instant)
168 .as_micros()
169 >= self.backing.last().unwrap().priority
170 {
171 let mut node_wrapper = self.backing.pop().unwrap();
172 node_wrapper.node.update();
173 node_wrapper.priority += node_wrapper.node.get_update_delay_us();
174 insert_into(&mut self.backing, node_wrapper);
175 }
176 }
177
178 for node_wrapper in self.backing.iter_mut() {
180 node_wrapper.priority = 0;
181 node_wrapper.node.shutdown();
182 }
183 self.state = ExecutorState::Stopped;
184 }
185
186 fn check_interrupt(&mut self) -> bool {
190 if let Ok(interrupt) = self.interrupt.try_recv() {
191 self.interrupted = interrupt;
192 }
193 self.interrupted
194 }
195
196 fn add_node(&mut self, node: Box<dyn Node<ID>>) {
203 if let Some(idx) = self
204 .backing
205 .iter()
206 .position(|node_wrapper| node_wrapper.node.get_id().eq(&node.get_id()))
207 {
208 self.backing.remove(idx);
209 }
210
211 if self.state == ExecutorState::Stopped {
212 self.backing.push(NodeWrapper { priority: 0, node });
213 } else if self.state == ExecutorState::Started {
214 insert_into(
215 &mut self.backing,
216 NodeWrapper {
217 priority: self
218 .clock
219 .now()
220 .duration_since(self.start_instant)
221 .as_micros(),
222 node,
223 },
224 );
225 }
226 }
227
228 fn remove_node(&mut self, id: &ID) -> Option<Box<dyn Node<ID>>> {
232 if self.state != ExecutorState::Running {
233 let idx = self
234 .backing
235 .iter()
236 .position(|node_wrapper| node_wrapper.node.get_id().eq(id));
237 if let Some(idx) = idx {
238 Some(self.backing.remove(idx).destroy())
239 } else {
240 None
241 }
242 } else {
243 None
244 }
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;

    use std::{thread, time::Duration};

    use crossbeam::channel::unbounded;

    /// Lifecycle state recorded by `SimpleNode` so tests can observe which
    /// callback ran last.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    enum State {
        Stopped,
        Started,
        Updating,
    }

    /// Minimal node that counts its updates and remembers its last lifecycle state.
    pub struct SimpleNode {
        id: u8,
        pub update_delay: u128,
        pub num: u8,
        state: State,
    }

    impl SimpleNode {
        /// Creates a stopped node with the given id and update delay (microseconds).
        pub fn new(id: u8, update_delay: u128) -> Self {
            Self {
                id,
                update_delay,
                num: 0,
                state: State::Stopped,
            }
        }
    }

    impl Node<u8> for SimpleNode {
        fn get_id(&self) -> u8 {
            self.id
        }
        fn start(&mut self) {
            self.state = State::Started;
        }

        fn update(&mut self) {
            self.state = State::Updating;
            self.num = self.num.wrapping_add(1);
        }

        fn shutdown(&mut self) {
            self.state = State::Stopped;
        }

        fn get_update_delay_us(&self) -> u128 {
            self.update_delay
        }
    }

    /// Views the node held by `node_wrapper` as the concrete `SimpleNode`.
    ///
    /// This replaces the previous `Any::downcast_ref_unchecked::<Box<SimpleNode>>`
    /// call, which reinterpreted a `Box<dyn Node<u8>>` as a `Box<SimpleNode>` and
    /// required a nightly-only API.
    fn as_simple_node(node_wrapper: &NodeWrapper<u8>) -> &SimpleNode {
        // SAFETY: every node given to an executor in this test module is created
        // with `SimpleNode::new`, so the trait object's data pointer refers to a
        // live `SimpleNode` for as long as `node_wrapper` is borrowed.
        unsafe { &*(&*node_wrapper.node as *const dyn Node<u8> as *const SimpleNode) }
    }

    #[test]
    fn test_simple_executor_start() {
        let (_, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 100_000)),
                Box::new(SimpleNode::new(1, 250_000)),
            ],
        );
        let original_start_instant = executor.start_instant;

        executor.start();

        for node_wrapper in executor.backing.iter() {
            assert_eq!(node_wrapper.priority, 0);
            let simple_node = as_simple_node(node_wrapper);
            assert_eq!(simple_node.state, State::Started);
        }
        assert!(!executor.interrupted);
        assert_eq!(executor.state, ExecutorState::Started);
        assert!(executor.start_instant > original_start_instant);
    }

    #[test]
    fn test_update_for_ms() {
        let (_, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 10_000)),
                Box::new(SimpleNode::new(1, 25_000)),
            ],
        );

        let start = executor.clock.now();
        executor.update_for_ms(100);
        let end = executor.clock.now();

        for node_wrapper in executor.backing.iter() {
            // Shutdown resets every priority.
            assert!(node_wrapper.priority == 0);
            let simple_node = as_simple_node(node_wrapper);
            assert_eq!(simple_node.state, State::Stopped);
            // Roughly 10 updates at a 10 ms delay and 4 at a 25 ms delay over 100 ms.
            assert!([9, 10, 11, 3, 4, 5].contains(&simple_node.num));
        }

        assert!(Duration::from_millis(95) < end - start);
        assert!(end - start < Duration::from_millis(105));
    }

    #[test]
    fn test_check_interrupt() {
        let (tx, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 110_000)),
                Box::new(SimpleNode::new(1, 25_000)),
            ],
        );

        tx.send(true).unwrap();

        assert!(executor.check_interrupt());
    }

    #[test]
    fn test_add_node_stopped() {
        let (_, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 10_000)),
                Box::new(SimpleNode::new(1, 25_000)),
            ],
        );

        executor.add_node(Box::new(SimpleNode::new(2, 1_000)));

        assert_eq!(executor.backing.len(), 3);
    }

    #[test]
    fn test_add_node_same_id() {
        let (_, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 10_000)),
                Box::new(SimpleNode::new(1, 25_000)),
            ],
        );

        executor.add_node(Box::new(SimpleNode::new(0, 1_000)));

        // The new node replaces the old one instead of being added alongside it.
        assert_eq!(executor.backing.len(), 2);
        let zero_id = executor
            .backing
            .iter()
            .find(|node_wrapper| node_wrapper.node.get_id().eq(&0))
            .unwrap();
        assert_eq!(zero_id.node.get_update_delay_us(), 1_000);
    }

    #[test]
    fn test_remove_node() {
        let (_, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 10_000)),
                Box::new(SimpleNode::new(1, 25_000)),
            ],
        );

        executor.remove_node(&0);

        assert_eq!(executor.backing.len(), 1);
        assert_eq!(executor.backing[0].node.get_id(), 1);
    }

    #[test]
    fn test_update_loop() {
        let (tx, rx) = unbounded();

        let mut executor = SimpleExecutor::new_with(
            rx,
            vec![
                Box::new(SimpleNode::new(0, 10_000)),
                Box::new(SimpleNode::new(1, 25_000)),
            ],
        );

        // Run the loop on another thread so this thread can send the interrupt.
        let handle = thread::spawn(move || {
            executor.update_loop();
            executor
        });

        thread::sleep(Duration::from_millis(100));
        tx.send(true).unwrap();

        let executor = handle.join().unwrap();
        for node_wrapper in executor.backing.iter() {
            assert_eq!(node_wrapper.priority, 0);
            let simple_node = as_simple_node(node_wrapper);
            assert_eq!(simple_node.state, State::Stopped);
            assert!([3, 4, 5, 9, 10, 11].contains(&simple_node.num));
        }

        assert!(executor.interrupted);
        assert_eq!(executor.state, ExecutorState::Stopped);
    }
}