1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
10
11use alloc::format;
12use alloc::vec;
13use alloc::vec::Vec;
14use cu29::prelude::*;
15#[cfg(all(feature = "std", debug_assertions))]
16use cu29_log_runtime::{
17 format_message_only, register_live_log_listener, unregister_live_log_listener,
18};
19use spin::Mutex;
20#[cfg(all(feature = "std", debug_assertions))]
21use std::collections::HashMap;
22
/// Minimum time, in seconds, that must elapse since the last report before
/// `CuLogMon::compute_snapshot` produces a new one.
const REPORT_INTERVAL_SECS: u64 = 1;

/// Duration, in seconds, passed as the maximum sample to every
/// `CuDurationStatistics` created by `WindowState::new`.
const MAX_LATENCY_SECS: u64 = 5;
25
/// Renders `time` as `HH:MM:SS.ffff`, where `ffff` is the sub-second part in
/// units of 100 microseconds.
///
/// Hours are not wrapped: they keep growing past 24 (and past two digits) for
/// large timestamps.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuTime) -> String {
    let total_nanos = time.as_nanos();

    // Split into whole seconds and the sub-second remainder.
    let whole_secs = total_nanos / 1_000_000_000;
    let tenth_millis = (total_nanos % 1_000_000_000) / 100_000;

    let hh = whole_secs / 3600;
    let mm = (whole_secs / 60) % 60;
    let ss = whole_secs % 60;

    format!("{hh:02}:{mm:02}:{ss:02}.{tenth_millis:04}")
}
37
38struct WindowState {
39 total_copperlists: u64,
40 window_copperlists: u32,
41 last_report_at: Option<CuTime>,
42 last_log_duration: CuDuration,
43 end_to_end: CuDurationStatistics,
44 per_task: Vec<CuDurationStatistics>,
45}
46
47impl WindowState {
48 fn new(task_count: usize) -> Self {
49 let max_sample = CuDuration::from_secs(MAX_LATENCY_SECS);
50 #[cfg(target_os = "none")]
51 info!("WindowState::new: init end_to_end");
52 let end_to_end = CuDurationStatistics::new(max_sample);
53 #[cfg(target_os = "none")]
54 info!("WindowState::new: init per_task");
55 #[cfg(target_os = "none")]
56 info!(
57 "WindowState::new: stats_size={} per_task_bytes={}",
58 core::mem::size_of::<CuDurationStatistics>(),
59 core::mem::size_of::<CuDurationStatistics>() * task_count
60 );
61 let per_task = vec![CuDurationStatistics::new(max_sample); task_count];
62 #[cfg(target_os = "none")]
63 info!("WindowState::new: init done");
64 Self {
65 total_copperlists: 0,
66 window_copperlists: 0,
67 last_report_at: None,
68 last_log_duration: CuDuration::MIN,
69 end_to_end,
70 per_task,
71 }
72 }
73
74 fn reset_window(&mut self, now: CuTime) {
75 self.window_copperlists = 0;
76 self.last_report_at = Some(now);
77 self.end_to_end.reset();
78 for stat in &mut self.per_task {
79 stat.reset();
80 }
81 }
82}
83
/// Values printed in a single report line, captured from a `WindowState`.
struct Snapshot<'a> {
    /// Total number of copperlists seen when the snapshot was taken.
    copperlist_index: u64,
    /// Integer part of the copperlist rate in Hz.
    rate_whole: u64,
    /// First decimal digit of the copperlist rate in Hz.
    rate_tenths: u64,
    /// End-to-end latency, 50th percentile, in microseconds.
    e2e_p50_us: u64,
    /// End-to-end latency, 90th percentile, in microseconds.
    e2e_p90_us: u64,
    /// End-to-end latency, 99th percentile, in microseconds.
    e2e_p99_us: u64,
    /// Largest end-to-end latency, in microseconds.
    e2e_max_us: u64,
    /// Name of the task with the highest 99th-percentile processing time.
    slowest_task: &'a str,
    /// 99th-percentile processing time of `slowest_task`, in microseconds.
    slowest_task_p99_us: u64,
    /// Time spent emitting the previous report line, in microseconds.
    log_overhead_us: u64,
}
96
97pub struct CuLogMon {
98 taskids: &'static [&'static str],
99 clock: Mutex<Option<RobotClock>>,
100 window: Mutex<WindowState>,
101}
102
103impl CuLogMon {
104 fn compute_snapshot<'a>(&'a self, state: &WindowState, now: CuTime) -> Option<Snapshot<'a>> {
105 let last_report = state.last_report_at?;
106
107 let elapsed = now - last_report;
108 let elapsed_ns = elapsed.as_nanos();
109
110 if elapsed_ns < CuDuration::from_secs(REPORT_INTERVAL_SECS).as_nanos() {
111 return None;
112 }
113
114 let rate_x10 = if elapsed_ns > 0 {
115 (state.window_copperlists as u64 * 10 * 1_000_000_000u64) / elapsed_ns
116 } else {
117 0
118 };
119
120 let slowest = find_slowest_task(&state.per_task);
121 let (slowest_task, slowest_task_p99_us) = slowest
122 .map(|(idx, dur)| {
123 let name = self.taskids.get(idx).copied().unwrap_or("<?>");
124 (name, dur.as_micros())
125 })
126 .unwrap_or(("none", 0));
127
128 let e2e_p50 = state.end_to_end.percentile(0.5).as_micros();
129 let e2e_p90 = state.end_to_end.percentile(0.9).as_micros();
130 let e2e_p99 = state.end_to_end.percentile(0.99).as_micros();
131 let e2e_max = state.end_to_end.max().as_micros().max(e2e_p99);
133
134 Some(Snapshot {
135 copperlist_index: state.total_copperlists,
136 rate_whole: rate_x10 / 10,
137 rate_tenths: rate_x10 % 10,
138 e2e_p50_us: e2e_p50,
139 e2e_p90_us: e2e_p90,
140 e2e_p99_us: e2e_p99,
141 e2e_max_us: e2e_max,
142 slowest_task,
143 slowest_task_p99_us,
144 log_overhead_us: state.last_log_duration.as_micros(),
145 })
146 }
147}
148
149fn task_duration(meta: &CuMsgMetadata) -> Option<CuDuration> {
150 let start = Option::<CuTime>::from(meta.process_time.start)?;
151 let end = Option::<CuTime>::from(meta.process_time.end)?;
152 (end >= start).then_some(end - start)
153}
154
155fn end_to_end_latency(msgs: &[&CuMsgMetadata]) -> Option<CuDuration> {
156 let start = msgs
157 .first()
158 .and_then(|m| Option::<CuTime>::from(m.process_time.start));
159 let end = msgs
160 .last()
161 .and_then(|m| Option::<CuTime>::from(m.process_time.end));
162 match (start, end) {
163 (Some(s), Some(e)) if e >= s => Some(e - s),
164 _ => None,
165 }
166}
167
168fn find_slowest_task(per_task: &[CuDurationStatistics]) -> Option<(usize, CuDuration)> {
169 per_task
170 .iter()
171 .enumerate()
172 .filter_map(|(idx, stats)| {
173 if stats.is_empty() {
174 None
175 } else {
176 Some((idx, stats.percentile(0.99)))
177 }
178 })
179 .max_by_key(|(_, dur)| dur.as_nanos())
180}
181
182fn task_state_label(state: &CuTaskState) -> &'static str {
183 match state {
184 CuTaskState::Start => "start",
185 CuTaskState::Preprocess => "pre",
186 CuTaskState::Process => "process",
187 CuTaskState::Postprocess => "post",
188 CuTaskState::Stop => "stop",
189 }
190}
191
192impl CuMonitor for CuLogMon {
193 fn new(_config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self> {
194 #[cfg(target_os = "none")]
195 info!("CuLogMon::new: task_count={}", taskids.len());
196 let window = WindowState::new(taskids.len());
197 #[cfg(target_os = "none")]
198 info!("CuLogMon::new: window ready");
199 Ok(Self {
200 taskids,
201 clock: Mutex::new(None),
202 window: Mutex::new(window),
203 })
204 }
205
206 fn start(&mut self, clock: &RobotClock) -> CuResult<()> {
207 *self.clock.lock() = Some(clock.clone());
208 let mut window = self.window.lock();
209 window.last_report_at = Some(clock.recent());
210 info!("cu_logmon started ({} tasks)", self.taskids.len());
211
212 #[cfg(all(feature = "std", debug_assertions))]
214 register_live_log_listener(|entry, format_str, param_names| {
215 const PARAM_COLOR: &str = "\x1b[36m"; const RESET: &str = "\x1b[0m";
217
218 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
219 let colored_params: Vec<String> = params
220 .iter()
221 .map(|v| format!("{PARAM_COLOR}{v}{RESET}"))
222 .collect();
223 let colored_named: HashMap<String, String> = param_names
224 .iter()
225 .zip(params.iter())
226 .map(|(k, v)| (k.to_string(), format!("{PARAM_COLOR}{v}{RESET}")))
227 .collect();
228
229 if let Ok(msg) =
230 format_message_only(format_str, colored_params.as_slice(), &colored_named)
231 {
232 let level_color = match entry.level {
233 CuLogLevel::Debug => "\x1b[32m", CuLogLevel::Info => "\x1b[90m", CuLogLevel::Warning => "\x1b[93m", CuLogLevel::Error => "\x1b[91m", CuLogLevel::Critical => "\x1b[91m",
238 };
239 let ts_color = "\x1b[34m";
240 let ts = format_timestamp(entry.time);
241 println!(
242 "{ts_color}{ts}{reset} {level_color}[{:?}]{reset} {msg}",
243 entry.level,
244 ts = ts,
245 ts_color = ts_color,
246 level_color = level_color,
247 reset = RESET,
248 msg = msg
249 );
250 }
251 });
252 Ok(())
253 }
254
255 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
256 let Some(clock) = self.clock.lock().clone() else {
257 return Ok(());
258 };
259
260 let call_start = clock.recent();
261
262 let snapshot = {
263 let mut window = self.window.lock();
264 window.last_report_at.get_or_insert(call_start);
265
266 window.total_copperlists = window.total_copperlists.saturating_add(1);
267 window.window_copperlists = window.window_copperlists.saturating_add(1);
268
269 if let Some(latency) = end_to_end_latency(msgs) {
270 window.end_to_end.record(latency);
271 }
272
273 for (idx, meta) in msgs.iter().enumerate() {
274 if let Some(task_stat) = window.per_task.get_mut(idx)
275 && let Some(duration) = task_duration(meta)
276 {
277 task_stat.record(duration);
278 }
279 }
280
281 let snapshot = self.compute_snapshot(&window, call_start);
282 if snapshot.is_some() {
283 window.reset_window(call_start);
284 }
285 snapshot
286 };
287
288 if let Some(snapshot) = snapshot {
289 let log_start = clock.recent();
290 let use_color = cfg!(feature = "color_log");
291 let base = format!(
292 "[CL {}] rate {}.{} Hz | slowest {} {}us | e2e p50 {}us p90 {}us p99 {}us max {}us | log_overhead {}us",
293 snapshot.copperlist_index,
294 snapshot.rate_whole,
295 snapshot.rate_tenths,
296 snapshot.slowest_task,
297 snapshot.slowest_task_p99_us,
298 snapshot.e2e_p50_us,
299 snapshot.e2e_p90_us,
300 snapshot.e2e_p99_us,
301 snapshot.e2e_max_us,
302 snapshot.log_overhead_us,
303 );
304 if use_color {
305 const CL_COLOR: &str = "\x1b[94m"; const LABEL_COLOR: &str = "\x1b[92m"; const SUBLABEL_COLOR: &str = "\x1b[93m"; const TASK_NAME_COLOR: &str = "\x1b[38;5;208m"; const RESET: &str = "\x1b[0m";
311 let colored = format!(
312 "[{cl_color}CL {cl}{reset}] {label}rate{reset} {rate_whole}.{rate_tenths} Hz | {label}slowest{reset} {task_color}{slow_task}{reset} {slow_p99}us | {label}e2e{reset} {sublabel}p50{reset} {p50}us {sublabel}p90{reset} {p90}us {sublabel}p99{reset} {p99}us {sublabel}max{reset} {max}us | {label}log_overhead{reset} {log_overhead}us",
313 cl_color = CL_COLOR,
314 label = LABEL_COLOR,
315 sublabel = SUBLABEL_COLOR,
316 task_color = TASK_NAME_COLOR,
317 reset = RESET,
318 cl = snapshot.copperlist_index,
319 rate_whole = snapshot.rate_whole,
320 rate_tenths = snapshot.rate_tenths,
321 slow_task = snapshot.slowest_task,
322 slow_p99 = snapshot.slowest_task_p99_us,
323 p50 = snapshot.e2e_p50_us,
324 p90 = snapshot.e2e_p90_us,
325 p99 = snapshot.e2e_p99_us,
326 max = snapshot.e2e_max_us,
327 log_overhead = snapshot.log_overhead_us,
328 );
329 info!("{}", &colored);
330 } else {
331 info!("{}", &base);
332 }
333 let log_end = clock.recent();
334 self.window.lock().last_log_duration = log_end - log_start;
335 }
336
337 Ok(())
338 }
339
340 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
341 let task_name = self.taskids.get(taskid).copied().unwrap_or("<??>");
342 error!(
343 "Task {} @ {}: Error: {}.",
344 task_name,
345 task_state_label(&step),
346 error,
347 );
348 Decision::Ignore
349 }
350
351 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
352 #[cfg(all(feature = "std", debug_assertions))]
353 unregister_live_log_listener();
354 Ok(())
355 }
356}