1use actix_http::{error::PayloadError, header::map::HeaderMap, Payload, RequestHeadType};
2use actix_service::Service;
3use awc::{error::SendRequestError, middleware::Transform, ConnectRequest, ConnectResponse};
4use bytes::Bytes;
5use futures_core::stream::Stream;
6use std::{
7 future::Future,
8 pin::Pin,
9 task::{Context, Poll},
10};
11use tracing::{instrument::Instrumented, Instrument, Span};
12
13#[cfg(feature = "opentelemetry_0_13")]
14use opentelemetry_0_13_pkg as opentelemetry;
15#[cfg(feature = "opentelemetry_0_14")]
16use opentelemetry_0_14_pkg as opentelemetry;
17#[cfg(feature = "opentelemetry_0_15")]
18use opentelemetry_0_15_pkg as opentelemetry;
19#[cfg(feature = "opentelemetry_0_16")]
20use opentelemetry_0_16_pkg as opentelemetry;
21#[cfg(feature = "opentelemetry_0_17")]
22use opentelemetry_0_17_pkg as opentelemetry;
23#[cfg(feature = "opentelemetry_0_18")]
24use opentelemetry_0_18_pkg as opentelemetry;
25#[cfg(feature = "opentelemetry_0_19")]
26use opentelemetry_0_19_pkg as opentelemetry;
27#[cfg(feature = "opentelemetry_0_20")]
28use opentelemetry_0_20_pkg as opentelemetry;
29#[cfg(feature = "opentelemetry_0_21")]
30use opentelemetry_0_21_pkg as opentelemetry;
31#[cfg(feature = "opentelemetry_0_22")]
32use opentelemetry_0_22_pkg as opentelemetry;
33#[cfg(feature = "opentelemetry_0_23")]
34use opentelemetry_0_23_pkg as opentelemetry;
35#[cfg(feature = "opentelemetry_0_24")]
36use opentelemetry_0_24_pkg as opentelemetry;
37#[cfg(feature = "opentelemetry_0_25")]
38use opentelemetry_0_25_pkg as opentelemetry;
39#[cfg(feature = "opentelemetry_0_26")]
40use opentelemetry_0_26_pkg as opentelemetry;
41#[cfg(feature = "opentelemetry_0_27")]
42use opentelemetry_0_27_pkg as opentelemetry;
43#[cfg(feature = "opentelemetry_0_28")]
44use opentelemetry_0_28_pkg as opentelemetry;
45
46#[cfg(feature = "opentelemetry_0_13")]
47use tracing_opentelemetry_0_12_pkg as tracing_opentelemetry;
48#[cfg(feature = "opentelemetry_0_14")]
49use tracing_opentelemetry_0_13_pkg as tracing_opentelemetry;
50#[cfg(feature = "opentelemetry_0_15")]
51use tracing_opentelemetry_0_14_pkg as tracing_opentelemetry;
52#[cfg(feature = "opentelemetry_0_16")]
53use tracing_opentelemetry_0_16_pkg as tracing_opentelemetry;
54#[cfg(feature = "opentelemetry_0_17")]
55use tracing_opentelemetry_0_17_pkg as tracing_opentelemetry;
56#[cfg(feature = "opentelemetry_0_18")]
57use tracing_opentelemetry_0_18_pkg as tracing_opentelemetry;
58#[cfg(feature = "opentelemetry_0_19")]
59use tracing_opentelemetry_0_19_pkg as tracing_opentelemetry;
60#[cfg(feature = "opentelemetry_0_20")]
61use tracing_opentelemetry_0_21_pkg as tracing_opentelemetry;
62#[cfg(feature = "opentelemetry_0_21")]
63use tracing_opentelemetry_0_22_pkg as tracing_opentelemetry;
64#[cfg(feature = "opentelemetry_0_22")]
65use tracing_opentelemetry_0_23_pkg as tracing_opentelemetry;
66#[cfg(feature = "opentelemetry_0_23")]
67use tracing_opentelemetry_0_24_pkg as tracing_opentelemetry;
68#[cfg(feature = "opentelemetry_0_24")]
69use tracing_opentelemetry_0_25_pkg as tracing_opentelemetry;
70#[cfg(feature = "opentelemetry_0_25")]
71use tracing_opentelemetry_0_26_pkg as tracing_opentelemetry;
72#[cfg(feature = "opentelemetry_0_26")]
73use tracing_opentelemetry_0_27_pkg as tracing_opentelemetry;
74#[cfg(feature = "opentelemetry_0_27")]
75use tracing_opentelemetry_0_28_pkg as tracing_opentelemetry;
76#[cfg(feature = "opentelemetry_0_28")]
77use tracing_opentelemetry_0_29_pkg as tracing_opentelemetry;
78
/// Creates the `Root span` info-level span and stamps it with its trace id.
///
/// The span is created with an empty `trace_id` field. The trace id is then
/// read from the span's OpenTelemetry context and recorded into that field as
/// a hexadecimal string.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17",
    feature = "opentelemetry_0_18",
    feature = "opentelemetry_0_19",
    feature = "opentelemetry_0_20",
    feature = "opentelemetry_0_21",
    feature = "opentelemetry_0_22",
    feature = "opentelemetry_0_23",
    feature = "opentelemetry_0_24",
    feature = "opentelemetry_0_25",
    feature = "opentelemetry_0_26",
    feature = "opentelemetry_0_27",
    feature = "opentelemetry_0_28",
))]
pub fn root_span() -> Span {
    use opentelemetry::trace::TraceContextExt;
    use tracing_opentelemetry::OpenTelemetrySpanExt;

    let root = tracing::info_span!("Root span", trace_id = tracing::field::Empty,);

    // The id lookup is the same for every supported version; only the way it
    // is rendered as hex differs.
    let raw_trace_id = root.context().span().span_context().trace_id();

    // Versions outside the list below render the id through `to_hex()`.
    #[cfg(not(any(
        feature = "opentelemetry_0_17",
        feature = "opentelemetry_0_18",
        feature = "opentelemetry_0_19",
        feature = "opentelemetry_0_20",
        feature = "opentelemetry_0_21",
        feature = "opentelemetry_0_22",
        feature = "opentelemetry_0_23",
        feature = "opentelemetry_0_24",
        feature = "opentelemetry_0_25",
        feature = "opentelemetry_0_26",
        feature = "opentelemetry_0_27",
        feature = "opentelemetry_0_28",
    )))]
    let rendered_trace_id = raw_trace_id.to_hex();

    // Listed versions format the id as zero-padded, 32-digit lowercase hex.
    #[cfg(any(
        feature = "opentelemetry_0_17",
        feature = "opentelemetry_0_18",
        feature = "opentelemetry_0_19",
        feature = "opentelemetry_0_20",
        feature = "opentelemetry_0_21",
        feature = "opentelemetry_0_22",
        feature = "opentelemetry_0_23",
        feature = "opentelemetry_0_24",
        feature = "opentelemetry_0_25",
        feature = "opentelemetry_0_26",
        feature = "opentelemetry_0_27",
        feature = "opentelemetry_0_28",
    ))]
    let rendered_trace_id = format!("{:032x}", raw_trace_id);

    root.record("trace_id", &tracing::field::display(rendered_trace_id));

    root
}
142
/// Middleware factory: its `Transform` implementation wraps a connector
/// service in a [`TracingMiddleware`].
#[derive(Debug, Clone, Copy, Default)]
pub struct Tracing;

/// Wrapper around an inner service `S`, produced from [`Tracing`].
#[derive(Debug)]
pub struct TracingMiddleware<S>(S);
145
146impl<S> Transform<S, ConnectRequest> for Tracing
147where
148 S: Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
149{
150 type Transform = TracingMiddleware<S>;
151
152 fn new_transform(self, service: S) -> Self::Transform {
153 TracingMiddleware(service)
154 }
155}
156
157impl<S> Service<ConnectRequest> for TracingMiddleware<S>
158where
159 S: Service<ConnectRequest, Response = ConnectResponse, Error = SendRequestError>,
160{
161 type Response = ConnectResponse;
162 type Error = SendRequestError;
163 type Future = Instrumented<TracingFuture<S::Future>>;
164
165 fn poll_ready(&self, ctx: &mut core::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
166 self.0.poll_ready(ctx)
167 }
168
169 fn call(&self, mut req: ConnectRequest) -> Self::Future {
170 let request_head = match &req {
171 ConnectRequest::Client(head_type, _, _) => head_type.as_ref(),
172 ConnectRequest::Tunnel(head, _) => head,
173 };
174
175 let span = tracing::info_span!(
176 "HTTP Client",
177 otel.kind = "client",
178 otel.status_code = tracing::field::Empty,
179 http.method = tracing::field::display(&request_head.method),
180 http.url = tracing::field::display(&request_head.uri),
181 http.flavor = tracing::field::Empty,
182 http.status_code = tracing::field::Empty,
183 net.peer.ip = tracing::field::Empty,
184 exception.message = tracing::field::Empty,
185 exception.details = tracing::field::Empty,
186 );
187
188 match request_head.version {
189 actix_http::Version::HTTP_09 => {
190 span.record("http.flavor", "0.9");
191 }
192 actix_http::Version::HTTP_10 => {
193 span.record("http.flavor", "1.0");
194 }
195 actix_http::Version::HTTP_11 => {
196 span.record("http.flavor", "1.1");
197 }
198 actix_http::Version::HTTP_2 => {
199 span.record("http.flavor", "2.0");
200 }
201 actix_http::Version::HTTP_3 => {
202 span.record("http.flavor", "3.0");
203 }
204 _ => (),
205 }
206
207 if let Some(peer_ip) = &request_head.peer_addr {
208 span.record("net.peer.ip", &tracing::field::display(&peer_ip.ip()));
209 }
210
211 match &mut req {
212 ConnectRequest::Client(ref mut head_type, _, _) => match head_type {
213 RequestHeadType::Owned(ref mut head) => {
214 record_otel(head.headers_mut(), &span);
215 }
216 RequestHeadType::Rc(_, ref mut extras) => {
217 let mut owned = extras.take().unwrap_or_default();
218 record_otel(&mut owned, &span);
219 *extras = Some(owned);
220 }
221 },
222 ConnectRequest::Tunnel(head, _) => record_otel(head.headers_mut(), &span),
223 }
224
225 TracingFuture {
226 future: span
227 .in_scope(|| self.0.call(req))
228 .instrument(tracing::trace_span!(parent: None, "Http Request Inner")),
229 }
230 .instrument(span)
231 }
232}
233
234pin_project_lite::pin_project! {
235 pub struct TracingFuture<F> {
236 #[pin]
237 future: Instrumented<F>,
238 }
239}
240
241impl<F> Future for TracingFuture<F>
242where
243 F: Future<Output = Result<ConnectResponse, SendRequestError>>,
244{
245 type Output = Result<ConnectResponse, SendRequestError>;
246
247 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248 let this = self.as_mut().project();
249
250 let span = Span::current();
251
252 let future = this.future;
253
254 future
255 .poll(cx)
256 .map_ok(|succ| match succ {
257 ConnectResponse::Client(client_response) => {
258 let status: i32 = client_response.status().as_u16().into();
259 span.record("http.status_code", status);
260 if client_response.status().is_client_error() {
261 span.record("otel.status_code", "ERROR");
262 }
263
264 ConnectResponse::Client(client_response.map_body(|_, payload| {
265 let payload = InstrumentedBody::new(payload);
266 let payload: Pin<Box<dyn Stream<Item = Result<Bytes, PayloadError>>>> =
267 Box::pin(payload);
268
269 Payload::Stream { payload }
270 }))
271 }
272 ConnectResponse::Tunnel(response_head, etc) => {
273 let status: i32 = response_head.status.as_u16().into();
274 span.record("http.status_code", status);
275 if response_head.status.is_client_error() {
276 span.record("otel.status_code", "ERROR");
277 }
278 ConnectResponse::Tunnel(response_head, etc)
279 }
280 })
281 .map_err(|err| {
282 span.record("otel.status_code", "ERROR");
283 span.record(
284 "exception.message",
285 &tracing::field::display(&format!("{}", err)),
286 );
287 span.record(
288 "exception.details",
289 &tracing::field::display(&format!("{:?}", err)),
290 );
291
292 #[cfg(feature = "emit_event_on_error")]
293 tracing::warn!("Error in request: {}", err);
294
295 err
296 })
297 }
298}
299
300pin_project_lite::pin_project! {
301 struct InstrumentedBody<S> {
302 span: Option<Span>,
303 dummy_span: Option<Span>,
304
305 #[pin]
306 body: S,
307 }
308}
309
310impl<S> InstrumentedBody<S>
311where
312 S: Stream<Item = Result<Bytes, PayloadError>>,
313{
314 fn new(body: S) -> InstrumentedBody<S> {
315 InstrumentedBody {
316 span: None,
317 dummy_span: None,
318 body,
319 }
320 }
321}
322
323impl<S> Stream for InstrumentedBody<S>
324where
325 S: Stream<Item = Result<Bytes, PayloadError>> + Unpin,
326{
327 type Item = <S as Stream>::Item;
328
329 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
330 let this = self.as_mut().project();
331
332 let span = this
333 .span
334 .get_or_insert_with(|| tracing::info_span!("HTTP Client Response Body"));
335
336 let dummy_span = this.dummy_span.get_or_insert_with(
337 || tracing::trace_span!(parent: None, "HTTP Client Response Body Inner"),
338 );
339
340 let body = this.body;
341
342 let res = span.in_scope(|| dummy_span.in_scope(|| body.poll_next(cx)));
343
344 match res {
345 Poll::Ready(None) => {
346 this.span.take();
347 this.dummy_span.take();
348 Poll::Ready(None)
349 }
350 otherwise => otherwise,
351 }
352 }
353}
354
/// Adapter that lets an OpenTelemetry propagator write into a request's
/// `HeaderMap`.
#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17",
    feature = "opentelemetry_0_18",
    feature = "opentelemetry_0_19",
    feature = "opentelemetry_0_20",
    feature = "opentelemetry_0_21",
    feature = "opentelemetry_0_22",
    feature = "opentelemetry_0_23",
    feature = "opentelemetry_0_24",
    feature = "opentelemetry_0_25",
    feature = "opentelemetry_0_26",
    feature = "opentelemetry_0_27",
    feature = "opentelemetry_0_28",
))]
#[derive(Debug)]
struct RequestHeaderCarrier<'a> {
    #[allow(dead_code)]
    headers: &'a mut HeaderMap,
}

#[cfg(any(
    feature = "opentelemetry_0_13",
    feature = "opentelemetry_0_14",
    feature = "opentelemetry_0_15",
    feature = "opentelemetry_0_16",
    feature = "opentelemetry_0_17",
    feature = "opentelemetry_0_18",
    feature = "opentelemetry_0_19",
    feature = "opentelemetry_0_20",
    feature = "opentelemetry_0_21",
    feature = "opentelemetry_0_22",
    feature = "opentelemetry_0_23",
    feature = "opentelemetry_0_24",
    feature = "opentelemetry_0_25",
    feature = "opentelemetry_0_26",
    feature = "opentelemetry_0_27",
    feature = "opentelemetry_0_28",
))]
impl<'a> opentelemetry::propagation::Injector for RequestHeaderCarrier<'a> {
    /// Inserts `key: value` into the wrapped header map.
    ///
    /// If either the name or the value is not a valid header, a warning is
    /// logged and the header is not inserted.
    fn set(&mut self, key: &str, value: String) {
        use actix_http::header::{HeaderName, HeaderValue};
        use std::convert::TryFrom;

        let header_name = match HeaderName::from_bytes(key.as_bytes()) {
            Ok(parsed) => parsed,
            Err(e) => {
                tracing::warn!("Failed to inject header {}: {}", key, e);
                return;
            }
        };

        let header_value = match HeaderValue::try_from(value) {
            Ok(parsed) => parsed,
            Err(e) => {
                // Reports the parsed header name, not the raw `key`.
                tracing::warn!("Failed to inject header value for {}: {}", header_name, e);
                return;
            }
        };

        self.headers.insert(header_name, header_value);
    }
}
421
422fn record_otel(_headers: &mut HeaderMap, _span: &Span) {
423 #[cfg(any(
424 feature = "opentelemetry_0_13",
425 feature = "opentelemetry_0_14",
426 feature = "opentelemetry_0_15",
427 feature = "opentelemetry_0_16",
428 feature = "opentelemetry_0_17",
429 feature = "opentelemetry_0_18",
430 feature = "opentelemetry_0_19",
431 feature = "opentelemetry_0_20",
432 feature = "opentelemetry_0_21",
433 feature = "opentelemetry_0_22",
434 feature = "opentelemetry_0_23",
435 feature = "opentelemetry_0_24",
436 feature = "opentelemetry_0_25",
437 feature = "opentelemetry_0_26",
438 feature = "opentelemetry_0_27",
439 feature = "opentelemetry_0_28",
440 ))]
441 {
442 let span = _span;
443 let headers = _headers;
444 use tracing_opentelemetry::OpenTelemetrySpanExt;
445
446 let mut carrier = RequestHeaderCarrier { headers };
447
448 let context = span.context();
449
450 opentelemetry::global::get_text_map_propagator(|propagator| {
451 propagator.inject_context(&context, &mut carrier);
452 });
453 };
454}