1use crate::{
2 middleware::get_scope,
3 util::{http_method_str, http_url},
4};
5use actix_http::{encoding::Decoder, BoxedPayloadStream, Error, Payload};
6use actix_web::{
7 body::MessageBody,
8 http::{
9 self,
10 header::{HeaderName, HeaderValue},
11 },
12 web::Bytes,
13};
14use awc::{
15 error::SendRequestError,
16 http::header::{CONTENT_LENGTH, USER_AGENT},
17 ClientRequest, ClientResponse,
18};
19use futures_util::{future::TryFutureExt as _, Future, Stream};
20use opentelemetry::{
21 global,
22 propagation::Injector,
23 trace::{SpanKind, Status, TraceContextExt, Tracer},
24 Context, KeyValue,
25};
26use opentelemetry_semantic_conventions::{
27 attribute::MESSAGING_MESSAGE_BODY_SIZE,
28 trace::{
29 HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, SERVER_ADDRESS, SERVER_PORT, URL_FULL,
30 USER_AGENT_ORIGINAL,
31 },
32};
33use serde::Serialize;
34use std::mem;
35use std::str::FromStr;
36use std::{
37 borrow::Cow,
38 fmt::{self, Debug},
39};
40
41pub struct InstrumentedClientRequest {
43 cx: Context,
44 attrs: Vec<KeyValue>,
45 span_namer: fn(&ClientRequest) -> String,
46 request: ClientRequest,
47}
48
49impl Debug for InstrumentedClientRequest {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 let span_namer = fmt::Pointer::fmt(&(self.span_namer as usize as *const ()), f);
52 f.debug_struct("InstrumentedClientRequest")
53 .field("cx", &self.cx)
54 .field("attrs", &self.attrs)
55 .field("span_namer", &span_namer)
56 .field("request", &self.request)
57 .finish()
58 }
59}
60
61fn default_span_namer(request: &ClientRequest) -> String {
62 format!(
63 "{} {}",
64 request.get_method(),
65 request.get_uri().host().unwrap_or_default()
66 )
67}
68
69pub trait ClientExt {
71 fn trace_request(self) -> InstrumentedClientRequest
90 where
91 Self: Sized,
92 {
93 self.trace_request_with_context(Context::current())
94 }
95
96 fn trace_request_with_context(self, cx: Context) -> InstrumentedClientRequest;
117}
118
119impl ClientExt for ClientRequest {
120 fn trace_request_with_context(self, cx: Context) -> InstrumentedClientRequest {
121 InstrumentedClientRequest {
122 cx,
123 attrs: Vec::with_capacity(8),
124 span_namer: default_span_namer,
125 request: self,
126 }
127 }
128}
129
130type AwcResult = Result<ClientResponse<Decoder<Payload<BoxedPayloadStream>>>, SendRequestError>;
131
132impl InstrumentedClientRequest {
133 pub async fn send(self) -> AwcResult {
135 self.trace_request(|request| request.send()).await
136 }
137
138 pub async fn send_body<B>(self, body: B) -> AwcResult
140 where
141 B: MessageBody + 'static,
142 {
143 self.trace_request(|request| request.send_body(body)).await
144 }
145
146 pub async fn send_form<T: Serialize>(self, value: &T) -> AwcResult {
149 self.trace_request(|request| request.send_form(value)).await
150 }
151
152 pub async fn send_json<T: Serialize>(self, value: &T) -> AwcResult {
155 self.trace_request(|request| request.send_json(value)).await
156 }
157
158 pub async fn send_stream<S, E>(self, stream: S) -> AwcResult
161 where
162 S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
163 E: std::error::Error + Into<Error> + 'static,
164 {
165 self.trace_request(|request| request.send_stream(stream))
166 .await
167 }
168
169 async fn trace_request<F, R>(mut self, f: F) -> AwcResult
170 where
171 F: FnOnce(ClientRequest) -> R,
172 R: Future<Output = AwcResult>,
173 {
174 let tracer = global::tracer_with_scope(get_scope());
175
176 self.attrs.extend(
179 &mut [
180 KeyValue::new(
181 SERVER_ADDRESS,
182 self.request
183 .get_uri()
184 .host()
185 .map(|u| Cow::Owned(u.to_string()))
186 .unwrap_or(Cow::Borrowed("unknown")),
187 ),
188 KeyValue::new(
189 HTTP_REQUEST_METHOD,
190 http_method_str(self.request.get_method()),
191 ),
192 KeyValue::new(URL_FULL, http_url(self.request.get_uri())),
193 ]
194 .into_iter(),
195 );
196
197 if let Some(peer_port) = self.request.get_uri().port_u16() {
198 if peer_port != 80 && peer_port != 443 {
199 self.attrs
200 .push(KeyValue::new(SERVER_PORT, peer_port as i64));
201 }
202 }
203
204 if let Some(user_agent) = self
205 .request
206 .headers()
207 .get(USER_AGENT)
208 .and_then(|len| len.to_str().ok())
209 {
210 self.attrs
211 .push(KeyValue::new(USER_AGENT_ORIGINAL, user_agent.to_string()))
212 }
213
214 if let Some(content_length) = self.request.headers().get(CONTENT_LENGTH).and_then(|len| {
215 len.to_str()
216 .ok()
217 .and_then(|str_len| str_len.parse::<i64>().ok())
218 }) {
219 self.attrs
220 .push(KeyValue::new(MESSAGING_MESSAGE_BODY_SIZE, content_length))
221 }
222
223 let span = tracer
224 .span_builder((self.span_namer)(&self.request))
225 .with_kind(SpanKind::Client)
226 .with_attributes(mem::take(&mut self.attrs))
227 .start_with_context(&tracer, &self.cx);
228 let cx = self.cx.with_span(span);
229
230 global::get_text_map_propagator(|injector| {
231 injector.inject_context(&cx, &mut ActixClientCarrier::new(&mut self.request));
232 });
233
234 f(self.request)
235 .inspect_ok(|res| record_response(res, &cx))
236 .inspect_err(|err| record_err(err, &cx))
237 .await
238 }
239
240 pub fn with_attributes(
264 mut self,
265 attrs: impl IntoIterator<Item = KeyValue>,
266 ) -> InstrumentedClientRequest {
267 self.attrs.extend(&mut attrs.into_iter());
268 self
269 }
270
271 pub fn with_span_namer(
291 mut self,
292 span_namer: fn(&ClientRequest) -> String,
293 ) -> InstrumentedClientRequest {
294 self.span_namer = span_namer;
295 self
296 }
297}
298
299fn convert_status(status: http::StatusCode) -> Status {
302 match status.as_u16() {
303 100..=399 => Status::Unset,
304 400..=599 => Status::error("Unexpected status code"),
306 code => Status::error(format!("Invalid HTTP status code {}", code)),
307 }
308}
309
310fn record_response<T>(response: &ClientResponse<T>, cx: &Context) {
311 let span = cx.span();
312 let status = convert_status(response.status());
313 span.set_status(status);
314 span.set_attribute(KeyValue::new(
315 HTTP_RESPONSE_STATUS_CODE,
316 response.status().as_u16() as i64,
317 ));
318 span.end();
319}
320
321fn record_err<T: fmt::Debug>(err: T, cx: &Context) {
322 let span = cx.span();
323 span.set_status(Status::error(format!("{:?}", err)));
324 span.end();
325}
326
327struct ActixClientCarrier<'a> {
328 request: &'a mut ClientRequest,
329}
330
331impl<'a> ActixClientCarrier<'a> {
332 fn new(request: &'a mut ClientRequest) -> Self {
333 ActixClientCarrier { request }
334 }
335}
336
337impl Injector for ActixClientCarrier<'_> {
338 fn set(&mut self, key: &str, value: String) {
339 let header_name = HeaderName::from_str(key).expect("Must be header name");
340 let header_value = HeaderValue::from_str(&value).expect("Must be a header value");
341 self.request.headers_mut().insert(header_name, header_value);
342 }
343}