use std::{
collections::{HashMap, VecDeque},
io::Write, mem::MaybeUninit, str::FromStr, sync::Mutex
};
use std::sync::RwLock;
#[cfg(feature = "time")]
use time::format_description::OwnedFormatItem;
#[cfg(not(feature = "tokio"))]
use std::{fs::File, io::{LineWriter, Stdout}, sync::mpsc::Sender};
#[cfg(feature = "tokio")]
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter, Stdout},
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}
};
/// Maximum number of recycled formatting buffers kept in `fmt_cache`.
const CACHE_STR_ARRAY_SIZE: usize = 16;
/// Initial capacity of each formatting buffer; buffers that grew beyond
/// this size are not returned to the cache.
const CACHE_STR_INIT_SIZE: usize = 256;
// Global logger storage; written exactly once during initialization, then
// only read through get_async_logger().
static mut ASYNC_LOGGER: MaybeUninit<AsyncLogger> = MaybeUninit::uninit();
// Debug-only flag asserting init ran before any access to ASYNC_LOGGER.
#[cfg(debug_assertions)]
static mut INITED: bool = false;
/// Crate-wide result type with a boxed, thread-safe error.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
/// Extra output sink that receives every formatted log message.
type BoxPlugin = Box<dyn std::io::Write + Send + Sync + 'static>;
/// Boxed user-supplied record filter (see `CustomFilter`).
type BoxCustomFilter = Box<dyn CustomFilter>;
/// Per-target level overrides keyed by module path (e.g. "my_crate::net").
type LevelFilter = RwLock<HashMap<String, log::LevelFilter>>;
/// Fluent configuration for the global logger; finish with `builder()`.
pub struct Builder {
// Global maximum level (defaults to Info).
level: log::LevelFilter,
// Log file path; an empty string disables file output.
log_file: String,
// Rotation threshold in bytes.
log_file_max: u32,
// Mirror output to stdout.
use_console: bool,
// Perform I/O on a background thread/task instead of the caller.
use_async: bool,
// Optional extra sink receiving every formatted message.
plugin: Option<BoxPlugin>,
// Optional user filter consulted after the level checks.
filter: Option<BoxCustomFilter>,
}
/// User-supplied record filter: a record is logged only when `enabled`
/// returns true (checked after the level filters). Blanket-implemented for
/// closures `Fn(&log::Record) -> bool`.
pub trait CustomFilter: Send + Sync + 'static {
fn enabled(&self, record: &log::Record) -> bool;
}
/// The global logger: immutable configuration plus the handles needed to
/// reach the output sinks.
struct AsyncLogger {
    /// Global maximum level; also selects the verbose layout when >= Debug.
    level: log::LevelFilter,
    /// Timestamp format (`time` flavor).
    #[cfg(feature = "time")]
    dt_fmt: OwnedFormatItem,
    /// Timestamp format (`chrono` flavor).
    #[cfg(feature = "chrono")]
    dt_fmt: String,
    /// Path of the log file; empty when file output is disabled.
    log_file: String,
    /// Rotation threshold in bytes.
    max_size: u32,
    /// Per-target level overrides.
    level_filter: LevelFilter,
    /// Pool of recycled formatting buffers.
    fmt_cache: Mutex<VecDeque<Vec<u8>>>,
    /// Channel to the tokio writer task.
    #[cfg(feature = "tokio")]
    msg_tx: UnboundedSender<AsyncLogType>,
    /// Optional user-supplied record filter.
    filter: Option<BoxCustomFilter>,
    /// All mutable sink state (thread flavor).
    #[cfg(not(feature = "tokio"))]
    logger_data: Mutex<LogData>,
}
/// Mutable sink state: the rotation byte counter plus console, file and
/// plugin writers (and, in the thread flavor, the async channel sender).
struct LogData {
    /// Bytes written to the current log file; drives rotation.
    log_size: u32,
    /// Whether a log file path was configured (the file is opened lazily).
    #[cfg(feature = "tokio")]
    use_file: bool,
    /// Console writer (async flavor).
    #[cfg(feature = "tokio")]
    console: Option<BufWriter<Stdout>>,
    /// Console writer (sync flavor; flushes per line).
    #[cfg(not(feature = "tokio"))]
    console: Option<LineWriter<Stdout>>,
    /// Log file writer (async flavor).
    #[cfg(feature = "tokio")]
    fileout: Option<BufWriter<File>>,
    /// Log file writer (sync flavor).
    #[cfg(not(feature = "tokio"))]
    fileout: Option<LineWriter<File>>,
    /// Channel to the background writer thread when async mode is on.
    #[cfg(not(feature = "tokio"))]
    sender: Option<Sender<AsyncLogType>>,
    /// Optional extra sink that receives every formatted message.
    plugin: Option<BoxPlugin>,
}
/// Iterator yielding successive sub-slices of `data` with ANSI color escape
/// sequences (e.g. "\x1b[31m") skipped; used by `write_text` so color codes
/// never reach the log file.
struct SkipAnsiColorIter<'a> {
// Byte buffer being scanned.
data: &'a [u8],
// Current scan position (start of the next slice to yield).
pos: usize,
// Last index where a complete escape could still start (len - 3, see `new`).
find_len: usize,
}
/// Messages exchanged with the background writer (thread or tokio task).
enum AsyncLogType {
// A fully formatted log line; the buffer is recycled after writing.
Message(Vec<u8>),
// Request to flush all sinks.
Flush,
}
/// Initialize the global logger, parsing `level` (e.g. "info") and
/// `log_file_max` (e.g. "10m") from strings. An empty `log_file` disables
/// file output. Must be called exactly once.
#[inline]
pub fn init_log_simple(level: &str, log_file: String, log_file_max: &str,
use_console: bool, use_async: bool) -> Result<()> {
init_log_inner(parse_level(level)?, log_file, parse_size(log_file_max)?,
use_console, use_async, None, None)
}
/// Initialize the global logger with typed arguments, no plugin and no
/// custom filter. Must be called exactly once.
#[inline]
pub fn init_log(level: log::LevelFilter, log_file: String, log_file_max: u32,
use_console: bool, use_async: bool) -> Result<()> {
init_log_inner(level, log_file, log_file_max, use_console, use_async, None, None)
}
/// Initialize the global logger with an extra output `plugin` that receives
/// every formatted message. Must be called exactly once.
#[inline]
pub fn init_log_with_plugin<P>(level: log::LevelFilter, log_file: String,
log_file_max: u32, use_console: bool, use_async: bool,
plugin: P) -> Result<()>
where
P: std::io::Write + Send + Sync + 'static,
{
init_log_inner(level, log_file, log_file_max, use_console, use_async,
Some(Box::new(plugin)), None)
}
/// Initialize the global logger with a custom record `filter`, consulted
/// after the level checks. Must be called exactly once.
#[inline]
pub fn init_log_with_filter(level: log::LevelFilter, log_file: String,
log_file_max: u32, use_console: bool, use_async: bool,
filter: impl CustomFilter) -> Result<()> {
init_log_inner(level, log_file, log_file_max, use_console, use_async,
None, Some(Box::new(filter)))
}
/// Initialize the global logger with both an output `plugin` and a custom
/// record `filter`. Must be called exactly once.
pub fn init_log_with_all<P>(level: log::LevelFilter, log_file: String,
log_file_max: u32, use_console: bool, use_async: bool,
plugin: P, filter: impl CustomFilter) -> Result<()>
where
P: std::io::Write + Send + Sync + 'static,
{
init_log_inner(level, log_file, log_file_max, use_console, use_async,
Some(Box::new(plugin)), Some(Box::new(filter)))
}
/// Add or replace a per-target level override (`target` is a module path
/// such as "my_crate::net"). A poisoned-lock error is silently ignored.
/// Must only be called after the logger has been initialized.
pub fn set_level(target: String, level: log::LevelFilter) {
if let Ok(mut f) = get_async_logger().level_filter.write() {
f.insert(target, level);
}
}
/// Parse a textual log level ("off", "error", …, "trace") into a
/// `log::LevelFilter`; returns a descriptive error on unknown input.
pub fn parse_level(level: &str) -> Result<log::LevelFilter> {
    log::LevelFilter::from_str(level)
        .map_err(|_| format!("can't parse log level: {level}").into())
}
/// Parse a size string like "1024", "10k", "5M" or "1g" into a byte count.
///
/// Accepts a plain integer (bytes) or an integer followed by a one-letter
/// unit: b/B (bytes), k/K (KiB), m/M (MiB), g/G (GiB). Returns an error for
/// an empty string, an unknown unit, a malformed number, or a value that
/// overflows `u32` (the original code panicked on "" and non-ASCII input,
/// and silently wrapped on overflow such as "5g").
pub fn parse_size(size: &str) -> std::result::Result<u32, Box<dyn std::error::Error + Send + Sync>> {
    // Fast path: the whole string is a plain byte count.
    if let Ok(n) = size.parse::<u32>() {
        return Ok(n);
    }
    // From here on we need at least one digit plus a one-byte unit suffix.
    if size.len() < 2 {
        return Err(format!("parse size error: {size}").into());
    }
    let bytes = size.as_bytes();
    // Inspect the last *byte*: for any accepted (ASCII) unit, slicing off
    // one byte below is a valid UTF-8 boundary; any multi-byte character
    // falls through to the unknown-unit error instead of panicking.
    let mul: u32 = match bytes[bytes.len() - 1] {
        b'b' | b'B' => 1,
        b'k' | b'K' => 1024,
        b'm' | b'M' => 1024 * 1024,
        b'g' | b'G' => 1024 * 1024 * 1024,
        _ => return Err(format!("parse size error, unit is unknown: {size}").into()),
    };
    let n: u32 = size[..size.len() - 1].parse()?;
    // checked_mul: e.g. "5g" exceeds u32::MAX and must error, not wrap.
    n.checked_mul(mul)
        .ok_or_else(|| format!("parse size error, value overflows u32: {size}").into())
}
#[cfg(not(feature = "tokio"))]
impl AsyncLogger {
/// Write one formatted message to every configured sink: console first,
/// then the log file (rotating it once it exceeds `max_size`), then the
/// optional plugin. Called directly in sync mode or from the background
/// writer thread in async mode.
fn write(&self, msg: &[u8]) {
let mut logger_data = match self.logger_data.lock() {
Ok(v) => v,
Err(e) => {
// A poisoned lock means another writer panicked; drop this message
// rather than propagate the panic into the caller.
eprint!("log mutex lock failed: {e:?}");
return;
}
};
if let Some(ref mut console) = logger_data.console {
// Console receives the message verbatim, ANSI colors included.
console.write_all(msg).expect("write log to console fail");
}
// Rotation: close the current file, move it aside as "<log_file>.bak"
// and start a fresh file with a reset byte counter.
if logger_data.log_size > self.max_size {
let mut log_file_closed = false;
if let Some(ref mut fileout) = logger_data.fileout {
fileout.flush().expect("flush log file fail");
logger_data.fileout.take();
log_file_closed = true;
}
if log_file_closed {
let bak = format!("{}.bak", self.log_file);
// Best effort: a previous backup may not exist.
std::fs::remove_file(&bak).unwrap_or_default();
std::fs::rename(&self.log_file, &bak).expect("backup log file fail");
// The path was just renamed away, so create() yields an empty file.
let f = std::fs::OpenOptions::new()
.write(true)
.create(true)
.open(&self.log_file)
.expect("reopen log file fail");
logger_data.fileout = Some(LineWriter::new(f));
logger_data.log_size = 0;
}
}
if let Some(ref mut fileout) = logger_data.fileout {
// write_text strips ANSI color sequences; the bytes actually written
// feed the rotation counter above.
let ws = write_text(fileout, msg).unwrap();
logger_data.log_size += ws as u32;
}
if let Some(plugin) = &mut logger_data.plugin {
// Plugins receive the raw (still colored) message bytes.
plugin.write_all(msg).expect("write log to plugin fail");
}
}
/// Flush the console and file writers; used by `flush()` and by the
/// background thread when it receives `AsyncLogType::Flush`.
fn flush_inner(&self) {
let mut logger_data = match self.logger_data.lock() {
Ok(v) => v,
Err(e) => {
eprint!("log mutex lock failed: {e:?}");
return;
}
};
if let Some(ref mut console) = logger_data.console {
console.flush().expect("flush log console error");
}
if let Some(ref mut fileout) = logger_data.fileout {
fileout.flush().expect("flush log file error");
}
}
}
impl log::Log for AsyncLogger {
/// A record is enabled when it passes the global max level and any
/// per-target override. Overrides are looked up for the exact target
/// first, then walking up the module path one "::" segment at a time.
fn enabled(&self, metadata: &log::Metadata) -> bool {
if metadata.level() <= self.level {
if let Ok(level_filters) = self.level_filter.read() {
let mut target = metadata.target();
while !target.is_empty() {
if let Some(level) = level_filters.get(target) {
return metadata.level() <= *level;
}
// No override for this target: retry with its parent module path.
target = match target.rfind("::") {
Some(rpos) => &target[..rpos],
None => ""
};
}
// No override matched anywhere: the global level check decides.
return true;
}
}
false
}
#[cfg(feature = "tokio")]
/// Format the record on the caller's thread and ship it to the writer
/// task over the unbounded channel; no I/O happens here.
fn log(&self, record: &log::Record) {
if !self.enabled(record.metadata()) { return; }
if let Some(filter) = &self.filter {
if !filter.enabled(record) { return; }
}
// NOTE(review): marked Unsound by the `time` crate — it permits reading
// the local UTC offset in a multi-threaded process; confirm acceptable.
#[cfg(feature = "time")]
unsafe { time::util::local_offset::set_soundness(time::util::local_offset::Soundness::Unsound); }
#[cfg(feature = "time")]
let now = time::OffsetDateTime::now_local().unwrap().format(&self.dt_fmt).unwrap();
#[cfg(feature = "chrono")]
let now = chrono::Local::now().format(&self.dt_fmt);
// Reuse a pooled buffer to avoid an allocation per record.
let mut msg = get_msg_from_cache();
// Level Debug/Trace selects the verbose colored layout (with target and
// line number); otherwise a compact "[time] [LEVEL] - msg" line.
if self.level >= log::LevelFilter::Debug {
write!(&mut msg, "[\x1b[36m{now}\x1b[0m] [{}{:5}\x1b[0m] [{}::{}] - {}\n",
level_color(record.level()),
record.level(),
record.target(),
record.line().unwrap_or(0),
record.args()).unwrap();
} else {
write!(&mut msg, "[{now}] [{:5}] - {}\n", record.level(), record.args()).unwrap();
};
if let Err(e) = get_async_logger().msg_tx.send(AsyncLogType::Message(msg)) {
eprintln!("failed in log::Log.log, {e:?}");
}
}
#[cfg(not(feature = "tokio"))]
/// Format the record, then either hand it to the background thread
/// (async mode) or write it synchronously and recycle the buffer.
fn log(&self, record: &log::Record) {
if !self.enabled(record.metadata()) { return; }
if let Some(filter) = &self.filter {
if !filter.enabled(record) { return; }
}
// NOTE(review): marked Unsound by the `time` crate — it permits reading
// the local UTC offset in a multi-threaded process; confirm acceptable.
#[cfg(feature = "time")]
unsafe { time::util::local_offset::set_soundness(time::util::local_offset::Soundness::Unsound); }
#[cfg(feature = "time")]
let now = time::OffsetDateTime::now_local().unwrap().format(&self.dt_fmt).unwrap();
#[cfg(feature = "chrono")]
let now = chrono::Local::now().format(&self.dt_fmt);
// Reuse a pooled buffer to avoid an allocation per record.
let mut msg = get_msg_from_cache();
// Level Debug/Trace selects the verbose colored layout (with target and
// line number); otherwise a compact "[time] [LEVEL] - msg" line.
if self.level >= log::LevelFilter::Debug {
write!(&mut msg, "[\x1b[36m{now}\x1b[0m] [{}{:5}\x1b[0m] [{}::{}] - {}\n",
level_color(record.level()),
record.level(),
record.target(),
record.line().unwrap_or(0),
record.args()).unwrap();
} else {
write!(&mut msg, "[{now}] [{:5}] - {}\n", record.level(), record.args()).unwrap();
};
// Async mode: the receiver thread performs the write and recycles the
// buffer; ownership of `msg` moves with the message.
match self.logger_data.lock() {
Ok(logger_data) => {
if let Some(ref sender) = logger_data.sender {
sender.send(AsyncLogType::Message(msg)).unwrap();
return;
}
},
Err(e) => {
eprint!("log mutex lock failed: {e:?}");
return;
}
}
// Sync mode: the guard above was dropped at the end of the match, so
// write() can re-acquire the mutex without deadlocking.
self.write(&msg);
put_msg_to_cache(msg);
}
#[cfg(feature = "tokio")]
/// Queue a flush request for the writer task.
/// NOTE(review): tokio::spawn panics when called outside a runtime —
/// confirm flush is only ever invoked from runtime context.
fn flush(&self) {
tokio::spawn(async move {
if let Err(e) = get_async_logger().msg_tx.send(AsyncLogType::Flush) {
eprintln!("failed in log send to channel: {e:?}");
}
});
}
#[cfg(not(feature = "tokio"))]
/// Flush via the channel in async mode; otherwise flush inline. The guard
/// is dropped first because flush_inner re-locks `logger_data`.
fn flush(&self) {
if let Ok(logger_data) = self.logger_data.lock() {
if let Some(ref sender) = logger_data.sender {
if let Err(e) = sender.send(AsyncLogType::Flush) {
eprint!("failed in log::flush: {e:?}");
}
} else {
drop(logger_data);
self.flush_inner();
}
}
}
}
impl Builder {
/// Start a builder with defaults: Info level, no log file, 10 MiB
/// rotation threshold, console on, async on, no plugin, no filter.
#[inline]
pub fn new() -> Self {
Self {
level: log::LevelFilter::Info,
log_file: String::new(),
log_file_max: 10 * 1024 * 1024,
use_console: true,
use_async: true,
plugin: None,
filter: None,
}
}
/// Consume the builder and initialize the global logger (despite the
/// name, this is the terminal "build" step; must run exactly once).
#[inline]
pub fn builder(self) -> Result<()> {
init_log_inner(self.level, self.log_file, self.log_file_max,
self.use_console, self.use_async, self.plugin, self.filter)
}
/// Set the global maximum log level.
#[inline]
pub fn level(mut self, level: log::LevelFilter) -> Self {
self.level = level;
self
}
/// Set the log file path; an empty string disables file output.
#[inline]
pub fn log_file(mut self, log_file: String) -> Self {
self.log_file = log_file;
self
}
/// Set the rotation threshold in bytes.
#[inline]
pub fn log_file_max(mut self, log_file_max: u32) -> Self {
self.log_file_max = log_file_max;
self
}
/// Enable or disable console output.
#[inline]
pub fn use_console(mut self, use_console: bool) -> Self {
self.use_console = use_console;
self
}
/// Enable or disable asynchronous (background) writing.
#[inline]
pub fn use_async(mut self, use_async: bool) -> Self {
self.use_async = use_async;
self
}
/// Set the level from its string form (e.g. "debug").
#[inline]
pub fn level_str(mut self, level: &str) -> Result<Self> {
self.level = parse_level(level)?;
Ok(self)
}
/// Set the rotation threshold from a size string (e.g. "10m").
#[inline]
pub fn log_file_max_str(mut self, log_file_max: &str) -> Result<Self> {
self.log_file_max = parse_size(log_file_max)?;
Ok(self)
}
/// Install an extra output that receives every formatted message.
pub fn plugin<T: std::io::Write + Send + Sync + 'static>(mut self, plugin: T) -> Self {
self.plugin = Some(Box::new(plugin));
self
}
/// Install a custom record filter, consulted after the level checks.
pub fn filter(mut self, filter: impl CustomFilter) -> Self {
self.filter = Some(Box::new(filter));
self
}
}
/// Lets any closure `Fn(&log::Record) -> bool` be used directly wherever a
/// `CustomFilter` is expected.
impl<F: Fn(&log::Record) -> bool + Send + Sync + 'static> CustomFilter for F {
fn enabled(&self, record: &log::Record) -> bool {
self(record)
}
}
impl<'a> SkipAnsiColorIter<'a> {
pub fn new(data: &'a [u8]) -> Self {
let find_len = if data.len() > 3 {
data.len() - 3
} else {
0
};
SkipAnsiColorIter {
data,
pos: 0,
find_len,
}
}
}
impl<'a> Iterator for SkipAnsiColorIter<'a> {
type Item = &'a [u8];
#[inline]
// Yields the next maximal slice containing no ANSI color escape sequence.
fn next(&mut self) -> Option<Self::Item> {
let (mut pos, find_len, data) = (self.pos, self.find_len, self.data);
while pos < find_len {
// SAFETY: pos < find_len == data.len() - 3 (see `new`), so pos + 1 and
// pos + 3 are always in bounds for the unchecked reads below.
unsafe {
if *data.get_unchecked(pos) != 0x1b || *data.get_unchecked(pos + 1) != b'[' {
pos += 1;
continue;
}
// Skip 4 bytes for "\x1b[Xm", otherwise assume a 5-byte "\x1b[XYm".
// NOTE(review): only the 4/5-byte codes this file emits are handled;
// longer sequences (e.g. "\x1b[1;32m") would not be skipped fully.
let n = if *data.get_unchecked(pos + 3) == b'm' { 4 } else { 5 };
let p = self.pos;
// Resume scanning after the escape on the next call.
self.pos = pos + n;
// Yield everything before the escape (may be empty).
return Some(&data[p..pos]);
}
}
// Tail too short to hold another escape: yield the remainder once.
let dl = data.len();
if pos < dl {
let p = self.pos;
self.pos = dl;
return Some(&data[p..dl]);
}
None
}
}
#[cfg(feature = "tokio")]
/// Initialize the global logger (tokio flavor). Must be called exactly once
/// and from inside a tokio runtime (a writer task is spawned). `_use_async`
/// is ignored: this flavor always writes asynchronously via a channel.
/// NOTE(review): this variant is `pub` while the non-tokio one is private —
/// confirm which visibility is intended.
pub fn init_log_inner(
level: log::LevelFilter,
log_file: String,
log_file_max: u32,
use_console: bool,
_use_async: bool,
plugin: Option<BoxPlugin>,
filter: Option<BoxCustomFilter>
) -> Result<()>
{
#[cfg(debug_assertions)]
debug_check_init();
log::set_max_level(level);
// Debug/Trace use a shorter timestamp without the year; otherwise full date.
#[cfg(feature = "time")]
let dt_fmt = if level >= log::LevelFilter::Debug {
"[month]-[day] [hour]:[minute]:[second]"
} else {
"[year]-[month]-[day] [hour]:[minute]:[second]"
};
#[cfg(feature = "time")]
let dt_fmt = time::format_description::parse_owned::<2>(dt_fmt).unwrap();
#[cfg(feature = "chrono")]
let dt_fmt = if level >= log::LevelFilter::Debug {
"%m-%d %H:%M:%S"
} else {
"%Y-%m-%d %H:%M:%S"
}.to_owned();
let use_file = !log_file.is_empty();
let console = if use_console {
Some(BufWriter::new(tokio::io::stdout()))
} else {
None
};
let (tx, rx) = unbounded_channel();
let logger = AsyncLogger {
level,
dt_fmt,
log_file,
max_size: log_file_max,
level_filter: RwLock::new(HashMap::new()),
fmt_cache: Mutex::new(VecDeque::with_capacity(CACHE_STR_ARRAY_SIZE)),
msg_tx: tx,
filter,
};
// SAFETY: single initialization is enforced (in debug builds) above; after
// this write the static is only read through get_async_logger().
unsafe {
#[cfg(debug_assertions)]
{
debug_assert!(!INITED);
INITED = true;
}
ASYNC_LOGGER.write(logger);
}
log::set_logger(get_async_logger()).expect("init_log call set_logger error");
// All console/file/plugin I/O happens on this task; the log file (if any)
// is opened lazily inside write_async.
tokio::spawn(write_async(LogData {
log_size: 0,
use_file,
console,
fileout: None,
plugin,
}, rx));
Ok(())
}
#[cfg(not(feature = "tokio"))]
/// Initialize the global logger (thread-based flavor). Must be called
/// exactly once. With `use_async`, a dedicated thread performs all console,
/// file and plugin I/O; otherwise `log()` writes synchronously.
fn init_log_inner(
level: log::LevelFilter,
log_file: String,
log_file_max: u32,
use_console: bool,
use_async: bool,
plugin: Option<BoxPlugin>,
filter: Option<BoxCustomFilter>
) -> Result<()>
{
#[cfg(debug_assertions)]
debug_check_init();
log::set_max_level(level);
// Debug/Trace use a shorter timestamp without the year; otherwise full date.
#[cfg(feature = "time")]
let dt_fmt = if level >= log::LevelFilter::Debug {
"[month]-[day] [hour]:[minute]:[second]"
} else {
"[year]-[month]-[day] [hour]:[minute]:[second]"
};
#[cfg(feature = "time")]
let dt_fmt = time::format_description::parse_owned::<2>(dt_fmt).unwrap();
#[cfg(feature = "chrono")]
let dt_fmt = if level >= log::LevelFilter::Debug {
"%m-%d %H:%M:%S"
} else {
"%Y-%m-%d %H:%M:%S"
}.to_owned();
let console = if use_console {
Some(LineWriter::new(std::io::stdout()))
} else {
None
};
// Open the log file in append mode and seed the rotation counter with the
// existing file size, so a pre-existing log still rotates at the threshold.
let (fileout, log_size) = if !log_file.is_empty() {
let f = std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(&log_file)?;
let log_size = std::fs::metadata(&log_file)?.len() as u32;
let fileout = Some(LineWriter::new(f));
(fileout, log_size)
} else {
(None, 0)
};
let sender = if use_async {
// Consumer thread: writes each message and recycles its buffer.
// NOTE(review): on a recv error this loop spins printing to stderr; in
// practice the sender lives in the global logger and is never dropped.
let (sender, receiver) = std::sync::mpsc::channel::<AsyncLogType>();
std::thread::spawn(move || loop {
match receiver.recv() {
Ok(data) => match data {
AsyncLogType::Message(msg) => {
get_async_logger().write(&msg);
put_msg_to_cache(msg);
},
AsyncLogType::Flush => get_async_logger().flush_inner(),
},
Err(e) => eprintln!("logger channel recv error: {}", e),
}
});
Some(sender)
} else {
None
};
let logger = AsyncLogger {
level,
dt_fmt,
log_file,
max_size: log_file_max,
level_filter: RwLock::new(HashMap::new()),
fmt_cache: Mutex::new(VecDeque::with_capacity(CACHE_STR_ARRAY_SIZE)),
filter,
logger_data: Mutex::new(LogData {
log_size,
console,
fileout,
sender,
plugin,
}),
};
// SAFETY: single initialization is enforced (in debug builds) above; after
// this write the static is only read through get_async_logger().
unsafe {
#[cfg(debug_assertions)]
{
debug_assert!(!INITED);
INITED = true;
}
ASYNC_LOGGER.write(logger);
}
log::set_logger(get_async_logger()).expect("init_log call set_logger error");
Ok(())
}
/// Write `msg` to the log file with ANSI color sequences stripped, flushing
/// when the message ends with a line terminator. Returns the number of
/// bytes actually written (color codes excluded).
#[cfg(not(feature = "tokio"))]
fn write_text(w: &mut LineWriter<File>, msg: &[u8]) -> std::io::Result<usize> {
    let mut total = 0usize;
    for chunk in SkipAnsiColorIter::new(msg) {
        w.write_all(chunk)?;
        total += chunk.len();
    }
    // A trailing newline marks a complete record; push it out immediately.
    if matches!(msg.last(), Some(&(b'\n' | b'\r'))) {
        w.flush()?;
    }
    Ok(total)
}
#[cfg(feature = "tokio")]
/// Writer task: owns all sinks and serializes every write. Opens the log
/// file on startup (when one is configured), then services message and
/// flush requests until every sender is dropped.
async fn write_async(mut log_data: LogData, mut rx: UnboundedReceiver<AsyncLogType>) {
let async_logger = get_async_logger();
if log_data.use_file && log_data.fileout.is_none() {
// Append so an existing log keeps its content; `size` seeds rotation.
let (f, size) = open_log_file(&async_logger.log_file, true).await;
log_data.fileout = Some(f);
log_data.log_size = size;
}
while let Some(data) = rx.recv().await {
match data {
AsyncLogType::Message(msg) => {
write_to_log(&mut log_data, &msg).await;
// Return the buffer to the pool for reuse by log().
put_msg_to_cache(msg);
}
AsyncLogType::Flush => {
if let Some(ref mut console) = log_data.console {
if let Err(e) = console.flush().await {
println!("flush log to console failed: {e:?}");
}
}
if let Some(ref mut fileout) = log_data.fileout {
if let Err(e) = fileout.flush().await {
println!("flush log to file failed: {e:?}");
}
}
}
}
}
}
/// Async variant of `write_text`: write `msg` to the buffered file with
/// ANSI color sequences stripped; flush when the message ends with a line
/// terminator. Returns the number of bytes actually written.
#[cfg(feature = "tokio")]
async fn write_text(w: &mut BufWriter<File>, msg: &[u8]) -> std::io::Result<usize> {
    let mut total = 0usize;
    for chunk in SkipAnsiColorIter::new(msg) {
        w.write_all(chunk).await?;
        total += chunk.len();
    }
    // A trailing newline marks a complete record; push it out immediately.
    if matches!(msg.last(), Some(&(b'\n' | b'\r'))) {
        w.flush().await?;
    }
    Ok(total)
}
/// Open (creating if needed) the log file and return the buffered writer
/// together with the current file size in bytes (0 when not appending —
/// the caller only passes `append = false` right after rotating the file
/// away, so the file starts empty).
#[cfg(feature = "tokio")]
async fn open_log_file(log_file: &str, append: bool) -> (BufWriter<File>, u32) {
    let file = tokio::fs::OpenOptions::new()
        .append(append)
        .write(true)
        .create(true)
        .open(log_file)
        .await
        .expect("reopen log file fail");
    let size = if append {
        tokio::fs::metadata(log_file).await.map_or(0, |m| m.len())
    } else {
        0
    };
    (BufWriter::new(file), size as u32)
}
#[cfg(feature = "tokio")]
/// Write one message to console, rotated log file and plugin (async
/// flavor); mirrors `AsyncLogger::write` from the non-tokio build.
async fn write_to_log(log_data: &mut LogData, msg: &[u8]) {
if msg.is_empty() { return; }
if let Some(ref mut console) = log_data.console {
if console.write_all(&msg).await.is_ok() {
// Flush when the message ends with a line terminator so complete
// records appear on the console promptly.
let c = msg[msg.len() - 1];
if c == b'\n' || c == b'\r' {
let _ = console.flush().await;
}
}
}
let async_logger = get_async_logger();
// Rotation: move the full file to "<log_file>.bak" and reopen fresh.
if log_data.log_size > async_logger.max_size {
let has_file = match log_data.fileout {
Some(ref mut fileout) => {
if let Err(e) = fileout.flush().await {
eprintln!("failed in log flush log file: {e:?}");
}
true
}
None => false
};
if has_file {
log_data.fileout.take();
let bak = format!("{}.bak", async_logger.log_file);
// Best effort: a previous backup may not exist.
let _ = tokio::fs::remove_file(&bak).await;
match tokio::fs::rename(&async_logger.log_file, &bak).await {
Ok(_) => {
log_data.fileout = Some(open_log_file(&async_logger.log_file, false).await.0);
log_data.log_size = 0;
}
// On rename failure the file stays closed until a later rotation retry.
Err(e) => eprintln!("failed in log rename file: {e:?}"),
}
}
}
if let Some(ref mut fileout) = log_data.fileout {
// write_text strips ANSI colors; its return value drives rotation.
match write_text(fileout, &msg).await {
Ok(size) => log_data.log_size += size as u32,
Err(e) => eprintln!("failed in write to file: {e:?}"),
}
}
if let Some(ref mut plugin) = log_data.plugin {
// NOTE(review): the plugin is a blocking std::io::Write invoked on the
// async writer task — a slow plugin stalls all logging; confirm OK.
if let Err(e) = plugin.write_all(&msg) {
eprintln!("failed in log plugin: {e:?}");
}
}
}
/// Access the global logger.
/// SAFETY: callers must ensure init_log_inner has completed first — the
/// debug assertion below checks this; release builds rely on call order.
fn get_async_logger() -> &'static AsyncLogger {
unsafe {
#[cfg(debug_assertions)]
debug_assert!(INITED);
ASYNC_LOGGER.assume_init_ref()
}
}
fn level_color(level: log::Level) -> &'static str {
const RED: &str = "\x1b[31m";
const GREEN: &str = "\x1b[32m";
const YELLOW: &str = "\x1b[33m";
const BLUE: &str = "\x1b[34m";
const MAGENTA: &str = "\x1b[35m";
match level {
log::Level::Trace => GREEN,
log::Level::Debug => YELLOW,
log::Level::Info => BLUE,
log::Level::Warn => MAGENTA,
log::Level::Error => RED,
}
}
/// Take a recycled formatting buffer from the pool, or allocate a fresh one
/// with the standard initial capacity when the pool is empty or its lock is
/// poisoned.
fn get_msg_from_cache() -> Vec<u8> {
    get_async_logger()
        .fmt_cache
        .lock()
        .ok()
        .and_then(|mut cache| cache.pop_back())
        .unwrap_or_else(|| Vec::with_capacity(CACHE_STR_INIT_SIZE))
}
/// Return a formatting buffer to the pool. Buffers that grew beyond the
/// standard capacity are dropped instead of cached, and the pool never
/// grows past its initial capacity (so it never reallocates).
fn put_msg_to_cache(mut value: Vec<u8>) {
    if value.capacity() > CACHE_STR_INIT_SIZE {
        return;
    }
    value.clear();
    if let Ok(mut cache) = get_async_logger().fmt_cache.lock() {
        if cache.len() < cache.capacity() {
            cache.push_back(value);
        }
    }
}
/// Debug-only guard: panics when logger initialization runs more than once.
#[cfg(debug_assertions)]
fn debug_check_init() {
    use std::sync::atomic::{AtomicBool, Ordering};
    static INITED: AtomicBool = AtomicBool::new(false);
    // swap returns the previous value, so only a second call observes true.
    if INITED.swap(true, Ordering::SeqCst) {
        panic!("init_log must run once!");
    }
}