thread_monitor/
lib.rs

1use std::{
2    sync::{
3        atomic::{AtomicBool, AtomicUsize, Ordering},
4        Arc,
5    },
6    thread::{Builder, JoinHandle},
7};
8
9use crossbeam::queue::SegQueue;
10
11pub mod monitor {
12    mod stat_profiler;
13
14    pub use stat_profiler::{StatMonitor, ThreadInfo, ThreadInfoItem, ThreadInfoIterator};
15}
16
17lazy_static::lazy_static! {
18    pub static ref THREAD_MONITOR: ThreadMonitor = ThreadMonitor::new();
19}
20
21pub fn spawn_builder<F, T>(builder: std::thread::Builder, f: F) -> JoinHandle<T>
22where
23    F: FnOnce() -> T,
24    F: Send + 'static,
25    T: Send + 'static,
26{
27    builder
28        .spawn(move || {
29            let name = match std::thread::current().name() {
30                Some(n) => n.to_string(),
31                None => format!("{:?}", std::thread::current().id()),
32            };
33            let n = THREAD_MONITOR.started(name.clone(), None);
34            let r = f();
35            THREAD_MONITOR.finished(n, name);
36            r
37        })
38        .unwrap()
39}
40
41pub fn spawn<F, T>(name: String, f: F) -> JoinHandle<T>
42where
43    F: FnOnce() -> T,
44    F: Send + 'static,
45    T: Send + 'static,
46{
47    spawn_builder(Builder::new().name(name), f)
48}
49
50pub fn spawn_str<F, T>(name: &str, f: F) -> JoinHandle<T>
51where
52    F: FnOnce() -> T,
53    F: Send + 'static,
54    T: Send + 'static,
55{
56    spawn_builder(Builder::new().name(name.to_string()), f)
57}
58
59pub fn spawn_builder_with_state<F, T>(builder: std::thread::Builder, f: F) -> JoinHandle<T>
60where
61    F: FnOnce(ThreadState) -> T,
62    F: Send + 'static,
63    T: Send + 'static,
64{
65    // like std::thread::spawn, but with name
66    builder
67        .spawn(move || {
68            let name = match std::thread::current().name() {
69                Some(n) => n.to_string(),
70                None => format!("{:?}", std::thread::current().id()),
71            };
72            let state = ThreadState::new();
73            let n = THREAD_MONITOR.started(name.clone(), Some(state.clone()));
74            let r = f(state.clone());
75            state.done();
76            THREAD_MONITOR.finished(n, name);
77            r
78        })
79        .unwrap()
80}
81
82pub fn spawn_with_state<F, T>(name: String, f: F) -> JoinHandle<T>
83where
84    F: FnOnce(ThreadState) -> T,
85    F: Send + 'static,
86    T: Send + 'static,
87{
88    spawn_builder_with_state(Builder::new().name(name), f)
89}
90
91pub fn spawn_str_with_state<F, T>(name: &str, f: F) -> JoinHandle<T>
92where
93    F: FnOnce(ThreadState) -> T,
94    F: Send + 'static,
95    T: Send + 'static,
96{
97    spawn_builder_with_state(Builder::new().name(name.to_string()), f)
98}
99
100use std::cell::RefCell;
101thread_local!(static TOKIO_LOCAL: RefCell<Option<TokioLocalData>> = RefCell::new(None));
102
103struct TokioLocalData {
104    monitor_n: usize,
105    state: ThreadState,
106}
107
108pub fn tokio_start() {
109    let name = match std::thread::current().name() {
110        Some(n) => n.to_string(),
111        None => format!("{:?}", std::thread::current().id()),
112    };
113    let state = ThreadState::new();
114    let n = THREAD_MONITOR.started(name, Some(state.clone()));
115    TOKIO_LOCAL.set(Some(TokioLocalData {
116        monitor_n: n,
117        state,
118    }));
119}
120
121pub fn tokio_park() {
122    TOKIO_LOCAL.with_borrow(|opt_data| {
123        if let Some(TokioLocalData { state, .. }) = opt_data {
124            state.idle();
125        }
126    });
127}
128
129pub fn tokio_unpark() {
130    TOKIO_LOCAL.with_borrow(|opt_data| {
131        if let Some(TokioLocalData { state, .. }) = opt_data {
132            state.payload();
133        }
134    });
135}
136
137pub fn tokio_stop() {
138    let name = match std::thread::current().name() {
139        Some(n) => n.to_string(),
140        None => format!("{:?}", std::thread::current().id()),
141    };
142    TOKIO_LOCAL.with_borrow(|opt_data| {
143        if let Some(TokioLocalData { monitor_n, state }) = opt_data {
144            state.done();
145            THREAD_MONITOR.finished(*monitor_n, name);
146        }
147    });
148}
149
150#[derive(Debug)]
151pub enum ThreadStatus {
152    Started {
153        n: usize,
154        name: String,
155        tm: time::OffsetDateTime,
156        state: Option<ThreadState>,
157    },
158    Finished {
159        n: usize,
160        name: String,
161        tm: time::OffsetDateTime,
162    },
163}
164
165#[derive(Debug, Clone, Copy)]
166pub enum State {
167    Init,
168    Payload,
169    Idle,
170    Wait,
171    Done,
172}
173impl State {
174    fn opt_from(u: usize) -> Option<State> {
175        Some(match u {
176            0 => State::Init,
177            1 => State::Payload,
178            2 => State::Idle,
179            3 => State::Wait,
180            4 => State::Done,
181            _ => return None,
182        })
183    }
184}
185impl Into<usize> for State {
186    fn into(self) -> usize {
187        match self {
188            State::Init => 0,
189            State::Payload => 1,
190            State::Idle => 2,
191            State::Wait => 3,
192            State::Done => 4,
193        }
194    }
195}
196
197#[derive(Debug, Clone)]
198pub struct ThreadState(Arc<AtomicUsize>);
199impl ThreadState {
200    fn new() -> ThreadState {
201        ThreadState(Arc::new(AtomicUsize::new(State::Init.into())))
202    }
203    pub fn get(&self) -> Option<State> {
204        State::opt_from(self.0.load(Ordering::SeqCst))
205    }
206    pub fn payload(&self) {
207        self.0.store(State::Payload.into(), Ordering::SeqCst);
208    }
209    pub fn idle(&self) {
210        self.0.store(State::Idle.into(), Ordering::SeqCst);
211    }
212    pub fn wait(&self) {
213        self.0.store(State::Wait.into(), Ordering::SeqCst);
214    }
215    fn done(&self) {
216        self.0.store(State::Done.into(), Ordering::SeqCst);
217    }
218}
219
220pub struct ThreadMonitor {
221    on: AtomicBool,
222    next: AtomicUsize,
223    messages: SegQueue<ThreadStatus>,
224}
225impl ThreadMonitor {
226    fn new() -> ThreadMonitor {
227        log::info!("thread monitor processing is turned OFF");
228        ThreadMonitor {
229            on: AtomicBool::new(false),
230            next: AtomicUsize::new(0),
231            messages: SegQueue::new(),
232        }
233    }
234    fn started(&self, name: String, state: Option<ThreadState>) -> usize {
235        let n = self.next.fetch_add(1, Ordering::SeqCst);
236        log::info!("thread {}: {} has been spawned", n, name);
237        if self.on.load(Ordering::SeqCst) {
238            self.messages.push(ThreadStatus::Started {
239                n: n,
240                name,
241                tm: time::OffsetDateTime::now_utc(),
242                state: state,
243            });
244        }
245        n
246    }
247    fn finished(&self, n: usize, name: String) {
248        log::info!("thread {}: {} finished", n, name);
249        if self.on.load(Ordering::SeqCst) {
250            self.messages.push(ThreadStatus::Finished {
251                n: n,
252                name,
253                tm: time::OffsetDateTime::now_utc(),
254            });
255        }
256    }
257    pub fn turn_on(&self) {
258        log::info!("thread monitor processing is turned ON");
259        self.on.store(true, Ordering::SeqCst);
260    }
261    pub fn turn_off(&self) {
262        log::info!("thread monitor processing is turned OFF");
263        self.on.store(false, Ordering::SeqCst);
264    }
265    pub fn process(&self) -> Option<Vec<ThreadStatus>> {
266        if self.on.load(Ordering::SeqCst) {
267            let mut v = Vec::new();
268            while let Some(msg) = self.messages.pop() {
269                v.push(msg);
270            }
271            Some(v)
272        } else {
273            None
274        }
275    }
276}