tower_async_http/trace/mod.rs
1//! Middleware that adds high level [tracing] to a [`Service`].
2//!
3//! # Example
4//!
5//! Adding tracing to your service can be as simple as:
6//!
7//! ```rust
8//! use http::{Request, Response};
9//! use http_body_util::Full;
10//! use bytes::Bytes;
11//! use tower_async::{ServiceBuilder, Service};
12//! use tower_async_http::trace::TraceLayer;
13//! use std::convert::Infallible;
14//!
15//! async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
16//! Ok(Response::new(Full::from("foo")))
17//! }
18//!
19//! # #[tokio::main]
20//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
21//! // Setup tracing
22//! tracing_subscriber::fmt::init();
23//!
24//! let mut service = ServiceBuilder::new()
25//! .layer(TraceLayer::new_for_http())
26//! .service_fn(handle);
27//!
28//! let request = Request::new(Full::from("foo"));
29//!
30//! let response = service
31//! .call(request)
32//! .await?;
33//! # Ok(())
34//! # }
35//! ```
36//!
37//! If you run this application with `RUST_LOG=tower_http=trace cargo run` you should see logs like:
38//!
39//! ```text
40//! Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_async_http::trace::on_request: started processing request
41//! Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_async_http::trace::on_response: finished processing request latency=1 ms status=200
42//! ```
43//!
44//! # Customization
45//!
46//! [`Trace`] comes with good defaults but also supports customizing many aspects of the output.
47//!
48//! The default behaviour supports some customization:
49//!
50//! ```rust
51//! use http::{Request, Response, HeaderMap, StatusCode};
52//! use http_body_util::Full;
53//! use bytes::Bytes;
54//! use tower_async::ServiceBuilder;
55//! use tracing::Level;
56//! use tower_async_http::{
57//! LatencyUnit,
58//! trace::{TraceLayer, DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse},
59//! };
60//! use std::time::Duration;
61//! # use tower_async::Service;
62//! # use std::convert::Infallible;
63//!
64//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
65//! # Ok(Response::new(Full::from("foo")))
66//! # }
67//! # #[tokio::main]
68//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
69//! # tracing_subscriber::fmt::init();
70//! #
71//! let service = ServiceBuilder::new()
72//! .layer(
73//! TraceLayer::new_for_http()
74//! .make_span_with(
75//! DefaultMakeSpan::new().include_headers(true)
76//! )
77//! .on_request(
78//! DefaultOnRequest::new().level(Level::INFO)
79//! )
80//! .on_response(
81//! DefaultOnResponse::new()
82//! .level(Level::INFO)
83//! .latency_unit(LatencyUnit::Micros)
84//! )
85//! // on so on for `on_eos`, `on_body_chunk`, and `on_failure`
86//! )
87//! .service_fn(handle);
88//! # let mut service = service;
89//! # let response = service
90//! # .call(Request::new(Full::from("foo")))
91//! # .await?;
92//! # Ok(())
93//! # }
94//! ```
95//!
96//! However for maximum control you can provide callbacks:
97//!
98//! ```rust
99//! use http::{Request, Response, HeaderMap, StatusCode};
100//! use http_body_util::Full;
101//! use bytes::Bytes;
102//! use tower_async::ServiceBuilder;
103//! use tower_async_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
104//! use std::time::Duration;
105//! use tracing::Span;
106//! # use tower_async::Service;
107//! # use std::convert::Infallible;
108//!
109//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
110//! # Ok(Response::new(Full::from("foo")))
111//! # }
112//! # #[tokio::main]
113//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
114//! # tracing_subscriber::fmt::init();
115//! #
116//! let service = ServiceBuilder::new()
117//! .layer(
118//! TraceLayer::new_for_http()
119//! .make_span_with(|request: &Request<Full<Bytes>>| {
120//! tracing::debug_span!("http-request")
121//! })
122//! .on_request(|request: &Request<Full<Bytes>>, _span: &Span| {
123//! tracing::debug!("started {} {}", request.method(), request.uri().path())
124//! })
125//! .on_response(|response: &Response<Full<Bytes>>, latency: Duration, _span: &Span| {
126//! tracing::debug!("response generated in {:?}", latency)
127//! })
128//! .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
129//! tracing::debug!("sending {} bytes", chunk.len())
130//! })
131//! .on_eos(|trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
132//! tracing::debug!("stream closed after {:?}", stream_duration)
133//! })
134//! .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
135//! tracing::debug!("something went wrong")
136//! })
137//! )
138//! .service_fn(handle);
139//! # let mut service = service;
140//! # let response = service
141//! # .call(Request::new(Full::from("foo")))
142//! # .await?;
143//! # Ok(())
144//! # }
145//! ```
146//!
147//! ## Disabling something
148//!
149//! Setting the behaviour to `()` will be disable that particular step:
150//!
151//! ```rust
152//! use http::StatusCode;
153//! use tower_async::ServiceBuilder;
154//! use tower_async_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
155//! use std::time::Duration;
156//! use tracing::Span;
157//! # use tower_async::Service;
158//! # use http_body_util::Full;
159//! # use bytes::Bytes;
160//! # use http::{Response, Request};
161//! # use std::convert::Infallible;
162//! #
163//! # type Body = Full<Bytes>;
164//!
165//! # async fn handle(request: Request<Body>) -> Result<Response<Body>, Infallible> {
166//! # Ok(Response::new(Body::from("foo")))
167//! # }
168//! # #[tokio::main]
169//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
170//! # tracing_subscriber::fmt::init();
171//! #
172//! let service = ServiceBuilder::new()
173//! .layer(
174//! // This configuration will only emit events on failures
175//! TraceLayer::new_for_http()
176//! .on_request(())
177//! .on_response(())
178//! .on_body_chunk(())
179//! .on_eos(())
180//! .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
181//! tracing::debug!("something went wrong")
182//! })
183//! )
184//! .service_fn(handle);
185//! # let mut service = service;
186//! # let response = service
187//! # .call(Request::new(Body::from("foo")))
188//! # .await?;
189//! # Ok(())
190//! # }
191//! ```
192//!
193//! # When the callbacks are called
194//!
195//! ### `on_request`
196//!
197//! The `on_request` callback is called when the request arrives at the
198//! middleware in [`Service::call`] just prior to passing the request to the
199//! inner service.
200//!
201//! ### `on_response`
202//!
203//! The `on_response` callback is called when the inner service's response
204//! future completes with `Ok(response)` regardless if the response is
205//! classified as a success or a failure.
206//!
207//! For example if you're using [`ServerErrorsAsFailures`] as your classifier
208//! and the inner service responds with `500 Internal Server Error` then the
209//! `on_response` callback is still called. `on_failure` would _also_ be called
210//! in this case since the response was classified as a failure.
211//!
212//! ### `on_body_chunk`
213//!
214//! The `on_body_chunk` callback is called when the response body produces a new
215//! chunk, that is when [`http_body::Body::poll_frame`] returns `Poll::Ready(Some(Ok(chunk)))`.
216//!
217//! `on_body_chunk` is called even if the chunk is empty.
218//!
219//! ### `on_eos`
220//!
221//! The `on_eos` callback is called when a streaming response body ends, that is
222//! when `http_body::Body::poll_frame` returns `Poll::Ready(None)`.
223//!
224//! `on_eos` is called even if the trailers produced are `None`.
225//!
226//! ### `on_failure`
227//!
228//! The `on_failure` callback is called when:
229//!
230//! - The inner [`Service`]'s response future resolves to an error.
231//! - A response is classified as a failure.
232//! - [`http_body::Body::poll_frame`] returns an error.
233//! - An end-of-stream is classified as a failure.
234//!
235//! # Recording fields on the span
236//!
237//! All callbacks receive a reference to the [tracing] [`Span`], corresponding to this request,
238//! produced by the closure passed to [`TraceLayer::make_span_with`]. It can be used to [record
239//! field values][record] that weren't known when the span was created.
240//!
241//! ```rust
242//! use http::{Request, Response, HeaderMap, StatusCode};
243//! use http_body_util::Full;
244//! use bytes::Bytes;
245//! use tower_async::ServiceBuilder;
246//! use tower_async_http::trace::TraceLayer;
247//! use tracing::Span;
248//! use std::time::Duration;
249//! use std::convert::Infallible;
250//!
251//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
252//! # Ok(Response::new(Full::from("foo")))
253//! # }
254//! # #[tokio::main]
255//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
256//! # tracing_subscriber::fmt::init();
257//! #
258//! let service = ServiceBuilder::new()
259//! .layer(
260//! TraceLayer::new_for_http()
261//! .make_span_with(|request: &Request<Full<Bytes>>| {
262//! tracing::debug_span!(
263//! "http-request",
264//! status_code = tracing::field::Empty,
265//! )
266//! })
267//! .on_response(|response: &Response<Full<Bytes>>, _latency: Duration, span: &Span| {
268//! span.record("status_code", &tracing::field::display(response.status()));
269//!
270//! tracing::debug!("response generated")
271//! })
272//! )
273//! .service_fn(handle);
274//! # Ok(())
275//! # }
276//! ```
277//!
278//! # Providing classifiers
279//!
280//! Tracing requires determining if a response is a success or failure. [`MakeClassifier`] is used
281//! to create a classifier for the incoming request. See the docs for [`MakeClassifier`] and
282//! [`ClassifyResponse`] for more details on classification.
283//!
284//! A [`MakeClassifier`] can be provided when creating a [`TraceLayer`]:
285//!
286//! ```rust
287//! use http::{Request, Response};
288//! use http_body_util::Full;
289//! use bytes::Bytes;
290//! use tower_async::ServiceBuilder;
291//! use tower_async_http::{
292//! trace::TraceLayer,
293//! classify::{
294//! MakeClassifier, ClassifyResponse, ClassifiedResponse, NeverClassifyEos,
295//! SharedClassifier,
296//! },
297//! };
298//! use std::convert::Infallible;
299//!
300//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
301//! # Ok(Response::new(Full::from("foo")))
302//! # }
303//! # #[tokio::main]
304//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
305//! # tracing_subscriber::fmt::init();
306//! #
307//! // Our `MakeClassifier` that always crates `MyClassifier` classifiers.
308//! #[derive(Copy, Clone)]
309//! struct MyMakeClassify;
310//!
311//! impl MakeClassifier for MyMakeClassify {
312//! type Classifier = MyClassifier;
313//! type FailureClass = &'static str;
314//! type ClassifyEos = NeverClassifyEos<&'static str>;
315//!
316//! fn make_classifier<B>(&self, req: &Request<B>) -> Self::Classifier {
317//! MyClassifier
318//! }
319//! }
320//!
321//! // A classifier that classifies failures as `"something went wrong..."`.
322//! #[derive(Copy, Clone)]
323//! struct MyClassifier;
324//!
325//! impl ClassifyResponse for MyClassifier {
326//! type FailureClass = &'static str;
327//! type ClassifyEos = NeverClassifyEos<&'static str>;
328//!
329//! fn classify_response<B>(
330//! self,
331//! res: &Response<B>
332//! ) -> ClassifiedResponse<Self::FailureClass, Self::ClassifyEos> {
333//! // Classify based on the status code.
334//! if res.status().is_server_error() {
335//! ClassifiedResponse::Ready(Err("something went wrong..."))
336//! } else {
337//! ClassifiedResponse::Ready(Ok(()))
338//! }
339//! }
340//!
341//! fn classify_error<E>(self, error: &E) -> Self::FailureClass
342//! where
343//! E: std::fmt::Display,
344//! {
345//! "something went wrong..."
346//! }
347//! }
348//!
349//! let service = ServiceBuilder::new()
350//! // Create a trace layer that uses our classifier.
351//! .layer(TraceLayer::new(MyMakeClassify))
352//! .service_fn(handle);
353//!
354//! // Since `MyClassifier` is `Clone` we can also use `SharedClassifier`
355//! // to avoid having to define a separate `MakeClassifier`.
356//! let service = ServiceBuilder::new()
357//! .layer(TraceLayer::new(SharedClassifier::new(MyClassifier)))
358//! .service_fn(handle);
359//! # Ok(())
360//! # }
361//! ```
362//!
363//! [`TraceLayer`] comes with convenience methods for using common classifiers:
364//!
365//! - [`TraceLayer::new_for_http`] classifies based on the status code. It doesn't consider
366//! streaming responses.
367//! - [`TraceLayer::new_for_grpc`] classifies based on the gRPC protocol and supports streaming
368//! responses.
369//!
370//! [tracing]: https://crates.io/crates/tracing
371//! [`Service`]: tower_async_service::Service
372//! [`Service::call`]: tower_async_service::Service::call
373//! [`MakeClassifier`]: crate::classify::MakeClassifier
374//! [`ClassifyResponse`]: crate::classify::ClassifyResponse
375//! [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
376//! [`TraceLayer::make_span_with`]: crate::trace::TraceLayer::make_span_with
377//! [`Span`]: tracing::Span
378//! [`ServerErrorsAsFailures`]: crate::classify::ServerErrorsAsFailures
379
380use std::{fmt, time::Duration};
381
382use tracing::Level;
383
384pub use self::{
385 body::ResponseBody,
386 layer::TraceLayer,
387 make_span::{DefaultMakeSpan, MakeSpan},
388 on_body_chunk::{DefaultOnBodyChunk, OnBodyChunk},
389 on_eos::{DefaultOnEos, OnEos},
390 on_failure::{DefaultOnFailure, OnFailure},
391 on_request::{DefaultOnRequest, OnRequest},
392 on_response::{DefaultOnResponse, OnResponse},
393 service::Trace,
394};
395
396use crate::LatencyUnit;
397
398macro_rules! event_dynamic_lvl {
399 ( $(target: $target:expr,)? $(parent: $parent:expr,)? $lvl:expr, $($tt:tt)* ) => {
400 match $lvl {
401 tracing::Level::ERROR => {
402 tracing::event!(
403 $(target: $target,)?
404 $(parent: $parent,)?
405 tracing::Level::ERROR,
406 $($tt)*
407 );
408 }
409 tracing::Level::WARN => {
410 tracing::event!(
411 $(target: $target,)?
412 $(parent: $parent,)?
413 tracing::Level::WARN,
414 $($tt)*
415 );
416 }
417 tracing::Level::INFO => {
418 tracing::event!(
419 $(target: $target,)?
420 $(parent: $parent,)?
421 tracing::Level::INFO,
422 $($tt)*
423 );
424 }
425 tracing::Level::DEBUG => {
426 tracing::event!(
427 $(target: $target,)?
428 $(parent: $parent,)?
429 tracing::Level::DEBUG,
430 $($tt)*
431 );
432 }
433 tracing::Level::TRACE => {
434 tracing::event!(
435 $(target: $target,)?
436 $(parent: $parent,)?
437 tracing::Level::TRACE,
438 $($tt)*
439 );
440 }
441 }
442 };
443}
444
445mod body;
446mod layer;
447mod make_span;
448mod on_body_chunk;
449mod on_eos;
450mod on_failure;
451mod on_request;
452mod on_response;
453mod service;
454
455const DEFAULT_MESSAGE_LEVEL: Level = Level::DEBUG;
456const DEFAULT_ERROR_LEVEL: Level = Level::ERROR;
457
458struct Latency {
459 unit: LatencyUnit,
460 duration: Duration,
461}
462
463impl fmt::Display for Latency {
464 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
465 match self.unit {
466 LatencyUnit::Seconds => write!(f, "{} s", self.duration.as_secs_f64()),
467 LatencyUnit::Millis => write!(f, "{} ms", self.duration.as_millis()),
468 LatencyUnit::Micros => write!(f, "{} μs", self.duration.as_micros()),
469 LatencyUnit::Nanos => write!(f, "{} ns", self.duration.as_nanos()),
470 }
471 }
472}
473
474#[cfg(test)]
475mod tests {
476 use super::*;
477
478 use crate::classify::ServerErrorsFailureClass;
479 use crate::test_helpers::{self, Body};
480
481 use bytes::Bytes;
482 use http::{HeaderMap, Request, Response};
483 use once_cell::sync::Lazy;
484 use std::{
485 sync::atomic::{AtomicU32, Ordering},
486 time::Duration,
487 };
488 use tower_async::{BoxError, Service, ServiceBuilder};
489 use tracing::Span;
490
491 #[tokio::test]
492 async fn unary_request() {
493 static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
494 static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
495 static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
496 static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
497 static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
498
499 let trace_layer = TraceLayer::new_for_http()
500 .make_span_with(|_req: &Request<Body>| {
501 tracing::info_span!("test-span", foo = tracing::field::Empty)
502 })
503 .on_request(|_req: &Request<Body>, span: &Span| {
504 span.record("foo", 42);
505 ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
506 })
507 .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
508 ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
509 })
510 .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
511 ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
512 })
513 .on_eos(
514 |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
515 ON_EOS.fetch_add(1, Ordering::SeqCst);
516 },
517 )
518 .on_failure(
519 |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
520 ON_FAILURE.fetch_add(1, Ordering::SeqCst);
521 },
522 );
523
524 let svc = ServiceBuilder::new().layer(trace_layer).service_fn(echo);
525
526 let res = svc.call(Request::new(Body::from("foobar"))).await.unwrap();
527
528 assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
529 assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "request");
530 assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
531 assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
532 assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
533
534 test_helpers::to_bytes(res.into_body()).await.unwrap();
535 assert_eq!(1, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
536 assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
537 assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
538 }
539
540 #[tokio::test]
541 async fn streaming_response() {
542 static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
543 static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
544 static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
545 static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
546 static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
547
548 let trace_layer = TraceLayer::new_for_http()
549 .on_request(|_req: &Request<Body>, _span: &Span| {
550 ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
551 })
552 .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
553 ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
554 })
555 .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
556 ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
557 })
558 .on_eos(
559 |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
560 ON_EOS.fetch_add(1, Ordering::SeqCst);
561 },
562 )
563 .on_failure(
564 |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
565 ON_FAILURE.fetch_add(1, Ordering::SeqCst);
566 },
567 );
568
569 let svc = ServiceBuilder::new()
570 .layer(trace_layer)
571 .service_fn(streaming_body);
572
573 let res = svc.call(Request::new(Body::empty())).await.unwrap();
574
575 assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
576 assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "request");
577 assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
578 assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
579 assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
580
581 test_helpers::to_bytes(res.into_body()).await.unwrap();
582 assert_eq!(3, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
583 assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
584 assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
585 }
586
587 async fn echo(req: Request<Body>) -> Result<Response<Body>, BoxError> {
588 Ok(Response::new(req.into_body()))
589 }
590
591 async fn streaming_body(_req: Request<Body>) -> Result<Response<Body>, BoxError> {
592 use futures::stream::iter;
593
594 let stream = iter(vec![
595 Ok::<_, BoxError>(Bytes::from("one")),
596 Ok::<_, BoxError>(Bytes::from("two")),
597 Ok::<_, BoxError>(Bytes::from("three")),
598 ]);
599
600 let body = Body::from_stream(stream);
601
602 Ok(Response::new(body))
603 }
604}