1mod collector;
53mod event;
54mod filter;
55
56pub use collector::{
57 BufferedCollector, DEFAULT_BUFFER_CAPACITY, LogCollector, LogContext, MultiCollector,
58 NullCollector,
59};
60pub use event::{LogCategory, LogEvent, LogEventBuilder, LogLevel};
61pub use filter::{LogFilter, LogFilterBuilder};
62
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{NodeId, TraceId};
    use std::sync::Arc;

    /// End-to-end check of the collect/query workflow: events emitted through
    /// a shared `LogContext` land in a `BufferedCollector` and can be read
    /// back by trace id and by filter queries.
    #[test]
    fn integration_test_logging_workflow() {
        let sink = Arc::new(BufferedCollector::with_default_capacity());

        let trace = TraceId::new();
        let pipeline = "order_processing";

        let ctx = LogContext::new(sink.clone())
            .with_trace_id(trace)
            .with_pipeline_id(pipeline);

        ctx.info(LogCategory::Trace, "Trace started");

        // Each node contributes one debug event and one info event.
        for id in 1..=3 {
            let node_ctx = ctx.for_node(NodeId::new(id));
            node_ctx.debug(LogCategory::Node, format!("Node {} started", id));
            node_ctx.info(LogCategory::Node, format!("Node {} completed", id));
        }

        ctx.info(LogCategory::Trace, "Trace completed");

        // 2 trace-level events + 3 nodes * 2 events each = 8 total.
        assert_eq!(sink.len(), 8);

        let trace_events = sink.by_trace(trace);
        assert_eq!(trace_events.len(), 8);

        // Exactly one debug event was emitted per node.
        let debug_events = sink.query(&LogFilter::new().level(LogLevel::Debug));
        assert_eq!(debug_events.len(), 3);

        // All six node-scoped events, regardless of level.
        let node_events = sink.query(&LogFilter::new().category(LogCategory::Node));
        assert_eq!(node_events.len(), 6);

        // Combined criteria: only the two trace-category info events match.
        let combined = LogFilter::new()
            .trace_id(trace)
            .min_level(LogLevel::Info)
            .category(LogCategory::Trace);
        let matched = sink.query(&combined);
        assert_eq!(matched.len(), 2);
    }

    /// A subscriber registered on the collector observes every collected
    /// event; this one counts only events at `Error` severity or above.
    #[test]
    fn integration_test_subscriber() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let collector = BufferedCollector::with_default_capacity();
        let errors_seen = Arc::new(AtomicUsize::new(0));

        let counter = Arc::clone(&errors_seen);
        collector.subscribe(Arc::new(move |event| {
            if event.level >= LogLevel::Error {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        }));

        collector.collect(LogEvent::info(LogCategory::System, "Info message"));
        collector.collect(LogEvent::warn(LogCategory::System, "Warning message"));
        collector.collect(LogEvent::error(LogCategory::System, "Error message"));
        collector.collect(LogEvent::error(LogCategory::Node, "Another error"));

        // Only the two error-level events should have bumped the counter.
        assert_eq!(errors_seen.load(Ordering::SeqCst), 2);
    }

    /// A fully populated event's formatted line mentions every piece of
    /// attached context: level, category, trace, node, pipeline, message,
    /// and both custom fields.
    #[test]
    fn integration_test_event_formatting() {
        let trace = TraceId::new();
        let event = LogEvent::warn(LogCategory::Node, "Node timeout")
            .with_trace_id(trace)
            .with_node_id(NodeId::new(42))
            .with_pipeline_id("my_pipeline")
            .with_field("timeout_ms", "5000")
            .with_field_i64("retry_count", 3);

        let rendered = event.format_line();

        for expected in [
            "[WARN]",
            "[node]",
            "node=42",
            "pipeline=my_pipeline",
            "Node timeout",
            "timeout_ms",
            "retry_count",
        ] {
            assert!(rendered.contains(expected));
        }
        assert!(rendered.contains(&format!("trace={}", trace)));
    }

    /// A `LogFilter` must survive a JSON round trip with every field intact.
    #[test]
    fn integration_test_filter_serialization() {
        let trace = TraceId::new();
        let original = LogFilter::new()
            .min_level(LogLevel::Warn)
            .trace_id(trace)
            .category(LogCategory::Node)
            .limit(100);

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: LogFilter = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.min_level, Some(LogLevel::Warn));
        assert_eq!(decoded.trace_id, Some(trace));
        assert_eq!(decoded.categories, vec![LogCategory::Node]);
        assert_eq!(decoded.limit, Some(100));
    }
}