1use std::borrow::Cow;
2use std::future::Future;
3use std::pin::Pin;
4use std::string::String;
5use std::sync::Arc;
6use std::task::Poll::Ready;
7use std::task::{Context, Poll};
8use std::time::Instant;
9use std::{fmt, result};
10
11#[cfg(feature = "axum")]
12use axum::extract::MatchedPath;
13use futures_util::ready;
14use opentelemetry::metrics::{Histogram, Meter, UpDownCounter};
15use opentelemetry::KeyValue;
16use opentelemetry_semantic_conventions as semconv;
17use pin_project_lite::pin_project;
18use tower_layer::Layer;
19use tower_service::Service;
20
21const HTTP_SERVER_DURATION_METRIC: &str = semconv::metric::HTTP_SERVER_REQUEST_DURATION;
22const HTTP_SERVER_DURATION_UNIT: &str = "s";
23
24const _OTEL_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES: [f64; 14] = [
25 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
26];
27
28const LIBRARY_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES: [f64; 14] = [
33 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
34];
35const HTTP_SERVER_ACTIVE_REQUESTS_METRIC: &str = semconv::metric::HTTP_SERVER_ACTIVE_REQUESTS;
36const HTTP_SERVER_ACTIVE_REQUESTS_UNIT: &str = "{request}";
37
38const HTTP_SERVER_REQUEST_BODY_SIZE_METRIC: &str = semconv::metric::HTTP_SERVER_REQUEST_BODY_SIZE;
39const HTTP_SERVER_REQUEST_BODY_SIZE_UNIT: &str = "By";
40
41const HTTP_SERVER_RESPONSE_BODY_SIZE_METRIC: &str = semconv::metric::HTTP_SERVER_RESPONSE_BODY_SIZE;
42const HTTP_SERVER_RESPONSE_BODY_SIZE_UNIT: &str = "By";
43
44const NETWORK_PROTOCOL_NAME_LABEL: &str = semconv::attribute::NETWORK_PROTOCOL_NAME;
45const NETWORK_PROTOCOL_VERSION_LABEL: &str = "network.protocol.version";
46const URL_SCHEME_LABEL: &str = "url.scheme";
47
48const HTTP_REQUEST_METHOD_LABEL: &str = semconv::attribute::HTTP_REQUEST_METHOD;
49#[cfg(feature = "axum")]
50const HTTP_ROUTE_LABEL: &str = semconv::attribute::HTTP_ROUTE;
51const HTTP_RESPONSE_STATUS_CODE_LABEL: &str = semconv::attribute::HTTP_RESPONSE_STATUS_CODE;
52
53pub trait RequestAttributeExtractor<B>: Clone + Send + Sync + 'static {
55 fn extract_attributes(&self, req: &http::Request<B>) -> Vec<KeyValue>;
56}
57
58pub trait ResponseAttributeExtractor<B>: Clone + Send + Sync + 'static {
60 fn extract_attributes(&self, res: &http::Response<B>) -> Vec<KeyValue>;
61}
62
63#[derive(Clone)]
65pub struct NoOpExtractor;
66
67impl<B> RequestAttributeExtractor<B> for NoOpExtractor {
68 fn extract_attributes(&self, _req: &http::Request<B>) -> Vec<KeyValue> {
69 vec![]
70 }
71}
72
73impl<B> ResponseAttributeExtractor<B> for NoOpExtractor {
74 fn extract_attributes(&self, _res: &http::Response<B>) -> Vec<KeyValue> {
75 vec![]
76 }
77}
78
79#[derive(Clone)]
81pub struct FnRequestExtractor<F> {
82 extractor: F,
83}
84
85impl<F> FnRequestExtractor<F> {
86 pub fn new(extractor: F) -> Self {
87 Self { extractor }
88 }
89}
90
91impl<F, B> RequestAttributeExtractor<B> for FnRequestExtractor<F>
92where
93 F: Fn(&http::Request<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
94{
95 fn extract_attributes(&self, req: &http::Request<B>) -> Vec<KeyValue> {
96 (self.extractor)(req)
97 }
98}
99
100#[derive(Clone)]
102pub struct FnResponseExtractor<F> {
103 extractor: F,
104}
105
106impl<F> FnResponseExtractor<F> {
107 pub fn new(extractor: F) -> Self {
108 Self { extractor }
109 }
110}
111
112impl<F, B> ResponseAttributeExtractor<B> for FnResponseExtractor<F>
113where
114 F: Fn(&http::Response<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
115{
116 fn extract_attributes(&self, res: &http::Response<B>) -> Vec<KeyValue> {
117 (self.extractor)(res)
118 }
119}
120
121struct HTTPMetricsLayerState {
127 pub server_request_duration: Histogram<f64>,
128 pub server_active_requests: UpDownCounter<i64>,
129 pub server_request_body_size: Histogram<u64>,
130 pub server_response_body_size: Histogram<u64>,
131}
132
133#[derive(Clone)]
134pub struct HTTPMetricsService<S, ReqExt = NoOpExtractor, ResExt = NoOpExtractor> {
136 pub(crate) state: Arc<HTTPMetricsLayerState>,
137 request_extractor: ReqExt,
138 response_extractor: ResExt,
139 inner_service: S,
140}
141
142#[derive(Clone)]
143pub struct HTTPMetricsLayer<ReqExt = NoOpExtractor, ResExt = NoOpExtractor> {
145 state: Arc<HTTPMetricsLayerState>,
146 request_extractor: ReqExt,
147 response_extractor: ResExt,
148}
149
150pub struct HTTPMetricsLayerBuilder<ReqExt = NoOpExtractor, ResExt = NoOpExtractor> {
151 meter: Option<Meter>,
152 req_dur_bounds: Option<Vec<f64>>,
153 request_extractor: ReqExt,
154 response_extractor: ResExt,
155}
156
/// Error returned by the fallible operations in this module.
///
/// The cause is kept private. `Display` renders a human-readable message and `Debug` includes
/// the underlying kind, so `unwrap()`/`expect()` failures name what went wrong.
pub struct Error {
    inner: ErrorKind,
}

impl std::fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.inner {
            ErrorKind::Other(msg) => write!(f, "{msg}"),
            ErrorKind::Config(msg) => write!(f, "config error: {msg}"),
        }
    }
}

impl std::error::Error for Error {}

/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = result::Result<T, Error>;

/// Private classification of what went wrong.
#[derive(Debug)]
enum ErrorKind {
    /// Any failure that is not a configuration problem.
    // Not constructed by the code in this file, hence the lint allowance.
    #[allow(dead_code)]
    Other(String),
    /// The builder was configured incorrectly (for example, no meter was provided).
    Config(String),
}

impl fmt::Debug for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Include the kind: the bare type name alone gave no clue about the failure.
        f.debug_tuple("opentelemetry_instrumentation_tower::Error")
            .field(&self.inner)
            .finish()
    }
}
192
193impl HTTPMetricsLayerBuilder {
194 pub fn builder() -> Self {
195 HTTPMetricsLayerBuilder {
196 meter: None,
197 req_dur_bounds: Some(LIBRARY_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES.to_vec()),
198 request_extractor: NoOpExtractor,
199 response_extractor: NoOpExtractor,
200 }
201 }
202}
203
204impl<ReqExt, ResExt> HTTPMetricsLayerBuilder<ReqExt, ResExt> {
205 pub fn with_request_extractor<NewReqExt, B>(
207 self,
208 extractor: NewReqExt,
209 ) -> HTTPMetricsLayerBuilder<NewReqExt, ResExt>
210 where
211 NewReqExt: RequestAttributeExtractor<B>,
212 {
213 HTTPMetricsLayerBuilder {
214 meter: self.meter,
215 req_dur_bounds: self.req_dur_bounds,
216 request_extractor: extractor,
217 response_extractor: self.response_extractor,
218 }
219 }
220
221 pub fn with_response_extractor<NewResExt, B>(
223 self,
224 extractor: NewResExt,
225 ) -> HTTPMetricsLayerBuilder<ReqExt, NewResExt>
226 where
227 NewResExt: ResponseAttributeExtractor<B>,
228 {
229 HTTPMetricsLayerBuilder {
230 meter: self.meter,
231 req_dur_bounds: self.req_dur_bounds,
232 request_extractor: self.request_extractor,
233 response_extractor: extractor,
234 }
235 }
236
237 pub fn with_request_extractor_fn<F, B>(
239 self,
240 f: F,
241 ) -> HTTPMetricsLayerBuilder<FnRequestExtractor<F>, ResExt>
242 where
243 F: Fn(&http::Request<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
244 {
245 self.with_request_extractor(FnRequestExtractor::new(f))
246 }
247
248 pub fn with_response_extractor_fn<F, B>(
250 self,
251 f: F,
252 ) -> HTTPMetricsLayerBuilder<ReqExt, FnResponseExtractor<F>>
253 where
254 F: Fn(&http::Response<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
255 {
256 self.with_response_extractor(FnResponseExtractor::new(f))
257 }
258
259 pub fn build(self) -> Result<HTTPMetricsLayer<ReqExt, ResExt>> {
260 let req_dur_bounds = self
261 .req_dur_bounds
262 .unwrap_or_else(|| LIBRARY_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES.to_vec());
263 match self.meter {
264 Some(meter) => Ok(HTTPMetricsLayer {
265 state: Arc::from(Self::make_state(meter, req_dur_bounds)),
266 request_extractor: self.request_extractor,
267 response_extractor: self.response_extractor,
268 }),
269 None => Err(Error {
270 inner: ErrorKind::Config(String::from("no meter provided")),
271 }),
272 }
273 }
274
275 pub fn with_meter(mut self, meter: Meter) -> Self {
276 self.meter = Some(meter);
277 self
278 }
279
280 pub fn with_request_duration_bounds(mut self, bounds: Vec<f64>) -> Self {
281 self.req_dur_bounds = Some(bounds);
282 self
283 }
284
285 fn make_state(meter: Meter, req_dur_bounds: Vec<f64>) -> HTTPMetricsLayerState {
286 HTTPMetricsLayerState {
287 server_request_duration: meter
288 .f64_histogram(Cow::from(HTTP_SERVER_DURATION_METRIC))
289 .with_description("Duration of HTTP server requests.")
290 .with_unit(Cow::from(HTTP_SERVER_DURATION_UNIT))
291 .with_boundaries(req_dur_bounds)
292 .build(),
293 server_active_requests: meter
294 .i64_up_down_counter(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_METRIC))
295 .with_description("Number of active HTTP server requests.")
296 .with_unit(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_UNIT))
297 .build(),
298 server_request_body_size: meter
299 .u64_histogram(HTTP_SERVER_REQUEST_BODY_SIZE_METRIC)
300 .with_description("Size of HTTP server request bodies.")
301 .with_unit(HTTP_SERVER_REQUEST_BODY_SIZE_UNIT)
302 .build(),
303 server_response_body_size: meter
304 .u64_histogram(HTTP_SERVER_RESPONSE_BODY_SIZE_METRIC)
305 .with_description("Size of HTTP server response bodies.")
306 .with_unit(HTTP_SERVER_RESPONSE_BODY_SIZE_UNIT)
307 .build(),
308 }
309 }
310}
311
312impl<S, ReqExt, ResExt> Layer<S> for HTTPMetricsLayer<ReqExt, ResExt>
313where
314 ReqExt: Clone,
315 ResExt: Clone,
316{
317 type Service = HTTPMetricsService<S, ReqExt, ResExt>;
318
319 fn layer(&self, service: S) -> Self::Service {
320 HTTPMetricsService {
321 state: self.state.clone(),
322 request_extractor: self.request_extractor.clone(),
323 response_extractor: self.response_extractor.clone(),
324 inner_service: service,
325 }
326 }
327}
328
329#[derive(Clone)]
336struct ResponseFutureMetricsState {
337 duration_start: Instant,
340 req_body_size: Option<u64>,
342
343 protocol_name_kv: KeyValue,
345 protocol_version_kv: KeyValue,
346 url_scheme_kv: KeyValue,
347 method_kv: KeyValue,
348 route_kv_opt: Option<KeyValue>,
349
350 custom_request_attributes: Vec<KeyValue>,
352}
353
354pin_project! {
355 pub struct HTTPMetricsResponseFuture<F, ResExt> {
357 #[pin]
358 inner_response_future: F,
359 layer_state: Arc<HTTPMetricsLayerState>,
360 metrics_state: ResponseFutureMetricsState,
361 response_extractor: ResExt,
362 }
363}
364
365impl<S, ReqBody, ResBody, ReqExt, ResExt> Service<http::Request<ReqBody>>
366 for HTTPMetricsService<S, ReqExt, ResExt>
367where
368 S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
369 ResBody: http_body::Body,
370 ReqExt: RequestAttributeExtractor<ReqBody>,
371 ResExt: ResponseAttributeExtractor<ResBody>,
372{
373 type Response = S::Response;
374 type Error = S::Error;
375 type Future = HTTPMetricsResponseFuture<S::Future, ResExt>;
376
377 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<result::Result<(), Self::Error>> {
378 self.inner_service.poll_ready(cx)
379 }
380
381 fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
382 let duration_start = Instant::now();
383
384 let headers = req.headers();
385 let content_length = headers
386 .get(http::header::CONTENT_LENGTH)
387 .and_then(|value| value.to_str().ok()?.parse::<u64>().ok());
388
389 let (protocol, version) = split_and_format_protocol_version(req.version());
390 let protocol_name_kv = KeyValue::new(NETWORK_PROTOCOL_NAME_LABEL, protocol);
391 let protocol_version_kv = KeyValue::new(NETWORK_PROTOCOL_VERSION_LABEL, version);
392
393 let scheme = req.uri().scheme_str().unwrap_or("").to_string();
394 let url_scheme_kv = KeyValue::new(URL_SCHEME_LABEL, scheme);
395
396 let method = req.method().as_str().to_owned();
397 let method_kv = KeyValue::new(HTTP_REQUEST_METHOD_LABEL, method);
398
399 #[allow(unused_mut)]
400 let mut route_kv_opt = None;
401 #[cfg(feature = "axum")]
402 if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
403 route_kv_opt = Some(KeyValue::new(
404 HTTP_ROUTE_LABEL,
405 matched_path.as_str().to_owned(),
406 ));
407 };
408
409 let custom_request_attributes = self.request_extractor.extract_attributes(&req);
411
412 self.state
413 .server_active_requests
414 .add(1, &[url_scheme_kv.clone(), method_kv.clone()]);
415
416 HTTPMetricsResponseFuture {
417 inner_response_future: self.inner_service.call(req),
418 layer_state: self.state.clone(),
419 metrics_state: ResponseFutureMetricsState {
420 duration_start,
421 req_body_size: content_length,
422
423 protocol_name_kv,
424 protocol_version_kv,
425 url_scheme_kv,
426 method_kv,
427 route_kv_opt,
428 custom_request_attributes,
429 },
430 response_extractor: self.response_extractor.clone(),
431 }
432 }
433}
434
435impl<F, ResBody, E, ResExt> Future for HTTPMetricsResponseFuture<F, ResExt>
436where
437 F: Future<Output = result::Result<http::Response<ResBody>, E>>,
438 ResBody: http_body::Body,
439 ResExt: ResponseAttributeExtractor<ResBody>,
440{
441 type Output = F::Output;
442
443 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
444 let this = self.project();
445 let response = ready!(this.inner_response_future.poll(cx))?;
446 let status = response.status();
447
448 let mut label_superset = vec![
450 this.metrics_state.protocol_name_kv.clone(),
451 this.metrics_state.protocol_version_kv.clone(),
452 this.metrics_state.url_scheme_kv.clone(),
453 this.metrics_state.method_kv.clone(),
454 KeyValue::new(HTTP_RESPONSE_STATUS_CODE_LABEL, i64::from(status.as_u16())),
455 ];
456
457 if let Some(route_kv) = this.metrics_state.route_kv_opt.clone() {
458 label_superset.push(route_kv);
459 }
460
461 label_superset.extend(this.metrics_state.custom_request_attributes.clone());
463
464 let custom_response_attributes = this.response_extractor.extract_attributes(&response);
466 label_superset.extend(custom_response_attributes);
467
468 this.layer_state.server_request_duration.record(
469 this.metrics_state.duration_start.elapsed().as_secs_f64(),
470 &label_superset,
471 );
472
473 if let Some(req_content_length) = this.metrics_state.req_body_size {
474 this.layer_state
475 .server_request_body_size
476 .record(req_content_length, &label_superset);
477 }
478
479 if let Some(resp_content_length) = response.body().size_hint().exact() {
481 this.layer_state
482 .server_response_body_size
483 .record(resp_content_length, &label_superset);
484 }
485
486 this.layer_state.server_active_requests.add(
487 -1,
488 &[
489 this.metrics_state.url_scheme_kv.clone(),
490 this.metrics_state.method_kv.clone(),
491 ],
492 );
493
494 Ready(Ok(response))
495 }
496}
497
498fn split_and_format_protocol_version(http_version: http::Version) -> (String, String) {
499 let version_str = match http_version {
500 http::Version::HTTP_09 => "0.9",
501 http::Version::HTTP_10 => "1.0",
502 http::Version::HTTP_11 => "1.1",
503 http::Version::HTTP_2 => "2.0",
504 http::Version::HTTP_3 => "3.0",
505 _ => "",
506 };
507 (String::from("http"), String::from(version_str))
508}
509
#[cfg(test)]
mod tests {
    use super::*;
    use http::{Request, Response, StatusCode};
    use opentelemetry::metrics::MeterProvider;
    use opentelemetry_sdk::metrics::{
        data::{AggregatedMetrics, MetricData},
        InMemoryMetricExporter, PeriodicReader, SdkMeterProvider,
    };
    use std::time::Duration;
    use tower::Service;

    /// Returns the attribute named `key`, panicking with a message that names `metric` when
    /// it is missing.
    fn find_attribute<'a>(attributes: &[&'a KeyValue], key: &str, metric: &str) -> &'a KeyValue {
        attributes
            .iter()
            .copied()
            .find(|kv| kv.key.as_str() == key)
            .unwrap_or_else(|| panic!("attribute `{key}` should be present on {metric}"))
    }

    /// Asserts the five attributes expected on the duration and body-size histograms for the
    /// single `GET https://example.com/test` request issued by the test.
    fn assert_request_attributes(attributes: &[&KeyValue], metric: &str) {
        assert_eq!(
            attributes.len(),
            5,
            "{metric} should have exactly 5 attributes"
        );

        let protocol_name = find_attribute(attributes, NETWORK_PROTOCOL_NAME_LABEL, metric);
        assert_eq!(protocol_name.value.as_str(), "http");

        let protocol_version = find_attribute(attributes, NETWORK_PROTOCOL_VERSION_LABEL, metric);
        assert_eq!(protocol_version.value.as_str(), "1.1");

        let url_scheme = find_attribute(attributes, URL_SCHEME_LABEL, metric);
        assert_eq!(url_scheme.value.as_str(), "https");

        let method = find_attribute(attributes, HTTP_REQUEST_METHOD_LABEL, metric);
        assert_eq!(method.value.as_str(), "GET");

        let status_code = find_attribute(attributes, HTTP_RESPONSE_STATUS_CODE_LABEL, metric);
        if let opentelemetry::Value::I64(code) = &status_code.value {
            assert_eq!(*code, 200);
        } else {
            panic!("Expected i64 status code on {metric}");
        }
    }

    #[tokio::test]
    async fn test_metrics_labels() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone())
            .with_interval(Duration::from_millis(100))
            .build();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");

        let layer = HTTPMetricsLayerBuilder::builder()
            .with_meter(meter)
            .build()
            .unwrap();

        let service = tower::service_fn(|_req: Request<String>| async {
            Ok::<_, std::convert::Infallible>(
                Response::builder()
                    .status(StatusCode::OK)
                    .body(String::from("Hello, World!"))
                    .unwrap(),
            )
        });

        let mut service = layer.layer(service);

        let request = Request::builder()
            .method("GET")
            .uri("https://example.com/test")
            .body("test body".to_string())
            .unwrap();

        let _response = service.call(request).await.unwrap();

        // Give the periodic reader time to export at least once.
        tokio::time::sleep(Duration::from_millis(500)).await;

        let metrics = exporter.get_finished_metrics().unwrap();
        assert!(!metrics.is_empty());

        let resource_metrics = &metrics[0];
        let scope_metrics = resource_metrics
            .scope_metrics()
            .next()
            .expect("Should have scope metrics");

        let duration_metric = scope_metrics
            .metrics()
            .find(|m| m.name() == HTTP_SERVER_DURATION_METRIC)
            .expect("Duration metric should exist");

        match duration_metric.data() {
            AggregatedMetrics::F64(MetricData::Histogram(histogram)) => {
                let data_point = histogram
                    .data_points()
                    .next()
                    .expect("Should have data point");
                let attributes: Vec<_> = data_point.attributes().collect();
                assert_request_attributes(&attributes, "Duration metric");
            }
            _ => panic!("Expected histogram data for duration metric"),
        }

        // Both body-size instruments are u64 histograms, so they arrive as `U64` data.
        // Matching on `F64` here would silently skip every assertion below. The request
        // metric is only recorded when a Content-Length header was sent (this request sets
        // none), so a missing metric is tolerated but a wrong data type is not.
        let body_size_metrics = [
            (
                HTTP_SERVER_REQUEST_BODY_SIZE_METRIC,
                "Request body size metric",
            ),
            (
                HTTP_SERVER_RESPONSE_BODY_SIZE_METRIC,
                "Response body size metric",
            ),
        ];
        for (metric_name, label) in body_size_metrics {
            if let Some(metric) = scope_metrics.metrics().find(|m| m.name() == metric_name) {
                match metric.data() {
                    AggregatedMetrics::U64(MetricData::Histogram(histogram)) => {
                        let data_point = histogram
                            .data_points()
                            .next()
                            .expect("Should have data point");
                        let attributes: Vec<_> = data_point.attributes().collect();
                        assert_request_attributes(&attributes, label);
                    }
                    _ => panic!("Expected u64 histogram data for {label}"),
                }
            }
        }

        let active_requests_metric = scope_metrics
            .metrics()
            .find(|m| m.name() == HTTP_SERVER_ACTIVE_REQUESTS_METRIC);

        if let Some(metric) = active_requests_metric {
            match metric.data() {
                AggregatedMetrics::I64(MetricData::Sum(sum)) => {
                    let data_point = sum.data_points().next().expect("Should have data point");
                    let attributes: Vec<_> = data_point.attributes().collect();
                    let label = "Active requests metric";

                    assert_eq!(
                        attributes.len(),
                        2,
                        "Active requests metric should have exactly 2 attributes"
                    );

                    let method = find_attribute(&attributes, HTTP_REQUEST_METHOD_LABEL, label);
                    assert_eq!(method.value.as_str(), "GET");

                    let url_scheme = find_attribute(&attributes, URL_SCHEME_LABEL, label);
                    assert_eq!(url_scheme.value.as_str(), "https");
                }
                _ => panic!("Expected i64 sum data for active requests metric"),
            }
        }
    }
}