1use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, CuGraph, Flavor, NodeId};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10#[cfg(all(feature = "std", debug_assertions))]
11use cu29_log_runtime::{
12 format_message_only, register_live_log_listener, unregister_live_log_listener,
13};
14use cu29_traits::{CuError, CuResult};
15use serde_derive::{Deserialize, Serialize};
16
17#[cfg(not(feature = "std"))]
18extern crate alloc;
19
20#[cfg(feature = "std")]
21use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
22
23#[cfg(not(feature = "std"))]
24use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
25
26#[cfg(not(feature = "std"))]
27mod imp {
28 pub use alloc::alloc::{GlobalAlloc, Layout};
29 pub use core::sync::atomic::{AtomicUsize, Ordering};
30 pub use libm::sqrt;
31}
32
33#[cfg(feature = "std")]
34mod imp {
35 #[cfg(feature = "memory_monitoring")]
36 use super::CountingAlloc;
37 #[cfg(feature = "memory_monitoring")]
38 pub use std::alloc::System;
39 pub use std::alloc::{GlobalAlloc, Layout};
40 pub use std::sync::atomic::{AtomicUsize, Ordering};
41 #[cfg(feature = "memory_monitoring")]
42 #[global_allocator]
43 pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
44}
45
46use imp::*;
47
48#[cfg(all(feature = "std", debug_assertions))]
49fn format_timestamp(time: CuDuration) -> String {
50 let nanos = time.as_nanos();
52 let total_seconds = nanos / 1_000_000_000;
53 let hours = total_seconds / 3600;
54 let minutes = (total_seconds / 60) % 60;
55 let seconds = total_seconds % 60;
56 let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
57 format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
58}
59
60#[derive(Debug, Serialize, Deserialize)]
62pub enum CuTaskState {
63 Start,
64 Preprocess,
65 Process,
66 Postprocess,
67 Stop,
68}
69
70#[derive(Debug)]
72pub enum Decision {
73 Abort, Ignore, Shutdown, }
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub enum ComponentKind {
80 Task,
81 Bridge,
82}
83
84#[derive(Debug, Clone)]
85pub struct MonitorNode {
86 pub id: String,
87 pub type_name: Option<String>,
88 pub kind: ComponentKind,
89 pub inputs: Vec<String>,
91 pub outputs: Vec<String>,
93}
94
95#[derive(Debug, Clone)]
96pub struct MonitorConnection {
97 pub src: String,
98 pub src_port: Option<String>,
99 pub dst: String,
100 pub dst_port: Option<String>,
101 pub msg: String,
102}
103
104#[derive(Debug, Clone, Default)]
105pub struct MonitorTopology {
106 pub nodes: Vec<MonitorNode>,
107 pub connections: Vec<MonitorConnection>,
108}
109
110#[derive(Debug, Clone, Copy, Default)]
111pub struct CopperListInfo {
112 pub size_bytes: usize,
113 pub count: usize,
114}
115
116impl CopperListInfo {
117 pub const fn new(size_bytes: usize, count: usize) -> Self {
118 Self { size_bytes, count }
119 }
120}
121
122#[derive(Debug, Clone, Copy, Default)]
124pub struct CopperListIoStats {
125 pub raw_culist_bytes: u64,
127 pub handle_bytes: u64,
129 pub encoded_culist_bytes: u64,
131 pub keyframe_bytes: u64,
133 pub structured_log_bytes_total: u64,
135 pub culistid: u32,
137}
138
139pub trait CuPayloadSize {
142 fn raw_bytes(&self) -> usize {
144 core::mem::size_of_val(self)
145 }
146
147 fn handle_bytes(&self) -> usize {
149 0
150 }
151}
152
153impl<T> CuPayloadSize for T
154where
155 T: crate::cutask::CuMsgPayload,
156{
157 fn raw_bytes(&self) -> usize {
158 core::mem::size_of::<T>()
159 }
160}
161
162#[derive(Default, Debug, Clone, Copy)]
163struct NodeIoUsage {
164 has_incoming: bool,
165 has_outgoing: bool,
166}
167
168fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
169 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
170 edge_ids.sort();
171
172 let mut outputs = Vec::new();
173 let mut seen = Vec::new();
174 let mut port_idx = 0usize;
175 for edge_id in edge_ids {
176 let Some(edge) = graph.edge(edge_id) else {
177 continue;
178 };
179 if seen.iter().any(|msg| msg == &edge.msg) {
180 continue;
181 }
182 seen.push(edge.msg.clone());
183 let mut port_label = String::from("out");
184 port_label.push_str(&port_idx.to_string());
185 port_label.push_str(": ");
186 port_label.push_str(edge.msg.as_str());
187 outputs.push((edge.msg.clone(), port_label));
188 port_idx += 1;
189 }
190 outputs
191}
192
193pub fn build_monitor_topology(
195 config: &CuConfig,
196 mission: Option<&str>,
197) -> CuResult<MonitorTopology> {
198 let graph = config.get_graph(mission)?;
199 let mut nodes: Map<String, MonitorNode> = Map::new();
200 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
201 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
202
203 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
204 for bridge in &config.bridges {
205 bridge_lookup.insert(bridge.id.as_str(), bridge);
206 }
207
208 for cnx in graph.edges() {
209 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
210 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
211 }
212
213 for (_, node) in graph.get_all_nodes() {
214 let kind = match node.get_flavor() {
215 Flavor::Bridge => ComponentKind::Bridge,
216 _ => ComponentKind::Task,
217 };
218 let node_id = node.get_id();
219
220 let mut inputs = Vec::new();
221 let mut outputs = Vec::new();
222 if kind == ComponentKind::Bridge {
223 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
224 for ch in &bridge.channels {
225 match ch {
226 BridgeChannelConfigRepresentation::Rx { id, .. } => {
227 outputs.push(id.clone())
228 }
229 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
230 }
231 }
232 }
233 } else {
234 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
235 if usage.has_incoming || !usage.has_outgoing {
236 inputs.push("in".to_string());
237 }
238 if usage.has_outgoing {
239 if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
240 let ports = collect_output_ports(graph, node_idx);
241 let mut port_map: Map<String, String> = Map::new();
242 for (msg_type, label) in ports {
243 port_map.insert(msg_type, label.clone());
244 outputs.push(label);
245 }
246 output_port_lookup.insert(node_id.clone(), port_map);
247 }
248 } else if !usage.has_incoming {
249 outputs.push("out".to_string());
250 }
251 }
252
253 nodes.insert(
254 node_id.clone(),
255 MonitorNode {
256 id: node_id,
257 type_name: Some(node.get_type().to_string()),
258 kind,
259 inputs,
260 outputs,
261 },
262 );
263 }
264
265 let mut connections = Vec::new();
266 for cnx in graph.edges() {
267 let src = cnx.src.clone();
268 let dst = cnx.dst.clone();
269
270 let src_port = cnx.src_channel.clone().or_else(|| {
271 output_port_lookup
272 .get(&src)
273 .and_then(|ports| ports.get(&cnx.msg).cloned())
274 .or_else(|| {
275 nodes
276 .get(&src)
277 .and_then(|node| node.outputs.first().cloned())
278 })
279 });
280 let dst_port = cnx.dst_channel.clone().or_else(|| {
281 nodes
282 .get(&dst)
283 .and_then(|node| node.inputs.first().cloned())
284 });
285
286 connections.push(MonitorConnection {
287 src,
288 src_port,
289 dst,
290 dst_port,
291 msg: cnx.msg.clone(),
292 });
293 }
294
295 Ok(MonitorTopology {
296 nodes: nodes.into_values().collect(),
297 connections,
298 })
299}
300
301pub trait CuMonitor: Sized {
303 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
304 where
305 Self: Sized;
306
307 fn set_topology(&mut self, _topology: MonitorTopology) {}
308
309 fn set_copperlist_info(&mut self, _info: CopperListInfo) {}
310
311 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
312 Ok(())
313 }
314
315 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
317
318 fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
320
321 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
323
324 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
326 Ok(())
327 }
328}
329
330pub struct NoMonitor {}
333impl CuMonitor for NoMonitor {
334 fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
335 Ok(NoMonitor {})
336 }
337
338 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
339 #[cfg(all(feature = "std", debug_assertions))]
340 register_live_log_listener(|entry, format_str, param_names| {
341 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
342 let named: Map<String, String> = param_names
343 .iter()
344 .zip(params.iter())
345 .map(|(k, v)| (k.to_string(), v.clone()))
346 .collect();
347
348 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
349 let ts = format_timestamp(entry.time);
350 println!("{} [{:?}] {}", ts, entry.level, msg);
351 }
352 });
353 Ok(())
354 }
355
356 fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
357 Ok(())
359 }
360
361 fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
362 Decision::Ignore
364 }
365
366 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
367 #[cfg(all(feature = "std", debug_assertions))]
368 unregister_live_log_listener();
369 Ok(())
370 }
371}
372
373pub struct CountingAlloc<A: GlobalAlloc> {
375 inner: A,
376 allocated: AtomicUsize,
377 deallocated: AtomicUsize,
378}
379
380impl<A: GlobalAlloc> CountingAlloc<A> {
381 pub const fn new(inner: A) -> Self {
382 CountingAlloc {
383 inner,
384 allocated: AtomicUsize::new(0),
385 deallocated: AtomicUsize::new(0),
386 }
387 }
388
389 pub fn allocated(&self) -> usize {
390 self.allocated.load(Ordering::SeqCst)
391 }
392
393 pub fn deallocated(&self) -> usize {
394 self.deallocated.load(Ordering::SeqCst)
395 }
396
397 pub fn reset(&self) {
398 self.allocated.store(0, Ordering::SeqCst);
399 self.deallocated.store(0, Ordering::SeqCst);
400 }
401}
402
403unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
405 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
407 let p = unsafe { self.inner.alloc(layout) };
409 if !p.is_null() {
410 self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
411 }
412 p
413 }
414
415 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
417 unsafe { self.inner.dealloc(ptr, layout) }
419 self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
420 }
421}
422
423#[cfg(feature = "memory_monitoring")]
425pub struct ScopedAllocCounter {
426 bf_allocated: usize,
427 bf_deallocated: usize,
428}
429
430#[cfg(feature = "memory_monitoring")]
431impl Default for ScopedAllocCounter {
432 fn default() -> Self {
433 Self::new()
434 }
435}
436
437#[cfg(feature = "memory_monitoring")]
438impl ScopedAllocCounter {
439 pub fn new() -> Self {
440 ScopedAllocCounter {
441 bf_allocated: GLOBAL.allocated(),
442 bf_deallocated: GLOBAL.deallocated(),
443 }
444 }
445
446 pub fn allocated(&self) -> usize {
458 GLOBAL.allocated() - self.bf_allocated
459 }
460
461 pub fn deallocated(&self) -> usize {
474 GLOBAL.deallocated() - self.bf_deallocated
475 }
476}
477
478#[cfg(feature = "memory_monitoring")]
480impl Drop for ScopedAllocCounter {
481 fn drop(&mut self) {
482 let _allocated = GLOBAL.allocated() - self.bf_allocated;
483 let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
484 }
491}
492
493#[cfg(feature = "std")]
494const BUCKET_COUNT: usize = 1024;
495#[cfg(not(feature = "std"))]
496const BUCKET_COUNT: usize = 256;
497
498#[derive(Debug, Clone)]
501pub struct LiveStatistics {
502 buckets: [u64; BUCKET_COUNT],
503 min_val: u64,
504 max_val: u64,
505 sum: u64,
506 sum_sq: u64,
507 count: u64,
508 max_value: u64,
509}
510
511impl LiveStatistics {
512 pub fn new_with_max(max_value: u64) -> Self {
532 LiveStatistics {
533 buckets: [0; BUCKET_COUNT],
534 min_val: u64::MAX,
535 max_val: 0,
536 sum: 0,
537 sum_sq: 0,
538 count: 0,
539 max_value,
540 }
541 }
542
543 #[inline]
544 fn value_to_bucket(&self, value: u64) -> usize {
545 if value >= self.max_value {
546 BUCKET_COUNT - 1
547 } else {
548 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
549 }
550 }
551
552 #[inline]
553 pub fn min(&self) -> u64 {
554 if self.count == 0 { 0 } else { self.min_val }
555 }
556
557 #[inline]
558 pub fn max(&self) -> u64 {
559 self.max_val
560 }
561
562 #[inline]
563 pub fn mean(&self) -> f64 {
564 if self.count == 0 {
565 0.0
566 } else {
567 self.sum as f64 / self.count as f64
568 }
569 }
570
571 #[inline]
572 pub fn stdev(&self) -> f64 {
573 if self.count == 0 {
574 return 0.0;
575 }
576 let mean = self.mean();
577 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
578 if variance < 0.0 {
579 return 0.0;
580 }
581 #[cfg(feature = "std")]
582 return variance.sqrt();
583 #[cfg(not(feature = "std"))]
584 return sqrt(variance);
585 }
586
587 #[inline]
588 pub fn percentile(&self, percentile: f64) -> u64 {
589 if self.count == 0 {
590 return 0;
591 }
592
593 let target_count = (self.count as f64 * percentile) as u64;
594 let mut accumulated = 0u64;
595
596 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
597 accumulated += bucket_count;
598 if accumulated >= target_count {
599 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
601 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
602 let bucket_fraction = if bucket_count > 0 {
603 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
604 } else {
605 0.5
606 };
607 return bucket_start
608 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
609 }
610 }
611
612 self.max_val
613 }
614
615 #[inline]
617 pub fn record(&mut self, value: u64) {
618 if value < self.min_val {
619 self.min_val = value;
620 }
621 if value > self.max_val {
622 self.max_val = value;
623 }
624 self.sum += value;
625 self.sum_sq += value * value;
626 self.count += 1;
627
628 let bucket = self.value_to_bucket(value);
629 self.buckets[bucket] += 1;
630 }
631
632 #[inline]
633 pub fn len(&self) -> u64 {
634 self.count
635 }
636
637 #[inline]
638 pub fn is_empty(&self) -> bool {
639 self.count == 0
640 }
641
642 #[inline]
643 pub fn reset(&mut self) {
644 self.buckets.fill(0);
645 self.min_val = u64::MAX;
646 self.max_val = 0;
647 self.sum = 0;
648 self.sum_sq = 0;
649 self.count = 0;
650 }
651}
652
653#[derive(Debug, Clone)]
656pub struct CuDurationStatistics {
657 bare: LiveStatistics,
658 jitter: LiveStatistics,
659 last_value: CuDuration,
660}
661
662impl CuDurationStatistics {
663 pub fn new(max: CuDuration) -> Self {
664 let CuDuration(max) = max;
665 CuDurationStatistics {
666 bare: LiveStatistics::new_with_max(max),
667 jitter: LiveStatistics::new_with_max(max),
668 last_value: CuDuration::default(),
669 }
670 }
671
672 #[inline]
673 pub fn min(&self) -> CuDuration {
674 CuDuration(self.bare.min())
675 }
676
677 #[inline]
678 pub fn max(&self) -> CuDuration {
679 CuDuration(self.bare.max())
680 }
681
682 #[inline]
683 pub fn mean(&self) -> CuDuration {
684 CuDuration(self.bare.mean() as u64) }
686
687 #[inline]
688 pub fn percentile(&self, percentile: f64) -> CuDuration {
689 CuDuration(self.bare.percentile(percentile))
690 }
691
692 #[inline]
693 pub fn stddev(&self) -> CuDuration {
694 CuDuration(self.bare.stdev() as u64)
695 }
696
697 #[inline]
698 pub fn len(&self) -> u64 {
699 self.bare.len()
700 }
701
702 #[inline]
703 pub fn is_empty(&self) -> bool {
704 self.bare.len() == 0
705 }
706
707 #[inline]
708 pub fn jitter_min(&self) -> CuDuration {
709 CuDuration(self.jitter.min())
710 }
711
712 #[inline]
713 pub fn jitter_max(&self) -> CuDuration {
714 CuDuration(self.jitter.max())
715 }
716
717 #[inline]
718 pub fn jitter_mean(&self) -> CuDuration {
719 CuDuration(self.jitter.mean() as u64)
720 }
721
722 #[inline]
723 pub fn jitter_stddev(&self) -> CuDuration {
724 CuDuration(self.jitter.stdev() as u64)
725 }
726
727 #[inline]
728 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
729 CuDuration(self.jitter.percentile(percentile))
730 }
731
732 #[inline]
733 pub fn record(&mut self, value: CuDuration) {
734 let CuDuration(nanos) = value;
735 if self.bare.is_empty() {
736 self.bare.record(nanos);
737 self.last_value = value;
738 return;
739 }
740 self.bare.record(nanos);
741 let CuDuration(last_nanos) = self.last_value;
742 self.jitter.record(nanos.abs_diff(last_nanos));
743 self.last_value = value;
744 }
745
746 #[inline]
747 pub fn reset(&mut self) {
748 self.bare.reset();
749 self.jitter.reset();
750 }
751}
752
753#[cfg(test)]
754mod tests {
755 use super::*;
756
757 #[test]
758 fn test_live_statistics_percentiles() {
759 let mut stats = LiveStatistics::new_with_max(1000);
760
761 for i in 0..100 {
763 stats.record(i);
764 }
765
766 assert_eq!(stats.len(), 100);
767 assert_eq!(stats.min(), 0);
768 assert_eq!(stats.max(), 99);
769 assert_eq!(stats.mean() as u64, 49); let p50 = stats.percentile(0.5);
773 let p90 = stats.percentile(0.90);
774 let p95 = stats.percentile(0.95);
775 let p99 = stats.percentile(0.99);
776
777 assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
779 assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
780 assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
781 assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
782 }
783
784 #[test]
785 fn test_duration_stats() {
786 let mut stats = CuDurationStatistics::new(CuDuration(1000));
787 stats.record(CuDuration(100));
788 stats.record(CuDuration(200));
789 stats.record(CuDuration(500));
790 stats.record(CuDuration(400));
791 assert_eq!(stats.min(), CuDuration(100));
792 assert_eq!(stats.max(), CuDuration(500));
793 assert_eq!(stats.mean(), CuDuration(300));
794 assert_eq!(stats.len(), 4);
795 assert_eq!(stats.jitter.len(), 3);
796 assert_eq!(stats.jitter_min(), CuDuration(100));
797 assert_eq!(stats.jitter_max(), CuDuration(300));
798 assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
799 stats.reset();
800 assert_eq!(stats.len(), 0);
801 }
802}