slog_redis/
lib.rs

1//! This crate implements a [slog](https://crates.io/crates/slog) drain that outputs JSON formatted
2//! logs to a Redis list
3//!
4//! Useful for structured, **centralized logging** using a RELK stack (Redis, Elasticsearch,
5//! Logstash and Kibana). All log messages are sent to a Redis server, in **Logstash message V0 format**,
6//! ready to be parsed/processed by Logstash.
7//!
8//! The format looks like this:
9//!
10//! ```json
11//!  {
12//!     "@timestamp": ${timeRFC3339},
13//!     "@source_host": ${hostname},
14//!     "@message": ${message},
15//!     "@fields": {
16//!        "level": ${levelLowercase},
17//!        "application": ${appName}
18//!        ... // logged field 1
19//!        ... // logged field 2
20//!        ...
21//!    }
22//! ```
23//!
24//! Example usage:
25//!
26//! ```no_run
27//!  use slog::*;
28//!  use slog_redis::Builder;
29//!
30//!  let redis_drain = Builder::new("my-app-name")
31//!    .redis_host("localhost")
32//!    .redis_key("my_redis_list_key")
33//!    .build()
34//!    .unwrap();
35//!
36//!  let drain = slog_async::Async::new(redis_drain.fuse()).build().fuse();
37//!
38//!  let log = Logger::root(drain, o!());
39//!  info!(log, "Send me to {}!", "Redis"; "msg" => "Hello World!");
40//! ```
41//!
42
43use std::cell::RefCell;
44use std::fmt;
45use std::process::Command;
46use std::time::Duration;
47
48use chrono::{DateTime, SecondsFormat, Utc};
49use core::fmt::Write;
50use r2d2_redis::RedisConnectionManager;
51use serde_json::json;
52use slog::Key;
53use slog::{OwnedKVList, Record, KV};
54
55/// A logger that sends JSON formatted logs to a list in a Redis instance. It uses this format
56///
57/// ```json
58///   {
59///     "@timestamp": ${timeRFC3339},
60///     "@source_host": ${hostname},
61///     "@message": ${message},
62///     "@fields": {
63///        "level": ${levelLowercase},
64///        "application": ${appName}
65///        ... // logged field 1
66///        ... // logged field 2
67///        ...
68///    }
69/// ```
70///
71/// It supports structured logging via [`slog`][slog-url]. You can use the [`Builder`] to
72/// construct it and then use it as an slog drain.
73///
74/// [`Builder`]: struct.Builder.html
75/// [slog-url]: https://github.com/slog-rs/slog
76#[derive(Debug)]
77pub struct Logger {
78    redis_host: String,
79    redis_port: u32,
80    redis_key: String,
81    app_name: String,
82    hostname: String,
83    ttl_seconds: Option<u64>,
84    pool: r2d2::Pool<RedisConnectionManager>,
85}
86
87/// Builds the Redis logger.
88#[derive(Default, Debug)]
89pub struct Builder {
90    redis_host: String,
91    redis_port: u32,
92    redis_key: String,
93    app_name: String,
94    hostname: Option<String>,
95    ttl_seconds: Option<u64>,
96    connection_pool_size: u32,
97}
98
99/// Errors returned by the [`Builder`](crate::Builder) and the [`Logger`](crate::Logger)
100#[derive(Debug)]
101pub enum Error {
102    ConnectionPoolErr(r2d2::Error),
103    RedisErr(redis::RedisError),
104    LogErr(slog::Error),
105}
106
107// A Key/Value pair used when constructing the JSON message.
108type KeyVals = std::vec::Vec<(String, serde_json::Value)>;
109
110// Serializes to KeyVals and implements slog::Serializer
111struct Serializer {
112    vec: KeyVals,
113}
114
115#[allow(dead_code)]
116impl Builder {
117    /// Creates the builder taking an application name that will end up in the `@fields.application`
118    /// JSON field of the structured log message.
119    pub fn new(app_name: &str) -> Builder {
120        Builder {
121            app_name: app_name.to_string(),
122            redis_host: "localhost".to_string(),
123            redis_port: 6379,
124            connection_pool_size: 10,
125            ..Default::default()
126        }
127    }
128
129    /// Sets the redis details all at once.
130    pub fn redis(self, host: String, port: u32, key: impl Into<String>) -> Builder {
131        Builder {
132            redis_host: host,
133            redis_port: port,
134            redis_key: key.into(),
135            ..self
136        }
137    }
138
139    /// Sets the name of the key for the list where log messages will be added.
140    pub fn redis_key(self, key: impl Into<String>) -> Builder {
141        Builder {
142            redis_key: key.into(),
143            ..self
144        }
145    }
146
147    /// Sets the name of the redis host. Defaults to 'localhost'.
148    pub fn redis_host(self, host: impl Into<String>) -> Builder {
149        Builder {
150            redis_host: host.into(),
151            ..self
152        }
153    }
154
155    /// Sets the name of the redis port. Defaults to 6379.
156    pub fn redis_port(self, val: u32) -> Builder {
157        Builder {
158            redis_port: val,
159            ..self
160        }
161    }
162
163    /// Sets the time to live for messages in the redis list. Defaults to no timeout
164    pub fn ttl(self, duration: Duration) -> Builder {
165        Builder {
166            ttl_seconds: Some(duration.as_secs()),
167            ..self
168        }
169    }
170
171    /// Sets the name noted down in logs indicating the source of the log entry i.e. the
172    /// `@source_host` field in the JSON payload
173    pub fn source_host(self, host: impl Into<String>) -> Builder {
174        Builder {
175            hostname: Some(host.into()),
176            ..self
177        }
178    }
179
180    /// Consumes the builder, returning the redis logger
181    pub fn build(self) -> Result<Logger, Error> {
182        // TODO: Get something that works on windows too
183        fn get_host_name() -> String {
184            let output = Command::new("hostname").output().expect("failed to execute process");
185            String::from_utf8_lossy(&output.stdout).replace("\n", "")
186        }
187
188        let connection_str = format!("redis://{}:{}", self.redis_host, self.redis_port);
189        let manager = RedisConnectionManager::new(connection_str.as_str())?;
190        let pool = r2d2::Pool::builder()
191            .max_size(self.connection_pool_size)
192            .connection_timeout(Duration::new(1, 0))
193            .build(manager)?;
194
195        let mut con = pool.get()?;
196        redis::cmd("PING").query(&mut *con)?;
197
198        Ok(Logger {
199            redis_host: self.redis_host,
200            redis_port: self.redis_port,
201            redis_key: self.redis_key,
202            app_name: self.app_name,
203            hostname: self.hostname.unwrap_or_else(get_host_name),
204            ttl_seconds: self.ttl_seconds,
205            pool,
206        })
207    }
208}
209
210impl Logger {
211    fn v0_msg(&self, level: &str, msg: &str, key_vals: Option<KeyVals>) -> String {
212        let now: DateTime<Utc> = Utc::now();
213        let time = now.to_rfc3339_opts(SecondsFormat::AutoSi, true);
214        let mut json_val = json!({
215            "@timestamp": time,
216            "@source_host": &self.hostname,
217            "@message": msg.to_lowercase(),
218            "@fields": {
219                "level": level,
220                "application": &self.app_name
221            }
222        });
223
224        let fields = match json_val {
225            serde_json::Value::Object(ref mut v) => match v.get_mut("@fields").unwrap() {
226                serde_json::Value::Object(ref mut v) => Some(v),
227                _ => None,
228            },
229            _ => None,
230        }
231        .unwrap();
232
233        for key_val in &key_vals.unwrap() {
234            fields.insert(key_val.0.clone(), key_val.1.clone());
235        }
236
237        json_val.to_string()
238    }
239
240    /// Sends a message constructed by v0_msg to the redis server
241    fn send_to_redis(&self, msg: &str) -> Result<(), Error> {
242        let mut con = self.pool.get()?;
243
244        redis::cmd("RPUSH")
245            .arg(self.redis_key.as_str())
246            .arg(msg)
247            .query(&mut *con)?;
248
249        if let Some(t) = self.ttl_seconds {
250            redis::cmd("EXPIRE")
251                .arg(self.redis_key.as_str())
252                .arg(t)
253                .query(&mut *con)?
254        }
255        Ok(())
256    }
257}
258
259impl slog::Drain for Logger {
260    type Ok = ();
261    type Err = self::Error;
262
263    fn log(&self, record: &Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
264        let ser = &mut Serializer::new();
265        record.kv().serialize(record, ser)?;
266        values.serialize(record, ser)?;
267
268        let level_str = record.level().as_str();
269        let msg = format!("{}", record.msg());
270        let log_entry = self.v0_msg(level_str, msg.as_str(), Some(ser.done()));
271        self.send_to_redis(&log_entry)?;
272        Ok(())
273    }
274}
275
276impl From<r2d2::Error> for Error {
277    fn from(error: r2d2::Error) -> Self {
278        Error::ConnectionPoolErr(error)
279    }
280}
281
282impl From<redis::RedisError> for Error {
283    fn from(error: redis::RedisError) -> Self {
284        Error::RedisErr(error)
285    }
286}
287
288impl From<slog::Error> for Error {
289    fn from(error: slog::Error) -> Self {
290        Error::LogErr(error)
291    }
292}
293
294impl fmt::Display for Error {
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        match self {
297            Error::ConnectionPoolErr(e) => write!(f, "Redis logger connection pool error: {}", e),
298            Error::RedisErr(e) => write!(f, "Redis logger Redis error: {}", e),
299            Error::LogErr(e) => write!(f, "Redis logger slog error: {}", e),
300        }
301    }
302}
303
304impl Serializer {
305    pub fn new() -> Serializer {
306        Serializer { vec: Vec::new() }
307    }
308
309    pub fn emit_val(&mut self, key: slog::Key, val: serde_json::Value) -> slog::Result {
310        self.vec.push((key.to_string(), val));
311        Ok(())
312    }
313
314    fn emit_serde_json_number<V>(&mut self, key: Key, value: V) -> slog::Result
315    where
316        serde_json::Number: From<V>,
317    {
318        // convert a given number into serde_json::Number
319        let num = serde_json::Number::from(value);
320        self.emit_val(key, serde_json::Value::Number(num))
321    }
322
323    fn done(&mut self) -> KeyVals {
324        self.vec.clone()
325    }
326}
327
328// used by Serializer
329thread_local! {
330    static THREAD_LOCAL_BUF: RefCell<String> = RefCell::new(String::with_capacity(256))
331}
332
333#[allow(dead_code)]
334impl slog::Serializer for Serializer {
335    fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
336        self.emit_val(key, serde_json::Value::Bool(val))
337    }
338
339    fn emit_unit(&mut self, key: Key) -> slog::Result {
340        self.emit_val(key, serde_json::Value::Null)
341    }
342
343    fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
344        self.emit_val(key, serde_json::Value::String(val.to_string()))
345    }
346
347    fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
348        self.emit_val(key, serde_json::Value::String(val.to_string()))
349    }
350
351    fn emit_none(&mut self, key: Key) -> slog::Result {
352        self.emit_val(key, serde_json::Value::Null)
353    }
354
355    fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
356        self.emit_serde_json_number::<u8>(key, val)
357    }
358
359    fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
360        self.emit_serde_json_number::<i8>(key, val)
361    }
362
363    fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
364        self.emit_serde_json_number::<u16>(key, val)
365    }
366
367    fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
368        self.emit_serde_json_number::<i16>(key, val)
369    }
370
371    fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
372        self.emit_serde_json_number::<usize>(key, val)
373    }
374
375    fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
376        self.emit_serde_json_number::<isize>(key, val)
377    }
378
379    fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
380        self.emit_serde_json_number::<u32>(key, val)
381    }
382
383    fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
384        self.emit_serde_json_number::<i32>(key, val)
385    }
386
387    fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
388        self.emit_f64(key, f64::from(val))
389    }
390
391    fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
392        self.emit_serde_json_number::<u64>(key, val)
393    }
394
395    fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
396        self.emit_serde_json_number::<i64>(key, val)
397    }
398
399    fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
400        let n = serde_json::Number::from_f64(val);
401        self.emit_val(key, serde_json::Value::Number(n.unwrap()))
402    }
403
404    fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
405        THREAD_LOCAL_BUF.with(|buf| {
406            let mut buf = buf.borrow_mut();
407            buf.write_fmt(*val).unwrap();
408            let res = self.emit_val(key, serde_json::Value::String(buf.clone()));
409            buf.clear();
410            res
411        })
412    }
413}