1use crate::config::CuConfig;
5use crate::config::{
6 BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7};
8use crate::context::CuContext;
9use crate::cutask::CuMsgMetadata;
10use compact_str::CompactString;
11use cu29_clock::CuDuration;
12#[allow(unused_imports)]
13use cu29_log::CuLogLevel;
14#[cfg(all(feature = "std", debug_assertions))]
15use cu29_log_runtime::{
16 format_message_only, register_live_log_listener, unregister_live_log_listener,
17};
18use cu29_traits::{CuError, CuResult};
19use serde_derive::{Deserialize, Serialize};
20
21#[cfg(not(feature = "std"))]
22extern crate alloc;
23
24#[cfg(feature = "std")]
25use std::sync::Arc;
26#[cfg(feature = "std")]
27use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
28
29#[cfg(not(feature = "std"))]
30use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
31#[cfg(not(target_has_atomic = "64"))]
32use spin::Mutex;
33
// Platform shim (no_std): re-export the allocator and atomic primitives used by this file
// from `alloc`/`core`, so the rest of the module can `use imp::*` regardless of `std`.
#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::alloc::{GlobalAlloc, Layout};
    // Only targets with native 64-bit atomics get `AtomicU64`; `RuntimeExecutionProbe`
    // falls back to a `spin::Mutex` otherwise.
    #[cfg(target_has_atomic = "64")]
    pub use core::sync::atomic::AtomicU64;
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    // `f64::sqrt` is not available in `core`; `LiveStatistics::stdev` uses libm's instead.
    pub use libm::sqrt;
}

// Platform shim (std): same re-exports from `std`, plus the optional counting global allocator.
#[cfg(feature = "std")]
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    #[cfg(target_has_atomic = "64")]
    pub use std::sync::atomic::AtomicU64;
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // With `memory_monitoring`, every heap allocation of the process goes through this
    // counting wrapper around the system allocator (read back by `ScopedAllocCounter`).
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
57
58use imp::*;
59
/// Renders a duration as `HH:MM:SS.ffff` (hours are not wrapped at 24; the fractional
/// part has a resolution of 100 µs). Used by the debug live-log printer.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos = time.as_nanos();
    let (total_seconds, sub_second_nanos) = (nanos / 1_000_000_000, nanos % 1_000_000_000);

    let hours = total_seconds / 3600;
    let minutes = total_seconds % 3600 / 60;
    let seconds = total_seconds % 60;
    // 1 s / 10_000 = 100_000 ns per displayed fractional digit step.
    let fractional_1e4 = sub_second_nanos / 100_000;

    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
}
71
/// Step of a component's lifecycle, carried by `ExecutionMarker::step` and passed to
/// `CuMonitor::process_error`.
///
/// NOTE: `component_state_to_usize` / `usize_to_component_state` encode these variants as
/// 0..=4 in declaration order; update them together if variants are added or reordered.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum CuComponentState {
    Start,
    Preprocess,
    Process,
    Postprocess,
    Stop,
}
81
82#[repr(transparent)]
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
85pub struct ComponentId(usize);
86
87impl ComponentId {
88 pub const INVALID: Self = Self(usize::MAX);
89
90 #[inline]
91 pub const fn new(index: usize) -> Self {
92 Self(index)
93 }
94
95 #[inline]
96 pub const fn index(self) -> usize {
97 self.0
98 }
99}
100
101impl core::fmt::Display for ComponentId {
102 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
103 self.0.fmt(f)
104 }
105}
106
107impl From<ComponentId> for usize {
108 fn from(value: ComponentId) -> Self {
109 value.index()
110 }
111}
112
113#[repr(transparent)]
115#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
116pub struct CuListSlot(usize);
117
118impl CuListSlot {
119 #[inline]
120 pub const fn new(index: usize) -> Self {
121 Self(index)
122 }
123
124 #[inline]
125 pub const fn index(self) -> usize {
126 self.0
127 }
128}
129
130impl From<CuListSlot> for usize {
131 fn from(value: CuListSlot) -> Self {
132 value.index()
133 }
134}
135
136#[derive(Debug, Clone, Copy)]
140pub struct CopperListLayout {
141 components: &'static [MonitorComponentMetadata],
142 slot_to_component: &'static [ComponentId],
143}
144
145impl CopperListLayout {
146 #[inline]
147 pub const fn new(
148 components: &'static [MonitorComponentMetadata],
149 slot_to_component: &'static [ComponentId],
150 ) -> Self {
151 Self {
152 components,
153 slot_to_component,
154 }
155 }
156
157 #[inline]
158 pub const fn components(self) -> &'static [MonitorComponentMetadata] {
159 self.components
160 }
161
162 #[inline]
163 pub const fn component_count(self) -> usize {
164 self.components.len()
165 }
166
167 #[inline]
168 pub const fn culist_slot_count(self) -> usize {
169 self.slot_to_component.len()
170 }
171
172 #[inline]
173 pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
174 &self.components[id.index()]
175 }
176
177 #[inline]
178 pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
179 self.slot_to_component[culist_slot.index()]
180 }
181
182 #[inline]
183 pub const fn slot_to_component(self) -> &'static [ComponentId] {
184 self.slot_to_component
185 }
186
187 #[inline]
188 pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
189 CopperListView::new(self, msgs)
190 }
191}
192
193#[derive(Debug, Clone, Copy)]
195pub struct CopperListView<'a> {
196 layout: CopperListLayout,
197 msgs: &'a [&'a CuMsgMetadata],
198}
199
200impl<'a> CopperListView<'a> {
201 #[inline]
202 pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
203 assert_eq!(
204 msgs.len(),
205 layout.culist_slot_count(),
206 "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
207 msgs.len(),
208 layout.culist_slot_count()
209 );
210 Self { layout, msgs }
211 }
212
213 #[inline]
214 pub const fn layout(self) -> CopperListLayout {
215 self.layout
216 }
217
218 #[inline]
219 pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
220 self.msgs
221 }
222
223 #[inline]
224 pub const fn len(self) -> usize {
225 self.msgs.len()
226 }
227
228 #[inline]
229 pub const fn is_empty(self) -> bool {
230 self.msgs.is_empty()
231 }
232
233 #[inline]
234 pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
235 let index = culist_slot.index();
236 CopperListEntry {
237 culist_slot,
238 component_id: self.layout.component_for_slot(culist_slot),
239 msg: self.msgs[index],
240 }
241 }
242
243 pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
244 self.msgs.iter().enumerate().map(move |(idx, msg)| {
245 let culist_slot = CuListSlot::new(idx);
246 CopperListEntry {
247 culist_slot,
248 component_id: self.layout.component_for_slot(culist_slot),
249 msg,
250 }
251 })
252 }
253}
254
255#[derive(Debug, Clone, Copy)]
257pub struct CopperListEntry<'a> {
258 pub culist_slot: CuListSlot,
259 pub component_id: ComponentId,
260 pub msg: &'a CuMsgMetadata,
261}
262
263impl<'a> CopperListEntry<'a> {
264 #[inline]
265 pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
266 layout.component(self.component_id)
267 }
268
269 #[inline]
270 pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
271 layout.component(self.component_id).kind()
272 }
273}
274
/// Snapshot of what was executing: which component, in which step, and optionally for
/// which CopperList. Written to and read from `RuntimeExecutionProbe`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ExecutionMarker {
    /// Component being executed (index into the monitoring component table).
    pub component_id: ComponentId,
    /// Lifecycle step the component was in.
    pub step: CuComponentState,
    /// CopperList id associated with the step, if any.
    pub culistid: Option<u64>,
}
285
286#[derive(Debug)]
292pub struct RuntimeExecutionProbe {
293 component_id: AtomicUsize,
294 step: AtomicUsize,
295 #[cfg(target_has_atomic = "64")]
296 culistid: AtomicU64,
297 #[cfg(target_has_atomic = "64")]
298 culistid_present: AtomicUsize,
299 #[cfg(not(target_has_atomic = "64"))]
300 culistid: Mutex<Option<u64>>,
301 sequence: AtomicUsize,
302}
303
304impl Default for RuntimeExecutionProbe {
305 fn default() -> Self {
306 Self {
307 component_id: AtomicUsize::new(ComponentId::INVALID.index()),
308 step: AtomicUsize::new(0),
309 #[cfg(target_has_atomic = "64")]
310 culistid: AtomicU64::new(0),
311 #[cfg(target_has_atomic = "64")]
312 culistid_present: AtomicUsize::new(0),
313 #[cfg(not(target_has_atomic = "64"))]
314 culistid: Mutex::new(None),
315 sequence: AtomicUsize::new(0),
316 }
317 }
318}
319
320impl RuntimeExecutionProbe {
321 #[inline]
322 pub fn record(&self, marker: ExecutionMarker) {
323 self.component_id
324 .store(marker.component_id.index(), Ordering::Relaxed);
325 self.step
326 .store(component_state_to_usize(marker.step), Ordering::Relaxed);
327 #[cfg(target_has_atomic = "64")]
328 match marker.culistid {
329 Some(culistid) => {
330 self.culistid.store(culistid, Ordering::Relaxed);
331 self.culistid_present.store(1, Ordering::Relaxed);
332 }
333 None => {
334 self.culistid_present.store(0, Ordering::Relaxed);
335 }
336 }
337 #[cfg(not(target_has_atomic = "64"))]
338 {
339 *self.culistid.lock() = marker.culistid;
340 }
341 self.sequence.fetch_add(1, Ordering::Release);
342 }
343
344 #[inline]
345 pub fn sequence(&self) -> usize {
346 self.sequence.load(Ordering::Acquire)
347 }
348
349 #[inline]
350 pub fn marker(&self) -> Option<ExecutionMarker> {
351 loop {
354 let seq_before = self.sequence.load(Ordering::Acquire);
355 let component_id = self.component_id.load(Ordering::Relaxed);
356 let step = self.step.load(Ordering::Relaxed);
357 #[cfg(target_has_atomic = "64")]
358 let culistid_present = self.culistid_present.load(Ordering::Relaxed);
359 #[cfg(target_has_atomic = "64")]
360 let culistid_value = self.culistid.load(Ordering::Relaxed);
361 #[cfg(not(target_has_atomic = "64"))]
362 let culistid = *self.culistid.lock();
363 let seq_after = self.sequence.load(Ordering::Acquire);
364 if seq_before == seq_after {
365 if component_id == ComponentId::INVALID.index() {
366 return None;
367 }
368 let step = usize_to_component_state(step);
369 #[cfg(target_has_atomic = "64")]
370 let culistid = if culistid_present == 0 {
371 None
372 } else {
373 Some(culistid_value)
374 };
375 return Some(ExecutionMarker {
376 component_id: ComponentId::new(component_id),
377 step,
378 culistid,
379 });
380 }
381 }
382 }
383}
384
385#[inline]
386const fn component_state_to_usize(step: CuComponentState) -> usize {
387 match step {
388 CuComponentState::Start => 0,
389 CuComponentState::Preprocess => 1,
390 CuComponentState::Process => 2,
391 CuComponentState::Postprocess => 3,
392 CuComponentState::Stop => 4,
393 }
394}
395
396#[inline]
397const fn usize_to_component_state(step: usize) -> CuComponentState {
398 match step {
399 0 => CuComponentState::Start,
400 1 => CuComponentState::Preprocess,
401 2 => CuComponentState::Process,
402 3 => CuComponentState::Postprocess,
403 _ => CuComponentState::Stop,
404 }
405}
406
/// Shared, reference-counted handle to a `RuntimeExecutionProbe` (std builds only).
#[cfg(feature = "std")]
pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
409
410#[derive(Debug, Clone)]
415pub struct MonitorExecutionProbe {
416 #[cfg(feature = "std")]
417 inner: Option<ExecutionProbeHandle>,
418}
419
420impl Default for MonitorExecutionProbe {
421 fn default() -> Self {
422 Self::unavailable()
423 }
424}
425
426impl MonitorExecutionProbe {
427 #[cfg(feature = "std")]
428 pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
429 Self {
430 inner: Some(handle),
431 }
432 }
433
434 pub const fn unavailable() -> Self {
435 Self {
436 #[cfg(feature = "std")]
437 inner: None,
438 }
439 }
440
441 pub fn is_available(&self) -> bool {
442 #[cfg(feature = "std")]
443 {
444 self.inner.is_some()
445 }
446 #[cfg(not(feature = "std"))]
447 {
448 false
449 }
450 }
451
452 pub fn marker(&self) -> Option<ExecutionMarker> {
453 #[cfg(feature = "std")]
454 {
455 self.inner.as_ref().and_then(|probe| probe.marker())
456 }
457 #[cfg(not(feature = "std"))]
458 {
459 None
460 }
461 }
462
463 pub fn sequence(&self) -> Option<usize> {
464 #[cfg(feature = "std")]
465 {
466 self.inner.as_ref().map(|probe| probe.sequence())
467 }
468 #[cfg(not(feature = "std"))]
469 {
470 None
471 }
472 }
473}
474
/// Role of a component in the graph as seen by monitors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentType {
    Source,
    Task,
    Sink,
    Bridge,
}

impl ComponentType {
    /// True for the task family (source, task, sink); false for bridges.
    pub const fn is_task(self) -> bool {
        match self {
            Self::Bridge => false,
            Self::Source | Self::Task | Self::Sink => true,
        }
    }
}
493
494#[derive(Debug, Clone, Copy, PartialEq, Eq)]
496pub struct MonitorComponentMetadata {
497 id: &'static str,
498 kind: ComponentType,
499 type_name: Option<&'static str>,
500}
501
502impl MonitorComponentMetadata {
503 pub const fn new(
504 id: &'static str,
505 kind: ComponentType,
506 type_name: Option<&'static str>,
507 ) -> Self {
508 Self {
509 id,
510 kind,
511 type_name,
512 }
513 }
514
515 pub const fn id(&self) -> &'static str {
517 self.id
518 }
519
520 pub const fn kind(&self) -> ComponentType {
521 self.kind
522 }
523
524 pub const fn type_name(&self) -> Option<&'static str> {
526 self.type_name
527 }
528}
529
530#[derive(Debug, Clone)]
535pub struct CuMonitoringMetadata {
536 mission_id: CompactString,
537 layout: CopperListLayout,
538 copperlist_info: CopperListInfo,
539 topology: MonitorTopology,
540 monitor_config: Option<ComponentConfig>,
541}
542
543impl CuMonitoringMetadata {
544 pub fn new(
545 mission_id: CompactString,
546 components: &'static [MonitorComponentMetadata],
547 culist_component_mapping: &'static [ComponentId],
548 copperlist_info: CopperListInfo,
549 topology: MonitorTopology,
550 monitor_config: Option<ComponentConfig>,
551 ) -> CuResult<Self> {
552 Self::validate_components(components)?;
553 Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
554 Ok(Self {
555 mission_id,
556 layout: CopperListLayout::new(components, culist_component_mapping),
557 copperlist_info,
558 topology,
559 monitor_config,
560 })
561 }
562
563 fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
564 let mut seen_bridge = false;
565 for component in components {
566 match component.kind() {
567 component_type if component_type.is_task() && seen_bridge => {
568 return Err(CuError::from(
569 "invalid monitor metadata: task-family components must appear before bridges",
570 ));
571 }
572 ComponentType::Bridge => seen_bridge = true,
573 _ => {}
574 }
575 }
576 Ok(())
577 }
578
579 fn validate_culist_mapping(
580 components_len: usize,
581 culist_component_mapping: &'static [ComponentId],
582 ) -> CuResult<()> {
583 for component_idx in culist_component_mapping {
584 if component_idx.index() >= components_len {
585 return Err(CuError::from(
586 "invalid monitor metadata: culist mapping points past components table",
587 ));
588 }
589 }
590 Ok(())
591 }
592
593 pub fn mission_id(&self) -> &str {
595 self.mission_id.as_str()
596 }
597
598 pub fn components(&self) -> &'static [MonitorComponentMetadata] {
602 self.layout.components()
603 }
604
605 pub const fn component_count(&self) -> usize {
607 self.layout.component_count()
608 }
609
610 pub const fn layout(&self) -> CopperListLayout {
612 self.layout
613 }
614
615 pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
616 self.layout.component(component_id)
617 }
618
619 pub fn component_id(&self, component_id: ComponentId) -> &'static str {
620 self.component(component_id).id()
621 }
622
623 pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
624 self.component(component_id).kind()
625 }
626
627 pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
628 self.layout
629 .components()
630 .iter()
631 .position(|component| component.id() == component_id)
632 .map(ComponentId::new)
633 }
634
635 pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
639 self.layout.slot_to_component()
640 }
641
642 pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
643 self.layout.component_for_slot(culist_slot)
644 }
645
646 pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
647 self.layout.view(msgs)
648 }
649
650 pub const fn copperlist_info(&self) -> CopperListInfo {
651 self.copperlist_info
652 }
653
654 pub fn topology(&self) -> &MonitorTopology {
659 &self.topology
660 }
661
662 pub fn monitor_config(&self) -> Option<&ComponentConfig> {
663 self.monitor_config.as_ref()
664 }
665
666 pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
667 self.monitor_config = monitor_config;
668 self
669 }
670}
671
672#[derive(Debug, Clone, Default)]
676pub struct CuMonitoringRuntime {
677 execution_probe: MonitorExecutionProbe,
678}
679
680impl CuMonitoringRuntime {
681 pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
682 Self { execution_probe }
683 }
684
685 pub const fn unavailable() -> Self {
686 Self::new(MonitorExecutionProbe::unavailable())
687 }
688
689 pub fn execution_probe(&self) -> &MonitorExecutionProbe {
690 &self.execution_probe
691 }
692}
693
/// What a monitor asks the runtime to do after a component error
/// (NOTE(review): the runtime's handling of each variant is defined elsewhere).
#[derive(Debug)]
pub enum Decision {
    Abort,
    Ignore,
    Shutdown,
}

/// Combines two monitors' decisions, keeping the more severe one:
/// `Shutdown` beats `Abort`, which beats `Ignore`.
fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
    const fn severity(decision: &Decision) -> u8 {
        match decision {
            Decision::Ignore => 0,
            Decision::Abort => 1,
            Decision::Shutdown => 2,
        }
    }
    if severity(&rhs) > severity(&lhs) {
        rhs
    } else {
        lhs
    }
}
712
/// A graph node as presented to monitors (built by `build_monitor_topology`).
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Node id from the configuration graph.
    pub id: String,
    /// Implementation type name of the node.
    pub type_name: Option<String>,
    /// Role inferred from the node flavor and its edges.
    pub kind: ComponentType,
    /// Input port labels ("in" for tasks; Tx channel ids for bridges).
    pub inputs: Vec<String>,
    /// Output port labels ("outN: <msg>" / "out" for tasks; Rx channel ids for bridges).
    pub outputs: Vec<String>,
}

/// An edge between two monitor nodes, with the resolved port labels on each side.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    pub src: String,
    pub src_port: Option<String>,
    pub dst: String,
    pub dst_port: Option<String>,
    /// Message carried by the edge (the edge's `msg` from the config graph).
    pub msg: String,
}

/// Nodes and connections of one mission graph, for monitor display.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    pub nodes: Vec<MonitorNode>,
    pub connections: Vec<MonitorConnection>,
}
738
/// Size of one CopperList in bytes and the number of CopperLists.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListInfo {
    pub size_bytes: usize,
    pub count: usize,
}

impl CopperListInfo {
    /// Builds the info from its two fields.
    pub const fn new(size_bytes: usize, count: usize) -> Self {
        CopperListInfo {
            count,
            size_bytes,
        }
    }
}
750
/// Byte counters reported to `CuMonitor::observe_copperlist_io` for one CopperList.
///
/// NOTE(review): the precise meaning of each counter is defined by the producer of these
/// stats (not in this file); the notes below follow the field names.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListIoStats {
    /// In-memory size of the CopperList.
    pub raw_culist_bytes: u64,
    /// Bytes held behind payload handles (see `CuPayloadSize::handle_bytes`).
    pub handle_bytes: u64,
    /// Size of the CopperList once encoded.
    pub encoded_culist_bytes: u64,
    /// Keyframe bytes attributed to this CopperList.
    pub keyframe_bytes: u64,
    /// Running total of structured-log bytes.
    pub structured_log_bytes_total: u64,
    /// Id of the CopperList these stats describe.
    pub culistid: u64,
}
767
768pub trait CuPayloadSize {
771 fn raw_bytes(&self) -> usize {
773 core::mem::size_of_val(self)
774 }
775
776 fn handle_bytes(&self) -> usize {
778 0
779 }
780}
781
782impl<T> CuPayloadSize for T
783where
784 T: crate::cutask::CuMsgPayload,
785{
786 fn raw_bytes(&self) -> usize {
787 core::mem::size_of::<T>()
788 }
789}
790
791#[derive(Default, Debug, Clone, Copy)]
792struct NodeIoUsage {
793 has_incoming: bool,
794 has_outgoing: bool,
795}
796
797fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
798 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
799 edge_ids.sort();
800
801 let mut outputs = Vec::new();
802 let mut seen = Vec::new();
803 let mut port_idx = 0usize;
804 for edge_id in edge_ids {
805 let Some(edge) = graph.edge(edge_id) else {
806 continue;
807 };
808 if seen.iter().any(|msg| msg == &edge.msg) {
809 continue;
810 }
811 seen.push(edge.msg.clone());
812 let mut port_label = String::from("out");
813 port_label.push_str(&port_idx.to_string());
814 port_label.push_str(": ");
815 port_label.push_str(edge.msg.as_str());
816 outputs.push((edge.msg.clone(), port_label));
817 port_idx += 1;
818 }
819 outputs
820}
821
822pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
824 let graph = config.get_graph(Some(mission))?;
825 let mut nodes: Map<String, MonitorNode> = Map::new();
826 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
827 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
828
829 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
830 for bridge in &config.bridges {
831 bridge_lookup.insert(bridge.id.as_str(), bridge);
832 }
833
834 for cnx in graph.edges() {
835 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
836 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
837 }
838
839 for (_, node) in graph.get_all_nodes() {
840 let node_id = node.get_id();
841 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
842 let kind = match node.get_flavor() {
843 Flavor::Bridge => ComponentType::Bridge,
844 _ if !usage.has_incoming && usage.has_outgoing => ComponentType::Source,
845 _ if usage.has_incoming && !usage.has_outgoing => ComponentType::Sink,
846 _ => ComponentType::Task,
847 };
848
849 let mut inputs = Vec::new();
850 let mut outputs = Vec::new();
851 if kind == ComponentType::Bridge {
852 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
853 for ch in &bridge.channels {
854 match ch {
855 BridgeChannelConfigRepresentation::Rx { id, .. } => {
856 outputs.push(id.clone())
857 }
858 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
859 }
860 }
861 }
862 } else {
863 if usage.has_incoming || !usage.has_outgoing {
864 inputs.push("in".to_string());
865 }
866 if usage.has_outgoing {
867 if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
868 let ports = collect_output_ports(graph, node_idx);
869 let mut port_map: Map<String, String> = Map::new();
870 for (msg_type, label) in ports {
871 port_map.insert(msg_type, label.clone());
872 outputs.push(label);
873 }
874 output_port_lookup.insert(node_id.clone(), port_map);
875 }
876 } else if !usage.has_incoming {
877 outputs.push("out".to_string());
878 }
879 }
880
881 nodes.insert(
882 node_id.clone(),
883 MonitorNode {
884 id: node_id,
885 type_name: Some(node.get_type().to_string()),
886 kind,
887 inputs,
888 outputs,
889 },
890 );
891 }
892
893 let mut connections = Vec::new();
894 for cnx in graph.edges() {
895 let src = cnx.src.clone();
896 let dst = cnx.dst.clone();
897
898 let src_port = cnx.src_channel.clone().or_else(|| {
899 output_port_lookup
900 .get(&src)
901 .and_then(|ports| ports.get(&cnx.msg).cloned())
902 .or_else(|| {
903 nodes
904 .get(&src)
905 .and_then(|node| node.outputs.first().cloned())
906 })
907 });
908 let dst_port = cnx.dst_channel.clone().or_else(|| {
909 nodes
910 .get(&dst)
911 .and_then(|node| node.inputs.first().cloned())
912 });
913
914 connections.push(MonitorConnection {
915 src,
916 src_port,
917 dst,
918 dst_port,
919 msg: cnx.msg.clone(),
920 });
921 }
922
923 Ok(MonitorTopology {
924 nodes: nodes.into_values().collect(),
925 connections,
926 })
927}
928
/// Hook interface through which the runtime reports lifecycle, CopperLists, I/O stats,
/// errors and panics to a monitor. Tuples of up to six monitors also implement it
/// (see `impl_monitor_tuple!`).
pub trait CuMonitor: Sized {
    /// Constructs the monitor from the mission metadata and runtime services.
    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
    where
        Self: Sized;

    /// Called once before execution; default does nothing.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Receives a view of a CopperList's message metadata.
    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;

    /// Receives per-CopperList byte counters; default ignores them.
    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}

    /// Decides how to react to an error raised by `component_id` during `step`.
    /// For monitor tuples, the most severe decision wins (Shutdown > Abort > Ignore).
    fn process_error(
        &self,
        component_id: ComponentId,
        step: CuComponentState,
        error: &CuError,
    ) -> Decision;

    /// Notified with a panic message; default ignores it.
    fn process_panic(&self, _panic_message: &str) {}

    /// Called once when execution stops; default does nothing.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
987
988pub struct NoMonitor {}
991impl CuMonitor for NoMonitor {
992 fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
993 Ok(NoMonitor {})
994 }
995
996 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
997 #[cfg(all(feature = "std", debug_assertions))]
998 register_live_log_listener(|entry, format_str, param_names| {
999 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1000 let named: Map<String, String> = param_names
1001 .iter()
1002 .zip(params.iter())
1003 .map(|(k, v)| (k.to_string(), v.clone()))
1004 .collect();
1005
1006 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1007 let ts = format_timestamp(entry.time);
1008 println!("{} [{:?}] {}", ts, entry.level, msg);
1009 }
1010 });
1011 Ok(())
1012 }
1013
1014 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1015 Ok(())
1017 }
1018
1019 fn process_error(
1020 &self,
1021 _component_id: ComponentId,
1022 _step: CuComponentState,
1023 _error: &CuError,
1024 ) -> Decision {
1025 Decision::Ignore
1027 }
1028
1029 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1030 #[cfg(all(feature = "std", debug_assertions))]
1031 unregister_live_log_listener();
1032 Ok(())
1033 }
1034}
1035
1036macro_rules! impl_monitor_tuple {
1037 ($($idx:tt => $name:ident),+) => {
1038 impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1039 fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1040 where
1041 Self: Sized,
1042 {
1043 Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1044 }
1045
1046 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1047 $(self.$idx.start(ctx)?;)+
1048 Ok(())
1049 }
1050
1051 fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1052 $(self.$idx.process_copperlist(ctx, view)?;)+
1053 Ok(())
1054 }
1055
1056 fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1057 $(self.$idx.observe_copperlist_io(stats);)+
1058 }
1059
1060 fn process_error(
1061 &self,
1062 component_id: ComponentId,
1063 step: CuComponentState,
1064 error: &CuError,
1065 ) -> Decision {
1066 let mut decision = Decision::Ignore;
1067 $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1068 decision
1069 }
1070
1071 fn process_panic(&self, panic_message: &str) {
1072 $(self.$idx.process_panic(panic_message);)+
1073 }
1074
1075 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1076 $(self.$idx.stop(ctx)?;)+
1077 Ok(())
1078 }
1079 }
1080 };
1081}
1082
1083impl_monitor_tuple!(0 => M0, 1 => M1);
1084impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1085impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1086impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1087impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1088
/// Extracts a readable message from a panic payload (`&str` or `String`), falling back to a
/// fixed placeholder for any other payload type.
#[cfg(feature = "std")]
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|msg| (*msg).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
1099
/// Allocator wrapper that forwards to `inner` while keeping running totals of bytes
/// allocated and deallocated (counters wrap on overflow, as `AtomicUsize::fetch_add` does).
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters at zero.
    pub const fn new(inner: A) -> Self {
        CountingAlloc {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes successfully allocated since creation or the last `reset`.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes deallocated since creation or the last `reset`.
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::alloc`'s contract; we forward it unchanged.
        let p = unsafe { self.inner.alloc(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: same contract as `alloc`, forwarded so `inner` can use a zeroed fast path.
        let p = unsafe { self.inner.alloc_zeroed(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: `ptr` was returned by this allocator (hence by `inner`) with `layout`,
        // per the caller's `GlobalAlloc::dealloc` contract.
        unsafe { self.inner.dealloc(ptr, layout) }
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds `GlobalAlloc::realloc`'s contract for `ptr`, `layout`
        // and `new_size`; `ptr` came from `inner`, so forwarding is sound. Forwarding lets
        // `inner` grow/shrink in place instead of the default alloc + copy + dealloc.
        let new_ptr = unsafe { self.inner.realloc(ptr, layout, new_size) };
        if !new_ptr.is_null() {
            // Same accounting as the default realloc: the old block counts as freed and the
            // new one as allocated. On failure the old block stays live, so nothing changes.
            self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
            self.allocated.fetch_add(new_size, Ordering::SeqCst);
        }
        new_ptr
    }
}
1149
/// Measures bytes allocated/deallocated through the global `CountingAlloc` since this
/// counter was created.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    bf_allocated: usize,
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Snapshots the current global counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Bytes allocated since `new`.
    ///
    /// Saturates at 0 if the global counters were `reset` after this counter was created,
    /// instead of underflowing (which panics in debug builds).
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Bytes deallocated since `new`; saturates at 0 like `allocated`.
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    // Currently only computes the scoped totals (a hook point). Uses the saturating
    // accessors so dropping can never panic, even during unwinding.
    fn drop(&mut self) {
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
    }
}
1219
// Number of histogram buckets in every `LiveStatistics` (each bucket is a `u64`).
// NOTE(review): presumably smaller without std to keep the per-instance footprint low.
#[cfg(feature = "std")]
const BUCKET_COUNT: usize = 1024;
#[cfg(not(feature = "std"))]
const BUCKET_COUNT: usize = 256;
1224
1225#[derive(Debug, Clone)]
1228pub struct LiveStatistics {
1229 buckets: [u64; BUCKET_COUNT],
1230 min_val: u64,
1231 max_val: u64,
1232 sum: u64,
1233 sum_sq: u64,
1234 count: u64,
1235 max_value: u64,
1236}
1237
1238impl LiveStatistics {
1239 pub fn new_with_max(max_value: u64) -> Self {
1259 LiveStatistics {
1260 buckets: [0; BUCKET_COUNT],
1261 min_val: u64::MAX,
1262 max_val: 0,
1263 sum: 0,
1264 sum_sq: 0,
1265 count: 0,
1266 max_value,
1267 }
1268 }
1269
1270 #[inline]
1271 fn value_to_bucket(&self, value: u64) -> usize {
1272 if value >= self.max_value {
1273 BUCKET_COUNT - 1
1274 } else {
1275 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
1276 }
1277 }
1278
1279 #[inline]
1280 pub fn min(&self) -> u64 {
1281 if self.count == 0 { 0 } else { self.min_val }
1282 }
1283
1284 #[inline]
1285 pub fn max(&self) -> u64 {
1286 self.max_val
1287 }
1288
1289 #[inline]
1290 pub fn mean(&self) -> f64 {
1291 if self.count == 0 {
1292 0.0
1293 } else {
1294 self.sum as f64 / self.count as f64
1295 }
1296 }
1297
1298 #[inline]
1299 pub fn stdev(&self) -> f64 {
1300 if self.count == 0 {
1301 return 0.0;
1302 }
1303 let mean = self.mean();
1304 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
1305 if variance < 0.0 {
1306 return 0.0;
1307 }
1308 #[cfg(feature = "std")]
1309 return variance.sqrt();
1310 #[cfg(not(feature = "std"))]
1311 return sqrt(variance);
1312 }
1313
1314 #[inline]
1315 pub fn percentile(&self, percentile: f64) -> u64 {
1316 if self.count == 0 {
1317 return 0;
1318 }
1319
1320 let target_count = (self.count as f64 * percentile) as u64;
1321 let mut accumulated = 0u64;
1322
1323 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
1324 accumulated += bucket_count;
1325 if accumulated >= target_count {
1326 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
1328 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
1329 let bucket_fraction = if bucket_count > 0 {
1330 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
1331 } else {
1332 0.5
1333 };
1334 return bucket_start
1335 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
1336 }
1337 }
1338
1339 self.max_val
1340 }
1341
1342 #[inline]
1344 pub fn record(&mut self, value: u64) {
1345 if value < self.min_val {
1346 self.min_val = value;
1347 }
1348 if value > self.max_val {
1349 self.max_val = value;
1350 }
1351 self.sum += value;
1352 self.sum_sq += value * value;
1353 self.count += 1;
1354
1355 let bucket = self.value_to_bucket(value);
1356 self.buckets[bucket] += 1;
1357 }
1358
1359 #[inline]
1360 pub fn len(&self) -> u64 {
1361 self.count
1362 }
1363
1364 #[inline]
1365 pub fn is_empty(&self) -> bool {
1366 self.count == 0
1367 }
1368
1369 #[inline]
1370 pub fn reset(&mut self) {
1371 self.buckets.fill(0);
1372 self.min_val = u64::MAX;
1373 self.max_val = 0;
1374 self.sum = 0;
1375 self.sum_sq = 0;
1376 self.count = 0;
1377 }
1378}
1379
1380#[derive(Debug, Clone)]
1383pub struct CuDurationStatistics {
1384 bare: LiveStatistics,
1385 jitter: LiveStatistics,
1386 last_value: CuDuration,
1387}
1388
1389impl CuDurationStatistics {
1390 pub fn new(max: CuDuration) -> Self {
1391 let CuDuration(max) = max;
1392 CuDurationStatistics {
1393 bare: LiveStatistics::new_with_max(max),
1394 jitter: LiveStatistics::new_with_max(max),
1395 last_value: CuDuration::default(),
1396 }
1397 }
1398
1399 #[inline]
1400 pub fn min(&self) -> CuDuration {
1401 CuDuration(self.bare.min())
1402 }
1403
1404 #[inline]
1405 pub fn max(&self) -> CuDuration {
1406 CuDuration(self.bare.max())
1407 }
1408
1409 #[inline]
1410 pub fn mean(&self) -> CuDuration {
1411 CuDuration(self.bare.mean() as u64) }
1413
1414 #[inline]
1415 pub fn percentile(&self, percentile: f64) -> CuDuration {
1416 CuDuration(self.bare.percentile(percentile))
1417 }
1418
1419 #[inline]
1420 pub fn stddev(&self) -> CuDuration {
1421 CuDuration(self.bare.stdev() as u64)
1422 }
1423
1424 #[inline]
1425 pub fn len(&self) -> u64 {
1426 self.bare.len()
1427 }
1428
1429 #[inline]
1430 pub fn is_empty(&self) -> bool {
1431 self.bare.len() == 0
1432 }
1433
1434 #[inline]
1435 pub fn jitter_min(&self) -> CuDuration {
1436 CuDuration(self.jitter.min())
1437 }
1438
1439 #[inline]
1440 pub fn jitter_max(&self) -> CuDuration {
1441 CuDuration(self.jitter.max())
1442 }
1443
1444 #[inline]
1445 pub fn jitter_mean(&self) -> CuDuration {
1446 CuDuration(self.jitter.mean() as u64)
1447 }
1448
1449 #[inline]
1450 pub fn jitter_stddev(&self) -> CuDuration {
1451 CuDuration(self.jitter.stdev() as u64)
1452 }
1453
1454 #[inline]
1455 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
1456 CuDuration(self.jitter.percentile(percentile))
1457 }
1458
1459 #[inline]
1460 pub fn record(&mut self, value: CuDuration) {
1461 let CuDuration(nanos) = value;
1462 if self.bare.is_empty() {
1463 self.bare.record(nanos);
1464 self.last_value = value;
1465 return;
1466 }
1467 self.bare.record(nanos);
1468 let CuDuration(last_nanos) = self.last_value;
1469 self.jitter.record(nanos.abs_diff(last_nanos));
1470 self.last_value = value;
1471 }
1472
1473 #[inline]
1474 pub fn reset(&mut self) {
1475 self.bare.reset();
1476 self.jitter.reset();
1477 }
1478}
1479
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicUsize, Ordering};

    /// Decision a `TestMonitor` returns from `process_error`.
    #[derive(Clone, Copy)]
    enum TestDecision {
        Ignore,
        Abort,
        Shutdown,
    }

    /// Monitor stub that counts its callbacks and returns a fixed error decision.
    struct TestMonitor {
        decision: TestDecision,
        copperlist_calls: AtomicUsize,
        panic_calls: AtomicUsize,
    }

    impl TestMonitor {
        fn new_with(decision: TestDecision) -> Self {
            Self {
                decision,
                copperlist_calls: AtomicUsize::new(0),
                panic_calls: AtomicUsize::new(0),
            }
        }
    }

    /// Minimal valid monitoring metadata with two task components, "a" and "b".
    fn test_metadata() -> CuMonitoringMetadata {
        const COMPONENTS: &[MonitorComponentMetadata] = &[
            MonitorComponentMetadata::new("a", ComponentType::Task, None),
            MonitorComponentMetadata::new("b", ComponentType::Task, None),
        ];
        CuMonitoringMetadata::new(
            CompactString::from(crate::config::DEFAULT_MISSION_ID),
            COMPONENTS,
            &[],
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
            None,
        )
        .expect("test metadata should be valid")
    }

    impl CuMonitor for TestMonitor {
        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
            let monitor = Self::new_with(TestDecision::Ignore);
            #[cfg(feature = "std")]
            let _ = runtime.execution_probe();
            Ok(monitor)
        }

        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn process_error(
            &self,
            _component_id: ComponentId,
            _step: CuComponentState,
            _error: &CuError,
        ) -> Decision {
            match self.decision {
                TestDecision::Ignore => Decision::Ignore,
                TestDecision::Abort => Decision::Abort,
                TestDecision::Shutdown => Decision::Shutdown,
            }
        }

        fn process_panic(&self, _panic_message: &str) {
            self.panic_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // Record each of 0..=99 exactly once.
        for i in 0..100 {
            stats.record(i);
        }

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        // The exact mean is 49.5; truncation yields 49.
        assert_eq!(stats.mean() as u64, 49);

        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // Percentiles are histogram estimates, so allow a small tolerance.
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);
        // Four samples produce three consecutive differences: 100, 300, 100.
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));

        stats.reset();
        assert_eq!(stats.len(), 0);
        assert!(stats.is_empty());
        assert!(stats.jitter.is_empty());

        // After a reset, the first sample must not produce a jitter entry.
        stats.record(CuDuration(700));
        assert_eq!(stats.len(), 1);
        assert!(stats.jitter.is_empty());
        stats.record(CuDuration(650));
        assert_eq!(stats.jitter.len(), 1);
        assert_eq!(stats.jitter_min(), CuDuration(50));
    }

    #[test]
    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
        let err = CuError::from("boom");

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Shutdown),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Shutdown
        ));

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Abort),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Abort
        ));
    }

    #[test]
    fn tuple_monitor_fans_out_callbacks() {
        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
            test_metadata(),
            CuMonitoringRuntime::unavailable(),
        )
        .expect("tuple new");
        let (ctx, _clock_control) = CuContext::new_mock_clock();
        let empty_view = test_metadata().layout().view(&[]);
        monitors
            .process_copperlist(&ctx, empty_view)
            .expect("process_copperlist should fan out");
        monitors.process_panic("panic marker");

        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn runtime_execution_probe_roundtrip_marker() {
        let probe = RuntimeExecutionProbe::default();
        assert!(probe.marker().is_none());
        assert_eq!(probe.sequence(), 0);

        probe.record(ExecutionMarker {
            component_id: ComponentId::new(7),
            step: CuComponentState::Process,
            culistid: Some(42),
        });

        let marker = probe.marker().expect("marker should be available");
        assert_eq!(marker.component_id, ComponentId::new(7));
        assert!(matches!(marker.step, CuComponentState::Process));
        assert_eq!(marker.culistid, Some(42));
        assert_eq!(probe.sequence(), 1);
    }
}