actix_web_opentelemetry/
client.rs

1use crate::{
2    middleware::get_scope,
3    util::{http_method_str, http_url},
4};
5use actix_http::{encoding::Decoder, BoxedPayloadStream, Error, Payload};
6use actix_web::{
7    body::MessageBody,
8    http::{
9        self,
10        header::{HeaderName, HeaderValue},
11    },
12    web::Bytes,
13};
14use awc::{
15    error::SendRequestError,
16    http::header::{CONTENT_LENGTH, USER_AGENT},
17    ClientRequest, ClientResponse,
18};
19use futures_util::{future::TryFutureExt as _, Future, Stream};
20use opentelemetry::{
21    global,
22    propagation::Injector,
23    trace::{SpanKind, Status, TraceContextExt, Tracer},
24    Context, KeyValue,
25};
26use opentelemetry_semantic_conventions::{
27    attribute::MESSAGING_MESSAGE_BODY_SIZE,
28    trace::{
29        HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, SERVER_ADDRESS, SERVER_PORT, URL_FULL,
30        USER_AGENT_ORIGINAL,
31    },
32};
33use serde::Serialize;
34use std::mem;
35use std::str::FromStr;
36use std::{
37    borrow::Cow,
38    fmt::{self, Debug},
39};
40
41/// A wrapper for the actix-web [awc::ClientRequest].
42pub struct InstrumentedClientRequest {
43    cx: Context,
44    attrs: Vec<KeyValue>,
45    span_namer: fn(&ClientRequest) -> String,
46    request: ClientRequest,
47}
48
49impl Debug for InstrumentedClientRequest {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        let span_namer = fmt::Pointer::fmt(&(self.span_namer as usize as *const ()), f);
52        f.debug_struct("InstrumentedClientRequest")
53            .field("cx", &self.cx)
54            .field("attrs", &self.attrs)
55            .field("span_namer", &span_namer)
56            .field("request", &self.request)
57            .finish()
58    }
59}
60
61fn default_span_namer(request: &ClientRequest) -> String {
62    format!(
63        "{} {}",
64        request.get_method(),
65        request.get_uri().host().unwrap_or_default()
66    )
67}
68
69/// OpenTelemetry extensions for actix-web's [awc::Client].
70pub trait ClientExt {
71    /// Trace an [awc::Client] request using the current context.
72    ///
73    /// Example:
74    /// ```no_run
75    /// use actix_web_opentelemetry::ClientExt;
76    /// use awc::{Client, error::SendRequestError};
77    ///
78    /// async fn execute_request(client: &Client) -> Result<(), SendRequestError> {
79    ///     let res = client.get("http://localhost:8080")
80    ///         // Add `trace_request` before `send` to any awc request to add instrumentation
81    ///         .trace_request()
82    ///         .send()
83    ///         .await?;
84    ///
85    ///     println!("Response: {:?}", res);
86    ///     Ok(())
87    /// }
88    /// ```
89    fn trace_request(self) -> InstrumentedClientRequest
90    where
91        Self: Sized,
92    {
93        self.trace_request_with_context(Context::current())
94    }
95
96    /// Trace an [awc::Client] request using the given span context.
97    ///
98    /// Example:
99    /// ```no_run
100    /// use actix_web_opentelemetry::ClientExt;
101    /// use awc::{Client, error::SendRequestError};
102    /// use opentelemetry::Context;
103    ///
104    /// async fn execute_request(client: &Client) -> Result<(), SendRequestError> {
105    ///     let res = client.get("http://localhost:8080")
106    ///         // Add `trace_request_with_context` before `send` to any awc request to
107    ///         // add instrumentation
108    ///         .trace_request_with_context(Context::current())
109    ///         .send()
110    ///         .await?;
111    ///
112    ///     println!("Response: {:?}", res);
113    ///     Ok(())
114    /// }
115    /// ```
116    fn trace_request_with_context(self, cx: Context) -> InstrumentedClientRequest;
117}
118
119impl ClientExt for ClientRequest {
120    fn trace_request_with_context(self, cx: Context) -> InstrumentedClientRequest {
121        InstrumentedClientRequest {
122            cx,
123            attrs: Vec::with_capacity(8),
124            span_namer: default_span_namer,
125            request: self,
126        }
127    }
128}
129
130type AwcResult = Result<ClientResponse<Decoder<Payload<BoxedPayloadStream>>>, SendRequestError>;
131
132impl InstrumentedClientRequest {
133    /// Generate an [`awc::ClientResponse`] from a traced request with an empty body.
134    pub async fn send(self) -> AwcResult {
135        self.trace_request(|request| request.send()).await
136    }
137
138    /// Generate an [awc::ClientResponse] from a traced request with the given body.
139    pub async fn send_body<B>(self, body: B) -> AwcResult
140    where
141        B: MessageBody + 'static,
142    {
143        self.trace_request(|request| request.send_body(body)).await
144    }
145
146    /// Generate an [awc::ClientResponse] from a traced request with the given form
147    /// body.
148    pub async fn send_form<T: Serialize>(self, value: &T) -> AwcResult {
149        self.trace_request(|request| request.send_form(value)).await
150    }
151
152    /// Generate an [awc::ClientResponse] from a traced request with the given JSON
153    /// body.
154    pub async fn send_json<T: Serialize>(self, value: &T) -> AwcResult {
155        self.trace_request(|request| request.send_json(value)).await
156    }
157
158    /// Generate an [awc::ClientResponse] from a traced request with the given stream
159    /// body.
160    pub async fn send_stream<S, E>(self, stream: S) -> AwcResult
161    where
162        S: Stream<Item = Result<Bytes, E>> + Unpin + 'static,
163        E: std::error::Error + Into<Error> + 'static,
164    {
165        self.trace_request(|request| request.send_stream(stream))
166            .await
167    }
168
169    async fn trace_request<F, R>(mut self, f: F) -> AwcResult
170    where
171        F: FnOnce(ClientRequest) -> R,
172        R: Future<Output = AwcResult>,
173    {
174        let tracer = global::tracer_with_scope(get_scope());
175
176        // Client attributes
177        // https://github.com/open-telemetry/semantic-conventions/blob/v1.21.0/docs/http/http-spans.md#http-client
178        self.attrs.extend(
179            &mut [
180                KeyValue::new(
181                    SERVER_ADDRESS,
182                    self.request
183                        .get_uri()
184                        .host()
185                        .map(|u| Cow::Owned(u.to_string()))
186                        .unwrap_or(Cow::Borrowed("unknown")),
187                ),
188                KeyValue::new(
189                    HTTP_REQUEST_METHOD,
190                    http_method_str(self.request.get_method()),
191                ),
192                KeyValue::new(URL_FULL, http_url(self.request.get_uri())),
193            ]
194            .into_iter(),
195        );
196
197        if let Some(peer_port) = self.request.get_uri().port_u16() {
198            if peer_port != 80 && peer_port != 443 {
199                self.attrs
200                    .push(KeyValue::new(SERVER_PORT, peer_port as i64));
201            }
202        }
203
204        if let Some(user_agent) = self
205            .request
206            .headers()
207            .get(USER_AGENT)
208            .and_then(|len| len.to_str().ok())
209        {
210            self.attrs
211                .push(KeyValue::new(USER_AGENT_ORIGINAL, user_agent.to_string()))
212        }
213
214        if let Some(content_length) = self.request.headers().get(CONTENT_LENGTH).and_then(|len| {
215            len.to_str()
216                .ok()
217                .and_then(|str_len| str_len.parse::<i64>().ok())
218        }) {
219            self.attrs
220                .push(KeyValue::new(MESSAGING_MESSAGE_BODY_SIZE, content_length))
221        }
222
223        let span = tracer
224            .span_builder((self.span_namer)(&self.request))
225            .with_kind(SpanKind::Client)
226            .with_attributes(mem::take(&mut self.attrs))
227            .start_with_context(&tracer, &self.cx);
228        let cx = self.cx.with_span(span);
229
230        global::get_text_map_propagator(|injector| {
231            injector.inject_context(&cx, &mut ActixClientCarrier::new(&mut self.request));
232        });
233
234        f(self.request)
235            .inspect_ok(|res| record_response(res, &cx))
236            .inspect_err(|err| record_err(err, &cx))
237            .await
238    }
239
240    /// Add additional attributes to the instrumented span for a given request.
241    ///
242    /// The standard otel attributes will still be tracked.
243    ///
244    /// Example:
245    /// ```
246    /// use actix_web_opentelemetry::ClientExt;
247    /// use awc::{Client, error::SendRequestError};
248    /// use opentelemetry::KeyValue;
249    ///
250    /// async fn execute_request(client: &Client) -> Result<(), SendRequestError> {
251    ///     let attrs = [KeyValue::new("dye-key", "dye-value")];
252    ///     let res = client.get("http://localhost:8080")
253    ///         // Add `trace_request` before `send` to any awc request to add instrumentation
254    ///         .trace_request()
255    ///         .with_attributes(attrs)
256    ///         .send()
257    ///         .await?;
258    ///
259    ///     println!("Response: {:?}", res);
260    ///     Ok(())
261    /// }
262    /// ```
263    pub fn with_attributes(
264        mut self,
265        attrs: impl IntoIterator<Item = KeyValue>,
266    ) -> InstrumentedClientRequest {
267        self.attrs.extend(&mut attrs.into_iter());
268        self
269    }
270
271    /// Customise the Span Name, for example to reduce cardinality
272    ///
273    /// Example:
274    /// ```
275    /// use actix_web_opentelemetry::ClientExt;
276    /// use awc::{Client, error::SendRequestError};
277    ///
278    /// async fn execute_request(client: &Client) -> Result<(), SendRequestError> {
279    ///     let res = client.get("http://localhost:8080")
280    ///         // Add `trace_request` before `send` to any awc request to add instrumentation
281    ///         .trace_request()
282    ///         .with_span_namer(|r| format!("HTTP {}", r.get_method()))
283    ///         .send()
284    ///         .await?;
285    ///
286    ///     println!("Response: {:?}", res);
287    ///     Ok(())
288    /// }
289    /// ```
290    pub fn with_span_namer(
291        mut self,
292        span_namer: fn(&ClientRequest) -> String,
293    ) -> InstrumentedClientRequest {
294        self.span_namer = span_namer;
295        self
296    }
297}
298
299// convert http status code to span status following the rules described by the spec:
300// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#status
301fn convert_status(status: http::StatusCode) -> Status {
302    match status.as_u16() {
303        100..=399 => Status::Unset,
304        // since we are the client, we MUST treat 4xx as error
305        400..=599 => Status::error("Unexpected status code"),
306        code => Status::error(format!("Invalid HTTP status code {}", code)),
307    }
308}
309
310fn record_response<T>(response: &ClientResponse<T>, cx: &Context) {
311    let span = cx.span();
312    let status = convert_status(response.status());
313    span.set_status(status);
314    span.set_attribute(KeyValue::new(
315        HTTP_RESPONSE_STATUS_CODE,
316        response.status().as_u16() as i64,
317    ));
318    span.end();
319}
320
321fn record_err<T: fmt::Debug>(err: T, cx: &Context) {
322    let span = cx.span();
323    span.set_status(Status::error(format!("{:?}", err)));
324    span.end();
325}
326
327struct ActixClientCarrier<'a> {
328    request: &'a mut ClientRequest,
329}
330
331impl<'a> ActixClientCarrier<'a> {
332    fn new(request: &'a mut ClientRequest) -> Self {
333        ActixClientCarrier { request }
334    }
335}
336
337impl Injector for ActixClientCarrier<'_> {
338    fn set(&mut self, key: &str, value: String) {
339        let header_name = HeaderName::from_str(key).expect("Must be header name");
340        let header_value = HeaderValue::from_str(&value).expect("Must be a header value");
341        self.request.headers_mut().insert(header_name, header_value);
342    }
343}