// qrusty 0.20.7
//
// A trusty priority queue server built with Rust.
// Documentation
// src/log_buffer.rs
//
// Implements: SYS-0015

//! In-memory log capture for real-time log streaming.
//!
//! Provides a custom `tracing` layer that captures log events into a bounded
//! ring buffer and broadcasts them to WebSocket subscribers.
//!
//! ## Design note
//!
//! `on_event` is a synchronous callback invoked by the tracing subscriber.
//! Spawning a new task per event (the previous design) created an unbounded
//! task storm under high log rates.  The history ring-buffer now uses a
//! `std::sync::Mutex` so that `on_event` can append synchronously without
//! any async machinery (SYS-0015).

use std::collections::VecDeque;
use std::fmt;
use std::sync::{Arc, Mutex};
use tokio::sync::broadcast;
use tracing::field::{Field, Visit};
use tracing::{Event, Level, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::Layer;

/// Maximum number of entries kept in the history ring buffer.
///
/// Once the history holds this many entries, the oldest one is evicted for
/// every new entry appended. Also used as the initial allocation size of the
/// history deque.
pub(crate) const HISTORY_CAPACITY: usize = 1000;

/// Broadcast channel capacity — large enough to absorb bursts.
///
/// Passed to `broadcast::channel` when a `LogBuffer` is constructed.
const BROADCAST_CAPACITY: usize = 4096;

/// A single captured log entry.
///
/// All fields are plain strings so the entry can be serialized as-is via the
/// derived `serde::Serialize` impl. The type is `Clone` because each entry is
/// stored in the history buffer and also sent over the broadcast channel.
#[derive(Clone, Debug, serde::Serialize)]
pub struct LogEntry {
    /// Creation time of the entry; `LogBuffer::push_sync` fills this with the
    /// current UTC time formatted as RFC 3339.
    pub timestamp: String,
    /// Severity label; entries captured from tracing events use one of
    /// `"ERROR"`, `"WARN"`, `"INFO"`, `"DEBUG"` or `"TRACE"`.
    pub level: String,
    /// Text of the log message.
    pub message: String,
}

/// Shared log buffer that captures tracing events and broadcasts them.
///
/// The history ring-buffer uses `std::sync::Mutex` (not `tokio::sync::Mutex`)
/// so that entries can be appended synchronously from `on_event` without
/// spawning tasks (SYS-0015).
///
/// The history lives behind an `Arc`, so cloning a `LogBuffer` yields a
/// handle onto the same history rather than an independent copy.
#[derive(Clone)]
pub struct LogBuffer {
    /// Sending half of the broadcast channel used for live log streaming;
    /// new receivers are created from it in `subscribe`.
    tx: broadcast::Sender<LogEntry>,
    /// Bounded history of recent entries, oldest at the front.
    history: Arc<Mutex<VecDeque<LogEntry>>>,
}

impl Default for LogBuffer {
    fn default() -> Self {
        Self::new()
    }
}

impl LogBuffer {
    /// Create an empty buffer together with its broadcast channel.
    ///
    /// The history deque is pre-allocated for `HISTORY_CAPACITY` entries and
    /// the channel is created with `BROADCAST_CAPACITY` slots. The receiver
    /// returned by `broadcast::channel` is dropped immediately; consumers
    /// obtain their own via [`LogBuffer::subscribe`].
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(BROADCAST_CAPACITY);
        Self {
            tx,
            history: Arc::new(Mutex::new(VecDeque::with_capacity(HISTORY_CAPACITY))),
        }
    }

    /// Subscribe to live log events.
    ///
    /// Entries pushed before the subscription was created are not replayed;
    /// use [`LogBuffer::history_sync`] to fetch those.
    pub fn subscribe(&self) -> broadcast::Receiver<LogEntry> {
        self.tx.subscribe()
    }

    /// Return a snapshot of the buffered history, oldest entry first.
    ///
    /// The entries are cloned while the history lock is held, so the returned
    /// vector is independent of later pushes.
    pub fn history_sync(&self) -> Vec<LogEntry> {
        self.lock_history().iter().cloned().collect()
    }

    /// Push an entry synchronously (SYS-0015).
    ///
    /// Stamps the entry with the current UTC time (RFC 3339), appends it to
    /// the ring buffer and broadcasts it without spawning any task.
    /// Called by `LogCaptureLayer::on_event` and from tests.
    pub fn push_sync(&self, level: &str, message: impl Into<String>) {
        let entry = LogEntry {
            timestamp: chrono::Utc::now().to_rfc3339(),
            level: level.to_string(),
            message: message.into(),
        };
        self.append_and_broadcast(entry);
    }

    /// Internal: append to ring buffer and broadcast.
    fn append_and_broadcast(&self, entry: LogEntry) {
        {
            // Scope the guard so the lock is released before broadcasting.
            let mut hist = self.lock_history();
            if hist.len() >= HISTORY_CAPACITY {
                // Full: evict the oldest entry to make room.
                hist.pop_front();
            }
            hist.push_back(entry.clone());
        }
        // Ignore send errors (no subscribers).
        let _ = self.tx.send(entry);
    }

    /// Internal: lock the history, recovering the guard if the mutex is
    /// poisoned.
    ///
    /// This runs on the logging path, so it must not panic: a poisoned mutex
    /// would otherwise turn every later log event and history read into a
    /// panic. The deque is only mutated through `pop_front`/`push_back`, so
    /// its contents are still usable after a panic elsewhere.
    fn lock_history(&self) -> std::sync::MutexGuard<'_, VecDeque<LogEntry>> {
        self.history
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}

/// Visitor that extracts the `message` field from a tracing event.
struct MessageVisitor {
    /// Text captured so far; stays empty until a field has been recorded.
    message: String,
}

impl MessageVisitor {
    /// Create a visitor with nothing captured yet.
    fn new() -> Self {
        let message = String::new();
        MessageVisitor { message }
    }
}

impl Visit for MessageVisitor {
    /// Capture a field rendered through its `Debug` implementation.
    ///
    /// The `message` field always wins and overwrites whatever was stored.
    /// Any other field is only used as a fallback, formatted as
    /// `name = value`, and only while nothing has been captured yet.
    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
        if field.name() == "message" {
            self.message = format!("{:?}", value);
        } else if self.message.is_empty() {
            // Fallback: record first field
            self.message = format!("{} = {:?}", field.name(), value);
        }
    }

    /// Capture a string-valued field.
    ///
    /// A `message` string is stored verbatim (no `Debug` quoting). Other
    /// string fields go through the same "first field" fallback as every
    /// other value type; without this, an event whose only field is a string
    /// would produce an empty message.
    fn record_str(&mut self, field: &Field, value: &str) {
        if field.name() == "message" {
            self.message = value.to_string();
        } else {
            // Delegate so the fallback rule lives in exactly one place.
            self.record_debug(field, &value);
        }
    }
}

fn level_str(level: &Level) -> &'static str {
    match *level {
        Level::ERROR => "ERROR",
        Level::WARN => "WARN",
        Level::INFO => "INFO",
        Level::DEBUG => "DEBUG",
        Level::TRACE => "TRACE",
    }
}

/// A `tracing_subscriber::Layer` that captures events into a [`LogBuffer`].
///
/// Appends synchronously from `on_event`; no tasks are spawned (SYS-0015).
#[derive(Clone)]
pub struct LogCaptureLayer {
    /// Destination that receives every captured event.
    buffer: LogBuffer,
}

impl LogCaptureLayer {
    /// Create a layer that forwards captured events into `buffer`.
    pub fn new(buffer: LogBuffer) -> Self {
        LogCaptureLayer { buffer }
    }
}

impl<S: Subscriber> Layer<S> for LogCaptureLayer {
    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
        let mut visitor = MessageVisitor::new();
        event.record(&mut visitor);

        // Synchronous append + broadcast — no task spawn (SYS-0015).
        self.buffer
            .push_sync(level_str(event.metadata().level()), visitor.message);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tracing_subscriber::layer::SubscriberExt;

    /// Install a subscriber whose only layer captures into `buffer`.
    ///
    /// The returned guard must be kept alive for as long as the test wants
    /// events to be captured.
    fn setup_tracing(buffer: &LogBuffer) -> tracing::subscriber::DefaultGuard {
        let layer = LogCaptureLayer::new(buffer.clone());
        let subscriber = tracing_subscriber::registry().with(layer);
        tracing::subscriber::set_default(subscriber)
    }

    /// An emitted event reaches an existing broadcast subscriber.
    #[tokio::test]
    async fn test_log_buffer_captures_entries() {
        let buf = LogBuffer::new();
        let mut rx = buf.subscribe();

        let _guard = setup_tracing(&buf);

        tracing::info!("hello from test");

        // Synchronous — no sleep needed
        let entry = rx.try_recv().unwrap();
        assert_eq!(entry.level, "INFO");
        assert!(entry.message.contains("hello from test"));
    }

    /// Events are recorded in the history in emission order.
    #[tokio::test]
    async fn test_log_buffer_history() {
        let buf = LogBuffer::new();
        let _guard = setup_tracing(&buf);

        tracing::info!("hist-1");
        tracing::warn!("hist-2");

        let hist = buf.history_sync();
        assert_eq!(hist.len(), 2);
        assert_eq!(hist[0].level, "INFO");
        assert_eq!(hist[1].level, "WARN");
    }

    /// The history never exceeds `HISTORY_CAPACITY` and evicts oldest-first.
    #[tokio::test]
    async fn test_log_buffer_ring_capacity() {
        let buf = LogBuffer::new();

        for i in 0..HISTORY_CAPACITY + 100 {
            buf.push_sync("INFO", format!("msg-{}", i));
        }

        let hist = buf.history_sync();
        assert_eq!(hist.len(), HISTORY_CAPACITY);
        // Oldest should be entry 100 (first 100 evicted). Compare exactly:
        // a substring check for "msg-100" would also accept "msg-1000" and
        // therefore could not detect a wrong eviction offset.
        assert_eq!(hist[0].message, "msg-100");
        // Newest should be the last entry pushed.
        let newest = format!("msg-{}", HISTORY_CAPACITY + 99);
        assert_eq!(
            hist.last().map(|e| e.message.as_str()),
            Some(newest.as_str())
        );
    }

    // Verifies: SYS-0015
    #[test]
    fn test_log_capture_does_not_spawn_tasks_sync() {
        let buf = LogBuffer::new();
        // push_sync is fully synchronous — no runtime needed
        for i in 0..50usize {
            buf.push_sync("INFO", format!("event-{i}"));
        }
        let hist = buf.history_sync();
        assert_eq!(hist.len(), 50);
    }

    /// Verify that DEBUG and TRACE log levels are captured correctly.
    #[tokio::test]
    async fn test_log_buffer_captures_debug_and_trace_levels() {
        let buf = LogBuffer::new();
        let _guard = setup_tracing(&buf);

        tracing::debug!("debug-message");
        tracing::trace!("trace-message");

        let hist = buf.history_sync();
        let debug_entry = hist
            .iter()
            .find(|e| e.level == "DEBUG")
            .expect("DEBUG entry should be captured");
        let trace_entry = hist
            .iter()
            .find(|e| e.level == "TRACE")
            .expect("TRACE entry should be captured");

        assert!(debug_entry.message.contains("debug-message"));
        assert!(trace_entry.message.contains("trace-message"));
    }

    /// `Default::default()` is equivalent to `new()`.
    #[test]
    fn test_log_buffer_default() {
        let buf: LogBuffer = Default::default();
        assert!(buf.history_sync().is_empty());
    }
}