mod collector;
mod event;
mod filter;
pub use collector::{
BufferedCollector, DEFAULT_BUFFER_CAPACITY, LogCollector, LogContext, MultiCollector,
NullCollector,
};
pub use event::{LogCategory, LogEvent, LogEventBuilder, LogLevel};
pub use filter::{LogFilter, LogFilterBuilder};
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{NodeId, TraceId};
    use std::sync::Arc;

    /// End-to-end workflow: a root context emits trace-level events,
    /// per-node child contexts emit node events, and the collector is
    /// then queried by trace id, level, and category.
    #[test]
    fn integration_test_logging_workflow() {
        let sink = Arc::new(BufferedCollector::with_default_capacity());
        let trace = TraceId::new();
        let pipeline = "order_processing";

        let root = LogContext::new(sink.clone())
            .with_trace_id(trace)
            .with_pipeline_id(pipeline);

        root.info(LogCategory::Trace, "Trace started");
        for id in 1..=3 {
            let scoped = root.for_node(NodeId::new(id));
            scoped.debug(LogCategory::Node, format!("Node {} started", id));
            scoped.info(LogCategory::Node, format!("Node {} completed", id));
        }
        root.info(LogCategory::Trace, "Trace completed");

        // 2 trace-level events + 3 nodes x 2 events each = 8 total.
        assert_eq!(sink.len(), 8);
        assert_eq!(sink.by_trace(trace).len(), 8);

        // Exactly one debug event per node.
        assert_eq!(sink.query(&LogFilter::new().level(LogLevel::Debug)).len(), 3);
        // Two node-category events per node.
        assert_eq!(sink.query(&LogFilter::new().category(LogCategory::Node)).len(), 6);

        // Combined filter: trace-category events at info or above.
        let combined = LogFilter::new()
            .trace_id(trace)
            .min_level(LogLevel::Info)
            .category(LogCategory::Trace);
        assert_eq!(sink.query(&combined).len(), 2);
    }

    /// A subscriber registered on the collector observes every collected
    /// event; this one counts only error-level (and above) events.
    #[test]
    fn integration_test_subscriber() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let collector = BufferedCollector::with_default_capacity();
        let errors_seen = Arc::new(AtomicUsize::new(0));

        let counter = Arc::clone(&errors_seen);
        collector.subscribe(Arc::new(move |event| {
            if event.level >= LogLevel::Error {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        }));

        for event in [
            LogEvent::info(LogCategory::System, "Info message"),
            LogEvent::warn(LogCategory::System, "Warning message"),
            LogEvent::error(LogCategory::System, "Error message"),
            LogEvent::error(LogCategory::Node, "Another error"),
        ] {
            collector.collect(event);
        }

        // Only the two error events should have been counted.
        assert_eq!(errors_seen.load(Ordering::SeqCst), 2);
    }

    /// `format_line` output must embed the level tag, category tag,
    /// trace/node/pipeline ids, the message text, and all custom fields.
    #[test]
    fn integration_test_event_formatting() {
        let trace = TraceId::new();
        let event = LogEvent::warn(LogCategory::Node, "Node timeout")
            .with_trace_id(trace)
            .with_node_id(NodeId::new(42))
            .with_pipeline_id("my_pipeline")
            .with_field("timeout_ms", "5000")
            .with_field_i64("retry_count", 3);

        let rendered = event.format_line();
        let trace_fragment = format!("trace={}", trace);
        for needle in [
            "[WARN]",
            "[node]",
            trace_fragment.as_str(),
            "node=42",
            "pipeline=my_pipeline",
            "Node timeout",
            "timeout_ms",
            "retry_count",
        ] {
            assert!(rendered.contains(needle));
        }
    }

    /// A filter should survive a JSON round-trip with every field intact.
    #[test]
    fn integration_test_filter_serialization() {
        let trace = TraceId::new();
        let original = LogFilter::new()
            .min_level(LogLevel::Warn)
            .trace_id(trace)
            .category(LogCategory::Node)
            .limit(100);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: LogFilter = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.min_level, Some(LogLevel::Warn));
        assert_eq!(decoded.trace_id, Some(trace));
        assert_eq!(decoded.categories, vec![LogCategory::Node]);
        assert_eq!(decoded.limit, Some(100));
    }
}