opentelemetry_instrumentation_tower/
lib.rs

1use std::borrow::Cow;
2use std::future::Future;
3use std::pin::Pin;
4use std::string::String;
5use std::sync::Arc;
6use std::task::Poll::Ready;
7use std::task::{Context, Poll};
8use std::time::Instant;
9use std::{fmt, result};
10
11#[cfg(feature = "axum")]
12use axum::extract::MatchedPath;
13use futures_util::ready;
14use opentelemetry::metrics::{Histogram, Meter, UpDownCounter};
15use opentelemetry::KeyValue;
16use opentelemetry_semantic_conventions as semconv;
17use pin_project_lite::pin_project;
18use tower_layer::Layer;
19use tower_service::Service;
20
21const HTTP_SERVER_DURATION_METRIC: &str = semconv::metric::HTTP_SERVER_REQUEST_DURATION;
22const HTTP_SERVER_DURATION_UNIT: &str = "s";
23
24const _OTEL_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES: [f64; 14] = [
25    0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, 2.5, 5.0, 7.5, 10.0,
26];
27
28// OTEL default does not capture duration over 10s - a poor choice for an arbitrary http server;
29// we want to capture longer requests with some rough granularity on the upper end.
30// These choices are adapted from various recommendations in
31// https://github.com/open-telemetry/semantic-conventions/issues/336.
32const LIBRARY_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES: [f64; 14] = [
33    0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0,
34];
35const HTTP_SERVER_ACTIVE_REQUESTS_METRIC: &str = semconv::metric::HTTP_SERVER_ACTIVE_REQUESTS;
36const HTTP_SERVER_ACTIVE_REQUESTS_UNIT: &str = "{request}";
37
38const HTTP_SERVER_REQUEST_BODY_SIZE_METRIC: &str = semconv::metric::HTTP_SERVER_REQUEST_BODY_SIZE;
39const HTTP_SERVER_REQUEST_BODY_SIZE_UNIT: &str = "By";
40
41const HTTP_SERVER_RESPONSE_BODY_SIZE_METRIC: &str = semconv::metric::HTTP_SERVER_RESPONSE_BODY_SIZE;
42const HTTP_SERVER_RESPONSE_BODY_SIZE_UNIT: &str = "By";
43
44const NETWORK_PROTOCOL_NAME_LABEL: &str = semconv::attribute::NETWORK_PROTOCOL_NAME;
45const NETWORK_PROTOCOL_VERSION_LABEL: &str = "network.protocol.version";
46const URL_SCHEME_LABEL: &str = "url.scheme";
47
48const HTTP_REQUEST_METHOD_LABEL: &str = semconv::attribute::HTTP_REQUEST_METHOD;
49#[cfg(feature = "axum")]
50const HTTP_ROUTE_LABEL: &str = semconv::attribute::HTTP_ROUTE;
51const HTTP_RESPONSE_STATUS_CODE_LABEL: &str = semconv::attribute::HTTP_RESPONSE_STATUS_CODE;
52
53/// Trait for extracting custom attributes from HTTP requests
54pub trait RequestAttributeExtractor<B>: Clone + Send + Sync + 'static {
55    fn extract_attributes(&self, req: &http::Request<B>) -> Vec<KeyValue>;
56}
57
58/// Trait for extracting custom attributes from HTTP responses
59pub trait ResponseAttributeExtractor<B>: Clone + Send + Sync + 'static {
60    fn extract_attributes(&self, res: &http::Response<B>) -> Vec<KeyValue>;
61}
62
63/// Default implementation that extracts no attributes
64#[derive(Clone)]
65pub struct NoOpExtractor;
66
67impl<B> RequestAttributeExtractor<B> for NoOpExtractor {
68    fn extract_attributes(&self, _req: &http::Request<B>) -> Vec<KeyValue> {
69        vec![]
70    }
71}
72
73impl<B> ResponseAttributeExtractor<B> for NoOpExtractor {
74    fn extract_attributes(&self, _res: &http::Response<B>) -> Vec<KeyValue> {
75        vec![]
76    }
77}
78
79/// A function-based request attribute extractor
80#[derive(Clone)]
81pub struct FnRequestExtractor<F> {
82    extractor: F,
83}
84
85impl<F> FnRequestExtractor<F> {
86    pub fn new(extractor: F) -> Self {
87        Self { extractor }
88    }
89}
90
91impl<F, B> RequestAttributeExtractor<B> for FnRequestExtractor<F>
92where
93    F: Fn(&http::Request<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
94{
95    fn extract_attributes(&self, req: &http::Request<B>) -> Vec<KeyValue> {
96        (self.extractor)(req)
97    }
98}
99
100/// A function-based response attribute extractor
101#[derive(Clone)]
102pub struct FnResponseExtractor<F> {
103    extractor: F,
104}
105
106impl<F> FnResponseExtractor<F> {
107    pub fn new(extractor: F) -> Self {
108        Self { extractor }
109    }
110}
111
112impl<F, B> ResponseAttributeExtractor<B> for FnResponseExtractor<F>
113where
114    F: Fn(&http::Response<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
115{
116    fn extract_attributes(&self, res: &http::Response<B>) -> Vec<KeyValue> {
117        (self.extractor)(res)
118    }
119}
120
121/// State scoped to the entire middleware Layer.
122///
123/// For now the only global state we hold onto is the metrics instruments.
124/// The OTEL SDKs do support calling for the global meter provider instead of holding a reference
125/// but it seems ideal to avoid extra access to the global meter, which sits behind a RWLock.
126struct HTTPMetricsLayerState {
127    pub server_request_duration: Histogram<f64>,
128    pub server_active_requests: UpDownCounter<i64>,
129    pub server_request_body_size: Histogram<u64>,
130    pub server_response_body_size: Histogram<u64>,
131}
132
133#[derive(Clone)]
134/// [`Service`] used by [`HTTPMetricsLayer`]
135pub struct HTTPMetricsService<S, ReqExt = NoOpExtractor, ResExt = NoOpExtractor> {
136    pub(crate) state: Arc<HTTPMetricsLayerState>,
137    request_extractor: ReqExt,
138    response_extractor: ResExt,
139    inner_service: S,
140}
141
142#[derive(Clone)]
143/// [`Layer`] which applies the OTEL HTTP server metrics middleware
144pub struct HTTPMetricsLayer<ReqExt = NoOpExtractor, ResExt = NoOpExtractor> {
145    state: Arc<HTTPMetricsLayerState>,
146    request_extractor: ReqExt,
147    response_extractor: ResExt,
148}
149
150pub struct HTTPMetricsLayerBuilder<ReqExt = NoOpExtractor, ResExt = NoOpExtractor> {
151    meter: Option<Meter>,
152    req_dur_bounds: Option<Vec<f64>>,
153    request_extractor: ReqExt,
154    response_extractor: ResExt,
155}
156
157/// Error typedef to implement `std::error::Error` for `opentelemetry_instrumentation_tower`
158pub struct Error {
159    #[allow(dead_code)]
160    inner: ErrorKind,
161}
162
163impl std::fmt::Display for Error {
164    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
165        match self.inner {
166            ErrorKind::Other(ref s) => write!(f, "{s}"),
167            ErrorKind::Config(ref s) => write!(f, "config error: {s}"),
168        }
169    }
170}
171
172impl std::error::Error for Error {}
173
174/// `Result` typedef to use with the `opentelemetry_instrumentation_tower::Error` type
175pub type Result<T> = result::Result<T, Error>;
176
177enum ErrorKind {
178    #[allow(dead_code)]
179    /// Uncategorized
180    Other(String),
181    #[allow(dead_code)]
182    /// Invalid configuration
183    Config(String),
184}
185
186impl fmt::Debug for Error {
187    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
188        f.debug_tuple("opentelemetry_instrumentation_tower::Error")
189            .finish()
190    }
191}
192
193impl HTTPMetricsLayerBuilder {
194    pub fn builder() -> Self {
195        HTTPMetricsLayerBuilder {
196            meter: None,
197            req_dur_bounds: Some(LIBRARY_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES.to_vec()),
198            request_extractor: NoOpExtractor,
199            response_extractor: NoOpExtractor,
200        }
201    }
202}
203
204impl<ReqExt, ResExt> HTTPMetricsLayerBuilder<ReqExt, ResExt> {
205    /// Set a request attribute extractor
206    pub fn with_request_extractor<NewReqExt, B>(
207        self,
208        extractor: NewReqExt,
209    ) -> HTTPMetricsLayerBuilder<NewReqExt, ResExt>
210    where
211        NewReqExt: RequestAttributeExtractor<B>,
212    {
213        HTTPMetricsLayerBuilder {
214            meter: self.meter,
215            req_dur_bounds: self.req_dur_bounds,
216            request_extractor: extractor,
217            response_extractor: self.response_extractor,
218        }
219    }
220
221    /// Set a response attribute extractor
222    pub fn with_response_extractor<NewResExt, B>(
223        self,
224        extractor: NewResExt,
225    ) -> HTTPMetricsLayerBuilder<ReqExt, NewResExt>
226    where
227        NewResExt: ResponseAttributeExtractor<B>,
228    {
229        HTTPMetricsLayerBuilder {
230            meter: self.meter,
231            req_dur_bounds: self.req_dur_bounds,
232            request_extractor: self.request_extractor,
233            response_extractor: extractor,
234        }
235    }
236
237    /// Convenience method to set a function-based request extractor
238    pub fn with_request_extractor_fn<F, B>(
239        self,
240        f: F,
241    ) -> HTTPMetricsLayerBuilder<FnRequestExtractor<F>, ResExt>
242    where
243        F: Fn(&http::Request<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
244    {
245        self.with_request_extractor(FnRequestExtractor::new(f))
246    }
247
248    /// Convenience method to set a function-based response extractor
249    pub fn with_response_extractor_fn<F, B>(
250        self,
251        f: F,
252    ) -> HTTPMetricsLayerBuilder<ReqExt, FnResponseExtractor<F>>
253    where
254        F: Fn(&http::Response<B>) -> Vec<KeyValue> + Clone + Send + Sync + 'static,
255    {
256        self.with_response_extractor(FnResponseExtractor::new(f))
257    }
258
259    pub fn build(self) -> Result<HTTPMetricsLayer<ReqExt, ResExt>> {
260        let req_dur_bounds = self
261            .req_dur_bounds
262            .unwrap_or_else(|| LIBRARY_DEFAULT_HTTP_SERVER_DURATION_BOUNDARIES.to_vec());
263        match self.meter {
264            Some(meter) => Ok(HTTPMetricsLayer {
265                state: Arc::from(Self::make_state(meter, req_dur_bounds)),
266                request_extractor: self.request_extractor,
267                response_extractor: self.response_extractor,
268            }),
269            None => Err(Error {
270                inner: ErrorKind::Config(String::from("no meter provided")),
271            }),
272        }
273    }
274
275    pub fn with_meter(mut self, meter: Meter) -> Self {
276        self.meter = Some(meter);
277        self
278    }
279
280    pub fn with_request_duration_bounds(mut self, bounds: Vec<f64>) -> Self {
281        self.req_dur_bounds = Some(bounds);
282        self
283    }
284
285    fn make_state(meter: Meter, req_dur_bounds: Vec<f64>) -> HTTPMetricsLayerState {
286        HTTPMetricsLayerState {
287            server_request_duration: meter
288                .f64_histogram(Cow::from(HTTP_SERVER_DURATION_METRIC))
289                .with_description("Duration of HTTP server requests.")
290                .with_unit(Cow::from(HTTP_SERVER_DURATION_UNIT))
291                .with_boundaries(req_dur_bounds)
292                .build(),
293            server_active_requests: meter
294                .i64_up_down_counter(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_METRIC))
295                .with_description("Number of active HTTP server requests.")
296                .with_unit(Cow::from(HTTP_SERVER_ACTIVE_REQUESTS_UNIT))
297                .build(),
298            server_request_body_size: meter
299                .u64_histogram(HTTP_SERVER_REQUEST_BODY_SIZE_METRIC)
300                .with_description("Size of HTTP server request bodies.")
301                .with_unit(HTTP_SERVER_REQUEST_BODY_SIZE_UNIT)
302                .build(),
303            server_response_body_size: meter
304                .u64_histogram(HTTP_SERVER_RESPONSE_BODY_SIZE_METRIC)
305                .with_description("Size of HTTP server response bodies.")
306                .with_unit(HTTP_SERVER_RESPONSE_BODY_SIZE_UNIT)
307                .build(),
308        }
309    }
310}
311
312impl<S, ReqExt, ResExt> Layer<S> for HTTPMetricsLayer<ReqExt, ResExt>
313where
314    ReqExt: Clone,
315    ResExt: Clone,
316{
317    type Service = HTTPMetricsService<S, ReqExt, ResExt>;
318
319    fn layer(&self, service: S) -> Self::Service {
320        HTTPMetricsService {
321            state: self.state.clone(),
322            request_extractor: self.request_extractor.clone(),
323            response_extractor: self.response_extractor.clone(),
324            inner_service: service,
325        }
326    }
327}
328
329/// ResponseFutureMetricsState holds request-scoped data for metrics and their attributes.
330///
331/// ResponseFutureMetricsState lives inside the response future, as it needs to hold data
332/// initialized or extracted from the request before it is forwarded to the inner Service.
333/// The rest of the data (e.g. status code, error) can be extracted from the response
334/// or calculated with respect to the data held here (e.g., duration = now - duration start).
335#[derive(Clone)]
336struct ResponseFutureMetricsState {
337    // fields for the metric values
338    // https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestduration
339    duration_start: Instant,
340    // https://opentelemetry.io/docs/specs/semconv/http/http-metrics/#metric-httpserverrequestbodysize
341    req_body_size: Option<u64>,
342
343    // fields for metric labels
344    protocol_name_kv: KeyValue,
345    protocol_version_kv: KeyValue,
346    url_scheme_kv: KeyValue,
347    method_kv: KeyValue,
348    route_kv_opt: Option<KeyValue>,
349
350    // Custom attributes from request
351    custom_request_attributes: Vec<KeyValue>,
352}
353
354pin_project! {
355    /// Response [`Future`] for [`HTTPMetricsService`].
356    pub struct HTTPMetricsResponseFuture<F, ResExt> {
357        #[pin]
358        inner_response_future: F,
359        layer_state: Arc<HTTPMetricsLayerState>,
360        metrics_state: ResponseFutureMetricsState,
361        response_extractor: ResExt,
362    }
363}
364
365impl<S, ReqBody, ResBody, ReqExt, ResExt> Service<http::Request<ReqBody>>
366    for HTTPMetricsService<S, ReqExt, ResExt>
367where
368    S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
369    ResBody: http_body::Body,
370    ReqExt: RequestAttributeExtractor<ReqBody>,
371    ResExt: ResponseAttributeExtractor<ResBody>,
372{
373    type Response = S::Response;
374    type Error = S::Error;
375    type Future = HTTPMetricsResponseFuture<S::Future, ResExt>;
376
377    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<result::Result<(), Self::Error>> {
378        self.inner_service.poll_ready(cx)
379    }
380
381    fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
382        let duration_start = Instant::now();
383
384        let headers = req.headers();
385        let content_length = headers
386            .get(http::header::CONTENT_LENGTH)
387            .and_then(|value| value.to_str().ok()?.parse::<u64>().ok());
388
389        let (protocol, version) = split_and_format_protocol_version(req.version());
390        let protocol_name_kv = KeyValue::new(NETWORK_PROTOCOL_NAME_LABEL, protocol);
391        let protocol_version_kv = KeyValue::new(NETWORK_PROTOCOL_VERSION_LABEL, version);
392
393        let scheme = req.uri().scheme_str().unwrap_or("").to_string();
394        let url_scheme_kv = KeyValue::new(URL_SCHEME_LABEL, scheme);
395
396        let method = req.method().as_str().to_owned();
397        let method_kv = KeyValue::new(HTTP_REQUEST_METHOD_LABEL, method);
398
399        #[allow(unused_mut)]
400        let mut route_kv_opt = None;
401        #[cfg(feature = "axum")]
402        if let Some(matched_path) = req.extensions().get::<MatchedPath>() {
403            route_kv_opt = Some(KeyValue::new(
404                HTTP_ROUTE_LABEL,
405                matched_path.as_str().to_owned(),
406            ));
407        };
408
409        // Extract custom request attributes
410        let custom_request_attributes = self.request_extractor.extract_attributes(&req);
411
412        self.state
413            .server_active_requests
414            .add(1, &[url_scheme_kv.clone(), method_kv.clone()]);
415
416        HTTPMetricsResponseFuture {
417            inner_response_future: self.inner_service.call(req),
418            layer_state: self.state.clone(),
419            metrics_state: ResponseFutureMetricsState {
420                duration_start,
421                req_body_size: content_length,
422
423                protocol_name_kv,
424                protocol_version_kv,
425                url_scheme_kv,
426                method_kv,
427                route_kv_opt,
428                custom_request_attributes,
429            },
430            response_extractor: self.response_extractor.clone(),
431        }
432    }
433}
434
435impl<F, ResBody, E, ResExt> Future for HTTPMetricsResponseFuture<F, ResExt>
436where
437    F: Future<Output = result::Result<http::Response<ResBody>, E>>,
438    ResBody: http_body::Body,
439    ResExt: ResponseAttributeExtractor<ResBody>,
440{
441    type Output = F::Output;
442
443    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
444        let this = self.project();
445        let response = ready!(this.inner_response_future.poll(cx))?;
446        let status = response.status();
447
448        // Build base label set
449        let mut label_superset = vec![
450            this.metrics_state.protocol_name_kv.clone(),
451            this.metrics_state.protocol_version_kv.clone(),
452            this.metrics_state.url_scheme_kv.clone(),
453            this.metrics_state.method_kv.clone(),
454            KeyValue::new(HTTP_RESPONSE_STATUS_CODE_LABEL, i64::from(status.as_u16())),
455        ];
456
457        if let Some(route_kv) = this.metrics_state.route_kv_opt.clone() {
458            label_superset.push(route_kv);
459        }
460
461        // Add custom request attributes
462        label_superset.extend(this.metrics_state.custom_request_attributes.clone());
463
464        // Extract and add custom response attributes
465        let custom_response_attributes = this.response_extractor.extract_attributes(&response);
466        label_superset.extend(custom_response_attributes);
467
468        this.layer_state.server_request_duration.record(
469            this.metrics_state.duration_start.elapsed().as_secs_f64(),
470            &label_superset,
471        );
472
473        if let Some(req_content_length) = this.metrics_state.req_body_size {
474            this.layer_state
475                .server_request_body_size
476                .record(req_content_length, &label_superset);
477        }
478
479        // use same approach for `http.server.response.body.size` as hyper does to set content-length
480        if let Some(resp_content_length) = response.body().size_hint().exact() {
481            this.layer_state
482                .server_response_body_size
483                .record(resp_content_length, &label_superset);
484        }
485
486        this.layer_state.server_active_requests.add(
487            -1,
488            &[
489                this.metrics_state.url_scheme_kv.clone(),
490                this.metrics_state.method_kv.clone(),
491            ],
492        );
493
494        Ready(Ok(response))
495    }
496}
497
498fn split_and_format_protocol_version(http_version: http::Version) -> (String, String) {
499    let version_str = match http_version {
500        http::Version::HTTP_09 => "0.9",
501        http::Version::HTTP_10 => "1.0",
502        http::Version::HTTP_11 => "1.1",
503        http::Version::HTTP_2 => "2.0",
504        http::Version::HTTP_3 => "3.0",
505        _ => "",
506    };
507    (String::from("http"), String::from(version_str))
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use http::{Request, Response, StatusCode};
514    use opentelemetry::metrics::MeterProvider;
515    use opentelemetry_sdk::metrics::{
516        data::{AggregatedMetrics, MetricData},
517        InMemoryMetricExporter, PeriodicReader, SdkMeterProvider,
518    };
519    use std::time::Duration;
520    use tower::Service;
521
522    #[tokio::test]
523    async fn test_metrics_labels() {
524        let exporter = InMemoryMetricExporter::default();
525        let reader = PeriodicReader::builder(exporter.clone())
526            .with_interval(Duration::from_millis(100))
527            .build();
528        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
529        let meter = meter_provider.meter("test");
530
531        let layer = HTTPMetricsLayerBuilder::builder()
532            .with_meter(meter)
533            .build()
534            .unwrap();
535
536        let service = tower::service_fn(|_req: Request<String>| async {
537            Ok::<_, std::convert::Infallible>(
538                Response::builder()
539                    .status(StatusCode::OK)
540                    .body(String::from("Hello, World!"))
541                    .unwrap(),
542            )
543        });
544
545        let mut service = layer.layer(service);
546
547        let request = Request::builder()
548            .method("GET")
549            .uri("https://example.com/test")
550            .body("test body".to_string())
551            .unwrap();
552
553        let _response = service.call(request).await.unwrap();
554
555        tokio::time::sleep(Duration::from_millis(500)).await;
556
557        let metrics = exporter.get_finished_metrics().unwrap();
558        assert!(!metrics.is_empty());
559
560        let resource_metrics = &metrics[0];
561        let scope_metrics = resource_metrics
562            .scope_metrics()
563            .next()
564            .expect("Should have scope metrics");
565
566        let duration_metric = scope_metrics
567            .metrics()
568            .find(|m| m.name() == HTTP_SERVER_DURATION_METRIC)
569            .expect("Duration metric should exist");
570
571        if let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = duration_metric.data() {
572            let data_point = histogram
573                .data_points()
574                .next()
575                .expect("Should have data point");
576            let attributes: Vec<_> = data_point.attributes().collect();
577
578            // Duration metric should have 5 attributes: protocol_name, protocol_version, url_scheme, method, status_code
579            assert_eq!(
580                attributes.len(),
581                5,
582                "Duration metric should have exactly 5 attributes"
583            );
584
585            let protocol_name = attributes
586                .iter()
587                .find(|kv| kv.key.as_str() == NETWORK_PROTOCOL_NAME_LABEL)
588                .expect("Protocol name should be present");
589            assert_eq!(protocol_name.value.as_str(), "http");
590
591            let protocol_version = attributes
592                .iter()
593                .find(|kv| kv.key.as_str() == NETWORK_PROTOCOL_VERSION_LABEL)
594                .expect("Protocol version should be present");
595            assert_eq!(protocol_version.value.as_str(), "1.1");
596
597            let url_scheme = attributes
598                .iter()
599                .find(|kv| kv.key.as_str() == URL_SCHEME_LABEL)
600                .expect("URL scheme should be present");
601            assert_eq!(url_scheme.value.as_str(), "https");
602
603            let method = attributes
604                .iter()
605                .find(|kv| kv.key.as_str() == HTTP_REQUEST_METHOD_LABEL)
606                .expect("HTTP method should be present");
607            assert_eq!(method.value.as_str(), "GET");
608
609            let status_code = attributes
610                .iter()
611                .find(|kv| kv.key.as_str() == HTTP_RESPONSE_STATUS_CODE_LABEL)
612                .expect("Status code should be present");
613            if let opentelemetry::Value::I64(code) = &status_code.value {
614                assert_eq!(*code, 200);
615            } else {
616                panic!("Expected i64 status code");
617            }
618        } else {
619            panic!("Expected histogram data for duration metric");
620        }
621
622        let request_body_size_metric = scope_metrics
623            .metrics()
624            .find(|m| m.name() == HTTP_SERVER_REQUEST_BODY_SIZE_METRIC);
625
626        if let Some(metric) = request_body_size_metric {
627            if let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = metric.data() {
628                let data_point = histogram
629                    .data_points()
630                    .next()
631                    .expect("Should have data point");
632                let attributes: Vec<_> = data_point.attributes().collect();
633
634                // Request body size metric should have 5 attributes: protocol_name, protocol_version, url_scheme, method, status_code
635                assert_eq!(
636                    attributes.len(),
637                    5,
638                    "Request body size metric should have exactly 5 attributes"
639                );
640
641                let protocol_name = attributes
642                    .iter()
643                    .find(|kv| kv.key.as_str() == NETWORK_PROTOCOL_NAME_LABEL)
644                    .expect("Protocol name should be present in request body size");
645                assert_eq!(protocol_name.value.as_str(), "http");
646
647                let protocol_version = attributes
648                    .iter()
649                    .find(|kv| kv.key.as_str() == NETWORK_PROTOCOL_VERSION_LABEL)
650                    .expect("Protocol version should be present in request body size");
651                assert_eq!(protocol_version.value.as_str(), "1.1");
652
653                let url_scheme = attributes
654                    .iter()
655                    .find(|kv| kv.key.as_str() == URL_SCHEME_LABEL)
656                    .expect("URL scheme should be present in request body size");
657                assert_eq!(url_scheme.value.as_str(), "https");
658
659                let method = attributes
660                    .iter()
661                    .find(|kv| kv.key.as_str() == HTTP_REQUEST_METHOD_LABEL)
662                    .expect("HTTP method should be present in request body size");
663                assert_eq!(method.value.as_str(), "GET");
664
665                let status_code = attributes
666                    .iter()
667                    .find(|kv| kv.key.as_str() == HTTP_RESPONSE_STATUS_CODE_LABEL)
668                    .expect("Status code should be present in request body size");
669                if let opentelemetry::Value::I64(code) = &status_code.value {
670                    assert_eq!(*code, 200);
671                } else {
672                    panic!("Expected i64 status code");
673                }
674            }
675        }
676
677        // Test response body size metric
678        let response_body_size_metric = scope_metrics
679            .metrics()
680            .find(|m| m.name() == HTTP_SERVER_RESPONSE_BODY_SIZE_METRIC);
681
682        if let Some(metric) = response_body_size_metric {
683            if let AggregatedMetrics::F64(MetricData::Histogram(histogram)) = metric.data() {
684                let data_point = histogram
685                    .data_points()
686                    .next()
687                    .expect("Should have data point");
688                let attributes: Vec<_> = data_point.attributes().collect();
689
690                // Response body size metric should have 5 attributes: protocol_name, protocol_version, url_scheme, method, status_code
691                assert_eq!(
692                    attributes.len(),
693                    5,
694                    "Response body size metric should have exactly 5 attributes"
695                );
696
697                let protocol_name = attributes
698                    .iter()
699                    .find(|kv| kv.key.as_str() == NETWORK_PROTOCOL_NAME_LABEL)
700                    .expect("Protocol name should be present in response body size");
701                assert_eq!(protocol_name.value.as_str(), "http");
702
703                let protocol_version = attributes
704                    .iter()
705                    .find(|kv| kv.key.as_str() == NETWORK_PROTOCOL_VERSION_LABEL)
706                    .expect("Protocol version should be present in response body size");
707                assert_eq!(protocol_version.value.as_str(), "1.1");
708
709                let url_scheme = attributes
710                    .iter()
711                    .find(|kv| kv.key.as_str() == URL_SCHEME_LABEL)
712                    .expect("URL scheme should be present in response body size");
713                assert_eq!(url_scheme.value.as_str(), "https");
714
715                let method = attributes
716                    .iter()
717                    .find(|kv| kv.key.as_str() == HTTP_REQUEST_METHOD_LABEL)
718                    .expect("HTTP method should be present in response body size");
719                assert_eq!(method.value.as_str(), "GET");
720
721                let status_code = attributes
722                    .iter()
723                    .find(|kv| kv.key.as_str() == HTTP_RESPONSE_STATUS_CODE_LABEL)
724                    .expect("Status code should be present in response body size");
725                if let opentelemetry::Value::I64(code) = &status_code.value {
726                    assert_eq!(*code, 200);
727                } else {
728                    panic!("Expected i64 status code");
729                }
730            }
731        }
732
733        // Test active requests metric
734        let active_requests_metric = scope_metrics
735            .metrics()
736            .find(|m| m.name() == HTTP_SERVER_ACTIVE_REQUESTS_METRIC);
737
738        if let Some(metric) = active_requests_metric {
739            if let AggregatedMetrics::I64(MetricData::Sum(sum)) = metric.data() {
740                let data_point = sum.data_points().next().expect("Should have data point");
741                let attributes: Vec<_> = data_point.attributes().collect();
742
743                // Active requests metric should have 2 attributes: method, url_scheme
744                assert_eq!(
745                    attributes.len(),
746                    2,
747                    "Active requests metric should have exactly 2 attributes"
748                );
749
750                let method = attributes
751                    .iter()
752                    .find(|kv| kv.key.as_str() == HTTP_REQUEST_METHOD_LABEL)
753                    .expect("HTTP method should be present in active requests");
754                assert_eq!(method.value.as_str(), "GET");
755
756                let url_scheme = attributes
757                    .iter()
758                    .find(|kv| kv.key.as_str() == URL_SCHEME_LABEL)
759                    .expect("URL scheme should be present in active requests");
760                assert_eq!(url_scheme.value.as_str(), "https");
761            }
762        }
763    }
764}