#![allow(
unknown_lints,
renamed_and_removed_lints,
clippy::unknown_clippy_lints,
clippy::mutex_atomic
)]
use std::cell::RefCell;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::thread::{Builder as ThreadBuilder, Thread};
use std::time::Duration;
use either::Either;
use fern::Dispatch;
use flume::{Receiver, Sender, TrySendError};
use log::{Level, LevelFilter, Log, Metadata, Record};
use spirit::extension::{Autojoin, Extensible, Extension};
use spirit::fragment::Transformation;
use spirit::AnyError;
thread_local! {
static LOG_THREAD_NAME: RefCell<Option<Arc<str>>> = RefCell::new(None);
static MY_THREAD_NAME: Arc<str> = {
Arc::from(thread::current().name().unwrap_or(super::UNKNOWN_THREAD))
};
}
fn reset_thread_name() {
LOG_THREAD_NAME.with(|log| *log.borrow_mut() = None);
}
pub(crate) fn get_thread_name(thread: &Thread) -> Either<&str, Arc<str>> {
LOG_THREAD_NAME.with(|n| {
n.borrow()
.as_ref()
.map(|n| Either::Right(Arc::clone(n)))
.unwrap_or_else(|| Either::Left(thread.name().unwrap_or(super::UNKNOWN_THREAD)))
})
}
struct FlushDone {
done: Mutex<bool>,
wakeup: Condvar,
}
impl FlushDone {
fn new() -> Self {
Self {
done: Mutex::new(false),
wakeup: Condvar::new(),
}
}
fn wait(&self) {
let mut done = self.done.lock().unwrap();
while !*done {
done = self.wakeup.wait(done).unwrap();
}
}
}
struct DropNotify(Arc<FlushDone>);
impl Drop for DropNotify {
fn drop(&mut self) {
*self.0.done.lock().unwrap() = true;
self.0.wakeup.notify_all();
}
}
enum Instruction {
Msg {
msg: String,
level: Level,
target: String,
module_path: Option<String>,
file: Option<String>,
line: Option<u32>,
thread: Arc<str>,
},
Flush(DropNotify),
}
impl Instruction {
fn process(self, dst: &dyn Log) {
match self {
Instruction::Msg {
msg,
level,
target,
module_path,
file,
line,
thread,
} => {
LOG_THREAD_NAME.with(|n| n.replace(Some(thread)));
dst.log(
&Record::builder()
.args(format_args!("{}", msg))
.level(level)
.target(&target)
.file(file.as_ref().map(|f| f as &str))
.line(line)
.module_path(module_path.as_ref().map(|m| m as &str))
.build(),
);
}
Instruction::Flush(done) => {
dst.flush();
drop(done);
}
}
}
}
struct SyncLogger {
logger: Box<dyn Log>,
lost_msgs: AtomicUsize,
}
struct Recv {
shared: Arc<SyncLogger>,
instructions: Receiver<Instruction>,
}
impl Recv {
fn run(&self) {
let mut panicked = false;
loop {
let result = panic::catch_unwind(AssertUnwindSafe(|| {
if panicked {
reset_thread_name();
self.shared.logger.log(
&Record::builder()
.args(format_args!("Panic in the logger thread, restarted"))
.level(Level::Error)
.target(module_path!())
.line(Some(line!()))
.module_path(Some(module_path!()))
.build(),
);
}
for i in &self.instructions {
let lost_msgs = self.shared.lost_msgs.swap(0, Ordering::Relaxed);
if lost_msgs > 0 {
reset_thread_name();
self.shared.logger.log(
&Record::builder()
.args(format_args!("Lost {} messages", lost_msgs))
.level(Level::Warn)
.target(module_path!())
.line(Some(line!()))
.module_path(Some(module_path!()))
.build(),
);
}
i.process(&*self.shared.logger);
}
}));
if result.is_ok() {
break;
}
panicked = true;
thread::sleep(Duration::from_millis(100));
}
self.shared.logger.flush();
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
#[non_exhaustive]
pub enum OverflowMode {
Block,
DropMsg,
DropMsgSilently,
AdaptiveDrop {
from_level: Level,
fill_limit: usize,
},
}
impl OverflowMode {
fn count_lost(self) -> bool {
matches!(
self,
OverflowMode::DropMsg | OverflowMode::AdaptiveDrop { .. }
)
}
}
pub struct AsyncLogger {
mode: OverflowMode,
ch: Sender<Instruction>,
shared: Arc<SyncLogger>,
}
impl AsyncLogger {
pub fn new(logger: Box<dyn Log>, buffer: usize, mode: OverflowMode) -> Self {
assert!(
buffer > 0,
"Zero-sized buffer for async logging makes no sense"
);
if let OverflowMode::AdaptiveDrop { fill_limit, .. } = mode {
assert!(fill_limit > 0);
assert!(fill_limit < buffer);
}
let shared = Arc::new(SyncLogger {
logger,
lost_msgs: AtomicUsize::new(0),
});
let (sender, receiver) = flume::bounded(buffer);
let recv = Recv {
shared: Arc::clone(&shared),
instructions: receiver,
};
ThreadBuilder::new()
.name("spirit-log-bg".to_owned())
.spawn(move || {
recv.run();
})
.expect("Failed to start logging thread");
AsyncLogger {
mode,
ch: sender,
shared,
}
}
}
impl Log for AsyncLogger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= log::max_level() && self.shared.logger.enabled(metadata)
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
if let OverflowMode::AdaptiveDrop {
from_level,
fill_limit,
} = self.mode
{
if record.level() >= from_level && self.ch.len() >= fill_limit {
self.shared.lost_msgs.fetch_add(1, Ordering::Relaxed);
return;
}
}
let i = Instruction::Msg {
file: record.file().map(ToOwned::to_owned),
level: record.level(),
line: record.line(),
module_path: record.module_path().map(ToOwned::to_owned),
msg: format!("{}", record.args()),
target: record.target().to_owned(),
thread: MY_THREAD_NAME.with(|n| Arc::clone(n)),
};
if self.mode == OverflowMode::Block {
self.ch.send(i).expect("Logging thread disappeared");
} else {
match self.ch.try_send(i) {
Err(TrySendError::Full(_)) if self.mode.count_lost() => {
self.shared.lost_msgs.fetch_add(1, Ordering::Relaxed);
}
Err(TrySendError::Full(_)) | Ok(()) => (),
_ => panic!("Logging thread disappeared"),
}
}
}
}
fn flush(&self) {
let done = Arc::new(FlushDone::new());
self.ch
.send(Instruction::Flush(DropNotify(Arc::clone(&done))))
.expect("Logger thread disappeared");
done.wait();
}
}
impl Drop for AsyncLogger {
fn drop(&mut self) {
self.flush();
}
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Background {
mode: OverflowMode,
buffer: usize,
}
impl Background {
pub fn new(buffer: usize, mode: OverflowMode) -> Self {
Background { mode, buffer }
}
}
impl<I, F> Transformation<Dispatch, I, F> for Background {
type OutputResource = (LevelFilter, Box<dyn Log>);
type OutputInstaller = I;
fn installer(&mut self, original: I, _name: &'static str) -> I {
original
}
fn transform(
&mut self,
dispatch: Dispatch,
_fragment: &F,
_name: &'static str,
) -> Result<(LevelFilter, Box<dyn Log>), AnyError> {
let (level, sync_logger) = dispatch.into_log();
let bg = AsyncLogger::new(sync_logger, self.buffer, self.mode);
Ok((level, Box::new(bg)))
}
}
pub struct FlushGuard;
impl FlushGuard {
pub fn flush() {
log::logger().flush();
}
}
impl Drop for FlushGuard {
fn drop(&mut self) {
Self::flush();
}
}
impl<E> Extension<E> for FlushGuard
where
E: Extensible<Ok = E>,
{
fn apply(self, builder: E) -> Result<E, AnyError> {
let builder = builder
.autojoin_bg_thread(Autojoin::TerminateAndJoin)
.keep_guard(self);
Ok(builder)
}
}