1use std::cell::RefCell;
44use std::fmt;
45use std::process::Command;
46use std::time::Duration;
47
48use chrono::{DateTime, SecondsFormat, Utc};
49use core::fmt::Write;
50use r2d2_redis::RedisConnectionManager;
51use serde_json::json;
52use slog::Key;
53use slog::{OwnedKVList, Record, KV};
54
55#[derive(Debug)]
77pub struct Logger {
78 redis_host: String,
79 redis_port: u32,
80 redis_key: String,
81 app_name: String,
82 hostname: String,
83 ttl_seconds: Option<u64>,
84 pool: r2d2::Pool<RedisConnectionManager>,
85}
86
/// Collects the configuration for a [`Logger`]; start with [`Builder::new`].
#[derive(Default, Debug)]
pub struct Builder {
    /// Redis host used to form the `redis://host:port` connection string.
    redis_host: String,
    /// Redis port used to form the connection string.
    redis_port: u32,
    /// Key of the Redis list that entries are pushed onto.
    redis_key: String,
    /// Application name reported with every entry.
    app_name: String,
    /// Explicit source host; when `None`, `build` determines one itself.
    hostname: Option<String>,
    /// Optional expiry, in seconds, applied to the Redis key after each push.
    ttl_seconds: Option<u64>,
    /// Passed to the r2d2 pool builder as its `max_size`.
    connection_pool_size: u32,
}
98
99#[derive(Debug)]
101pub enum Error {
102 ConnectionPoolErr(r2d2::Error),
103 RedisErr(redis::RedisError),
104 LogErr(slog::Error),
105}
106
107type KeyVals = std::vec::Vec<(String, serde_json::Value)>;
109
110struct Serializer {
112 vec: KeyVals,
113}
114
115#[allow(dead_code)]
116impl Builder {
117 pub fn new(app_name: &str) -> Builder {
120 Builder {
121 app_name: app_name.to_string(),
122 redis_host: "localhost".to_string(),
123 redis_port: 6379,
124 connection_pool_size: 10,
125 ..Default::default()
126 }
127 }
128
129 pub fn redis(self, host: String, port: u32, key: impl Into<String>) -> Builder {
131 Builder {
132 redis_host: host,
133 redis_port: port,
134 redis_key: key.into(),
135 ..self
136 }
137 }
138
139 pub fn redis_key(self, key: impl Into<String>) -> Builder {
141 Builder {
142 redis_key: key.into(),
143 ..self
144 }
145 }
146
147 pub fn redis_host(self, host: impl Into<String>) -> Builder {
149 Builder {
150 redis_host: host.into(),
151 ..self
152 }
153 }
154
155 pub fn redis_port(self, val: u32) -> Builder {
157 Builder {
158 redis_port: val,
159 ..self
160 }
161 }
162
163 pub fn ttl(self, duration: Duration) -> Builder {
165 Builder {
166 ttl_seconds: Some(duration.as_secs()),
167 ..self
168 }
169 }
170
171 pub fn source_host(self, host: impl Into<String>) -> Builder {
174 Builder {
175 hostname: Some(host.into()),
176 ..self
177 }
178 }
179
180 pub fn build(self) -> Result<Logger, Error> {
182 fn get_host_name() -> String {
184 let output = Command::new("hostname").output().expect("failed to execute process");
185 String::from_utf8_lossy(&output.stdout).replace("\n", "")
186 }
187
188 let connection_str = format!("redis://{}:{}", self.redis_host, self.redis_port);
189 let manager = RedisConnectionManager::new(connection_str.as_str())?;
190 let pool = r2d2::Pool::builder()
191 .max_size(self.connection_pool_size)
192 .connection_timeout(Duration::new(1, 0))
193 .build(manager)?;
194
195 let mut con = pool.get()?;
196 redis::cmd("PING").query(&mut *con)?;
197
198 Ok(Logger {
199 redis_host: self.redis_host,
200 redis_port: self.redis_port,
201 redis_key: self.redis_key,
202 app_name: self.app_name,
203 hostname: self.hostname.unwrap_or_else(get_host_name),
204 ttl_seconds: self.ttl_seconds,
205 pool,
206 })
207 }
208}
209
210impl Logger {
211 fn v0_msg(&self, level: &str, msg: &str, key_vals: Option<KeyVals>) -> String {
212 let now: DateTime<Utc> = Utc::now();
213 let time = now.to_rfc3339_opts(SecondsFormat::AutoSi, true);
214 let mut json_val = json!({
215 "@timestamp": time,
216 "@source_host": &self.hostname,
217 "@message": msg.to_lowercase(),
218 "@fields": {
219 "level": level,
220 "application": &self.app_name
221 }
222 });
223
224 let fields = match json_val {
225 serde_json::Value::Object(ref mut v) => match v.get_mut("@fields").unwrap() {
226 serde_json::Value::Object(ref mut v) => Some(v),
227 _ => None,
228 },
229 _ => None,
230 }
231 .unwrap();
232
233 for key_val in &key_vals.unwrap() {
234 fields.insert(key_val.0.clone(), key_val.1.clone());
235 }
236
237 json_val.to_string()
238 }
239
240 fn send_to_redis(&self, msg: &str) -> Result<(), Error> {
242 let mut con = self.pool.get()?;
243
244 redis::cmd("RPUSH")
245 .arg(self.redis_key.as_str())
246 .arg(msg)
247 .query(&mut *con)?;
248
249 if let Some(t) = self.ttl_seconds {
250 redis::cmd("EXPIRE")
251 .arg(self.redis_key.as_str())
252 .arg(t)
253 .query(&mut *con)?
254 }
255 Ok(())
256 }
257}
258
259impl slog::Drain for Logger {
260 type Ok = ();
261 type Err = self::Error;
262
263 fn log(&self, record: &Record, values: &OwnedKVList) -> Result<Self::Ok, Self::Err> {
264 let ser = &mut Serializer::new();
265 record.kv().serialize(record, ser)?;
266 values.serialize(record, ser)?;
267
268 let level_str = record.level().as_str();
269 let msg = format!("{}", record.msg());
270 let log_entry = self.v0_msg(level_str, msg.as_str(), Some(ser.done()));
271 self.send_to_redis(&log_entry)?;
272 Ok(())
273 }
274}
275
276impl From<r2d2::Error> for Error {
277 fn from(error: r2d2::Error) -> Self {
278 Error::ConnectionPoolErr(error)
279 }
280}
281
282impl From<redis::RedisError> for Error {
283 fn from(error: redis::RedisError) -> Self {
284 Error::RedisErr(error)
285 }
286}
287
288impl From<slog::Error> for Error {
289 fn from(error: slog::Error) -> Self {
290 Error::LogErr(error)
291 }
292}
293
294impl fmt::Display for Error {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 match self {
297 Error::ConnectionPoolErr(e) => write!(f, "Redis logger connection pool error: {}", e),
298 Error::RedisErr(e) => write!(f, "Redis logger Redis error: {}", e),
299 Error::LogErr(e) => write!(f, "Redis logger slog error: {}", e),
300 }
301 }
302}
303
304impl Serializer {
305 pub fn new() -> Serializer {
306 Serializer { vec: Vec::new() }
307 }
308
309 pub fn emit_val(&mut self, key: slog::Key, val: serde_json::Value) -> slog::Result {
310 self.vec.push((key.to_string(), val));
311 Ok(())
312 }
313
314 fn emit_serde_json_number<V>(&mut self, key: Key, value: V) -> slog::Result
315 where
316 serde_json::Number: From<V>,
317 {
318 let num = serde_json::Number::from(value);
320 self.emit_val(key, serde_json::Value::Number(num))
321 }
322
323 fn done(&mut self) -> KeyVals {
324 self.vec.clone()
325 }
326}
327
thread_local! {
    /// Per-thread scratch string (initial capacity 256 bytes) into which
    /// `emit_arguments` formats its value.
    static THREAD_LOCAL_BUF: RefCell<String> = RefCell::new(String::with_capacity(256));
}
332
333#[allow(dead_code)]
334impl slog::Serializer for Serializer {
335 fn emit_bool(&mut self, key: Key, val: bool) -> slog::Result {
336 self.emit_val(key, serde_json::Value::Bool(val))
337 }
338
339 fn emit_unit(&mut self, key: Key) -> slog::Result {
340 self.emit_val(key, serde_json::Value::Null)
341 }
342
343 fn emit_str(&mut self, key: Key, val: &str) -> slog::Result {
344 self.emit_val(key, serde_json::Value::String(val.to_string()))
345 }
346
347 fn emit_char(&mut self, key: Key, val: char) -> slog::Result {
348 self.emit_val(key, serde_json::Value::String(val.to_string()))
349 }
350
351 fn emit_none(&mut self, key: Key) -> slog::Result {
352 self.emit_val(key, serde_json::Value::Null)
353 }
354
355 fn emit_u8(&mut self, key: Key, val: u8) -> slog::Result {
356 self.emit_serde_json_number::<u8>(key, val)
357 }
358
359 fn emit_i8(&mut self, key: Key, val: i8) -> slog::Result {
360 self.emit_serde_json_number::<i8>(key, val)
361 }
362
363 fn emit_u16(&mut self, key: Key, val: u16) -> slog::Result {
364 self.emit_serde_json_number::<u16>(key, val)
365 }
366
367 fn emit_i16(&mut self, key: Key, val: i16) -> slog::Result {
368 self.emit_serde_json_number::<i16>(key, val)
369 }
370
371 fn emit_usize(&mut self, key: Key, val: usize) -> slog::Result {
372 self.emit_serde_json_number::<usize>(key, val)
373 }
374
375 fn emit_isize(&mut self, key: Key, val: isize) -> slog::Result {
376 self.emit_serde_json_number::<isize>(key, val)
377 }
378
379 fn emit_u32(&mut self, key: Key, val: u32) -> slog::Result {
380 self.emit_serde_json_number::<u32>(key, val)
381 }
382
383 fn emit_i32(&mut self, key: Key, val: i32) -> slog::Result {
384 self.emit_serde_json_number::<i32>(key, val)
385 }
386
387 fn emit_f32(&mut self, key: Key, val: f32) -> slog::Result {
388 self.emit_f64(key, f64::from(val))
389 }
390
391 fn emit_u64(&mut self, key: Key, val: u64) -> slog::Result {
392 self.emit_serde_json_number::<u64>(key, val)
393 }
394
395 fn emit_i64(&mut self, key: Key, val: i64) -> slog::Result {
396 self.emit_serde_json_number::<i64>(key, val)
397 }
398
399 fn emit_f64(&mut self, key: Key, val: f64) -> slog::Result {
400 let n = serde_json::Number::from_f64(val);
401 self.emit_val(key, serde_json::Value::Number(n.unwrap()))
402 }
403
404 fn emit_arguments(&mut self, key: Key, val: &fmt::Arguments) -> slog::Result {
405 THREAD_LOCAL_BUF.with(|buf| {
406 let mut buf = buf.borrow_mut();
407 buf.write_fmt(*val).unwrap();
408 let res = self.emit_val(key, serde_json::Value::String(buf.clone()));
409 buf.clear();
410 res
411 })
412 }
413}