cog_task/action/core/
clock.rs

1use crate::action::{Action, Props, StatefulAction, INFINITE};
2use crate::comm::{QWriter, Signal, SignalId};
3use crate::resource::{IoManager, ResourceManager};
4use crate::server::{AsyncSignal, Config, State, SyncSignal};
5use crate::util::spin_sleeper;
6use eyre::{eyre, Context, Result};
7use serde::{Deserialize, Serialize};
8use serde_cbor::Value;
9use std::collections::BTreeSet;
10use std::sync::mpsc::{self, Sender, TryRecvError};
11use std::thread;
12use std::time::{Duration, Instant};
13
/// Smallest `step` a `Clock` accepts, in seconds (10 ms); `Clock::init` rejects
/// anything below this.
const MIN_STEP_DELAY: f32 = 0.010;
15
/// Action that periodically emits an incrementing tic count on a signal.
///
/// The count starts at `from`, is incremented by one per `step`, and each new
/// value is emitted on `out_tic` from a background thread once the action is
/// started.
#[derive(Debug, Deserialize, Serialize)]
pub struct Clock {
    /// Interval between consecutive tics, in seconds. `init` rejects values
    /// below `MIN_STEP_DELAY`.
    step: f32,
    /// Initial tic count; the first periodic tic carries `from + 1`.
    /// Falls back to `i64::default()` (0) when absent from the serialized input.
    #[serde(default)]
    from: i64,
    /// When `true`, the initial count (`from`) is emitted immediately on start,
    /// before the first `step` has elapsed. Falls back to `false` when absent.
    #[serde(default)]
    on_start: bool,
    /// Signal on which the tic count is emitted. `init` rejects a value of 0.
    out_tic: SignalId,
}
25
// Stateful counterpart of `Clock`. `Clock::stateful` builds it as
// `StatefulClock { done, link }`, so the macro presumably generates the struct
// and adds the `done` flag itself -- confirm against the `stateful!` definition.
// `link` is the sending half of the channel used to control the clock thread.
stateful!(Clock {
    link: Sender<()>,
});
29
30impl Action for Clock {
31    #[inline]
32    fn out_signals(&self) -> BTreeSet<SignalId> {
33        BTreeSet::from([self.out_tic])
34    }
35
36    fn init(self) -> Result<Box<dyn Action>>
37    where
38        Self: 'static + Sized,
39    {
40        if self.step < MIN_STEP_DELAY {
41            return Err(eyre!("Step size for clock is smaller than MIN_STEP_DELAY."));
42        }
43
44        if self.out_tic == 0 {
45            return Err(eyre!("Clock with no `out_signal` is useless."));
46        }
47
48        Ok(Box::new(self))
49    }
50
51    fn stateful(
52        &self,
53        _io: &IoManager,
54        _res: &ResourceManager,
55        _config: &Config,
56        sync_writer: &QWriter<SyncSignal>,
57        _async_writer: &QWriter<AsyncSignal>,
58    ) -> Result<Box<dyn StatefulAction>> {
59        let (tx, rx) = mpsc::channel();
60
61        {
62            let step = Duration::from_secs_f32(self.step);
63            let on_start = self.on_start;
64            let out_tic = self.out_tic;
65            let mut tics = self.from as i128;
66            let mut sync_writer = sync_writer.clone();
67
68            thread::spawn(move || {
69                let sleeper = spin_sleeper();
70
71                if rx.recv().is_err() {
72                    return;
73                }
74
75                if on_start {
76                    sync_writer.push(SyncSignal::Emit(
77                        Instant::now(),
78                        vec![(out_tic, Value::Integer(tics))].into(),
79                    ));
80                }
81
82                while let Err(TryRecvError::Empty) | Ok(()) = rx.try_recv() {
83                    tics += 1;
84                    sleeper.sleep(step);
85                    sync_writer.push(SyncSignal::Emit(
86                        Instant::now(),
87                        vec![(out_tic, Value::Integer(tics))].into(),
88                    ));
89                }
90            });
91        }
92
93        Ok(Box::new(StatefulClock {
94            done: false,
95            link: tx,
96        }))
97    }
98}
99
100impl StatefulAction for StatefulClock {
101    impl_stateful!();
102
103    fn props(&self) -> Props {
104        INFINITE.into()
105    }
106
107    fn start(
108        &mut self,
109        _sync_writer: &mut QWriter<SyncSignal>,
110        _async_writer: &mut QWriter<AsyncSignal>,
111        _state: &State,
112    ) -> Result<Signal> {
113        self.link
114            .send(())
115            .wrap_err("Failed to communicate with the clock thread.")?;
116        Ok(Signal::none())
117    }
118}