use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use http::Request;
use tonic::body::Body;
use tower::Service;
use tracing::{Instrument, Span};
use crate::telemetry::TracingService;
/// An HTTP request whose body is a tonic [`Body`]; the request type this module's
/// `Service` implementation for `TracingService` accepts.
type GrpcRequest = Request<Body>;
impl<S> Service<GrpcRequest> for TracingService<S>
where
S: Service<GrpcRequest> + Clone + Send + 'static,
S::Future: Send,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: GrpcRequest) -> Self::Future {
let inner = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, inner);
let path = req.uri().path().to_string();
let span = tracing::info_span!(
"grpc.request",
rpc.system = "grpc",
rpc.service = tracing::field::Empty,
rpc.method = tracing::field::Empty,
rpc.grpc.status_code = tracing::field::Empty,
rpc.latency_ms = tracing::field::Empty,
otel.kind = "server",
otel.status_code = tracing::field::Empty,
);
if let Some((service, method_name)) = parse_grpc_path(&path) {
span.record("rpc.service", service);
span.record("rpc.method", method_name);
}
Box::pin(
async move {
let (result, latency_ms) = crate::telemetry::with_timing(inner.call(req)).await;
let span = Span::current();
span.record("rpc.latency_ms", latency_ms);
match &result {
Ok(_) => {
span.record("rpc.grpc.status_code", 0_i64);
span.record("otel.status_code", "OK");
}
Err(_) => {
span.record("rpc.grpc.status_code", 2_i64);
span.record("otel.status_code", "ERROR");
}
}
result
}
.instrument(span),
)
}
}
/// Splits a gRPC request path of the form `/<service>/<method>` into
/// `(service, method)`.
///
/// Returns `None` when the path does not start with `/`, has no second `/`,
/// has an empty service or method component, or contains additional `/`
/// separators after the method. The returned slices borrow from `path`.
fn parse_grpc_path(path: &str) -> Option<(&str, &str)> {
    let (service, method) = path.strip_prefix('/')?.split_once('/')?;

    // Treat empty or over-segmented paths as unparseable so callers do not
    // record empty or slash-containing service/method names.
    if service.is_empty() || method.is_empty() || method.contains('/') {
        return None;
    }

    Some((service, method))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::telemetry::tracing_layer::TracingLayer;
    use tower::{Layer, ServiceExt};

    /// A request sent through the layered service reaches the inner handler and its
    /// response comes back with status 200.
    #[tokio::test]
    async fn test_grpc_tracing_layer() {
        let handler = tower::service_fn(|_req: Request<Body>| async {
            Ok::<_, std::convert::Infallible>(http::Response::new(Body::empty()))
        });
        let traced = TracingLayer.layer(handler);

        let request = Request::builder()
            .method("POST")
            .uri("/test.Service/Method")
            .body(Body::empty())
            .unwrap();

        let reply = traced.oneshot(request).await.unwrap();
        assert_eq!(reply.status(), 200);
    }

    /// Well-formed paths split into service and method; a path without a leading
    /// slash yields `None`.
    #[test]
    fn test_parse_grpc_path() {
        let cases = [
            (
                "/grpc.health.v1.Health/Check",
                Some(("grpc.health.v1.Health", "Check")),
            ),
            ("/Service/Method", Some(("Service", "Method"))),
            ("invalid", None),
        ];

        for (input, expected) in cases {
            assert_eq!(parse_grpc_path(input), expected, "path: {}", input);
        }
    }
}