use super::event::{LogCategory, LogEvent, LogLevel};
use super::filter::LogFilter;
use crate::types::{NodeId, TraceId};
use parking_lot::RwLock;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Buffer capacity used by [`BufferedCollector::with_default_capacity`]
/// (and therefore by `BufferedCollector::default()`).
pub const DEFAULT_BUFFER_CAPACITY: usize = 10_000;
/// A sink for [`LogEvent`]s.
///
/// The `Send + Sync` supertraits are required of every implementation, which
/// lets collectors be shared behind `Arc<dyn LogCollector>`.
pub trait LogCollector: Send + Sync {
/// Takes ownership of `event`; what happens to it is up to the implementation.
fn collect(&self, event: LogEvent);
/// Number of events the collector currently reports holding.
fn len(&self) -> usize;
/// Returns `true` when [`len`](Self::len) is zero.
fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// In-memory collector that keeps events in a bounded queue and notifies
/// registered subscribers about each accepted event.
pub struct BufferedCollector {
/// Stored events; new events are pushed at the back and the oldest is popped
/// from the front.
buffer: RwLock<VecDeque<LogEvent>>,
/// Size limit checked by `collect`: once the buffer length reaches this
/// value, the oldest event is popped before a new one is stored.
capacity: usize,
/// Source of the id stamped onto each accepted event; starts at 1.
next_id: AtomicU64,
/// Optional intake filter; events it does not match are dropped by `collect`.
filter: Option<LogFilter>,
/// Callbacks invoked with every accepted event.
subscribers: RwLock<Vec<Arc<dyn Fn(&LogEvent) + Send + Sync>>>,
}
impl BufferedCollector {
    /// Creates an empty collector whose queue is preallocated for `capacity`
    /// events. Ids start at 1; there is no intake filter and no subscriber.
    pub fn new(capacity: usize) -> Self {
        let buffer = RwLock::new(VecDeque::with_capacity(capacity));
        let subscribers = RwLock::new(Vec::new());
        Self {
            buffer,
            capacity,
            next_id: AtomicU64::new(1),
            filter: None,
            subscribers,
        }
    }

    /// Creates a collector sized to [`DEFAULT_BUFFER_CAPACITY`].
    pub fn with_default_capacity() -> Self {
        Self::new(DEFAULT_BUFFER_CAPACITY)
    }

    /// Builder-style setter: installs `filter` as the intake filter, replacing
    /// any previous one.
    pub fn with_filter(self, filter: LogFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Registers `callback` to be invoked with every event accepted by `collect`.
    pub fn subscribe(&self, callback: Arc<dyn Fn(&LogEvent) + Send + Sync>) {
        self.subscribers.write().push(callback);
    }

    /// Clones every buffered event accepted by `keep`, oldest first, while
    /// holding a single read lock on the buffer.
    fn matching<P>(&self, keep: P) -> Vec<LogEvent>
    where
        P: Fn(&LogEvent) -> bool,
    {
        let guard = self.buffer.read();
        guard.iter().filter(|&event| keep(event)).cloned().collect()
    }

    /// Returns the buffered events that `filter` matches, oldest first.
    pub fn query(&self, filter: &LogFilter) -> Vec<LogEvent> {
        self.matching(|event| filter.matches(event))
    }

    /// Returns up to `limit` events, newest first.
    pub fn recent(&self, limit: usize) -> Vec<LogEvent> {
        let guard = self.buffer.read();
        let newest_first = guard.iter().rev();
        newest_first.take(limit).cloned().collect()
    }

    /// Returns the buffered events tagged with `trace_id`, oldest first.
    pub fn by_trace(&self, trace_id: TraceId) -> Vec<LogEvent> {
        self.matching(|event| event.trace_id == Some(trace_id))
    }

    /// Returns the buffered events tagged with both `trace_id` and `node_id`,
    /// oldest first.
    pub fn by_trace_node(&self, trace_id: TraceId, node_id: NodeId) -> Vec<LogEvent> {
        self.matching(|event| {
            event.trace_id == Some(trace_id) && event.node_id == Some(node_id)
        })
    }

    /// Returns the buffered events whose pipeline id equals `pipeline_id`,
    /// oldest first.
    pub fn by_pipeline(&self, pipeline_id: &str) -> Vec<LogEvent> {
        self.matching(|event| event.pipeline_id.as_deref() == Some(pipeline_id))
    }

    /// Returns the buffered events whose level is at least `min_level`,
    /// oldest first.
    pub fn by_level(&self, min_level: LogLevel) -> Vec<LogEvent> {
        self.matching(|event| event.level >= min_level)
    }

    /// Returns a copy of every buffered event, oldest first.
    pub fn all(&self) -> Vec<LogEvent> {
        self.matching(|_| true)
    }

    /// Removes every buffered event. Subscribers and the id counter are untouched.
    pub fn clear(&self) {
        self.buffer.write().clear();
    }

    /// Returns the capacity this collector was constructed with.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Returns the buffered events whose id is strictly greater than `after_id`.
    pub fn since(&self, after_id: u64) -> Vec<LogEvent> {
        self.matching(|event| event.id > after_id)
    }

    /// Returns the buffered events whose timestamp lies in the inclusive range
    /// `start_ns..=end_ns`.
    pub fn time_range(&self, start_ns: u64, end_ns: u64) -> Vec<LogEvent> {
        self.matching(|event| event.timestamp_ns >= start_ns && event.timestamp_ns <= end_ns)
    }
}
impl LogCollector for BufferedCollector {
    /// Accepts `event`, stamps it with the next id, notifies subscribers, and
    /// stores it in the bounded buffer.
    ///
    /// Steps, in order:
    /// 1. If an intake filter is set and does not match, the event is dropped
    ///    without consuming an id or notifying anyone.
    /// 2. The event receives the next id from the atomic counter.
    /// 3. Every subscriber is called with the event while the subscriber list's
    ///    read lock is held.
    /// 4. The event is appended to the buffer, evicting the oldest entry when
    ///    the buffer is already at capacity.
    ///
    /// A collector built with capacity `0` performs steps 1-3 but never stores
    /// anything. Previously such a collector retained one event, because
    /// popping from an empty queue is a no-op and the push still happened.
    ///
    /// Ids are allocated before the buffer's write lock is taken, so concurrent
    /// callers may insert events into the buffer out of id order.
    fn collect(&self, mut event: LogEvent) {
        if let Some(filter) = &self.filter {
            if !filter.matches(&event) {
                return;
            }
        }

        event.id = self.next_id.fetch_add(1, Ordering::SeqCst);

        {
            // Scoped so the read guard is released before the buffer is locked.
            let subscribers = self.subscribers.read();
            for subscriber in subscribers.iter() {
                subscriber(&event);
            }
        }

        // Nothing may be retained by a zero-capacity collector.
        if self.capacity == 0 {
            return;
        }

        let mut buffer = self.buffer.write();
        if buffer.len() >= self.capacity {
            buffer.pop_front();
        }
        buffer.push_back(event);
    }

    /// Number of events currently held in the buffer.
    fn len(&self) -> usize {
        self.buffer.read().len()
    }
}
impl Default for BufferedCollector {
fn default() -> Self {
Self::with_default_capacity()
}
}
/// Collector that discards every event and always reports a length of zero.
pub struct NullCollector;
impl LogCollector for NullCollector {
    /// Drops `event` immediately; nothing is recorded.
    fn collect(&self, event: LogEvent) {
        drop(event);
    }

    /// Always `0`, since nothing is ever stored.
    fn len(&self) -> usize {
        0
    }
}
/// Collector that forwards each event to a list of other collectors.
pub struct MultiCollector {
/// Downstream collectors, in the order they receive events.
collectors: Vec<Arc<dyn LogCollector>>,
}
impl MultiCollector {
pub fn new(collectors: Vec<Arc<dyn LogCollector>>) -> Self {
Self { collectors }
}
}
impl LogCollector for MultiCollector {
    /// Delivers `event` to every wrapped collector, in order.
    ///
    /// All collectors but the last receive a clone; the last one receives the
    /// original, which saves one clone per call. With no wrapped collectors
    /// the event is simply dropped.
    fn collect(&self, event: LogEvent) {
        if let Some((last, rest)) = self.collectors.split_last() {
            for collector in rest {
                collector.collect(event.clone());
            }
            last.collect(event);
        }
    }

    /// Reports the length of the first wrapped collector, or `0` when there
    /// are none. The other collectors are not consulted.
    fn len(&self) -> usize {
        self.collectors.first().map_or(0, |collector| collector.len())
    }
}
/// Handle that logs to a shared collector and fills in trace, node, and
/// pipeline ids on events that do not already carry them.
pub struct LogContext {
/// Destination for every event logged through this context.
collector: Arc<dyn LogCollector>,
/// Trace id applied to events that have none.
trace_id: Option<TraceId>,
/// Node id applied to events that have none.
node_id: Option<NodeId>,
/// Pipeline id applied to events that have none.
pipeline_id: Option<String>,
}
impl LogContext {
pub fn new(collector: Arc<dyn LogCollector>) -> Self {
Self {
collector,
trace_id: None,
node_id: None,
pipeline_id: None,
}
}
pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
self.trace_id = Some(trace_id);
self
}
pub fn with_node_id(mut self, node_id: NodeId) -> Self {
self.node_id = Some(node_id);
self
}
pub fn with_pipeline_id(mut self, pipeline_id: impl Into<String>) -> Self {
self.pipeline_id = Some(pipeline_id.into());
self
}
pub fn for_node(&self, node_id: NodeId) -> Self {
Self {
collector: Arc::clone(&self.collector),
trace_id: self.trace_id,
node_id: Some(node_id),
pipeline_id: self.pipeline_id.clone(),
}
}
pub fn log(&self, mut event: LogEvent) {
if event.trace_id.is_none() {
event.trace_id = self.trace_id;
}
if event.node_id.is_none() {
event.node_id = self.node_id;
}
if event.pipeline_id.is_none() {
event.pipeline_id = self.pipeline_id.clone();
}
self.collector.collect(event);
}
pub fn trace(&self, category: LogCategory, message: impl Into<String>) {
self.log(LogEvent::trace(category, message));
}
pub fn debug(&self, category: LogCategory, message: impl Into<String>) {
self.log(LogEvent::debug(category, message));
}
pub fn info(&self, category: LogCategory, message: impl Into<String>) {
self.log(LogEvent::info(category, message));
}
pub fn warn(&self, category: LogCategory, message: impl Into<String>) {
self.log(LogEvent::warn(category, message));
}
pub fn error(&self, category: LogCategory, message: impl Into<String>) {
self.log(LogEvent::error(category, message));
}
}
impl Clone for LogContext {
    /// Produces a context that shares the same collector (the `Arc` is
    /// cloned, not the collector) and copies the default ids.
    fn clone(&self) -> Self {
        let Self {
            collector,
            trace_id,
            node_id,
            pipeline_id,
        } = self;
        Self {
            collector: Arc::clone(collector),
            trace_id: *trace_id,
            node_id: *node_id,
            pipeline_id: pipeline_id.clone(),
        }
    }
}
// Unit tests for the collectors and `LogContext` defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// Collected events are counted by `len`.
#[test]
fn buffered_collector_basic() {
let collector = BufferedCollector::new(100);
collector.collect(LogEvent::info(LogCategory::System, "Test message"));
collector.collect(LogEvent::warn(LogCategory::Node, "Warning"));
assert_eq!(collector.len(), 2);
}
// A fourth event in a three-slot buffer evicts the oldest one.
#[test]
fn buffered_collector_capacity() {
let collector = BufferedCollector::new(3);
collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
collector.collect(LogEvent::info(LogCategory::System, "Event 3"));
collector.collect(LogEvent::info(LogCategory::System, "Event 4"));
assert_eq!(collector.len(), 3);
let events = collector.all();
assert_eq!(events[0].message, "Event 2");
assert_eq!(events[2].message, "Event 4");
}
// Ids are assigned sequentially starting at 1.
#[test]
fn buffered_collector_event_ids() {
let collector = BufferedCollector::new(100);
collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
let events = collector.all();
assert_eq!(events[0].id, 1);
assert_eq!(events[1].id, 2);
}
// `by_trace` returns only events carrying the requested trace id.
#[test]
fn buffered_collector_by_trace() {
let collector = BufferedCollector::new(100);
let trace_id = TraceId::new();
collector.collect(LogEvent::info(LogCategory::System, "Unrelated"));
collector
.collect(LogEvent::info(LogCategory::Trace, "Trace event").with_trace_id(trace_id));
collector.collect(
LogEvent::info(LogCategory::Node, "Node event")
.with_trace_id(trace_id)
.with_node_id(NodeId::new(1)),
);
let trace_events = collector.by_trace(trace_id);
assert_eq!(trace_events.len(), 2);
}
// `by_level` treats its argument as an inclusive minimum.
#[test]
fn buffered_collector_by_level() {
let collector = BufferedCollector::new(100);
collector.collect(LogEvent::debug(LogCategory::System, "Debug"));
collector.collect(LogEvent::info(LogCategory::System, "Info"));
collector.collect(LogEvent::warn(LogCategory::System, "Warn"));
collector.collect(LogEvent::error(LogCategory::System, "Error"));
let warnings = collector.by_level(LogLevel::Warn);
assert_eq!(warnings.len(), 2);
assert!(warnings.iter().all(|e| e.level >= LogLevel::Warn));
}
// `recent` returns the newest events first.
#[test]
fn buffered_collector_recent() {
let collector = BufferedCollector::new(100);
for i in 1..=10 {
collector.collect(LogEvent::info(LogCategory::System, format!("Event {}", i)));
}
let recent = collector.recent(3);
assert_eq!(recent.len(), 3);
assert_eq!(recent[0].message, "Event 10");
assert_eq!(recent[2].message, "Event 8");
}
// `since` excludes the event whose id equals the argument.
#[test]
fn buffered_collector_since() {
let collector = BufferedCollector::new(100);
collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
collector.collect(LogEvent::info(LogCategory::System, "Event 3"));
let since = collector.since(1);
assert_eq!(since.len(), 2);
assert_eq!(since[0].id, 2);
}
// A context stamps its trace and pipeline ids onto events that lack them.
#[test]
fn log_context_auto_fields() {
let collector = Arc::new(BufferedCollector::new(100));
let trace_id = TraceId::new();
let ctx = LogContext::new(collector.clone())
.with_trace_id(trace_id)
.with_pipeline_id("test_pipeline");
ctx.info(LogCategory::Trace, "Trace started");
let events = collector.all();
assert_eq!(events.len(), 1);
assert_eq!(events[0].trace_id, Some(trace_id));
assert_eq!(events[0].pipeline_id, Some("test_pipeline".to_string()));
}
// `for_node` keeps the parent's trace id and sets the node id.
#[test]
fn log_context_for_node() {
let collector = Arc::new(BufferedCollector::new(100));
let trace_id = TraceId::new();
let ctx = LogContext::new(collector.clone()).with_trace_id(trace_id);
let node_ctx = ctx.for_node(NodeId::new(42));
node_ctx.info(LogCategory::Node, "Node processing");
let events = collector.all();
assert_eq!(events[0].trace_id, Some(trace_id));
assert_eq!(events[0].node_id, Some(NodeId::new(42)));
}
// Each collected event triggers the subscriber callback once.
#[test]
fn subscriber_notification() {
use std::sync::atomic::AtomicUsize;
let collector = BufferedCollector::new(100);
let count = Arc::new(AtomicUsize::new(0));
let count_clone = Arc::clone(&count);
collector.subscribe(Arc::new(move |_event| {
count_clone.fetch_add(1, Ordering::SeqCst);
}));
collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
assert_eq!(count.load(Ordering::SeqCst), 2);
}
// `NullCollector` stays empty after an event is collected.
#[test]
fn null_collector() {
let collector = NullCollector;
collector.collect(LogEvent::info(LogCategory::System, "Discarded"));
assert_eq!(collector.len(), 0);
}
}