use std::cell::RefCell;
use std::fmt;
use std::process::Command;
use std::time::Duration;
use chrono::{DateTime, SecondsFormat, Utc};
use core::fmt::Write;
use r2d2_redis::RedisConnectionManager;
use serde_json::json;
use slog::Key;
use slog::{OwnedKVList, Record, KV};
#[derive(Debug)]
pub struct Logger {
redis_host: String,
redis_port: u32,
redis_key: String,
app_name: String,
hostname: String,
ttl_seconds: Option<u64>,
pool: r2d2::Pool<RedisConnectionManager>,
}
#[derive(Default, Debug)]
pub struct Builder {
redis_host: String,
redis_port: u32,
redis_key: String,
app_name: String,
hostname: Option<String>,
ttl_seconds: Option<u64>,
}
#[derive(Debug)]
pub enum Error {
ConnectionPoolErr(r2d2::Error),
RedisErr(redis::RedisError),
LogErr(slog::Error),
}
type KeyVals = std::vec::Vec<(String, serde_json::Value)>;
struct Serializer {
vec: KeyVals,
}
#[allow(dead_code)]
impl Builder {
pub fn new(app_name: &str) -> Builder {
Builder {
app_name: app_name.to_string(),
redis_host: "localhost".to_string(),
redis_port: 6379,
..Default::default()
}
}
pub fn redis(self, host: String, port: u32, key: String) -> Builder {
Builder {
redis_host: host,
redis_port: port,
redis_key: key,
..self
}
}
pub fn redis_key(self, key: &str) -> Builder {
Builder {
redis_key: key.to_string(),
..self
}
}
pub fn redis_host(self, host: &str) -> Builder {
Builder {
redis_host: host.to_string(),
..self
}
}
pub fn redis_port(self, val: u32) -> Builder {
Builder {
redis_port: val,
..self
}
}
pub fn ttl(self, duration: Duration) -> Builder {
Builder {
ttl_seconds: Some(duration.as_secs()),
..self
}
}
pub fn build(self) -> Result<Logger, Error> {
fn get_host_name() -> String {
let output = Command::new("hostname").output().expect("failed to execute process");
String::from_utf8_lossy(&output.stdout).replace("\n", "")
}
let connection_str = format!("redis://{}:{}", self.redis_host, self.redis_port);
let manager = RedisConnectionManager::new(connection_str.as_str())?;
let pool = r2d2::Pool::builder()
.connection_timeout(Duration::new(1, 0))
.build(manager)?;
let mut con = pool.get()?;
redis::cmd("PING").query(&mut *con)?;
Ok(Logger {
redis_host: self.redis_host,
redis_port: self.redis_port,
redis_key: self.redis_key,
app_name: self.app_name,
hostname: self.hostname.unwrap_or_else(get_host_name),
ttl_seconds: self.ttl_seconds,
pool,
})
}
}
impl Logger {
fn v0_msg(&self, level: &str, msg: &str, key_vals: Option<KeyVals>) -> String {
let now: DateTime<Utc> = Utc::now();
let time = now.to_rfc3339_opts(SecondsFormat::AutoSi, true);
let application = self.app_name.clone();
let mut json_val = json!({
"@timestamp": time,
"@source_host": self.hostname.clone(),
"@message": msg.to_lowercase(),
"@fields": {
"level": level,
"application": application
}
});
let fields = match json_val {
serde_json::Value::Object(ref mut v) => match v.get_mut("@fields").unwrap() {
serde_json::Value::Object(ref mut v) => Some(v),
_ => None,
},
_ => None,
}
.unwrap();
for key_val in &key_vals.unwrap() {
fields.insert(key_val.0.clone(), key_val.1.clone());
}
json_val.to_string()
}
fn send_to_redis(&self, msg: &str) -> Result<(), Error> {
let mut con = self.pool.get()?;
redis::cmd("RPUSH")
.arg(self.redis_key.as_str())
.arg(msg)
.query(&mut *con)?;
if let Some(t) = self.ttl_seconds {
redis::cmd("EXPIRE")
.arg(self.redis_key.as_str())
.arg(t)
.query(&mut *con)?
}
Ok(())
}
}
impl slog::Drain for Logger {
type Ok = ();
type Err = self::Error;
fn log(&self, record: &Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
let ser = &mut Serializer::new();
record.kv().serialize(record, ser)?;
values.serialize(record, ser)?;
let level_str = record.level().as_str();
let msg = format!("{}", record.msg());
let log_entry = self.v0_msg(level_str, msg.as_str(), Some(ser.done()));
self.send_to_redis(&log_entry)?;
Ok(())
}
}
impl From<r2d2::Error> for Error {
fn from(error: r2d2::Error) -> Self {
Error::ConnectionPoolErr(error)
}
}
impl From<redis::RedisError> for Error {
fn from(error: redis::RedisError) -> Self {
Error::RedisErr(error)
}
}
impl From<slog::Error> for Error {
fn from(error: slog::Error) -> Self {
Error::LogErr(error)
}
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::ConnectionPoolErr(e) => write!(f, "Redis logger connection pool error: {}", e),
Error::RedisErr(e) => write!(f, "Redis logger Redis error: {}", e),
Error::LogErr(e) => write!(f, "Redis logger slog error: {}", e),
}
}
}
impl Serializer {
pub fn new() -> Serializer {
Serializer { vec: Vec::new() }
}
pub fn emit_val(&mut self, key: slog::Key, val: serde_json::Value) -> slog::Result {
self.vec.push((key.to_string(), val));
Ok(())
}
fn emit_serde_json_number<V>(&mut self, key: Key, value: V) -> slog::Result
where
serde_json::Number: From<V>,
{
let num = serde_json::Number::from(value);
self.emit_val(key, serde_json::Value::Number(num))
}
fn done(&mut self) -> KeyVals {
self.vec.clone()
}
}
thread_local! {
static THREAD_LOCAL_BUF: RefCell<String> = RefCell::new(String::with_capacity(256))
}
#[allow(dead_code)]
impl slog::Serializer for Serializer {
fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
self.emit_val(key, serde_json::Value::Bool(val))
}
fn emit_unit(&mut self, key: Key) -> slog::Result {
self.emit_val(key, serde_json::Value::Null)
}
fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
self.emit_val(key, serde_json::Value::String(val.to_string()))
}
fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
self.emit_val(key, serde_json::Value::String(val.to_string()))
}
fn emit_none(&mut self, key: Key) -> slog::Result {
self.emit_val(key, serde_json::Value::Null)
}
fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
self.emit_serde_json_number::<u8>(key, val)
}
fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
self.emit_serde_json_number::<i8>(key, val)
}
fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
self.emit_serde_json_number::<u16>(key, val)
}
fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
self.emit_serde_json_number::<i16>(key, val)
}
fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
self.emit_serde_json_number::<usize>(key, val)
}
fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
self.emit_serde_json_number::<isize>(key, val)
}
fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
self.emit_serde_json_number::<u32>(key, val)
}
fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
self.emit_serde_json_number::<i32>(key, val)
}
fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
self.emit_f64(key, f64::from(val))
}
fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
self.emit_serde_json_number::<u64>(key, val)
}
fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
self.emit_serde_json_number::<i64>(key, val)
}
fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
let n = serde_json::Number::from_f64(val);
self.emit_val(key, serde_json::Value::Number(n.unwrap()))
}
fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
THREAD_LOCAL_BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.write_fmt(*val).unwrap();
let res = self.emit_val(key, serde_json::Value::String(buf.clone()));
buf.clear();
res
})
}
}