Skip to main content

allstak/integrations/
reqwest.rs

1//! Outbound HTTP auto-instrumentation (`reqwest-middleware` feature).
2//!
3//! [`AllstakHttpMiddleware`] is a [`reqwest_middleware::Middleware`] that, for
4//! every outbound request, with zero per-call code:
5//!
6//! - opens an `http.client` child span under the active scope's trace/span,
7//! - injects the active trace context (`traceparent` + `X-AllStak-*`) into the
8//!   outbound request headers so the downstream service continues the trace,
9//! - records an outbound [`HttpRequestRecord`] to `/ingest/v1/http-requests`
10//!   with the status, duration, host, path and trace ids.
11//!
12//! ## Zero-config usage
13//!
14//! ```no_run
15//! # #[cfg(feature = "reqwest-middleware")]
16//! # fn demo() -> reqwest_middleware::ClientWithMiddleware {
17//! // After `allstak::init(...)`, wrap your reqwest client once:
18//! let client = allstak::instrumented_http_client();
19//! // Every request through `client` is now traced + recorded automatically.
20//! client
21//! # }
22//! ```
23//!
24//! Or attach the middleware to an existing builder:
25//!
26//! ```no_run
27//! # #[cfg(feature = "reqwest-middleware")]
28//! # fn demo() -> reqwest_middleware::ClientWithMiddleware {
29//! use reqwest_middleware::ClientBuilder;
30//! ClientBuilder::new(reqwest::Client::new())
31//!     .with(allstak::integrations::reqwest::AllstakHttpMiddleware::new())
32//!     .build()
33//! # }
34//! ```
35
36use std::time::Instant;
37
38use http::Extensions;
39use reqwest::{Request, Response};
40use reqwest_middleware::{Error, Middleware, Next, Result};
41
42use crate::hub::Hub;
43use crate::performance::Span;
44use crate::propagation;
45use crate::protocol::HttpRequestRecord;
46use crate::util;
47
/// Middleware for `reqwest` that instruments outbound HTTP calls.
///
/// Three behaviours can be switched independently — span emission, trace
/// header injection and request recording. The [`Default`] configuration
/// enables all of them.
#[derive(Debug, Clone)]
pub struct AllstakHttpMiddleware {
    /// Whether a child span is opened for each outbound request.
    start_span: bool,
    /// Whether trace-context headers are written onto the outbound request.
    inject_headers: bool,
    /// Whether an outbound `HttpRequestRecord` is captured per request.
    record_request: bool,
    /// Operation name given to the span opened per request.
    operation: &'static str,
}
59
60impl Default for AllstakHttpMiddleware {
61    fn default() -> Self {
62        AllstakHttpMiddleware {
63            start_span: true,
64            inject_headers: true,
65            record_request: true,
66            operation: "http.client",
67        }
68    }
69}
70
71impl AllstakHttpMiddleware {
72    /// New middleware with span, header injection and request recording all on.
73    pub fn new() -> Self {
74        AllstakHttpMiddleware::default()
75    }
76
77    /// Toggle opening an `http.client` child span per request.
78    pub fn enable_span(mut self, enable: bool) -> Self {
79        self.start_span = enable;
80        self
81    }
82
83    /// Toggle injecting `traceparent` / `X-AllStak-*` headers downstream.
84    pub fn enable_header_injection(mut self, enable: bool) -> Self {
85        self.inject_headers = enable;
86        self
87    }
88
89    /// Toggle recording an outbound [`HttpRequestRecord`].
90    pub fn enable_request_record(mut self, enable: bool) -> Self {
91        self.record_request = enable;
92        self
93    }
94}
95
96#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
97#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
98impl Middleware for AllstakHttpMiddleware {
99    async fn handle(
100        &self,
101        mut req: Request,
102        extensions: &mut Extensions,
103        next: Next<'_>,
104    ) -> Result<Response> {
105        let hub = Hub::current();
106
107        // Snapshot the active trace context (set by inbound middleware or the
108        // active transaction). When there is no active trace, mint one so the
109        // outbound call is still traceable end-to-end.
110        let mut ctx = hub.current_trace_context();
111        if ctx.trace_id.is_none() {
112            ctx.trace_id = Some(util::new_trace_id());
113        }
114
115        let method = req.method().to_string();
116        let url = req.url().clone();
117        let host = url.host_str().unwrap_or("").to_string();
118        let path = url.path().to_string();
119
120        // Open an http.client child span under the active span.
121        let mut span = if self.start_span {
122            Some(Span::continued(
123                self.operation,
124                format!("{method} {host}{path}"),
125                ctx.trace_id.clone(),
126                ctx.parent_span_id.clone(),
127            ))
128        } else {
129            None
130        };
131        let span_id = span.as_ref().map(|s| s.span_id().to_string());
132
133        // Inject the active trace context into the outbound headers. The span
134        // we just opened becomes the downstream parent.
135        if self.inject_headers {
136            let headers = req.headers_mut();
137            propagation::inject(&ctx, span_id.as_deref(), |name, value| {
138                if let (Ok(hn), Ok(hv)) = (
139                    http::HeaderName::from_bytes(name.as_bytes()),
140                    http::HeaderValue::from_str(value),
141                ) {
142                    headers.insert(hn, hv);
143                }
144            });
145        }
146
147        let started = Instant::now();
148        let result = next.run(req, extensions).await;
149        let duration_ms = started.elapsed().as_millis() as u64;
150
151        // Resolve the outcome status: HTTP status on success, 0 on a transport
152        // error (no response was produced).
153        let status_code = match &result {
154            Ok(resp) => resp.status().as_u16(),
155            Err(Error::Reqwest(e)) => e.status().map(|s| s.as_u16()).unwrap_or(0),
156            Err(_) => 0,
157        };
158
159        if let Some(span) = span.as_mut() {
160            if status_code >= 500 || status_code == 0 {
161                span.set_status("internal_error");
162            } else {
163                span.set_status("ok");
164            }
165            span.set_tag("http.method", method.clone());
166            span.set_tag("http.host", host.clone());
167            span.set_tag("http.status_code", status_code.to_string());
168        }
169        // Finish the span (records to /ingest/v1/spans).
170        if let Some(span) = span.take() {
171            span.finish();
172        }
173
174        if self.record_request {
175            let record = HttpRequestRecord {
176                trace_id: ctx.trace_id.clone(),
177                request_id: ctx.request_id.clone(),
178                direction: "outbound".to_string(),
179                method,
180                host,
181                path,
182                status_code,
183                duration_ms,
184                request_size: None,
185                response_size: None,
186                user_id: None,
187                error_fingerprint: None,
188                timestamp: util::now_iso8601(),
189            };
190            if let Some(client) = hub.client() {
191                client.capture_http_request(record);
192            }
193        }
194
195        result
196    }
197}
198
199/// Build a fully-instrumented [`reqwest_middleware::ClientWithMiddleware`] from
200/// a fresh default [`reqwest::Client`].
201pub fn instrumented_client() -> reqwest_middleware::ClientWithMiddleware {
202    instrumented_client_from(reqwest::Client::new())
203}
204
205/// Wrap an existing [`reqwest::Client`] with the AllStak HTTP middleware.
206pub fn instrumented_client_from(
207    client: reqwest::Client,
208) -> reqwest_middleware::ClientWithMiddleware {
209    reqwest_middleware::ClientBuilder::new(client)
210        .with(AllstakHttpMiddleware::new())
211        .build()
212}