sentinel_agent_protocol/v2/
protocol_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13
14#[derive(Debug, Default)]
16pub struct ProtocolMetrics {
17 pub requests_total: AtomicU64,
20 pub responses_total: AtomicU64,
22 pub timeouts_total: AtomicU64,
24 pub connection_errors_total: AtomicU64,
26 pub serialization_errors_total: AtomicU64,
28 pub flow_control_pauses_total: AtomicU64,
30 pub flow_control_resumes_total: AtomicU64,
32 pub flow_control_rejections_total: AtomicU64,
34
35 pub in_flight_requests: AtomicU64,
38 pub buffer_utilization_percent: AtomicU64,
40 pub healthy_connections: AtomicU64,
42 pub paused_connections: AtomicU64,
44
45 pub serialization_time: HistogramMetric,
48 pub request_duration: HistogramMetric,
50}
51
52impl ProtocolMetrics {
53 pub fn new() -> Self {
55 Self::default()
56 }
57
58 #[inline]
60 pub fn inc_requests(&self) {
61 self.requests_total.fetch_add(1, Ordering::Relaxed);
62 }
63
64 #[inline]
66 pub fn inc_responses(&self) {
67 self.responses_total.fetch_add(1, Ordering::Relaxed);
68 }
69
70 #[inline]
72 pub fn inc_timeouts(&self) {
73 self.timeouts_total.fetch_add(1, Ordering::Relaxed);
74 }
75
76 #[inline]
78 pub fn inc_connection_errors(&self) {
79 self.connection_errors_total.fetch_add(1, Ordering::Relaxed);
80 }
81
82 #[inline]
84 pub fn inc_serialization_errors(&self) {
85 self.serialization_errors_total.fetch_add(1, Ordering::Relaxed);
86 }
87
88 #[inline]
90 pub fn record_flow_pause(&self) {
91 self.flow_control_pauses_total.fetch_add(1, Ordering::Relaxed);
92 }
93
94 #[inline]
96 pub fn record_flow_resume(&self) {
97 self.flow_control_resumes_total.fetch_add(1, Ordering::Relaxed);
98 }
99
100 #[inline]
102 pub fn record_flow_rejection(&self) {
103 self.flow_control_rejections_total.fetch_add(1, Ordering::Relaxed);
104 }
105
106 #[inline]
108 pub fn set_in_flight(&self, count: u64) {
109 self.in_flight_requests.store(count, Ordering::Relaxed);
110 }
111
112 #[inline]
114 pub fn inc_in_flight(&self) {
115 self.in_flight_requests.fetch_add(1, Ordering::Relaxed);
116 }
117
118 #[inline]
120 pub fn dec_in_flight(&self) {
121 self.in_flight_requests.fetch_sub(1, Ordering::Relaxed);
122 }
123
124 #[inline]
126 pub fn set_buffer_utilization(&self, percent: u64) {
127 self.buffer_utilization_percent.store(percent.min(100), Ordering::Relaxed);
128 }
129
130 #[inline]
132 pub fn set_healthy_connections(&self, count: u64) {
133 self.healthy_connections.store(count, Ordering::Relaxed);
134 }
135
136 #[inline]
138 pub fn set_paused_connections(&self, count: u64) {
139 self.paused_connections.store(count, Ordering::Relaxed);
140 }
141
142 #[inline]
144 pub fn record_serialization_time(&self, duration: Duration) {
145 self.serialization_time.record(duration);
146 }
147
148 #[inline]
150 pub fn record_request_duration(&self, duration: Duration) {
151 self.request_duration.record(duration);
152 }
153
154 pub fn snapshot(&self) -> ProtocolMetricsSnapshot {
156 ProtocolMetricsSnapshot {
157 requests_total: self.requests_total.load(Ordering::Relaxed),
158 responses_total: self.responses_total.load(Ordering::Relaxed),
159 timeouts_total: self.timeouts_total.load(Ordering::Relaxed),
160 connection_errors_total: self.connection_errors_total.load(Ordering::Relaxed),
161 serialization_errors_total: self.serialization_errors_total.load(Ordering::Relaxed),
162 flow_control_pauses_total: self.flow_control_pauses_total.load(Ordering::Relaxed),
163 flow_control_resumes_total: self.flow_control_resumes_total.load(Ordering::Relaxed),
164 flow_control_rejections_total: self.flow_control_rejections_total.load(Ordering::Relaxed),
165 in_flight_requests: self.in_flight_requests.load(Ordering::Relaxed),
166 buffer_utilization_percent: self.buffer_utilization_percent.load(Ordering::Relaxed),
167 healthy_connections: self.healthy_connections.load(Ordering::Relaxed),
168 paused_connections: self.paused_connections.load(Ordering::Relaxed),
169 serialization_time: self.serialization_time.snapshot(),
170 request_duration: self.request_duration.snapshot(),
171 }
172 }
173
174 pub fn to_prometheus(&self, prefix: &str) -> String {
176 let snap = self.snapshot();
177 let mut output = String::with_capacity(2048);
178
179 output.push_str(&format!(
181 "# HELP {prefix}_requests_total Total requests sent to agents\n\
182 # TYPE {prefix}_requests_total counter\n\
183 {prefix}_requests_total {}\n\n",
184 snap.requests_total
185 ));
186
187 output.push_str(&format!(
188 "# HELP {prefix}_responses_total Total responses received from agents\n\
189 # TYPE {prefix}_responses_total counter\n\
190 {prefix}_responses_total {}\n\n",
191 snap.responses_total
192 ));
193
194 output.push_str(&format!(
195 "# HELP {prefix}_timeouts_total Total request timeouts\n\
196 # TYPE {prefix}_timeouts_total counter\n\
197 {prefix}_timeouts_total {}\n\n",
198 snap.timeouts_total
199 ));
200
201 output.push_str(&format!(
202 "# HELP {prefix}_connection_errors_total Total connection errors\n\
203 # TYPE {prefix}_connection_errors_total counter\n\
204 {prefix}_connection_errors_total {}\n\n",
205 snap.connection_errors_total
206 ));
207
208 output.push_str(&format!(
209 "# HELP {prefix}_flow_control_pauses_total Flow control pause events\n\
210 # TYPE {prefix}_flow_control_pauses_total counter\n\
211 {prefix}_flow_control_pauses_total {}\n\n",
212 snap.flow_control_pauses_total
213 ));
214
215 output.push_str(&format!(
216 "# HELP {prefix}_flow_control_rejections_total Requests rejected due to flow control\n\
217 # TYPE {prefix}_flow_control_rejections_total counter\n\
218 {prefix}_flow_control_rejections_total {}\n\n",
219 snap.flow_control_rejections_total
220 ));
221
222 output.push_str(&format!(
224 "# HELP {prefix}_in_flight_requests Current in-flight requests\n\
225 # TYPE {prefix}_in_flight_requests gauge\n\
226 {prefix}_in_flight_requests {}\n\n",
227 snap.in_flight_requests
228 ));
229
230 output.push_str(&format!(
231 "# HELP {prefix}_buffer_utilization_percent Buffer utilization percentage\n\
232 # TYPE {prefix}_buffer_utilization_percent gauge\n\
233 {prefix}_buffer_utilization_percent {}\n\n",
234 snap.buffer_utilization_percent
235 ));
236
237 output.push_str(&format!(
238 "# HELP {prefix}_healthy_connections Number of healthy agent connections\n\
239 # TYPE {prefix}_healthy_connections gauge\n\
240 {prefix}_healthy_connections {}\n\n",
241 snap.healthy_connections
242 ));
243
244 output.push_str(&format!(
245 "# HELP {prefix}_paused_connections Number of flow-control paused connections\n\
246 # TYPE {prefix}_paused_connections gauge\n\
247 {prefix}_paused_connections {}\n\n",
248 snap.paused_connections
249 ));
250
251 output.push_str(&snap.serialization_time.to_prometheus(
253 &format!("{prefix}_serialization_seconds"),
254 "Serialization time in seconds",
255 ));
256
257 output.push_str(&snap.request_duration.to_prometheus(
258 &format!("{prefix}_request_duration_seconds"),
259 "Request duration in seconds",
260 ));
261
262 output
263 }
264}
265
/// Fixed-bucket latency histogram whose state lives entirely in atomics.
///
/// Observations are stored in whole microseconds:
/// - `buckets` holds the inclusive upper bound (µs) of each finite bucket, in
///   strictly increasing order;
/// - `counts` has `buckets.len() + 1` slots, the last one holding observations
///   above the largest bound (the `+Inf` bucket).
#[derive(Debug)]
pub struct HistogramMetric {
    buckets: Vec<u64>,
    counts: Vec<AtomicU64>,
    sum: AtomicU64,
    count: AtomicU64,
}

impl Default for HistogramMetric {
    /// Creates a histogram with bounds from 10 µs up to 1 s.
    fn default() -> Self {
        Self::with_buckets(Self::DEFAULT_BUCKETS_MICROS.to_vec())
    }
}

impl HistogramMetric {
    /// Default finite bucket upper bounds, in microseconds (10 µs .. 1 s).
    const DEFAULT_BUCKETS_MICROS: [u64; 11] = [
        10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
    ];

    /// Creates a histogram with custom finite bucket upper bounds, in microseconds.
    ///
    /// The bounds are sorted and de-duplicated. Both bucket selection in
    /// [`record`](Self::record) and the cumulative `le` series emitted by
    /// [`HistogramSnapshot::to_prometheus`] assume strictly increasing bounds.
    /// An empty list yields a histogram with only the `+Inf` bucket.
    pub fn with_buckets(mut buckets: Vec<u64>) -> Self {
        buckets.sort_unstable();
        buckets.dedup();
        let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
        Self {
            buckets,
            counts,
            sum: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Records one observation.
    ///
    /// The duration is converted to whole microseconds: the sub-microsecond
    /// remainder is dropped, and durations beyond `u64::MAX` µs saturate. A
    /// value equal to a bucket's bound is counted in that bucket. The running
    /// sum wraps on overflow, as `AtomicU64::fetch_add` does.
    #[inline]
    pub fn record(&self, duration: Duration) {
        // `as_micros` returns u128; saturate rather than silently truncate.
        let micros = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);

        self.sum.fetch_add(micros, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        // Index of the first bound >= micros. `buckets.len()` selects the
        // trailing +Inf slot.
        let bucket_idx = self.buckets.partition_point(|&bound| bound < micros);
        self.counts[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Copies the current state into a plain-value [`HistogramSnapshot`].
    ///
    /// The bucket counts, `sum` and `count` are loaded separately with
    /// `Ordering::Relaxed`. Under concurrent recording they can disagree by
    /// the observations that are mid-`record`.
    pub fn snapshot(&self) -> HistogramSnapshot {
        HistogramSnapshot {
            buckets: self.buckets.clone(),
            counts: self.counts.iter().map(|c| c.load(Ordering::Relaxed)).collect(),
            sum: self.sum.load(Ordering::Relaxed),
            count: self.count.load(Ordering::Relaxed),
        }
    }
}

/// Plain-value copy of a [`HistogramMetric`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HistogramSnapshot {
    /// Finite bucket upper bounds in microseconds, ascending.
    pub buckets: Vec<u64>,
    /// Per-bucket (non-cumulative) counts. The trailing extra slot is the +Inf bucket.
    pub counts: Vec<u64>,
    /// Sum of all observations, in microseconds.
    pub sum: u64,
    /// Number of observations.
    pub count: u64,
}

impl HistogramSnapshot {
    /// Renders this histogram in the Prometheus text exposition format.
    ///
    /// Bucket bounds and the sum are converted from microseconds to seconds.
    /// Bucket lines are cumulative, as Prometheus requires. `_count` is emitted
    /// as the `+Inf` bucket total, so the two always agree even if the
    /// separately loaded `count` field is momentarily out of step with the
    /// buckets.
    pub fn to_prometheus(&self, name: &str, help: &str) -> String {
        let mut output = String::with_capacity(512);

        output.push_str(&format!("# HELP {name} {help}\n"));
        output.push_str(&format!("# TYPE {name} histogram\n"));

        let mut cumulative = 0u64;
        // `zip` tolerates hand-built snapshots whose `counts` is too short,
        // where direct indexing would panic.
        for (&bound_micros, &in_bucket) in self.buckets.iter().zip(&self.counts) {
            cumulative += in_bucket;
            let le = bound_micros as f64 / 1_000_000.0;
            output.push_str(&format!("{name}_bucket{{le=\"{le:.6}\"}} {cumulative}\n"));
        }

        // The slot right after the finite buckets holds observations above the last bound.
        cumulative += self.counts.get(self.buckets.len()).copied().unwrap_or(0);
        output.push_str(&format!("{name}_bucket{{le=\"+Inf\"}} {cumulative}\n"));

        let sum_seconds = self.sum as f64 / 1_000_000.0;
        output.push_str(&format!("{name}_sum {sum_seconds:.6}\n"));
        // Prometheus requires `_count` to equal the `+Inf` bucket.
        output.push_str(&format!("{name}_count {cumulative}\n\n"));

        output
    }

    /// Arithmetic mean of all observations in microseconds, or `0.0` when empty.
    pub fn mean_micros(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.sum as f64 / self.count as f64
        }
    }

    /// Approximates the `p`-th percentile (`p` in 0–100) in microseconds.
    ///
    /// The result is the upper bound of the bucket that contains the target
    /// rank, so it over-estimates within a bucket. Observations in the `+Inf`
    /// bucket are reported as the largest finite bound, so high percentiles
    /// can be under-estimated.
    ///
    /// Edge cases:
    /// - an empty histogram returns 0;
    /// - `p` is clamped to `[0, 100]`;
    /// - `p <= 0` (or NaN) selects the bucket of the smallest observation.
    pub fn percentile_micros(&self, p: f64) -> u64 {
        if self.count == 0 {
            return 0;
        }

        let largest_bound = self.buckets.last().copied().unwrap_or(0);
        let p = p.clamp(0.0, 100.0);
        // Rank of the target observation (1-based). NaN and 0 both land on rank 1
        // rather than rank 0, which would match the first bucket even when it is empty.
        let target = ((self.count as f64 * p / 100.0).ceil() as u64).max(1);

        let mut cumulative = 0u64;
        for (i, &in_bucket) in self.counts.iter().enumerate() {
            cumulative += in_bucket;
            if cumulative >= target {
                // The +Inf slot has no finite bound; fall back to the largest one.
                return self.buckets.get(i).copied().unwrap_or(largest_bound);
            }
        }

        largest_bound
    }
}
404
405#[derive(Debug, Clone)]
407pub struct ProtocolMetricsSnapshot {
408 pub requests_total: u64,
410 pub responses_total: u64,
411 pub timeouts_total: u64,
412 pub connection_errors_total: u64,
413 pub serialization_errors_total: u64,
414 pub flow_control_pauses_total: u64,
415 pub flow_control_resumes_total: u64,
416 pub flow_control_rejections_total: u64,
417
418 pub in_flight_requests: u64,
420 pub buffer_utilization_percent: u64,
421 pub healthy_connections: u64,
422 pub paused_connections: u64,
423
424 pub serialization_time: HistogramSnapshot,
426 pub request_duration: HistogramSnapshot,
427}
428
429pub struct DurationRecorder<'a> {
431 histogram: &'a HistogramMetric,
432 start: Instant,
433}
434
435impl<'a> DurationRecorder<'a> {
436 pub fn new(histogram: &'a HistogramMetric) -> Self {
438 Self {
439 histogram,
440 start: Instant::now(),
441 }
442 }
443
444 pub fn record(self) {
446 self.histogram.record(self.start.elapsed());
447 }
448}
449
450impl Drop for DurationRecorder<'_> {
451 fn drop(&mut self) {
452 }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_increments() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_requests();
        metrics.inc_requests();
        metrics.inc_responses();

        let snap = metrics.snapshot();
        assert_eq!(snap.requests_total, 2);
        assert_eq!(snap.responses_total, 1);
    }

    #[test]
    fn test_error_counters() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_timeouts();
        metrics.inc_connection_errors();
        metrics.inc_connection_errors();
        metrics.inc_serialization_errors();

        let snap = metrics.snapshot();
        assert_eq!(snap.timeouts_total, 1);
        assert_eq!(snap.connection_errors_total, 2);
        assert_eq!(snap.serialization_errors_total, 1);
    }

    #[test]
    fn test_gauge_updates() {
        let metrics = ProtocolMetrics::new();

        metrics.set_in_flight(5);
        metrics.inc_in_flight();
        metrics.dec_in_flight();

        let snap = metrics.snapshot();
        assert_eq!(snap.in_flight_requests, 5);
    }

    #[test]
    fn test_buffer_utilization_is_clamped() {
        let metrics = ProtocolMetrics::new();

        metrics.set_buffer_utilization(150);
        assert_eq!(metrics.snapshot().buffer_utilization_percent, 100);

        metrics.set_buffer_utilization(42);
        assert_eq!(metrics.snapshot().buffer_utilization_percent, 42);
    }

    #[test]
    fn test_histogram_recording() {
        let metrics = ProtocolMetrics::new();

        metrics.record_serialization_time(Duration::from_micros(50));
        metrics.record_serialization_time(Duration::from_micros(150));
        metrics.record_serialization_time(Duration::from_millis(5));

        let snap = metrics.snapshot();
        assert_eq!(snap.serialization_time.count, 3);
        assert_eq!(snap.serialization_time.sum, 50 + 150 + 5000);
    }

    #[test]
    fn test_histogram_bucket_boundaries() {
        let hist = HistogramMetric::default();

        hist.record(Duration::from_micros(10)); // bounds are inclusive: first bucket
        hist.record(Duration::from_micros(11)); // second bucket (le 50 µs)
        hist.record(Duration::from_secs(2)); // above every finite bound: +Inf

        let snap = hist.snapshot();
        assert_eq!(snap.counts[0], 1);
        assert_eq!(snap.counts[1], 1);
        assert_eq!(snap.counts.last().copied(), Some(1));
        assert_eq!(snap.count, 3);
    }

    #[test]
    fn test_histogram_percentile() {
        let hist = HistogramMetric::default();

        // 1..=100 µs: 10 samples <= 10 µs, 40 in (10, 50], 50 in (50, 100].
        for i in 1..=100 {
            hist.record(Duration::from_micros(i));
        }

        let snap = hist.snapshot();
        assert_eq!(snap.count, 100);
        assert_eq!(snap.percentile_micros(10.0), 10);
        assert_eq!(snap.percentile_micros(50.0), 50);
        assert_eq!(snap.percentile_micros(90.0), 100);
        assert_eq!(snap.mean_micros(), 50.5);
    }

    #[test]
    fn test_empty_histogram() {
        let snap = HistogramMetric::default().snapshot();
        assert_eq!(snap.count, 0);
        assert_eq!(snap.mean_micros(), 0.0);
        assert_eq!(snap.percentile_micros(99.0), 0);
    }

    #[test]
    fn test_duration_recorder() {
        let hist = HistogramMetric::default();
        DurationRecorder::new(&hist).record();
        assert_eq!(hist.snapshot().count, 1);
    }

    #[test]
    fn test_flow_control_metrics() {
        let metrics = ProtocolMetrics::new();

        metrics.record_flow_pause();
        metrics.record_flow_pause();
        metrics.record_flow_resume();
        metrics.record_flow_rejection();

        let snap = metrics.snapshot();
        assert_eq!(snap.flow_control_pauses_total, 2);
        assert_eq!(snap.flow_control_resumes_total, 1);
        assert_eq!(snap.flow_control_rejections_total, 1);
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_requests();
        metrics.set_healthy_connections(3);
        metrics.record_serialization_time(Duration::from_micros(100));

        let output = metrics.to_prometheus("agent_protocol");

        assert!(output.contains("# TYPE agent_protocol_requests_total counter\n"));
        assert!(output.contains("agent_protocol_requests_total 1"));
        assert!(output.contains("# TYPE agent_protocol_healthy_connections gauge\n"));
        assert!(output.contains("agent_protocol_healthy_connections 3"));
        assert!(output.contains("agent_protocol_serialization_seconds"));
        // 100 µs lands in the inclusive 100 µs bucket (0.0001 s).
        assert!(output.contains("agent_protocol_serialization_seconds_bucket{le=\"0.000100\"} 1\n"));
        assert!(output.contains("agent_protocol_serialization_seconds_count 1\n"));
    }
}