Skip to main content

salvo_otel/
tracing.rs

1use std::fmt::{self, Debug, Formatter};
2
3use opentelemetry::trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer};
4use opentelemetry::{Context, KeyValue, global};
5use opentelemetry_http::HeaderExtractor;
6use opentelemetry_semantic_conventions::{resource, trace};
7use salvo_core::http::headers::{self, HeaderMap, HeaderMapExt, HeaderName, HeaderValue};
8use salvo_core::prelude::*;
9
/// OpenTelemetry tracing middleware.
///
/// Holds a tracer of type `T`; when used as a [`Handler`], that tracer starts the
/// span recorded for each handled request.
pub struct Tracing<T> {
    /// Tracer used to build and start request spans.
    tracer: T,
}

impl<T> Tracing<T> {
    /// Builds a `Tracing` middleware that records spans through `tracer`.
    pub fn new(tracer: T) -> Self {
        Tracing { tracer }
    }
}
21impl<T> Debug for Tracing<T>
22where
23    T: Debug,
24{
25    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
26        f.debug_struct("Tracing")
27            .field("tracer", &self.tracer)
28            .finish()
29    }
30}
31
32#[async_trait]
33impl<T> Handler for Tracing<T>
34where
35    T: Tracer + Sync + Send + 'static,
36    T::Span: Send + Sync + 'static,
37{
38    async fn handle(
39        &self,
40        req: &mut Request,
41        depot: &mut Depot,
42        res: &mut Response,
43        ctrl: &mut FlowCtrl,
44    ) {
45        let remote_addr = req.remote_addr().to_string();
46
47        // TODO: Will remove after opentelemetry_http updated
48        let mut headers = HeaderMap::with_capacity(req.headers().len());
49        headers.extend(req.headers().into_iter().map(|(name, value)| {
50            let name = HeaderName::from_bytes(name.as_ref()).expect("Invalid header name");
51            let value = HeaderValue::from_bytes(value.as_ref()).expect("Invalid header value");
52            (name, value)
53        }));
54
55        let parent_cx = global::get_text_map_propagator(|propagator| {
56            propagator.extract(&HeaderExtractor(&headers))
57        });
58
59        let mut attributes = Vec::new();
60        attributes.push(KeyValue::new(
61            resource::TELEMETRY_SDK_NAME,
62            env!("CARGO_CRATE_NAME"),
63        ));
64        attributes.push(KeyValue::new(
65            resource::TELEMETRY_SDK_VERSION,
66            env!("CARGO_PKG_VERSION"),
67        ));
68        attributes.push(KeyValue::new(resource::TELEMETRY_SDK_LANGUAGE, "rust"));
69        attributes.push(KeyValue::new(
70            trace::HTTP_REQUEST_METHOD,
71            req.method().to_string(),
72        ));
73        attributes.push(KeyValue::new(trace::URL_FULL, req.uri().to_string()));
74        attributes.push(KeyValue::new(trace::CLIENT_ADDRESS, remote_addr));
75        attributes.push(KeyValue::new(
76            trace::NETWORK_PROTOCOL_VERSION,
77            format!("{:?}", req.version()),
78        ));
79        let mut span = self
80            .tracer
81            .span_builder(format!("{} {}", req.method(), req.uri()))
82            .with_kind(SpanKind::Server)
83            .with_attributes(attributes)
84            .start_with_context(&self.tracer, &parent_cx);
85
86        span.add_event("request.started".to_owned(), vec![]);
87
88        async move {
89            ctrl.call_next(req, depot, res).await;
90            let cx = Context::current();
91            let span = cx.span();
92
93            let status = res.status_code.unwrap_or_else(|| {
94                tracing::info!("[otel::Tracing] Treat status_code=none as 200(OK).");
95                StatusCode::OK
96            });
97            let event = if status.is_client_error() || status.is_server_error() {
98                "request.failure"
99            } else {
100                "request.success"
101            };
102            span.add_event(event.to_owned(), vec![]);
103            span.set_attribute(KeyValue::new(
104                trace::HTTP_RESPONSE_STATUS_CODE,
105                status.as_u16() as i64,
106            ));
107            if let Some(content_length) = res.headers().typed_get::<headers::ContentLength>() {
108                span.set_attribute(KeyValue::new(
109                    "http.response.header.content-length",
110                    content_length.0 as i64,
111                ));
112            }
113        }
114        .with_context(Context::current_with_span(span))
115        .await
116    }
117}
118
#[cfg(test)]
mod tests {
    use opentelemetry::trace::TracerProvider;
    use opentelemetry::trace::noop::NoopTracerProvider;
    use salvo_core::{Depot, FlowCtrl, Request, Response};

    use super::*;

    /// Smoke test: the middleware runs to completion with a no-op tracer and an
    /// empty handler chain.
    #[tokio::test]
    async fn test_tracing_handler() {
        let middleware = Tracing::new(NoopTracerProvider::new().tracer("test"));

        let mut request = Request::new();
        let mut response = Response::new();
        let mut depot = Depot::new();
        let mut flow = FlowCtrl::new(vec![]);

        middleware
            .handle(&mut request, &mut depot, &mut response, &mut flow)
            .await;
    }
}