Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the components it runs.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{
6    BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7};
8use crate::context::CuContext;
9use crate::cutask::CuMsgMetadata;
10use bincode::Encode;
11use bincode::config::standard;
12use bincode::enc::EncoderImpl;
13use bincode::enc::write::SizeWriter;
14use compact_str::CompactString;
15use cu29_clock::CuDuration;
16#[allow(unused_imports)]
17use cu29_log::CuLogLevel;
18#[cfg(all(feature = "std", debug_assertions))]
19use cu29_log_runtime::{
20    format_message_only, register_live_log_listener, unregister_live_log_listener,
21};
22use cu29_traits::{
23    CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
24    finish_observed_encode,
25};
26use portable_atomic::{
27    AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
28};
29use serde_derive::{Deserialize, Serialize};
30
31#[cfg(not(feature = "std"))]
32extern crate alloc;
33
34#[cfg(feature = "std")]
35use core::cell::Cell;
36#[cfg(feature = "std")]
37use std::sync::Arc;
38#[cfg(feature = "std")]
39use std::thread_local;
40#[cfg(feature = "std")]
41use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
42
43#[cfg(not(feature = "std"))]
44use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
45#[cfg(not(target_has_atomic = "64"))]
46use spin::Mutex;
47#[cfg(not(feature = "std"))]
48use spin::Mutex as SpinMutex;
49
50#[cfg(not(feature = "std"))]
51mod imp {
52    pub use alloc::alloc::{GlobalAlloc, Layout};
53    #[cfg(target_has_atomic = "64")]
54    pub use core::sync::atomic::AtomicU64;
55    pub use core::sync::atomic::{AtomicUsize, Ordering};
56    pub use libm::sqrt;
57}
58
/// `std` flavor of the platform shim re-exported through `use imp::*`.
///
/// With the `memory_monitoring` feature enabled, this module also installs a
/// `CountingAlloc` wrapping the system allocator as the global allocator.
#[cfg(feature = "std")]
mod imp {
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // 64-bit atomics only exist on targets that support them natively.
    #[cfg(target_has_atomic = "64")]
    pub use std::sync::atomic::AtomicU64;

    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;

    /// Global allocator instance used when memory monitoring is enabled.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
73
74use imp::*;
75
/// Renders a duration as `HH:mm:ss.xxxx`, where `xxxx` is the sub-second part expressed
/// in ten-thousandths of a second.
///
/// Hours are not wrapped, so durations of 100 hours or more simply use more digits.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos = time.as_nanos();
    let whole_seconds = nanos / 1_000_000_000;
    let subsecond_nanos = nanos % 1_000_000_000;
    format!(
        "{:02}:{:02}:{:02}.{:04}",
        whole_seconds / 3600,
        (whole_seconds / 60) % 60,
        whole_seconds % 60,
        // 100_000 ns == 1/10_000 s, hence four fractional digits.
        subsecond_nanos / 100_000
    )
}
87
88/// Lifecycle state of a monitored component.
89#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
90pub enum CuComponentState {
91    Start,
92    Preprocess,
93    Process,
94    Postprocess,
95    Stop,
96}
97
98/// Strongly-typed index into [`CuMonitoringMetadata::components`].
99#[repr(transparent)]
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
101pub struct ComponentId(usize);
102
103impl ComponentId {
104    pub const INVALID: Self = Self(usize::MAX);
105
106    #[inline]
107    pub const fn new(index: usize) -> Self {
108        Self(index)
109    }
110
111    #[inline]
112    pub const fn index(self) -> usize {
113        self.0
114    }
115}
116
117impl core::fmt::Display for ComponentId {
118    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
119        self.0.fmt(f)
120    }
121}
122
123impl From<ComponentId> for usize {
124    fn from(value: ComponentId) -> Self {
125        value.index()
126    }
127}
128
129/// Strongly-typed CopperList slot index.
130#[repr(transparent)]
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
132pub struct CuListSlot(usize);
133
134impl CuListSlot {
135    #[inline]
136    pub const fn new(index: usize) -> Self {
137        Self(index)
138    }
139
140    #[inline]
141    pub const fn index(self) -> usize {
142        self.0
143    }
144}
145
146impl From<CuListSlot> for usize {
147    fn from(value: CuListSlot) -> Self {
148        value.index()
149    }
150}
151
152/// Static monitor-side CopperList indexing layout.
153///
154/// This layout is mission/runtime scoped and remains constant after monitor construction.
155#[derive(Debug, Clone, Copy)]
156pub struct CopperListLayout {
157    components: &'static [MonitorComponentMetadata],
158    slot_to_component: &'static [ComponentId],
159}
160
161impl CopperListLayout {
162    #[inline]
163    pub const fn new(
164        components: &'static [MonitorComponentMetadata],
165        slot_to_component: &'static [ComponentId],
166    ) -> Self {
167        Self {
168            components,
169            slot_to_component,
170        }
171    }
172
173    #[inline]
174    pub const fn components(self) -> &'static [MonitorComponentMetadata] {
175        self.components
176    }
177
178    #[inline]
179    pub const fn component_count(self) -> usize {
180        self.components.len()
181    }
182
183    #[inline]
184    pub const fn culist_slot_count(self) -> usize {
185        self.slot_to_component.len()
186    }
187
188    #[inline]
189    pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
190        &self.components[id.index()]
191    }
192
193    #[inline]
194    pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
195        self.slot_to_component[culist_slot.index()]
196    }
197
198    #[inline]
199    pub const fn slot_to_component(self) -> &'static [ComponentId] {
200        self.slot_to_component
201    }
202
203    #[inline]
204    pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
205        CopperListView::new(self, msgs)
206    }
207}
208
209/// Per-loop monitor view over CopperList metadata paired with static component mapping.
210#[derive(Debug, Clone, Copy)]
211pub struct CopperListView<'a> {
212    layout: CopperListLayout,
213    msgs: &'a [&'a CuMsgMetadata],
214}
215
216impl<'a> CopperListView<'a> {
217    #[inline]
218    pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
219        assert_eq!(
220            msgs.len(),
221            layout.culist_slot_count(),
222            "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
223            msgs.len(),
224            layout.culist_slot_count()
225        );
226        Self { layout, msgs }
227    }
228
229    #[inline]
230    pub const fn layout(self) -> CopperListLayout {
231        self.layout
232    }
233
234    #[inline]
235    pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
236        self.msgs
237    }
238
239    #[inline]
240    pub const fn len(self) -> usize {
241        self.msgs.len()
242    }
243
244    #[inline]
245    pub const fn is_empty(self) -> bool {
246        self.msgs.is_empty()
247    }
248
249    #[inline]
250    pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
251        let index = culist_slot.index();
252        CopperListEntry {
253            culist_slot,
254            component_id: self.layout.component_for_slot(culist_slot),
255            msg: self.msgs[index],
256        }
257    }
258
259    pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
260        self.msgs.iter().enumerate().map(move |(idx, msg)| {
261            let culist_slot = CuListSlot::new(idx);
262            CopperListEntry {
263                culist_slot,
264                component_id: self.layout.component_for_slot(culist_slot),
265                msg,
266            }
267        })
268    }
269}
270
271/// One message entry in CopperList slot order with resolved component identity.
272#[derive(Debug, Clone, Copy)]
273pub struct CopperListEntry<'a> {
274    pub culist_slot: CuListSlot,
275    pub component_id: ComponentId,
276    pub msg: &'a CuMsgMetadata,
277}
278
279impl<'a> CopperListEntry<'a> {
280    #[inline]
281    pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
282        layout.component(self.component_id)
283    }
284
285    #[inline]
286    pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
287        layout.component(self.component_id).kind()
288    }
289}
290
291/// Execution progress marker emitted by the runtime before running a component step.
292#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
293pub struct ExecutionMarker {
294    /// Index into `CuMonitoringMetadata::components()`.
295    pub component_id: ComponentId,
296    /// Lifecycle phase currently entered.
297    pub step: CuComponentState,
298    /// CopperList id when available (runtime loop), None during start/stop.
299    pub culistid: Option<u64>,
300}
301
302/// Lock-free runtime-side progress probe.
303///
304/// The runtime writes execution markers directly into this probe from the hot path
305/// (without calling monitor fan-out callbacks), and monitors can read a coherent
306/// snapshot from watchdog threads when diagnosing stalls.
307#[derive(Debug)]
308pub struct RuntimeExecutionProbe {
309    component_id: AtomicUsize,
310    step: AtomicUsize,
311    #[cfg(target_has_atomic = "64")]
312    culistid: AtomicU64,
313    #[cfg(target_has_atomic = "64")]
314    culistid_present: AtomicUsize,
315    #[cfg(not(target_has_atomic = "64"))]
316    culistid: Mutex<Option<u64>>,
317    sequence: AtomicUsize,
318}
319
320impl Default for RuntimeExecutionProbe {
321    fn default() -> Self {
322        Self {
323            component_id: AtomicUsize::new(ComponentId::INVALID.index()),
324            step: AtomicUsize::new(0),
325            #[cfg(target_has_atomic = "64")]
326            culistid: AtomicU64::new(0),
327            #[cfg(target_has_atomic = "64")]
328            culistid_present: AtomicUsize::new(0),
329            #[cfg(not(target_has_atomic = "64"))]
330            culistid: Mutex::new(None),
331            sequence: AtomicUsize::new(0),
332        }
333    }
334}
335
336impl RuntimeExecutionProbe {
337    #[inline]
338    pub fn record(&self, marker: ExecutionMarker) {
339        self.component_id
340            .store(marker.component_id.index(), Ordering::Relaxed);
341        self.step
342            .store(component_state_to_usize(marker.step), Ordering::Relaxed);
343        #[cfg(target_has_atomic = "64")]
344        match marker.culistid {
345            Some(culistid) => {
346                self.culistid.store(culistid, Ordering::Relaxed);
347                self.culistid_present.store(1, Ordering::Relaxed);
348            }
349            None => {
350                self.culistid_present.store(0, Ordering::Relaxed);
351            }
352        }
353        #[cfg(not(target_has_atomic = "64"))]
354        {
355            *self.culistid.lock() = marker.culistid;
356        }
357        self.sequence.fetch_add(1, Ordering::Release);
358    }
359
360    #[inline]
361    pub fn sequence(&self) -> usize {
362        self.sequence.load(Ordering::Acquire)
363    }
364
365    #[inline]
366    pub fn marker(&self) -> Option<ExecutionMarker> {
367        // Read a coherent snapshot. A concurrent writer may change values between reads;
368        // in that case we retry to keep the marker and sequence aligned.
369        loop {
370            let seq_before = self.sequence.load(Ordering::Acquire);
371            let component_id = self.component_id.load(Ordering::Relaxed);
372            let step = self.step.load(Ordering::Relaxed);
373            #[cfg(target_has_atomic = "64")]
374            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
375            #[cfg(target_has_atomic = "64")]
376            let culistid_value = self.culistid.load(Ordering::Relaxed);
377            #[cfg(not(target_has_atomic = "64"))]
378            let culistid = *self.culistid.lock();
379            let seq_after = self.sequence.load(Ordering::Acquire);
380            if seq_before == seq_after {
381                if component_id == ComponentId::INVALID.index() {
382                    return None;
383                }
384                let step = usize_to_component_state(step);
385                #[cfg(target_has_atomic = "64")]
386                let culistid = if culistid_present == 0 {
387                    None
388                } else {
389                    Some(culistid_value)
390                };
391                return Some(ExecutionMarker {
392                    component_id: ComponentId::new(component_id),
393                    step,
394                    culistid,
395                });
396            }
397        }
398    }
399}
400
401#[inline]
402const fn component_state_to_usize(step: CuComponentState) -> usize {
403    match step {
404        CuComponentState::Start => 0,
405        CuComponentState::Preprocess => 1,
406        CuComponentState::Process => 2,
407        CuComponentState::Postprocess => 3,
408        CuComponentState::Stop => 4,
409    }
410}
411
412#[inline]
413const fn usize_to_component_state(step: usize) -> CuComponentState {
414    match step {
415        0 => CuComponentState::Start,
416        1 => CuComponentState::Preprocess,
417        2 => CuComponentState::Process,
418        3 => CuComponentState::Postprocess,
419        _ => CuComponentState::Stop,
420    }
421}
422
/// Shared, reference-counted handle to a [`RuntimeExecutionProbe`] (`std` builds only).
#[cfg(feature = "std")]
pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
425
426/// Platform-neutral monitor view of runtime execution progress.
427///
428/// In `std` builds this can wrap a shared runtime probe. In `no_std` builds it is currently
429/// unavailable and helper methods return `None`/`false`.
430#[derive(Debug, Clone)]
431pub struct MonitorExecutionProbe {
432    #[cfg(feature = "std")]
433    inner: Option<ExecutionProbeHandle>,
434}
435
436impl Default for MonitorExecutionProbe {
437    fn default() -> Self {
438        Self::unavailable()
439    }
440}
441
442impl MonitorExecutionProbe {
443    #[cfg(feature = "std")]
444    pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
445        Self {
446            inner: Some(handle),
447        }
448    }
449
450    pub const fn unavailable() -> Self {
451        Self {
452            #[cfg(feature = "std")]
453            inner: None,
454        }
455    }
456
457    pub fn is_available(&self) -> bool {
458        #[cfg(feature = "std")]
459        {
460            self.inner.is_some()
461        }
462        #[cfg(not(feature = "std"))]
463        {
464            false
465        }
466    }
467
468    pub fn marker(&self) -> Option<ExecutionMarker> {
469        #[cfg(feature = "std")]
470        {
471            self.inner.as_ref().and_then(|probe| probe.marker())
472        }
473        #[cfg(not(feature = "std"))]
474        {
475            None
476        }
477    }
478
479    pub fn sequence(&self) -> Option<usize> {
480        #[cfg(feature = "std")]
481        {
482            self.inner.as_ref().map(|probe| probe.sequence())
483        }
484        #[cfg(not(feature = "std"))]
485        {
486            None
487        }
488    }
489}
490
/// Runtime component category used by monitoring metadata and topology.
///
/// A "task" is a regular Copper task (lifecycle callbacks + payload processing). A "bridge"
/// is a monitored bridge-side execution component (bridge nodes and channel endpoints).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentType {
    /// Task-family component: source.
    Source,
    /// Task-family component: regular task.
    Task,
    /// Task-family component: sink.
    Sink,
    /// Bridge-side component.
    Bridge,
}

impl ComponentType {
    /// Returns `true` for every task-family category, i.e. everything except
    /// [`ComponentType::Bridge`].
    pub const fn is_task(self) -> bool {
        match self {
            Self::Source | Self::Task | Self::Sink => true,
            Self::Bridge => false,
        }
    }
}
509
510/// Static identity entry for one monitored runtime component.
511#[derive(Debug, Clone, Copy, PartialEq, Eq)]
512pub struct MonitorComponentMetadata {
513    id: &'static str,
514    kind: ComponentType,
515    type_name: Option<&'static str>,
516}
517
518impl MonitorComponentMetadata {
519    pub const fn new(
520        id: &'static str,
521        kind: ComponentType,
522        type_name: Option<&'static str>,
523    ) -> Self {
524        Self {
525            id,
526            kind,
527            type_name,
528        }
529    }
530
531    /// Stable monitor component id (for logs/debug and joins with runtime markers).
532    pub const fn id(&self) -> &'static str {
533        self.id
534    }
535
536    pub const fn kind(&self) -> ComponentType {
537        self.kind
538    }
539
540    /// Rust type label when available (typically tasks); `None` for synthetic bridge entries.
541    pub const fn type_name(&self) -> Option<&'static str> {
542        self.type_name
543    }
544}
545
546/// Immutable runtime-provided metadata passed once to [`CuMonitor::new`].
547///
548/// This bundles identifiers, deterministic component layout, and monitor-specific config so monitor
549/// construction is explicit and does not need ad-hoc late setters.
550#[derive(Debug, Clone)]
551pub struct CuMonitoringMetadata {
552    mission_id: CompactString,
553    subsystem_id: Option<CompactString>,
554    instance_id: u32,
555    layout: CopperListLayout,
556    copperlist_info: CopperListInfo,
557    topology: MonitorTopology,
558    monitor_config: Option<ComponentConfig>,
559}
560
561impl CuMonitoringMetadata {
562    pub fn new(
563        mission_id: CompactString,
564        components: &'static [MonitorComponentMetadata],
565        culist_component_mapping: &'static [ComponentId],
566        copperlist_info: CopperListInfo,
567        topology: MonitorTopology,
568        monitor_config: Option<ComponentConfig>,
569    ) -> CuResult<Self> {
570        Self::validate_components(components)?;
571        Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
572        Ok(Self {
573            mission_id,
574            subsystem_id: None,
575            instance_id: 0,
576            layout: CopperListLayout::new(components, culist_component_mapping),
577            copperlist_info,
578            topology,
579            monitor_config,
580        })
581    }
582
583    fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
584        let mut seen_bridge = false;
585        for component in components {
586            match component.kind() {
587                component_type if component_type.is_task() && seen_bridge => {
588                    return Err(CuError::from(
589                        "invalid monitor metadata: task-family components must appear before bridges",
590                    ));
591                }
592                ComponentType::Bridge => seen_bridge = true,
593                _ => {}
594            }
595        }
596        Ok(())
597    }
598
599    fn validate_culist_mapping(
600        components_len: usize,
601        culist_component_mapping: &'static [ComponentId],
602    ) -> CuResult<()> {
603        for component_idx in culist_component_mapping {
604            if component_idx.index() >= components_len {
605                return Err(CuError::from(
606                    "invalid monitor metadata: culist mapping points past components table",
607                ));
608            }
609        }
610        Ok(())
611    }
612
613    /// Active mission identifier for this runtime instance.
614    pub fn mission_id(&self) -> &str {
615        self.mission_id.as_str()
616    }
617
618    /// Compile-time subsystem identifier for this runtime instance when running in a
619    /// multi-Copper deployment.
620    pub fn subsystem_id(&self) -> Option<&str> {
621        self.subsystem_id.as_deref()
622    }
623
624    /// Deployment/runtime instance identity for this runtime instance.
625    pub fn instance_id(&self) -> u32 {
626        self.instance_id
627    }
628
629    /// Canonical table of monitored runtime components.
630    ///
631    /// Ordering is deterministic and mission-scoped: tasks first, then bridge-side components.
632    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
633        self.layout.components()
634    }
635
636    /// Total number of monitored components.
637    pub const fn component_count(&self) -> usize {
638        self.layout.component_count()
639    }
640
641    /// Static runtime layout used to map CopperList slots to components.
642    pub const fn layout(&self) -> CopperListLayout {
643        self.layout
644    }
645
646    pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
647        self.layout.component(component_id)
648    }
649
650    pub fn component_id(&self, component_id: ComponentId) -> &'static str {
651        self.component(component_id).id()
652    }
653
654    pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
655        self.component(component_id).kind()
656    }
657
658    pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
659        self.layout
660            .components()
661            .iter()
662            .position(|component| component.id() == component_id)
663            .map(ComponentId::new)
664    }
665
666    /// CopperList slot -> monitored component index mapping.
667    ///
668    /// This table maps each CopperList slot index to the producing component index.
669    pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
670        self.layout.slot_to_component()
671    }
672
673    pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
674        self.layout.component_for_slot(culist_slot)
675    }
676
677    pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
678        self.layout.view(msgs)
679    }
680
681    pub const fn copperlist_info(&self) -> CopperListInfo {
682        self.copperlist_info
683    }
684
685    /// Resolved graph topology for the active mission.
686    ///
687    /// This is always available. Nodes represent config graph nodes, not every synthetic bridge
688    /// channel entry in `components()`.
689    pub fn topology(&self) -> &MonitorTopology {
690        &self.topology
691    }
692
693    pub fn monitor_config(&self) -> Option<&ComponentConfig> {
694        self.monitor_config.as_ref()
695    }
696
697    pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
698        self.monitor_config = monitor_config;
699        self
700    }
701
702    pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
703        self.subsystem_id = subsystem_id.map(CompactString::from);
704        self
705    }
706
707    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
708        self.instance_id = instance_id;
709        self
710    }
711}
712
713/// Runtime-provided dynamic monitoring handles passed once to [`CuMonitor::new`].
714///
715/// This context may expose live runtime state (for example execution progress probes).
716#[derive(Debug, Clone, Default)]
717pub struct CuMonitoringRuntime {
718    execution_probe: MonitorExecutionProbe,
719}
720
721impl CuMonitoringRuntime {
722    pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
723        Self { execution_probe }
724    }
725
726    pub const fn unavailable() -> Self {
727        Self::new(MonitorExecutionProbe::unavailable())
728    }
729
730    pub fn execution_probe(&self) -> &MonitorExecutionProbe {
731        &self.execution_probe
732    }
733}
734
/// Monitor decision to be taken when a component step errored out.
#[derive(Debug)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. calling the other component steps,
    /// setting a `None` return value and continuing the copperlist.
    Ignore,
    /// This is a fatal error: shut down the copper as cleanly as possible.
    Shutdown,
}

/// Combines two monitor decisions by keeping the strictest one.
///
/// `Shutdown` dominates `Abort`, which dominates `Ignore`.
fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
    // Rank decisions by strictness: Ignore < Abort < Shutdown.
    fn strictness(decision: &Decision) -> u8 {
        match decision {
            Decision::Ignore => 0,
            Decision::Abort => 1,
            Decision::Shutdown => 2,
        }
    }

    if strictness(&rhs) > strictness(&lhs) {
        rhs
    } else {
        lhs
    }
}
753
754#[derive(Debug, Clone)]
755pub struct MonitorNode {
756    pub id: String,
757    pub type_name: Option<String>,
758    pub kind: ComponentType,
759    /// Ordered list of input port identifiers.
760    pub inputs: Vec<String>,
761    /// Ordered list of output port identifiers.
762    pub outputs: Vec<String>,
763}
764
/// One connection between two nodes of the monitor-facing topology graph.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Identifier of the source node.
    pub src: String,
    /// Port on the source node, when specified.
    pub src_port: Option<String>,
    /// Identifier of the destination node.
    pub dst: String,
    /// Port on the destination node, when specified.
    pub dst_port: Option<String>,
    /// Message label of the connection (presumably the message type name; confirm with
    /// the code that builds the topology).
    pub msg: String,
}
773
774#[derive(Debug, Clone, Default)]
775pub struct MonitorTopology {
776    pub nodes: Vec<MonitorNode>,
777    pub connections: Vec<MonitorConnection>,
778}
779
/// CopperList sizing information handed to monitors.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListInfo {
    /// Size in bytes (presumably of a single CopperList; confirm with the caller).
    pub size_bytes: usize,
    /// Number of CopperLists (presumably those preallocated by the runtime; confirm
    /// with the caller).
    pub count: usize,
}

impl CopperListInfo {
    /// Bundles the two values without any validation.
    pub const fn new(size_bytes: usize, count: usize) -> Self {
        CopperListInfo { size_bytes, count }
    }
}
791
/// Reported data about CopperList IO for a single iteration.
///
/// All counters default to zero.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListIoStats {
    /// CopperList bytes resident in RAM for this iteration.
    ///
    /// This includes the fixed CopperList struct size plus any pooled or
    /// handle-backed payload bytes observed on the real encode path.
    pub raw_culist_bytes: u64,
    /// Bytes attributed to handle-backed storage while measuring payload IO.
    ///
    /// This is surfaced separately so monitors can show how much of the runtime
    /// footprint lives in pooled payload buffers rather than inside the fixed
    /// CopperList struct.
    pub handle_bytes: u64,
    /// Bytes produced by bincode serialization of the CopperList.
    pub encoded_culist_bytes: u64,
    /// Bytes produced by bincode serialization of the KeyFrame (0 if none).
    pub keyframe_bytes: u64,
    /// Cumulative bytes written to the structured log stream so far.
    pub structured_log_bytes_total: u64,
    /// CopperList identifier for reference in monitors.
    pub culistid: u64,
}
815
/// Byte counts measured for one payload.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PayloadIoStats {
    /// Bytes resident in memory.
    pub resident_bytes: usize,
    /// Bytes produced by encoding.
    pub encoded_bytes: usize,
    /// Bytes attributed to handle-backed storage.
    pub handle_bytes: usize,
}
822
/// Byte counts recorded for one message slot.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CuMsgIoStats {
    /// Whether statistics were recorded for the slot; `false` by default.
    pub present: bool,
    /// Bytes resident in memory.
    pub resident_bytes: u64,
    /// Bytes produced by encoding.
    pub encoded_bytes: u64,
    /// Bytes attributed to handle-backed storage.
    pub handle_bytes: u64,
}
830
831struct CuMsgIoEntry {
832    present: PortableAtomicBool,
833    resident_bytes: PortableAtomicU64,
834    encoded_bytes: PortableAtomicU64,
835    handle_bytes: PortableAtomicU64,
836}
837
838impl CuMsgIoEntry {
839    fn clear(&self) {
840        self.present.store(false, PortableOrdering::Release);
841        self.resident_bytes.store(0, PortableOrdering::Relaxed);
842        self.encoded_bytes.store(0, PortableOrdering::Relaxed);
843        self.handle_bytes.store(0, PortableOrdering::Relaxed);
844    }
845
846    fn get(&self) -> CuMsgIoStats {
847        if !self.present.load(PortableOrdering::Acquire) {
848            return CuMsgIoStats::default();
849        }
850
851        CuMsgIoStats {
852            present: true,
853            resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
854            encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
855            handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
856        }
857    }
858
859    fn set(&self, stats: CuMsgIoStats) {
860        self.resident_bytes
861            .store(stats.resident_bytes, PortableOrdering::Relaxed);
862        self.encoded_bytes
863            .store(stats.encoded_bytes, PortableOrdering::Relaxed);
864        self.handle_bytes
865            .store(stats.handle_bytes, PortableOrdering::Relaxed);
866        self.present.store(stats.present, PortableOrdering::Release);
867    }
868}
869
870impl Default for CuMsgIoEntry {
871    fn default() -> Self {
872        Self {
873            present: PortableAtomicBool::new(false),
874            resident_bytes: PortableAtomicU64::new(0),
875            encoded_bytes: PortableAtomicU64::new(0),
876            handle_bytes: PortableAtomicU64::new(0),
877        }
878    }
879}
880
881pub struct CuMsgIoCache<const N: usize> {
882    entries: [CuMsgIoEntry; N],
883}
884
885impl<const N: usize> CuMsgIoCache<N> {
886    pub fn clear(&self) {
887        for entry in &self.entries {
888            entry.clear();
889        }
890    }
891
892    pub fn get(&self, idx: usize) -> CuMsgIoStats {
893        self.entries[idx].get()
894    }
895
896    fn raw_parts(&self) -> (usize, usize) {
897        (self.entries.as_ptr() as usize, N)
898    }
899}
900
901impl<const N: usize> Default for CuMsgIoCache<N> {
902    fn default() -> Self {
903        Self {
904            entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
905        }
906    }
907}
908
/// Bookkeeping for the CopperList IO capture currently in progress.
#[derive(Clone, Copy)]
struct ActiveCuMsgIoCapture {
    /// Address of the cache entries, stored as an integer (presumably obtained from
    /// `CuMsgIoCache::raw_parts`; confirm at the construction site).
    cache_addr: usize,
    /// Number of entries in that cache; slots must stay below this bound.
    cache_len: usize,
    /// Slot currently selected for recording, if any.
    current_slot: Option<usize>,
}
915
// Per-thread measurement state used in `std` builds.
#[cfg(feature = "std")]
thread_local! {
    // Running handle-byte total of the payload measurement in progress; `None` while no
    // measurement is active.
    static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
    // State of the CopperList IO capture in progress, if any.
    static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
    // Handle bytes stored by `set_last_completed_handle_bytes`; reset to 0 when taken.
    static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
}
922
923#[cfg(not(feature = "std"))]
924static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
925#[cfg(not(feature = "std"))]
926static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
927#[cfg(not(feature = "std"))]
928static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
929
930fn begin_payload_io_measurement() {
931    #[cfg(feature = "std")]
932    PAYLOAD_HANDLE_BYTES.with(|bytes| {
933        debug_assert!(
934            bytes.get().is_none(),
935            "payload IO byte measurement must not be nested"
936        );
937        bytes.set(Some(0));
938    });
939
940    #[cfg(not(feature = "std"))]
941    {
942        let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
943        debug_assert!(
944            bytes.is_none(),
945            "payload IO byte measurement must not be nested"
946        );
947        *bytes = Some(0);
948    }
949}
950
951fn finish_payload_io_measurement() -> usize {
952    #[cfg(feature = "std")]
953    {
954        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
955    }
956
957    #[cfg(not(feature = "std"))]
958    {
959        PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
960    }
961}
962
963fn abort_payload_io_measurement() {
964    #[cfg(feature = "std")]
965    PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
966
967    #[cfg(not(feature = "std"))]
968    {
969        *PAYLOAD_HANDLE_BYTES.lock() = None;
970    }
971}
972
973fn current_payload_io_measurement() -> usize {
974    #[cfg(feature = "std")]
975    {
976        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
977    }
978
979    #[cfg(not(feature = "std"))]
980    {
981        PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
982    }
983}
984
/// Adds `bytes` to the handle-byte total of the payload IO measurement in progress.
///
/// Does nothing when no measurement is active. The addition saturates instead of
/// overflowing.
///
/// This function only exists in `std` builds, so it only needs the thread-local
/// accumulator; the former `not(std)` branch inside it could never be compiled.
#[cfg(feature = "std")]
pub(crate) fn record_payload_handle_bytes(bytes: usize) {
    PAYLOAD_HANDLE_BYTES.with(|total| {
        if let Some(current) = total.get() {
            total.set(Some(current.saturating_add(bytes)));
        }
    });
}
1002
1003fn set_last_completed_handle_bytes(bytes: u64) {
1004    #[cfg(feature = "std")]
1005    LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1006
1007    #[cfg(not(feature = "std"))]
1008    {
1009        *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1010    }
1011}
1012
1013pub fn take_last_completed_handle_bytes() -> u64 {
1014    #[cfg(feature = "std")]
1015    {
1016        LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1017    }
1018
1019    #[cfg(not(feature = "std"))]
1020    {
1021        let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1022        let value = *total;
1023        *total = 0;
1024        value
1025    }
1026}
1027
1028fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1029    #[cfg(feature = "std")]
1030    {
1031        ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1032            let mut state = capture.get()?;
1033            let result = f(&mut state);
1034            capture.set(Some(state));
1035            Some(result)
1036        })
1037    }
1038
1039    #[cfg(not(feature = "std"))]
1040    {
1041        let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1042        let state = capture.as_mut()?;
1043        Some(f(state))
1044    }
1045}
1046
1047pub struct CuMsgIoCaptureGuard;
1048
1049impl CuMsgIoCaptureGuard {
1050    pub fn select_slot(&self, slot: usize) {
1051        let _ = with_active_capture_mut(|capture| {
1052            debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1053            capture.current_slot = Some(slot);
1054        });
1055    }
1056}
1057
1058impl Drop for CuMsgIoCaptureGuard {
1059    fn drop(&mut self) {
1060        set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1061
1062        #[cfg(feature = "std")]
1063        ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1064
1065        #[cfg(not(feature = "std"))]
1066        {
1067            *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1068        }
1069    }
1070}
1071
1072pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1073    cache.clear();
1074    set_last_completed_handle_bytes(0);
1075    begin_payload_io_measurement();
1076    let (cache_addr, cache_len) = cache.raw_parts();
1077    let capture = ActiveCuMsgIoCapture {
1078        cache_addr,
1079        cache_len,
1080        current_slot: None,
1081    };
1082
1083    #[cfg(feature = "std")]
1084    ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1085        debug_assert!(
1086            state.get().is_none(),
1087            "CopperList payload IO capture must not be nested"
1088        );
1089        state.set(Some(capture));
1090    });
1091
1092    #[cfg(not(feature = "std"))]
1093    {
1094        let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1095        debug_assert!(
1096            state.is_none(),
1097            "CopperList payload IO capture must not be nested"
1098        );
1099        *state = Some(capture);
1100    }
1101
1102    CuMsgIoCaptureGuard
1103}
1104
1105pub(crate) fn current_payload_handle_bytes() -> usize {
1106    current_payload_io_measurement()
1107}
1108
1109pub(crate) fn record_current_slot_payload_io_stats(
1110    fixed_bytes: usize,
1111    encoded_bytes: usize,
1112    handle_bytes: usize,
1113) {
1114    let _ = with_active_capture_mut(|capture| {
1115        let Some(slot) = capture.current_slot else {
1116            return;
1117        };
1118        if slot >= capture.cache_len {
1119            return;
1120        }
1121        // SAFETY: the capture guard holds the cache alive for the duration of the encode pass.
1122        let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
1123        let entry = unsafe { &*cache_ptr.add(slot) };
1124        entry.set(CuMsgIoStats {
1125            present: true,
1126            resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
1127            encoded_bytes: encoded_bytes as u64,
1128            handle_bytes: handle_bytes as u64,
1129        });
1130    });
1131}
1132
1133/// Measures payload bytes using the same encode path Copper uses for
1134/// logging/export.
1135///
1136/// `resident_bytes` is the payload's in-memory fixed footprint plus any
1137/// handle-backed dynamic storage reported during encoding. `encoded_bytes` is
1138/// the exact bincode payload size.
1139pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1140where
1141    T: Encode,
1142{
1143    begin_payload_io_measurement();
1144    begin_observed_encode();
1145
1146    let result = (|| {
1147        let mut encoder =
1148            EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1149        payload.encode(&mut encoder).map_err(|e| {
1150            CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1151        })?;
1152        let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1153        debug_assert_eq!(encoded_bytes, finish_observed_encode());
1154        let handle_bytes = finish_payload_io_measurement();
1155        Ok(PayloadIoStats {
1156            resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1157            encoded_bytes,
1158            handle_bytes,
1159        })
1160    })();
1161
1162    if result.is_err() {
1163        abort_payload_io_measurement();
1164        abort_observed_encode();
1165    }
1166
1167    result
1168}
1169
/// Records whether a graph node appears as the destination and/or the source of at
/// least one connection.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    /// Set when some connection has this node as its destination.
    has_incoming: bool,
    /// Set when some connection has this node as its source.
    has_outgoing: bool,
}
1175
1176fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1177    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1178    edge_ids.sort();
1179
1180    let mut outputs = Vec::new();
1181    let mut seen = Vec::new();
1182    let mut port_idx = 0usize;
1183    for edge_id in edge_ids {
1184        let Some(edge) = graph.edge(edge_id) else {
1185            continue;
1186        };
1187        if seen.iter().any(|msg| msg == &edge.msg) {
1188            continue;
1189        }
1190        seen.push(edge.msg.clone());
1191        let mut port_label = String::from("out");
1192        port_label.push_str(&port_idx.to_string());
1193        port_label.push_str(": ");
1194        port_label.push_str(edge.msg.as_str());
1195        outputs.push((edge.msg.clone(), port_label));
1196        port_idx += 1;
1197    }
1198    outputs
1199}
1200
1201/// Derive a monitor-friendly topology from the runtime configuration.
1202pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1203    let graph = config.get_graph(Some(mission))?;
1204    let mut nodes: Map<String, MonitorNode> = Map::new();
1205    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
1206    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1207
1208    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1209    for bridge in &config.bridges {
1210        bridge_lookup.insert(bridge.id.as_str(), bridge);
1211    }
1212
1213    for cnx in graph.edges() {
1214        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
1215        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
1216    }
1217
1218    for (_, node) in graph.get_all_nodes() {
1219        let node_id = node.get_id();
1220        let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
1221        let kind = match node.get_flavor() {
1222            Flavor::Bridge => ComponentType::Bridge,
1223            _ if !usage.has_incoming && usage.has_outgoing => ComponentType::Source,
1224            _ if usage.has_incoming && !usage.has_outgoing => ComponentType::Sink,
1225            _ => ComponentType::Task,
1226        };
1227
1228        let mut inputs = Vec::new();
1229        let mut outputs = Vec::new();
1230        if kind == ComponentType::Bridge {
1231            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
1232                for ch in &bridge.channels {
1233                    match ch {
1234                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
1235                            outputs.push(id.clone())
1236                        }
1237                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1238                    }
1239                }
1240            }
1241        } else {
1242            if usage.has_incoming || !usage.has_outgoing {
1243                inputs.push("in".to_string());
1244            }
1245            if usage.has_outgoing {
1246                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
1247                    let ports = collect_output_ports(graph, node_idx);
1248                    let mut port_map: Map<String, String> = Map::new();
1249                    for (msg_type, label) in ports {
1250                        port_map.insert(msg_type, label.clone());
1251                        outputs.push(label);
1252                    }
1253                    output_port_lookup.insert(node_id.clone(), port_map);
1254                }
1255            } else if !usage.has_incoming {
1256                outputs.push("out".to_string());
1257            }
1258        }
1259
1260        nodes.insert(
1261            node_id.clone(),
1262            MonitorNode {
1263                id: node_id,
1264                type_name: Some(node.get_type().to_string()),
1265                kind,
1266                inputs,
1267                outputs,
1268            },
1269        );
1270    }
1271
1272    let mut connections = Vec::new();
1273    for cnx in graph.edges() {
1274        let src = cnx.src.clone();
1275        let dst = cnx.dst.clone();
1276
1277        let src_port = cnx.src_channel.clone().or_else(|| {
1278            output_port_lookup
1279                .get(&src)
1280                .and_then(|ports| ports.get(&cnx.msg).cloned())
1281                .or_else(|| {
1282                    nodes
1283                        .get(&src)
1284                        .and_then(|node| node.outputs.first().cloned())
1285                })
1286        });
1287        let dst_port = cnx.dst_channel.clone().or_else(|| {
1288            nodes
1289                .get(&dst)
1290                .and_then(|node| node.inputs.first().cloned())
1291        });
1292
1293        connections.push(MonitorConnection {
1294            src,
1295            src_port,
1296            dst,
1297            dst_port,
1298            msg: cnx.msg.clone(),
1299        });
1300    }
1301
1302    Ok(MonitorTopology {
1303        nodes: nodes.into_values().collect(),
1304        connections,
1305    })
1306}
1307
1308/// Runtime monitoring contract implemented by monitor components.
1309///
1310/// Lifecycle:
1311/// 1. [`CuMonitor::new`] is called once at runtime construction time.
1312/// 2. [`CuMonitor::start`] is called once before the first runtime iteration.
1313/// 3. For each iteration, [`CuMonitor::process_copperlist`] is called after component execution,
1314///    then [`CuMonitor::observe_copperlist_io`] after serialization accounting.
1315/// 4. [`CuMonitor::process_error`] is called synchronously when a monitored component step fails.
1316/// 5. [`CuMonitor::process_panic`] is called when the runtime catches a panic (`std` builds).
1317/// 6. [`CuMonitor::stop`] is called once during runtime shutdown.
1318///
1319/// Indexing model:
1320/// - `process_error(component_id, ..)` uses component indices into `metadata.components()`.
1321/// - `process_copperlist(..., view)` iterates CopperList slots with resolved component identity.
1322///
1323/// Error policy:
1324/// - [`Decision::Ignore`] continues execution.
1325/// - [`Decision::Abort`] aborts the current operation (step/copperlist scope).
1326/// - [`Decision::Shutdown`] triggers runtime shutdown.
1327pub trait CuMonitor: Sized {
1328    /// Construct the monitor once, before component execution starts.
1329    ///
1330    /// `metadata` contains mission/config/topology/static mapping information.
1331    /// `runtime` exposes dynamic runtime handles (for example execution probes).
1332    /// Use `metadata.monitor_config()` to decode monitor-specific parameters.
1333    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1334    where
1335        Self: Sized;
1336
1337    /// Called once before processing the first CopperList.
1338    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1339        Ok(())
1340    }
1341
1342    /// Called once per processed CopperList after component execution.
1343    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;
1344
1345    /// Called when runtime finishes CopperList serialization/IO accounting.
1346    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
1347
1348    /// Called when a monitored component step fails; must return an immediate runtime decision.
1349    ///
1350    /// `component_id` is an index into [`CuMonitoringMetadata::components`].
1351    fn process_error(
1352        &self,
1353        component_id: ComponentId,
1354        step: CuComponentState,
1355        error: &CuError,
1356    ) -> Decision;
1357
1358    /// Called when the runtime catches a panic (`std` builds).
1359    fn process_panic(&self, _panic_message: &str) {}
1360
1361    /// Called once during runtime shutdown.
1362    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1363        Ok(())
1364    }
1365}
1366
1367/// A do nothing monitor if no monitor is provided.
1368/// This is basically defining the default behavior of Copper in case of error.
1369pub struct NoMonitor {}
1370impl CuMonitor for NoMonitor {
1371    fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1372        Ok(NoMonitor {})
1373    }
1374
1375    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1376        #[cfg(all(feature = "std", debug_assertions))]
1377        register_live_log_listener(|entry, format_str, param_names| {
1378            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1379            let named: Map<String, String> = param_names
1380                .iter()
1381                .zip(params.iter())
1382                .map(|(k, v)| (k.to_string(), v.clone()))
1383                .collect();
1384
1385            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1386                let ts = format_timestamp(entry.time);
1387                println!("{} [{:?}] {}", ts, entry.level, msg);
1388            }
1389        });
1390        Ok(())
1391    }
1392
1393    fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1394        // By default, do nothing.
1395        Ok(())
1396    }
1397
1398    fn process_error(
1399        &self,
1400        _component_id: ComponentId,
1401        _step: CuComponentState,
1402        _error: &CuError,
1403    ) -> Decision {
1404        // By default, just try to continue.
1405        Decision::Ignore
1406    }
1407
1408    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1409        #[cfg(all(feature = "std", debug_assertions))]
1410        unregister_live_log_listener();
1411        Ok(())
1412    }
1413}
1414
1415macro_rules! impl_monitor_tuple {
1416    ($($idx:tt => $name:ident),+) => {
1417        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1418            fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1419            where
1420                Self: Sized,
1421            {
1422                Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1423            }
1424
1425            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1426                $(self.$idx.start(ctx)?;)+
1427                Ok(())
1428            }
1429
1430            fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1431                $(self.$idx.process_copperlist(ctx, view)?;)+
1432                Ok(())
1433            }
1434
1435            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1436                $(self.$idx.observe_copperlist_io(stats);)+
1437            }
1438
1439            fn process_error(
1440                &self,
1441                component_id: ComponentId,
1442                step: CuComponentState,
1443                error: &CuError,
1444            ) -> Decision {
1445                let mut decision = Decision::Ignore;
1446                $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1447                decision
1448            }
1449
1450            fn process_panic(&self, panic_message: &str) {
1451                $(self.$idx.process_panic(panic_message);)+
1452            }
1453
1454            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1455                $(self.$idx.stop(ctx)?;)+
1456                Ok(())
1457            }
1458        }
1459    };
1460}
1461
1462impl_monitor_tuple!(0 => M0, 1 => M1);
1463impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1464impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1465impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1466impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1467
1468#[cfg(feature = "std")]
/// Renders a caught panic payload as text.
///
/// `&str` and `String` payloads are returned as-is; any other payload type yields the
/// fixed message `"panic with non-string payload"`.
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|msg| (*msg).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
1478
/// An allocator wrapper that tallies the bytes allocated and deallocated through it.
///
/// Every request is forwarded to the wrapped allocator; the two running totals only
/// ever grow until [`CountingAlloc::reset`] is called.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both byte counters at zero.
    pub const fn new(inner: A) -> Self {
        Self {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes successfully allocated since creation or the last reset.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes deallocated since creation or the last reset.
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both byte counters back to zero.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

// SAFETY: All memory management is delegated to the inner allocator; this wrapper only
// updates byte counters around those calls.
unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    // SAFETY: The caller upholds the GlobalAlloc contract, which is passed on unchanged.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: Same layout, same contract, forwarded to the inner allocator.
        let block = unsafe { self.inner.alloc(layout) };
        if block.is_null() {
            // Failed allocations are not counted.
            return block;
        }
        self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        block
    }

    // SAFETY: The caller upholds the GlobalAlloc contract, which is passed on unchanged.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        let freed = layout.size();
        // SAFETY: Same pointer and layout, forwarded to the inner allocator.
        unsafe { self.inner.dealloc(ptr, layout) };
        self.deallocated.fetch_add(freed, Ordering::SeqCst);
    }
}
1528
/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
///
/// It snapshots the global allocation counters on creation and reports the difference
/// to their current values.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// Global allocated-byte total at creation time.
    bf_allocated: usize,
    /// Global deallocated-byte total at creation time.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Starts a new scope by snapshotting the current global counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// The difference saturates at zero, so a reset of the global counters after this
    /// counter was created cannot cause an arithmetic underflow.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated in the current scope
    /// since the creation of this `ScopedAllocCounter`.
    ///
    /// The difference saturates at zero, so a reset of the global counters after this
    /// counter was created cannot cause an arithmetic underflow.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // The values are computed but not reported yet; see the TODO below.
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
1598
1599#[cfg(feature = "std")]
1600const BUCKET_COUNT: usize = 1024;
1601#[cfg(not(feature = "std"))]
1602const BUCKET_COUNT: usize = 256;
1603
1604/// Accumulative stat object that can give your some real time statistics.
1605/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
1606#[derive(Debug, Clone)]
1607pub struct LiveStatistics {
1608    buckets: [u64; BUCKET_COUNT],
1609    min_val: u64,
1610    max_val: u64,
1611    sum: u128,
1612    sum_sq: u128,
1613    count: u64,
1614    max_value: u64,
1615}
1616
1617impl LiveStatistics {
1618    /// Creates a new `LiveStatistics` instance with a specified maximum value.
1619    ///
1620    /// This function initializes a `LiveStatistics` structure with default values
1621    /// for tracking statistical data, while setting an upper limit for the data
1622    /// points that the structure tracks.
1623    ///
1624    /// # Parameters
1625    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
1626    ///
1627    /// # Returns
1628    /// A new instance of `LiveStatistics` with:
1629    /// - `buckets`: An array pre-filled with zeros to categorize data points.
1630    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
1631    /// - `max_val`: Initialized to zero.
1632    /// - `sum`: The sum of all data points, initialized to zero.
1633    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
1634    /// - `count`: The total number of data points, initialized to zero.
1635    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
1636    ///
1637    pub fn new_with_max(max_value: u64) -> Self {
1638        LiveStatistics {
1639            buckets: [0; BUCKET_COUNT],
1640            min_val: u64::MAX,
1641            max_val: 0,
1642            sum: 0,
1643            sum_sq: 0,
1644            count: 0,
1645            max_value,
1646        }
1647    }
1648
1649    #[inline]
1650    fn value_to_bucket(&self, value: u64) -> usize {
1651        if value >= self.max_value {
1652            BUCKET_COUNT - 1
1653        } else {
1654            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
1655        }
1656    }
1657
1658    #[inline]
1659    pub fn min(&self) -> u64 {
1660        if self.count == 0 { 0 } else { self.min_val }
1661    }
1662
1663    #[inline]
1664    pub fn max(&self) -> u64 {
1665        self.max_val
1666    }
1667
1668    #[inline]
1669    pub fn mean(&self) -> f64 {
1670        if self.count == 0 {
1671            0.0
1672        } else {
1673            self.sum as f64 / self.count as f64
1674        }
1675    }
1676
1677    #[inline]
1678    pub fn stdev(&self) -> f64 {
1679        if self.count == 0 {
1680            return 0.0;
1681        }
1682        let mean = self.mean();
1683        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
1684        if variance < 0.0 {
1685            return 0.0;
1686        }
1687        #[cfg(feature = "std")]
1688        return variance.sqrt();
1689        #[cfg(not(feature = "std"))]
1690        return sqrt(variance);
1691    }
1692
1693    #[inline]
1694    pub fn percentile(&self, percentile: f64) -> u64 {
1695        if self.count == 0 {
1696            return 0;
1697        }
1698
1699        let target_count = (self.count as f64 * percentile) as u64;
1700        let mut accumulated = 0u64;
1701
1702        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
1703            accumulated += bucket_count;
1704            if accumulated >= target_count {
1705                // Linear interpolation within the bucket
1706                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
1707                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
1708                let bucket_fraction = if bucket_count > 0 {
1709                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
1710                } else {
1711                    0.5
1712                };
1713                return bucket_start
1714                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
1715            }
1716        }
1717
1718        self.max_val
1719    }
1720
1721    /// Adds a value to the statistics.
1722    #[inline]
1723    pub fn record(&mut self, value: u64) {
1724        if value < self.min_val {
1725            self.min_val = value;
1726        }
1727        if value > self.max_val {
1728            self.max_val = value;
1729        }
1730        let value_u128 = value as u128;
1731        self.sum += value_u128;
1732        self.sum_sq += value_u128 * value_u128;
1733        self.count += 1;
1734
1735        let bucket = self.value_to_bucket(value);
1736        self.buckets[bucket] += 1;
1737    }
1738
1739    #[inline]
1740    pub fn len(&self) -> u64 {
1741        self.count
1742    }
1743
1744    #[inline]
1745    pub fn is_empty(&self) -> bool {
1746        self.count == 0
1747    }
1748
1749    #[inline]
1750    pub fn reset(&mut self) {
1751        self.buckets.fill(0);
1752        self.min_val = u64::MAX;
1753        self.max_val = 0;
1754        self.sum = 0;
1755        self.sum_sq = 0;
1756        self.count = 0;
1757    }
1758}
1759
1760/// A Specialized statistics object for CuDuration.
1761/// It will also keep track of the jitter between the values.
1762#[derive(Debug, Clone)]
1763pub struct CuDurationStatistics {
1764    bare: LiveStatistics,
1765    jitter: LiveStatistics,
1766    last_value: CuDuration,
1767}
1768
1769impl CuDurationStatistics {
1770    pub fn new(max: CuDuration) -> Self {
1771        let CuDuration(max) = max;
1772        CuDurationStatistics {
1773            bare: LiveStatistics::new_with_max(max),
1774            jitter: LiveStatistics::new_with_max(max),
1775            last_value: CuDuration::default(),
1776        }
1777    }
1778
1779    #[inline]
1780    pub fn min(&self) -> CuDuration {
1781        CuDuration(self.bare.min())
1782    }
1783
1784    #[inline]
1785    pub fn max(&self) -> CuDuration {
1786        CuDuration(self.bare.max())
1787    }
1788
1789    #[inline]
1790    pub fn mean(&self) -> CuDuration {
1791        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
1792    }
1793
1794    #[inline]
1795    pub fn percentile(&self, percentile: f64) -> CuDuration {
1796        CuDuration(self.bare.percentile(percentile))
1797    }
1798
1799    #[inline]
1800    pub fn stddev(&self) -> CuDuration {
1801        CuDuration(self.bare.stdev() as u64)
1802    }
1803
1804    #[inline]
1805    pub fn len(&self) -> u64 {
1806        self.bare.len()
1807    }
1808
1809    #[inline]
1810    pub fn is_empty(&self) -> bool {
1811        self.bare.len() == 0
1812    }
1813
1814    #[inline]
1815    pub fn jitter_min(&self) -> CuDuration {
1816        CuDuration(self.jitter.min())
1817    }
1818
1819    #[inline]
1820    pub fn jitter_max(&self) -> CuDuration {
1821        CuDuration(self.jitter.max())
1822    }
1823
1824    #[inline]
1825    pub fn jitter_mean(&self) -> CuDuration {
1826        CuDuration(self.jitter.mean() as u64)
1827    }
1828
1829    #[inline]
1830    pub fn jitter_stddev(&self) -> CuDuration {
1831        CuDuration(self.jitter.stdev() as u64)
1832    }
1833
1834    #[inline]
1835    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
1836        CuDuration(self.jitter.percentile(percentile))
1837    }
1838
1839    #[inline]
1840    pub fn record(&mut self, value: CuDuration) {
1841        let CuDuration(nanos) = value;
1842        if self.bare.is_empty() {
1843            self.bare.record(nanos);
1844            self.last_value = value;
1845            return;
1846        }
1847        self.bare.record(nanos);
1848        let CuDuration(last_nanos) = self.last_value;
1849        self.jitter.record(nanos.abs_diff(last_nanos));
1850        self.last_value = value;
1851    }
1852
1853    #[inline]
1854    pub fn reset(&mut self) {
1855        self.bare.reset();
1856        self.jitter.reset();
1857    }
1858}
1859
1860#[cfg(test)]
1861mod tests {
1862    use super::*;
1863    use core::sync::atomic::{AtomicUsize, Ordering};
1864
1865    #[derive(Clone, Copy)]
1866    enum TestDecision {
1867        Ignore,
1868        Abort,
1869        Shutdown,
1870    }
1871
1872    struct TestMonitor {
1873        decision: TestDecision,
1874        copperlist_calls: AtomicUsize,
1875        panic_calls: AtomicUsize,
1876    }
1877
1878    impl TestMonitor {
1879        fn new_with(decision: TestDecision) -> Self {
1880            Self {
1881                decision,
1882                copperlist_calls: AtomicUsize::new(0),
1883                panic_calls: AtomicUsize::new(0),
1884            }
1885        }
1886    }
1887
1888    fn test_metadata() -> CuMonitoringMetadata {
1889        const COMPONENTS: &[MonitorComponentMetadata] = &[
1890            MonitorComponentMetadata::new("a", ComponentType::Task, None),
1891            MonitorComponentMetadata::new("b", ComponentType::Task, None),
1892        ];
1893        CuMonitoringMetadata::new(
1894            CompactString::from(crate::config::DEFAULT_MISSION_ID),
1895            COMPONENTS,
1896            &[],
1897            CopperListInfo::new(0, 0),
1898            MonitorTopology::default(),
1899            None,
1900        )
1901        .expect("test metadata should be valid")
1902    }
1903
1904    impl CuMonitor for TestMonitor {
1905        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
1906            let monitor = Self::new_with(TestDecision::Ignore);
1907            #[cfg(feature = "std")]
1908            let _ = runtime.execution_probe();
1909            Ok(monitor)
1910        }
1911
1912        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1913            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
1914            Ok(())
1915        }
1916
1917        fn process_error(
1918            &self,
1919            _component_id: ComponentId,
1920            _step: CuComponentState,
1921            _error: &CuError,
1922        ) -> Decision {
1923            match self.decision {
1924                TestDecision::Ignore => Decision::Ignore,
1925                TestDecision::Abort => Decision::Abort,
1926                TestDecision::Shutdown => Decision::Shutdown,
1927            }
1928        }
1929
1930        fn process_panic(&self, _panic_message: &str) {
1931            self.panic_calls.fetch_add(1, Ordering::SeqCst);
1932        }
1933    }
1934
/// Feeds the values 0 through 99 into a `LiveStatistics` and checks length, min, max,
/// mean and a handful of percentiles (each within a tolerance of less than 5).
#[test]
fn test_live_statistics_percentiles() {
    let mut stats = LiveStatistics::new_with_max(1000);

    // One sample per integer in 0..100, i.e. 100 samples in total.
    for sample in 0..100 {
        stats.record(sample);
    }

    assert_eq!(stats.len(), 100);
    assert_eq!(stats.min(), 0);
    assert_eq!(stats.max(), 99);
    // The exact mean of 0..=99 is 49.5; the cast truncates it to 49.
    assert_eq!(stats.mean() as u64, 49);

    // Percentiles are only expected to be approximately right.
    let p50 = stats.percentile(0.5);
    let p90 = stats.percentile(0.90);
    let p95 = stats.percentile(0.95);
    let p99 = stats.percentile(0.99);

    // Absolute distance between an observed percentile and the value it should be near.
    let distance = |observed: i64, expected: i64| (observed - expected).abs();

    // With samples 0..=99, each percentile should land close to the matching index.
    assert!(distance(p50 as i64, 49) < 5, "p50={} expected ~49", p50);
    assert!(distance(p90 as i64, 89) < 5, "p90={} expected ~89", p90);
    assert!(distance(p95 as i64, 94) < 5, "p95={} expected ~94", p95);
    assert!(distance(p99 as i64, 98) < 5, "p99={} expected ~98", p99);
}
1961
/// Records four durations and checks the bare statistics, the derived jitter statistics
/// (absolute differences between consecutive samples) and that `reset` empties them.
#[test]
fn test_duration_stats() {
    let mut stats = CuDurationStatistics::new(CuDuration(1000));
    for nanos in [100, 200, 500, 400] {
        stats.record(CuDuration(nanos));
    }

    // Bare sample statistics.
    assert_eq!(stats.min(), CuDuration(100));
    assert_eq!(stats.max(), CuDuration(500));
    assert_eq!(stats.mean(), CuDuration(300));
    assert_eq!(stats.len(), 4);

    // Four samples produce three consecutive differences: 100, 300 and 100.
    let expected_jitter_mean = (100 + 300 + 100) / 3;
    assert_eq!(stats.jitter.len(), 3);
    assert_eq!(stats.jitter_min(), CuDuration(100));
    assert_eq!(stats.jitter_max(), CuDuration(300));
    assert_eq!(stats.jitter_mean(), CuDuration(expected_jitter_mean));

    stats.reset();
    assert_eq!(stats.len(), 0);
}
1980
/// Records two multi-second samples (values in the billions) and checks that min, max,
/// mean, standard deviation and jitter mean still come out right.
#[test]
fn test_duration_stats_large_samples_do_not_overflow() {
    const LOW_NANOS: u64 = 5_000_000_000;
    const HIGH_NANOS: u64 = 8_000_000_000;

    let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
    stats.record(CuDuration(LOW_NANOS));
    stats.record(CuDuration(HIGH_NANOS));

    assert_eq!(stats.min(), CuDuration(LOW_NANOS));
    assert_eq!(stats.max(), CuDuration(HIGH_NANOS));
    assert_eq!(stats.mean(), CuDuration(6_500_000_000));

    // Allow a difference of one unit around the expected standard deviation.
    let stddev_error = stats.stddev().as_nanos().abs_diff(1_500_000_000);
    assert!(stddev_error <= 1);

    assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
}
1993
/// A tuple of two monitors that disagree on an error must report the stricter decision:
/// `Ignore` + `Shutdown` yields `Shutdown`, `Ignore` + `Abort` yields `Abort`.
#[test]
fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
    let err = CuError::from("boom");

    // Builds a two-monitor tuple with the given decisions and asks it about `err`.
    let merged_decision = |first: TestDecision, second: TestDecision| {
        let pair = (TestMonitor::new_with(first), TestMonitor::new_with(second));
        pair.process_error(ComponentId::new(0), CuComponentState::Process, &err)
    };

    let ignore_vs_shutdown = merged_decision(TestDecision::Ignore, TestDecision::Shutdown);
    assert!(matches!(ignore_vs_shutdown, Decision::Shutdown));

    let ignore_vs_abort = merged_decision(TestDecision::Ignore, TestDecision::Abort);
    assert!(matches!(ignore_vs_abort, Decision::Abort));
}
2016
/// A tuple of monitors built through the `CuMonitor` trait must forward
/// `process_copperlist` and `process_panic` to each of its members exactly once per call.
#[test]
fn tuple_monitor_fans_out_callbacks() {
    let runtime = CuMonitoringRuntime::unavailable();
    let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(test_metadata(), runtime)
        .expect("tuple new");

    let (ctx, _clock_control) = CuContext::new_mock_clock();
    let empty_view = test_metadata().layout().view(&[]);

    let copperlist_outcome = monitors.process_copperlist(&ctx, empty_view);
    copperlist_outcome.expect("process_copperlist should fan out");
    monitors.process_panic("panic marker");

    let members = [&monitors.0, &monitors.1];
    for member in members {
        assert_eq!(member.copperlist_calls.load(Ordering::SeqCst), 1);
    }
    for member in members {
        assert_eq!(member.panic_calls.load(Ordering::SeqCst), 1);
    }
}
2036
2037    fn encoded_size<E: Encode>(value: &E) -> usize {
2038        let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
2039        value
2040            .encode(&mut encoder)
2041            .expect("size measurement encoder should not fail");
2042        encoder.into_writer().bytes_written
2043    }
2044
/// For a plain `Vec<u8>` payload, `payload_io_stats` must report the bincode-encoded
/// size, a resident size equal to `size_of::<Vec<u8>>()`, and zero handle bytes.
#[test]
fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
    let payload = vec![1u8, 2, 3, 4];
    let measured = payload_io_stats(&payload).expect("payload IO measurement should succeed");

    let expected_encoded = encoded_size(&payload);
    let expected_resident = core::mem::size_of::<Vec<u8>>();

    assert_eq!(measured.encoded_bytes, expected_encoded);
    assert_eq!(measured.resident_bytes, expected_resident);
    assert_eq!(measured.handle_bytes, 0);
}
2054
/// For a detached `CuHandle` wrapping a 32-byte buffer, `payload_io_stats` must report
/// the encoded size, a resident size of the handle itself plus the 32 buffer bytes, and
/// 32 handle bytes.
#[test]
fn payload_io_stats_tracks_handle_backed_storage() {
    const BUFFER_LEN: usize = 32;

    let payload = crate::pool::CuHandle::new_detached(vec![0u8; BUFFER_LEN]);
    let measured = payload_io_stats(&payload).expect("payload IO measurement should succeed");

    let expected_encoded = encoded_size(&payload);
    let handle_size = core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>();

    assert_eq!(measured.encoded_bytes, expected_encoded);
    assert_eq!(measured.resident_bytes, handle_size + BUFFER_LEN);
    assert_eq!(measured.handle_bytes, BUFFER_LEN);
}
2067
/// A default `RuntimeExecutionProbe` starts with no marker and sequence 0; after one
/// `record` call the same marker can be read back and the sequence is 1.
#[test]
fn runtime_execution_probe_roundtrip_marker() {
    let probe = RuntimeExecutionProbe::default();

    // Freshly created: nothing recorded yet.
    assert!(probe.marker().is_none());
    assert_eq!(probe.sequence(), 0);

    let recorded = ExecutionMarker {
        component_id: ComponentId::new(7),
        step: CuComponentState::Process,
        culistid: Some(42),
    };
    probe.record(recorded);

    // The marker read back must carry exactly the values that were recorded.
    let observed = probe.marker().expect("marker should be available");
    assert_eq!(observed.component_id, ComponentId::new(7));
    assert!(matches!(observed.step, CuComponentState::Process));
    assert_eq!(observed.culistid, Some(42));
    assert_eq!(probe.sequence(), 1);
}
2086}