a2a_protocol_server/otel/
mod.rs1mod builder;
45mod pipeline;
46
47use std::time::Duration;
48
49use opentelemetry::metrics::{Counter, Gauge, Histogram, Meter};
50use opentelemetry::KeyValue;
51
52use crate::metrics::{ConnectionPoolStats, Metrics};
53
54pub use builder::OtelMetricsBuilder;
55pub use pipeline::init_otlp_pipeline;
56
57pub struct OtelMetrics {
78 request_counter: Counter<u64>,
79 response_counter: Counter<u64>,
80 error_counter: Counter<u64>,
81 latency_histogram: Histogram<f64>,
82 queue_depth_gauge: Gauge<u64>,
83 pool_active_gauge: Gauge<u64>,
84 pool_idle_gauge: Gauge<u64>,
85 pool_created_counter: Counter<u64>,
86 pool_closed_counter: Counter<u64>,
87}
88
89impl std::fmt::Debug for OtelMetrics {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 f.debug_struct("OtelMetrics").finish_non_exhaustive()
92 }
93}
94
95impl OtelMetrics {
96 #[must_use]
100 pub fn from_meter(meter: &Meter) -> Self {
101 let request_counter = meter
102 .u64_counter("a2a.server.requests")
103 .with_description("Total number of inbound A2A requests")
104 .with_unit("request")
105 .build();
106
107 let response_counter = meter
108 .u64_counter("a2a.server.responses")
109 .with_description("Total number of outbound A2A responses")
110 .with_unit("response")
111 .build();
112
113 let error_counter = meter
114 .u64_counter("a2a.server.errors")
115 .with_description("Total number of A2A request errors")
116 .with_unit("error")
117 .build();
118
119 let latency_histogram = meter
120 .f64_histogram("a2a.server.latency")
121 .with_description("A2A request latency")
122 .with_unit("s")
123 .build();
124
125 let queue_depth_gauge = meter
126 .u64_gauge("a2a.server.queue_depth")
127 .with_description("Number of active event queues")
128 .with_unit("queue")
129 .build();
130
131 let pool_active_gauge = meter
132 .u64_gauge("a2a.server.pool.active")
133 .with_description("Number of active (in-use) HTTP connections")
134 .with_unit("connection")
135 .build();
136
137 let pool_idle_gauge = meter
138 .u64_gauge("a2a.server.pool.idle")
139 .with_description("Number of idle HTTP connections")
140 .with_unit("connection")
141 .build();
142
143 let pool_created_counter = meter
144 .u64_counter("a2a.server.pool.created")
145 .with_description("Total HTTP connections created since process start")
146 .with_unit("connection")
147 .build();
148
149 let pool_closed_counter = meter
150 .u64_counter("a2a.server.pool.closed")
151 .with_description("HTTP connections closed due to errors or timeouts")
152 .with_unit("connection")
153 .build();
154
155 Self {
156 request_counter,
157 response_counter,
158 error_counter,
159 latency_histogram,
160 queue_depth_gauge,
161 pool_active_gauge,
162 pool_idle_gauge,
163 pool_created_counter,
164 pool_closed_counter,
165 }
166 }
167}
168
169impl Metrics for OtelMetrics {
170 fn on_request(&self, method: &str) {
171 self.request_counter
172 .add(1, &[KeyValue::new("method", method.to_owned())]);
173 }
174
175 fn on_response(&self, method: &str) {
176 self.response_counter
177 .add(1, &[KeyValue::new("method", method.to_owned())]);
178 }
179
180 fn on_error(&self, method: &str, error: &str) {
181 self.error_counter.add(
182 1,
183 &[
184 KeyValue::new("method", method.to_owned()),
185 KeyValue::new("error", error.to_owned()),
186 ],
187 );
188 }
189
190 fn on_latency(&self, method: &str, duration: Duration) {
191 self.latency_histogram.record(
192 duration.as_secs_f64(),
193 &[KeyValue::new("method", method.to_owned())],
194 );
195 }
196
197 fn on_queue_depth_change(&self, active_queues: usize) {
198 #[allow(clippy::cast_possible_truncation)]
199 self.queue_depth_gauge.record(active_queues as u64, &[]);
200 }
201
202 fn on_connection_pool_stats(&self, stats: &ConnectionPoolStats) {
203 self.pool_active_gauge
204 .record(u64::from(stats.active_connections), &[]);
205 self.pool_idle_gauge
206 .record(u64::from(stats.idle_connections), &[]);
207 self.pool_created_counter
208 .add(stats.total_connections_created, &[]);
209 self.pool_closed_counter.add(stats.connections_closed, &[]);
210 }
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::data::{
        Gauge as DataGauge, Histogram as DataHistogram, ResourceMetrics, Sum,
    };
    use opentelemetry_sdk::metrics::reader::MetricReader;
    use opentelemetry_sdk::metrics::{ManualReader, SdkMeterProvider};
    use opentelemetry_sdk::Resource;

    // ---- Smoke tests against the global meter ----

    /// Builds metrics from the global meter. Unless a test installs a
    /// provider, the global provider is a no-op by default, so these tests
    /// only check that recording does not panic.
    fn noop_otel_metrics() -> OtelMetrics {
        let meter = opentelemetry::global::meter("test");
        OtelMetrics::from_meter(&meter)
    }

    #[test]
    fn from_meter_creates_all_instruments() {
        let metrics = noop_otel_metrics();
        let debug = format!("{metrics:?}");
        assert!(debug.contains("OtelMetrics"));
    }

    #[test]
    fn on_request_does_not_panic() {
        let metrics = noop_otel_metrics();
        metrics.on_request("message/send");
        metrics.on_request("tasks/get");
    }

    #[test]
    fn on_response_does_not_panic() {
        let metrics = noop_otel_metrics();
        metrics.on_response("message/send");
    }

    #[test]
    fn on_error_does_not_panic() {
        let metrics = noop_otel_metrics();
        metrics.on_error("message/send", "timeout");
        metrics.on_error("tasks/get", "not_found");
    }

    #[test]
    fn on_latency_does_not_panic() {
        let metrics = noop_otel_metrics();
        metrics.on_latency("message/send", Duration::from_millis(42));
        metrics.on_latency("message/send", Duration::from_secs(0));
    }

    #[test]
    fn on_queue_depth_change_does_not_panic() {
        let metrics = noop_otel_metrics();
        metrics.on_queue_depth_change(0);
        metrics.on_queue_depth_change(100);
    }

    #[test]
    fn on_connection_pool_stats_does_not_panic() {
        let metrics = noop_otel_metrics();
        metrics.on_connection_pool_stats(&ConnectionPoolStats {
            active_connections: 5,
            idle_connections: 10,
            total_connections_created: 42,
            connections_closed: 3,
        });
    }

    // ---- Tests against a real SDK pipeline ----

    /// Shares one `ManualReader` between the SDK provider, which takes
    /// ownership of its reader, and the test, which collects from it.
    #[derive(Clone)]
    struct CloneableReader(std::sync::Arc<ManualReader>);

    impl std::fmt::Debug for CloneableReader {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("CloneableReader")
        }
    }

    /// Pure delegation to the wrapped `ManualReader`.
    impl MetricReader for CloneableReader {
        fn register_pipeline(
            &self,
            pipeline: std::sync::Weak<opentelemetry_sdk::metrics::Pipeline>,
        ) {
            self.0.register_pipeline(pipeline);
        }

        fn collect(
            &self,
            rm: &mut ResourceMetrics,
        ) -> opentelemetry_sdk::metrics::MetricResult<()> {
            self.0.collect(rm)
        }

        fn force_flush(&self) -> opentelemetry_sdk::error::OTelSdkResult {
            self.0.force_flush()
        }

        fn shutdown(&self) -> opentelemetry_sdk::error::OTelSdkResult {
            self.0.shutdown()
        }

        fn temporality(
            &self,
            kind: opentelemetry_sdk::metrics::InstrumentKind,
        ) -> opentelemetry_sdk::metrics::Temporality {
            self.0.temporality(kind)
        }
    }

    /// Builds metrics on a real SDK pipeline whose data can be pulled on
    /// demand through the returned reader.
    ///
    /// The provider is returned so the caller can keep it alive (bind it to
    /// `_provider`, not `_`) for the whole test. Dropping it shuts the SDK
    /// pipeline down, after which collection is expected to fail.
    fn metrics_with_reader() -> (OtelMetrics, CloneableReader, SdkMeterProvider) {
        let reader = CloneableReader(std::sync::Arc::new(ManualReader::default()));
        let provider = SdkMeterProvider::builder()
            .with_reader(reader.clone())
            .with_resource(Resource::builder().build())
            .build();
        let meter = provider.meter("test");
        let metrics = OtelMetrics::from_meter(&meter);
        (metrics, reader, provider)
    }

    /// Pulls everything recorded so far out of `reader`.
    fn collect_metrics(reader: &CloneableReader) -> ResourceMetrics {
        let mut rm = ResourceMetrics {
            resource: Resource::builder().build(),
            scope_metrics: vec![],
        };
        reader.collect(&mut rm).expect("collect");
        rm
    }

    /// Returns the aggregated data of the first metric named `name` whose
    /// data downcasts to `T`, if any.
    fn find_data<'a, T: 'static>(rm: &'a ResourceMetrics, name: &str) -> Option<&'a T> {
        rm.scope_metrics
            .iter()
            .flat_map(|scope| scope.metrics.iter())
            .filter(|metric| metric.name == name)
            .find_map(|metric| metric.data.as_any().downcast_ref::<T>())
    }

    /// Sum of all data points of the `u64` counter `name`, or 0 if absent.
    fn find_sum_u64(rm: &ResourceMetrics, name: &str) -> u64 {
        find_data::<Sum<u64>>(rm, name)
            .map_or(0, |sum| sum.data_points.iter().map(|dp| dp.value).sum())
    }

    /// Sum of all data points of the `u64` gauge `name`, or `None` if absent.
    fn find_gauge_u64(rm: &ResourceMetrics, name: &str) -> Option<u64> {
        find_data::<DataGauge<u64>>(rm, name)
            .map(|gauge| gauge.data_points.iter().map(|dp| dp.value).sum())
    }

    #[test]
    fn on_request_increments_counter() {
        let (metrics, reader, _provider) = metrics_with_reader();
        metrics.on_request("test/method");
        let rm = collect_metrics(&reader);
        assert_eq!(
            find_sum_u64(&rm, "a2a.server.requests"),
            1,
            "request counter should be incremented once"
        );
    }

    #[test]
    fn on_response_increments_counter() {
        let (metrics, reader, _provider) = metrics_with_reader();
        metrics.on_response("test/method");
        let rm = collect_metrics(&reader);
        assert_eq!(
            find_sum_u64(&rm, "a2a.server.responses"),
            1,
            "response counter should be incremented once"
        );
    }

    #[test]
    fn on_error_increments_counter() {
        let (metrics, reader, _provider) = metrics_with_reader();
        metrics.on_error("test/method", "timeout");
        let rm = collect_metrics(&reader);
        assert_eq!(
            find_sum_u64(&rm, "a2a.server.errors"),
            1,
            "error counter should be incremented once"
        );
    }

    #[test]
    fn on_latency_records_histogram() {
        let (metrics, reader, _provider) = metrics_with_reader();
        metrics.on_latency("test/method", Duration::from_millis(42));
        let rm = collect_metrics(&reader);

        let hist = find_data::<DataHistogram<f64>>(&rm, "a2a.server.latency")
            .expect("latency histogram metric should exist");
        let count: u64 = hist.data_points.iter().map(|dp| dp.count).sum();
        assert_eq!(count, 1, "histogram should have recorded one value");
    }

    #[test]
    fn on_queue_depth_records_gauge() {
        let (metrics, reader, _provider) = metrics_with_reader();
        metrics.on_queue_depth_change(42);
        let rm = collect_metrics(&reader);

        assert_eq!(
            find_gauge_u64(&rm, "a2a.server.queue_depth"),
            Some(42),
            "queue_depth gauge should exist and record 42"
        );
    }

    #[test]
    fn on_connection_pool_stats_records_all_instruments() {
        let (metrics, reader, _provider) = metrics_with_reader();
        metrics.on_connection_pool_stats(&ConnectionPoolStats {
            active_connections: 5,
            idle_connections: 10,
            total_connections_created: 42,
            connections_closed: 3,
        });
        let rm = collect_metrics(&reader);

        assert_eq!(
            find_gauge_u64(&rm, "a2a.server.pool.active"),
            Some(5),
            "pool.active gauge should record 5"
        );
        assert_eq!(
            find_gauge_u64(&rm, "a2a.server.pool.idle"),
            Some(10),
            "pool.idle gauge should record 10"
        );
        assert_eq!(
            find_sum_u64(&rm, "a2a.server.pool.created"),
            42,
            "pool.created counter should reflect the first snapshot"
        );
        assert_eq!(
            find_sum_u64(&rm, "a2a.server.pool.closed"),
            3,
            "pool.closed counter should reflect the first snapshot"
        );
    }
}