cog_task/action/core/
clock.rs1use crate::action::{Action, Props, StatefulAction, INFINITE};
2use crate::comm::{QWriter, Signal, SignalId};
3use crate::resource::{IoManager, ResourceManager};
4use crate::server::{AsyncSignal, Config, State, SyncSignal};
5use crate::util::spin_sleeper;
6use eyre::{eyre, Context, Result};
7use serde::{Deserialize, Serialize};
8use serde_cbor::Value;
9use std::collections::BTreeSet;
10use std::sync::mpsc::{self, Sender, TryRecvError};
11use std::thread;
12use std::time::{Duration, Instant};
13
14const MIN_STEP_DELAY: f32 = 0.010;
15
16#[derive(Debug, Deserialize, Serialize)]
17pub struct Clock {
18 step: f32,
19 #[serde(default)]
20 from: i64,
21 #[serde(default)]
22 on_start: bool,
23 out_tic: SignalId,
24}
25
26stateful!(Clock {
27 link: Sender<()>,
28});
29
30impl Action for Clock {
31 #[inline]
32 fn out_signals(&self) -> BTreeSet<SignalId> {
33 BTreeSet::from([self.out_tic])
34 }
35
36 fn init(self) -> Result<Box<dyn Action>>
37 where
38 Self: 'static + Sized,
39 {
40 if self.step < MIN_STEP_DELAY {
41 return Err(eyre!("Step size for clock is smaller than MIN_STEP_DELAY."));
42 }
43
44 if self.out_tic == 0 {
45 return Err(eyre!("Clock with no `out_signal` is useless."));
46 }
47
48 Ok(Box::new(self))
49 }
50
51 fn stateful(
52 &self,
53 _io: &IoManager,
54 _res: &ResourceManager,
55 _config: &Config,
56 sync_writer: &QWriter<SyncSignal>,
57 _async_writer: &QWriter<AsyncSignal>,
58 ) -> Result<Box<dyn StatefulAction>> {
59 let (tx, rx) = mpsc::channel();
60
61 {
62 let step = Duration::from_secs_f32(self.step);
63 let on_start = self.on_start;
64 let out_tic = self.out_tic;
65 let mut tics = self.from as i128;
66 let mut sync_writer = sync_writer.clone();
67
68 thread::spawn(move || {
69 let sleeper = spin_sleeper();
70
71 if rx.recv().is_err() {
72 return;
73 }
74
75 if on_start {
76 sync_writer.push(SyncSignal::Emit(
77 Instant::now(),
78 vec![(out_tic, Value::Integer(tics))].into(),
79 ));
80 }
81
82 while let Err(TryRecvError::Empty) | Ok(()) = rx.try_recv() {
83 tics += 1;
84 sleeper.sleep(step);
85 sync_writer.push(SyncSignal::Emit(
86 Instant::now(),
87 vec![(out_tic, Value::Integer(tics))].into(),
88 ));
89 }
90 });
91 }
92
93 Ok(Box::new(StatefulClock {
94 done: false,
95 link: tx,
96 }))
97 }
98}
99
100impl StatefulAction for StatefulClock {
101 impl_stateful!();
102
103 fn props(&self) -> Props {
104 INFINITE.into()
105 }
106
107 fn start(
108 &mut self,
109 _sync_writer: &mut QWriter<SyncSignal>,
110 _async_writer: &mut QWriter<AsyncSignal>,
111 _state: &State,
112 ) -> Result<Signal> {
113 self.link
114 .send(())
115 .wrap_err("Failed to communicate with the clock thread.")?;
116 Ok(Signal::none())
117 }
118}