Skip to main content

cu29_runtime/
monitoring.rs

1//! Some basic internal monitoring tooling Copper uses to monitor itself and the components it runs.
2//!
3
4use crate::config::CuConfig;
5use crate::config::{
6    BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7};
8use crate::context::CuContext;
9use crate::cutask::CuMsgMetadata;
10use compact_str::CompactString;
11use cu29_clock::CuDuration;
12#[allow(unused_imports)]
13use cu29_log::CuLogLevel;
14#[cfg(all(feature = "std", debug_assertions))]
15use cu29_log_runtime::{
16    format_message_only, register_live_log_listener, unregister_live_log_listener,
17};
18use cu29_traits::{CuError, CuResult};
19use serde_derive::{Deserialize, Serialize};
20
21#[cfg(not(feature = "std"))]
22extern crate alloc;
23
24#[cfg(feature = "std")]
25use std::sync::Arc;
26#[cfg(feature = "std")]
27use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
28
29#[cfg(not(feature = "std"))]
30use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
31#[cfg(not(target_has_atomic = "64"))]
32use spin::Mutex;
33
34#[cfg(not(feature = "std"))]
35mod imp {
36    pub use alloc::alloc::{GlobalAlloc, Layout};
37    #[cfg(target_has_atomic = "64")]
38    pub use core::sync::atomic::AtomicU64;
39    pub use core::sync::atomic::{AtomicUsize, Ordering};
40    pub use libm::sqrt;
41}
42
43#[cfg(feature = "std")]
44mod imp {
45    #[cfg(feature = "memory_monitoring")]
46    use super::CountingAlloc;
47    #[cfg(feature = "memory_monitoring")]
48    pub use std::alloc::System;
49    pub use std::alloc::{GlobalAlloc, Layout};
50    #[cfg(target_has_atomic = "64")]
51    pub use std::sync::atomic::AtomicU64;
52    pub use std::sync::atomic::{AtomicUsize, Ordering};
53    #[cfg(feature = "memory_monitoring")]
54    #[global_allocator]
55    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
56}
57
58use imp::*;
59
60#[cfg(all(feature = "std", debug_assertions))]
61fn format_timestamp(time: CuDuration) -> String {
62    // Render CuTime/CuDuration as HH:mm:ss.xxxx (4 fractional digits of a second).
63    let nanos = time.as_nanos();
64    let total_seconds = nanos / 1_000_000_000;
65    let hours = total_seconds / 3600;
66    let minutes = (total_seconds / 60) % 60;
67    let seconds = total_seconds % 60;
68    let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
69    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
70}
71
72/// Lifecycle state of a monitored component.
73#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
74pub enum CuComponentState {
75    Start,
76    Preprocess,
77    Process,
78    Postprocess,
79    Stop,
80}
81
82/// Strongly-typed index into [`CuMonitoringMetadata::components`].
83#[repr(transparent)]
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
85pub struct ComponentId(usize);
86
87impl ComponentId {
88    pub const INVALID: Self = Self(usize::MAX);
89
90    #[inline]
91    pub const fn new(index: usize) -> Self {
92        Self(index)
93    }
94
95    #[inline]
96    pub const fn index(self) -> usize {
97        self.0
98    }
99}
100
101impl core::fmt::Display for ComponentId {
102    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
103        self.0.fmt(f)
104    }
105}
106
107impl From<ComponentId> for usize {
108    fn from(value: ComponentId) -> Self {
109        value.index()
110    }
111}
112
113/// Strongly-typed CopperList slot index.
114#[repr(transparent)]
115#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
116pub struct CuListSlot(usize);
117
118impl CuListSlot {
119    #[inline]
120    pub const fn new(index: usize) -> Self {
121        Self(index)
122    }
123
124    #[inline]
125    pub const fn index(self) -> usize {
126        self.0
127    }
128}
129
130impl From<CuListSlot> for usize {
131    fn from(value: CuListSlot) -> Self {
132        value.index()
133    }
134}
135
136/// Static monitor-side CopperList indexing layout.
137///
138/// This layout is mission/runtime scoped and remains constant after monitor construction.
139#[derive(Debug, Clone, Copy)]
140pub struct CopperListLayout {
141    components: &'static [MonitorComponentMetadata],
142    slot_to_component: &'static [ComponentId],
143}
144
145impl CopperListLayout {
146    #[inline]
147    pub const fn new(
148        components: &'static [MonitorComponentMetadata],
149        slot_to_component: &'static [ComponentId],
150    ) -> Self {
151        Self {
152            components,
153            slot_to_component,
154        }
155    }
156
157    #[inline]
158    pub const fn components(self) -> &'static [MonitorComponentMetadata] {
159        self.components
160    }
161
162    #[inline]
163    pub const fn component_count(self) -> usize {
164        self.components.len()
165    }
166
167    #[inline]
168    pub const fn culist_slot_count(self) -> usize {
169        self.slot_to_component.len()
170    }
171
172    #[inline]
173    pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
174        &self.components[id.index()]
175    }
176
177    #[inline]
178    pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
179        self.slot_to_component[culist_slot.index()]
180    }
181
182    #[inline]
183    pub const fn slot_to_component(self) -> &'static [ComponentId] {
184        self.slot_to_component
185    }
186
187    #[inline]
188    pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
189        CopperListView::new(self, msgs)
190    }
191}
192
193/// Per-loop monitor view over CopperList metadata paired with static component mapping.
194#[derive(Debug, Clone, Copy)]
195pub struct CopperListView<'a> {
196    layout: CopperListLayout,
197    msgs: &'a [&'a CuMsgMetadata],
198}
199
200impl<'a> CopperListView<'a> {
201    #[inline]
202    pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
203        assert_eq!(
204            msgs.len(),
205            layout.culist_slot_count(),
206            "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
207            msgs.len(),
208            layout.culist_slot_count()
209        );
210        Self { layout, msgs }
211    }
212
213    #[inline]
214    pub const fn layout(self) -> CopperListLayout {
215        self.layout
216    }
217
218    #[inline]
219    pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
220        self.msgs
221    }
222
223    #[inline]
224    pub const fn len(self) -> usize {
225        self.msgs.len()
226    }
227
228    #[inline]
229    pub const fn is_empty(self) -> bool {
230        self.msgs.is_empty()
231    }
232
233    #[inline]
234    pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
235        let index = culist_slot.index();
236        CopperListEntry {
237            culist_slot,
238            component_id: self.layout.component_for_slot(culist_slot),
239            msg: self.msgs[index],
240        }
241    }
242
243    pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
244        self.msgs.iter().enumerate().map(move |(idx, msg)| {
245            let culist_slot = CuListSlot::new(idx);
246            CopperListEntry {
247                culist_slot,
248                component_id: self.layout.component_for_slot(culist_slot),
249                msg,
250            }
251        })
252    }
253}
254
255/// One message entry in CopperList slot order with resolved component identity.
256#[derive(Debug, Clone, Copy)]
257pub struct CopperListEntry<'a> {
258    pub culist_slot: CuListSlot,
259    pub component_id: ComponentId,
260    pub msg: &'a CuMsgMetadata,
261}
262
263impl<'a> CopperListEntry<'a> {
264    #[inline]
265    pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
266        layout.component(self.component_id)
267    }
268
269    #[inline]
270    pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
271        layout.component(self.component_id).kind()
272    }
273}
274
275/// Execution progress marker emitted by the runtime before running a component step.
276#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
277pub struct ExecutionMarker {
278    /// Index into `CuMonitoringMetadata::components()`.
279    pub component_id: ComponentId,
280    /// Lifecycle phase currently entered.
281    pub step: CuComponentState,
282    /// CopperList id when available (runtime loop), None during start/stop.
283    pub culistid: Option<u64>,
284}
285
286/// Lock-free runtime-side progress probe.
287///
288/// The runtime writes execution markers directly into this probe from the hot path
289/// (without calling monitor fan-out callbacks), and monitors can read a coherent
290/// snapshot from watchdog threads when diagnosing stalls.
291#[derive(Debug)]
292pub struct RuntimeExecutionProbe {
293    component_id: AtomicUsize,
294    step: AtomicUsize,
295    #[cfg(target_has_atomic = "64")]
296    culistid: AtomicU64,
297    #[cfg(target_has_atomic = "64")]
298    culistid_present: AtomicUsize,
299    #[cfg(not(target_has_atomic = "64"))]
300    culistid: Mutex<Option<u64>>,
301    sequence: AtomicUsize,
302}
303
304impl Default for RuntimeExecutionProbe {
305    fn default() -> Self {
306        Self {
307            component_id: AtomicUsize::new(ComponentId::INVALID.index()),
308            step: AtomicUsize::new(0),
309            #[cfg(target_has_atomic = "64")]
310            culistid: AtomicU64::new(0),
311            #[cfg(target_has_atomic = "64")]
312            culistid_present: AtomicUsize::new(0),
313            #[cfg(not(target_has_atomic = "64"))]
314            culistid: Mutex::new(None),
315            sequence: AtomicUsize::new(0),
316        }
317    }
318}
319
320impl RuntimeExecutionProbe {
321    #[inline]
322    pub fn record(&self, marker: ExecutionMarker) {
323        self.component_id
324            .store(marker.component_id.index(), Ordering::Relaxed);
325        self.step
326            .store(component_state_to_usize(marker.step), Ordering::Relaxed);
327        #[cfg(target_has_atomic = "64")]
328        match marker.culistid {
329            Some(culistid) => {
330                self.culistid.store(culistid, Ordering::Relaxed);
331                self.culistid_present.store(1, Ordering::Relaxed);
332            }
333            None => {
334                self.culistid_present.store(0, Ordering::Relaxed);
335            }
336        }
337        #[cfg(not(target_has_atomic = "64"))]
338        {
339            *self.culistid.lock() = marker.culistid;
340        }
341        self.sequence.fetch_add(1, Ordering::Release);
342    }
343
344    #[inline]
345    pub fn sequence(&self) -> usize {
346        self.sequence.load(Ordering::Acquire)
347    }
348
349    #[inline]
350    pub fn marker(&self) -> Option<ExecutionMarker> {
351        // Read a coherent snapshot. A concurrent writer may change values between reads;
352        // in that case we retry to keep the marker and sequence aligned.
353        loop {
354            let seq_before = self.sequence.load(Ordering::Acquire);
355            let component_id = self.component_id.load(Ordering::Relaxed);
356            let step = self.step.load(Ordering::Relaxed);
357            #[cfg(target_has_atomic = "64")]
358            let culistid_present = self.culistid_present.load(Ordering::Relaxed);
359            #[cfg(target_has_atomic = "64")]
360            let culistid_value = self.culistid.load(Ordering::Relaxed);
361            #[cfg(not(target_has_atomic = "64"))]
362            let culistid = *self.culistid.lock();
363            let seq_after = self.sequence.load(Ordering::Acquire);
364            if seq_before == seq_after {
365                if component_id == ComponentId::INVALID.index() {
366                    return None;
367                }
368                let step = usize_to_component_state(step);
369                #[cfg(target_has_atomic = "64")]
370                let culistid = if culistid_present == 0 {
371                    None
372                } else {
373                    Some(culistid_value)
374                };
375                return Some(ExecutionMarker {
376                    component_id: ComponentId::new(component_id),
377                    step,
378                    culistid,
379                });
380            }
381        }
382    }
383}
384
385#[inline]
386const fn component_state_to_usize(step: CuComponentState) -> usize {
387    match step {
388        CuComponentState::Start => 0,
389        CuComponentState::Preprocess => 1,
390        CuComponentState::Process => 2,
391        CuComponentState::Postprocess => 3,
392        CuComponentState::Stop => 4,
393    }
394}
395
396#[inline]
397const fn usize_to_component_state(step: usize) -> CuComponentState {
398    match step {
399        0 => CuComponentState::Start,
400        1 => CuComponentState::Preprocess,
401        2 => CuComponentState::Process,
402        3 => CuComponentState::Postprocess,
403        _ => CuComponentState::Stop,
404    }
405}
406
407#[cfg(feature = "std")]
408pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
409
410/// Platform-neutral monitor view of runtime execution progress.
411///
412/// In `std` builds this can wrap a shared runtime probe. In `no_std` builds it is currently
413/// unavailable and helper methods return `None`/`false`.
414#[derive(Debug, Clone)]
415pub struct MonitorExecutionProbe {
416    #[cfg(feature = "std")]
417    inner: Option<ExecutionProbeHandle>,
418}
419
420impl Default for MonitorExecutionProbe {
421    fn default() -> Self {
422        Self::unavailable()
423    }
424}
425
426impl MonitorExecutionProbe {
427    #[cfg(feature = "std")]
428    pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
429        Self {
430            inner: Some(handle),
431        }
432    }
433
434    pub const fn unavailable() -> Self {
435        Self {
436            #[cfg(feature = "std")]
437            inner: None,
438        }
439    }
440
441    pub fn is_available(&self) -> bool {
442        #[cfg(feature = "std")]
443        {
444            self.inner.is_some()
445        }
446        #[cfg(not(feature = "std"))]
447        {
448            false
449        }
450    }
451
452    pub fn marker(&self) -> Option<ExecutionMarker> {
453        #[cfg(feature = "std")]
454        {
455            self.inner.as_ref().and_then(|probe| probe.marker())
456        }
457        #[cfg(not(feature = "std"))]
458        {
459            None
460        }
461    }
462
463    pub fn sequence(&self) -> Option<usize> {
464        #[cfg(feature = "std")]
465        {
466            self.inner.as_ref().map(|probe| probe.sequence())
467        }
468        #[cfg(not(feature = "std"))]
469        {
470            None
471        }
472    }
473}
474
475/// Runtime component category used by monitoring metadata and topology.
476///
477/// A "task" is a regular Copper task (lifecycle callbacks + payload processing). A "bridge"
478/// is a monitored bridge-side execution component (bridge nodes and channel endpoints).
479#[derive(Debug, Clone, Copy, PartialEq, Eq)]
480#[non_exhaustive]
481pub enum ComponentType {
482    Source,
483    Task,
484    Sink,
485    Bridge,
486}
487
488impl ComponentType {
489    pub const fn is_task(self) -> bool {
490        !matches!(self, Self::Bridge)
491    }
492}
493
494/// Static identity entry for one monitored runtime component.
495#[derive(Debug, Clone, Copy, PartialEq, Eq)]
496pub struct MonitorComponentMetadata {
497    id: &'static str,
498    kind: ComponentType,
499    type_name: Option<&'static str>,
500}
501
502impl MonitorComponentMetadata {
503    pub const fn new(
504        id: &'static str,
505        kind: ComponentType,
506        type_name: Option<&'static str>,
507    ) -> Self {
508        Self {
509            id,
510            kind,
511            type_name,
512        }
513    }
514
515    /// Stable monitor component id (for logs/debug and joins with runtime markers).
516    pub const fn id(&self) -> &'static str {
517        self.id
518    }
519
520    pub const fn kind(&self) -> ComponentType {
521        self.kind
522    }
523
524    /// Rust type label when available (typically tasks); `None` for synthetic bridge entries.
525    pub const fn type_name(&self) -> Option<&'static str> {
526        self.type_name
527    }
528}
529
530/// Immutable runtime-provided metadata passed once to [`CuMonitor::new`].
531///
532/// This bundles identifiers, deterministic component layout, and monitor-specific config so monitor
533/// construction is explicit and does not need ad-hoc late setters.
534#[derive(Debug, Clone)]
535pub struct CuMonitoringMetadata {
536    mission_id: CompactString,
537    layout: CopperListLayout,
538    copperlist_info: CopperListInfo,
539    topology: MonitorTopology,
540    monitor_config: Option<ComponentConfig>,
541}
542
543impl CuMonitoringMetadata {
544    pub fn new(
545        mission_id: CompactString,
546        components: &'static [MonitorComponentMetadata],
547        culist_component_mapping: &'static [ComponentId],
548        copperlist_info: CopperListInfo,
549        topology: MonitorTopology,
550        monitor_config: Option<ComponentConfig>,
551    ) -> CuResult<Self> {
552        Self::validate_components(components)?;
553        Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
554        Ok(Self {
555            mission_id,
556            layout: CopperListLayout::new(components, culist_component_mapping),
557            copperlist_info,
558            topology,
559            monitor_config,
560        })
561    }
562
563    fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
564        let mut seen_bridge = false;
565        for component in components {
566            match component.kind() {
567                component_type if component_type.is_task() && seen_bridge => {
568                    return Err(CuError::from(
569                        "invalid monitor metadata: task-family components must appear before bridges",
570                    ));
571                }
572                ComponentType::Bridge => seen_bridge = true,
573                _ => {}
574            }
575        }
576        Ok(())
577    }
578
579    fn validate_culist_mapping(
580        components_len: usize,
581        culist_component_mapping: &'static [ComponentId],
582    ) -> CuResult<()> {
583        for component_idx in culist_component_mapping {
584            if component_idx.index() >= components_len {
585                return Err(CuError::from(
586                    "invalid monitor metadata: culist mapping points past components table",
587                ));
588            }
589        }
590        Ok(())
591    }
592
593    /// Active mission identifier for this runtime instance.
594    pub fn mission_id(&self) -> &str {
595        self.mission_id.as_str()
596    }
597
598    /// Canonical table of monitored runtime components.
599    ///
600    /// Ordering is deterministic and mission-scoped: tasks first, then bridge-side components.
601    pub fn components(&self) -> &'static [MonitorComponentMetadata] {
602        self.layout.components()
603    }
604
605    /// Total number of monitored components.
606    pub const fn component_count(&self) -> usize {
607        self.layout.component_count()
608    }
609
610    /// Static runtime layout used to map CopperList slots to components.
611    pub const fn layout(&self) -> CopperListLayout {
612        self.layout
613    }
614
615    pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
616        self.layout.component(component_id)
617    }
618
619    pub fn component_id(&self, component_id: ComponentId) -> &'static str {
620        self.component(component_id).id()
621    }
622
623    pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
624        self.component(component_id).kind()
625    }
626
627    pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
628        self.layout
629            .components()
630            .iter()
631            .position(|component| component.id() == component_id)
632            .map(ComponentId::new)
633    }
634
635    /// CopperList slot -> monitored component index mapping.
636    ///
637    /// This table maps each CopperList slot index to the producing component index.
638    pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
639        self.layout.slot_to_component()
640    }
641
642    pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
643        self.layout.component_for_slot(culist_slot)
644    }
645
646    pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
647        self.layout.view(msgs)
648    }
649
650    pub const fn copperlist_info(&self) -> CopperListInfo {
651        self.copperlist_info
652    }
653
654    /// Resolved graph topology for the active mission.
655    ///
656    /// This is always available. Nodes represent config graph nodes, not every synthetic bridge
657    /// channel entry in `components()`.
658    pub fn topology(&self) -> &MonitorTopology {
659        &self.topology
660    }
661
662    pub fn monitor_config(&self) -> Option<&ComponentConfig> {
663        self.monitor_config.as_ref()
664    }
665
666    pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
667        self.monitor_config = monitor_config;
668        self
669    }
670}
671
672/// Runtime-provided dynamic monitoring handles passed once to [`CuMonitor::new`].
673///
674/// This context may expose live runtime state (for example execution progress probes).
675#[derive(Debug, Clone, Default)]
676pub struct CuMonitoringRuntime {
677    execution_probe: MonitorExecutionProbe,
678}
679
680impl CuMonitoringRuntime {
681    pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
682        Self { execution_probe }
683    }
684
685    pub const fn unavailable() -> Self {
686        Self::new(MonitorExecutionProbe::unavailable())
687    }
688
689    pub fn execution_probe(&self) -> &MonitorExecutionProbe {
690        &self.execution_probe
691    }
692}
693
694/// Monitor decision to be taken when a component step errored out.
695#[derive(Debug)]
696pub enum Decision {
697    Abort,    // for a step (stop, start) or a copperlist, just stop trying to process it.
698    Ignore, // Ignore this error and try to continue, ie calling the other component steps, setting a None return value and continue a copperlist.
699    Shutdown, // This is a fatal error, shutdown the copper as cleanly as possible.
700}
701
702fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
703    use Decision::{Abort, Ignore, Shutdown};
704    // Pick the strictest monitor decision when multiple monitors disagree.
705    // Shutdown dominates Abort, which dominates Ignore.
706    match (lhs, rhs) {
707        (Shutdown, _) | (_, Shutdown) => Shutdown,
708        (Abort, _) | (_, Abort) => Abort,
709        _ => Ignore,
710    }
711}
712
713#[derive(Debug, Clone)]
714pub struct MonitorNode {
715    pub id: String,
716    pub type_name: Option<String>,
717    pub kind: ComponentType,
718    /// Ordered list of input port identifiers.
719    pub inputs: Vec<String>,
720    /// Ordered list of output port identifiers.
721    pub outputs: Vec<String>,
722}
723
724#[derive(Debug, Clone)]
725pub struct MonitorConnection {
726    pub src: String,
727    pub src_port: Option<String>,
728    pub dst: String,
729    pub dst_port: Option<String>,
730    pub msg: String,
731}
732
733#[derive(Debug, Clone, Default)]
734pub struct MonitorTopology {
735    pub nodes: Vec<MonitorNode>,
736    pub connections: Vec<MonitorConnection>,
737}
738
739#[derive(Debug, Clone, Copy, Default)]
740pub struct CopperListInfo {
741    pub size_bytes: usize,
742    pub count: usize,
743}
744
745impl CopperListInfo {
746    pub const fn new(size_bytes: usize, count: usize) -> Self {
747        Self { size_bytes, count }
748    }
749}
750
751/// Reported data about CopperList IO for a single iteration.
752#[derive(Debug, Clone, Copy, Default)]
753pub struct CopperListIoStats {
754    /// CopperList struct size in RAM (excluding dynamic payloads/handles)
755    pub raw_culist_bytes: u64,
756    /// Bytes held by payloads that will be serialized (currently: pooled handles, vecs, slices)
757    pub handle_bytes: u64,
758    /// Bytes produced by bincode serialization of the CopperList
759    pub encoded_culist_bytes: u64,
760    /// Bytes produced by bincode serialization of the KeyFrame (0 if none)
761    pub keyframe_bytes: u64,
762    /// Cumulative bytes written to the structured log stream so far
763    pub structured_log_bytes_total: u64,
764    /// CopperList identifier for reference in monitors
765    pub culistid: u64,
766}
767
768/// Lightweight trait to estimate the amount of data a payload will contribute when serialized.
769/// Default implementations return the stack size; specific types override to report dynamic data.
770pub trait CuPayloadSize {
771    /// Total bytes represented by the payload in memory (stack + heap backing).
772    fn raw_bytes(&self) -> usize {
773        core::mem::size_of_val(self)
774    }
775
776    /// Bytes that correspond to reusable/pooled handles (used for IO budgeting).
777    fn handle_bytes(&self) -> usize {
778        0
779    }
780}
781
782impl<T> CuPayloadSize for T
783where
784    T: crate::cutask::CuMsgPayload,
785{
786    fn raw_bytes(&self) -> usize {
787        core::mem::size_of::<T>()
788    }
789}
790
791#[derive(Default, Debug, Clone, Copy)]
792struct NodeIoUsage {
793    has_incoming: bool,
794    has_outgoing: bool,
795}
796
797fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
798    let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
799    edge_ids.sort();
800
801    let mut outputs = Vec::new();
802    let mut seen = Vec::new();
803    let mut port_idx = 0usize;
804    for edge_id in edge_ids {
805        let Some(edge) = graph.edge(edge_id) else {
806            continue;
807        };
808        if seen.iter().any(|msg| msg == &edge.msg) {
809            continue;
810        }
811        seen.push(edge.msg.clone());
812        let mut port_label = String::from("out");
813        port_label.push_str(&port_idx.to_string());
814        port_label.push_str(": ");
815        port_label.push_str(edge.msg.as_str());
816        outputs.push((edge.msg.clone(), port_label));
817        port_idx += 1;
818    }
819    outputs
820}
821
822/// Derive a monitor-friendly topology from the runtime configuration.
823pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
824    let graph = config.get_graph(Some(mission))?;
825    let mut nodes: Map<String, MonitorNode> = Map::new();
826    let mut io_usage: Map<String, NodeIoUsage> = Map::new();
827    let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
828
829    let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
830    for bridge in &config.bridges {
831        bridge_lookup.insert(bridge.id.as_str(), bridge);
832    }
833
834    for cnx in graph.edges() {
835        io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
836        io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
837    }
838
839    for (_, node) in graph.get_all_nodes() {
840        let node_id = node.get_id();
841        let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
842        let kind = match node.get_flavor() {
843            Flavor::Bridge => ComponentType::Bridge,
844            _ if !usage.has_incoming && usage.has_outgoing => ComponentType::Source,
845            _ if usage.has_incoming && !usage.has_outgoing => ComponentType::Sink,
846            _ => ComponentType::Task,
847        };
848
849        let mut inputs = Vec::new();
850        let mut outputs = Vec::new();
851        if kind == ComponentType::Bridge {
852            if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
853                for ch in &bridge.channels {
854                    match ch {
855                        BridgeChannelConfigRepresentation::Rx { id, .. } => {
856                            outputs.push(id.clone())
857                        }
858                        BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
859                    }
860                }
861            }
862        } else {
863            if usage.has_incoming || !usage.has_outgoing {
864                inputs.push("in".to_string());
865            }
866            if usage.has_outgoing {
867                if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
868                    let ports = collect_output_ports(graph, node_idx);
869                    let mut port_map: Map<String, String> = Map::new();
870                    for (msg_type, label) in ports {
871                        port_map.insert(msg_type, label.clone());
872                        outputs.push(label);
873                    }
874                    output_port_lookup.insert(node_id.clone(), port_map);
875                }
876            } else if !usage.has_incoming {
877                outputs.push("out".to_string());
878            }
879        }
880
881        nodes.insert(
882            node_id.clone(),
883            MonitorNode {
884                id: node_id,
885                type_name: Some(node.get_type().to_string()),
886                kind,
887                inputs,
888                outputs,
889            },
890        );
891    }
892
893    let mut connections = Vec::new();
894    for cnx in graph.edges() {
895        let src = cnx.src.clone();
896        let dst = cnx.dst.clone();
897
898        let src_port = cnx.src_channel.clone().or_else(|| {
899            output_port_lookup
900                .get(&src)
901                .and_then(|ports| ports.get(&cnx.msg).cloned())
902                .or_else(|| {
903                    nodes
904                        .get(&src)
905                        .and_then(|node| node.outputs.first().cloned())
906                })
907        });
908        let dst_port = cnx.dst_channel.clone().or_else(|| {
909            nodes
910                .get(&dst)
911                .and_then(|node| node.inputs.first().cloned())
912        });
913
914        connections.push(MonitorConnection {
915            src,
916            src_port,
917            dst,
918            dst_port,
919            msg: cnx.msg.clone(),
920        });
921    }
922
923    Ok(MonitorTopology {
924        nodes: nodes.into_values().collect(),
925        connections,
926    })
927}
928
929/// Runtime monitoring contract implemented by monitor components.
930///
931/// Lifecycle:
932/// 1. [`CuMonitor::new`] is called once at runtime construction time.
933/// 2. [`CuMonitor::start`] is called once before the first runtime iteration.
934/// 3. For each iteration, [`CuMonitor::process_copperlist`] is called after component execution,
935///    then [`CuMonitor::observe_copperlist_io`] after serialization accounting.
936/// 4. [`CuMonitor::process_error`] is called synchronously when a monitored component step fails.
937/// 5. [`CuMonitor::process_panic`] is called when the runtime catches a panic (`std` builds).
938/// 6. [`CuMonitor::stop`] is called once during runtime shutdown.
939///
940/// Indexing model:
941/// - `process_error(component_id, ..)` uses component indices into `metadata.components()`.
942/// - `process_copperlist(..., view)` iterates CopperList slots with resolved component identity.
943///
944/// Error policy:
945/// - [`Decision::Ignore`] continues execution.
946/// - [`Decision::Abort`] aborts the current operation (step/copperlist scope).
947/// - [`Decision::Shutdown`] triggers runtime shutdown.
948pub trait CuMonitor: Sized {
949    /// Construct the monitor once, before component execution starts.
950    ///
951    /// `metadata` contains mission/config/topology/static mapping information.
952    /// `runtime` exposes dynamic runtime handles (for example execution probes).
953    /// Use `metadata.monitor_config()` to decode monitor-specific parameters.
954    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
955    where
956        Self: Sized;
957
958    /// Called once before processing the first CopperList.
959    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
960        Ok(())
961    }
962
963    /// Called once per processed CopperList after component execution.
964    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;
965
966    /// Called when runtime finishes CopperList serialization/IO accounting.
967    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
968
969    /// Called when a monitored component step fails; must return an immediate runtime decision.
970    ///
971    /// `component_id` is an index into [`CuMonitoringMetadata::components`].
972    fn process_error(
973        &self,
974        component_id: ComponentId,
975        step: CuComponentState,
976        error: &CuError,
977    ) -> Decision;
978
979    /// Called when the runtime catches a panic (`std` builds).
980    fn process_panic(&self, _panic_message: &str) {}
981
982    /// Called once during runtime shutdown.
983    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
984        Ok(())
985    }
986}
987
988/// A do nothing monitor if no monitor is provided.
989/// This is basically defining the default behavior of Copper in case of error.
990pub struct NoMonitor {}
991impl CuMonitor for NoMonitor {
992    fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
993        Ok(NoMonitor {})
994    }
995
996    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
997        #[cfg(all(feature = "std", debug_assertions))]
998        register_live_log_listener(|entry, format_str, param_names| {
999            let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1000            let named: Map<String, String> = param_names
1001                .iter()
1002                .zip(params.iter())
1003                .map(|(k, v)| (k.to_string(), v.clone()))
1004                .collect();
1005
1006            if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1007                let ts = format_timestamp(entry.time);
1008                println!("{} [{:?}] {}", ts, entry.level, msg);
1009            }
1010        });
1011        Ok(())
1012    }
1013
1014    fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1015        // By default, do nothing.
1016        Ok(())
1017    }
1018
1019    fn process_error(
1020        &self,
1021        _component_id: ComponentId,
1022        _step: CuComponentState,
1023        _error: &CuError,
1024    ) -> Decision {
1025        // By default, just try to continue.
1026        Decision::Ignore
1027    }
1028
1029    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1030        #[cfg(all(feature = "std", debug_assertions))]
1031        unregister_live_log_listener();
1032        Ok(())
1033    }
1034}
1035
1036macro_rules! impl_monitor_tuple {
1037    ($($idx:tt => $name:ident),+) => {
1038        impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1039            fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1040            where
1041                Self: Sized,
1042            {
1043                Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1044            }
1045
1046            fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1047                $(self.$idx.start(ctx)?;)+
1048                Ok(())
1049            }
1050
1051            fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1052                $(self.$idx.process_copperlist(ctx, view)?;)+
1053                Ok(())
1054            }
1055
1056            fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1057                $(self.$idx.observe_copperlist_io(stats);)+
1058            }
1059
1060            fn process_error(
1061                &self,
1062                component_id: ComponentId,
1063                step: CuComponentState,
1064                error: &CuError,
1065            ) -> Decision {
1066                let mut decision = Decision::Ignore;
1067                $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1068                decision
1069            }
1070
1071            fn process_panic(&self, panic_message: &str) {
1072                $(self.$idx.process_panic(panic_message);)+
1073            }
1074
1075            fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1076                $(self.$idx.stop(ctx)?;)+
1077                Ok(())
1078            }
1079        }
1080    };
1081}
1082
1083impl_monitor_tuple!(0 => M0, 1 => M1);
1084impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1085impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1086impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1087impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1088
1089#[cfg(feature = "std")]
1090pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
1091    if let Some(msg) = payload.downcast_ref::<&str>() {
1092        (*msg).to_string()
1093    } else if let Some(msg) = payload.downcast_ref::<String>() {
1094        msg.clone()
1095    } else {
1096        "panic with non-string payload".to_string()
1097    }
1098}
1099
1100/// A simple allocator that counts the number of bytes allocated and deallocated.
1101pub struct CountingAlloc<A: GlobalAlloc> {
1102    inner: A,
1103    allocated: AtomicUsize,
1104    deallocated: AtomicUsize,
1105}
1106
1107impl<A: GlobalAlloc> CountingAlloc<A> {
1108    pub const fn new(inner: A) -> Self {
1109        CountingAlloc {
1110            inner,
1111            allocated: AtomicUsize::new(0),
1112            deallocated: AtomicUsize::new(0),
1113        }
1114    }
1115
1116    pub fn allocated(&self) -> usize {
1117        self.allocated.load(Ordering::SeqCst)
1118    }
1119
1120    pub fn deallocated(&self) -> usize {
1121        self.deallocated.load(Ordering::SeqCst)
1122    }
1123
1124    pub fn reset(&self) {
1125        self.allocated.store(0, Ordering::SeqCst);
1126        self.deallocated.store(0, Ordering::SeqCst);
1127    }
1128}
1129
1130// SAFETY: Delegates allocation/deallocation to the inner allocator while tracking sizes.
1131unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
1132    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
1133    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
1134        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
1135        let p = unsafe { self.inner.alloc(layout) };
1136        if !p.is_null() {
1137            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
1138        }
1139        p
1140    }
1141
1142    // SAFETY: Callers uphold the GlobalAlloc contract; we delegate to the inner allocator.
1143    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
1144        // SAFETY: Forwarding to the inner allocator preserves GlobalAlloc invariants.
1145        unsafe { self.inner.dealloc(ptr, layout) }
1146        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
1147    }
1148}
1149
1150/// A simple struct that counts the number of bytes allocated and deallocated in a scope.
1151#[cfg(feature = "memory_monitoring")]
1152pub struct ScopedAllocCounter {
1153    bf_allocated: usize,
1154    bf_deallocated: usize,
1155}
1156
1157#[cfg(feature = "memory_monitoring")]
1158impl Default for ScopedAllocCounter {
1159    fn default() -> Self {
1160        Self::new()
1161    }
1162}
1163
1164#[cfg(feature = "memory_monitoring")]
1165impl ScopedAllocCounter {
1166    pub fn new() -> Self {
1167        ScopedAllocCounter {
1168            bf_allocated: GLOBAL.allocated(),
1169            bf_deallocated: GLOBAL.deallocated(),
1170        }
1171    }
1172
1173    /// Returns the total number of bytes allocated in the current scope
1174    /// since the creation of this `ScopedAllocCounter`.
1175    ///
1176    /// # Example
1177    /// ```
1178    /// use cu29_runtime::monitoring::ScopedAllocCounter;
1179    ///
1180    /// let counter = ScopedAllocCounter::new();
1181    /// let _vec = vec![0u8; 1024];
1182    /// println!("Bytes allocated: {}", counter.get_allocated());
1183    /// ```
1184    pub fn allocated(&self) -> usize {
1185        GLOBAL.allocated() - self.bf_allocated
1186    }
1187
1188    /// Returns the total number of bytes deallocated in the current scope
1189    /// since the creation of this `ScopedAllocCounter`.
1190    ///
1191    /// # Example
1192    /// ```
1193    /// use cu29_runtime::monitoring::ScopedAllocCounter;
1194    ///
1195    /// let counter = ScopedAllocCounter::new();
1196    /// let _vec = vec![0u8; 1024];
1197    /// drop(_vec);
1198    /// println!("Bytes deallocated: {}", counter.get_deallocated());
1199    /// ```
1200    pub fn deallocated(&self) -> usize {
1201        GLOBAL.deallocated() - self.bf_deallocated
1202    }
1203}
1204
1205/// Build a difference between the number of bytes allocated and deallocated in the scope at drop time.
1206#[cfg(feature = "memory_monitoring")]
1207impl Drop for ScopedAllocCounter {
1208    fn drop(&mut self) {
1209        let _allocated = GLOBAL.allocated() - self.bf_allocated;
1210        let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
1211        // TODO(gbin): Fix this when the logger is ready.
1212        // debug!(
1213        //     "Allocations: +{}B -{}B",
1214        //     allocated = allocated,
1215        //     deallocated = deallocated,
1216        // );
1217    }
1218}
1219
1220#[cfg(feature = "std")]
1221const BUCKET_COUNT: usize = 1024;
1222#[cfg(not(feature = "std"))]
1223const BUCKET_COUNT: usize = 256;
1224
1225/// Accumulative stat object that can give your some real time statistics.
1226/// Uses a fixed-size bucketed histogram for accurate percentile calculations.
1227#[derive(Debug, Clone)]
1228pub struct LiveStatistics {
1229    buckets: [u64; BUCKET_COUNT],
1230    min_val: u64,
1231    max_val: u64,
1232    sum: u64,
1233    sum_sq: u64,
1234    count: u64,
1235    max_value: u64,
1236}
1237
1238impl LiveStatistics {
1239    /// Creates a new `LiveStatistics` instance with a specified maximum value.
1240    ///
1241    /// This function initializes a `LiveStatistics` structure with default values
1242    /// for tracking statistical data, while setting an upper limit for the data
1243    /// points that the structure tracks.
1244    ///
1245    /// # Parameters
1246    /// - `max_value` (`u64`): The maximum value that can be recorded or tracked.
1247    ///
1248    /// # Returns
1249    /// A new instance of `LiveStatistics` with:
1250    /// - `buckets`: An array pre-filled with zeros to categorize data points.
1251    /// - `min_val`: Initialized to the maximum possible `u64` value to track the minimum correctly.
1252    /// - `max_val`: Initialized to zero.
1253    /// - `sum`: The sum of all data points, initialized to zero.
1254    /// - `sum_sq`: The sum of squares of all data points, initialized to zero.
1255    /// - `count`: The total number of data points, initialized to zero.
1256    /// - `max_value`: The maximum allowable value for data points, set to the provided `max_value`.
1257    ///
1258    pub fn new_with_max(max_value: u64) -> Self {
1259        LiveStatistics {
1260            buckets: [0; BUCKET_COUNT],
1261            min_val: u64::MAX,
1262            max_val: 0,
1263            sum: 0,
1264            sum_sq: 0,
1265            count: 0,
1266            max_value,
1267        }
1268    }
1269
1270    #[inline]
1271    fn value_to_bucket(&self, value: u64) -> usize {
1272        if value >= self.max_value {
1273            BUCKET_COUNT - 1
1274        } else {
1275            ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
1276        }
1277    }
1278
1279    #[inline]
1280    pub fn min(&self) -> u64 {
1281        if self.count == 0 { 0 } else { self.min_val }
1282    }
1283
1284    #[inline]
1285    pub fn max(&self) -> u64 {
1286        self.max_val
1287    }
1288
1289    #[inline]
1290    pub fn mean(&self) -> f64 {
1291        if self.count == 0 {
1292            0.0
1293        } else {
1294            self.sum as f64 / self.count as f64
1295        }
1296    }
1297
1298    #[inline]
1299    pub fn stdev(&self) -> f64 {
1300        if self.count == 0 {
1301            return 0.0;
1302        }
1303        let mean = self.mean();
1304        let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
1305        if variance < 0.0 {
1306            return 0.0;
1307        }
1308        #[cfg(feature = "std")]
1309        return variance.sqrt();
1310        #[cfg(not(feature = "std"))]
1311        return sqrt(variance);
1312    }
1313
1314    #[inline]
1315    pub fn percentile(&self, percentile: f64) -> u64 {
1316        if self.count == 0 {
1317            return 0;
1318        }
1319
1320        let target_count = (self.count as f64 * percentile) as u64;
1321        let mut accumulated = 0u64;
1322
1323        for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
1324            accumulated += bucket_count;
1325            if accumulated >= target_count {
1326                // Linear interpolation within the bucket
1327                let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
1328                let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
1329                let bucket_fraction = if bucket_count > 0 {
1330                    (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
1331                } else {
1332                    0.5
1333                };
1334                return bucket_start
1335                    + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
1336            }
1337        }
1338
1339        self.max_val
1340    }
1341
1342    /// Adds a value to the statistics.
1343    #[inline]
1344    pub fn record(&mut self, value: u64) {
1345        if value < self.min_val {
1346            self.min_val = value;
1347        }
1348        if value > self.max_val {
1349            self.max_val = value;
1350        }
1351        self.sum += value;
1352        self.sum_sq += value * value;
1353        self.count += 1;
1354
1355        let bucket = self.value_to_bucket(value);
1356        self.buckets[bucket] += 1;
1357    }
1358
1359    #[inline]
1360    pub fn len(&self) -> u64 {
1361        self.count
1362    }
1363
1364    #[inline]
1365    pub fn is_empty(&self) -> bool {
1366        self.count == 0
1367    }
1368
1369    #[inline]
1370    pub fn reset(&mut self) {
1371        self.buckets.fill(0);
1372        self.min_val = u64::MAX;
1373        self.max_val = 0;
1374        self.sum = 0;
1375        self.sum_sq = 0;
1376        self.count = 0;
1377    }
1378}
1379
1380/// A Specialized statistics object for CuDuration.
1381/// It will also keep track of the jitter between the values.
1382#[derive(Debug, Clone)]
1383pub struct CuDurationStatistics {
1384    bare: LiveStatistics,
1385    jitter: LiveStatistics,
1386    last_value: CuDuration,
1387}
1388
1389impl CuDurationStatistics {
1390    pub fn new(max: CuDuration) -> Self {
1391        let CuDuration(max) = max;
1392        CuDurationStatistics {
1393            bare: LiveStatistics::new_with_max(max),
1394            jitter: LiveStatistics::new_with_max(max),
1395            last_value: CuDuration::default(),
1396        }
1397    }
1398
1399    #[inline]
1400    pub fn min(&self) -> CuDuration {
1401        CuDuration(self.bare.min())
1402    }
1403
1404    #[inline]
1405    pub fn max(&self) -> CuDuration {
1406        CuDuration(self.bare.max())
1407    }
1408
1409    #[inline]
1410    pub fn mean(&self) -> CuDuration {
1411        CuDuration(self.bare.mean() as u64) // CuDuration is in ns, it is ok.
1412    }
1413
1414    #[inline]
1415    pub fn percentile(&self, percentile: f64) -> CuDuration {
1416        CuDuration(self.bare.percentile(percentile))
1417    }
1418
1419    #[inline]
1420    pub fn stddev(&self) -> CuDuration {
1421        CuDuration(self.bare.stdev() as u64)
1422    }
1423
1424    #[inline]
1425    pub fn len(&self) -> u64 {
1426        self.bare.len()
1427    }
1428
1429    #[inline]
1430    pub fn is_empty(&self) -> bool {
1431        self.bare.len() == 0
1432    }
1433
1434    #[inline]
1435    pub fn jitter_min(&self) -> CuDuration {
1436        CuDuration(self.jitter.min())
1437    }
1438
1439    #[inline]
1440    pub fn jitter_max(&self) -> CuDuration {
1441        CuDuration(self.jitter.max())
1442    }
1443
1444    #[inline]
1445    pub fn jitter_mean(&self) -> CuDuration {
1446        CuDuration(self.jitter.mean() as u64)
1447    }
1448
1449    #[inline]
1450    pub fn jitter_stddev(&self) -> CuDuration {
1451        CuDuration(self.jitter.stdev() as u64)
1452    }
1453
1454    #[inline]
1455    pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
1456        CuDuration(self.jitter.percentile(percentile))
1457    }
1458
1459    #[inline]
1460    pub fn record(&mut self, value: CuDuration) {
1461        let CuDuration(nanos) = value;
1462        if self.bare.is_empty() {
1463            self.bare.record(nanos);
1464            self.last_value = value;
1465            return;
1466        }
1467        self.bare.record(nanos);
1468        let CuDuration(last_nanos) = self.last_value;
1469        self.jitter.record(nanos.abs_diff(last_nanos));
1470        self.last_value = value;
1471    }
1472
1473    #[inline]
1474    pub fn reset(&mut self) {
1475        self.bare.reset();
1476        self.jitter.reset();
1477    }
1478}
1479
1480#[cfg(test)]
1481mod tests {
1482    use super::*;
1483    use core::sync::atomic::{AtomicUsize, Ordering};
1484
1485    #[derive(Clone, Copy)]
1486    enum TestDecision {
1487        Ignore,
1488        Abort,
1489        Shutdown,
1490    }
1491
1492    struct TestMonitor {
1493        decision: TestDecision,
1494        copperlist_calls: AtomicUsize,
1495        panic_calls: AtomicUsize,
1496    }
1497
1498    impl TestMonitor {
1499        fn new_with(decision: TestDecision) -> Self {
1500            Self {
1501                decision,
1502                copperlist_calls: AtomicUsize::new(0),
1503                panic_calls: AtomicUsize::new(0),
1504            }
1505        }
1506    }
1507
1508    fn test_metadata() -> CuMonitoringMetadata {
1509        const COMPONENTS: &[MonitorComponentMetadata] = &[
1510            MonitorComponentMetadata::new("a", ComponentType::Task, None),
1511            MonitorComponentMetadata::new("b", ComponentType::Task, None),
1512        ];
1513        CuMonitoringMetadata::new(
1514            CompactString::from(crate::config::DEFAULT_MISSION_ID),
1515            COMPONENTS,
1516            &[],
1517            CopperListInfo::new(0, 0),
1518            MonitorTopology::default(),
1519            None,
1520        )
1521        .expect("test metadata should be valid")
1522    }
1523
1524    impl CuMonitor for TestMonitor {
1525        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
1526            let monitor = Self::new_with(TestDecision::Ignore);
1527            #[cfg(feature = "std")]
1528            let _ = runtime.execution_probe();
1529            Ok(monitor)
1530        }
1531
1532        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1533            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
1534            Ok(())
1535        }
1536
1537        fn process_error(
1538            &self,
1539            _component_id: ComponentId,
1540            _step: CuComponentState,
1541            _error: &CuError,
1542        ) -> Decision {
1543            match self.decision {
1544                TestDecision::Ignore => Decision::Ignore,
1545                TestDecision::Abort => Decision::Abort,
1546                TestDecision::Shutdown => Decision::Shutdown,
1547            }
1548        }
1549
1550        fn process_panic(&self, _panic_message: &str) {
1551            self.panic_calls.fetch_add(1, Ordering::SeqCst);
1552        }
1553    }
1554
1555    #[test]
1556    fn test_live_statistics_percentiles() {
1557        let mut stats = LiveStatistics::new_with_max(1000);
1558
1559        // Record 100 values from 0 to 99
1560        for i in 0..100 {
1561            stats.record(i);
1562        }
1563
1564        assert_eq!(stats.len(), 100);
1565        assert_eq!(stats.min(), 0);
1566        assert_eq!(stats.max(), 99);
1567        assert_eq!(stats.mean() as u64, 49); // Average of 0..99
1568
1569        // Test percentiles - should be approximately correct
1570        let p50 = stats.percentile(0.5);
1571        let p90 = stats.percentile(0.90);
1572        let p95 = stats.percentile(0.95);
1573        let p99 = stats.percentile(0.99);
1574
1575        // With 100 samples from 0-99, percentiles should be close to their index
1576        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
1577        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
1578        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
1579        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
1580    }
1581
1582    #[test]
1583    fn test_duration_stats() {
1584        let mut stats = CuDurationStatistics::new(CuDuration(1000));
1585        stats.record(CuDuration(100));
1586        stats.record(CuDuration(200));
1587        stats.record(CuDuration(500));
1588        stats.record(CuDuration(400));
1589        assert_eq!(stats.min(), CuDuration(100));
1590        assert_eq!(stats.max(), CuDuration(500));
1591        assert_eq!(stats.mean(), CuDuration(300));
1592        assert_eq!(stats.len(), 4);
1593        assert_eq!(stats.jitter.len(), 3);
1594        assert_eq!(stats.jitter_min(), CuDuration(100));
1595        assert_eq!(stats.jitter_max(), CuDuration(300));
1596        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
1597        stats.reset();
1598        assert_eq!(stats.len(), 0);
1599    }
1600
1601    #[test]
1602    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
1603        let err = CuError::from("boom");
1604
1605        let two = (
1606            TestMonitor::new_with(TestDecision::Ignore),
1607            TestMonitor::new_with(TestDecision::Shutdown),
1608        );
1609        assert!(matches!(
1610            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
1611            Decision::Shutdown
1612        ));
1613
1614        let two = (
1615            TestMonitor::new_with(TestDecision::Ignore),
1616            TestMonitor::new_with(TestDecision::Abort),
1617        );
1618        assert!(matches!(
1619            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
1620            Decision::Abort
1621        ));
1622    }
1623
1624    #[test]
1625    fn tuple_monitor_fans_out_callbacks() {
1626        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
1627            test_metadata(),
1628            CuMonitoringRuntime::unavailable(),
1629        )
1630        .expect("tuple new");
1631        let (ctx, _clock_control) = CuContext::new_mock_clock();
1632        let empty_view = test_metadata().layout().view(&[]);
1633        monitors
1634            .process_copperlist(&ctx, empty_view)
1635            .expect("process_copperlist should fan out");
1636        monitors.process_panic("panic marker");
1637
1638        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
1639        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
1640        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
1641        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
1642    }
1643
1644    #[test]
1645    fn runtime_execution_probe_roundtrip_marker() {
1646        let probe = RuntimeExecutionProbe::default();
1647        assert!(probe.marker().is_none());
1648        assert_eq!(probe.sequence(), 0);
1649
1650        probe.record(ExecutionMarker {
1651            component_id: ComponentId::new(7),
1652            step: CuComponentState::Process,
1653            culistid: Some(42),
1654        });
1655
1656        let marker = probe.marker().expect("marker should be available");
1657        assert_eq!(marker.component_id, ComponentId::new(7));
1658        assert!(matches!(marker.step, CuComponentState::Process));
1659        assert_eq!(marker.culistid, Some(42));
1660        assert_eq!(probe.sequence(), 1);
1661    }
1662}