use parking_lot::{Condvar, Mutex, Once};
use std::{
io::Write,
sync::atomic::{AtomicBool, Ordering},
time::Duration,
};
use tracing::{Level, Metadata};
// Buffer fill level at which a dropping writer wakes the background
// autoflush thread (see `Drop for StderrWriter`).
const ASYNC_FLUSH_THRESHOLD: usize = 16 * 1024;
// Fill level at which a dropping writer flushes synchronously on its own
// thread instead of waiting for the background thread.
const SYNC_FLUSH_THRESHOLD: usize = 768 * 1024;
// Hard cap: once the buffer plus new input would reach this size, the
// bytes are written straight through to stderr (see `emergency_flush`).
const EMERGENCY_FLUSH_THRESHOLD: usize = 2 * 1024 * 1024;
// Maximum interval between flushes performed by the background thread.
const AUTOFLUSH_EVERY: Duration = Duration::from_millis(50);
// Events at least as severe as this level get a synchronously-flushing
// writer (see `make_writer_for`; in `tracing`, ERROR compares lowest).
const SYNC_FLUSH_LEVEL_THRESHOLD: Level = Level::ERROR;
// How long the atexit handler waits for the buffer lock before giving up
// on the final flush.
const ON_EXIT_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);
// Main in-memory log buffer; each `StderrWriter` appends under this lock.
static BUFFER: Mutex<Vec<u8>> = parking_lot::const_mutex(Vec::new());
// Secondary buffer the flusher swaps in so writers are not blocked on the
// actual stderr I/O (see `flush_logs`).
static SPARE_BUFFER: Mutex<Vec<u8>> = parking_lot::const_mutex(Vec::new());
// Signalled to wake the autoflush thread early when the buffer crosses
// `ASYNC_FLUSH_THRESHOLD`.
static ASYNC_FLUSH_CONDVAR: Condvar = Condvar::new();
// Cleared by the atexit hook; afterwards every new writer flushes
// synchronously on drop so shutdown-time logs are not lost.
static ENABLE_ASYNC_LOGGING: AtomicBool = AtomicBool::new(true);
/// Drains the global log buffer to stderr.
///
/// Takes the `BUFFER` guard by value so the buffer can be swapped into
/// `SPARE_BUFFER` and unlocked *before* the (potentially slow) stderr
/// write happens — writers are never blocked on the I/O itself. Write
/// errors are deliberately ignored: there is nowhere left to report a
/// failure to write to stderr.
fn flush_logs(mut buffer: parking_lot::lock_api::MutexGuard<parking_lot::RawMutex, Vec<u8>>) {
    let mut pending = SPARE_BUFFER.lock();
    // Move the accumulated bytes out and release the main buffer
    // immediately so logging can continue while we write.
    std::mem::swap(&mut *pending, &mut *buffer);
    std::mem::drop(buffer);

    // One locked write keeps the whole batch contiguous on stderr.
    let stderr = std::io::stderr();
    let mut out = stderr.lock();
    let _ = out.write_all(&pending);
    std::mem::drop(out);

    // Keep the (now-drained) allocation around for the next swap.
    pending.clear();
}
/// Body of the background `log-autoflush` thread.
///
/// Sleeps on the condvar (waking at least every `AUTOFLUSH_EVERY`, or
/// sooner when a writer signals that the buffer crossed the async
/// threshold) and then flushes repeatedly until the buffer stays below
/// `ASYNC_FLUSH_THRESHOLD`. Runs forever; the atexit hook handles the
/// final flush.
fn log_autoflush_thread() {
    let mut guard = BUFFER.lock();
    loop {
        // Releases the lock while waiting; holds it again on wakeup.
        ASYNC_FLUSH_CONDVAR.wait_for(&mut guard, AUTOFLUSH_EVERY);
        loop {
            // `flush_logs` consumes the guard (it unlocks before I/O),
            // so we must re-acquire to inspect the fill level.
            flush_logs(guard);
            guard = BUFFER.lock();
            if guard.len() < ASYNC_FLUSH_THRESHOLD {
                break;
            }
        }
    }
}
/// One-time logger setup: spawns the background autoflush thread and
/// registers an `atexit` hook so buffered logs are flushed on shutdown.
///
/// Invoked exactly once via the `Once` in
/// `MakeStderrWriter::default`. Panics if either step fails, since the
/// logger cannot operate without them.
#[cold]
fn initialize() {
    std::thread::Builder::new()
        .name("log-autoflush".to_owned())
        .spawn(log_autoflush_thread)
        .expect("thread spawning doesn't normally fail; qed");

    // SAFETY: `on_exit` is a plain `extern "C" fn()` with no
    // preconditions, which is exactly what `atexit` requires.
    let result = unsafe { libc::atexit(on_exit) };
    assert_eq!(result, 0, "atexit failed while setting up the logger: {}", result);
}
/// `atexit` hook: switches the logger to synchronous mode and performs a
/// best-effort final flush.
///
/// After the flag is cleared, every `StderrWriter` created from here on
/// flushes on drop, so later log lines should not be lost. The bounded
/// wait keeps a stuck buffer lock from hanging process shutdown.
extern "C" fn on_exit() {
    ENABLE_ASYNC_LOGGING.store(false, Ordering::SeqCst);
    if let Some(guard) = BUFFER.try_lock_for(ON_EXIT_FLUSH_TIMEOUT) {
        flush_logs(guard);
    }
}
/// Writer factory handed to `tracing_subscriber`; produces `StderrWriter`s.
pub struct MakeStderrWriter {
    // Private zero-sized field so the struct can only be built through
    // `Default::default()`, which guarantees `initialize()` has run.
    _dummy: (),
}
impl Default for MakeStderrWriter {
    /// Builds the factory, performing one-time logger setup (autoflush
    /// thread + atexit hook) on the very first call.
    fn default() -> Self {
        static INIT: Once = Once::new();
        INIT.call_once(initialize);
        Self { _dummy: () }
    }
}
impl tracing_subscriber::fmt::MakeWriter<'_> for MakeStderrWriter {
    type Writer = StderrWriter;

    /// Default writer: buffered, flushed asynchronously.
    fn make_writer(&self) -> Self::Writer {
        StderrWriter::new(false)
    }

    /// Level-aware writer: events at least as severe as
    /// `SYNC_FLUSH_LEVEL_THRESHOLD` flush synchronously on drop so they
    /// reach stderr even if the process dies right after.
    fn make_writer_for(&self, meta: &Metadata<'_>) -> Self::Writer {
        let severe = *meta.level() <= SYNC_FLUSH_LEVEL_THRESHOLD;
        StderrWriter::new(severe)
    }
}
/// A `tracing` writer that appends into the shared in-memory log buffer.
///
/// Holds the global `BUFFER` lock for its whole lifetime, so all bytes
/// written through one instance land contiguously. On drop it either
/// flushes to stderr synchronously or wakes the autoflush thread,
/// depending on severity and fill level.
pub struct StderrWriter {
    // Guard over the global buffer; `Some` until `Drop` takes it.
    buffer: Option<parking_lot::lock_api::MutexGuard<'static, parking_lot::RawMutex, Vec<u8>>>,
    // When set, `Drop` flushes on the caller's thread.
    sync_flush_on_drop: bool,
    // Buffer length at acquisition; used to notify the flush thread only
    // when this writer is the one that crossed the async threshold.
    original_len: usize,
}
impl StderrWriter {
    /// Acquires the global buffer lock and records the current fill level.
    ///
    /// If async logging has been disabled (by the atexit hook), the writer
    /// is forced into synchronous-flush mode regardless of the argument.
    fn new(sync_flush_on_drop: bool) -> Self {
        let force_sync = !ENABLE_ASYNC_LOGGING.load(Ordering::Relaxed);
        let guard = BUFFER.lock();
        StderrWriter {
            original_len: guard.len(),
            buffer: Some(guard),
            sync_flush_on_drop: sync_flush_on_drop || force_sync,
        }
    }
}
/// Last-resort flush for when the buffer would exceed
/// `EMERGENCY_FLUSH_THRESHOLD`: writes the buffered bytes and the new
/// input straight to stderr while the buffer lock is still held.
///
/// Blocking every writer here is deliberate back-pressure — this path is
/// only reached when logs are produced much faster than they can be
/// drained. Write errors are ignored (nowhere to report them).
#[cold]
fn emergency_flush(buffer: &mut Vec<u8>, input: &[u8]) {
    let handle = std::io::stderr();
    let mut out = handle.lock();
    let _ = out.write_all(buffer);
    buffer.clear();
    let _ = out.write_all(input);
}
impl Write for StderrWriter {
    /// Appends `input` to the shared buffer, or writes through to stderr
    /// when the buffer would reach the emergency threshold.
    ///
    /// Always reports the full `input` as written — buffering (and even a
    /// failed emergency write) is invisible to the caller.
    fn write(&mut self, input: &[u8]) -> Result<usize, std::io::Error> {
        let buf = self.buffer.as_mut().expect("buffer is only None after `drop`; qed");
        if buf.len() + input.len() < EMERGENCY_FLUSH_THRESHOLD {
            buf.extend_from_slice(input);
        } else {
            emergency_flush(buf, input);
        }
        Ok(input.len())
    }

    /// `write` never accepts a partial slice, so `write_all` is just one
    /// call.
    fn write_all(&mut self, input: &[u8]) -> Result<(), std::io::Error> {
        self.write(input).map(|_| ())
    }

    /// No-op: actual flushing is governed by the drop logic and the
    /// background thread, not by `tracing`'s flush calls.
    fn flush(&mut self) -> Result<(), std::io::Error> {
        Ok(())
    }
}
impl Drop for StderrWriter {
    /// Decides how the buffered bytes reach stderr:
    /// - synchronously, when requested (severe event / shutdown) or when
    ///   the buffer grew past `SYNC_FLUSH_THRESHOLD`;
    /// - otherwise, by waking the autoflush thread — but only if *this*
    ///   writer pushed the buffer across `ASYNC_FLUSH_THRESHOLD`, so the
    ///   condvar is signalled once per crossing rather than per write.
    fn drop(&mut self) {
        let guard = self.buffer.take().expect("buffer is only None after `drop`; qed");
        let must_sync = self.sync_flush_on_drop || guard.len() >= SYNC_FLUSH_THRESHOLD;
        if must_sync {
            // Consumes the guard; unlocks the buffer before the I/O.
            flush_logs(guard);
            return;
        }
        let crossed_threshold =
            self.original_len < ASYNC_FLUSH_THRESHOLD && guard.len() >= ASYNC_FLUSH_THRESHOLD;
        if crossed_threshold {
            ASYNC_FLUSH_CONDVAR.notify_one();
        }
        // `guard` unlocks here, letting the woken flusher proceed.
    }
}