ump_ng_server/
wdog.rs

1use std::{
2  sync::Arc,
3  thread::JoinHandle,
4  time::{Duration, Instant}
5};
6
7use parking_lot::{Condvar, Mutex};
8
9/// Maximum amount of milliseconds allowed
10const MAX_PROC_MILLIS: u64 = 200;
11
12enum State {
13  /// Waiting for a message to arrive.
14  Idle,
15
16  /// A message has arrived and is being processed.
17  Processing {
18    start_time: Instant
19  },
20
21  /// Message processing has timed out.
22  Timeout,
23
24  Term
25}
26
27struct Inner {
28  state: State
29}
30
31struct Shared {
32  inner: Mutex<Inner>,
33  signal: Condvar
34}
35
36pub fn run() -> WatchDog {
37  let inner = Inner { state: State::Idle };
38
39  let shared = Shared {
40    inner: Mutex::new(inner),
41    signal: Condvar::new()
42  };
43
44  let sh = Arc::new(shared);
45  let shared = Arc::clone(&sh);
46  let jh = std::thread::spawn(move || monitor_thread(&shared));
47
48  WatchDog { sh, jh }
49}
50
51pub struct WatchDog {
52  sh: Arc<Shared>,
53  jh: JoinHandle<()>
54}
55
56impl WatchDog {
57  pub(crate) fn begin_process(&self) {
58    let mut g = self.sh.inner.lock();
59    g.state = State::Processing {
60      start_time: Instant::now()
61    };
62    self.sh.signal.notify_one();
63    drop(g);
64  }
65
66  pub(crate) fn end_process(&self) {
67    let mut g = self.sh.inner.lock();
68    g.state = State::Idle;
69    self.sh.signal.notify_one();
70    drop(g);
71  }
72
73  pub(crate) fn kill(self) -> std::thread::Result<()> {
74    let mut g = self.sh.inner.lock();
75    g.state = State::Term;
76    self.sh.signal.notify_one();
77    drop(g);
78    self.jh.join()
79  }
80}
81
82
83#[allow(clippy::significant_drop_tightening)]
84fn monitor_thread(sh: &Arc<Shared>) {
85  let mut g = sh.inner.lock();
86  loop {
87    match g.state {
88      State::Idle => {
89        // Wait to be notified about a state change
90        sh.signal.wait(&mut g);
91      }
92      State::Processing { start_time } => {
93        let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS);
94        if sh.signal.wait_until(&mut g, timeout).timed_out() {
95          g.state = State::Timeout;
96          continue;
97        }
98      }
99      State::Timeout => {
100        #[cfg(feature = "tracing")]
101        tracing::warn!(
102          "Message processing held up the dispatcher more than {}ms",
103          MAX_PROC_MILLIS
104        );
105
106        #[cfg(not(feature = "tracing"))]
107        eprintln!(
108          "Warning: Message processing held up the dispatcher more than {}ms",
109          MAX_PROC_MILLIS
110        );
111
112        // Retutn to idle state
113        g.state = State::Idle;
114        continue;
115      }
116      State::Term => {
117        break;
118      }
119    }
120  }
121}
122
123// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :