1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;

use actix_service::Service;
use actix_service::Transform;
use actix_web::dev::Extensions;
use actix_web::dev::ServiceRequest;
use actix_web::dev::ServiceResponse;
use actix_web::Error;
use actix_web::HttpRequest;
use futures::future::ok;
use futures::future::Ready;
use opentracingrust::Span;
use opentracingrust::Tracer;
use slog::Logger;

use replicante_util_failure::capture_fail;
use replicante_util_failure::failure_info;

mod carriers;

pub use self::carriers::HeadersCarrier;

/// Fetch the tracing span stored in the given request extensions.
///
/// # Panics
///
/// Panics if `req` does not hold a `Span` value.
#[deprecated(
    since = "0.2.0",
    note = "use replicante_util_actixweb::with_request_span"
)]
pub fn request_span(req: &mut Extensions) -> &mut Span {
    let span = req.get_mut::<Span>();
    span.expect("request is missing Span extention")
}

/// Run `block` with the tracing span attached to `request`, if there is one.
///
/// The closure receives `None` when the request extensions hold no `Span`.
/// The request extensions stay mutably borrowed while the closure runs.
/// Whatever the closure returns is handed back to the caller.
pub fn with_request_span<B, R>(request: &mut HttpRequest, block: B) -> R
where
    B: FnOnce(Option<&mut Span>) -> R,
{
    let mut extensions = request.extensions_mut();
    block(extensions.get_mut::<Span>())
}

/// Actix Web middleware to inject an `opentracingrust::Span` on each request.
///
/// Build it with `TracingMiddleware::new` to name spans after the request path
/// or with `TracingMiddleware::with_name` to use one fixed name for all spans.
pub struct TracingMiddleware {
    /// Logger cloned into every `MiddlewareService` created by this middleware.
    logger: Logger,
    /// Fixed span name; `None` means the request path is used as the span name.
    name: Option<String>,
    /// Tracer shared with every `MiddlewareService` created by this middleware.
    tracer: Arc<Tracer>,
}

impl TracingMiddleware {
    /// Inject spans using the request path as then name.
    pub fn new(logger: Logger, tracer: Arc<Tracer>) -> TracingMiddleware {
        TracingMiddleware {
            logger,
            name: None,
            tracer,
        }
    }

    /// Inject spans using the given name.
    pub fn with_name<S>(logger: Logger, tracer: Arc<Tracer>, name: S) -> TracingMiddleware
    where
        S: Into<String>,
    {
        let name = Some(name.into());
        TracingMiddleware {
            logger,
            name,
            tracer,
        }
    }
}

// Type parameters:
//   `S` - the next service in the chain (the one being wrapped)
//   `B` - the body type of the response
impl<S, B> Transform<S> for TracingMiddleware
where
    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Request = ServiceRequest;
    type Response = ServiceResponse<B>;
    type Error = Error;
    type InitError = ();
    type Transform = MiddlewareService<S>;
    type Future = Ready<Result<Self::Transform, Self::InitError>>;

    /// Wrap `service` in a `MiddlewareService` carrying copies of this
    /// middleware's logger, optional span name and tracer handle.
    ///
    /// The returned future is immediately ready and always resolves to `Ok`.
    fn new_transform(&self, service: S) -> Self::Future {
        let logger = self.logger.clone();
        let name = self.name.clone();
        let tracer = Arc::clone(&self.tracer);
        ok(MiddlewareService {
            logger,
            name,
            service,
            tracer,
        })
    }
}

/// Inner middleware to process requests on behalf of `TracingMiddleware`.
///
/// Instances are created by `TracingMiddleware::new_transform`.
pub struct MiddlewareService<S> {
    /// Logger used to report tracing failures while handling requests.
    logger: Logger,
    /// Fixed span name; `None` means the request path is used as the span name.
    name: Option<String>,
    /// The wrapped service that actually handles the request.
    service: S,
    /// Tracer used to create spans and to extract/inject trace contexts.
    tracer: Arc<Tracer>,
}

impl<S, B> Service for MiddlewareService<S>
where
    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
    S::Future: 'static,
    B: 'static,
{
    type Request = ServiceRequest;
    type Response = ServiceResponse<B>;
    type Error = Error;
    type Future = crate::BoxedFuture<Self::Response, Self::Error>;

    fn poll_ready(&mut self, ctx: &mut Context) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(ctx)
    }

    fn call(&mut self, mut req: ServiceRequest) -> Self::Future {
        let logger = self.logger.clone();
        let name = match self.name.as_ref() {
            None => req.path(),
            Some(name) => name.as_str(),
        };
        let mut span = self.tracer.span(name);

        // Extend the span with a parent and some request attributes.
        match HeadersCarrier::extract(req.headers_mut(), &self.tracer) {
            Ok(Some(context)) => span.child_of(context),
            Ok(None) => (),
            Err(error) => {
                capture_fail!(
                    &error,
                    logger,
                    "Unable to extract trace context from request headers";
                    failure_info(&error),
                );
            }
        };
        span.tag("http.route.method", req.method().as_str());
        span.tag("http.route.uri", req.uri().to_string());
        for (param, value) in req.match_info().iter() {
            span.tag(&format!("http.route.param.{}", param), value);
        }

        // Send the request and handle the span on response.
        let tracer = self.tracer.clone();
        req.head_mut().extensions_mut().insert(span);
        let response = self.service.call(req);
        Box::pin(async move {
            let mut response = response.await?;
            let span: Option<Span> = response.request().extensions_mut().remove();
            if let Some(span) = span {
                let result = HeadersCarrier::inject(
                    span.context(),
                    response.response_mut().headers_mut(),
                    &tracer,
                );
                if let Err(error) = result {
                    capture_fail!(
                        &error,
                        logger,
                        "Failed to inject trace context into response headers";
                        failure_info(&error),
                    );
                }

                if let Err(error) = span.finish() {
                    let error = failure::SyncFailure::new(error);
                    capture_fail!(
                        &error,
                        logger,
                        "Failed to finish request tracing span";
                        failure_info(&error),
                    );
                }
            }
            Ok(response)
        })
    }
}