use arc_swap::ArcSwap;
use crossbeam_channel::{
bounded,
unbounded,
Receiver,
RecvTimeoutError,
Sender,
TrySendError,
};
pub use log::{
debug,
error,
info,
log,
log_enabled,
logger,
trace,
warn,
Level,
LevelFilter,
Record,
};
use log::{
set_boxed_logger,
set_max_level,
Log,
Metadata,
SetLoggerError,
};
use logmsg::LogMsg;
use std::{
borrow::Cow,
fmt::Display,
io::{
stderr,
Error as IoError,
Write,
},
sync::{
atomic::{
AtomicBool,
AtomicUsize,
Ordering,
},
Arc,
},
time::{
Duration,
Instant,
},
};
mod logmsg;
enum LoggerInput {
LogMsg(LogMsg),
Flush,
}
#[derive(Debug)]
enum LoggerOutput {
Flushed,
}
pub trait FtLogFormat: Send + Sync {
fn msg(
&self,
record: &Record,
) -> Box<dyn Send + Sync + Display>;
}
pub struct FtLogFormatter;
impl FtLogFormat for FtLogFormatter {
#[inline]
fn msg(
&self,
record: &Record,
) -> Box<dyn Send + Sync + Display> {
Box::new(Message {
level: record.level(),
file: record
.file_static()
.map(|s| Cow::Borrowed(s))
.or_else(|| record.file().map(|s| Cow::Owned(s.to_owned())))
.unwrap_or(Cow::Borrowed("")),
line: record.line(),
args: record
.args()
.as_str()
.map(|s| Cow::Borrowed(s))
.unwrap_or_else(|| Cow::Owned(record.args().to_string())),
})
}
}
struct Message {
level: Level,
file: Cow<'static, str>,
line: Option<u32>,
args: Cow<'static, str>,
}
impl Display for Message {
fn fmt(
&self,
f: &mut std::fmt::Formatter<'_>,
) -> std::fmt::Result {
f.write_str(&format!(
"{} [{}:{}] {}",
self.level,
self.file,
self.line.unwrap_or(0),
self.args
))
}
}
struct DiscardState {
last: ArcSwap<Instant>,
count: AtomicUsize,
}
pub struct LoggerGuard {
queue: Sender<LoggerInput>,
notification: Receiver<LoggerOutput>,
}
impl Drop for LoggerGuard {
fn drop(&mut self) {
self.queue
.send(LoggerInput::Flush)
.expect("logger queue closed when flushing, this is a bug");
self.notification
.recv()
.expect("logger notification closed, this is a bug");
}
}
pub struct Logger {
format: Box<dyn FtLogFormat>,
level: LevelFilter,
queue: Sender<LoggerInput>,
notification: Receiver<LoggerOutput>,
block: bool,
discard_state: Option<DiscardState>,
stopped: AtomicBool,
}
impl Logger {
pub fn init(self) -> Result<LoggerGuard, SetLoggerError> {
let guard = LoggerGuard {
queue: self.queue.clone(),
notification: self.notification.clone(),
};
set_max_level(self.level);
let boxed = Box::new(self);
set_boxed_logger(boxed).map(|_| guard)
}
}
impl Log for Logger {
#[inline]
fn enabled(
&self,
metadata: &Metadata,
) -> bool {
self.level >= metadata.level()
}
fn log(
&self,
record: &Record,
) {
let msg = self.format.msg(record);
let msg = LoggerInput::LogMsg(LogMsg {
msg,
target: record.target().to_owned(),
level: record.level(),
});
if self.block {
if let Err(_) = self.queue.send(msg) {
let stop = self.stopped.load(Ordering::SeqCst);
if !stop {
eprintln!("logger queue closed when logging, this is a bug");
self.stopped.store(true, Ordering::SeqCst)
}
}
} else {
match self.queue.try_send(msg) {
Err(TrySendError::Full(_)) => {
if let Some(s) = &self.discard_state {
let count = s.count.fetch_add(1, Ordering::SeqCst);
if s.last.load().elapsed().as_secs() >= 5 {
eprintln!("Excessive log messages. Log omitted: {count}");
s.last.store(Arc::new(Instant::now()));
}
}
}
Err(TrySendError::Disconnected(_)) => {
let stop = self.stopped.load(Ordering::SeqCst);
if !stop {
eprintln!("logger queue closed when logging, this is a bug");
self.stopped.store(true, Ordering::SeqCst)
}
}
_ => (),
}
}
}
fn flush(&self) {
let _ = self
.queue
.send(LoggerInput::Flush)
.map_err(|e| eprintln!("logger queue closed when flushing, this is a bug: {e}"));
}
}
struct BoundedChannelOption {
size: usize,
block: bool,
print: bool,
}
pub struct Builder {
format: Box<dyn FtLogFormat>,
level: Option<LevelFilter>,
root_level: Option<LevelFilter>,
root: Box<dyn Write + Send>,
filters: Vec<Directive>,
bounded_channel_option: Option<BoundedChannelOption>,
}
#[inline]
pub fn builder() -> Builder {
Builder::new()
}
struct Directive {
path: &'static str,
level: Option<LevelFilter>,
}
impl Builder {
#[inline]
pub fn new() -> Builder {
Builder {
format: Box::new(FtLogFormatter),
level: None,
root_level: None,
root: Box::new(stderr()) as Box<dyn Write + Send>,
filters: Vec::new(),
bounded_channel_option: Some(BoundedChannelOption {
size: 100_000,
block: false,
print: true,
}),
}
}
#[inline]
pub fn format<F: FtLogFormat + 'static>(
mut self,
format: F,
) -> Builder {
self.format = Box::new(format);
self
}
#[inline]
pub fn bounded(
mut self,
size: usize,
block_when_full: bool,
) -> Builder {
self.bounded_channel_option = Some(BoundedChannelOption {
size,
block: block_when_full,
print: false,
});
self
}
#[inline]
pub fn print_omitted_count(
mut self,
print: bool,
) -> Builder {
self.bounded_channel_option.as_mut().map(|o| {
o.print = print;
});
self
}
#[inline]
pub fn unbounded(mut self) -> Builder {
self.bounded_channel_option = None;
self
}
#[inline]
pub fn filter<A: Into<Option<&'static str>>, L: Into<Option<LevelFilter>>>(
mut self,
module_path: &'static str,
appender: A,
level: L,
) -> Builder {
let appender = appender.into();
let level = level.into();
if appender.is_some() || level.is_some() {
self.filters.push(Directive {
path: module_path,
level,
});
}
self
}
#[inline]
pub fn max_log_level(
mut self,
level: LevelFilter,
) -> Builder {
self.level = Some(level);
self
}
#[inline]
pub fn root_log_level(
mut self,
level: LevelFilter,
) -> Builder {
self.root_level = Some(level);
self
}
pub fn build(self) -> Result<Logger, IoError> {
let mut filters = self.filters;
filters.sort_by(|a, b| a.path.len().cmp(&b.path.len()));
filters.reverse();
let global_level = self.level.unwrap_or(LevelFilter::Info);
let root_level = self.root_level.unwrap_or(global_level);
if global_level < root_level {
warn!("Logs with level more verbose than {global_level} will be ignored");
}
let (sync_sender, receiver) = match &self.bounded_channel_option {
None => unbounded(),
Some(option) => bounded(option.size),
};
let (notification_sender, notification_receiver) = bounded(1);
std::thread::Builder::new()
.name("logger".to_string())
.spawn(move || {
let filters = filters;
for filter in &filters {
if let Some(level) = filter.level {
if global_level < level {
warn!(
"Logs with level more verbose than {} will be ignored in `{}` ",
global_level, filter.path
);
}
}
}
let mut root = self.root;
let mut last_flush = Instant::now();
let timeout = Duration::from_millis(200);
loop {
match receiver.recv_timeout(timeout) {
Ok(LoggerInput::LogMsg(log_msg)) => {
log_msg.write(
&filters, &mut root, root_level,
);
}
Ok(LoggerInput::Flush) => {
let max = receiver.len();
'queue: for _ in 1..=max {
if let Ok(LoggerInput::LogMsg(msg)) = receiver.try_recv() {
msg.write(
&filters, &mut root, root_level,
);
} else {
break 'queue;
}
}
notification_sender
.send(LoggerOutput::Flushed)
.expect("logger notification failed");
}
Err(RecvTimeoutError::Timeout) => {
if last_flush.elapsed() > Duration::from_millis(1000) {
last_flush = Instant::now();
}
}
Err(e) => {
eprintln!("sender closed without sending a Quit first, this is a bug, {e}");
}
}
}
})?;
let block = self
.bounded_channel_option
.as_ref()
.map(|x| x.block)
.unwrap_or(false);
let print = self
.bounded_channel_option
.as_ref()
.map(|x| x.print)
.unwrap_or(false);
Ok(Logger {
format: self.format,
level: global_level,
queue: sync_sender,
notification: notification_receiver,
block,
discard_state: if block || !print {
None
} else {
Some(DiscardState {
last: ArcSwap::new(Arc::new(Instant::now())),
count: AtomicUsize::new(0),
})
},
stopped: AtomicBool::new(false),
})
}
pub fn try_init(self) -> Result<LoggerGuard, Box<dyn std::error::Error>> {
let logger = self.build()?;
Ok(logger.init()?)
}
}
impl Default for Builder {
#[inline]
fn default() -> Self {
Builder::new()
}
}