rama_http/layer/
opentelemetry.rs

1//! Http OpenTelemetry [`Layer`] Support for Rama.
2//!
3//! [`Layer`]: rama_core::Layer
4
5use crate::{
6    IntoResponse, Request, Response,
7    headers::{HeaderMapExt, UserAgent},
8};
9use rama_core::telemetry::opentelemetry::{
10    AttributesFactory, InstrumentationScope, KeyValue, MeterOptions, ServiceInfo, global,
11    metrics::{Counter, Histogram, Meter},
12    semantic_conventions::{
13        self,
14        resource::{SERVICE_NAME, SERVICE_VERSION},
15    },
16};
17use rama_core::{Context, Layer, Service};
18use rama_net::http::RequestContext;
19use rama_utils::macros::define_inner_service_accessors;
20use std::{borrow::Cow, fmt, sync::Arc, time::SystemTime};
21
22// Follows the experimental semantic conventions for HTTP metrics:
23// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md
24
25use semantic_conventions::attribute::{
26    HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, NETWORK_PROTOCOL_VERSION, SERVER_PORT,
27    URL_SCHEME, USER_AGENT_ORIGINAL,
28};
29
30const HTTP_SERVER_DURATION: &str = "http.requests.duration";
31const HTTP_SERVER_TOTAL_REQUESTS: &str = "http.requests.total";
32const HTTP_SERVER_TOTAL_FAILURES: &str = "http.failures.total";
33const HTTP_SERVER_TOTAL_RESPONSES: &str = "http.responses.total";
34
35const HTTP_REQUEST_HOST: &str = "http.request.host";
36
37/// Records http server metrics
38///
39/// See the [spec] for details.
40///
41/// [spec]: https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-metrics.md#http-server
42#[derive(Clone, Debug)]
43struct Metrics {
44    http_server_duration: Histogram<f64>,
45    http_server_total_requests: Counter<u64>,
46    http_server_total_responses: Counter<u64>,
47    http_server_total_failures: Counter<u64>,
48}
49
50impl Metrics {
51    /// Create a new [`RequestMetrics`]
52    fn new(meter: Meter, prefix: Option<String>) -> Self {
53        let http_server_duration = meter
54            .f64_histogram(match &prefix {
55                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_DURATION}")),
56                None => Cow::Borrowed(HTTP_SERVER_DURATION),
57            })
58            .with_description("Measures the duration of inbound HTTP requests.")
59            .with_unit("s")
60            .build();
61
62        let http_server_total_requests = meter
63            .u64_counter(match &prefix {
64                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_REQUESTS}")),
65                None => Cow::Borrowed(HTTP_SERVER_TOTAL_REQUESTS),
66            })
67            .with_description("Measures the total number of HTTP requests have been seen.")
68            .build();
69
70        let http_server_total_responses = meter
71            .u64_counter(match &prefix {
72                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_RESPONSES}")),
73                None => Cow::Borrowed(HTTP_SERVER_TOTAL_RESPONSES),
74            })
75            .with_description("Measures the total number of HTTP responses have been seen.")
76            .build();
77
78        let http_server_total_failures = meter
79            .u64_counter(match &prefix {
80                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_FAILURES}")),
81                None => Cow::Borrowed(HTTP_SERVER_TOTAL_FAILURES),
82            })
83            .with_description(
84                "Measures the total number of failed HTTP requests that have been seen.",
85            )
86            .build();
87
88        Metrics {
89            http_server_total_requests,
90            http_server_total_responses,
91            http_server_total_failures,
92            http_server_duration,
93        }
94    }
95}
96
97/// A layer that records http server metrics using OpenTelemetry.
98pub struct RequestMetricsLayer<F = ()> {
99    metrics: Arc<Metrics>,
100    base_attributes: Vec<KeyValue>,
101    attributes_factory: F,
102}
103
104impl<F: fmt::Debug> fmt::Debug for RequestMetricsLayer<F> {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        f.debug_struct("RequestMetricsLayer")
107            .field("metrics", &self.metrics)
108            .field("base_attributes", &self.base_attributes)
109            .field("attributes_factory", &self.attributes_factory)
110            .finish()
111    }
112}
113
114impl<F: Clone> Clone for RequestMetricsLayer<F> {
115    fn clone(&self) -> Self {
116        RequestMetricsLayer {
117            metrics: self.metrics.clone(),
118            base_attributes: self.base_attributes.clone(),
119            attributes_factory: self.attributes_factory.clone(),
120        }
121    }
122}
123
124impl RequestMetricsLayer<()> {
125    /// Create a new [`RequestMetricsLayer`] using the global [`Meter`] provider,
126    /// with the default name and version.
127    pub fn new() -> Self {
128        Self::custom(MeterOptions::default())
129    }
130
131    /// Create a new [`RequestMetricsLayer`] using the global [`Meter`] provider,
132    /// with a custom name and version.
133    pub fn custom(opts: MeterOptions) -> Self {
134        let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
135            name: rama_utils::info::NAME.to_owned(),
136            version: rama_utils::info::VERSION.to_owned(),
137        });
138
139        let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
140        attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
141        attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
142
143        let meter = get_versioned_meter();
144        let metrics = Metrics::new(meter, opts.metric_prefix);
145
146        Self {
147            metrics: Arc::new(metrics),
148            base_attributes: attributes,
149            attributes_factory: (),
150        }
151    }
152
153    /// Attach an [`AttributesFactory`] to this [`RequestMetricsLayer`], allowing
154    /// you to inject custom attributes.
155    pub fn with_attributes<F>(self, attributes: F) -> RequestMetricsLayer<F> {
156        RequestMetricsLayer {
157            metrics: self.metrics,
158            base_attributes: self.base_attributes,
159            attributes_factory: attributes,
160        }
161    }
162}
163
164impl Default for RequestMetricsLayer {
165    fn default() -> Self {
166        Self::new()
167    }
168}
169
170fn get_versioned_meter() -> Meter {
171    global::meter_with_scope(
172        InstrumentationScope::builder(const_format::formatcp!(
173            "{}-network-http",
174            rama_utils::info::NAME
175        ))
176        .with_version(rama_utils::info::VERSION)
177        .with_schema_url(semantic_conventions::SCHEMA_URL)
178        .build(),
179    )
180}
181
182impl<S, F: Clone> Layer<S> for RequestMetricsLayer<F> {
183    type Service = RequestMetricsService<S, F>;
184
185    fn layer(&self, inner: S) -> Self::Service {
186        RequestMetricsService {
187            inner,
188            metrics: self.metrics.clone(),
189            base_attributes: self.base_attributes.clone(),
190            attributes_factory: self.attributes_factory.clone(),
191        }
192    }
193
194    fn into_layer(self, inner: S) -> Self::Service {
195        RequestMetricsService {
196            inner,
197            metrics: self.metrics,
198            base_attributes: self.base_attributes,
199            attributes_factory: self.attributes_factory,
200        }
201    }
202}
203
204/// A [`Service`] that records [http] server metrics using OpenTelemetry.
205pub struct RequestMetricsService<S, F = ()> {
206    inner: S,
207    metrics: Arc<Metrics>,
208    base_attributes: Vec<KeyValue>,
209    attributes_factory: F,
210}
211
212impl<S> RequestMetricsService<S, ()> {
213    /// Create a new [`RequestMetricsService`].
214    pub fn new(inner: S) -> Self {
215        RequestMetricsLayer::new().into_layer(inner)
216    }
217
218    define_inner_service_accessors!();
219}
220
221impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for RequestMetricsService<S, F> {
222    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223        f.debug_struct("RequestMetricsService")
224            .field("inner", &self.inner)
225            .field("metrics", &self.metrics)
226            .field("base_attributes", &self.base_attributes)
227            .field("attributes_factory", &self.attributes_factory)
228            .finish()
229    }
230}
231
232impl<S: Clone, F: Clone> Clone for RequestMetricsService<S, F> {
233    fn clone(&self) -> Self {
234        Self {
235            inner: self.inner.clone(),
236            metrics: self.metrics.clone(),
237            base_attributes: self.base_attributes.clone(),
238            attributes_factory: self.attributes_factory.clone(),
239        }
240    }
241}
242
243impl<S, F> RequestMetricsService<S, F> {
244    fn compute_attributes<Body, State>(
245        &self,
246        ctx: &mut Context<State>,
247        req: &Request<Body>,
248    ) -> Vec<KeyValue>
249    where
250        F: AttributesFactory<State>,
251    {
252        let mut attributes = self
253            .attributes_factory
254            .attributes(6 + self.base_attributes.len(), ctx);
255        attributes.extend(self.base_attributes.iter().cloned());
256
257        // server info
258        let request_ctx: Option<&mut RequestContext> = ctx
259            .get_or_try_insert_with_ctx(|ctx| (ctx, req).try_into())
260            .ok();
261        if let Some(authority) = request_ctx.as_ref().map(|rc| &rc.authority) {
262            attributes.push(KeyValue::new(
263                HTTP_REQUEST_HOST,
264                authority.host().to_string(),
265            ));
266            attributes.push(KeyValue::new(SERVER_PORT, authority.port() as i64));
267        }
268
269        // Request Info
270        if let Some(protocol) = request_ctx.as_ref().map(|rc| &rc.protocol) {
271            attributes.push(KeyValue::new(URL_SCHEME, protocol.to_string()));
272        }
273
274        // Common attrs (Request Info)
275        // <https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#common-attributes>
276
277        attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
278        if let Some(http_version) = request_ctx.as_ref().and_then(|rc| match rc.http_version {
279            rama_http_types::Version::HTTP_09 => Some("0.9"),
280            rama_http_types::Version::HTTP_10 => Some("1.0"),
281            rama_http_types::Version::HTTP_11 => Some("1.1"),
282            rama_http_types::Version::HTTP_2 => Some("2"),
283            rama_http_types::Version::HTTP_3 => Some("3"),
284            _ => None,
285        }) {
286            attributes.push(KeyValue::new(NETWORK_PROTOCOL_VERSION, http_version));
287        }
288
289        if let Some(ua) = req.headers().typed_get::<UserAgent>() {
290            attributes.push(KeyValue::new(USER_AGENT_ORIGINAL, ua.to_string()));
291        }
292
293        attributes
294    }
295}
296
297impl<S, F, State, Body> Service<State, Request<Body>> for RequestMetricsService<S, F>
298where
299    S: Service<State, Request<Body>, Response: IntoResponse>,
300    F: AttributesFactory<State>,
301    State: Clone + Send + Sync + 'static,
302    Body: Send + 'static,
303{
304    type Response = Response;
305    type Error = S::Error;
306
307    async fn serve(
308        &self,
309        mut ctx: Context<State>,
310        req: Request<Body>,
311    ) -> Result<Self::Response, Self::Error> {
312        let mut attributes: Vec<KeyValue> = self.compute_attributes(&mut ctx, &req);
313
314        self.metrics.http_server_total_requests.add(1, &attributes);
315
316        // used to compute the duration of the request
317        let timer = SystemTime::now();
318
319        let result = self.inner.serve(ctx, req).await;
320
321        match result {
322            Ok(res) => {
323                let res = res.into_response();
324
325                attributes.push(KeyValue::new(
326                    HTTP_RESPONSE_STATUS_CODE,
327                    res.status().as_u16() as i64,
328                ));
329
330                self.metrics.http_server_total_responses.add(1, &attributes);
331                self.metrics.http_server_duration.record(
332                    timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
333                    &attributes,
334                );
335
336                Ok(res)
337            }
338            Err(err) => {
339                self.metrics.http_server_total_failures.add(1, &attributes);
340
341                Err(err)
342            }
343        }
344    }
345}
346
347#[cfg(test)]
348mod tests {
349    use super::*;
350
351    #[test]
352    fn test_default_svc_compute_attributes_default() {
353        let svc = RequestMetricsService::new(());
354        let mut ctx = Context::default();
355        let req = Request::builder()
356            .uri("http://www.example.com")
357            .body(())
358            .unwrap();
359
360        let attributes = svc.compute_attributes(&mut ctx, &req);
361        assert!(
362            attributes
363                .iter()
364                .any(|attr| attr.key.as_str() == SERVICE_NAME)
365        );
366        assert!(
367            attributes
368                .iter()
369                .any(|attr| attr.key.as_str() == SERVICE_VERSION)
370        );
371        assert!(
372            attributes
373                .iter()
374                .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
375        );
376    }
377
378    #[test]
379    fn test_custom_svc_compute_attributes_default() {
380        let svc = RequestMetricsLayer::custom(MeterOptions {
381            service: Some(ServiceInfo {
382                name: "test".to_owned(),
383                version: "42".to_owned(),
384            }),
385            metric_prefix: Some("foo".to_owned()),
386            ..Default::default()
387        })
388        .into_layer(());
389        let mut ctx = Context::default();
390        let req = Request::builder()
391            .uri("http://www.example.com")
392            .body(())
393            .unwrap();
394
395        let attributes = svc.compute_attributes(&mut ctx, &req);
396        assert!(
397            attributes
398                .iter()
399                .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
400        );
401        assert!(
402            attributes
403                .iter()
404                .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
405        );
406        assert!(
407            attributes
408                .iter()
409                .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
410        );
411    }
412
413    #[test]
414    fn test_custom_svc_compute_attributes_attributes_vec() {
415        let svc = RequestMetricsLayer::custom(MeterOptions {
416            service: Some(ServiceInfo {
417                name: "test".to_owned(),
418                version: "42".to_owned(),
419            }),
420            metric_prefix: Some("foo".to_owned()),
421            ..Default::default()
422        })
423        .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
424        .into_layer(());
425        let mut ctx = Context::default();
426        let req = Request::builder()
427            .uri("http://www.example.com")
428            .body(())
429            .unwrap();
430
431        let attributes = svc.compute_attributes(&mut ctx, &req);
432        assert!(
433            attributes
434                .iter()
435                .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
436        );
437        assert!(
438            attributes
439                .iter()
440                .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
441        );
442        assert!(
443            attributes
444                .iter()
445                .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
446        );
447        assert!(
448            attributes
449                .iter()
450                .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn")
451        );
452    }
453
454    #[test]
455    fn test_custom_svc_compute_attributes_attribute_fn() {
456        let svc = RequestMetricsLayer::custom(MeterOptions {
457            service: Some(ServiceInfo {
458                name: "test".to_owned(),
459                version: "42".to_owned(),
460            }),
461            metric_prefix: Some("foo".to_owned()),
462            ..Default::default()
463        })
464        .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
465            let mut attributes = Vec::with_capacity(size_hint + 1);
466            attributes.push(KeyValue::new("test", "attribute_fn"));
467            attributes
468        })
469        .into_layer(());
470        let mut ctx = Context::default();
471        let req = Request::builder()
472            .uri("http://www.example.com")
473            .body(())
474            .unwrap();
475
476        let attributes = svc.compute_attributes(&mut ctx, &req);
477        assert!(
478            attributes
479                .iter()
480                .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
481        );
482        assert!(
483            attributes
484                .iter()
485                .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
486        );
487        assert!(
488            attributes
489                .iter()
490                .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
491        );
492        assert!(
493            attributes
494                .iter()
495                .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn")
496        );
497    }
498}