use std::collections::VecDeque;
use std::fmt;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tracing::field::{Field, Visit};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;
/// Maximum number of entries kept in a `LogBuffer`'s history. Once the history
/// holds this many entries, the oldest one is dropped before a new one is appended.
pub(crate) const HISTORY_CAPACITY: usize = 1000;
/// Capacity passed to `broadcast::channel` when a `LogBuffer` is created.
const BROADCAST_CAPACITY: usize = 4096;
/// A single captured log record.
///
/// Entries are cloned into the history and sent to broadcast subscribers, and
/// can be serialized through `serde::Serialize`.
#[derive(Clone, Debug, serde::Serialize)]
pub struct LogEntry {
/// Creation time of the entry. `LogBuffer::push_sync` fills this with the
/// current UTC time formatted as RFC 3339.
pub timestamp: String,
/// Severity label, e.g. `"INFO"` or `"WARN"`.
pub level: String,
/// Text of the log message.
pub message: String,
}
/// Log sink that keeps a bounded history of `LogEntry` values and also
/// publishes each entry on a broadcast channel.
///
/// The history lives behind an `Arc`, so clones of a `LogBuffer` share it.
#[derive(Clone)]
pub struct LogBuffer {
// Sending half of the broadcast channel; receivers are created via `subscribe`.
tx: broadcast::Sender<LogEntry>,
// Most recent entries, oldest first, capped at `HISTORY_CAPACITY`.
history: Arc<Mutex<VecDeque<LogEntry>>>,
}
impl Default for LogBuffer {
fn default() -> Self {
Self::new()
}
}
impl LogBuffer {
    /// Creates an empty buffer.
    ///
    /// The history is pre-allocated for `HISTORY_CAPACITY` entries and the
    /// broadcast channel is created with `BROADCAST_CAPACITY`. The receiver
    /// returned by `broadcast::channel` is dropped immediately; use
    /// [`LogBuffer::subscribe`] to obtain one.
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
        Self {
            tx,
            history: Arc::new(Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY))),
        }
    }

    /// Returns a new receiver attached to this buffer's broadcast channel.
    ///
    /// Per tokio's broadcast semantics the receiver only observes entries sent
    /// after this call; use [`LogBuffer::history_sync`] for earlier entries.
    pub fn subscribe(&self) -> broadcast::Receiver<LogEntry> {
        self.tx.subscribe()
    }

    /// Async-callable variant of [`LogBuffer::history_sync`].
    ///
    /// It never awaits anything itself; it simply returns the synchronous snapshot.
    pub async fn history(&self) -> Vec<LogEntry> {
        self.history_sync()
    }

    /// Returns a snapshot of the retained entries, oldest first.
    ///
    /// Does not panic if the history mutex was poisoned: the data is still
    /// structurally valid, so the snapshot is taken anyway.
    pub fn history_sync(&self) -> Vec<LogEntry> {
        self.lock_history().iter().cloned().collect()
    }

    /// Records a new entry stamped with the current UTC time (RFC 3339).
    ///
    /// * `level` - severity label stored verbatim in the entry.
    /// * `message` - message text; anything convertible into a `String`.
    ///
    /// The entry is appended to the history and then broadcast to subscribers.
    pub fn push_sync(&self, level: &str, message: impl Into<String>) {
        let entry = LogEntry {
            timestamp: chrono::Utc::now().to_rfc3339(),
            level: level.to_string(),
            message: message.into(),
        };
        self.append_and_broadcast(entry);
    }

    /// Locks the history, recovering the guard if the mutex is poisoned.
    ///
    /// A poisoned lock only means another thread panicked while holding it.
    /// The `VecDeque` has no multi-step invariant that could be left broken,
    /// and panicking here would turn every later log call into a panic.
    fn lock_history(&self) -> std::sync::MutexGuard<'_, VecDeque<LogEntry>> {
        self.history
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Stores `entry` in the bounded history, then publishes it on the channel.
    fn append_and_broadcast(&self, entry: LogEntry) {
        {
            // Inner scope so the lock is released before broadcasting.
            let mut hist = self.lock_history();
            if hist.len() >= HISTORY_CAPACITY {
                // Evict the oldest entry to keep the history bounded.
                hist.pop_front();
            }
            hist.push_back(entry.clone());
        }
        // The send result is deliberately ignored so that a failed broadcast
        // (e.g. nobody is subscribed) never affects logging.
        let _ = self.tx.send(entry);
    }
}
/// Field visitor that collects one display string from a tracing event.
struct MessageVisitor {
    /// Text gathered so far; stays empty until a field has been recorded.
    message: String,
}

impl MessageVisitor {
    /// Returns a visitor whose message is still empty.
    fn new() -> Self {
        let message = String::new();
        MessageVisitor { message }
    }
}
impl Visit for MessageVisitor {
    /// Records a field through its `Debug` representation.
    ///
    /// The `message` field always wins and overwrites whatever was collected
    /// before. Any other field is used only as a fallback, rendered as
    /// `name = value`, and only while no text has been collected yet.
    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
        if field.name() == "message" {
            self.message = format!("{:?}", value);
        } else if self.message.is_empty() {
            self.message = format!("{} = {:?}", field.name(), value);
        }
    }

    /// Records a string field.
    ///
    /// A `message` field is stored verbatim (no `Debug` quoting). Any other
    /// string field goes through [`Visit::record_debug`], so it can serve as
    /// the fallback text exactly like fields of every other type; previously
    /// such fields were silently dropped, leaving the message empty.
    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() == "message" {
            self.message = value.to_string();
        } else {
            self.record_debug(field, &value);
        }
    }
}
fn level_str(level: &Level) -> &'static str {
match *level {
Level::ERROR => "ERROR",
Level::WARN => "WARN",
Level::INFO => "INFO",
Level::DEBUG => "DEBUG",
Level::TRACE => "TRACE",
}
}
/// `tracing_subscriber` layer that forwards every event it sees into a `LogBuffer`.
#[derive(Clone)]
pub struct LogCaptureLayer {
// Destination for captured events.
buffer: LogBuffer,
}
impl LogCaptureLayer {
pub fn new(buffer: LogBuffer) -> Self {
Self { buffer }
}
}
impl<S: Subscriber> Layer<S> for LogCaptureLayer {
fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
let mut visitor = MessageVisitor::new();
event.record(&mut visitor);
self.buffer
.push_sync(level_str(event.metadata().level()), visitor.message);
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;

    // Installs a capture layer backed by `buffer` as the default subscriber and
    // returns the guard from `set_default`; keep it alive for the whole test.
    fn setup_tracing(buffer: &LogBuffer) -> tracing::subscriber::DefaultGuard {
        let capture = LogCaptureLayer::new(buffer.clone());
        tracing::subscriber::set_default(tracing_subscriber::registry().with(capture))
    }

    // Returns the first entry carrying the given level label, if any.
    fn find_level<'a>(entries: &'a [LogEntry], level: &str) -> Option<&'a LogEntry> {
        entries.iter().find(|e| e.level == level)
    }

    // An event emitted through tracing reaches a broadcast subscriber.
    #[tokio::test]
    async fn test_log_buffer_captures_entries() {
        let buffer = LogBuffer::new();
        let mut receiver = buffer.subscribe();
        let _guard = setup_tracing(&buffer);

        tracing::info!("hello from test");

        let received = receiver.try_recv().unwrap();
        assert_eq!(received.level, "INFO");
        assert!(received.message.contains("hello from test"));
    }

    // Events are retained in the history in emission order.
    #[tokio::test]
    async fn test_log_buffer_history() {
        let buffer = LogBuffer::new();
        let _guard = setup_tracing(&buffer);

        tracing::info!("hist-1");
        tracing::warn!("hist-2");

        let snapshot = buffer.history_sync();
        let levels: Vec<&str> = snapshot.iter().map(|e| e.level.as_str()).collect();
        assert_eq!(levels, ["INFO", "WARN"]);
    }

    // Pushing past the capacity evicts the oldest entries.
    #[tokio::test]
    async fn test_log_buffer_ring_capacity() {
        let buffer = LogBuffer::new();
        let total = HISTORY_CAPACITY + 100;
        (0..total).for_each(|i| buffer.push_sync("INFO", format!("msg-{}", i)));

        let retained = buffer.history_sync();
        assert_eq!(retained.len(), HISTORY_CAPACITY);
        assert!(retained[0].message.contains("msg-100"));
    }

    // push_sync works from a plain (non-async) test.
    #[test]
    fn test_log_capture_does_not_spawn_tasks_sync() {
        let buffer = LogBuffer::new();
        (0..50usize).for_each(|i| buffer.push_sync("INFO", format!("event-{i}")));

        assert_eq!(buffer.history_sync().len(), 50);
    }

    // The async and sync history accessors return the same entries.
    #[tokio::test]
    async fn test_log_buffer_async_history_matches_sync() {
        let buffer = LogBuffer::new();
        buffer.push_sync("DEBUG", "async-hist-a");
        buffer.push_sync("TRACE", "async-hist-b");

        let from_async = buffer.history().await;
        let from_sync = buffer.history_sync();

        assert_eq!(from_async.len(), from_sync.len());
        for (lhs, rhs) in from_async.iter().zip(from_sync.iter()) {
            assert_eq!((&lhs.level, &lhs.message), (&rhs.level, &rhs.message));
        }
    }

    // DEBUG and TRACE events are captured too.
    #[tokio::test]
    async fn test_log_buffer_captures_debug_and_trace_levels() {
        let buffer = LogBuffer::new();
        let _guard = setup_tracing(&buffer);

        tracing::debug!("debug-message");
        tracing::trace!("trace-message");

        let snapshot = buffer.history_sync();
        let debug_entry = find_level(&snapshot, "DEBUG");
        let trace_entry = find_level(&snapshot, "TRACE");

        let debug_entry = debug_entry.expect("DEBUG entry should be captured");
        let trace_entry = trace_entry.expect("TRACE entry should be captured");
        assert!(debug_entry.message.contains("debug-message"));
        assert!(trace_entry.message.contains("trace-message"));
    }

    // Default::default() yields an empty buffer.
    #[test]
    fn test_log_buffer_default() {
        let buffer = LogBuffer::default();
        assert!(buffer.history_sync().is_empty());
    }
}