1#![cfg_attr(not(feature = "std"), no_std)]
2
3extern crate alloc;
10
11use alloc::format;
12use alloc::vec;
13use alloc::vec::Vec;
14use cu29::prelude::*;
15use spin::Mutex;
16
17const REPORT_INTERVAL_SECS: u64 = 1;
18const MAX_LATENCY_SECS: u64 = 5;
19
20struct WindowState {
21 total_copperlists: u64,
22 window_copperlists: u32,
23 last_report_at: Option<CuTime>,
24 last_log_duration: CuDuration,
25 end_to_end: CuDurationStatistics,
26 per_task: Vec<CuDurationStatistics>,
27}
28
29impl WindowState {
30 fn new(task_count: usize) -> Self {
31 let max_sample = CuDuration::from_secs(MAX_LATENCY_SECS);
32 #[cfg(target_os = "none")]
33 info!("WindowState::new: init end_to_end");
34 let end_to_end = CuDurationStatistics::new(max_sample);
35 #[cfg(target_os = "none")]
36 info!("WindowState::new: init per_task");
37 #[cfg(target_os = "none")]
38 info!(
39 "WindowState::new: stats_size={} per_task_bytes={}",
40 core::mem::size_of::<CuDurationStatistics>(),
41 core::mem::size_of::<CuDurationStatistics>() * task_count
42 );
43 let per_task = vec![CuDurationStatistics::new(max_sample); task_count];
44 #[cfg(target_os = "none")]
45 info!("WindowState::new: init done");
46 Self {
47 total_copperlists: 0,
48 window_copperlists: 0,
49 last_report_at: None,
50 last_log_duration: CuDuration::MIN,
51 end_to_end,
52 per_task,
53 }
54 }
55
56 fn reset_window(&mut self, now: CuTime) {
57 self.window_copperlists = 0;
58 self.last_report_at = Some(now);
59 self.end_to_end.reset();
60 for stat in &mut self.per_task {
61 stat.reset();
62 }
63 }
64}
65
/// Figures printed in a single report line.
///
/// Every field is a plain integer or a borrowed string, so the common value
/// traits are derived; that makes a snapshot cheap to copy and easy to compare
/// or print when debugging.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Snapshot<'a> {
    /// Running copperlist count at the time the snapshot was taken.
    copperlist_index: u64,
    /// Integer part of the copperlist rate, in Hz.
    rate_whole: u64,
    /// First decimal digit of the copperlist rate.
    rate_tenths: u64,
    /// Median end-to-end latency, in microseconds.
    e2e_p50_us: u64,
    /// 90th-percentile end-to-end latency, in microseconds.
    e2e_p90_us: u64,
    /// 99th-percentile end-to-end latency, in microseconds.
    e2e_p99_us: u64,
    /// Largest end-to-end latency, in microseconds.
    e2e_max_us: u64,
    /// Name of the task with the highest 99th-percentile duration.
    slowest_task: &'a str,
    /// 99th-percentile duration of that task, in microseconds.
    slowest_task_p99_us: u64,
    /// Time spent emitting the previous report, in microseconds.
    log_overhead_us: u64,
}
78
79pub struct CuLogMon {
80 taskids: &'static [&'static str],
81 clock: Mutex<Option<RobotClock>>,
82 window: Mutex<WindowState>,
83}
84
85impl CuLogMon {
86 fn compute_snapshot<'a>(&'a self, state: &WindowState, now: CuTime) -> Option<Snapshot<'a>> {
87 let last_report = state.last_report_at?;
88
89 let elapsed = now - last_report;
90 let elapsed_ns = elapsed.as_nanos();
91
92 if elapsed_ns < CuDuration::from_secs(REPORT_INTERVAL_SECS).as_nanos() {
93 return None;
94 }
95
96 let rate_x10 = if elapsed_ns > 0 {
97 (state.window_copperlists as u64 * 10 * 1_000_000_000u64) / elapsed_ns
98 } else {
99 0
100 };
101
102 let slowest = find_slowest_task(&state.per_task);
103 let (slowest_task, slowest_task_p99_us) = slowest
104 .map(|(idx, dur)| {
105 let name = self.taskids.get(idx).copied().unwrap_or("<?>");
106 (name, dur.as_micros())
107 })
108 .unwrap_or(("none", 0));
109
110 let e2e_p50 = state.end_to_end.percentile(0.5).as_micros();
111 let e2e_p90 = state.end_to_end.percentile(0.9).as_micros();
112 let e2e_p99 = state.end_to_end.percentile(0.99).as_micros();
113 let e2e_max = state.end_to_end.max().as_micros().max(e2e_p99);
115
116 Some(Snapshot {
117 copperlist_index: state.total_copperlists,
118 rate_whole: rate_x10 / 10,
119 rate_tenths: rate_x10 % 10,
120 e2e_p50_us: e2e_p50,
121 e2e_p90_us: e2e_p90,
122 e2e_p99_us: e2e_p99,
123 e2e_max_us: e2e_max,
124 slowest_task,
125 slowest_task_p99_us,
126 log_overhead_us: state.last_log_duration.as_micros(),
127 })
128 }
129}
130
131fn task_duration(meta: &CuMsgMetadata) -> Option<CuDuration> {
132 let start = Option::<CuTime>::from(meta.process_time.start)?;
133 let end = Option::<CuTime>::from(meta.process_time.end)?;
134 (end >= start).then_some(end - start)
135}
136
137fn end_to_end_latency(msgs: &[&CuMsgMetadata]) -> Option<CuDuration> {
138 let start = msgs
139 .first()
140 .and_then(|m| Option::<CuTime>::from(m.process_time.start));
141 let end = msgs
142 .last()
143 .and_then(|m| Option::<CuTime>::from(m.process_time.end));
144 match (start, end) {
145 (Some(s), Some(e)) if e >= s => Some(e - s),
146 _ => None,
147 }
148}
149
150fn find_slowest_task(per_task: &[CuDurationStatistics]) -> Option<(usize, CuDuration)> {
151 per_task
152 .iter()
153 .enumerate()
154 .filter_map(|(idx, stats)| {
155 if stats.is_empty() {
156 None
157 } else {
158 Some((idx, stats.percentile(0.99)))
159 }
160 })
161 .max_by_key(|(_, dur)| dur.as_nanos())
162}
163
164fn task_state_label(state: &CuTaskState) -> &'static str {
165 match state {
166 CuTaskState::Start => "start",
167 CuTaskState::Preprocess => "pre",
168 CuTaskState::Process => "process",
169 CuTaskState::Postprocess => "post",
170 CuTaskState::Stop => "stop",
171 }
172}
173
174impl CuMonitor for CuLogMon {
175 fn new(_config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self> {
176 #[cfg(target_os = "none")]
177 info!("CuLogMon::new: task_count={}", taskids.len());
178 let window = WindowState::new(taskids.len());
179 #[cfg(target_os = "none")]
180 info!("CuLogMon::new: window ready");
181 Ok(Self {
182 taskids,
183 clock: Mutex::new(None),
184 window: Mutex::new(window),
185 })
186 }
187
188 fn start(&mut self, clock: &RobotClock) -> CuResult<()> {
189 *self.clock.lock() = Some(clock.clone());
190 let mut window = self.window.lock();
191 window.last_report_at = Some(clock.recent());
192 info!("cu_logmon started ({} tasks)", self.taskids.len());
193 Ok(())
194 }
195
196 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()> {
197 let Some(clock) = self.clock.lock().clone() else {
198 return Ok(());
199 };
200
201 let call_start = clock.recent();
202
203 let snapshot = {
204 let mut window = self.window.lock();
205 window.last_report_at.get_or_insert(call_start);
206
207 window.total_copperlists = window.total_copperlists.saturating_add(1);
208 window.window_copperlists = window.window_copperlists.saturating_add(1);
209
210 if let Some(latency) = end_to_end_latency(msgs) {
211 window.end_to_end.record(latency);
212 }
213
214 for (idx, meta) in msgs.iter().enumerate() {
215 if let Some(task_stat) = window.per_task.get_mut(idx)
216 && let Some(duration) = task_duration(meta)
217 {
218 task_stat.record(duration);
219 }
220 }
221
222 let snapshot = self.compute_snapshot(&window, call_start);
223 if snapshot.is_some() {
224 window.reset_window(call_start);
225 }
226 snapshot
227 };
228
229 if let Some(snapshot) = snapshot {
230 let log_start = clock.recent();
231 let use_color = cfg!(feature = "color_log");
232 let base = format!(
233 "[CL {}] rate {}.{} Hz | slowest {} {}us | e2e p50 {}us p90 {}us p99 {}us max {}us | log_overhead {}us",
234 snapshot.copperlist_index,
235 snapshot.rate_whole,
236 snapshot.rate_tenths,
237 snapshot.slowest_task,
238 snapshot.slowest_task_p99_us,
239 snapshot.e2e_p50_us,
240 snapshot.e2e_p90_us,
241 snapshot.e2e_p99_us,
242 snapshot.e2e_max_us,
243 snapshot.log_overhead_us,
244 );
245 if use_color {
246 const CL_COLOR: &str = "\x1b[94m"; const LABEL_COLOR: &str = "\x1b[92m"; const SUBLABEL_COLOR: &str = "\x1b[93m"; const TASK_NAME_COLOR: &str = "\x1b[38;5;208m"; const RESET: &str = "\x1b[0m";
252 let colored = format!(
253 "[{cl_color}CL {cl}{reset}] {label}rate{reset} {rate_whole}.{rate_tenths} Hz | {label}slowest{reset} {task_color}{slow_task}{reset} {slow_p99}us | {label}e2e{reset} {sublabel}p50{reset} {p50}us {sublabel}p90{reset} {p90}us {sublabel}p99{reset} {p99}us {sublabel}max{reset} {max}us | {label}log_overhead{reset} {log_overhead}us",
254 cl_color = CL_COLOR,
255 label = LABEL_COLOR,
256 sublabel = SUBLABEL_COLOR,
257 task_color = TASK_NAME_COLOR,
258 reset = RESET,
259 cl = snapshot.copperlist_index,
260 rate_whole = snapshot.rate_whole,
261 rate_tenths = snapshot.rate_tenths,
262 slow_task = snapshot.slowest_task,
263 slow_p99 = snapshot.slowest_task_p99_us,
264 p50 = snapshot.e2e_p50_us,
265 p90 = snapshot.e2e_p90_us,
266 p99 = snapshot.e2e_p99_us,
267 max = snapshot.e2e_max_us,
268 log_overhead = snapshot.log_overhead_us,
269 );
270 info!("{}", &colored);
271 } else {
272 info!("{}", &base);
273 }
274 let log_end = clock.recent();
275 self.window.lock().last_log_duration = log_end - log_start;
276 }
277
278 Ok(())
279 }
280
281 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision {
282 let task_name = self.taskids.get(taskid).copied().unwrap_or("<??>");
283 error!(
284 "Task {} @ {}: Error: {}.",
285 task_name,
286 task_state_label(&step),
287 error,
288 );
289 Decision::Ignore
290 }
291}