1use std::{
2 sync::{
3 Arc,
4 atomic::{AtomicUsize,Ordering},
5 },
6 time::Instant,
7};
8
9use crate::{
10 ThreadState, ThreadStatus, State,
11};
12
13use oneshot::OneSet;
14
15enum Ask {
16 All(OneSet<ThreadInfoIterator>),
17}
18
19pub struct ThreadInfoIterator {
20 data: ThreadInfoIteratorInner,
21}
22impl ThreadInfoIterator {
23 fn new(data: ThreadInfoIteratorInner) -> ThreadInfoIterator {
24 ThreadInfoIterator { data }
25 }
26}
27
28impl Iterator for ThreadInfoIterator {
29 type Item = ThreadInfoItem;
30
31 fn next(&mut self) -> Option<Self::Item> {
32 match &mut self.data {
33 ThreadInfoIteratorInner::Static(ar) => {
34 for it in ar {
35 let r = it.take();
36 if r.is_some() { return r; }
37 }
38 None
39 },
40 ThreadInfoIteratorInner::Vec(v) => v.pop(),
41 }
42 }
43}
44
45
46#[derive(Debug,Clone)]
47pub struct ThreadInfoItem {
48 name: Arc<String>,
49 info: ThreadInfo,
50}
51impl ThreadInfoItem {
52 pub fn name(&self) -> &str {
53 self.name.as_ref()
54 }
55 pub fn info(&self) -> ThreadInfo {
56 self.info
57 }
58}
59
60
61enum ThreadInfoIteratorInner {
62 Static([Option<ThreadInfoItem>; 16]),
63 Vec(Vec<ThreadInfoItem>)
64}
65impl ThreadInfoIteratorInner {
66 fn new(sz: usize) -> ThreadInfoIteratorInner {
67 if sz > 16 {
68 ThreadInfoIteratorInner::Static([None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None])
69 } else {
70 ThreadInfoIteratorInner::Vec(Vec::with_capacity(sz))
71 }
72 }
73 fn push(&mut self, name: Arc<String>, info: ThreadInfo) {
74 let item = ThreadInfoItem { name, info };
75 match self {
76 ThreadInfoIteratorInner::Static(ar) => match ar.len() < 16 {
77 true => { ar[ar.len()] = Some(item); },
78 false => {
79 let mut v = Vec::with_capacity(32);
80 for it in ar {
81 if let Some(it) = it.take() {
82 v.push(it);
83 }
84 }
85 v.push(item);
86 *self = ThreadInfoIteratorInner::Vec(v);
87 },
88 },
89 ThreadInfoIteratorInner::Vec(v) => v.push(item),
90 };
91 }
92}
93
94
95pub struct StatMonitor {
96 asker: Option<crossbeam::channel::Sender<Ask>>,
97 monitor: Option<std::thread::JoinHandle<()>>,
98 profiler: Option<std::thread::JoinHandle<()>>,
99}
100impl StatMonitor {
101 pub fn new() -> StatMonitor {
102 StatMonitor::create(100)
103 }
104 pub fn with_sample_rate(max_samples_per_sec: u16) -> StatMonitor {
105 StatMonitor::create(max_samples_per_sec)
106 }
107
108 pub fn get_info(&self) -> Option<ThreadInfoIterator> {
109 match &self.asker {
110 None => None,
111 Some(sender) => {
112 let (tx,rx) = oneshot::oneshot();
113 match sender.send(Ask::All(tx)) {
114 Err(_) => None,
115 Ok(()) => rx.wait(),
116 }
117 },
118 }
119 }
120
121 fn create(max_samples_per_sec: u16) -> StatMonitor {
122 crate::THREAD_MONITOR.turn_on();
123 let (asker,receiver) = crossbeam::channel::unbounded();
124 let (tx,rx) = crossbeam::channel::unbounded();
125 StatMonitor {
126 asker: Some(asker),
127 monitor: Some(crate::spawn_str("stat_monitor",move || {
128 let slp = std::time::Duration::new(0,300_000_000);
129 #[allow(unused_variables)]
130 let mut thr_started = 0;
131 #[allow(unused_variables)]
132 let mut thr_finished = 0;
133 let mut stat_vec = Vec::new();
134 loop {
135 match crate::THREAD_MONITOR.process() {
136 None => {
137 log::error!("thread monitor was unexpectedly turned off");
138 break;
139 },
140 Some(v) => for st in v {
141 match st {
142 ThreadStatus::Started{ state: None, .. } => {
143 thr_started += 1;
144 },
145 ThreadStatus::Started{ n, name, tm, state: Some(state) } => {
146 thr_started += 1;
147 let (watcher,agregator) = watcher(state);
148 stat_vec.push((n,Arc::new(name),tm,agregator));
149 if let Err(_) = tx.send(watcher) {
150 log::error!("thread monitor profiler was unexpectedly turned off");
151 break;
152 }
153 },
154 ThreadStatus::Finished{ n, .. } => {
155 thr_finished += 1;
156 let mut to_remove = None;
157 for (i,(svn,_,_,_)) in stat_vec.iter().enumerate() {
158 if *svn == n {
159 to_remove = Some(i);
160 }
161 }
162 if let Some(idx) = to_remove {
163 stat_vec.swap_remove(idx);
164 }
165 },
166 }
167 },
168 }
169 match receiver.try_recv() {
170 Ok(Ask::All(oneset)) => {
171 let mut iter = ThreadInfoIteratorInner::new(stat_vec.len());
172 for (_,name,_,agregator) in &mut stat_vec {
173 iter.push(name.clone(),agregator.get());
174 }
175 oneset.set(ThreadInfoIterator::new(iter));
176 },
177 Err(crossbeam::channel::TryRecvError::Empty) => {},
178 Err(crossbeam::channel::TryRecvError::Disconnected) => {
179 log::error!("stat_monitor was unexpectedly turned off");
180 break;
181 },
182 }
183
184 std::thread::sleep(slp);
185 }
186 })),
187 profiler: Some(crate::spawn_str("stat_profiler",move || {
188 let sps = 1u32 + max_samples_per_sec as u32;
189 let slp = std::time::Duration::new(0,1_000_000_000 / sps);
190 let mut to_watch = Vec::with_capacity(1024);
191 loop {
192 match rx.try_recv() {
193 Ok(watcher) => {
194 to_watch.push(watcher);
195 if sps <= 10 {
196 while let Ok(watcher) = rx.try_recv() {
197 to_watch.push(watcher);
198 }
199 }
200 },
201 Err(crossbeam::channel::TryRecvError::Empty) => {},
202 Err(crossbeam::channel::TryRecvError::Disconnected) => {
203 log::error!("thread monitor main was unexpectedly turned off");
204 break;
205 },
206 }
207
208 let mut to_remove = None;
209 for (i,w) in to_watch.iter().enumerate() {
210 if !w.process() {
211 to_remove = Some(i);
212 }
213 }
214 if let Some(idx) = to_remove {
215 to_watch.swap_remove(idx);
219 }
220
221 std::thread::sleep(slp);
222 }
223 })),
224 }
225 }
226}
227impl Drop for StatMonitor {
228 fn drop(&mut self) {
229 std::mem::drop(self.asker.take());
230 if let Some(h) = self.monitor.take() {
231 if let Err(e) = h.join() {
232 log::warn!("error joining monitor thread: {:?}",e);
233 }
234 }
235 if let Some(h) = self.profiler.take() {
236 if let Err(e) = h.join() {
237 log::warn!("error joining profiler thread: {:?}",e);
238 }
239 }
240 }
241}
242
243#[derive(Debug,Clone,Copy)]
244pub struct ThreadInfo {
245 pub payload: usize,
246 pub idle: usize,
247 pub wait: usize,
248 pub measures: usize,
249 pub interval: f64,
250}
251
252fn watcher(state: ThreadState) -> (StateWatcher,ThreadStatistics) {
253 let payload = Arc::new(AtomicUsize::new(0));
254 let idle = Arc::new(AtomicUsize::new(0));
255 let wait = Arc::new(AtomicUsize::new(0));
256 let unknown = Arc::new(AtomicUsize::new(0));
257 let measures = Arc::new(AtomicUsize::new(0));
258 let sw = StateWatcher{
259 state: state,
260 payload: payload.clone(),
261 idle: idle.clone(),
262 wait: wait.clone(),
263 unknown: unknown.clone(),
264 measures: measures.clone(),
265 };
266 let ts = ThreadStatistics {
267 payload: payload,
268 idle: idle,
269 wait: wait,
270 unknown: unknown,
271 measures: measures,
272
273 prev_payload: 0,
274 prev_idle: 0,
275 prev_wait: 0,
276 prev_unknown: 0,
277 prev_measures: 0,
278 prev_tm: Instant::now(),
279 };
280 (sw,ts)
281}
282
283struct ThreadStatistics {
284 payload: Arc<AtomicUsize>,
285 idle: Arc<AtomicUsize>,
286 wait: Arc<AtomicUsize>,
287 unknown: Arc<AtomicUsize>,
288 measures: Arc<AtomicUsize>,
289
290 prev_payload: usize,
291 prev_idle: usize,
292 prev_wait: usize,
293 prev_unknown: usize,
294 prev_measures: usize,
295 prev_tm: Instant,
296}
297
298impl ThreadStatistics {
299 fn get(&mut self) -> ThreadInfo {
300 let unk = self.unknown.load(Ordering::Relaxed);
301 let pld = self.payload.load(Ordering::Relaxed);
302 let idle = self.idle.load(Ordering::Relaxed);
303 let wait = self.wait.load(Ordering::Relaxed);
304 let ms = self.measures.load(Ordering::Relaxed);
305 let _u = unk.wrapping_sub(self.prev_unknown); self.prev_unknown = unk;
306 let p = pld.wrapping_sub(self.prev_payload); self.prev_payload = pld;
307 let i = idle.wrapping_sub(self.prev_idle); self.prev_idle = idle;
308 let w = wait.wrapping_sub(self.prev_wait); self.prev_wait = wait;
309 let m = ms.wrapping_sub(self.prev_measures); self.prev_measures = ms;
310 let t = self.prev_tm.elapsed().as_secs_f64(); self.prev_tm = Instant::now();
311 ThreadInfo {
312 payload: p,
313 idle: i,
314 wait: w,
315 measures: m,
316 interval: t,
317 }
318 }
319}
320
321
322
323struct StateWatcher {
324 state: ThreadState,
325 payload: Arc<AtomicUsize>,
326 idle: Arc<AtomicUsize>,
327 wait: Arc<AtomicUsize>,
328 unknown: Arc<AtomicUsize>,
329 measures: Arc<AtomicUsize>,
330}
331
332impl StateWatcher {
333 fn process(&self) -> bool {
334 match self.state.get() {
335 Some(s) => {
336 match s {
337 State::Init => { self.unknown.fetch_add(1,Ordering::Relaxed); },
338 State::Payload => { self.payload.fetch_add(1,Ordering::Relaxed); },
339 State::Idle => { self.idle.fetch_add(1,Ordering::Relaxed); },
340 State::Wait => { self.wait.fetch_add(1,Ordering::Relaxed); },
341 State::Done => return false,
342 }
343 self.measures.fetch_add(1,Ordering::Relaxed);
344 true
345 },
346 None => false,
347 }
348 }
349}