use arc_swap::ArcSwap;
use concat_strs_derive::concat_strs;
pub use log::{
debug,
error,
info,
log,
log_enabled,
logger,
trace,
warn,
Level,
LevelFilter,
Record,
};
use std::borrow::Cow;
use std::fmt::Display;
use std::hash::{
BuildHasher,
Hash,
Hasher,
};
use std::io::{
stderr,
Error as IoError,
Write,
};
use std::sync::atomic::{
AtomicBool,
AtomicUsize,
Ordering,
};
use std::sync::Arc;
use std::time::{
Duration,
Instant,
};
use time::format_description::OwnedFormatItem;
use time::OffsetDateTime;
use crossbeam_channel::{
bounded,
unbounded,
Receiver,
RecvTimeoutError,
Sender,
TrySendError,
};
use hashbrown::{
DefaultHashBuilder,
HashMap,
};
use log::{
kv::Key,
set_boxed_logger,
set_max_level,
Log,
Metadata,
SetLoggerError,
};
use tm::{
duration,
now,
to_utc,
Time,
};
mod tm {
use super::*;
pub type Time = std::time::SystemTime;
#[inline]
pub fn now() -> Time {
std::time::SystemTime::now()
}
#[inline]
pub fn to_utc(time: Time) -> OffsetDateTime {
time.into()
}
#[inline]
pub fn duration(
from: Time,
to: Time,
) -> Duration {
to.duration_since(from).unwrap_or_default()
}
}
/// A single log record queued for the background logger thread.
struct LogMsg {
/// Time at which the record was accepted by `Logger::log`.
time: Time,
/// Deferred message body produced by the configured `FtLogFormat`; it is
/// rendered with `to_string()` only when the record is actually written.
msg: Box<dyn Sync + Send + Display>,
/// Severity of the record.
level: Level,
/// Target of the record, matched against filter paths by prefix.
target: String,
/// Rate-limit interval in milliseconds taken from the `limit` key-value of
/// the record; `0` disables rate limiting.
limit: u32,
/// Hash identifying the call site for rate limiting; `0` when `limit` is `0`.
limit_key: u64,
}
impl LogMsg {
    /// Routes this record to the matching writer and emits it as one line.
    ///
    /// * `filters` - directives searched in order; the first whose `path` is a
    ///   prefix of the record target decides the level limit and the appender.
    /// * `appenders` - named writers a directive may redirect to.
    /// * `root` - writer used when no directive matches, or when the matching
    ///   directive names no appender or one that is not in `appenders`.
    /// * `root_level` - level limit applied when no directive matches.
    /// * `missed_log` - per call-site count of records suppressed by rate
    ///   limiting since the last one that was written.
    /// * `last_log` - per call-site time of the last record that passed the
    ///   rate limit.
    /// * `time_format` - format of the timestamp; RFC 3339 is used if
    ///   formatting with it fails.
    ///
    /// The line is `<timestamp> <delay>ms [<missed> ]<message>\n`, where
    /// `<delay>` is the time the record spent between `Logger::log` and this
    /// call, and `<missed>` is only present for rate-limited records.
    /// Records that render to an empty string are dropped. Write errors are
    /// reported on stderr and otherwise ignored.
    ///
    /// # Panics
    ///
    /// Panics if the timestamp can be formatted neither with `time_format`
    /// nor as RFC 3339.
    fn write(
        self,
        filters: &[Directive],
        appenders: &mut HashMap<&'static str, Box<dyn Write + Send>>,
        root: &mut Box<dyn Write + Send>,
        root_level: LevelFilter,
        missed_log: &mut HashMap<u64, i64, nohash_hasher::BuildNoHashHasher<u64>>,
        last_log: &mut HashMap<u64, Time, nohash_hasher::BuildNoHashHasher<u64>>,
        time_format: &time::format_description::OwnedFormatItem,
    ) {
        let directive = filters
            .iter()
            .find(|d| self.target.starts_with(d.path));
        let writer = match directive {
            Some(d) => {
                // A directive without a level does not restrict verbosity.
                if d.level.map_or(false, |max| max < self.level) {
                    return;
                }
                d.appender
                    .and_then(|name| appenders.get_mut(name))
                    .unwrap_or(root)
            }
            None => {
                if root_level < self.level {
                    return;
                }
                root
            }
        };

        let msg = self.msg.to_string();
        if msg.is_empty() {
            return;
        }

        let written_at = now();
        // `Some(counter)` for rate-limited records that are allowed through,
        // `None` for records without a limit.
        let missed = if self.limit > 0 {
            let missed_entry = missed_log.entry(self.limit_key).or_insert(0);
            let window = Duration::from_millis(u64::from(self.limit));
            let throttled = last_log
                .get(&self.limit_key)
                .map_or(false, |last| duration(*last, written_at) < window);
            if throttled {
                *missed_entry += 1;
                return;
            }
            last_log.insert(self.limit_key, written_at);
            Some(missed_entry)
        } else {
            None
        };

        let delay_ms = duration(self.time, written_at).as_millis().to_string();
        let utc_datetime = to_utc(self.time);
        let timestamp = utc_datetime.format(time_format).unwrap_or_else(|_| {
            utc_datetime
                .format(&time::format_description::well_known::Rfc3339)
                .unwrap()
        });

        let line = match missed.as_deref().copied() {
            Some(missed_count) => concat_strs!(
                &timestamp,
                ' ',
                &delay_ms,
                "ms ",
                &missed_count.to_string(),
                ' ',
                &msg,
                "\n"
            ),
            None => concat_strs!(&timestamp, ' ', &delay_ms, "ms ", &msg, "\n"),
        };

        if let Err(e) = writer.write_all(line.as_bytes()) {
            eprintln!("logger write message failed: {e}");
        }
        // The suppressed count has just been reported; start over.
        if let Some(counter) = missed {
            *counter = 0;
        }
    }
}
/// Commands sent from logging threads to the background logger thread.
enum LoggerInput {
/// A record to be filtered, formatted and written.
LogMsg(LogMsg),
/// Request to write out queued records and flush every writer; the logger
/// thread answers on the notification channel with a `LoggerOutput`.
Flush,
}
/// Replies sent by the background logger thread in response to
/// `LoggerInput::Flush`.
#[derive(Debug)]
enum LoggerOutput {
/// All writers were flushed without error.
Flushed,
/// Flushing a writer failed with the contained I/O error.
FlushError(std::io::Error),
}
/// Turns a [`Record`] into a message that can be rendered later.
///
/// `msg` is invoked by `Logger::log` on the thread that issues the log call;
/// the returned value is sent to the logger thread, which renders it through
/// its `Display` implementation. The returned value therefore has to own
/// (or statically borrow) everything it needs from the record.
pub trait FtLogFormat: Send + Sync {
/// Captures the parts of `record` needed to render the message later.
fn msg(
&self,
record: &Record,
) -> Box<dyn Send + Sync + Display>;
}
/// Default [`FtLogFormat`]; used by `Builder::new` unless replaced with
/// `Builder::format`. Renders records as `LEVEL [file:line] message`.
pub struct FtLogFormatter;
impl FtLogFormat for FtLogFormatter {
    /// Captures level, source location and message text of `record`.
    ///
    /// Static strings are borrowed; anything else is copied so that the
    /// returned message does not borrow from `record`.
    #[inline]
    fn msg(
        &self,
        record: &Record,
    ) -> Box<dyn Send + Sync + Display> {
        // Prefer the `'static` file name; copy a borrowed one; fall back to "".
        let file = if let Some(path) = record.file_static() {
            Cow::Borrowed(path)
        } else if let Some(path) = record.file() {
            Cow::Owned(path.to_owned())
        } else {
            Cow::Borrowed("")
        };

        // `as_str` only succeeds when the arguments are a plain literal;
        // otherwise the message has to be formatted right away.
        let args = match record.args().as_str() {
            Some(literal) => Cow::Borrowed(literal),
            None => Cow::Owned(record.args().to_string()),
        };

        Box::new(Message {
            level: record.level(),
            file,
            line: record.line(),
            args,
        })
    }
}
/// Owned snapshot of a log record created by `FtLogFormatter`; rendered as
/// `LEVEL [file:line] args`.
struct Message {
/// Severity of the record.
level: Level,
/// Source file of the log call, or an empty string when unknown.
file: Cow<'static, str>,
/// Source line of the log call, if known.
line: Option<u32>,
/// Message text.
args: Cow<'static, str>,
}
impl Display for Message {
    /// Renders the message as `LEVEL [file:line] args`; an unknown line is
    /// printed as `0`.
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter<'_>,
    ) -> std::fmt::Result {
        // Write straight into the formatter instead of building a temporary
        // `String` first.
        write!(
            f,
            "{} [{}:{}] {}",
            self.level,
            self.file,
            self.line.unwrap_or(0),
            self.args
        )
    }
}
/// Bookkeeping for records dropped because the bounded queue was full.
/// Only present on non-blocking loggers with omitted-count printing enabled.
struct DiscardState {
/// Time of the last "Excessive log messages" notice; initialised when the
/// logger is built.
last: ArcSwap<Instant>,
/// Number of records dropped so far; the counter is never reset.
count: AtomicUsize,
}
/// Guard returned by `Logger::init`. Dropping it asks the logger thread to
/// flush and waits for the reply, so keep it alive for as long as logging
/// is needed.
pub struct LoggerGuard {
/// Clone of the logger's command queue.
queue: Sender<LoggerInput>,
/// Clone of the channel on which flush results arrive.
notification: Receiver<LoggerOutput>,
}
impl Drop for LoggerGuard {
fn drop(&mut self) {
self.queue
.send(LoggerInput::Flush)
.expect("logger queue closed when flushing, this is a bug");
self.notification
.recv()
.expect("logger notification closed, this is a bug");
}
}
/// Front end of the asynchronous logger: implements [`Log`] by formatting
/// records and handing them to the background logger thread.
pub struct Logger {
/// Converts records into deferred messages.
format: Box<dyn FtLogFormat>,
/// Most verbose level this logger accepts.
level: LevelFilter,
/// Command queue to the logger thread.
queue: Sender<LoggerInput>,
/// Channel on which the logger thread reports flush results.
notification: Receiver<LoggerOutput>,
/// Whether `log` waits for space when the queue is full instead of
/// dropping the record.
block: bool,
/// Dropped-record bookkeeping; `None` when blocking or when printing the
/// omitted count is disabled.
discard_state: Option<DiscardState>,
/// Set once the "queue closed" diagnostic has been printed, so that it is
/// not repeated for every record.
stopped: AtomicBool,
}
impl Logger {
    /// Installs this logger as the global logger of the `log` facade.
    ///
    /// On success the global maximum level is set to this logger's level and
    /// a [`LoggerGuard`] is returned; dropping the guard flushes the logger.
    ///
    /// # Errors
    ///
    /// Returns [`SetLoggerError`] if a global logger has already been set.
    /// In that case the global maximum level is left untouched, so the
    /// logger that is already installed keeps its configuration.
    pub fn init(self) -> Result<LoggerGuard, SetLoggerError> {
        let guard = LoggerGuard {
            queue: self.queue.clone(),
            notification: self.notification.clone(),
        };
        let level = self.level;
        set_boxed_logger(Box::new(self)).map(|()| {
            // Only touch the global level once the logger is really ours.
            set_max_level(level);
            guard
        })
    }
}
impl Log for Logger {
    /// Returns whether a record with this metadata is within the logger's
    /// maximum level.
    #[inline]
    fn enabled(
        &self,
        metadata: &Metadata,
    ) -> bool {
        self.level >= metadata.level()
    }

    /// Formats `record` and queues it for the logger thread.
    ///
    /// A `limit` key-value on the record is read as a rate-limit interval in
    /// milliseconds; values that do not fit in `u32` saturate at `u32::MAX`.
    /// Rate-limited records are keyed by their call site (module path, or
    /// file when the module path is unknown, plus line).
    ///
    /// On a blocking logger this waits while the queue is full. Otherwise a
    /// record that does not fit is dropped and, if enabled, a notice with the
    /// running total of dropped records is printed to stderr at most once
    /// every five seconds.
    fn log(
        &self,
        record: &Record,
    ) {
        let limit = record
            .key_values()
            .get(Key::from_str("limit"))
            .and_then(|x| x.to_u64())
            // Saturate instead of truncating: a plain `as u32` would turn
            // e.g. 2^32 into 0 and silently switch rate limiting off.
            .map_or(0, |ms| ms.min(u64::from(u32::MAX)) as u32);
        let msg = self.format.msg(record);
        let limit_key = if limit == 0 {
            0
        } else {
            let mut hasher = DefaultHashBuilder::default().build_hasher();
            match record.module_path() {
                Some(path) => path.as_bytes().hash(&mut hasher),
                None => record
                    .file()
                    .unwrap_or("")
                    .as_bytes()
                    .hash(&mut hasher),
            }
            record.line().unwrap_or(0).hash(&mut hasher);
            hasher.finish()
        };
        let input = LoggerInput::LogMsg(LogMsg {
            time: now(),
            msg,
            target: record.target().to_owned(),
            level: record.level(),
            limit,
            limit_key,
        });

        if self.block {
            if self.queue.send(input).is_err() {
                self.report_closed_queue();
            }
            return;
        }

        match self.queue.try_send(input) {
            Ok(()) => {}
            Err(TrySendError::Full(_)) => {
                if let Some(state) = &self.discard_state {
                    // `fetch_add` returns the previous value; add one so the
                    // record dropped right now is included in the total.
                    let dropped = state.count.fetch_add(1, Ordering::SeqCst) + 1;
                    if state.last.load().elapsed().as_secs() >= 5 {
                        eprintln!("Excessive log messages. Log omitted: {dropped}");
                        state.last.store(Arc::new(Instant::now()));
                    }
                }
            }
            Err(TrySendError::Disconnected(_)) => self.report_closed_queue(),
        }
    }

    /// Asks the logger thread to write out queued records and flush all
    /// writers, then waits for its reply. A flush error is printed to stderr.
    ///
    /// # Panics
    ///
    /// Panics if the logger thread is no longer running.
    fn flush(&self) {
        self.queue
            .send(LoggerInput::Flush)
            .expect("logger queue closed when flushing, this is a bug");
        if let LoggerOutput::FlushError(err) = self
            .notification
            .recv()
            .expect("logger notification closed, this is a bug")
        {
            eprintln!("Fail to flush: {err}");
        }
    }
}

impl Logger {
    /// Prints the "queue closed" diagnostic the first time it is needed and
    /// stays silent afterwards. `swap` makes check-and-set a single atomic
    /// step, so concurrent callers cannot both print.
    fn report_closed_queue(&self) {
        if !self.stopped.swap(true, Ordering::SeqCst) {
            eprintln!("logger queue closed when logging, this is a bug");
        }
    }
}
/// Settings for a bounded log queue.
struct BoundedChannelOption {
/// Capacity of the queue, in messages.
size: usize,
/// Whether logging waits for space when the queue is full instead of
/// dropping the record.
block: bool,
/// Whether a notice about dropped records is printed to stderr.
print: bool,
}
/// Configures and creates a [`Logger`].
pub struct Builder {
/// Formatter that turns records into deferred messages.
format: Box<dyn FtLogFormat>,
/// Timestamp format; `None` selects the built-in default in `build`.
time_format: Option<OwnedFormatItem>,
/// Global maximum level; `None` means `LevelFilter::Info`.
level: Option<LevelFilter>,
/// Level limit for records not matched by any filter; `None` means the
/// global level.
root_level: Option<LevelFilter>,
/// Writer for records not redirected to a named appender.
root: Box<dyn Write + Send>,
/// Named writers that filters can redirect to.
appenders: HashMap<&'static str, Box<dyn Write + Send + 'static>>,
/// Per-path level/appender directives.
filters: Vec<Directive>,
/// Bounded queue settings; `None` selects an unbounded queue.
bounded_channel_option: Option<BoundedChannelOption>,
}
/// Starts configuring a logger with the default settings; shorthand for
/// [`Builder::new`].
#[inline]
pub fn builder() -> Builder {
    Builder::default()
}
/// A filter rule applied to records whose target starts with `path`.
struct Directive {
/// Prefix compared against the record target.
path: &'static str,
/// Most verbose level let through for matching records; `None` imposes no
/// limit of its own.
level: Option<LevelFilter>,
/// Name of the appender matching records are written to; `None` keeps
/// them on the root writer.
appender: Option<&'static str>,
}
impl Builder {
#[inline]
pub fn new() -> Builder {
Builder {
format: Box::new(FtLogFormatter),
level: None,
root_level: None,
root: Box::new(stderr()) as Box<dyn Write + Send>,
appenders: HashMap::new(),
filters: Vec::new(),
bounded_channel_option: Some(BoundedChannelOption {
size: 100_000,
block: false,
print: true,
}),
time_format: None,
}
}
#[inline]
pub fn format<F: FtLogFormat + 'static>(
mut self,
format: F,
) -> Builder {
self.format = Box::new(format);
self
}
#[inline]
pub fn bounded(
mut self,
size: usize,
block_when_full: bool,
) -> Builder {
self.bounded_channel_option = Some(BoundedChannelOption {
size,
block: block_when_full,
print: false,
});
self
}
#[inline]
pub fn print_omitted_count(
mut self,
print: bool,
) -> Builder {
self.bounded_channel_option.as_mut().map(|o| {
o.print = print;
});
self
}
#[inline]
pub fn unbounded(mut self) -> Builder {
self.bounded_channel_option = None;
self
}
#[inline]
pub fn appender(
mut self,
name: &'static str,
appender: impl Write + Send + 'static,
) -> Builder {
self.appenders.insert(name, Box::new(appender));
self
}
#[inline]
pub fn filter<A: Into<Option<&'static str>>, L: Into<Option<LevelFilter>>>(
mut self,
module_path: &'static str,
appender: A,
level: L,
) -> Builder {
let appender = appender.into();
let level = level.into();
if appender.is_some() || level.is_some() {
self.filters.push(Directive {
path: module_path,
appender: appender,
level: level,
});
}
self
}
#[inline]
pub fn max_log_level(
mut self,
level: LevelFilter,
) -> Builder {
self.level = Some(level);
self
}
#[inline]
pub fn root_log_level(
mut self,
level: LevelFilter,
) -> Builder {
self.root_level = Some(level);
self
}
pub fn build(self) -> Result<Logger, IoError> {
let time_format = self.time_format.unwrap_or_else(|| {
time::format_description::parse_owned::<1>(
"[year]-[month]-[day] [hour]:[minute]:[second].[subsecond digits:6]",
)
.unwrap()
});
let mut filters = self.filters;
filters.sort_by(|a, b| a.path.len().cmp(&b.path.len()));
filters.reverse();
for appender_name in filters.iter().filter_map(|x| x.appender) {
if !self.appenders.contains_key(appender_name) {
panic!("Appender {appender_name} not configured");
}
}
let global_level = self.level.unwrap_or(LevelFilter::Info);
let root_level = self.root_level.unwrap_or(global_level);
if global_level < root_level {
warn!("Logs with level more verbose than {global_level} will be ignored");
}
let (sync_sender, receiver) = match &self.bounded_channel_option {
None => unbounded(),
Some(option) => bounded(option.size),
};
let (notification_sender, notification_receiver) = bounded(1);
std::thread::Builder::new()
.name("logger".to_string())
.spawn(move || {
let mut appenders = self.appenders;
let filters = filters;
for filter in &filters {
if let Some(level) = filter.level {
if global_level < level {
warn!(
"Logs with level more verbose than {} will be ignored in `{}` ",
global_level, filter.path
);
}
}
}
let mut root = self.root;
let mut last_log = HashMap::default();
let mut missed_log = HashMap::default();
let mut last_flush = Instant::now();
let timeout = Duration::from_millis(200);
loop {
match receiver.recv_timeout(timeout) {
Ok(LoggerInput::LogMsg(log_msg)) => {
log_msg.write(
&filters,
&mut appenders,
&mut root,
root_level,
&mut missed_log,
&mut last_log,
&time_format,
);
}
Ok(LoggerInput::Flush) => {
let max = receiver.len();
'queue: for _ in 1..=max {
if let Ok(LoggerInput::LogMsg(msg)) = receiver.try_recv() {
msg.write(
&filters,
&mut appenders,
&mut root,
root_level,
&mut missed_log,
&mut last_log,
&time_format,
);
} else {
break 'queue;
}
}
let flush_result = appenders
.values_mut()
.chain([&mut root])
.find_map(|w| w.flush().err());
if let Some(error) = flush_result {
notification_sender
.send(LoggerOutput::FlushError(
error,
))
.expect("logger notification failed");
} else {
notification_sender
.send(LoggerOutput::Flushed)
.expect("logger notification failed");
}
}
Err(RecvTimeoutError::Timeout) => {
if last_flush.elapsed() > Duration::from_millis(1000) {
let flush_errors = appenders
.values_mut()
.chain([&mut root])
.filter_map(|w| w.flush().err());
for err in flush_errors {
log::warn!("Ftlog flush error: {}", err);
}
last_flush = Instant::now();
}
}
Err(e) => {
eprintln!("sender closed without sending a Quit first, this is a bug, {e}");
}
}
}
})?;
let block = self
.bounded_channel_option
.as_ref()
.map(|x| x.block)
.unwrap_or(false);
let print = self
.bounded_channel_option
.as_ref()
.map(|x| x.print)
.unwrap_or(false);
Ok(Logger {
format: self.format,
level: global_level,
queue: sync_sender,
notification: notification_receiver,
block,
discard_state: if block || !print {
None
} else {
Some(DiscardState {
last: ArcSwap::new(Arc::new(Instant::now())),
count: AtomicUsize::new(0),
})
},
stopped: AtomicBool::new(false),
})
}
pub fn try_init(self) -> Result<LoggerGuard, Box<dyn std::error::Error>> {
let logger = self.build()?;
Ok(logger.init()?)
}
}
impl Default for Builder {
#[inline]
fn default() -> Self {
Builder::new()
}
}