1#![warn(missing_docs)]
40#![allow(clippy::style)]
41
42pub use http;
43mod grpc;
44mod headers;
45#[cfg(feature = "opentelemetry")]
46pub mod opentelemetry;
47#[cfg(feature = "datadog")]
48pub mod datadog;
49
50use std::net::IpAddr;
51use core::{cmp, fmt, ptr, task};
52use core::pin::Pin;
53use core::future::Future;
54
55pub use tracing;
56
57pub const REQUEST_ID: http::HeaderName = http::HeaderName::from_static("x-request-id");
59pub type MakeSpan = fn() -> tracing::Span;
61
62#[derive(Copy, Clone, PartialEq, Eq)]
63pub enum Protocol {
65 Http,
69 Grpc,
71}
72
73impl Protocol {
74 #[inline(always)]
75 pub fn from_content_type(typ: &[u8]) -> Self {
77 if typ.starts_with(b"application/grpc") {
78 Self::Grpc
79 } else {
80 Self::Http
81 }
82 }
83
84 #[inline(always)]
85 pub const fn as_str(&self) -> &'static str {
87 match self {
88 Self::Grpc => "grpc",
89 Self::Http => "http"
90 }
91 }
92}
93
94impl fmt::Debug for Protocol {
95 #[inline(always)]
96 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
97 fmt::Debug::fmt(self.as_str(), fmt)
98 }
99}
100
101impl fmt::Display for Protocol {
102 #[inline(always)]
103 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
104 fmt::Display::fmt(self.as_str(), fmt)
105 }
106}
107
108type RequestIdBuffer = [u8; 64];
109
110#[derive(Clone)]
111pub struct RequestId {
115 buffer: RequestIdBuffer,
116 len: u8,
117}
118
119impl RequestId {
120 fn from_bytes(bytes: &[u8]) -> Self {
121 let mut buffer: RequestIdBuffer = [0; 64];
122
123 let len = cmp::min(buffer.len(), bytes.len());
124
125 unsafe {
126 ptr::copy_nonoverlapping(bytes.as_ptr(), buffer.as_mut_ptr(), len)
127 };
128
129 Self {
130 buffer,
131 len: len as _,
132 }
133 }
134
135 fn from_uuid(uuid: uuid::Uuid) -> Self {
136 let mut buffer: RequestIdBuffer = [0; 64];
137 let uuid = uuid.as_hyphenated();
138 let len = uuid.encode_lower(&mut buffer).len();
139
140 Self {
141 buffer,
142 len: len as _,
143 }
144 }
145
146 #[inline]
147 pub const fn as_bytes(&self) -> &[u8] {
149 unsafe {
150 core::slice::from_raw_parts(self.buffer.as_ptr(), self.len as _)
151 }
152 }
153
154 #[inline(always)]
155 pub const fn as_str(&self) -> Option<&str> {
157 match core::str::from_utf8(self.as_bytes()) {
158 Ok(header) => Some(header),
159 Err(_) => None,
160 }
161 }
162}
163
164impl fmt::Debug for RequestId {
165 #[inline(always)]
166 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
167 match self.as_str() {
168 Some(id) => fmt::Debug::fmt(id, fmt),
169 None => fmt::Debug::fmt(self.as_bytes(), fmt),
170 }
171 }
172}
173
174impl fmt::Display for RequestId {
175 #[inline(always)]
176 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
177 match self.as_str() {
178 Some(id) => fmt::Display::fmt(id, fmt),
179 None => fmt::Display::fmt("<non-utf8>", fmt),
180 }
181 }
182}
183
184#[macro_export]
185macro_rules! make_request_spanner {
226 ($fn:ident($name:literal, $level:expr)) => {
227 $crate::make_request_spanner!($fn($name, $level,));
228 };
229 ($fn:ident($name:literal, $level:expr, $($fields:tt)*)) => {
230 #[track_caller]
231 pub fn $fn() -> $crate::tracing::Span {
232 use $crate::tracing::field;
233
234 $crate::tracing::span!(
235 $level,
236 $name,
237 span.kind = "server",
239 http.request.method = field::Empty,
241 url.path = field::Empty,
242 url.query = field::Empty,
243 url.scheme = field::Empty,
244 http.request_id = field::Empty,
245 user_agent.original = field::Empty,
246 http.headers = field::Empty,
247 network.protocol.name = field::Empty,
248 network.protocol.version = field::Empty,
249 client.address = field::Empty,
251 http.response.status_code = field::Empty,
253 error.type = field::Empty,
254 error.message = field::Empty,
255 $(
256 $fields
257 )*
258 )
259 }
260 };
261}
262
263#[derive(Clone, Debug)]
264pub struct RequestInfo {
268 pub protocol: Protocol,
270 pub request_id: RequestId,
272 pub client_ip: Option<IpAddr>,
274}
275
276pub struct RequestSpan {
280 pub span: tracing::Span,
282 pub info: RequestInfo,
284}
285
286impl RequestSpan {
287 pub fn new(context: &impl LayerContext, span: tracing::Span, parts: &http::request::Parts) -> Self {
289 let _entered = span.enter();
290
291 let client_ip = context.extract_client_ip(&span, parts);
292 let protocol = parts.headers
293 .get(http::header::CONTENT_TYPE)
294 .map_or(Protocol::Http, |content_type| Protocol::from_content_type(content_type.as_bytes()));
295
296 let request_id = if let Some(request_id) = parts.headers.get(REQUEST_ID) {
297 RequestId::from_bytes(request_id.as_bytes())
298 } else {
299 RequestId::from_uuid(uuid::Uuid::new_v4())
300 };
301
302 if let Some(user_agent) = parts.headers.get(http::header::USER_AGENT).and_then(|header| header.to_str().ok()) {
303 span.record("user_agent.original", user_agent);
304 }
305 span.record("http.request.method", parts.method.as_str());
306 span.record("url.path", parts.uri.path());
307 if let Some(query) = parts.uri.query() {
308 span.record("url.query", query);
309 }
310 if let Some(scheme) = parts.uri.scheme() {
311 span.record("url.scheme", scheme.as_str());
312 }
313 if let Some(request_id) = request_id.as_str() {
314 span.record("http.request_id", &request_id);
315 } else {
316 span.record("http.request_id", request_id.as_bytes());
317 }
318 if let Some(client_ip) = client_ip {
319 span.record("client.address", tracing::field::display(client_ip));
320 }
321 span.record("network.protocol.name", protocol.as_str());
322 if let Protocol::Http = protocol {
323 match parts.version {
324 http::Version::HTTP_09 => span.record("network.protocol.version", 0.9),
325 http::Version::HTTP_10 => span.record("network.protocol.version", 1.0),
326 http::Version::HTTP_11 => span.record("network.protocol.version", 1.1),
327 http::Version::HTTP_2 => span.record("network.protocol.version", 2),
328 http::Version::HTTP_3 => span.record("network.protocol.version", 3),
329 _ => span.record("network.protocol.version", 0),
331 };
332 }
333
334 drop(_entered);
335
336 Self {
337 span,
338 info: RequestInfo {
339 protocol,
340 request_id,
341 client_ip
342 }
343 }
344 }
345}
346
347pub trait LayerContext: Clone + Send + Sync {
349 const INSPECT_HEADERS: &'static [&'static http::HeaderName];
351
352 #[allow(unused)]
353 #[inline(always)]
354 fn extract_client_ip(&self, span: &tracing::Span, parts: &http::request::Parts) -> Option<IpAddr> {
358 None
359 }
360
361 #[allow(unused)]
362 #[inline(always)]
363 fn on_request<T>(&self, span: &tracing::Span, request: &http::Request<T>) {
367 }
368
369 #[allow(unused)]
370 #[inline(always)]
371 fn on_response_ok<T>(&self, span: &tracing::Span, response: &mut http::Response<T>) {
375 }
376
377 #[allow(unused)]
378 #[inline(always)]
379 fn on_response_error(&self, span: &tracing::Span, error: &impl std::error::Error) {
383 }
384}
385
386#[derive(Copy, Clone)]
387pub struct Noop;
389impl LayerContext for Noop {
390 const INSPECT_HEADERS: &'static [&'static http::HeaderName] = &[];
391}
392
393impl<I: LayerContext> LayerContext for Box<I> {
394 const INSPECT_HEADERS: &'static [&'static http::HeaderName] = I::INSPECT_HEADERS;
395
396 #[inline(always)]
397 fn on_request<T>(&self, span: &tracing::Span, request: &http::Request<T>) {
398 I::on_request(self, span, request)
399 }
400
401 #[inline(always)]
402 fn on_response_ok<T>(&self, span: &tracing::Span, response: &mut http::Response<T>) {
403 I::on_response_ok(self, span, response)
404 }
405
406 #[inline(always)]
407 fn on_response_error(&self, span: &tracing::Span, error: &impl std::error::Error) {
408 I::on_response_error(self, span, error)
409 }
410
411 #[inline(always)]
412 fn extract_client_ip(&self, span: &tracing::Span, parts: &http::request::Parts) -> Option<IpAddr> {
413 I::extract_client_ip(self, span, parts)
414 }
415}
416
417impl<I: LayerContext> LayerContext for std::sync::Arc<I> {
418 const INSPECT_HEADERS: &'static [&'static http::HeaderName] = I::INSPECT_HEADERS;
419
420 #[inline(always)]
421 fn on_request<T>(&self, span: &tracing::Span, request: &http::Request<T>) {
422 I::on_request(self, span, request)
423 }
424
425 #[inline(always)]
426 fn on_response_ok<T>(&self, span: &tracing::Span, response: &mut http::Response<T>) {
427 I::on_response_ok(self, span, response)
428 }
429
430 #[inline(always)]
431 fn on_response_error(&self, span: &tracing::Span, error: &impl std::error::Error) {
432 I::on_response_error(self, span, error)
433 }
434
435 #[inline(always)]
436 fn extract_client_ip(&self, span: &tracing::Span, parts: &http::request::Parts) -> Option<IpAddr> {
437 I::extract_client_ip(self, span, parts)
438 }
439}
440
441#[derive(Clone)]
442pub struct HttpRequestLayer<C: LayerContext = Noop> {
444 make_span: MakeSpan,
445 context: C,
446}
447
448impl HttpRequestLayer {
449 #[inline]
450 pub fn new_simple(make_span: MakeSpan) -> Self {
452 Self {
453 make_span,
454 context: Noop
455 }
456 }
457}
458
459impl<C: LayerContext> HttpRequestLayer<C> {
460 #[inline]
461 pub fn new(make_span: MakeSpan, context: C) -> Self {
463 Self {
464 make_span,
465 context,
466 }
467 }
468
469 #[inline]
470 pub fn with_context<C2: LayerContext>(self, context: C2) -> HttpRequestLayer<C2> {
472 HttpRequestLayer {
473 make_span: self.make_span,
474 context
475 }
476 }
477}
478
479impl<S, C: LayerContext> tower_layer::Layer<S> for HttpRequestLayer<C> {
480 type Service = HttpRequestService<S, C>;
481 #[inline(always)]
482 fn layer(&self, inner: S) -> Self::Service {
483 HttpRequestService {
484 layer: self.clone(),
485 inner,
486 }
487 }
488}
489
490pub struct HttpRequestService<S, C: LayerContext> {
492 layer: HttpRequestLayer<C>,
493 inner: S
494}
495
496impl<C: LayerContext, ReqBody, ResBody, S: tower_service::Service<http::Request<ReqBody>, Response = http::Response<ResBody>>> tower_service::Service<http::Request<ReqBody>> for HttpRequestService<S, C> where S::Error: std::error::Error {
497 type Response = S::Response;
498 type Error = S::Error;
499 type Future = ResponseFut<S::Future, C>;
500
501 #[inline(always)]
502 fn poll_ready(&mut self, ctx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
503 self.inner.poll_ready(ctx)
504 }
505
506 fn call(&mut self, req: http::Request<ReqBody>) -> Self::Future {
507 let (parts, body) = req.into_parts();
508 let RequestSpan { span, info } = RequestSpan::new(&self.layer.context, (self.layer.make_span)(), &parts);
509
510 let mut req = http::Request::from_parts(parts, body);
511 self.layer.context.on_request(&span, &req);
512 #[cfg(feature = "opentelemetry")]
513 opentelemetry::on_request(&span, &req);
514 #[cfg(feature = "datadog")]
515 datadog::on_request(&span, &req);
516
517 let _entered = span.enter();
518 if !C::INSPECT_HEADERS.is_empty() {
519 span.record("http.headers", tracing::field::debug(headers::InspectHeaders {
520 header_list: C::INSPECT_HEADERS,
521 headers: req.headers()
522 }));
523 }
524 let request_id = info.request_id.clone();
525 let protocol = info.protocol;
526 req.extensions_mut().insert(info);
527
528 let inner = self.inner.call(req);
529
530 drop(_entered);
531 ResponseFut {
532 inner,
533 context: self.layer.context.clone(),
534 span,
535 protocol,
536 request_id
537 }
538 }
539}
540
541pub struct ResponseFut<F, C> {
543 inner: F,
544 context: C,
545 span: tracing::Span,
546 protocol: Protocol,
547 request_id: RequestId,
548}
549
550impl<C: LayerContext, ResBody, E: std::error::Error, F: Future<Output = Result<http::Response<ResBody>, E>>> Future for ResponseFut<F, C> {
551 type Output = F::Output;
552
553 fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
554 let (fut, context, span, protocol, request_id) = unsafe {
555 let this = self.get_unchecked_mut();
556 (
557 Pin::new_unchecked(&mut this.inner),
558 &this.context,
559 &this.span,
560 this.protocol,
561 &this.request_id,
562 )
563 };
564 let _entered = span.enter();
565 match Future::poll(fut, ctx) {
566 task::Poll::Ready(Ok(mut resp)) => {
567 if let Ok(request_id) = http::HeaderValue::from_bytes(request_id.as_bytes()) {
568 resp.headers_mut().insert(REQUEST_ID, request_id);
569 }
570 let status = match protocol {
571 Protocol::Http => resp.status().as_u16(),
572 Protocol::Grpc => match resp.headers().get("grpc-status") {
573 Some(status) => grpc::parse_grpc_status(status.as_bytes()),
574 None => 2,
575 }
576 };
577 span.record("http.response.status_code", status);
578
579 context.on_response_ok(&span, &mut resp);
580 #[cfg(feature = "opentelemetry")]
581 opentelemetry::on_response_ok(&span, &mut resp);
582 #[cfg(feature = "datadog")]
583 datadog::on_response_ok(&span, &mut resp);
584
585 task::Poll::Ready(Ok(resp))
586 }
587 task::Poll::Ready(Err(error)) => {
588 let status = match protocol {
589 Protocol::Http => 500u16,
590 Protocol::Grpc => 13,
591 };
592 span.record("http.response.status_code", status);
593 span.record("error.type", core::any::type_name::<E>());
594 span.record("error.message", tracing::field::display(&error));
595
596 context.on_response_error(&span, &error);
597 #[cfg(feature = "opentelemetry")]
598 opentelemetry::on_response_error(&span, &error);
599 #[cfg(feature = "datadog")]
600 datadog::on_response_error(&span, &error);
601
602 task::Poll::Ready(Err(error))
603 },
604 task::Poll::Pending => task::Poll::Pending
605 }
606 }
607}