actix_slog/
lib.rs

1//! structured request logging middleware
2//!
3//! ```no_run
4//! use actix_web::{HttpServer, App};
5//! use actix_slog::StructuredLogger;
6//! use slog::o;
7//!
8//! let logger: slog::Logger = unimplemented!();
9//! let server = HttpServer::new(move || {
10//!   App::new()
11//!     .wrap(
12//!       StructuredLogger::new(logger.new(o!("log_type" => "access"))),
13//!     )
14//!   })
15//!   .bind("[::1]:8080");
16//! ```
17use actix_web::dev::{
18    BodySize, MessageBody, ResponseBody, Service, ServiceRequest, ServiceResponse, Transform,
19};
20use actix_web::error::{Error, Result};
21use actix_web::http::header::{HOST, REFERER, USER_AGENT};
22use actix_web::http::StatusCode;
23use actix_web::web::Bytes;
24use chrono::prelude::*;
25use futures::future::{ok, Ready};
26use pin_project::{pin_project, pinned_drop};
27use slog::{debug, info, o, Logger};
28use std::borrow::ToOwned;
29use std::collections::HashSet;
30use std::future::Future;
31use std::marker::PhantomData;
32use std::pin::Pin;
33use std::rc::Rc;
34use std::task::{Context, Poll};
35
/// Global configuration/builder for the log middleware.
///
/// Holds the shared settings (base logger and excluded paths) behind an `Rc`; `new_transform`
/// clones that `Rc` into every middleware instance it creates.
pub struct StructuredLogger(Rc<Inner>);
38
/// Settings shared between the builder and the middleware instances created from it.
struct Inner {
    /// Base logger; a child logger with the request's fields is derived from it per request.
    logger: Logger,
    /// Request paths for which no access-log line is written.
    exclude: HashSet<String>,
}
43
44impl StructuredLogger {
45    /// Create `Logger` middleware with the specified `format`.
46    #[must_use]
47    pub fn new(logger: Logger) -> StructuredLogger {
48        StructuredLogger(Rc::new(Inner {
49            logger,
50            exclude: HashSet::new(),
51        }))
52    }
53
54    /// Ignore and do not log access for specified path.
55    pub fn exclude<T: Into<String>>(mut self, path: T) -> Self {
56        Rc::get_mut(&mut self.0)
57            .unwrap()
58            .exclude
59            .insert(path.into());
60        self
61    }
62}
63
64/// "initializer" for the service/the actual middleware (called once per worker)
65impl<S, B> Transform<S> for StructuredLogger
66where
67    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
68    B: MessageBody,
69{
70    type Request = ServiceRequest;
71    type Response = ServiceResponse<StreamLog<B>>;
72    type Error = Error;
73    type Transform = StructuredLoggerMiddleware<S>;
74    type InitError = ();
75    type Future = Ready<Result<Self::Transform, Self::InitError>>;
76
77    fn new_transform(&self, service: S) -> Self::Future {
78        ok(StructuredLoggerMiddleware {
79            service,
80            inner: self.0.clone(),
81        })
82    }
83}
84
/// Logger middleware: the per-service instance that wraps the next service in the chain.
pub struct StructuredLoggerMiddleware<S> {
    /// Shared settings (base logger and excluded paths) taken over from the builder.
    inner: Rc<Inner>,

    /// The next service in the chain, kind of like express' `next()`.
    service: S,
}
92
93impl<S, B> Service for StructuredLoggerMiddleware<S>
94where
95    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
96    B: MessageBody,
97{
98    type Request = ServiceRequest;
99    type Response = ServiceResponse<StreamLog<B>>;
100    type Error = Error;
101    type Future = LoggerResponse<S, B>;
102
103    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
104        self.service.poll_ready(cx)
105    }
106
107    fn call(&mut self, req: ServiceRequest) -> Self::Future {
108        // check the exclude-list if to skip this path…
109        let is_exclude = self.inner.exclude.contains(req.path());
110
111        // …but collect other fields nevertheless, to log errors etc.
112        let timestamp = Utc::now();
113
114        let user_agent = req
115            .headers()
116            .get(USER_AGENT)
117            .and_then(|v| v.to_str().ok())
118            .unwrap_or("-");
119
120        let referer = req
121            .headers()
122            .get(REFERER)
123            .and_then(|v| v.to_str().ok())
124            .unwrap_or("-");
125
126        let remote_addr = req
127            .connection_info()
128            .remote_addr()
129            .map_or(String::from("-"), ToOwned::to_owned);
130
131        let host = req
132            .headers()
133            .get(HOST)
134            .and_then(|v| v.to_str().ok())
135            .unwrap_or("-");
136
137        let correlation_id = req
138            .headers()
139            .get("correlation-id")
140            .and_then(|v| v.to_str().ok())
141            .unwrap_or("-");
142
143        let logger = self.inner.logger.new(o!(
144            "version" => format!("{:?}", req.version()),
145            "http_host" => host.to_owned(),
146            "referer" => referer.to_owned(),
147            "remote_address" => remote_addr,
148            "user-agent" => user_agent.to_owned(),
149            "request_method" => req.method().to_string(),
150            "correlation_id" => correlation_id.to_owned(),
151            "uri" => req.path().to_owned(),
152            "query" => format!("?{}", req.query_string()),
153        ));
154
155        LoggerResponse {
156            logger,
157            fut: self.service.call(req),
158            timestamp,
159            _t: PhantomData,
160            is_exclude,
161        }
162    }
163}
164
165#[doc(hidden)]
166#[pin_project::pin_project]
167pub struct LoggerResponse<S, B>
168where
169    B: MessageBody,
170    S: Service,
171{
172    #[pin]
173    fut: S::Future,
174    // timestamp at which the request hit the service (in contrast to when the log is written, i.e. the request is done)
175    timestamp: DateTime<Utc>,
176    logger: Logger,
177    // if to exclude this request
178    is_exclude: bool,
179    _t: PhantomData<(B,)>,
180}
181
182/// "handler" for the response, i.e. "action" to call once the other services are done, and the
183/// response is ready
184impl<S, B> Future for LoggerResponse<S, B>
185where
186    B: MessageBody,
187    S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
188{
189    type Output = Result<ServiceResponse<StreamLog<B>>, Error>;
190
191    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
192        let this = self.project();
193
194        let res = match futures::ready!(this.fut.poll(cx)) {
195            Ok(res) => res,
196            Err(e) => return Poll::Ready(Err(e)),
197        };
198
199        if let Some(error) = res.response().error() {
200            if res.response().head().status != StatusCode::INTERNAL_SERVER_ERROR {
201                debug!(this.logger, "Error in response: {:?}", error);
202            }
203        }
204
205        let timestamp = *this.timestamp;
206        let logger = this.logger.new(o!("status" => res.status().as_u16()));
207        let is_exclude: bool = *this.is_exclude;
208
209        Poll::Ready(Ok(res.map_body(move |_, body| {
210            ResponseBody::Body(StreamLog {
211                logger,
212                is_exclude,
213                body,
214                timestamp,
215                size: 0,
216            })
217        })))
218    }
219}
220
/// Response body wrapper that counts the bytes passing through it and writes the access-log
/// line when it is dropped (unless the request is excluded).
#[pin_project(PinnedDrop)]
pub struct StreamLog<B> {
    /// Child logger carrying the request's fields and the response status.
    logger: Logger,
    /// Whether the access-log line for this request is suppressed.
    is_exclude: bool,
    /// The wrapped response body.
    #[pin]
    body: ResponseBody<B>,
    /// Number of body bytes yielded so far; logged as `bytes_sent`.
    size: usize,
    /// Time at which the request hit the middleware; used to compute `response_time`.
    timestamp: DateTime<Utc>,
}
230
231#[pinned_drop]
232impl<B> PinnedDrop for StreamLog<B> {
233    fn drop(self: Pin<&mut Self>) {
234        if !self.is_exclude {
235            let response_time = Utc::now() - self.timestamp;
236            let response_time = response_time.num_milliseconds();
237            info!(self.logger, "-"; o!("bytes_sent" => self.size), "response_time" => response_time);
238        }
239    }
240}
241
242impl<B: MessageBody> MessageBody for StreamLog<B> {
243    fn size(&self) -> BodySize {
244        self.body.size()
245    }
246
247    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
248        let this = self.project();
249        match this.body.poll_next(cx) {
250            Poll::Ready(Some(Ok(chunk))) => {
251                *this.size += chunk.len();
252                Poll::Ready(Some(Ok(chunk)))
253            }
254            val => val,
255        }
256    }
257}