1use crate::{
6 IntoResponse, Request, Response,
7 headers::{HeaderMapExt, UserAgent},
8};
9use rama_core::telemetry::opentelemetry::{
10 AttributesFactory, InstrumentationScope, KeyValue, MeterOptions, ServiceInfo, global,
11 metrics::{Counter, Histogram, Meter},
12 semantic_conventions::{
13 self,
14 resource::{SERVICE_NAME, SERVICE_VERSION},
15 },
16};
17use rama_core::{Context, Layer, Service};
18use rama_net::http::RequestContext;
19use rama_utils::macros::define_inner_service_accessors;
20use std::{borrow::Cow, fmt, sync::Arc, time::SystemTime};
21
22use semantic_conventions::attribute::{
26 HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, NETWORK_PROTOCOL_VERSION, SERVER_PORT,
27 URL_SCHEME, USER_AGENT_ORIGINAL,
28};
29
// Instrument names. `Metrics::new` registers them either verbatim or as
// `"{prefix}.{name}"` when a metric prefix is configured.

/// Name of the histogram recording request durations (registered with unit `s`).
const HTTP_SERVER_DURATION: &str = "http.requests.duration";
/// Name of the counter incremented for every request handed to the service.
const HTTP_SERVER_TOTAL_REQUESTS: &str = "http.requests.total";
/// Name of the counter incremented when the inner service returns an error.
const HTTP_SERVER_TOTAL_FAILURES: &str = "http.failures.total";
/// Name of the counter incremented when the inner service returns a response.
const HTTP_SERVER_TOTAL_RESPONSES: &str = "http.responses.total";

/// Attribute key under which the host of the request authority is reported.
const HTTP_REQUEST_HOST: &str = "http.request.host";
36
/// The OpenTelemetry instruments updated by [`RequestMetricsService`].
#[derive(Clone, Debug)]
struct Metrics {
    /// Duration, in seconds, of requests for which the inner service produced a response.
    http_server_duration: Histogram<f64>,
    /// Number of requests handed to the service.
    http_server_total_requests: Counter<u64>,
    /// Number of responses returned by the inner service.
    http_server_total_responses: Counter<u64>,
    /// Number of requests for which the inner service returned an error.
    http_server_total_failures: Counter<u64>,
}
49
50impl Metrics {
51 fn new(meter: Meter, prefix: Option<String>) -> Self {
53 let http_server_duration = meter
54 .f64_histogram(match &prefix {
55 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_DURATION}")),
56 None => Cow::Borrowed(HTTP_SERVER_DURATION),
57 })
58 .with_description("Measures the duration of inbound HTTP requests.")
59 .with_unit("s")
60 .build();
61
62 let http_server_total_requests = meter
63 .u64_counter(match &prefix {
64 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_REQUESTS}")),
65 None => Cow::Borrowed(HTTP_SERVER_TOTAL_REQUESTS),
66 })
67 .with_description("Measures the total number of HTTP requests have been seen.")
68 .build();
69
70 let http_server_total_responses = meter
71 .u64_counter(match &prefix {
72 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_RESPONSES}")),
73 None => Cow::Borrowed(HTTP_SERVER_TOTAL_RESPONSES),
74 })
75 .with_description("Measures the total number of HTTP responses have been seen.")
76 .build();
77
78 let http_server_total_failures = meter
79 .u64_counter(match &prefix {
80 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_FAILURES}")),
81 None => Cow::Borrowed(HTTP_SERVER_TOTAL_FAILURES),
82 })
83 .with_description(
84 "Measures the total number of failed HTTP requests that have been seen.",
85 )
86 .build();
87
88 Metrics {
89 http_server_total_requests,
90 http_server_total_responses,
91 http_server_total_failures,
92 http_server_duration,
93 }
94 }
95}
96
/// A [`Layer`] that wraps services in a [`RequestMetricsService`].
///
/// `F` is the attributes factory handed to every produced service; it defaults to `()`.
pub struct RequestMetricsLayer<F = ()> {
    /// Instruments shared by every service produced from this layer.
    metrics: Arc<Metrics>,
    /// Attributes attached to every recorded measurement (includes service name and version).
    base_attributes: Vec<KeyValue>,
    /// Factory producing the initial attribute vector for each request.
    attributes_factory: F,
}
103
104impl<F: fmt::Debug> fmt::Debug for RequestMetricsLayer<F> {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 f.debug_struct("RequestMetricsLayer")
107 .field("metrics", &self.metrics)
108 .field("base_attributes", &self.base_attributes)
109 .field("attributes_factory", &self.attributes_factory)
110 .finish()
111 }
112}
113
114impl<F: Clone> Clone for RequestMetricsLayer<F> {
115 fn clone(&self) -> Self {
116 RequestMetricsLayer {
117 metrics: self.metrics.clone(),
118 base_attributes: self.base_attributes.clone(),
119 attributes_factory: self.attributes_factory.clone(),
120 }
121 }
122}
123
124impl RequestMetricsLayer<()> {
125 pub fn new() -> Self {
128 Self::custom(MeterOptions::default())
129 }
130
131 pub fn custom(opts: MeterOptions) -> Self {
134 let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
135 name: rama_utils::info::NAME.to_owned(),
136 version: rama_utils::info::VERSION.to_owned(),
137 });
138
139 let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
140 attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
141 attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
142
143 let meter = get_versioned_meter();
144 let metrics = Metrics::new(meter, opts.metric_prefix);
145
146 Self {
147 metrics: Arc::new(metrics),
148 base_attributes: attributes,
149 attributes_factory: (),
150 }
151 }
152
153 pub fn with_attributes<F>(self, attributes: F) -> RequestMetricsLayer<F> {
156 RequestMetricsLayer {
157 metrics: self.metrics,
158 base_attributes: self.base_attributes,
159 attributes_factory: attributes,
160 }
161 }
162}
163
impl Default for RequestMetricsLayer {
    /// Equivalent to [`RequestMetricsLayer::new`].
    fn default() -> Self {
        Self::new()
    }
}
169
170fn get_versioned_meter() -> Meter {
171 global::meter_with_scope(
172 InstrumentationScope::builder(const_format::formatcp!(
173 "{}-network-http",
174 rama_utils::info::NAME
175 ))
176 .with_version(rama_utils::info::VERSION)
177 .with_schema_url(semantic_conventions::SCHEMA_URL)
178 .build(),
179 )
180}
181
182impl<S, F: Clone> Layer<S> for RequestMetricsLayer<F> {
183 type Service = RequestMetricsService<S, F>;
184
185 fn layer(&self, inner: S) -> Self::Service {
186 RequestMetricsService {
187 inner,
188 metrics: self.metrics.clone(),
189 base_attributes: self.base_attributes.clone(),
190 attributes_factory: self.attributes_factory.clone(),
191 }
192 }
193
194 fn into_layer(self, inner: S) -> Self::Service {
195 RequestMetricsService {
196 inner,
197 metrics: self.metrics,
198 base_attributes: self.base_attributes,
199 attributes_factory: self.attributes_factory,
200 }
201 }
202}
203
/// A [`Service`] that records request metrics around the `inner` service it wraps.
///
/// `F` is the attributes factory used to seed the attributes of each request; it
/// defaults to `()`.
pub struct RequestMetricsService<S, F = ()> {
    /// The wrapped service that actually handles the request.
    inner: S,
    /// Instruments updated on every request.
    metrics: Arc<Metrics>,
    /// Attributes attached to every recorded measurement.
    base_attributes: Vec<KeyValue>,
    /// Factory producing the initial attribute vector for each request.
    attributes_factory: F,
}
211
impl<S> RequestMetricsService<S, ()> {
    /// Wraps `inner` using a [`RequestMetricsLayer`] built with its default settings,
    /// i.e. without a custom attributes factory.
    pub fn new(inner: S) -> Self {
        RequestMetricsLayer::new().into_layer(inner)
    }

    // NOTE(review): presumably generates accessors for the wrapped `inner` service;
    // confirm against the macro definition in `rama_utils`.
    define_inner_service_accessors!();
}
220
221impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for RequestMetricsService<S, F> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 f.debug_struct("RequestMetricsService")
224 .field("inner", &self.inner)
225 .field("metrics", &self.metrics)
226 .field("base_attributes", &self.base_attributes)
227 .field("attributes_factory", &self.attributes_factory)
228 .finish()
229 }
230}
231
232impl<S: Clone, F: Clone> Clone for RequestMetricsService<S, F> {
233 fn clone(&self) -> Self {
234 Self {
235 inner: self.inner.clone(),
236 metrics: self.metrics.clone(),
237 base_attributes: self.base_attributes.clone(),
238 attributes_factory: self.attributes_factory.clone(),
239 }
240 }
241}
242
243impl<S, F> RequestMetricsService<S, F> {
244 fn compute_attributes<Body, State>(
245 &self,
246 ctx: &mut Context<State>,
247 req: &Request<Body>,
248 ) -> Vec<KeyValue>
249 where
250 F: AttributesFactory<State>,
251 {
252 let mut attributes = self
253 .attributes_factory
254 .attributes(6 + self.base_attributes.len(), ctx);
255 attributes.extend(self.base_attributes.iter().cloned());
256
257 let request_ctx: Option<&mut RequestContext> = ctx
259 .get_or_try_insert_with_ctx(|ctx| (ctx, req).try_into())
260 .ok();
261 if let Some(authority) = request_ctx.as_ref().map(|rc| &rc.authority) {
262 attributes.push(KeyValue::new(
263 HTTP_REQUEST_HOST,
264 authority.host().to_string(),
265 ));
266 attributes.push(KeyValue::new(SERVER_PORT, authority.port() as i64));
267 }
268
269 if let Some(protocol) = request_ctx.as_ref().map(|rc| &rc.protocol) {
271 attributes.push(KeyValue::new(URL_SCHEME, protocol.to_string()));
272 }
273
274 attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
278 if let Some(http_version) = request_ctx.as_ref().and_then(|rc| match rc.http_version {
279 rama_http_types::Version::HTTP_09 => Some("0.9"),
280 rama_http_types::Version::HTTP_10 => Some("1.0"),
281 rama_http_types::Version::HTTP_11 => Some("1.1"),
282 rama_http_types::Version::HTTP_2 => Some("2"),
283 rama_http_types::Version::HTTP_3 => Some("3"),
284 _ => None,
285 }) {
286 attributes.push(KeyValue::new(NETWORK_PROTOCOL_VERSION, http_version));
287 }
288
289 if let Some(ua) = req.headers().typed_get::<UserAgent>() {
290 attributes.push(KeyValue::new(USER_AGENT_ORIGINAL, ua.to_string()));
291 }
292
293 attributes
294 }
295}
296
297impl<S, F, State, Body> Service<State, Request<Body>> for RequestMetricsService<S, F>
298where
299 S: Service<State, Request<Body>, Response: IntoResponse>,
300 F: AttributesFactory<State>,
301 State: Clone + Send + Sync + 'static,
302 Body: Send + 'static,
303{
304 type Response = Response;
305 type Error = S::Error;
306
307 async fn serve(
308 &self,
309 mut ctx: Context<State>,
310 req: Request<Body>,
311 ) -> Result<Self::Response, Self::Error> {
312 let mut attributes: Vec<KeyValue> = self.compute_attributes(&mut ctx, &req);
313
314 self.metrics.http_server_total_requests.add(1, &attributes);
315
316 let timer = SystemTime::now();
318
319 let result = self.inner.serve(ctx, req).await;
320
321 match result {
322 Ok(res) => {
323 let res = res.into_response();
324
325 attributes.push(KeyValue::new(
326 HTTP_RESPONSE_STATUS_CODE,
327 res.status().as_u16() as i64,
328 ));
329
330 self.metrics.http_server_total_responses.add(1, &attributes);
331 self.metrics.http_server_duration.record(
332 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
333 &attributes,
334 );
335
336 Ok(res)
337 }
338 Err(err) => {
339 self.metrics.http_server_total_failures.add(1, &attributes);
340
341 Err(err)
342 }
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Meter options shared by the "custom" tests: service `test`, version `42`,
    /// metric prefix `foo`.
    fn custom_options() -> MeterOptions {
        MeterOptions {
            service: Some(ServiceInfo {
                name: "test".to_owned(),
                version: "42".to_owned(),
            }),
            metric_prefix: Some("foo".to_owned()),
            ..Default::default()
        }
    }

    /// Body-less request against `http://www.example.com`.
    fn example_request() -> Request<()> {
        Request::builder()
            .uri("http://www.example.com")
            .body(())
            .unwrap()
    }

    /// Whether any attribute uses `key`.
    fn has_key(attributes: &[KeyValue], key: &str) -> bool {
        attributes.iter().any(|attr| attr.key.as_str() == key)
    }

    /// Whether any attribute uses `key` and has the string form `value`.
    fn has_pair(attributes: &[KeyValue], key: &str, value: &str) -> bool {
        attributes
            .iter()
            .any(|attr| attr.key.as_str() == key && attr.value.as_str() == value)
    }

    #[test]
    fn test_default_svc_compute_attributes_default() {
        let svc = RequestMetricsService::new(());
        let mut ctx = Context::default();
        let req = example_request();

        let attributes = svc.compute_attributes(&mut ctx, &req);

        assert!(has_key(&attributes, SERVICE_NAME));
        assert!(has_key(&attributes, SERVICE_VERSION));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
    }

    #[test]
    fn test_custom_svc_compute_attributes_default() {
        let svc = RequestMetricsLayer::custom(custom_options()).into_layer(());
        let mut ctx = Context::default();
        let req = example_request();

        let attributes = svc.compute_attributes(&mut ctx, &req);

        assert!(has_pair(&attributes, SERVICE_NAME, "test"));
        assert!(has_pair(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attributes_vec() {
        let svc = RequestMetricsLayer::custom(custom_options())
            .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
            .into_layer(());
        let mut ctx = Context::default();
        let req = example_request();

        let attributes = svc.compute_attributes(&mut ctx, &req);

        assert!(has_pair(&attributes, SERVICE_NAME, "test"));
        assert!(has_pair(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
        assert!(has_pair(&attributes, "test", "attribute_fn"));
    }

    #[test]
    fn test_custom_svc_compute_attributes_attribute_fn() {
        let svc = RequestMetricsLayer::custom(custom_options())
            .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
                let mut attributes = Vec::with_capacity(size_hint + 1);
                attributes.push(KeyValue::new("test", "attribute_fn"));
                attributes
            })
            .into_layer(());
        let mut ctx = Context::default();
        let req = example_request();

        let attributes = svc.compute_attributes(&mut ctx, &req);

        assert!(has_pair(&attributes, SERVICE_NAME, "test"));
        assert!(has_pair(&attributes, SERVICE_VERSION, "42"));
        assert!(has_key(&attributes, HTTP_REQUEST_HOST));
        assert!(has_pair(&attributes, "test", "attribute_fn"));
    }
}