1use actix_web::dev::{
18 BodySize, MessageBody, ResponseBody, Service, ServiceRequest, ServiceResponse, Transform,
19};
20use actix_web::error::{Error, Result};
21use actix_web::http::header::{HOST, REFERER, USER_AGENT};
22use actix_web::http::StatusCode;
23use actix_web::web::Bytes;
24use chrono::prelude::*;
25use futures::future::{ok, Ready};
26use pin_project::{pin_project, pinned_drop};
27use slog::{debug, info, o, Logger};
28use std::borrow::ToOwned;
29use std::collections::HashSet;
30use std::future::Future;
31use std::marker::PhantomData;
32use std::pin::Pin;
33use std::rc::Rc;
34use std::task::{Context, Poll};
35
36pub struct StructuredLogger(Rc<Inner>);
38
39struct Inner {
40 logger: Logger,
41 exclude: HashSet<String>,
42}
43
44impl StructuredLogger {
45 #[must_use]
47 pub fn new(logger: Logger) -> StructuredLogger {
48 StructuredLogger(Rc::new(Inner {
49 logger,
50 exclude: HashSet::new(),
51 }))
52 }
53
54 pub fn exclude<T: Into<String>>(mut self, path: T) -> Self {
56 Rc::get_mut(&mut self.0)
57 .unwrap()
58 .exclude
59 .insert(path.into());
60 self
61 }
62}
63
64impl<S, B> Transform<S> for StructuredLogger
66where
67 S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
68 B: MessageBody,
69{
70 type Request = ServiceRequest;
71 type Response = ServiceResponse<StreamLog<B>>;
72 type Error = Error;
73 type Transform = StructuredLoggerMiddleware<S>;
74 type InitError = ();
75 type Future = Ready<Result<Self::Transform, Self::InitError>>;
76
77 fn new_transform(&self, service: S) -> Self::Future {
78 ok(StructuredLoggerMiddleware {
79 service,
80 inner: self.0.clone(),
81 })
82 }
83}
84
85pub struct StructuredLoggerMiddleware<S> {
87 inner: Rc<Inner>,
88
89 service: S,
91}
92
93impl<S, B> Service for StructuredLoggerMiddleware<S>
94where
95 S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
96 B: MessageBody,
97{
98 type Request = ServiceRequest;
99 type Response = ServiceResponse<StreamLog<B>>;
100 type Error = Error;
101 type Future = LoggerResponse<S, B>;
102
103 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
104 self.service.poll_ready(cx)
105 }
106
107 fn call(&mut self, req: ServiceRequest) -> Self::Future {
108 let is_exclude = self.inner.exclude.contains(req.path());
110
111 let timestamp = Utc::now();
113
114 let user_agent = req
115 .headers()
116 .get(USER_AGENT)
117 .and_then(|v| v.to_str().ok())
118 .unwrap_or("-");
119
120 let referer = req
121 .headers()
122 .get(REFERER)
123 .and_then(|v| v.to_str().ok())
124 .unwrap_or("-");
125
126 let remote_addr = req
127 .connection_info()
128 .remote_addr()
129 .map_or(String::from("-"), ToOwned::to_owned);
130
131 let host = req
132 .headers()
133 .get(HOST)
134 .and_then(|v| v.to_str().ok())
135 .unwrap_or("-");
136
137 let correlation_id = req
138 .headers()
139 .get("correlation-id")
140 .and_then(|v| v.to_str().ok())
141 .unwrap_or("-");
142
143 let logger = self.inner.logger.new(o!(
144 "version" => format!("{:?}", req.version()),
145 "http_host" => host.to_owned(),
146 "referer" => referer.to_owned(),
147 "remote_address" => remote_addr,
148 "user-agent" => user_agent.to_owned(),
149 "request_method" => req.method().to_string(),
150 "correlation_id" => correlation_id.to_owned(),
151 "uri" => req.path().to_owned(),
152 "query" => format!("?{}", req.query_string()),
153 ));
154
155 LoggerResponse {
156 logger,
157 fut: self.service.call(req),
158 timestamp,
159 _t: PhantomData,
160 is_exclude,
161 }
162 }
163}
164
165#[doc(hidden)]
166#[pin_project::pin_project]
167pub struct LoggerResponse<S, B>
168where
169 B: MessageBody,
170 S: Service,
171{
172 #[pin]
173 fut: S::Future,
174 timestamp: DateTime<Utc>,
176 logger: Logger,
177 is_exclude: bool,
179 _t: PhantomData<(B,)>,
180}
181
182impl<S, B> Future for LoggerResponse<S, B>
185where
186 B: MessageBody,
187 S: Service<Request = ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
188{
189 type Output = Result<ServiceResponse<StreamLog<B>>, Error>;
190
191 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
192 let this = self.project();
193
194 let res = match futures::ready!(this.fut.poll(cx)) {
195 Ok(res) => res,
196 Err(e) => return Poll::Ready(Err(e)),
197 };
198
199 if let Some(error) = res.response().error() {
200 if res.response().head().status != StatusCode::INTERNAL_SERVER_ERROR {
201 debug!(this.logger, "Error in response: {:?}", error);
202 }
203 }
204
205 let timestamp = *this.timestamp;
206 let logger = this.logger.new(o!("status" => res.status().as_u16()));
207 let is_exclude: bool = *this.is_exclude;
208
209 Poll::Ready(Ok(res.map_body(move |_, body| {
210 ResponseBody::Body(StreamLog {
211 logger,
212 is_exclude,
213 body,
214 timestamp,
215 size: 0,
216 })
217 })))
218 }
219}
220
221#[pin_project(PinnedDrop)]
222pub struct StreamLog<B> {
223 logger: Logger,
224 is_exclude: bool,
225 #[pin]
226 body: ResponseBody<B>,
227 size: usize,
228 timestamp: DateTime<Utc>,
229}
230
231#[pinned_drop]
232impl<B> PinnedDrop for StreamLog<B> {
233 fn drop(self: Pin<&mut Self>) {
234 if !self.is_exclude {
235 let response_time = Utc::now() - self.timestamp;
236 let response_time = response_time.num_milliseconds();
237 info!(self.logger, "-"; o!("bytes_sent" => self.size), "response_time" => response_time);
238 }
239 }
240}
241
242impl<B: MessageBody> MessageBody for StreamLog<B> {
243 fn size(&self) -> BodySize {
244 self.body.size()
245 }
246
247 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Bytes, Error>>> {
248 let this = self.project();
249 match this.body.poll_next(cx) {
250 Poll::Ready(Some(Ok(chunk))) => {
251 *this.size += chunk.len();
252 Poll::Ready(Some(Ok(chunk)))
253 }
254 val => val,
255 }
256 }
257}