tower_async_http/trace/
service.rs

1use super::{
2    DefaultMakeSpan, DefaultOnBodyChunk, DefaultOnEos, DefaultOnFailure, DefaultOnRequest,
3    DefaultOnResponse, MakeSpan, OnBodyChunk, OnEos, OnFailure, OnRequest, OnResponse,
4    ResponseBody, TraceLayer,
5};
6use crate::classify::{
7    ClassifiedResponse, ClassifyResponse, GrpcErrorsAsFailures, MakeClassifier,
8    ServerErrorsAsFailures, SharedClassifier,
9};
10use http::{Request, Response};
11use http_body::Body;
12use std::{fmt, time::Instant};
13use tower_async_service::Service;
14
15/// Middleware that adds high level [tracing] to a [`Service`].
16///
17/// See the [module docs](crate::trace) for an example.
18///
19/// [tracing]: https://crates.io/crates/tracing
20/// [`Service`]: tower_async_service::Service
21#[derive(Debug, Clone, Copy)]
22pub struct Trace<
23    S,
24    M,
25    MakeSpan = DefaultMakeSpan,
26    OnRequest = DefaultOnRequest,
27    OnResponse = DefaultOnResponse,
28    OnBodyChunk = DefaultOnBodyChunk,
29    OnEos = DefaultOnEos,
30    OnFailure = DefaultOnFailure,
31> {
32    pub(crate) inner: S,
33    pub(crate) make_classifier: M,
34    pub(crate) make_span: MakeSpan,
35    pub(crate) on_request: OnRequest,
36    pub(crate) on_response: OnResponse,
37    pub(crate) on_body_chunk: OnBodyChunk,
38    pub(crate) on_eos: OnEos,
39    pub(crate) on_failure: OnFailure,
40}
41
42impl<S, M> Trace<S, M> {
43    /// Create a new [`Trace`] using the given [`MakeClassifier`].
44    pub fn new(inner: S, make_classifier: M) -> Self
45    where
46        M: MakeClassifier,
47    {
48        Self {
49            inner,
50            make_classifier,
51            make_span: DefaultMakeSpan::new(),
52            on_request: DefaultOnRequest::default(),
53            on_response: DefaultOnResponse::default(),
54            on_body_chunk: DefaultOnBodyChunk::default(),
55            on_eos: DefaultOnEos::default(),
56            on_failure: DefaultOnFailure::default(),
57        }
58    }
59
60    /// Returns a new [`Layer`] that wraps services with a [`TraceLayer`] middleware.
61    ///
62    /// [`Layer`]: tower_async_layer::Layer
63    pub fn layer(make_classifier: M) -> TraceLayer<M>
64    where
65        M: MakeClassifier,
66    {
67        TraceLayer::new(make_classifier)
68    }
69}
70
71impl<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure>
72    Trace<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure>
73{
74    define_inner_service_accessors!();
75
76    /// Customize what to do when a request is received.
77    ///
78    /// `NewOnRequest` is expected to implement [`OnRequest`].
79    ///
80    /// [`OnRequest`]: super::OnRequest
81    pub fn on_request<NewOnRequest>(
82        self,
83        new_on_request: NewOnRequest,
84    ) -> Trace<S, M, MakeSpan, NewOnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure> {
85        Trace {
86            on_request: new_on_request,
87            inner: self.inner,
88            on_failure: self.on_failure,
89            on_eos: self.on_eos,
90            on_body_chunk: self.on_body_chunk,
91            make_span: self.make_span,
92            on_response: self.on_response,
93            make_classifier: self.make_classifier,
94        }
95    }
96
97    /// Customize what to do when a response has been produced.
98    ///
99    /// `NewOnResponse` is expected to implement [`OnResponse`].
100    ///
101    /// [`OnResponse`]: super::OnResponse
102    pub fn on_response<NewOnResponse>(
103        self,
104        new_on_response: NewOnResponse,
105    ) -> Trace<S, M, MakeSpan, OnRequest, NewOnResponse, OnBodyChunk, OnEos, OnFailure> {
106        Trace {
107            on_response: new_on_response,
108            inner: self.inner,
109            on_request: self.on_request,
110            on_failure: self.on_failure,
111            on_body_chunk: self.on_body_chunk,
112            on_eos: self.on_eos,
113            make_span: self.make_span,
114            make_classifier: self.make_classifier,
115        }
116    }
117
118    /// Customize what to do when a body chunk has been sent.
119    ///
120    /// `NewOnBodyChunk` is expected to implement [`OnBodyChunk`].
121    ///
122    /// [`OnBodyChunk`]: super::OnBodyChunk
123    pub fn on_body_chunk<NewOnBodyChunk>(
124        self,
125        new_on_body_chunk: NewOnBodyChunk,
126    ) -> Trace<S, M, MakeSpan, OnRequest, OnResponse, NewOnBodyChunk, OnEos, OnFailure> {
127        Trace {
128            on_body_chunk: new_on_body_chunk,
129            on_eos: self.on_eos,
130            make_span: self.make_span,
131            inner: self.inner,
132            on_failure: self.on_failure,
133            on_request: self.on_request,
134            on_response: self.on_response,
135            make_classifier: self.make_classifier,
136        }
137    }
138
139    /// Customize what to do when a streaming response has closed.
140    ///
141    /// `NewOnEos` is expected to implement [`OnEos`].
142    ///
143    /// [`OnEos`]: super::OnEos
144    pub fn on_eos<NewOnEos>(
145        self,
146        new_on_eos: NewOnEos,
147    ) -> Trace<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, NewOnEos, OnFailure> {
148        Trace {
149            on_eos: new_on_eos,
150            make_span: self.make_span,
151            inner: self.inner,
152            on_failure: self.on_failure,
153            on_request: self.on_request,
154            on_body_chunk: self.on_body_chunk,
155            on_response: self.on_response,
156            make_classifier: self.make_classifier,
157        }
158    }
159
160    /// Customize what to do when a response has been classified as a failure.
161    ///
162    /// `NewOnFailure` is expected to implement [`OnFailure`].
163    ///
164    /// [`OnFailure`]: super::OnFailure
165    pub fn on_failure<NewOnFailure>(
166        self,
167        new_on_failure: NewOnFailure,
168    ) -> Trace<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, NewOnFailure> {
169        Trace {
170            on_failure: new_on_failure,
171            inner: self.inner,
172            make_span: self.make_span,
173            on_body_chunk: self.on_body_chunk,
174            on_request: self.on_request,
175            on_eos: self.on_eos,
176            on_response: self.on_response,
177            make_classifier: self.make_classifier,
178        }
179    }
180
181    /// Customize how to make [`Span`]s that all request handling will be wrapped in.
182    ///
183    /// `NewMakeSpan` is expected to implement [`MakeSpan`].
184    ///
185    /// [`MakeSpan`]: super::MakeSpan
186    /// [`Span`]: tracing::Span
187    pub fn make_span_with<NewMakeSpan>(
188        self,
189        new_make_span: NewMakeSpan,
190    ) -> Trace<S, M, NewMakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure> {
191        Trace {
192            make_span: new_make_span,
193            inner: self.inner,
194            on_failure: self.on_failure,
195            on_request: self.on_request,
196            on_body_chunk: self.on_body_chunk,
197            on_response: self.on_response,
198            on_eos: self.on_eos,
199            make_classifier: self.make_classifier,
200        }
201    }
202}
203
204impl<S>
205    Trace<
206        S,
207        SharedClassifier<ServerErrorsAsFailures>,
208        DefaultMakeSpan,
209        DefaultOnRequest,
210        DefaultOnResponse,
211        DefaultOnBodyChunk,
212        DefaultOnEos,
213        DefaultOnFailure,
214    >
215{
216    /// Create a new [`Trace`] using [`ServerErrorsAsFailures`] which supports classifying
217    /// regular HTTP responses based on the status code.
218    pub fn new_for_http(inner: S) -> Self {
219        Self {
220            inner,
221            make_classifier: SharedClassifier::new(ServerErrorsAsFailures::default()),
222            make_span: DefaultMakeSpan::new(),
223            on_request: DefaultOnRequest::default(),
224            on_response: DefaultOnResponse::default(),
225            on_body_chunk: DefaultOnBodyChunk::default(),
226            on_eos: DefaultOnEos::default(),
227            on_failure: DefaultOnFailure::default(),
228        }
229    }
230}
231
232impl<S>
233    Trace<
234        S,
235        SharedClassifier<GrpcErrorsAsFailures>,
236        DefaultMakeSpan,
237        DefaultOnRequest,
238        DefaultOnResponse,
239        DefaultOnBodyChunk,
240        DefaultOnEos,
241        DefaultOnFailure,
242    >
243{
244    /// Create a new [`Trace`] using [`GrpcErrorsAsFailures`] which supports classifying
245    /// gRPC responses and streams based on the `grpc-status` header.
246    pub fn new_for_grpc(inner: S) -> Self {
247        Self {
248            inner,
249            make_classifier: SharedClassifier::new(GrpcErrorsAsFailures::default()),
250            make_span: DefaultMakeSpan::new(),
251            on_request: DefaultOnRequest::default(),
252            on_response: DefaultOnResponse::default(),
253            on_body_chunk: DefaultOnBodyChunk::default(),
254            on_eos: DefaultOnEos::default(),
255            on_failure: DefaultOnFailure::default(),
256        }
257    }
258}
259
260impl<
261        S,
262        ReqBody,
263        ResBody,
264        M,
265        OnRequestT,
266        OnResponseT,
267        OnFailureT,
268        OnBodyChunkT,
269        OnEosT,
270        MakeSpanT,
271    > Service<Request<ReqBody>>
272    for Trace<S, M, MakeSpanT, OnRequestT, OnResponseT, OnBodyChunkT, OnEosT, OnFailureT>
273where
274    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
275    ReqBody: Body,
276    ResBody: Body,
277    ResBody::Error: fmt::Display,
278    S::Error: fmt::Display,
279    M: MakeClassifier,
280    M::Classifier: Clone,
281    MakeSpanT: MakeSpan<ReqBody>,
282    OnRequestT: OnRequest<ReqBody>,
283    OnResponseT: OnResponse<ResBody> + Clone,
284    OnBodyChunkT: OnBodyChunk<ResBody::Data> + Clone,
285    OnEosT: OnEos + Clone,
286    OnFailureT: OnFailure<M::FailureClass> + Clone,
287{
288    type Response =
289        Response<ResponseBody<ResBody, M::ClassifyEos, OnBodyChunkT, OnEosT, OnFailureT>>;
290    type Error = S::Error;
291
292    async fn call(&self, req: Request<ReqBody>) -> Result<Self::Response, Self::Error> {
293        let start = Instant::now();
294
295        let span = self.make_span.make_span(&req);
296
297        let classifier = self.make_classifier.make_classifier(&req);
298
299        let result = {
300            let _guard = span.enter();
301            self.on_request.on_request(&req, &span);
302            self.inner.call(req)
303        }
304        .await;
305        let latency = start.elapsed();
306
307        match result {
308            Ok(res) => {
309                let classification = classifier.classify_response(&res);
310
311                self.on_response.clone().on_response(&res, latency, &span);
312
313                match classification {
314                    ClassifiedResponse::Ready(classification) => {
315                        if let Err(failure_class) = classification {
316                            self.on_failure.on_failure(failure_class, latency, &span);
317                        }
318
319                        let span = span.clone();
320                        let res = res.map(|body| ResponseBody {
321                            inner: body,
322                            classify_eos: None,
323                            on_eos: None,
324                            on_body_chunk: self.on_body_chunk.clone(),
325                            on_failure: Some(self.on_failure.clone()),
326                            start,
327                            span,
328                        });
329
330                        Ok(res)
331                    }
332                    ClassifiedResponse::RequiresEos(classify_eos) => {
333                        let span = span.clone();
334                        let res = res.map(|body| ResponseBody {
335                            inner: body,
336                            classify_eos: Some(classify_eos),
337                            on_eos: Some((self.on_eos.clone(), Instant::now())),
338                            on_body_chunk: self.on_body_chunk.clone(),
339                            on_failure: Some(self.on_failure.clone()),
340                            start,
341                            span,
342                        });
343
344                        Ok(res)
345                    }
346                }
347            }
348            Err(err) => {
349                let failure_class: <M as MakeClassifier>::FailureClass =
350                    classifier.classify_error(&err);
351                self.on_failure.on_failure(failure_class, latency, &span);
352
353                Err(err)
354            }
355        }
356    }
357}