1use crate::config::CuConfig;
5use crate::config::{
6 BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7 TaskKind, resolve_task_kind_for_id,
8};
9use crate::context::CuContext;
10use crate::cutask::CuMsgMetadata;
11use bincode::Encode;
12use bincode::config::standard;
13use bincode::enc::EncoderImpl;
14use bincode::enc::write::SizeWriter;
15use compact_str::CompactString;
16use cu29_clock::CuDuration;
17#[allow(unused_imports)]
18use cu29_log::CuLogLevel;
19#[cfg(all(feature = "std", debug_assertions))]
20use cu29_log_runtime::{
21 format_message_only, register_live_log_listener, unregister_live_log_listener,
22};
23use cu29_traits::{
24 CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
25 finish_observed_encode,
26};
27use portable_atomic::{
28 AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
29};
30use serde_derive::{Deserialize, Serialize};
31
32#[cfg(not(feature = "std"))]
33extern crate alloc;
34
35#[cfg(feature = "std")]
36use core::cell::Cell;
37#[cfg(feature = "std")]
38use std::backtrace::Backtrace;
39#[cfg(feature = "std")]
40use std::fs::File;
41#[cfg(feature = "std")]
42use std::io::Write;
43#[cfg(feature = "std")]
44use std::panic::PanicHookInfo;
45#[cfg(feature = "std")]
46use std::sync::{Arc, Mutex as StdMutex, OnceLock};
47#[cfg(feature = "std")]
48use std::thread_local;
49#[cfg(feature = "std")]
50use std::time::{SystemTime, UNIX_EPOCH};
51#[cfg(feature = "std")]
52use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
53
54#[cfg(not(feature = "std"))]
55use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
56#[cfg(not(target_has_atomic = "64"))]
57use spin::Mutex;
58#[cfg(not(feature = "std"))]
59use spin::Mutex as SpinMutex;
60
61#[cfg(not(feature = "std"))]
62mod imp {
63 pub use alloc::alloc::{GlobalAlloc, Layout};
64 #[cfg(target_has_atomic = "64")]
65 pub use core::sync::atomic::AtomicU64;
66 pub use core::sync::atomic::{AtomicUsize, Ordering};
67 pub use libm::sqrt;
68}
69
70#[cfg(feature = "std")]
71mod imp {
72 #[cfg(feature = "memory_monitoring")]
73 use super::CountingAlloc;
74 #[cfg(feature = "memory_monitoring")]
75 pub use std::alloc::System;
76 pub use std::alloc::{GlobalAlloc, Layout};
77 #[cfg(target_has_atomic = "64")]
78 pub use std::sync::atomic::AtomicU64;
79 pub use std::sync::atomic::{AtomicUsize, Ordering};
80 #[cfg(feature = "memory_monitoring")]
81 #[global_allocator]
82 pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
83}
84
85use imp::*;
86
87#[cfg(all(feature = "std", debug_assertions))]
88fn format_timestamp(time: CuDuration) -> String {
89 let nanos = time.as_nanos();
91 let total_seconds = nanos / 1_000_000_000;
92 let hours = total_seconds / 3600;
93 let minutes = (total_seconds / 60) % 60;
94 let seconds = total_seconds % 60;
95 let fractional_1e4 = (nanos % 1_000_000_000) / 100_000;
96 format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
97}
98
99#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
101pub enum CuComponentState {
102 Start,
103 Preprocess,
104 Process,
105 Postprocess,
106 Stop,
107}
108
109#[repr(transparent)]
111#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
112pub struct ComponentId(usize);
113
114impl ComponentId {
115 pub const INVALID: Self = Self(usize::MAX);
116
117 #[inline]
118 pub const fn new(index: usize) -> Self {
119 Self(index)
120 }
121
122 #[inline]
123 pub const fn index(self) -> usize {
124 self.0
125 }
126}
127
128impl core::fmt::Display for ComponentId {
129 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
130 self.0.fmt(f)
131 }
132}
133
134impl From<ComponentId> for usize {
135 fn from(value: ComponentId) -> Self {
136 value.index()
137 }
138}
139
140#[repr(transparent)]
142#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
143pub struct CuListSlot(usize);
144
145impl CuListSlot {
146 #[inline]
147 pub const fn new(index: usize) -> Self {
148 Self(index)
149 }
150
151 #[inline]
152 pub const fn index(self) -> usize {
153 self.0
154 }
155}
156
157impl From<CuListSlot> for usize {
158 fn from(value: CuListSlot) -> Self {
159 value.index()
160 }
161}
162
163#[derive(Debug, Clone, Copy)]
167pub struct CopperListLayout {
168 components: &'static [MonitorComponentMetadata],
169 slot_to_component: &'static [ComponentId],
170}
171
172impl CopperListLayout {
173 #[inline]
174 pub const fn new(
175 components: &'static [MonitorComponentMetadata],
176 slot_to_component: &'static [ComponentId],
177 ) -> Self {
178 Self {
179 components,
180 slot_to_component,
181 }
182 }
183
184 #[inline]
185 pub const fn components(self) -> &'static [MonitorComponentMetadata] {
186 self.components
187 }
188
189 #[inline]
190 pub const fn component_count(self) -> usize {
191 self.components.len()
192 }
193
194 #[inline]
195 pub const fn culist_slot_count(self) -> usize {
196 self.slot_to_component.len()
197 }
198
199 #[inline]
200 pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
201 &self.components[id.index()]
202 }
203
204 #[inline]
205 pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
206 self.slot_to_component[culist_slot.index()]
207 }
208
209 #[inline]
210 pub const fn slot_to_component(self) -> &'static [ComponentId] {
211 self.slot_to_component
212 }
213
214 #[inline]
215 pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
216 CopperListView::new(self, msgs)
217 }
218}
219
220#[derive(Debug, Clone, Copy)]
222pub struct CopperListView<'a> {
223 layout: CopperListLayout,
224 msgs: &'a [&'a CuMsgMetadata],
225}
226
227impl<'a> CopperListView<'a> {
228 #[inline]
229 pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
230 assert_eq!(
231 msgs.len(),
232 layout.culist_slot_count(),
233 "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
234 msgs.len(),
235 layout.culist_slot_count()
236 );
237 Self { layout, msgs }
238 }
239
240 #[inline]
241 pub const fn layout(self) -> CopperListLayout {
242 self.layout
243 }
244
245 #[inline]
246 pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
247 self.msgs
248 }
249
250 #[inline]
251 pub const fn len(self) -> usize {
252 self.msgs.len()
253 }
254
255 #[inline]
256 pub const fn is_empty(self) -> bool {
257 self.msgs.is_empty()
258 }
259
260 #[inline]
261 pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
262 let index = culist_slot.index();
263 CopperListEntry {
264 culist_slot,
265 component_id: self.layout.component_for_slot(culist_slot),
266 msg: self.msgs[index],
267 }
268 }
269
270 pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
271 self.msgs.iter().enumerate().map(move |(idx, msg)| {
272 let culist_slot = CuListSlot::new(idx);
273 CopperListEntry {
274 culist_slot,
275 component_id: self.layout.component_for_slot(culist_slot),
276 msg,
277 }
278 })
279 }
280}
281
282#[derive(Debug, Clone, Copy)]
284pub struct CopperListEntry<'a> {
285 pub culist_slot: CuListSlot,
286 pub component_id: ComponentId,
287 pub msg: &'a CuMsgMetadata,
288}
289
290impl<'a> CopperListEntry<'a> {
291 #[inline]
292 pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
293 layout.component(self.component_id)
294 }
295
296 #[inline]
297 pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
298 layout.component(self.component_id).kind()
299 }
300}
301
302#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
304pub struct ExecutionMarker {
305 pub component_id: ComponentId,
307 pub step: CuComponentState,
309 pub culistid: Option<u64>,
311}
312
313#[derive(Debug)]
319pub struct RuntimeExecutionProbe {
320 component_id: AtomicUsize,
321 step: AtomicUsize,
322 #[cfg(target_has_atomic = "64")]
323 culistid: AtomicU64,
324 #[cfg(target_has_atomic = "64")]
325 culistid_present: AtomicUsize,
326 #[cfg(not(target_has_atomic = "64"))]
327 culistid: Mutex<Option<u64>>,
328 sequence: AtomicUsize,
329}
330
331impl Default for RuntimeExecutionProbe {
332 fn default() -> Self {
333 Self {
334 component_id: AtomicUsize::new(ComponentId::INVALID.index()),
335 step: AtomicUsize::new(0),
336 #[cfg(target_has_atomic = "64")]
337 culistid: AtomicU64::new(0),
338 #[cfg(target_has_atomic = "64")]
339 culistid_present: AtomicUsize::new(0),
340 #[cfg(not(target_has_atomic = "64"))]
341 culistid: Mutex::new(None),
342 sequence: AtomicUsize::new(0),
343 }
344 }
345}
346
347impl RuntimeExecutionProbe {
348 #[inline]
349 pub fn record(&self, marker: ExecutionMarker) {
350 self.component_id
351 .store(marker.component_id.index(), Ordering::Relaxed);
352 self.step
353 .store(component_state_to_usize(marker.step), Ordering::Relaxed);
354 #[cfg(target_has_atomic = "64")]
355 match marker.culistid {
356 Some(culistid) => {
357 self.culistid.store(culistid, Ordering::Relaxed);
358 self.culistid_present.store(1, Ordering::Relaxed);
359 }
360 None => {
361 self.culistid_present.store(0, Ordering::Relaxed);
362 }
363 }
364 #[cfg(not(target_has_atomic = "64"))]
365 {
366 *self.culistid.lock() = marker.culistid;
367 }
368 self.sequence.fetch_add(1, Ordering::Release);
369 }
370
371 #[inline]
372 pub fn sequence(&self) -> usize {
373 self.sequence.load(Ordering::Acquire)
374 }
375
376 #[inline]
377 pub fn marker(&self) -> Option<ExecutionMarker> {
378 loop {
381 let seq_before = self.sequence.load(Ordering::Acquire);
382 let component_id = self.component_id.load(Ordering::Relaxed);
383 let step = self.step.load(Ordering::Relaxed);
384 #[cfg(target_has_atomic = "64")]
385 let culistid_present = self.culistid_present.load(Ordering::Relaxed);
386 #[cfg(target_has_atomic = "64")]
387 let culistid_value = self.culistid.load(Ordering::Relaxed);
388 #[cfg(not(target_has_atomic = "64"))]
389 let culistid = *self.culistid.lock();
390 let seq_after = self.sequence.load(Ordering::Acquire);
391 if seq_before == seq_after {
392 if component_id == ComponentId::INVALID.index() {
393 return None;
394 }
395 let step = usize_to_component_state(step);
396 #[cfg(target_has_atomic = "64")]
397 let culistid = if culistid_present == 0 {
398 None
399 } else {
400 Some(culistid_value)
401 };
402 return Some(ExecutionMarker {
403 component_id: ComponentId::new(component_id),
404 step,
405 culistid,
406 });
407 }
408 }
409 }
410}
411
412#[inline]
413const fn component_state_to_usize(step: CuComponentState) -> usize {
414 match step {
415 CuComponentState::Start => 0,
416 CuComponentState::Preprocess => 1,
417 CuComponentState::Process => 2,
418 CuComponentState::Postprocess => 3,
419 CuComponentState::Stop => 4,
420 }
421}
422
423#[inline]
424const fn usize_to_component_state(step: usize) -> CuComponentState {
425 match step {
426 0 => CuComponentState::Start,
427 1 => CuComponentState::Preprocess,
428 2 => CuComponentState::Process,
429 3 => CuComponentState::Postprocess,
430 _ => CuComponentState::Stop,
431 }
432}
433
434#[cfg(feature = "std")]
435pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
436
437#[derive(Debug, Clone)]
442pub struct MonitorExecutionProbe {
443 #[cfg(feature = "std")]
444 inner: Option<ExecutionProbeHandle>,
445}
446
447impl Default for MonitorExecutionProbe {
448 fn default() -> Self {
449 Self::unavailable()
450 }
451}
452
453impl MonitorExecutionProbe {
454 #[cfg(feature = "std")]
455 pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
456 Self {
457 inner: Some(handle),
458 }
459 }
460
461 pub const fn unavailable() -> Self {
462 Self {
463 #[cfg(feature = "std")]
464 inner: None,
465 }
466 }
467
468 pub fn is_available(&self) -> bool {
469 #[cfg(feature = "std")]
470 {
471 self.inner.is_some()
472 }
473 #[cfg(not(feature = "std"))]
474 {
475 false
476 }
477 }
478
479 pub fn marker(&self) -> Option<ExecutionMarker> {
480 #[cfg(feature = "std")]
481 {
482 self.inner.as_ref().and_then(|probe| probe.marker())
483 }
484 #[cfg(not(feature = "std"))]
485 {
486 None
487 }
488 }
489
490 pub fn sequence(&self) -> Option<usize> {
491 #[cfg(feature = "std")]
492 {
493 self.inner.as_ref().map(|probe| probe.sequence())
494 }
495 #[cfg(not(feature = "std"))]
496 {
497 None
498 }
499 }
500}
501
502#[derive(Debug, Clone, Copy, PartialEq, Eq)]
507#[non_exhaustive]
508pub enum ComponentType {
509 Source,
510 Task,
511 Sink,
512 Bridge,
513}
514
515impl ComponentType {
516 pub const fn is_task(self) -> bool {
517 !matches!(self, Self::Bridge)
518 }
519}
520
521#[derive(Debug, Clone, Copy, PartialEq, Eq)]
523pub struct MonitorComponentMetadata {
524 id: &'static str,
525 kind: ComponentType,
526 type_name: Option<&'static str>,
527}
528
529impl MonitorComponentMetadata {
530 pub const fn new(
531 id: &'static str,
532 kind: ComponentType,
533 type_name: Option<&'static str>,
534 ) -> Self {
535 Self {
536 id,
537 kind,
538 type_name,
539 }
540 }
541
542 pub const fn id(&self) -> &'static str {
544 self.id
545 }
546
547 pub const fn kind(&self) -> ComponentType {
548 self.kind
549 }
550
551 pub const fn type_name(&self) -> Option<&'static str> {
553 self.type_name
554 }
555}
556
557#[derive(Debug, Clone)]
562pub struct CuMonitoringMetadata {
563 mission_id: CompactString,
564 subsystem_id: Option<CompactString>,
565 instance_id: u32,
566 layout: CopperListLayout,
567 copperlist_info: CopperListInfo,
568 topology: MonitorTopology,
569 monitor_config: Option<ComponentConfig>,
570}
571
572impl CuMonitoringMetadata {
573 pub fn new(
574 mission_id: CompactString,
575 components: &'static [MonitorComponentMetadata],
576 culist_component_mapping: &'static [ComponentId],
577 copperlist_info: CopperListInfo,
578 topology: MonitorTopology,
579 monitor_config: Option<ComponentConfig>,
580 ) -> CuResult<Self> {
581 Self::validate_components(components)?;
582 Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
583 Ok(Self {
584 mission_id,
585 subsystem_id: None,
586 instance_id: 0,
587 layout: CopperListLayout::new(components, culist_component_mapping),
588 copperlist_info,
589 topology,
590 monitor_config,
591 })
592 }
593
594 fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
595 let mut seen_bridge = false;
596 for component in components {
597 match component.kind() {
598 component_type if component_type.is_task() && seen_bridge => {
599 return Err(CuError::from(
600 "invalid monitor metadata: task-family components must appear before bridges",
601 ));
602 }
603 ComponentType::Bridge => seen_bridge = true,
604 _ => {}
605 }
606 }
607 Ok(())
608 }
609
610 fn validate_culist_mapping(
611 components_len: usize,
612 culist_component_mapping: &'static [ComponentId],
613 ) -> CuResult<()> {
614 for component_idx in culist_component_mapping {
615 if component_idx.index() >= components_len {
616 return Err(CuError::from(
617 "invalid monitor metadata: culist mapping points past components table",
618 ));
619 }
620 }
621 Ok(())
622 }
623
624 pub fn mission_id(&self) -> &str {
626 self.mission_id.as_str()
627 }
628
629 pub fn subsystem_id(&self) -> Option<&str> {
632 self.subsystem_id.as_deref()
633 }
634
635 pub fn instance_id(&self) -> u32 {
637 self.instance_id
638 }
639
640 pub fn components(&self) -> &'static [MonitorComponentMetadata] {
644 self.layout.components()
645 }
646
647 pub const fn component_count(&self) -> usize {
649 self.layout.component_count()
650 }
651
652 pub const fn layout(&self) -> CopperListLayout {
654 self.layout
655 }
656
657 pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
658 self.layout.component(component_id)
659 }
660
661 pub fn component_id(&self, component_id: ComponentId) -> &'static str {
662 self.component(component_id).id()
663 }
664
665 pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
666 self.component(component_id).kind()
667 }
668
669 pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
670 self.layout
671 .components()
672 .iter()
673 .position(|component| component.id() == component_id)
674 .map(ComponentId::new)
675 }
676
677 pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
681 self.layout.slot_to_component()
682 }
683
684 pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
685 self.layout.component_for_slot(culist_slot)
686 }
687
688 pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
689 self.layout.view(msgs)
690 }
691
692 pub const fn copperlist_info(&self) -> CopperListInfo {
693 self.copperlist_info
694 }
695
696 pub fn topology(&self) -> &MonitorTopology {
701 &self.topology
702 }
703
704 pub fn monitor_config(&self) -> Option<&ComponentConfig> {
705 self.monitor_config.as_ref()
706 }
707
708 pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
709 self.monitor_config = monitor_config;
710 self
711 }
712
713 pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
714 self.subsystem_id = subsystem_id.map(CompactString::from);
715 self
716 }
717
718 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
719 self.instance_id = instance_id;
720 self
721 }
722}
723
724#[derive(Debug, Clone, Default)]
728pub struct CuMonitoringRuntime {
729 execution_probe: MonitorExecutionProbe,
730}
731
732impl CuMonitoringRuntime {
733 #[cfg(feature = "std")]
734 pub fn new(execution_probe: MonitorExecutionProbe) -> Self {
735 ensure_runtime_panic_hook_installed();
736 Self { execution_probe }
737 }
738
739 #[cfg(not(feature = "std"))]
740 pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
741 Self { execution_probe }
742 }
743
744 #[cfg(feature = "std")]
745 pub fn unavailable() -> Self {
746 Self::new(MonitorExecutionProbe::unavailable())
747 }
748
749 #[cfg(not(feature = "std"))]
750 pub const fn unavailable() -> Self {
751 Self::new(MonitorExecutionProbe::unavailable())
752 }
753
754 pub fn execution_probe(&self) -> &MonitorExecutionProbe {
755 &self.execution_probe
756 }
757
758 #[cfg(feature = "std")]
759 pub fn register_panic_cleanup<F>(&self, callback: F) -> PanicHookRegistration
760 where
761 F: Fn(&PanicReport) + Send + Sync + 'static,
762 {
763 ensure_runtime_panic_hook_installed();
764 register_panic_cleanup(callback)
765 }
766
767 #[cfg(feature = "std")]
768 pub fn register_panic_action<F>(&self, callback: F) -> PanicHookRegistration
769 where
770 F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
771 {
772 ensure_runtime_panic_hook_installed();
773 register_panic_action(callback)
774 }
775}
776
777#[cfg(feature = "std")]
778type PanicCleanupCallback = Arc<dyn Fn(&PanicReport) + Send + Sync + 'static>;
779#[cfg(feature = "std")]
780type PanicActionCallback = Arc<dyn Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static>;
781
782#[cfg(feature = "std")]
783#[derive(Debug, Clone)]
784pub struct PanicReport {
785 message: String,
786 location: Option<String>,
787 thread_name: Option<String>,
788 backtrace: String,
789 timestamp_unix_ms: u128,
790 crash_report_path: Option<String>,
791}
792
793#[cfg(feature = "std")]
794impl PanicReport {
795 fn capture(info: &PanicHookInfo<'_>) -> Self {
796 let location = info
797 .location()
798 .map(|loc| format!("{}:{}:{}", loc.file(), loc.line(), loc.column()));
799 let thread_name = std::thread::current().name().map(|name| name.to_string());
800 let timestamp_unix_ms = SystemTime::now()
801 .duration_since(UNIX_EPOCH)
802 .map(|dur| dur.as_millis())
803 .unwrap_or(0);
804
805 Self {
806 message: panic_hook_payload_to_string(info),
807 location,
808 thread_name,
809 backtrace: Backtrace::force_capture().to_string(),
810 timestamp_unix_ms,
811 crash_report_path: None,
812 }
813 }
814
815 pub fn message(&self) -> &str {
816 &self.message
817 }
818
819 pub fn location(&self) -> Option<&str> {
820 self.location.as_deref()
821 }
822
823 pub fn thread_name(&self) -> Option<&str> {
824 self.thread_name.as_deref()
825 }
826
827 pub fn backtrace(&self) -> &str {
828 &self.backtrace
829 }
830
831 pub fn timestamp_unix_ms(&self) -> u128 {
832 self.timestamp_unix_ms
833 }
834
835 pub fn crash_report_path(&self) -> Option<&str> {
836 self.crash_report_path.as_deref()
837 }
838
839 pub fn summary(&self) -> String {
840 match self.location() {
841 Some(location) => format!("panic at {location}: {}", self.message()),
842 None => format!("panic: {}", self.message()),
843 }
844 }
845}
846
847#[cfg(feature = "std")]
848#[derive(Clone, Copy, Debug, PartialEq, Eq)]
849enum PanicHookRegistrationKind {
850 Cleanup,
851 Action,
852}
853
854#[cfg(feature = "std")]
855#[derive(Clone)]
856struct RegisteredPanicCleanup {
857 id: usize,
858 callback: PanicCleanupCallback,
859}
860
861#[cfg(feature = "std")]
862#[derive(Clone)]
863struct RegisteredPanicAction {
864 id: usize,
865 callback: PanicActionCallback,
866}
867
868#[cfg(feature = "std")]
869#[derive(Default)]
870struct PanicHookRegistry {
871 cleanup_callbacks: StdMutex<Vec<RegisteredPanicCleanup>>,
872 action_callbacks: StdMutex<Vec<RegisteredPanicAction>>,
873}
874
875#[cfg(feature = "std")]
876#[derive(Debug)]
877pub struct PanicHookRegistration {
878 id: usize,
879 kind: PanicHookRegistrationKind,
880}
881
882#[cfg(feature = "std")]
883impl Drop for PanicHookRegistration {
884 fn drop(&mut self) {
885 unregister_panic_hook(self.kind, self.id);
886 }
887}
888
889#[cfg(feature = "std")]
890static PANIC_HOOK_REGISTRY: OnceLock<PanicHookRegistry> = OnceLock::new();
891#[cfg(feature = "std")]
892static PANIC_HOOK_INSTALL_ONCE: OnceLock<()> = OnceLock::new();
893#[cfg(feature = "std")]
894static PANIC_HOOK_REGISTRATION_ID: AtomicUsize = AtomicUsize::new(1);
895#[cfg(feature = "std")]
896static PANIC_HOOK_ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0);
897
898#[cfg(feature = "std")]
899fn panic_hook_registry() -> &'static PanicHookRegistry {
900 PANIC_HOOK_REGISTRY.get_or_init(PanicHookRegistry::default)
901}
902
903#[cfg(feature = "std")]
904fn ensure_runtime_panic_hook_installed() {
905 let _ = PANIC_HOOK_INSTALL_ONCE.get_or_init(|| {
906 std::panic::set_hook(Box::new(move |info| {
907 let _guard = PanicHookActiveGuard::new();
908 let mut report = PanicReport::capture(info);
909 run_panic_cleanup_callbacks(&report);
910 report.crash_report_path = write_panic_report_to_file(&report);
911 emit_panic_report(&report);
912
913 if let Some(exit_code) = run_panic_action_callbacks(&report) {
914 std::process::exit(exit_code);
915 }
916 }));
917 });
918}
919
920#[cfg(feature = "std")]
921struct PanicHookActiveGuard;
922
923#[cfg(feature = "std")]
924impl PanicHookActiveGuard {
925 fn new() -> Self {
926 PANIC_HOOK_ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst);
927 Self
928 }
929}
930
931#[cfg(feature = "std")]
932impl Drop for PanicHookActiveGuard {
933 fn drop(&mut self) {
934 PANIC_HOOK_ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst);
935 }
936}
937
938#[cfg(feature = "std")]
939pub fn runtime_panic_hook_active() -> bool {
940 PANIC_HOOK_ACTIVE_COUNT.load(Ordering::SeqCst) > 0
941}
942
943#[cfg(not(feature = "std"))]
944pub const fn runtime_panic_hook_active() -> bool {
945 false
946}
947
948#[cfg(feature = "std")]
949fn register_panic_cleanup<F>(callback: F) -> PanicHookRegistration
950where
951 F: Fn(&PanicReport) + Send + Sync + 'static,
952{
953 let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
954 let callback = Arc::new(callback) as PanicCleanupCallback;
955 let mut callbacks = panic_hook_registry()
956 .cleanup_callbacks
957 .lock()
958 .unwrap_or_else(|poison| poison.into_inner());
959 callbacks.push(RegisteredPanicCleanup { id, callback });
960 PanicHookRegistration {
961 id,
962 kind: PanicHookRegistrationKind::Cleanup,
963 }
964}
965
966#[cfg(feature = "std")]
967fn register_panic_action<F>(callback: F) -> PanicHookRegistration
968where
969 F: Fn(&PanicReport) -> Option<i32> + Send + Sync + 'static,
970{
971 let id = PANIC_HOOK_REGISTRATION_ID.fetch_add(1, Ordering::Relaxed);
972 let callback = Arc::new(callback) as PanicActionCallback;
973 let mut callbacks = panic_hook_registry()
974 .action_callbacks
975 .lock()
976 .unwrap_or_else(|poison| poison.into_inner());
977 callbacks.push(RegisteredPanicAction { id, callback });
978 PanicHookRegistration {
979 id,
980 kind: PanicHookRegistrationKind::Action,
981 }
982}
983
984#[cfg(feature = "std")]
985fn unregister_panic_hook(kind: PanicHookRegistrationKind, id: usize) {
986 let registry = panic_hook_registry();
987 match kind {
988 PanicHookRegistrationKind::Cleanup => {
989 let mut callbacks = registry
990 .cleanup_callbacks
991 .lock()
992 .unwrap_or_else(|poison| poison.into_inner());
993 callbacks.retain(|entry| entry.id != id);
994 }
995 PanicHookRegistrationKind::Action => {
996 let mut callbacks = registry
997 .action_callbacks
998 .lock()
999 .unwrap_or_else(|poison| poison.into_inner());
1000 callbacks.retain(|entry| entry.id != id);
1001 }
1002 }
1003}
1004
1005#[cfg(feature = "std")]
1006fn run_panic_cleanup_callbacks(report: &PanicReport) {
1007 let callbacks = panic_hook_registry()
1008 .cleanup_callbacks
1009 .lock()
1010 .unwrap_or_else(|poison| poison.into_inner())
1011 .clone();
1012 for entry in callbacks {
1013 (entry.callback)(report);
1014 }
1015}
1016
1017#[cfg(feature = "std")]
1018fn run_panic_action_callbacks(report: &PanicReport) -> Option<i32> {
1019 let callbacks = panic_hook_registry()
1020 .action_callbacks
1021 .lock()
1022 .unwrap_or_else(|poison| poison.into_inner())
1023 .clone();
1024 let mut exit_code = None;
1025 for entry in callbacks {
1026 if exit_code.is_none() {
1027 exit_code = (entry.callback)(report);
1028 } else {
1029 let _ = (entry.callback)(report);
1030 }
1031 }
1032 exit_code
1033}
1034
1035#[cfg(feature = "std")]
1036fn panic_hook_payload_to_string(info: &PanicHookInfo<'_>) -> String {
1037 if let Some(msg) = info.payload().downcast_ref::<&str>() {
1038 (*msg).to_string()
1039 } else if let Some(msg) = info.payload().downcast_ref::<String>() {
1040 msg.clone()
1041 } else {
1042 "panic with non-string payload".to_string()
1043 }
1044}
1045
1046#[cfg(feature = "std")]
1047fn render_panic_report(report: &PanicReport) -> String {
1048 let mut rendered = String::from("Copper panic\n");
1049 rendered.push_str(&format!("time_unix_ms: {}\n", report.timestamp_unix_ms()));
1050 rendered.push_str(&format!(
1051 "thread: {}\n",
1052 report.thread_name().unwrap_or("<unnamed>")
1053 ));
1054 if let Some(location) = report.location() {
1055 rendered.push_str(&format!("location: {location}\n"));
1056 }
1057 rendered.push_str(&format!("message: {}\n", report.message()));
1058 if let Some(path) = report.crash_report_path() {
1059 rendered.push_str(&format!("crash_report: {path}\n"));
1060 }
1061 rendered.push_str("\nBacktrace:\n");
1062 rendered.push_str(report.backtrace());
1063 if !report.backtrace().ends_with('\n') {
1064 rendered.push('\n');
1065 }
1066 rendered
1067}
1068
1069#[cfg(feature = "std")]
1070fn emit_panic_report(report: &PanicReport) {
1071 let mut stderr = std::io::stderr().lock();
1072 let _ = stderr.write_all(render_panic_report(report).as_bytes());
1073 let _ = stderr.flush();
1074}
1075
1076#[cfg(feature = "std")]
1077fn write_panic_report_to_file(report: &PanicReport) -> Option<String> {
1078 let cwd = std::env::current_dir().ok()?;
1079 let file_name = format!(
1080 "copper-crash-{}-{}.txt",
1081 report.timestamp_unix_ms(),
1082 std::process::id()
1083 );
1084 let path = cwd.join(file_name);
1085 let path_string = path.to_string_lossy().to_string();
1086 let mut file = File::create(&path).ok()?;
1087 let mut report_with_path = report.clone();
1088 report_with_path.crash_report_path = Some(path_string.clone());
1089 file.write_all(render_panic_report(&report_with_path).as_bytes())
1090 .ok()?;
1091 file.flush().ok()?;
1092 Some(path_string)
1093}
1094
1095#[derive(Debug)]
1097pub enum Decision {
1098 Abort, Ignore, Shutdown, }
1102
1103fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
1104 use Decision::{Abort, Ignore, Shutdown};
1105 match (lhs, rhs) {
1108 (Shutdown, _) | (_, Shutdown) => Shutdown,
1109 (Abort, _) | (_, Abort) => Abort,
1110 _ => Ignore,
1111 }
1112}
1113
1114#[derive(Debug, Clone)]
1115pub struct MonitorNode {
1116 pub id: String,
1117 pub type_name: Option<String>,
1118 pub kind: ComponentType,
1119 pub inputs: Vec<String>,
1121 pub outputs: Vec<String>,
1123}
1124
1125#[derive(Debug, Clone)]
1126pub struct MonitorConnection {
1127 pub src: String,
1128 pub src_port: Option<String>,
1129 pub dst: String,
1130 pub dst_port: Option<String>,
1131 pub msg: String,
1132}
1133
1134#[derive(Debug, Clone, Default)]
1135pub struct MonitorTopology {
1136 pub nodes: Vec<MonitorNode>,
1137 pub connections: Vec<MonitorConnection>,
1138}
1139
1140#[derive(Debug, Clone, Copy, Default)]
1141pub struct CopperListInfo {
1142 pub size_bytes: usize,
1143 pub count: usize,
1144}
1145
1146impl CopperListInfo {
1147 pub const fn new(size_bytes: usize, count: usize) -> Self {
1148 Self { size_bytes, count }
1149 }
1150}
1151
1152#[derive(Debug, Clone, Copy, Default)]
1154pub struct CopperListIoStats {
1155 pub raw_culist_bytes: u64,
1160 pub handle_bytes: u64,
1166 pub encoded_culist_bytes: u64,
1168 pub keyframe_bytes: u64,
1170 pub structured_log_bytes_total: u64,
1172 pub culistid: u64,
1174}
1175
1176#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1177pub struct PayloadIoStats {
1178 pub resident_bytes: usize,
1179 pub encoded_bytes: usize,
1180 pub handle_bytes: usize,
1181}
1182
1183#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
1184pub struct CuMsgIoStats {
1185 pub present: bool,
1186 pub resident_bytes: u64,
1187 pub encoded_bytes: u64,
1188 pub handle_bytes: u64,
1189}
1190
1191struct CuMsgIoEntry {
1192 present: PortableAtomicBool,
1193 resident_bytes: PortableAtomicU64,
1194 encoded_bytes: PortableAtomicU64,
1195 handle_bytes: PortableAtomicU64,
1196}
1197
1198impl CuMsgIoEntry {
1199 fn clear(&self) {
1200 self.present.store(false, PortableOrdering::Release);
1201 self.resident_bytes.store(0, PortableOrdering::Relaxed);
1202 self.encoded_bytes.store(0, PortableOrdering::Relaxed);
1203 self.handle_bytes.store(0, PortableOrdering::Relaxed);
1204 }
1205
1206 fn get(&self) -> CuMsgIoStats {
1207 if !self.present.load(PortableOrdering::Acquire) {
1208 return CuMsgIoStats::default();
1209 }
1210
1211 CuMsgIoStats {
1212 present: true,
1213 resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
1214 encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
1215 handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
1216 }
1217 }
1218
1219 fn set(&self, stats: CuMsgIoStats) {
1220 self.resident_bytes
1221 .store(stats.resident_bytes, PortableOrdering::Relaxed);
1222 self.encoded_bytes
1223 .store(stats.encoded_bytes, PortableOrdering::Relaxed);
1224 self.handle_bytes
1225 .store(stats.handle_bytes, PortableOrdering::Relaxed);
1226 self.present.store(stats.present, PortableOrdering::Release);
1227 }
1228}
1229
1230impl Default for CuMsgIoEntry {
1231 fn default() -> Self {
1232 Self {
1233 present: PortableAtomicBool::new(false),
1234 resident_bytes: PortableAtomicU64::new(0),
1235 encoded_bytes: PortableAtomicU64::new(0),
1236 handle_bytes: PortableAtomicU64::new(0),
1237 }
1238 }
1239}
1240
1241pub struct CuMsgIoCache<const N: usize> {
1242 entries: [CuMsgIoEntry; N],
1243}
1244
1245impl<const N: usize> CuMsgIoCache<N> {
1246 pub fn clear(&self) {
1247 for entry in &self.entries {
1248 entry.clear();
1249 }
1250 }
1251
1252 pub fn get(&self, idx: usize) -> CuMsgIoStats {
1253 self.entries[idx].get()
1254 }
1255
1256 fn raw_parts(&self) -> (usize, usize) {
1257 (self.entries.as_ptr() as usize, N)
1258 }
1259}
1260
1261impl<const N: usize> Default for CuMsgIoCache<N> {
1262 fn default() -> Self {
1263 Self {
1264 entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
1265 }
1266 }
1267}
1268
1269#[derive(Clone, Copy)]
1270struct ActiveCuMsgIoCapture {
1271 cache_addr: usize,
1272 cache_len: usize,
1273 current_slot: Option<usize>,
1274}
1275
1276#[cfg(feature = "std")]
1277thread_local! {
1278 static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
1279 static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
1280 static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
1281}
1282
1283#[cfg(not(feature = "std"))]
1284static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
1285#[cfg(not(feature = "std"))]
1286static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
1287#[cfg(not(feature = "std"))]
1288static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
1289
1290fn begin_payload_io_measurement() {
1291 #[cfg(feature = "std")]
1292 PAYLOAD_HANDLE_BYTES.with(|bytes| {
1293 debug_assert!(
1294 bytes.get().is_none(),
1295 "payload IO byte measurement must not be nested"
1296 );
1297 bytes.set(Some(0));
1298 });
1299
1300 #[cfg(not(feature = "std"))]
1301 {
1302 let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
1303 debug_assert!(
1304 bytes.is_none(),
1305 "payload IO byte measurement must not be nested"
1306 );
1307 *bytes = Some(0);
1308 }
1309}
1310
1311fn finish_payload_io_measurement() -> usize {
1312 #[cfg(feature = "std")]
1313 {
1314 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
1315 }
1316
1317 #[cfg(not(feature = "std"))]
1318 {
1319 PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
1320 }
1321}
1322
1323fn abort_payload_io_measurement() {
1324 #[cfg(feature = "std")]
1325 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
1326
1327 #[cfg(not(feature = "std"))]
1328 {
1329 *PAYLOAD_HANDLE_BYTES.lock() = None;
1330 }
1331}
1332
1333fn current_payload_io_measurement() -> usize {
1334 #[cfg(feature = "std")]
1335 {
1336 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
1337 }
1338
1339 #[cfg(not(feature = "std"))]
1340 {
1341 PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
1342 }
1343}
1344
1345#[cfg(feature = "std")]
1346pub(crate) fn record_payload_handle_bytes(bytes: usize) {
1347 #[cfg(feature = "std")]
1348 PAYLOAD_HANDLE_BYTES.with(|total| {
1349 if let Some(current) = total.get() {
1350 total.set(Some(current.saturating_add(bytes)));
1351 }
1352 });
1353
1354 #[cfg(not(feature = "std"))]
1355 {
1356 let mut total = PAYLOAD_HANDLE_BYTES.lock();
1357 if let Some(current) = *total {
1358 *total = Some(current.saturating_add(bytes));
1359 }
1360 }
1361}
1362
1363fn set_last_completed_handle_bytes(bytes: u64) {
1364 #[cfg(feature = "std")]
1365 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1366
1367 #[cfg(not(feature = "std"))]
1368 {
1369 *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1370 }
1371}
1372
1373pub fn take_last_completed_handle_bytes() -> u64 {
1374 #[cfg(feature = "std")]
1375 {
1376 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1377 }
1378
1379 #[cfg(not(feature = "std"))]
1380 {
1381 let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1382 let value = *total;
1383 *total = 0;
1384 value
1385 }
1386}
1387
1388fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1389 #[cfg(feature = "std")]
1390 {
1391 ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1392 let mut state = capture.get()?;
1393 let result = f(&mut state);
1394 capture.set(Some(state));
1395 Some(result)
1396 })
1397 }
1398
1399 #[cfg(not(feature = "std"))]
1400 {
1401 let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1402 let state = capture.as_mut()?;
1403 Some(f(state))
1404 }
1405}
1406
1407pub struct CuMsgIoCaptureGuard;
1408
1409impl CuMsgIoCaptureGuard {
1410 pub fn select_slot(&self, slot: usize) {
1411 let _ = with_active_capture_mut(|capture| {
1412 debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1413 capture.current_slot = Some(slot);
1414 });
1415 }
1416}
1417
1418impl Drop for CuMsgIoCaptureGuard {
1419 fn drop(&mut self) {
1420 set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1421
1422 #[cfg(feature = "std")]
1423 ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1424
1425 #[cfg(not(feature = "std"))]
1426 {
1427 *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1428 }
1429 }
1430}
1431
1432pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1433 cache.clear();
1434 set_last_completed_handle_bytes(0);
1435 begin_payload_io_measurement();
1436 let (cache_addr, cache_len) = cache.raw_parts();
1437 let capture = ActiveCuMsgIoCapture {
1438 cache_addr,
1439 cache_len,
1440 current_slot: None,
1441 };
1442
1443 #[cfg(feature = "std")]
1444 ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1445 debug_assert!(
1446 state.get().is_none(),
1447 "CopperList payload IO capture must not be nested"
1448 );
1449 state.set(Some(capture));
1450 });
1451
1452 #[cfg(not(feature = "std"))]
1453 {
1454 let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1455 debug_assert!(
1456 state.is_none(),
1457 "CopperList payload IO capture must not be nested"
1458 );
1459 *state = Some(capture);
1460 }
1461
1462 CuMsgIoCaptureGuard
1463}
1464
1465pub(crate) fn current_payload_handle_bytes() -> usize {
1466 current_payload_io_measurement()
1467}
1468
1469pub(crate) fn record_current_slot_payload_io_stats(
1470 fixed_bytes: usize,
1471 encoded_bytes: usize,
1472 handle_bytes: usize,
1473) {
1474 let _ = with_active_capture_mut(|capture| {
1475 let Some(slot) = capture.current_slot else {
1476 return;
1477 };
1478 if slot >= capture.cache_len {
1479 return;
1480 }
1481 let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
1483 let entry = unsafe { &*cache_ptr.add(slot) };
1484 entry.set(CuMsgIoStats {
1485 present: true,
1486 resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
1487 encoded_bytes: encoded_bytes as u64,
1488 handle_bytes: handle_bytes as u64,
1489 });
1490 });
1491}
1492
1493pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1500where
1501 T: Encode,
1502{
1503 begin_payload_io_measurement();
1504 begin_observed_encode();
1505
1506 let result = (|| {
1507 let mut encoder =
1508 EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1509 payload.encode(&mut encoder).map_err(|e| {
1510 CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1511 })?;
1512 let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1513 debug_assert_eq!(encoded_bytes, finish_observed_encode());
1514 let handle_bytes = finish_payload_io_measurement();
1515 Ok(PayloadIoStats {
1516 resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1517 encoded_bytes,
1518 handle_bytes,
1519 })
1520 })();
1521
1522 if result.is_err() {
1523 abort_payload_io_measurement();
1524 abort_observed_encode();
1525 }
1526
1527 result
1528}
1529
1530fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1531 let Ok(msg_types) = graph.get_node_output_msg_types_by_id(node_id) else {
1532 return Vec::new();
1533 };
1534
1535 let mut outputs = Vec::new();
1536 for (port_idx, msg) in msg_types.into_iter().enumerate() {
1537 let mut port_label = String::from("out");
1538 port_label.push_str(&port_idx.to_string());
1539 port_label.push_str(": ");
1540 port_label.push_str(msg.as_str());
1541 outputs.push((msg, port_label));
1542 }
1543 outputs
1544}
1545
1546pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1548 let graph = config.get_graph(Some(mission))?;
1549 let mut nodes: Map<String, MonitorNode> = Map::new();
1550 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1551
1552 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1553 for bridge in &config.bridges {
1554 bridge_lookup.insert(bridge.id.as_str(), bridge);
1555 }
1556
1557 for (node_idx, node) in graph.get_all_nodes() {
1558 let node_id = node.get_id();
1559 let task_kind = match node.get_flavor() {
1560 Flavor::Bridge => ComponentType::Bridge,
1561 Flavor::Task => match resolve_task_kind_for_id(graph, node_idx)? {
1562 TaskKind::Source => ComponentType::Source,
1563 TaskKind::Regular => ComponentType::Task,
1564 TaskKind::Sink => ComponentType::Sink,
1565 },
1566 };
1567
1568 let mut inputs = Vec::new();
1569 let mut outputs = Vec::new();
1570 if task_kind == ComponentType::Bridge
1571 && let Some(bridge) = bridge_lookup.get(node_id.as_str())
1572 {
1573 for ch in &bridge.channels {
1574 match ch {
1575 BridgeChannelConfigRepresentation::Rx { id, .. } => outputs.push(id.clone()),
1576 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1577 }
1578 }
1579 } else {
1580 match task_kind {
1581 ComponentType::Source => {
1582 let ports = collect_output_ports(graph, node_idx);
1583 let mut port_map: Map<String, String> = Map::new();
1584 for (msg_type, label) in ports {
1585 port_map.insert(msg_type, label.clone());
1586 outputs.push(label);
1587 }
1588 output_port_lookup.insert(node_id.clone(), port_map);
1589 }
1590 ComponentType::Task => {
1591 inputs.push("in".to_string());
1592 let ports = collect_output_ports(graph, node_idx);
1593 let mut port_map: Map<String, String> = Map::new();
1594 for (msg_type, label) in ports {
1595 port_map.insert(msg_type, label.clone());
1596 outputs.push(label);
1597 }
1598 output_port_lookup.insert(node_id.clone(), port_map);
1599 }
1600 ComponentType::Sink => {
1601 inputs.push("in".to_string());
1602 }
1603 ComponentType::Bridge => unreachable!("handled above"),
1604 }
1605 }
1606
1607 nodes.insert(
1608 node_id.clone(),
1609 MonitorNode {
1610 id: node_id,
1611 type_name: Some(node.get_type().to_string()),
1612 kind: task_kind,
1613 inputs,
1614 outputs,
1615 },
1616 );
1617 }
1618
1619 let mut connections = Vec::new();
1620 for cnx in graph.edges() {
1621 let src = cnx.src.clone();
1622 let dst = cnx.dst.clone();
1623
1624 let src_port = cnx.src_channel.clone().or_else(|| {
1625 output_port_lookup
1626 .get(&src)
1627 .and_then(|ports| ports.get(&cnx.msg).cloned())
1628 .or_else(|| {
1629 nodes
1630 .get(&src)
1631 .and_then(|node| node.outputs.first().cloned())
1632 })
1633 });
1634 let dst_port = cnx.dst_channel.clone().or_else(|| {
1635 nodes
1636 .get(&dst)
1637 .and_then(|node| node.inputs.first().cloned())
1638 });
1639
1640 connections.push(MonitorConnection {
1641 src,
1642 src_port,
1643 dst,
1644 dst_port,
1645 msg: cnx.msg.clone(),
1646 });
1647 }
1648
1649 Ok(MonitorTopology {
1650 nodes: nodes.into_values().collect(),
1651 connections,
1652 })
1653}
1654
1655pub trait CuMonitor: Sized {
1675 fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1681 where
1682 Self: Sized;
1683
1684 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1686 Ok(())
1687 }
1688
1689 fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;
1691
1692 fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}
1694
1695 fn process_error(
1699 &self,
1700 component_id: ComponentId,
1701 step: CuComponentState,
1702 error: &CuError,
1703 ) -> Decision;
1704
1705 fn process_panic(&self, _panic_message: &str) {}
1707
1708 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1710 Ok(())
1711 }
1712}
1713
1714pub struct NoMonitor {}
1717impl CuMonitor for NoMonitor {
1718 fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1719 Ok(NoMonitor {})
1720 }
1721
1722 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1723 #[cfg(all(feature = "std", debug_assertions))]
1724 register_live_log_listener(|entry, format_str, param_names| {
1725 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1726 let named: Map<String, String> = param_names
1727 .iter()
1728 .zip(params.iter())
1729 .map(|(k, v)| (k.to_string(), v.clone()))
1730 .collect();
1731
1732 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1733 let ts = format_timestamp(entry.time.into());
1734 println!("{} [{:?}] {}", ts, entry.level, msg);
1735 }
1736 });
1737 Ok(())
1738 }
1739
1740 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1741 Ok(())
1743 }
1744
1745 fn process_error(
1746 &self,
1747 _component_id: ComponentId,
1748 _step: CuComponentState,
1749 _error: &CuError,
1750 ) -> Decision {
1751 Decision::Ignore
1753 }
1754
1755 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1756 #[cfg(all(feature = "std", debug_assertions))]
1757 unregister_live_log_listener();
1758 Ok(())
1759 }
1760}
1761
1762macro_rules! impl_monitor_tuple {
1763 ($($idx:tt => $name:ident),+) => {
1764 impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1765 fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1766 where
1767 Self: Sized,
1768 {
1769 Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1770 }
1771
1772 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1773 $(self.$idx.start(ctx)?;)+
1774 Ok(())
1775 }
1776
1777 fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1778 $(self.$idx.process_copperlist(ctx, view)?;)+
1779 Ok(())
1780 }
1781
1782 fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1783 $(self.$idx.observe_copperlist_io(stats);)+
1784 }
1785
1786 fn process_error(
1787 &self,
1788 component_id: ComponentId,
1789 step: CuComponentState,
1790 error: &CuError,
1791 ) -> Decision {
1792 let mut decision = Decision::Ignore;
1793 $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1794 decision
1795 }
1796
1797 fn process_panic(&self, panic_message: &str) {
1798 $(self.$idx.process_panic(panic_message);)+
1799 }
1800
1801 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1802 $(self.$idx.stop(ctx)?;)+
1803 Ok(())
1804 }
1805 }
1806 };
1807}
1808
1809impl_monitor_tuple!(0 => M0, 1 => M1);
1810impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1811impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1812impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1813impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1814
1815#[cfg(feature = "std")]
1816pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
1817 if let Some(msg) = payload.downcast_ref::<&str>() {
1818 (*msg).to_string()
1819 } else if let Some(msg) = payload.downcast_ref::<String>() {
1820 msg.clone()
1821 } else {
1822 "panic with non-string payload".to_string()
1823 }
1824}
1825
1826pub struct CountingAlloc<A: GlobalAlloc> {
1828 inner: A,
1829 allocated: AtomicUsize,
1830 deallocated: AtomicUsize,
1831}
1832
1833impl<A: GlobalAlloc> CountingAlloc<A> {
1834 pub const fn new(inner: A) -> Self {
1835 CountingAlloc {
1836 inner,
1837 allocated: AtomicUsize::new(0),
1838 deallocated: AtomicUsize::new(0),
1839 }
1840 }
1841
1842 pub fn allocated(&self) -> usize {
1843 self.allocated.load(Ordering::SeqCst)
1844 }
1845
1846 pub fn deallocated(&self) -> usize {
1847 self.deallocated.load(Ordering::SeqCst)
1848 }
1849
1850 pub fn reset(&self) {
1851 self.allocated.store(0, Ordering::SeqCst);
1852 self.deallocated.store(0, Ordering::SeqCst);
1853 }
1854}
1855
1856unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
1858 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
1860 let p = unsafe { self.inner.alloc(layout) };
1862 if !p.is_null() {
1863 self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
1864 }
1865 p
1866 }
1867
1868 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
1870 unsafe { self.inner.dealloc(ptr, layout) }
1872 self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
1873 }
1874}
1875
1876#[cfg(feature = "memory_monitoring")]
1878pub struct ScopedAllocCounter {
1879 bf_allocated: usize,
1880 bf_deallocated: usize,
1881}
1882
1883#[cfg(feature = "memory_monitoring")]
1884impl Default for ScopedAllocCounter {
1885 fn default() -> Self {
1886 Self::new()
1887 }
1888}
1889
1890#[cfg(feature = "memory_monitoring")]
1891impl ScopedAllocCounter {
1892 pub fn new() -> Self {
1893 ScopedAllocCounter {
1894 bf_allocated: GLOBAL.allocated(),
1895 bf_deallocated: GLOBAL.deallocated(),
1896 }
1897 }
1898
1899 pub fn allocated(&self) -> usize {
1911 GLOBAL.allocated() - self.bf_allocated
1912 }
1913
1914 pub fn deallocated(&self) -> usize {
1927 GLOBAL.deallocated() - self.bf_deallocated
1928 }
1929}
1930
1931#[cfg(feature = "memory_monitoring")]
1933impl Drop for ScopedAllocCounter {
1934 fn drop(&mut self) {
1935 let _allocated = GLOBAL.allocated() - self.bf_allocated;
1936 let _deallocated = GLOBAL.deallocated() - self.bf_deallocated;
1937 }
1944}
1945
1946#[cfg(feature = "std")]
1947const BUCKET_COUNT: usize = 1024;
1948#[cfg(not(feature = "std"))]
1949const BUCKET_COUNT: usize = 256;
1950
1951#[derive(Debug, Clone)]
1954pub struct LiveStatistics {
1955 buckets: [u64; BUCKET_COUNT],
1956 min_val: u64,
1957 max_val: u64,
1958 sum: u128,
1959 sum_sq: u128,
1960 count: u64,
1961 max_value: u64,
1962}
1963
1964impl LiveStatistics {
1965 pub fn new_with_max(max_value: u64) -> Self {
1985 LiveStatistics {
1986 buckets: [0; BUCKET_COUNT],
1987 min_val: u64::MAX,
1988 max_val: 0,
1989 sum: 0,
1990 sum_sq: 0,
1991 count: 0,
1992 max_value,
1993 }
1994 }
1995
1996 #[inline]
1997 fn value_to_bucket(&self, value: u64) -> usize {
1998 if value >= self.max_value {
1999 BUCKET_COUNT - 1
2000 } else {
2001 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
2002 }
2003 }
2004
2005 #[inline]
2006 pub fn min(&self) -> u64 {
2007 if self.count == 0 { 0 } else { self.min_val }
2008 }
2009
2010 #[inline]
2011 pub fn max(&self) -> u64 {
2012 self.max_val
2013 }
2014
2015 #[inline]
2016 pub fn mean(&self) -> f64 {
2017 if self.count == 0 {
2018 0.0
2019 } else {
2020 self.sum as f64 / self.count as f64
2021 }
2022 }
2023
2024 #[inline]
2025 pub fn stdev(&self) -> f64 {
2026 if self.count == 0 {
2027 return 0.0;
2028 }
2029 let mean = self.mean();
2030 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
2031 if variance < 0.0 {
2032 return 0.0;
2033 }
2034 #[cfg(feature = "std")]
2035 return variance.sqrt();
2036 #[cfg(not(feature = "std"))]
2037 return sqrt(variance);
2038 }
2039
2040 #[inline]
2041 pub fn percentile(&self, percentile: f64) -> u64 {
2042 if self.count == 0 {
2043 return 0;
2044 }
2045
2046 let target_count = (self.count as f64 * percentile) as u64;
2047 let mut accumulated = 0u64;
2048
2049 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
2050 accumulated += bucket_count;
2051 if accumulated >= target_count {
2052 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
2054 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
2055 let bucket_fraction = if bucket_count > 0 {
2056 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
2057 } else {
2058 0.5
2059 };
2060 return bucket_start
2061 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
2062 }
2063 }
2064
2065 self.max_val
2066 }
2067
2068 #[inline]
2070 pub fn record(&mut self, value: u64) {
2071 if value < self.min_val {
2072 self.min_val = value;
2073 }
2074 if value > self.max_val {
2075 self.max_val = value;
2076 }
2077 let value_u128 = value as u128;
2078 self.sum += value_u128;
2079 self.sum_sq += value_u128 * value_u128;
2080 self.count += 1;
2081
2082 let bucket = self.value_to_bucket(value);
2083 self.buckets[bucket] += 1;
2084 }
2085
2086 #[inline]
2087 pub fn len(&self) -> u64 {
2088 self.count
2089 }
2090
2091 #[inline]
2092 pub fn is_empty(&self) -> bool {
2093 self.count == 0
2094 }
2095
2096 #[inline]
2097 pub fn reset(&mut self) {
2098 self.buckets.fill(0);
2099 self.min_val = u64::MAX;
2100 self.max_val = 0;
2101 self.sum = 0;
2102 self.sum_sq = 0;
2103 self.count = 0;
2104 }
2105}
2106
2107#[derive(Debug, Clone)]
2110pub struct CuDurationStatistics {
2111 bare: LiveStatistics,
2112 jitter: LiveStatistics,
2113 last_value: CuDuration,
2114}
2115
2116impl CuDurationStatistics {
2117 pub fn new(max: CuDuration) -> Self {
2118 let CuDuration(max) = max;
2119 CuDurationStatistics {
2120 bare: LiveStatistics::new_with_max(max),
2121 jitter: LiveStatistics::new_with_max(max),
2122 last_value: CuDuration::default(),
2123 }
2124 }
2125
2126 #[inline]
2127 pub fn min(&self) -> CuDuration {
2128 CuDuration(self.bare.min())
2129 }
2130
2131 #[inline]
2132 pub fn max(&self) -> CuDuration {
2133 CuDuration(self.bare.max())
2134 }
2135
2136 #[inline]
2137 pub fn mean(&self) -> CuDuration {
2138 CuDuration(self.bare.mean() as u64) }
2140
2141 #[inline]
2142 pub fn percentile(&self, percentile: f64) -> CuDuration {
2143 CuDuration(self.bare.percentile(percentile))
2144 }
2145
2146 #[inline]
2147 pub fn stddev(&self) -> CuDuration {
2148 CuDuration(self.bare.stdev() as u64)
2149 }
2150
2151 #[inline]
2152 pub fn len(&self) -> u64 {
2153 self.bare.len()
2154 }
2155
2156 #[inline]
2157 pub fn is_empty(&self) -> bool {
2158 self.bare.len() == 0
2159 }
2160
2161 #[inline]
2162 pub fn jitter_min(&self) -> CuDuration {
2163 CuDuration(self.jitter.min())
2164 }
2165
2166 #[inline]
2167 pub fn jitter_max(&self) -> CuDuration {
2168 CuDuration(self.jitter.max())
2169 }
2170
2171 #[inline]
2172 pub fn jitter_mean(&self) -> CuDuration {
2173 CuDuration(self.jitter.mean() as u64)
2174 }
2175
2176 #[inline]
2177 pub fn jitter_stddev(&self) -> CuDuration {
2178 CuDuration(self.jitter.stdev() as u64)
2179 }
2180
2181 #[inline]
2182 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
2183 CuDuration(self.jitter.percentile(percentile))
2184 }
2185
2186 #[inline]
2187 pub fn record(&mut self, value: CuDuration) {
2188 let CuDuration(nanos) = value;
2189 if self.bare.is_empty() {
2190 self.bare.record(nanos);
2191 self.last_value = value;
2192 return;
2193 }
2194 self.bare.record(nanos);
2195 let CuDuration(last_nanos) = self.last_value;
2196 self.jitter.record(nanos.abs_diff(last_nanos));
2197 self.last_value = value;
2198 }
2199
2200 #[inline]
2201 pub fn reset(&mut self) {
2202 self.bare.reset();
2203 self.jitter.reset();
2204 }
2205}
2206
2207#[cfg(test)]
2208mod tests {
2209 use super::*;
2210 use core::sync::atomic::{AtomicUsize, Ordering};
2211
2212 #[derive(Clone, Copy)]
2213 enum TestDecision {
2214 Ignore,
2215 Abort,
2216 Shutdown,
2217 }
2218
2219 struct TestMonitor {
2220 decision: TestDecision,
2221 copperlist_calls: AtomicUsize,
2222 panic_calls: AtomicUsize,
2223 }
2224
2225 impl TestMonitor {
2226 fn new_with(decision: TestDecision) -> Self {
2227 Self {
2228 decision,
2229 copperlist_calls: AtomicUsize::new(0),
2230 panic_calls: AtomicUsize::new(0),
2231 }
2232 }
2233 }
2234
2235 fn test_metadata() -> CuMonitoringMetadata {
2236 const COMPONENTS: &[MonitorComponentMetadata] = &[
2237 MonitorComponentMetadata::new("a", ComponentType::Task, None),
2238 MonitorComponentMetadata::new("b", ComponentType::Task, None),
2239 ];
2240 CuMonitoringMetadata::new(
2241 CompactString::from(crate::config::DEFAULT_MISSION_ID),
2242 COMPONENTS,
2243 &[],
2244 CopperListInfo::new(0, 0),
2245 MonitorTopology::default(),
2246 None,
2247 )
2248 .expect("test metadata should be valid")
2249 }
2250
2251 impl CuMonitor for TestMonitor {
2252 fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
2253 let monitor = Self::new_with(TestDecision::Ignore);
2254 #[cfg(feature = "std")]
2255 let _ = runtime.execution_probe();
2256 Ok(monitor)
2257 }
2258
2259 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
2260 self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
2261 Ok(())
2262 }
2263
2264 fn process_error(
2265 &self,
2266 _component_id: ComponentId,
2267 _step: CuComponentState,
2268 _error: &CuError,
2269 ) -> Decision {
2270 match self.decision {
2271 TestDecision::Ignore => Decision::Ignore,
2272 TestDecision::Abort => Decision::Abort,
2273 TestDecision::Shutdown => Decision::Shutdown,
2274 }
2275 }
2276
2277 fn process_panic(&self, _panic_message: &str) {
2278 self.panic_calls.fetch_add(1, Ordering::SeqCst);
2279 }
2280 }
2281
2282 #[test]
2283 fn test_live_statistics_percentiles() {
2284 let mut stats = LiveStatistics::new_with_max(1000);
2285
2286 for i in 0..100 {
2288 stats.record(i);
2289 }
2290
2291 assert_eq!(stats.len(), 100);
2292 assert_eq!(stats.min(), 0);
2293 assert_eq!(stats.max(), 99);
2294 assert_eq!(stats.mean() as u64, 49); let p50 = stats.percentile(0.5);
2298 let p90 = stats.percentile(0.90);
2299 let p95 = stats.percentile(0.95);
2300 let p99 = stats.percentile(0.99);
2301
2302 assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
2304 assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
2305 assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
2306 assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
2307 }
2308
2309 #[test]
2310 fn test_duration_stats() {
2311 let mut stats = CuDurationStatistics::new(CuDuration(1000));
2312 stats.record(CuDuration(100));
2313 stats.record(CuDuration(200));
2314 stats.record(CuDuration(500));
2315 stats.record(CuDuration(400));
2316 assert_eq!(stats.min(), CuDuration(100));
2317 assert_eq!(stats.max(), CuDuration(500));
2318 assert_eq!(stats.mean(), CuDuration(300));
2319 assert_eq!(stats.len(), 4);
2320 assert_eq!(stats.jitter.len(), 3);
2321 assert_eq!(stats.jitter_min(), CuDuration(100));
2322 assert_eq!(stats.jitter_max(), CuDuration(300));
2323 assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
2324 stats.reset();
2325 assert_eq!(stats.len(), 0);
2326 }
2327
2328 #[test]
2329 fn test_duration_stats_large_samples_do_not_overflow() {
2330 let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
2331 stats.record(CuDuration(5_000_000_000));
2332 stats.record(CuDuration(8_000_000_000));
2333
2334 assert_eq!(stats.min(), CuDuration(5_000_000_000));
2335 assert_eq!(stats.max(), CuDuration(8_000_000_000));
2336 assert_eq!(stats.mean(), CuDuration(6_500_000_000));
2337 assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
2338 assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
2339 }
2340
2341 #[test]
2342 fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
2343 let err = CuError::from("boom");
2344
2345 let two = (
2346 TestMonitor::new_with(TestDecision::Ignore),
2347 TestMonitor::new_with(TestDecision::Shutdown),
2348 );
2349 assert!(matches!(
2350 two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
2351 Decision::Shutdown
2352 ));
2353
2354 let two = (
2355 TestMonitor::new_with(TestDecision::Ignore),
2356 TestMonitor::new_with(TestDecision::Abort),
2357 );
2358 assert!(matches!(
2359 two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
2360 Decision::Abort
2361 ));
2362 }
2363
2364 #[test]
2365 fn tuple_monitor_fans_out_callbacks() {
2366 let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
2367 test_metadata(),
2368 CuMonitoringRuntime::unavailable(),
2369 )
2370 .expect("tuple new");
2371 let (ctx, _clock_control) = CuContext::new_mock_clock();
2372 let empty_view = test_metadata().layout().view(&[]);
2373 monitors
2374 .process_copperlist(&ctx, empty_view)
2375 .expect("process_copperlist should fan out");
2376 monitors.process_panic("panic marker");
2377
2378 assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
2379 assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
2380 assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
2381 assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
2382 }
2383
2384 fn encoded_size<E: Encode>(value: &E) -> usize {
2385 let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
2386 value
2387 .encode(&mut encoder)
2388 .expect("size measurement encoder should not fail");
2389 encoder.into_writer().bytes_written
2390 }
2391
2392 #[test]
2393 fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
2394 let payload = vec![1u8, 2, 3, 4];
2395 let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");
2396
2397 assert_eq!(io.encoded_bytes, encoded_size(&payload));
2398 assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
2399 assert_eq!(io.handle_bytes, 0);
2400 }
2401
2402 #[test]
2403 fn payload_io_stats_tracks_handle_backed_storage() {
2404 let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
2405 let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");
2406
2407 assert_eq!(io.encoded_bytes, encoded_size(&payload));
2408 assert_eq!(
2409 io.resident_bytes,
2410 core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
2411 );
2412 assert_eq!(io.handle_bytes, 32);
2413 }
2414
2415 #[test]
2416 fn runtime_execution_probe_roundtrip_marker() {
2417 let probe = RuntimeExecutionProbe::default();
2418 assert!(probe.marker().is_none());
2419 assert_eq!(probe.sequence(), 0);
2420
2421 probe.record(ExecutionMarker {
2422 component_id: ComponentId::new(7),
2423 step: CuComponentState::Process,
2424 culistid: Some(42),
2425 });
2426
2427 let marker = probe.marker().expect("marker should be available");
2428 assert_eq!(marker.component_id, ComponentId::new(7));
2429 assert!(matches!(marker.step, CuComponentState::Process));
2430 assert_eq!(marker.culistid, Some(42));
2431 assert_eq!(probe.sequence(), 1);
2432 }
2433}