1use crate::config::CuConfig;
5use crate::config::{
6 BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7 TaskKind, resolve_task_kind_for_id,
8};
9use crate::context::CuContext;
10use crate::cutask::CuMsgMetadata;
11use bincode::Encode;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SizeWriter;
15use compact_str::CompactString;
16use cu29_clock::CuDuration;
17#[allow(unused_imports)]
18use cu29_log::CuLogLevel;
19#[cfg(all(feature = "std", debug_assertions))]
20use cu29_log_runtime::{
21 format_message_only, register_live_log_listener, unregister_live_log_listener,
22};
23use cu29_traits::{
24 CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
25 finish_observed_encode,
26};
27use portable_atomic::{
28 AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
29};
30use serde_derive::{Deserialize, Serialize};
31
32#[cfg(not(feature = "std"))]
33extern crate alloc;
34
35#[cfg(feature = "std")]
36use core::cell::Cell;
37#[cfg(feature = "std")]
38use std::backtrace::Backtrace;
39#[cfg(feature = "std")]
40use std::fs::File;
41#[cfg(feature = "std")]
42use std::io::Write;
43#[cfg(feature = "std")]
44use std::panic::PanicHookInfo;
45#[cfg(feature = "std")]
46use std::sync::{Arc, Mutex as StdMutex, OnceLock};
47#[cfg(feature = "std")]
48use std::thread_local;
49#[cfg(feature = "std")]
50use std::time::{SystemTime, UNIX_EPOCH};
51#[cfg(feature = "std")]
52use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
53
54#[cfg(not(feature = "std"))]
55use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
56#[cfg(not(target_has_atomic = "64"))]
57use spin::Mutex;
58#[cfg(not(feature = "std"))]
59use spin::Mutex as SpinMutex;
60
61#[cfg(not(feature = "std"))]
62mod imp {
63 pub use alloc::alloc::{GlobalAlloc, Layout};
64 #[cfg(target_has_atomic = "64")]
65 pub use core::sync::atomic::AtomicU64;
66 pub use core::sync::atomic::{AtomicUsize, Ordering};
67 pub use libm::sqrt;
68}
69
70#[cfg(feature = "std")]
71mod imp {
72 #[cfg(feature = "memory_monitoring")]
73 use super::CountingAlloc;
74 #[cfg(feature = "memory_monitoring")]
75 pub use std::alloc::System;
76 pub use std::alloc::{GlobalAlloc, Layout};
77 #[cfg(target_has_atomic = "64")]
78 pub use std::sync::atomic::AtomicU64;
79 pub use std::sync::atomic::{AtomicUsize, Ordering};
80 #[cfg(feature = "memory_monitoring")]
81 #[global_allocator]
82 pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
83}
84
85use imp::*;
86
87#[cfg(all(feature = "std", debug_assertions))]
88fn format_timestamp(time: CuDuration) -> String {
89 let nanos = time.as_nanos();
91 let total_seconds = nanos / 1_000_000_000;
92 let hours = total_seconds / 3600;
93 let minutes = (total_seconds / 60) % 60;
94 let seconds = total_seconds % 60;
95 let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
96 format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
97}
98
99#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
101pub enum CuComponentState {
102 Start,
103 Preprocess,
104 Process,
105 Postprocess,
106 Stop,
107}
108
109#[repr(transparent)]
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
112pub struct ComponentId(usize);
113
114impl ComponentId {
115 pub const INVALID: Self = Self(usize::MAX);
116
117 #[inline]
118 pub const fn new(index: usize) -> Self {
119 Self(index)
120 }
121
122 #[inline]
123 pub const fn index(self) -> usize {
124 self.0
125 }
126}
127
128impl core::fmt::Display for ComponentId {
129 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
130 self.0.fmt(f)
131 }
132}
133
134impl From<ComponentId> for usize {
135 fn from(value: ComponentId) -> Self {
136 value.index()
137 }
138}
139
140#[repr(transparent)]
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
143pub struct CuListSlot(usize);
144
145impl CuListSlot {
146 #[inline]
147 pub const fn new(index: usize) -> Self {
148 Self(index)
149 }
150
151 #[inline]
152 pub const fn index(self) -> usize {
153 self.0
154 }
155}
156
157impl From<CuListSlot> for usize {
158 fn from(value: CuListSlot) -> Self {
159 value.index()
160 }
161}
162
163#[derive(Debug, Clone, Copy)]
167pub struct CopperListLayout {
168 components: &'static [MonitorComponentMetadata],
169 slot_to_component: &'static [ComponentId],
170}
171
172impl CopperListLayout {
173 #[inline]
174 pub const fn new(
175 components: &'static [MonitorComponentMetadata],
176 slot_to_component: &'static [ComponentId],
177 ) -> Self {
178 Self {
179 components,
180 slot_to_component,
181 }
182 }
183
184 #[inline]
185 pub const fn components(self) -> &'static [MonitorComponentMetadata] {
186 self.components
187 }
188
189 #[inline]
190 pub const fn component_count(self) -> usize {
191 self.components.len()
192 }
193
194 #[inline]
195 pub const fn culist_slot_count(self) -> usize {
196 self.slot_to_component.len()
197 }
198
199 #[inline]
200 pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
201 &self.components[id.index()]
202 }
203
204 #[inline]
205 pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
206 self.slot_to_component[culist_slot.index()]
207 }
208
209 #[inline]
210 pub const fn slot_to_component(self) -> &'static [ComponentId] {
211 self.slot_to_component
212 }
213
214 #[inline]
215 pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
216 CopperListView::new(self, msgs)
217 }
218}
219
220#[derive(Debug, Clone, Copy)]
222pub struct CopperListView<'a> {
223 layout: CopperListLayout,
224 msgs: &'a [&'a CuMsgMetadata],
225}
226
227impl<'a> CopperListView<'a> {
228 #[inline]
229 pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
230 assert_eq!(
231 msgs.len(),
232 layout.culist_slot_count(),
233 "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
234 msgs.len(),
235 layout.culist_slot_count()
236 );
237 Self { layout, msgs }
238 }
239
240 #[inline]
241 pub const fn layout(self) -> CopperListLayout {
242 self.layout
243 }
244
245 #[inline]
246 pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
247 self.msgs
248 }
249
250 #[inline]
251 pub const fn len(self) -> usize {
252 self.msgs.len()
253 }
254
255 #[inline]
256 pub const fn is_empty(self) -> bool {
257 self.msgs.is_empty()
258 }
259
260 #[inline]
261 pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
262 let index = culist_slot.index();
263 CopperListEntry {
264 culist_slot,
265 component_id: self.layout.component_for_slot(culist_slot),
266 msg: self.msgs[index],
267 }
268 }
269
270 pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
271 self.msgs.iter().enumerate().map(move |(idx, msg)| {
272 let culist_slot = CuListSlot::new(idx);
273 CopperListEntry {
274 culist_slot,
275 component_id: self.layout.component_for_slot(culist_slot),
276 msg,
277 }
278 })
279 }
280}
281
282#[derive(Debug, Clone, Copy)]
284pub struct CopperListEntry<'a> {
285 pub culist_slot: CuListSlot,
286 pub component_id: ComponentId,
287 pub msg: &'a CuMsgMetadata,
288}
289
290impl<'a> CopperListEntry<'a> {
291 #[inline]
292 pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
293 layout.component(self.component_id)
294 }
295
296 #[inline]
297 pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
298 layout.component(self.component_id).kind()
299 }
300}
301
302#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
304pub struct ExecutionMarker {
305 pub component_id: ComponentId,
307 pub step: CuComponentState,
309 pub culistid: Option<u64>,
311}
312
313#[derive(Debug)]
319pub struct RuntimeExecutionProbe {
320 component_id: AtomicUsize,
321 step: AtomicUsize,
322 #[cfg(target_has_atomic = "64")]
323 culistid: AtomicU64,
324 #[cfg(target_has_atomic = "64")]
325 culistid_present: AtomicUsize,
326 #[cfg(not(target_has_atomic = "64"))]
327 culistid: Mutex<Option<u64>>,
328 sequence: AtomicUsize,
329}
330
331impl Default for RuntimeExecutionProbe {
332 fn default() -> Self {
333 Self {
334 component_id: AtomicUsize::new(ComponentId::INVALID.index()),
335 step: AtomicUsize::new(0),
336 #[cfg(target_has_atomic = "64")]
337 culistid: AtomicU64::new(0),
338 #[cfg(target_has_atomic = "64")]
339 culistid_present: AtomicUsize::new(0),
340 #[cfg(not(target_has_atomic = "64"))]
341 culistid: Mutex::new(None),
342 sequence: AtomicUsize::new(0),
343 }
344 }
345}
346
347impl RuntimeExecutionProbe {
348 #[inline]
349 pub fn record(&self, marker: ExecutionMarker) {
350 self.component_id
351 .store(marker.component_id.index(), Ordering::Relaxed);
352 self.step
353 .store(component_state_to_usize(marker.step), Ordering::Relaxed);
354 #[cfg(target_has_atomic = "64")]
355 match marker.culistid {
356 Some(culistid) => {
357 self.culistid.store(culistid, Ordering::Relaxed);
358 self.culistid_present.store(1, Ordering::Relaxed);
359 }
360 None => {
361 self.culistid_present.store(0, Ordering::Relaxed);
362 }
363 }
364 #[cfg(not(target_has_atomic = "64"))]
365 {
366 *self.culistid.lock() = marker.culistid;
367 }
368 self.sequence.fetch_add(1, Ordering::Release);
369 }
370
371 #[inline]
372 pub fn sequence(&self) -> usize {
373 self.sequence.load(Ordering::Acquire)
374 }
375
376 #[inline]
377 pub fn marker(&self) -> Option<ExecutionMarker> {
378 loop {
381 let seq_before = self.sequence.load(Ordering::Acquire);
382 let component_id = self.component_id.load(Ordering::Relaxed);
383 let step = self.step.load(Ordering::Relaxed);
384 #[cfg(target_has_atomic = "64")]
385 let culistid_present = self.culistid_present.load(Ordering::Relaxed);
386 #[cfg(target_has_atomic = "64")]
387 let culistid_value = self.culistid.load(Ordering::Relaxed);
388 #[cfg(not(target_has_atomic = "64"))]
389 let culistid = *self.culistid.lock();
390 let seq_after = self.sequence.load(Ordering::Acquire);
391 if seq_before == seq_after {
392 if component_id == ComponentId::INVALID.index() {
393 return None;
394 }
395 let step = usize_to_component_state(step);
396 #[cfg(target_has_atomic = "64")]
397 let culistid = if culistid_present == 0 {
398 None
399 } else {
400 Some(culistid_value)
401 };
402 return Some(ExecutionMarker {
403 component_id: ComponentId::new(component_id),
404 step,
405 culistid,
406 });
407 }
408 }
409 }
410}
411
412#[inline]
413const fn component_state_to_usize(step: CuComponentState) -> usize {
414 match step {
415 CuComponentState::Start => 0,
416 CuComponentState::Preprocess => 1,
417 CuComponentState::Process => 2,
418 CuComponentState::Postprocess => 3,
419 CuComponentState::Stop => 4,
420 }
421}
422
423#[inline]
424const fn usize_to_component_state(step: usize) -> CuComponentState {
425 match step {
426 0 => CuComponentState::Start,
427 1 => CuComponentState::Preprocess,
428 2 => CuComponentState::Process,
429 3 => CuComponentState::Postprocess,
430 _ => CuComponentState::Stop,
431 }
432}
433
434#[cfg(feature = "std")]
435pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
436
437#[derive(Debug, Clone)]
442pub struct MonitorExecutionProbe {
443 #[cfg(feature = "std")]
444 inner: Option<ExecutionProbeHandle>,
445}
446
447impl Default for MonitorExecutionProbe {
448 fn default() -> Self {
449 Self::unavailable()
450 }
451}
452
453impl MonitorExecutionProbe {
454 #[cfg(feature = "std")]
455 pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
456 Self {
457 inner: Some(handle),
458 }
459 }
460
461 pub const fn unavailable() -> Self {
462 Self {
463 #[cfg(feature = "std")]
464 inner: None,
465 }
466 }
467
468 pub fn is_available(&self) -> bool {
469 #[cfg(feature = "std")]
470 {
471 self.inner.is_some()
472 }
473 #[cfg(not(feature = "std"))]
474 {
475 false
476 }
477 }
478
479 pub fn marker(&self) -> Option<ExecutionMarker> {
480 #[cfg(feature = "std")]
481 {
482 self.inner.as_ref().and_then(|probe| probe.marker())
483 }
484 #[cfg(not(feature = "std"))]
485 {
486 None
487 }
488 }
489
490 pub fn sequence(&self) -> Option<usize> {
491 #[cfg(feature = "std")]
492 {
493 self.inner.as_ref().map(|probe| probe.sequence())
494 }
495 #[cfg(not(feature = "std"))]
496 {
497 None
498 }
499 }
500}
501
502#[derive(Debug, Clone, Copy, PartialEq, Eq)]
507#[non_exhaustive]
508pub enum ComponentType {
509 Source,
510 Task,
511 Sink,
512 Bridge,
513}
514
515impl ComponentType {
516 pub const fn is_task(self) -> bool {
517 !matches!(self, Self::Bridge)
518 }
519}
520
521#[derive(Debug, Clone, Copy, PartialEq, Eq)]
523pub struct MonitorComponentMetadata {
524 id: &'static str,
525 kind: ComponentType,
526 type_name: Option<&'static str>,
527}
528
529impl MonitorComponentMetadata {
530 pub const fn new(
531 id: &'static str,
532 kind: ComponentType,
533 type_name: Option<&'static str>,
534 ) -> Self {
535 Self {
536 id,
537 kind,
538 type_name,
539 }
540 }
541
542 pub const fn id(&self) -> &'static str {
544 self.id
545 }
546
547 pub const fn kind(&self) -> ComponentType {
548 self.kind
549 }
550
551 pub const fn type_name(&self) -> Option<&'static str> {
553 self.type_name
554 }
555}
556
557#[derive(Debug, Clone)]
562pub struct CuMonitoringMetadata {
563 mission_id: CompactString,
564 subsystem_id: Option<CompactString>,
565 instance_id: u32,
566 layout: CopperListLayout,
567 copperlist_info: CopperListInfo,
568 topology: MonitorTopology,
569 monitor_config: Option<ComponentConfig>,
570}
571
572impl CuMonitoringMetadata {
573 pub fn new(
574 mission_id: CompactString,
575 components: &'static [MonitorComponentMetadata],
576 culist_component_mapping: &'static [ComponentId],
577 copperlist_info: CopperListInfo,
578 topology: MonitorTopology,
579 monitor_config: Option<ComponentConfig>,
580 ) -> CuResult<Self> {
581 Self::validate_components(components)?;
582 Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
583 Ok(Self {
584 mission_id,
585 subsystem_id: None,
586 instance_id: 0,
587 layout: CopperListLayout::new(components, culist_component_mapping),
588 copperlist_info,
589 topology,
590 monitor_config,
591 })
592 }
593
594 fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
595 let mut seen_bridge = false;
596 for component in components {
597 match component.kind() {
598 component_type if component_type.is_task() && seen_bridge => {
599 return Err(CuError::from(
600 "invalid monitor metadata: task-family components must appear before bridges",
601 ));
602 }
603 ComponentType::Bridge => seen_bridge = true,
604 _ => {}
605 }
606 }
607 Ok(())
608 }
609
610 fn validate_culist_mapping(
611 components_len: usize,
612 culist_component_mapping: &'static [ComponentId],
613 ) -> CuResult<()> {
614 for component_idx in culist_component_mapping {
615 if component_idx.index() >= components_len {
616 return Err(CuError::from(
617 "invalid monitor metadata: culist mapping points past components table",
618 ));
619 }
620 }
621 Ok(())
622 }
623
624 pub fn mission_id(&self) -> &str {
626 self.mission_id.as_str()
627 }
628
629 pub fn subsystem_id(&self) -> Option<&str> {
632 self.subsystem_id.as_deref()
633 }
634
635 pub fn instance_id(&self) -> u32 {
637 self.instance_id
638 }
639
640 pub fn components(&self) -> &'static [MonitorComponentMetadata] {
644 self.layout.components()
645 }
646
647 pub const fn component_count(&self) -> usize {
649 self.layout.component_count()
650 }
651
652 pub const fn layout(&self) -> CopperListLayout {
654 self.layout
655 }
656
657 pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
658 self.layout.component(component_id)
659 }
660
661 pub fn component_id(&self, component_id: ComponentId) -> &'static str {
662 self.component(component_id).id()
663 }
664
665 pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
666 self.component(component_id).kind()
667 }
668
669 pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
670 self.layout
671 .components()
672 .iter()
673 .position(|component| component.id() == component_id)
674 .map(ComponentId::new)
675 }
676
677 pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
681 self.layout.slot_to_component()
682 }
683
684 pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
685 self.layout.component_for_slot(culist_slot)
686 }
687
688 pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
689 self.layout.view(msgs)
690 }
691
692 pub const fn copperlist_info(&self) -> CopperListInfo {
693 self.copperlist_info
694 }
695
696 pub fn topology(&self) -> &MonitorTopology {
701 &self.topology
702 }
703
704 pub fn monitor_config(&self) -> Option<&ComponentConfig> {
705 self.monitor_config.as_ref()
706 }
707
708 pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
709 self.monitor_config = monitor_config;
710 self
711 }
712
713 pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
714 self.subsystem_id = subsystem_id.map(CompactString::from);
715 self
716 }
717
718 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
719 self.instance_id = instance_id;
720 self
721 }
722}
723
724#[derive(Debug, Clone, Default)]
728pub struct CuMonitoringRuntime {
729 execution_probe: MonitorExecutionProbe,
730}
731
732impl CuMonitoringRuntime {
733 #[cfg(feature = "std")]
734 pub fn new(execution_probe: MonitorExecutionProbe) -> Self {
735 ensure_runtime_panic_hook_installed();
736 Self { execution_probe }
737 }
738
739 #[cfg(not(feature = "std"))]
740 pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
741 Self { execution_probe }
742 }
743
744 #[cfg(feature = "std")]
745 pub fn unavailable() -> Self {
746 Self::new(MonitorExecutionProbe::unavailable())
747 }
748
749 #[cfg(not(feature = "std"))]
750 pub const fn unavailable() -> Self {
751 Self::new(MonitorExecutionProbe::unavailable())
752 }
753
754 pub fn execution_probe(&self) -> &MonitorExecutionProbe {
755 &self.execution_probe
756 }
757
758 #[cfg(feature = "std")]
759 pub fn register_panic_cleanup<F>(&self, callback: F) -> PanicHookRegistration
760 where
761 F: Fn(&PanicReport) + Send + Sync + 'static,
762 {
763 ensure_runtime_panic_hook_installed();
764 register_panic_cleanup(callback)
765 }
766
767 #[cfg(feature = "std")]
768 pub fn register_panic_action<F>(&self, callback: F) -> PanicHookRegistration
769 where
770 F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
771 {
772 ensure_runtime_panic_hook_installed();
773 register_panic_action(callback)
774 }
775}
776
777#[cfg(feature = "std")]
778type PanicCleanupCallback = Arc<dyn Fn(&PanicReport) + Send + Sync + 'static>;
779#[cfg(feature = "std")]
780type PanicActionCallback = Arc<dyn Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static>;
781
782#[cfg(feature = "std")]
783#[derive(Debug, Clone)]
784pub struct PanicReport {
785 message: String,
786 location: Option<String>,
787 thread_name: Option<String>,
788 backtrace: String,
789 timestamp_unix_ms: u128,
790 crash_report_path: Option<String>,
791}
792
793#[cfg(feature = "std")]
794impl PanicReport {
795 fn capture(info: &PanicHookInfo<'_>) -> Self {
796 let location = info
797 .location()
798 .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()));
799 let thread_name = std::thread::current().name().map(|name| name.to_string());
800 let timestamp_unix_ms = SystemTime::now()
801 .duration_since(UNIX_EPOCH)
802 .map(|dur| dur.as_millis())
803 .unwrap_or(0);
804
805 Self {
806 message: panic_hook_payload_to_string(info),
807 location,
808 thread_name,
809 backtrace: Backtrace::force_capture().to_string(),
810 timestamp_unix_ms,
811 crash_report_path: None,
812 }
813 }
814
815 pub fn message(&self) -> &str {
816 &self.message
817 }
818
819 pub fn location(&self) -> Option<&str> {
820 self.location.as_deref()
821 }
822
823 pub fn thread_name(&self) -> Option<&str> {
824 self.thread_name.as_deref()
825 }
826
827 pub fn backtrace(&self) -> &str {
828 &self.backtrace
829 }
830
831 pub fn timestamp_unix_ms(&self) -> u128 {
832 self.timestamp_unix_ms
833 }
834
835 pub fn crash_report_path(&self) -> Option<&str> {
836 self.crash_report_path.as_deref()
837 }
838
839 pub fn summary(&self) -> String {
840 match self.location() {
841 Some(location) => format!("panic at {location}: {}", self.message()),
842 None => format!("panic: {}", self.message()),
843 }
844 }
845}
846
847#[cfg(feature = "std")]
848#[derive(Clone, Copy, Debug, PartialEq, Eq)]
849enum PanicHookRegistrationKind {
850 Cleanup,
851 Action,
852}
853
854#[cfg(feature = "std")]
855#[derive(Clone)]
856struct RegisteredPanicCleanup {
857 id: usize,
858 callback: PanicCleanupCallback,
859}
860
861#[cfg(feature = "std")]
862#[derive(Clone)]
863struct RegisteredPanicAction {
864 id: usize,
865 callback: PanicActionCallback,
866}
867
868#[cfg(feature = "std")]
869#[derive(Default)]
870struct PanicHookRegistry {
871 cleanup_callbacks: StdMutex<Vec<RegisteredPanicCleanup>>,
872 action_callbacks: StdMutex<Vec<RegisteredPanicAction>>,
873}
874
875#[cfg(feature = "std")]
876#[derive(Debug)]
877pub struct PanicHookRegistration {
878 id: usize,
879 kind: PanicHookRegistrationKind,
880}
881
882#[cfg(feature = "std")]
883impl Drop for PanicHookRegistration {
884 fn drop(&mut self) {
885 unregister_panic_hook(self.kind, self.id);
886 }
887}
888
889#[cfg(feature = "std")]
890static PANIC_HOOK_REGISTRY: OnceLock<PanicHookRegistry> = OnceLock::new();
891#[cfg(feature = "std")]
892static PANIC_HOOK_INSTALL_ONCE: OnceLock<()> = OnceLock::new();
893#[cfg(feature = "std")]
894static PANIC_HOOK_REGISTRATION_ID: AtomicUsize = AtomicUsize::new(1);
895#[cfg(feature = "std")]
896static PANIC_HOOK_ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0);
897
898#[cfg(feature = "std")]
899fn panic_hook_registry() -> &'static PanicHookRegistry {
900 PANIC_HOOK_REGISTRY.get_or_init(PanicHookRegistry::default)
901}
902
903#[cfg(feature = "std")]
904fn ensure_runtime_panic_hook_installed() {
905 let _ = PANIC_HOOK_INSTALL_ONCE.get_or_init(|| {
906 std::panic::set_hook(Box::new(move |info| {
907 let _guard = PanicHookActiveGuard::new();
908 let mut report = PanicReport::capture(info);
909 run_panic_cleanup_callbacks(&report);
910 report.crash_report_path = write_panic_report_to_file(&report);
911 emit_panic_report(&report);
912
913 if let Some(exit_code) = run_panic_action_callbacks(&report) {
914 std::process::exit(exit_code);
915 }
916 }));
917 });
918}
919
920#[cfg(feature = "std")]
921struct PanicHookActiveGuard;
922
923#[cfg(feature = "std")]
924impl PanicHookActiveGuard {
925 fn new() -> Self {
926 PANIC_HOOK_ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst);
927 Self
928 }
929}
930
931#[cfg(feature = "std")]
932impl Drop for PanicHookActiveGuard {
933 fn drop(&mut self) {
934 PANIC_HOOK_ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst);
935 }
936}
937
938#[cfg(feature = "std")]
939pub fn runtime_panic_hook_active() -> bool {
940 PANIC_HOOK_ACTIVE_COUNT.load(Ordering::SeqCst) > 0
941}
942
943#[cfg(not(feature = "std"))]
944pub const fn runtime_panic_hook_active() -> bool {
945 false
946}
947
948#[cfg(feature = "std")]
949fn register_panic_cleanup<F>(callback: F) -> PanicHookRegistration
950where
951 F: Fn(&PanicReport) + Send + Sync + 'static,
952{
953 let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
954 let callback = Arc::new(callback) as PanicCleanupCallback;
955 let mut callbacks = panic_hook_registry()
956 .cleanup_callbacks
957 .lock()
958 .unwrap_or_else(|poison| poison.into_inner());
959 callbacks.push(RegisteredPanicCleanup { id, callback });
960 PanicHookRegistration {
961 id,
962 kind: PanicHookRegistrationKind::Cleanup,
963 }
964}
965
966#[cfg(feature = "std")]
967fn register_panic_action<F>(callback: F) -> PanicHookRegistration
968where
969 F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
970{
971 let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
972 let callback = Arc::new(callback) as PanicActionCallback;
973 let mut callbacks = panic_hook_registry()
974 .action_callbacks
975 .lock()
976 .unwrap_or_else(|poison| poison.into_inner());
977 callbacks.push(RegisteredPanicAction { id, callback });
978 PanicHookRegistration {
979 id,
980 kind: PanicHookRegistrationKind::Action,
981 }
982}
983
984#[cfg(feature = "std")]
985fn unregister_panic_hook(kind: PanicHookRegistrationKind, id: usize) {
986 let registry = panic_hook_registry();
987 match kind {
988 PanicHookRegistrationKind::Cleanup => {
989 let mut callbacks = registry
990 .cleanup_callbacks
991 .lock()
992 .unwrap_or_else(|poison| poison.into_inner());
993 callbacks.retain(|entry| entry.id != id);
994 }
995 PanicHookRegistrationKind::Action => {
996 let mut callbacks = registry
997 .action_callbacks
998 .lock()
999 .unwrap_or_else(|poison| poison.into_inner());
1000 callbacks.retain(|entry| entry.id != id);
1001 }
1002 }
1003}
1004
1005#[cfg(feature = "std")]
1006fn run_panic_cleanup_callbacks(report: &PanicReport) {
1007 let callbacks = panic_hook_registry()
1008 .cleanup_callbacks
1009 .lock()
1010 .unwrap_or_else(|poison| poison.into_inner())
1011 .clone();
1012 for entry in callbacks {
1013 (entry.callback)(report);
1014 }
1015}
1016
1017#[cfg(feature = "std")]
1018fn run_panic_action_callbacks(report: &PanicReport) -> Option<i32> {
1019 let callbacks = panic_hook_registry()
1020 .action_callbacks
1021 .lock()
1022 .unwrap_or_else(|poison| poison.into_inner())
1023 .clone();
1024 let mut exit_code = None;
1025 for entry in callbacks {
1026 if exit_code.is_none() {
1027 exit_code = (entry.callback)(report);
1028 } else {
1029 let _ = (entry.callback)(report);
1030 }
1031 }
1032 exit_code
1033}
1034
1035#[cfg(feature = "std")]
1036fn panic_hook_payload_to_string(info: &PanicHookInfo<'_>) -> String {
1037 if let Some(msg) = info.payload().downcast_ref::<&str>() {
1038 (*msg).to_string()
1039 } else if let Some(msg) = info.payload().downcast_ref::<String>() {
1040 msg.clone()
1041 } else {
1042 "panic with non-string payload".to_string()
1043 }
1044}
1045
1046#[cfg(feature = "std")]
1047fn render_panic_report(report: &PanicReport) -> String {
1048 let mut rendered = String::from("Copper panic\n");
1049 rendered.push_str(&format!("time_unix_ms: {}\n", report.timestamp_unix_ms()));
1050 rendered.push_str(&format!(
1051 "thread: {}\n",
1052 report.thread_name().unwrap_or("<unnamed>")
1053 ));
1054 if let Some(location) = report.location() {
1055 rendered.push_str(&format!("location: {location}\n"));
1056 }
1057 rendered.push_str(&format!("message: {}\n", report.message()));
1058 if let Some(path) = report.crash_report_path() {
1059 rendered.push_str(&format!("crash_report: {path}\n"));
1060 }
1061 rendered.push_str("\nBacktrace:\n");
1062 rendered.push_str(report.backtrace());
1063 if !report.backtrace().ends_with('\n') {
1064 rendered.push('\n');
1065 }
1066 rendered
1067}
1068
1069#[cfg(feature = "std")]
1070fn emit_panic_report(report: &PanicReport) {
1071 let mut stderr = std::io::stderr().lock();
1072 let _ = stderr.write_all(render_panic_report(report).as_bytes());
1073 let _ = stderr.flush();
1074}
1075
1076#[cfg(feature = "std")]
1077fn write_panic_report_to_file(report: &PanicReport) -> Option<String> {
1078 let cwd = std::env::current_dir().ok()?;
1079 let file_name = format!(
1080 "copper-crash-{}-{}.txt",
1081 report.timestamp_unix_ms(),
1082 std::process::id()
1083 );
1084 let path = cwd.join(file_name);
1085 let path_string = path.to_string_lossy().to_string();
1086 let mut file = File::create(&path).ok()?;
1087 let mut report_with_path = report.clone();
1088 report_with_path.crash_report_path = Some(path_string.clone());
1089 file.write_all(render_panic_report(&report_with_path).as_bytes())
1090 .ok()?;
1091 file.flush().ok()?;
1092 Some(path_string)
1093}
1094
1095#[derive(Debug)]
1097pub enum Decision {
1098 Abort, Ignore, Shutdown, }
1102
1103fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
1104 use Decision::{Abort, Ignore, Shutdown};
1105 match (lhs, rhs) {
1108 (Shutdown, _) | (_, Shutdown) => Shutdown,
1109 (Abort, _) | (_, Abort) => Abort,
1110 _ => Ignore,
1111 }
1112}
1113
1114#[derive(Debug, Clone)]
1115pub struct MonitorNode {
1116 pub id: String,
1117 pub type_name: Option<String>,
1118 pub kind: ComponentType,
1119 pub inputs: Vec<String>,
1121 pub outputs: Vec<String>,
1123}
1124
1125#[derive(Debug, Clone)]
1126pub struct MonitorConnection {
1127 pub src: String,
1128 pub src_port: Option<String>,
1129 pub dst: String,
1130 pub dst_port: Option<String>,
1131 pub msg: String,
1132}
1133
1134#[derive(Debug, Clone, Default)]
1135pub struct MonitorTopology {
1136 pub nodes: Vec<MonitorNode>,
1137 pub connections: Vec<MonitorConnection>,
1138}
1139
1140#[derive(Debug, Clone, Copy, Default)]
1141pub struct CopperListInfo {
1142 pub size_bytes: usize,
1143 pub count: usize,
1144}
1145
1146impl CopperListInfo {
1147 pub const fn new(size_bytes: usize, count: usize) -> Self {
1148 Self { size_bytes, count }
1149 }
1150}
1151
1152#[derive(Debug, Clone, Copy, Default)]
1154pub struct CopperListIoStats {
1155 pub raw_culist_bytes: u64,
1160 pub handle_bytes: u64,
1166 pub encoded_culist_bytes: u64,
1168 pub keyframe_bytes: u64,
1170 pub structured_log_bytes_total: u64,
1172 pub culistid: u64,
1174}
1175
1176#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1177pub struct PayloadIoStats {
1178 pub resident_bytes: usize,
1179 pub encoded_bytes: usize,
1180 pub handle_bytes: usize,
1181}
1182
1183#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1184pub struct CuMsgIoStats {
1185 pub present: bool,
1186 pub resident_bytes: u64,
1187 pub encoded_bytes: u64,
1188 pub handle_bytes: u64,
1189}
1190
1191struct CuMsgIoEntry {
1192 present: PortableAtomicBool,
1193 resident_bytes: PortableAtomicU64,
1194 encoded_bytes: PortableAtomicU64,
1195 handle_bytes: PortableAtomicU64,
1196}
1197
1198impl CuMsgIoEntry {
1199 fn clear(&self) {
1200 self.present.store(false, PortableOrdering::Release);
1201 self.resident_bytes.store(0, PortableOrdering::Relaxed);
1202 self.encoded_bytes.store(0, PortableOrdering::Relaxed);
1203 self.handle_bytes.store(0, PortableOrdering::Relaxed);
1204 }
1205
1206 fn get(&self) -> CuMsgIoStats {
1207 if !self.present.load(PortableOrdering::Acquire) {
1208 return CuMsgIoStats::default();
1209 }
1210
1211 CuMsgIoStats {
1212 present: true,
1213 resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
1214 encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
1215 handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
1216 }
1217 }
1218
1219 fn set(&self, stats: CuMsgIoStats) {
1220 self.resident_bytes
1221 .store(stats.resident_bytes, PortableOrdering::Relaxed);
1222 self.encoded_bytes
1223 .store(stats.encoded_bytes, PortableOrdering::Relaxed);
1224 self.handle_bytes
1225 .store(stats.handle_bytes, PortableOrdering::Relaxed);
1226 self.present.store(stats.present, PortableOrdering::Release);
1227 }
1228}
1229
1230impl Default for CuMsgIoEntry {
1231 fn default() -> Self {
1232 Self {
1233 present: PortableAtomicBool::new(false),
1234 resident_bytes: PortableAtomicU64::new(0),
1235 encoded_bytes: PortableAtomicU64::new(0),
1236 handle_bytes: PortableAtomicU64::new(0),
1237 }
1238 }
1239}
1240
1241pub struct CuMsgIoCache<const N: usize> {
1242 entries: [CuMsgIoEntry; N],
1243}
1244
1245impl<const N: usize> CuMsgIoCache<N> {
1246 pub fn clear(&self) {
1247 for entry in &self.entries {
1248 entry.clear();
1249 }
1250 }
1251
1252 pub fn get(&self, idx: usize) -> CuMsgIoStats {
1253 self.entries[idx].get()
1254 }
1255
1256 fn raw_parts(&self) -> (usize, usize) {
1257 (self.entries.as_ptr() as usize, N)
1258 }
1259}
1260
1261impl<const N: usize> Default for CuMsgIoCache<N> {
1262 fn default() -> Self {
1263 Self {
1264 entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
1265 }
1266 }
1267}
1268
1269#[derive(Clone, Copy)]
1270struct ActiveCuMsgIoCapture {
1271 cache_addr: usize,
1272 cache_len: usize,
1273 current_slot: Option<usize>,
1274}
1275
1276#[cfg(feature = "std")]
1277thread_local! {
1278 static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
1279 static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
1280 static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
1281}
1282
1283#[cfg(not(feature = "std"))]
1284static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
1285#[cfg(not(feature = "std"))]
1286static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
1287#[cfg(not(feature = "std"))]
1288static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
1289
1290fn begin_payload_io_measurement() {
1291 #[cfg(feature = "std")]
1292 PAYLOAD_HANDLE_BYTES.with(|bytes| {
1293 debug_assert!(
1294 bytes.get().is_none(),
1295 "payload IO byte measurement must not be nested"
1296 );
1297 bytes.set(Some(0));
1298 });
1299
1300 #[cfg(not(feature = "std"))]
1301 {
1302 let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
1303 debug_assert!(
1304 bytes.is_none(),
1305 "payload IO byte measurement must not be nested"
1306 );
1307 *bytes = Some(0);
1308 }
1309}
1310
1311fn finish_payload_io_measurement() -> usize {
1312 #[cfg(feature = "std")]
1313 {
1314 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
1315 }
1316
1317 #[cfg(not(feature = "std"))]
1318 {
1319 PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
1320 }
1321}
1322
1323fn abort_payload_io_measurement() {
1324 #[cfg(feature = "std")]
1325 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
1326
1327 #[cfg(not(feature = "std"))]
1328 {
1329 *PAYLOAD_HANDLE_BYTES.lock() = None;
1330 }
1331}
1332
1333fn current_payload_io_measurement() -> usize {
1334 #[cfg(feature = "std")]
1335 {
1336 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
1337 }
1338
1339 #[cfg(not(feature = "std"))]
1340 {
1341 PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
1342 }
1343}
1344
1345pub(crate) fn record_payload_handle_bytes(bytes: usize) {
1350 #[cfg(feature = "std")]
1351 PAYLOAD_HANDLE_BYTES.with(|total| {
1352 if let Some(current) = total.get() {
1353 total.set(Some(current.saturating_add(bytes)));
1354 }
1355 });
1356
1357 #[cfg(not(feature = "std"))]
1358 {
1359 let mut total = PAYLOAD_HANDLE_BYTES.lock();
1360 if let Some(current) = *total {
1361 *total = Some(current.saturating_add(bytes));
1362 }
1363 }
1364}
1365
1366fn set_last_completed_handle_bytes(bytes: u64) {
1367 #[cfg(feature = "std")]
1368 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1369
1370 #[cfg(not(feature = "std"))]
1371 {
1372 *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1373 }
1374}
1375
1376pub fn take_last_completed_handle_bytes() -> u64 {
1377 #[cfg(feature = "std")]
1378 {
1379 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1380 }
1381
1382 #[cfg(not(feature = "std"))]
1383 {
1384 let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1385 let value = *total;
1386 *total = 0;
1387 value
1388 }
1389}
1390
1391fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1392 #[cfg(feature = "std")]
1393 {
1394 ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1395 let mut state = capture.get()?;
1396 let result = f(&mut state);
1397 capture.set(Some(state));
1398 Some(result)
1399 })
1400 }
1401
1402 #[cfg(not(feature = "std"))]
1403 {
1404 let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1405 let state = capture.as_mut()?;
1406 Some(f(state))
1407 }
1408}
1409
1410pub struct CuMsgIoCaptureGuard;
1411
1412impl CuMsgIoCaptureGuard {
1413 pub fn select_slot(&self, slot: usize) {
1414 let _ = with_active_capture_mut(|capture| {
1415 debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1416 capture.current_slot = Some(slot);
1417 });
1418 }
1419}
1420
1421impl Drop for CuMsgIoCaptureGuard {
1422 fn drop(&mut self) {
1423 set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1424
1425 #[cfg(feature = "std")]
1426 ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1427
1428 #[cfg(not(feature = "std"))]
1429 {
1430 *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1431 }
1432 }
1433}
1434
1435pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1436 cache.clear();
1437 set_last_completed_handle_bytes(0);
1438 begin_payload_io_measurement();
1439 let (cache_addr, cache_len) = cache.raw_parts();
1440 let capture = ActiveCuMsgIoCapture {
1441 cache_addr,
1442 cache_len,
1443 current_slot: None,
1444 };
1445
1446 #[cfg(feature = "std")]
1447 ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1448 debug_assert!(
1449 state.get().is_none(),
1450 "CopperList payload IO capture must not be nested"
1451 );
1452 state.set(Some(capture));
1453 });
1454
1455 #[cfg(not(feature = "std"))]
1456 {
1457 let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1458 debug_assert!(
1459 state.is_none(),
1460 "CopperList payload IO capture must not be nested"
1461 );
1462 *state = Some(capture);
1463 }
1464
1465 CuMsgIoCaptureGuard
1466}
1467
1468pub(crate) fn current_payload_handle_bytes() -> usize {
1469 current_payload_io_measurement()
1470}
1471
1472pub(crate) fn record_current_slot_payload_io_stats(
1473 fixed_bytes: usize,
1474 encoded_bytes: usize,
1475 handle_bytes: usize,
1476) {
1477 let _ = with_active_capture_mut(|capture| {
1478 let Some(slot) = capture.current_slot else {
1479 return;
1480 };
1481 if slot >= capture.cache_len {
1482 return;
1483 }
1484 let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
1486 let entry = unsafe { &*cache_ptr.add(slot) };
1487 entry.set(CuMsgIoStats {
1488 present: true,
1489 resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
1490 encoded_bytes: encoded_bytes as u64,
1491 handle_bytes: handle_bytes as u64,
1492 });
1493 });
1494}
1495
1496pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1503where
1504 T: Encode,
1505{
1506 begin_payload_io_measurement();
1507 begin_observed_encode();
1508
1509 let result = (|| {
1510 let mut encoder =
1511 EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1512 payload.encode(&mut encoder).map_err(|e| {
1513 CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1514 })?;
1515 let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1516 debug_assert_eq!(encoded_bytes, finish_observed_encode());
1517 let handle_bytes = finish_payload_io_measurement();
1518 Ok(PayloadIoStats {
1519 resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1520 encoded_bytes,
1521 handle_bytes,
1522 })
1523 })();
1524
1525 if result.is_err() {
1526 abort_payload_io_measurement();
1527 abort_observed_encode();
1528 }
1529
1530 result
1531}
1532
1533fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1534 let Ok(msg_types) = graph.get_node_output_msg_types_by_id(node_id) else {
1535 return Vec::new();
1536 };
1537
1538 let mut outputs = Vec::new();
1539 for (port_idx, msg) in msg_types.into_iter().enumerate() {
1540 let mut port_label = String::from("out");
1541 port_label.push_str(&port_idx.to_string());
1542 port_label.push_str(": ");
1543 port_label.push_str(msg.as_str());
1544 outputs.push((msg, port_label));
1545 }
1546 outputs
1547}
1548
1549pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1551 let graph = config.get_graph(Some(mission))?;
1552 let mut nodes: Map<String, MonitorNode> = Map::new();
1553 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1554
1555 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1556 for bridge in &config.bridges {
1557 bridge_lookup.insert(bridge.id.as_str(), bridge);
1558 }
1559
1560 for (node_idx, node) in graph.get_all_nodes() {
1561 let node_id = node.get_id();
1562 let task_kind = match node.get_flavor() {
1563 Flavor::Bridge => ComponentType::Bridge,
1564 Flavor::Task => match resolve_task_kind_for_id(graph, node_idx)? {
1565 TaskKind::Source => ComponentType::Source,
1566 TaskKind::Regular => ComponentType::Task,
1567 TaskKind::Sink => ComponentType::Sink,
1568 },
1569 };
1570
1571 let mut inputs = Vec::new();
1572 let mut outputs = Vec::new();
1573 if task_kind == ComponentType::Bridge
1574 && let Some(bridge) = bridge_lookup.get(node_id.as_str())
1575 {
1576 for ch in &bridge.channels {
1577 match ch {
1578 BridgeChannelConfigRepresentation::Rx { id, .. } => outputs.push(id.clone()),
1579 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1580 }
1581 }
1582 } else {
1583 match task_kind {
1584 ComponentType::Source => {
1585 let ports = collect_output_ports(graph, node_idx);
1586 let mut port_map: Map<String, String> = Map::new();
1587 for (msg_type, label) in ports {
1588 port_map.insert(msg_type, label.clone());
1589 outputs.push(label);
1590 }
1591 output_port_lookup.insert(node_id.clone(), port_map);
1592 }
1593 ComponentType::Task => {
1594 inputs.push("in".to_string());
1595 let ports = collect_output_ports(graph, node_idx);
1596 let mut port_map: Map<String, String> = Map::new();
1597 for (msg_type, label) in ports {
1598 port_map.insert(msg_type, label.clone());
1599 outputs.push(label);
1600 }
1601 output_port_lookup.insert(node_id.clone(), port_map);
1602 }
1603 ComponentType::Sink => {
1604 inputs.push("in".to_string());
1605 }
1606 ComponentType::Bridge => unreachable!("handled above"),
1607 }
1608 }
1609
1610 nodes.insert(
1611 node_id.clone(),
1612 MonitorNode {
1613 id: node_id,
1614 type_name: Some(node.get_type().to_string()),
1615 kind: task_kind,
1616 inputs,
1617 outputs,
1618 },
1619 );
1620 }
1621
1622 let mut connections = Vec::new();
1623 for cnx in graph.edges() {
1624 let src = cnx.src.clone();
1625 let dst = cnx.dst.clone();
1626
1627 let src_port = cnx.src_channel.clone().or_else(|| {
1628 output_port_lookup
1629 .get(&src)
1630 .and_then(|ports| ports.get(&cnx.msg).cloned())
1631 .or_else(|| {
1632 nodes
1633 .get(&src)
1634 .and_then(|node| node.outputs.first().cloned())
1635 })
1636 });
1637 let dst_port = cnx.dst_channel.clone().or_else(|| {
1638 nodes
1639 .get(&dst)
1640 .and_then(|node| node.inputs.first().cloned())
1641 });
1642
1643 connections.push(MonitorConnection {
1644 src,
1645 src_port,
1646 dst,
1647 dst_port,
1648 msg: cnx.msg.clone(),
1649 });
1650 }
1651
1652 Ok(MonitorTopology {
1653 nodes: nodes.into_values().collect(),
1654 connections,
1655 })
1656}
1657
1658pub trait CuMonitor: Sized {
1678 fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1684 where
1685 Self: Sized;
1686
1687 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1689 Ok(())
1690 }
1691
1692 fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;
1694
1695 fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
1697
1698 fn process_error(
1702 &self,
1703 component_id: ComponentId,
1704 step: CuComponentState,
1705 error: &CuError,
1706 ) -> Decision;
1707
1708 fn process_panic(&self, _panic_message: &str) {}
1710
1711 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1713 Ok(())
1714 }
1715}
1716
1717pub struct NoMonitor {}
1720impl CuMonitor for NoMonitor {
1721 fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1722 Ok(NoMonitor {})
1723 }
1724
1725 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1726 #[cfg(all(feature = "std", debug_assertions))]
1727 register_live_log_listener(|entry, format_str, param_names| {
1728 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1729 let named: Map<String, String> = param_names
1730 .iter()
1731 .zip(params.iter())
1732 .map(|(k, v)| (k.to_string(), v.clone()))
1733 .collect();
1734
1735 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1736 let ts = format_timestamp(entry.time.into());
1737 println!("{} [{:?}] {}", ts, entry.level, msg);
1738 }
1739 });
1740 Ok(())
1741 }
1742
1743 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1744 Ok(())
1746 }
1747
1748 fn process_error(
1749 &self,
1750 _component_id: ComponentId,
1751 _step: CuComponentState,
1752 _error: &CuError,
1753 ) -> Decision {
1754 Decision::Ignore
1756 }
1757
1758 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1759 #[cfg(all(feature = "std", debug_assertions))]
1760 unregister_live_log_listener();
1761 Ok(())
1762 }
1763}
1764
1765macro_rules! impl_monitor_tuple {
1766 ($($idx:tt => $name:ident),+) => {
1767 impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1768 fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1769 where
1770 Self: Sized,
1771 {
1772 Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1773 }
1774
1775 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1776 $(self.$idx.start(ctx)?;)+
1777 Ok(())
1778 }
1779
1780 fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1781 $(self.$idx.process_copperlist(ctx, view)?;)+
1782 Ok(())
1783 }
1784
1785 fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1786 $(self.$idx.observe_copperlist_io(stats);)+
1787 }
1788
1789 fn process_error(
1790 &self,
1791 component_id: ComponentId,
1792 step: CuComponentState,
1793 error: &CuError,
1794 ) -> Decision {
1795 let mut decision = Decision::Ignore;
1796 $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1797 decision
1798 }
1799
1800 fn process_panic(&self, panic_message: &str) {
1801 $(self.$idx.process_panic(panic_message);)+
1802 }
1803
1804 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1805 $(self.$idx.stop(ctx)?;)+
1806 Ok(())
1807 }
1808 }
1809 };
1810}
1811
1812impl_monitor_tuple!(0 => M0, 1 => M1);
1813impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1814impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1815impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1816impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1817
1818#[cfg(feature = "std")]
1819pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
1820 if let Some(msg) = payload.downcast_ref::<&str>() {
1821 (*msg).to_string()
1822 } else if let Some(msg) = payload.downcast_ref::<String>() {
1823 msg.clone()
1824 } else {
1825 "panic with non-string payload".to_string()
1826 }
1827}
1828
1829pub struct CountingAlloc<A: GlobalAlloc> {
1831 inner: A,
1832 allocated: AtomicUsize,
1833 deallocated: AtomicUsize,
1834}
1835
1836impl<A: GlobalAlloc> CountingAlloc<A> {
1837 pub const fn new(inner: A) -> Self {
1838 CountingAlloc {
1839 inner,
1840 allocated: AtomicUsize::new(0),
1841 deallocated: AtomicUsize::new(0),
1842 }
1843 }
1844
1845 pub fn allocated(&self) -> usize {
1846 self.allocated.load(Ordering::SeqCst)
1847 }
1848
1849 pub fn deallocated(&self) -> usize {
1850 self.deallocated.load(Ordering::SeqCst)
1851 }
1852
1853 pub fn reset(&self) {
1854 self.allocated.store(0, Ordering::SeqCst);
1855 self.deallocated.store(0, Ordering::SeqCst);
1856 }
1857}
1858
1859unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
1861 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
1863 let p = unsafe { self.inner.alloc(layout) };
1865 if !p.is_null() {
1866 self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
1867 }
1868 p
1869 }
1870
1871 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
1873 unsafe { self.inner.dealloc(ptr, layout) }
1875 self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
1876 }
1877}
1878
1879#[cfg(feature = "memory_monitoring")]
1881pub struct ScopedAllocCounter {
1882 bf_allocated: usize,
1883 bf_deallocated: usize,
1884}
1885
1886#[cfg(feature = "memory_monitoring")]
1887impl Default for ScopedAllocCounter {
1888 fn default() -> Self {
1889 Self::new()
1890 }
1891}
1892
1893#[cfg(feature = "memory_monitoring")]
1894impl ScopedAllocCounter {
1895 pub fn new() -> Self {
1896 ScopedAllocCounter {
1897 bf_allocated: GLOBAL.allocated(),
1898 bf_deallocated: GLOBAL.deallocated(),
1899 }
1900 }
1901
1902 pub fn allocated(&self) -> usize {
1914 GLOBAL.allocated() - self.bf_allocated
1915 }
1916
1917 pub fn deallocated(&self) -> usize {
1930 GLOBAL.deallocated() - self.bf_deallocated
1931 }
1932}
1933
1934#[cfg(feature = "memory_monitoring")]
1936impl Drop for ScopedAllocCounter {
1937 fn drop(&mut self) {
1938 let _allocated = GLOBAL.allocated() - self.bf_allocated;
1939 let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
1940 }
1947}
1948
1949#[cfg(feature = "std")]
1950const BUCKET_COUNT: usize = 1024;
1951#[cfg(not(feature = "std"))]
1952const BUCKET_COUNT: usize = 256;
1953
1954#[derive(Debug, Clone)]
1957pub struct LiveStatistics {
1958 buckets: [u64; BUCKET_COUNT],
1959 min_val: u64,
1960 max_val: u64,
1961 sum: u128,
1962 sum_sq: u128,
1963 count: u64,
1964 max_value: u64,
1965}
1966
1967impl LiveStatistics {
1968 pub fn new_with_max(max_value: u64) -> Self {
1988 LiveStatistics {
1989 buckets: [0; BUCKET_COUNT],
1990 min_val: u64::MAX,
1991 max_val: 0,
1992 sum: 0,
1993 sum_sq: 0,
1994 count: 0,
1995 max_value,
1996 }
1997 }
1998
1999 #[inline]
2000 fn value_to_bucket(&self, value: u64) -> usize {
2001 if value >= self.max_value {
2002 BUCKET_COUNT - 1
2003 } else {
2004 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
2005 }
2006 }
2007
2008 #[inline]
2009 pub fn min(&self) -> u64 {
2010 if self.count == 0 { 0 } else { self.min_val }
2011 }
2012
2013 #[inline]
2014 pub fn max(&self) -> u64 {
2015 self.max_val
2016 }
2017
2018 #[inline]
2019 pub fn mean(&self) -> f64 {
2020 if self.count == 0 {
2021 0.0
2022 } else {
2023 self.sum as f64 / self.count as f64
2024 }
2025 }
2026
2027 #[inline]
2028 pub fn stdev(&self) -> f64 {
2029 if self.count == 0 {
2030 return 0.0;
2031 }
2032 let mean = self.mean();
2033 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
2034 if variance < 0.0 {
2035 return 0.0;
2036 }
2037 #[cfg(feature = "std")]
2038 return variance.sqrt();
2039 #[cfg(not(feature = "std"))]
2040 return sqrt(variance);
2041 }
2042
2043 #[inline]
2044 pub fn percentile(&self, percentile: f64) -> u64 {
2045 if self.count == 0 {
2046 return 0;
2047 }
2048
2049 let target_count = (self.count as f64 * percentile) as u64;
2050 let mut accumulated = 0u64;
2051
2052 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
2053 accumulated += bucket_count;
2054 if accumulated >= target_count {
2055 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
2057 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
2058 let bucket_fraction = if bucket_count > 0 {
2059 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
2060 } else {
2061 0.5
2062 };
2063 return bucket_start
2064 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
2065 }
2066 }
2067
2068 self.max_val
2069 }
2070
2071 #[inline]
2073 pub fn record(&mut self, value: u64) {
2074 if value < self.min_val {
2075 self.min_val = value;
2076 }
2077 if value > self.max_val {
2078 self.max_val = value;
2079 }
2080 let value_u128 = value as u128;
2081 self.sum += value_u128;
2082 self.sum_sq += value_u128 * value_u128;
2083 self.count += 1;
2084
2085 let bucket = self.value_to_bucket(value);
2086 self.buckets[bucket] += 1;
2087 }
2088
2089 #[inline]
2090 pub fn len(&self) -> u64 {
2091 self.count
2092 }
2093
2094 #[inline]
2095 pub fn is_empty(&self) -> bool {
2096 self.count == 0
2097 }
2098
2099 #[inline]
2100 pub fn reset(&mut self) {
2101 self.buckets.fill(0);
2102 self.min_val = u64::MAX;
2103 self.max_val = 0;
2104 self.sum = 0;
2105 self.sum_sq = 0;
2106 self.count = 0;
2107 }
2108}
2109
2110#[derive(Debug, Clone)]
2113pub struct CuDurationStatistics {
2114 bare: LiveStatistics,
2115 jitter: LiveStatistics,
2116 last_value: CuDuration,
2117}
2118
2119impl CuDurationStatistics {
2120 pub fn new(max: CuDuration) -> Self {
2121 let CuDuration(max) = max;
2122 CuDurationStatistics {
2123 bare: LiveStatistics::new_with_max(max),
2124 jitter: LiveStatistics::new_with_max(max),
2125 last_value: CuDuration::default(),
2126 }
2127 }
2128
2129 #[inline]
2130 pub fn min(&self) -> CuDuration {
2131 CuDuration(self.bare.min())
2132 }
2133
2134 #[inline]
2135 pub fn max(&self) -> CuDuration {
2136 CuDuration(self.bare.max())
2137 }
2138
2139 #[inline]
2140 pub fn mean(&self) -> CuDuration {
2141 CuDuration(self.bare.mean() as u64) }
2143
2144 #[inline]
2145 pub fn percentile(&self, percentile: f64) -> CuDuration {
2146 CuDuration(self.bare.percentile(percentile))
2147 }
2148
2149 #[inline]
2150 pub fn stddev(&self) -> CuDuration {
2151 CuDuration(self.bare.stdev() as u64)
2152 }
2153
2154 #[inline]
2155 pub fn len(&self) -> u64 {
2156 self.bare.len()
2157 }
2158
2159 #[inline]
2160 pub fn is_empty(&self) -> bool {
2161 self.bare.len() == 0
2162 }
2163
2164 #[inline]
2165 pub fn jitter_min(&self) -> CuDuration {
2166 CuDuration(self.jitter.min())
2167 }
2168
2169 #[inline]
2170 pub fn jitter_max(&self) -> CuDuration {
2171 CuDuration(self.jitter.max())
2172 }
2173
2174 #[inline]
2175 pub fn jitter_mean(&self) -> CuDuration {
2176 CuDuration(self.jitter.mean() as u64)
2177 }
2178
2179 #[inline]
2180 pub fn jitter_stddev(&self) -> CuDuration {
2181 CuDuration(self.jitter.stdev() as u64)
2182 }
2183
2184 #[inline]
2185 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
2186 CuDuration(self.jitter.percentile(percentile))
2187 }
2188
2189 #[inline]
2190 pub fn record(&mut self, value: CuDuration) {
2191 let CuDuration(nanos) = value;
2192 if self.bare.is_empty() {
2193 self.bare.record(nanos);
2194 self.last_value = value;
2195 return;
2196 }
2197 self.bare.record(nanos);
2198 let CuDuration(last_nanos) = self.last_value;
2199 self.jitter.record(nanos.abs_diff(last_nanos));
2200 self.last_value = value;
2201 }
2202
2203 #[inline]
2204 pub fn reset(&mut self) {
2205 self.bare.reset();
2206 self.jitter.reset();
2207 }
2208}
2209
2210#[cfg(test)]
2211mod tests {
2212 use super::*;
2213 use core::sync::atomic::{AtomicUsize, Ordering};
2214
2215 #[derive(Clone, Copy)]
2216 enum TestDecision {
2217 Ignore,
2218 Abort,
2219 Shutdown,
2220 }
2221
2222 struct TestMonitor {
2223 decision: TestDecision,
2224 copperlist_calls: AtomicUsize,
2225 panic_calls: AtomicUsize,
2226 }
2227
2228 impl TestMonitor {
2229 fn new_with(decision: TestDecision) -> Self {
2230 Self {
2231 decision,
2232 copperlist_calls: AtomicUsize::new(0),
2233 panic_calls: AtomicUsize::new(0),
2234 }
2235 }
2236 }
2237
2238 fn test_metadata() -> CuMonitoringMetadata {
2239 const COMPONENTS: &[MonitorComponentMetadata] = &[
2240 MonitorComponentMetadata::new("a", ComponentType::Task, None),
2241 MonitorComponentMetadata::new("b", ComponentType::Task, None),
2242 ];
2243 CuMonitoringMetadata::new(
2244 CompactString::from(crate::config::DEFAULT_MISSION_ID),
2245 COMPONENTS,
2246 &[],
2247 CopperListInfo::new(0, 0),
2248 MonitorTopology::default(),
2249 None,
2250 )
2251 .expect("test metadata should be valid")
2252 }
2253
2254 impl CuMonitor for TestMonitor {
2255 fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
2256 let monitor = Self::new_with(TestDecision::Ignore);
2257 #[cfg(feature = "std")]
2258 let _ = runtime.execution_probe();
2259 Ok(monitor)
2260 }
2261
2262 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
2263 self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
2264 Ok(())
2265 }
2266
2267 fn process_error(
2268 &self,
2269 _component_id: ComponentId,
2270 _step: CuComponentState,
2271 _error: &CuError,
2272 ) -> Decision {
2273 match self.decision {
2274 TestDecision::Ignore => Decision::Ignore,
2275 TestDecision::Abort => Decision::Abort,
2276 TestDecision::Shutdown => Decision::Shutdown,
2277 }
2278 }
2279
2280 fn process_panic(&self, _panic_message: &str) {
2281 self.panic_calls.fetch_add(1, Ordering::SeqCst);
2282 }
2283 }
2284
2285 #[test]
2286 fn test_live_statistics_percentiles() {
2287 let mut stats = LiveStatistics::new_with_max(1000);
2288
2289 for i in 0..100 {
2291 stats.record(i);
2292 }
2293
2294 assert_eq!(stats.len(), 100);
2295 assert_eq!(stats.min(), 0);
2296 assert_eq!(stats.max(), 99);
2297 assert_eq!(stats.mean() as u64, 49); let p50 = stats.percentile(0.5);
2301 let p90 = stats.percentile(0.90);
2302 let p95 = stats.percentile(0.95);
2303 let p99 = stats.percentile(0.99);
2304
2305 assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
2307 assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
2308 assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
2309 assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
2310 }
2311
2312 #[test]
2313 fn test_duration_stats() {
2314 let mut stats = CuDurationStatistics::new(CuDuration(1000));
2315 stats.record(CuDuration(100));
2316 stats.record(CuDuration(200));
2317 stats.record(CuDuration(500));
2318 stats.record(CuDuration(400));
2319 assert_eq!(stats.min(), CuDuration(100));
2320 assert_eq!(stats.max(), CuDuration(500));
2321 assert_eq!(stats.mean(), CuDuration(300));
2322 assert_eq!(stats.len(), 4);
2323 assert_eq!(stats.jitter.len(), 3);
2324 assert_eq!(stats.jitter_min(), CuDuration(100));
2325 assert_eq!(stats.jitter_max(), CuDuration(300));
2326 assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
2327 stats.reset();
2328 assert_eq!(stats.len(), 0);
2329 }
2330
2331 #[test]
2332 fn test_duration_stats_large_samples_do_not_overflow() {
2333 let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
2334 stats.record(CuDuration(5_000_000_000));
2335 stats.record(CuDuration(8_000_000_000));
2336
2337 assert_eq!(stats.min(), CuDuration(5_000_000_000));
2338 assert_eq!(stats.max(), CuDuration(8_000_000_000));
2339 assert_eq!(stats.mean(), CuDuration(6_500_000_000));
2340 assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
2341 assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
2342 }
2343
2344 #[test]
2345 fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
2346 let err = CuError::from("boom");
2347
2348 let two = (
2349 TestMonitor::new_with(TestDecision::Ignore),
2350 TestMonitor::new_with(TestDecision::Shutdown),
2351 );
2352 assert!(matches!(
2353 two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
2354 Decision::Shutdown
2355 ));
2356
2357 let two = (
2358 TestMonitor::new_with(TestDecision::Ignore),
2359 TestMonitor::new_with(TestDecision::Abort),
2360 );
2361 assert!(matches!(
2362 two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
2363 Decision::Abort
2364 ));
2365 }
2366
2367 #[test]
2368 fn tuple_monitor_fans_out_callbacks() {
2369 let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
2370 test_metadata(),
2371 CuMonitoringRuntime::unavailable(),
2372 )
2373 .expect("tuple new");
2374 let (ctx, _clock_control) = CuContext::new_mock_clock();
2375 let empty_view = test_metadata().layout().view(&[]);
2376 monitors
2377 .process_copperlist(&ctx, empty_view)
2378 .expect("process_copperlist should fan out");
2379 monitors.process_panic("panic marker");
2380
2381 assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
2382 assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
2383 assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
2384 assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
2385 }
2386
2387 fn encoded_size<E: Encode>(value: &E) -> usize {
2388 let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
2389 value
2390 .encode(&mut encoder)
2391 .expect("size measurement encoder should not fail");
2392 encoder.into_writer().bytes_written
2393 }
2394
2395 #[test]
2396 fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
2397 let payload = vec![1u8, 2, 3, 4];
2398 let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");
2399
2400 assert_eq!(io.encoded_bytes, encoded_size(&payload));
2401 assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
2402 assert_eq!(io.handle_bytes, 0);
2403 }
2404
2405 #[test]
2406 fn payload_io_stats_tracks_handle_backed_storage() {
2407 let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
2408 let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");
2409
2410 assert_eq!(io.encoded_bytes, encoded_size(&payload));
2411 assert_eq!(
2412 io.resident_bytes,
2413 core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
2414 );
2415 assert_eq!(io.handle_bytes, 32);
2416 }
2417
2418 #[test]
2419 fn runtime_execution_probe_roundtrip_marker() {
2420 let probe = RuntimeExecutionProbe::default();
2421 assert!(probe.marker().is_none());
2422 assert_eq!(probe.sequence(), 0);
2423
2424 probe.record(ExecutionMarker {
2425 component_id: ComponentId::new(7),
2426 step: CuComponentState::Process,
2427 culistid: Some(42),
2428 });
2429
2430 let marker = probe.marker().expect("marker should be available");
2431 assert_eq!(marker.component_id, ComponentId::new(7));
2432 assert!(matches!(marker.step, CuComponentState::Process));
2433 assert_eq!(marker.culistid, Some(42));
2434 assert_eq!(probe.sequence(), 1);
2435 }
2436}