Skip to main content

cu_logmon/
lib.rs

1#![cfg_attr(not(feature = "std"), no_std)]
2
//! Lightweight Copper monitor that periodically logs compact runtime stats through the
//! standard `info!` logging macro (component errors are reported with `error!`).
//!
//! This monitor is `no_std` friendly and keeps allocations to a minimum while still
//! reporting a per-second summary of the Copperlist cadence and latencies. In `std`
//! debug builds it additionally echoes structured log entries to stdout with ANSI colors.
8
9extern crate alloc;
10
11use alloc::format;
12use alloc::string::String;
13use alloc::vec;
14use alloc::vec::Vec;
15use core::fmt::Write as _;
16use cu29::prelude::*;
17#[cfg(all(feature = "std", debug_assertions))]
18use cu29_log_runtime::{
19    format_message_only, register_live_log_listener, unregister_live_log_listener,
20};
21use spin::Mutex;
22#[cfg(all(feature = "std", debug_assertions))]
23use std::collections::HashMap;
24
/// Minimum number of seconds between two summary log lines.
const REPORT_INTERVAL_SECS: u64 = 1;
/// Default `max_sample` (in seconds) handed to the latency statistics when the
/// monitor config sets none of the `max_latency_*` keys.
const MAX_LATENCY_SECS: u64 = 5;
27
/// Renders a `CuTime` (nanoseconds from its epoch) as `HH:MM:SS.ffff`, where
/// `ffff` is in units of 1e-4 s. Hours are not wrapped at 24.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuTime) -> String {
    const NANOS_PER_SEC: u64 = 1_000_000_000;
    const NANOS_PER_TEN_THOUSANDTH: u64 = 100_000;

    let nanos = time.as_nanos();
    let (whole_secs, sub_sec_nanos) = (nanos / NANOS_PER_SEC, nanos % NANOS_PER_SEC);
    let (hh, mm, ss) = (whole_secs / 3600, whole_secs / 60 % 60, whole_secs % 60);
    let frac = sub_sec_nanos / NANOS_PER_TEN_THOUSANDTH;
    format!("{hh:02}:{mm:02}:{ss:02}.{frac:04}")
}
39
40struct WindowState {
41    total_copperlists: u64,
42    window_copperlists: u32,
43    last_report_at: Option<CuTime>,
44    last_log_duration: CuDuration,
45    end_to_end: CuDurationStatistics,
46    per_component: Vec<CuDurationStatistics>,
47}
48
49impl WindowState {
50    fn new(component_count: usize, max_sample: CuDuration) -> Self {
51        #[cfg(target_os = "none")]
52        info!("WindowState::new: init end_to_end");
53        let end_to_end = CuDurationStatistics::new(max_sample);
54        #[cfg(target_os = "none")]
55        info!("WindowState::new: init per_component");
56        #[cfg(target_os = "none")]
57        info!(
58            "WindowState::new: stats_size={} per_component_bytes={}",
59            core::mem::size_of::<CuDurationStatistics>(),
60            core::mem::size_of::<CuDurationStatistics>() * component_count
61        );
62        let per_component = vec![CuDurationStatistics::new(max_sample); component_count];
63        #[cfg(target_os = "none")]
64        info!("WindowState::new: init done");
65        Self {
66            total_copperlists: 0,
67            window_copperlists: 0,
68            last_report_at: None,
69            last_log_duration: CuDuration::MIN,
70            end_to_end,
71            per_component,
72        }
73    }
74
75    fn reset_window(&mut self, now: CuTime) {
76        self.window_copperlists = 0;
77        self.last_report_at = Some(now);
78        self.end_to_end.reset();
79        for stat in &mut self.per_component {
80            stat.reset();
81        }
82    }
83}
84
85fn monitor_max_sample(monitor_cfg: Option<&ComponentConfig>) -> CuResult<CuDuration> {
86    if let Some(cfg) = monitor_cfg {
87        if let Some(us) = cfg.get::<u64>("max_latency_us")? {
88            if us == 0 {
89                return Err(CuError::from("cu_logmon max_latency_us must be > 0"));
90            }
91            return Ok(CuDuration::from_micros(us));
92        }
93        if let Some(ms) = cfg.get::<u64>("max_latency_ms")? {
94            if ms == 0 {
95                return Err(CuError::from("cu_logmon max_latency_ms must be > 0"));
96            }
97            return Ok(CuDuration::from_millis(ms));
98        }
99        if let Some(secs) = cfg.get::<u64>("max_latency_secs")? {
100            if secs == 0 {
101                return Err(CuError::from("cu_logmon max_latency_secs must be > 0"));
102            }
103            return Ok(CuDuration::from_secs(secs));
104        }
105    }
106    Ok(CuDuration::from_secs(MAX_LATENCY_SECS))
107}
108
/// Values rendered into one summary line. It is computed while the window
/// lock is held and logged after the lock has been released.
struct Snapshot {
    /// Total copperlists processed so far (not only in this window).
    copperlist_index: u64,
    /// Integer part of the copperlist rate, in Hz.
    rate_whole: u64,
    /// First decimal digit of the copperlist rate (truncated).
    rate_tenths: u64,
    /// Median end-to-end latency over the window, in microseconds.
    e2e_p50_us: u64,
    /// 90th-percentile end-to-end latency, in microseconds.
    e2e_p90_us: u64,
    /// 99th-percentile end-to-end latency, in microseconds.
    e2e_p99_us: u64,
    /// Max end-to-end latency in microseconds, clamped to be at least `e2e_p99_us`.
    e2e_max_us: u64,
    /// Pre-rendered list of the slowest components, or `"none"`.
    top4: String,
    /// Duration of the previous summary's formatting/logging, in microseconds.
    overhead_us: u64,
}
120
121pub struct CuLogMon {
122    components: &'static [MonitorComponentMetadata],
123    component_count: usize,
124    window: Mutex<WindowState>,
125}
126
127impl CuLogMon {
128    fn component_name(&self, component_id: ComponentId) -> &'static str {
129        debug_assert!(component_id.index() < self.component_count);
130        self.components[component_id.index()].id()
131    }
132
133    fn compute_snapshot(&self, state: &WindowState, now: CuTime) -> Option<Snapshot> {
134        let last_report = state.last_report_at?;
135
136        let elapsed = now - last_report;
137        let elapsed_ns = elapsed.as_nanos();
138
139        if elapsed_ns < CuDuration::from_secs(REPORT_INTERVAL_SECS).as_nanos() {
140            return None;
141        }
142
143        let rate_x10 = (state.window_copperlists as u64 * 10 * 1_000_000_000u64)
144            .checked_div(elapsed_ns)
145            .unwrap_or(0);
146
147        let top4_max_entries = find_top_components_by_max(&state.per_component, 4);
148        let mut top4 = String::new();
149        if top4_max_entries.is_empty() {
150            top4.push_str("none");
151        } else {
152            for (rank, (component_id, dur)) in top4_max_entries.iter().enumerate() {
153                if rank > 0 {
154                    top4.push_str(", ");
155                }
156                let name = self.component_name(*component_id);
157                let _ = write!(&mut top4, "{} {}us", name, dur.as_micros());
158            }
159        }
160
161        let e2e_p50 = state.end_to_end.percentile(0.5).as_micros();
162        let e2e_p90 = state.end_to_end.percentile(0.9).as_micros();
163        let e2e_p99 = state.end_to_end.percentile(0.99).as_micros();
164        // Max can skew low if a bucket underflows; keep it at least as high as p99.
165        let e2e_max = state.end_to_end.max().as_micros().max(e2e_p99);
166
167        Some(Snapshot {
168            copperlist_index: state.total_copperlists,
169            rate_whole: rate_x10 / 10,
170            rate_tenths: rate_x10 % 10,
171            e2e_p50_us: e2e_p50,
172            e2e_p90_us: e2e_p90,
173            e2e_p99_us: e2e_p99,
174            e2e_max_us: e2e_max,
175            top4,
176            overhead_us: state.last_log_duration.as_micros(),
177        })
178    }
179}
180
181fn component_duration(meta: &CuMsgMetadata) -> Option<CuDuration> {
182    let start = Option::<CuTime>::from(meta.process_time.start)?;
183    let end = Option::<CuTime>::from(meta.process_time.end)?;
184    (end >= start).then_some(end - start)
185}
186
187fn end_to_end_latency(msgs: &[&CuMsgMetadata]) -> Option<CuDuration> {
188    let start = msgs
189        .first()
190        .and_then(|m| Option::<CuTime>::from(m.process_time.start));
191    let end = msgs
192        .last()
193        .and_then(|m| Option::<CuTime>::from(m.process_time.end));
194    match (start, end) {
195        (Some(s), Some(e)) if e >= s => Some(e - s),
196        _ => None,
197    }
198}
199
200fn find_top_components_by_max(
201    per_component: &[CuDurationStatistics],
202    limit: usize,
203) -> Vec<(ComponentId, CuDuration)> {
204    let mut ranked: Vec<(ComponentId, CuDuration)> = per_component
205        .iter()
206        .enumerate()
207        .filter_map(|(idx, stats)| {
208            (!stats.is_empty()).then_some((ComponentId::new(idx), stats.max()))
209        })
210        .collect();
211    ranked.sort_unstable_by(|a, b| {
212        b.1.as_nanos()
213            .cmp(&a.1.as_nanos())
214            .then_with(|| a.0.index().cmp(&b.0.index()))
215    });
216    ranked.truncate(limit);
217    ranked
218}
219
220fn component_state_label(state: &CuComponentState) -> &'static str {
221    match state {
222        CuComponentState::Start => "start",
223        CuComponentState::Preprocess => "pre",
224        CuComponentState::Process => "process",
225        CuComponentState::Postprocess => "post",
226        CuComponentState::Stop => "stop",
227    }
228}
229
230impl CuMonitor for CuLogMon {
231    fn new(metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
232        let components = metadata.components();
233        let component_count = components.len();
234        #[cfg(target_os = "none")]
235        info!("CuLogMon::new: component_count={}", component_count);
236        let max_sample = monitor_max_sample(metadata.monitor_config())?;
237        let window = WindowState::new(component_count, max_sample);
238        #[cfg(target_os = "none")]
239        info!("CuLogMon::new: window ready");
240        Ok(Self {
241            components,
242            component_count,
243            window: Mutex::new(window),
244        })
245    }
246
247    fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
248        let mut window = self.window.lock();
249        window.last_report_at = Some(ctx.recent());
250        info!("cu_logmon started ({} components)", self.component_count);
251
252        // Also listen to structured logs and print them with color.
253        #[cfg(all(feature = "std", debug_assertions))]
254        register_live_log_listener(|entry, format_str, param_names| {
255            const PARAM_COLOR: &str = "\x1b[36m"; // cyan
256            const RESET: &str = "\x1b[0m";
257
258            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
259            let colored_params: Vec<String> = params
260                .iter()
261                .map(|v| format!("{PARAM_COLOR}{v}{RESET}"))
262                .collect();
263            let colored_named: HashMap<String, String> = param_names
264                .iter()
265                .zip(params.iter())
266                .map(|(k, v)| (k.to_string(), format!("{PARAM_COLOR}{v}{RESET}")))
267                .collect();
268
269            if let Ok(msg) =
270                format_message_only(format_str, colored_params.as_slice(), &colored_named)
271            {
272                let level_color = match entry.level {
273                    CuLogLevel::Debug => "\x1b[32m",   // green
274                    CuLogLevel::Info => "\x1b[90m",    // gray
275                    CuLogLevel::Warning => "\x1b[93m", // yellow
276                    CuLogLevel::Error => "\x1b[91m",   // red
277                    CuLogLevel::Critical => "\x1b[91m",
278                };
279                let ts_color = "\x1b[34m";
280                let ts = format_timestamp(entry.time);
281                println!(
282                    "{ts_color}{ts}{reset} {level_color}[{:?}]{reset} {msg}",
283                    entry.level,
284                    ts = ts,
285                    ts_color = ts_color,
286                    level_color = level_color,
287                    reset = RESET,
288                    msg = msg
289                );
290            }
291        });
292        Ok(())
293    }
294
295    fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
296        let call_start = ctx.recent();
297
298        let snapshot = {
299            let mut window = self.window.lock();
300            window.last_report_at.get_or_insert(call_start);
301
302            window.total_copperlists = window.total_copperlists.saturating_add(1);
303            window.window_copperlists = window.window_copperlists.saturating_add(1);
304
305            if let Some(latency) = end_to_end_latency(view.msgs()) {
306                window.end_to_end.record(latency);
307            }
308
309            for entry in view.entries() {
310                let component_index = entry.component_id.index();
311                if let Some(component_stat) = window.per_component.get_mut(component_index)
312                    && let Some(duration) = component_duration(entry.msg)
313                {
314                    component_stat.record(duration);
315                } else {
316                    debug_assert!(
317                        component_index < window.per_component.len(),
318                        "cu_logmon: component index {} out of bounds {}",
319                        component_index,
320                        window.per_component.len()
321                    );
322                }
323            }
324
325            let snapshot = self.compute_snapshot(&window, call_start);
326            if snapshot.is_some() {
327                window.reset_window(call_start);
328            }
329            snapshot
330        };
331
332        if let Some(snapshot) = snapshot {
333            let log_start = ctx.recent();
334            let use_color = cfg!(feature = "color_log");
335            let base = format!(
336                "[CL {}] rate {}.{} Hz | top4 {} | e2e p50 {}us p90 {}us p99 {}us max {}us | overhead {}us",
337                snapshot.copperlist_index,
338                snapshot.rate_whole,
339                snapshot.rate_tenths,
340                snapshot.top4,
341                snapshot.e2e_p50_us,
342                snapshot.e2e_p90_us,
343                snapshot.e2e_p99_us,
344                snapshot.e2e_max_us,
345                snapshot.overhead_us,
346            );
347            if use_color {
348                // Colored labels for readability (values stay uncolored).
349                const CL_COLOR: &str = "\x1b[94m"; // blue
350                const LABEL_COLOR: &str = "\x1b[92m"; // green for main labels
351                const SUBLABEL_COLOR: &str = "\x1b[93m"; // yellow for sublabels
352                const COMPONENT_NAME_COLOR: &str = "\x1b[38;5;208m"; // orange for component name
353                const RESET: &str = "\x1b[0m";
354                let colored = format!(
355                    "[{cl_color}CL {cl}{reset}] {label}rate{reset} {rate_whole}.{rate_tenths} Hz | {label}top4{reset} {component_color}{top4}{reset} | {label}e2e{reset} {sublabel}p50{reset} {p50}us {sublabel}p90{reset} {p90}us {sublabel}p99{reset} {p99}us {sublabel}max{reset} {max}us | {label}overhead{reset} {overhead}us",
356                    cl_color = CL_COLOR,
357                    label = LABEL_COLOR,
358                    sublabel = SUBLABEL_COLOR,
359                    component_color = COMPONENT_NAME_COLOR,
360                    reset = RESET,
361                    cl = snapshot.copperlist_index,
362                    rate_whole = snapshot.rate_whole,
363                    rate_tenths = snapshot.rate_tenths,
364                    p50 = snapshot.e2e_p50_us,
365                    p90 = snapshot.e2e_p90_us,
366                    p99 = snapshot.e2e_p99_us,
367                    max = snapshot.e2e_max_us,
368                    top4 = snapshot.top4,
369                    overhead = snapshot.overhead_us,
370                );
371                info!("{}", &colored);
372            } else {
373                info!("{}", &base);
374            }
375            let log_end = ctx.recent();
376            self.window.lock().last_log_duration = log_end - log_start;
377        }
378
379        Ok(())
380    }
381
382    fn process_error(
383        &self,
384        component_id: ComponentId,
385        step: CuComponentState,
386        error: &CuError,
387    ) -> Decision {
388        let component_name = self.component_name(component_id);
389        error!(
390            "Component {} @ {}: Error: {}.",
391            component_name,
392            component_state_label(&step),
393            error,
394        );
395        Decision::Ignore
396    }
397
398    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
399        #[cfg(all(feature = "std", debug_assertions))]
400        unregister_live_log_listener();
401        Ok(())
402    }
403}