1use crate::config::CuConfig;
5use crate::config::{
6 BridgeChannelConfigRepresentation, BridgeConfig, ComponentConfig, CuGraph, Flavor, NodeId,
7};
8use crate::context::CuContext;
9use crate::cutask::CuMsgMetadata;
10use bincode::Encode;
11use bincode::config::standard;
12use bincode::enc::EncoderImpl;
13use bincode::enc::write::SizeWriter;
14use compact_str::CompactString;
15use cu29_clock::CuDuration;
16#[allow(unused_imports)]
17use cu29_log::CuLogLevel;
18#[cfg(all(feature = "std", debug_assertions))]
19use cu29_log_runtime::{
20 format_message_only, register_live_log_listener, unregister_live_log_listener,
21};
22use cu29_traits::{
23 CuError, CuResult, ObservedWriter, abort_observed_encode, begin_observed_encode,
24 finish_observed_encode,
25};
26use portable_atomic::{
27 AtomicBool as PortableAtomicBool, AtomicU64 as PortableAtomicU64, Ordering as PortableOrdering,
28};
29use serde_derive::{Deserialize, Serialize};
30
31#[cfg(not(feature = "std"))]
32extern crate alloc;
33
34#[cfg(feature = "std")]
35use core::cell::Cell;
36#[cfg(feature = "std")]
37use std::sync::Arc;
38#[cfg(feature = "std")]
39use std::thread_local;
40#[cfg(feature = "std")]
41use std::{collections::HashMap as Map, string::String, string::ToString, vec::Vec};
42
43#[cfg(not(feature = "std"))]
44use alloc::{collections::BTreeMap as Map, string::String, string::ToString, vec::Vec};
45#[cfg(not(target_has_atomic = "64"))]
46use spin::Mutex;
47#[cfg(not(feature = "std"))]
48use spin::Mutex as SpinMutex;
49
/// Platform shims for `no_std` builds.
///
/// Re-exports the allocator traits from `alloc`, the atomics from `core`, and `sqrt` from
/// `libm`, so the rest of the module can use the same names in `std` and `no_std` builds.
#[cfg(not(feature = "std"))]
mod imp {
    pub use alloc::alloc::{GlobalAlloc, Layout};
    // Only re-exported on targets with native 64-bit atomics; other targets use the
    // mutex-based fallback selected by `cfg(not(target_has_atomic = "64"))` elsewhere.
    #[cfg(target_has_atomic = "64")]
    pub use core::sync::atomic::AtomicU64;
    pub use core::sync::atomic::{AtomicUsize, Ordering};
    pub use libm::sqrt;
}
58
/// Platform shims for `std` builds.
///
/// Re-exports the allocator traits and atomics from `std`. With the `memory_monitoring`
/// feature enabled it also installs `CountingAlloc<System>` as the process-wide global
/// allocator.
#[cfg(feature = "std")]
mod imp {
    #[cfg(feature = "memory_monitoring")]
    use super::CountingAlloc;
    #[cfg(feature = "memory_monitoring")]
    pub use std::alloc::System;
    pub use std::alloc::{GlobalAlloc, Layout};
    // Only re-exported on targets with native 64-bit atomics.
    #[cfg(target_has_atomic = "64")]
    pub use std::sync::atomic::AtomicU64;
    pub use std::sync::atomic::{AtomicUsize, Ordering};
    // Wraps the system allocator; only active when `memory_monitoring` is enabled.
    #[cfg(feature = "memory_monitoring")]
    #[global_allocator]
    pub static GLOBAL: CountingAlloc<System> = CountingAlloc::new(System);
}
73
74use imp::*;
75
/// Renders a runtime timestamp as `HH:MM:SS.ffff`.
///
/// Hours are not wrapped at 24, and the sub-second part is truncated (not rounded) to
/// ten-thousandths of a second.
#[cfg(all(feature = "std", debug_assertions))]
fn format_timestamp(time: CuDuration) -> String {
    let nanos = time.as_nanos();
    let (whole_seconds, sub_second_nanos) = (nanos / 1_000_000_000, nanos % 1_000_000_000);

    let hours = whole_seconds / 3600;
    // Seconds left within the current hour, expressed in minutes.
    let minutes = whole_seconds % 3600 / 60;
    let seconds = whole_seconds % 60;
    // 100_000 ns == 1/10_000 s.
    let fractional_1e4 = sub_second_nanos / 100_000;

    format!("{hours:02}:{minutes:02}:{seconds:02}.{fractional_1e4:04}")
}
87
88#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
90pub enum CuComponentState {
91 Start,
92 Preprocess,
93 Process,
94 Postprocess,
95 Stop,
96}
97
98#[repr(transparent)]
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
101pub struct ComponentId(usize);
102
103impl ComponentId {
104 pub const INVALID: Self = Self(usize::MAX);
105
106 #[inline]
107 pub const fn new(index: usize) -> Self {
108 Self(index)
109 }
110
111 #[inline]
112 pub const fn index(self) -> usize {
113 self.0
114 }
115}
116
117impl core::fmt::Display for ComponentId {
118 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
119 self.0.fmt(f)
120 }
121}
122
123impl From<ComponentId> for usize {
124 fn from(value: ComponentId) -> Self {
125 value.index()
126 }
127}
128
129#[repr(transparent)]
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
132pub struct CuListSlot(usize);
133
134impl CuListSlot {
135 #[inline]
136 pub const fn new(index: usize) -> Self {
137 Self(index)
138 }
139
140 #[inline]
141 pub const fn index(self) -> usize {
142 self.0
143 }
144}
145
146impl From<CuListSlot> for usize {
147 fn from(value: CuListSlot) -> Self {
148 value.index()
149 }
150}
151
152#[derive(Debug, Clone, Copy)]
156pub struct CopperListLayout {
157 components: &'static [MonitorComponentMetadata],
158 slot_to_component: &'static [ComponentId],
159}
160
161impl CopperListLayout {
162 #[inline]
163 pub const fn new(
164 components: &'static [MonitorComponentMetadata],
165 slot_to_component: &'static [ComponentId],
166 ) -> Self {
167 Self {
168 components,
169 slot_to_component,
170 }
171 }
172
173 #[inline]
174 pub const fn components(self) -> &'static [MonitorComponentMetadata] {
175 self.components
176 }
177
178 #[inline]
179 pub const fn component_count(self) -> usize {
180 self.components.len()
181 }
182
183 #[inline]
184 pub const fn culist_slot_count(self) -> usize {
185 self.slot_to_component.len()
186 }
187
188 #[inline]
189 pub fn component(self, id: ComponentId) -> &'static MonitorComponentMetadata {
190 &self.components[id.index()]
191 }
192
193 #[inline]
194 pub fn component_for_slot(self, culist_slot: CuListSlot) -> ComponentId {
195 self.slot_to_component[culist_slot.index()]
196 }
197
198 #[inline]
199 pub const fn slot_to_component(self) -> &'static [ComponentId] {
200 self.slot_to_component
201 }
202
203 #[inline]
204 pub fn view<'a>(self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
205 CopperListView::new(self, msgs)
206 }
207}
208
209#[derive(Debug, Clone, Copy)]
211pub struct CopperListView<'a> {
212 layout: CopperListLayout,
213 msgs: &'a [&'a CuMsgMetadata],
214}
215
216impl<'a> CopperListView<'a> {
217 #[inline]
218 pub fn new(layout: CopperListLayout, msgs: &'a [&'a CuMsgMetadata]) -> Self {
219 assert_eq!(
220 msgs.len(),
221 layout.culist_slot_count(),
222 "invalid monitor CopperList view: msgs len {} != slot mapping len {}",
223 msgs.len(),
224 layout.culist_slot_count()
225 );
226 Self { layout, msgs }
227 }
228
229 #[inline]
230 pub const fn layout(self) -> CopperListLayout {
231 self.layout
232 }
233
234 #[inline]
235 pub const fn msgs(self) -> &'a [&'a CuMsgMetadata] {
236 self.msgs
237 }
238
239 #[inline]
240 pub const fn len(self) -> usize {
241 self.msgs.len()
242 }
243
244 #[inline]
245 pub const fn is_empty(self) -> bool {
246 self.msgs.is_empty()
247 }
248
249 #[inline]
250 pub fn entry(self, culist_slot: CuListSlot) -> CopperListEntry<'a> {
251 let index = culist_slot.index();
252 CopperListEntry {
253 culist_slot,
254 component_id: self.layout.component_for_slot(culist_slot),
255 msg: self.msgs[index],
256 }
257 }
258
259 pub fn entries(self) -> impl Iterator<Item = CopperListEntry<'a>> + 'a {
260 self.msgs.iter().enumerate().map(move |(idx, msg)| {
261 let culist_slot = CuListSlot::new(idx);
262 CopperListEntry {
263 culist_slot,
264 component_id: self.layout.component_for_slot(culist_slot),
265 msg,
266 }
267 })
268 }
269}
270
271#[derive(Debug, Clone, Copy)]
273pub struct CopperListEntry<'a> {
274 pub culist_slot: CuListSlot,
275 pub component_id: ComponentId,
276 pub msg: &'a CuMsgMetadata,
277}
278
279impl<'a> CopperListEntry<'a> {
280 #[inline]
281 pub fn component(self, layout: CopperListLayout) -> &'static MonitorComponentMetadata {
282 layout.component(self.component_id)
283 }
284
285 #[inline]
286 pub fn component_type(self, layout: CopperListLayout) -> ComponentType {
287 layout.component(self.component_id).kind()
288 }
289}
290
/// Snapshot of where the runtime is executing: component, lifecycle step and, when known,
/// the CopperList id.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct ExecutionMarker {
    /// Component being executed. `RuntimeExecutionProbe::marker` returns `None` instead of
    /// a marker carrying `ComponentId::INVALID`.
    pub component_id: ComponentId,
    /// Lifecycle step of that component.
    pub step: CuComponentState,
    /// Id of the CopperList being processed, if any.
    pub culistid: Option<u64>,
}
301
302#[derive(Debug)]
308pub struct RuntimeExecutionProbe {
309 component_id: AtomicUsize,
310 step: AtomicUsize,
311 #[cfg(target_has_atomic = "64")]
312 culistid: AtomicU64,
313 #[cfg(target_has_atomic = "64")]
314 culistid_present: AtomicUsize,
315 #[cfg(not(target_has_atomic = "64"))]
316 culistid: Mutex<Option<u64>>,
317 sequence: AtomicUsize,
318}
319
320impl Default for RuntimeExecutionProbe {
321 fn default() -> Self {
322 Self {
323 component_id: AtomicUsize::new(ComponentId::INVALID.index()),
324 step: AtomicUsize::new(0),
325 #[cfg(target_has_atomic = "64")]
326 culistid: AtomicU64::new(0),
327 #[cfg(target_has_atomic = "64")]
328 culistid_present: AtomicUsize::new(0),
329 #[cfg(not(target_has_atomic = "64"))]
330 culistid: Mutex::new(None),
331 sequence: AtomicUsize::new(0),
332 }
333 }
334}
335
336impl RuntimeExecutionProbe {
337 #[inline]
338 pub fn record(&self, marker: ExecutionMarker) {
339 self.component_id
340 .store(marker.component_id.index(), Ordering::Relaxed);
341 self.step
342 .store(component_state_to_usize(marker.step), Ordering::Relaxed);
343 #[cfg(target_has_atomic = "64")]
344 match marker.culistid {
345 Some(culistid) => {
346 self.culistid.store(culistid, Ordering::Relaxed);
347 self.culistid_present.store(1, Ordering::Relaxed);
348 }
349 None => {
350 self.culistid_present.store(0, Ordering::Relaxed);
351 }
352 }
353 #[cfg(not(target_has_atomic = "64"))]
354 {
355 *self.culistid.lock() = marker.culistid;
356 }
357 self.sequence.fetch_add(1, Ordering::Release);
358 }
359
360 #[inline]
361 pub fn sequence(&self) -> usize {
362 self.sequence.load(Ordering::Acquire)
363 }
364
365 #[inline]
366 pub fn marker(&self) -> Option<ExecutionMarker> {
367 loop {
370 let seq_before = self.sequence.load(Ordering::Acquire);
371 let component_id = self.component_id.load(Ordering::Relaxed);
372 let step = self.step.load(Ordering::Relaxed);
373 #[cfg(target_has_atomic = "64")]
374 let culistid_present = self.culistid_present.load(Ordering::Relaxed);
375 #[cfg(target_has_atomic = "64")]
376 let culistid_value = self.culistid.load(Ordering::Relaxed);
377 #[cfg(not(target_has_atomic = "64"))]
378 let culistid = *self.culistid.lock();
379 let seq_after = self.sequence.load(Ordering::Acquire);
380 if seq_before == seq_after {
381 if component_id == ComponentId::INVALID.index() {
382 return None;
383 }
384 let step = usize_to_component_state(step);
385 #[cfg(target_has_atomic = "64")]
386 let culistid = if culistid_present == 0 {
387 None
388 } else {
389 Some(culistid_value)
390 };
391 return Some(ExecutionMarker {
392 component_id: ComponentId::new(component_id),
393 step,
394 culistid,
395 });
396 }
397 }
398 }
399}
400
401#[inline]
402const fn component_state_to_usize(step: CuComponentState) -> usize {
403 match step {
404 CuComponentState::Start => 0,
405 CuComponentState::Preprocess => 1,
406 CuComponentState::Process => 2,
407 CuComponentState::Postprocess => 3,
408 CuComponentState::Stop => 4,
409 }
410}
411
412#[inline]
413const fn usize_to_component_state(step: usize) -> CuComponentState {
414 match step {
415 0 => CuComponentState::Start,
416 1 => CuComponentState::Preprocess,
417 2 => CuComponentState::Process,
418 3 => CuComponentState::Postprocess,
419 _ => CuComponentState::Stop,
420 }
421}
422
423#[cfg(feature = "std")]
424pub type ExecutionProbeHandle = Arc<RuntimeExecutionProbe>;
425
426#[derive(Debug, Clone)]
431pub struct MonitorExecutionProbe {
432 #[cfg(feature = "std")]
433 inner: Option<ExecutionProbeHandle>,
434}
435
436impl Default for MonitorExecutionProbe {
437 fn default() -> Self {
438 Self::unavailable()
439 }
440}
441
442impl MonitorExecutionProbe {
443 #[cfg(feature = "std")]
444 pub fn from_shared(handle: ExecutionProbeHandle) -> Self {
445 Self {
446 inner: Some(handle),
447 }
448 }
449
450 pub const fn unavailable() -> Self {
451 Self {
452 #[cfg(feature = "std")]
453 inner: None,
454 }
455 }
456
457 pub fn is_available(&self) -> bool {
458 #[cfg(feature = "std")]
459 {
460 self.inner.is_some()
461 }
462 #[cfg(not(feature = "std"))]
463 {
464 false
465 }
466 }
467
468 pub fn marker(&self) -> Option<ExecutionMarker> {
469 #[cfg(feature = "std")]
470 {
471 self.inner.as_ref().and_then(|probe| probe.marker())
472 }
473 #[cfg(not(feature = "std"))]
474 {
475 None
476 }
477 }
478
479 pub fn sequence(&self) -> Option<usize> {
480 #[cfg(feature = "std")]
481 {
482 self.inner.as_ref().map(|probe| probe.sequence())
483 }
484 #[cfg(not(feature = "std"))]
485 {
486 None
487 }
488 }
489}
490
/// Role of a monitored component.
///
/// `Source`, `Task` and `Sink` form the "task family"; `Bridge` does not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentType {
    Source,
    Task,
    Sink,
    Bridge,
}

impl ComponentType {
    /// `true` for task-family components (everything except bridges).
    pub const fn is_task(self) -> bool {
        match self {
            Self::Source | Self::Task | Self::Sink => true,
            Self::Bridge => false,
        }
    }
}

/// Static description of one monitored component.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MonitorComponentMetadata {
    id: &'static str,
    kind: ComponentType,
    type_name: Option<&'static str>,
}

impl MonitorComponentMetadata {
    /// Bundles a component id, kind and optional type name.
    pub const fn new(
        id: &'static str,
        kind: ComponentType,
        type_name: Option<&'static str>,
    ) -> Self {
        MonitorComponentMetadata {
            id,
            kind,
            type_name,
        }
    }

    /// Configuration id of the component.
    pub const fn id(&self) -> &'static str {
        self.id
    }

    /// Role of the component.
    pub const fn kind(&self) -> ComponentType {
        self.kind
    }

    /// Implementation type name, when known.
    pub const fn type_name(&self) -> Option<&'static str> {
        self.type_name
    }
}
545
546#[derive(Debug, Clone)]
551pub struct CuMonitoringMetadata {
552 mission_id: CompactString,
553 subsystem_id: Option<CompactString>,
554 instance_id: u32,
555 layout: CopperListLayout,
556 copperlist_info: CopperListInfo,
557 topology: MonitorTopology,
558 monitor_config: Option<ComponentConfig>,
559}
560
561impl CuMonitoringMetadata {
562 pub fn new(
563 mission_id: CompactString,
564 components: &'static [MonitorComponentMetadata],
565 culist_component_mapping: &'static [ComponentId],
566 copperlist_info: CopperListInfo,
567 topology: MonitorTopology,
568 monitor_config: Option<ComponentConfig>,
569 ) -> CuResult<Self> {
570 Self::validate_components(components)?;
571 Self::validate_culist_mapping(components.len(), culist_component_mapping)?;
572 Ok(Self {
573 mission_id,
574 subsystem_id: None,
575 instance_id: 0,
576 layout: CopperListLayout::new(components, culist_component_mapping),
577 copperlist_info,
578 topology,
579 monitor_config,
580 })
581 }
582
583 fn validate_components(components: &'static [MonitorComponentMetadata]) -> CuResult<()> {
584 let mut seen_bridge = false;
585 for component in components {
586 match component.kind() {
587 component_type if component_type.is_task() && seen_bridge => {
588 return Err(CuError::from(
589 "invalid monitor metadata: task-family components must appear before bridges",
590 ));
591 }
592 ComponentType::Bridge => seen_bridge = true,
593 _ => {}
594 }
595 }
596 Ok(())
597 }
598
599 fn validate_culist_mapping(
600 components_len: usize,
601 culist_component_mapping: &'static [ComponentId],
602 ) -> CuResult<()> {
603 for component_idx in culist_component_mapping {
604 if component_idx.index() >= components_len {
605 return Err(CuError::from(
606 "invalid monitor metadata: culist mapping points past components table",
607 ));
608 }
609 }
610 Ok(())
611 }
612
613 pub fn mission_id(&self) -> &str {
615 self.mission_id.as_str()
616 }
617
618 pub fn subsystem_id(&self) -> Option<&str> {
621 self.subsystem_id.as_deref()
622 }
623
624 pub fn instance_id(&self) -> u32 {
626 self.instance_id
627 }
628
629 pub fn components(&self) -> &'static [MonitorComponentMetadata] {
633 self.layout.components()
634 }
635
636 pub const fn component_count(&self) -> usize {
638 self.layout.component_count()
639 }
640
641 pub const fn layout(&self) -> CopperListLayout {
643 self.layout
644 }
645
646 pub fn component(&self, component_id: ComponentId) -> &'static MonitorComponentMetadata {
647 self.layout.component(component_id)
648 }
649
650 pub fn component_id(&self, component_id: ComponentId) -> &'static str {
651 self.component(component_id).id()
652 }
653
654 pub fn component_kind(&self, component_id: ComponentId) -> ComponentType {
655 self.component(component_id).kind()
656 }
657
658 pub fn component_index_by_id(&self, component_id: &str) -> Option<ComponentId> {
659 self.layout
660 .components()
661 .iter()
662 .position(|component| component.id() == component_id)
663 .map(ComponentId::new)
664 }
665
666 pub fn culist_component_mapping(&self) -> &'static [ComponentId] {
670 self.layout.slot_to_component()
671 }
672
673 pub fn component_for_culist_slot(&self, culist_slot: CuListSlot) -> ComponentId {
674 self.layout.component_for_slot(culist_slot)
675 }
676
677 pub fn copperlist_view<'a>(&self, msgs: &'a [&'a CuMsgMetadata]) -> CopperListView<'a> {
678 self.layout.view(msgs)
679 }
680
681 pub const fn copperlist_info(&self) -> CopperListInfo {
682 self.copperlist_info
683 }
684
685 pub fn topology(&self) -> &MonitorTopology {
690 &self.topology
691 }
692
693 pub fn monitor_config(&self) -> Option<&ComponentConfig> {
694 self.monitor_config.as_ref()
695 }
696
697 pub fn with_monitor_config(mut self, monitor_config: Option<ComponentConfig>) -> Self {
698 self.monitor_config = monitor_config;
699 self
700 }
701
702 pub fn with_subsystem_id(mut self, subsystem_id: Option<&str>) -> Self {
703 self.subsystem_id = subsystem_id.map(CompactString::from);
704 self
705 }
706
707 pub fn with_instance_id(mut self, instance_id: u32) -> Self {
708 self.instance_id = instance_id;
709 self
710 }
711}
712
713#[derive(Debug, Clone, Default)]
717pub struct CuMonitoringRuntime {
718 execution_probe: MonitorExecutionProbe,
719}
720
721impl CuMonitoringRuntime {
722 pub const fn new(execution_probe: MonitorExecutionProbe) -> Self {
723 Self { execution_probe }
724 }
725
726 pub const fn unavailable() -> Self {
727 Self::new(MonitorExecutionProbe::unavailable())
728 }
729
730 pub fn execution_probe(&self) -> &MonitorExecutionProbe {
731 &self.execution_probe
732 }
733}
734
/// Monitor verdict for a component error (returned by `CuMonitor::process_error`).
#[derive(Debug)]
pub enum Decision {
    Abort,
    Ignore,
    Shutdown,
}

/// Combines two decisions, keeping the more severe one.
///
/// Severity order: `Shutdown` > `Abort` > `Ignore`.
fn merge_decision(lhs: Decision, rhs: Decision) -> Decision {
    const fn severity(decision: &Decision) -> u8 {
        match decision {
            Decision::Ignore => 0,
            Decision::Abort => 1,
            Decision::Shutdown => 2,
        }
    }

    if severity(&rhs) > severity(&lhs) {
        rhs
    } else {
        lhs
    }
}
753
/// A node of the monitor topology (built, for example, by `build_monitor_topology`).
#[derive(Debug, Clone)]
pub struct MonitorNode {
    /// Node id from the configuration graph.
    pub id: String,
    /// Implementation type name, when known.
    pub type_name: Option<String>,
    /// Role of the node in the graph.
    pub kind: ComponentType,
    /// Input port labels (e.g. `"in"`, or a bridge's Tx channel ids).
    pub inputs: Vec<String>,
    /// Output port labels (e.g. `"out0: <msg>"`, `"out"`, or a bridge's Rx channel ids).
    pub outputs: Vec<String>,
}

/// A directed connection between two topology nodes.
#[derive(Debug, Clone)]
pub struct MonitorConnection {
    /// Source node id.
    pub src: String,
    /// Output port on `src`, when one could be resolved.
    pub src_port: Option<String>,
    /// Destination node id.
    pub dst: String,
    /// Input port on `dst`, when one could be resolved.
    pub dst_port: Option<String>,
    /// Message type carried by the connection.
    pub msg: String,
}

/// Nodes and connections of a mission graph as seen by monitors.
#[derive(Debug, Clone, Default)]
pub struct MonitorTopology {
    pub nodes: Vec<MonitorNode>,
    pub connections: Vec<MonitorConnection>,
}
779
/// Size and count information about the runtime's CopperLists.
///
/// NOTE(review): presumably `size_bytes` is the size of one CopperList and `count` the
/// number of CopperLists in the pool — confirm against the code that builds it.
#[derive(Debug, Clone, Copy, Default)]
pub struct CopperListInfo {
    pub size_bytes: usize,
    pub count: usize,
}

impl CopperListInfo {
    /// Bundles the two values.
    pub const fn new(size_bytes: usize, count: usize) -> Self {
        CopperListInfo { count, size_bytes }
    }
}
791
/// Per-CopperList IO accounting, as passed to `CuMonitor::observe_copperlist_io`.
///
/// Derives `PartialEq`/`Eq` like the sibling `PayloadIoStats` and `CuMsgIoStats`.
///
/// NOTE(review): the field descriptions below are inferred from the field names; confirm
/// them against the runtime code that fills this struct.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CopperListIoStats {
    /// Raw (unencoded) CopperList size in bytes.
    pub raw_culist_bytes: u64,
    /// Bytes attributed to payload handles.
    pub handle_bytes: u64,
    /// Encoded CopperList size in bytes.
    pub encoded_culist_bytes: u64,
    /// Bytes written for keyframes.
    pub keyframe_bytes: u64,
    /// Running total of structured-log bytes.
    pub structured_log_bytes_total: u64,
    /// Id of the CopperList these stats describe.
    pub culistid: u64,
}
815
/// Size accounting for a single payload, as returned by `payload_io_stats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct PayloadIoStats {
    /// `size_of::<T>()` plus `handle_bytes` (saturating).
    pub resident_bytes: usize,
    /// Number of bytes the payload encodes to with bincode's `standard()` configuration.
    pub encoded_bytes: usize,
    /// Handle bytes accumulated in the payload IO measurement while encoding.
    pub handle_bytes: usize,
}

/// Size accounting for one CopperList message slot, as read from a `CuMsgIoCache`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CuMsgIoStats {
    /// Whether stats were recorded for the slot. `CuMsgIoCache::get` returns an all-zero
    /// value when this is `false`.
    pub present: bool,
    /// Fixed size plus handle bytes (saturating).
    pub resident_bytes: u64,
    /// Encoded size in bytes.
    pub encoded_bytes: u64,
    /// Handle bytes attributed to the message.
    pub handle_bytes: u64,
}
830
831struct CuMsgIoEntry {
832 present: PortableAtomicBool,
833 resident_bytes: PortableAtomicU64,
834 encoded_bytes: PortableAtomicU64,
835 handle_bytes: PortableAtomicU64,
836}
837
838impl CuMsgIoEntry {
839 fn clear(&self) {
840 self.present.store(false, PortableOrdering::Release);
841 self.resident_bytes.store(0, PortableOrdering::Relaxed);
842 self.encoded_bytes.store(0, PortableOrdering::Relaxed);
843 self.handle_bytes.store(0, PortableOrdering::Relaxed);
844 }
845
846 fn get(&self) -> CuMsgIoStats {
847 if !self.present.load(PortableOrdering::Acquire) {
848 return CuMsgIoStats::default();
849 }
850
851 CuMsgIoStats {
852 present: true,
853 resident_bytes: self.resident_bytes.load(PortableOrdering::Relaxed),
854 encoded_bytes: self.encoded_bytes.load(PortableOrdering::Relaxed),
855 handle_bytes: self.handle_bytes.load(PortableOrdering::Relaxed),
856 }
857 }
858
859 fn set(&self, stats: CuMsgIoStats) {
860 self.resident_bytes
861 .store(stats.resident_bytes, PortableOrdering::Relaxed);
862 self.encoded_bytes
863 .store(stats.encoded_bytes, PortableOrdering::Relaxed);
864 self.handle_bytes
865 .store(stats.handle_bytes, PortableOrdering::Relaxed);
866 self.present.store(stats.present, PortableOrdering::Release);
867 }
868}
869
870impl Default for CuMsgIoEntry {
871 fn default() -> Self {
872 Self {
873 present: PortableAtomicBool::new(false),
874 resident_bytes: PortableAtomicU64::new(0),
875 encoded_bytes: PortableAtomicU64::new(0),
876 handle_bytes: PortableAtomicU64::new(0),
877 }
878 }
879}
880
881pub struct CuMsgIoCache<const N: usize> {
882 entries: [CuMsgIoEntry; N],
883}
884
885impl<const N: usize> CuMsgIoCache<N> {
886 pub fn clear(&self) {
887 for entry in &self.entries {
888 entry.clear();
889 }
890 }
891
892 pub fn get(&self, idx: usize) -> CuMsgIoStats {
893 self.entries[idx].get()
894 }
895
896 fn raw_parts(&self) -> (usize, usize) {
897 (self.entries.as_ptr() as usize, N)
898 }
899}
900
901impl<const N: usize> Default for CuMsgIoCache<N> {
902 fn default() -> Self {
903 Self {
904 entries: core::array::from_fn(|_| CuMsgIoEntry::default()),
905 }
906 }
907}
908
/// State of the CopperList IO capture in progress.
#[derive(Clone, Copy)]
struct ActiveCuMsgIoCapture {
    /// Address of the cache's first `CuMsgIoEntry`, from `CuMsgIoCache::raw_parts`.
    cache_addr: usize,
    /// Number of entries in that cache.
    cache_len: usize,
    /// Slot chosen via `CuMsgIoCaptureGuard::select_slot`, if any.
    current_slot: Option<usize>,
}
915
// Measurement state. With `std` every thread has its own copy; without `std` a single
// spin-locked global is shared by all callers.
#[cfg(feature = "std")]
thread_local! {
    // Handle bytes accumulated by the running payload IO measurement; `None` when idle.
    static PAYLOAD_HANDLE_BYTES: Cell<Option<usize>> = const { Cell::new(None) };
    // CopperList IO capture active on this thread, if any.
    static ACTIVE_COPPERLIST_CAPTURE: Cell<Option<ActiveCuMsgIoCapture>> = const { Cell::new(None) };
    // Handle bytes of the most recently finished CopperList capture.
    static LAST_COMPLETED_HANDLE_BYTES: Cell<u64> = const { Cell::new(0) };
}

// `no_std` equivalents of the thread-locals above (process-wide, not per thread).
#[cfg(not(feature = "std"))]
static PAYLOAD_HANDLE_BYTES: SpinMutex<Option<usize>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static ACTIVE_COPPERLIST_CAPTURE: SpinMutex<Option<ActiveCuMsgIoCapture>> = SpinMutex::new(None);
#[cfg(not(feature = "std"))]
static LAST_COMPLETED_HANDLE_BYTES: SpinMutex<u64> = SpinMutex::new(0);
929
930fn begin_payload_io_measurement() {
931 #[cfg(feature = "std")]
932 PAYLOAD_HANDLE_BYTES.with(|bytes| {
933 debug_assert!(
934 bytes.get().is_none(),
935 "payload IO byte measurement must not be nested"
936 );
937 bytes.set(Some(0));
938 });
939
940 #[cfg(not(feature = "std"))]
941 {
942 let mut bytes = PAYLOAD_HANDLE_BYTES.lock();
943 debug_assert!(
944 bytes.is_none(),
945 "payload IO byte measurement must not be nested"
946 );
947 *bytes = Some(0);
948 }
949}
950
951fn finish_payload_io_measurement() -> usize {
952 #[cfg(feature = "std")]
953 {
954 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.replace(None).unwrap_or(0))
955 }
956
957 #[cfg(not(feature = "std"))]
958 {
959 PAYLOAD_HANDLE_BYTES.lock().take().unwrap_or(0)
960 }
961}
962
963fn abort_payload_io_measurement() {
964 #[cfg(feature = "std")]
965 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.set(None));
966
967 #[cfg(not(feature = "std"))]
968 {
969 *PAYLOAD_HANDLE_BYTES.lock() = None;
970 }
971}
972
973fn current_payload_io_measurement() -> usize {
974 #[cfg(feature = "std")]
975 {
976 PAYLOAD_HANDLE_BYTES.with(|bytes| bytes.get().unwrap_or(0))
977 }
978
979 #[cfg(not(feature = "std"))]
980 {
981 PAYLOAD_HANDLE_BYTES.lock().as_ref().copied().unwrap_or(0)
982 }
983}
984
/// Adds `bytes` to the handle-byte measurement window open on the current thread.
///
/// Does nothing outside a measurement window. The addition saturates instead of wrapping.
/// Only available with the `std` feature.
#[cfg(feature = "std")]
pub(crate) fn record_payload_handle_bytes(bytes: usize) {
    // The function itself is std-only, so a `no_std` branch here could never be compiled.
    PAYLOAD_HANDLE_BYTES.with(|total| {
        if let Some(current) = total.get() {
            total.set(Some(current.saturating_add(bytes)));
        }
    });
}
1002
1003fn set_last_completed_handle_bytes(bytes: u64) {
1004 #[cfg(feature = "std")]
1005 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.set(bytes));
1006
1007 #[cfg(not(feature = "std"))]
1008 {
1009 *LAST_COMPLETED_HANDLE_BYTES.lock() = bytes;
1010 }
1011}
1012
1013pub fn take_last_completed_handle_bytes() -> u64 {
1014 #[cfg(feature = "std")]
1015 {
1016 LAST_COMPLETED_HANDLE_BYTES.with(|total| total.replace(0))
1017 }
1018
1019 #[cfg(not(feature = "std"))]
1020 {
1021 let mut total = LAST_COMPLETED_HANDLE_BYTES.lock();
1022 let value = *total;
1023 *total = 0;
1024 value
1025 }
1026}
1027
1028fn with_active_capture_mut<R>(f: impl FnOnce(&mut ActiveCuMsgIoCapture) -> R) -> Option<R> {
1029 #[cfg(feature = "std")]
1030 {
1031 ACTIVE_COPPERLIST_CAPTURE.with(|capture| {
1032 let mut state = capture.get()?;
1033 let result = f(&mut state);
1034 capture.set(Some(state));
1035 Some(result)
1036 })
1037 }
1038
1039 #[cfg(not(feature = "std"))]
1040 {
1041 let mut capture = ACTIVE_COPPERLIST_CAPTURE.lock();
1042 let state = capture.as_mut()?;
1043 Some(f(state))
1044 }
1045}
1046
1047pub struct CuMsgIoCaptureGuard;
1048
1049impl CuMsgIoCaptureGuard {
1050 pub fn select_slot(&self, slot: usize) {
1051 let _ = with_active_capture_mut(|capture| {
1052 debug_assert!(slot < capture.cache_len, "payload IO slot out of range");
1053 capture.current_slot = Some(slot);
1054 });
1055 }
1056}
1057
1058impl Drop for CuMsgIoCaptureGuard {
1059 fn drop(&mut self) {
1060 set_last_completed_handle_bytes(finish_payload_io_measurement() as u64);
1061
1062 #[cfg(feature = "std")]
1063 ACTIVE_COPPERLIST_CAPTURE.with(|capture| capture.set(None));
1064
1065 #[cfg(not(feature = "std"))]
1066 {
1067 *ACTIVE_COPPERLIST_CAPTURE.lock() = None;
1068 }
1069 }
1070}
1071
1072pub fn start_copperlist_io_capture<const N: usize>(cache: &CuMsgIoCache<N>) -> CuMsgIoCaptureGuard {
1073 cache.clear();
1074 set_last_completed_handle_bytes(0);
1075 begin_payload_io_measurement();
1076 let (cache_addr, cache_len) = cache.raw_parts();
1077 let capture = ActiveCuMsgIoCapture {
1078 cache_addr,
1079 cache_len,
1080 current_slot: None,
1081 };
1082
1083 #[cfg(feature = "std")]
1084 ACTIVE_COPPERLIST_CAPTURE.with(|state| {
1085 debug_assert!(
1086 state.get().is_none(),
1087 "CopperList payload IO capture must not be nested"
1088 );
1089 state.set(Some(capture));
1090 });
1091
1092 #[cfg(not(feature = "std"))]
1093 {
1094 let mut state = ACTIVE_COPPERLIST_CAPTURE.lock();
1095 debug_assert!(
1096 state.is_none(),
1097 "CopperList payload IO capture must not be nested"
1098 );
1099 *state = Some(capture);
1100 }
1101
1102 CuMsgIoCaptureGuard
1103}
1104
1105pub(crate) fn current_payload_handle_bytes() -> usize {
1106 current_payload_io_measurement()
1107}
1108
/// Stores IO stats for the message in the slot selected with
/// `CuMsgIoCaptureGuard::select_slot`.
///
/// Does nothing when no capture is active, no slot is selected, or the selected slot is out
/// of range. `resident_bytes` is recorded as `fixed_bytes + handle_bytes` (saturating).
pub(crate) fn record_current_slot_payload_io_stats(
    fixed_bytes: usize,
    encoded_bytes: usize,
    handle_bytes: usize,
) {
    let _ = with_active_capture_mut(|capture| {
        let Some(slot) = capture.current_slot else {
            return;
        };
        // Bounds check guarding the raw pointer arithmetic below.
        if slot >= capture.cache_len {
            return;
        }
        let cache_ptr = capture.cache_addr as *const CuMsgIoEntry;
        // SAFETY: `cache_addr`/`cache_len` come from `CuMsgIoCache::raw_parts` in
        // `start_copperlist_io_capture`, and `slot < cache_len` was checked above.
        // NOTE(review): soundness also requires that cache to still be alive. The capture
        // guard carries no lifetime tying it to the cache, so this relies on callers dropping
        // the guard before the cache (and never leaking the guard).
        let entry = unsafe { &*cache_ptr.add(slot) };
        entry.set(CuMsgIoStats {
            present: true,
            resident_bytes: (fixed_bytes.saturating_add(handle_bytes)) as u64,
            encoded_bytes: encoded_bytes as u64,
            handle_bytes: handle_bytes as u64,
        });
    });
}
1132
1133pub fn payload_io_stats<T>(payload: &T) -> CuResult<PayloadIoStats>
1140where
1141 T: Encode,
1142{
1143 begin_payload_io_measurement();
1144 begin_observed_encode();
1145
1146 let result = (|| {
1147 let mut encoder =
1148 EncoderImpl::<_, _>::new(ObservedWriter::new(SizeWriter::default()), standard());
1149 payload.encode(&mut encoder).map_err(|e| {
1150 CuError::from("Failed to measure payload IO bytes").add_cause(&e.to_string())
1151 })?;
1152 let encoded_bytes = encoder.into_writer().into_inner().bytes_written;
1153 debug_assert_eq!(encoded_bytes, finish_observed_encode());
1154 let handle_bytes = finish_payload_io_measurement();
1155 Ok(PayloadIoStats {
1156 resident_bytes: core::mem::size_of::<T>().saturating_add(handle_bytes),
1157 encoded_bytes,
1158 handle_bytes,
1159 })
1160 })();
1161
1162 if result.is_err() {
1163 abort_payload_io_measurement();
1164 abort_observed_encode();
1165 }
1166
1167 result
1168}
1169
/// Edge usage of a graph node. `build_monitor_topology` uses it to classify the node and
/// choose its ports.
#[derive(Default, Debug, Clone, Copy)]
struct NodeIoUsage {
    /// At least one edge ends at this node.
    has_incoming: bool,
    /// At least one edge starts at this node.
    has_outgoing: bool,
}
1175
1176fn collect_output_ports(graph: &CuGraph, node_id: NodeId) -> Vec<(String, String)> {
1177 let mut edge_ids = graph.get_src_edges(node_id).unwrap_or_default();
1178 edge_ids.sort();
1179
1180 let mut outputs = Vec::new();
1181 let mut seen = Vec::new();
1182 let mut port_idx = 0usize;
1183 for edge_id in edge_ids {
1184 let Some(edge) = graph.edge(edge_id) else {
1185 continue;
1186 };
1187 if seen.iter().any(|msg| msg == &edge.msg) {
1188 continue;
1189 }
1190 seen.push(edge.msg.clone());
1191 let mut port_label = String::from("out");
1192 port_label.push_str(&port_idx.to_string());
1193 port_label.push_str(": ");
1194 port_label.push_str(edge.msg.as_str());
1195 outputs.push((edge.msg.clone(), port_label));
1196 port_idx += 1;
1197 }
1198 outputs
1199}
1200
1201pub fn build_monitor_topology(config: &CuConfig, mission: &str) -> CuResult<MonitorTopology> {
1203 let graph = config.get_graph(Some(mission))?;
1204 let mut nodes: Map<String, MonitorNode> = Map::new();
1205 let mut io_usage: Map<String, NodeIoUsage> = Map::new();
1206 let mut output_port_lookup: Map<String, Map<String, String>> = Map::new();
1207
1208 let mut bridge_lookup: Map<&str, &BridgeConfig> = Map::new();
1209 for bridge in &config.bridges {
1210 bridge_lookup.insert(bridge.id.as_str(), bridge);
1211 }
1212
1213 for cnx in graph.edges() {
1214 io_usage.entry(cnx.src.clone()).or_default().has_outgoing = true;
1215 io_usage.entry(cnx.dst.clone()).or_default().has_incoming = true;
1216 }
1217
1218 for (_, node) in graph.get_all_nodes() {
1219 let node_id = node.get_id();
1220 let usage = io_usage.get(node_id.as_str()).cloned().unwrap_or_default();
1221 let kind = match node.get_flavor() {
1222 Flavor::Bridge => ComponentType::Bridge,
1223 _ if !usage.has_incoming && usage.has_outgoing => ComponentType::Source,
1224 _ if usage.has_incoming && !usage.has_outgoing => ComponentType::Sink,
1225 _ => ComponentType::Task,
1226 };
1227
1228 let mut inputs = Vec::new();
1229 let mut outputs = Vec::new();
1230 if kind == ComponentType::Bridge {
1231 if let Some(bridge) = bridge_lookup.get(node_id.as_str()) {
1232 for ch in &bridge.channels {
1233 match ch {
1234 BridgeChannelConfigRepresentation::Rx { id, .. } => {
1235 outputs.push(id.clone())
1236 }
1237 BridgeChannelConfigRepresentation::Tx { id, .. } => inputs.push(id.clone()),
1238 }
1239 }
1240 }
1241 } else {
1242 if usage.has_incoming || !usage.has_outgoing {
1243 inputs.push("in".to_string());
1244 }
1245 if usage.has_outgoing {
1246 if let Some(node_idx) = graph.get_node_id_by_name(node_id.as_str()) {
1247 let ports = collect_output_ports(graph, node_idx);
1248 let mut port_map: Map<String, String> = Map::new();
1249 for (msg_type, label) in ports {
1250 port_map.insert(msg_type, label.clone());
1251 outputs.push(label);
1252 }
1253 output_port_lookup.insert(node_id.clone(), port_map);
1254 }
1255 } else if !usage.has_incoming {
1256 outputs.push("out".to_string());
1257 }
1258 }
1259
1260 nodes.insert(
1261 node_id.clone(),
1262 MonitorNode {
1263 id: node_id,
1264 type_name: Some(node.get_type().to_string()),
1265 kind,
1266 inputs,
1267 outputs,
1268 },
1269 );
1270 }
1271
1272 let mut connections = Vec::new();
1273 for cnx in graph.edges() {
1274 let src = cnx.src.clone();
1275 let dst = cnx.dst.clone();
1276
1277 let src_port = cnx.src_channel.clone().or_else(|| {
1278 output_port_lookup
1279 .get(&src)
1280 .and_then(|ports| ports.get(&cnx.msg).cloned())
1281 .or_else(|| {
1282 nodes
1283 .get(&src)
1284 .and_then(|node| node.outputs.first().cloned())
1285 })
1286 });
1287 let dst_port = cnx.dst_channel.clone().or_else(|| {
1288 nodes
1289 .get(&dst)
1290 .and_then(|node| node.inputs.first().cloned())
1291 });
1292
1293 connections.push(MonitorConnection {
1294 src,
1295 src_port,
1296 dst,
1297 dst_port,
1298 msg: cnx.msg.clone(),
1299 });
1300 }
1301
1302 Ok(MonitorTopology {
1303 nodes: nodes.into_values().collect(),
1304 connections,
1305 })
1306}
1307
/// Observer interface for a running Copper mission.
///
/// A monitor is built once from the mission metadata and a runtime handle. It receives
/// copperlist views, component errors and panic messages through the hooks below. Several
/// monitors can be combined by using a tuple of them (implemented for 2 to 6 elements).
/// Most hooks have no-op defaults, so an implementation only has to provide `new`,
/// `process_copperlist` and `process_error`.
pub trait CuMonitor: Sized {
    /// Builds the monitor from the mission metadata and the runtime handle.
    ///
    /// # Errors
    /// Returns a `CuError` if the monitor cannot be created; tuple monitors propagate the
    /// first element that fails.
    fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
    where
        Self: Sized;

    /// Called before monitoring begins. The default does nothing and returns `Ok(())`.
    fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }

    /// Receives a view over one copperlist. Required.
    fn process_copperlist(&self, _ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()>;

    /// Receives copperlist I/O statistics. The default ignores them.
    fn observe_copperlist_io(&self, _stats: CopperListIoStats) {}

    /// Reacts to `error` raised by `component_id` during lifecycle `step`.
    ///
    /// The returned [`Decision`] tells the caller how to proceed. `NoMonitor` always answers
    /// `Decision::Ignore`, and tuple monitors merge the answers of all their elements.
    fn process_error(
        &self,
        component_id: ComponentId,
        step: CuComponentState,
        error: &CuError,
    ) -> Decision;

    /// Receives a panic message; presumably rendered with `panic_payload_to_string`, but
    /// confirm at the call site. The default ignores it.
    fn process_panic(&self, _panic_message: &str) {}

    /// Called when monitoring ends. The default does nothing and returns `Ok(())`.
    fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
        Ok(())
    }
}
1366
1367pub struct NoMonitor {}
1370impl CuMonitor for NoMonitor {
1371 fn new(_metadata: CuMonitoringMetadata, _runtime: CuMonitoringRuntime) -> CuResult<Self> {
1372 Ok(NoMonitor {})
1373 }
1374
1375 fn start(&mut self, _ctx: &CuContext) -> CuResult<()> {
1376 #[cfg(all(feature = "std", debug_assertions))]
1377 register_live_log_listener(|entry, format_str, param_names| {
1378 let params: Vec<String> = entry.params.iter().map(|v| v.to_string()).collect();
1379 let named: Map<String, String> = param_names
1380 .iter()
1381 .zip(params.iter())
1382 .map(|(k, v)| (k.to_string(), v.clone()))
1383 .collect();
1384
1385 if let Ok(msg) = format_message_only(format_str, params.as_slice(), &named) {
1386 let ts = format_timestamp(entry.time);
1387 println!("{} [{:?}] {}", ts, entry.level, msg);
1388 }
1389 });
1390 Ok(())
1391 }
1392
1393 fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
1394 Ok(())
1396 }
1397
1398 fn process_error(
1399 &self,
1400 _component_id: ComponentId,
1401 _step: CuComponentState,
1402 _error: &CuError,
1403 ) -> Decision {
1404 Decision::Ignore
1406 }
1407
1408 fn stop(&mut self, _ctx: &CuContext) -> CuResult<()> {
1409 #[cfg(all(feature = "std", debug_assertions))]
1410 unregister_live_log_listener();
1411 Ok(())
1412 }
1413}
1414
1415macro_rules! impl_monitor_tuple {
1416 ($($idx:tt => $name:ident),+) => {
1417 impl<$($name: CuMonitor),+> CuMonitor for ($($name,)+) {
1418 fn new(metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self>
1419 where
1420 Self: Sized,
1421 {
1422 Ok(($($name::new(metadata.clone(), runtime.clone())?,)+))
1423 }
1424
1425 fn start(&mut self, ctx: &CuContext) -> CuResult<()> {
1426 $(self.$idx.start(ctx)?;)+
1427 Ok(())
1428 }
1429
1430 fn process_copperlist(&self, ctx: &CuContext, view: CopperListView<'_>) -> CuResult<()> {
1431 $(self.$idx.process_copperlist(ctx, view)?;)+
1432 Ok(())
1433 }
1434
1435 fn observe_copperlist_io(&self, stats: CopperListIoStats) {
1436 $(self.$idx.observe_copperlist_io(stats);)+
1437 }
1438
1439 fn process_error(
1440 &self,
1441 component_id: ComponentId,
1442 step: CuComponentState,
1443 error: &CuError,
1444 ) -> Decision {
1445 let mut decision = Decision::Ignore;
1446 $(decision = merge_decision(decision, self.$idx.process_error(component_id, step, error));)+
1447 decision
1448 }
1449
1450 fn process_panic(&self, panic_message: &str) {
1451 $(self.$idx.process_panic(panic_message);)+
1452 }
1453
1454 fn stop(&mut self, ctx: &CuContext) -> CuResult<()> {
1455 $(self.$idx.stop(ctx)?;)+
1456 Ok(())
1457 }
1458 }
1459 };
1460}
1461
1462impl_monitor_tuple!(0 => M0, 1 => M1);
1463impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2);
1464impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3);
1465impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4);
1466impl_monitor_tuple!(0 => M0, 1 => M1, 2 => M2, 3 => M3, 4 => M4, 5 => M5);
1467
1468#[cfg(feature = "std")]
/// Renders a panic payload as text.
///
/// The payload is, for example, the `Err` value returned by `std::panic::catch_unwind`.
/// Panics raised with a string literal carry a `&'static str`, and formatted panics carry a
/// `String`. Any other payload type is reported with a fixed placeholder message.
pub fn panic_payload_to_string(payload: &(dyn core::any::Any + Send)) -> String {
    match (payload.downcast_ref::<&str>(), payload.downcast_ref::<String>()) {
        (Some(text), _) => String::from(*text),
        (None, Some(text)) => text.to_owned(),
        (None, None) => String::from("panic with non-string payload"),
    }
}
1478
/// [`GlobalAlloc`] wrapper that forwards every request to `inner` and counts the bytes it
/// hands out and takes back.
///
/// The counters are cumulative byte totals, not live-allocation counts. They only grow until
/// [`CountingAlloc::reset`] is called. `realloc` and `alloc_zeroed` are forwarded to the inner
/// allocator, so its in-place growth and pre-zeroed fast paths are kept. Their accounting
/// matches what the trait's default implementations would record.
pub struct CountingAlloc<A: GlobalAlloc> {
    inner: A,
    // Bytes returned by successful `alloc` / `alloc_zeroed` / `realloc` calls.
    allocated: AtomicUsize,
    // Bytes released by `dealloc` and by the old block of successful `realloc` calls.
    deallocated: AtomicUsize,
}

impl<A: GlobalAlloc> CountingAlloc<A> {
    /// Wraps `inner` with both counters at zero; `const` so it can back a `#[global_allocator]`.
    pub const fn new(inner: A) -> Self {
        CountingAlloc {
            inner,
            allocated: AtomicUsize::new(0),
            deallocated: AtomicUsize::new(0),
        }
    }

    /// Total bytes allocated since creation or the last [`CountingAlloc::reset`].
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::SeqCst)
    }

    /// Total bytes freed since creation or the last [`CountingAlloc::reset`].
    pub fn deallocated(&self) -> usize {
        self.deallocated.load(Ordering::SeqCst)
    }

    /// Sets both counters back to zero.
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::SeqCst);
        self.deallocated.store(0, Ordering::SeqCst);
    }
}

unsafe impl<A: GlobalAlloc> GlobalAlloc for CountingAlloc<A> {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc` contract, forwarded unchanged.
        let p = unsafe { self.inner.alloc(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        // SAFETY: the caller upholds the `GlobalAlloc::dealloc` contract, forwarded unchanged.
        unsafe { self.inner.dealloc(ptr, layout) }
        self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
    }

    unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::alloc_zeroed` contract, forwarded
        // unchanged; the inner allocator may hand out pre-zeroed memory without a memset.
        let p = unsafe { self.inner.alloc_zeroed(layout) };
        if !p.is_null() {
            self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }

    unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
        // SAFETY: the caller upholds the `GlobalAlloc::realloc` contract, forwarded unchanged;
        // the inner allocator may grow or shrink the block in place instead of copying.
        let p = unsafe { self.inner.realloc(ptr, layout, new_size) };
        if !p.is_null() {
            // Same bookkeeping as the default `realloc` (alloc new block, free old block).
            // On failure the old block stays live and nothing is counted.
            self.allocated.fetch_add(new_size, Ordering::SeqCst);
            self.deallocated.fetch_add(layout.size(), Ordering::SeqCst);
        }
        p
    }
}
1528
/// Measures bytes allocated and freed through the global allocator (`GLOBAL`) since this
/// counter was created.
///
/// Deltas saturate at zero. If the global counters move backwards while a counter is alive
/// (for example after a `reset`), the accessors return `0`. They never panic (debug) or wrap
/// to a huge value (release).
#[cfg(feature = "memory_monitoring")]
pub struct ScopedAllocCounter {
    // Snapshot of `GLOBAL.allocated()` taken at construction.
    bf_allocated: usize,
    // Snapshot of `GLOBAL.deallocated()` taken at construction.
    bf_deallocated: usize,
}

#[cfg(feature = "memory_monitoring")]
impl Default for ScopedAllocCounter {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(feature = "memory_monitoring")]
impl ScopedAllocCounter {
    /// Snapshots the current global allocation counters.
    pub fn new() -> Self {
        ScopedAllocCounter {
            bf_allocated: GLOBAL.allocated(),
            bf_deallocated: GLOBAL.deallocated(),
        }
    }

    /// Bytes allocated through the global allocator since this counter was created.
    pub fn allocated(&self) -> usize {
        GLOBAL.allocated().saturating_sub(self.bf_allocated)
    }

    /// Bytes freed through the global allocator since this counter was created.
    pub fn deallocated(&self) -> usize {
        GLOBAL.deallocated().saturating_sub(self.bf_deallocated)
    }
}

#[cfg(feature = "memory_monitoring")]
impl Drop for ScopedAllocCounter {
    fn drop(&mut self) {
        // The final deltas are computed but currently discarded. Going through the saturating
        // accessors guarantees this destructor can never panic on counter underflow.
        let _allocated = self.allocated();
        let _deallocated = self.deallocated();
    }
}
1598
1599#[cfg(feature = "std")]
1600const BUCKET_COUNT: usize = 1024;
1601#[cfg(not(feature = "std"))]
1602const BUCKET_COUNT: usize = 256;
1603
1604#[derive(Debug, Clone)]
1607pub struct LiveStatistics {
1608 buckets: [u64; BUCKET_COUNT],
1609 min_val: u64,
1610 max_val: u64,
1611 sum: u128,
1612 sum_sq: u128,
1613 count: u64,
1614 max_value: u64,
1615}
1616
1617impl LiveStatistics {
1618 pub fn new_with_max(max_value: u64) -> Self {
1638 LiveStatistics {
1639 buckets: [0; BUCKET_COUNT],
1640 min_val: u64::MAX,
1641 max_val: 0,
1642 sum: 0,
1643 sum_sq: 0,
1644 count: 0,
1645 max_value,
1646 }
1647 }
1648
1649 #[inline]
1650 fn value_to_bucket(&self, value: u64) -> usize {
1651 if value >= self.max_value {
1652 BUCKET_COUNT - 1
1653 } else {
1654 ((value as u128 * BUCKET_COUNT as u128) / self.max_value as u128) as usize
1655 }
1656 }
1657
1658 #[inline]
1659 pub fn min(&self) -> u64 {
1660 if self.count == 0 { 0 } else { self.min_val }
1661 }
1662
1663 #[inline]
1664 pub fn max(&self) -> u64 {
1665 self.max_val
1666 }
1667
1668 #[inline]
1669 pub fn mean(&self) -> f64 {
1670 if self.count == 0 {
1671 0.0
1672 } else {
1673 self.sum as f64 / self.count as f64
1674 }
1675 }
1676
1677 #[inline]
1678 pub fn stdev(&self) -> f64 {
1679 if self.count == 0 {
1680 return 0.0;
1681 }
1682 let mean = self.mean();
1683 let variance = (self.sum_sq as f64 / self.count as f64) - (mean * mean);
1684 if variance < 0.0 {
1685 return 0.0;
1686 }
1687 #[cfg(feature = "std")]
1688 return variance.sqrt();
1689 #[cfg(not(feature = "std"))]
1690 return sqrt(variance);
1691 }
1692
1693 #[inline]
1694 pub fn percentile(&self, percentile: f64) -> u64 {
1695 if self.count == 0 {
1696 return 0;
1697 }
1698
1699 let target_count = (self.count as f64 * percentile) as u64;
1700 let mut accumulated = 0u64;
1701
1702 for (bucket_idx, &bucket_count) in self.buckets.iter().enumerate() {
1703 accumulated += bucket_count;
1704 if accumulated >= target_count {
1705 let bucket_start = (bucket_idx as u64 * self.max_value) / BUCKET_COUNT as u64;
1707 let bucket_end = ((bucket_idx + 1) as u64 * self.max_value) / BUCKET_COUNT as u64;
1708 let bucket_fraction = if bucket_count > 0 {
1709 (target_count - (accumulated - bucket_count)) as f64 / bucket_count as f64
1710 } else {
1711 0.5
1712 };
1713 return bucket_start
1714 + ((bucket_end - bucket_start) as f64 * bucket_fraction) as u64;
1715 }
1716 }
1717
1718 self.max_val
1719 }
1720
1721 #[inline]
1723 pub fn record(&mut self, value: u64) {
1724 if value < self.min_val {
1725 self.min_val = value;
1726 }
1727 if value > self.max_val {
1728 self.max_val = value;
1729 }
1730 let value_u128 = value as u128;
1731 self.sum += value_u128;
1732 self.sum_sq += value_u128 * value_u128;
1733 self.count += 1;
1734
1735 let bucket = self.value_to_bucket(value);
1736 self.buckets[bucket] += 1;
1737 }
1738
1739 #[inline]
1740 pub fn len(&self) -> u64 {
1741 self.count
1742 }
1743
1744 #[inline]
1745 pub fn is_empty(&self) -> bool {
1746 self.count == 0
1747 }
1748
1749 #[inline]
1750 pub fn reset(&mut self) {
1751 self.buckets.fill(0);
1752 self.min_val = u64::MAX;
1753 self.max_val = 0;
1754 self.sum = 0;
1755 self.sum_sq = 0;
1756 self.count = 0;
1757 }
1758}
1759
1760#[derive(Debug, Clone)]
1763pub struct CuDurationStatistics {
1764 bare: LiveStatistics,
1765 jitter: LiveStatistics,
1766 last_value: CuDuration,
1767}
1768
1769impl CuDurationStatistics {
1770 pub fn new(max: CuDuration) -> Self {
1771 let CuDuration(max) = max;
1772 CuDurationStatistics {
1773 bare: LiveStatistics::new_with_max(max),
1774 jitter: LiveStatistics::new_with_max(max),
1775 last_value: CuDuration::default(),
1776 }
1777 }
1778
1779 #[inline]
1780 pub fn min(&self) -> CuDuration {
1781 CuDuration(self.bare.min())
1782 }
1783
1784 #[inline]
1785 pub fn max(&self) -> CuDuration {
1786 CuDuration(self.bare.max())
1787 }
1788
1789 #[inline]
1790 pub fn mean(&self) -> CuDuration {
1791 CuDuration(self.bare.mean() as u64) }
1793
1794 #[inline]
1795 pub fn percentile(&self, percentile: f64) -> CuDuration {
1796 CuDuration(self.bare.percentile(percentile))
1797 }
1798
1799 #[inline]
1800 pub fn stddev(&self) -> CuDuration {
1801 CuDuration(self.bare.stdev() as u64)
1802 }
1803
1804 #[inline]
1805 pub fn len(&self) -> u64 {
1806 self.bare.len()
1807 }
1808
1809 #[inline]
1810 pub fn is_empty(&self) -> bool {
1811 self.bare.len() == 0
1812 }
1813
1814 #[inline]
1815 pub fn jitter_min(&self) -> CuDuration {
1816 CuDuration(self.jitter.min())
1817 }
1818
1819 #[inline]
1820 pub fn jitter_max(&self) -> CuDuration {
1821 CuDuration(self.jitter.max())
1822 }
1823
1824 #[inline]
1825 pub fn jitter_mean(&self) -> CuDuration {
1826 CuDuration(self.jitter.mean() as u64)
1827 }
1828
1829 #[inline]
1830 pub fn jitter_stddev(&self) -> CuDuration {
1831 CuDuration(self.jitter.stdev() as u64)
1832 }
1833
1834 #[inline]
1835 pub fn jitter_percentile(&self, percentile: f64) -> CuDuration {
1836 CuDuration(self.jitter.percentile(percentile))
1837 }
1838
1839 #[inline]
1840 pub fn record(&mut self, value: CuDuration) {
1841 let CuDuration(nanos) = value;
1842 if self.bare.is_empty() {
1843 self.bare.record(nanos);
1844 self.last_value = value;
1845 return;
1846 }
1847 self.bare.record(nanos);
1848 let CuDuration(last_nanos) = self.last_value;
1849 self.jitter.record(nanos.abs_diff(last_nanos));
1850 self.last_value = value;
1851 }
1852
1853 #[inline]
1854 pub fn reset(&mut self) {
1855 self.bare.reset();
1856 self.jitter.reset();
1857 }
1858}
1859
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicUsize, Ordering};

    /// Decision a `TestMonitor` answers from `process_error`.
    #[derive(Clone, Copy)]
    enum TestDecision {
        Ignore,
        Abort,
        Shutdown,
    }

    /// Monitor double that counts how often its callbacks fire.
    struct TestMonitor {
        decision: TestDecision,
        copperlist_calls: AtomicUsize,
        panic_calls: AtomicUsize,
    }

    impl TestMonitor {
        fn new_with(decision: TestDecision) -> Self {
            Self {
                decision,
                copperlist_calls: AtomicUsize::new(0),
                panic_calls: AtomicUsize::new(0),
            }
        }
    }

    /// Minimal two-task metadata used to build monitors through `CuMonitor::new`.
    fn test_metadata() -> CuMonitoringMetadata {
        const COMPONENTS: &[MonitorComponentMetadata] = &[
            MonitorComponentMetadata::new("a", ComponentType::Task, None),
            MonitorComponentMetadata::new("b", ComponentType::Task, None),
        ];
        CuMonitoringMetadata::new(
            CompactString::from(crate::config::DEFAULT_MISSION_ID),
            COMPONENTS,
            &[],
            CopperListInfo::new(0, 0),
            MonitorTopology::default(),
            None,
        )
        .expect("test metadata should be valid")
    }

    impl CuMonitor for TestMonitor {
        fn new(_metadata: CuMonitoringMetadata, runtime: CuMonitoringRuntime) -> CuResult<Self> {
            let monitor = Self::new_with(TestDecision::Ignore);
            // Exercise the probe accessor where it exists. On no_std builds, just consume the
            // handle so the parameter is not reported as unused.
            #[cfg(feature = "std")]
            let _ = runtime.execution_probe();
            #[cfg(not(feature = "std"))]
            let _ = runtime;
            Ok(monitor)
        }

        fn process_copperlist(&self, _ctx: &CuContext, _view: CopperListView<'_>) -> CuResult<()> {
            self.copperlist_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn process_error(
            &self,
            _component_id: ComponentId,
            _step: CuComponentState,
            _error: &CuError,
        ) -> Decision {
            match self.decision {
                TestDecision::Ignore => Decision::Ignore,
                TestDecision::Abort => Decision::Abort,
                TestDecision::Shutdown => Decision::Shutdown,
            }
        }

        fn process_panic(&self, _panic_message: &str) {
            self.panic_calls.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[test]
    fn test_live_statistics_percentiles() {
        let mut stats = LiveStatistics::new_with_max(1000);

        for i in 0..100 {
            stats.record(i);
        }

        assert_eq!(stats.len(), 100);
        assert_eq!(stats.min(), 0);
        assert_eq!(stats.max(), 99);
        assert_eq!(stats.mean() as u64, 49);

        let p50 = stats.percentile(0.5);
        let p90 = stats.percentile(0.90);
        let p95 = stats.percentile(0.95);
        let p99 = stats.percentile(0.99);

        // Percentiles are histogram estimates, so allow a small tolerance.
        assert!((p50 as i64 - 49).abs() < 5, "p50={} expected ~49", p50);
        assert!((p90 as i64 - 89).abs() < 5, "p90={} expected ~89", p90);
        assert!((p95 as i64 - 94).abs() < 5, "p95={} expected ~94", p95);
        assert!((p99 as i64 - 98).abs() < 5, "p99={} expected ~98", p99);
    }

    #[test]
    fn test_duration_stats() {
        let mut stats = CuDurationStatistics::new(CuDuration(1000));
        stats.record(CuDuration(100));
        stats.record(CuDuration(200));
        stats.record(CuDuration(500));
        stats.record(CuDuration(400));
        assert_eq!(stats.min(), CuDuration(100));
        assert_eq!(stats.max(), CuDuration(500));
        assert_eq!(stats.mean(), CuDuration(300));
        assert_eq!(stats.len(), 4);
        assert_eq!(stats.jitter.len(), 3);
        assert_eq!(stats.jitter_min(), CuDuration(100));
        assert_eq!(stats.jitter_max(), CuDuration(300));
        assert_eq!(stats.jitter_mean(), CuDuration((100 + 300 + 100) / 3));
        stats.reset();
        assert_eq!(stats.len(), 0);
    }

    #[test]
    fn test_duration_stats_large_samples_do_not_overflow() {
        let mut stats = CuDurationStatistics::new(CuDuration(10_000_000_000));
        stats.record(CuDuration(5_000_000_000));
        stats.record(CuDuration(8_000_000_000));

        assert_eq!(stats.min(), CuDuration(5_000_000_000));
        assert_eq!(stats.max(), CuDuration(8_000_000_000));
        assert_eq!(stats.mean(), CuDuration(6_500_000_000));
        assert!(stats.stddev().as_nanos().abs_diff(1_500_000_000) <= 1);
        assert_eq!(stats.jitter_mean(), CuDuration(3_000_000_000));
    }

    #[test]
    fn tuple_monitor_merges_contradictory_decisions_with_strictest_wins() {
        let err = CuError::from("boom");

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Shutdown),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Shutdown
        ));

        let two = (
            TestMonitor::new_with(TestDecision::Ignore),
            TestMonitor::new_with(TestDecision::Abort),
        );
        assert!(matches!(
            two.process_error(ComponentId::new(0), CuComponentState::Process, &err),
            Decision::Abort
        ));
    }

    #[test]
    fn tuple_monitor_fans_out_callbacks() {
        let monitors = <(TestMonitor, TestMonitor) as CuMonitor>::new(
            test_metadata(),
            CuMonitoringRuntime::unavailable(),
        )
        .expect("tuple new");
        let (ctx, _clock_control) = CuContext::new_mock_clock();
        let empty_view = test_metadata().layout().view(&[]);
        monitors
            .process_copperlist(&ctx, empty_view)
            .expect("process_copperlist should fan out");
        monitors.process_panic("panic marker");

        assert_eq!(monitors.0.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.copperlist_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.0.panic_calls.load(Ordering::SeqCst), 1);
        assert_eq!(monitors.1.panic_calls.load(Ordering::SeqCst), 1);
    }

    /// Number of bytes `value` occupies when bincode-encoded with the standard config.
    fn encoded_size<E: Encode>(value: &E) -> usize {
        let mut encoder = EncoderImpl::<_, _>::new(SizeWriter::default(), standard());
        value
            .encode(&mut encoder)
            .expect("size measurement encoder should not fail");
        encoder.into_writer().bytes_written
    }

    #[test]
    fn payload_io_stats_tracks_encode_path_size_for_plain_payloads() {
        let payload = vec![1u8, 2, 3, 4];
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(io.resident_bytes, core::mem::size_of::<Vec<u8>>());
        assert_eq!(io.handle_bytes, 0);
    }

    #[test]
    fn payload_io_stats_tracks_handle_backed_storage() {
        let payload = crate::pool::CuHandle::new_detached(vec![0u8; 32]);
        let io = payload_io_stats(&payload).expect("payload IO measurement should succeed");

        assert_eq!(io.encoded_bytes, encoded_size(&payload));
        assert_eq!(
            io.resident_bytes,
            core::mem::size_of::<crate::pool::CuHandle<Vec<u8>>>() + 32
        );
        assert_eq!(io.handle_bytes, 32);
    }

    #[test]
    fn runtime_execution_probe_roundtrip_marker() {
        let probe = RuntimeExecutionProbe::default();
        assert!(probe.marker().is_none());
        assert_eq!(probe.sequence(), 0);

        probe.record(ExecutionMarker {
            component_id: ComponentId::new(7),
            step: CuComponentState::Process,
            culistid: Some(42),
        });

        let marker = probe.marker().expect("marker should be available");
        assert_eq!(marker.component_id, ComponentId::new(7));
        assert!(matches!(marker.step, CuComponentState::Process));
        assert_eq!(marker.culistid, Some(42));
        assert_eq!(probe.sequence(), 1);
    }
}