1use super::{
2 DefaultMakeSpan, DefaultOnBodyChunk, DefaultOnEos, DefaultOnFailure, DefaultOnRequest,
3 DefaultOnResponse, MakeSpan, OnBodyChunk, OnEos, OnFailure, OnRequest, OnResponse,
4 ResponseBody, TraceLayer,
5};
6use crate::classify::{
7 ClassifiedResponse, ClassifyResponse, GrpcErrorsAsFailures, MakeClassifier,
8 ServerErrorsAsFailures, SharedClassifier,
9};
10use http::{Request, Response};
11use http_body::Body;
12use std::{fmt, time::Instant};
13use tower_async_service::Service;
14
/// Middleware service that wraps an inner service with a per-request span and
/// a set of lifecycle callbacks.
///
/// The type parameters after `S` and `M` select the span factory and the
/// callbacks; each one defaults to the corresponding `Default*` type.
#[derive(Debug, Clone, Copy)]
pub struct Trace<
    S,
    M,
    MakeSpan = DefaultMakeSpan,
    OnRequest = DefaultOnRequest,
    OnResponse = DefaultOnResponse,
    OnBodyChunk = DefaultOnBodyChunk,
    OnEos = DefaultOnEos,
    OnFailure = DefaultOnFailure,
> {
    /// The wrapped service that requests are forwarded to.
    pub(crate) inner: S,
    /// Produces the classifier used to decide whether a response or error
    /// counts as a failure; called once per request.
    pub(crate) make_classifier: M,
    /// Creates the span associated with each request.
    pub(crate) make_span: MakeSpan,
    /// Callback invoked with the request and its span before the inner
    /// service is called.
    pub(crate) on_request: OnRequest,
    /// Callback invoked with the response, the measured latency and the span
    /// once the inner service returns successfully.
    pub(crate) on_response: OnResponse,
    /// Callback cloned into every returned `ResponseBody`.
    // NOTE(review): presumably fired per body chunk by `ResponseBody` — that
    // code is not in this file; confirm there.
    pub(crate) on_body_chunk: OnBodyChunk,
    /// Callback cloned into the returned `ResponseBody` when classification
    /// has to wait for the end of the stream.
    pub(crate) on_eos: OnEos,
    /// Callback invoked when a response or error is classified as a failure;
    /// also cloned into every returned `ResponseBody`.
    pub(crate) on_failure: OnFailure,
}
41
42impl<S, M> Trace<S, M> {
43 pub fn new(inner: S, make_classifier: M) -> Self
45 where
46 M: MakeClassifier,
47 {
48 Self {
49 inner,
50 make_classifier,
51 make_span: DefaultMakeSpan::new(),
52 on_request: DefaultOnRequest::default(),
53 on_response: DefaultOnResponse::default(),
54 on_body_chunk: DefaultOnBodyChunk::default(),
55 on_eos: DefaultOnEos::default(),
56 on_failure: DefaultOnFailure::default(),
57 }
58 }
59
60 pub fn layer(make_classifier: M) -> TraceLayer<M>
64 where
65 M: MakeClassifier,
66 {
67 TraceLayer::new(make_classifier)
68 }
69}
70
71impl<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure>
72 Trace<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure>
73{
74 define_inner_service_accessors!();
75
76 pub fn on_request<NewOnRequest>(
82 self,
83 new_on_request: NewOnRequest,
84 ) -> Trace<S, M, MakeSpan, NewOnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure> {
85 Trace {
86 on_request: new_on_request,
87 inner: self.inner,
88 on_failure: self.on_failure,
89 on_eos: self.on_eos,
90 on_body_chunk: self.on_body_chunk,
91 make_span: self.make_span,
92 on_response: self.on_response,
93 make_classifier: self.make_classifier,
94 }
95 }
96
97 pub fn on_response<NewOnResponse>(
103 self,
104 new_on_response: NewOnResponse,
105 ) -> Trace<S, M, MakeSpan, OnRequest, NewOnResponse, OnBodyChunk, OnEos, OnFailure> {
106 Trace {
107 on_response: new_on_response,
108 inner: self.inner,
109 on_request: self.on_request,
110 on_failure: self.on_failure,
111 on_body_chunk: self.on_body_chunk,
112 on_eos: self.on_eos,
113 make_span: self.make_span,
114 make_classifier: self.make_classifier,
115 }
116 }
117
118 pub fn on_body_chunk<NewOnBodyChunk>(
124 self,
125 new_on_body_chunk: NewOnBodyChunk,
126 ) -> Trace<S, M, MakeSpan, OnRequest, OnResponse, NewOnBodyChunk, OnEos, OnFailure> {
127 Trace {
128 on_body_chunk: new_on_body_chunk,
129 on_eos: self.on_eos,
130 make_span: self.make_span,
131 inner: self.inner,
132 on_failure: self.on_failure,
133 on_request: self.on_request,
134 on_response: self.on_response,
135 make_classifier: self.make_classifier,
136 }
137 }
138
139 pub fn on_eos<NewOnEos>(
145 self,
146 new_on_eos: NewOnEos,
147 ) -> Trace<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, NewOnEos, OnFailure> {
148 Trace {
149 on_eos: new_on_eos,
150 make_span: self.make_span,
151 inner: self.inner,
152 on_failure: self.on_failure,
153 on_request: self.on_request,
154 on_body_chunk: self.on_body_chunk,
155 on_response: self.on_response,
156 make_classifier: self.make_classifier,
157 }
158 }
159
160 pub fn on_failure<NewOnFailure>(
166 self,
167 new_on_failure: NewOnFailure,
168 ) -> Trace<S, M, MakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, NewOnFailure> {
169 Trace {
170 on_failure: new_on_failure,
171 inner: self.inner,
172 make_span: self.make_span,
173 on_body_chunk: self.on_body_chunk,
174 on_request: self.on_request,
175 on_eos: self.on_eos,
176 on_response: self.on_response,
177 make_classifier: self.make_classifier,
178 }
179 }
180
181 pub fn make_span_with<NewMakeSpan>(
188 self,
189 new_make_span: NewMakeSpan,
190 ) -> Trace<S, M, NewMakeSpan, OnRequest, OnResponse, OnBodyChunk, OnEos, OnFailure> {
191 Trace {
192 make_span: new_make_span,
193 inner: self.inner,
194 on_failure: self.on_failure,
195 on_request: self.on_request,
196 on_body_chunk: self.on_body_chunk,
197 on_response: self.on_response,
198 on_eos: self.on_eos,
199 make_classifier: self.make_classifier,
200 }
201 }
202}
203
204impl<S>
205 Trace<
206 S,
207 SharedClassifier<ServerErrorsAsFailures>,
208 DefaultMakeSpan,
209 DefaultOnRequest,
210 DefaultOnResponse,
211 DefaultOnBodyChunk,
212 DefaultOnEos,
213 DefaultOnFailure,
214 >
215{
216 pub fn new_for_http(inner: S) -> Self {
219 Self {
220 inner,
221 make_classifier: SharedClassifier::new(ServerErrorsAsFailures::default()),
222 make_span: DefaultMakeSpan::new(),
223 on_request: DefaultOnRequest::default(),
224 on_response: DefaultOnResponse::default(),
225 on_body_chunk: DefaultOnBodyChunk::default(),
226 on_eos: DefaultOnEos::default(),
227 on_failure: DefaultOnFailure::default(),
228 }
229 }
230}
231
232impl<S>
233 Trace<
234 S,
235 SharedClassifier<GrpcErrorsAsFailures>,
236 DefaultMakeSpan,
237 DefaultOnRequest,
238 DefaultOnResponse,
239 DefaultOnBodyChunk,
240 DefaultOnEos,
241 DefaultOnFailure,
242 >
243{
244 pub fn new_for_grpc(inner: S) -> Self {
247 Self {
248 inner,
249 make_classifier: SharedClassifier::new(GrpcErrorsAsFailures::default()),
250 make_span: DefaultMakeSpan::new(),
251 on_request: DefaultOnRequest::default(),
252 on_response: DefaultOnResponse::default(),
253 on_body_chunk: DefaultOnBodyChunk::default(),
254 on_eos: DefaultOnEos::default(),
255 on_failure: DefaultOnFailure::default(),
256 }
257 }
258}
259
260impl<
261 S,
262 ReqBody,
263 ResBody,
264 M,
265 OnRequestT,
266 OnResponseT,
267 OnFailureT,
268 OnBodyChunkT,
269 OnEosT,
270 MakeSpanT,
271 > Service<Request<ReqBody>>
272 for Trace<S, M, MakeSpanT, OnRequestT, OnResponseT, OnBodyChunkT, OnEosT, OnFailureT>
273where
274 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
275 ReqBody: Body,
276 ResBody: Body,
277 ResBody::Error: fmt::Display,
278 S::Error: fmt::Display,
279 M: MakeClassifier,
280 M::Classifier: Clone,
281 MakeSpanT: MakeSpan<ReqBody>,
282 OnRequestT: OnRequest<ReqBody>,
283 OnResponseT: OnResponse<ResBody> + Clone,
284 OnBodyChunkT: OnBodyChunk<ResBody::Data> + Clone,
285 OnEosT: OnEos + Clone,
286 OnFailureT: OnFailure<M::FailureClass> + Clone,
287{
288 type Response =
289 Response<ResponseBody<ResBody, M::ClassifyEos, OnBodyChunkT, OnEosT, OnFailureT>>;
290 type Error = S::Error;
291
292 async fn call(&self, req: Request<ReqBody>) -> Result<Self::Response, Self::Error> {
293 let start = Instant::now();
294
295 let span = self.make_span.make_span(&req);
296
297 let classifier = self.make_classifier.make_classifier(&req);
298
299 let result = {
300 let _guard = span.enter();
301 self.on_request.on_request(&req, &span);
302 self.inner.call(req)
303 }
304 .await;
305 let latency = start.elapsed();
306
307 match result {
308 Ok(res) => {
309 let classification = classifier.classify_response(&res);
310
311 self.on_response.clone().on_response(&res, latency, &span);
312
313 match classification {
314 ClassifiedResponse::Ready(classification) => {
315 if let Err(failure_class) = classification {
316 self.on_failure.on_failure(failure_class, latency, &span);
317 }
318
319 let span = span.clone();
320 let res = res.map(|body| ResponseBody {
321 inner: body,
322 classify_eos: None,
323 on_eos: None,
324 on_body_chunk: self.on_body_chunk.clone(),
325 on_failure: Some(self.on_failure.clone()),
326 start,
327 span,
328 });
329
330 Ok(res)
331 }
332 ClassifiedResponse::RequiresEos(classify_eos) => {
333 let span = span.clone();
334 let res = res.map(|body| ResponseBody {
335 inner: body,
336 classify_eos: Some(classify_eos),
337 on_eos: Some((self.on_eos.clone(), Instant::now())),
338 on_body_chunk: self.on_body_chunk.clone(),
339 on_failure: Some(self.on_failure.clone()),
340 start,
341 span,
342 });
343
344 Ok(res)
345 }
346 }
347 }
348 Err(err) => {
349 let failure_class: <M as MakeClassifier>::FailureClass =
350 classifier.classify_error(&err);
351 self.on_failure.on_failure(failure_class, latency, &span);
352
353 Err(err)
354 }
355 }
356 }
357}