lxy 0.1.1

A convenient async http and RPC framework in Rust
Documentation
//! HTTP implementation of OpenTelemetry tracing layer.

use std::{
  future::Future,
  pin::Pin,
  task::{Context, Poll},
};

use axum::response::IntoResponse;
use axum::{body::Body, response::Response};
use http::Request;
use tower::Service;
use tracing::{Instrument, Span};

use crate::telemetry::TracingService;

type HttpRequest = Request<Body>;

impl<S> Service<HttpRequest> for TracingService<S>
where
  S: Service<HttpRequest> + Clone + Send + 'static,
  S::Response: IntoResponse,
  S::Future: Send,
{
  type Response = Response;
  type Error = S::Error;
  type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

  fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    self.inner.poll_ready(cx)
  }

  fn call(&mut self, req: HttpRequest) -> Self::Future {
    let inner = self.inner.clone();
    let mut inner = std::mem::replace(&mut self.inner, inner);

    let method = req.method().clone();
    let path = req.uri().path().to_string();
    let version = format!("{:?}", req.version());

    let span = tracing::info_span!(
      "http.request",
      http.method = %method,
      http.route = %path,
      http.flavor = %version,
      http.status_code = tracing::field::Empty,
      http.latency_ms = tracing::field::Empty,
      otel.kind = "server",
      otel.status_code = tracing::field::Empty,
    );

    Box::pin(
      async move {
        let (result, latency_ms) = crate::telemetry::with_timing(inner.call(req)).await;

        match result {
          Ok(response) => {
            let response = response.into_response();
            let status = response.status();
            record_http_response(&Span::current(), status.as_u16(), latency_ms);
            Ok(response)
          }
          Err(err) => {
            record_http_error(&Span::current(), latency_ms);
            Err(err)
          }
        }
      }
      .instrument(span),
    )
  }
}

fn record_http_response(span: &Span, status_code: u16, latency_ms: i64) {
  span.record("http.status_code", status_code as i64);
  span.record("http.latency_ms", latency_ms);

  let otel_status = if status_code >= 400 { "ERROR" } else { "OK" };
  span.record("otel.status_code", otel_status);
}

fn record_http_error(span: &Span, latency_ms: i64) {
  span.record("http.status_code", 500_i64);
  span.record("http.latency_ms", latency_ms);
  span.record("otel.status_code", "ERROR");
}

#[cfg(test)]
mod tests {
  use tower::{Layer, ServiceExt};

  use super::*;
  use crate::telemetry::TracingLayer;

  #[tokio::test]
  async fn test_http_tracing_layer() {
    let layer = TracingLayer;
    let service = layer.layer(tower::service_fn(|_req: Request<Body>| async {
      Ok::<_, std::convert::Infallible>("OK".into_response())
    }));

    let req = Request::builder()
      .uri("/test")
      .method("GET")
      .body(Body::empty())
      .unwrap();

    let response = service.oneshot(req).await.unwrap();
    assert_eq!(response.status(), 200);
  }
}