cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the tasks it is running.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, CuGraph, Flavor, NodeId};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10use cu29_traits::{CuError, CuResult};
11use serde_derive::{Deserialize, Serialize};
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(feature = "std")]
17use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
18
19#[cfg(not(feature = "std"))]
20use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
21
22#[cfg(not(feature = "std"))]
23mod imp {
24    pub use alloc::alloc::{GlobalAlloc, Layout};
25    pub use core::sync::atomic::{AtomicUsize, Ordering};
26    pub use libm::sqrt;
27}
28
// `std` flavour of the private re-export module. With the `memory_monitoring`
// feature it also installs the counting allocator as the global allocator.
#[cfg(feature = "std")]
mod imp {
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};

    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;

    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;

    /// Global allocator wrapping the system allocator with byte counters.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
41
42use imp::*;
43
44/// The state of a task.
45#[derive(Debug, Serialize, Deserialize)]
46pub enum CuTaskState {
47    Start,
48    Preprocess,
49    Process,
50    Postprocess,
51    Stop,
52}
53
/// Decision a monitor hands back to the runtime when a task errored out.
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. keep calling the other tasks' steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut Copper down as cleanly as possible.
    Shutdown,
}
61
/// Kind of component a [`MonitorNode`] stands for.
///
/// `build_monitor_topology` maps `Flavor::Bridge` nodes to [`ComponentKind::Bridge`]
/// and every other flavor to [`ComponentKind::Task`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ComponentKind {
    Task,
    Bridge,
}
67
68#[derive(Debug, Clone)]
69pub struct MonitorNode {
70    pub id: String,
71    pub type_name: Option<String>,
72    pub kind: ComponentKind,
73    /// Ordered list of input port identifiers.
74    pub inputs: Vec<String>,
75    /// Ordered list of output port identifiers.
76    pub outputs: Vec<String>,
77}
78
/// A directed connection between two nodes of the monitored graph.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Identifier of the node the connection starts from.
    pub src: String,
    /// Port used on the source node, if one could be determined.
    pub src_port: Option<String>,
    /// Identifier of the node the connection ends at.
    pub dst: String,
    /// Port used on the destination node, if one could be determined.
    pub dst_port: Option<String>,
    /// Message type carried by the connection.
    pub msg: String,
}
87
88#[derive(Debug, Clone, Default)]
89pub struct MonitorTopology {
90    pub nodes: Vec<MonitorNode>,
91    pub connections: Vec<MonitorConnection>,
92}
93
/// Records whether a node appears as the destination and/or the source of at least
/// one connection. Both flags start out `false`.
#[derive(Clone, Copy, Debug, Default)]
struct NodeIoUsage {
    /// The node is the destination of at least one connection.
    has_incoming: bool,
    /// The node is the source of at least one connection.
    has_outgoing: bool,
}
99
100fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
101    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
102    edge_ids.sort();
103
104    let mut outputs = Vec::new();
105    let mut seen = Vec::new();
106    let mut port_idx = 0usize;
107    for edge_id in edge_ids {
108        let Some(edge) = graph.edge(edge_id) else {
109            continue;
110        };
111        if seen.iter().any(|msg| msg == &edge.msg) {
112            continue;
113        }
114        seen.push(edge.msg.clone());
115        let mut port_label = String::from("out");
116        port_label.push_str(&port_idx.to_string());
117        port_label.push_str(": ");
118        port_label.push_str(edge.msg.as_str());
119        outputs.push((edge.msg.clone(), port_label));
120        port_idx += 1;
121    }
122    outputs
123}
124
125/// Derive a monitor-friendly topology from the runtime configuration.
126pub fn build_monitor_topology(
127    config: &CuConfig,
128    mission: Option<&str>,
129) -> CuResult<MonitorTopology> {
130    let graph = config.get_graph(mission)?;
131    let mut nodes: Map<String, MonitorNode> = Map::new();
132    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
133    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
134
135    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
136    for bridge in &config.bridges {
137        bridge_lookup.insert(bridge.id.as_str(), bridge);
138    }
139
140    for cnx in graph.edges() {
141        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
142        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
143    }
144
145    for (_, node) in graph.get_all_nodes() {
146        let kind = match node.get_flavor() {
147            Flavor::Bridge => ComponentKind::Bridge,
148            _ => ComponentKind::Task,
149        };
150        let node_id = node.get_id();
151
152        let mut inputs = Vec::new();
153        let mut outputs = Vec::new();
154        if kind == ComponentKind::Bridge {
155            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
156                for ch in &bridge.channels {
157                    match ch {
158                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
159                            outputs.push(id.clone())
160                        }
161                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
162                    }
163                }
164            }
165        } else {
166            let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
167            if usage.has_incoming || !usage.has_outgoing {
168                inputs.push("in".to_string());
169            }
170            if usage.has_outgoing {
171                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
172                    let ports = collect_output_ports(graph, node_idx);
173                    let mut port_map: Map<String, String> = Map::new();
174                    for (msg_type, label) in ports {
175                        port_map.insert(msg_type, label.clone());
176                        outputs.push(label);
177                    }
178                    output_port_lookup.insert(node_id.clone(), port_map);
179                }
180            } else if !usage.has_incoming {
181                outputs.push("out".to_string());
182            }
183        }
184
185        nodes.insert(
186            node_id.clone(),
187            MonitorNode {
188                id: node_id,
189                type_name: Some(node.get_type().to_string()),
190                kind,
191                inputs,
192                outputs,
193            },
194        );
195    }
196
197    let mut connections = Vec::new();
198    for cnx in graph.edges() {
199        let src = cnx.src.clone();
200        let dst = cnx.dst.clone();
201
202        let src_port = cnx.src_channel.clone().or_else(|| {
203            output_port_lookup
204                .get(&src)
205                .and_then(|ports| ports.get(&cnx.msg).cloned())
206                .or_else(|| {
207                    nodes
208                        .get(&src)
209                        .and_then(|node| node.outputs.first().cloned())
210                })
211        });
212        let dst_port = cnx.dst_channel.clone().or_else(|| {
213            nodes
214                .get(&dst)
215                .and_then(|node| node.inputs.first().cloned())
216        });
217
218        connections.push(MonitorConnection {
219            src,
220            src_port,
221            dst,
222            dst_port,
223            msg: cnx.msg.clone(),
224        });
225    }
226
227    Ok(MonitorTopology {
228        nodes: nodes.into_values().collect(),
229        connections,
230    })
231}
232
233/// Trait to implement a monitoring task.
234pub trait CuMonitor: Sized {
235    fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
236    where
237        Self: Sized;
238
239    fn set_topology(&mut self, _topology: MonitorTopology) {}
240
241    fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
242        Ok(())
243    }
244
245    /// Callback that will be trigger at the end of every copperlist (before, on or after the serialization).
246    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
247
248    /// Callbacked when a Task errored out. The runtime requires an immediate decision.
249    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
250
251    /// Callbacked when copper is stopping.
252    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
253        Ok(())
254    }
255}
256
257/// A do nothing monitor if no monitor is provided.
258/// This is basically defining the default behavior of Copper in case of error.
259pub struct NoMonitor {}
260impl CuMonitor for NoMonitor {
261    fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
262        Ok(NoMonitor {})
263    }
264
265    fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
266        // By default, do nothing.
267        Ok(())
268    }
269
270    fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
271        // By default, just try to continue.
272        Decision::Ignore
273    }
274}
275
/// A simple allocator wrapper that counts the number of bytes allocated and
/// deallocated through it.
pub struct CountingAlloc<A: GlobalAlloc> {
    /// The allocator doing the actual work.
    inner: A,
    /// Running total of allocated bytes.
    allocated: AtomicUsize,
    /// Running total of deallocated bytes.
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters at zero. `const` so it can initialise a `static`.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total number of bytes allocated since creation or the last [`reset`](Self::reset).
    pub fn allocated(&self) -> usize {
        let counter = &self.allocated;
        counter.load(Ordering::SeqCst)
    }

    /// Total number of bytes deallocated since creation or the last [`reset`](Self::reset).
    pub fn deallocated(&self) -> usize {
        let counter = &self.deallocated;
        counter.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    pub fn reset(&self) {
        for counter in [&self.allocated, &self.deallocated] {
            counter.store(0, Ordering::SeqCst);
        }
    }
}
305
306unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
307    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
308        let p = unsafe { self.inner.alloc(layout) };
309        if !p.is_null() {
310            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
311        }
312        p
313    }
314
315    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
316        unsafe {
317            self.inner.dealloc(ptr, layout);
318        }
319        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
320    }
321}
322
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// It snapshots the counters of the global counting allocator on creation and reports
/// the difference with their current values.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// Global allocated-bytes counter at creation time.
    bf_allocated: usize,
    /// Global deallocated-bytes counter at creation time.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a new measurement from the current values of the global counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// Returns 0 if the global counters were reset in the meantime and have not yet
    /// caught up with the snapshot.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        // Saturate: `CountingAlloc::reset` can put the global counter below the snapshot.
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// Returns 0 if the global counters were reset in the meantime and have not yet
    /// caught up with the snapshot.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        // Saturate: `CountingAlloc::reset` can put the global counter below the snapshot.
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}
377
/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
///
/// The values are currently computed but not reported (see the TODO below).
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // Saturating subtraction: the global counters can be reset while this counter is
        // alive, and an arithmetic overflow must never panic inside `drop`.
        let _allocated = GLOBAL.allocated().saturating_sub(self.bf_allocated);
        let _deallocated = GLOBAL.deallocated().saturating_sub(self.bf_deallocated);
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
392
/// Number of histogram buckets used by `LiveStatistics` when built with `std`.
#[cfg(feature = "std")]
const BUCKET_COUNT: usize = 1 << 10;
/// Number of histogram buckets used by `LiveStatistics` when built without `std`.
#[cfg(not(feature = "std"))]
const BUCKET_COUNT: usize = 1 << 8;
397
398/// Accumulative stat object that can give your some real time statistics.
399/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
400#[derive(Debug, Clone)]
401pub struct LiveStatistics {
402    buckets: [u64; BUCKET_COUNT],
403    min_val: u64,
404    max_val: u64,
405    sum: u64,
406    sum_sq: u64,
407    count: u64,
408    max_value: u64,
409}
410
411impl LiveStatistics {
412    /// Creates a new `LiveStatistics` instance with a specified maximum value.
413    ///
414    /// This function initializes a `LiveStatistics` structure with default values
415    /// for tracking statistical data, while setting an upper limit for the data
416    /// points that the structure tracks.
417    ///
418    /// # Parameters
419    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
420    ///
421    /// # Returns
422    /// A new instance of `LiveStatistics` with:
423    /// - `buckets`: An array pre-filled with zeros to categorize data points.
424    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
425    /// - `max_val`: Initialized to zero.
426    /// - `sum`: The sum of all data points, initialized to zero.
427    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
428    /// - `count`: The total number of data points, initialized to zero.
429    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
430    ///
431    pub fn new_with_max(max_value: u64) -> Self {
432        LiveStatistics {
433            buckets: [0; BUCKET_COUNT],
434            min_val: u64::MAX,
435            max_val: 0,
436            sum: 0,
437            sum_sq: 0,
438            count: 0,
439            max_value,
440        }
441    }
442
443    #[inline]
444    fn value_to_bucket(&self, value: u64) -> usize {
445        if value >= self.max_value {
446            BUCKET_COUNT - 1
447        } else {
448            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
449        }
450    }
451
452    #[inline]
453    pub fn min(&self) -> u64 {
454        if self.count == 0 { 0 } else { self.min_val }
455    }
456
457    #[inline]
458    pub fn max(&self) -> u64 {
459        self.max_val
460    }
461
462    #[inline]
463    pub fn mean(&self) -> f64 {
464        if self.count == 0 {
465            0.0
466        } else {
467            self.sum as f64 / self.count as f64
468        }
469    }
470
471    #[inline]
472    pub fn stdev(&self) -> f64 {
473        if self.count == 0 {
474            return 0.0;
475        }
476        let mean = self.mean();
477        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
478        if variance < 0.0 {
479            return 0.0;
480        }
481        #[cfg(feature = "std")]
482        return variance.sqrt();
483        #[cfg(not(feature = "std"))]
484        return sqrt(variance);
485    }
486
487    #[inline]
488    pub fn percentile(&self, percentile: f64) -> u64 {
489        if self.count == 0 {
490            return 0;
491        }
492
493        let target_count = (self.count as f64 * percentile) as u64;
494        let mut accumulated = 0u64;
495
496        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
497            accumulated += bucket_count;
498            if accumulated >= target_count {
499                // Linear interpolation within the bucket
500                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
501                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
502                let bucket_fraction = if bucket_count > 0 {
503                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
504                } else {
505                    0.5
506                };
507                return bucket_start
508                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
509            }
510        }
511
512        self.max_val
513    }
514
515    /// Adds a value to the statistics.
516    #[inline]
517    pub fn record(&mut self, value: u64) {
518        if value < self.min_val {
519            self.min_val = value;
520        }
521        if value > self.max_val {
522            self.max_val = value;
523        }
524        self.sum += value;
525        self.sum_sq += value * value;
526        self.count += 1;
527
528        let bucket = self.value_to_bucket(value);
529        self.buckets[bucket] += 1;
530    }
531
532    #[inline]
533    pub fn len(&self) -> u64 {
534        self.count
535    }
536
537    #[inline]
538    pub fn is_empty(&self) -> bool {
539        self.count == 0
540    }
541
542    #[inline]
543    pub fn reset(&mut self) {
544        self.buckets.fill(0);
545        self.min_val = u64::MAX;
546        self.max_val = 0;
547        self.sum = 0;
548        self.sum_sq = 0;
549        self.count = 0;
550    }
551}
552
553/// A Specialized statistics object for CuDuration.
554/// It will also keep track of the jitter between the values.
555#[derive(Debug, Clone)]
556pub struct CuDurationStatistics {
557    bare: LiveStatistics,
558    jitter: LiveStatistics,
559    last_value: CuDuration,
560}
561
562impl CuDurationStatistics {
563    pub fn new(max: CuDuration) -> Self {
564        let CuDuration(max) = max;
565        CuDurationStatistics {
566            bare: LiveStatistics::new_with_max(max),
567            jitter: LiveStatistics::new_with_max(max),
568            last_value: CuDuration::default(),
569        }
570    }
571
572    #[inline]
573    pub fn min(&self) -> CuDuration {
574        CuDuration(self.bare.min())
575    }
576
577    #[inline]
578    pub fn max(&self) -> CuDuration {
579        CuDuration(self.bare.max())
580    }
581
582    #[inline]
583    pub fn mean(&self) -> CuDuration {
584        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
585    }
586
587    #[inline]
588    pub fn percentile(&self, percentile: f64) -> CuDuration {
589        CuDuration(self.bare.percentile(percentile))
590    }
591
592    #[inline]
593    pub fn stddev(&self) -> CuDuration {
594        CuDuration(self.bare.stdev() as u64)
595    }
596
597    #[inline]
598    pub fn len(&self) -> u64 {
599        self.bare.len()
600    }
601
602    #[inline]
603    pub fn is_empty(&self) -> bool {
604        self.bare.len() == 0
605    }
606
607    #[inline]
608    pub fn jitter_min(&self) -> CuDuration {
609        CuDuration(self.jitter.min())
610    }
611
612    #[inline]
613    pub fn jitter_max(&self) -> CuDuration {
614        CuDuration(self.jitter.max())
615    }
616
617    #[inline]
618    pub fn jitter_mean(&self) -> CuDuration {
619        CuDuration(self.jitter.mean() as u64)
620    }
621
622    #[inline]
623    pub fn jitter_stddev(&self) -> CuDuration {
624        CuDuration(self.jitter.stdev() as u64)
625    }
626
627    #[inline]
628    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
629        CuDuration(self.jitter.percentile(percentile))
630    }
631
632    #[inline]
633    pub fn record(&mut self, value: CuDuration) {
634        let CuDuration(nanos) = value;
635        if self.bare.is_empty() {
636            self.bare.record(nanos);
637            self.last_value = value;
638            return;
639        }
640        self.bare.record(nanos);
641        let CuDuration(last_nanos) = self.last_value;
642        self.jitter.record(nanos.abs_diff(last_nanos));
643        self.last_value = value;
644    }
645
646    #[inline]
647    pub fn reset(&mut self) {
648        self.bare.reset();
649        self.jitter.reset();
650    }
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// Percentile estimates of a uniform 0..=99 series must land close to their rank.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // One sample for every value from 0 to 99.
        (0..100).for_each(|sample| stats.record(sample));

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        // Average of 0..99 is 49.5, truncated.
        assert_eq!(stats.mean() as u64, 49);

        // With 100 samples from 0-99, percentiles should be close to their index.
        let p50 = stats.percentile(0.5);
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);

        let p90 = stats.percentile(0.90);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);

        let p95 = stats.percentile(0.95);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);

        let p99 = stats.percentile(0.99);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    /// Durations and the jitter between consecutive durations are both tracked.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        for nanos in [100, 200, 500, 400] {
            stats.record(CuDuration(nanos));
        }

        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);

        // Jitters are |200-100|, |500-200| and |400-500|.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));

        stats.reset();
        assert_eq!(stats.len(), 0);
    }
}