Skip to main content

sentinel_agent_protocol/v2/
protocol_metrics.rs

//! Protocol-level metrics for Agent Protocol v2.
//!
//! This module provides internal metrics tracking for the protocol layer:
//! - Serialization time histograms
//! - Flow control event counters
//! - Buffer utilization gauges
//! - Request/response counters
//!
//! These metrics are for proxy-side instrumentation, not agent-reported metrics.
10
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::time::{Duration, Instant};
13
14/// Protocol-level metrics for the agent pool.
15#[derive(Debug, Default)]
16pub struct ProtocolMetrics {
17    // Counters
18    /// Total requests sent
19    pub requests_total: AtomicU64,
20    /// Total responses received
21    pub responses_total: AtomicU64,
22    /// Requests that timed out
23    pub timeouts_total: AtomicU64,
24    /// Connection errors
25    pub connection_errors_total: AtomicU64,
26    /// Serialization errors
27    pub serialization_errors_total: AtomicU64,
28    /// Flow control pause events
29    pub flow_control_pauses_total: AtomicU64,
30    /// Flow control resume events
31    pub flow_control_resumes_total: AtomicU64,
32    /// Requests rejected due to flow control
33    pub flow_control_rejections_total: AtomicU64,
34
35    // Gauges
36    /// Current in-flight requests
37    pub in_flight_requests: AtomicU64,
38    /// Current buffer utilization (0-100)
39    pub buffer_utilization_percent: AtomicU64,
40    /// Number of healthy connections
41    pub healthy_connections: AtomicU64,
42    /// Number of paused connections (flow control)
43    pub paused_connections: AtomicU64,
44
45    // Histograms (using simple bucketed approach)
46    /// Serialization time histogram
47    pub serialization_time: HistogramMetric,
48    /// Request duration histogram (end-to-end)
49    pub request_duration: HistogramMetric,
50}
51
52impl ProtocolMetrics {
53    /// Create new protocol metrics.
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    /// Increment requests total.
59    #[inline]
60    pub fn inc_requests(&self) {
61        self.requests_total.fetch_add(1, Ordering::Relaxed);
62    }
63
64    /// Increment responses total.
65    #[inline]
66    pub fn inc_responses(&self) {
67        self.responses_total.fetch_add(1, Ordering::Relaxed);
68    }
69
70    /// Increment timeouts.
71    #[inline]
72    pub fn inc_timeouts(&self) {
73        self.timeouts_total.fetch_add(1, Ordering::Relaxed);
74    }
75
76    /// Increment connection errors.
77    #[inline]
78    pub fn inc_connection_errors(&self) {
79        self.connection_errors_total.fetch_add(1, Ordering::Relaxed);
80    }
81
82    /// Increment serialization errors.
83    #[inline]
84    pub fn inc_serialization_errors(&self) {
85        self.serialization_errors_total.fetch_add(1, Ordering::Relaxed);
86    }
87
88    /// Record flow control pause.
89    #[inline]
90    pub fn record_flow_pause(&self) {
91        self.flow_control_pauses_total.fetch_add(1, Ordering::Relaxed);
92    }
93
94    /// Record flow control resume.
95    #[inline]
96    pub fn record_flow_resume(&self) {
97        self.flow_control_resumes_total.fetch_add(1, Ordering::Relaxed);
98    }
99
100    /// Record flow control rejection.
101    #[inline]
102    pub fn record_flow_rejection(&self) {
103        self.flow_control_rejections_total.fetch_add(1, Ordering::Relaxed);
104    }
105
106    /// Set in-flight requests gauge.
107    #[inline]
108    pub fn set_in_flight(&self, count: u64) {
109        self.in_flight_requests.store(count, Ordering::Relaxed);
110    }
111
112    /// Increment in-flight requests.
113    #[inline]
114    pub fn inc_in_flight(&self) {
115        self.in_flight_requests.fetch_add(1, Ordering::Relaxed);
116    }
117
118    /// Decrement in-flight requests.
119    #[inline]
120    pub fn dec_in_flight(&self) {
121        self.in_flight_requests.fetch_sub(1, Ordering::Relaxed);
122    }
123
124    /// Set buffer utilization percentage.
125    #[inline]
126    pub fn set_buffer_utilization(&self, percent: u64) {
127        self.buffer_utilization_percent.store(percent.min(100), Ordering::Relaxed);
128    }
129
130    /// Set healthy connections gauge.
131    #[inline]
132    pub fn set_healthy_connections(&self, count: u64) {
133        self.healthy_connections.store(count, Ordering::Relaxed);
134    }
135
136    /// Set paused connections gauge.
137    #[inline]
138    pub fn set_paused_connections(&self, count: u64) {
139        self.paused_connections.store(count, Ordering::Relaxed);
140    }
141
142    /// Record serialization time.
143    #[inline]
144    pub fn record_serialization_time(&self, duration: Duration) {
145        self.serialization_time.record(duration);
146    }
147
148    /// Record request duration.
149    #[inline]
150    pub fn record_request_duration(&self, duration: Duration) {
151        self.request_duration.record(duration);
152    }
153
154    /// Get a snapshot of all metrics.
155    pub fn snapshot(&self) -> ProtocolMetricsSnapshot {
156        ProtocolMetricsSnapshot {
157            requests_total: self.requests_total.load(Ordering::Relaxed),
158            responses_total: self.responses_total.load(Ordering::Relaxed),
159            timeouts_total: self.timeouts_total.load(Ordering::Relaxed),
160            connection_errors_total: self.connection_errors_total.load(Ordering::Relaxed),
161            serialization_errors_total: self.serialization_errors_total.load(Ordering::Relaxed),
162            flow_control_pauses_total: self.flow_control_pauses_total.load(Ordering::Relaxed),
163            flow_control_resumes_total: self.flow_control_resumes_total.load(Ordering::Relaxed),
164            flow_control_rejections_total: self.flow_control_rejections_total.load(Ordering::Relaxed),
165            in_flight_requests: self.in_flight_requests.load(Ordering::Relaxed),
166            buffer_utilization_percent: self.buffer_utilization_percent.load(Ordering::Relaxed),
167            healthy_connections: self.healthy_connections.load(Ordering::Relaxed),
168            paused_connections: self.paused_connections.load(Ordering::Relaxed),
169            serialization_time: self.serialization_time.snapshot(),
170            request_duration: self.request_duration.snapshot(),
171        }
172    }
173
174    /// Export metrics in Prometheus text format.
175    pub fn to_prometheus(&self, prefix: &str) -> String {
176        let snap = self.snapshot();
177        let mut output = String::with_capacity(2048);
178
179        // Counters
180        output.push_str(&format!(
181            "# HELP {prefix}_requests_total Total requests sent to agents\n\
182             # TYPE {prefix}_requests_total counter\n\
183             {prefix}_requests_total {}\n\n",
184            snap.requests_total
185        ));
186
187        output.push_str(&format!(
188            "# HELP {prefix}_responses_total Total responses received from agents\n\
189             # TYPE {prefix}_responses_total counter\n\
190             {prefix}_responses_total {}\n\n",
191            snap.responses_total
192        ));
193
194        output.push_str(&format!(
195            "# HELP {prefix}_timeouts_total Total request timeouts\n\
196             # TYPE {prefix}_timeouts_total counter\n\
197             {prefix}_timeouts_total {}\n\n",
198            snap.timeouts_total
199        ));
200
201        output.push_str(&format!(
202            "# HELP {prefix}_connection_errors_total Total connection errors\n\
203             # TYPE {prefix}_connection_errors_total counter\n\
204             {prefix}_connection_errors_total {}\n\n",
205            snap.connection_errors_total
206        ));
207
208        output.push_str(&format!(
209            "# HELP {prefix}_flow_control_pauses_total Flow control pause events\n\
210             # TYPE {prefix}_flow_control_pauses_total counter\n\
211             {prefix}_flow_control_pauses_total {}\n\n",
212            snap.flow_control_pauses_total
213        ));
214
215        output.push_str(&format!(
216            "# HELP {prefix}_flow_control_rejections_total Requests rejected due to flow control\n\
217             # TYPE {prefix}_flow_control_rejections_total counter\n\
218             {prefix}_flow_control_rejections_total {}\n\n",
219            snap.flow_control_rejections_total
220        ));
221
222        // Gauges
223        output.push_str(&format!(
224            "# HELP {prefix}_in_flight_requests Current in-flight requests\n\
225             # TYPE {prefix}_in_flight_requests gauge\n\
226             {prefix}_in_flight_requests {}\n\n",
227            snap.in_flight_requests
228        ));
229
230        output.push_str(&format!(
231            "# HELP {prefix}_buffer_utilization_percent Buffer utilization percentage\n\
232             # TYPE {prefix}_buffer_utilization_percent gauge\n\
233             {prefix}_buffer_utilization_percent {}\n\n",
234            snap.buffer_utilization_percent
235        ));
236
237        output.push_str(&format!(
238            "# HELP {prefix}_healthy_connections Number of healthy agent connections\n\
239             # TYPE {prefix}_healthy_connections gauge\n\
240             {prefix}_healthy_connections {}\n\n",
241            snap.healthy_connections
242        ));
243
244        output.push_str(&format!(
245            "# HELP {prefix}_paused_connections Number of flow-control paused connections\n\
246             # TYPE {prefix}_paused_connections gauge\n\
247             {prefix}_paused_connections {}\n\n",
248            snap.paused_connections
249        ));
250
251        // Histograms
252        output.push_str(&snap.serialization_time.to_prometheus(
253            &format!("{prefix}_serialization_seconds"),
254            "Serialization time in seconds",
255        ));
256
257        output.push_str(&snap.request_duration.to_prometheus(
258            &format!("{prefix}_request_duration_seconds"),
259            "Request duration in seconds",
260        ));
261
262        output
263    }
264}
265
/// Fixed-bucket histogram whose observations are stored in microseconds.
///
/// Bucket `i` counts observations `v` with `buckets[i - 1] < v <= buckets[i]`.
/// One extra trailing slot counts everything above the last boundary (`+Inf`).
#[derive(Debug)]
pub struct HistogramMetric {
    /// Upper bucket boundaries in microseconds, sorted ascending and de-duplicated.
    buckets: Vec<u64>,
    /// Per-bucket (non-cumulative) counts. There are `buckets.len() + 1`
    /// entries, and the last one is `+Inf`.
    counts: Vec<AtomicU64>,
    /// Sum of all observations in microseconds.
    sum: AtomicU64,
    /// Total number of observations.
    count: AtomicU64,
}

impl Default for HistogramMetric {
    /// Histogram with boundaries at 10μs, 50μs, 100μs, 500μs, 1ms, 5ms, 10ms,
    /// 50ms, 100ms, 500ms and 1s.
    fn default() -> Self {
        Self::with_buckets(Self::DEFAULT_BUCKETS_MICROS.to_vec())
    }
}

impl HistogramMetric {
    /// Default upper bucket boundaries in microseconds (10μs … 1s).
    const DEFAULT_BUCKETS_MICROS: [u64; 11] = [
        10, 50, 100, 500, 1_000, 5_000, 10_000, 50_000, 100_000, 500_000, 1_000_000,
    ];

    /// Create a histogram with custom upper bucket boundaries (in microseconds).
    ///
    /// Boundaries may be given in any order and may contain duplicates. They are
    /// sorted and de-duplicated here because bucket lookup in
    /// [`record`](Self::record) relies on ascending order.
    pub fn with_buckets(mut buckets: Vec<u64>) -> Self {
        buckets.sort_unstable();
        buckets.dedup();
        let counts = (0..=buckets.len()).map(|_| AtomicU64::new(0)).collect();
        Self {
            buckets,
            counts,
            sum: AtomicU64::new(0),
            count: AtomicU64::new(0),
        }
    }

    /// Record one observation.
    ///
    /// Durations too large to fit in `u64` microseconds saturate to `u64::MAX`
    /// and land in the `+Inf` bucket.
    #[inline]
    pub fn record(&self, duration: Duration) {
        // Saturate rather than truncate. `as u64` on the u128 would keep only the
        // low bits, which could drop a huge duration into a small bucket.
        let micros = u64::try_from(duration.as_micros()).unwrap_or(u64::MAX);

        self.sum.fetch_add(micros, Ordering::Relaxed);
        self.count.fetch_add(1, Ordering::Relaxed);

        // Index of the first boundary >= micros. This is `buckets.len()`
        // (the +Inf slot) when no boundary is large enough.
        let bucket_idx = self.buckets.partition_point(|&b| b < micros);
        self.counts[bucket_idx].fetch_add(1, Ordering::Relaxed);
    }

    /// Take a snapshot of the histogram.
    ///
    /// Fields are loaded independently with `Relaxed` ordering, so a snapshot
    /// taken while other threads record is not guaranteed to be self-consistent.
    pub fn snapshot(&self) -> HistogramSnapshot {
        HistogramSnapshot {
            buckets: self.buckets.clone(),
            counts: self.counts.iter().map(|c| c.load(Ordering::Relaxed)).collect(),
            sum: self.sum.load(Ordering::Relaxed),
            count: self.count.load(Ordering::Relaxed),
        }
    }
}

/// Point-in-time copy of a [`HistogramMetric`].
#[derive(Debug, Clone)]
pub struct HistogramSnapshot {
    /// Upper bucket boundaries in microseconds.
    pub buckets: Vec<u64>,
    /// Per-bucket (non-cumulative) counts. The entry at index `buckets.len()`
    /// is the `+Inf` bucket.
    pub counts: Vec<u64>,
    /// Sum of all observations in microseconds.
    pub sum: u64,
    /// Total number of observations.
    pub count: u64,
}

impl HistogramSnapshot {
    /// Render this snapshot as a Prometheus text-format histogram called `name`.
    ///
    /// Bucket bounds and `_sum` are converted from microseconds to seconds.
    /// Because the fields are public, a hand-built snapshot may have fewer
    /// `counts` than expected. Missing entries are treated as zero instead of
    /// causing a panic.
    pub fn to_prometheus(&self, name: &str, help: &str) -> String {
        let mut output = String::with_capacity(512);

        output.push_str(&format!("# HELP {name} {help}\n"));
        output.push_str(&format!("# TYPE {name} histogram\n"));

        let count_at = |i: usize| self.counts.get(i).copied().unwrap_or(0);

        // Prometheus buckets are cumulative.
        let mut cumulative = 0u64;
        for (i, &boundary) in self.buckets.iter().enumerate() {
            cumulative += count_at(i);
            let le = boundary as f64 / 1_000_000.0; // microseconds -> seconds
            output.push_str(&format!("{name}_bucket{{le=\"{le:.6}\"}} {cumulative}\n"));
        }

        // The +Inf slot sits immediately after the finite buckets.
        cumulative += count_at(self.buckets.len());
        output.push_str(&format!("{name}_bucket{{le=\"+Inf\"}} {cumulative}\n"));

        let sum_seconds = self.sum as f64 / 1_000_000.0;
        output.push_str(&format!("{name}_sum {sum_seconds:.6}\n"));
        output.push_str(&format!("{name}_count {}\n\n", self.count));

        output
    }

    /// Return the mean observation in microseconds, or `0.0` when there are
    /// no observations.
    pub fn mean_micros(&self) -> f64 {
        if self.count == 0 {
            0.0
        } else {
            self.sum as f64 / self.count as f64
        }
    }

    /// Return the approximate `p`-th percentile (`p` in 0..=100) in microseconds.
    ///
    /// The result is the upper boundary of the bucket containing the target
    /// rank. Observations in the `+Inf` bucket report the last finite boundary,
    /// so this underestimates outliers. The method returns 0 when there are no
    /// observations. The target rank is at least 1, so `p <= 0` yields the
    /// bucket of the smallest observation rather than the first (possibly
    /// empty) bucket.
    pub fn percentile_micros(&self, p: f64) -> u64 {
        if self.count == 0 {
            return 0;
        }

        // `as u64` saturates negatives and NaN to 0; `max(1)` turns that into rank 1.
        let target = ((self.count as f64 * p / 100.0).ceil() as u64).max(1);
        let last_finite = self.buckets.last().copied().unwrap_or(0);
        let mut cumulative = 0u64;

        for (i, &count) in self.counts.iter().enumerate() {
            cumulative += count;
            if cumulative >= target {
                return self.buckets.get(i).copied().unwrap_or(last_finite);
            }
        }

        last_finite
    }
}
404
405/// Snapshot of all protocol metrics.
406#[derive(Debug, Clone)]
407pub struct ProtocolMetricsSnapshot {
408    // Counters
409    pub requests_total: u64,
410    pub responses_total: u64,
411    pub timeouts_total: u64,
412    pub connection_errors_total: u64,
413    pub serialization_errors_total: u64,
414    pub flow_control_pauses_total: u64,
415    pub flow_control_resumes_total: u64,
416    pub flow_control_rejections_total: u64,
417
418    // Gauges
419    pub in_flight_requests: u64,
420    pub buffer_utilization_percent: u64,
421    pub healthy_connections: u64,
422    pub paused_connections: u64,
423
424    // Histograms
425    pub serialization_time: HistogramSnapshot,
426    pub request_duration: HistogramSnapshot,
427}
428
429/// Helper to measure and record duration.
430pub struct DurationRecorder<'a> {
431    histogram: &'a HistogramMetric,
432    start: Instant,
433}
434
435impl<'a> DurationRecorder<'a> {
436    /// Start recording duration.
437    pub fn new(histogram: &'a HistogramMetric) -> Self {
438        Self {
439            histogram,
440            start: Instant::now(),
441        }
442    }
443
444    /// Record the elapsed duration.
445    pub fn record(self) {
446        self.histogram.record(self.start.elapsed());
447    }
448}
449
450impl Drop for DurationRecorder<'_> {
451    fn drop(&mut self) {
452        // Don't double-record, this is just a safety net
453    }
454}
455
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_counter_increments() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_requests();
        metrics.inc_requests();
        metrics.inc_responses();
        metrics.inc_timeouts();
        metrics.inc_connection_errors();
        metrics.inc_serialization_errors();

        let snap = metrics.snapshot();
        assert_eq!(snap.requests_total, 2);
        assert_eq!(snap.responses_total, 1);
        assert_eq!(snap.timeouts_total, 1);
        assert_eq!(snap.connection_errors_total, 1);
        assert_eq!(snap.serialization_errors_total, 1);
    }

    #[test]
    fn test_gauge_updates() {
        let metrics = ProtocolMetrics::new();

        metrics.set_in_flight(5);
        metrics.inc_in_flight();
        metrics.dec_in_flight();

        let snap = metrics.snapshot();
        assert_eq!(snap.in_flight_requests, 5);
    }

    #[test]
    fn test_buffer_utilization_is_clamped() {
        let metrics = ProtocolMetrics::new();

        metrics.set_buffer_utilization(42);
        assert_eq!(metrics.snapshot().buffer_utilization_percent, 42);

        metrics.set_buffer_utilization(150);
        assert_eq!(metrics.snapshot().buffer_utilization_percent, 100);
    }

    #[test]
    fn test_histogram_recording() {
        let metrics = ProtocolMetrics::new();

        metrics.record_serialization_time(Duration::from_micros(50));
        metrics.record_serialization_time(Duration::from_micros(150));
        metrics.record_serialization_time(Duration::from_millis(5));

        let snap = metrics.snapshot();
        assert_eq!(snap.serialization_time.count, 3);
        assert_eq!(snap.serialization_time.sum, 50 + 150 + 5000);
    }

    #[test]
    fn test_histogram_percentile() {
        let hist = HistogramMetric::default();

        // Record 100 observations from 1μs to 100μs.
        for i in 1..=100 {
            hist.record(Duration::from_micros(i));
        }

        let snap = hist.snapshot();
        assert_eq!(snap.count, 100);

        // Buckets hold 10 (<=10μs), 40 (<=50μs) and 50 (<=100μs) observations,
        // so the 50th observation falls exactly in the 50μs bucket.
        assert_eq!(snap.percentile_micros(50.0), 50);
        assert_eq!(snap.percentile_micros(100.0), 100);
    }

    #[test]
    fn test_flow_control_metrics() {
        let metrics = ProtocolMetrics::new();

        metrics.record_flow_pause();
        metrics.record_flow_pause();
        metrics.record_flow_resume();
        metrics.record_flow_rejection();

        let snap = metrics.snapshot();
        assert_eq!(snap.flow_control_pauses_total, 2);
        assert_eq!(snap.flow_control_resumes_total, 1);
        assert_eq!(snap.flow_control_rejections_total, 1);
    }

    #[test]
    fn test_duration_recorder_records_only_on_record() {
        let hist = HistogramMetric::default();

        DurationRecorder::new(&hist).record();
        assert_eq!(hist.snapshot().count, 1);

        // Dropping without calling `record` must not add an observation.
        {
            let _unrecorded = DurationRecorder::new(&hist);
        }
        assert_eq!(hist.snapshot().count, 1);
    }

    #[test]
    fn test_prometheus_export() {
        let metrics = ProtocolMetrics::new();

        metrics.inc_requests();
        metrics.set_healthy_connections(3);
        metrics.record_serialization_time(Duration::from_micros(100));

        let output = metrics.to_prometheus("agent_protocol");

        assert!(output.contains("agent_protocol_requests_total 1"));
        assert!(output.contains("# TYPE agent_protocol_requests_total counter\n"));
        assert!(output.contains("agent_protocol_healthy_connections 3"));
        assert!(output.contains("# TYPE agent_protocol_healthy_connections gauge\n"));
        assert!(output.contains("agent_protocol_serialization_seconds"));
        // Histogram buckets are cumulative and expressed in seconds.
        assert!(output.contains("agent_protocol_serialization_seconds_bucket{le=\"0.000050\"} 0\n"));
        assert!(output.contains("agent_protocol_serialization_seconds_bucket{le=\"0.000100\"} 1\n"));
        assert!(output.contains("agent_protocol_serialization_seconds_bucket{le=\"+Inf\"} 1\n"));
    }
}