1use crate::service::web::response::IntoResponse;
6use crate::{Request, Response};
7use rama_core::telemetry::opentelemetry::{
8 AttributesFactory, InstrumentationScope, KeyValue, MeterOptions, ServiceInfo, global,
9 metrics::{Counter, Histogram, Meter},
10 semantic_conventions::{
11 self,
12 resource::{SERVICE_NAME, SERVICE_VERSION},
13 },
14};
15use rama_core::{Context, Layer, Service};
16use rama_net::http::RequestContext;
17use rama_utils::macros::define_inner_service_accessors;
18use std::{borrow::Cow, fmt, sync::Arc, time::SystemTime};
19
20use semantic_conventions::attribute::{
24 HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, NETWORK_PROTOCOL_VERSION, SERVER_PORT,
25 URL_SCHEME,
26};
27
28const HTTP_SERVER_DURATION: &str = "http.requests.duration";
29const HTTP_SERVER_TOTAL_REQUESTS: &str = "http.requests.total";
30const HTTP_SERVER_TOTAL_FAILURES: &str = "http.failures.total";
31const HTTP_SERVER_TOTAL_RESPONSES: &str = "http.responses.total";
32
33const HTTP_REQUEST_HOST: &str = "http.request.host";
34
35#[derive(Clone, Debug)]
41struct Metrics {
42 http_server_duration: Histogram<f64>,
43 http_server_total_requests: Counter<u64>,
44 http_server_total_responses: Counter<u64>,
45 http_server_total_failures: Counter<u64>,
46}
47
48impl Metrics {
49 fn new(meter: Meter, prefix: Option<String>) -> Self {
51 let http_server_duration = meter
52 .f64_histogram(match &prefix {
53 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_DURATION}")),
54 None => Cow::Borrowed(HTTP_SERVER_DURATION),
55 })
56 .with_description("Measures the duration of inbound HTTP requests.")
57 .with_unit("s")
58 .build();
59
60 let http_server_total_requests = meter
61 .u64_counter(match &prefix {
62 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_REQUESTS}")),
63 None => Cow::Borrowed(HTTP_SERVER_TOTAL_REQUESTS),
64 })
65 .with_description("Measures the total number of HTTP requests have been seen.")
66 .build();
67
68 let http_server_total_responses = meter
69 .u64_counter(match &prefix {
70 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_RESPONSES}")),
71 None => Cow::Borrowed(HTTP_SERVER_TOTAL_RESPONSES),
72 })
73 .with_description("Measures the total number of HTTP responses have been seen.")
74 .build();
75
76 let http_server_total_failures = meter
77 .u64_counter(match &prefix {
78 Some(prefix) => Cow::Owned(format!("{prefix}.{HTTP_SERVER_TOTAL_FAILURES}")),
79 None => Cow::Borrowed(HTTP_SERVER_TOTAL_FAILURES),
80 })
81 .with_description(
82 "Measures the total number of failed HTTP requests that have been seen.",
83 )
84 .build();
85
86 Metrics {
87 http_server_total_requests,
88 http_server_total_responses,
89 http_server_total_failures,
90 http_server_duration,
91 }
92 }
93}
94
95pub struct RequestMetricsLayer<F = ()> {
97 metrics: Arc<Metrics>,
98 base_attributes: Vec<KeyValue>,
99 attributes_factory: F,
100}
101
102impl<F: fmt::Debug> fmt::Debug for RequestMetricsLayer<F> {
103 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
104 f.debug_struct("RequestMetricsLayer")
105 .field("metrics", &self.metrics)
106 .field("base_attributes", &self.base_attributes)
107 .field("attributes_factory", &self.attributes_factory)
108 .finish()
109 }
110}
111
112impl<F: Clone> Clone for RequestMetricsLayer<F> {
113 fn clone(&self) -> Self {
114 RequestMetricsLayer {
115 metrics: self.metrics.clone(),
116 base_attributes: self.base_attributes.clone(),
117 attributes_factory: self.attributes_factory.clone(),
118 }
119 }
120}
121
122impl RequestMetricsLayer<()> {
123 pub fn new() -> Self {
126 Self::custom(MeterOptions::default())
127 }
128
129 pub fn custom(opts: MeterOptions) -> Self {
132 let service_info = opts.service.unwrap_or_else(|| ServiceInfo {
133 name: rama_utils::info::NAME.to_owned(),
134 version: rama_utils::info::VERSION.to_owned(),
135 });
136
137 let mut attributes = opts.attributes.unwrap_or_else(|| Vec::with_capacity(2));
138 attributes.push(KeyValue::new(SERVICE_NAME, service_info.name.clone()));
139 attributes.push(KeyValue::new(SERVICE_VERSION, service_info.version.clone()));
140
141 let meter = get_versioned_meter();
142 let metrics = Metrics::new(meter, opts.metric_prefix);
143
144 Self {
145 metrics: Arc::new(metrics),
146 base_attributes: attributes,
147 attributes_factory: (),
148 }
149 }
150
151 pub fn with_attributes<F>(self, attributes: F) -> RequestMetricsLayer<F> {
154 RequestMetricsLayer {
155 metrics: self.metrics,
156 base_attributes: self.base_attributes,
157 attributes_factory: attributes,
158 }
159 }
160}
161
162impl Default for RequestMetricsLayer {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168fn get_versioned_meter() -> Meter {
169 global::meter_with_scope(
170 InstrumentationScope::builder(const_format::formatcp!(
171 "{}-network-http",
172 rama_utils::info::NAME
173 ))
174 .with_version(rama_utils::info::VERSION)
175 .with_schema_url(semantic_conventions::SCHEMA_URL)
176 .build(),
177 )
178}
179
180impl<S, F: Clone> Layer<S> for RequestMetricsLayer<F> {
181 type Service = RequestMetricsService<S, F>;
182
183 fn layer(&self, inner: S) -> Self::Service {
184 RequestMetricsService {
185 inner,
186 metrics: self.metrics.clone(),
187 base_attributes: self.base_attributes.clone(),
188 attributes_factory: self.attributes_factory.clone(),
189 }
190 }
191
192 fn into_layer(self, inner: S) -> Self::Service {
193 RequestMetricsService {
194 inner,
195 metrics: self.metrics,
196 base_attributes: self.base_attributes,
197 attributes_factory: self.attributes_factory,
198 }
199 }
200}
201
202pub struct RequestMetricsService<S, F = ()> {
204 inner: S,
205 metrics: Arc<Metrics>,
206 base_attributes: Vec<KeyValue>,
207 attributes_factory: F,
208}
209
210impl<S> RequestMetricsService<S, ()> {
211 pub fn new(inner: S) -> Self {
213 RequestMetricsLayer::new().into_layer(inner)
214 }
215
216 define_inner_service_accessors!();
217}
218
219impl<S: fmt::Debug, F: fmt::Debug> fmt::Debug for RequestMetricsService<S, F> {
220 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221 f.debug_struct("RequestMetricsService")
222 .field("inner", &self.inner)
223 .field("metrics", &self.metrics)
224 .field("base_attributes", &self.base_attributes)
225 .field("attributes_factory", &self.attributes_factory)
226 .finish()
227 }
228}
229
230impl<S: Clone, F: Clone> Clone for RequestMetricsService<S, F> {
231 fn clone(&self) -> Self {
232 Self {
233 inner: self.inner.clone(),
234 metrics: self.metrics.clone(),
235 base_attributes: self.base_attributes.clone(),
236 attributes_factory: self.attributes_factory.clone(),
237 }
238 }
239}
240
241impl<S, F> RequestMetricsService<S, F> {
242 fn compute_attributes<Body, State>(
243 &self,
244 ctx: &mut Context<State>,
245 req: &Request<Body>,
246 ) -> Vec<KeyValue>
247 where
248 F: AttributesFactory<State>,
249 {
250 let mut attributes = self
251 .attributes_factory
252 .attributes(5 + self.base_attributes.len(), ctx);
253 attributes.extend(self.base_attributes.iter().cloned());
254
255 let request_ctx: Option<&mut RequestContext> = ctx
257 .get_or_try_insert_with_ctx(|ctx| (ctx, req).try_into())
258 .ok();
259 if let Some(authority) = request_ctx.as_ref().map(|rc| &rc.authority) {
260 attributes.push(KeyValue::new(
261 HTTP_REQUEST_HOST,
262 authority.host().to_string(),
263 ));
264 attributes.push(KeyValue::new(SERVER_PORT, authority.port() as i64));
265 }
266
267 if let Some(protocol) = request_ctx.as_ref().map(|rc| &rc.protocol) {
269 attributes.push(KeyValue::new(URL_SCHEME, protocol.to_string()));
270 }
271
272 attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
273 if let Some(http_version) = request_ctx.as_ref().and_then(|rc| match rc.http_version {
274 rama_http_types::Version::HTTP_09 => Some("0.9"),
275 rama_http_types::Version::HTTP_10 => Some("1.0"),
276 rama_http_types::Version::HTTP_11 => Some("1.1"),
277 rama_http_types::Version::HTTP_2 => Some("2"),
278 rama_http_types::Version::HTTP_3 => Some("3"),
279 _ => None,
280 }) {
281 attributes.push(KeyValue::new(NETWORK_PROTOCOL_VERSION, http_version));
282 }
283
284 attributes
285 }
286}
287
288impl<S, F, State, Body> Service<State, Request<Body>> for RequestMetricsService<S, F>
289where
290 S: Service<State, Request<Body>, Response: IntoResponse>,
291 F: AttributesFactory<State>,
292 State: Clone + Send + Sync + 'static,
293 Body: Send + 'static,
294{
295 type Response = Response;
296 type Error = S::Error;
297
298 async fn serve(
299 &self,
300 mut ctx: Context<State>,
301 req: Request<Body>,
302 ) -> Result<Self::Response, Self::Error> {
303 let mut attributes: Vec<KeyValue> = self.compute_attributes(&mut ctx, &req);
304
305 self.metrics.http_server_total_requests.add(1, &attributes);
306
307 let timer = SystemTime::now();
309
310 let result = self.inner.serve(ctx, req).await;
311
312 match result {
313 Ok(res) => {
314 let res = res.into_response();
315
316 attributes.push(KeyValue::new(
317 HTTP_RESPONSE_STATUS_CODE,
318 res.status().as_u16() as i64,
319 ));
320
321 self.metrics.http_server_total_responses.add(1, &attributes);
322 self.metrics.http_server_duration.record(
323 timer.elapsed().map(|t| t.as_secs_f64()).unwrap_or_default(),
324 &attributes,
325 );
326
327 Ok(res)
328 }
329 Err(err) => {
330 self.metrics.http_server_total_failures.add(1, &attributes);
331
332 Err(err)
333 }
334 }
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341
342 #[test]
343 fn test_default_svc_compute_attributes_default() {
344 let svc = RequestMetricsService::new(());
345 let mut ctx = Context::default();
346 let req = Request::builder()
347 .uri("http://www.example.com")
348 .body(())
349 .unwrap();
350
351 let attributes = svc.compute_attributes(&mut ctx, &req);
352 assert!(
353 attributes
354 .iter()
355 .any(|attr| attr.key.as_str() == SERVICE_NAME)
356 );
357 assert!(
358 attributes
359 .iter()
360 .any(|attr| attr.key.as_str() == SERVICE_VERSION)
361 );
362 assert!(
363 attributes
364 .iter()
365 .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
366 );
367 }
368
369 #[test]
370 fn test_custom_svc_compute_attributes_default() {
371 let svc = RequestMetricsLayer::custom(MeterOptions {
372 service: Some(ServiceInfo {
373 name: "test".to_owned(),
374 version: "42".to_owned(),
375 }),
376 metric_prefix: Some("foo".to_owned()),
377 ..Default::default()
378 })
379 .into_layer(());
380 let mut ctx = Context::default();
381 let req = Request::builder()
382 .uri("http://www.example.com")
383 .body(())
384 .unwrap();
385
386 let attributes = svc.compute_attributes(&mut ctx, &req);
387 assert!(
388 attributes
389 .iter()
390 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
391 );
392 assert!(
393 attributes
394 .iter()
395 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
396 );
397 assert!(
398 attributes
399 .iter()
400 .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
401 );
402 }
403
404 #[test]
405 fn test_custom_svc_compute_attributes_attributes_vec() {
406 let svc = RequestMetricsLayer::custom(MeterOptions {
407 service: Some(ServiceInfo {
408 name: "test".to_owned(),
409 version: "42".to_owned(),
410 }),
411 metric_prefix: Some("foo".to_owned()),
412 ..Default::default()
413 })
414 .with_attributes(vec![KeyValue::new("test", "attribute_fn")])
415 .into_layer(());
416 let mut ctx = Context::default();
417 let req = Request::builder()
418 .uri("http://www.example.com")
419 .body(())
420 .unwrap();
421
422 let attributes = svc.compute_attributes(&mut ctx, &req);
423 assert!(
424 attributes
425 .iter()
426 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
427 );
428 assert!(
429 attributes
430 .iter()
431 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
432 );
433 assert!(
434 attributes
435 .iter()
436 .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
437 );
438 assert!(
439 attributes
440 .iter()
441 .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn")
442 );
443 }
444
445 #[test]
446 fn test_custom_svc_compute_attributes_attribute_fn() {
447 let svc = RequestMetricsLayer::custom(MeterOptions {
448 service: Some(ServiceInfo {
449 name: "test".to_owned(),
450 version: "42".to_owned(),
451 }),
452 metric_prefix: Some("foo".to_owned()),
453 ..Default::default()
454 })
455 .with_attributes(|size_hint: usize, _ctx: &Context<()>| {
456 let mut attributes = Vec::with_capacity(size_hint + 1);
457 attributes.push(KeyValue::new("test", "attribute_fn"));
458 attributes
459 })
460 .into_layer(());
461 let mut ctx = Context::default();
462 let req = Request::builder()
463 .uri("http://www.example.com")
464 .body(())
465 .unwrap();
466
467 let attributes = svc.compute_attributes(&mut ctx, &req);
468 assert!(
469 attributes
470 .iter()
471 .any(|attr| attr.key.as_str() == SERVICE_NAME && attr.value.as_str() == "test")
472 );
473 assert!(
474 attributes
475 .iter()
476 .any(|attr| attr.key.as_str() == SERVICE_VERSION && attr.value.as_str() == "42")
477 );
478 assert!(
479 attributes
480 .iter()
481 .any(|attr| attr.key.as_str() == HTTP_REQUEST_HOST)
482 );
483 assert!(
484 attributes
485 .iter()
486 .any(|attr| attr.key.as_str() == "test" && attr.value.as_str() == "attribute_fn")
487 );
488 }
489}