thread_monitor/monitor/
stat_profiler.rs

1use std::{
2    sync::{
3        Arc,
4        atomic::{AtomicUsize,Ordering},
5    },
6    time::Instant,
7};
8
9use crate::{
10    ThreadState, ThreadStatus, State,
11};
12
13use oneshot::OneSet;
14
15enum Ask {
16    All(OneSet<ThreadInfoIterator>),
17}
18
19pub struct ThreadInfoIterator {
20    data: ThreadInfoIteratorInner,
21}
22impl ThreadInfoIterator {
23    fn new(data: ThreadInfoIteratorInner) -> ThreadInfoIterator {
24        ThreadInfoIterator { data }
25    }
26}
27
28impl Iterator for ThreadInfoIterator {
29    type Item = ThreadInfoItem;
30
31    fn next(&mut self) -> Option<Self::Item> {
32        match &mut self.data {
33            ThreadInfoIteratorInner::Static(ar) => {
34                for it in ar {
35                    let r = it.take();
36                    if r.is_some() { return r; }
37                }
38                None
39            },
40            ThreadInfoIteratorInner::Vec(v) => v.pop(),
41        }
42    }
43}
44
45
46#[derive(Debug,Clone)]
47pub struct ThreadInfoItem {
48    name: Arc<String>,
49    info: ThreadInfo,
50}
51impl ThreadInfoItem {
52    pub fn name(&self) -> &str {
53        self.name.as_ref()
54    }
55    pub fn info(&self) -> ThreadInfo {
56        self.info
57    }
58}
59    
60
61enum ThreadInfoIteratorInner {
62    Static([Option<ThreadInfoItem>; 16]),
63    Vec(Vec<ThreadInfoItem>)
64}
65impl ThreadInfoIteratorInner {
66    fn new(sz: usize) -> ThreadInfoIteratorInner {
67        if sz > 16 {
68            ThreadInfoIteratorInner::Static([None, None, None, None, None, None, None, None, None, None, None, None, None, None, None, None])
69        } else {
70            ThreadInfoIteratorInner::Vec(Vec::with_capacity(sz))
71        }
72    }
73    fn push(&mut self, name: Arc<String>, info: ThreadInfo) {
74        let item = ThreadInfoItem { name, info };
75        match self {
76            ThreadInfoIteratorInner::Static(ar) => match ar.len() < 16 {
77                true => { ar[ar.len()] = Some(item); },
78                false => {
79                    let mut v = Vec::with_capacity(32);
80                    for it in ar {
81                        if let Some(it) = it.take() {
82                            v.push(it);
83                        }
84                    }
85                    v.push(item);
86                    *self = ThreadInfoIteratorInner::Vec(v);
87                },
88            },
89            ThreadInfoIteratorInner::Vec(v) => v.push(item),
90        };
91    }
92}
93
94
95pub struct StatMonitor {
96    asker: Option<crossbeam::channel::Sender<Ask>>,
97    monitor: Option<std::thread::JoinHandle<()>>,
98    profiler: Option<std::thread::JoinHandle<()>>,
99}
100impl StatMonitor {
101    pub fn new() -> StatMonitor {
102        StatMonitor::create(100)
103    }
104    pub fn with_sample_rate(max_samples_per_sec: u16) -> StatMonitor {
105        StatMonitor::create(max_samples_per_sec)
106    }
107
108    pub fn get_info(&self) -> Option<ThreadInfoIterator> {
109        match &self.asker {
110            None => None,
111            Some(sender) => {
112                let (tx,rx) = oneshot::oneshot();
113                match sender.send(Ask::All(tx)) {
114                    Err(_) => None,
115                    Ok(()) => rx.wait(),
116                }
117            },
118        }
119    }
120    
121    fn create(max_samples_per_sec: u16) -> StatMonitor {
122        crate::THREAD_MONITOR.turn_on();
123        let (asker,receiver) = crossbeam::channel::unbounded();
124        let (tx,rx) = crossbeam::channel::unbounded();
125        StatMonitor {
126            asker: Some(asker),
127            monitor: Some(crate::spawn_str("stat_monitor",move || {              
128                let slp = std::time::Duration::new(0,300_000_000);
129                #[allow(unused_variables)]
130                let mut thr_started = 0;
131                #[allow(unused_variables)]
132                let mut thr_finished = 0;
133                let mut stat_vec = Vec::new();
134                loop {
135                    match crate::THREAD_MONITOR.process() {
136                        None => {
137                            log::error!("thread monitor was unexpectedly turned off");
138                            break;
139                        },
140                        Some(v) => for st in v {
141                            match st {
142                                ThreadStatus::Started{ state: None, .. } => {
143                                    thr_started += 1;
144                                },
145                                ThreadStatus::Started{ n, name, tm, state: Some(state) } => {
146                                    thr_started += 1;
147                                    let (watcher,agregator) = watcher(state);
148                                    stat_vec.push((n,Arc::new(name),tm,agregator));
149                                    if let Err(_) = tx.send(watcher) {
150                                        log::error!("thread monitor profiler was unexpectedly turned off");
151                                        break;
152                                    }
153                                },
154                                ThreadStatus::Finished{ n, .. } => {
155                                    thr_finished += 1;
156                                    let mut to_remove = None;
157                                    for (i,(svn,_,_,_)) in stat_vec.iter().enumerate() {
158                                        if *svn == n {
159                                            to_remove = Some(i);
160                                        }
161                                    }
162                                    if let Some(idx) = to_remove {
163                                        stat_vec.swap_remove(idx);
164                                    }
165                                },
166                            }
167                        },
168                    }
169                    match receiver.try_recv() {
170                        Ok(Ask::All(oneset)) => {
171                            let mut iter = ThreadInfoIteratorInner::new(stat_vec.len());
172                            for (_,name,_,agregator) in &mut stat_vec {
173                                iter.push(name.clone(),agregator.get());
174                            }
175                            oneset.set(ThreadInfoIterator::new(iter));
176                        },
177                        Err(crossbeam::channel::TryRecvError::Empty) => {},
178                        Err(crossbeam::channel::TryRecvError::Disconnected) => {
179                            log::error!("stat_monitor was unexpectedly turned off");
180                            break;
181                        },
182                    }
183                    
184                    std::thread::sleep(slp);
185                }
186            })),
187            profiler: Some(crate::spawn_str("stat_profiler",move || {
188                let sps = 1u32 + max_samples_per_sec as u32;
189                let slp = std::time::Duration::new(0,1_000_000_000 / sps);
190                let mut to_watch = Vec::with_capacity(1024);
191                loop {
192                    match rx.try_recv() {
193                        Ok(watcher) => {
194                            to_watch.push(watcher);
195                            if sps <= 10 {
196                                while let Ok(watcher) = rx.try_recv() {
197                                    to_watch.push(watcher);
198                                }
199                            }
200                        },
201                        Err(crossbeam::channel::TryRecvError::Empty) => {},
202                        Err(crossbeam::channel::TryRecvError::Disconnected) => {
203                            log::error!("thread monitor main was unexpectedly turned off");
204                            break;
205                        },
206                    }
207
208                    let mut to_remove = None;
209                    for (i,w) in to_watch.iter().enumerate() {
210                        if !w.process() {
211                            to_remove = Some(i);
212                        }
213                    }
214                    if let Some(idx) = to_remove {
215                        // removing rate may be too slow if:
216                        //    1) sps is lower then 10
217                        //    2) spawn rate is higher then sps 
218                        to_watch.swap_remove(idx);
219                    }
220
221                    std::thread::sleep(slp);
222                }
223            })),
224        }
225    }
226}
227impl Drop for StatMonitor {
228    fn drop(&mut self) {
229        std::mem::drop(self.asker.take());
230        if let Some(h) = self.monitor.take() {
231            if let Err(e) = h.join() {
232                log::warn!("error joining monitor thread: {:?}",e);
233            }
234        }
235        if let Some(h) = self.profiler.take() {
236            if let Err(e) = h.join() {
237                log::warn!("error joining profiler thread: {:?}",e);
238            }
239        }
240    }
241}
242
243#[derive(Debug,Clone,Copy)]
244pub struct ThreadInfo {
245    pub payload: usize,
246    pub idle: usize,
247    pub wait: usize,
248    pub measures: usize,
249    pub interval: f64,
250}
251
252fn watcher(state: ThreadState) -> (StateWatcher,ThreadStatistics) {
253    let payload = Arc::new(AtomicUsize::new(0));
254    let idle = Arc::new(AtomicUsize::new(0));
255    let wait = Arc::new(AtomicUsize::new(0));
256    let unknown = Arc::new(AtomicUsize::new(0));
257    let measures = Arc::new(AtomicUsize::new(0));
258    let sw = StateWatcher{
259        state: state,
260        payload: payload.clone(),
261        idle: idle.clone(),
262        wait: wait.clone(),
263        unknown: unknown.clone(),
264        measures: measures.clone(),
265    };
266    let ts = ThreadStatistics {
267        payload: payload,
268        idle: idle,
269        wait: wait,
270        unknown: unknown,
271        measures: measures,
272
273        prev_payload: 0,
274        prev_idle: 0,
275        prev_wait: 0,
276        prev_unknown: 0,
277        prev_measures: 0,
278        prev_tm: Instant::now(),
279    };
280    (sw,ts)
281}
282
283struct ThreadStatistics {
284    payload: Arc<AtomicUsize>,
285    idle: Arc<AtomicUsize>,
286    wait: Arc<AtomicUsize>,
287    unknown: Arc<AtomicUsize>,
288    measures: Arc<AtomicUsize>,
289
290    prev_payload: usize,
291    prev_idle: usize,
292    prev_wait: usize,
293    prev_unknown: usize,
294    prev_measures: usize,
295    prev_tm: Instant,
296}
297
298impl ThreadStatistics {
299    fn get(&mut self) -> ThreadInfo {
300        let unk = self.unknown.load(Ordering::Relaxed);
301        let pld = self.payload.load(Ordering::Relaxed);
302        let idle = self.idle.load(Ordering::Relaxed);
303        let wait = self.wait.load(Ordering::Relaxed);
304        let ms = self.measures.load(Ordering::Relaxed);
305        let _u = unk.wrapping_sub(self.prev_unknown); self.prev_unknown = unk;
306        let p = pld.wrapping_sub(self.prev_payload);  self.prev_payload = pld;
307        let i = idle.wrapping_sub(self.prev_idle);    self.prev_idle = idle;
308        let w = wait.wrapping_sub(self.prev_wait);    self.prev_wait = wait;
309        let m = ms.wrapping_sub(self.prev_measures);  self.prev_measures = ms;
310        let t = self.prev_tm.elapsed().as_secs_f64(); self.prev_tm = Instant::now();
311        ThreadInfo {
312            payload: p,
313            idle: i,
314            wait: w,
315            measures: m,
316            interval: t,
317        }
318    }
319}
320
321
322
323struct StateWatcher {
324    state: ThreadState,
325    payload: Arc<AtomicUsize>,
326    idle: Arc<AtomicUsize>,
327    wait: Arc<AtomicUsize>,
328    unknown: Arc<AtomicUsize>,
329    measures: Arc<AtomicUsize>,
330}
331
332impl StateWatcher {
333    fn process(&self) -> bool {
334        match self.state.get() {
335            Some(s) => {    
336                match s {
337                    State::Init => { self.unknown.fetch_add(1,Ordering::Relaxed); },    
338                    State::Payload => { self.payload.fetch_add(1,Ordering::Relaxed); },
339                    State::Idle => { self.idle.fetch_add(1,Ordering::Relaxed); },
340                    State::Wait => { self.wait.fetch_add(1,Ordering::Relaxed); },
341                    State::Done => return false,
342                }
343                self.measures.fetch_add(1,Ordering::Relaxed);
344                true
345            },
346            None => false,
347        }
348    }
349}