grapsus_agent_protocol/v2/
protocol_metrics.rs1use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13
14#[derive(Debug, Default)]
16pub struct ProtocolMetrics {
17 pub requests_total: AtomicU64,
20 pub responses_total: AtomicU64,
22 pub timeouts_total: AtomicU64,
24 pub connection_errors_total: AtomicU64,
26 pub serialization_errors_total: AtomicU64,
28 pub flow_control_pauses_total: AtomicU64,
30 pub flow_control_resumes_total: AtomicU64,
32 pub flow_control_rejections_total: AtomicU64,
34
35 pub in_flight_requests: AtomicU64,
38 pub buffer_utilization_percent: AtomicU64,
40 pub healthy_connections: AtomicU64,
42 pub paused_connections: AtomicU64,
44
45 pub serialization_time: HistogramMetric,
48 pub request_duration: HistogramMetric,
50}
51
52impl ProtocolMetrics {
53 pub fn new() -> Self {
55 Self::default()
56 }
57
58 #[inline]
60 pub fn inc_requests(&self) {
61 self.requests_total.fetch_add(1, Ordering::Relaxed);
62 }
63
64 #[inline]
66 pub fn inc_responses(&self) {
67 self.responses_total.fetch_add(1, Ordering::Relaxed);
68 }
69
70 #[inline]
72 pub fn inc_timeouts(&self) {
73 self.timeouts_total.fetch_add(1, Ordering::Relaxed);
74 }
75
76 #[inline]
78 pub fn inc_connection_errors(&self) {
79 self.connection_errors_total.fetch_add(1, Ordering::Relaxed);
80 }
81
82 #[inline]
84 pub fn inc_serialization_errors(&self) {
85 self.serialization_errors_total
86 .fetch_add(1, Ordering::Relaxed);
87 }
88
89 #[inline]
91 pub fn record_flow_pause(&self) {
92 self.flow_control_pauses_total
93 .fetch_add(1, Ordering::Relaxed);
94 }
95
96 #[inline]
98 pub fn record_flow_resume(&self) {
99 self.flow_control_resumes_total
100 .fetch_add(1, Ordering::Relaxed);
101 }
102
103 #[inline]
105 pub fn record_flow_rejection(&self) {
106 self.flow_control_rejections_total
107 .fetch_add(1, Ordering::Relaxed);
108 }
109
110 #[inline]
112 pub fn set_in_flight(&self, count: u64) {
113 self.in_flight_requests.store(count, Ordering::Relaxed);
114 }
115
116 #[inline]
118 pub fn inc_in_flight(&self) {
119 self.in_flight_requests.fetch_add(1, Ordering::Relaxed);
120 }
121
122 #[inline]
124 pub fn dec_in_flight(&self) {
125 self.in_flight_requests.fetch_sub(1, Ordering::Relaxed);
126 }
127
128 #[inline]
130 pub fn set_buffer_utilization(&self, percent: u64) {
131 self.buffer_utilization_percent
132 .store(percent.min(100), Ordering::Relaxed);
133 }
134
135 #[inline]
137 pub fn set_healthy_connections(&self, count: u64) {
138 self.healthy_connections.store(count, Ordering::Relaxed);
139 }
140
141 #[inline]
143 pub fn set_paused_connections(&self, count: u64) {
144 self.paused_connections.store(count, Ordering::Relaxed);
145 }
146
147 #[inline]
149 pub fn record_serialization_time(&self, duration: Duration) {
150 self.serialization_time.record(duration);
151 }
152
153 #[inline]
155 pub fn record_request_duration(&self, duration: Duration) {
156 self.request_duration.record(duration);
157 }
158
159 pub fn snapshot(&self) -> ProtocolMetricsSnapshot {
161 ProtocolMetricsSnapshot {
162 requests_total: self.requests_total.load(Ordering::Relaxed),
163 responses_total: self.responses_total.load(Ordering::Relaxed),
164 timeouts_total: self.timeouts_total.load(Ordering::Relaxed),
165 connection_errors_total: self.connection_errors_total.load(Ordering::Relaxed),
166 serialization_errors_total: self.serialization_errors_total.load(Ordering::Relaxed),
167 flow_control_pauses_total: self.flow_control_pauses_total.load(Ordering::Relaxed),
168 flow_control_resumes_total: self.flow_control_resumes_total.load(Ordering::Relaxed),
169 flow_control_rejections_total: self
170 .flow_control_rejections_total
171 .load(Ordering::Relaxed),
172 in_flight_requests: self.in_flight_requests.load(Ordering::Relaxed),
173 buffer_utilization_percent: self.buffer_utilization_percent.load(Ordering::Relaxed),
174 healthy_connections: self.healthy_connections.load(Ordering::Relaxed),
175 paused_connections: self.paused_connections.load(Ordering::Relaxed),
176 serialization_time: self.serialization_time.snapshot(),
177 request_duration: self.request_duration.snapshot(),
178 }
179 }
180
181 pub fn to_prometheus(&self, prefix: &str) -> String {
183 let snap = self.snapshot();
184 let mut output = String::with_capacity(2048);
185
186 output.push_str(&format!(
188 "# HELP {prefix}_requests_total Total requests sent to agents\n\
189 # TYPE {prefix}_requests_total counter\n\
190 {prefix}_requests_total {}\n\n",
191 snap.requests_total
192 ));
193
194 output.push_str(&format!(
195 "# HELP {prefix}_responses_total Total responses received from agents\n\
196 # TYPE {prefix}_responses_total counter\n\
197 {prefix}_responses_total {}\n\n",
198 snap.responses_total
199 ));
200
201 output.push_str(&format!(
202 "# HELP {prefix}_timeouts_total Total request timeouts\n\
203 # TYPE {prefix}_timeouts_total counter\n\
204 {prefix}_timeouts_total {}\n\n",
205 snap.timeouts_total
206 ));
207
208 output.push_str(&format!(
209 "# HELP {prefix}_connection_errors_total Total connection errors\n\
210 # TYPE {prefix}_connection_errors_total counter\n\
211 {prefix}_connection_errors_total {}\n\n",
212 snap.connection_errors_total
213 ));
214
215 output.push_str(&format!(
216 "# HELP {prefix}_flow_control_pauses_total Flow control pause events\n\
217 # TYPE {prefix}_flow_control_pauses_total counter\n\
218 {prefix}_flow_control_pauses_total {}\n\n",
219 snap.flow_control_pauses_total
220 ));
221
222 output.push_str(&format!(
223 "# HELP {prefix}_flow_control_rejections_total Requests rejected due to flow control\n\
224 # TYPE {prefix}_flow_control_rejections_total counter\n\
225 {prefix}_flow_control_rejections_total {}\n\n",
226 snap.flow_control_rejections_total
227 ));
228
229 output.push_str(&format!(
231 "# HELP {prefix}_in_flight_requests Current in-flight requests\n\
232 # TYPE {prefix}_in_flight_requests gauge\n\
233 {prefix}_in_flight_requests {}\n\n",
234 snap.in_flight_requests
235 ));
236
237 output.push_str(&format!(
238 "# HELP {prefix}_buffer_utilization_percent Buffer utilization percentage\n\
239 # TYPE {prefix}_buffer_utilization_percent gauge\n\
240 {prefix}_buffer_utilization_percent {}\n\n",
241 snap.buffer_utilization_percent
242 ));
243
244 output.push_str(&format!(
245 "# HELP {prefix}_healthy_connections Number of healthy agent connections\n\
246 # TYPE {prefix}_healthy_connections gauge\n\
247 {prefix}_healthy_connections {}\n\n",
248 snap.healthy_connections
249 ));
250
251 output.push_str(&format!(
252 "# HELP {prefix}_paused_connections Number of flow-control paused connections\n\
253 # TYPE {prefix}_paused_connections gauge\n\
254 {prefix}_paused_connections {}\n\n",
255 snap.paused_connections
256 ));
257
258 output.push_str(&snap.serialization_time.to_prometheus(
260 &format!("{prefix}_serialization_seconds"),
261 "Serialization time in seconds",
262 ));
263
264 output.push_str(&snap.request_duration.to_prometheus(
265 &format!("{prefix}_request_duration_seconds"),
266 "Request duration in seconds",
267 ));
268
269 output
270 }
271}
272
273#[derive(Debug)]
275pub struct HistogramMetric {
276 buckets: Vec<u64>,
278 counts: Vec<AtomicU64>,
280 sum: AtomicU64,
282 count: AtomicU64,
284}
285
286impl Default for HistogramMetric {
287 fn default() -> Self {
288 let buckets = vec![
290 10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
291 ];
292 let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
293 Self {
294 buckets,
295 counts,
296 sum: AtomicU64::new(0),
297 count: AtomicU64::new(0),
298 }
299 }
300}
301
302impl HistogramMetric {
303 pub fn with_buckets(buckets: Vec<u64>) -> Self {
305 let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
306 Self {
307 buckets,
308 counts,
309 sum: AtomicU64::new(0),
310 count: AtomicU64::new(0),
311 }
312 }
313
314 #[inline]
316 pub fn record(&self, duration: Duration) {
317 let micros = duration.as_micros() as u64;
318
319 self.sum.fetch_add(micros, Ordering::Relaxed);
321 self.count.fetch_add(1, Ordering::Relaxed);
322
323 let bucket_idx = self
325 .buckets
326 .iter()
327 .position(|&b| micros <= b)
328 .unwrap_or(self.buckets.len());
329 self.counts[bucket_idx].fetch_add(1, Ordering::Relaxed);
330 }
331
332 pub fn snapshot(&self) -> HistogramSnapshot {
334 HistogramSnapshot {
335 buckets: self.buckets.clone(),
336 counts: self
337 .counts
338 .iter()
339 .map(|c| c.load(Ordering::Relaxed))
340 .collect(),
341 sum: self.sum.load(Ordering::Relaxed),
342 count: self.count.load(Ordering::Relaxed),
343 }
344 }
345}
346
/// Point-in-time copy of a histogram's state as plain integers.
#[derive(Debug, Clone)]
pub struct HistogramSnapshot {
    /// Inclusive upper bounds of the finite buckets, in microseconds.
    pub buckets: Vec<u64>,
    /// Per-bucket (non-cumulative) counts. A well-formed snapshot has one entry
    /// per bound plus a final entry for values above the largest bound.
    pub counts: Vec<u64>,
    /// Sum of all observations, in microseconds.
    pub sum: u64,
    /// Number of observations.
    pub count: u64,
}

impl HistogramSnapshot {
    /// Renders the snapshot as a Prometheus histogram named `name`.
    ///
    /// Bounds and the sum are converted from microseconds to seconds and
    /// printed with six decimals. Bucket samples are cumulative and end with
    /// the `+Inf` bucket, followed by `{name}_sum`, `{name}_count` and a blank
    /// line. Count slots missing from `counts` are treated as zero, so a
    /// malformed snapshot cannot cause a panic.
    pub fn to_prometheus(&self, name: &str, help: &str) -> String {
        let mut output = String::with_capacity(512);

        output.push_str(&format!("# HELP {name} {help}\n"));
        output.push_str(&format!("# TYPE {name} histogram\n"));

        // Checked lookup: `counts` is a public field and may be shorter than
        // `buckets`.
        let count_at = |idx: usize| self.counts.get(idx).copied().unwrap_or(0);

        let mut cumulative = 0u64;
        for (i, &boundary) in self.buckets.iter().enumerate() {
            cumulative = cumulative.saturating_add(count_at(i));
            let le = boundary as f64 / 1_000_000.0;
            output.push_str(&format!("{name}_bucket{{le=\"{le:.6}\"}} {cumulative}\n"));
        }

        // The slot right after the last finite bucket holds the overflow count.
        cumulative = cumulative.saturating_add(count_at(self.buckets.len()));
        output.push_str(&format!("{name}_bucket{{le=\"+Inf\"}} {cumulative}\n"));

        let sum_seconds = self.sum as f64 / 1_000_000.0;
        output.push_str(&format!("{name}_sum {sum_seconds:.6}\n"));
        output.push_str(&format!("{name}_count {}\n\n", self.count));

        output
    }

    /// Mean observation in microseconds, or `0.0` when nothing was recorded.
    pub fn mean_micros(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.sum as f64 / self.count as f64
        }
    }

    /// Estimates the `p`-th percentile in microseconds.
    ///
    /// The result is the upper bound of the bucket containing the requested
    /// rank, so it is an upper estimate limited by bucket resolution. When
    /// the rank falls into the overflow bucket, the largest finite bound is
    /// returned. `p` is clamped to `0.0..=100.0` (NaN behaves like `0.0`), and
    /// the rank is at least 1, so the answer is always a bucket that actually
    /// holds an observation. Returns `0` for an empty histogram.
    pub fn percentile_micros(&self, p: f64) -> u64 {
        if self.count == 0 {
            return 0;
        }

        let largest_bound = self.buckets.last().copied().unwrap_or(0);

        // Rank of the requested observation, 1-based. Without the lower limit
        // of 1, `p == 0` would match the first bucket even when it is empty.
        let fraction = p.clamp(0.0, 100.0) / 100.0;
        let target = ((self.count as f64 * fraction).ceil() as u64).max(1);

        let mut cumulative = 0u64;
        for (i, &bucket_count) in self.counts.iter().enumerate() {
            cumulative = cumulative.saturating_add(bucket_count);
            if cumulative >= target {
                // Index past the finite bounds means the overflow bucket.
                return self.buckets.get(i).copied().unwrap_or(largest_bound);
            }
        }

        largest_bound
    }
}
421
422#[derive(Debug, Clone)]
424pub struct ProtocolMetricsSnapshot {
425 pub requests_total: u64,
427 pub responses_total: u64,
428 pub timeouts_total: u64,
429 pub connection_errors_total: u64,
430 pub serialization_errors_total: u64,
431 pub flow_control_pauses_total: u64,
432 pub flow_control_resumes_total: u64,
433 pub flow_control_rejections_total: u64,
434
435 pub in_flight_requests: u64,
437 pub buffer_utilization_percent: u64,
438 pub healthy_connections: u64,
439 pub paused_connections: u64,
440
441 pub serialization_time: HistogramSnapshot,
443 pub request_duration: HistogramSnapshot,
444}
445
446pub struct DurationRecorder<'a> {
448 histogram: &'a HistogramMetric,
449 start: Instant,
450}
451
452impl<'a> DurationRecorder<'a> {
453 pub fn new(histogram: &'a HistogramMetric) -> Self {
455 Self {
456 histogram,
457 start: Instant::now(),
458 }
459 }
460
461 pub fn record(self) {
463 self.histogram.record(self.start.elapsed());
464 }
465}
466
467impl Drop for DurationRecorder<'_> {
468 fn drop(&mut self) {
469 }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// Each counter helper adds exactly one per call.
    #[test]
    fn test_counter_increments() {
        let m = ProtocolMetrics::new();

        for _ in 0..2 {
            m.inc_requests();
        }
        m.inc_responses();

        let observed = m.snapshot();
        assert_eq!(observed.requests_total, 2);
        assert_eq!(observed.responses_total, 1);
    }

    /// A set followed by a matched inc/dec pair leaves the gauge where it was set.
    #[test]
    fn test_gauge_updates() {
        let m = ProtocolMetrics::new();

        m.set_in_flight(5);
        m.inc_in_flight();
        m.dec_in_flight();

        assert_eq!(m.snapshot().in_flight_requests, 5);
    }

    /// The histogram tracks both the number of samples and their sum in microseconds.
    #[test]
    fn test_histogram_recording() {
        let m = ProtocolMetrics::new();

        let samples = [
            Duration::from_micros(50),
            Duration::from_micros(150),
            Duration::from_millis(5),
        ];
        for sample in samples.iter() {
            m.record_serialization_time(*sample);
        }

        let observed = m.snapshot().serialization_time;
        assert_eq!(observed.count, 3);
        assert_eq!(observed.sum, 50 + 150 + 5000);
    }

    /// With samples of 1..=100 µs the median estimate stays within 100 µs.
    #[test]
    fn test_histogram_percentile() {
        let hist = HistogramMetric::default();

        (1..=100)
            .map(Duration::from_micros)
            .for_each(|sample| hist.record(sample));

        let snap = hist.snapshot();
        assert_eq!(snap.count, 100);

        let p50 = snap.percentile_micros(50.0);
        assert!(p50 <= 100, "p50 was {}", p50);
    }

    /// Pause and rejection events are counted separately.
    #[test]
    fn test_flow_control_metrics() {
        let m = ProtocolMetrics::new();

        m.record_flow_pause();
        m.record_flow_pause();
        m.record_flow_rejection();

        let observed = m.snapshot();
        assert_eq!(observed.flow_control_pauses_total, 2);
        assert_eq!(observed.flow_control_rejections_total, 1);
    }

    /// The text export carries the prefix and the current values.
    #[test]
    fn test_prometheus_export() {
        let m = ProtocolMetrics::new();

        m.inc_requests();
        m.set_healthy_connections(3);
        m.record_serialization_time(Duration::from_micros(100));

        let text = m.to_prometheus("agent_protocol");

        let expected_fragments = [
            "agent_protocol_requests_total 1",
            "agent_protocol_healthy_connections 3",
            "agent_protocol_serialization_seconds",
        ];
        for fragment in expected_fragments.iter() {
            assert!(text.contains(fragment));
        }
    }
}