heartbeat_watchdog/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2#![deny(missing_docs)]
3#![ doc = include_str!( concat!( env!( "CARGO_MANIFEST_DIR" ), "/", "README.md" ) ) ]
4use core::{future::Future, ops, time::Duration};
5#[cfg(feature = "embassy")]
6use embassy_time::Instant;
7#[cfg(feature = "std")]
8use std::{sync::Arc, time::Instant};
9
10use io::{WatchdogIo, WatchdogIoAsync};
11use portable_atomic::{AtomicBool, Ordering};
12#[cfg(feature = "std")]
13use rtsc::{policy_channel, policy_channel_async};
14
15/// Watchdog I/O
16pub mod io;
17
18/// Errors
19#[derive(thiserror::Error, Debug)]
20pub enum Error {
21    #[cfg(feature = "std")]
22    /// IO error (std)
23    #[error("IO error: {0}")]
24    Io(std::io::Error),
25    /// Timeout
26    #[error("Timed out")]
27    Timeout,
28    /// All other errors
29    #[cfg(feature = "std")]
30    #[error("Failed: {0}")]
31    Failed(String),
32    /// All other errors (no std)
33    #[cfg(not(feature = "std"))]
34    #[error("Failed")]
35    Failed,
36}
37
38#[cfg(feature = "std")]
39impl From<std::io::Error> for Error {
40    fn from(e: std::io::Error) -> Self {
41        match e.kind() {
42            std::io::ErrorKind::TimedOut | std::io::ErrorKind::WouldBlock => Error::Timeout,
43            _ => Error::Io(e),
44        }
45    }
46}
47
48impl Error {
49    #[cfg(feature = "std")]
50    /// Create a new failed error
51    pub fn failed<T: core::fmt::Display>(msg: T) -> Self {
52        Error::Failed(msg.to_string())
53    }
54    #[cfg(not(feature = "std"))]
55    /// Create a new failed error
56    pub fn failed() -> Self {
57        Error::Failed
58    }
59}
60
61/// Result type
62pub type Result<T> = core::result::Result<T, Error>;
63
64#[cfg(feature = "std")]
65type RawMutex = rtsc::pi::RawMutex;
66#[cfg(feature = "std")]
67type Condvar = rtsc::pi::Condvar;
68#[cfg(feature = "embassy")]
69type NoopMutex = embassy_sync::blocking_mutex::raw::NoopRawMutex;
70#[cfg(feature = "embassy")]
71/// Embassy state channel
72pub type EmbassyStateChannel = embassy_sync::channel::Channel<NoopMutex, StateEvent, 32>;
73
74/// State event
75#[derive(Debug, Clone, Eq, PartialEq)]
76pub enum StateEvent {
77    /// Watchdog switched to Fault state
78    Fault(FaultKind),
79    /// Watchdog switched to OK state
80    Ok,
81}
82
83impl defmt::Format for StateEvent {
84    fn format(&self, f: defmt::Formatter) {
85        match self {
86            StateEvent::Fault(kind) => defmt::write!(f, "Fault({})", kind),
87            StateEvent::Ok => defmt::write!(f, "Ok"),
88        }
89    }
90}
91
92#[cfg(feature = "std")]
93impl rtsc::data_policy::DataDeliveryPolicy for StateEvent {
94    fn delivery_policy(&self) -> rtsc::data_policy::DeliveryPolicy {
95        rtsc::data_policy::DeliveryPolicy::Latest
96    }
97}
98
99impl From<StateEvent> for State {
100    fn from(e: StateEvent) -> Self {
101        match e {
102            StateEvent::Ok => State::Ok,
103            StateEvent::Fault(_) => State::Fault,
104        }
105    }
106}
107
108impl defmt::Format for State {
109    fn format(&self, f: defmt::Formatter) {
110        match self {
111            State::Fault => defmt::write!(f, "Fault"),
112            State::Ok => defmt::write!(f, "Ok"),
113        }
114    }
115}
116
117/// Watchdog state
118#[repr(u8)]
119#[derive(Debug, Clone, Copy, Eq, PartialEq)]
120pub enum State {
121    /// Fault state
122    Fault = 0,
123    /// OK state
124    Ok = 1,
125}
126
127impl From<u8> for State {
128    fn from(b: u8) -> Self {
129        match b {
130            0 => State::Fault,
131            _ => State::Ok,
132        }
133    }
134}
135
136impl From<bool> for State {
137    fn from(b: bool) -> Self {
138        if b {
139            State::Ok
140        } else {
141            State::Fault
142        }
143    }
144}
145
146impl From<State> for bool {
147    fn from(s: State) -> bool {
148        match s {
149            State::Fault => false,
150            State::Ok => true,
151        }
152    }
153}
154
155/// Edge
156#[repr(u8)]
157#[derive(Debug, Clone, Copy, Eq, PartialEq)]
158pub enum Edge {
159    /// Rising edge
160    Rising = b'+',
161    /// Falling edge
162    Falling = b'.',
163}
164
165impl ops::Not for Edge {
166    type Output = Self;
167    fn not(self) -> Self {
168        match self {
169            Edge::Rising => Edge::Falling,
170            Edge::Falling => Edge::Rising,
171        }
172    }
173}
174
175impl From<u8> for Edge {
176    fn from(b: u8) -> Self {
177        match b {
178            1 | b'+' => Edge::Rising,
179            _ => Edge::Falling,
180        }
181    }
182}
183
184impl From<bool> for Edge {
185    fn from(b: bool) -> Self {
186        if b {
187            Edge::Rising
188        } else {
189            Edge::Falling
190        }
191    }
192}
193
194impl From<Edge> for bool {
195    fn from(e: Edge) -> bool {
196        match e {
197            Edge::Rising => true,
198            Edge::Falling => false,
199        }
200    }
201}
202
203/// Heartbeat range
204#[derive(Debug, Clone)]
205pub enum Range {
206    /// Upper bound (timeout)
207    Timeout(Duration),
208    /// Time window
209    Window(Duration),
210}
211
212/// Fault state kind
213#[derive(Debug, Clone, Copy, Eq, PartialEq)]
214pub enum FaultKind {
215    /// Initial state (watchdog is always started in "Fault")
216    Initial,
217    /// No heartbeat received in time
218    Timeout,
219    /// Heartbeat not in the time window
220    Window,
221    /// Out-of-order edge (e.g. for TCP/IP packets)
222    OutOfOrder,
223}
224
225impl defmt::Format for FaultKind {
226    fn format(&self, f: defmt::Formatter) {
227        match self {
228            FaultKind::Initial => defmt::write!(f, "Initial"),
229            FaultKind::Timeout => defmt::write!(f, "Timeout"),
230            FaultKind::Window => defmt::write!(f, "Window"),
231            FaultKind::OutOfOrder => defmt::write!(f, "OutOfOrder"),
232        }
233    }
234}
235
236impl Range {
237    /// Get the relative I/O timeout duration
238    #[allow(dead_code)]
239    pub fn timeout(&self) -> Duration {
240        match self {
241            Range::Timeout(d) | Range::Window(d) => *d,
242        }
243    }
244}
245
246/// Watchdog configuration
247#[derive(Debug, Clone)]
248pub struct WatchdogConfig {
249    interval: Duration,
250    range: Range,
251    warmup: Duration,
252    min_beats: u32,
253}
254
255impl WatchdogConfig {
256    /// Create a new watchdog configuration
257    pub fn new(interval: Duration) -> Self {
258        Self {
259            interval,
260            range: Range::Timeout(interval + interval / 10),
261            warmup: interval * 2,
262            min_beats: 2,
263        }
264    }
265    /// Set the range
266    pub fn with_range(mut self, range: Range) -> Self {
267        self.range = range;
268        self
269    }
270    /// Set the warmup time (no heartbeat checked after startup/fault)
271    pub fn with_warmup(mut self, warmup: Duration) -> Self {
272        self.warmup = warmup;
273        self
274    }
275    /// Set the minimum number of valid beats before switching to OK state
276    pub fn with_min_beats(mut self, min_beats: u32) -> Self {
277        self.min_beats = min_beats;
278        self
279    }
280    /// Get the interval
281    pub fn interval(&self) -> Duration {
282        self.interval
283    }
284    /// Get the range
285    pub fn range(&self) -> &Range {
286        &self.range
287    }
288    /// Get the warmup time
289    pub fn warmup(&self) -> Duration {
290        self.warmup
291    }
292    /// Get the minimum number of valid beats
293    pub fn min_beats(&self) -> u32 {
294        self.min_beats
295    }
296    /// Get timeout for I/O
297    pub fn io_timeout(&self) -> Duration {
298        self.interval + self.range.timeout()
299    }
300}
301
302/// Watchdog
303pub struct Watchdog<I: WatchdogIo> {
304    #[cfg(feature = "std")]
305    inner: Arc<WatchDogInner<I>>,
306    #[cfg(not(feature = "std"))]
307    inner: WatchDogInner<I>,
308}
309
310#[cfg(feature = "std")]
311impl<I: WatchdogIo> Clone for Watchdog<I> {
312    fn clone(&self) -> Self {
313        Self {
314            inner: self.inner.clone(),
315        }
316    }
317}
318
319struct WatchDogProcessor<'a> {
320    packets: u32,
321    next: Edge,
322    last_packet: Instant,
323    config: &'a WatchdogConfig,
324}
325
326impl<'a> WatchDogProcessor<'a> {
327    fn new(config: &'a WatchdogConfig) -> Self {
328        Self {
329            packets: 0,
330            next: Edge::Rising,
331            last_packet: Instant::now(),
332            config,
333        }
334    }
335    fn process(&mut self, res: Result<Edge>, current_state: State) -> Result<Option<StateEvent>> {
336        #[cfg(feature = "std")]
337        let elapsed_ms = u64::try_from(self.last_packet.elapsed().as_micros()).unwrap();
338        #[cfg(feature = "embassy")]
339        let elapsed_ms = self.last_packet.elapsed().as_micros();
340        self.last_packet = Instant::now();
341        match res {
342            Ok(edge) => {
343                if let Range::Window(v) = self.config.range {
344                    if elapsed_ms
345                        < u64::try_from(self.config.interval.as_micros() - v.as_micros()).unwrap()
346                    {
347                        self.packets = 0;
348                        return Ok(Some(StateEvent::Fault(FaultKind::Window)));
349                    }
350                }
351                if edge == self.next {
352                    self.next = !self.next;
353                    if current_state == State::Fault {
354                        self.packets += 1;
355                        if self.packets >= self.config.min_beats * 2 {
356                            return Ok(Some(StateEvent::Ok));
357                        }
358                    }
359                    return Ok(None);
360                }
361                if self.packets > 1 {
362                    self.packets = 0;
363                    return Ok(Some(StateEvent::Fault(FaultKind::OutOfOrder)));
364                }
365                Ok(None)
366            }
367            Err(Error::Timeout) => {
368                self.packets = 0;
369                Ok(Some(StateEvent::Fault(FaultKind::Timeout)))
370            }
371            Err(e) => Err(e),
372        }
373    }
374}
375
376struct WatchDogInner<I: WatchdogIo> {
377    io: I,
378    state: AtomicBool,
379    config: WatchdogConfig,
380    #[cfg(feature = "std")]
381    state_tx: policy_channel::Sender<StateEvent, RawMutex, Condvar>,
382    #[cfg(feature = "std")]
383    state_rx: policy_channel::Receiver<StateEvent, RawMutex, Condvar>,
384}
385
386impl<I: WatchdogIo> Watchdog<I> {
387    /// Create a new watchdog
388    #[allow(clippy::useless_conversion)]
389    pub fn new(config: WatchdogConfig, io: I) -> Self {
390        #[cfg(feature = "std")]
391        let (state_tx, state_rx) = rtsc::policy_channel::bounded(1);
392        Self {
393            inner: WatchDogInner {
394                io,
395                state: AtomicBool::new(State::Fault.into()),
396                config,
397                #[cfg(feature = "std")]
398                state_tx,
399                #[cfg(feature = "std")]
400                state_rx,
401            }
402            .into(),
403        }
404    }
405    /// Get the current state
406    pub fn state(&self) -> State {
407        self.inner.state.load(Ordering::Relaxed).into()
408    }
409    /// Get the state receiver channel
410    #[cfg(feature = "std")]
411    pub fn state_rx(&self) -> policy_channel::Receiver<StateEvent, RawMutex, Condvar> {
412        self.inner.state_rx.clone()
413    }
414    /// Run the watchdog
415    pub fn run(&self) -> Result<()> {
416        self.set_fault(FaultKind::Initial)?;
417        let mut p = WatchDogProcessor::new(&self.inner.config);
418        loop {
419            match p.process(self.inner.io.get(p.next), self.state()) {
420                Ok(Some(event)) => match event {
421                    StateEvent::Ok => self.set_ok()?,
422                    StateEvent::Fault(kind) => self.set_fault(kind)?,
423                },
424                Ok(None) => (),
425                Err(e) => return Err(e),
426            }
427        }
428    }
429    #[allow(clippy::unnecessary_wraps)]
430    fn set_ok(&self) -> Result<()> {
431        if self.state() == State::Ok {
432            return Ok(());
433        }
434        self.inner.state.store(true, Ordering::Relaxed);
435        #[cfg(feature = "std")]
436        self.inner
437            .state_tx
438            .send(StateEvent::Ok)
439            .map_err(Error::failed)?;
440        Ok(())
441    }
442    fn set_fault(&self, kind: FaultKind) -> Result<()> {
443        if self.state() == State::Fault && kind != FaultKind::Initial {
444            return Ok(());
445        }
446        self.inner.state.store(false, Ordering::Relaxed);
447        #[cfg(feature = "std")]
448        self.inner
449            .state_tx
450            .send(StateEvent::Fault(kind))
451            .map_err(Error::failed)?;
452        self.warmup()?;
453        Ok(())
454    }
455    fn warmup(&self) -> Result<()> {
456        #[cfg(feature = "std")]
457        std::thread::sleep(self.inner.config.warmup);
458        self.inner.io.clear()?;
459        Ok(())
460    }
461}
462
463/// Watchdog
464pub struct WatchdogAsync<I: WatchdogIoAsync> {
465    #[cfg(feature = "std")]
466    inner: Arc<WatchDogInnerAsync<I>>,
467    #[cfg(not(feature = "std"))]
468    inner: WatchDogInnerAsync<I>,
469}
470
471struct WatchDogInnerAsync<I: WatchdogIoAsync> {
472    io: I,
473    state: AtomicBool,
474    config: WatchdogConfig,
475    #[cfg(feature = "std")]
476    state_tx: policy_channel_async::Sender<StateEvent>,
477    #[cfg(feature = "std")]
478    state_rx: policy_channel_async::Receiver<StateEvent>,
479    #[cfg(feature = "embassy")]
480    embassy_state_tx: Option<embassy_sync::channel::Sender<'static, NoopMutex, StateEvent, 32>>,
481}
482
483impl<I: WatchdogIoAsync> WatchdogAsync<I> {
484    /// Create a new watchdog
485    #[allow(clippy::useless_conversion)]
486    pub fn new(config: WatchdogConfig, io: I) -> Self {
487        #[cfg(feature = "std")]
488        let (state_tx, state_rx) = rtsc::policy_channel_async::bounded(1);
489        Self {
490            inner: WatchDogInnerAsync {
491                io,
492                state: AtomicBool::new(State::Fault.into()),
493                config,
494                #[cfg(feature = "std")]
495                state_tx,
496                #[cfg(feature = "std")]
497                state_rx,
498                #[cfg(feature = "embassy")]
499                embassy_state_tx: None,
500            }
501            .into(),
502        }
503    }
504    /// Get the current state
505    pub fn state(&self) -> State {
506        self.inner.state.load(Ordering::Relaxed).into()
507    }
508    #[cfg(feature = "std")]
509    /// Get the state receiver channel
510    pub fn state_rx(&self) -> policy_channel_async::Receiver<StateEvent> {
511        self.inner.state_rx.clone()
512    }
513    #[cfg(all(feature = "embassy", not(feature = "std")))]
514    /// Set the state sender channel
515    pub fn set_state_tx(
516        &mut self,
517        tx: embassy_sync::channel::Sender<'static, NoopMutex, StateEvent, 32>,
518    ) {
519        self.inner.embassy_state_tx = Some(tx);
520    }
521    /// Run the watchdog
522    pub async fn run(&self) -> Result<()> {
523        self.set_fault(FaultKind::Initial).await?;
524        let mut p = WatchDogProcessor::new(&self.inner.config);
525        loop {
526            match p.process(self.inner.io.get(p.next).await, self.state()) {
527                Ok(Some(event)) => match event {
528                    StateEvent::Ok => self.set_ok().await?,
529                    StateEvent::Fault(kind) => self.set_fault(kind).await?,
530                },
531                Ok(None) => (),
532                Err(e) => return Err(e),
533            }
534        }
535    }
536    async fn set_ok(&self) -> Result<()> {
537        if self.state() == State::Ok {
538            return Ok(());
539        }
540        self.inner.state.store(true, Ordering::Relaxed);
541        #[cfg(feature = "std")]
542        self.inner
543            .state_tx
544            .send(StateEvent::Ok)
545            .await
546            .map_err(Error::failed)?;
547        #[cfg(feature = "embassy")]
548        if let Some(tx) = &self.inner.embassy_state_tx {
549            tx.send(StateEvent::Ok).await;
550        }
551        Ok(())
552    }
553    async fn set_fault(&self, kind: FaultKind) -> Result<()> {
554        if self.state() == State::Fault && kind != FaultKind::Initial {
555            return Ok(());
556        }
557        self.inner.state.store(false, Ordering::Relaxed);
558        #[cfg(feature = "std")]
559        self.inner
560            .state_tx
561            .send(StateEvent::Fault(kind))
562            .await
563            .map_err(Error::failed)?;
564        #[cfg(feature = "embassy")]
565        if let Some(tx) = &self.inner.embassy_state_tx {
566            tx.send(StateEvent::Fault(kind)).await;
567        }
568        self.warmup().await?;
569        Ok(())
570    }
571    async fn warmup(&self) -> Result<()> {
572        #[cfg(feature = "std")]
573        async_io::Timer::after(self.inner.config.warmup).await;
574        #[cfg(all(feature = "embassy", not(feature = "std")))]
575        embassy_time::Timer::after(embassy_time::Duration::from_micros(
576            self.inner.config.warmup.as_micros().try_into().unwrap(),
577        ))
578        .await;
579        self.inner.io.clear().await?;
580        Ok(())
581    }
582}
583
584/// Heartbeat client trait
585pub trait Heart {
586    /// Send the current edge
587    fn beat(&self) -> Result<()>;
588}
589
590/// Heartbeat async client trait
591pub trait HeartAsync {
592    /// Send the current edge asynchronouslyyc
593    fn beat_async(&self) -> impl Future<Output = Result<()>>;
594}