tracing_awc/
lib.rs

1use actix_http::{error::PayloadError, header::map::HeaderMap, Payload, RequestHeadType};
2use actix_service::Service;
3use awc::{error::SendRequestError, middleware::Transform, ConnectRequest, ConnectResponse};
4use bytes::Bytes;
5use futures_core::stream::Stream;
6use std::{
7    future::Future,
8    pin::Pin,
9    task::{Context, Poll},
10};
11use tracing::{instrument::Instrumented, Instrument, Span};
12
13#[cfg(feature = "opentelemetry_0_13")]
14use opentelemetry_0_13_pkg as opentelemetry;
15#[cfg(feature = "opentelemetry_0_14")]
16use opentelemetry_0_14_pkg as opentelemetry;
17#[cfg(feature = "opentelemetry_0_15")]
18use opentelemetry_0_15_pkg as opentelemetry;
19#[cfg(feature = "opentelemetry_0_16")]
20use opentelemetry_0_16_pkg as opentelemetry;
21#[cfg(feature = "opentelemetry_0_17")]
22use opentelemetry_0_17_pkg as opentelemetry;
23#[cfg(feature = "opentelemetry_0_18")]
24use opentelemetry_0_18_pkg as opentelemetry;
25#[cfg(feature = "opentelemetry_0_19")]
26use opentelemetry_0_19_pkg as opentelemetry;
27#[cfg(feature = "opentelemetry_0_20")]
28use opentelemetry_0_20_pkg as opentelemetry;
29#[cfg(feature = "opentelemetry_0_21")]
30use opentelemetry_0_21_pkg as opentelemetry;
31#[cfg(feature = "opentelemetry_0_22")]
32use opentelemetry_0_22_pkg as opentelemetry;
33#[cfg(feature = "opentelemetry_0_23")]
34use opentelemetry_0_23_pkg as opentelemetry;
35#[cfg(feature = "opentelemetry_0_24")]
36use opentelemetry_0_24_pkg as opentelemetry;
37#[cfg(feature = "opentelemetry_0_25")]
38use opentelemetry_0_25_pkg as opentelemetry;
39#[cfg(feature = "opentelemetry_0_26")]
40use opentelemetry_0_26_pkg as opentelemetry;
41#[cfg(feature = "opentelemetry_0_27")]
42use opentelemetry_0_27_pkg as opentelemetry;
43#[cfg(feature = "opentelemetry_0_28")]
44use opentelemetry_0_28_pkg as opentelemetry;
45
46#[cfg(feature = "opentelemetry_0_13")]
47use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
48#[cfg(feature = "opentelemetry_0_14")]
49use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry;
50#[cfg(feature = "opentelemetry_0_15")]
51use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry;
52#[cfg(feature = "opentelemetry_0_16")]
53use tracing_opentelemetry_0_16_pkg as tracing_opentelemetry;
54#[cfg(feature = "opentelemetry_0_17")]
55use tracing_opentelemetry_0_17_pkg as tracing_opentelemetry;
56#[cfg(feature = "opentelemetry_0_18")]
57use tracing_opentelemetry_0_18_pkg as tracing_opentelemetry;
58#[cfg(feature = "opentelemetry_0_19")]
59use tracing_opentelemetry_0_19_pkg as tracing_opentelemetry;
60#[cfg(feature = "opentelemetry_0_20")]
61use tracing_opentelemetry_0_21_pkg as tracing_opentelemetry;
62#[cfg(feature = "opentelemetry_0_21")]
63use tracing_opentelemetry_0_22_pkg as tracing_opentelemetry;
64#[cfg(feature = "opentelemetry_0_22")]
65use tracing_opentelemetry_0_23_pkg as tracing_opentelemetry;
66#[cfg(feature = "opentelemetry_0_23")]
67use tracing_opentelemetry_0_24_pkg as tracing_opentelemetry;
68#[cfg(feature = "opentelemetry_0_24")]
69use tracing_opentelemetry_0_25_pkg as tracing_opentelemetry;
70#[cfg(feature = "opentelemetry_0_25")]
71use tracing_opentelemetry_0_26_pkg as tracing_opentelemetry;
72#[cfg(feature = "opentelemetry_0_26")]
73use tracing_opentelemetry_0_27_pkg as tracing_opentelemetry;
74#[cfg(feature = "opentelemetry_0_27")]
75use tracing_opentelemetry_0_28_pkg as tracing_opentelemetry;
76#[cfg(feature = "opentelemetry_0_28")]
77use tracing_opentelemetry_0_29_pkg as tracing_opentelemetry;
78
/// Creates an info-level span named `"Root span"` carrying a `trace_id` field.
///
/// The `trace_id` field is declared empty and then filled with the
/// OpenTelemetry trace id taken from the span's own OpenTelemetry context,
/// rendered as a hex string.
///
/// Only available when one of the `opentelemetry_0_*` features is enabled.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17",
    feature = "opentelemetry_0_18",
    feature = "opentelemetry_0_19",
    feature = "opentelemetry_0_20",
    feature = "opentelemetry_0_21",
    feature = "opentelemetry_0_22",
    feature = "opentelemetry_0_23",
    feature = "opentelemetry_0_24",
    feature = "opentelemetry_0_25",
    feature = "opentelemetry_0_26",
    feature = "opentelemetry_0_27",
    feature = "opentelemetry_0_28",
))]
pub fn root_span() -> Span {
    use opentelemetry::trace::TraceContextExt;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    let root = tracing::info_span!("Root span", trace_id = tracing::field::Empty);

    // opentelemetry 0.17 and later: format the id ourselves as 32 zero-padded
    // lowercase hex digits.
    #[cfg(any(
        feature = "opentelemetry_0_17",
        feature = "opentelemetry_0_18",
        feature = "opentelemetry_0_19",
        feature = "opentelemetry_0_20",
        feature = "opentelemetry_0_21",
        feature = "opentelemetry_0_22",
        feature = "opentelemetry_0_23",
        feature = "opentelemetry_0_24",
        feature = "opentelemetry_0_25",
        feature = "opentelemetry_0_26",
        feature = "opentelemetry_0_27",
        feature = "opentelemetry_0_28",
    ))]
    let hex_trace_id = format!(
        "{:032x}",
        root.context().span().span_context().trace_id()
    );

    // opentelemetry 0.13 - 0.16: the trace id type offers `to_hex()` directly.
    #[cfg(not(any(
        feature = "opentelemetry_0_17",
        feature = "opentelemetry_0_18",
        feature = "opentelemetry_0_19",
        feature = "opentelemetry_0_20",
        feature = "opentelemetry_0_21",
        feature = "opentelemetry_0_22",
        feature = "opentelemetry_0_23",
        feature = "opentelemetry_0_24",
        feature = "opentelemetry_0_25",
        feature = "opentelemetry_0_26",
        feature = "opentelemetry_0_27",
        feature = "opentelemetry_0_28",
    )))]
    let hex_trace_id = root.context().span().span_context().trace_id().to_hex();

    root.record("trace_id", &tracing::field::display(hex_trace_id));

    root
}
142
/// Middleware marker for the awc client.
///
/// Its `Transform` implementation wraps the client's connector service in a
/// [`TracingMiddleware`].
#[derive(Clone, Copy, Debug, Default)]
pub struct Tracing;

/// Service wrapper produced by [`Tracing`]; holds the wrapped (inner) service.
///
/// Its `Service` implementation opens an `"HTTP Client"` span for every
/// request before delegating to the inner service.
#[derive(Debug)]
pub struct TracingMiddleware<S>(S);
145
146impl<S> Transform<S, ConnectRequest> for Tracing
147where
148    S: Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
149{
150    type Transform = TracingMiddleware<S>;
151
152    fn new_transform(self, service: S) -> Self::Transform {
153        TracingMiddleware(service)
154    }
155}
156
157impl<S> Service<ConnectRequest> for TracingMiddleware<S>
158where
159    S: Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
160{
161    type Response = ConnectResponse;
162    type Error = SendRequestError;
163    type Future = Instrumented<TracingFuture<S::Future>>;
164
165    fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
166        self.0.poll_ready(ctx)
167    }
168
169    fn call(&self, mut req: ConnectRequest) -> Self::Future {
170        let request_head = match &req {
171            ConnectRequest::Client(head_type, _, _) => head_type.as_ref(),
172            ConnectRequest::Tunnel(head, _) => head,
173        };
174
175        let span = tracing::info_span!(
176            "HTTP Client",
177            otel.kind = "client",
178            otel.status_code = tracing::field::Empty,
179            http.method = tracing::field::display(&request_head.method),
180            http.url = tracing::field::display(&request_head.uri),
181            http.flavor = tracing::field::Empty,
182            http.status_code = tracing::field::Empty,
183            net.peer.ip = tracing::field::Empty,
184            exception.message = tracing::field::Empty,
185            exception.details = tracing::field::Empty,
186        );
187
188        match request_head.version {
189            actix_http::Version::HTTP_09 => {
190                span.record("http.flavor", "0.9");
191            }
192            actix_http::Version::HTTP_10 => {
193                span.record("http.flavor", "1.0");
194            }
195            actix_http::Version::HTTP_11 => {
196                span.record("http.flavor", "1.1");
197            }
198            actix_http::Version::HTTP_2 => {
199                span.record("http.flavor", "2.0");
200            }
201            actix_http::Version::HTTP_3 => {
202                span.record("http.flavor", "3.0");
203            }
204            _ => (),
205        }
206
207        if let Some(peer_ip) = &request_head.peer_addr {
208            span.record("net.peer.ip", &tracing::field::display(&peer_ip.ip()));
209        }
210
211        match &mut req {
212            ConnectRequest::Client(ref mut head_type, _, _) => match head_type {
213                RequestHeadType::Owned(ref mut head) => {
214                    record_otel(head.headers_mut(), &span);
215                }
216                RequestHeadType::Rc(_, ref mut extras) => {
217                    let mut owned = extras.take().unwrap_or_default();
218                    record_otel(&mut owned, &span);
219                    *extras = Some(owned);
220                }
221            },
222            ConnectRequest::Tunnel(head, _) => record_otel(head.headers_mut(), &span),
223        }
224
225        TracingFuture {
226            future: span
227                .in_scope(|| self.0.call(req))
228                .instrument(tracing::trace_span!(parent: None, "Http Request Inner")),
229        }
230        .instrument(span)
231    }
232}
233
234pin_project_lite::pin_project! {
235    pub struct TracingFuture<F> {
236        #[pin]
237        future: Instrumented<F>,
238    }
239}
240
241impl<F> Future for TracingFuture<F>
242where
243    F: Future<Output = Result<ConnectResponse, SendRequestError>>,
244{
245    type Output = Result<ConnectResponse, SendRequestError>;
246
247    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248        let this = self.as_mut().project();
249
250        let span = Span::current();
251
252        let future = this.future;
253
254        future
255            .poll(cx)
256            .map_ok(|succ| match succ {
257                ConnectResponse::Client(client_response) => {
258                    let status: i32 = client_response.status().as_u16().into();
259                    span.record("http.status_code", status);
260                    if client_response.status().is_client_error() {
261                        span.record("otel.status_code", "ERROR");
262                    }
263
264                    ConnectResponse::Client(client_response.map_body(|_, payload| {
265                        let payload = InstrumentedBody::new(payload);
266                        let payload: Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>> =
267                            Box::pin(payload);
268
269                        Payload::Stream { payload }
270                    }))
271                }
272                ConnectResponse::Tunnel(response_head, etc) => {
273                    let status: i32 = response_head.status.as_u16().into();
274                    span.record("http.status_code", status);
275                    if response_head.status.is_client_error() {
276                        span.record("otel.status_code", "ERROR");
277                    }
278                    ConnectResponse::Tunnel(response_head, etc)
279                }
280            })
281            .map_err(|err| {
282                span.record("otel.status_code", "ERROR");
283                span.record(
284                    "exception.message",
285                    &tracing::field::display(&format!("{}", err)),
286                );
287                span.record(
288                    "exception.details",
289                    &tracing::field::display(&format!("{:?}", err)),
290                );
291
292                #[cfg(feature = "emit_event_on_error")]
293                tracing::warn!("Error in request: {}", err);
294
295                err
296            })
297    }
298}
299
300pin_project_lite::pin_project! {
301    struct InstrumentedBody<S> {
302        span: Option<Span>,
303        dummy_span: Option<Span>,
304
305        #[pin]
306        body: S,
307    }
308}
309
310impl<S> InstrumentedBody<S>
311where
312    S: Stream<Item = Result<Bytes, PayloadError>>,
313{
314    fn new(body: S) -> InstrumentedBody<S> {
315        InstrumentedBody {
316            span: None,
317            dummy_span: None,
318            body,
319        }
320    }
321}
322
323impl<S> Stream for InstrumentedBody<S>
324where
325    S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
326{
327    type Item = <S as Stream>::Item;
328
329    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
330        let this = self.as_mut().project();
331
332        let span = this
333            .span
334            .get_or_insert_with(|| tracing::info_span!("HTTP Client Response Body"));
335
336        let dummy_span = this.dummy_span.get_or_insert_with(
337            || tracing::trace_span!(parent: None, "HTTP Client Response Body Inner"),
338        );
339
340        let body = this.body;
341
342        let res = span.in_scope(|| dummy_span.in_scope(|| body.poll_next(cx)));
343
344        match res {
345            Poll::Ready(None) => {
346                this.span.take();
347                this.dummy_span.take();
348                Poll::Ready(None)
349            }
350            otherwise => otherwise,
351        }
352    }
353}
354
/// Adapter that lets an OpenTelemetry propagator write its key/value pairs
/// into an actix `HeaderMap`.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17",
    feature = "opentelemetry_0_18",
    feature = "opentelemetry_0_19",
    feature = "opentelemetry_0_20",
    feature = "opentelemetry_0_21",
    feature = "opentelemetry_0_22",
    feature = "opentelemetry_0_23",
    feature = "opentelemetry_0_24",
    feature = "opentelemetry_0_25",
    feature = "opentelemetry_0_26",
    feature = "opentelemetry_0_27",
    feature = "opentelemetry_0_28",
))]
#[derive(Debug)]
struct RequestHeaderCarrier<'a> {
    // NOTE(review): the field is written through in `Injector::set`, so this
    // allow looks unnecessary - confirm before removing.
    #[allow(dead_code)]
    headers: &'a mut HeaderMap,
}

#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17",
    feature = "opentelemetry_0_18",
    feature = "opentelemetry_0_19",
    feature = "opentelemetry_0_20",
    feature = "opentelemetry_0_21",
    feature = "opentelemetry_0_22",
    feature = "opentelemetry_0_23",
    feature = "opentelemetry_0_24",
    feature = "opentelemetry_0_25",
    feature = "opentelemetry_0_26",
    feature = "opentelemetry_0_27",
    feature = "opentelemetry_0_28",
))]
impl opentelemetry::propagation::Injector for RequestHeaderCarrier<'_> {
    /// Inserts `key: value` into the header map.
    ///
    /// Injection is best-effort: if `key` is not a valid header name or
    /// `value` is not a valid header value, a warning is logged and the pair
    /// is skipped.
    fn set(&mut self, key: &str, value: String) {
        use actix_http::header::{HeaderName, HeaderValue};
        use std::convert::TryFrom;

        let name = match HeaderName::from_bytes(key.as_bytes()) {
            Ok(name) => name,
            Err(e) => {
                tracing::warn!("Failed to inject header {}: {}", key, e);
                return;
            }
        };

        let value = match HeaderValue::try_from(value) {
            Ok(value) => value,
            Err(e) => {
                tracing::warn!("Failed to inject header value for {}: {}", name, e);
                return;
            }
        };

        self.headers.insert(name, value);
    }
}
421
422fn record_otel(_headers: &mut HeaderMap, _span: &Span) {
423    #[cfg(any(
424        feature = "opentelemetry_0_13",
425        feature = "opentelemetry_0_14",
426        feature = "opentelemetry_0_15",
427        feature = "opentelemetry_0_16",
428        feature = "opentelemetry_0_17",
429        feature = "opentelemetry_0_18",
430        feature = "opentelemetry_0_19",
431        feature = "opentelemetry_0_20",
432        feature = "opentelemetry_0_21",
433        feature = "opentelemetry_0_22",
434        feature = "opentelemetry_0_23",
435        feature = "opentelemetry_0_24",
436        feature = "opentelemetry_0_25",
437        feature = "opentelemetry_0_26",
438        feature = "opentelemetry_0_27",
439        feature = "opentelemetry_0_28",
440    ))]
441    {
442        let span = _span;
443        let headers = _headers;
444        use tracing_opentelemetry::OpenTelemetrySpanExt;
445
446        let mut carrier = RequestHeaderCarrier { headers };
447
448        let context = span.context();
449
450        opentelemetry::global::get_text_map_propagator(|propagator| {
451            propagator.inject_context(&context, &mut carrier);
452        });
453    };
454}