1use crate::config::CuConfig;
5use crate::config::{BridgeChannelConfigRepresentation, BridgeConfig, CuGraph, Flavor, NodeId};
6use crate::cutask::CuMsgMetadata;
7use cu29_clock::{CuDuration, RobotClock};
8#[allow(unused_imports)]
9use cu29_log::CuLogLevel;
10use cu29_traits::{CuError, CuResult};
11use serde_derive::{Deserialize, Serialize};
12
13#[cfg(not(feature = "std"))]
14extern crate alloc;
15
16#[cfg(feature = "std")]
17use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
18
19#[cfg(not(feature = "std"))]
20use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
21
22#[cfg(not(feature = "std"))]
23mod imp {
24 pub use alloc::alloc::{GlobalAlloc, Layout};
25 pub use core::sync::atomic::{AtomicUsize, Ordering};
26 pub use libm::sqrt;
27}
28
// Primitives re-exported for the `std` build; pulled into this module via `use imp::*`.
#[cfg(feature = "std")]
mod imp {
    // Allocator trait, layout type and atomics used by `CountingAlloc`.
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};

    // The system allocator is only needed when it gets wrapped by the counting allocator.
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;

    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;

    // With `memory_monitoring` enabled, every allocation of the program goes through this
    // counting wrapper around the system allocator.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
41
42use imp::*;
43
44#[derive(Debug, Serialize, Deserialize)]
46pub enum CuTaskState {
47 Start,
48 Preprocess,
49 Process,
50 Postprocess,
51 Stop,
52}
53
/// Verdict returned by [`CuMonitor::process_error`] after it has looked at an error.
///
/// NOTE(review): how the runtime acts on each variant is not visible in this file;
/// the per-variant notes below are inferred from the names only — confirm against the caller.
#[derive(Debug)]
pub enum Decision {
    /// Presumably: abort the work currently in progress.
    Abort,
    /// Presumably: carry on as if nothing happened. This is what [`NoMonitor`] always returns.
    Ignore,
    /// Presumably: shut the whole application down.
    Shutdown,
}
61
/// Kind of component a [`MonitorNode`] stands for.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ComponentKind {
    /// Any node whose flavor is not a bridge.
    Task,
    /// A node whose flavor is a bridge.
    Bridge,
}
67
68#[derive(Debug, Clone)]
69pub struct MonitorNode {
70 pub id: String,
71 pub type_name: Option<String>,
72 pub kind: ComponentKind,
73 pub inputs: Vec<String>,
75 pub outputs: Vec<String>,
77}
78
/// One directed connection of the [`MonitorTopology`].
#[derive(Clone, Debug)]
pub struct MonitorConnection {
    /// Id of the node the connection starts from.
    pub src: String,
    /// Port label on the source node, if one could be resolved.
    pub src_port: Option<String>,
    /// Id of the node the connection ends at.
    pub dst: String,
    /// Port label on the destination node, if one could be resolved.
    pub dst_port: Option<String>,
    /// Message type carried by the connection.
    pub msg: String,
}
87
88#[derive(Debug, Clone, Default)]
89pub struct MonitorTopology {
90 pub nodes: Vec<MonitorNode>,
91 pub connections: Vec<MonitorConnection>,
92}
93
/// Records whether a node appears as destination and/or source of at least one edge.
///
/// Both flags start out `false`.
#[derive(Debug, Default, Clone, Copy)]
struct NodeIoUsage {
    /// Set when at least one edge ends at the node.
    has_incoming: bool,
    /// Set when at least one edge starts from the node.
    has_outgoing: bool,
}
99
100fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
101 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
102 edge_ids.sort();
103
104 let mut outputs = Vec::new();
105 let mut seen = Vec::new();
106 let mut port_idx = 0usize;
107 for edge_id in edge_ids {
108 let Some(edge) = graph.edge(edge_id) else {
109 continue;
110 };
111 if seen.iter().any(|msg| msg == &edge.msg) {
112 continue;
113 }
114 seen.push(edge.msg.clone());
115 let mut port_label = String::from("out");
116 port_label.push_str(&port_idx.to_string());
117 port_label.push_str(": ");
118 port_label.push_str(edge.msg.as_str());
119 outputs.push((edge.msg.clone(), port_label));
120 port_idx += 1;
121 }
122 outputs
123}
124
125pub fn build_monitor_topology(
127 config: &CuConfig,
128 mission: Option<&str>,
129) -> CuResult<MonitorTopology> {
130 let graph = config.get_graph(mission)?;
131 let mut nodes: Map<String, MonitorNode> = Map::new();
132 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
133 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
134
135 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
136 for bridge in &config.bridges {
137 bridge_lookup.insert(bridge.id.as_str(), bridge);
138 }
139
140 for cnx in graph.edges() {
141 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
142 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
143 }
144
145 for (_, node) in graph.get_all_nodes() {
146 let kind = match node.get_flavor() {
147 Flavor::Bridge => ComponentKind::Bridge,
148 _ => ComponentKind::Task,
149 };
150 let node_id = node.get_id();
151
152 let mut inputs = Vec::new();
153 let mut outputs = Vec::new();
154 if kind == ComponentKind::Bridge {
155 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
156 for ch in &bridge.channels {
157 match ch {
158 BridgeChannelConfigRepresentation::Rx { id, .. } => {
159 outputs.push(id.clone())
160 }
161 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
162 }
163 }
164 }
165 } else {
166 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
167 if usage.has_incoming || !usage.has_outgoing {
168 inputs.push("in".to_string());
169 }
170 if usage.has_outgoing {
171 if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
172 let ports = collect_output_ports(graph, node_idx);
173 let mut port_map: Map<String, String> = Map::new();
174 for (msg_type, label) in ports {
175 port_map.insert(msg_type, label.clone());
176 outputs.push(label);
177 }
178 output_port_lookup.insert(node_id.clone(), port_map);
179 }
180 } else if !usage.has_incoming {
181 outputs.push("out".to_string());
182 }
183 }
184
185 nodes.insert(
186 node_id.clone(),
187 MonitorNode {
188 id: node_id,
189 type_name: Some(node.get_type().to_string()),
190 kind,
191 inputs,
192 outputs,
193 },
194 );
195 }
196
197 let mut connections = Vec::new();
198 for cnx in graph.edges() {
199 let src = cnx.src.clone();
200 let dst = cnx.dst.clone();
201
202 let src_port = cnx.src_channel.clone().or_else(|| {
203 output_port_lookup
204 .get(&src)
205 .and_then(|ports| ports.get(&cnx.msg).cloned())
206 .or_else(|| {
207 nodes
208 .get(&src)
209 .and_then(|node| node.outputs.first().cloned())
210 })
211 });
212 let dst_port = cnx.dst_channel.clone().or_else(|| {
213 nodes
214 .get(&dst)
215 .and_then(|node| node.inputs.first().cloned())
216 });
217
218 connections.push(MonitorConnection {
219 src,
220 src_port,
221 dst,
222 dst_port,
223 msg: cnx.msg.clone(),
224 });
225 }
226
227 Ok(MonitorTopology {
228 nodes: nodes.into_values().collect(),
229 connections,
230 })
231}
232
233pub trait CuMonitor: Sized {
235 fn new(config: &CuConfig, taskids: &'static [&'static str]) -> CuResult<Self>
236 where
237 Self: Sized;
238
239 fn set_topology(&mut self, _topology: MonitorTopology) {}
240
241 fn start(&mut self, _clock: &RobotClock) -> CuResult<()> {
242 Ok(())
243 }
244
245 fn process_copperlist(&self, msgs: &[&CuMsgMetadata]) -> CuResult<()>;
247
248 fn process_error(&self, taskid: usize, step: CuTaskState, error: &CuError) -> Decision;
250
251 fn stop(&mut self, _clock: &RobotClock) -> CuResult<()> {
253 Ok(())
254 }
255}
256
257pub struct NoMonitor {}
260impl CuMonitor for NoMonitor {
261 fn new(_config: &CuConfig, _taskids: &'static [&'static str]) -> CuResult<Self> {
262 Ok(NoMonitor {})
263 }
264
265 fn process_copperlist(&self, _msgs: &[&CuMsgMetadata]) -> CuResult<()> {
266 Ok(())
268 }
269
270 fn process_error(&self, _taskid: usize, _step: CuTaskState, _error: &CuError) -> Decision {
271 Decision::Ignore
273 }
274}
275
/// Allocator wrapper that forwards every request to `inner` and keeps running totals of
/// the bytes handed out and the bytes given back.
pub struct CountingAlloc<A: GlobalAlloc> {
    /// The allocator doing the actual work.
    inner: A,
    /// Total bytes of all successful allocations since creation or the last `reset`.
    allocated: AtomicUsize,
    /// Total bytes of all deallocations since creation or the last `reset`.
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both totals at zero. Usable in `static` initializers.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes allocated so far.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes deallocated so far.
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both totals back to zero.
    ///
    /// The two stores are separate operations; a reader may observe one total reset
    /// and the other not yet.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    /// Allocates through the inner allocator; a failed allocation (null) is not counted.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: `layout` is forwarded unchanged, so the caller's obligations for
        // `GlobalAlloc::alloc` are exactly those of the inner allocator.
        let ptr = unsafe { self.inner.alloc(layout) };
        if ptr.is_null() {
            return ptr;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        ptr
    }

    /// Releases through the inner allocator, then counts `layout.size()` bytes as freed.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr` and `layout` are forwarded unchanged; the caller guarantees that
        // `ptr` was obtained from this allocator (hence from `inner`) with `layout`.
        unsafe {
            self.inner.dealloc(ptr, layout);
        }
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }
}
322
/// Measures how many bytes went through the global counting allocator since this
/// value was created.
///
/// It snapshots the totals of `GLOBAL` on construction; [`allocated`](Self::allocated)
/// and [`deallocated`](Self::deallocated) report the difference with the current totals.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// `GLOBAL.allocated()` at construction time.
    bf_allocated: usize,
    /// `GLOBAL.deallocated()` at construction time.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    /// Same as [`ScopedAllocCounter::new`].
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a measurement from the current totals of the global allocator.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Bytes allocated since this counter was created.
    ///
    /// The global totals are incremented with `fetch_add`, which wraps on overflow, so
    /// the delta is computed with wrapping arithmetic: it stays exact across a wrap of
    /// the total and never panics. If `GLOBAL.reset()` is called while the counter is
    /// alive the reported value is meaningless.
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().wrapping_sub(self.bf_allocated)
    }

    /// Bytes deallocated since this counter was created.
    ///
    /// Computed with wrapping arithmetic for the same reasons as
    /// [`allocated`](Self::allocated).
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().wrapping_sub(self.bf_deallocated)
    }
}

#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    /// Computes the final deltas and discards them.
    ///
    /// NOTE(review): the values are unused; presumably kept as a hook for debug
    /// output — confirm. Wrapping arithmetic guarantees this destructor cannot panic.
    fn drop(&mut self) {
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
    }
}
392
393#[cfg(feature = "std")]
394const BUCKET_COUNT: usize = 1024;
395#[cfg(not(feature = "std"))]
396const BUCKET_COUNT: usize = 256;
397
398#[derive(Debug, Clone)]
401pub struct LiveStatistics {
402 buckets: [u64; BUCKET_COUNT],
403 min_val: u64,
404 max_val: u64,
405 sum: u64,
406 sum_sq: u64,
407 count: u64,
408 max_value: u64,
409}
410
411impl LiveStatistics {
412 pub fn new_with_max(max_value: u64) -> Self {
432 LiveStatistics {
433 buckets: [0; BUCKET_COUNT],
434 min_val: u64::MAX,
435 max_val: 0,
436 sum: 0,
437 sum_sq: 0,
438 count: 0,
439 max_value,
440 }
441 }
442
443 #[inline]
444 fn value_to_bucket(&self, value: u64) -> usize {
445 if value >= self.max_value {
446 BUCKET_COUNT - 1
447 } else {
448 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
449 }
450 }
451
452 #[inline]
453 pub fn min(&self) -> u64 {
454 if self.count == 0 { 0 } else { self.min_val }
455 }
456
457 #[inline]
458 pub fn max(&self) -> u64 {
459 self.max_val
460 }
461
462 #[inline]
463 pub fn mean(&self) -> f64 {
464 if self.count == 0 {
465 0.0
466 } else {
467 self.sum as f64 / self.count as f64
468 }
469 }
470
471 #[inline]
472 pub fn stdev(&self) -> f64 {
473 if self.count == 0 {
474 return 0.0;
475 }
476 let mean = self.mean();
477 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
478 if variance < 0.0 {
479 return 0.0;
480 }
481 #[cfg(feature = "std")]
482 return variance.sqrt();
483 #[cfg(not(feature = "std"))]
484 return sqrt(variance);
485 }
486
487 #[inline]
488 pub fn percentile(&self, percentile: f64) -> u64 {
489 if self.count == 0 {
490 return 0;
491 }
492
493 let target_count = (self.count as f64 * percentile) as u64;
494 let mut accumulated = 0u64;
495
496 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
497 accumulated += bucket_count;
498 if accumulated >= target_count {
499 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
501 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
502 let bucket_fraction = if bucket_count > 0 {
503 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
504 } else {
505 0.5
506 };
507 return bucket_start
508 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
509 }
510 }
511
512 self.max_val
513 }
514
515 #[inline]
517 pub fn record(&mut self, value: u64) {
518 if value < self.min_val {
519 self.min_val = value;
520 }
521 if value > self.max_val {
522 self.max_val = value;
523 }
524 self.sum += value;
525 self.sum_sq += value * value;
526 self.count += 1;
527
528 let bucket = self.value_to_bucket(value);
529 self.buckets[bucket] += 1;
530 }
531
532 #[inline]
533 pub fn len(&self) -> u64 {
534 self.count
535 }
536
537 #[inline]
538 pub fn is_empty(&self) -> bool {
539 self.count == 0
540 }
541
542 #[inline]
543 pub fn reset(&mut self) {
544 self.buckets.fill(0);
545 self.min_val = u64::MAX;
546 self.max_val = 0;
547 self.sum = 0;
548 self.sum_sq = 0;
549 self.count = 0;
550 }
551}
552
553#[derive(Debug, Clone)]
556pub struct CuDurationStatistics {
557 bare: LiveStatistics,
558 jitter: LiveStatistics,
559 last_value: CuDuration,
560}
561
562impl CuDurationStatistics {
563 pub fn new(max: CuDuration) -> Self {
564 let CuDuration(max) = max;
565 CuDurationStatistics {
566 bare: LiveStatistics::new_with_max(max),
567 jitter: LiveStatistics::new_with_max(max),
568 last_value: CuDuration::default(),
569 }
570 }
571
572 #[inline]
573 pub fn min(&self) -> CuDuration {
574 CuDuration(self.bare.min())
575 }
576
577 #[inline]
578 pub fn max(&self) -> CuDuration {
579 CuDuration(self.bare.max())
580 }
581
582 #[inline]
583 pub fn mean(&self) -> CuDuration {
584 CuDuration(self.bare.mean() as u64) }
586
587 #[inline]
588 pub fn percentile(&self, percentile: f64) -> CuDuration {
589 CuDuration(self.bare.percentile(percentile))
590 }
591
592 #[inline]
593 pub fn stddev(&self) -> CuDuration {
594 CuDuration(self.bare.stdev() as u64)
595 }
596
597 #[inline]
598 pub fn len(&self) -> u64 {
599 self.bare.len()
600 }
601
602 #[inline]
603 pub fn is_empty(&self) -> bool {
604 self.bare.len() == 0
605 }
606
607 #[inline]
608 pub fn jitter_min(&self) -> CuDuration {
609 CuDuration(self.jitter.min())
610 }
611
612 #[inline]
613 pub fn jitter_max(&self) -> CuDuration {
614 CuDuration(self.jitter.max())
615 }
616
617 #[inline]
618 pub fn jitter_mean(&self) -> CuDuration {
619 CuDuration(self.jitter.mean() as u64)
620 }
621
622 #[inline]
623 pub fn jitter_stddev(&self) -> CuDuration {
624 CuDuration(self.jitter.stdev() as u64)
625 }
626
627 #[inline]
628 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
629 CuDuration(self.jitter.percentile(percentile))
630 }
631
632 #[inline]
633 pub fn record(&mut self, value: CuDuration) {
634 let CuDuration(nanos) = value;
635 if self.bare.is_empty() {
636 self.bare.record(nanos);
637 self.last_value = value;
638 return;
639 }
640 self.bare.record(nanos);
641 let CuDuration(last_nanos) = self.last_value;
642 self.jitter.record(nanos.abs_diff(last_nanos));
643 self.last_value = value;
644 }
645
646 #[inline]
647 pub fn reset(&mut self) {
648 self.bare.reset();
649 self.jitter.reset();
650 }
651}
652
#[cfg(test)]
mod tests {
    use super::*;

    /// Records the ramp 0..100 and checks the exact moments and the approximate percentiles.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);
        (0..100).for_each(|sample| stats.record(sample));

        // Moments are exact.
        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49);

        // Percentiles come from the histogram, so only check them within a tolerance.
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    /// Records four durations and checks the raw statistics, the jitter and `reset`.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        for nanos in [100, 200, 500, 400] {
            stats.record(CuDuration(nanos));
        }

        // Raw durations.
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);

        // Jitter: |200-100|, |500-200|, |400-500|.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));

        stats.reset();
        assert_eq!(stats.len(), 0);
    }
}