cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::cutask::CuMsgMetadata;
6use crate::log::*;
7use cu29_clock::{CuDuration, RobotClock};
8use cu29_traits::{CuError, CuResult};
9use hdrhistogram::Histogram;
10use serde_derive::{Deserialize, Serialize};
11use std::alloc::{GlobalAlloc, Layout, System};
12use std::sync::atomic::{AtomicUsize, Ordering};
13
14/// The state of a task.
15#[derive(Debug, Serialize, Deserialize)]
16pub enum CuTaskState {
17    Start,
18    Preprocess,
19    Process,
20    Postprocess,
21    Stop,
22}
23
/// Monitor decision to be taken when a task errored out.
///
/// This is the value a monitor returns from its `process_error` callback.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. keep calling the other tasks' steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut Copper down as cleanly as possible.
    Shutdown,
}
31
/// Trait to implement a monitoring task.
///
/// `new`, `process_copperlist` and `process_error` must be provided by the implementor;
/// `start` and `stop` have default implementations that do nothing and return `Ok(())`.
pub trait CuMonitor: Sized {
    /// Builds the monitor from the configuration and the static list of task ids.
    ///
    /// NOTE(review): `taskid` in [`CuMonitor::process_error`] is presumably an index into
    /// `taskids` -- confirm against the runtime.
    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
    where
        Self: Sized;

    /// Start hook; the default implementation does nothing and returns `Ok(())`.
    ///
    /// NOTE(review): presumably called when Copper is starting, mirroring `stop` -- confirm.
    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }

    /// Callback that will be triggered at the end of every copperlist (before, on or after the serialization).
    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;

    /// Called back when a task errored out. The runtime requires an immediate decision.
    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;

    /// Called back when Copper is stopping.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
        Ok(())
    }
}
53
54/// A do nothing monitor if no monitor is provided.
55/// This is basically defining the default behavior of Copper in case of error.
56pub struct NoMonitor {}
57impl CuMonitor for NoMonitor {
58    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
59        Ok(NoMonitor {})
60    }
61
62    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
63        // By default, do nothing.
64        Ok(())
65    }
66
67    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
68        // By default, just try to continue.
69        Decision::Ignore
70    }
71}
72
/// The single [`CountingAllocator`] instance, registered as the global allocator
/// through `#[global_allocator]`. Its counters are read via
/// `GLOBAL.get_allocated()` / `GLOBAL.get_deallocated()`.
#[global_allocator]
pub static GLOBAL: CountingAllocator = CountingAllocator::new();
75
/// A simple allocator that counts the number of bytes allocated and deallocated.
///
/// Both counters only ever grow (until `reset` is called); they are running totals,
/// not the amount of memory currently in use.
pub struct CountingAllocator {
    /// Running total of bytes handed out by `alloc`.
    allocated: AtomicUsize,
    /// Running total of bytes given back through `dealloc`.
    deallocated: AtomicUsize,
}
81
82impl Default for CountingAllocator {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl CountingAllocator {
89    pub const fn new() -> Self {
90        CountingAllocator {
91            allocated: AtomicUsize::new(0),
92            deallocated: AtomicUsize::new(0),
93        }
94    }
95
96    pub fn get_allocated(&self) -> usize {
97        self.allocated.load(Ordering::SeqCst)
98    }
99
100    pub fn get_deallocated(&self) -> usize {
101        self.deallocated.load(Ordering::SeqCst)
102    }
103
104    pub fn reset(&self) {
105        self.allocated.store(0, Ordering::SeqCst);
106        self.deallocated.store(0, Ordering::SeqCst);
107    }
108}
109
110unsafe impl GlobalAlloc for CountingAllocator {
111    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
112        let ptr = System.alloc(layout);
113        if !ptr.is_null() {
114            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
115        }
116        ptr
117    }
118
119    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
120        System.dealloc(ptr, layout);
121        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
122    }
123}
124
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// It stores a snapshot of the `GLOBAL` allocator counters taken at construction and
/// reports the difference with the current counters.
pub struct ScopedAllocCounter {
    /// `GLOBAL.get_allocated()` at construction time (`bf` presumably stands for "before").
    bf_allocated: usize,
    /// `GLOBAL.get_deallocated()` at construction time.
    bf_deallocated: usize,
}
130
131impl Default for ScopedAllocCounter {
132    fn default() -> Self {
133        Self::new()
134    }
135}
136
137impl ScopedAllocCounter {
138    pub fn new() -> Self {
139        ScopedAllocCounter {
140            bf_allocated: GLOBAL.get_allocated(),
141            bf_deallocated: GLOBAL.get_deallocated(),
142        }
143    }
144
145    /// Returns the total number of bytes allocated in the current scope
146    /// since the creation of this `ScopedAllocCounter`.
147    ///
148    /// # Example
149    /// ```
150    /// use cu29_runtime::monitoring::ScopedAllocCounter;
151    ///
152    /// let counter = ScopedAllocCounter::new();
153    /// let _vec = vec![0u8; 1024];
154    /// println!("Bytes allocated: {}", counter.get_allocated());
155    /// ```
156    pub fn get_allocated(&self) -> usize {
157        GLOBAL.get_allocated() - self.bf_allocated
158    }
159
160    /// Returns the total number of bytes deallocated in the current scope
161    /// since the creation of this `ScopedAllocCounter`.
162    ///
163    /// # Example
164    /// ```
165    /// use cu29_runtime::monitoring::ScopedAllocCounter;
166    ///
167    /// let counter = ScopedAllocCounter::new();
168    /// let _vec = vec![0u8; 1024];
169    /// drop(_vec);
170    /// println!("Bytes deallocated: {}", counter.get_deallocated());
171    /// ```
172    pub fn get_deallocated(&self) -> usize {
173        GLOBAL.get_deallocated() - self.bf_deallocated
174    }
175}
176
177/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
178impl Drop for ScopedAllocCounter {
179    fn drop(&mut self) {
180        let _allocated = GLOBAL.get_allocated() - self.bf_allocated;
181        let _deallocated = GLOBAL.get_deallocated() - self.bf_deallocated;
182        // TODO(gbin): Fix this when the logger is ready.
183        // debug!(
184        //     "Allocations: +{}B -{}B",
185        //     allocated = allocated,
186        //     deallocated = deallocated,
187        // );
188    }
189}
190
/// Accumulative stat object that can give you some real time statistics.
///
/// Thin wrapper around an `hdrhistogram` [`Histogram`] of `u64` samples.
#[derive(Debug, Clone)]
pub struct LiveStatistics {
    stats: Histogram<u64>, // u64 is the Counter type.
}
196
197impl LiveStatistics {
198    pub fn new_unbounded() -> Self {
199        LiveStatistics {
200            stats: Histogram::<u64>::new(3).unwrap(),
201        }
202    }
203
204    pub fn new_with_max(max: u64) -> Self {
205        LiveStatistics {
206            stats: Histogram::<u64>::new_with_bounds(1, max, 3).unwrap(),
207        }
208    }
209
210    #[inline]
211    pub fn min(&self) -> u64 {
212        self.stats.min()
213    }
214
215    #[inline]
216    pub fn max(&self) -> u64 {
217        self.stats.max()
218    }
219
220    #[inline]
221    pub fn mean(&self) -> f64 {
222        self.stats.mean()
223    }
224
225    #[inline]
226    pub fn percentile(&self, percentile: f64) -> u64 {
227        self.stats.value_at_quantile(percentile)
228    }
229
230    /// Adds a value to the statistics.
231    #[inline]
232    pub fn record(&mut self, value: u64) {
233        let maybe_error = self.stats.record(value);
234        if let Err(e) = maybe_error {
235            debug!("stats.record errored out: {}", e.to_string());
236        }
237    }
238
239    #[inline]
240    pub fn len(&self) -> u64 {
241        self.stats.len()
242    }
243
244    #[inline]
245    pub fn is_empty(&self) -> bool {
246        self.stats.len() == 0
247    }
248
249    #[inline]
250    pub fn reset(&mut self) {
251        self.stats.reset();
252    }
253}
254
/// A Specialized statistics object for CuDuration.
/// It will also keep track of the jitter between the values.
///
/// Jitter is the absolute difference between two consecutively recorded values.
#[derive(Debug, Clone)]
pub struct CuDurationStatistics {
    /// Statistics over the recorded durations themselves.
    bare: LiveStatistics,
    /// Statistics over the absolute difference between consecutive durations.
    jitter: LiveStatistics,
    /// The most recently recorded duration, used to compute the next jitter sample.
    last_value: CuDuration,
}
263
264impl CuDurationStatistics {
265    pub fn new(max: CuDuration) -> Self {
266        CuDurationStatistics {
267            bare: LiveStatistics::new_with_max(max.0),
268            jitter: LiveStatistics::new_with_max(max.0),
269            last_value: CuDuration::default(),
270        }
271    }
272
273    #[inline]
274    pub fn min(&self) -> CuDuration {
275        CuDuration(self.bare.min())
276    }
277
278    #[inline]
279    pub fn max(&self) -> CuDuration {
280        CuDuration(self.bare.max())
281    }
282
283    #[inline]
284    pub fn mean(&self) -> CuDuration {
285        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
286    }
287
288    #[inline]
289    pub fn percentile(&self, percentile: f64) -> CuDuration {
290        CuDuration(self.bare.percentile(percentile))
291    }
292
293    #[inline]
294    pub fn stddev(&self) -> CuDuration {
295        CuDuration(self.bare.stats.stdev() as u64)
296    }
297
298    #[inline]
299    pub fn len(&self) -> u64 {
300        self.bare.len()
301    }
302
303    #[inline]
304    pub fn is_empty(&self) -> bool {
305        self.bare.len() == 0
306    }
307
308    #[inline]
309    pub fn jitter_min(&self) -> CuDuration {
310        CuDuration(self.jitter.min())
311    }
312
313    #[inline]
314    pub fn jitter_max(&self) -> CuDuration {
315        CuDuration(self.jitter.max())
316    }
317
318    #[inline]
319    pub fn jitter_mean(&self) -> CuDuration {
320        CuDuration(self.jitter.mean() as u64)
321    }
322
323    #[inline]
324    pub fn jitter_stddev(&self) -> CuDuration {
325        CuDuration(self.jitter.stats.stdev() as u64)
326    }
327
328    #[inline]
329    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
330        CuDuration(self.jitter.percentile(percentile))
331    }
332
333    #[inline]
334    pub fn record(&mut self, value: CuDuration) {
335        if self.bare.is_empty() {
336            self.bare.stats.record(value.0).unwrap();
337            self.last_value = value;
338            return;
339        }
340        self.bare.stats.record(value.0).unwrap();
341        self.jitter
342            .stats
343            .record(value.0.abs_diff(self.last_value.0))
344            .unwrap();
345        self.last_value = value;
346    }
347
348    #[inline]
349    pub fn reset(&mut self) {
350        self.bare.reset();
351        self.jitter.reset();
352    }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic min/max/mean/quantile checks on an unbounded `LiveStatistics`.
    #[test]
    fn test_live_statistics() {
        let mut stats = LiveStatistics::new_unbounded();
        assert!(stats.is_empty());
        stats.record(1);
        stats.record(2);
        stats.record(3);
        stats.record(4);
        stats.record(5);
        assert_eq!(stats.min(), 1);
        assert_eq!(stats.max(), 5);
        assert_eq!(stats.mean(), 3.0);
        assert_eq!(stats.percentile(0.5), 3);
        assert_eq!(stats.percentile(0.90), 5);
        assert_eq!(stats.percentile(0.99), 5);
        assert_eq!(stats.len(), 5);
        stats.reset();
        assert_eq!(stats.len(), 0);
        assert!(stats.is_empty());
    }

    /// Checks both the duration statistics and the jitter derived from consecutive samples.
    #[test]
    fn test_duration_stats() {
        // The declared maximum must cover every value recorded below (up to 500);
        // relying on the histogram accepting values above its declared bound is fragile.
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.percentile(0.5), CuDuration(200));
        assert_eq!(stats.percentile(0.90), CuDuration(500));
        assert_eq!(stats.percentile(0.99), CuDuration(500));
        assert_eq!(stats.len(), 4);
        // Jitter samples: |200-100|, |500-200|, |400-500|.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        assert_eq!(stats.jitter_percentile(0.5), CuDuration(100));
        assert_eq!(stats.jitter_percentile(0.90), CuDuration(300));
        assert_eq!(stats.jitter_percentile(0.99), CuDuration(300));
        stats.reset();
        assert_eq!(stats.len(), 0);
        assert!(stats.is_empty());
        assert_eq!(stats.jitter.len(), 0);
    }
}