// tower_async_http/trace/mod.rs

//! Middleware that adds high level [tracing] to a [`Service`].
//!
//! # Example
//!
//! Adding tracing to your service can be as simple as:
//!
//! ```rust
//! use http::{Request, Response};
//! use http_body_util::Full;
//! use bytes::Bytes;
//! use tower_async::{ServiceBuilder, Service};
//! use tower_async_http::trace::TraceLayer;
//! use std::convert::Infallible;
//!
//! async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
//!     Ok(Response::new(Full::from("foo")))
//! }
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Setup tracing
//! tracing_subscriber::fmt::init();
//!
//! let mut service = ServiceBuilder::new()
//!     .layer(TraceLayer::new_for_http())
//!     .service_fn(handle);
//!
//! let request = Request::new(Full::from("foo"));
//!
//! let response = service
//!     .call(request)
//!     .await?;
//! # Ok(())
//! # }
//! ```
//!
//! If you run this application with `RUST_LOG=tower_async_http=trace cargo run` you should see logs like:
//!
//! ```text
//! Mar 05 20:50:28.523 DEBUG request{method=GET path="/foo"}: tower_async_http::trace::on_request: started processing request
//! Mar 05 20:50:28.524 DEBUG request{method=GET path="/foo"}: tower_async_http::trace::on_response: finished processing request latency=1 ms status=200
//! ```
//!
//! # Customization
//!
//! [`Trace`] comes with good defaults but also supports customizing many aspects of the output.
//!
//! The default behaviour supports some customization:
//!
//! ```rust
//! use http::{Request, Response, HeaderMap, StatusCode};
//! use http_body_util::Full;
//! use bytes::Bytes;
//! use tower_async::ServiceBuilder;
//! use tracing::Level;
//! use tower_async_http::{
//!     LatencyUnit,
//!     trace::{TraceLayer, DefaultMakeSpan, DefaultOnRequest, DefaultOnResponse},
//! };
//! use std::time::Duration;
//! # use tower_async::Service;
//! # use std::convert::Infallible;
//!
//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
//! #     Ok(Response::new(Full::from("foo")))
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! # tracing_subscriber::fmt::init();
//! #
//! let service = ServiceBuilder::new()
//!     .layer(
//!         TraceLayer::new_for_http()
//!             .make_span_with(
//!                 DefaultMakeSpan::new().include_headers(true)
//!             )
//!             .on_request(
//!                 DefaultOnRequest::new().level(Level::INFO)
//!             )
//!             .on_response(
//!                 DefaultOnResponse::new()
//!                     .level(Level::INFO)
//!                     .latency_unit(LatencyUnit::Micros)
//!             )
//!             // and so on for `on_eos`, `on_body_chunk`, and `on_failure`
//!     )
//!     .service_fn(handle);
//! # let mut service = service;
//! # let response = service
//! #     .call(Request::new(Full::from("foo")))
//! #     .await?;
//! # Ok(())
//! # }
//! ```
//!
//! However, for maximum control you can provide callbacks:
//!
//! ```rust
//! use http::{Request, Response, HeaderMap, StatusCode};
//! use http_body_util::Full;
//! use bytes::Bytes;
//! use tower_async::ServiceBuilder;
//! use tower_async_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
//! use std::time::Duration;
//! use tracing::Span;
//! # use tower_async::Service;
//! # use std::convert::Infallible;
//!
//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
//! #     Ok(Response::new(Full::from("foo")))
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! # tracing_subscriber::fmt::init();
//! #
//! let service = ServiceBuilder::new()
//!     .layer(
//!         TraceLayer::new_for_http()
//!             .make_span_with(|request: &Request<Full<Bytes>>| {
//!                 tracing::debug_span!("http-request")
//!             })
//!             .on_request(|request: &Request<Full<Bytes>>, _span: &Span| {
//!                 tracing::debug!("started {} {}", request.method(), request.uri().path())
//!             })
//!             .on_response(|response: &Response<Full<Bytes>>, latency: Duration, _span: &Span| {
//!                 tracing::debug!("response generated in {:?}", latency)
//!             })
//!             .on_body_chunk(|chunk: &Bytes, latency: Duration, _span: &Span| {
//!                 tracing::debug!("sending {} bytes", chunk.len())
//!             })
//!             .on_eos(|trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span| {
//!                 tracing::debug!("stream closed after {:?}", stream_duration)
//!             })
//!             .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
//!                 tracing::debug!("something went wrong")
//!             })
//!     )
//!     .service_fn(handle);
//! # let mut service = service;
//! # let response = service
//! #     .call(Request::new(Full::from("foo")))
//! #     .await?;
//! # Ok(())
//! # }
//! ```
//!
//! ## Disabling something
//!
//! Setting the behaviour to `()` will disable that particular step:
//!
//! ```rust
//! use http::StatusCode;
//! use tower_async::ServiceBuilder;
//! use tower_async_http::{classify::ServerErrorsFailureClass, trace::TraceLayer};
//! use std::time::Duration;
//! use tracing::Span;
//! # use tower_async::Service;
//! # use http_body_util::Full;
//! # use bytes::Bytes;
//! # use http::{Response, Request};
//! # use std::convert::Infallible;
//! #
//! # type Body = Full<Bytes>;
//!
//! # async fn handle(request: Request<Body>) -> Result<Response<Body>, Infallible> {
//! #     Ok(Response::new(Body::from("foo")))
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! # tracing_subscriber::fmt::init();
//! #
//! let service = ServiceBuilder::new()
//!     .layer(
//!         // This configuration will only emit events on failures
//!         TraceLayer::new_for_http()
//!             .on_request(())
//!             .on_response(())
//!             .on_body_chunk(())
//!             .on_eos(())
//!             .on_failure(|error: ServerErrorsFailureClass, latency: Duration, _span: &Span| {
//!                 tracing::debug!("something went wrong")
//!             })
//!     )
//!     .service_fn(handle);
//! # let mut service = service;
//! # let response = service
//! #     .call(Request::new(Body::from("foo")))
//! #     .await?;
//! # Ok(())
//! # }
//! ```
//!
//! # When the callbacks are called
//!
//! ### `on_request`
//!
//! The `on_request` callback is called when the request arrives at the
//! middleware in [`Service::call`] just prior to passing the request to the
//! inner service.
//!
//! ### `on_response`
//!
//! The `on_response` callback is called when the inner service's response
//! future completes with `Ok(response)`, regardless of whether the response is
//! classified as a success or a failure.
//!
//! For example if you're using [`ServerErrorsAsFailures`] as your classifier
//! and the inner service responds with `500 Internal Server Error` then the
//! `on_response` callback is still called. `on_failure` would _also_ be called
//! in this case since the response was classified as a failure.
//!
//! ### `on_body_chunk`
//!
//! The `on_body_chunk` callback is called when the response body produces a new
//! chunk, that is when [`http_body::Body::poll_frame`] returns `Poll::Ready(Some(Ok(chunk)))`.
//!
//! `on_body_chunk` is called even if the chunk is empty.
//!
//! ### `on_eos`
//!
//! The `on_eos` callback is called when a streaming response body ends, that is
//! when `http_body::Body::poll_frame` returns `Poll::Ready(None)`.
//!
//! `on_eos` is called even if the trailers produced are `None`.
//!
//! ### `on_failure`
//!
//! The `on_failure` callback is called when:
//!
//! - The inner [`Service`]'s response future resolves to an error.
//! - A response is classified as a failure.
//! - [`http_body::Body::poll_frame`] returns an error.
//! - An end-of-stream is classified as a failure.
//!
//! # Recording fields on the span
//!
//! All callbacks receive a reference to the [tracing] [`Span`], corresponding to this request,
//! produced by the closure passed to [`TraceLayer::make_span_with`]. It can be used to [record
//! field values][record] that weren't known when the span was created.
//!
//! ```rust
//! use http::{Request, Response, HeaderMap, StatusCode};
//! use http_body_util::Full;
//! use bytes::Bytes;
//! use tower_async::ServiceBuilder;
//! use tower_async_http::trace::TraceLayer;
//! use tracing::Span;
//! use std::time::Duration;
//! use std::convert::Infallible;
//!
//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
//! #     Ok(Response::new(Full::from("foo")))
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! # tracing_subscriber::fmt::init();
//! #
//! let service = ServiceBuilder::new()
//!     .layer(
//!         TraceLayer::new_for_http()
//!             .make_span_with(|request: &Request<Full<Bytes>>| {
//!                 tracing::debug_span!(
//!                     "http-request",
//!                     status_code = tracing::field::Empty,
//!                 )
//!             })
//!             .on_response(|response: &Response<Full<Bytes>>, _latency: Duration, span: &Span| {
//!                 span.record("status_code", &tracing::field::display(response.status()));
//!
//!                 tracing::debug!("response generated")
//!             })
//!     )
//!     .service_fn(handle);
//! # Ok(())
//! # }
//! ```
//!
//! # Providing classifiers
//!
//! Tracing requires determining if a response is a success or failure. [`MakeClassifier`] is used
//! to create a classifier for the incoming request. See the docs for [`MakeClassifier`] and
//! [`ClassifyResponse`] for more details on classification.
//!
//! A [`MakeClassifier`] can be provided when creating a [`TraceLayer`]:
//!
//! ```rust
//! use http::{Request, Response};
//! use http_body_util::Full;
//! use bytes::Bytes;
//! use tower_async::ServiceBuilder;
//! use tower_async_http::{
//!     trace::TraceLayer,
//!     classify::{
//!         MakeClassifier, ClassifyResponse, ClassifiedResponse, NeverClassifyEos,
//!         SharedClassifier,
//!     },
//! };
//! use std::convert::Infallible;
//!
//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
//! #     Ok(Response::new(Full::from("foo")))
//! # }
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! # tracing_subscriber::fmt::init();
//! #
//! // Our `MakeClassifier` that always creates `MyClassifier` classifiers.
//! #[derive(Copy, Clone)]
//! struct MyMakeClassify;
//!
//! impl MakeClassifier for MyMakeClassify {
//!     type Classifier = MyClassifier;
//!     type FailureClass = &'static str;
//!     type ClassifyEos = NeverClassifyEos<&'static str>;
//!
//!     fn make_classifier<B>(&self, req: &Request<B>) -> Self::Classifier {
//!         MyClassifier
//!     }
//! }
//!
//! // A classifier that classifies failures as `"something went wrong..."`.
//! #[derive(Copy, Clone)]
//! struct MyClassifier;
//!
//! impl ClassifyResponse for MyClassifier {
//!     type FailureClass = &'static str;
//!     type ClassifyEos = NeverClassifyEos<&'static str>;
//!
//!     fn classify_response<B>(
//!         self,
//!         res: &Response<B>
//!     ) -> ClassifiedResponse<Self::FailureClass, Self::ClassifyEos> {
//!         // Classify based on the status code.
//!         if res.status().is_server_error() {
//!             ClassifiedResponse::Ready(Err("something went wrong..."))
//!         } else {
//!             ClassifiedResponse::Ready(Ok(()))
//!         }
//!     }
//!
//!     fn classify_error<E>(self, error: &E) -> Self::FailureClass
//!     where
//!         E: std::fmt::Display,
//!     {
//!         "something went wrong..."
//!     }
//! }
//!
//! let service = ServiceBuilder::new()
//!     // Create a trace layer that uses our classifier.
//!     .layer(TraceLayer::new(MyMakeClassify))
//!     .service_fn(handle);
//!
//! // Since `MyClassifier` is `Clone` we can also use `SharedClassifier`
//! // to avoid having to define a separate `MakeClassifier`.
//! let service = ServiceBuilder::new()
//!     .layer(TraceLayer::new(SharedClassifier::new(MyClassifier)))
//!     .service_fn(handle);
//! # Ok(())
//! # }
//! ```
//!
//! [`TraceLayer`] comes with convenience methods for using common classifiers:
//!
//! - [`TraceLayer::new_for_http`] classifies based on the status code. It doesn't consider
//!   streaming responses.
//! - [`TraceLayer::new_for_grpc`] classifies based on the gRPC protocol and supports streaming
//!   responses.
//!
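//! For example, to trace a gRPC service you can swap in the gRPC classifier. A minimal
//! sketch, reusing the same `handle` service as in the examples above:
//!
//! ```rust
//! use http::{Request, Response};
//! use http_body_util::Full;
//! use bytes::Bytes;
//! use tower_async::ServiceBuilder;
//! use tower_async_http::trace::TraceLayer;
//! use std::convert::Infallible;
//!
//! # async fn handle(request: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
//! #     Ok(Response::new(Full::from("foo")))
//! # }
//! // Responses are classified with the gRPC classifier, so failures are detected from
//! // the `grpc-status` code (including in trailers for streaming responses).
//! let service = ServiceBuilder::new()
//!     .layer(TraceLayer::new_for_grpc())
//!     .service_fn(handle);
//! ```
//!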
//! [tracing]: https://crates.io/crates/tracing
//! [`Service`]: tower_async_service::Service
//! [`Service::call`]: tower_async_service::Service::call
//! [`MakeClassifier`]: crate::classify::MakeClassifier
//! [`ClassifyResponse`]: crate::classify::ClassifyResponse
//! [record]: https://docs.rs/tracing/latest/tracing/span/struct.Span.html#method.record
//! [`TraceLayer::make_span_with`]: crate::trace::TraceLayer::make_span_with
//! [`Span`]: tracing::Span
//! [`ServerErrorsAsFailures`]: crate::classify::ServerErrorsAsFailures

use std::{fmt, time::Duration};

use tracing::Level;

pub use self::{
    body::ResponseBody,
    layer::TraceLayer,
    make_span::{DefaultMakeSpan, MakeSpan},
    on_body_chunk::{DefaultOnBodyChunk, OnBodyChunk},
    on_eos::{DefaultOnEos, OnEos},
    on_failure::{DefaultOnFailure, OnFailure},
    on_request::{DefaultOnRequest, OnRequest},
    on_response::{DefaultOnResponse, OnResponse},
    service::Trace,
};

use crate::LatencyUnit;

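// `tracing::event!` requires the level to be a constant expression, so this helper
// macro matches on a runtime `Level` and forwards to `tracing::event!` with the
// corresponding constant level.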
macro_rules! event_dynamic_lvl {
    ( $(target: $target:expr,)? $(parent: $parent:expr,)? $lvl:expr, $($tt:tt)* ) => {
        match $lvl {
            tracing::Level::ERROR => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::ERROR,
                    $($tt)*
                );
            }
            tracing::Level::WARN => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::WARN,
                    $($tt)*
                );
            }
            tracing::Level::INFO => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::INFO,
                    $($tt)*
                );
            }
            tracing::Level::DEBUG => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::DEBUG,
                    $($tt)*
                );
            }
            tracing::Level::TRACE => {
                tracing::event!(
                    $(target: $target,)?
                    $(parent: $parent,)?
                    tracing::Level::TRACE,
                    $($tt)*
                );
            }
        }
    };
}

mod body;
mod layer;
mod make_span;
mod on_body_chunk;
mod on_eos;
mod on_failure;
mod on_request;
mod on_response;
mod service;

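// Default `tracing` levels used by the `Default*` implementations in the submodules.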
const DEFAULT_MESSAGE_LEVEL: Level = Level::DEBUG;
const DEFAULT_ERROR_LEVEL: Level = Level::ERROR;

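// Small helper that pairs a `Duration` with the `LatencyUnit` it should be
// displayed in; see the `Display` impl below.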
struct Latency {
    unit: LatencyUnit,
    duration: Duration,
}

impl fmt::Display for Latency {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.unit {
            LatencyUnit::Seconds => write!(f, "{} s", self.duration.as_secs_f64()),
            LatencyUnit::Millis => write!(f, "{} ms", self.duration.as_millis()),
            LatencyUnit::Micros => write!(f, "{} μs", self.duration.as_micros()),
            LatencyUnit::Nanos => write!(f, "{} ns", self.duration.as_nanos()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::classify::ServerErrorsFailureClass;
    use crate::test_helpers::{self, Body};

    use bytes::Bytes;
    use http::{HeaderMap, Request, Response};
    use once_cell::sync::Lazy;
    use std::{
        sync::atomic::{AtomicU32, Ordering},
        time::Duration,
    };
    use tower_async::{BoxError, Service, ServiceBuilder};
    use tracing::Span;

    #[tokio::test]
    async fn unary_request() {
        static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

        let trace_layer = TraceLayer::new_for_http()
            .make_span_with(|_req: &Request<Body>| {
                tracing::info_span!("test-span", foo = tracing::field::Empty)
            })
            .on_request(|_req: &Request<Body>, span: &Span| {
                span.record("foo", 42);
                ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
                ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
                ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_eos(
                |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                    ON_EOS.fetch_add(1, Ordering::SeqCst);
                },
            )
            .on_failure(
                |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                    ON_FAILURE.fetch_add(1, Ordering::SeqCst);
                },
            );

        let svc = ServiceBuilder::new().layer(trace_layer).service_fn(echo);

        let res = svc.call(Request::new(Body::from("foobar"))).await.unwrap();

        assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
        assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
        assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

        test_helpers::to_bytes(res.into_body()).await.unwrap();
        assert_eq!(1, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
    }

    #[tokio::test]
    async fn streaming_response() {
        static ON_REQUEST_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_RESPONSE_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_BODY_CHUNK_COUNT: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_EOS: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));
        static ON_FAILURE: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

        let trace_layer = TraceLayer::new_for_http()
            .on_request(|_req: &Request<Body>, _span: &Span| {
                ON_REQUEST_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_response(|_res: &Response<Body>, _latency: Duration, _span: &Span| {
                ON_RESPONSE_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_body_chunk(|_chunk: &Bytes, _latency: Duration, _span: &Span| {
                ON_BODY_CHUNK_COUNT.fetch_add(1, Ordering::SeqCst);
            })
            .on_eos(
                |_trailers: Option<&HeaderMap>, _latency: Duration, _span: &Span| {
                    ON_EOS.fetch_add(1, Ordering::SeqCst);
                },
            )
            .on_failure(
                |_class: ServerErrorsFailureClass, _latency: Duration, _span: &Span| {
                    ON_FAILURE.fetch_add(1, Ordering::SeqCst);
                },
            );

        let svc = ServiceBuilder::new()
            .layer(trace_layer)
            .service_fn(streaming_body);

        let res = svc.call(Request::new(Body::empty())).await.unwrap();

        assert_eq!(1, ON_REQUEST_COUNT.load(Ordering::SeqCst), "request");
        assert_eq!(1, ON_RESPONSE_COUNT.load(Ordering::SeqCst), "response");
        assert_eq!(0, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");

        test_helpers::to_bytes(res.into_body()).await.unwrap();
        assert_eq!(3, ON_BODY_CHUNK_COUNT.load(Ordering::SeqCst), "body chunk");
        assert_eq!(0, ON_EOS.load(Ordering::SeqCst), "eos");
        assert_eq!(0, ON_FAILURE.load(Ordering::SeqCst), "failure");
    }

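    // Test service that echoes the request body back as the response body.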
    async fn echo(req: Request<Body>) -> Result<Response<Body>, BoxError> {
        Ok(Response::new(req.into_body()))
    }

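    // Test service that responds with a streaming body of three chunks.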
    async fn streaming_body(_req: Request<Body>) -> Result<Response<Body>, BoxError> {
        use futures::stream::iter;

        let stream = iter(vec![
            Ok::<_, BoxError>(Bytes::from("one")),
            Ok::<_, BoxError>(Bytes::from("two")),
            Ok::<_, BoxError>(Bytes::from("three")),
        ]);

        let body = Body::from_stream(stream);

        Ok(Response::new(body))
    }
}