use std::{
io,
io::Write,
net::{TcpStream, ToSocketAddrs},
sync::mpsc,
thread,
time::Duration,
};
use env_filter::Filter;
use log::{LevelFilter, Log, Metadata, Record};
use native_tls::{TlsConnector, TlsStream};
use crate::{Builder, Error, GelfRecord, Map, Value};
/// `log::Log` implementation that converts each accepted record into a
/// `GelfRecord`, serializes it to JSON and hands the bytes to a `Writer`.
#[derive(Debug)]
pub struct GelfLogger {
/// Filter consulted by `enabled`, `matches` and `filter`.
pub(crate) filter: Filter,
/// Destination receiving the encoded records and flush requests.
pub(crate) writer: Writer,
/// When `true`, a NUL byte is appended after the newline that terminates
/// every encoded record.
pub(crate) null_character: bool,
/// Fields merged into each record's `additional_fields` before it is
/// serialized.
pub(crate) additional_fields: Map<String, Value>,
}
impl GelfLogger {
pub fn builder() -> Builder {
Builder::new()
}
pub fn filter(&self) -> LevelFilter {
self.filter.filter()
}
pub fn matches(&self, record: &Record<'_>) -> bool {
self.filter.matches(record)
}
}
impl Log for GelfLogger {
fn enabled(&self, metadata: &Metadata<'_>) -> bool {
self.filter.enabled(metadata)
}
fn log(&self, record: &Record<'_>) {
if !self.matches(record) {
return;
}
let mut record = GelfRecord::from(record);
record
.additional_fields
.extend(self.additional_fields.clone());
let Ok(mut data) = serde_json::to_vec(&record) else {
return;
};
data.push(b'\n');
if self.null_character {
data.push(b'\0');
}
self.writer.write(Op::Data(data));
}
fn flush(&self) {
let (tx, rx) = mpsc::sync_channel(1);
self.writer.write(Op::Flush(tx));
let _ = rx.recv();
}
}
impl Drop for GelfLogger {
fn drop(&mut self) {
self.flush();
}
}
/// Destination for encoded log data and flush requests.
#[derive(Debug)]
pub(crate) enum Writer {
/// Write directly to the process's standard output.
Stdout,
/// Write directly to the process's standard error.
Stderr,
/// Forward every operation over a bounded channel. `Writer::new` attaches
/// the receiving end to a background thread that owns the TCP connection.
Pipe(mpsc::SyncSender<Op>),
}
impl Writer {
pub(crate) fn new(target: Target) -> Result<Self, Error> {
Ok(match target {
Target::Stdout => Self::Stdout,
Target::Stderr => Self::Stderr,
Target::Tcp(TcpTarget {
hostname,
port,
tls,
connect_timeout,
write_timeout,
buffer_size,
background_error_handler,
}) => {
let (tx, rx) = mpsc::sync_channel::<Op>(buffer_size);
thread::spawn(move || {
let mut conn = None;
while let Ok(op) = rx.recv() {
if conn.is_none() {
conn = handle_background_error(
background_error_handler,
TcpConnection::new(
&hostname,
port,
tls,
connect_timeout,
write_timeout,
),
);
}
if let Some(conn_ref) = &mut conn {
match op {
Op::Data(data) => {
if handle_background_error(
background_error_handler,
conn_ref.write_all(&data),
)
.is_none()
{
conn = None;
}
}
Op::Flush(tx) => {
if handle_background_error(
background_error_handler,
conn_ref.flush(),
)
.is_none()
{
conn = None;
}
let _ = tx.send(());
}
}
}
}
});
Self::Pipe(tx)
}
})
}
fn write(&self, op: Op) {
match op {
Op::Data(data) => match self {
Writer::Stdout => {
let _ = io::stdout().write_all(&data);
}
Writer::Stderr => {
let _ = io::stderr().write_all(&data);
}
Writer::Pipe(tx) => {
let _ = tx.send(Op::Data(data));
}
},
Op::Flush(flush_tx) => match self {
Writer::Stdout => {
let _ = io::stdout().flush();
let _ = flush_tx.send(());
}
Writer::Stderr => {
let _ = io::stderr().flush();
let _ = flush_tx.send(());
}
Writer::Pipe(tx) => {
let _ = tx.send(Op::Flush(flush_tx));
}
},
}
}
}
/// Operation handed to a `Writer`.
pub(crate) enum Op {
/// Bytes to write to the destination.
Data(Vec<u8>),
/// Flush request. `()` is sent on the enclosed channel after the flush has
/// been attempted; the TCP worker drops the sender without sending when it
/// has no connection.
Flush(mpsc::SyncSender<()>),
}
/// Where encoded log records are sent.
#[derive(Clone, Debug)]
pub enum Target {
/// Write records to standard output.
Stdout,
/// Write records to standard error.
Stderr,
/// Send records over TCP (optionally TLS) from a background thread,
/// configured by the enclosed [`TcpTarget`].
Tcp(TcpTarget),
}
/// Settings for the TCP target.
#[derive(Clone, Debug)]
pub struct TcpTarget {
/// Host name or IP address of the server; resolved together with `port`
/// and also used as the domain for the TLS handshake when `tls` is set.
pub hostname: String,
/// TCP port of the server.
pub port: u16,
/// Wrap the TCP stream in TLS when `true`.
pub tls: bool,
/// Timeout for establishing the TCP connection; `None` performs a plain
/// connect without an explicit timeout.
pub connect_timeout: Option<Duration>,
/// Value passed to `TcpStream::set_write_timeout` on the new connection.
pub write_timeout: Option<Duration>,
/// Capacity of the bounded channel between the logger and the background
/// thread.
pub buffer_size: usize,
/// Called on the background thread with connect, write and flush errors;
/// when `None` those errors are discarded.
pub background_error_handler: Option<fn(Error)>,
}
impl Default for TcpTarget {
    /// Plain (non-TLS) TCP to `127.0.0.1:2202`, no timeouts, a 1000-slot
    /// buffer and no background error handler.
    fn default() -> Self {
        let hostname = String::from("127.0.0.1");
        Self {
            hostname,
            port: 2202,
            tls: false,
            connect_timeout: None,
            write_timeout: None,
            buffer_size: 1_000,
            background_error_handler: None,
        }
    }
}
/// An established connection to the TCP target.
enum TcpConnection {
/// Plain TCP stream.
Raw(TcpStream),
/// TCP stream wrapped in a `native_tls` TLS session.
Tls(TlsStream<TcpStream>),
}
impl TcpConnection {
    /// Opens a connection to `hostname:port`.
    ///
    /// Every address the host name resolves to is tried in order until one
    /// connects. When `connect_timeout` is set it applies to each attempt
    /// individually. `write_timeout` is installed on the resulting stream,
    /// and when `tls` is `true` a TLS handshake is performed using `hostname`
    /// as the expected domain.
    ///
    /// # Errors
    ///
    /// Returns an error when the host name cannot be resolved (or resolves to
    /// no address), when no address accepts the connection, when the write
    /// timeout cannot be set, or when the TLS connector/handshake fails.
    /// Resolution failures are reported as errors rather than panicking so
    /// the background thread survives and can retry later.
    fn new(
        hostname: &str,
        port: u16,
        tls: bool,
        connect_timeout: Option<Duration>,
        write_timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        let stream = match connect_timeout {
            Some(timeout) => Self::connect_with_timeout(hostname, port, timeout)?,
            // `TcpStream::connect` resolves the name and tries each address itself.
            None => TcpStream::connect((hostname, port))?,
        };
        stream.set_write_timeout(write_timeout)?;
        Ok(if tls {
            let connector = TlsConnector::new()?;
            Self::Tls(connector.connect(hostname, stream)?)
        } else {
            Self::Raw(stream)
        })
    }

    /// Resolves `hostname:port` and returns the first stream that connects
    /// within `timeout`, or the last connection error (or an `InvalidInput`
    /// error when resolution yields no address).
    fn connect_with_timeout(hostname: &str, port: u16, timeout: Duration) -> io::Result<TcpStream> {
        let mut last_error = None;
        for addr in (hostname, port).to_socket_addrs()? {
            match TcpStream::connect_timeout(&addr, timeout) {
                Ok(stream) => return Ok(stream),
                Err(err) => last_error = Some(err),
            }
        }
        Err(last_error.unwrap_or_else(|| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "could not resolve to any addresses",
            )
        }))
    }

    /// Writes all of `data` to the underlying stream.
    fn write_all(&mut self, data: &[u8]) -> Result<(), io::Error> {
        match self {
            TcpConnection::Raw(stream) => stream.write_all(data),
            TcpConnection::Tls(stream) => stream.write_all(data),
        }
    }

    /// Flushes the underlying stream.
    fn flush(&mut self) -> Result<(), io::Error> {
        match self {
            TcpConnection::Raw(stream) => stream.flush(),
            TcpConnection::Tls(stream) => stream.flush(),
        }
    }
}
/// Converts a background-thread result into an `Option`.
///
/// `Ok(value)` becomes `Some(value)`. An `Err` becomes `None`; when a
/// `handler` is present it is first called with the error converted into
/// [`Error`], otherwise the error is discarded.
fn handle_background_error<T, E: Into<Error>>(
    handler: Option<fn(Error)>,
    error: Result<T, E>,
) -> Option<T> {
    match error {
        Ok(value) => Some(value),
        Err(err) => {
            if let Some(report) = handler {
                report(err.into());
            }
            None
        }
    }
}