rama_http/layer/
opentelemetry.rs

1//! Http OpenTelemetry [`Layer`] Support for Rama.
2//!
3//! [`Layer`]: rama_core::Layer
4
5use crate::service::web::response::IntoResponse;
6use crate::{Request, Response};
7use rama_core::telemetry::opentelemetry::{
8    AttributesFactory, InstrumentationScope, KeyValue, MeterOptions, ServiceInfo, global,
9    metrics::{Counter, Histogram, Meter},
10    semantic_conventions::{
11        self,
12        resource::{SERVICE_NAME, SERVICE_VERSION},
13    },
14};
15use rama_core::{Context, Layer, Service};
16use rama_net::http::RequestContext;
17use rama_utils::macros::define_inner_service_accessors;
18use std::{borrow::Cow, fmt, sync::Arc, time::SystemTime};
19
20// Follows the experimental semantic conventions for HTTP metrics:
21// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md
22
23use semantic_conventions::attribute::{
24    HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, NETWORK_PROTOCOL_VERSION, SERVER_PORT,
25    URL_SCHEME,
26};
27
/// Base name of the histogram that records how long serving a request took.
/// A configured metric prefix is prepended as `{prefix}.{name}`.
const HTTP_SERVER_DURATION: &str = "http.requests.duration";
/// Base name of the counter incremented once for every request received.
const HTTP_SERVER_TOTAL_REQUESTS: &str = "http.requests.total";
/// Base name of the counter incremented when the inner service returns an error.
const HTTP_SERVER_TOTAL_FAILURES: &str = "http.failures.total";
/// Base name of the counter incremented when the inner service returns a response.
const HTTP_SERVER_TOTAL_RESPONSES: &str = "http.responses.total";

/// Attribute key under which the host of the request authority is reported.
const HTTP_REQUEST_HOST: &str = "http.request.host";
34
/// Records http server metrics
///
/// See the [spec] for details.
///
/// [spec]: https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-metrics.md#http-server
#[derive(Clone, Debug)]
struct Metrics {
    /// Time taken to serve a request, in seconds (the instrument unit is `"s"`).
    http_server_duration: Histogram<f64>,
    /// Number of requests received.
    http_server_total_requests: Counter<u64>,
    /// Number of requests for which the inner service produced a response.
    http_server_total_responses: Counter<u64>,
    /// Number of requests for which the inner service returned an error.
    http_server_total_failures: Counter<u64>,
}
47
48impl Metrics {
49    /// Create a new [`RequestMetrics`]
50    fn new(meter: Meter, prefix: Option<String>) -> Self {
51        let http_server_duration = meter
52            .f64_histogram(match &prefix {
53                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_DURATION}")),
54                None => Cow::Borrowed(HTTP_SERVER_DURATION),
55            })
56            .with_description("Measures the duration of inbound HTTP requests.")
57            .with_unit("s")
58            .build();
59
60        let http_server_total_requests = meter
61            .u64_counter(match &prefix {
62                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_REQUESTS}")),
63                None => Cow::Borrowed(HTTP_SERVER_TOTAL_REQUESTS),
64            })
65            .with_description("Measures the total number of HTTP requests have been seen.")
66            .build();
67
68        let http_server_total_responses = meter
69            .u64_counter(match &prefix {
70                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_RESPONSES}")),
71                None => Cow::Borrowed(HTTP_SERVER_TOTAL_RESPONSES),
72            })
73            .with_description("Measures the total number of HTTP responses have been seen.")
74            .build();
75
76        let http_server_total_failures = meter
77            .u64_counter(match &prefix {
78                Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_FAILURES}")),
79                None => Cow::Borrowed(HTTP_SERVER_TOTAL_FAILURES),
80            })
81            .with_description(
82                "Measures the total number of failed HTTP requests that have been seen.",
83            )
84            .build();
85
86        Metrics {
87            http_server_total_requests,
88            http_server_total_responses,
89            http_server_total_failures,
90            http_server_duration,
91        }
92    }
93}
94
95/// A layer that records http server metrics using OpenTelemetry.
96pub struct RequestMetricsLayer<F = ()> {
97    metrics: Arc<Metrics>,
98    base_attributes: Vec<KeyValue>,
99    attributes_factory: F,
100}
101
102impl<F: fmt::Debug> fmt::Debug for RequestMetricsLayer<F> {
103    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
104        f.debug_struct("RequestMetricsLayer")
105            .field("metrics", &self.metrics)
106            .field("base_attributes", &self.base_attributes)
107            .field("attributes_factory", &self.attributes_factory)
108            .finish()
109    }
110}
111
112impl<F: Clone> Clone for RequestMetricsLayer<F> {
113    fn clone(&self) -> Self {
114        RequestMetricsLayer {
115            metrics: self.metrics.clone(),
116            base_attributes: self.base_attributes.clone(),
117            attributes_factory: self.attributes_factory.clone(),
118        }
119    }
120}
121
122impl RequestMetricsLayer<()> {
123    /// Create a new [`RequestMetricsLayer`] using the global [`Meter`] provider,
124    /// with the default name and version.
125    pub fn new() -> Self {
126        Self::custom(MeterOptions::default())
127    }
128
129    /// Create a new [`RequestMetricsLayer`] using the global [`Meter`] provider,
130    /// with a custom name and version.
131    pub fn custom(opts: MeterOptions) -> Self {
132        let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
133            name: rama_utils::info::NAME.to_owned(),
134            version: rama_utils::info::VERSION.to_owned(),
135        });
136
137        let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
138        attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
139        attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
140
141        let meter = get_versioned_meter();
142        let metrics = Metrics::new(meter, opts.metric_prefix);
143
144        Self {
145            metrics: Arc::new(metrics),
146            base_attributes: attributes,
147            attributes_factory: (),
148        }
149    }
150
151    /// Attach an [`AttributesFactory`] to this [`RequestMetricsLayer`], allowing
152    /// you to inject custom attributes.
153    pub fn with_attributes<F>(self, attributes: F) -> RequestMetricsLayer<F> {
154        RequestMetricsLayer {
155            metrics: self.metrics,
156            base_attributes: self.base_attributes,
157            attributes_factory: attributes,
158        }
159    }
160}
161
162impl Default for RequestMetricsLayer {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168fn get_versioned_meter() -> Meter {
169    global::meter_with_scope(
170        InstrumentationScope::builder(const_format::formatcp!(
171            "{}-network-http",
172            rama_utils::info::NAME
173        ))
174        .with_version(rama_utils::info::VERSION)
175        .with_schema_url(semantic_conventions::SCHEMA_URL)
176        .build(),
177    )
178}
179
180impl<S, F: Clone> Layer<S> for RequestMetricsLayer<F> {
181    type Service = RequestMetricsService<S, F>;
182
183    fn layer(&self, inner: S) -> Self::Service {
184        RequestMetricsService {
185            inner,
186            metrics: self.metrics.clone(),
187            base_attributes: self.base_attributes.clone(),
188            attributes_factory: self.attributes_factory.clone(),
189        }
190    }
191
192    fn into_layer(self, inner: S) -> Self::Service {
193        RequestMetricsService {
194            inner,
195            metrics: self.metrics,
196            base_attributes: self.base_attributes,
197            attributes_factory: self.attributes_factory,
198        }
199    }
200}
201
202/// A [`Service`] that records [http] server metrics using OpenTelemetry.
203pub struct RequestMetricsService<S, F = ()> {
204    inner: S,
205    metrics: Arc<Metrics>,
206    base_attributes: Vec<KeyValue>,
207    attributes_factory: F,
208}
209
210impl<S> RequestMetricsService<S, ()> {
211    /// Create a new [`RequestMetricsService`].
212    pub fn new(inner: S) -> Self {
213        RequestMetricsLayer::new().into_layer(inner)
214    }
215
216    define_inner_service_accessors!();
217}
218
219impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for RequestMetricsService<S, F> {
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("RequestMetricsService")
222            .field("inner", &self.inner)
223            .field("metrics", &self.metrics)
224            .field("base_attributes", &self.base_attributes)
225            .field("attributes_factory", &self.attributes_factory)
226            .finish()
227    }
228}
229
230impl<S: Clone, F: Clone> Clone for RequestMetricsService<S, F> {
231    fn clone(&self) -> Self {
232        Self {
233            inner: self.inner.clone(),
234            metrics: self.metrics.clone(),
235            base_attributes: self.base_attributes.clone(),
236            attributes_factory: self.attributes_factory.clone(),
237        }
238    }
239}
240
241impl<S, F> RequestMetricsService<S, F> {
242    fn compute_attributes<Body, State>(
243        &self,
244        ctx: &mut Context<State>,
245        req: &Request<Body>,
246    ) -> Vec<KeyValue>
247    where
248        F: AttributesFactory<State>,
249    {
250        let mut attributes = self
251            .attributes_factory
252            .attributes(5 + self.base_attributes.len(), ctx);
253        attributes.extend(self.base_attributes.iter().cloned());
254
255        // server info
256        let request_ctx: Option<&mut RequestContext> = ctx
257            .get_or_try_insert_with_ctx(|ctx| (ctx, req).try_into())
258            .ok();
259        if let Some(authority) = request_ctx.as_ref().map(|rc| &rc.authority) {
260            attributes.push(KeyValue::new(
261                HTTP_REQUEST_HOST,
262                authority.host().to_string(),
263            ));
264            attributes.push(KeyValue::new(SERVER_PORT, authority.port() as i64));
265        }
266
267        // Request Info
268        if let Some(protocol) = request_ctx.as_ref().map(|rc| &rc.protocol) {
269            attributes.push(KeyValue::new(URL_SCHEME, protocol.to_string()));
270        }
271
272        attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
273        if let Some(http_version) = request_ctx.as_ref().and_then(|rc| match rc.http_version {
274            rama_http_types::Version::HTTP_09 => Some("0.9"),
275            rama_http_types::Version::HTTP_10 => Some("1.0"),
276            rama_http_types::Version::HTTP_11 => Some("1.1"),
277            rama_http_types::Version::HTTP_2 => Some("2"),
278            rama_http_types::Version::HTTP_3 => Some("3"),
279            _ => None,
280        }) {
281            attributes.push(KeyValue::new(NETWORK_PROTOCOL_VERSION, http_version));
282        }
283
284        attributes
285    }
286}
287
288impl<S, F, State, Body> Service<State, Request<Body>> for RequestMetricsService<S, F>
289where
290    S: Service<State, Request<Body>, Response: IntoResponse>,
291    F: AttributesFactory<State>,
292    State: Clone + Send + Sync + 'static,
293    Body: Send + 'static,
294{
295    type Response = Response;
296    type Error = S::Error;
297
298    async fn serve(
299        &self,
300        mut ctx: Context<State>,
301        req: Request<Body>,
302    ) -> Result<Self::Response, Self::Error> {
303        let mut attributes: Vec<KeyValue> = self.compute_attributes(&mut ctx, &req);
304
305        self.metrics.http_server_total_requests.add(1, &attributes);
306
307        // used to compute the duration of the request
308        let timer = SystemTime::now();
309
310        let result = self.inner.serve(ctx, req).await;
311
312        match result {
313            Ok(res) => {
314                let res = res.into_response();
315
316                attributes.push(KeyValue::new(
317                    HTTP_RESPONSE_STATUS_CODE,
318                    res.status().as_u16() as i64,
319                ));
320
321                self.metrics.http_server_total_responses.add(1, &attributes);
322                self.metrics.http_server_duration.record(
323                    timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
324                    &attributes,
325                );
326
327                Ok(res)
328            }
329            Err(err) => {
330                self.metrics.http_server_total_failures.add(1, &attributes);
331
332                Err(err)
333            }
334        }
335    }
336}
337
#[cfg(test)]
mod tests {
    use super::*;

    /// Layer with a custom service identity (`test` / `42`) and the metric prefix `foo`.
    fn custom_layer() -> RequestMetricsLayer {
        RequestMetricsLayer::custom(MeterOptions {
            service: Some(ServiceInfo {
                name: "test".to_owned(),
                version: "42".to_owned(),
            }),
            metric_prefix: Some("foo".to_owned()),
            ..Default::default()
        })
    }

    /// Attributes computed by `svc` for a request to `http://www.example.com`
    /// in a default context.
    fn attributes_for<F>(svc: &RequestMetricsService<(), F>) -> Vec<KeyValue>
    where
        F: AttributesFactory<()>,
    {
        let mut ctx: Context<()> = Context::default();
        let req = Request::builder()
            .uri("http://www.example.com")
            .body(())
            .unwrap();

        svc.compute_attributes(&mut ctx, &req)
    }

    /// Whether any attribute uses `key`.
    fn has_key(attributes: &[KeyValue], key: &str) -> bool {
        attributes.iter().any(|attr| attr.key.as_str() == key)
    }

    /// Whether any attribute uses `key` with the string value `value`.
    fn has_key_value(attributes: &[KeyValue], key: &str, value: &str) -> bool {
        attributes
            .iter()
            .any(|attr| attr.key.as_str() == key && attr.value.as_str() == value)
    }

    #[test]
    fn test_default_svc_compute_attributes_default() {
        let svc = RequestMetricsService::new(());

        let attributes = attributes_for(&svc);

        assert!(has_key(&attributes, SERVICE_NAME));
        assert!(has_key(&attributes, SERVICE_VERSION));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
    }

    #[test]
    fn test_custom_svc_compute_attributes_default() {
        let svc = custom_layer().into_layer(());

        let attributes = attributes_for(&svc);

        assert!(has_key_value(&attributes, SERVICE_NAME, "test"));
        assert!(has_key_value(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attributes_vec() {
        let svc = custom_layer()
            .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
            .into_layer(());

        let attributes = attributes_for(&svc);

        assert!(has_key_value(&attributes, SERVICE_NAME, "test"));
        assert!(has_key_value(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
        assert!(has_key_value(&attributes, "test", "attribute_fn"));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attribute_fn() {
        let svc = custom_layer()
            .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
                let mut attributes = Vec::with_capacity(size_hint + 1);
                attributes.push(KeyValue::new("test", "attribute_fn"));
                attributes
            })
            .into_layer(());

        let attributes = attributes_for(&svc);

        assert!(has_key_value(&attributes, SERVICE_NAME, "test"));
        assert!(has_key_value(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
        assert!(has_key_value(&attributes, "test", "attribute_fn"));
    }
}