Skip to main content

cu_logmon/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
3//! Lightweight Copper monitor that periodically dumps compact runtime stats over the
4//! standard `debug!` / `info!` logging macros.
5//!
6//! This monitor is `no_std` friendly and keeps allocations to a minimum while still
7//! reporting a per-second summary of the Copperlist cadence and latencies.
8
9extern crate alloc;
10
11use alloc::format;
12use alloc::vec;
13use alloc::vec::Vec;
14use cu29::prelude::*;
15#[cfg(all(feature = "std", debug_assertions))]
16use cu29_log_runtime::{
17    format_message_only, register_live_log_listener, unregister_live_log_listener,
18};
19use spin::Mutex;
20#[cfg(all(feature = "std", debug_assertions))]
21use std::collections::HashMap;
22
/// Minimum time, in seconds, that must elapse since the last report before a new
/// summary is produced.
const REPORT_INTERVAL_SECS: u64 = 1;
/// Bound, in seconds, handed to `CuDurationStatistics::new` for every latency statistic.
const MAX_LATENCY_SECS: u64 = 5;
25
/// Formats a `CuTime` as `HH:mm:ss.ffff`, where `ffff` counts ten-thousandths of a second.
///
/// The hour field is not wrapped, so it keeps growing past 24 for large time values.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuTime) -> String {
    let nanos = time.as_nanos();
    // Split into whole seconds and the sub-second remainder first.
    let (whole_secs, sub_nanos) = (nanos / 1_000_000_000, nanos % 1_000_000_000);
    let whole_minutes = whole_secs / 60;
    let (hours, minutes, seconds) = (whole_minutes / 60, whole_minutes % 60, whole_secs % 60);
    // 100_000 ns == 1e-4 s, which yields exactly four fractional digits.
    let fractional_1e4 = sub_nanos / 100_000;
    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
}
37
/// Mutable bookkeeping for the reporting window that is currently being accumulated.
struct WindowState {
    /// Copperlists processed since the state was created; not cleared by `reset_window`.
    total_copperlists: u64,
    /// Copperlists processed since the last call to `reset_window`.
    window_copperlists: u32,
    /// Time at which the current window started, or `None` if it has not been set yet.
    last_report_at: Option<CuTime>,
    /// Time spent emitting the most recent report line.
    last_log_duration: CuDuration,
    /// End-to-end latency samples recorded during the current window.
    end_to_end: CuDurationStatistics,
    /// Per-task processing-time samples for the current window, one entry per task.
    per_task: Vec<CuDurationStatistics>,
}
46
47impl WindowState {
48    fn new(task_count: usize) -> Self {
49        let max_sample = CuDuration::from_secs(MAX_LATENCY_SECS);
50        #[cfg(target_os = "none")]
51        info!("WindowState::new: init end_to_end");
52        let end_to_end = CuDurationStatistics::new(max_sample);
53        #[cfg(target_os = "none")]
54        info!("WindowState::new: init per_task");
55        #[cfg(target_os = "none")]
56        info!(
57            "WindowState::new: stats_size={} per_task_bytes={}",
58            core::mem::size_of::<CuDurationStatistics>(),
59            core::mem::size_of::<CuDurationStatistics>() * task_count
60        );
61        let per_task = vec![CuDurationStatistics::new(max_sample); task_count];
62        #[cfg(target_os = "none")]
63        info!("WindowState::new: init done");
64        Self {
65            total_copperlists: 0,
66            window_copperlists: 0,
67            last_report_at: None,
68            last_log_duration: CuDuration::MIN,
69            end_to_end,
70            per_task,
71        }
72    }
73
74    fn reset_window(&mut self, now: CuTime) {
75        self.window_copperlists = 0;
76        self.last_report_at = Some(now);
77        self.end_to_end.reset();
78        for stat in &mut self.per_task {
79            stat.reset();
80        }
81    }
82}
83
/// Immutable summary of one reporting window, ready to be formatted into a log line.
///
/// All durations are expressed in microseconds. The type only holds integers and a
/// borrowed task name, so it is cheap to copy and can be compared and debug-printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Snapshot<'a> {
    /// Total number of Copperlists processed when the snapshot was taken.
    copperlist_index: u64,
    /// Integer part of the Copperlist rate, in Hz.
    rate_whole: u64,
    /// First decimal digit of the Copperlist rate.
    rate_tenths: u64,
    /// Median end-to-end latency.
    e2e_p50_us: u64,
    /// 90th percentile end-to-end latency.
    e2e_p90_us: u64,
    /// 99th percentile end-to-end latency.
    e2e_p99_us: u64,
    /// Largest end-to-end latency reported for the window.
    e2e_max_us: u64,
    /// Name of the task with the highest p99 processing time.
    slowest_task: &'a str,
    /// p99 processing time of `slowest_task`.
    slowest_task_p99_us: u64,
    /// Time spent emitting the previous report line.
    log_overhead_us: u64,
}
96
/// Copper monitor that accumulates Copperlist statistics and logs a one-line summary
/// once per report interval.
pub struct CuLogMon {
    /// Task identifiers supplied at construction; used to name tasks in reports and error logs.
    taskids: &'static [&'static str],
    /// Clock captured by `start`; while it is `None`, `process_copperlist` does nothing.
    clock: Mutex<Option<RobotClock>>,
    /// Statistics for the window currently being accumulated.
    window: Mutex<WindowState>,
}
102
103impl CuLogMon {
104    fn compute_snapshot<'a>(&'a self, state: &WindowState, now: CuTime) -> Option<Snapshot<'a>> {
105        let last_report = state.last_report_at?;
106
107        let elapsed = now - last_report;
108        let elapsed_ns = elapsed.as_nanos();
109
110        if elapsed_ns < CuDuration::from_secs(REPORT_INTERVAL_SECS).as_nanos() {
111            return None;
112        }
113
114        let rate_x10 = if elapsed_ns > 0 {
115            (state.window_copperlists as u64 * 10 * 1_000_000_000u64) / elapsed_ns
116        } else {
117            0
118        };
119
120        let slowest = find_slowest_task(&state.per_task);
121        let (slowest_task, slowest_task_p99_us) = slowest
122            .map(|(idx, dur)| {
123                let name = self.taskids.get(idx).copied().unwrap_or("<?>");
124                (name, dur.as_micros())
125            })
126            .unwrap_or(("none", 0));
127
128        let e2e_p50 = state.end_to_end.percentile(0.5).as_micros();
129        let e2e_p90 = state.end_to_end.percentile(0.9).as_micros();
130        let e2e_p99 = state.end_to_end.percentile(0.99).as_micros();
131        // Max can skew low if a bucket underflows; keep it at least as high as p99.
132        let e2e_max = state.end_to_end.max().as_micros().max(e2e_p99);
133
134        Some(Snapshot {
135            copperlist_index: state.total_copperlists,
136            rate_whole: rate_x10 / 10,
137            rate_tenths: rate_x10 % 10,
138            e2e_p50_us: e2e_p50,
139            e2e_p90_us: e2e_p90,
140            e2e_p99_us: e2e_p99,
141            e2e_max_us: e2e_max,
142            slowest_task,
143            slowest_task_p99_us,
144            log_overhead_us: state.last_log_duration.as_micros(),
145        })
146    }
147}
148
149fn task_duration(meta: &CuMsgMetadata) -> Option<CuDuration> {
150    let start = Option::<CuTime>::from(meta.process_time.start)?;
151    let end = Option::<CuTime>::from(meta.process_time.end)?;
152    (end >= start).then_some(end - start)
153}
154
155fn end_to_end_latency(msgs: &[&CuMsgMetadata]) -> Option<CuDuration> {
156    let start = msgs
157        .first()
158        .and_then(|m| Option::<CuTime>::from(m.process_time.start));
159    let end = msgs
160        .last()
161        .and_then(|m| Option::<CuTime>::from(m.process_time.end));
162    match (start, end) {
163        (Some(s), Some(e)) if e >= s => Some(e - s),
164        _ => None,
165    }
166}
167
168fn find_slowest_task(per_task: &[CuDurationStatistics]) -> Option<(usize, CuDuration)> {
169    per_task
170        .iter()
171        .enumerate()
172        .filter_map(|(idx, stats)| {
173            if stats.is_empty() {
174                None
175            } else {
176                Some((idx, stats.percentile(0.99)))
177            }
178        })
179        .max_by_key(|(_, dur)| dur.as_nanos())
180}
181
182fn task_state_label(state: &CuTaskState) -> &'static str {
183    match state {
184        CuTaskState::Start => "start",
185        CuTaskState::Preprocess => "pre",
186        CuTaskState::Process => "process",
187        CuTaskState::Postprocess => "post",
188        CuTaskState::Stop => "stop",
189    }
190}
191
192impl CuMonitor for CuLogMon {
193    fn new(_config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self> {
194        #[cfg(target_os = "none")]
195        info!("CuLogMon::new: task_count={}", taskids.len());
196        let window = WindowState::new(taskids.len());
197        #[cfg(target_os = "none")]
198        info!("CuLogMon::new: window ready");
199        Ok(Self {
200            taskids,
201            clock: Mutex::new(None),
202            window: Mutex::new(window),
203        })
204    }
205
206    fn start(&mut self, clock: &RobotClock) -> CuResult<()> {
207        *self.clock.lock() = Some(clock.clone());
208        let mut window = self.window.lock();
209        window.last_report_at = Some(clock.recent());
210        info!("cu_logmon started ({} tasks)", self.taskids.len());
211
212        // Also listen to structured logs and print them with color.
213        #[cfg(all(feature = "std", debug_assertions))]
214        register_live_log_listener(|entry, format_str, param_names| {
215            const PARAM_COLOR: &str = "\x1b[36m"; // cyan
216            const RESET: &str = "\x1b[0m";
217
218            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
219            let colored_params: Vec<String> = params
220                .iter()
221                .map(|v| format!("{PARAM_COLOR}{v}{RESET}"))
222                .collect();
223            let colored_named: HashMap<String, String> = param_names
224                .iter()
225                .zip(params.iter())
226                .map(|(k, v)| (k.to_string(), format!("{PARAM_COLOR}{v}{RESET}")))
227                .collect();
228
229            if let Ok(msg) =
230                format_message_only(format_str, colored_params.as_slice(), &colored_named)
231            {
232                let level_color = match entry.level {
233                    CuLogLevel::Debug => "\x1b[32m",   // green
234                    CuLogLevel::Info => "\x1b[90m",    // gray
235                    CuLogLevel::Warning => "\x1b[93m", // yellow
236                    CuLogLevel::Error => "\x1b[91m",   // red
237                    CuLogLevel::Critical => "\x1b[91m",
238                };
239                let ts_color = "\x1b[34m";
240                let ts = format_timestamp(entry.time);
241                println!(
242                    "{ts_color}{ts}{reset} {level_color}[{:?}]{reset} {msg}",
243                    entry.level,
244                    ts = ts,
245                    ts_color = ts_color,
246                    level_color = level_color,
247                    reset = RESET,
248                    msg = msg
249                );
250            }
251        });
252        Ok(())
253    }
254
255    fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
256        let Some(clock) = self.clock.lock().clone() else {
257            return Ok(());
258        };
259
260        let call_start = clock.recent();
261
262        let snapshot = {
263            let mut window = self.window.lock();
264            window.last_report_at.get_or_insert(call_start);
265
266            window.total_copperlists = window.total_copperlists.saturating_add(1);
267            window.window_copperlists = window.window_copperlists.saturating_add(1);
268
269            if let Some(latency) = end_to_end_latency(msgs) {
270                window.end_to_end.record(latency);
271            }
272
273            for (idx, meta) in msgs.iter().enumerate() {
274                if let Some(task_stat) = window.per_task.get_mut(idx)
275                    && let Some(duration) = task_duration(meta)
276                {
277                    task_stat.record(duration);
278                }
279            }
280
281            let snapshot = self.compute_snapshot(&window, call_start);
282            if snapshot.is_some() {
283                window.reset_window(call_start);
284            }
285            snapshot
286        };
287
288        if let Some(snapshot) = snapshot {
289            let log_start = clock.recent();
290            let use_color = cfg!(feature = "color_log");
291            let base = format!(
292                "[CL {}] rate {}.{} Hz | slowest {} {}us | e2e p50 {}us p90 {}us p99 {}us max {}us | log_overhead {}us",
293                snapshot.copperlist_index,
294                snapshot.rate_whole,
295                snapshot.rate_tenths,
296                snapshot.slowest_task,
297                snapshot.slowest_task_p99_us,
298                snapshot.e2e_p50_us,
299                snapshot.e2e_p90_us,
300                snapshot.e2e_p99_us,
301                snapshot.e2e_max_us,
302                snapshot.log_overhead_us,
303            );
304            if use_color {
305                // Colored labels for readability (values stay uncolored).
306                const CL_COLOR: &str = "\x1b[94m"; // blue
307                const LABEL_COLOR: &str = "\x1b[92m"; // green for main labels
308                const SUBLABEL_COLOR: &str = "\x1b[93m"; // yellow for sublabels
309                const TASK_NAME_COLOR: &str = "\x1b[38;5;208m"; // orange for task name
310                const RESET: &str = "\x1b[0m";
311                let colored = format!(
312                    "[{cl_color}CL {cl}{reset}] {label}rate{reset} {rate_whole}.{rate_tenths} Hz | {label}slowest{reset} {task_color}{slow_task}{reset} {slow_p99}us | {label}e2e{reset} {sublabel}p50{reset} {p50}us {sublabel}p90{reset} {p90}us {sublabel}p99{reset} {p99}us {sublabel}max{reset} {max}us | {label}log_overhead{reset} {log_overhead}us",
313                    cl_color = CL_COLOR,
314                    label = LABEL_COLOR,
315                    sublabel = SUBLABEL_COLOR,
316                    task_color = TASK_NAME_COLOR,
317                    reset = RESET,
318                    cl = snapshot.copperlist_index,
319                    rate_whole = snapshot.rate_whole,
320                    rate_tenths = snapshot.rate_tenths,
321                    slow_task = snapshot.slowest_task,
322                    slow_p99 = snapshot.slowest_task_p99_us,
323                    p50 = snapshot.e2e_p50_us,
324                    p90 = snapshot.e2e_p90_us,
325                    p99 = snapshot.e2e_p99_us,
326                    max = snapshot.e2e_max_us,
327                    log_overhead = snapshot.log_overhead_us,
328                );
329                info!("{}", &colored);
330            } else {
331                info!("{}", &base);
332            }
333            let log_end = clock.recent();
334            self.window.lock().last_log_duration = log_end - log_start;
335        }
336
337        Ok(())
338    }
339
340    fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
341        let task_name = self.taskids.get(taskid).copied().unwrap_or("<??>");
342        error!(
343            "Task {} @ {}: Error: {}.",
344            task_name,
345            task_state_label(&step),
346            error,
347        );
348        Decision::Ignore
349    }
350
351    fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
352        #[cfg(all(feature = "std", debug_assertions))]
353        unregister_live_log_listener();
354        Ok(())
355    }
356}