use std::{fmt, time::Duration};
use tracing::Level;
pub use self::{
body::ResponseBody,
future::ResponseFuture,
layer::TraceLayer,
make_span::{DefaultMakeSpan, MakeSpan},
on_body_chunk::{DefaultOnBodyChunk, OnBodyChunk},
on_eos::{DefaultOnEos, OnEos},
on_failure::{DefaultOnFailure, OnFailure},
on_request::{DefaultOnRequest, OnRequest},
on_response::{DefaultOnResponse, OnResponse},
service::Trace,
};
use crate::{
classify::{GrpcErrorsAsFailures, ServerErrorsAsFailures, SharedClassifier},
LatencyUnit,
};
/// Shorthand for a [`SharedClassifier`] built on [`ServerErrorsAsFailures`].
///
/// NOTE(review): presumably the classifier type behind `TraceLayer::new_for_http` —
/// confirm in the `layer` module.
pub type HttpMakeClassifier = SharedClassifier<ServerErrorsAsFailures>;
/// Shorthand for a [`SharedClassifier`] built on [`GrpcErrorsAsFailures`].
///
/// NOTE(review): presumably the gRPC counterpart of [`HttpMakeClassifier`] — confirm
/// where it is consumed.
pub type GrpcMakeClassifier = SharedClassifier<GrpcErrorsAsFailures>;
// Emits a `tracing` event whose level is a runtime value.
//
// Every `tracing::event!` call generated here names one of the five
// `tracing::Level` constants literally; the runtime level is matched to pick
// the arm carrying the corresponding constant.
//
// Accepted forms (all tokens after the level are forwarded untouched):
//   event_dynamic_lvl!(target: T, parent: P, level, ...)
//   event_dynamic_lvl!(target: T, level, ...)
//   event_dynamic_lvl!(parent: P, level, ...)
//   event_dynamic_lvl!(level, ...)
macro_rules! event_dynamic_lvl {
    // Internal rule. The bracketed group carries the already-matched
    // `target:` / `parent:` prefix (possibly empty), trailing comma included.
    // It must stay first: the catch-all rule at the bottom would otherwise try
    // to parse `@dispatch` as an expression.
    ( @dispatch [ $($prefix:tt)* ] $lvl:expr, $($tt:tt)* ) => {
        match $lvl {
            tracing::Level::ERROR => {
                tracing::event!($($prefix)* tracing::Level::ERROR, $($tt)*);
            }
            tracing::Level::WARN => {
                tracing::event!($($prefix)* tracing::Level::WARN, $($tt)*);
            }
            tracing::Level::INFO => {
                tracing::event!($($prefix)* tracing::Level::INFO, $($tt)*);
            }
            tracing::Level::DEBUG => {
                tracing::event!($($prefix)* tracing::Level::DEBUG, $($tt)*);
            }
            tracing::Level::TRACE => {
                tracing::event!($($prefix)* tracing::Level::TRACE, $($tt)*);
            }
        }
    };
    // Explicit target and explicit parent span.
    ( target: $target:expr, parent: $parent:expr, $lvl:expr, $($tt:tt)* ) => {
        event_dynamic_lvl!(@dispatch [ target: $target, parent: $parent, ] $lvl, $($tt)*)
    };
    // Explicit target only.
    ( target: $target:expr, $lvl:expr, $($tt:tt)* ) => {
        event_dynamic_lvl!(@dispatch [ target: $target, ] $lvl, $($tt)*)
    };
    // Explicit parent span only.
    ( parent: $parent:expr, $lvl:expr, $($tt:tt)* ) => {
        event_dynamic_lvl!(@dispatch [ parent: $parent, ] $lvl, $($tt)*)
    };
    // Neither target nor parent given.
    ( $lvl:expr, $($tt:tt)* ) => {
        event_dynamic_lvl!(@dispatch [ ] $lvl, $($tt)*)
    };
}
// Private submodules backing the `pub use self::{...}` re-exports at the top of
// this file. They are declared after `event_dynamic_lvl!` on purpose:
// `macro_rules!` macros are textually scoped, so only modules declared after
// the definition can see it.
// NOTE(review): presumably the callback modules invoke the macro — confirm
// before reordering.
mod body;
mod future;
mod layer;
mod make_span;
mod on_body_chunk;
mod on_eos;
mod on_failure;
mod on_request;
mod on_response;
mod service;
// Module-wide default level constants.
// NOTE(review): presumably consumed by the `Default*` callback types in the
// submodules (regular events vs. failure events) — confirm at the use sites.
const DEFAULT_MESSAGE_LEVEL: Level = Level::DEBUG;
const DEFAULT_ERROR_LEVEL: Level = Level::ERROR;
/// A duration paired with the unit it should be rendered in.
///
/// Formatting is done by the `Display` impl, which prints the number followed
/// by a unit suffix.
struct Latency {
// Selects which unit the `Display` impl converts `duration` to.
unit: LatencyUnit,
// The measured time span to print.
duration: Duration,
}
// Renders the latency as "<number> <suffix>" in the configured unit.
//
// Seconds are printed as a floating-point value; milliseconds, microseconds
// and nanoseconds are printed as whole integer counts.
impl fmt::Display for Latency {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let elapsed = &self.duration;
        match self.unit {
            LatencyUnit::Seconds => f.write_fmt(format_args!("{} s", elapsed.as_secs_f64())),
            LatencyUnit::Millis => f.write_fmt(format_args!("{} ms", elapsed.as_millis())),
            LatencyUnit::Micros => f.write_fmt(format_args!("{} μs", elapsed.as_micros())),
            LatencyUnit::Nanos => f.write_fmt(format_args!("{} ns", elapsed.as_nanos())),
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::classify::ServerErrorsFailureClass;
use crate::test_helpers::Body;
use bytes::Bytes;
use http::{HeaderMap, Request, Response};
use once_cell::sync::Lazy;
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};
use tower::{BoxError, Service, ServiceBuilder, ServiceExt};
use tracing::Span;
// Sends one request through a `TraceLayer` with every callback overridden and
// checks each callback's invocation count twice: once when the response has
// been returned but its body is untouched, and again after the body has been
// read to the end.
#[tokio::test]
async fn unary_request() {
    // One invocation counter per callback.
    static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let trace_layer = TraceLayer::new_for_http()
        .make_span_with(|_req: &Request<Body>| {
            // `foo` is declared empty here and recorded later in `on_request`.
            tracing::info_span!("test-span", foo = tracing::field::Empty)
        })
        .on_request(|_req: &Request<Body>, span: &Span| {
            span.record("foo", 42);
            ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
            ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
            ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_eos(
            |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                ON_EOS.fetch_add(1, Ordering::SeqCst);
            },
        )
        .on_failure(
            |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                ON_FAILURE.fetch_add(1, Ordering::SeqCst);
            },
        );

    let mut svc = ServiceBuilder::new().layer(trace_layer).service_fn(echo);

    let res = svc
        .ready()
        .await
        .unwrap()
        .call(Request::new(Body::from("foobar")))
        .await
        .unwrap();

    // Response returned, body not yet read: only the request and response
    // callbacks are expected to have run.
    assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
    // Label fixed: this assertion checks the response counter, so a failure
    // must be reported as "response" rather than "request".
    assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
    assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
    assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
    assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

    // Drain the echoed body; this is what drives the body-level callbacks.
    crate::test_helpers::to_bytes(res.into_body())
        .await
        .unwrap();

    assert_eq!(1, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
    assert_eq!(1, ON_EOS.load(Ordering::SeqCst), "eos");
    assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
}
// Same shape as the unary test, but the inner service is `streaming_body`,
// which responds with a three-chunk stream. Callback counts are checked before
// and after the response body is consumed.
#[tokio::test]
async fn streaming_response() {
    // One invocation counter per callback.
    static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let trace_layer = TraceLayer::new_for_http()
        .on_request(|_req: &Request<Body>, _span: &Span| {
            ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
            ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
            ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_eos(
            |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                ON_EOS.fetch_add(1, Ordering::SeqCst);
            },
        )
        .on_failure(
            |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                ON_FAILURE.fetch_add(1, Ordering::SeqCst);
            },
        );

    let mut svc = ServiceBuilder::new()
        .layer(trace_layer)
        .service_fn(streaming_body);

    let res = svc
        .ready()
        .await
        .unwrap()
        .call(Request::new(Body::empty()))
        .await
        .unwrap();

    // Response returned, stream not yet polled: no body-level callback yet.
    assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
    // Label fixed: this assertion checks the response counter, so a failure
    // must be reported as "response" rather than "request".
    assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
    assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
    assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
    assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

    // Consume the whole stream.
    crate::test_helpers::to_bytes(res.into_body())
        .await
        .unwrap();

    // One `on_body_chunk` per streamed chunk, then a single end-of-stream.
    assert_eq!(3, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
    assert_eq!(1, ON_EOS.load(Ordering::SeqCst), "eos");
    assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
}
// With a `TestClassify` that accepts at end-of-stream, fully reading a body
// that carries trailers must fire `on_eos` once and `on_failure` never.
#[tokio::test]
async fn classify_eos_on_trailers_success() {
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let accepting = TestClassify::new(false);
    let layer = TraceLayer::new(accepting)
        .on_eos(
            |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                ON_EOS.fetch_add(1, Ordering::SeqCst);
            },
        )
        .on_failure(|_class: &'static str, _latency: Duration, _span: &Span| {
            ON_FAILURE.fetch_add(1, Ordering::SeqCst);
        });
    let mut service = ServiceBuilder::new()
        .layer(layer)
        .service_fn(body_with_trailers);

    let request = Request::new(Body::empty());
    let ready = service.ready().await.unwrap();
    let response = ready.call(request).await.unwrap();

    // Read the body through to its end so end-of-stream classification runs.
    let body = response.into_body();
    crate::test_helpers::to_bytes(body).await.unwrap();

    let eos_calls = ON_EOS.load(Ordering::SeqCst);
    assert_eq!(1, eos_calls, "eos");
    let failure_calls = ON_FAILURE.load(Ordering::SeqCst);
    assert_eq!(0, failure_calls, "failure");
}
// With a `TestClassify` that rejects at end-of-stream, fully reading a body
// that carries trailers must fire both `on_eos` and `on_failure` exactly once.
#[tokio::test]
async fn classify_eos_on_trailers_failure() {
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let rejecting = TestClassify::new(true);
    let layer = TraceLayer::new(rejecting)
        .on_eos(
            |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                ON_EOS.fetch_add(1, Ordering::SeqCst);
            },
        )
        .on_failure(|_class: &'static str, _latency: Duration, _span: &Span| {
            ON_FAILURE.fetch_add(1, Ordering::SeqCst);
        });
    let mut service = ServiceBuilder::new()
        .layer(layer)
        .service_fn(body_with_trailers);

    let request = Request::new(Body::empty());
    let ready = service.ready().await.unwrap();
    let response = ready.call(request).await.unwrap();

    // Read the body through to its end so end-of-stream classification runs.
    let body = response.into_body();
    crate::test_helpers::to_bytes(body).await.unwrap();

    let eos_calls = ON_EOS.load(Ordering::SeqCst);
    assert_eq!(1, eos_calls, "eos");
    let failure_calls = ON_FAILURE.load(Ordering::SeqCst);
    assert_eq!(1, failure_calls, "failure");
}
// With a rejecting `TestClassify` and the `streaming_body` service (data
// chunks, no trailers attached), fully reading the body must fire both
// `on_eos` and `on_failure` exactly once.
// NOTE(review): the name says "empty stream", yet `streaming_body` yields three
// chunks — presumably "empty" refers to the stream ending without trailers;
// confirm the intended scenario.
#[tokio::test]
async fn classify_eos_on_empty_stream() {
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let rejecting = TestClassify::new(true);
    let layer = TraceLayer::new(rejecting)
        .on_eos(
            |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                ON_EOS.fetch_add(1, Ordering::SeqCst);
            },
        )
        .on_failure(|_class: &'static str, _latency: Duration, _span: &Span| {
            ON_FAILURE.fetch_add(1, Ordering::SeqCst);
        });
    let mut service = ServiceBuilder::new()
        .layer(layer)
        .service_fn(streaming_body);

    let request = Request::new(Body::empty());
    let ready = service.ready().await.unwrap();
    let response = ready.call(request).await.unwrap();

    // Consume the stream until it reports completion.
    let body = response.into_body();
    crate::test_helpers::to_bytes(body).await.unwrap();

    let eos_calls = ON_EOS.load(Ordering::SeqCst);
    assert_eq!(1, eos_calls, "eos");
    let failure_calls = ON_FAILURE.load(Ordering::SeqCst);
    assert_eq!(1, failure_calls, "failure");
}
// Pulls exactly one frame from the echoed `"hello"` body and expects both
// `on_body_chunk` and `on_eos` to have fired by then, without polling the body
// again.
// NOTE(review): presumably this relies on the body reporting end-of-stream
// right after its only data frame — confirm against the `ResponseBody` impl.
#[tokio::test]
async fn on_eos_fires_for_content_length_body() {
    use http_body_util::BodyExt;

    static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let layer = TraceLayer::new_for_http()
        .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
            ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
        })
        .on_eos(
            |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                ON_EOS.fetch_add(1, Ordering::SeqCst);
            },
        );
    let mut service = ServiceBuilder::new().layer(layer).service_fn(echo);

    let request = Request::new(Body::from("hello"));
    let ready = service.ready().await.unwrap();
    let response = ready.call(request).await.unwrap();

    // Only the first frame is requested; it must be a data frame.
    let mut body = response.into_body();
    let first_frame = body.frame().await.unwrap().unwrap();
    assert!(first_frame.data_ref().is_some());

    let chunk_calls = ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst);
    assert_eq!(1, chunk_calls, "body chunk");
    let eos_calls = ON_EOS.load(Ordering::SeqCst);
    assert_eq!(1, eos_calls, "eos");
}
// Fully reads the response of the `streaming_body` service and expects `on_eos`
// to have fired exactly once by the time the body is exhausted.
#[tokio::test]
async fn on_eos_fires_for_streaming_body_on_none() {
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let layer = TraceLayer::new_for_http().on_eos(
        |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
            ON_EOS.fetch_add(1, Ordering::SeqCst);
        },
    );
    let mut service = ServiceBuilder::new()
        .layer(layer)
        .service_fn(streaming_body);

    let request = Request::new(Body::empty());
    let ready = service.ready().await.unwrap();
    let response = ready.call(request).await.unwrap();

    // Consume the stream until it reports completion.
    let body = response.into_body();
    crate::test_helpers::to_bytes(body).await.unwrap();

    let eos_calls = ON_EOS.load(Ordering::SeqCst);
    assert_eq!(1, eos_calls, "eos");
}
// Fully reads the echoed `"hello"` body and checks that `on_eos` has been
// invoked once, not more, by the end.
#[tokio::test]
async fn on_eos_not_called_twice() {
    static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

    let layer = TraceLayer::new_for_http().on_eos(
        |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
            ON_EOS.fetch_add(1, Ordering::SeqCst);
        },
    );
    let mut service = ServiceBuilder::new().layer(layer).service_fn(echo);

    let request = Request::new(Body::from("hello"));
    let ready = service.ready().await.unwrap();
    let response = ready.call(request).await.unwrap();

    // Drain the whole body so every end-of-stream path gets a chance to run.
    let body = response.into_body();
    crate::test_helpers::to_bytes(body).await.unwrap();

    let eos_calls = ON_EOS.load(Ordering::SeqCst);
    assert_eq!(1, eos_calls, "eos");
}
// Test service: answers with a default response whose body is the request body.
async fn echo(req: Request<Body>) -> Result<Response<Body>, BoxError> {
    let request_body = req.into_body();
    let response = Response::new(request_body);
    Ok(response)
}
async fn streaming_body(_req: Request<Body>) -> Result<Response<Body>, BoxError> {
use futures_util::stream::iter;
let stream = iter(vec![
Ok::<_, BoxError>(Bytes::from("one")),
Ok::<_, BoxError>(Bytes::from("two")),
Ok::<_, BoxError>(Bytes::from("three")),
]);
let body = Body::from_stream(stream);
Ok(Response::new(body))
}
// Test service: ignores the request and answers with a "data" body that has a
// single `x-test-trailer: value` trailer attached.
async fn body_with_trailers(_req: Request<Body>) -> Result<Response<Body>, BoxError> {
    let trailer_value = "value".parse().unwrap();
    let mut trailers = HeaderMap::new();
    trailers.insert("x-test-trailer", trailer_value);

    let payload = Body::from(Bytes::from("data"));
    let payload_with_trailers = payload.with_trailers(trailers);
    Ok(Response::new(Body::new(payload_with_trailers)))
}
// Test-only classifier factory. `reject` is handed on to the classifiers it
// creates and decides whether end-of-stream ends up classified as a failure.
#[derive(Clone)]
struct TestClassify {
    reject: bool,
}

impl TestClassify {
    // Builds a factory with the given end-of-stream verdict.
    fn new(reject: bool) -> Self {
        TestClassify { reject }
    }
}
// Produces a `TestClassifyResponse` per request, carrying over the `reject`
// flag; the request itself is not inspected.
impl crate::classify::MakeClassifier for TestClassify {
    type Classifier = TestClassifyResponse;
    type ClassifyEos = TestClassifyEos;
    type FailureClass = &'static str;

    fn make_classifier<B>(&self, _req: &Request<B>) -> Self::Classifier {
        let reject = self.reject;
        TestClassifyResponse { reject }
    }
}
// Response-stage classifier for the tests. It never decides on the response
// itself: classification is always deferred to end-of-stream.
#[derive(Clone)]
struct TestClassifyResponse {
    reject: bool,
}

impl crate::classify::ClassifyResponse for TestClassifyResponse {
    type ClassifyEos = TestClassifyEos;
    type FailureClass = &'static str;

    // Always returns `RequiresEos`, forwarding `reject` to the end-of-stream
    // classifier; the response is ignored.
    fn classify_response<B>(
        self,
        _res: &Response<B>,
    ) -> crate::classify::ClassifiedResponse<Self::FailureClass, Self::ClassifyEos> {
        let Self { reject } = self;
        let eos_classifier = TestClassifyEos { reject };
        crate::classify::ClassifiedResponse::RequiresEos(eos_classifier)
    }

    // Every error maps to the fixed class "error".
    fn classify_error<E: std::fmt::Display + 'static>(self, _error: &E) -> Self::FailureClass {
        "error"
    }
}
// End-of-stream classifier for the tests; the verdict depends only on `reject`.
#[derive(Clone)]
struct TestClassifyEos {
    reject: bool,
}

impl crate::classify::ClassifyEos for TestClassifyEos {
    type FailureClass = &'static str;

    // Trailers are ignored: success when `reject` is false, otherwise the
    // fixed failure class "classified as failure".
    fn classify_eos(self, _trailers: Option<&HeaderMap>) -> Result<(), Self::FailureClass> {
        if !self.reject {
            return Ok(());
        }
        Err("classified as failure")
    }

    // Every error maps to the fixed class "error".
    fn classify_error<E: std::fmt::Display + 'static>(self, _error: &E) -> Self::FailureClass {
        "error"
    }
}
// Runs one echo request under a subscriber whose only layer is filtered so that
// spans are rejected and everything else passes. The layer records each event's
// message together with whether the event had a span (explicit parent or
// contextual). The test then requires that the "started processing request" /
// "finished processing request" events were emitted and that each had one.
#[test]
fn events_have_span_context_when_span_is_disabled() {
    use std::sync::{Arc, Mutex};
    use tracing::subscriber::with_default;
    use tracing_subscriber::{layer::SubscriberExt, registry::LookupSpan, Layer as _};

    // Filter that disables all span metadata and enables the rest.
    struct DisableSpansFilter;

    impl<S> tracing_subscriber::layer::Filter<S> for DisableSpansFilter
    where
        S: tracing::Subscriber,
    {
        fn enabled(
            &self,
            meta: &tracing::Metadata<'_>,
            _cx: &tracing_subscriber::layer::Context<'_, S>,
        ) -> bool {
            let is_span = meta.is_span();
            !is_span
        }
    }

    // Field visitor that keeps only the debug rendering of the `message` field.
    struct MessageVisitor<'a>(&'a mut String);

    impl tracing::field::Visit for MessageVisitor<'_> {
        fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
            if field.name() != "message" {
                return;
            }
            *self.0 = format!("{:?}", value);
        }
    }

    // Layer that appends `(message, has_span)` for every event it observes.
    #[derive(Clone)]
    struct RecordingLayer {
        events: Arc<Mutex<Vec<(String, bool)>>>,
    }

    impl<S> tracing_subscriber::Layer<S> for RecordingLayer
    where
        S: tracing::Subscriber + for<'a> LookupSpan<'a>,
    {
        fn on_event(
            &self,
            event: &tracing::Event<'_>,
            ctx: tracing_subscriber::layer::Context<'_, S>,
        ) {
            let mut message = String::new();
            event.record(&mut MessageVisitor(&mut message));

            // An explicit parent counts; otherwise fall back to the span the
            // context resolves for this event.
            let has_parent = match event.parent() {
                Some(_) => true,
                None => ctx.event_span(event).is_some(),
            };

            let mut recorded = self.events.lock().unwrap();
            recorded.push((message, has_parent));
        }
    }

    let events = Arc::new(Mutex::new(Vec::new()));
    let recorder = RecordingLayer {
        events: events.clone(),
    };
    let subscriber =
        tracing_subscriber::registry().with(recorder.with_filter(DisableSpansFilter));

    with_default(subscriber, || {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();

        runtime.block_on(async {
            let mut service = ServiceBuilder::new()
                .layer(TraceLayer::new_for_http())
                .service_fn(echo);

            let request = Request::new(Body::from("test"));
            let ready = service.ready().await.unwrap();
            let response = ready.call(request).await.unwrap();

            let body = response.into_body();
            crate::test_helpers::to_bytes(body).await.unwrap();
        });
    });

    let recorded = events.lock().unwrap();
    let request_events: Vec<_> = recorded
        .iter()
        .filter(|entry| {
            let msg = &entry.0;
            msg.contains("started processing request")
                || msg.contains("finished processing request")
        })
        .collect();

    assert!(
        request_events.len() >= 2,
        "expected on_request and on_response events to fire"
    );

    for entry in request_events.iter() {
        let (msg, has_parent) = (&entry.0, entry.1);
        assert!(
            has_parent,
            "event {:?} has no span context. When the request span is \
             disabled by a filter, events must still reference it via \
             explicit parent so subscribers can associate them correctly.",
            msg
        );
    }
}
}