use crate::log_level::LogLevel;
#[cfg(not(miri))]
use crate::sink::PlatformSink;
use crate::tui::TuiMetrics;
#[cfg(not(miri))]
use crate::tui::spawn_tui_thread;
use crossbeam_queue::ArrayQueue;
use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::sync::{Arc, LazyLock, Mutex};
use std::thread;
#[cfg(not(miri))]
use std::time::Duration;
const RING_BUFFER_CAPACITY: usize = 65_536;
#[cfg(not(miri))]
const MAX_DRAIN_BATCH_SIZE: usize = 64;
#[derive(Debug, Clone)]
pub struct LogEvent {
pub level: LogLevel,
pub level_num: u8,
pub log: crate::log::Log,
}
pub struct LockFreeEngine {
queue: Arc<ArrayQueue<LogEvent>>,
shutdown_flag: Arc<AtomicBool>,
metrics: Arc<TuiMetrics>,
filter_level: AtomicU8,
flusher_thread_handle: Option<thread::Thread>,
flusher_join: Mutex<Option<thread::JoinHandle<()>>>,
}
impl fmt::Debug for LockFreeEngine {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("LockFreeEngine")
.field("queue", &self.queue)
.field("shutdown_flag", &self.shutdown_flag)
.field("metrics", &self.metrics)
.field("filter_level", &self.filter_level)
.field(
"flusher_thread_handle",
&self
.flusher_thread_handle
.as_ref()
.map(thread::Thread::id),
)
.finish_non_exhaustive()
}
}
pub static ENGINE: LazyLock<LockFreeEngine> =
LazyLock::new(|| LockFreeEngine::new(RING_BUFFER_CAPACITY));
impl LockFreeEngine {
#[must_use]
pub fn new(capacity: usize) -> Self {
let queue = Arc::new(ArrayQueue::new(capacity));
let shutdown_flag = Arc::new(AtomicBool::new(false));
let metrics = Arc::new(TuiMetrics::default());
let filter_level = AtomicU8::new(0);
#[cfg(not(miri))]
let flusher_handle = {
let flusher_queue = queue.clone();
let flusher_shutdown = shutdown_flag.clone();
let handle = thread::Builder::new()
.name("rlg-flusher".into())
.spawn(move || {
use std::io::Write;
let mut sink = PlatformSink::native();
let mut fmt_buf = Vec::with_capacity(512);
loop {
let mut batch: [Option<LogEvent>;
MAX_DRAIN_BATCH_SIZE] =
std::array::from_fn(|_| None);
let mut count = 0;
while count < MAX_DRAIN_BATCH_SIZE {
match flusher_queue.pop() {
Some(event) => {
batch[count] = Some(event);
count += 1;
}
None => break,
}
}
for event in batch.iter().flatten() {
fmt_buf.clear();
let _ = writeln!(fmt_buf, "{}", &event.log);
sink.emit(event.level.as_str(), &fmt_buf);
}
if flusher_shutdown.load(Ordering::Relaxed)
&& flusher_queue.is_empty()
{
break;
}
thread::park_timeout(Duration::from_millis(5));
}
})
.expect(
"Failed to spawn rlg-flusher background thread",
);
if std::env::var("RLG_TUI")
.map(|v| v == "1")
.unwrap_or(false)
{
spawn_tui_thread(
metrics.clone(),
shutdown_flag.clone(),
);
}
Some(handle)
};
#[cfg(miri)]
let flusher_handle: Option<thread::JoinHandle<()>> = None;
let flusher_thread_handle =
flusher_handle.as_ref().map(|h| h.thread().clone());
Self {
queue,
shutdown_flag,
metrics,
filter_level,
flusher_thread_handle,
flusher_join: Mutex::new(flusher_handle),
}
}
pub fn ingest(&self, event: LogEvent) {
if event.level_num < self.filter_level.load(Ordering::Acquire) {
return;
}
self.metrics.inc_events();
self.metrics.inc_level(event.level);
if event.level_num >= LogLevel::ERROR.to_numeric() {
self.metrics.inc_errors();
}
if let Err(rejected) = self.queue.push(event) {
self.metrics.inc_dropped();
let mut to_push = rejected;
for _ in 0..3 {
let _ = self.queue.pop();
match self.queue.push(to_push) {
Ok(()) => break,
Err(e) => to_push = e,
}
}
}
if let Some(thread) = &self.flusher_thread_handle {
thread.unpark();
}
}
pub fn set_filter(&self, level: u8) {
self.filter_level.store(level, Ordering::Release);
}
#[must_use]
pub fn filter_level(&self) -> u8 {
self.filter_level.load(Ordering::Relaxed)
}
pub fn inc_format(&self, format: crate::log_format::LogFormat) {
self.metrics.inc_format(format);
}
pub fn inc_spans(&self) {
self.metrics.inc_spans();
}
pub fn dec_spans(&self) {
self.metrics.dec_spans();
}
#[must_use]
pub fn active_spans(&self) -> usize {
self.metrics.active_spans.load(Ordering::Relaxed)
}
pub fn apply_config(&self, config: &crate::config::Config) {
self.set_filter(config.log_level.to_numeric());
}
pub fn shutdown(&self) {
self.shutdown_flag.store(true, Ordering::SeqCst);
if let Some(thread) = &self.flusher_thread_handle {
thread.unpark();
}
if let Ok(mut guard) = self.flusher_join.lock()
&& let Some(handle) = guard.take()
{
let _ = handle.join();
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct FastSerializer;
impl FastSerializer {
pub fn append_u64(buf: &mut Vec<u8>, val: u64) {
let mut buffer = itoa::Buffer::new();
buf.extend_from_slice(buffer.format(val).as_bytes());
}
pub fn append_f64(buf: &mut Vec<u8>, val: f64) {
let mut buffer = ryu::Buffer::new();
buf.extend_from_slice(buffer.format(val).as_bytes());
}
}