1use std::{
2 sync::Arc,
3 thread::JoinHandle,
4 time::{Duration, Instant}
5};
6
7use parking_lot::{Condvar, Mutex};
8
9const MAX_PROC_MILLIS: u64 = 200;
11
12enum State {
13 Idle,
15
16 Processing {
18 start_time: Instant
19 },
20
21 Timeout,
23
24 Term
25}
26
27struct Inner {
28 state: State
29}
30
31struct Shared {
32 inner: Mutex<Inner>,
33 signal: Condvar
34}
35
36pub fn run() -> WatchDog {
37 let inner = Inner { state: State::Idle };
38
39 let shared = Shared {
40 inner: Mutex::new(inner),
41 signal: Condvar::new()
42 };
43
44 let sh = Arc::new(shared);
45 let shared = Arc::clone(&sh);
46 let jh = std::thread::spawn(move || monitor_thread(&shared));
47
48 WatchDog { sh, jh }
49}
50
51pub struct WatchDog {
52 sh: Arc<Shared>,
53 jh: JoinHandle<()>
54}
55
56impl WatchDog {
57 pub(crate) fn begin_process(&self) {
58 let mut g = self.sh.inner.lock();
59 g.state = State::Processing {
60 start_time: Instant::now()
61 };
62 self.sh.signal.notify_one();
63 drop(g);
64 }
65
66 pub(crate) fn end_process(&self) {
67 let mut g = self.sh.inner.lock();
68 g.state = State::Idle;
69 self.sh.signal.notify_one();
70 drop(g);
71 }
72
73 pub(crate) fn kill(self) -> std::thread::Result<()> {
74 let mut g = self.sh.inner.lock();
75 g.state = State::Term;
76 self.sh.signal.notify_one();
77 drop(g);
78 self.jh.join()
79 }
80}
81
82
83#[allow(clippy::significant_drop_tightening)]
84fn monitor_thread(sh: &Arc<Shared>) {
85 let mut g = sh.inner.lock();
86 loop {
87 match g.state {
88 State::Idle => {
89 sh.signal.wait(&mut g);
91 }
92 State::Processing { start_time } => {
93 let timeout = start_time + Duration::from_millis(MAX_PROC_MILLIS);
94 if sh.signal.wait_until(&mut g, timeout).timed_out() {
95 g.state = State::Timeout;
96 continue;
97 }
98 }
99 State::Timeout => {
100 #[cfg(feature = "tracing")]
101 tracing::warn!(
102 "Message processing held up the dispatcher more than {}ms",
103 MAX_PROC_MILLIS
104 );
105
106 #[cfg(not(feature = "tracing"))]
107 eprintln!(
108 "Warning: Message processing held up the dispatcher more than {}ms",
109 MAX_PROC_MILLIS
110 );
111
112 g.state = State::Idle;
114 continue;
115 }
116 State::Term => {
117 break;
118 }
119 }
120 }
121}
122
123