Skip to main content

simple_tokio_watchdog/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use tokio::sync::{mpsc, oneshot};
4use tokio::time::{sleep, Duration, Instant};
5
6#[cfg(feature = "serde")]
7use serde::{Deserialize, Serialize};
8
9#[cfg(feature = "cli")]
10use clap::{Parser, ValueEnum};
11
12#[cfg(test)]
13mod test;
14
15/// Signal for interacting with the watchdog.
16#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
17#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
18#[cfg_attr(feature = "cli", derive(ValueEnum, Parser))]
19pub enum Signal {
20    #[default]
21    Reset,
22    Stop,
23}
24
25/// Signal on watchdog expiration.
26#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
27#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
28pub struct Expired;
29
30/// Watchdog holding the fixed duration.
31#[derive(Debug, Copy, Clone, PartialEq, Eq)]
32pub struct Watchdog {
33    /// The timeout interval.
34    duration: Duration,
35}
36
37impl Watchdog {
38    /// Make a watchdog with the given timeout duration.
39    #[must_use]
40    pub const fn with_timeout(duration: Duration) -> Self {
41        Self { duration }
42    }
43
44    /// Spawn the watchdog actor.
45    ///
46    /// Returns the `reset_tx` and `expire_tx` channels needed for communicating with the watchdog.
47    #[must_use]
48    pub fn run(self) -> (mpsc::Sender<Signal>, oneshot::Receiver<Expired>) {
49        let (reset_tx, reset_rx) = mpsc::channel(16);
50        let (expire_tx, expire_rx) = oneshot::channel();
51        tokio::spawn(self.event_loop(reset_rx, expire_tx));
52        (reset_tx, expire_rx)
53    }
54
55    /// Run the watchdog event loop.
56    async fn event_loop(
57        self,
58        mut reset_rx: mpsc::Receiver<Signal>,
59        expire_tx: oneshot::Sender<Expired>,
60    ) {
61        let sleep = sleep(self.duration);
62        tokio::pin!(sleep);
63        let mut active = true;
64        loop {
65            tokio::select! {
66                msg = reset_rx.recv() => {
67                    match msg {
68                        // on reset: set active, restart sleep.
69                        Some(Signal::Reset) => {
70                            active = true;
71                            sleep.as_mut().reset(Instant::now() + self.duration);
72                        }
73                        // on stop: mark watchdog as not active.
74                        Some(Signal::Stop) => active = false,
75                        // on channel close: exit watchdog.
76                        None => break,
77                    }
78                }
79                () = sleep.as_mut(), if active => {
80                    // on sleep expiry: use up `expire_tx`, then exit.
81                    let _ = expire_tx.send(Expired);
82                    break;
83                },
84            }
85        }
86    }
87}