Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the components it runs.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{
6    BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7    TaskKind, resolve_task_kind_for_id,
8};
9use crate::context::CuContext;
10use crate::cutask::CuMsgMetadata;
11use bincode::Encode;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SizeWriter;
15use compact_str::CompactString;
16use cu29_clock::CuDuration;
17#[allow(unused_imports)]
18use cu29_log::CuLogLevel;
19#[cfg(all(feature = "std", debug_assertions))]
20use cu29_log_runtime::{
21    format_message_only, register_live_log_listener, unregister_live_log_listener,
22};
23use cu29_traits::{
24    CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
25    finish_observed_encode,
26};
27use portable_atomic::{
28    AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
29};
30use serde_derive::{Deserialize, Serialize};
31
32#[cfg(not(feature = "std"))]
33extern crate alloc;
34
35#[cfg(feature = "std")]
36use core::cell::Cell;
37#[cfg(feature = "std")]
38use std::backtrace::Backtrace;
39#[cfg(feature = "std")]
40use std::fs::File;
41#[cfg(feature = "std")]
42use std::io::Write;
43#[cfg(feature = "std")]
44use std::panic::PanicHookInfo;
45#[cfg(feature = "std")]
46use std::sync::{Arc, Mutex as StdMutex, OnceLock};
47#[cfg(feature = "std")]
48use std::thread_local;
49#[cfg(feature = "std")]
50use std::time::{SystemTime, UNIX_EPOCH};
51#[cfg(feature = "std")]
52use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
53
54#[cfg(not(feature = "std"))]
55use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
56#[cfg(not(target_has_atomic = "64"))]
57use spin::Mutex;
58#[cfg(not(feature = "std"))]
59use spin::Mutex as SpinMutex;
60
61#[cfg(not(feature = "std"))]
62mod imp {
63    pub use alloc::alloc::{GlobalAlloc, Layout};
64    #[cfg(target_has_atomic = "64")]
65    pub use core::sync::atomic::AtomicU64;
66    pub use core::sync::atomic::{AtomicUsize, Ordering};
67    pub use libm::sqrt;
68}
69
/// Platform shims for `std` builds: allocator traits and atomics come from `std`.
///
/// With the `memory_monitoring` feature enabled, this module also registers a
/// `CountingAlloc` wrapping the system allocator as the process-wide global allocator.
#[cfg(feature = "std")]
mod imp {
    pub use std::alloc::{GlobalAlloc, Layout};
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    #[cfg(target_has_atomic = "64")]
    pub use std::sync::atomic::AtomicU64;

    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;

    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
84
85use imp::*;
86
/// Renders a CuTime/CuDuration as `HH:mm:ss.xxxx` (four fractional digits of a second).
///
/// Hours are not wrapped at 24, so durations longer than a day keep counting up.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos = time.as_nanos();
    let (whole_secs, sub_sec_nanos) = (nanos / 1_000_000_000, nanos % 1_000_000_000);
    // 100_000 ns = 1/10_000 s, hence four fractional digits.
    let ten_thousandths = sub_sec_nanos / 100_000;
    let (hh, mm, ss) = (whole_secs / 3600, (whole_secs / 60) % 60, whole_secs % 60);
    format!("{hh:02}:{mm:02}:{ss:02}.{ten_thousandths:04}")
}
98
99/// Lifecycle state of a monitored component.
100#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
101pub enum CuComponentState {
102    Start,
103    Preprocess,
104    Process,
105    Postprocess,
106    Stop,
107}
108
109/// Strongly-typed index into [`CuMonitoringMetadata::components`].
110#[repr(transparent)]
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
112pub struct ComponentId(usize);
113
114impl ComponentId {
115    pub const INVALID: Self = Self(usize::MAX);
116
117    #[inline]
118    pub const fn new(index: usize) -> Self {
119        Self(index)
120    }
121
122    #[inline]
123    pub const fn index(self) -> usize {
124        self.0
125    }
126}
127
128impl core::fmt::Display for ComponentId {
129    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
130        self.0.fmt(f)
131    }
132}
133
134impl From<ComponentId> for usize {
135    fn from(value: ComponentId) -> Self {
136        value.index()
137    }
138}
139
140/// Strongly-typed CopperList slot index.
141#[repr(transparent)]
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
143pub struct CuListSlot(usize);
144
145impl CuListSlot {
146    #[inline]
147    pub const fn new(index: usize) -> Self {
148        Self(index)
149    }
150
151    #[inline]
152    pub const fn index(self) -> usize {
153        self.0
154    }
155}
156
157impl From<CuListSlot> for usize {
158    fn from(value: CuListSlot) -> Self {
159        value.index()
160    }
161}
162
163/// Static monitor-side CopperList indexing layout.
164///
165/// This layout is mission/runtime scoped and remains constant after monitor construction.
166#[derive(Debug, Clone, Copy)]
167pub struct CopperListLayout {
168    components: &'static [MonitorComponentMetadata],
169    slot_to_component: &'static [ComponentId],
170}
171
172impl CopperListLayout {
173    #[inline]
174    pub const fn new(
175        components: &'static [MonitorComponentMetadata],
176        slot_to_component: &'static [ComponentId],
177    ) -> Self {
178        Self {
179            components,
180            slot_to_component,
181        }
182    }
183
184    #[inline]
185    pub const fn components(self) -> &'static [MonitorComponentMetadata] {
186        self.components
187    }
188
189    #[inline]
190    pub const fn component_count(self) -> usize {
191        self.components.len()
192    }
193
194    #[inline]
195    pub const fn culist_slot_count(self) -> usize {
196        self.slot_to_component.len()
197    }
198
199    #[inline]
200    pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
201        &self.components[id.index()]
202    }
203
204    #[inline]
205    pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
206        self.slot_to_component[culist_slot.index()]
207    }
208
209    #[inline]
210    pub const fn slot_to_component(self) -> &'static [ComponentId] {
211        self.slot_to_component
212    }
213
214    #[inline]
215    pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
216        CopperListView::new(self, msgs)
217    }
218}
219
220/// Per-loop monitor view over CopperList metadata paired with static component mapping.
221#[derive(Debug, Clone, Copy)]
222pub struct CopperListView<'a> {
223    layout: CopperListLayout,
224    msgs: &'a [&'a CuMsgMetadata],
225}
226
227impl<'a> CopperListView<'a> {
228    #[inline]
229    pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
230        assert_eq!(
231            msgs.len(),
232            layout.culist_slot_count(),
233            "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
234            msgs.len(),
235            layout.culist_slot_count()
236        );
237        Self { layout, msgs }
238    }
239
240    #[inline]
241    pub const fn layout(self) -> CopperListLayout {
242        self.layout
243    }
244
245    #[inline]
246    pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
247        self.msgs
248    }
249
250    #[inline]
251    pub const fn len(self) -> usize {
252        self.msgs.len()
253    }
254
255    #[inline]
256    pub const fn is_empty(self) -> bool {
257        self.msgs.is_empty()
258    }
259
260    #[inline]
261    pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
262        let index = culist_slot.index();
263        CopperListEntry {
264            culist_slot,
265            component_id: self.layout.component_for_slot(culist_slot),
266            msg: self.msgs[index],
267        }
268    }
269
270    pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
271        self.msgs.iter().enumerate().map(move |(idx, msg)| {
272            let culist_slot = CuListSlot::new(idx);
273            CopperListEntry {
274                culist_slot,
275                component_id: self.layout.component_for_slot(culist_slot),
276                msg,
277            }
278        })
279    }
280}
281
282/// One message entry in CopperList slot order with resolved component identity.
283#[derive(Debug, Clone, Copy)]
284pub struct CopperListEntry<'a> {
285    pub culist_slot: CuListSlot,
286    pub component_id: ComponentId,
287    pub msg: &'a CuMsgMetadata,
288}
289
290impl<'a> CopperListEntry<'a> {
291    #[inline]
292    pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
293        layout.component(self.component_id)
294    }
295
296    #[inline]
297    pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
298        layout.component(self.component_id).kind()
299    }
300}
301
302/// Execution progress marker emitted by the runtime before running a component step.
303#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
304pub struct ExecutionMarker {
305    /// Index into `CuMonitoringMetadata::components()`.
306    pub component_id: ComponentId,
307    /// Lifecycle phase currently entered.
308    pub step: CuComponentState,
309    /// CopperList id when available (runtime loop), None during start/stop.
310    pub culistid: Option<u64>,
311}
312
313/// Lock-free runtime-side progress probe.
314///
315/// The runtime writes execution markers directly into this probe from the hot path
316/// (without calling monitor fan-out callbacks), and monitors can read a coherent
317/// snapshot from watchdog threads when diagnosing stalls.
318#[derive(Debug)]
319pub struct RuntimeExecutionProbe {
320    component_id: AtomicUsize,
321    step: AtomicUsize,
322    #[cfg(target_has_atomic = "64")]
323    culistid: AtomicU64,
324    #[cfg(target_has_atomic = "64")]
325    culistid_present: AtomicUsize,
326    #[cfg(not(target_has_atomic = "64"))]
327    culistid: Mutex<Option<u64>>,
328    sequence: AtomicUsize,
329}
330
331impl Default for RuntimeExecutionProbe {
332    fn default() -> Self {
333        Self {
334            component_id: AtomicUsize::new(ComponentId::INVALID.index()),
335            step: AtomicUsize::new(0),
336            #[cfg(target_has_atomic = "64")]
337            culistid: AtomicU64::new(0),
338            #[cfg(target_has_atomic = "64")]
339            culistid_present: AtomicUsize::new(0),
340            #[cfg(not(target_has_atomic = "64"))]
341            culistid: Mutex::new(None),
342            sequence: AtomicUsize::new(0),
343        }
344    }
345}
346
347impl RuntimeExecutionProbe {
348    #[inline]
349    pub fn record(&self, marker: ExecutionMarker) {
350        self.component_id
351            .store(marker.component_id.index(), Ordering::Relaxed);
352        self.step
353            .store(component_state_to_usize(marker.step), Ordering::Relaxed);
354        #[cfg(target_has_atomic = "64")]
355        match marker.culistid {
356            Some(culistid) => {
357                self.culistid.store(culistid, Ordering::Relaxed);
358                self.culistid_present.store(1, Ordering::Relaxed);
359            }
360            None => {
361                self.culistid_present.store(0, Ordering::Relaxed);
362            }
363        }
364        #[cfg(not(target_has_atomic = "64"))]
365        {
366            *self.culistid.lock() = marker.culistid;
367        }
368        self.sequence.fetch_add(1, Ordering::Release);
369    }
370
371    #[inline]
372    pub fn sequence(&self) -> usize {
373        self.sequence.load(Ordering::Acquire)
374    }
375
376    #[inline]
377    pub fn marker(&self) -> Option<ExecutionMarker> {
378        // Read a coherent snapshot. A concurrent writer may change values between reads;
379        // in that case we retry to keep the marker and sequence aligned.
380        loop {
381            let seq_before = self.sequence.load(Ordering::Acquire);
382            let component_id = self.component_id.load(Ordering::Relaxed);
383            let step = self.step.load(Ordering::Relaxed);
384            #[cfg(target_has_atomic = "64")]
385            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
386            #[cfg(target_has_atomic = "64")]
387            let culistid_value = self.culistid.load(Ordering::Relaxed);
388            #[cfg(not(target_has_atomic = "64"))]
389            let culistid = *self.culistid.lock();
390            let seq_after = self.sequence.load(Ordering::Acquire);
391            if seq_before == seq_after {
392                if component_id == ComponentId::INVALID.index() {
393                    return None;
394                }
395                let step = usize_to_component_state(step);
396                #[cfg(target_has_atomic = "64")]
397                let culistid = if culistid_present == 0 {
398                    None
399                } else {
400                    Some(culistid_value)
401                };
402                return Some(ExecutionMarker {
403                    component_id: ComponentId::new(component_id),
404                    step,
405                    culistid,
406                });
407            }
408        }
409    }
410}
411
412#[inline]
413const fn component_state_to_usize(step: CuComponentState) -> usize {
414    match step {
415        CuComponentState::Start => 0,
416        CuComponentState::Preprocess => 1,
417        CuComponentState::Process => 2,
418        CuComponentState::Postprocess => 3,
419        CuComponentState::Stop => 4,
420    }
421}
422
423#[inline]
424const fn usize_to_component_state(step: usize) -> CuComponentState {
425    match step {
426        0 => CuComponentState::Start,
427        1 => CuComponentState::Preprocess,
428        2 => CuComponentState::Process,
429        3 => CuComponentState::Postprocess,
430        _ => CuComponentState::Stop,
431    }
432}
433
/// Shared handle to a [`RuntimeExecutionProbe`]: the runtime records into it and monitors
/// read from it through [`MonitorExecutionProbe::from_shared`].
#[cfg(feature = "std")]
pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
436
437/// Platform-neutral monitor view of runtime execution progress.
438///
439/// In `std` builds this can wrap a shared runtime probe. In `no_std` builds it is currently
440/// unavailable and helper methods return `None`/`false`.
441#[derive(Debug, Clone)]
442pub struct MonitorExecutionProbe {
443    #[cfg(feature = "std")]
444    inner: Option<ExecutionProbeHandle>,
445}
446
447impl Default for MonitorExecutionProbe {
448    fn default() -> Self {
449        Self::unavailable()
450    }
451}
452
453impl MonitorExecutionProbe {
454    #[cfg(feature = "std")]
455    pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
456        Self {
457            inner: Some(handle),
458        }
459    }
460
461    pub const fn unavailable() -> Self {
462        Self {
463            #[cfg(feature = "std")]
464            inner: None,
465        }
466    }
467
468    pub fn is_available(&self) -> bool {
469        #[cfg(feature = "std")]
470        {
471            self.inner.is_some()
472        }
473        #[cfg(not(feature = "std"))]
474        {
475            false
476        }
477    }
478
479    pub fn marker(&self) -> Option<ExecutionMarker> {
480        #[cfg(feature = "std")]
481        {
482            self.inner.as_ref().and_then(|probe| probe.marker())
483        }
484        #[cfg(not(feature = "std"))]
485        {
486            None
487        }
488    }
489
490    pub fn sequence(&self) -> Option<usize> {
491        #[cfg(feature = "std")]
492        {
493            self.inner.as_ref().map(|probe| probe.sequence())
494        }
495        #[cfg(not(feature = "std"))]
496        {
497            None
498        }
499    }
500}
501
/// Runtime component category used by monitoring metadata and topology.
///
/// A "task" is a regular Copper task (lifecycle callbacks + payload processing). A "bridge"
/// is a monitored bridge-side execution component (bridge nodes and channel endpoints).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentType {
    Source,
    Task,
    Sink,
    Bridge,
}

impl ComponentType {
    /// `true` for the task family (`Source`, `Task`, `Sink`), `false` for `Bridge`.
    pub const fn is_task(self) -> bool {
        match self {
            Self::Source | Self::Task | Self::Sink => true,
            Self::Bridge => false,
        }
    }
}

/// Static identity entry for one monitored runtime component.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MonitorComponentMetadata {
    id: &'static str,
    kind: ComponentType,
    type_name: Option<&'static str>,
}

impl MonitorComponentMetadata {
    /// Builds a component entry from its id, category and optional Rust type label.
    pub const fn new(
        id: &'static str,
        kind: ComponentType,
        type_name: Option<&'static str>,
    ) -> Self {
        MonitorComponentMetadata {
            id,
            kind,
            type_name,
        }
    }

    /// Stable monitor component id (for logs/debug and joins with runtime markers).
    pub const fn id(&self) -> &'static str {
        self.id
    }

    /// Component category.
    pub const fn kind(&self) -> ComponentType {
        self.kind
    }

    /// Rust type label when available (typically tasks); `None` for synthetic bridge entries.
    pub const fn type_name(&self) -> Option<&'static str> {
        self.type_name
    }
}
556
557/// Immutable runtime-provided metadata passed once to [`CuMonitor::new`].
558///
559/// This bundles identifiers, deterministic component layout, and monitor-specific config so monitor
560/// construction is explicit and does not need ad-hoc late setters.
561#[derive(Debug, Clone)]
562pub struct CuMonitoringMetadata {
563    mission_id: CompactString,
564    subsystem_id: Option<CompactString>,
565    instance_id: u32,
566    layout: CopperListLayout,
567    copperlist_info: CopperListInfo,
568    topology: MonitorTopology,
569    monitor_config: Option<ComponentConfig>,
570}
571
572impl CuMonitoringMetadata {
573    pub fn new(
574        mission_id: CompactString,
575        components: &'static [MonitorComponentMetadata],
576        culist_component_mapping: &'static [ComponentId],
577        copperlist_info: CopperListInfo,
578        topology: MonitorTopology,
579        monitor_config: Option<ComponentConfig>,
580    ) -> CuResult<Self> {
581        Self::validate_components(components)?;
582        Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
583        Ok(Self {
584            mission_id,
585            subsystem_id: None,
586            instance_id: 0,
587            layout: CopperListLayout::new(components, culist_component_mapping),
588            copperlist_info,
589            topology,
590            monitor_config,
591        })
592    }
593
594    fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
595        let mut seen_bridge = false;
596        for component in components {
597            match component.kind() {
598                component_type if component_type.is_task() && seen_bridge => {
599                    return Err(CuError::from(
600                        "invalid monitor metadata: task-family components must appear before bridges",
601                    ));
602                }
603                ComponentType::Bridge => seen_bridge = true,
604                _ => {}
605            }
606        }
607        Ok(())
608    }
609
610    fn validate_culist_mapping(
611        components_len: usize,
612        culist_component_mapping: &'static [ComponentId],
613    ) -> CuResult<()> {
614        for component_idx in culist_component_mapping {
615            if component_idx.index() >= components_len {
616                return Err(CuError::from(
617                    "invalid monitor metadata: culist mapping points past components table",
618                ));
619            }
620        }
621        Ok(())
622    }
623
624    /// Active mission identifier for this runtime instance.
625    pub fn mission_id(&self) -> &str {
626        self.mission_id.as_str()
627    }
628
629    /// Compile-time subsystem identifier for this runtime instance when running in a
630    /// multi-Copper deployment.
631    pub fn subsystem_id(&self) -> Option<&str> {
632        self.subsystem_id.as_deref()
633    }
634
635    /// Deployment/runtime instance identity for this runtime instance.
636    pub fn instance_id(&self) -> u32 {
637        self.instance_id
638    }
639
640    /// Canonical table of monitored runtime components.
641    ///
642    /// Ordering is deterministic and mission-scoped: tasks first, then bridge-side components.
643    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
644        self.layout.components()
645    }
646
647    /// Total number of monitored components.
648    pub const fn component_count(&self) -> usize {
649        self.layout.component_count()
650    }
651
652    /// Static runtime layout used to map CopperList slots to components.
653    pub const fn layout(&self) -> CopperListLayout {
654        self.layout
655    }
656
657    pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
658        self.layout.component(component_id)
659    }
660
661    pub fn component_id(&self, component_id: ComponentId) -> &'static str {
662        self.component(component_id).id()
663    }
664
665    pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
666        self.component(component_id).kind()
667    }
668
669    pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
670        self.layout
671            .components()
672            .iter()
673            .position(|component| component.id() == component_id)
674            .map(ComponentId::new)
675    }
676
677    /// CopperList slot -> monitored component index mapping.
678    ///
679    /// This table maps each CopperList slot index to the producing component index.
680    pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
681        self.layout.slot_to_component()
682    }
683
684    pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
685        self.layout.component_for_slot(culist_slot)
686    }
687
688    pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
689        self.layout.view(msgs)
690    }
691
692    pub const fn copperlist_info(&self) -> CopperListInfo {
693        self.copperlist_info
694    }
695
696    /// Resolved graph topology for the active mission.
697    ///
698    /// This is always available. Nodes represent config graph nodes, not every synthetic bridge
699    /// channel entry in `components()`.
700    pub fn topology(&self) -> &MonitorTopology {
701        &self.topology
702    }
703
704    pub fn monitor_config(&self) -> Option<&ComponentConfig> {
705        self.monitor_config.as_ref()
706    }
707
708    pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
709        self.monitor_config = monitor_config;
710        self
711    }
712
713    pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
714        self.subsystem_id = subsystem_id.map(CompactString::from);
715        self
716    }
717
718    pub fn with_instance_id(mut self, instance_id: u32) -> Self {
719        self.instance_id = instance_id;
720        self
721    }
722}
723
724/// Runtime-provided dynamic monitoring handles passed once to [`CuMonitor::new`].
725///
726/// This context may expose live runtime state (for example execution progress probes).
727#[derive(Debug, Clone, Default)]
728pub struct CuMonitoringRuntime {
729    execution_probe: MonitorExecutionProbe,
730}
731
732impl CuMonitoringRuntime {
733    #[cfg(feature = "std")]
734    pub fn new(execution_probe: MonitorExecutionProbe) -> Self {
735        ensure_runtime_panic_hook_installed();
736        Self { execution_probe }
737    }
738
739    #[cfg(not(feature = "std"))]
740    pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
741        Self { execution_probe }
742    }
743
744    #[cfg(feature = "std")]
745    pub fn unavailable() -> Self {
746        Self::new(MonitorExecutionProbe::unavailable())
747    }
748
749    #[cfg(not(feature = "std"))]
750    pub const fn unavailable() -> Self {
751        Self::new(MonitorExecutionProbe::unavailable())
752    }
753
754    pub fn execution_probe(&self) -> &MonitorExecutionProbe {
755        &self.execution_probe
756    }
757
758    #[cfg(feature = "std")]
759    pub fn register_panic_cleanup<F>(&self, callback: F) -> PanicHookRegistration
760    where
761        F: Fn(&PanicReport) + Send + Sync + 'static,
762    {
763        ensure_runtime_panic_hook_installed();
764        register_panic_cleanup(callback)
765    }
766
767    #[cfg(feature = "std")]
768    pub fn register_panic_action<F>(&self, callback: F) -> PanicHookRegistration
769    where
770        F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
771    {
772        ensure_runtime_panic_hook_installed();
773        register_panic_action(callback)
774    }
775}
776
/// Shared callback run by the runtime panic hook before the crash report is written.
#[cfg(feature = "std")]
type PanicCleanupCallback = Arc<dyn Fn(&PanicReport) + Send + Sync + 'static>;
/// Shared callback run by the runtime panic hook after the crash report is emitted; it may
/// return an exit code for the process.
#[cfg(feature = "std")]
type PanicActionCallback = Arc<dyn Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static>;
781
/// Snapshot of a panic captured by the Copper runtime panic hook.
#[cfg(feature = "std")]
#[derive(Debug, Clone)]
pub struct PanicReport {
    message: String,
    location: Option<String>,
    thread_name: Option<String>,
    backtrace: String,
    timestamp_unix_ms: u128,
    crash_report_path: Option<String>,
}

#[cfg(feature = "std")]
impl PanicReport {
    /// Captures message, location, thread name, wall-clock time and a forced backtrace.
    ///
    /// The timestamp falls back to `0` if the system clock is before the Unix epoch.
    fn capture(info: &PanicHookInfo<'_>) -> Self {
        let location = info.location().map(|loc| {
            let (file, line, column) = (loc.file(), loc.line(), loc.column());
            format!("{}:{}:{}", file, line, column)
        });
        let thread_name = std::thread::current().name().map(ToString::to_string);
        let timestamp_unix_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis(),
            Err(_) => 0,
        };

        PanicReport {
            message: panic_hook_payload_to_string(info),
            location,
            thread_name,
            backtrace: Backtrace::force_capture().to_string(),
            timestamp_unix_ms,
            crash_report_path: None,
        }
    }

    /// Panic payload rendered as text.
    pub fn message(&self) -> &str {
        self.message.as_str()
    }

    /// `file:line:column` of the panic, when known.
    pub fn location(&self) -> Option<&str> {
        self.location.as_deref()
    }

    /// Name of the panicking thread, when it has one.
    pub fn thread_name(&self) -> Option<&str> {
        self.thread_name.as_deref()
    }

    /// Rendered backtrace.
    pub fn backtrace(&self) -> &str {
        self.backtrace.as_str()
    }

    /// Milliseconds since the Unix epoch when the panic was captured.
    pub fn timestamp_unix_ms(&self) -> u128 {
        self.timestamp_unix_ms
    }

    /// Path of the written crash report file, once one has been written.
    pub fn crash_report_path(&self) -> Option<&str> {
        self.crash_report_path.as_deref()
    }

    /// One-line summary: `panic at <location>: <message>` or `panic: <message>`.
    pub fn summary(&self) -> String {
        if let Some(location) = self.location() {
            format!("panic at {location}: {}", self.message())
        } else {
            format!("panic: {}", self.message())
        }
    }
}
846
/// Which registry list a [`PanicHookRegistration`] refers to.
#[cfg(feature = "std")]
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum PanicHookRegistrationKind {
    /// Entry in the cleanup callback list.
    Cleanup,
    /// Entry in the action callback list.
    Action,
}
853
/// Cleanup callback stored in the registry together with its registration id.
#[cfg(feature = "std")]
#[derive(Clone)]
struct RegisteredPanicCleanup {
    /// Id shared with the owning `PanicHookRegistration`.
    id: usize,
    callback: PanicCleanupCallback,
}

/// Action callback stored in the registry together with its registration id.
#[cfg(feature = "std")]
#[derive(Clone)]
struct RegisteredPanicAction {
    /// Id shared with the owning `PanicHookRegistration`.
    id: usize,
    callback: PanicActionCallback,
}
867
/// Callback lists consulted by the runtime panic hook.
#[cfg(feature = "std")]
#[derive(Default)]
struct PanicHookRegistry {
    /// Run before the crash report is written.
    cleanup_callbacks: StdMutex<Vec<RegisteredPanicCleanup>>,
    /// Run after the crash report is emitted; may request a process exit code.
    action_callbacks: StdMutex<Vec<RegisteredPanicAction>>,
}

/// Guard for a registered panic callback.
///
/// Dropping the guard unregisters the callback, so it must be kept alive for as long as the
/// callback should run. It is `#[must_use]` because discarding it (e.g. calling
/// `register_panic_cleanup(..);` without binding the result) would unregister the callback
/// immediately and silently.
#[cfg(feature = "std")]
#[derive(Debug)]
#[must_use = "dropping a PanicHookRegistration immediately unregisters its panic callback"]
pub struct PanicHookRegistration {
    id: usize,
    kind: PanicHookRegistrationKind,
}

#[cfg(feature = "std")]
impl Drop for PanicHookRegistration {
    fn drop(&mut self) {
        unregister_panic_hook(self.kind, self.id);
    }
}
888
/// Process-wide registry of panic callbacks, created on first use.
#[cfg(feature = "std")]
static PANIC_HOOK_REGISTRY: OnceLock<PanicHookRegistry> = OnceLock::new();
/// Initialised when the runtime panic hook is installed; guarantees a single installation.
#[cfg(feature = "std")]
static PANIC_HOOK_INSTALL_ONCE: OnceLock<()> = OnceLock::new();
/// Source of registration ids (first id handed out is 1).
#[cfg(feature = "std")]
static PANIC_HOOK_REGISTRATION_ID: AtomicUsize = AtomicUsize::new(1);
/// Number of runtime panic-hook invocations currently executing.
#[cfg(feature = "std")]
static PANIC_HOOK_ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Returns the process-wide panic callback registry, creating it on first call.
#[cfg(feature = "std")]
fn panic_hook_registry() -> &'static PanicHookRegistry {
    PANIC_HOOK_REGISTRY.get_or_init(Default::default)
}
902
/// Installs the Copper runtime panic hook once per process.
///
/// The hook replaces any previously installed hook via `std::panic::set_hook`.
#[cfg(feature = "std")]
fn ensure_runtime_panic_hook_installed() {
    /// Runs cleanup callbacks, writes and emits the crash report, then lets action callbacks
    /// decide whether the process exits.
    fn copper_panic_hook(info: &PanicHookInfo<'_>) {
        let _active = PanicHookActiveGuard::new();
        let mut report = PanicReport::capture(info);
        run_panic_cleanup_callbacks(&report);
        report.crash_report_path = write_panic_report_to_file(&report);
        emit_panic_report(&report);

        if let Some(exit_code) = run_panic_action_callbacks(&report) {
            std::process::exit(exit_code);
        }
    }

    let _ = PANIC_HOOK_INSTALL_ONCE.get_or_init(|| {
        std::panic::set_hook(Box::new(copper_panic_hook));
    });
}
919
/// Counts one runtime panic-hook invocation as in flight for as long as it lives.
#[cfg(feature = "std")]
struct PanicHookActiveGuard;

#[cfg(feature = "std")]
impl PanicHookActiveGuard {
    fn new() -> Self {
        let _ = PANIC_HOOK_ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst);
        PanicHookActiveGuard
    }
}

#[cfg(feature = "std")]
impl Drop for PanicHookActiveGuard {
    fn drop(&mut self) {
        let _ = PANIC_HOOK_ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst);
    }
}

/// `true` while the Copper runtime panic hook is executing on some thread.
#[cfg(feature = "std")]
pub fn runtime_panic_hook_active() -> bool {
    PANIC_HOOK_ACTIVE_COUNT.load(Ordering::SeqCst) != 0
}

/// Always `false`: there is no runtime panic hook in `no_std` builds.
#[cfg(not(feature = "std"))]
pub const fn runtime_panic_hook_active() -> bool {
    false
}
947
/// Adds a cleanup callback to the registry.
///
/// The callback stays registered until the returned guard is dropped.
#[cfg(feature = "std")]
fn register_panic_cleanup<F>(callback: F) -> PanicHookRegistration
where
    F: Fn(&PanicReport) + Send + Sync + 'static,
{
    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
    let entry = RegisteredPanicCleanup {
        id,
        callback: Arc::new(callback),
    };
    panic_hook_registry()
        .cleanup_callbacks
        .lock()
        .unwrap_or_else(|poison| poison.into_inner())
        .push(entry);
    PanicHookRegistration {
        id,
        kind: PanicHookRegistrationKind::Cleanup,
    }
}

/// Adds an action callback to the registry.
///
/// The callback stays registered until the returned guard is dropped.
#[cfg(feature = "std")]
fn register_panic_action<F>(callback: F) -> PanicHookRegistration
where
    F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
{
    let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
    let entry = RegisteredPanicAction {
        id,
        callback: Arc::new(callback),
    };
    panic_hook_registry()
        .action_callbacks
        .lock()
        .unwrap_or_else(|poison| poison.into_inner())
        .push(entry);
    PanicHookRegistration {
        id,
        kind: PanicHookRegistrationKind::Action,
    }
}
983
/// Removes the callback registered under `id` from the list selected by `kind`.
///
/// Unknown ids are ignored, so removing an already-removed entry is a no-op. A poisoned
/// lock is recovered rather than propagated.
#[cfg(feature = "std")]
fn unregister_panic_hook(kind: PanicHookRegistrationKind, id: usize) {
    let registry = panic_hook_registry();
    match kind {
        PanicHookRegistrationKind::Cleanup => {
            registry
                .cleanup_callbacks
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .retain(|entry| entry.id != id);
        }
        PanicHookRegistrationKind::Action => {
            registry
                .action_callbacks
                .lock()
                .unwrap_or_else(|poison| poison.into_inner())
                .retain(|entry| entry.id != id);
        }
    }
}
1004
/// Invokes every registered cleanup callback with `report`, in registration order.
///
/// The callback list is cloned first, so the registry lock is released before any
/// callback runs. A callback can therefore register or unregister hooks without
/// deadlocking on that lock.
#[cfg(feature = "std")]
fn run_panic_cleanup_callbacks(report: &PanicReport) {
    let snapshot = {
        let registered = panic_hook_registry()
            .cleanup_callbacks
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        registered.clone()
    };
    snapshot
        .into_iter()
        .for_each(|entry| (entry.callback)(report));
}
1016
/// Runs every registered panic action with `report` and returns the exit code to use.
///
/// All actions run in registration order, even after one has produced a code; the
/// first `Some` wins. The list is snapshotted so the registry lock is not held while
/// callbacks execute.
#[cfg(feature = "std")]
fn run_panic_action_callbacks(report: &PanicReport) -> Option<i32> {
    let snapshot = panic_hook_registry()
        .action_callbacks
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .clone();
    snapshot.into_iter().fold(None, |chosen, registered| {
        let proposed = (registered.callback)(report);
        chosen.or(proposed)
    })
}
1034
/// Extracts a human-readable message from a panic hook's payload.
///
/// Delegates to [`panic_payload_to_string`] so both helpers render payloads
/// identically: `&str` and `String` payloads verbatim, anything else as a fixed
/// placeholder. Keeping a single implementation prevents the two from drifting apart.
#[cfg(feature = "std")]
fn panic_hook_payload_to_string(info: &PanicHookInfo<'_>) -> String {
    panic_payload_to_string(info.payload())
}
1045
/// Formats `report` as the multi-line, human-readable text used for stderr and crash files.
///
/// The optional `location` and `crash_report` lines are emitted only when present.
/// The text always ends with a newline, even if the backtrace does not.
#[cfg(feature = "std")]
fn render_panic_report(report: &PanicReport) -> String {
    let mut text = format!(
        "Copper panic\ntime_unix_ms: {}\nthread: {}\n",
        report.timestamp_unix_ms(),
        report.thread_name().unwrap_or("<unnamed>")
    );
    if let Some(location) = report.location() {
        text += &format!("location: {location}\n");
    }
    text += &format!("message: {}\n", report.message());
    if let Some(path) = report.crash_report_path() {
        text += &format!("crash_report: {path}\n");
    }

    let backtrace = report.backtrace();
    text += "\nBacktrace:\n";
    text.push_str(backtrace);
    if !backtrace.ends_with('\n') {
        text.push('\n');
    }
    text
}
1068
/// Writes the rendered `report` to stderr on a best-effort basis.
///
/// Stderr stays locked for the write and the flush, so other `std` writers cannot
/// interleave with the report. Write and flush errors are deliberately ignored.
#[cfg(feature = "std")]
fn emit_panic_report(report: &PanicReport) {
    let text = render_panic_report(report);
    let mut err_out = std::io::stderr().lock();
    let _ = err_out.write_all(text.as_bytes());
    let _ = err_out.flush();
}
1075
/// Writes `report` to `copper-crash-<unix_ms>-<pid>.txt` in the current directory.
///
/// The written copy has its `crash_report_path` set to the file's own path.
/// Returns that path (lossily converted to UTF-8) on success. Returns `None` if the
/// current directory is unavailable or the file cannot be created, written or flushed.
#[cfg(feature = "std")]
fn write_panic_report_to_file(report: &PanicReport) -> Option<String> {
    let path = std::env::current_dir().ok()?.join(format!(
        "copper-crash-{}-{}.txt",
        report.timestamp_unix_ms(),
        std::process::id()
    ));
    let display_path = path.to_string_lossy().into_owned();

    let mut file = File::create(&path).ok()?;
    let mut annotated = report.clone();
    annotated.crash_report_path = Some(display_path.clone());
    file.write_all(render_panic_report(&annotated).as_bytes())
        .ok()?;
    file.flush().ok()?;

    Some(display_path)
}
1094
/// Monitor decision to be taken when a component step errored out.
///
/// When several monitors are combined, the strictest decision wins
/// (`Shutdown` over `Abort` over `Ignore`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Decision {
    /// For a step (stop, start) or a copperlist, just stop trying to process it.
    Abort,
    /// Ignore this error and try to continue, i.e. call the other component steps,
    /// set a `None` return value and continue the copperlist.
    Ignore,
    /// This is a fatal error: shut down Copper as cleanly as possible.
    Shutdown,
}
1102
1103fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
1104    use Decision::{Abort, Ignore, Shutdown};
1105    // Pick the strictest monitor decision when multiple monitors disagree.
1106    // Shutdown dominates Abort, which dominates Ignore.
1107    match (lhs, rhs) {
1108        (Shutdown, _) | (_, Shutdown) => Shutdown,
1109        (Abort, _) | (_, Abort) => Abort,
1110        _ => Ignore,
1111    }
1112}
1113
/// One component of the monitored graph, as produced by `build_monitor_topology`.
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Node id from the mission graph.
    pub id: String,
    /// Component type name from the configuration, when known.
    pub type_name: Option<String>,
    /// Whether the node is a source, task, sink or bridge.
    pub kind: ComponentType,
    /// Ordered list of input port identifiers.
    pub inputs: Vec<String>,
    /// Ordered list of output port identifiers.
    pub outputs: Vec<String>,
}
1124
/// One edge of the monitored graph, as produced by `build_monitor_topology`.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Id of the producing node.
    pub src: String,
    /// Output port on `src`, if one could be resolved.
    pub src_port: Option<String>,
    /// Id of the consuming node.
    pub dst: String,
    /// Input port on `dst`, if one could be resolved.
    pub dst_port: Option<String>,
    /// Message type carried by this edge.
    pub msg: String,
}
1133
/// Nodes and connections of a mission graph, in a shape convenient for monitors.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    /// Every component of the mission graph.
    pub nodes: Vec<MonitorNode>,
    /// Every edge of the mission graph, with resolved port names where possible.
    pub connections: Vec<MonitorConnection>,
}
1139
/// Pairs a CopperList byte size with a CopperList count.
///
/// NOTE(review): presumably `size_bytes` is the size of a single CopperList and
/// `count` the number of CopperLists the runtime keeps; confirm at the call site.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListInfo {
    pub size_bytes: usize,
    pub count: usize,
}

impl CopperListInfo {
    /// Bundles `size_bytes` and `count`; usable in `const` contexts.
    pub const fn new(size_bytes: usize, count: usize) -> Self {
        CopperListInfo { count, size_bytes }
    }
}
1151
/// Reported data about CopperList IO for a single iteration.
///
/// Delivered to monitors through [`CuMonitor::observe_copperlist_io`].
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListIoStats {
    /// CopperList bytes resident in RAM for this iteration.
    ///
    /// This includes the fixed CopperList struct size plus any pooled or
    /// handle-backed payload bytes observed on the real encode path.
    pub raw_culist_bytes: u64,
    /// Bytes attributed to handle-backed storage while measuring payload IO.
    ///
    /// This is surfaced separately so monitors can show how much of the runtime
    /// footprint lives in pooled payload buffers rather than inside the fixed
    /// CopperList struct.
    pub handle_bytes: u64,
    /// Bytes produced by bincode serialization of the CopperList.
    pub encoded_culist_bytes: u64,
    /// Bytes produced by bincode serialization of the KeyFrame (0 if none).
    pub keyframe_bytes: u64,
    /// Cumulative bytes written to the structured log stream so far.
    pub structured_log_bytes_total: u64,
    /// CopperList identifier, for reference in monitors.
    pub culistid: u64,
}
1175
/// Byte accounting for a single payload, as returned by `payload_io_stats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PayloadIoStats {
    /// `size_of::<T>()` plus `handle_bytes` (saturating).
    pub resident_bytes: usize,
    /// Exact size of the payload's bincode encoding (standard config).
    pub encoded_bytes: usize,
    /// Handle-backed bytes reported while the payload was being encoded.
    pub handle_bytes: usize,
}
1182
/// Per-message IO stats recorded for one CopperList slot during a capture.
///
/// When nothing was recorded for a slot, `present` is `false` and all counters are zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CuMsgIoStats {
    /// Whether stats were recorded for this slot.
    pub present: bool,
    /// Fixed payload bytes plus handle-backed bytes.
    pub resident_bytes: u64,
    /// Bytes produced by encoding the message.
    pub encoded_bytes: u64,
    /// Handle-backed bytes reported while encoding the message.
    pub handle_bytes: u64,
}
1190
/// Lock-free storage for one [`CuMsgIoStats`] value.
///
/// Each field is its own atomic, so an entry can be written and read through a
/// shared reference. `present` acts as the publication flag for the counters.
struct CuMsgIoEntry {
    present: PortableAtomicBool,
    resident_bytes: PortableAtomicU64,
    encoded_bytes: PortableAtomicU64,
    handle_bytes: PortableAtomicU64,
}
1197
impl CuMsgIoEntry {
    /// Marks the entry absent, then zeroes its counters.
    ///
    /// `present` is cleared first, so a `get` that observes `false` returns the
    /// defaults without looking at the counters being zeroed.
    fn clear(&self) {
        self.present.store(false, PortableOrdering::Release);
        self.resident_bytes.store(0, PortableOrdering::Relaxed);
        self.encoded_bytes.store(0, PortableOrdering::Relaxed);
        self.handle_bytes.store(0, PortableOrdering::Relaxed);
    }

    /// Returns a snapshot of the entry, or `CuMsgIoStats::default()` when absent.
    ///
    /// The Acquire load of `present` pairs with the Release store in `set`, making
    /// counters written before that store visible here. The counters are loaded
    /// individually, so a reader racing a concurrent `set`/`clear` may observe values
    /// from different writes.
    fn get(&self) -> CuMsgIoStats {
        if !self.present.load(PortableOrdering::Acquire) {
            return CuMsgIoStats::default();
        }

        CuMsgIoStats {
            present: true,
            resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
            encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
            handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
        }
    }

    /// Stores `stats`: counters first, then `present` last (Release), so a reader
    /// that sees `present == true` also sees these counters.
    fn set(&self, stats: CuMsgIoStats) {
        self.resident_bytes
            .store(stats.resident_bytes, PortableOrdering::Relaxed);
        self.encoded_bytes
            .store(stats.encoded_bytes, PortableOrdering::Relaxed);
        self.handle_bytes
            .store(stats.handle_bytes, PortableOrdering::Relaxed);
        self.present.store(stats.present, PortableOrdering::Release);
    }
}
1229
1230impl Default for CuMsgIoEntry {
1231    fn default() -> Self {
1232        Self {
1233            present: PortableAtomicBool::new(false),
1234            resident_bytes: PortableAtomicU64::new(0),
1235            encoded_bytes: PortableAtomicU64::new(0),
1236            handle_bytes: PortableAtomicU64::new(0),
1237        }
1238    }
1239}
1240
/// Fixed-size table of per-message IO stats, filled during a CopperList capture.
///
/// Slots are indexed by the slot number passed to `CuMsgIoCaptureGuard::select_slot`.
/// All access goes through `&self`, because entries use atomics internally.
pub struct CuMsgIoCache<const N: usize> {
    entries: [CuMsgIoEntry; N],
}
1244
1245impl<const N: usize> CuMsgIoCache<N> {
1246    pub fn clear(&self) {
1247        for entry in &self.entries {
1248            entry.clear();
1249        }
1250    }
1251
1252    pub fn get(&self, idx: usize) -> CuMsgIoStats {
1253        self.entries[idx].get()
1254    }
1255
1256    fn raw_parts(&self) -> (usize, usize) {
1257        (self.entries.as_ptr() as usize, N)
1258    }
1259}
1260
1261impl<const N: usize> Default for CuMsgIoCache<N> {
1262    fn default() -> Self {
1263        Self {
1264            entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
1265        }
1266    }
1267}
1268
/// Bookkeeping for the CopperList capture currently in progress.
///
/// The cache is recorded as a raw address plus length (from `CuMsgIoCache::raw_parts`)
/// rather than a reference. NOTE(review): presumably this lets the state live in a
/// `Cell`/static without carrying a lifetime.
#[derive(Clone, Copy)]
struct ActiveCuMsgIoCapture {
    /// Address of the first `CuMsgIoEntry` of the captured cache.
    cache_addr: usize,
    /// Number of entries in the captured cache.
    cache_len: usize,
    /// Slot that receives the next recorded stats; `None` until `select_slot` is called.
    current_slot: Option<usize>,
}
1275
// Payload IO measurement state for `std` builds, kept per thread:
// - PAYLOAD_HANDLE_BYTES: running handle-byte total; `None` when no measurement window is open.
// - ACTIVE_COPPERLIST_CAPTURE: the cache currently receiving per-slot stats, if any.
// - LAST_COMPLETED_HANDLE_BYTES: handle-byte total published when the last capture guard dropped.
#[cfg(feature = "std")]
thread_local! {
    static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
    static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
    static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
}

// `no_std` builds have no thread-locals, so the same state lives in process-wide,
// spin-locked statics shared by every execution context.
#[cfg(not(feature = "std"))]
static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
1289
1290fn begin_payload_io_measurement() {
1291    #[cfg(feature = "std")]
1292    PAYLOAD_HANDLE_BYTES.with(|bytes| {
1293        debug_assert!(
1294            bytes.get().is_none(),
1295            "payload IO byte measurement must not be nested"
1296        );
1297        bytes.set(Some(0));
1298    });
1299
1300    #[cfg(not(feature = "std"))]
1301    {
1302        let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
1303        debug_assert!(
1304            bytes.is_none(),
1305            "payload IO byte measurement must not be nested"
1306        );
1307        *bytes = Some(0);
1308    }
1309}
1310
1311fn finish_payload_io_measurement() -> usize {
1312    #[cfg(feature = "std")]
1313    {
1314        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
1315    }
1316
1317    #[cfg(not(feature = "std"))]
1318    {
1319        PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
1320    }
1321}
1322
1323fn abort_payload_io_measurement() {
1324    #[cfg(feature = "std")]
1325    PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
1326
1327    #[cfg(not(feature = "std"))]
1328    {
1329        *PAYLOAD_HANDLE_BYTES.lock() = None;
1330    }
1331}
1332
1333fn current_payload_io_measurement() -> usize {
1334    #[cfg(feature = "std")]
1335    {
1336        PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
1337    }
1338
1339    #[cfg(not(feature = "std"))]
1340    {
1341        PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
1342    }
1343}
1344
1345/// Records handle-backed payload bytes for the active CopperList IO capture.
1346///
1347/// This is called automatically by handle encoders such as `CuHandle::encode()`.
1348/// Custom log codecs report equivalent bytes via `CuLogCodec::source_payload_handle_bytes()`.
1349pub(crate) fn record_payload_handle_bytes(bytes: usize) {
1350    #[cfg(feature = "std")]
1351    PAYLOAD_HANDLE_BYTES.with(|total| {
1352        if let Some(current) = total.get() {
1353            total.set(Some(current.saturating_add(bytes)));
1354        }
1355    });
1356
1357    #[cfg(not(feature = "std"))]
1358    {
1359        let mut total = PAYLOAD_HANDLE_BYTES.lock();
1360        if let Some(current) = *total {
1361            *total = Some(current.saturating_add(bytes));
1362        }
1363    }
1364}
1365
1366fn set_last_completed_handle_bytes(bytes: u64) {
1367    #[cfg(feature = "std")]
1368    LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1369
1370    #[cfg(not(feature = "std"))]
1371    {
1372        *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1373    }
1374}
1375
1376pub fn take_last_completed_handle_bytes() -> u64 {
1377    #[cfg(feature = "std")]
1378    {
1379        LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1380    }
1381
1382    #[cfg(not(feature = "std"))]
1383    {
1384        let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1385        let value = *total;
1386        *total = 0;
1387        value
1388    }
1389}
1390
1391fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1392    #[cfg(feature = "std")]
1393    {
1394        ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1395            let mut state = capture.get()?;
1396            let result = f(&mut state);
1397            capture.set(Some(state));
1398            Some(result)
1399        })
1400    }
1401
1402    #[cfg(not(feature = "std"))]
1403    {
1404        let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1405        let state = capture.as_mut()?;
1406        Some(f(state))
1407    }
1408}
1409
/// Guard for an active CopperList payload IO capture, returned by
/// `start_copperlist_io_capture`.
///
/// Dropping it finishes the payload IO measurement, publishes the handle-byte total
/// and clears the active capture. It is a unit struct and holds no borrow of the
/// captured cache, so keeping the cache alive until the guard drops is up to the caller.
pub struct CuMsgIoCaptureGuard;
1411
1412impl CuMsgIoCaptureGuard {
1413    pub fn select_slot(&self, slot: usize) {
1414        let _ = with_active_capture_mut(|capture| {
1415            debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1416            capture.current_slot = Some(slot);
1417        });
1418    }
1419}
1420
1421impl Drop for CuMsgIoCaptureGuard {
1422    fn drop(&mut self) {
1423        set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1424
1425        #[cfg(feature = "std")]
1426        ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1427
1428        #[cfg(not(feature = "std"))]
1429        {
1430            *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1431        }
1432    }
1433}
1434
1435pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1436    cache.clear();
1437    set_last_completed_handle_bytes(0);
1438    begin_payload_io_measurement();
1439    let (cache_addr, cache_len) = cache.raw_parts();
1440    let capture = ActiveCuMsgIoCapture {
1441        cache_addr,
1442        cache_len,
1443        current_slot: None,
1444    };
1445
1446    #[cfg(feature = "std")]
1447    ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1448        debug_assert!(
1449            state.get().is_none(),
1450            "CopperList payload IO capture must not be nested"
1451        );
1452        state.set(Some(capture));
1453    });
1454
1455    #[cfg(not(feature = "std"))]
1456    {
1457        let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1458        debug_assert!(
1459            state.is_none(),
1460            "CopperList payload IO capture must not be nested"
1461        );
1462        *state = Some(capture);
1463    }
1464
1465    CuMsgIoCaptureGuard
1466}
1467
/// Returns the handle-backed bytes accumulated so far in the open payload IO
/// measurement window, or 0 when no window is open.
pub(crate) fn current_payload_handle_bytes() -> usize {
    current_payload_io_measurement()
}
1471
/// Stores IO stats for the slot most recently chosen with `CuMsgIoCaptureGuard::select_slot`.
///
/// `resident_bytes` is recorded as `fixed_bytes + handle_bytes` (saturating). Does
/// nothing when no capture is active, no slot has been selected, or the selected
/// slot is out of range for the captured cache.
pub(crate) fn record_current_slot_payload_io_stats(
    fixed_bytes: usize,
    encoded_bytes: usize,
    handle_bytes: usize,
) {
    let _ = with_active_capture_mut(|capture| {
        let Some(slot) = capture.current_slot else {
            return;
        };
        // Bounds check that keeps the raw pointer offset below inside the array.
        if slot >= capture.cache_len {
            return;
        }
        // SAFETY: `slot < cache_len`, so the offset stays inside the `[CuMsgIoEntry; N]`
        // array whose address `start_copperlist_io_capture` recorded. Soundness relies on
        // the caller keeping that cache alive and unmoved until the `CuMsgIoCaptureGuard`
        // is dropped, which clears the active capture. NOTE(review): the guard is a unit
        // struct holding no borrow of the cache, so the type system does not enforce this.
        let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
        let entry = unsafe { &*cache_ptr.add(slot) };
        entry.set(CuMsgIoStats {
            present: true,
            resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
            encoded_bytes: encoded_bytes as u64,
            handle_bytes: handle_bytes as u64,
        });
    });
}
1495
1496/// Measures payload bytes using the same encode path Copper uses for
1497/// logging/export.
1498///
1499/// `resident_bytes` is the payload's in-memory fixed footprint plus any
1500/// handle-backed dynamic storage reported during encoding. `encoded_bytes` is
1501/// the exact bincode payload size.
1502pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1503where
1504    T: Encode,
1505{
1506    begin_payload_io_measurement();
1507    begin_observed_encode();
1508
1509    let result = (|| {
1510        let mut encoder =
1511            EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1512        payload.encode(&mut encoder).map_err(|e| {
1513            CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1514        })?;
1515        let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1516        debug_assert_eq!(encoded_bytes, finish_observed_encode());
1517        let handle_bytes = finish_payload_io_measurement();
1518        Ok(PayloadIoStats {
1519            resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1520            encoded_bytes,
1521            handle_bytes,
1522        })
1523    })();
1524
1525    if result.is_err() {
1526        abort_payload_io_measurement();
1527        abort_observed_encode();
1528    }
1529
1530    result
1531}
1532
1533fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1534    let Ok(msg_types) = graph.get_node_output_msg_types_by_id(node_id) else {
1535        return Vec::new();
1536    };
1537
1538    let mut outputs = Vec::new();
1539    for (port_idx, msg) in msg_types.into_iter().enumerate() {
1540        let mut port_label = String::from("out");
1541        port_label.push_str(&port_idx.to_string());
1542        port_label.push_str(": ");
1543        port_label.push_str(msg.as_str());
1544        outputs.push((msg, port_label));
1545    }
1546    outputs
1547}
1548
1549/// Derive a monitor-friendly topology from the runtime configuration.
1550pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1551    let graph = config.get_graph(Some(mission))?;
1552    let mut nodes: Map<String, MonitorNode> = Map::new();
1553    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1554
1555    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1556    for bridge in &config.bridges {
1557        bridge_lookup.insert(bridge.id.as_str(), bridge);
1558    }
1559
1560    for (node_idx, node) in graph.get_all_nodes() {
1561        let node_id = node.get_id();
1562        let task_kind = match node.get_flavor() {
1563            Flavor::Bridge => ComponentType::Bridge,
1564            Flavor::Task => match resolve_task_kind_for_id(graph, node_idx)? {
1565                TaskKind::Source => ComponentType::Source,
1566                TaskKind::Regular => ComponentType::Task,
1567                TaskKind::Sink => ComponentType::Sink,
1568            },
1569        };
1570
1571        let mut inputs = Vec::new();
1572        let mut outputs = Vec::new();
1573        if task_kind == ComponentType::Bridge
1574            && let Some(bridge) = bridge_lookup.get(node_id.as_str())
1575        {
1576            for ch in &bridge.channels {
1577                match ch {
1578                    BridgeChannelConfigRepresentation::Rx { id, .. } => outputs.push(id.clone()),
1579                    BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1580                }
1581            }
1582        } else {
1583            match task_kind {
1584                ComponentType::Source => {
1585                    let ports = collect_output_ports(graph, node_idx);
1586                    let mut port_map: Map<String, String> = Map::new();
1587                    for (msg_type, label) in ports {
1588                        port_map.insert(msg_type, label.clone());
1589                        outputs.push(label);
1590                    }
1591                    output_port_lookup.insert(node_id.clone(), port_map);
1592                }
1593                ComponentType::Task => {
1594                    inputs.push("in".to_string());
1595                    let ports = collect_output_ports(graph, node_idx);
1596                    let mut port_map: Map<String, String> = Map::new();
1597                    for (msg_type, label) in ports {
1598                        port_map.insert(msg_type, label.clone());
1599                        outputs.push(label);
1600                    }
1601                    output_port_lookup.insert(node_id.clone(), port_map);
1602                }
1603                ComponentType::Sink => {
1604                    inputs.push("in".to_string());
1605                }
1606                ComponentType::Bridge => unreachable!("handled above"),
1607            }
1608        }
1609
1610        nodes.insert(
1611            node_id.clone(),
1612            MonitorNode {
1613                id: node_id,
1614                type_name: Some(node.get_type().to_string()),
1615                kind: task_kind,
1616                inputs,
1617                outputs,
1618            },
1619        );
1620    }
1621
1622    let mut connections = Vec::new();
1623    for cnx in graph.edges() {
1624        let src = cnx.src.clone();
1625        let dst = cnx.dst.clone();
1626
1627        let src_port = cnx.src_channel.clone().or_else(|| {
1628            output_port_lookup
1629                .get(&src)
1630                .and_then(|ports| ports.get(&cnx.msg).cloned())
1631                .or_else(|| {
1632                    nodes
1633                        .get(&src)
1634                        .and_then(|node| node.outputs.first().cloned())
1635                })
1636        });
1637        let dst_port = cnx.dst_channel.clone().or_else(|| {
1638            nodes
1639                .get(&dst)
1640                .and_then(|node| node.inputs.first().cloned())
1641        });
1642
1643        connections.push(MonitorConnection {
1644            src,
1645            src_port,
1646            dst,
1647            dst_port,
1648            msg: cnx.msg.clone(),
1649        });
1650    }
1651
1652    Ok(MonitorTopology {
1653        nodes: nodes.into_values().collect(),
1654        connections,
1655    })
1656}
1657
/// Runtime monitoring contract implemented by monitor components.
///
/// Lifecycle:
/// 1. [`CuMonitor::new`] is called once at runtime construction time.
/// 2. [`CuMonitor::start`] is called once before the first runtime iteration.
/// 3. For each iteration, [`CuMonitor::process_copperlist`] is called after component execution,
///    then [`CuMonitor::observe_copperlist_io`] after serialization accounting.
/// 4. [`CuMonitor::process_error`] is called synchronously when a monitored component step fails.
/// 5. [`CuMonitor::process_panic`] is called when the runtime catches a panic (`std` builds).
/// 6. [`CuMonitor::stop`] is called once during runtime shutdown.
///
/// Indexing model:
/// - `process_error(component_id, ..)` uses component indices into `metadata.components()`.
/// - `process_copperlist(..., view)` iterates CopperList slots with resolved component identity.
///
/// Error policy:
/// - [`Decision::Ignore`] continues execution.
/// - [`Decision::Abort`] aborts the current operation (step/copperlist scope).
/// - [`Decision::Shutdown`] triggers runtime shutdown.
///
/// Only `new`, `process_copperlist` and `process_error` must be implemented; every
/// other hook has a no-op default.
pub trait CuMonitor: Sized {
    /// Construct the monitor once, before component execution starts.
    ///
    /// `metadata` contains mission/config/topology/static mapping information.
    /// `runtime` exposes dynamic runtime handles (for example execution probes).
    /// Use `metadata.monitor_config()` to decode monitor-specific parameters.
    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
    where
        Self: Sized;

    /// Called once before processing the first CopperList.
    ///
    /// Default: does nothing and returns `Ok(())`.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Called once per processed CopperList after component execution.
    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;

    /// Called when runtime finishes CopperList serialization/IO accounting.
    ///
    /// Default: ignores the stats.
    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}

    /// Called when a monitored component step fails; must return an immediate runtime decision.
    ///
    /// `component_id` is an index into [`CuMonitoringMetadata::components`].
    fn process_error(
        &self,
        component_id: ComponentId,
        step: CuComponentState,
        error: &CuError,
    ) -> Decision;

    /// Called when the runtime catches a panic (`std` builds).
    ///
    /// Default: ignores the panic message.
    fn process_panic(&self, _panic_message: &str) {}

    /// Called once during runtime shutdown.
    ///
    /// Default: does nothing and returns `Ok(())`.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
1716
/// A do-nothing monitor, used when no monitor is provided.
/// It defines Copper's default behavior on error: every error is ignored
/// (`process_error` always returns [`Decision::Ignore`]).
pub struct NoMonitor {}
impl CuMonitor for NoMonitor {
    fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
        Ok(NoMonitor {})
    }

    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        // In debug `std` builds, echo every live log entry to stdout as
        // "<timestamp> [<level>] <message>", so logs are visible without a real monitor.
        #[cfg(all(feature = "std", debug_assertions))]
        register_live_log_listener(|entry, format_str, param_names| {
            // Stringify the positional params, then pair them with their names so
            // named placeholders can be resolved too.
            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
            let named: Map<String, String> = param_names
                .iter()
                .zip(params.iter())
                .map(|(k, v)| (k.to_string(), v.clone()))
                .collect();

            // Entries whose message fails to format are silently dropped.
            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
                let ts = format_timestamp(entry.time.into());
                println!("{} [{:?}] {}", ts, entry.level, msg);
            }
        });
        Ok(())
    }

    fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
        // By default, do nothing.
        Ok(())
    }

    fn process_error(
        &self,
        _component_id: ComponentId,
        _step: CuComponentState,
        _error: &CuError,
    ) -> Decision {
        // By default, just try to continue.
        Decision::Ignore
    }

    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        // Remove the live log listener installed in `start` (debug `std` builds only).
        #[cfg(all(feature = "std", debug_assertions))]
        unregister_live_log_listener();
        Ok(())
    }
}
1764
1765macro_rules! impl_monitor_tuple {
1766    ($($idx:tt => $name:ident),+) => {
1767        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1768            fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1769            where
1770                Self: Sized,
1771            {
1772                Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1773            }
1774
1775            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1776                $(self.$idx.start(ctx)?;)+
1777                Ok(())
1778            }
1779
1780            fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1781                $(self.$idx.process_copperlist(ctx, view)?;)+
1782                Ok(())
1783            }
1784
1785            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1786                $(self.$idx.observe_copperlist_io(stats);)+
1787            }
1788
1789            fn process_error(
1790                &self,
1791                component_id: ComponentId,
1792                step: CuComponentState,
1793                error: &CuError,
1794            ) -> Decision {
1795                let mut decision = Decision::Ignore;
1796                $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1797                decision
1798            }
1799
1800            fn process_panic(&self, panic_message: &str) {
1801                $(self.$idx.process_panic(panic_message);)+
1802            }
1803
1804            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1805                $(self.$idx.stop(ctx)?;)+
1806                Ok(())
1807            }
1808        }
1809    };
1810}
1811
1812impl_monitor_tuple!(0 => M0, 1 => M1);
1813impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1814impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1815impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1816impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1817
/// Converts a panic payload into a printable message.
///
/// `&str` and `String` payloads are returned verbatim; any other payload type is
/// reported as `"panic with non-string payload"`.
#[cfg(feature = "std")]
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    payload
        .downcast_ref::<&str>()
        .map(|text| (*text).to_string())
        .or_else(|| payload.downcast_ref::<String>().cloned())
        .unwrap_or_else(|| "panic with non-string payload".to_string())
}
1828
/// A global-allocator wrapper that counts the bytes allocated and deallocated through it.
///
/// Every call is delegated to `inner`:
/// - a successful allocation adds `layout.size()` to the allocated counter;
/// - every deallocation adds `layout.size()` to the deallocated counter;
/// - `alloc_zeroed` and `realloc` are forwarded to the inner allocator's own versions,
///   keeping optimizations such as pre-zeroed pages or in-place growth.
///
/// A successful `realloc` is accounted exactly like the trait's default implementation:
/// it allocates the new size and frees the old size.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    allocated: AtomicUsize,
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters starting at zero.
    pub const fn new(inner: A) -> Self {
        CountingAlloc {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes allocated since construction or the last [`reset`](Self::reset).
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes deallocated since construction or the last [`reset`](Self::reset).
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Resets both counters to zero.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

// SAFETY: Delegates allocation/deallocation to the inner allocator while tracking sizes.
unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
        let p = unsafe { self.inner.alloc(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: Forwarding the caller's (valid, non-zero-sized) layout unchanged.
        let p = unsafe { self.inner.alloc_zeroed(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
        unsafe { self.inner.dealloc(ptr, layout) }
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }

    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: `ptr` was allocated by this allocator (hence by `inner`) with `layout`,
        // and the caller guarantees `new_size` is valid for `layout.align()`.
        let p = unsafe { self.inner.realloc(ptr, layout, new_size) };
        if !p.is_null() {
            // Same accounting as the default realloc (alloc new + dealloc old).
            self.allocated.fetch_add(new_size, Ordering::SeqCst);
            self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }
}
1878
/// Measures heap traffic within a scope by snapshotting the `GLOBAL` allocation counters
/// at construction and diffing against them later.
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    /// `GLOBAL.allocated()` at the time this counter was created.
    bf_allocated: usize,
    /// `GLOBAL.deallocated()` at the time this counter was created.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Snapshots the current global allocation counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Returns the total number of bytes allocated since the creation of this
    /// `ScopedAllocCounter`.
    ///
    /// If the global counters were reset after this counter was created, the result
    /// saturates at zero instead of underflowing.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// println!("Bytes allocated: {}", counter.allocated());
    /// ```
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Returns the total number of bytes deallocated since the creation of this
    /// `ScopedAllocCounter`.
    ///
    /// If the global counters were reset after this counter was created, the result
    /// saturates at zero instead of underflowing.
    ///
    /// # Example
    /// ```
    /// use cu29_runtime::monitoring::ScopedAllocCounter;
    ///
    /// let counter = ScopedAllocCounter::new();
    /// let _vec = vec![0u8; 1024];
    /// drop(_vec);
    /// println!("Bytes deallocated: {}", counter.deallocated());
    /// ```
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

/// At drop time, computes the bytes allocated and deallocated during the scope.
///
/// Saturating subtraction keeps this from panicking (debug overflow check) if the global
/// counters were reset meanwhile. A panic in `drop` while unwinding would abort the process.
#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        let _allocated = GLOBAL.allocated().saturating_sub(self.bf_allocated);
        let _deallocated = GLOBAL.deallocated().saturating_sub(self.bf_deallocated);
        // TODO(gbin): Fix this when the logger is ready.
        // debug!(
        //     "Allocations: +{}B -{}B",
        //     allocated = allocated,
        //     deallocated = deallocated,
        // );
    }
}
1948
/// Number of equal-width histogram buckets used by `LiveStatistics` on `std` builds.
#[cfg(feature = "std")]
const BUCKET_COUNT: usize = 1 << 10;
/// Number of equal-width histogram buckets used by `LiveStatistics` on `no_std` builds.
#[cfg(not(feature = "std"))]
const BUCKET_COUNT: usize = 1 << 8;
1953
1954/// Accumulative stat object that can give your some real time statistics.
1955/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
1956#[derive(Debug, Clone)]
1957pub struct LiveStatistics {
1958    buckets: [u64; BUCKET_COUNT],
1959    min_val: u64,
1960    max_val: u64,
1961    sum: u128,
1962    sum_sq: u128,
1963    count: u64,
1964    max_value: u64,
1965}
1966
1967impl LiveStatistics {
1968    /// Creates a new `LiveStatistics` instance with a specified maximum value.
1969    ///
1970    /// This function initializes a `LiveStatistics` structure with default values
1971    /// for tracking statistical data, while setting an upper limit for the data
1972    /// points that the structure tracks.
1973    ///
1974    /// # Parameters
1975    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
1976    ///
1977    /// # Returns
1978    /// A new instance of `LiveStatistics` with:
1979    /// - `buckets`: An array pre-filled with zeros to categorize data points.
1980    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
1981    /// - `max_val`: Initialized to zero.
1982    /// - `sum`: The sum of all data points, initialized to zero.
1983    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
1984    /// - `count`: The total number of data points, initialized to zero.
1985    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
1986    ///
1987    pub fn new_with_max(max_value: u64) -> Self {
1988        LiveStatistics {
1989            buckets: [0; BUCKET_COUNT],
1990            min_val: u64::MAX,
1991            max_val: 0,
1992            sum: 0,
1993            sum_sq: 0,
1994            count: 0,
1995            max_value,
1996        }
1997    }
1998
1999    #[inline]
2000    fn value_to_bucket(&self, value: u64) -> usize {
2001        if value >= self.max_value {
2002            BUCKET_COUNT - 1
2003        } else {
2004            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
2005        }
2006    }
2007
2008    #[inline]
2009    pub fn min(&self) -> u64 {
2010        if self.count == 0 { 0 } else { self.min_val }
2011    }
2012
2013    #[inline]
2014    pub fn max(&self) -> u64 {
2015        self.max_val
2016    }
2017
2018    #[inline]
2019    pub fn mean(&self) -> f64 {
2020        if self.count == 0 {
2021            0.0
2022        } else {
2023            self.sum as f64 / self.count as f64
2024        }
2025    }
2026
2027    #[inline]
2028    pub fn stdev(&self) -> f64 {
2029        if self.count == 0 {
2030            return 0.0;
2031        }
2032        let mean = self.mean();
2033        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
2034        if variance < 0.0 {
2035            return 0.0;
2036        }
2037        #[cfg(feature = "std")]
2038        return variance.sqrt();
2039        #[cfg(not(feature = "std"))]
2040        return sqrt(variance);
2041    }
2042
2043    #[inline]
2044    pub fn percentile(&self, percentile: f64) -> u64 {
2045        if self.count == 0 {
2046            return 0;
2047        }
2048
2049        let target_count = (self.count as f64 * percentile) as u64;
2050        let mut accumulated = 0u64;
2051
2052        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
2053            accumulated += bucket_count;
2054            if accumulated >= target_count {
2055                // Linear interpolation within the bucket
2056                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
2057                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
2058                let bucket_fraction = if bucket_count > 0 {
2059                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
2060                } else {
2061                    0.5
2062                };
2063                return bucket_start
2064                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
2065            }
2066        }
2067
2068        self.max_val
2069    }
2070
2071    /// Adds a value to the statistics.
2072    #[inline]
2073    pub fn record(&mut self, value: u64) {
2074        if value < self.min_val {
2075            self.min_val = value;
2076        }
2077        if value > self.max_val {
2078            self.max_val = value;
2079        }
2080        let value_u128 = value as u128;
2081        self.sum += value_u128;
2082        self.sum_sq += value_u128 * value_u128;
2083        self.count += 1;
2084
2085        let bucket = self.value_to_bucket(value);
2086        self.buckets[bucket] += 1;
2087    }
2088
2089    #[inline]
2090    pub fn len(&self) -> u64 {
2091        self.count
2092    }
2093
2094    #[inline]
2095    pub fn is_empty(&self) -> bool {
2096        self.count == 0
2097    }
2098
2099    #[inline]
2100    pub fn reset(&mut self) {
2101        self.buckets.fill(0);
2102        self.min_val = u64::MAX;
2103        self.max_val = 0;
2104        self.sum = 0;
2105        self.sum_sq = 0;
2106        self.count = 0;
2107    }
2108}
2109
2110/// A Specialized statistics object for CuDuration.
2111/// It will also keep track of the jitter between the values.
2112#[derive(Debug, Clone)]
2113pub struct CuDurationStatistics {
2114    bare: LiveStatistics,
2115    jitter: LiveStatistics,
2116    last_value: CuDuration,
2117}
2118
2119impl CuDurationStatistics {
2120    pub fn new(max: CuDuration) -> Self {
2121        let CuDuration(max) = max;
2122        CuDurationStatistics {
2123            bare: LiveStatistics::new_with_max(max),
2124            jitter: LiveStatistics::new_with_max(max),
2125            last_value: CuDuration::default(),
2126        }
2127    }
2128
2129    #[inline]
2130    pub fn min(&self) -> CuDuration {
2131        CuDuration(self.bare.min())
2132    }
2133
2134    #[inline]
2135    pub fn max(&self) -> CuDuration {
2136        CuDuration(self.bare.max())
2137    }
2138
2139    #[inline]
2140    pub fn mean(&self) -> CuDuration {
2141        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
2142    }
2143
2144    #[inline]
2145    pub fn percentile(&self, percentile: f64) -> CuDuration {
2146        CuDuration(self.bare.percentile(percentile))
2147    }
2148
2149    #[inline]
2150    pub fn stddev(&self) -> CuDuration {
2151        CuDuration(self.bare.stdev() as u64)
2152    }
2153
2154    #[inline]
2155    pub fn len(&self) -> u64 {
2156        self.bare.len()
2157    }
2158
2159    #[inline]
2160    pub fn is_empty(&self) -> bool {
2161        self.bare.len() == 0
2162    }
2163
2164    #[inline]
2165    pub fn jitter_min(&self) -> CuDuration {
2166        CuDuration(self.jitter.min())
2167    }
2168
2169    #[inline]
2170    pub fn jitter_max(&self) -> CuDuration {
2171        CuDuration(self.jitter.max())
2172    }
2173
2174    #[inline]
2175    pub fn jitter_mean(&self) -> CuDuration {
2176        CuDuration(self.jitter.mean() as u64)
2177    }
2178
2179    #[inline]
2180    pub fn jitter_stddev(&self) -> CuDuration {
2181        CuDuration(self.jitter.stdev() as u64)
2182    }
2183
2184    #[inline]
2185    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
2186        CuDuration(self.jitter.percentile(percentile))
2187    }
2188
2189    #[inline]
2190    pub fn record(&mut self, value: CuDuration) {
2191        let CuDuration(nanos) = value;
2192        if self.bare.is_empty() {
2193            self.bare.record(nanos);
2194            self.last_value = value;
2195            return;
2196        }
2197        self.bare.record(nanos);
2198        let CuDuration(last_nanos) = self.last_value;
2199        self.jitter.record(nanos.abs_diff(last_nanos));
2200        self.last_value = value;
2201    }
2202
2203    #[inline]
2204    pub fn reset(&mut self) {
2205        self.bare.reset();
2206        self.jitter.reset();
2207    }
2208}
2209
// Unit tests for the statistics helpers, tuple-monitor fan-out and decision merging,
// payload IO measurement, and the runtime execution probe.
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicUsize, Ordering};

    // Fixed answer a `TestMonitor` gives from `process_error`.
    #[derive(Clone, Copy)]
    enum TestDecision {
        Ignore,
        Abort,
        Shutdown,
    }

    // Monitor stub: returns a fixed decision on errors and counts how many times the
    // copperlist and panic callbacks were invoked.
    struct TestMonitor {
        decision: TestDecision,
        copperlist_calls: AtomicUsize,
        panic_calls: AtomicUsize,
    }

    impl TestMonitor {
        // Builds a stub with zeroed call counters that answers every error with `decision`.
        fn new_with(decision: TestDecision) -> Self {
            Self {
                decision,
                copperlist_calls: AtomicUsize::new(0),
                panic_calls: AtomicUsize::new(0),
            }
        }
    }

    // Minimal valid metadata for the default mission with two task components
    // ("a" and "b").
    fn test_metadata() -> CuMonitoringMetadata {
        const COMPONENTS: &[MonitorComponentMetadata] = &[
            MonitorComponentMetadata::new("a", ComponentType::Task, None),
            MonitorComponentMetadata::new("b", ComponentType::Task, None),
        ];
        CuMonitoringMetadata::new(
            CompactString::from(crate::config::DEFAULT_MISSION_ID),
            COMPONENTS,
            &[],
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
            None,
        )
        .expect("test metadata should be valid")
    }

    impl CuMonitor for TestMonitor {
        // Ignores the metadata and always builds an `Ignore` stub. On `std` builds it
        // also calls `execution_probe()` on the runtime handle.
        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
            let monitor = Self::new_with(TestDecision::Ignore);
            #[cfg(feature = "std")]
            let _ = runtime.execution_probe();
            Ok(monitor)
        }

        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        // Translates the configured `TestDecision` into the runtime `Decision`.
        fn process_error(
            &self,
            _component_id: ComponentId,
            _step: CuComponentState,
            _error: &CuError,
        ) -> Decision {
            match self.decision {
                TestDecision::Ignore => Decision::Ignore,
                TestDecision::Abort => Decision::Abort,
                TestDecision::Shutdown => Decision::Shutdown,
            }
        }

        fn process_panic(&self, _panic_message: &str) {
            self.panic_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    // Histogram-based percentiles should land within a few units of the exact answer
    // for a uniform 0..100 sample set.
    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        // Record 100 values from 0 to 99
        for i in 0..100 {
            stats.record(i);
        }

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49); // Average of 0..99

        // Test percentiles - should be approximately correct
        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // With 100 samples from 0-99, percentiles should be close to their index
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    // Four samples give three jitter values: |200-100|=100, |500-200|=300, |400-500|=100.
    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        stats.reset();
        assert_eq!(stats.len(), 0);
    }

    // Multi-second samples (in ns) must not overflow the sum / sum-of-squares
    // accumulators.
    #[test]
    fn test_duration_stats_large_samples_do_not_overflow() {
        let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
        stats.record(CuDuration(5_000_000_000));
        stats.record(CuDuration(8_000_000_000));

        assert_eq!(stats.min(), CuDuration(5_000_000_000));
        assert_eq!(stats.max(), CuDuration(8_000_000_000));
        assert_eq!(stats.mean(), CuDuration(6_500_000_000));
        assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
        assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
    }

    // A tuple of monitors pairing `Ignore` with a stricter decision must report the
    // stricter one (`Shutdown`, then `Abort`).
    #[test]
    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
        let err = CuError::from("boom");

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Shutdown),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Shutdown
        ));

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Abort),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Abort
        ));
    }

    // Each copperlist and panic callback on the tuple must reach both members exactly once.
    #[test]
    fn tuple_monitor_fans_out_callbacks() {
        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
            test_metadata(),
            CuMonitoringRuntime::unavailable(),
        )
        .expect("tuple new");
        let (ctx, _clock_control) = CuContext::new_mock_clock();
        let empty_view = test_metadata().layout().view(&[]);
        monitors
            .process_copperlist(&ctx, empty_view)
            .expect("process_copperlist should fan out");
        monitors.process_panic("panic marker");

        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
    }

    // Reference size: bytes produced by bincode's standard config for `value`, measured
    // with a `SizeWriter` (nothing is actually written out).
    fn encoded_size<E: Encode>(value: &E) -> usize {
        let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
        value
            .encode(&mut encoder)
            .expect("size measurement encoder should not fail");
        encoder.into_writer().bytes_written
    }

    // A plain `Vec<u8>` payload: encoded size matches bincode, resident size is just the
    // `Vec` header, and no handle-backed bytes are reported.
    #[test]
    fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
        let payload = vec![1u8, 2, 3, 4];
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
        assert_eq!(io.handle_bytes, 0);
    }

    // A detached `CuHandle` over 32 bytes: the 32 bytes are reported as handle bytes and
    // added to the resident size on top of the handle itself.
    #[test]
    fn payload_io_stats_tracks_handle_backed_storage() {
        let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(
            io.resident_bytes,
            core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
        );
        assert_eq!(io.handle_bytes, 32);
    }

    // A fresh probe has no marker and sequence 0. After one `record`, the marker
    // round-trips unchanged and the sequence becomes 1.
    #[test]
    fn runtime_execution_probe_roundtrip_marker() {
        let probe = RuntimeExecutionProbe::default();
        assert!(probe.marker().is_none());
        assert_eq!(probe.sequence(), 0);

        probe.record(ExecutionMarker {
            component_id: ComponentId::new(7),
            step: CuComponentState::Process,
            culistid: Some(42),
        });

        let marker = probe.marker().expect("marker should be available");
        assert_eq!(marker.component_id, ComponentId::new(7));
        assert!(matches!(marker.step, CuComponentState::Process));
        assert_eq!(marker.culistid, Some(42));
        assert_eq!(probe.sequence(), 1);
    }
}