salvo_otel/
tracing.rs

1use opentelemetry::trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer};
2use opentelemetry::{Context, KeyValue, global};
3use opentelemetry_http::HeaderExtractor;
4use opentelemetry_semantic_conventions::{resource, trace};
5use salvo_core::http::headers::{self, HeaderMap, HeaderMapExt, HeaderName, HeaderValue};
6use salvo_core::prelude::*;
7
/// Middleware for tracing with OpenTelemetry.
///
/// Wraps a tracer `T` and, when used as a handler, opens one span per
/// handled request. `Debug` is derived so the middleware can be inspected
/// and logged whenever the wrapped tracer itself implements `Debug`.
#[derive(Debug)]
pub struct Tracing<T> {
    /// Tracer used to start the span that covers each handled request.
    tracer: T,
}
12
13impl<T> Tracing<T> {
14    /// Create `Tracing` middleware with `tracer`.
15    pub fn new(tracer: T) -> Self {
16        Self { tracer }
17    }
18}
19
20#[async_trait]
21impl<T> Handler for Tracing<T>
22where
23    T: Tracer + Sync + Send + 'static,
24    T::Span: Send + Sync + 'static,
25{
26    async fn handle(
27        &self,
28        req: &mut Request,
29        depot: &mut Depot,
30        res: &mut Response,
31        ctrl: &mut FlowCtrl,
32    ) {
33        let remote_addr = req.remote_addr().to_string();
34
35        //TODO: Will remove after opentelemetry_http updated
36        let mut headers = HeaderMap::with_capacity(req.headers().len());
37        headers.extend(req.headers().into_iter().map(|(name, value)| {
38            let name = HeaderName::from_bytes(name.as_ref()).expect("Invalid header name");
39            let value = HeaderValue::from_bytes(value.as_ref()).expect("Invalid header value");
40            (name, value)
41        }));
42
43        let parent_cx = global::get_text_map_propagator(|propagator| {
44            propagator.extract(&HeaderExtractor(&headers))
45        });
46
47        let mut attributes = Vec::new();
48        attributes.push(KeyValue::new(
49            resource::TELEMETRY_SDK_NAME,
50            env!("CARGO_CRATE_NAME"),
51        ));
52        attributes.push(KeyValue::new(
53            resource::TELEMETRY_SDK_VERSION,
54            env!("CARGO_PKG_VERSION"),
55        ));
56        attributes.push(KeyValue::new(resource::TELEMETRY_SDK_LANGUAGE, "rust"));
57        attributes.push(KeyValue::new(
58            trace::HTTP_REQUEST_METHOD,
59            req.method().to_string(),
60        ));
61        attributes.push(KeyValue::new(trace::URL_FULL, req.uri().to_string()));
62        attributes.push(KeyValue::new(trace::CLIENT_ADDRESS, remote_addr));
63        attributes.push(KeyValue::new(
64            trace::NETWORK_PROTOCOL_VERSION,
65            format!("{:?}", req.version()),
66        ));
67        let mut span = self
68            .tracer
69            .span_builder(format!("{} {}", req.method(), req.uri()))
70            .with_kind(SpanKind::Server)
71            .with_attributes(attributes)
72            .start_with_context(&self.tracer, &parent_cx);
73
74        span.add_event("request.started".to_string(), vec![]);
75
76        async move {
77            ctrl.call_next(req, depot, res).await;
78            let cx = Context::current();
79            let span = cx.span();
80
81            let status = res.status_code.unwrap_or_else(|| {
82                tracing::info!("[otel::Tracing] Treat status_code=none as 200(OK).");
83                StatusCode::OK
84            });
85            let event = if status.is_client_error() || status.is_server_error() {
86                "request.failure"
87            } else {
88                "request.success"
89            };
90            span.add_event(event.to_string(), vec![]);
91            span.set_attribute(KeyValue::new(
92                trace::HTTP_RESPONSE_STATUS_CODE,
93                status.as_u16() as i64,
94            ));
95            if let Some(content_length) = res.headers().typed_get::<headers::ContentLength>() {
96                span.set_attribute(KeyValue::new(
97                    "http.response.header.content-length",
98                    content_length.0 as i64,
99                ));
100            }
101        }
102        .with_context(Context::current_with_span(span))
103        .await
104    }
105}