Skip to main content

limen_core/telemetry/
graph_telemetry.rs

//! Basic graph-level telemetry implementation.
2
3use crate::prelude::sink::TelemetrySink;
4
5use super::*;
6
7/// Fixed size telemetry collector backed by arrays.
8///
9/// This collector stores node and edge metrics in statically sized arrays and
10/// forwards structured events to a `TelemetrySink`. It is suitable for no_std
11/// environments where heap allocation is not available.
12pub struct GraphTelemetry<const MAX_NODES: usize, const MAX_EDGES: usize, Sink: TelemetrySink> {
13    /// Per-graph metrics (nodes and edges) indexed by identifier.
14    metrics: GraphMetrics<MAX_NODES, MAX_EDGES>,
15    /// Event writer used for structured telemetry.
16    writer: Sink,
17    /// Flag that controls whether structured events are forwarded.
18    events: bool,
19}
20
21impl<const MAX_NODES: usize, const MAX_EDGES: usize, Writer: TelemetrySink>
22    GraphTelemetry<MAX_NODES, MAX_EDGES, Writer>
23{
24    /// Create a new fixed size collector with the given event writer.
25    ///
26    /// All node and edge metrics are initialized to zero.
27    pub const fn new(id: u32, events: bool, writer: Writer) -> Self {
28        Self {
29            metrics: GraphMetrics::new(id),
30            writer,
31            events,
32        }
33        // const NM: NodeMetrics = NodeMetrics::new();
34        // const EM: EdgeMetrics = EdgeMetrics::new();
35    }
36
37    /// Enable emission of structured telemetry events.
38    #[inline]
39    pub fn enable_events(&mut self) {
40        self.events = true;
41    }
42
43    /// Disable emission of structured telemetry events.
44    #[inline]
45    pub fn disable_events(&mut self) {
46        self.events = false;
47    }
48
49    /// Return true if the given node identifier is within bounds.
50    #[inline]
51    fn node_ok(id: u32) -> bool {
52        (id as usize) < MAX_NODES
53    }
54
55    /// Return true if the given edge identifier is within bounds.
56    #[inline]
57    fn edge_ok(id: u32) -> bool {
58        (id as usize) < MAX_EDGES
59    }
60
61    /// Access the underlying metrics record.
62    #[inline]
63    pub fn metrics(&self) -> &GraphMetrics<MAX_NODES, MAX_EDGES> {
64        &self.metrics
65    }
66
67    /// Access the full array of node metrics.
68    #[inline]
69    pub fn nodes(&self) -> &[NodeMetrics; MAX_NODES] {
70        &self.metrics.nodes
71    }
72
73    /// Access the full array of edge metrics.
74    #[inline]
75    pub fn edges(&self) -> &[EdgeMetrics; MAX_EDGES] {
76        &self.metrics.edges
77    }
78
79    /// Access the underlying event writer.
80    #[inline]
81    pub fn writer(&self) -> &Writer {
82        &self.writer
83    }
84
85    /// Merge metrics from another fixed size collector into this one.
86    ///
87    /// Node metrics are combined by saturating addition and maximum for latency
88    /// maxima, while edge queue depth uses a last writer wins strategy.
89    pub fn merge_from<const N2: usize, const E2: usize, W2: TelemetrySink>(
90        &mut self,
91        other: &GraphTelemetry<N2, E2, W2>,
92    ) {
93        let n = core::cmp::min(MAX_NODES, N2);
94        let e = core::cmp::min(MAX_EDGES, E2);
95
96        for i in 0..n {
97            let dst = &mut self.metrics.nodes[i];
98            let src = &other.metrics.nodes[i];
99            dst.processed = dst.processed.saturating_add(src.processed);
100            dst.dropped = dst.dropped.saturating_add(src.dropped);
101            dst.ingress = dst.ingress.saturating_add(src.ingress);
102            dst.egress = dst.egress.saturating_add(src.egress);
103            dst.lat_sum = dst.lat_sum.saturating_add(src.lat_sum);
104            dst.lat_cnt = dst.lat_cnt.saturating_add(src.lat_cnt);
105            if src.lat_max > dst.lat_max {
106                dst.lat_max = src.lat_max;
107            }
108        }
109
110        for i in 0..e {
111            // last-wins
112            self.metrics.edges[i].queue_depth = other.metrics.edges[i].queue_depth;
113        }
114    }
115}
116
117impl<const N: usize, const E: usize, W: TelemetrySink> Telemetry for GraphTelemetry<N, E, W> {
118    const METRICS_ENABLED: bool = true;
119    const EVENTS_STATICALLY_ENABLED: bool = true;
120
121    #[inline]
122    fn incr_counter(&mut self, key: TelemetryKey, delta: u64) {
123        match (key.ns(), key.kind()) {
124            (TelemetryNs::Node, TelemetryKind::Processed) if Self::node_ok(*key.id()) => {
125                self.metrics.nodes[*key.id() as usize].processed = self.metrics.nodes
126                    [*key.id() as usize]
127                    .processed
128                    .saturating_add(delta);
129            }
130            (TelemetryNs::Node, TelemetryKind::Dropped) if Self::node_ok(*key.id()) => {
131                self.metrics.nodes[*key.id() as usize].dropped = self.metrics.nodes
132                    [*key.id() as usize]
133                    .dropped
134                    .saturating_add(delta);
135            }
136            (TelemetryNs::Node, TelemetryKind::IngressMsgs) if Self::node_ok(*key.id()) => {
137                self.metrics.nodes[*key.id() as usize].ingress = self.metrics.nodes
138                    [*key.id() as usize]
139                    .ingress
140                    .saturating_add(delta);
141            }
142            (TelemetryNs::Node, TelemetryKind::EgressMsgs) if Self::node_ok(*key.id()) => {
143                self.metrics.nodes[*key.id() as usize].egress = self.metrics.nodes
144                    [*key.id() as usize]
145                    .egress
146                    .saturating_add(delta);
147            }
148            (TelemetryNs::Node, TelemetryKind::DeadlineMiss) if Self::node_ok(*key.id()) => {
149                self.metrics.nodes[*key.id() as usize].deadline_miss_count = self.metrics.nodes
150                    [*key.id() as usize]
151                    .deadline_miss_count
152                    .saturating_add(delta);
153            }
154            _ => {}
155        }
156    }
157
158    #[inline]
159    fn set_gauge(&mut self, key: TelemetryKey, value: u64) {
160        if matches!(key.ns(), TelemetryNs::Edge)
161            && matches!(key.kind(), TelemetryKind::QueueDepth)
162            && Self::edge_ok(*key.id())
163        {
164            self.metrics.edges[*key.id() as usize].queue_depth = value as u32;
165        }
166    }
167
168    #[inline]
169    fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64) {
170        if matches!(key.ns(), TelemetryNs::Node)
171            && matches!(key.kind(), TelemetryKind::Latency)
172            && Self::node_ok(*key.id())
173        {
174            let m = &mut self.metrics.nodes[*key.id() as usize];
175            m.lat_sum = m.lat_sum.saturating_add(value_ns);
176            m.lat_cnt = m.lat_cnt.saturating_add(1);
177            if value_ns > m.lat_max {
178                m.lat_max = value_ns;
179            }
180        }
181    }
182
183    #[inline]
184    fn push_metrics(&mut self) {
185        let _ = self.writer.push_metrics(&self.metrics);
186    }
187
188    /// Return true if this telemetry collector wants structured events.
189    ///
190    /// Runtimes and nodes can use this to avoid constructing `TelemetryEvent`
191    /// values when events are disabled, keeping the hot path as cheap as possible.
192    #[inline]
193    fn events_enabled(&self) -> bool {
194        self.events
195    }
196
197    #[inline]
198    fn push_event(&mut self, event: TelemetryEvent) {
199        if self.events {
200            let _ = self.writer.push_event(&event);
201        }
202    }
203
204    #[inline]
205    fn flush(&mut self) {
206        let _ = self.writer.flush();
207    }
208}
209
210impl<const N: usize, const E: usize, W: TelemetrySink + Clone> Clone for GraphTelemetry<N, E, W> {
211    fn clone(&self) -> Self {
212        Self {
213            metrics: self.metrics,
214            writer: self.writer.clone(),
215            events: self.events,
216        }
217    }
218}
219
220/// Convenience helper to merge one fixed collector into another.
221///
222/// This simply forwards to `FixedTelemetry::merge_from` and is useful when
223/// working with references to the collectors.
224pub fn merge_fixed_telemetry<
225    const N1: usize,
226    const E1: usize,
227    W1: TelemetrySink,
228    const N2: usize,
229    const E2: usize,
230    W2: TelemetrySink,
231>(
232    dst: &mut GraphTelemetry<N1, E1, W1>,
233    src: &GraphTelemetry<N2, E2, W2>,
234) {
235    dst.merge_from(src);
236}