Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the components it runs.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{
6    BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7    TaskKind, resolve_task_kind_for_id,
8};
9use crate::context::CuContext;
10use crate::cutask::CuMsgMetadata;
11use bincode::Encode;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SizeWriter;
15use compact_str::CompactString;
16use cu29_clock::CuDuration;
17#[allow(unused_imports)]
18use cu29_log::CuLogLevel;
19#[cfg(all(feature = "std", debug_assertions))]
20use cu29_log_runtime::{
21    format_message_only, register_live_log_listener, unregister_live_log_listener,
22};
23use cu29_traits::{
24    CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
25    finish_observed_encode,
26};
27use portable_atomic::{
28    AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
29};
30use serde_derive::{Deserialize, Serialize};
31
32#[cfg(not(feature = "std"))]
33extern crate alloc;
34
35#[cfg(feature = "std")]
36use core::cell::Cell;
37#[cfg(feature = "std")]
38use std::backtrace::Backtrace;
39#[cfg(feature = "std")]
40use std::fs::File;
41#[cfg(feature = "std")]
42use std::io::Write;
43#[cfg(feature = "std")]
44use std::panic::PanicHookInfo;
45#[cfg(feature = "std")]
46use std::sync::{Arc, Mutex as StdMutex, OnceLock};
47#[cfg(feature = "std")]
48use std::thread_local;
49#[cfg(feature = "std")]
50use std::time::{SystemTime, UNIX_EPOCH};
51#[cfg(feature = "std")]
52use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
53
54#[cfg(not(feature = "std"))]
55use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
56#[cfg(not(target_has_atomic = "64"))]
57use spin::Mutex;
58#[cfg(not(feature = "std"))]
59use spin::Mutex as SpinMutex;
60
61#[cfg(not(feature = "std"))]
62mod imp {
63    pub use alloc::alloc::{GlobalAlloc, Layout};
64    #[cfg(target_has_atomic = "64")]
65    pub use core::sync::atomic::AtomicU64;
66    pub use core::sync::atomic::{AtomicUsize, Ordering};
67    pub use libm::sqrt;
68}
69
70#[cfg(feature = "std")]
71mod imp {
72    #[cfg(feature = "memory_monitoring")]
73    use super::CountingAlloc;
74    #[cfg(feature = "memory_monitoring")]
75    pub use std::alloc::System;
76    pub use std::alloc::{GlobalAlloc, Layout};
77    #[cfg(target_has_atomic = "64")]
78    pub use std::sync::atomic::AtomicU64;
79    pub use std::sync::atomic::{AtomicUsize, Ordering};
80    #[cfg(feature = "memory_monitoring")]
81    #[global_allocator]
82    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
83}
84
85use imp::*;
86
87#[cfg(all(feature = "std", debug_assertions))]
88fn format_timestamp(time: CuDuration) -> String {
89    // Render CuTime/CuDuration as HH:mm:ss.xxxx (4 fractional digits of a second).
90    let nanos = time.as_nanos();
91    let total_seconds = nanos / 1_000_000_000;
92    let hours = total_seconds / 3600;
93    let minutes = (total_seconds / 60) % 60;
94    let seconds = total_seconds % 60;
95    let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
96    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
97}
98
99/// Lifecycle state of a monitored component.
100#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
101pub enum CuComponentState {
102    Start,
103    Preprocess,
104    Process,
105    Postprocess,
106    Stop,
107}
108
109/// Strongly-typed index into [`CuMonitoringMetadata::components`].
110#[repr(transparent)]
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
112pub struct ComponentId(usize);
113
114impl ComponentId {
115    pub const INVALID: Self = Self(usize::MAX);
116
117    #[inline]
118    pub const fn new(index: usize) -> Self {
119        Self(index)
120    }
121
122    #[inline]
123    pub const fn index(self) -> usize {
124        self.0
125    }
126}
127
128impl core::fmt::Display for ComponentId {
129    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
130        self.0.fmt(f)
131    }
132}
133
134impl From<ComponentId> for usize {
135    fn from(value: ComponentId) -> Self {
136        value.index()
137    }
138}
139
140/// Strongly-typed CopperList slot index.
141#[repr(transparent)]
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
143pub struct CuListSlot(usize);
144
145impl CuListSlot {
146    #[inline]
147    pub const fn new(index: usize) -> Self {
148        Self(index)
149    }
150
151    #[inline]
152    pub const fn index(self) -> usize {
153        self.0
154    }
155}
156
157impl From<CuListSlot> for usize {
158    fn from(value: CuListSlot) -> Self {
159        value.index()
160    }
161}
162
163/// Static monitor-side CopperList indexing layout.
164///
165/// This layout is mission/runtime scoped and remains constant after monitor construction.
166#[derive(Debug, Clone, Copy)]
167pub struct CopperListLayout {
168    components: &'static [MonitorComponentMetadata],
169    slot_to_component: &'static [ComponentId],
170}
171
172impl CopperListLayout {
173    #[inline]
174    pub const fn new(
175        components: &'static [MonitorComponentMetadata],
176        slot_to_component: &'static [ComponentId],
177    ) -> Self {
178        Self {
179            components,
180            slot_to_component,
181        }
182    }
183
184    #[inline]
185    pub const fn components(self) -> &'static [MonitorComponentMetadata] {
186        self.components
187    }
188
189    #[inline]
190    pub const fn component_count(self) -> usize {
191        self.components.len()
192    }
193
194    #[inline]
195    pub const fn culist_slot_count(self) -> usize {
196        self.slot_to_component.len()
197    }
198
199    #[inline]
200    pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
201        &self.components[id.index()]
202    }
203
204    #[inline]
205    pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
206        self.slot_to_component[culist_slot.index()]
207    }
208
209    #[inline]
210    pub const fn slot_to_component(self) -> &'static [ComponentId] {
211        self.slot_to_component
212    }
213
214    #[inline]
215    pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
216        CopperListView::new(self, msgs)
217    }
218}
219
220/// Per-loop monitor view over CopperList metadata paired with static component mapping.
221#[derive(Debug, Clone, Copy)]
222pub struct CopperListView<'a> {
223    layout: CopperListLayout,
224    msgs: &'a [&'a CuMsgMetadata],
225}
226
227impl<'a> CopperListView<'a> {
228    #[inline]
229    pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
230        assert_eq!(
231            msgs.len(),
232            layout.culist_slot_count(),
233            "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
234            msgs.len(),
235            layout.culist_slot_count()
236        );
237        Self { layout, msgs }
238    }
239
240    #[inline]
241    pub const fn layout(self) -> CopperListLayout {
242        self.layout
243    }
244
245    #[inline]
246    pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
247        self.msgs
248    }
249
250    #[inline]
251    pub const fn len(self) -> usize {
252        self.msgs.len()
253    }
254
255    #[inline]
256    pub const fn is_empty(self) -> bool {
257        self.msgs.is_empty()
258    }
259
260    #[inline]
261    pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
262        let index = culist_slot.index();
263        CopperListEntry {
264            culist_slot,
265            component_id: self.layout.component_for_slot(culist_slot),
266            msg: self.msgs[index],
267        }
268    }
269
270    pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
271        self.msgs.iter().enumerate().map(move |(idx, msg)| {
272            let culist_slot = CuListSlot::new(idx);
273            CopperListEntry {
274                culist_slot,
275                component_id: self.layout.component_for_slot(culist_slot),
276                msg,
277            }
278        })
279    }
280}
281
282/// One message entry in CopperList slot order with resolved component identity.
283#[derive(Debug, Clone, Copy)]
284pub struct CopperListEntry<'a> {
285    pub culist_slot: CuListSlot,
286    pub component_id: ComponentId,
287    pub msg: &'a CuMsgMetadata,
288}
289
290impl<'a> CopperListEntry<'a> {
291    #[inline]
292    pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
293        layout.component(self.component_id)
294    }
295
296    #[inline]
297    pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
298        layout.component(self.component_id).kind()
299    }
300}
301
302/// Execution progress marker emitted by the runtime before running a component step.
303#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
304pub struct ExecutionMarker {
305    /// Index into `CuMonitoringMetadata::components()`.
306    pub component_id: ComponentId,
307    /// Lifecycle phase currently entered.
308    pub step: CuComponentState,
309    /// CopperList id when available (runtime loop), None during start/stop.
310    pub culistid: Option<u64>,
311}
312
313/// Lock-free runtime-side progress probe.
314///
315/// The runtime writes execution markers directly into this probe from the hot path
316/// (without calling monitor fan-out callbacks), and monitors can read a coherent
317/// snapshot from watchdog threads when diagnosing stalls.
318#[derive(Debug)]
319pub struct RuntimeExecutionProbe {
320    component_id: AtomicUsize,
321    step: AtomicUsize,
322    #[cfg(target_has_atomic = "64")]
323    culistid: AtomicU64,
324    #[cfg(target_has_atomic = "64")]
325    culistid_present: AtomicUsize,
326    #[cfg(not(target_has_atomic = "64"))]
327    culistid: Mutex<Option<u64>>,
328    sequence: AtomicUsize,
329}
330
331impl Default for RuntimeExecutionProbe {
332    fn default() -> Self {
333        Self {
334            component_id: AtomicUsize::new(ComponentId::INVALID.index()),
335            step: AtomicUsize::new(0),
336            #[cfg(target_has_atomic = "64")]
337            culistid: AtomicU64::new(0),
338            #[cfg(target_has_atomic = "64")]
339            culistid_present: AtomicUsize::new(0),
340            #[cfg(not(target_has_atomic = "64"))]
341            culistid: Mutex::new(None),
342            sequence: AtomicUsize::new(0),
343        }
344    }
345}
346
347impl RuntimeExecutionProbe {
348    #[inline]
349    pub fn record(&self, marker: ExecutionMarker) {
350        self.component_id
351            .store(marker.component_id.index(), Ordering::Relaxed);
352        self.step
353            .store(component_state_to_usize(marker.step), Ordering::Relaxed);
354        #[cfg(target_has_atomic = "64")]
355        match marker.culistid {
356            Some(culistid) => {
357                self.culistid.store(culistid, Ordering::Relaxed);
358                self.culistid_present.store(1, Ordering::Relaxed);
359            }
360            None => {
361                self.culistid_present.store(0, Ordering::Relaxed);
362            }
363        }
364        #[cfg(not(target_has_atomic = "64"))]
365        {
366            *self.culistid.lock() = marker.culistid;
367        }
368        self.sequence.fetch_add(1, Ordering::Release);
369    }
370
371    #[inline]
372    pub fn sequence(&self) -> usize {
373        self.sequence.load(Ordering::Acquire)
374    }
375
376    #[inline]
377    pub fn marker(&self) -> Option<ExecutionMarker> {
378        // Read a coherent snapshot. A concurrent writer may change values between reads;
379        // in that case we retry to keep the marker and sequence aligned.
380        loop {
381            let seq_before = self.sequence.load(Ordering::Acquire);
382            let component_id = self.component_id.load(Ordering::Relaxed);
383            let step = self.step.load(Ordering::Relaxed);
384            #[cfg(target_has_atomic = "64")]
385            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
386            #[cfg(target_has_atomic = "64")]
387            let culistid_value = self.culistid.load(Ordering::Relaxed);
388            #[cfg(not(target_has_atomic = "64"))]
389            let culistid = *self.culistid.lock();
390            let seq_after = self.sequence.load(Ordering::Acquire);
391            if seq_before == seq_after {
392                if component_id == ComponentId::INVALID.index() {
393                    return None;
394                }
395                let step = usize_to_component_state(step);
396                #[cfg(target_has_atomic = "64")]
397                let culistid = if culistid_present == 0 {
398                    None
399                } else {
400                    Some(culistid_value)
401                };
402                return Some(ExecutionMarker {
403                    component_id: ComponentId::new(component_id),
404                    step,
405                    culistid,
406                });
407            }
408        }
409    }
410}
411
412#[inline]
413const fn component_state_to_usize(step: CuComponentState) -> usize {
414    match step {
415        CuComponentState::Start => 0,
416        CuComponentState::Preprocess => 1,
417        CuComponentState::Process => 2,
418        CuComponentState::Postprocess => 3,
419        CuComponentState::Stop => 4,
420    }
421}
422
423#[inline]
424const fn usize_to_component_state(step: usize) -> CuComponentState {
425    match step {
426        0 => CuComponentState::Start,
427        1 => CuComponentState::Preprocess,
428        2 => CuComponentState::Process,
429        3 => CuComponentState::Postprocess,
430        _ => CuComponentState::Stop,
431    }
432}
433
434#[cfg(feature = "std")]
435pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
436
437/// Platform-neutral monitor view of runtime execution progress.
438///
439/// In `std` builds this can wrap a shared runtime probe. In `no_std` builds it is currently
440/// unavailable and helper methods return `None`/`false`.
441#[derive(Debug, Clone)]
442pub struct MonitorExecutionProbe {
443    #[cfg(feature = "std")]
444    inner: Option<ExecutionProbeHandle>,
445}
446
447impl Default for MonitorExecutionProbe {
448    fn default() -> Self {
449        Self::unavailable()
450    }
451}
452
453impl MonitorExecutionProbe {
454    #[cfg(feature = "std")]
455    pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
456        Self {
457            inner: Some(handle),
458        }
459    }
460
461    pub const fn unavailable() -> Self {
462        Self {
463            #[cfg(feature = "std")]
464            inner: None,
465        }
466    }
467
468    pub fn is_available(&self) -> bool {
469        #[cfg(feature = "std")]
470        {
471            self.inner.is_some()
472        }
473        #[cfg(not(feature = "std"))]
474        {
475            false
476        }
477    }
478
479    pub fn marker(&self) -> Option<ExecutionMarker> {
480        #[cfg(feature = "std")]
481        {
482            self.inner.as_ref().and_then(|probe| probe.marker())
483        }
484        #[cfg(not(feature = "std"))]
485        {
486            None
487        }
488    }
489
490    pub fn sequence(&self) -> Option<usize> {
491        #[cfg(feature = "std")]
492        {
493            self.inner.as_ref().map(|probe| probe.sequence())
494        }
495        #[cfg(not(feature = "std"))]
496        {
497            None
498        }
499    }
500}
501
502/// Runtime component category used by monitoring metadata and topology.
503///
504/// A "task" is a regular Copper task (lifecycle callbacks + payload processing). A "bridge"
505/// is a monitored bridge-side execution component (bridge nodes and channel endpoints).
506#[derive(Debug, Clone, Copy, PartialEq, Eq)]
507#[non_exhaustive]
508pub enum ComponentType {
509    Source,
510    Task,
511    Sink,
512    Bridge,
513}
514
515impl ComponentType {
516    pub const fn is_task(self) -> bool {
517        !matches!(self, Self::Bridge)
518    }
519}
520
521/// Static identity entry for one monitored runtime component.
522#[derive(Debug, Clone, Copy, PartialEq, Eq)]
523pub struct MonitorComponentMetadata {
524    id: &'static str,
525    kind: ComponentType,
526    type_name: Option<&'static str>,
527}
528
529impl MonitorComponentMetadata {
530    pub const fn new(
531        id: &'static str,
532        kind: ComponentType,
533        type_name: Option<&'static str>,
534    ) -> Self {
535        Self {
536            id,
537            kind,
538            type_name,
539        }
540    }
541
542    /// Stable monitor component id (for logs/debug and joins with runtime markers).
543    pub const fn id(&self) -> &'static str {
544        self.id
545    }
546
547    pub const fn kind(&self) -> ComponentType {
548        self.kind
549    }
550
551    /// Rust type label when available (typically tasks); `None` for synthetic bridge entries.
552    pub const fn type_name(&self) -> Option<&'static str> {
553        self.type_name
554    }
555}
556
557/// Immutable runtime-provided metadata passed once to [`CuMonitor::new`].
558///
559/// This bundles identifiers, deterministic component layout, and monitor-specific config so monitor
560/// construction is explicit and does not need ad-hoc late setters.
561#[derive(Debug, Clone)]
562pub struct CuMonitoringMetadata {
563    mission_id: CompactString,
564    subsystem_id: Option<CompactString>,
565    instance_id: u32,
566    layout: CopperListLayout,
567    copperlist_info: CopperListInfo,
568    topology: MonitorTopology,
569    monitor_config: Option<ComponentConfig>,
570}
571
572impl CuMonitoringMetadata {
573    pub fn new(
574        mission_id: CompactString,
575        components: &'static [MonitorComponentMetadata],
576        culist_component_mapping: &'static [ComponentId],
577        copperlist_info: CopperListInfo,
578        topology: MonitorTopology,
579        monitor_config: Option<ComponentConfig>,
580    ) -> CuResult<Self> {
581        Self::validate_components(components)?;
582        Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
583        Ok(Self {
584            mission_id,
585            subsystem_id: None,
586            instance_id: 0,
587            layout: CopperListLayout::new(components, culist_component_mapping),
588            copperlist_info,
589            topology,
590            monitor_config,
591        })
592    }
593
594    fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
595        let mut seen_bridge = false;
596        for component in components {
597            match component.kind() {
598                component_type if component_type.is_task() && seen_bridge => {
599                    return Err(CuError::from(
600                        "invalid monitor metadata: task-family components must appear before bridges",
601                    ));
602                }
603                ComponentType::Bridge => seen_bridge = true,
604                _ => {}
605            }
606        }
607        Ok(())
608    }
609
610    fn validate_culist_mapping(
611        components_len: usize,
612        culist_component_mapping: &'static [ComponentId],
613    ) -> CuResult<()> {
614        for component_idx in culist_component_mapping {
615            if component_idx.index() >= components_len {
616                return Err(CuError::from(
617                    "invalid monitor metadata: culist mapping points past components table",
618                ));
619            }
620        }
621        Ok(())
622    }
623
624    /// Active mission identifier for this runtime instance.
625    pub fn mission_id(&self) -> &str {
626        self.mission_id.as_str()
627    }
628
629    /// Compile-time subsystem identifier for this runtime instance when running in a
630    /// multi-Copper deployment.
631    pub fn subsystem_id(&self) -> Option<&str> {
632        self.subsystem_id.as_deref()
633    }
634
635    /// Deployment/runtime instance identity for this runtime instance.
636    pub fn instance_id(&self) -> u32 {
637        self.instance_id
638    }
639
640    /// Canonical table of monitored runtime components.
641    ///
642    /// Ordering is deterministic and mission-scoped: tasks first, then bridge-side components.
643    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
644        self.layout.components()
645    }
646
647    /// Total number of monitored components.
648    pub const fn component_count(&self) -> usize {
649        self.layout.component_count()
650    }
651
652    /// Static runtime layout used to map CopperList slots to components.
653    pub const fn layout(&self) -> CopperListLayout {
654        self.layout
655    }
656
657    pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
658        self.layout.component(component_id)
659    }
660
661    pub fn component_id(&self, component_id: ComponentId) -> &'static str {
662        self.component(component_id).id()
663    }
664
665    pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
666        self.component(component_id).kind()
667    }
668
669    pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
670        self.layout
671            .components()
672            .iter()
673            .position(|component| component.id() == component_id)
674            .map(ComponentId::new)
675    }
676
677    /// CopperList slot -> monitored component index mapping.
678    ///
679    /// This table maps each CopperList slot index to the producing component index.
680    pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
681        self.layout.slot_to_component()
682    }
683
684    pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
685        self.layout.component_for_slot(culist_slot)
686    }
687
688    pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
689        self.layout.view(msgs)
690    }
691
692    pub const fn copperlist_info(&self) -> CopperListInfo {
693        self.copperlist_info
694    }
695
696    /// Resolved graph topology for the active mission.
697    ///
698    /// This is always available. Nodes represent config graph nodes, not every synthetic bridge
699    /// channel entry in `components()`.
700    pub fn topology(&self) -> &MonitorTopology {
701        &self.topology
702    }
703
704    pub fn monitor_config(&self) -> Option<&ComponentConfig> {
705        self.monitor_config.as_ref()
706    }
707
708    pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
709        self.monitor_config = monitor_config;
710        self
711    }
712
713    pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
714        self.subsystem_id = subsystem_id.map(CompactString::from);
715        self
716    }
717
718    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
719        self.instance_id = instance_id;
720        self
721    }
722}
723
724/// Runtime-provided dynamic monitoring handles passed once to [`CuMonitor::new`].
725///
726/// This context may expose live runtime state (for example execution progress probes).
727#[derive(Debug, Clone, Default)]
728pub struct CuMonitoringRuntime {
729    execution_probe: MonitorExecutionProbe,
730}
731
732impl CuMonitoringRuntime {
733    #[cfg(feature = "std")]
734    pub fn new(execution_probe: MonitorExecutionProbe) -> Self {
735        ensure_runtime_panic_hook_installed();
736        Self { execution_probe }
737    }
738
739    #[cfg(not(feature = "std"))]
740    pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
741        Self { execution_probe }
742    }
743
744    #[cfg(feature = "std")]
745    pub fn unavailable() -> Self {
746        Self::new(MonitorExecutionProbe::unavailable())
747    }
748
749    #[cfg(not(feature = "std"))]
750    pub const fn unavailable() -> Self {
751        Self::new(MonitorExecutionProbe::unavailable())
752    }
753
754    pub fn execution_probe(&self) -> &MonitorExecutionProbe {
755        &self.execution_probe
756    }
757
758    #[cfg(feature = "std")]
759    pub fn register_panic_cleanup<F>(&self, callback: F) -> PanicHookRegistration
760    where
761        F: Fn(&PanicReport) + Send + Sync + 'static,
762    {
763        ensure_runtime_panic_hook_installed();
764        register_panic_cleanup(callback)
765    }
766
767    #[cfg(feature = "std")]
768    pub fn register_panic_action<F>(&self, callback: F) -> PanicHookRegistration
769    where
770        F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
771    {
772        ensure_runtime_panic_hook_installed();
773        register_panic_action(callback)
774    }
775}
776
777#[cfg(feature = "std")]
778type PanicCleanupCallback = Arc<dyn Fn(&PanicReport) + Send + Sync + 'static>;
779#[cfg(feature = "std")]
780type PanicActionCallback = Arc<dyn Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static>;
781
782#[cfg(feature = "std")]
783#[derive(Debug, Clone)]
784pub struct PanicReport {
785    message: String,
786    location: Option<String>,
787    thread_name: Option<String>,
788    backtrace: String,
789    timestamp_unix_ms: u128,
790    crash_report_path: Option<String>,
791}
792
793#[cfg(feature = "std")]
794impl PanicReport {
795    fn capture(info: &PanicHookInfo<'_>) -> Self {
796        let location = info
797            .location()
798            .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()));
799        let thread_name = std::thread::current().name().map(|name| name.to_string());
800        let timestamp_unix_ms = SystemTime::now()
801            .duration_since(UNIX_EPOCH)
802            .map(|dur| dur.as_millis())
803            .unwrap_or(0);
804
805        Self {
806            message: panic_hook_payload_to_string(info),
807            location,
808            thread_name,
809            backtrace: Backtrace::force_capture().to_string(),
810            timestamp_unix_ms,
811            crash_report_path: None,
812        }
813    }
814
815    pub fn message(&self) -> &str {
816        &self.message
817    }
818
819    pub fn location(&self) -> Option<&str> {
820        self.location.as_deref()
821    }
822
823    pub fn thread_name(&self) -> Option<&str> {
824        self.thread_name.as_deref()
825    }
826
827    pub fn backtrace(&self) -> &str {
828        &self.backtrace
829    }
830
831    pub fn timestamp_unix_ms(&self) -> u128 {
832        self.timestamp_unix_ms
833    }
834
835    pub fn crash_report_path(&self) -> Option<&str> {
836        self.crash_report_path.as_deref()
837    }
838
839    pub fn summary(&self) -> String {
840        match self.location() {
841            Some(location) => format!("panic at {location}: {}", self.message()),
842            None => format!("panic: {}", self.message()),
843        }
844    }
845}
846
847#[cfg(feature = "std")]
848#[derive(Clone, Copy, Debug, PartialEq, Eq)]
849enum PanicHookRegistrationKind {
850    Cleanup,
851    Action,
852}
853
854#[cfg(feature = "std")]
855#[derive(Clone)]
856struct RegisteredPanicCleanup {
857    id: usize,
858    callback: PanicCleanupCallback,
859}
860
861#[cfg(feature = "std")]
862#[derive(Clone)]
863struct RegisteredPanicAction {
864    id: usize,
865    callback: PanicActionCallback,
866}
867
868#[cfg(feature = "std")]
869#[derive(Default)]
870struct PanicHookRegistry {
871    cleanup_callbacks: StdMutex<Vec<RegisteredPanicCleanup>>,
872    action_callbacks: StdMutex<Vec<RegisteredPanicAction>>,
873}
874
875#[cfg(feature = "std")]
876#[derive(Debug)]
877pub struct PanicHookRegistration {
878    id: usize,
879    kind: PanicHookRegistrationKind,
880}
881
882#[cfg(feature = "std")]
883impl Drop for PanicHookRegistration {
884    fn drop(&mut self) {
885        unregister_panic_hook(self.kind, self.id);
886    }
887}
888
889#[cfg(feature = "std")]
890static PANIC_HOOK_REGISTRY: OnceLock<PanicHookRegistry> = OnceLock::new();
891#[cfg(feature = "std")]
892static PANIC_HOOK_INSTALL_ONCE: OnceLock<()> = OnceLock::new();
893#[cfg(feature = "std")]
894static PANIC_HOOK_REGISTRATION_ID: AtomicUsize = AtomicUsize::new(1);
895#[cfg(feature = "std")]
896static PANIC_HOOK_ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0);
897
898#[cfg(feature = "std")]
899fn panic_hook_registry() -> &'static PanicHookRegistry {
900    PANIC_HOOK_REGISTRY.get_or_init(PanicHookRegistry::default)
901}
902
903#[cfg(feature = "std")]
904fn ensure_runtime_panic_hook_installed() {
905    let _ = PANIC_HOOK_INSTALL_ONCE.get_or_init(|| {
906        std::panic::set_hook(Box::new(move |info| {
907            let _guard = PanicHookActiveGuard::new();
908            let mut report = PanicReport::capture(info);
909            run_panic_cleanup_callbacks(&report);
910            report.crash_report_path = write_panic_report_to_file(&report);
911            emit_panic_report(&report);
912
913            if let Some(exit_code) = run_panic_action_callbacks(&report) {
914                std::process::exit(exit_code);
915            }
916        }));
917    });
918}
919
920#[cfg(feature = "std")]
921struct PanicHookActiveGuard;
922
923#[cfg(feature = "std")]
924impl PanicHookActiveGuard {
925    fn new() -> Self {
926        PANIC_HOOK_ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst);
927        Self
928    }
929}
930
931#[cfg(feature = "std")]
932impl Drop for PanicHookActiveGuard {
933    fn drop(&mut self) {
934        PANIC_HOOK_ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst);
935    }
936}
937
938#[cfg(feature = "std")]
939pub fn runtime_panic_hook_active() -> bool {
940    PANIC_HOOK_ACTIVE_COUNT.load(Ordering::SeqCst) > 0
941}
942
943#[cfg(not(feature = "std"))]
944pub const fn runtime_panic_hook_active() -> bool {
945    false
946}
947
948#[cfg(feature = "std")]
949fn register_panic_cleanup<F>(callback: F) -> PanicHookRegistration
950where
951    F: Fn(&PanicReport) + Send + Sync + 'static,
952{
953    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
954    let callback = Arc::new(callback) as PanicCleanupCallback;
955    let mut callbacks = panic_hook_registry()
956        .cleanup_callbacks
957        .lock()
958        .unwrap_or_else(|poison| poison.into_inner());
959    callbacks.push(RegisteredPanicCleanup { id, callback });
960    PanicHookRegistration {
961        id,
962        kind: PanicHookRegistrationKind::Cleanup,
963    }
964}
965
966#[cfg(feature = "std")]
967fn register_panic_action<F>(callback: F) -> PanicHookRegistration
968where
969    F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
970{
971    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
972    let callback = Arc::new(callback) as PanicActionCallback;
973    let mut callbacks = panic_hook_registry()
974        .action_callbacks
975        .lock()
976        .unwrap_or_else(|poison| poison.into_inner());
977    callbacks.push(RegisteredPanicAction { id, callback });
978    PanicHookRegistration {
979        id,
980        kind: PanicHookRegistrationKind::Action,
981    }
982}
983
984#[cfg(feature = "std")]
985fn unregister_panic_hook(kind: PanicHookRegistrationKind, id: usize) {
986    let registry = panic_hook_registry();
987    match kind {
988        PanicHookRegistrationKind::Cleanup => {
989            let mut callbacks = registry
990                .cleanup_callbacks
991                .lock()
992                .unwrap_or_else(|poison| poison.into_inner());
993            callbacks.retain(|entry| entry.id != id);
994        }
995        PanicHookRegistrationKind::Action => {
996            let mut callbacks = registry
997                .action_callbacks
998                .lock()
999                .unwrap_or_else(|poison| poison.into_inner());
1000            callbacks.retain(|entry| entry.id != id);
1001        }
1002    }
1003}
1004
1005#[cfg(feature = "std")]
1006fn run_panic_cleanup_callbacks(report: &PanicReport) {
1007    let callbacks = panic_hook_registry()
1008        .cleanup_callbacks
1009        .lock()
1010        .unwrap_or_else(|poison| poison.into_inner())
1011        .clone();
1012    for entry in callbacks {
1013        (entry.callback)(report);
1014    }
1015}
1016
1017#[cfg(feature = "std")]
1018fn run_panic_action_callbacks(report: &PanicReport) -> Option<i32> {
1019    let callbacks = panic_hook_registry()
1020        .action_callbacks
1021        .lock()
1022        .unwrap_or_else(|poison| poison.into_inner())
1023        .clone();
1024    let mut exit_code = None;
1025    for entry in callbacks {
1026        if exit_code.is_none() {
1027            exit_code = (entry.callback)(report);
1028        } else {
1029            let _ = (entry.callback)(report);
1030        }
1031    }
1032    exit_code
1033}
1034
1035#[cfg(feature = "std")]
1036fn panic_hook_payload_to_string(info: &PanicHookInfo<'_>) -> String {
1037    if let Some(msg) = info.payload().downcast_ref::<&str>() {
1038        (*msg).to_string()
1039    } else if let Some(msg) = info.payload().downcast_ref::<String>() {
1040        msg.clone()
1041    } else {
1042        "panic with non-string payload".to_string()
1043    }
1044}
1045
1046#[cfg(feature = "std")]
1047fn render_panic_report(report: &PanicReport) -> String {
1048    let mut rendered = String::from("Copper panic\n");
1049    rendered.push_str(&format!("time_unix_ms: {}\n", report.timestamp_unix_ms()));
1050    rendered.push_str(&format!(
1051        "thread: {}\n",
1052        report.thread_name().unwrap_or("<unnamed>")
1053    ));
1054    if let Some(location) = report.location() {
1055        rendered.push_str(&format!("location: {location}\n"));
1056    }
1057    rendered.push_str(&format!("message: {}\n", report.message()));
1058    if let Some(path) = report.crash_report_path() {
1059        rendered.push_str(&format!("crash_report: {path}\n"));
1060    }
1061    rendered.push_str("\nBacktrace:\n");
1062    rendered.push_str(report.backtrace());
1063    if !report.backtrace().ends_with('\n') {
1064        rendered.push('\n');
1065    }
1066    rendered
1067}
1068
1069#[cfg(feature = "std")]
1070fn emit_panic_report(report: &PanicReport) {
1071    let mut stderr = std::io::stderr().lock();
1072    let _ = stderr.write_all(render_panic_report(report).as_bytes());
1073    let _ = stderr.flush();
1074}
1075
1076#[cfg(feature = "std")]
1077fn write_panic_report_to_file(report: &PanicReport) -> Option<String> {
1078    let cwd = std::env::current_dir().ok()?;
1079    let file_name = format!(
1080        "copper-crash-{}-{}.txt",
1081        report.timestamp_unix_ms(),
1082        std::process::id()
1083    );
1084    let path = cwd.join(file_name);
1085    let path_string = path.to_string_lossy().to_string();
1086    let mut file = File::create(&path).ok()?;
1087    let mut report_with_path = report.clone();
1088    report_with_path.crash_report_path = Some(path_string.clone());
1089    file.write_all(render_panic_report(&report_with_path).as_bytes())
1090        .ok()?;
1091    file.flush().ok()?;
1092    Some(path_string)
1093}
1094
1095/// Monitor decision to be taken when a component step errored out.
1096#[derive(Debug)]
1097pub enum Decision {
1098    Abort,    // for a step (stop, start) or a copperlist, just stop trying to process it.
1099    Ignore, // Ignore this error and try to continue, ie calling the other component steps, setting a None return value and continue a copperlist.
1100    Shutdown, // This is a fatal error, shutdown the copper as cleanly as possible.
1101}
1102
1103fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
1104    use Decision::{Abort, Ignore, Shutdown};
1105    // Pick the strictest monitor decision when multiple monitors disagree.
1106    // Shutdown dominates Abort, which dominates Ignore.
1107    match (lhs, rhs) {
1108        (Shutdown, _) | (_, Shutdown) => Shutdown,
1109        (Abort, _) | (_, Abort) => Abort,
1110        _ => Ignore,
1111    }
1112}
1113
1114#[derive(Debug, Clone)]
1115pub struct MonitorNode {
1116    pub id: String,
1117    pub type_name: Option<String>,
1118    pub kind: ComponentType,
1119    /// Ordered list of input port identifiers.
1120    pub inputs: Vec<String>,
1121    /// Ordered list of output port identifiers.
1122    pub outputs: Vec<String>,
1123}
1124
1125#[derive(Debug, Clone)]
1126pub struct MonitorConnection {
1127    pub src: String,
1128    pub src_port: Option<String>,
1129    pub dst: String,
1130    pub dst_port: Option<String>,
1131    pub msg: String,
1132}
1133
1134#[derive(Debug, Clone, Default)]
1135pub struct MonitorTopology {
1136    pub nodes: Vec<MonitorNode>,
1137    pub connections: Vec<MonitorConnection>,
1138}
1139
1140#[derive(Debug, Clone, Copy, Default)]
1141pub struct CopperListInfo {
1142    pub size_bytes: usize,
1143    pub count: usize,
1144}
1145
1146impl CopperListInfo {
1147    pub const fn new(size_bytes: usize, count: usize) -> Self {
1148        Self { size_bytes, count }
1149    }
1150}
1151
1152/// Reported data about CopperList IO for a single iteration.
1153#[derive(Debug, Clone, Copy, Default)]
1154pub struct CopperListIoStats {
1155    /// CopperList bytes resident in RAM for this iteration.
1156    ///
1157    /// This includes the fixed CopperList struct size plus any pooled or
1158    /// handle-backed payload bytes observed on the real encode path.
1159    pub raw_culist_bytes: u64,
1160    /// Bytes attributed to handle-backed storage while measuring payload IO.
1161    ///
1162    /// This is surfaced separately so monitors can show how much of the runtime
1163    /// footprint lives in pooled payload buffers rather than inside the fixed
1164    /// CopperList struct.
1165    pub handle_bytes: u64,
1166    /// Bytes produced by bincode serialization of the CopperList
1167    pub encoded_culist_bytes: u64,
1168    /// Bytes produced by bincode serialization of the KeyFrame (0 if none)
1169    pub keyframe_bytes: u64,
1170    /// Cumulative bytes written to the structured log stream so far
1171    pub structured_log_bytes_total: u64,
1172    /// CopperList identifier for reference in monitors
1173    pub culistid: u64,
1174}
1175
1176#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1177pub struct PayloadIoStats {
1178    pub resident_bytes: usize,
1179    pub encoded_bytes: usize,
1180    pub handle_bytes: usize,
1181}
1182
1183#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1184pub struct CuMsgIoStats {
1185    pub present: bool,
1186    pub resident_bytes: u64,
1187    pub encoded_bytes: u64,
1188    pub handle_bytes: u64,
1189}
1190
1191struct CuMsgIoEntry {
1192    present: PortableAtomicBool,
1193    resident_bytes: PortableAtomicU64,
1194    encoded_bytes: PortableAtomicU64,
1195    handle_bytes: PortableAtomicU64,
1196}
1197
1198impl CuMsgIoEntry {
1199    fn clear(&self) {
1200        self.present.store(false, PortableOrdering::Release);
1201        self.resident_bytes.store(0, PortableOrdering::Relaxed);
1202        self.encoded_bytes.store(0, PortableOrdering::Relaxed);
1203        self.handle_bytes.store(0, PortableOrdering::Relaxed);
1204    }
1205
1206    fn get(&self) -> CuMsgIoStats {
1207        if !self.present.load(PortableOrdering::Acquire) {
1208            return CuMsgIoStats::default();
1209        }
1210
1211        CuMsgIoStats {
1212            present: true,
1213            resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
1214            encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
1215            handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
1216        }
1217    }
1218
1219    fn set(&self, stats: CuMsgIoStats) {
1220        self.resident_bytes
1221            .store(stats.resident_bytes, PortableOrdering::Relaxed);
1222        self.encoded_bytes
1223            .store(stats.encoded_bytes, PortableOrdering::Relaxed);
1224        self.handle_bytes
1225            .store(stats.handle_bytes, PortableOrdering::Relaxed);
1226        self.present.store(stats.present, PortableOrdering::Release);
1227    }
1228}
1229
1230impl Default for CuMsgIoEntry {
1231    fn default() -> Self {
1232        Self {
1233            present: PortableAtomicBool::new(false),
1234            resident_bytes: PortableAtomicU64::new(0),
1235            encoded_bytes: PortableAtomicU64::new(0),
1236            handle_bytes: PortableAtomicU64::new(0),
1237        }
1238    }
1239}
1240
1241pub struct CuMsgIoCache<const N: usize> {
1242    entries: [CuMsgIoEntry; N],
1243}
1244
1245impl<const N: usize> CuMsgIoCache<N> {
1246    pub fn clear(&self) {
1247        for entry in &self.entries {
1248            entry.clear();
1249        }
1250    }
1251
1252    pub fn get(&self, idx: usize) -> CuMsgIoStats {
1253        self.entries[idx].get()
1254    }
1255
1256    fn raw_parts(&self) -> (usize, usize) {
1257        (self.entries.as_ptr() as usize, N)
1258    }
1259}
1260
1261impl<const N: usize> Default for CuMsgIoCache<N> {
1262    fn default() -> Self {
1263        Self {
1264            entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
1265        }
1266    }
1267}
1268
1269#[derive(Clone, Copy)]
1270struct ActiveCuMsgIoCapture {
1271    cache_addr: usize,
1272    cache_len: usize,
1273    current_slot: Option<usize>,
1274}
1275
1276#[cfg(feature = "std")]
1277thread_local! {
1278    static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
1279    static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
1280    static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
1281}
1282
1283#[cfg(not(feature = "std"))]
1284static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
1285#[cfg(not(feature = "std"))]
1286static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
1287#[cfg(not(feature = "std"))]
1288static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
1289
1290fn begin_payload_io_measurement() {
1291    #[cfg(feature = "std")]
1292    PAYLOAD_HANDLE_BYTES.with(|bytes| {
1293        debug_assert!(
1294            bytes.get().is_none(),
1295            "payload IO byte measurement must not be nested"
1296        );
1297        bytes.set(Some(0));
1298    });
1299
1300    #[cfg(not(feature = "std"))]
1301    {
1302        let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
1303        debug_assert!(
1304            bytes.is_none(),
1305            "payload IO byte measurement must not be nested"
1306        );
1307        *bytes = Some(0);
1308    }
1309}
1310
1311fn finish_payload_io_measurement() -> usize {
1312    #[cfg(feature = "std")]
1313    {
1314        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
1315    }
1316
1317    #[cfg(not(feature = "std"))]
1318    {
1319        PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
1320    }
1321}
1322
1323fn abort_payload_io_measurement() {
1324    #[cfg(feature = "std")]
1325    PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
1326
1327    #[cfg(not(feature = "std"))]
1328    {
1329        *PAYLOAD_HANDLE_BYTES.lock() = None;
1330    }
1331}
1332
1333fn current_payload_io_measurement() -> usize {
1334    #[cfg(feature = "std")]
1335    {
1336        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
1337    }
1338
1339    #[cfg(not(feature = "std"))]
1340    {
1341        PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
1342    }
1343}
1344
1345#[cfg(feature = "std")]
1346pub(crate) fn record_payload_handle_bytes(bytes: usize) {
1347    #[cfg(feature = "std")]
1348    PAYLOAD_HANDLE_BYTES.with(|total| {
1349        if let Some(current) = total.get() {
1350            total.set(Some(current.saturating_add(bytes)));
1351        }
1352    });
1353
1354    #[cfg(not(feature = "std"))]
1355    {
1356        let mut total = PAYLOAD_HANDLE_BYTES.lock();
1357        if let Some(current) = *total {
1358            *total = Some(current.saturating_add(bytes));
1359        }
1360    }
1361}
1362
1363fn set_last_completed_handle_bytes(bytes: u64) {
1364    #[cfg(feature = "std")]
1365    LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1366
1367    #[cfg(not(feature = "std"))]
1368    {
1369        *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1370    }
1371}
1372
1373pub fn take_last_completed_handle_bytes() -> u64 {
1374    #[cfg(feature = "std")]
1375    {
1376        LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1377    }
1378
1379    #[cfg(not(feature = "std"))]
1380    {
1381        let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1382        let value = *total;
1383        *total = 0;
1384        value
1385    }
1386}
1387
1388fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1389    #[cfg(feature = "std")]
1390    {
1391        ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1392            let mut state = capture.get()?;
1393            let result = f(&mut state);
1394            capture.set(Some(state));
1395            Some(result)
1396        })
1397    }
1398
1399    #[cfg(not(feature = "std"))]
1400    {
1401        let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1402        let state = capture.as_mut()?;
1403        Some(f(state))
1404    }
1405}
1406
1407pub struct CuMsgIoCaptureGuard;
1408
1409impl CuMsgIoCaptureGuard {
1410    pub fn select_slot(&self, slot: usize) {
1411        let _ = with_active_capture_mut(|capture| {
1412            debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1413            capture.current_slot = Some(slot);
1414        });
1415    }
1416}
1417
1418impl Drop for CuMsgIoCaptureGuard {
1419    fn drop(&mut self) {
1420        set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1421
1422        #[cfg(feature = "std")]
1423        ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1424
1425        #[cfg(not(feature = "std"))]
1426        {
1427            *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1428        }
1429    }
1430}
1431
1432pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1433    cache.clear();
1434    set_last_completed_handle_bytes(0);
1435    begin_payload_io_measurement();
1436    let (cache_addr, cache_len) = cache.raw_parts();
1437    let capture = ActiveCuMsgIoCapture {
1438        cache_addr,
1439        cache_len,
1440        current_slot: None,
1441    };
1442
1443    #[cfg(feature = "std")]
1444    ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1445        debug_assert!(
1446            state.get().is_none(),
1447            "CopperList payload IO capture must not be nested"
1448        );
1449        state.set(Some(capture));
1450    });
1451
1452    #[cfg(not(feature = "std"))]
1453    {
1454        let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1455        debug_assert!(
1456            state.is_none(),
1457            "CopperList payload IO capture must not be nested"
1458        );
1459        *state = Some(capture);
1460    }
1461
1462    CuMsgIoCaptureGuard
1463}
1464
1465pub(crate) fn current_payload_handle_bytes() -> usize {
1466    current_payload_io_measurement()
1467}
1468
1469pub(crate) fn record_current_slot_payload_io_stats(
1470    fixed_bytes: usize,
1471    encoded_bytes: usize,
1472    handle_bytes: usize,
1473) {
1474    let _ = with_active_capture_mut(|capture| {
1475        let Some(slot) = capture.current_slot else {
1476            return;
1477        };
1478        if slot >= capture.cache_len {
1479            return;
1480        }
1481        // SAFETY: the capture guard holds the cache alive for the duration of the encode pass.
1482        let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
1483        let entry = unsafe { &*cache_ptr.add(slot) };
1484        entry.set(CuMsgIoStats {
1485            present: true,
1486            resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
1487            encoded_bytes: encoded_bytes as u64,
1488            handle_bytes: handle_bytes as u64,
1489        });
1490    });
1491}
1492
1493/// Measures payload bytes using the same encode path Copper uses for
1494/// logging/export.
1495///
1496/// `resident_bytes` is the payload's in-memory fixed footprint plus any
1497/// handle-backed dynamic storage reported during encoding. `encoded_bytes` is
1498/// the exact bincode payload size.
1499pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1500where
1501    T: Encode,
1502{
1503    begin_payload_io_measurement();
1504    begin_observed_encode();
1505
1506    let result = (|| {
1507        let mut encoder =
1508            EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1509        payload.encode(&mut encoder).map_err(|e| {
1510            CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1511        })?;
1512        let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1513        debug_assert_eq!(encoded_bytes, finish_observed_encode());
1514        let handle_bytes = finish_payload_io_measurement();
1515        Ok(PayloadIoStats {
1516            resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1517            encoded_bytes,
1518            handle_bytes,
1519        })
1520    })();
1521
1522    if result.is_err() {
1523        abort_payload_io_measurement();
1524        abort_observed_encode();
1525    }
1526
1527    result
1528}
1529
1530fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1531    let Ok(msg_types) = graph.get_node_output_msg_types_by_id(node_id) else {
1532        return Vec::new();
1533    };
1534
1535    let mut outputs = Vec::new();
1536    for (port_idx, msg) in msg_types.into_iter().enumerate() {
1537        let mut port_label = String::from("out");
1538        port_label.push_str(&port_idx.to_string());
1539        port_label.push_str(": ");
1540        port_label.push_str(msg.as_str());
1541        outputs.push((msg, port_label));
1542    }
1543    outputs
1544}
1545
1546/// Derive a monitor-friendly topology from the runtime configuration.
1547pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1548    let graph = config.get_graph(Some(mission))?;
1549    let mut nodes: Map<String, MonitorNode> = Map::new();
1550    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1551
1552    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1553    for bridge in &config.bridges {
1554        bridge_lookup.insert(bridge.id.as_str(), bridge);
1555    }
1556
1557    for (node_idx, node) in graph.get_all_nodes() {
1558        let node_id = node.get_id();
1559        let task_kind = match node.get_flavor() {
1560            Flavor::Bridge => ComponentType::Bridge,
1561            Flavor::Task => match resolve_task_kind_for_id(graph, node_idx)? {
1562                TaskKind::Source => ComponentType::Source,
1563                TaskKind::Regular => ComponentType::Task,
1564                TaskKind::Sink => ComponentType::Sink,
1565            },
1566        };
1567
1568        let mut inputs = Vec::new();
1569        let mut outputs = Vec::new();
1570        if task_kind == ComponentType::Bridge
1571            && let Some(bridge) = bridge_lookup.get(node_id.as_str())
1572        {
1573            for ch in &bridge.channels {
1574                match ch {
1575                    BridgeChannelConfigRepresentation::Rx { id, .. } => outputs.push(id.clone()),
1576                    BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1577                }
1578            }
1579        } else {
1580            match task_kind {
1581                ComponentType::Source => {
1582                    let ports = collect_output_ports(graph, node_idx);
1583                    let mut port_map: Map<String, String> = Map::new();
1584                    for (msg_type, label) in ports {
1585                        port_map.insert(msg_type, label.clone());
1586                        outputs.push(label);
1587                    }
1588                    output_port_lookup.insert(node_id.clone(), port_map);
1589                }
1590                ComponentType::Task => {
1591                    inputs.push("in".to_string());
1592                    let ports = collect_output_ports(graph, node_idx);
1593                    let mut port_map: Map<String, String> = Map::new();
1594                    for (msg_type, label) in ports {
1595                        port_map.insert(msg_type, label.clone());
1596                        outputs.push(label);
1597                    }
1598                    output_port_lookup.insert(node_id.clone(), port_map);
1599                }
1600                ComponentType::Sink => {
1601                    inputs.push("in".to_string());
1602                }
1603                ComponentType::Bridge => unreachable!("handled above"),
1604            }
1605        }
1606
1607        nodes.insert(
1608            node_id.clone(),
1609            MonitorNode {
1610                id: node_id,
1611                type_name: Some(node.get_type().to_string()),
1612                kind: task_kind,
1613                inputs,
1614                outputs,
1615            },
1616        );
1617    }
1618
1619    let mut connections = Vec::new();
1620    for cnx in graph.edges() {
1621        let src = cnx.src.clone();
1622        let dst = cnx.dst.clone();
1623
1624        let src_port = cnx.src_channel.clone().or_else(|| {
1625            output_port_lookup
1626                .get(&src)
1627                .and_then(|ports| ports.get(&cnx.msg).cloned())
1628                .or_else(|| {
1629                    nodes
1630                        .get(&src)
1631                        .and_then(|node| node.outputs.first().cloned())
1632                })
1633        });
1634        let dst_port = cnx.dst_channel.clone().or_else(|| {
1635            nodes
1636                .get(&dst)
1637                .and_then(|node| node.inputs.first().cloned())
1638        });
1639
1640        connections.push(MonitorConnection {
1641            src,
1642            src_port,
1643            dst,
1644            dst_port,
1645            msg: cnx.msg.clone(),
1646        });
1647    }
1648
1649    Ok(MonitorTopology {
1650        nodes: nodes.into_values().collect(),
1651        connections,
1652    })
1653}
1654
1655/// Runtime monitoring contract implemented by monitor components.
1656///
1657/// Lifecycle:
1658/// 1. [`CuMonitor::new`] is called once at runtime construction time.
1659/// 2. [`CuMonitor::start`] is called once before the first runtime iteration.
1660/// 3. For each iteration, [`CuMonitor::process_copperlist`] is called after component execution,
1661///    then [`CuMonitor::observe_copperlist_io`] after serialization accounting.
1662/// 4. [`CuMonitor::process_error`] is called synchronously when a monitored component step fails.
1663/// 5. [`CuMonitor::process_panic`] is called when the runtime catches a panic (`std` builds).
1664/// 6. [`CuMonitor::stop`] is called once during runtime shutdown.
1665///
1666/// Indexing model:
1667/// - `process_error(component_id, ..)` uses component indices into `metadata.components()`.
1668/// - `process_copperlist(..., view)` iterates CopperList slots with resolved component identity.
1669///
1670/// Error policy:
1671/// - [`Decision::Ignore`] continues execution.
1672/// - [`Decision::Abort`] aborts the current operation (step/copperlist scope).
1673/// - [`Decision::Shutdown`] triggers runtime shutdown.
1674pub trait CuMonitor: Sized {
1675    /// Construct the monitor once, before component execution starts.
1676    ///
1677    /// `metadata` contains mission/config/topology/static mapping information.
1678    /// `runtime` exposes dynamic runtime handles (for example execution probes).
1679    /// Use `metadata.monitor_config()` to decode monitor-specific parameters.
1680    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1681    where
1682        Self: Sized;
1683
1684    /// Called once before processing the first CopperList.
1685    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1686        Ok(())
1687    }
1688
1689    /// Called once per processed CopperList after component execution.
1690    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;
1691
1692    /// Called when runtime finishes CopperList serialization/IO accounting.
1693    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
1694
1695    /// Called when a monitored component step fails; must return an immediate runtime decision.
1696    ///
1697    /// `component_id` is an index into [`CuMonitoringMetadata::components`].
1698    fn process_error(
1699        &self,
1700        component_id: ComponentId,
1701        step: CuComponentState,
1702        error: &CuError,
1703    ) -> Decision;
1704
1705    /// Called when the runtime catches a panic (`std` builds).
1706    fn process_panic(&self, _panic_message: &str) {}
1707
1708    /// Called once during runtime shutdown.
1709    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1710        Ok(())
1711    }
1712}
1713
1714/// A do nothing monitor if no monitor is provided.
1715/// This is basically defining the default behavior of Copper in case of error.
1716pub struct NoMonitor {}
1717impl CuMonitor for NoMonitor {
1718    fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1719        Ok(NoMonitor {})
1720    }
1721
1722    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1723        #[cfg(all(feature = "std", debug_assertions))]
1724        register_live_log_listener(|entry, format_str, param_names| {
1725            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1726            let named: Map<String, String> = param_names
1727                .iter()
1728                .zip(params.iter())
1729                .map(|(k, v)| (k.to_string(), v.clone()))
1730                .collect();
1731
1732            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1733                let ts = format_timestamp(entry.time.into());
1734                println!("{} [{:?}] {}", ts, entry.level, msg);
1735            }
1736        });
1737        Ok(())
1738    }
1739
1740    fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1741        // By default, do nothing.
1742        Ok(())
1743    }
1744
1745    fn process_error(
1746        &self,
1747        _component_id: ComponentId,
1748        _step: CuComponentState,
1749        _error: &CuError,
1750    ) -> Decision {
1751        // By default, just try to continue.
1752        Decision::Ignore
1753    }
1754
1755    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1756        #[cfg(all(feature = "std", debug_assertions))]
1757        unregister_live_log_listener();
1758        Ok(())
1759    }
1760}
1761
1762macro_rules! impl_monitor_tuple {
1763    ($($idx:tt => $name:ident),+) => {
1764        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1765            fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1766            where
1767                Self: Sized,
1768            {
1769                Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1770            }
1771
1772            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1773                $(self.$idx.start(ctx)?;)+
1774                Ok(())
1775            }
1776
1777            fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1778                $(self.$idx.process_copperlist(ctx, view)?;)+
1779                Ok(())
1780            }
1781
1782            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1783                $(self.$idx.observe_copperlist_io(stats);)+
1784            }
1785
1786            fn process_error(
1787                &self,
1788                component_id: ComponentId,
1789                step: CuComponentState,
1790                error: &CuError,
1791            ) -> Decision {
1792                let mut decision = Decision::Ignore;
1793                $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1794                decision
1795            }
1796
1797            fn process_panic(&self, panic_message: &str) {
1798                $(self.$idx.process_panic(panic_message);)+
1799            }
1800
1801            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1802                $(self.$idx.stop(ctx)?;)+
1803                Ok(())
1804            }
1805        }
1806    };
1807}
1808
1809impl_monitor_tuple!(0 => M0, 1 => M1);
1810impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1811impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1812impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1813impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1814
1815#[cfg(feature = "std")]
1816pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
1817    if let Some(msg) = payload.downcast_ref::<&str>() {
1818        (*msg).to_string()
1819    } else if let Some(msg) = payload.downcast_ref::<String>() {
1820        msg.clone()
1821    } else {
1822        "panic with non-string payload".to_string()
1823    }
1824}
1825
1826/// A simple allocator that counts the number of bytes allocated and deallocated.
1827pub struct CountingAlloc<A: GlobalAlloc> {
1828    inner: A,
1829    allocated: AtomicUsize,
1830    deallocated: AtomicUsize,
1831}
1832
1833impl<A: GlobalAlloc> CountingAlloc<A> {
1834    pub const fn new(inner: A) -> Self {
1835        CountingAlloc {
1836            inner,
1837            allocated: AtomicUsize::new(0),
1838            deallocated: AtomicUsize::new(0),
1839        }
1840    }
1841
1842    pub fn allocated(&self) -> usize {
1843        self.allocated.load(Ordering::SeqCst)
1844    }
1845
1846    pub fn deallocated(&self) -> usize {
1847        self.deallocated.load(Ordering::SeqCst)
1848    }
1849
1850    pub fn reset(&self) {
1851        self.allocated.store(0, Ordering::SeqCst);
1852        self.deallocated.store(0, Ordering::SeqCst);
1853    }
1854}
1855
1856// SAFETY: Delegates allocation/deallocation to the inner allocator while tracking sizes.
1857unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
1858    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
1859    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
1860        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
1861        let p = unsafe { self.inner.alloc(layout) };
1862        if !p.is_null() {
1863            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
1864        }
1865        p
1866    }
1867
1868    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
1869    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
1870        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
1871        unsafe { self.inner.dealloc(ptr, layout) }
1872        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
1873    }
1874}
1875
1876/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
1877#[cfg(feature = "memory_monitoring")]
1878pub struct ScopedAllocCounter {
1879    bf_allocated: usize,
1880    bf_deallocated: usize,
1881}
1882
1883#[cfg(feature = "memory_monitoring")]
1884impl Default for ScopedAllocCounter {
1885    fn default() -> Self {
1886        Self::new()
1887    }
1888}
1889
1890#[cfg(feature = "memory_monitoring")]
1891impl ScopedAllocCounter {
1892    pub fn new() -> Self {
1893        ScopedAllocCounter {
1894            bf_allocated: GLOBAL.allocated(),
1895            bf_deallocated: GLOBAL.deallocated(),
1896        }
1897    }
1898
1899    /// Returns the total number of bytes allocated in the current scope
1900    /// since the creation of this `ScopedAllocCounter`.
1901    ///
1902    /// # Example
1903    /// ```
1904    /// use cu29_runtime::monitoring::ScopedAllocCounter;
1905    ///
1906    /// let counter = ScopedAllocCounter::new();
1907    /// let _vec = vec![0u8; 1024];
1908    /// println!("Bytes allocated: {}", counter.get_allocated());
1909    /// ```
1910    pub fn allocated(&self) -> usize {
1911        GLOBAL.allocated() - self.bf_allocated
1912    }
1913
1914    /// Returns the total number of bytes deallocated in the current scope
1915    /// since the creation of this `ScopedAllocCounter`.
1916    ///
1917    /// # Example
1918    /// ```
1919    /// use cu29_runtime::monitoring::ScopedAllocCounter;
1920    ///
1921    /// let counter = ScopedAllocCounter::new();
1922    /// let _vec = vec![0u8; 1024];
1923    /// drop(_vec);
1924    /// println!("Bytes deallocated: {}", counter.get_deallocated());
1925    /// ```
1926    pub fn deallocated(&self) -> usize {
1927        GLOBAL.deallocated() - self.bf_deallocated
1928    }
1929}
1930
1931/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
1932#[cfg(feature = "memory_monitoring")]
1933impl Drop for ScopedAllocCounter {
1934    fn drop(&mut self) {
1935        let _allocated = GLOBAL.allocated() - self.bf_allocated;
1936        let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
1937        // TODO(gbin): Fix this when the logger is ready.
1938        // debug!(
1939        //     "Allocations: +{}B -{}B",
1940        //     allocated = allocated,
1941        //     deallocated = deallocated,
1942        // );
1943    }
1944}
1945
1946#[cfg(feature = "std")]
1947const BUCKET_COUNT: usize = 1024;
1948#[cfg(not(feature = "std"))]
1949const BUCKET_COUNT: usize = 256;
1950
1951/// Accumulative stat object that can give your some real time statistics.
1952/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
1953#[derive(Debug, Clone)]
1954pub struct LiveStatistics {
1955    buckets: [u64; BUCKET_COUNT],
1956    min_val: u64,
1957    max_val: u64,
1958    sum: u128,
1959    sum_sq: u128,
1960    count: u64,
1961    max_value: u64,
1962}
1963
1964impl LiveStatistics {
1965    /// Creates a new `LiveStatistics` instance with a specified maximum value.
1966    ///
1967    /// This function initializes a `LiveStatistics` structure with default values
1968    /// for tracking statistical data, while setting an upper limit for the data
1969    /// points that the structure tracks.
1970    ///
1971    /// # Parameters
1972    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
1973    ///
1974    /// # Returns
1975    /// A new instance of `LiveStatistics` with:
1976    /// - `buckets`: An array pre-filled with zeros to categorize data points.
1977    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
1978    /// - `max_val`: Initialized to zero.
1979    /// - `sum`: The sum of all data points, initialized to zero.
1980    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
1981    /// - `count`: The total number of data points, initialized to zero.
1982    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
1983    ///
1984    pub fn new_with_max(max_value: u64) -> Self {
1985        LiveStatistics {
1986            buckets: [0; BUCKET_COUNT],
1987            min_val: u64::MAX,
1988            max_val: 0,
1989            sum: 0,
1990            sum_sq: 0,
1991            count: 0,
1992            max_value,
1993        }
1994    }
1995
1996    #[inline]
1997    fn value_to_bucket(&self, value: u64) -> usize {
1998        if value >= self.max_value {
1999            BUCKET_COUNT - 1
2000        } else {
2001            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
2002        }
2003    }
2004
2005    #[inline]
2006    pub fn min(&self) -> u64 {
2007        if self.count == 0 { 0 } else { self.min_val }
2008    }
2009
2010    #[inline]
2011    pub fn max(&self) -> u64 {
2012        self.max_val
2013    }
2014
2015    #[inline]
2016    pub fn mean(&self) -> f64 {
2017        if self.count == 0 {
2018            0.0
2019        } else {
2020            self.sum as f64 / self.count as f64
2021        }
2022    }
2023
2024    #[inline]
2025    pub fn stdev(&self) -> f64 {
2026        if self.count == 0 {
2027            return 0.0;
2028        }
2029        let mean = self.mean();
2030        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
2031        if variance < 0.0 {
2032            return 0.0;
2033        }
2034        #[cfg(feature = "std")]
2035        return variance.sqrt();
2036        #[cfg(not(feature = "std"))]
2037        return sqrt(variance);
2038    }
2039
2040    #[inline]
2041    pub fn percentile(&self, percentile: f64) -> u64 {
2042        if self.count == 0 {
2043            return 0;
2044        }
2045
2046        let target_count = (self.count as f64 * percentile) as u64;
2047        let mut accumulated = 0u64;
2048
2049        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
2050            accumulated += bucket_count;
2051            if accumulated >= target_count {
2052                // Linear interpolation within the bucket
2053                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
2054                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
2055                let bucket_fraction = if bucket_count > 0 {
2056                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
2057                } else {
2058                    0.5
2059                };
2060                return bucket_start
2061                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
2062            }
2063        }
2064
2065        self.max_val
2066    }
2067
2068    /// Adds a value to the statistics.
2069    #[inline]
2070    pub fn record(&mut self, value: u64) {
2071        if value < self.min_val {
2072            self.min_val = value;
2073        }
2074        if value > self.max_val {
2075            self.max_val = value;
2076        }
2077        let value_u128 = value as u128;
2078        self.sum += value_u128;
2079        self.sum_sq += value_u128 * value_u128;
2080        self.count += 1;
2081
2082        let bucket = self.value_to_bucket(value);
2083        self.buckets[bucket] += 1;
2084    }
2085
2086    #[inline]
2087    pub fn len(&self) -> u64 {
2088        self.count
2089    }
2090
2091    #[inline]
2092    pub fn is_empty(&self) -> bool {
2093        self.count == 0
2094    }
2095
2096    #[inline]
2097    pub fn reset(&mut self) {
2098        self.buckets.fill(0);
2099        self.min_val = u64::MAX;
2100        self.max_val = 0;
2101        self.sum = 0;
2102        self.sum_sq = 0;
2103        self.count = 0;
2104    }
2105}
2106
2107/// A Specialized statistics object for CuDuration.
2108/// It will also keep track of the jitter between the values.
2109#[derive(Debug, Clone)]
2110pub struct CuDurationStatistics {
2111    bare: LiveStatistics,
2112    jitter: LiveStatistics,
2113    last_value: CuDuration,
2114}
2115
2116impl CuDurationStatistics {
2117    pub fn new(max: CuDuration) -> Self {
2118        let CuDuration(max) = max;
2119        CuDurationStatistics {
2120            bare: LiveStatistics::new_with_max(max),
2121            jitter: LiveStatistics::new_with_max(max),
2122            last_value: CuDuration::default(),
2123        }
2124    }
2125
2126    #[inline]
2127    pub fn min(&self) -> CuDuration {
2128        CuDuration(self.bare.min())
2129    }
2130
2131    #[inline]
2132    pub fn max(&self) -> CuDuration {
2133        CuDuration(self.bare.max())
2134    }
2135
2136    #[inline]
2137    pub fn mean(&self) -> CuDuration {
2138        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
2139    }
2140
2141    #[inline]
2142    pub fn percentile(&self, percentile: f64) -> CuDuration {
2143        CuDuration(self.bare.percentile(percentile))
2144    }
2145
2146    #[inline]
2147    pub fn stddev(&self) -> CuDuration {
2148        CuDuration(self.bare.stdev() as u64)
2149    }
2150
2151    #[inline]
2152    pub fn len(&self) -> u64 {
2153        self.bare.len()
2154    }
2155
2156    #[inline]
2157    pub fn is_empty(&self) -> bool {
2158        self.bare.len() == 0
2159    }
2160
2161    #[inline]
2162    pub fn jitter_min(&self) -> CuDuration {
2163        CuDuration(self.jitter.min())
2164    }
2165
2166    #[inline]
2167    pub fn jitter_max(&self) -> CuDuration {
2168        CuDuration(self.jitter.max())
2169    }
2170
2171    #[inline]
2172    pub fn jitter_mean(&self) -> CuDuration {
2173        CuDuration(self.jitter.mean() as u64)
2174    }
2175
2176    #[inline]
2177    pub fn jitter_stddev(&self) -> CuDuration {
2178        CuDuration(self.jitter.stdev() as u64)
2179    }
2180
2181    #[inline]
2182    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
2183        CuDuration(self.jitter.percentile(percentile))
2184    }
2185
2186    #[inline]
2187    pub fn record(&mut self, value: CuDuration) {
2188        let CuDuration(nanos) = value;
2189        if self.bare.is_empty() {
2190            self.bare.record(nanos);
2191            self.last_value = value;
2192            return;
2193        }
2194        self.bare.record(nanos);
2195        let CuDuration(last_nanos) = self.last_value;
2196        self.jitter.record(nanos.abs_diff(last_nanos));
2197        self.last_value = value;
2198    }
2199
2200    #[inline]
2201    pub fn reset(&mut self) {
2202        self.bare.reset();
2203        self.jitter.reset();
2204    }
2205}
2206
2207#[cfg(test)]
2208mod tests {
2209    use super::*;
2210    use core::sync::atomic::{AtomicUsize, Ordering};
2211
2212    #[derive(Clone, Copy)]
2213    enum TestDecision {
2214        Ignore,
2215        Abort,
2216        Shutdown,
2217    }
2218
2219    struct TestMonitor {
2220        decision: TestDecision,
2221        copperlist_calls: AtomicUsize,
2222        panic_calls: AtomicUsize,
2223    }
2224
2225    impl TestMonitor {
2226        fn new_with(decision: TestDecision) -> Self {
2227            Self {
2228                decision,
2229                copperlist_calls: AtomicUsize::new(0),
2230                panic_calls: AtomicUsize::new(0),
2231            }
2232        }
2233    }
2234
2235    fn test_metadata() -> CuMonitoringMetadata {
2236        const COMPONENTS: &[MonitorComponentMetadata] = &[
2237            MonitorComponentMetadata::new("a", ComponentType::Task, None),
2238            MonitorComponentMetadata::new("b", ComponentType::Task, None),
2239        ];
2240        CuMonitoringMetadata::new(
2241            CompactString::from(crate::config::DEFAULT_MISSION_ID),
2242            COMPONENTS,
2243            &[],
2244            CopperListInfo::new(0, 0),
2245            MonitorTopology::default(),
2246            None,
2247        )
2248        .expect("test metadata should be valid")
2249    }
2250
2251    impl CuMonitor for TestMonitor {
2252        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
2253            let monitor = Self::new_with(TestDecision::Ignore);
2254            #[cfg(feature = "std")]
2255            let _ = runtime.execution_probe();
2256            Ok(monitor)
2257        }
2258
2259        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
2260            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
2261            Ok(())
2262        }
2263
2264        fn process_error(
2265            &self,
2266            _component_id: ComponentId,
2267            _step: CuComponentState,
2268            _error: &CuError,
2269        ) -> Decision {
2270            match self.decision {
2271                TestDecision::Ignore => Decision::Ignore,
2272                TestDecision::Abort => Decision::Abort,
2273                TestDecision::Shutdown => Decision::Shutdown,
2274            }
2275        }
2276
2277        fn process_panic(&self, _panic_message: &str) {
2278            self.panic_calls.fetch_add(1, Ordering::SeqCst);
2279        }
2280    }
2281
2282    #[test]
2283    fn test_live_statistics_percentiles() {
2284        let mut stats = LiveStatistics::new_with_max(1000);
2285
2286        // Record 100 values from 0 to 99
2287        for i in 0..100 {
2288            stats.record(i);
2289        }
2290
2291        assert_eq!(stats.len(), 100);
2292        assert_eq!(stats.min(), 0);
2293        assert_eq!(stats.max(), 99);
2294        assert_eq!(stats.mean() as u64, 49); // Average of 0..99
2295
2296        // Test percentiles - should be approximately correct
2297        let p50 = stats.percentile(0.5);
2298        let p90 = stats.percentile(0.90);
2299        let p95 = stats.percentile(0.95);
2300        let p99 = stats.percentile(0.99);
2301
2302        // With 100 samples from 0-99, percentiles should be close to their index
2303        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
2304        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
2305        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
2306        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
2307    }
2308
2309    #[test]
2310    fn test_duration_stats() {
2311        let mut stats = CuDurationStatistics::new(CuDuration(1000));
2312        stats.record(CuDuration(100));
2313        stats.record(CuDuration(200));
2314        stats.record(CuDuration(500));
2315        stats.record(CuDuration(400));
2316        assert_eq!(stats.min(), CuDuration(100));
2317        assert_eq!(stats.max(), CuDuration(500));
2318        assert_eq!(stats.mean(), CuDuration(300));
2319        assert_eq!(stats.len(), 4);
2320        assert_eq!(stats.jitter.len(), 3);
2321        assert_eq!(stats.jitter_min(), CuDuration(100));
2322        assert_eq!(stats.jitter_max(), CuDuration(300));
2323        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
2324        stats.reset();
2325        assert_eq!(stats.len(), 0);
2326    }
2327
2328    #[test]
2329    fn test_duration_stats_large_samples_do_not_overflow() {
2330        let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
2331        stats.record(CuDuration(5_000_000_000));
2332        stats.record(CuDuration(8_000_000_000));
2333
2334        assert_eq!(stats.min(), CuDuration(5_000_000_000));
2335        assert_eq!(stats.max(), CuDuration(8_000_000_000));
2336        assert_eq!(stats.mean(), CuDuration(6_500_000_000));
2337        assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
2338        assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
2339    }
2340
2341    #[test]
2342    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
2343        let err = CuError::from("boom");
2344
2345        let two = (
2346            TestMonitor::new_with(TestDecision::Ignore),
2347            TestMonitor::new_with(TestDecision::Shutdown),
2348        );
2349        assert!(matches!(
2350            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
2351            Decision::Shutdown
2352        ));
2353
2354        let two = (
2355            TestMonitor::new_with(TestDecision::Ignore),
2356            TestMonitor::new_with(TestDecision::Abort),
2357        );
2358        assert!(matches!(
2359            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
2360            Decision::Abort
2361        ));
2362    }
2363
2364    #[test]
2365    fn tuple_monitor_fans_out_callbacks() {
2366        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
2367            test_metadata(),
2368            CuMonitoringRuntime::unavailable(),
2369        )
2370        .expect("tuple new");
2371        let (ctx, _clock_control) = CuContext::new_mock_clock();
2372        let empty_view = test_metadata().layout().view(&[]);
2373        monitors
2374            .process_copperlist(&ctx, empty_view)
2375            .expect("process_copperlist should fan out");
2376        monitors.process_panic("panic marker");
2377
2378        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
2379        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
2380        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
2381        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
2382    }
2383
2384    fn encoded_size<E: Encode>(value: &E) -> usize {
2385        let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
2386        value
2387            .encode(&mut encoder)
2388            .expect("size measurement encoder should not fail");
2389        encoder.into_writer().bytes_written
2390    }
2391
2392    #[test]
2393    fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
2394        let payload = vec![1u8, 2, 3, 4];
2395        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");
2396
2397        assert_eq!(io.encoded_bytes, encoded_size(&payload));
2398        assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
2399        assert_eq!(io.handle_bytes, 0);
2400    }
2401
2402    #[test]
2403    fn payload_io_stats_tracks_handle_backed_storage() {
2404        let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
2405        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");
2406
2407        assert_eq!(io.encoded_bytes, encoded_size(&payload));
2408        assert_eq!(
2409            io.resident_bytes,
2410            core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
2411        );
2412        assert_eq!(io.handle_bytes, 32);
2413    }
2414
2415    #[test]
2416    fn runtime_execution_probe_roundtrip_marker() {
2417        let probe = RuntimeExecutionProbe::default();
2418        assert!(probe.marker().is_none());
2419        assert_eq!(probe.sequence(), 0);
2420
2421        probe.record(ExecutionMarker {
2422            component_id: ComponentId::new(7),
2423            step: CuComponentState::Process,
2424            culistid: Some(42),
2425        });
2426
2427        let marker = probe.marker().expect("marker should be available");
2428        assert_eq!(marker.component_id, ComponentId::new(7));
2429        assert!(matches!(marker.step, CuComponentState::Process));
2430        assert_eq!(marker.culistid, Some(42));
2431        assert_eq!(probe.sequence(), 1);
2432    }
2433}