Skip to main content

limen_core/
telemetry.rs

1//! Telemetry primitives for Limen runtimes.
2//!
3//! This module provides `no_std`-friendly metrics, structured telemetry events,
4//! and timing spans that can be used by runtimes without imposing any logging,
5//! allocation, or I/O policy.
6
7pub mod event_message;
8pub mod graph_telemetry;
9pub mod sink;
10
11#[cfg(feature = "std")]
12pub mod concurrent;
13
14use core::fmt;
15
16use crate::policy::WatermarkState;
17use crate::types::{EdgeIndex, NodeIndex};
18use event_message::EventMessage;
19use sink::write_u64;
20
21// ====================== Core telemetry trait and keys ===================
22
23/// Core interface for collecting runtime metrics and structured telemetry events.
24///
25/// This trait is intentionally minimal and I/O-agnostic so that it can be
26/// implemented in both `no_std` and `std` environments. Implementations are free to
27/// ignore any subset of calls.
28pub trait Telemetry {
29    /// Compile-time flag indicating whether this telemetry implementation
30    /// wants metrics (counters, gauges, latencies) at all.
31    ///
32    /// Runtimes can use this to completely compile out metric collection
33    /// when `METRICS_ENABLED` is `false` for a given `Telemetry` type.
34    const METRICS_ENABLED: bool = true;
35
36    /// Compile-time flag indicating whether this telemetry implementation
37    /// ever produces structured events.
38    ///
39    /// When this is `false`, runtimes can skip both the construction of
40    /// `TelemetryEvent` values and any calls to `events_enabled()`,
41    /// allowing event handling code to compile out entirely.
42    const EVENTS_STATICALLY_ENABLED: bool = true;
43
44    /// Increment a counter metric identified by the given key.
45    ///
46    /// Counters are monotonically increasing and are typically used for counts such
47    /// as processed messages, dropped messages, or deadline misses.
48    fn incr_counter(&mut self, key: TelemetryKey, delta: u64);
49
50    /// Set a gauge metric identified by the given key.
51    ///
52    /// Gauges represent the latest value of a quantity such as queue depth or
53    /// current occupancy.
54    fn set_gauge(&mut self, key: TelemetryKey, value: u64);
55
56    /// Record a latency sample in nanoseconds for the given key.
57    ///
58    /// Implementations are free to aggregate these values as histograms, rolling
59    /// averages, or to ignore them.
60    fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64);
61
62    /// Optional: push a snapshot of aggregated metrics to the sink.
63    ///
64    /// Runtimes can call this periodically without knowing how metrics are
65    /// stored. Implementations that have no aggregated metrics can keep the
66    /// default no-op.
67    #[inline]
68    fn push_metrics(&mut self) {}
69
70    /// Return true if this telemetry collector wants structured events.
71    ///
72    /// Runtimes and nodes can use this to avoid constructing `TelemetryEvent`
73    /// values when events are disabled, keeping the hot path as cheap as possible.
74    #[inline]
75    fn events_enabled(&self) -> bool {
76        false
77    }
78
79    /// Emit a structured telemetry event.
80    ///
81    /// The default implementation is a no operation so that simple collectors can
82    /// ignore structured events entirely.
83    #[inline]
84    fn push_event(&mut self, _event: TelemetryEvent) {}
85
86    /// Flush any buffered telemetry data to the underlying sink.
87    ///
88    /// The default implementation is a no operation. Implementations that buffer
89    /// data or write to external sinks can use this to force a drain.
90    #[inline]
91    fn flush(&mut self) {}
92}
93
94/// Compact identifier for a metric or latency sample.
95///
96/// Keys combine a namespace, an integer identifier, and a logical kind so that
97/// backends can map them to their own internal representation.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
99pub struct TelemetryKey {
100    /// Namespace that this key belongs to (node, edge, or runtime).
101    ns: TelemetryNs,
102    /// Integer identifier within the namespace (for example node index).
103    id: u32,
104    /// Logical kind of metric represented by this key.
105    kind: TelemetryKind,
106}
107
108impl TelemetryKey {
109    /// Construct a key for a node level metric.
110    ///
111    /// The `node_id` is the zero based index of the node in the graph.
112    #[inline]
113    pub const fn node(node_id: u32, kind: TelemetryKind) -> Self {
114        Self {
115            ns: TelemetryNs::Node,
116            id: node_id,
117            kind,
118        }
119    }
120
121    /// Construct a key for an edge level metric.
122    ///
123    /// The `edge_id` is the zero based index of the edge in the graph.
124    #[inline]
125    pub const fn edge(edge_id: u32, kind: TelemetryKind) -> Self {
126        Self {
127            ns: TelemetryNs::Edge,
128            id: edge_id,
129            kind,
130        }
131    }
132
133    /// Construct a key for a runtime level metric.
134    ///
135    /// The identifier is currently always zero and reserved for future use.
136    #[inline]
137    pub const fn runtime(kind: TelemetryKind) -> Self {
138        Self {
139            ns: TelemetryNs::Runtime,
140            id: 0,
141            kind,
142        }
143    }
144
145    /// Construct a compact key for a node port metric.
146    ///
147    /// The identifier encodes the node identifier, the port index, and whether
148    /// this is an input or output port into a single integer.
149    #[inline]
150    pub const fn node_port(
151        node_id: u32,
152        port_index: u16,
153        is_output: bool,
154        kind: TelemetryKind,
155    ) -> Self {
156        let enc = ((node_id & 0x000F_FFFF) << 12)
157            | (((is_output as u32) & 0x1) << 11)
158            | (port_index as u32 & 0x7FF);
159        Self {
160            ns: TelemetryNs::Node,
161            id: enc,
162            kind,
163        }
164    }
165
166    /// Return the namespace.
167    #[inline]
168    pub const fn ns(&self) -> &TelemetryNs {
169        &self.ns
170    }
171
172    /// Return the identifier.
173    #[inline]
174    pub const fn id(&self) -> &u32 {
175        &self.id
176    }
177
178    /// Return the logical kind.
179    #[inline]
180    pub const fn kind(&self) -> &TelemetryKind {
181        &self.kind
182    }
183}
184
/// Logical namespace for telemetry keys.
///
/// Separates metrics that describe nodes, edges, and the runtime itself.
/// The namespace is part of a `TelemetryKey`'s identity, so two keys with the
/// same id and kind but different namespaces are distinct.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryNs {
    /// Node level metrics such as processed counts and latencies.
    ///
    /// Used by both `TelemetryKey::node` and `TelemetryKey::node_port`.
    Node,
    /// Edge level metrics such as queue depth.
    Edge,
    /// Runtime level metrics that are not tied to a particular node or edge.
    Runtime,
}
198
/// Logical kind of metric represented by a telemetry key.
///
/// These kinds are interpreted by collectors such as fixed and dynamic telemetry
/// stores and are intentionally small and generic. The enum is
/// `#[non_exhaustive]`, so matches on it outside this crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TelemetryKind {
    /// Number of items successfully processed.
    Processed,
    /// Number of items dropped or discarded.
    Dropped,
    /// Number of deadline misses observed.
    DeadlineMiss,
    /// Current depth of a queue or buffer.
    QueueDepth,
    /// Latency measurement in nanoseconds.
    Latency,
    /// Number of ingress messages received.
    IngressMsgs,
    /// Number of egress messages emitted.
    EgressMsgs,
}
221
/// Unique identifier for a running graph instance.
///
/// This allows telemetry to distinguish between multiple graphs managed by the
/// same runtime. It is a plain type alias (not a newtype), so any `u32` is
/// accepted wherever a `GraphInstanceId` is expected.
pub type GraphInstanceId = u32;
227
/// High level classification of a node level error used in telemetry.
///
/// This mirrors `crate::errors::NodeErrorKind` so that telemetry can
/// faithfully report the scheduler-visible error semantics without
/// inventing additional information that is not available at this layer.
///
/// Like the other field-less telemetry enums (`TelemetryNs`, `TelemetryKind`),
/// this derives `PartialEq`, `Eq`, and `Hash`. Callers can therefore compare
/// classifications with `==` or use them as map/set keys.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum NodeStepError {
    /// Inputs were not available to progress this node.
    NoInput,
    /// Outputs could not be enqueued due to backpressure.
    Backpressured,
    /// An execution budget or deadline was exceeded.
    OverBudget,
    /// External dependency (device, transport) was unavailable or timed out.
    ExternalUnavailable,
    /// A generic failure in node logic.
    ExecutionFailed,
}
247
248/// Structured telemetry produced for each node step.
249///
250/// A node step represents a single scheduling decision in which a node consumes
251/// zero or more input messages and produces zero or more output messages.
252#[non_exhaustive]
253#[derive(Copy, Clone, Debug)]
254pub struct NodeStepTelemetry {
255    /// Identifier of the graph instance this node belongs to.
256    graph_id: GraphInstanceId,
257    /// Index of the node within the graph.
258    node_index: NodeIndex,
259    /// Optional static node name for debugging and correlation.
260    node_name: Option<&'static str>,
261
262    /// Start timestamp of the step in nanoseconds since an arbitrary epoch.
263    timestamp_start_ns: u64,
264    /// End timestamp of the step in nanoseconds since an arbitrary epoch.
265    timestamp_end_ns: u64,
266    /// Duration of the step in nanoseconds.
267    duration_ns: u64,
268
269    /// Number of messages processed by this step (batch size or 1 for single-message).
270    processed_count: u64,
271
272    /// Optional absolute deadline in nanoseconds for this step.
273    deadline_ns: Option<u64>,
274    /// Whether the deadline was missed during this step.
275    deadline_missed: bool,
276
277    /// Optional high level error classification for this step.
278    error_kind: Option<NodeStepError>,
279}
280
281impl NodeStepTelemetry {
282    /// Construct a new `NodeStepTelemetry` record.
283    #[inline]
284    #[allow(clippy::too_many_arguments)]
285    pub const fn new(
286        graph_id: GraphInstanceId,
287        node_index: NodeIndex,
288        node_name: Option<&'static str>,
289        timestamp_start_ns: u64,
290        timestamp_end_ns: u64,
291        duration_ns: u64,
292        processed_count: u64,
293        deadline_ns: Option<u64>,
294        deadline_missed: bool,
295        error_kind: Option<NodeStepError>,
296    ) -> Self {
297        Self {
298            graph_id,
299            node_index,
300            node_name,
301            timestamp_start_ns,
302            timestamp_end_ns,
303            duration_ns,
304            processed_count,
305            deadline_ns,
306            deadline_missed,
307            error_kind,
308        }
309    }
310
311    /// Returns the identifier of the graph instance this step belongs to.
312    #[inline]
313    pub const fn graph_id(&self) -> &GraphInstanceId {
314        &self.graph_id
315    }
316
317    /// Returns the index of the node within the graph.
318    #[inline]
319    pub const fn node_index(&self) -> &NodeIndex {
320        &self.node_index
321    }
322
323    /// Returns the optional static node name associated with this step.
324    #[inline]
325    pub const fn node_name(&self) -> &Option<&'static str> {
326        &self.node_name
327    }
328
329    /// Returns the step start timestamp in nanoseconds since an arbitrary epoch.
330    #[inline]
331    pub const fn timestamp_start_ns(&self) -> &u64 {
332        &self.timestamp_start_ns
333    }
334
335    /// Returns the step end timestamp in nanoseconds since an arbitrary epoch.
336    #[inline]
337    pub const fn timestamp_end_ns(&self) -> &u64 {
338        &self.timestamp_end_ns
339    }
340
341    /// Returns the step duration in nanoseconds.
342    #[inline]
343    pub const fn duration_ns(&self) -> &u64 {
344        &self.duration_ns
345    }
346
347    /// Returns the number of messages processed by this step.
348    #[inline]
349    pub const fn processed_count(&self) -> &u64 {
350        &self.processed_count
351    }
352
353    /// Returns the optional absolute deadline for this step in nanoseconds.
354    #[inline]
355    pub const fn deadline_ns(&self) -> &Option<u64> {
356        &self.deadline_ns
357    }
358
359    /// Returns whether the step exceeded its deadline.
360    #[inline]
361    pub const fn deadline_missed(&self) -> &bool {
362        &self.deadline_missed
363    }
364
365    /// Returns the optional high-level error classification for this step.
366    #[inline]
367    pub const fn error_kind(&self) -> &Option<NodeStepError> {
368        &self.error_kind
369    }
370}
371
372/// Structured snapshot describing the state of a single edge.
373///
374/// These snapshots are typically taken by runtimes when they want to record
375/// backpressure or queue depth for a link between nodes.
376#[non_exhaustive]
377#[derive(Copy, Clone, Debug)]
378pub struct EdgeSnapshotTelemetry {
379    /// Identifier of the graph instance this edge belongs to.
380    graph_id: GraphInstanceId,
381    /// Index of the edge within the graph.
382    edge_index: EdgeIndex,
383    /// Index of the source node for this edge.
384    source_node_index: NodeIndex,
385    /// Index of the target node for this edge.
386    target_node_index: NodeIndex,
387
388    /// Timestamp of the snapshot in nanoseconds since an arbitrary epoch.
389    timestamp_ns: u64,
390    /// Current occupancy of the edge buffer.
391    current_occupancy: u32,
392    /// Configured soft watermark for this edge.
393    soft_watermark: u32,
394    /// Configured hard watermark for this edge.
395    hard_watermark: u32,
396    /// Current watermark state relative to the configured thresholds.
397    watermark_state: WatermarkState,
398}
399
400impl EdgeSnapshotTelemetry {
401    /// Creates a new snapshot record for a single edge.
402    #[inline]
403    #[allow(clippy::too_many_arguments)]
404    pub const fn new(
405        graph_id: GraphInstanceId,
406        edge_index: EdgeIndex,
407        source_node_index: NodeIndex,
408        target_node_index: NodeIndex,
409        timestamp_ns: u64,
410        current_occupancy: u32,
411        soft_watermark: u32,
412        hard_watermark: u32,
413        watermark_state: WatermarkState,
414    ) -> Self {
415        Self {
416            graph_id,
417            edge_index,
418            source_node_index,
419            target_node_index,
420            timestamp_ns,
421            current_occupancy,
422            soft_watermark,
423            hard_watermark,
424            watermark_state,
425        }
426    }
427
428    /// Returns the identifier of the graph instance this edge belongs to.
429    #[inline]
430    pub const fn graph_id(&self) -> &GraphInstanceId {
431        &self.graph_id
432    }
433
434    /// Returns the index of the edge within the graph.
435    #[inline]
436    pub const fn edge_index(&self) -> &EdgeIndex {
437        &self.edge_index
438    }
439
440    /// Returns the index of the source node for this edge.
441    #[inline]
442    pub const fn source_node_index(&self) -> &NodeIndex {
443        &self.source_node_index
444    }
445
446    /// Returns the index of the target node for this edge.
447    #[inline]
448    pub const fn target_node_index(&self) -> &NodeIndex {
449        &self.target_node_index
450    }
451
452    /// Returns the snapshot timestamp in nanoseconds since an arbitrary epoch.
453    #[inline]
454    pub const fn timestamp_ns(&self) -> &u64 {
455        &self.timestamp_ns
456    }
457
458    /// Returns the current occupancy of the edge buffer.
459    #[inline]
460    pub const fn current_occupancy(&self) -> &u32 {
461        &self.current_occupancy
462    }
463
464    /// Returns the configured soft watermark for this edge.
465    #[inline]
466    pub const fn soft_watermark(&self) -> &u32 {
467        &self.soft_watermark
468    }
469
470    /// Returns the configured hard watermark for this edge.
471    #[inline]
472    pub const fn hard_watermark(&self) -> &u32 {
473        &self.hard_watermark
474    }
475
476    /// Returns the current watermark state relative to the configured thresholds.
477    #[inline]
478    pub const fn watermark_state(&self) -> &WatermarkState {
479        &self.watermark_state
480    }
481}
482
/// Classification of runtime level events that are not tied to a single node.
///
/// These events are useful for monitoring graph lifecycle, connectivity, and
/// data quality issues.
///
/// Like the other field-less telemetry enums (`TelemetryNs`, `TelemetryKind`),
/// this derives `PartialEq`, `Eq`, and `Hash`. Event kinds can therefore be
/// compared with `==` or used as map/set keys, for example for per-kind
/// counters.
#[non_exhaustive]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum RuntimeTelemetryEventKind {
    /// A graph instance has started running.
    GraphStarted,
    /// A graph instance has stopped cleanly.
    GraphStopped,
    /// A graph instance has panicked or aborted unexpectedly.
    GraphPanicked,
    /// A sensor connection has been lost.
    SensorDisconnected,
    /// A sensor connection has been reestablished.
    SensorRecovered,
    /// A model failed to load or initialize.
    ModelLoadFailed,
    /// A model has recovered after a previous failure.
    ModelRecovered,
    /// The message broker connection has been lost.
    MqttDisconnected,
    /// The message broker connection has been reestablished.
    MqttRecovered,
    /// A gap in the input data stream has been detected.
    DataGapDetected,
    /// Invalid or malformed data has been observed.
    InvalidDataSeen,
}
513
514/// Structured runtime level telemetry event.
515///
516/// These events describe lifecycle transitions, connectivity changes, and
517/// coarse grained data quality issues at the graph level.
518#[non_exhaustive]
519#[derive(Copy, Clone, Debug)]
520pub struct RuntimeTelemetryEvent {
521    /// Identifier of the graph instance this event refers to.
522    graph_id: GraphInstanceId,
523    /// Timestamp of the event in nanoseconds since an arbitrary epoch.
524    timestamp_ns: u64,
525    /// Kind of runtime event that occurred.
526    event_kind: RuntimeTelemetryEventKind,
527    /// Optional static message with additional context.
528    ///
529    /// NOTE: `message` is rendered as the final `msg=` field in `fmt_event`.
530    /// It must not contain newlines; spaces are allowed and are treated
531    /// as part of the message up to end-of-line.
532    message: Option<EventMessage>,
533}
534
535impl RuntimeTelemetryEvent {
536    /// Creates a new runtime telemetry event record.
537    #[inline]
538    pub const fn new(
539        graph_id: GraphInstanceId,
540        timestamp_ns: u64,
541        event_kind: RuntimeTelemetryEventKind,
542        message: Option<EventMessage>,
543    ) -> Self {
544        Self {
545            graph_id,
546            timestamp_ns,
547            event_kind,
548            message,
549        }
550    }
551
552    /// Returns the identifier of the graph instance this event refers to.
553    #[inline]
554    pub const fn graph_id(&self) -> &GraphInstanceId {
555        &self.graph_id
556    }
557
558    /// Returns the event timestamp in nanoseconds since an arbitrary epoch.
559    #[inline]
560    pub const fn timestamp_ns(&self) -> &u64 {
561        &self.timestamp_ns
562    }
563
564    /// Returns the kind of runtime event that occurred.
565    #[inline]
566    pub const fn event_kind(&self) -> &RuntimeTelemetryEventKind {
567        &self.event_kind
568    }
569
570    /// Returns the optional static message associated with this event.
571    #[inline]
572    pub const fn message(&self) -> &Option<EventMessage> {
573        &self.message
574    }
575}
576
577/// Discriminated union of all structured telemetry events.
578///
579/// This is the type carried by event writers and is the payload for all
580/// structured telemetry emission.
581#[non_exhaustive]
582#[derive(Copy, Clone, Debug)]
583pub enum TelemetryEvent {
584    /// Node level timing and throughput information for a single step.
585    NodeStep(NodeStepTelemetry),
586    /// Edge level snapshot representing queue state and watermarks.
587    EdgeSnapshot(EdgeSnapshotTelemetry),
588    /// Runtime level lifecycle and connectivity event.
589    Runtime(RuntimeTelemetryEvent),
590}
591
592impl TelemetryEvent {
593    /// Creates a telemetry event from a node step telemetry record.
594    #[inline]
595    pub const fn node_step(ev: NodeStepTelemetry) -> Self {
596        TelemetryEvent::NodeStep(ev)
597    }
598
599    /// Creates a telemetry event from an edge snapshot telemetry record.
600    #[inline]
601    pub const fn edge_snapshot(ev: EdgeSnapshotTelemetry) -> Self {
602        TelemetryEvent::EdgeSnapshot(ev)
603    }
604
605    /// Creates a telemetry event from a runtime telemetry event record.
606    #[inline]
607    pub const fn runtime(ev: RuntimeTelemetryEvent) -> Self {
608        TelemetryEvent::Runtime(ev)
609    }
610}
611
/// Per-node counters and latency aggregates.
///
/// Fixed and dynamic collectors keep one of these per node and update it via
/// the `Telemetry` trait. Increments saturate at `u64::MAX` and decrements
/// saturate at `0` instead of wrapping. The `set_*` methods overwrite the value
/// unconditionally.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct NodeMetrics {
    /// Items the node processed successfully.
    processed: u64,
    /// Items the node dropped (including deadline misses).
    dropped: u64,
    /// Ingress messages observed by the node.
    ingress: u64,
    /// Egress messages emitted by the node.
    egress: u64,
    /// Saturating sum of every recorded latency sample, in nanoseconds.
    lat_sum: u64,
    /// Number of latency samples recorded (saturating).
    lat_cnt: u64,
    /// Largest latency sample seen so far, in nanoseconds.
    lat_max: u64,
    /// Deadline misses observed for the node.
    deadline_miss_count: u64,
}

impl Default for NodeMetrics {
    /// Same as [`NodeMetrics::new`]: everything zeroed.
    fn default() -> Self {
        NodeMetrics::new()
    }
}

impl NodeMetrics {
    /// Returns a record with every counter and aggregate set to zero.
    pub const fn new() -> Self {
        Self {
            processed: 0,
            dropped: 0,
            ingress: 0,
            egress: 0,
            lat_sum: 0,
            lat_cnt: 0,
            lat_max: 0,
            deadline_miss_count: 0,
        }
    }

    // ---- processed -------------------------------------------------------

    /// Items processed successfully.
    #[inline]
    pub const fn processed(&self) -> &u64 {
        &self.processed
    }

    /// Adds `delta` to `processed`, saturating at `u64::MAX`.
    #[inline]
    pub fn inc_processed(&mut self, delta: u64) {
        self.processed = self.processed.saturating_add(delta);
    }

    /// Takes `delta` off `processed`, saturating at zero.
    #[inline]
    pub fn dec_processed(&mut self, delta: u64) {
        self.processed = self.processed.saturating_sub(delta);
    }

    /// Overwrites `processed` with `v`.
    #[inline]
    pub fn set_processed(&mut self, v: u64) {
        self.processed = v;
    }

    // ---- dropped ---------------------------------------------------------

    /// Items dropped by the node.
    #[inline]
    pub const fn dropped(&self) -> &u64 {
        &self.dropped
    }

    /// Adds `delta` to `dropped`, saturating at `u64::MAX`.
    #[inline]
    pub fn inc_dropped(&mut self, delta: u64) {
        self.dropped = self.dropped.saturating_add(delta);
    }

    /// Takes `delta` off `dropped`, saturating at zero.
    #[inline]
    pub fn dec_dropped(&mut self, delta: u64) {
        self.dropped = self.dropped.saturating_sub(delta);
    }

    /// Overwrites `dropped` with `v`.
    #[inline]
    pub fn set_dropped(&mut self, v: u64) {
        self.dropped = v;
    }

    // ---- ingress ---------------------------------------------------------

    /// Ingress messages observed by the node.
    #[inline]
    pub const fn ingress(&self) -> &u64 {
        &self.ingress
    }

    /// Adds `delta` to `ingress`, saturating at `u64::MAX`.
    #[inline]
    pub fn inc_ingress(&mut self, delta: u64) {
        self.ingress = self.ingress.saturating_add(delta);
    }

    /// Takes `delta` off `ingress`, saturating at zero.
    #[inline]
    pub fn dec_ingress(&mut self, delta: u64) {
        self.ingress = self.ingress.saturating_sub(delta);
    }

    /// Overwrites `ingress` with `v`.
    #[inline]
    pub fn set_ingress(&mut self, v: u64) {
        self.ingress = v;
    }

    // ---- egress ----------------------------------------------------------

    /// Egress messages emitted by the node.
    #[inline]
    pub const fn egress(&self) -> &u64 {
        &self.egress
    }

    /// Adds `delta` to `egress`, saturating at `u64::MAX`.
    #[inline]
    pub fn inc_egress(&mut self, delta: u64) {
        self.egress = self.egress.saturating_add(delta);
    }

    /// Takes `delta` off `egress`, saturating at zero.
    #[inline]
    pub fn dec_egress(&mut self, delta: u64) {
        self.egress = self.egress.saturating_sub(delta);
    }

    /// Overwrites `egress` with `v`.
    #[inline]
    pub fn set_egress(&mut self, v: u64) {
        self.egress = v;
    }

    // ---- latency ---------------------------------------------------------

    /// Sum of all recorded latency samples, in nanoseconds.
    #[inline]
    pub const fn lat_sum(&self) -> &u64 {
        &self.lat_sum
    }

    /// Number of latency samples recorded.
    #[inline]
    pub const fn lat_cnt(&self) -> &u64 {
        &self.lat_cnt
    }

    /// Largest latency sample recorded, in nanoseconds.
    #[inline]
    pub const fn lat_max(&self) -> &u64 {
        &self.lat_max
    }

    /// Folds one latency sample (nanoseconds) into the aggregates.
    ///
    /// `lat_sum` and `lat_cnt` grow with saturating arithmetic so that long
    /// running systems never overflow. `lat_max` keeps the larger value.
    #[inline]
    pub fn record_latency_ns(&mut self, value_ns: u64) {
        self.lat_max = self.lat_max.max(value_ns);
        self.lat_cnt = self.lat_cnt.saturating_add(1);
        self.lat_sum = self.lat_sum.saturating_add(value_ns);
    }

    // ---- deadlines -------------------------------------------------------

    /// Deadline misses observed for the node.
    #[inline]
    pub const fn deadline_miss_count(&self) -> &u64 {
        &self.deadline_miss_count
    }

    /// Adds `delta` to the deadline-miss counter, saturating at `u64::MAX`.
    #[inline]
    pub fn inc_deadline_miss_count(&mut self, delta: u64) {
        self.deadline_miss_count = self.deadline_miss_count.saturating_add(delta);
    }

    // ---- whole-record operations ----------------------------------------

    /// Accumulates `other` into `self`.
    ///
    /// Counters and latency sums are combined with saturating addition. The
    /// latency maximum is the larger of the two.
    #[inline]
    pub fn merge_from(&mut self, other: &Self) {
        // Exhaustive destructuring: adding a field to `NodeMetrics` turns
        // into a compile error here until the merge rule for it is decided.
        let Self {
            processed,
            dropped,
            ingress,
            egress,
            lat_sum,
            lat_cnt,
            lat_max,
            deadline_miss_count,
        } = *other;

        self.inc_processed(processed);
        self.inc_dropped(dropped);
        self.inc_ingress(ingress);
        self.inc_egress(egress);
        self.lat_sum = self.lat_sum.saturating_add(lat_sum);
        self.lat_cnt = self.lat_cnt.saturating_add(lat_cnt);
        self.lat_max = self.lat_max.max(lat_max);
        self.inc_deadline_miss_count(deadline_miss_count);
    }

    /// Zeroes every counter and aggregate.
    #[inline]
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
821
/// Per-edge metrics kept by fixed and dynamic collectors.
///
/// Currently this is just the queue depth of the edge, held as a `u32`
/// whose increments and decrements saturate rather than wrap.
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub struct EdgeMetrics {
    /// Most recently known queue depth of the edge.
    queue_depth: u32,
}

impl Default for EdgeMetrics {
    /// Same as [`EdgeMetrics::new`]: queue depth zero.
    fn default() -> Self {
        EdgeMetrics::new()
    }
}

impl EdgeMetrics {
    /// Returns a record whose queue depth is zero.
    pub const fn new() -> Self {
        Self { queue_depth: 0 }
    }

    /// Current queue depth of the edge.
    #[inline]
    pub const fn queue_depth(&self) -> &u32 {
        &self.queue_depth
    }

    /// Overwrites the queue depth with `v`.
    #[inline]
    pub fn set_queue_depth(&mut self, v: u32) {
        self.queue_depth = v;
    }

    /// Raises the queue depth by `delta`, saturating at `u32::MAX`.
    #[inline]
    pub fn inc_queue_depth(&mut self, delta: u32) {
        let next = self.queue_depth.saturating_add(delta);
        self.queue_depth = next;
    }

    /// Lowers the queue depth by `delta`, saturating at zero.
    #[inline]
    pub fn dec_queue_depth(&mut self, delta: u32) {
        let next = self.queue_depth.saturating_sub(delta);
        self.queue_depth = next;
    }

    /// Folds `other` into `self`.
    ///
    /// Queue depth is a point-in-time reading, so the incoming value simply
    /// replaces the current one (last writer wins).
    #[inline]
    pub fn merge_from(&mut self, other: &Self) {
        self.set_queue_depth(other.queue_depth);
    }

    /// Resets the record to its zeroed state.
    #[inline]
    pub fn reset(&mut self) {
        *self = Self::default();
    }
}
879
880/// Per graph telemetry metrics.
881#[derive(Debug, Clone, Copy)]
882pub struct GraphMetrics<const MAX_NODES: usize, const MAX_EDGES: usize> {
883    /// Graph id.
884    id: u32,
885    /// Per-node metrics.
886    nodes: [NodeMetrics; MAX_NODES],
887    /// Per-edge metrics.
888    edges: [EdgeMetrics; MAX_EDGES],
889}
890
891impl<const MAX_NODES: usize, const MAX_EDGES: usize> GraphMetrics<MAX_NODES, MAX_EDGES> {
892    /// Create a new metrics record with a queue depth of zero.
893    pub const fn new(id: u32) -> Self {
894        Self {
895            id,
896            nodes: [NodeMetrics::new(); MAX_NODES],
897            edges: [EdgeMetrics::new(); MAX_EDGES],
898        }
899    }
900
901    /// Return the identifier of this graph.
902    pub fn id(&self) -> &u32 {
903        &self.id
904    }
905
906    /// Return the metrics for all nodes.
907    pub fn nodes(&self) -> &[NodeMetrics; MAX_NODES] {
908        &self.nodes
909    }
910
911    /// Return the metrics for all edges.
912    pub fn edges(&self) -> &[EdgeMetrics; MAX_EDGES] {
913        &self.edges
914    }
915}
916
917impl<const MAX_NODES: usize, const MAX_EDGES: usize> GraphMetrics<MAX_NODES, MAX_EDGES> {
918    /// Format this graph metrics record as multi-line text.
919    ///
920    /// The first line contains the graph identifier. Each subsequent line
921    /// contains metrics for a single node or edge, indented by two spaces and
922    /// prefixed with `node id:` or `edge id:` respectively.
923    pub fn fmt<W: fmt::Write>(&self, w: &mut W) -> fmt::Result {
924        // First line: graph id
925        w.write_str("graph id: ")?;
926        write_u64(w, self.id as u64)?;
927        w.write_str("\n")?;
928
929        // Nodes
930        for i in 0..MAX_NODES {
931            let m = &self.nodes[i];
932            w.write_str("  node id: ")?;
933            write_u64(w, i as u64)?;
934            w.write_str(" processed=")?;
935            write_u64(w, m.processed)?;
936            w.write_str(" dropped=")?;
937            write_u64(w, m.dropped)?;
938            w.write_str(" ingress=")?;
939            write_u64(w, m.ingress)?;
940            w.write_str(" egress=")?;
941            write_u64(w, m.egress)?;
942            w.write_str(" lat_sum=")?;
943            write_u64(w, m.lat_sum)?;
944            w.write_str(" lat_cnt=")?;
945            write_u64(w, m.lat_cnt)?;
946            w.write_str(" lat_max=")?;
947            write_u64(w, m.lat_max)?;
948            w.write_str(" deadline_miss_count=")?;
949            write_u64(w, m.deadline_miss_count)?;
950            w.write_str("\n")?;
951        }
952
953        // Edges
954        for i in 0..MAX_EDGES {
955            let m = &self.edges[i];
956            w.write_str("  edge id: ")?;
957            write_u64(w, i as u64)?;
958            w.write_str(" queue_depth=")?;
959            write_u64(w, m.queue_depth as u64)?;
960            w.write_str("\n")?;
961        }
962
963        Ok(())
964    }
965}
966
967/// Telemetry implementation that discards all metrics and events.
968///
969/// This is useful in tests and extremely constrained environments where
970/// telemetry is not required.
971#[derive(Debug, Default, Clone, Copy)]
972pub struct NoopTelemetry;
973
974impl Telemetry for NoopTelemetry {
975    const METRICS_ENABLED: bool = false;
976    const EVENTS_STATICALLY_ENABLED: bool = false;
977
978    #[inline]
979    fn incr_counter(&mut self, _key: TelemetryKey, _delta: u64) {}
980    #[inline]
981    fn set_gauge(&mut self, _key: TelemetryKey, _value: u64) {}
982    #[inline]
983    fn record_latency_ns(&mut self, _key: TelemetryKey, _value_ns: u64) {}
984}
985
986impl Telemetry for () {
987    const METRICS_ENABLED: bool = false;
988    const EVENTS_STATICALLY_ENABLED: bool = false;
989
990    #[inline]
991    fn incr_counter(&mut self, _key: TelemetryKey, _delta: u64) {}
992
993    #[inline]
994    fn set_gauge(&mut self, _key: TelemetryKey, _value: u64) {}
995
996    #[inline]
997    fn record_latency_ns(&mut self, _key: TelemetryKey, _value_ns: u64) {}
998}