1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
10
11use alloc::format;
12use alloc::string::String;
13use alloc::vec;
14use alloc::vec::Vec;
15use core::fmt::Write as _;
16use cu29::prelude::*;
17#[cfg(all(feature = "std", debug_assertions))]
18use cu29_log_runtime::{
19 format_message_only, register_live_log_listener, unregister_live_log_listener,
20};
21use spin::Mutex;
22#[cfg(all(feature = "std", debug_assertions))]
23use std::collections::HashMap;
24
25const REPORT_INTERVAL_SECS: u64 = 1;
26const MAX_LATENCY_SECS: u64 = 5;
27
28#[cfg(all(feature = "std", debug_assertions))]
29fn format_timestamp(time: CuTime) -> String {
30 let nanos = time.as_nanos();
32 let total_seconds = nanos / 1_000_000_000;
33 let hours = total_seconds / 3600;
34 let minutes = (total_seconds / 60) % 60;
35 let seconds = total_seconds % 60;
36 let fractional_1e4 = (nanos % 1_000_000_000) / 100_000; format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
38}
39
40struct WindowState {
41 total_copperlists: u64,
42 window_copperlists: u32,
43 last_report_at: Option<CuTime>,
44 last_log_duration: CuDuration,
45 end_to_end: CuDurationStatistics,
46 per_component: Vec<CuDurationStatistics>,
47}
48
49impl WindowState {
50 fn new(component_count: usize, max_sample: CuDuration) -> Self {
51 #[cfg(target_os = "none")]
52 info!("WindowState::new: init end_to_end");
53 let end_to_end = CuDurationStatistics::new(max_sample);
54 #[cfg(target_os = "none")]
55 info!("WindowState::new: init per_component");
56 #[cfg(target_os = "none")]
57 info!(
58 "WindowState::new: stats_size={} per_component_bytes={}",
59 core::mem::size_of::<CuDurationStatistics>(),
60 core::mem::size_of::<CuDurationStatistics>() * component_count
61 );
62 let per_component = vec![CuDurationStatistics::new(max_sample); component_count];
63 #[cfg(target_os = "none")]
64 info!("WindowState::new: init done");
65 Self {
66 total_copperlists: 0,
67 window_copperlists: 0,
68 last_report_at: None,
69 last_log_duration: CuDuration::MIN,
70 end_to_end,
71 per_component,
72 }
73 }
74
75 fn reset_window(&mut self, now: CuTime) {
76 self.window_copperlists = 0;
77 self.last_report_at = Some(now);
78 self.end_to_end.reset();
79 for stat in &mut self.per_component {
80 stat.reset();
81 }
82 }
83}
84
85fn monitor_max_sample(monitor_cfg: Option<&ComponentConfig>) -> CuResult<CuDuration> {
86 if let Some(cfg) = monitor_cfg {
87 if let Some(us) = cfg.get::<u64>("max_latency_us")? {
88 if us == 0 {
89 return Err(CuError::from("cu_logmon max_latency_us must be > 0"));
90 }
91 return Ok(CuDuration::from_micros(us));
92 }
93 if let Some(ms) = cfg.get::<u64>("max_latency_ms")? {
94 if ms == 0 {
95 return Err(CuError::from("cu_logmon max_latency_ms must be > 0"));
96 }
97 return Ok(CuDuration::from_millis(ms));
98 }
99 if let Some(secs) = cfg.get::<u64>("max_latency_secs")? {
100 if secs == 0 {
101 return Err(CuError::from("cu_logmon max_latency_secs must be > 0"));
102 }
103 return Ok(CuDuration::from_secs(secs));
104 }
105 }
106 Ok(CuDuration::from_secs(MAX_LATENCY_SECS))
107}
108
109struct Snapshot {
110 copperlist_index: u64,
111 rate_whole: u64,
112 rate_tenths: u64,
113 e2e_p50_us: u64,
114 e2e_p90_us: u64,
115 e2e_p99_us: u64,
116 e2e_max_us: u64,
117 top4: String,
118 overhead_us: u64,
119}
120
121pub struct CuLogMon {
122 components: &'static [MonitorComponentMetadata],
123 component_count: usize,
124 window: Mutex<WindowState>,
125}
126
127impl CuLogMon {
128 fn component_name(&self, component_id: ComponentId) -> &'static str {
129 debug_assert!(component_id.index() < self.component_count);
130 self.components[component_id.index()].id()
131 }
132
133 fn compute_snapshot(&self, state: &WindowState, now: CuTime) -> Option<Snapshot> {
134 let last_report = state.last_report_at?;
135
136 let elapsed = now - last_report;
137 let elapsed_ns = elapsed.as_nanos();
138
139 if elapsed_ns < CuDuration::from_secs(REPORT_INTERVAL_SECS).as_nanos() {
140 return None;
141 }
142
143 let rate_x10 = (state.window_copperlists as u64 * 10 * 1_000_000_000u64)
144 .checked_div(elapsed_ns)
145 .unwrap_or(0);
146
147 let top4_max_entries = find_top_components_by_max(&state.per_component, 4);
148 let mut top4 = String::new();
149 if top4_max_entries.is_empty() {
150 top4.push_str("none");
151 } else {
152 for (rank, (component_id, dur)) in top4_max_entries.iter().enumerate() {
153 if rank > 0 {
154 top4.push_str(", ");
155 }
156 let name = self.component_name(*component_id);
157 let _ = write!(&mut top4, "{} {}us", name, dur.as_micros());
158 }
159 }
160
161 let e2e_p50 = state.end_to_end.percentile(0.5).as_micros();
162 let e2e_p90 = state.end_to_end.percentile(0.9).as_micros();
163 let e2e_p99 = state.end_to_end.percentile(0.99).as_micros();
164 let e2e_max = state.end_to_end.max().as_micros().max(e2e_p99);
166
167 Some(Snapshot {
168 copperlist_index: state.total_copperlists,
169 rate_whole: rate_x10 / 10,
170 rate_tenths: rate_x10 % 10,
171 e2e_p50_us: e2e_p50,
172 e2e_p90_us: e2e_p90,
173 e2e_p99_us: e2e_p99,
174 e2e_max_us: e2e_max,
175 top4,
176 overhead_us: state.last_log_duration.as_micros(),
177 })
178 }
179}
180
181fn component_duration(meta: &CuMsgMetadata) -> Option<CuDuration> {
182 let start = Option::<CuTime>::from(meta.process_time.start)?;
183 let end = Option::<CuTime>::from(meta.process_time.end)?;
184 (end >= start).then_some(end - start)
185}
186
187fn end_to_end_latency(msgs: &[&CuMsgMetadata]) -> Option<CuDuration> {
188 let start = msgs
189 .first()
190 .and_then(|m| Option::<CuTime>::from(m.process_time.start));
191 let end = msgs
192 .last()
193 .and_then(|m| Option::<CuTime>::from(m.process_time.end));
194 match (start, end) {
195 (Some(s), Some(e)) if e >= s => Some(e - s),
196 _ => None,
197 }
198}
199
200fn find_top_components_by_max(
201 per_component: &[CuDurationStatistics],
202 limit: usize,
203) -> Vec<(ComponentId, CuDuration)> {
204 let mut ranked: Vec<(ComponentId, CuDuration)> = per_component
205 .iter()
206 .enumerate()
207 .filter_map(|(idx, stats)| {
208 (!stats.is_empty()).then_some((ComponentId::new(idx), stats.max()))
209 })
210 .collect();
211 ranked.sort_unstable_by(|a, b| {
212 b.1.as_nanos()
213 .cmp(&a.1.as_nanos())
214 .then_with(|| a.0.index().cmp(&b.0.index()))
215 });
216 ranked.truncate(limit);
217 ranked
218}
219
220fn component_state_label(state: &CuComponentState) -> &'static str {
221 match state {
222 CuComponentState::Start => "start",
223 CuComponentState::Preprocess => "pre",
224 CuComponentState::Process => "process",
225 CuComponentState::Postprocess => "post",
226 CuComponentState::Stop => "stop",
227 }
228}
229
230impl CuMonitor for CuLogMon {
231 fn new(metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
232 let components = metadata.components();
233 let component_count = components.len();
234 #[cfg(target_os = "none")]
235 info!("CuLogMon::new: component_count={}", component_count);
236 let max_sample = monitor_max_sample(metadata.monitor_config())?;
237 let window = WindowState::new(component_count, max_sample);
238 #[cfg(target_os = "none")]
239 info!("CuLogMon::new: window ready");
240 Ok(Self {
241 components,
242 component_count,
243 window: Mutex::new(window),
244 })
245 }
246
247 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
248 let mut window = self.window.lock();
249 window.last_report_at = Some(ctx.recent());
250 info!("cu_logmon started ({} components)", self.component_count);
251
252 #[cfg(all(feature = "std", debug_assertions))]
254 register_live_log_listener(|entry, format_str, param_names| {
255 const PARAM_COLOR: &str = "\x1b[36m"; const RESET: &str = "\x1b[0m";
257
258 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
259 let colored_params: Vec<String> = params
260 .iter()
261 .map(|v| format!("{PARAM_COLOR}{v}{RESET}"))
262 .collect();
263 let colored_named: HashMap<String, String> = param_names
264 .iter()
265 .zip(params.iter())
266 .map(|(k, v)| (k.to_string(), format!("{PARAM_COLOR}{v}{RESET}")))
267 .collect();
268
269 if let Ok(msg) =
270 format_message_only(format_str, colored_params.as_slice(), &colored_named)
271 {
272 let level_color = match entry.level {
273 CuLogLevel::Debug => "\x1b[32m", CuLogLevel::Info => "\x1b[90m", CuLogLevel::Warning => "\x1b[93m", CuLogLevel::Error => "\x1b[91m", CuLogLevel::Critical => "\x1b[91m",
278 };
279 let ts_color = "\x1b[34m";
280 let ts = format_timestamp(entry.time);
281 println!(
282 "{ts_color}{ts}{reset} {level_color}[{:?}]{reset} {msg}",
283 entry.level,
284 ts = ts,
285 ts_color = ts_color,
286 level_color = level_color,
287 reset = RESET,
288 msg = msg
289 );
290 }
291 });
292 Ok(())
293 }
294
295 fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
296 let call_start = ctx.recent();
297
298 let snapshot = {
299 let mut window = self.window.lock();
300 window.last_report_at.get_or_insert(call_start);
301
302 window.total_copperlists = window.total_copperlists.saturating_add(1);
303 window.window_copperlists = window.window_copperlists.saturating_add(1);
304
305 if let Some(latency) = end_to_end_latency(view.msgs()) {
306 window.end_to_end.record(latency);
307 }
308
309 for entry in view.entries() {
310 let component_index = entry.component_id.index();
311 if let Some(component_stat) = window.per_component.get_mut(component_index)
312 && let Some(duration) = component_duration(entry.msg)
313 {
314 component_stat.record(duration);
315 } else {
316 debug_assert!(
317 component_index < window.per_component.len(),
318 "cu_logmon: component index {} out of bounds {}",
319 component_index,
320 window.per_component.len()
321 );
322 }
323 }
324
325 let snapshot = self.compute_snapshot(&window, call_start);
326 if snapshot.is_some() {
327 window.reset_window(call_start);
328 }
329 snapshot
330 };
331
332 if let Some(snapshot) = snapshot {
333 let log_start = ctx.recent();
334 let use_color = cfg!(feature = "color_log");
335 let base = format!(
336 "[CL {}] rate {}.{} Hz | top4 {} | e2e p50 {}us p90 {}us p99 {}us max {}us | overhead {}us",
337 snapshot.copperlist_index,
338 snapshot.rate_whole,
339 snapshot.rate_tenths,
340 snapshot.top4,
341 snapshot.e2e_p50_us,
342 snapshot.e2e_p90_us,
343 snapshot.e2e_p99_us,
344 snapshot.e2e_max_us,
345 snapshot.overhead_us,
346 );
347 if use_color {
348 const CL_COLOR: &str = "\x1b[94m"; const LABEL_COLOR: &str = "\x1b[92m"; const SUBLABEL_COLOR: &str = "\x1b[93m"; const COMPONENT_NAME_COLOR: &str = "\x1b[38;5;208m"; const RESET: &str = "\x1b[0m";
354 let colored = format!(
355 "[{cl_color}CL {cl}{reset}] {label}rate{reset} {rate_whole}.{rate_tenths} Hz | {label}top4{reset} {component_color}{top4}{reset} | {label}e2e{reset} {sublabel}p50{reset} {p50}us {sublabel}p90{reset} {p90}us {sublabel}p99{reset} {p99}us {sublabel}max{reset} {max}us | {label}overhead{reset} {overhead}us",
356 cl_color = CL_COLOR,
357 label = LABEL_COLOR,
358 sublabel = SUBLABEL_COLOR,
359 component_color = COMPONENT_NAME_COLOR,
360 reset = RESET,
361 cl = snapshot.copperlist_index,
362 rate_whole = snapshot.rate_whole,
363 rate_tenths = snapshot.rate_tenths,
364 p50 = snapshot.e2e_p50_us,
365 p90 = snapshot.e2e_p90_us,
366 p99 = snapshot.e2e_p99_us,
367 max = snapshot.e2e_max_us,
368 top4 = snapshot.top4,
369 overhead = snapshot.overhead_us,
370 );
371 info!("{}", &colored);
372 } else {
373 info!("{}", &base);
374 }
375 let log_end = ctx.recent();
376 self.window.lock().last_log_duration = log_end - log_start;
377 }
378
379 Ok(())
380 }
381
382 fn process_error(
383 &self,
384 component_id: ComponentId,
385 step: CuComponentState,
386 error: &CuError,
387 ) -> Decision {
388 let component_name = self.component_name(component_id);
389 error!(
390 "Component {} @ {}: Error: {}.",
391 component_name,
392 component_state_label(&step),
393 error,
394 );
395 Decision::Ignore
396 }
397
398 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
399 #[cfg(all(feature = "std", debug_assertions))]
400 unregister_live_log_listener();
401 Ok(())
402 }
403}