limen_core/telemetry/
graph_telemetry.rs1use crate::prelude::sink::TelemetrySink;
4
5use super::*;
6
7pub struct GraphTelemetry<const MAX_NODES: usize, const MAX_EDGES: usize, Sink: TelemetrySink> {
13 metrics: GraphMetrics<MAX_NODES, MAX_EDGES>,
15 writer: Sink,
17 events: bool,
19}
20
21impl<const MAX_NODES: usize, const MAX_EDGES: usize, Writer: TelemetrySink>
22 GraphTelemetry<MAX_NODES, MAX_EDGES, Writer>
23{
24 pub const fn new(id: u32, events: bool, writer: Writer) -> Self {
28 Self {
29 metrics: GraphMetrics::new(id),
30 writer,
31 events,
32 }
33 }
36
37 #[inline]
39 pub fn enable_events(&mut self) {
40 self.events = true;
41 }
42
43 #[inline]
45 pub fn disable_events(&mut self) {
46 self.events = false;
47 }
48
49 #[inline]
51 fn node_ok(id: u32) -> bool {
52 (id as usize) < MAX_NODES
53 }
54
55 #[inline]
57 fn edge_ok(id: u32) -> bool {
58 (id as usize) < MAX_EDGES
59 }
60
61 #[inline]
63 pub fn metrics(&self) -> &GraphMetrics<MAX_NODES, MAX_EDGES> {
64 &self.metrics
65 }
66
67 #[inline]
69 pub fn nodes(&self) -> &[NodeMetrics; MAX_NODES] {
70 &self.metrics.nodes
71 }
72
73 #[inline]
75 pub fn edges(&self) -> &[EdgeMetrics; MAX_EDGES] {
76 &self.metrics.edges
77 }
78
79 #[inline]
81 pub fn writer(&self) -> &Writer {
82 &self.writer
83 }
84
85 pub fn merge_from<const N2: usize, const E2: usize, W2: TelemetrySink>(
90 &mut self,
91 other: &GraphTelemetry<N2, E2, W2>,
92 ) {
93 let n = core::cmp::min(MAX_NODES, N2);
94 let e = core::cmp::min(MAX_EDGES, E2);
95
96 for i in 0..n {
97 let dst = &mut self.metrics.nodes[i];
98 let src = &other.metrics.nodes[i];
99 dst.processed = dst.processed.saturating_add(src.processed);
100 dst.dropped = dst.dropped.saturating_add(src.dropped);
101 dst.ingress = dst.ingress.saturating_add(src.ingress);
102 dst.egress = dst.egress.saturating_add(src.egress);
103 dst.lat_sum = dst.lat_sum.saturating_add(src.lat_sum);
104 dst.lat_cnt = dst.lat_cnt.saturating_add(src.lat_cnt);
105 if src.lat_max > dst.lat_max {
106 dst.lat_max = src.lat_max;
107 }
108 }
109
110 for i in 0..e {
111 self.metrics.edges[i].queue_depth = other.metrics.edges[i].queue_depth;
113 }
114 }
115}
116
117impl<const N: usize, const E: usize, W: TelemetrySink> Telemetry for GraphTelemetry<N, E, W> {
118 const METRICS_ENABLED: bool = true;
119 const EVENTS_STATICALLY_ENABLED: bool = true;
120
121 #[inline]
122 fn incr_counter(&mut self, key: TelemetryKey, delta: u64) {
123 match (key.ns(), key.kind()) {
124 (TelemetryNs::Node, TelemetryKind::Processed) if Self::node_ok(*key.id()) => {
125 self.metrics.nodes[*key.id() as usize].processed = self.metrics.nodes
126 [*key.id() as usize]
127 .processed
128 .saturating_add(delta);
129 }
130 (TelemetryNs::Node, TelemetryKind::Dropped) if Self::node_ok(*key.id()) => {
131 self.metrics.nodes[*key.id() as usize].dropped = self.metrics.nodes
132 [*key.id() as usize]
133 .dropped
134 .saturating_add(delta);
135 }
136 (TelemetryNs::Node, TelemetryKind::IngressMsgs) if Self::node_ok(*key.id()) => {
137 self.metrics.nodes[*key.id() as usize].ingress = self.metrics.nodes
138 [*key.id() as usize]
139 .ingress
140 .saturating_add(delta);
141 }
142 (TelemetryNs::Node, TelemetryKind::EgressMsgs) if Self::node_ok(*key.id()) => {
143 self.metrics.nodes[*key.id() as usize].egress = self.metrics.nodes
144 [*key.id() as usize]
145 .egress
146 .saturating_add(delta);
147 }
148 (TelemetryNs::Node, TelemetryKind::DeadlineMiss) if Self::node_ok(*key.id()) => {
149 self.metrics.nodes[*key.id() as usize].deadline_miss_count = self.metrics.nodes
150 [*key.id() as usize]
151 .deadline_miss_count
152 .saturating_add(delta);
153 }
154 _ => {}
155 }
156 }
157
158 #[inline]
159 fn set_gauge(&mut self, key: TelemetryKey, value: u64) {
160 if matches!(key.ns(), TelemetryNs::Edge)
161 && matches!(key.kind(), TelemetryKind::QueueDepth)
162 && Self::edge_ok(*key.id())
163 {
164 self.metrics.edges[*key.id() as usize].queue_depth = value as u32;
165 }
166 }
167
168 #[inline]
169 fn record_latency_ns(&mut self, key: TelemetryKey, value_ns: u64) {
170 if matches!(key.ns(), TelemetryNs::Node)
171 && matches!(key.kind(), TelemetryKind::Latency)
172 && Self::node_ok(*key.id())
173 {
174 let m = &mut self.metrics.nodes[*key.id() as usize];
175 m.lat_sum = m.lat_sum.saturating_add(value_ns);
176 m.lat_cnt = m.lat_cnt.saturating_add(1);
177 if value_ns > m.lat_max {
178 m.lat_max = value_ns;
179 }
180 }
181 }
182
183 #[inline]
184 fn push_metrics(&mut self) {
185 let _ = self.writer.push_metrics(&self.metrics);
186 }
187
188 #[inline]
193 fn events_enabled(&self) -> bool {
194 self.events
195 }
196
197 #[inline]
198 fn push_event(&mut self, event: TelemetryEvent) {
199 if self.events {
200 let _ = self.writer.push_event(&event);
201 }
202 }
203
204 #[inline]
205 fn flush(&mut self) {
206 let _ = self.writer.flush();
207 }
208}
209
210impl<const N: usize, const E: usize, W: TelemetrySink + Clone> Clone for GraphTelemetry<N, E, W> {
211 fn clone(&self) -> Self {
212 Self {
213 metrics: self.metrics,
214 writer: self.writer.clone(),
215 events: self.events,
216 }
217 }
218}
219
220pub fn merge_fixed_telemetry<
225 const N1: usize,
226 const E1: usize,
227 W1: TelemetrySink,
228 const N2: usize,
229 const E2: usize,
230 W2: TelemetrySink,
231>(
232 dst: &mut GraphTelemetry<N1, E1, W1>,
233 src: &GraphTelemetry<N2, E2, W2>,
234) {
235 dst.merge_from(src);
236}