lxy 0.1.1

A convenient async http and RPC framework in Rust
Documentation
//! gRPC implementation of the OpenTelemetry tracing layer.

use std::{
  future::Future,
  pin::Pin,
  task::{Context, Poll},
};

use http::Request;
use tonic::body::Body;
use tower::Service;
use tracing::{Instrument, Span};

use crate::telemetry::TracingService;

/// Request type this module implements tracing for: an [`http::Request`]
/// whose body is a [`tonic::body::Body`].
type GrpcRequest = Request<Body>;

impl<S> Service<GrpcRequest> for TracingService<S>
where
  S: Service<GrpcRequest> + Clone + Send + 'static,
  S::Future: Send,
{
  type Response = S::Response;
  type Error = S::Error;
  type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

  fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
    self.inner.poll_ready(cx)
  }

  fn call(&mut self, req: GrpcRequest) -> Self::Future {
    let inner = self.inner.clone();
    let mut inner = std::mem::replace(&mut self.inner, inner);

    let path = req.uri().path().to_string();

    let span = tracing::info_span!(
      "grpc.request",
      rpc.system = "grpc",
      rpc.service = tracing::field::Empty,
      rpc.method = tracing::field::Empty,
      rpc.grpc.status_code = tracing::field::Empty,
      rpc.latency_ms = tracing::field::Empty,
      otel.kind = "server",
      otel.status_code = tracing::field::Empty,
    );

    if let Some((service, method_name)) = parse_grpc_path(&path) {
      span.record("rpc.service", service);
      span.record("rpc.method", method_name);
    }

    Box::pin(
      async move {
        let (result, latency_ms) = crate::telemetry::with_timing(inner.call(req)).await;

        let span = Span::current();
        span.record("rpc.latency_ms", latency_ms);

        match &result {
          Ok(_) => {
            span.record("rpc.grpc.status_code", 0_i64);
            span.record("otel.status_code", "OK");
          }
          Err(_) => {
            span.record("rpc.grpc.status_code", 2_i64);
            span.record("otel.status_code", "ERROR");
          }
        }

        result
      }
      .instrument(span),
    )
  }
}

/// Splits a gRPC request path of the form `/<service>/<method>` into its
/// `(service, method)` components.
///
/// The path must start with `/` and contain a second `/` separating the two
/// parts; everything after that second `/` is returned as the method.
///
/// Returns `None` when the leading `/` or the separator is missing, or when
/// either component is empty (e.g. `/Service/` or `//Method`), so that callers
/// never record an empty service or method name.
fn parse_grpc_path(path: &str) -> Option<(&str, &str)> {
  let (service, method) = path.strip_prefix('/')?.split_once('/')?;
  if service.is_empty() || method.is_empty() {
    return None;
  }
  Some((service, method))
}

#[cfg(test)]
mod tests {
  use super::*;
  use crate::telemetry::tracing_layer::TracingLayer;
  use tower::{Layer, ServiceExt};

  /// A request sent through the tracing layer reaches the wrapped handler and
  /// its response comes back unchanged.
  #[tokio::test]
  async fn test_grpc_tracing_layer() {
    // Handler that answers every request with an empty default response.
    let handler = tower::service_fn(|_req: Request<Body>| async {
      Ok::<_, std::convert::Infallible>(http::Response::new(Body::empty()))
    });
    let traced = TracingLayer.layer(handler);

    let request = Request::builder()
      .method("POST")
      .uri("/test.Service/Method")
      .body(Body::empty())
      .unwrap();

    let response = traced.oneshot(request).await.unwrap();
    assert_eq!(response.status(), 200);
  }

  /// Well-formed paths split into `(service, method)`; a path without the
  /// leading slash is rejected.
  #[test]
  fn test_parse_grpc_path() {
    let cases = [
      (
        "/grpc.health.v1.Health/Check",
        Some(("grpc.health.v1.Health", "Check")),
      ),
      ("/Service/Method", Some(("Service", "Method"))),
      ("invalid", None),
    ];

    for &(path, expected) in &cases {
      assert_eq!(parse_grpc_path(path), expected);
    }
  }
}