1use std::{
2 sync::{
3 atomic::{AtomicBool, AtomicUsize, Ordering},
4 Arc,
5 },
6 thread::{Builder, JoinHandle},
7};
8
9use crossbeam::queue::SegQueue;
10
11pub mod monitor {
12 mod stat_profiler;
13
14 pub use stat_profiler::{StatMonitor, ThreadInfo, ThreadInfoItem, ThreadInfoIterator};
15}
16
17lazy_static::lazy_static! {
18 pub static ref THREAD_MONITOR: ThreadMonitor = ThreadMonitor::new();
19}
20
21pub fn spawn_builder<F, T>(builder: std::thread::Builder, f: F) -> JoinHandle<T>
22where
23 F: FnOnce() -> T,
24 F: Send + 'static,
25 T: Send + 'static,
26{
27 builder
28 .spawn(move || {
29 let name = match std::thread::current().name() {
30 Some(n) => n.to_string(),
31 None => format!("{:?}", std::thread::current().id()),
32 };
33 let n = THREAD_MONITOR.started(name.clone(), None);
34 let r = f();
35 THREAD_MONITOR.finished(n, name);
36 r
37 })
38 .unwrap()
39}
40
41pub fn spawn<F, T>(name: String, f: F) -> JoinHandle<T>
42where
43 F: FnOnce() -> T,
44 F: Send + 'static,
45 T: Send + 'static,
46{
47 spawn_builder(Builder::new().name(name), f)
48}
49
50pub fn spawn_str<F, T>(name: &str, f: F) -> JoinHandle<T>
51where
52 F: FnOnce() -> T,
53 F: Send + 'static,
54 T: Send + 'static,
55{
56 spawn_builder(Builder::new().name(name.to_string()), f)
57}
58
59pub fn spawn_builder_with_state<F, T>(builder: std::thread::Builder, f: F) -> JoinHandle<T>
60where
61 F: FnOnce(ThreadState) -> T,
62 F: Send + 'static,
63 T: Send + 'static,
64{
65 builder
67 .spawn(move || {
68 let name = match std::thread::current().name() {
69 Some(n) => n.to_string(),
70 None => format!("{:?}", std::thread::current().id()),
71 };
72 let state = ThreadState::new();
73 let n = THREAD_MONITOR.started(name.clone(), Some(state.clone()));
74 let r = f(state.clone());
75 state.done();
76 THREAD_MONITOR.finished(n, name);
77 r
78 })
79 .unwrap()
80}
81
82pub fn spawn_with_state<F, T>(name: String, f: F) -> JoinHandle<T>
83where
84 F: FnOnce(ThreadState) -> T,
85 F: Send + 'static,
86 T: Send + 'static,
87{
88 spawn_builder_with_state(Builder::new().name(name), f)
89}
90
91pub fn spawn_str_with_state<F, T>(name: &str, f: F) -> JoinHandle<T>
92where
93 F: FnOnce(ThreadState) -> T,
94 F: Send + 'static,
95 T: Send + 'static,
96{
97 spawn_builder_with_state(Builder::new().name(name.to_string()), f)
98}
99
100use std::cell::RefCell;
101thread_local!(static TOKIO_LOCAL: RefCell<Option<TokioLocalData>> = RefCell::new(None));
102
103struct TokioLocalData {
104 monitor_n: usize,
105 state: ThreadState,
106}
107
108pub fn tokio_start() {
109 let name = match std::thread::current().name() {
110 Some(n) => n.to_string(),
111 None => format!("{:?}", std::thread::current().id()),
112 };
113 let state = ThreadState::new();
114 let n = THREAD_MONITOR.started(name, Some(state.clone()));
115 TOKIO_LOCAL.set(Some(TokioLocalData {
116 monitor_n: n,
117 state,
118 }));
119}
120
121pub fn tokio_park() {
122 TOKIO_LOCAL.with_borrow(|opt_data| {
123 if let Some(TokioLocalData { state, .. }) = opt_data {
124 state.idle();
125 }
126 });
127}
128
129pub fn tokio_unpark() {
130 TOKIO_LOCAL.with_borrow(|opt_data| {
131 if let Some(TokioLocalData { state, .. }) = opt_data {
132 state.payload();
133 }
134 });
135}
136
137pub fn tokio_stop() {
138 let name = match std::thread::current().name() {
139 Some(n) => n.to_string(),
140 None => format!("{:?}", std::thread::current().id()),
141 };
142 TOKIO_LOCAL.with_borrow(|opt_data| {
143 if let Some(TokioLocalData { monitor_n, state }) = opt_data {
144 state.done();
145 THREAD_MONITOR.finished(*monitor_n, name);
146 }
147 });
148}
149
150#[derive(Debug)]
151pub enum ThreadStatus {
152 Started {
153 n: usize,
154 name: String,
155 tm: time::OffsetDateTime,
156 state: Option<ThreadState>,
157 },
158 Finished {
159 n: usize,
160 name: String,
161 tm: time::OffsetDateTime,
162 },
163}
164
165#[derive(Debug, Clone, Copy)]
166pub enum State {
167 Init,
168 Payload,
169 Idle,
170 Wait,
171 Done,
172}
173impl State {
174 fn opt_from(u: usize) -> Option<State> {
175 Some(match u {
176 0 => State::Init,
177 1 => State::Payload,
178 2 => State::Idle,
179 3 => State::Wait,
180 4 => State::Done,
181 _ => return None,
182 })
183 }
184}
185impl Into<usize> for State {
186 fn into(self) -> usize {
187 match self {
188 State::Init => 0,
189 State::Payload => 1,
190 State::Idle => 2,
191 State::Wait => 3,
192 State::Done => 4,
193 }
194 }
195}
196
197#[derive(Debug, Clone)]
198pub struct ThreadState(Arc<AtomicUsize>);
199impl ThreadState {
200 fn new() -> ThreadState {
201 ThreadState(Arc::new(AtomicUsize::new(State::Init.into())))
202 }
203 pub fn get(&self) -> Option<State> {
204 State::opt_from(self.0.load(Ordering::SeqCst))
205 }
206 pub fn payload(&self) {
207 self.0.store(State::Payload.into(), Ordering::SeqCst);
208 }
209 pub fn idle(&self) {
210 self.0.store(State::Idle.into(), Ordering::SeqCst);
211 }
212 pub fn wait(&self) {
213 self.0.store(State::Wait.into(), Ordering::SeqCst);
214 }
215 fn done(&self) {
216 self.0.store(State::Done.into(), Ordering::SeqCst);
217 }
218}
219
220pub struct ThreadMonitor {
221 on: AtomicBool,
222 next: AtomicUsize,
223 messages: SegQueue<ThreadStatus>,
224}
225impl ThreadMonitor {
226 fn new() -> ThreadMonitor {
227 log::info!("thread monitor processing is turned OFF");
228 ThreadMonitor {
229 on: AtomicBool::new(false),
230 next: AtomicUsize::new(0),
231 messages: SegQueue::new(),
232 }
233 }
234 fn started(&self, name: String, state: Option<ThreadState>) -> usize {
235 let n = self.next.fetch_add(1, Ordering::SeqCst);
236 log::info!("thread {}: {} has been spawned", n, name);
237 if self.on.load(Ordering::SeqCst) {
238 self.messages.push(ThreadStatus::Started {
239 n: n,
240 name,
241 tm: time::OffsetDateTime::now_utc(),
242 state: state,
243 });
244 }
245 n
246 }
247 fn finished(&self, n: usize, name: String) {
248 log::info!("thread {}: {} finished", n, name);
249 if self.on.load(Ordering::SeqCst) {
250 self.messages.push(ThreadStatus::Finished {
251 n: n,
252 name,
253 tm: time::OffsetDateTime::now_utc(),
254 });
255 }
256 }
257 pub fn turn_on(&self) {
258 log::info!("thread monitor processing is turned ON");
259 self.on.store(true, Ordering::SeqCst);
260 }
261 pub fn turn_off(&self) {
262 log::info!("thread monitor processing is turned OFF");
263 self.on.store(false, Ordering::SeqCst);
264 }
265 pub fn process(&self) -> Option<Vec<ThreadStatus>> {
266 if self.on.load(Ordering::SeqCst) {
267 let mut v = Vec::new();
268 while let Some(msg) = self.messages.pop() {
269 v.push(msg);
270 }
271 Some(v)
272 } else {
273 None
274 }
275 }
276}