use std::{fmt, thread};
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{Receiver, sync_channel, SyncSender};
use gelf_logger::{Batch, BatchProcessor, Buffer, Config, Event, GelfTcpOutput, Metronome};
use log4rs::append::Append;
use log::Record;
use serde_gelf::{GelfLevel, GelfRecord};
use serde_value::Value;
/// log4rs appender that hands every log record to a `BatchProcessor`.
///
/// Instances are created through `BufferAppenderBuilder::build`.
pub struct BufferAppender {
    /// Processor that receives each record converted in `append`.
    processor: BatchProcessor,
}
/// Builder that collects the settings used to create a `BufferAppender`.
///
/// Every setter consumes the builder and returns it, so calls can be chained
/// before finishing with `build`.
#[derive(Debug)]
pub struct BufferAppenderBuilder {
    /// Level handed to the GELF configuration and to the batch processor.
    level: GelfLevel,
    /// Value passed to the configuration's `set_hostname`
    /// (presumably the remote GELF endpoint -- confirm against `gelf_logger`).
    hostname: String,
    /// Value passed to the configuration's `set_port`.
    port: u64,
    /// Value passed to the configuration's `set_use_tls`.
    use_tls: bool,
    /// Value passed to the configuration's `set_null_character`.
    null_character: bool,
    /// Optional buffer size; `build` substitutes `100` when this is `None`.
    buffer_size: Option<usize>,
    /// Optional buffer duration; `build` substitutes `500` when this is `None`.
    /// NOTE(review): the unit is not visible here (likely milliseconds) -- confirm.
    buffer_duration: Option<u64>,
    /// Extra key/value pairs passed to the configuration's
    /// `extend_additional_fields`.
    additional_fields: BTreeMap<Value, Value>,
}
impl Default for BufferAppenderBuilder {
    /// Creates a builder with the stock settings.
    ///
    /// Defaults: the default `GelfLevel`, host `127.0.0.1`, port `12202`,
    /// TLS enabled, null character enabled, buffer size `100`, buffer
    /// duration `500`, and two additional fields (`pkg_name`, `pkg_version`)
    /// filled from the Cargo package metadata at compile time.
    fn default() -> Self {
        // Seed the additional fields with the package identity.
        let mut package_fields = BTreeMap::new();
        package_fields.insert(
            Value::String(String::from("pkg_name")),
            Value::String(String::from(env!("CARGO_PKG_NAME"))),
        );
        package_fields.insert(
            Value::String(String::from("pkg_version")),
            Value::String(String::from(env!("CARGO_PKG_VERSION"))),
        );

        Self {
            level: GelfLevel::default(),
            hostname: String::from("127.0.0.1"),
            port: 12202,
            use_tls: true,
            null_character: true,
            buffer_size: Some(100),
            buffer_duration: Some(500),
            additional_fields: package_fields,
        }
    }
}
impl BufferAppenderBuilder {
    /// Replaces the level and returns the updated builder.
    pub fn set_level(self, level: GelfLevel) -> Self {
        Self { level, ..self }
    }

    /// Replaces the hostname (stored as an owned copy) and returns the updated builder.
    pub fn set_hostname(self, hostname: &str) -> Self {
        Self {
            hostname: hostname.to_owned(),
            ..self
        }
    }

    /// Replaces the port and returns the updated builder.
    pub fn set_port(self, port: u64) -> Self {
        Self { port, ..self }
    }

    /// Replaces the TLS flag and returns the updated builder.
    pub fn set_use_tls(self, use_tls: bool) -> Self {
        Self { use_tls, ..self }
    }

    /// Replaces the null-character flag and returns the updated builder.
    pub fn set_null_character(self, null_character: bool) -> Self {
        Self {
            null_character,
            ..self
        }
    }

    /// Replaces the optional buffer size and returns the updated builder.
    ///
    /// `None` makes `build` fall back to `100`.
    pub fn set_buffer_size(self, buffer_size: Option<usize>) -> Self {
        Self {
            buffer_size,
            ..self
        }
    }

    /// Replaces the optional buffer duration and returns the updated builder.
    ///
    /// `None` makes `build` fall back to `500`.
    pub fn set_buffer_duration(self, buffer_duration: Option<u64>) -> Self {
        Self {
            buffer_duration,
            ..self
        }
    }

    /// Inserts one additional field under the string key `key`, replacing any
    /// value already stored for that key, and returns the updated builder.
    pub fn put_additional_field(mut self, key: &str, value: Value) -> Self {
        let field_name = Value::String(key.to_owned());
        self.additional_fields.insert(field_name, value);
        self
    }

    /// Merges `additional_fields` into the stored fields (incoming entries
    /// win on key collisions) and returns the updated builder.
    pub fn extend_additional_field(mut self, additional_fields: BTreeMap<Value, Value>) -> Self {
        self.additional_fields.extend(additional_fields);
        self
    }

    /// Consumes the builder and creates the appender.
    ///
    /// Steps, in order:
    /// 1. build a `gelf_logger` `Config` from the collected settings;
    /// 2. open a bounded channel of `Event`s;
    /// 3. if the configuration reports a buffer duration, start a `Metronome`
    ///    with that duration and a clone of the sender;
    /// 4. spawn a thread that runs a `Buffer` over the receiver with a
    ///    `GelfTcpOutput` created from the configuration;
    /// 5. wrap the sender and level in a `BatchProcessor`.
    ///
    /// # Errors
    ///
    /// The body shown here always returns `Ok`; the `Result` is part of the
    /// public signature.
    pub fn build(self) -> Result<BufferAppender, gelf_logger::Error> {
        let buffer_size = self.buffer_size.unwrap_or(100);
        let buffer_duration = self.buffer_duration.unwrap_or(500);

        let cfg = Config::builder()
            .set_level(self.level)
            .set_hostname(self.hostname)
            .set_port(self.port)
            .set_use_tls(self.use_tls)
            .set_null_character(self.null_character)
            .set_buffer_size(buffer_size)
            .set_buffer_duration(buffer_duration)
            .extend_additional_fields(self.additional_fields)
            .build();

        // Bounded channel that can hold up to 10_000_000 pending events.
        let (tx, rx): (SyncSender<Event>, Receiver<Event>) = sync_channel(10_000_000);

        if let Some(duration) = *cfg.buffer_duration() {
            Metronome::start(duration, tx.clone());
        }

        // Read the level now: `cfg` is moved into the worker thread below.
        let gelf_level = cfg.level().clone();

        let shared_rx = Arc::new(Mutex::new(rx));
        thread::spawn(move || {
            let output = GelfTcpOutput::from(&cfg);
            // The outcome of the run is deliberately discarded.
            let _ = Buffer::new(shared_rx, output).run();
        });

        let processor = BatchProcessor::new(tx, gelf_level);
        Ok(BufferAppender { processor })
    }
}
impl BufferAppender {
#[cfg(not(feature = "ovh-ldp"))]
pub fn builder() -> BufferAppenderBuilder {
BufferAppenderBuilder::default()
}
#[cfg(feature = "ovh-ldp")]
pub fn builder(hostname: &str, token: &str) -> BufferAppenderBuilder {
BufferAppenderBuilder::default()
.set_hostname(hostname)
.set_level(GelfLevel::Informational)
.put_additional_field("X-OVH-TOKEN", serde_value::Value::String(token.to_string()))
}
}
impl fmt::Debug for BufferAppender {
    /// Formats the appender as an empty struct named `BufferAppender`.
    ///
    /// No fields are printed. The struct name now matches the actual type
    /// name so debug output identifies the right type (it previously read
    /// `GelfAppender`).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("BufferAppender").finish()
    }
}
impl Append for BufferAppender {
    /// Converts `record` into a `GelfRecord` and sends it to the processor.
    ///
    /// # Errors
    ///
    /// Returns the processor's send error, boxed, when sending fails.
    fn append(&self, record: &Record) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
        let gelf_record = GelfRecord::from(record);
        // `?` boxes the send error into the trait-object error type.
        self.processor.send(&gelf_record)?;
        Ok(())
    }

    /// Does nothing: this appender performs no work on flush.
    fn flush(&self) {}
}