Skip to main content

grapsus_agent_protocol/v2/
protocol_metrics.rs

//! Protocol-level metrics for Agent Protocol v2.
//!
//! This module provides internal metrics tracking for the protocol layer:
//! - Serialization time histograms
//! - Flow control event counters
//! - Buffer utilization gauges
//! - Request/response counters
//!
//! These metrics are recorded by the proxy for its own instrumentation; they
//! are not metrics reported by agents.
10
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13
14/// Protocol-level metrics for the agent pool.
15#[derive(Debug, Default)]
16pub struct ProtocolMetrics {
17    // Counters
18    /// Total requests sent
19    pub requests_total: AtomicU64,
20    /// Total responses received
21    pub responses_total: AtomicU64,
22    /// Requests that timed out
23    pub timeouts_total: AtomicU64,
24    /// Connection errors
25    pub connection_errors_total: AtomicU64,
26    /// Serialization errors
27    pub serialization_errors_total: AtomicU64,
28    /// Flow control pause events
29    pub flow_control_pauses_total: AtomicU64,
30    /// Flow control resume events
31    pub flow_control_resumes_total: AtomicU64,
32    /// Requests rejected due to flow control
33    pub flow_control_rejections_total: AtomicU64,
34
35    // Gauges
36    /// Current in-flight requests
37    pub in_flight_requests: AtomicU64,
38    /// Current buffer utilization (0-100)
39    pub buffer_utilization_percent: AtomicU64,
40    /// Number of healthy connections
41    pub healthy_connections: AtomicU64,
42    /// Number of paused connections (flow control)
43    pub paused_connections: AtomicU64,
44
45    // Histograms (using simple bucketed approach)
46    /// Serialization time histogram
47    pub serialization_time: HistogramMetric,
48    /// Request duration histogram (end-to-end)
49    pub request_duration: HistogramMetric,
50}
51
52impl ProtocolMetrics {
53    /// Create new protocol metrics.
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    /// Increment requests total.
59    #[inline]
60    pub fn inc_requests(&self) {
61        self.requests_total.fetch_add(1, Ordering::Relaxed);
62    }
63
64    /// Increment responses total.
65    #[inline]
66    pub fn inc_responses(&self) {
67        self.responses_total.fetch_add(1, Ordering::Relaxed);
68    }
69
70    /// Increment timeouts.
71    #[inline]
72    pub fn inc_timeouts(&self) {
73        self.timeouts_total.fetch_add(1, Ordering::Relaxed);
74    }
75
76    /// Increment connection errors.
77    #[inline]
78    pub fn inc_connection_errors(&self) {
79        self.connection_errors_total.fetch_add(1, Ordering::Relaxed);
80    }
81
82    /// Increment serialization errors.
83    #[inline]
84    pub fn inc_serialization_errors(&self) {
85        self.serialization_errors_total
86            .fetch_add(1, Ordering::Relaxed);
87    }
88
89    /// Record flow control pause.
90    #[inline]
91    pub fn record_flow_pause(&self) {
92        self.flow_control_pauses_total
93            .fetch_add(1, Ordering::Relaxed);
94    }
95
96    /// Record flow control resume.
97    #[inline]
98    pub fn record_flow_resume(&self) {
99        self.flow_control_resumes_total
100            .fetch_add(1, Ordering::Relaxed);
101    }
102
103    /// Record flow control rejection.
104    #[inline]
105    pub fn record_flow_rejection(&self) {
106        self.flow_control_rejections_total
107            .fetch_add(1, Ordering::Relaxed);
108    }
109
110    /// Set in-flight requests gauge.
111    #[inline]
112    pub fn set_in_flight(&self, count: u64) {
113        self.in_flight_requests.store(count, Ordering::Relaxed);
114    }
115
116    /// Increment in-flight requests.
117    #[inline]
118    pub fn inc_in_flight(&self) {
119        self.in_flight_requests.fetch_add(1, Ordering::Relaxed);
120    }
121
122    /// Decrement in-flight requests.
123    #[inline]
124    pub fn dec_in_flight(&self) {
125        self.in_flight_requests.fetch_sub(1, Ordering::Relaxed);
126    }
127
128    /// Set buffer utilization percentage.
129    #[inline]
130    pub fn set_buffer_utilization(&self, percent: u64) {
131        self.buffer_utilization_percent
132            .store(percent.min(100), Ordering::Relaxed);
133    }
134
135    /// Set healthy connections gauge.
136    #[inline]
137    pub fn set_healthy_connections(&self, count: u64) {
138        self.healthy_connections.store(count, Ordering::Relaxed);
139    }
140
141    /// Set paused connections gauge.
142    #[inline]
143    pub fn set_paused_connections(&self, count: u64) {
144        self.paused_connections.store(count, Ordering::Relaxed);
145    }
146
147    /// Record serialization time.
148    #[inline]
149    pub fn record_serialization_time(&self, duration: Duration) {
150        self.serialization_time.record(duration);
151    }
152
153    /// Record request duration.
154    #[inline]
155    pub fn record_request_duration(&self, duration: Duration) {
156        self.request_duration.record(duration);
157    }
158
159    /// Get a snapshot of all metrics.
160    pub fn snapshot(&self) -> ProtocolMetricsSnapshot {
161        ProtocolMetricsSnapshot {
162            requests_total: self.requests_total.load(Ordering::Relaxed),
163            responses_total: self.responses_total.load(Ordering::Relaxed),
164            timeouts_total: self.timeouts_total.load(Ordering::Relaxed),
165            connection_errors_total: self.connection_errors_total.load(Ordering::Relaxed),
166            serialization_errors_total: self.serialization_errors_total.load(Ordering::Relaxed),
167            flow_control_pauses_total: self.flow_control_pauses_total.load(Ordering::Relaxed),
168            flow_control_resumes_total: self.flow_control_resumes_total.load(Ordering::Relaxed),
169            flow_control_rejections_total: self
170                .flow_control_rejections_total
171                .load(Ordering::Relaxed),
172            in_flight_requests: self.in_flight_requests.load(Ordering::Relaxed),
173            buffer_utilization_percent: self.buffer_utilization_percent.load(Ordering::Relaxed),
174            healthy_connections: self.healthy_connections.load(Ordering::Relaxed),
175            paused_connections: self.paused_connections.load(Ordering::Relaxed),
176            serialization_time: self.serialization_time.snapshot(),
177            request_duration: self.request_duration.snapshot(),
178        }
179    }
180
181    /// Export metrics in Prometheus text format.
182    pub fn to_prometheus(&self, prefix: &str) -> String {
183        let snap = self.snapshot();
184        let mut output = String::with_capacity(2048);
185
186        // Counters
187        output.push_str(&format!(
188            "# HELP {prefix}_requests_total Total requests sent to agents\n\
189             # TYPE {prefix}_requests_total counter\n\
190             {prefix}_requests_total {}\n\n",
191            snap.requests_total
192        ));
193
194        output.push_str(&format!(
195            "# HELP {prefix}_responses_total Total responses received from agents\n\
196             # TYPE {prefix}_responses_total counter\n\
197             {prefix}_responses_total {}\n\n",
198            snap.responses_total
199        ));
200
201        output.push_str(&format!(
202            "# HELP {prefix}_timeouts_total Total request timeouts\n\
203             # TYPE {prefix}_timeouts_total counter\n\
204             {prefix}_timeouts_total {}\n\n",
205            snap.timeouts_total
206        ));
207
208        output.push_str(&format!(
209            "# HELP {prefix}_connection_errors_total Total connection errors\n\
210             # TYPE {prefix}_connection_errors_total counter\n\
211             {prefix}_connection_errors_total {}\n\n",
212            snap.connection_errors_total
213        ));
214
215        output.push_str(&format!(
216            "# HELP {prefix}_flow_control_pauses_total Flow control pause events\n\
217             # TYPE {prefix}_flow_control_pauses_total counter\n\
218             {prefix}_flow_control_pauses_total {}\n\n",
219            snap.flow_control_pauses_total
220        ));
221
222        output.push_str(&format!(
223            "# HELP {prefix}_flow_control_rejections_total Requests rejected due to flow control\n\
224             # TYPE {prefix}_flow_control_rejections_total counter\n\
225             {prefix}_flow_control_rejections_total {}\n\n",
226            snap.flow_control_rejections_total
227        ));
228
229        // Gauges
230        output.push_str(&format!(
231            "# HELP {prefix}_in_flight_requests Current in-flight requests\n\
232             # TYPE {prefix}_in_flight_requests gauge\n\
233             {prefix}_in_flight_requests {}\n\n",
234            snap.in_flight_requests
235        ));
236
237        output.push_str(&format!(
238            "# HELP {prefix}_buffer_utilization_percent Buffer utilization percentage\n\
239             # TYPE {prefix}_buffer_utilization_percent gauge\n\
240             {prefix}_buffer_utilization_percent {}\n\n",
241            snap.buffer_utilization_percent
242        ));
243
244        output.push_str(&format!(
245            "# HELP {prefix}_healthy_connections Number of healthy agent connections\n\
246             # TYPE {prefix}_healthy_connections gauge\n\
247             {prefix}_healthy_connections {}\n\n",
248            snap.healthy_connections
249        ));
250
251        output.push_str(&format!(
252            "# HELP {prefix}_paused_connections Number of flow-control paused connections\n\
253             # TYPE {prefix}_paused_connections gauge\n\
254             {prefix}_paused_connections {}\n\n",
255            snap.paused_connections
256        ));
257
258        // Histograms
259        output.push_str(&snap.serialization_time.to_prometheus(
260            &format!("{prefix}_serialization_seconds"),
261            "Serialization time in seconds",
262        ));
263
264        output.push_str(&snap.request_duration.to_prometheus(
265            &format!("{prefix}_request_duration_seconds"),
266            "Request duration in seconds",
267        ));
268
269        output
270    }
271}
272
273/// Simple histogram metric using predefined buckets.
274#[derive(Debug)]
275pub struct HistogramMetric {
276    /// Bucket boundaries in microseconds
277    buckets: Vec<u64>,
278    /// Counts per bucket (one extra for +Inf)
279    counts: Vec<AtomicU64>,
280    /// Sum of all observations in microseconds
281    sum: AtomicU64,
282    /// Total count
283    count: AtomicU64,
284}
285
286impl Default for HistogramMetric {
287    fn default() -> Self {
288        // Default buckets: 10μs, 50μs, 100μs, 500μs, 1ms, 5ms, 10ms, 50ms, 100ms, 500ms, 1s
289        let buckets = vec![
290            10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
291        ];
292        let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
293        Self {
294            buckets,
295            counts,
296            sum: AtomicU64::new(0),
297            count: AtomicU64::new(0),
298        }
299    }
300}
301
302impl HistogramMetric {
303    /// Create a new histogram with custom bucket boundaries (in microseconds).
304    pub fn with_buckets(buckets: Vec<u64>) -> Self {
305        let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
306        Self {
307            buckets,
308            counts,
309            sum: AtomicU64::new(0),
310            count: AtomicU64::new(0),
311        }
312    }
313
314    /// Record an observation.
315    #[inline]
316    pub fn record(&self, duration: Duration) {
317        let micros = duration.as_micros() as u64;
318
319        // Update sum and count
320        self.sum.fetch_add(micros, Ordering::Relaxed);
321        self.count.fetch_add(1, Ordering::Relaxed);
322
323        // Find bucket and increment
324        let bucket_idx = self
325            .buckets
326            .iter()
327            .position(|&b| micros <= b)
328            .unwrap_or(self.buckets.len());
329        self.counts[bucket_idx].fetch_add(1, Ordering::Relaxed);
330    }
331
332    /// Get a snapshot of the histogram.
333    pub fn snapshot(&self) -> HistogramSnapshot {
334        HistogramSnapshot {
335            buckets: self.buckets.clone(),
336            counts: self
337                .counts
338                .iter()
339                .map(|c| c.load(Ordering::Relaxed))
340                .collect(),
341            sum: self.sum.load(Ordering::Relaxed),
342            count: self.count.load(Ordering::Relaxed),
343        }
344    }
345}
346
/// Snapshot of histogram data.
///
/// All fields are public, so a snapshot can also be assembled by hand. The
/// methods therefore tolerate a `counts` vector whose length differs from
/// `buckets.len() + 1` rather than panicking.
#[derive(Debug, Clone)]
pub struct HistogramSnapshot {
    /// Bucket boundaries in microseconds
    pub buckets: Vec<u64>,
    /// Non-cumulative counts per bucket; entries past `buckets.len()` belong
    /// to the `+Inf` bucket.
    pub counts: Vec<u64>,
    /// Sum of all observations in microseconds
    pub sum: u64,
    /// Total count
    pub count: u64,
}

impl HistogramSnapshot {
    /// Export to Prometheus format.
    ///
    /// Emits `# HELP` / `# TYPE` lines, one cumulative `_bucket` sample per
    /// boundary (with `le` converted to seconds), the `+Inf` bucket, then
    /// `_sum` (seconds) and `_count`, terminated by a blank line.
    pub fn to_prometheus(&self, name: &str, help: &str) -> String {
        let mut output = String::with_capacity(512);

        output.push_str(&format!("# HELP {name} {help}\n"));
        output.push_str(&format!("# TYPE {name} histogram\n"));

        // Cumulative bucket counts. A missing count is treated as zero so a
        // hand-built snapshot with a short `counts` cannot cause a panic.
        let mut cumulative = 0u64;
        for (i, &boundary) in self.buckets.iter().enumerate() {
            let in_bucket = self.counts.get(i).copied().unwrap_or(0);
            cumulative = cumulative.saturating_add(in_bucket);
            let le = boundary as f64 / 1_000_000.0; // Convert to seconds
            output.push_str(&format!("{name}_bucket{{le=\"{le:.6}\"}} {cumulative}\n"));
        }

        // +Inf bucket: everything recorded beyond the finite boundaries.
        let overflow = self
            .counts
            .iter()
            .skip(self.buckets.len())
            .fold(0u64, |acc, &c| acc.saturating_add(c));
        cumulative = cumulative.saturating_add(overflow);
        output.push_str(&format!("{name}_bucket{{le=\"+Inf\"}} {cumulative}\n"));

        // Sum and count
        let sum_seconds = self.sum as f64 / 1_000_000.0;
        output.push_str(&format!("{name}_sum {sum_seconds:.6}\n"));
        output.push_str(&format!("{name}_count {}\n\n", self.count));

        output
    }

    /// Get the mean value in microseconds.
    ///
    /// Returns `0.0` when there are no observations.
    pub fn mean_micros(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.sum as f64 / self.count as f64
        }
    }

    /// Get the approximate percentile value in microseconds.
    ///
    /// `p` is a percentage (for example `99.0`). The result is the upper
    /// boundary of the bucket that contains the requested rank, so it is an
    /// upper estimate rather than an exact value. Observations in the `+Inf`
    /// bucket are reported as the largest finite boundary. Returns `0` when
    /// there are no observations or no boundaries.
    ///
    /// Out-of-range input is clamped: `p <= 0` (and NaN) selects the first
    /// observation, `p >= 100` selects the last one.
    pub fn percentile_micros(&self, p: f64) -> u64 {
        if self.count == 0 {
            return 0;
        }

        // Rank of the requested observation, counted from 1. The float-to-int
        // cast saturates (negative and NaN become 0), and the clamp keeps the
        // rank within 1..=count so that p = 0 does not match an empty leading
        // bucket and p > 100 does not skip every populated bucket.
        let target = ((self.count as f64 * p / 100.0).ceil() as u64).clamp(1, self.count);
        let last_finite = self.buckets.last().copied().unwrap_or(0);

        let mut cumulative = 0u64;
        for (i, &count) in self.counts.iter().enumerate() {
            cumulative = cumulative.saturating_add(count);
            if cumulative >= target {
                // Indices past the boundaries are the +Inf bucket.
                return self.buckets.get(i).copied().unwrap_or(last_finite);
            }
        }

        // Only reachable when `count` exceeds the sum of `counts`.
        last_finite
    }
}
421
422/// Snapshot of all protocol metrics.
423#[derive(Debug, Clone)]
424pub struct ProtocolMetricsSnapshot {
425    // Counters
426    pub requests_total: u64,
427    pub responses_total: u64,
428    pub timeouts_total: u64,
429    pub connection_errors_total: u64,
430    pub serialization_errors_total: u64,
431    pub flow_control_pauses_total: u64,
432    pub flow_control_resumes_total: u64,
433    pub flow_control_rejections_total: u64,
434
435    // Gauges
436    pub in_flight_requests: u64,
437    pub buffer_utilization_percent: u64,
438    pub healthy_connections: u64,
439    pub paused_connections: u64,
440
441    // Histograms
442    pub serialization_time: HistogramSnapshot,
443    pub request_duration: HistogramSnapshot,
444}
445
446/// Helper to measure and record duration.
447pub struct DurationRecorder<'a> {
448    histogram: &'a HistogramMetric,
449    start: Instant,
450}
451
452impl<'a> DurationRecorder<'a> {
453    /// Start recording duration.
454    pub fn new(histogram: &'a HistogramMetric) -> Self {
455        Self {
456            histogram,
457            start: Instant::now(),
458        }
459    }
460
461    /// Record the elapsed duration.
462    pub fn record(self) {
463        self.histogram.record(self.start.elapsed());
464    }
465}
466
467impl Drop for DurationRecorder<'_> {
468    fn drop(&mut self) {
469        // Don't double-record, this is just a safety net
470    }
471}
472
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters advance by one per call and do not affect each other.
    #[test]
    fn test_counter_increments() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_requests();
        metrics.inc_requests();
        metrics.inc_responses();

        let snap = metrics.snapshot();
        assert_eq!(snap.requests_total, 2);
        assert_eq!(snap.responses_total, 1);
        assert_eq!(snap.timeouts_total, 0);
    }

    /// A set followed by a matched increment/decrement leaves the gauge at
    /// the value that was set.
    #[test]
    fn test_gauge_updates() {
        let metrics = ProtocolMetrics::new();

        metrics.set_in_flight(5);
        metrics.inc_in_flight();
        metrics.dec_in_flight();

        let snap = metrics.snapshot();
        assert_eq!(snap.in_flight_requests, 5);
    }

    /// Observations update sum and count and land in the expected buckets of
    /// the default layout (10, 50, 100, 500, 1000, 5000, ... microseconds).
    #[test]
    fn test_histogram_recording() {
        let metrics = ProtocolMetrics::new();

        metrics.record_serialization_time(Duration::from_micros(50));
        metrics.record_serialization_time(Duration::from_micros(150));
        metrics.record_serialization_time(Duration::from_millis(5));

        let snap = metrics.snapshot();
        assert_eq!(snap.serialization_time.count, 3);
        assert_eq!(snap.serialization_time.sum, 50 + 150 + 5000);

        // 50us -> "<= 50" bucket, 150us -> "<= 500" bucket, 5ms -> "<= 5000" bucket.
        assert_eq!(snap.serialization_time.counts[1], 1);
        assert_eq!(snap.serialization_time.counts[3], 1);
        assert_eq!(snap.serialization_time.counts[5], 1);

        // The other histogram is untouched.
        assert_eq!(snap.request_duration.count, 0);
    }

    /// The percentile is reported as the upper boundary of the bucket that
    /// holds the requested rank.
    #[test]
    fn test_histogram_percentile() {
        let hist = HistogramMetric::default();

        // Record 100 observations from 1μs to 100μs
        for i in 1..=100 {
            hist.record(Duration::from_micros(i));
        }

        let snap = hist.snapshot();
        assert_eq!(snap.count, 100);

        // 10 observations are <= 10μs and 40 more are <= 50μs, so the 50th
        // observation sits in the 50μs bucket.
        let p50 = snap.percentile_micros(50.0);
        assert_eq!(p50, 50, "p50 was {}", p50);

        // The largest observation (100μs) sits in the 100μs bucket.
        assert_eq!(snap.percentile_micros(100.0), 100);
    }

    /// Flow-control counters are tracked independently.
    #[test]
    fn test_flow_control_metrics() {
        let metrics = ProtocolMetrics::new();

        metrics.record_flow_pause();
        metrics.record_flow_pause();
        metrics.record_flow_rejection();

        let snap = metrics.snapshot();
        assert_eq!(snap.flow_control_pauses_total, 2);
        assert_eq!(snap.flow_control_rejections_total, 1);
        assert_eq!(snap.flow_control_resumes_total, 0);
    }

    /// The Prometheus export carries counters, gauges and histogram samples
    /// under the supplied prefix.
    #[test]
    fn test_prometheus_export() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_requests();
        metrics.set_healthy_connections(3);
        metrics.record_serialization_time(Duration::from_micros(100));

        let output = metrics.to_prometheus("agent_protocol");

        assert!(output.contains("agent_protocol_requests_total 1"));
        assert!(output.contains("agent_protocol_healthy_connections 3"));
        assert!(output.contains("agent_protocol_serialization_seconds"));

        // Histogram samples: the single 100μs observation reaches the +Inf
        // bucket and the total count.
        assert!(output.contains("agent_protocol_serialization_seconds_bucket{le=\"+Inf\"} 1"));
        assert!(output.contains("agent_protocol_serialization_seconds_count 1"));
    }
}
559}