1use std::fmt::{self, Debug, Formatter};
2
3use opentelemetry::trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer};
4use opentelemetry::{Context, KeyValue, global};
5use opentelemetry_http::HeaderExtractor;
6use opentelemetry_semantic_conventions::{resource, trace};
7use salvo_core::http::headers::{self, HeaderMap, HeaderMapExt, HeaderName, HeaderValue};
8use salvo_core::prelude::*;
9
10pub struct Tracing<T> {
12 tracer: T,
13}
14
15impl<T> Tracing<T> {
16 pub fn new(tracer: T) -> Self {
18 Self { tracer }
19 }
20}
21impl<T> Debug for Tracing<T>
22where
23 T: Debug,
24{
25 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
26 f.debug_struct("Tracing")
27 .field("tracer", &self.tracer)
28 .finish()
29 }
30}
31
32#[async_trait]
33impl<T> Handler for Tracing<T>
34where
35 T: Tracer + Sync + Send + 'static,
36 T::Span: Send + Sync + 'static,
37{
38 async fn handle(
39 &self,
40 req: &mut Request,
41 depot: &mut Depot,
42 res: &mut Response,
43 ctrl: &mut FlowCtrl,
44 ) {
45 let remote_addr = req.remote_addr().to_string();
46
47 let mut headers = HeaderMap::with_capacity(req.headers().len());
49 headers.extend(req.headers().into_iter().map(|(name, value)| {
50 let name = HeaderName::from_bytes(name.as_ref()).expect("Invalid header name");
51 let value = HeaderValue::from_bytes(value.as_ref()).expect("Invalid header value");
52 (name, value)
53 }));
54
55 let parent_cx = global::get_text_map_propagator(|propagator| {
56 propagator.extract(&HeaderExtractor(&headers))
57 });
58
59 let mut attributes = Vec::new();
60 attributes.push(KeyValue::new(
61 resource::TELEMETRY_SDK_NAME,
62 env!("CARGO_CRATE_NAME"),
63 ));
64 attributes.push(KeyValue::new(
65 resource::TELEMETRY_SDK_VERSION,
66 env!("CARGO_PKG_VERSION"),
67 ));
68 attributes.push(KeyValue::new(resource::TELEMETRY_SDK_LANGUAGE, "rust"));
69 attributes.push(KeyValue::new(
70 trace::HTTP_REQUEST_METHOD,
71 req.method().to_string(),
72 ));
73 attributes.push(KeyValue::new(trace::URL_FULL, req.uri().to_string()));
74 attributes.push(KeyValue::new(trace::CLIENT_ADDRESS, remote_addr));
75 attributes.push(KeyValue::new(
76 trace::NETWORK_PROTOCOL_VERSION,
77 format!("{:?}", req.version()),
78 ));
79 let mut span = self
80 .tracer
81 .span_builder(format!("{} {}", req.method(), req.uri()))
82 .with_kind(SpanKind::Server)
83 .with_attributes(attributes)
84 .start_with_context(&self.tracer, &parent_cx);
85
86 span.add_event("request.started".to_owned(), vec![]);
87
88 async move {
89 ctrl.call_next(req, depot, res).await;
90 let cx = Context::current();
91 let span = cx.span();
92
93 let status = res.status_code.unwrap_or_else(|| {
94 tracing::info!("[otel::Tracing] Treat status_code=none as 200(OK).");
95 StatusCode::OK
96 });
97 let event = if status.is_client_error() || status.is_server_error() {
98 "request.failure"
99 } else {
100 "request.success"
101 };
102 span.add_event(event.to_owned(), vec![]);
103 span.set_attribute(KeyValue::new(
104 trace::HTTP_RESPONSE_STATUS_CODE,
105 status.as_u16() as i64,
106 ));
107 if let Some(content_length) = res.headers().typed_get::<headers::ContentLength>() {
108 span.set_attribute(KeyValue::new(
109 "http.response.header.content-length",
110 content_length.0 as i64,
111 ));
112 }
113 }
114 .with_context(Context::current_with_span(span))
115 .await
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use opentelemetry::trace::TracerProvider;
122 use opentelemetry::trace::noop::NoopTracerProvider;
123 use salvo_core::{Depot, FlowCtrl, Request, Response};
124
125 use super::*;
126
127 #[tokio::test]
128 async fn test_tracing_handler() {
129 let tracer = NoopTracerProvider::new().tracer("test");
130 let handler = Tracing::new(tracer);
131
132 let mut req = Request::new();
133 let mut depot = Depot::new();
134 let mut res = Response::new();
135 let mut ctrl = FlowCtrl::new(vec![]);
136
137 handler
138 .handle(&mut req, &mut depot, &mut res, &mut ctrl)
139 .await;
140 }
141}