a2a_protocol_server/
metrics.rs1use std::sync::Arc;
31use std::time::Duration;
32
33#[derive(Debug, Clone, Copy, Default)]
37pub struct ConnectionPoolStats {
38 pub active_connections: u32,
40 pub idle_connections: u32,
42 pub total_connections_created: u64,
44 pub connections_closed: u64,
46}
47
48pub trait Metrics: Send + Sync + 'static {
53 fn on_request(&self, _method: &str) {}
55
56 fn on_response(&self, _method: &str) {}
58
59 fn on_error(&self, _method: &str, _error: &str) {}
61
62 fn on_latency(&self, _method: &str, _duration: Duration) {}
68
69 fn on_queue_depth_change(&self, _active_queues: usize) {}
71
72 fn on_connection_pool_stats(&self, _stats: &ConnectionPoolStats) {}
76}
77
78#[derive(Debug, Default)]
80pub struct NoopMetrics;
81
82impl Metrics for NoopMetrics {}
83
84impl<T: Metrics + ?Sized> Metrics for Arc<T> {
89 fn on_request(&self, method: &str) {
90 (**self).on_request(method);
91 }
92
93 fn on_response(&self, method: &str) {
94 (**self).on_response(method);
95 }
96
97 fn on_error(&self, method: &str, error: &str) {
98 (**self).on_error(method, error);
99 }
100
101 fn on_latency(&self, method: &str, duration: Duration) {
102 (**self).on_latency(method, duration);
103 }
104
105 fn on_queue_depth_change(&self, active_queues: usize) {
106 (**self).on_queue_depth_change(active_queues);
107 }
108
109 fn on_connection_pool_stats(&self, stats: &ConnectionPoolStats) {
110 (**self).on_connection_pool_stats(stats);
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117 use std::sync::atomic::{AtomicU64, Ordering};
118
119 struct RecordingMetrics {
121 requests: AtomicU64,
122 responses: AtomicU64,
123 errors: AtomicU64,
124 latencies: AtomicU64,
125 queue_depths: AtomicU64,
126 pool_stats: AtomicU64,
127 }
128
129 impl RecordingMetrics {
130 fn new() -> Self {
131 Self {
132 requests: AtomicU64::new(0),
133 responses: AtomicU64::new(0),
134 errors: AtomicU64::new(0),
135 latencies: AtomicU64::new(0),
136 queue_depths: AtomicU64::new(0),
137 pool_stats: AtomicU64::new(0),
138 }
139 }
140 }
141
142 impl Metrics for RecordingMetrics {
143 fn on_request(&self, _method: &str) {
144 self.requests.fetch_add(1, Ordering::Relaxed);
145 }
146 fn on_response(&self, _method: &str) {
147 self.responses.fetch_add(1, Ordering::Relaxed);
148 }
149 fn on_error(&self, _method: &str, _error: &str) {
150 self.errors.fetch_add(1, Ordering::Relaxed);
151 }
152 fn on_latency(&self, _method: &str, _duration: Duration) {
153 self.latencies.fetch_add(1, Ordering::Relaxed);
154 }
155 fn on_queue_depth_change(&self, _active_queues: usize) {
156 self.queue_depths.fetch_add(1, Ordering::Relaxed);
157 }
158 fn on_connection_pool_stats(&self, _stats: &ConnectionPoolStats) {
159 self.pool_stats.fetch_add(1, Ordering::Relaxed);
160 }
161 }
162
163 #[test]
164 fn arc_delegates_on_request() {
165 let inner = Arc::new(RecordingMetrics::new());
166 let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
167 arc_metrics.on_request("test");
168 assert_eq!(inner.requests.load(Ordering::Relaxed), 1);
169 }
170
171 #[test]
172 fn arc_delegates_on_response() {
173 let inner = Arc::new(RecordingMetrics::new());
174 let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
175 arc_metrics.on_response("test");
176 assert_eq!(inner.responses.load(Ordering::Relaxed), 1);
177 }
178
179 #[test]
180 fn arc_delegates_on_error() {
181 let inner = Arc::new(RecordingMetrics::new());
182 let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
183 arc_metrics.on_error("test", "err");
184 assert_eq!(inner.errors.load(Ordering::Relaxed), 1);
185 }
186
187 #[test]
188 fn arc_delegates_on_latency() {
189 let inner = Arc::new(RecordingMetrics::new());
190 let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
191 arc_metrics.on_latency("test", Duration::from_millis(10));
192 assert_eq!(inner.latencies.load(Ordering::Relaxed), 1);
193 }
194
195 #[test]
196 fn arc_delegates_on_queue_depth_change() {
197 let inner = Arc::new(RecordingMetrics::new());
198 let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
199 arc_metrics.on_queue_depth_change(5);
200 assert_eq!(inner.queue_depths.load(Ordering::Relaxed), 1);
201 }
202
203 #[test]
204 fn arc_delegates_on_connection_pool_stats() {
205 let inner = Arc::new(RecordingMetrics::new());
206 let arc_metrics: Arc<RecordingMetrics> = Arc::clone(&inner);
207 arc_metrics.on_connection_pool_stats(&ConnectionPoolStats::default());
208 assert_eq!(inner.pool_stats.load(Ordering::Relaxed), 1);
209 }
210}