// limen_core/telemetry.rs
1//! Telemetry primitives for Limen runtimes.
2//!
3//! This module provides `no_std`-friendly metrics, structured telemetry events,
4//! and timing spans that can be used by runtimes without imposing any logging,
5//! allocation, or I/O policy.
6
7pub mod event_message;
8pub mod graph_telemetry;
9pub mod sink;
10
11#[cfg(feature = "std")]
12pub mod concurrent;
13
14use core::fmt;
15
16use crate::policy::WatermarkState;
17use crate::types::{EdgeIndex, NodeIndex};
18use event_message::EventMessage;
19use sink::write_u64;
20
21// ====================== Core telemetry trait and keys ===================
22
/// Core interface for collecting runtime metrics and structured telemetry events.
///
/// This trait is intentionally minimal and I/O-agnostic so that it can be
/// implemented in both `no_std` and `std` environments. Implementations are free to
/// ignore any subset of calls.
pub trait Telemetry {
    /// Compile-time flag indicating whether this telemetry implementation
    /// wants metrics (counters, gauges, latencies) at all.
    ///
    /// Runtimes can use this to completely compile out metric collection
    /// when `METRICS_ENABLED` is `false` for a given `Telemetry` type.
    const METRICS_ENABLED: bool = true;

    /// Compile-time flag indicating whether this telemetry implementation
    /// ever produces structured events.
    ///
    /// When this is `false`, runtimes can skip both the construction of
    /// `TelemetryEvent` values and any calls to `events_enabled()`,
    /// allowing event handling code to compile out entirely.
    ///
    /// NOTE(review): this static flag defaults to `true` while the runtime
    /// check `events_enabled()` defaults to `false`. Implementations that
    /// actually want events must therefore override `events_enabled()` (and
    /// `push_event`) in addition to leaving this flag enabled.
    const EVENTS_STATICALLY_ENABLED: bool = true;

    /// Increment a counter metric identified by the given key.
    ///
    /// Counters are monotonically increasing and are typically used for counts such
    /// as processed messages, dropped messages, or deadline misses.
    fn incr_counter(&mut self, key: TelemetryKey, delta: u64);

    /// Set a gauge metric identified by the given key.
    ///
    /// Gauges represent the latest value of a quantity such as queue depth or
    /// current occupancy.
    fn set_gauge(&mut self, key: TelemetryKey, value: u64);

    /// Record a latency sample in nanoseconds for the given key.
    ///
    /// Implementations are free to aggregate these values as histograms, rolling
    /// averages, or to ignore them.
    fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64);

    /// Optional: push a snapshot of aggregated metrics to the sink.
    ///
    /// Runtimes can call this periodically without knowing how metrics are
    /// stored. Implementations that have no aggregated metrics can keep the
    /// default no-op.
    #[inline]
    fn push_metrics(&mut self) {}

    /// Return true if this telemetry collector wants structured events.
    ///
    /// Runtimes and nodes can use this to avoid constructing `TelemetryEvent`
    /// values when events are disabled, keeping the hot path as cheap as possible.
    /// The default is `false`, so event-producing implementations must override it.
    #[inline]
    fn events_enabled(&self) -> bool {
        false
    }

    /// Emit a structured telemetry event.
    ///
    /// The default implementation is a no operation so that simple collectors can
    /// ignore structured events entirely.
    #[inline]
    fn push_event(&mut self, _event: TelemetryEvent) {}

    /// Flush any buffered telemetry data to the underlying sink.
    ///
    /// The default implementation is a no operation. Implementations that buffer
    /// data or write to external sinks can use this to force a drain.
    #[inline]
    fn flush(&mut self) {}
}
93
/// Compact identifier for a metric or latency sample.
///
/// A key bundles a namespace, a numeric identifier, and a logical kind so
/// that telemetry backends can translate it into whatever internal
/// representation they use.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TelemetryKey {
    /// Namespace that this key belongs to (node, edge, or runtime).
    ns: TelemetryNs,
    /// Integer identifier within the namespace (for example node index).
    id: u32,
    /// Logical kind of metric represented by this key.
    kind: TelemetryKind,
}

impl TelemetryKey {
    /// Build a node-scoped key; `node_id` is the node's zero-based index
    /// within the graph.
    #[inline]
    pub const fn node(node_id: u32, kind: TelemetryKind) -> Self {
        Self {
            kind,
            id: node_id,
            ns: TelemetryNs::Node,
        }
    }

    /// Build an edge-scoped key; `edge_id` is the edge's zero-based index
    /// within the graph.
    #[inline]
    pub const fn edge(edge_id: u32, kind: TelemetryKind) -> Self {
        Self {
            kind,
            id: edge_id,
            ns: TelemetryNs::Edge,
        }
    }

    /// Build a runtime-scoped key.
    ///
    /// The identifier is currently always zero and reserved for future use.
    #[inline]
    pub const fn runtime(kind: TelemetryKind) -> Self {
        Self {
            kind,
            id: 0,
            ns: TelemetryNs::Runtime,
        }
    }

    /// Build a compact key for a node port metric.
    ///
    /// Bit layout of the encoded identifier (MSB to LSB):
    /// `[node_id:20][is_output:1][port_index:11]`.
    /// Bits of `node_id` above 20 and of `port_index` above 11 are
    /// silently masked off.
    #[inline]
    pub const fn node_port(
        node_id: u32,
        port_index: u16,
        is_output: bool,
        kind: TelemetryKind,
    ) -> Self {
        let node_bits = (node_id & 0x000F_FFFF) << 12;
        let dir_bit = ((is_output as u32) & 0x1) << 11;
        let port_bits = (port_index as u32) & 0x7FF;
        Self {
            kind,
            id: node_bits | dir_bit | port_bits,
            ns: TelemetryNs::Node,
        }
    }

    /// Namespace of this key.
    #[inline]
    pub const fn ns(&self) -> &TelemetryNs {
        &self.ns
    }

    /// Identifier of this key within its namespace.
    #[inline]
    pub const fn id(&self) -> &u32 {
        &self.id
    }

    /// Logical metric kind of this key.
    #[inline]
    pub const fn kind(&self) -> &TelemetryKind {
        &self.kind
    }
}

/// Logical namespace for telemetry keys.
///
/// Separates metrics describing nodes, edges, and the runtime itself.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryNs {
    /// Node level metrics such as processed counts and latencies.
    Node,
    /// Edge level metrics such as queue depth.
    Edge,
    /// Runtime level metrics that are not tied to a particular node or edge.
    Runtime,
}

/// Logical kind of metric represented by a telemetry key.
///
/// Interpreted by collectors such as fixed and dynamic telemetry stores;
/// intentionally small and generic.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryKind {
    /// Number of items successfully processed.
    Processed,
    /// Number of items dropped or discarded.
    Dropped,
    /// Number of deadline misses observed.
    DeadlineMiss,
    /// Current depth of a queue or buffer.
    QueueDepth,
    /// Latency measurement in nanoseconds.
    Latency,
    /// Number of ingress messages received.
    IngressMsgs,
    /// Number of egress messages emitted.
    EgressMsgs,
}
221
/// Unique identifier for a running graph instance.
///
/// This allows telemetry to distinguish between multiple graphs managed by the
/// same runtime. It is a plain `u32` alias, so it is `Copy` and free to pass
/// by value.
pub type GraphInstanceId = u32;
227
/// High level classification of a node level error used in telemetry.
///
/// This mirrors `crate::errors::NodeErrorKind` so that telemetry can
/// faithfully report the scheduler-visible error semantics without
/// inventing additional information that is not available at this layer.
///
/// The enum is `#[non_exhaustive]`: downstream matches must include a
/// wildcard arm so new classifications can be added compatibly.
#[non_exhaustive]
#[derive(Copy, Clone, Debug)]
pub enum NodeStepError {
    /// Inputs were not available to progress this node.
    NoInput,
    /// Outputs could not be enqueued due to backpressure.
    Backpressured,
    /// An execution budget or deadline was exceeded.
    OverBudget,
    /// External dependency (device, transport) was unavailable or timed out.
    ExternalUnavailable,
    /// A generic failure in node logic.
    ExecutionFailed,
}
247
248/// Structured telemetry produced for each node step.
249///
250/// A node step represents a single scheduling decision in which a node consumes
251/// zero or more input messages and produces zero or more output messages.
252#[non_exhaustive]
253#[derive(Copy, Clone, Debug)]
254pub struct NodeStepTelemetry {
255 /// Identifier of the graph instance this node belongs to.
256 graph_id: GraphInstanceId,
257 /// Index of the node within the graph.
258 node_index: NodeIndex,
259 /// Optional static node name for debugging and correlation.
260 node_name: Option<&'static str>,
261
262 /// Start timestamp of the step in nanoseconds since an arbitrary epoch.
263 timestamp_start_ns: u64,
264 /// End timestamp of the step in nanoseconds since an arbitrary epoch.
265 timestamp_end_ns: u64,
266 /// Duration of the step in nanoseconds.
267 duration_ns: u64,
268
269 /// Number of messages processed by this step (batch size or 1 for single-message).
270 processed_count: u64,
271
272 /// Optional absolute deadline in nanoseconds for this step.
273 deadline_ns: Option<u64>,
274 /// Whether the deadline was missed during this step.
275 deadline_missed: bool,
276
277 /// Optional high level error classification for this step.
278 error_kind: Option<NodeStepError>,
279}
280
281impl NodeStepTelemetry {
282 /// Construct a new `NodeStepTelemetry` record.
283 #[inline]
284 #[allow(clippy::too_many_arguments)]
285 pub const fn new(
286 graph_id: GraphInstanceId,
287 node_index: NodeIndex,
288 node_name: Option<&'static str>,
289 timestamp_start_ns: u64,
290 timestamp_end_ns: u64,
291 duration_ns: u64,
292 processed_count: u64,
293 deadline_ns: Option<u64>,
294 deadline_missed: bool,
295 error_kind: Option<NodeStepError>,
296 ) -> Self {
297 Self {
298 graph_id,
299 node_index,
300 node_name,
301 timestamp_start_ns,
302 timestamp_end_ns,
303 duration_ns,
304 processed_count,
305 deadline_ns,
306 deadline_missed,
307 error_kind,
308 }
309 }
310
311 /// Returns the identifier of the graph instance this step belongs to.
312 #[inline]
313 pub const fn graph_id(&self) -> &GraphInstanceId {
314 &self.graph_id
315 }
316
317 /// Returns the index of the node within the graph.
318 #[inline]
319 pub const fn node_index(&self) -> &NodeIndex {
320 &self.node_index
321 }
322
323 /// Returns the optional static node name associated with this step.
324 #[inline]
325 pub const fn node_name(&self) -> &Option<&'static str> {
326 &self.node_name
327 }
328
329 /// Returns the step start timestamp in nanoseconds since an arbitrary epoch.
330 #[inline]
331 pub const fn timestamp_start_ns(&self) -> &u64 {
332 &self.timestamp_start_ns
333 }
334
335 /// Returns the step end timestamp in nanoseconds since an arbitrary epoch.
336 #[inline]
337 pub const fn timestamp_end_ns(&self) -> &u64 {
338 &self.timestamp_end_ns
339 }
340
341 /// Returns the step duration in nanoseconds.
342 #[inline]
343 pub const fn duration_ns(&self) -> &u64 {
344 &self.duration_ns
345 }
346
347 /// Returns the number of messages processed by this step.
348 #[inline]
349 pub const fn processed_count(&self) -> &u64 {
350 &self.processed_count
351 }
352
353 /// Returns the optional absolute deadline for this step in nanoseconds.
354 #[inline]
355 pub const fn deadline_ns(&self) -> &Option<u64> {
356 &self.deadline_ns
357 }
358
359 /// Returns whether the step exceeded its deadline.
360 #[inline]
361 pub const fn deadline_missed(&self) -> &bool {
362 &self.deadline_missed
363 }
364
365 /// Returns the optional high-level error classification for this step.
366 #[inline]
367 pub const fn error_kind(&self) -> &Option<NodeStepError> {
368 &self.error_kind
369 }
370}
371
372/// Structured snapshot describing the state of a single edge.
373///
374/// These snapshots are typically taken by runtimes when they want to record
375/// backpressure or queue depth for a link between nodes.
376#[non_exhaustive]
377#[derive(Copy, Clone, Debug)]
378pub struct EdgeSnapshotTelemetry {
379 /// Identifier of the graph instance this edge belongs to.
380 graph_id: GraphInstanceId,
381 /// Index of the edge within the graph.
382 edge_index: EdgeIndex,
383 /// Index of the source node for this edge.
384 source_node_index: NodeIndex,
385 /// Index of the target node for this edge.
386 target_node_index: NodeIndex,
387
388 /// Timestamp of the snapshot in nanoseconds since an arbitrary epoch.
389 timestamp_ns: u64,
390 /// Current occupancy of the edge buffer.
391 current_occupancy: u32,
392 /// Configured soft watermark for this edge.
393 soft_watermark: u32,
394 /// Configured hard watermark for this edge.
395 hard_watermark: u32,
396 /// Current watermark state relative to the configured thresholds.
397 watermark_state: WatermarkState,
398}
399
400impl EdgeSnapshotTelemetry {
401 /// Creates a new snapshot record for a single edge.
402 #[inline]
403 #[allow(clippy::too_many_arguments)]
404 pub const fn new(
405 graph_id: GraphInstanceId,
406 edge_index: EdgeIndex,
407 source_node_index: NodeIndex,
408 target_node_index: NodeIndex,
409 timestamp_ns: u64,
410 current_occupancy: u32,
411 soft_watermark: u32,
412 hard_watermark: u32,
413 watermark_state: WatermarkState,
414 ) -> Self {
415 Self {
416 graph_id,
417 edge_index,
418 source_node_index,
419 target_node_index,
420 timestamp_ns,
421 current_occupancy,
422 soft_watermark,
423 hard_watermark,
424 watermark_state,
425 }
426 }
427
428 /// Returns the identifier of the graph instance this edge belongs to.
429 #[inline]
430 pub const fn graph_id(&self) -> &GraphInstanceId {
431 &self.graph_id
432 }
433
434 /// Returns the index of the edge within the graph.
435 #[inline]
436 pub const fn edge_index(&self) -> &EdgeIndex {
437 &self.edge_index
438 }
439
440 /// Returns the index of the source node for this edge.
441 #[inline]
442 pub const fn source_node_index(&self) -> &NodeIndex {
443 &self.source_node_index
444 }
445
446 /// Returns the index of the target node for this edge.
447 #[inline]
448 pub const fn target_node_index(&self) -> &NodeIndex {
449 &self.target_node_index
450 }
451
452 /// Returns the snapshot timestamp in nanoseconds since an arbitrary epoch.
453 #[inline]
454 pub const fn timestamp_ns(&self) -> &u64 {
455 &self.timestamp_ns
456 }
457
458 /// Returns the current occupancy of the edge buffer.
459 #[inline]
460 pub const fn current_occupancy(&self) -> &u32 {
461 &self.current_occupancy
462 }
463
464 /// Returns the configured soft watermark for this edge.
465 #[inline]
466 pub const fn soft_watermark(&self) -> &u32 {
467 &self.soft_watermark
468 }
469
470 /// Returns the configured hard watermark for this edge.
471 #[inline]
472 pub const fn hard_watermark(&self) -> &u32 {
473 &self.hard_watermark
474 }
475
476 /// Returns the current watermark state relative to the configured thresholds.
477 #[inline]
478 pub const fn watermark_state(&self) -> &WatermarkState {
479 &self.watermark_state
480 }
481}
482
/// Classification of runtime level events that are not tied to a single node.
///
/// These events are useful for monitoring graph lifecycle, connectivity, and
/// data quality issues.
///
/// NOTE(review): the `Mqtt*` variants hard-code a specific broker technology
/// in an otherwise transport-agnostic module — consider whether a generic
/// `BrokerDisconnected`/`BrokerRecovered` naming would age better.
#[non_exhaustive]
#[derive(Copy, Clone, Debug)]
pub enum RuntimeTelemetryEventKind {
    /// A graph instance has started running.
    GraphStarted,
    /// A graph instance has stopped cleanly.
    GraphStopped,
    /// A graph instance has panicked or aborted unexpectedly.
    GraphPanicked,
    /// A sensor connection has been lost.
    SensorDisconnected,
    /// A sensor connection has been reestablished.
    SensorRecovered,
    /// A model failed to load or initialize.
    ModelLoadFailed,
    /// A model has recovered after a previous failure.
    ModelRecovered,
    /// The message broker connection has been lost.
    MqttDisconnected,
    /// The message broker connection has been reestablished.
    MqttRecovered,
    /// A gap in the input data stream has been detected.
    DataGapDetected,
    /// Invalid or malformed data has been observed.
    InvalidDataSeen,
}
513
514/// Structured runtime level telemetry event.
515///
516/// These events describe lifecycle transitions, connectivity changes, and
517/// coarse grained data quality issues at the graph level.
518#[non_exhaustive]
519#[derive(Copy, Clone, Debug)]
520pub struct RuntimeTelemetryEvent {
521 /// Identifier of the graph instance this event refers to.
522 graph_id: GraphInstanceId,
523 /// Timestamp of the event in nanoseconds since an arbitrary epoch.
524 timestamp_ns: u64,
525 /// Kind of runtime event that occurred.
526 event_kind: RuntimeTelemetryEventKind,
527 /// Optional static message with additional context.
528 ///
529 /// NOTE: `message` is rendered as the final `msg=` field in `fmt_event`.
530 /// It must not contain newlines; spaces are allowed and are treated
531 /// as part of the message up to end-of-line.
532 message: Option<EventMessage>,
533}
534
535impl RuntimeTelemetryEvent {
536 /// Creates a new runtime telemetry event record.
537 #[inline]
538 pub const fn new(
539 graph_id: GraphInstanceId,
540 timestamp_ns: u64,
541 event_kind: RuntimeTelemetryEventKind,
542 message: Option<EventMessage>,
543 ) -> Self {
544 Self {
545 graph_id,
546 timestamp_ns,
547 event_kind,
548 message,
549 }
550 }
551
552 /// Returns the identifier of the graph instance this event refers to.
553 #[inline]
554 pub const fn graph_id(&self) -> &GraphInstanceId {
555 &self.graph_id
556 }
557
558 /// Returns the event timestamp in nanoseconds since an arbitrary epoch.
559 #[inline]
560 pub const fn timestamp_ns(&self) -> &u64 {
561 &self.timestamp_ns
562 }
563
564 /// Returns the kind of runtime event that occurred.
565 #[inline]
566 pub const fn event_kind(&self) -> &RuntimeTelemetryEventKind {
567 &self.event_kind
568 }
569
570 /// Returns the optional static message associated with this event.
571 #[inline]
572 pub const fn message(&self) -> &Option<EventMessage> {
573 &self.message
574 }
575}
576
577/// Discriminated union of all structured telemetry events.
578///
579/// This is the type carried by event writers and is the payload for all
580/// structured telemetry emission.
581#[non_exhaustive]
582#[derive(Copy, Clone, Debug)]
583pub enum TelemetryEvent {
584 /// Node level timing and throughput information for a single step.
585 NodeStep(NodeStepTelemetry),
586 /// Edge level snapshot representing queue state and watermarks.
587 EdgeSnapshot(EdgeSnapshotTelemetry),
588 /// Runtime level lifecycle and connectivity event.
589 Runtime(RuntimeTelemetryEvent),
590}
591
592impl TelemetryEvent {
593 /// Creates a telemetry event from a node step telemetry record.
594 #[inline]
595 pub const fn node_step(ev: NodeStepTelemetry) -> Self {
596 TelemetryEvent::NodeStep(ev)
597 }
598
599 /// Creates a telemetry event from an edge snapshot telemetry record.
600 #[inline]
601 pub const fn edge_snapshot(ev: EdgeSnapshotTelemetry) -> Self {
602 TelemetryEvent::EdgeSnapshot(ev)
603 }
604
605 /// Creates a telemetry event from a runtime telemetry event record.
606 #[inline]
607 pub const fn runtime(ev: RuntimeTelemetryEvent) -> Self {
608 TelemetryEvent::Runtime(ev)
609 }
610}
611
/// Per node metrics aggregated by fixed and dynamic collectors.
///
/// Updated via the `Telemetry` trait; holds simple counters plus a
/// sum/count/max latency aggregate. All arithmetic saturates so long-running
/// systems never wrap.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct NodeMetrics {
    /// Number of items successfully processed by the node.
    processed: u64,
    /// Number of items dropped by the node (including deadline misses).
    dropped: u64,
    /// Number of ingress messages observed by the node.
    ingress: u64,
    /// Number of egress messages emitted by the node.
    egress: u64,
    /// Sum of all recorded latencies in nanoseconds.
    lat_sum: u64,
    /// Number of latency samples recorded.
    lat_cnt: u64,
    /// Maximum latency observed in nanoseconds.
    lat_max: u64,
    /// Number of deadline misses observed for this node.
    deadline_miss_count: u64,
}

impl NodeMetrics {
    /// Create a new zero initialized metrics record.
    pub const fn new() -> Self {
        Self {
            processed: 0,
            dropped: 0,
            ingress: 0,
            egress: 0,
            lat_sum: 0,
            lat_cnt: 0,
            lat_max: 0,
            deadline_miss_count: 0,
        }
    }

    // ---- mutators -------------------------------------------------------

    /// Add `delta` to `processed` (saturating).
    #[inline]
    pub fn inc_processed(&mut self, delta: u64) {
        self.processed = self.processed.saturating_add(delta);
    }

    /// Subtract `delta` from `processed` (saturating at zero).
    #[inline]
    pub fn dec_processed(&mut self, delta: u64) {
        self.processed = self.processed.saturating_sub(delta);
    }

    /// Overwrite `processed` with `v`.
    #[inline]
    pub fn set_processed(&mut self, v: u64) {
        self.processed = v;
    }

    /// Add `delta` to `dropped` (saturating).
    #[inline]
    pub fn inc_dropped(&mut self, delta: u64) {
        self.dropped = self.dropped.saturating_add(delta);
    }

    /// Subtract `delta` from `dropped` (saturating at zero).
    #[inline]
    pub fn dec_dropped(&mut self, delta: u64) {
        self.dropped = self.dropped.saturating_sub(delta);
    }

    /// Overwrite `dropped` with `v`.
    #[inline]
    pub fn set_dropped(&mut self, v: u64) {
        self.dropped = v;
    }

    /// Add `delta` to `ingress` (saturating).
    #[inline]
    pub fn inc_ingress(&mut self, delta: u64) {
        self.ingress = self.ingress.saturating_add(delta);
    }

    /// Subtract `delta` from `ingress` (saturating at zero).
    #[inline]
    pub fn dec_ingress(&mut self, delta: u64) {
        self.ingress = self.ingress.saturating_sub(delta);
    }

    /// Overwrite `ingress` with `v`.
    #[inline]
    pub fn set_ingress(&mut self, v: u64) {
        self.ingress = v;
    }

    /// Add `delta` to `egress` (saturating).
    #[inline]
    pub fn inc_egress(&mut self, delta: u64) {
        self.egress = self.egress.saturating_add(delta);
    }

    /// Subtract `delta` from `egress` (saturating at zero).
    #[inline]
    pub fn dec_egress(&mut self, delta: u64) {
        self.egress = self.egress.saturating_sub(delta);
    }

    /// Overwrite `egress` with `v`.
    #[inline]
    pub fn set_egress(&mut self, v: u64) {
        self.egress = v;
    }

    /// Add `delta` to the deadline miss counter (saturating).
    #[inline]
    pub fn inc_deadline_miss_count(&mut self, delta: u64) {
        self.deadline_miss_count = self.deadline_miss_count.saturating_add(delta);
    }

    /// Fold one latency sample (in nanoseconds) into the aggregate,
    /// updating `lat_sum`, `lat_cnt`, and `lat_max`.
    #[inline]
    pub fn record_latency_ns(&mut self, value_ns: u64) {
        self.lat_sum = self.lat_sum.saturating_add(value_ns);
        self.lat_cnt = self.lat_cnt.saturating_add(1);
        self.lat_max = self.lat_max.max(value_ns);
    }

    /// Merge another `NodeMetrics` into `self`: counters are added
    /// (saturating) and the larger latency maximum wins.
    #[inline]
    pub fn merge_from(&mut self, other: &Self) {
        self.processed = self.processed.saturating_add(other.processed);
        self.dropped = self.dropped.saturating_add(other.dropped);
        self.ingress = self.ingress.saturating_add(other.ingress);
        self.egress = self.egress.saturating_add(other.egress);
        self.lat_sum = self.lat_sum.saturating_add(other.lat_sum);
        self.lat_cnt = self.lat_cnt.saturating_add(other.lat_cnt);
        self.lat_max = self.lat_max.max(other.lat_max);
        self.deadline_miss_count = self
            .deadline_miss_count
            .saturating_add(other.deadline_miss_count);
    }

    /// Reset every counter and aggregate back to zero.
    #[inline]
    pub fn reset(&mut self) {
        *self = Self::new();
    }

    // ---- accessors ------------------------------------------------------

    /// Items successfully processed by the node.
    #[inline]
    pub const fn processed(&self) -> &u64 {
        &self.processed
    }

    /// Items dropped by the node.
    #[inline]
    pub const fn dropped(&self) -> &u64 {
        &self.dropped
    }

    /// Ingress messages observed by the node.
    #[inline]
    pub const fn ingress(&self) -> &u64 {
        &self.ingress
    }

    /// Egress messages emitted by the node.
    #[inline]
    pub const fn egress(&self) -> &u64 {
        &self.egress
    }

    /// Sum of all recorded latencies in nanoseconds.
    #[inline]
    pub const fn lat_sum(&self) -> &u64 {
        &self.lat_sum
    }

    /// Number of latency samples recorded.
    #[inline]
    pub const fn lat_cnt(&self) -> &u64 {
        &self.lat_cnt
    }

    /// Maximum latency observed in nanoseconds.
    #[inline]
    pub const fn lat_max(&self) -> &u64 {
        &self.lat_max
    }

    /// Deadline misses observed for this node.
    #[inline]
    pub const fn deadline_miss_count(&self) -> &u64 {
        &self.deadline_miss_count
    }
}

impl Default for NodeMetrics {
    fn default() -> Self {
        Self::new()
    }
}
821
/// Per edge metrics aggregated by fixed and dynamic collectors.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct EdgeMetrics {
    /// Current queue depth for the edge.
    queue_depth: u32,
}

impl EdgeMetrics {
    /// Create a metrics record with a queue depth of zero.
    pub const fn new() -> Self {
        Self { queue_depth: 0 }
    }

    /// Current queue depth for the edge.
    #[inline]
    pub const fn queue_depth(&self) -> &u32 {
        &self.queue_depth
    }

    /// Overwrite the queue depth with `v`.
    #[inline]
    pub fn set_queue_depth(&mut self, v: u32) {
        self.queue_depth = v;
    }

    /// Add `delta` to the queue depth (saturating).
    #[inline]
    pub fn inc_queue_depth(&mut self, delta: u32) {
        self.queue_depth = self.queue_depth.saturating_add(delta);
    }

    /// Subtract `delta` from the queue depth (saturating at zero).
    #[inline]
    pub fn dec_queue_depth(&mut self, delta: u32) {
        self.queue_depth = self.queue_depth.saturating_sub(delta);
    }

    /// Merge another `EdgeMetrics` into `self`.
    ///
    /// Queue depth is a gauge, so merging follows the last-writer-wins
    /// semantics used by higher-level merge logic: `other` overwrites `self`.
    #[inline]
    pub fn merge_from(&mut self, other: &Self) {
        self.queue_depth = other.queue_depth;
    }

    /// Reset the record back to its zero state.
    #[inline]
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}

impl Default for EdgeMetrics {
    fn default() -> Self {
        Self::new()
    }
}
879
880/// Per graph telemetry metrics.
881#[derive(Debug, Clone, Copy)]
882pub struct GraphMetrics<const MAX_NODES: usize, const MAX_EDGES: usize> {
883 /// Graph id.
884 id: u32,
885 /// Per-node metrics.
886 nodes: [NodeMetrics; MAX_NODES],
887 /// Per-edge metrics.
888 edges: [EdgeMetrics; MAX_EDGES],
889}
890
891impl<const MAX_NODES: usize, const MAX_EDGES: usize> GraphMetrics<MAX_NODES, MAX_EDGES> {
892 /// Create a new metrics record with a queue depth of zero.
893 pub const fn new(id: u32) -> Self {
894 Self {
895 id,
896 nodes: [NodeMetrics::new(); MAX_NODES],
897 edges: [EdgeMetrics::new(); MAX_EDGES],
898 }
899 }
900
901 /// Return the identifier of this graph.
902 pub fn id(&self) -> &u32 {
903 &self.id
904 }
905
906 /// Return the metrics for all nodes.
907 pub fn nodes(&self) -> &[NodeMetrics; MAX_NODES] {
908 &self.nodes
909 }
910
911 /// Return the metrics for all edges.
912 pub fn edges(&self) -> &[EdgeMetrics; MAX_EDGES] {
913 &self.edges
914 }
915}
916
917impl<const MAX_NODES: usize, const MAX_EDGES: usize> GraphMetrics<MAX_NODES, MAX_EDGES> {
918 /// Format this graph metrics record as multi-line text.
919 ///
920 /// The first line contains the graph identifier. Each subsequent line
921 /// contains metrics for a single node or edge, indented by two spaces and
922 /// prefixed with `node id:` or `edge id:` respectively.
923 pub fn fmt<W: fmt::Write>(&self, w: &mut W) -> fmt::Result {
924 // First line: graph id
925 w.write_str("graph id: ")?;
926 write_u64(w, self.id as u64)?;
927 w.write_str("\n")?;
928
929 // Nodes
930 for i in 0..MAX_NODES {
931 let m = &self.nodes[i];
932 w.write_str(" node id: ")?;
933 write_u64(w, i as u64)?;
934 w.write_str(" processed=")?;
935 write_u64(w, m.processed)?;
936 w.write_str(" dropped=")?;
937 write_u64(w, m.dropped)?;
938 w.write_str(" ingress=")?;
939 write_u64(w, m.ingress)?;
940 w.write_str(" egress=")?;
941 write_u64(w, m.egress)?;
942 w.write_str(" lat_sum=")?;
943 write_u64(w, m.lat_sum)?;
944 w.write_str(" lat_cnt=")?;
945 write_u64(w, m.lat_cnt)?;
946 w.write_str(" lat_max=")?;
947 write_u64(w, m.lat_max)?;
948 w.write_str(" deadline_miss_count=")?;
949 write_u64(w, m.deadline_miss_count)?;
950 w.write_str("\n")?;
951 }
952
953 // Edges
954 for i in 0..MAX_EDGES {
955 let m = &self.edges[i];
956 w.write_str(" edge id: ")?;
957 write_u64(w, i as u64)?;
958 w.write_str(" queue_depth=")?;
959 write_u64(w, m.queue_depth as u64)?;
960 w.write_str("\n")?;
961 }
962
963 Ok(())
964 }
965}
966
/// Telemetry implementation that discards all metrics and events.
///
/// This is useful in tests and extremely constrained environments where
/// telemetry is not required. Both compile-time flags are `false`, so
/// runtimes that consult `METRICS_ENABLED`/`EVENTS_STATICALLY_ENABLED` can
/// compile telemetry handling out entirely for this type.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopTelemetry;

impl Telemetry for NoopTelemetry {
    const METRICS_ENABLED: bool = false;
    const EVENTS_STATICALLY_ENABLED: bool = false;

    // Event-related methods (`events_enabled`, `push_event`, `push_metrics`,
    // `flush`) keep their trait-default no-op implementations.
    #[inline]
    fn incr_counter(&mut self, _key: TelemetryKey, _delta: u64) {}
    #[inline]
    fn set_gauge(&mut self, _key: TelemetryKey, _value: u64) {}
    #[inline]
    fn record_latency_ns(&mut self, _key: TelemetryKey, _value_ns: u64) {}
}
985
// Unit implementation: lets generic code pass `()` as a zero-cost
// "telemetry disabled" collector, mirroring `NoopTelemetry`. Event-related
// methods keep their trait-default no-op implementations.
impl Telemetry for () {
    const METRICS_ENABLED: bool = false;
    const EVENTS_STATICALLY_ENABLED: bool = false;

    #[inline]
    fn incr_counter(&mut self, _key: TelemetryKey, _delta: u64) {}

    #[inline]
    fn set_gauge(&mut self, _key: TelemetryKey, _value: u64) {}

    #[inline]
    fn record_latency_ns(&mut self, _key: TelemetryKey, _value_ns: u64) {}
}
998}