viz_core/middleware/otel/
tracing.rs1use http::{uri::Scheme, HeaderValue};
6use opentelemetry::{
7 global,
8 propagation::Extractor,
9 trace::{
10 FutureExt as OtelFutureExt, Span, SpanKind, Status, TraceContextExt, Tracer, TracerProvider,
11 },
12 Context, InstrumentationScope, KeyValue,
13};
14use opentelemetry_semantic_conventions::trace::{
15 CLIENT_ADDRESS, EXCEPTION_MESSAGE, HTTP_REQUEST_METHOD, HTTP_RESPONSE_STATUS_CODE, HTTP_ROUTE,
16 NETWORK_PROTOCOL_VERSION, SERVER_ADDRESS, SERVER_PORT, URL_PATH, URL_QUERY, URL_SCHEME,
17 USER_AGENT_ORIGINAL,
18};
19
20use crate::{
21 header::{HeaderMap, HeaderName},
22 headers::UserAgent,
23 Handler, IntoResponse, Request, RequestExt, Response, ResponseExt, Result, Transform,
24};
25
// Attribute key under which the request body size is recorded.
const HTTP_REQUEST_BODY_SIZE: &str = "http.request.body.size";
// Attribute key under which the response body size is recorded.
const HTTP_RESPONSE_BODY_SIZE: &str = "http.response.body.size";
28
/// Settings from which tracing middleware instances are produced.
#[derive(Debug)]
pub struct Config<T> {
    /// Tracer value handed (by clone) to each middleware built from this config.
    tracer: T,
    /// Optional name for the middleware; `None` means "use the default".
    name: Option<String>,
}

impl<T> Config<T> {
    /// Creates a config from the tracer value `t` and an optional `name`.
    ///
    /// Both arguments are stored as given; nothing is validated here.
    pub fn new(t: T, name: Option<String>) -> Self {
        let tracer = t;
        Config { tracer, name }
    }
}
42
43impl<H, T> Transform<H> for Config<T>
44where
45 T: Clone,
46{
47 type Output = TracingMiddleware<H, T>;
48
49 fn transform(&self, h: H) -> Self::Output {
50 TracingMiddleware {
51 h,
52 tracer: self.tracer.clone(),
53 name: self.name.clone().unwrap_or("tracing".to_string()),
54 }
55 }
56}
57
/// Handler wrapper that traces every request passed to the inner handler.
#[derive(Clone, Debug)]
pub struct TracingMiddleware<H, T> {
    /// The wrapped handler that actually serves the request.
    h: H,
    /// Source of the tracer used to create request spans.
    tracer: T,
    /// Name passed to the instrumentation scope of created tracers.
    name: String,
}
65
66#[crate::async_trait]
67impl<H, O, T> Handler<Request> for TracingMiddleware<H, T>
68where
69 H: Handler<Request, Output = Result<O>>,
70 O: IntoResponse,
71 T: TracerProvider + Send + Sync + Clone + 'static,
72 T::Tracer: Tracer + Send + Sync + 'static,
73 <T::Tracer as Tracer>::Span: Span + Send + Sync + 'static,
74{
75 type Output = Result<Response>;
76
77 async fn call(&self, req: Request) -> Self::Output {
78 let parent_context = global::get_text_map_propagator(|propagator| {
79 propagator.extract(&RequestHeaderCarrier::new(req.headers()))
80 });
81
82 let http_route = &req.route_info().pattern;
83 let attributes = build_attributes(&req, http_route.as_str());
84 let scope = InstrumentationScope::builder(self.name.clone())
85 .with_attributes(attributes)
86 .build();
87 let tracer = self.tracer.tracer_with_scope(scope);
88 let mut span = tracer.build_with_context(
89 tracer
90 .span_builder(format!("{} {}", req.method(), http_route))
91 .with_kind(SpanKind::Server),
92 &parent_context,
93 );
94
95 span.add_event("request.started".to_string(), vec![]);
96
97 let resp = self
98 .h
99 .call(req)
100 .with_context(Context::current_with_span(span))
101 .await;
102
103 let cx = Context::current();
104 let span = cx.span();
105
106 match resp {
107 Ok(resp) => {
108 let resp = resp.into_response();
109 span.add_event("request.completed".to_string(), vec![]);
110 span.set_attribute(KeyValue::new(
111 HTTP_RESPONSE_STATUS_CODE,
112 i64::from(resp.status().as_u16()),
113 ));
114 if let Some(content_length) = resp.content_length() {
115 span.set_attribute(KeyValue::new(
116 HTTP_RESPONSE_BODY_SIZE,
117 i64::try_from(content_length).unwrap_or(i64::MAX),
118 ));
119 }
120 if resp.status().is_server_error() {
121 span.set_status(Status::error(
122 resp.status()
123 .canonical_reason()
124 .map(ToString::to_string)
125 .unwrap_or_default(),
126 ));
127 };
128 span.end();
129 Ok(resp)
130 }
131 Err(err) => {
132 span.add_event(
133 "request.error".to_string(),
134 vec![KeyValue::new(EXCEPTION_MESSAGE, err.to_string())],
135 );
136 span.set_status(Status::error(err.to_string()));
137 span.end();
138 Err(err)
139 }
140 }
141 }
142}
143
144struct RequestHeaderCarrier<'a> {
145 headers: &'a HeaderMap,
146}
147
148impl<'a> RequestHeaderCarrier<'a> {
149 const fn new(headers: &'a HeaderMap) -> Self {
150 RequestHeaderCarrier { headers }
151 }
152}
153
154impl Extractor for RequestHeaderCarrier<'_> {
155 fn get(&self, key: &str) -> Option<&str> {
156 self.headers
157 .get(key)
158 .map(HeaderValue::to_str)
159 .and_then(Result::ok)
160 }
161
162 fn keys(&self) -> Vec<&str> {
163 self.headers.keys().map(HeaderName::as_str).collect()
164 }
165}
166
167fn build_attributes(req: &Request, http_route: &str) -> Vec<KeyValue> {
168 let mut attributes = Vec::with_capacity(10);
169 attributes.push(KeyValue::new(HTTP_ROUTE, http_route.to_string()));
171
172 attributes.push(KeyValue::new(HTTP_REQUEST_METHOD, req.method().to_string()));
174 attributes.push(KeyValue::new(
175 NETWORK_PROTOCOL_VERSION,
176 format!("{:?}", req.version()),
177 ));
178
179 if let Some(remote_addr) = req.remote_addr() {
180 attributes.push(KeyValue::new(CLIENT_ADDRESS, remote_addr.to_string()));
181 }
182
183 let uri = req.uri();
184 if let Some(host) = uri.host() {
185 attributes.push(KeyValue::new(SERVER_ADDRESS, host.to_string()));
186 }
187 if let Some(port) = uri
188 .port_u16()
189 .map(i64::from)
190 .filter(|port| *port != 80 && *port != 443)
191 {
192 attributes.push(KeyValue::new(SERVER_PORT, port.to_string()));
193 }
194
195 if let Some(path_query) = uri.path_and_query() {
196 if path_query.path() != "/" {
197 attributes.push(KeyValue::new(URL_PATH, path_query.path().to_string()));
198 }
199 if let Some(query) = path_query.query() {
200 attributes.push(KeyValue::new(URL_QUERY, query.to_string()));
201 }
202 }
203
204 attributes.push(KeyValue::new(
205 URL_SCHEME,
206 uri.scheme().unwrap_or(&Scheme::HTTP).to_string(),
207 ));
208
209 if let Some(content_length) = req
210 .content_length()
211 .and_then(|len| i64::try_from(len).ok())
212 .filter(|len| *len > 0)
213 {
214 attributes.push(KeyValue::new(
215 HTTP_REQUEST_BODY_SIZE,
216 content_length.to_string(),
217 ));
218 }
219
220 if let Some(user_agent) = req
221 .header_typed::<UserAgent>()
222 .as_ref()
223 .map(UserAgent::as_str)
224 {
225 attributes.push(KeyValue::new(USER_AGENT_ORIGINAL, user_agent.to_string()));
226 }
227
228 attributes
229}