1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use std::{
  sync::Arc,
  thread::JoinHandle,
  time::{Duration, Instant}
};

use parking_lot::{Condvar, Mutex};

/// Maximum amount of milliseconds allowed
const MAX_PROC_MILLIS: u64 = 200;

enum State {
  /// Waiting for a message to arrive.
  Idle,

  /// A message has arrived and is being processed.
  Processing {
    start_time: Instant
  },

  /// Message processing has timed out.
  Timeout,

  Term
}

struct Inner {
  state: State
}

struct Shared {
  inner: Mutex<Inner>,
  signal: Condvar
}

pub(crate) fn run() -> WatchDog {
  let inner = Inner { state: State::Idle };

  let shared = Shared {
    inner: Mutex::new(inner),
    signal: Condvar::new()
  };

  let sh = Arc::new(shared);
  let shared = Arc::clone(&sh);
  let jh = std::thread::spawn(|| monitor_thread(shared));

  WatchDog { sh, jh }
}

pub(crate) struct WatchDog {
  sh: Arc<Shared>,
  jh: JoinHandle<()>
}

impl WatchDog {
  pub(crate) fn begin_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Processing {
      start_time: Instant::now()
    };
    self.sh.signal.notify_one();
  }

  pub(crate) fn end_process(&self) {
    let mut g = self.sh.inner.lock();
    g.state = State::Idle;
    self.sh.signal.notify_one();
  }

  pub(crate) fn kill(self) -> std::thread::Result<()> {
    let mut g = self.sh.inner.lock();
    g.state = State::Term;
    self.sh.signal.notify_one();
    drop(g);
    self.jh.join()
  }
}


fn monitor_thread(sh: Arc<Shared>) {
  let mut g = sh.inner.lock();
  loop {
    match g.state {
      State::Idle => {
        // Wait to be notified about a state change
        sh.signal.wait(&mut g);
      }
      State::Processing { start_time } => {
        let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS);
        if sh.signal.wait_until(&mut g, timeout).timed_out() {
          g.state = State::Timeout;
          continue;
        }
      }
      State::Timeout => {
        #[cfg(feature = "tracing")]
        tracing::warn!(
          "Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        #[cfg(not(feature = "tracing"))]
        eprintln!(
          "Warning: Message processing held up the dispatcher more than {}ms",
          MAX_PROC_MILLIS
        );

        // Retutn to idle state
        g.state = State::Idle;
        continue;
      }
      State::Term => {
        break;
      }
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :