1use super::event::{LogCategory, LogEvent, LogLevel};
7use super::filter::LogFilter;
8use crate::types::{NodeId, TraceId};
9use parking_lot::RwLock;
10use std::collections::VecDeque;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, Ordering};
13
14pub const DEFAULT_BUFFER_CAPACITY: usize = 10_000;
16
17pub trait LogCollector: Send + Sync {
19 fn collect(&self, event: LogEvent);
21
22 fn len(&self) -> usize;
24
25 fn is_empty(&self) -> bool {
27 self.len() == 0
28 }
29}
30
31pub struct BufferedCollector {
33 buffer: RwLock<VecDeque<LogEvent>>,
35 capacity: usize,
37 next_id: AtomicU64,
39 filter: Option<LogFilter>,
41 subscribers: RwLock<Vec<Arc<dyn Fn(&LogEvent) + Send + Sync>>>,
43}
44
45impl BufferedCollector {
46 pub fn new(capacity: usize) -> Self {
48 Self {
49 buffer: RwLock::new(VecDeque::with_capacity(capacity)),
50 capacity,
51 next_id: AtomicU64::new(1),
52 filter: None,
53 subscribers: RwLock::new(Vec::new()),
54 }
55 }
56
57 pub fn with_default_capacity() -> Self {
59 Self::new(DEFAULT_BUFFER_CAPACITY)
60 }
61
62 pub fn with_filter(mut self, filter: LogFilter) -> Self {
64 self.filter = Some(filter);
65 self
66 }
67
68 pub fn subscribe(&self, callback: Arc<dyn Fn(&LogEvent) + Send + Sync>) {
70 let mut subscribers = self.subscribers.write();
71 subscribers.push(callback);
72 }
73
74 pub fn query(&self, filter: &LogFilter) -> Vec<LogEvent> {
76 let buffer = self.buffer.read();
77 buffer
78 .iter()
79 .filter(|e| filter.matches(e))
80 .cloned()
81 .collect()
82 }
83
84 pub fn recent(&self, limit: usize) -> Vec<LogEvent> {
86 let buffer = self.buffer.read();
87 buffer.iter().rev().take(limit).cloned().collect()
88 }
89
90 pub fn by_trace(&self, trace_id: TraceId) -> Vec<LogEvent> {
92 let buffer = self.buffer.read();
93 buffer
94 .iter()
95 .filter(|e| e.trace_id == Some(trace_id))
96 .cloned()
97 .collect()
98 }
99
100 pub fn by_trace_node(&self, trace_id: TraceId, node_id: NodeId) -> Vec<LogEvent> {
102 let buffer = self.buffer.read();
103 buffer
104 .iter()
105 .filter(|e| e.trace_id == Some(trace_id) && e.node_id == Some(node_id))
106 .cloned()
107 .collect()
108 }
109
110 pub fn by_pipeline(&self, pipeline_id: &str) -> Vec<LogEvent> {
112 let buffer = self.buffer.read();
113 buffer
114 .iter()
115 .filter(|e| e.pipeline_id.as_deref() == Some(pipeline_id))
116 .cloned()
117 .collect()
118 }
119
120 pub fn by_level(&self, min_level: LogLevel) -> Vec<LogEvent> {
122 let buffer = self.buffer.read();
123 buffer
124 .iter()
125 .filter(|e| e.level >= min_level)
126 .cloned()
127 .collect()
128 }
129
130 pub fn all(&self) -> Vec<LogEvent> {
132 let buffer = self.buffer.read();
133 buffer.iter().cloned().collect()
134 }
135
136 pub fn clear(&self) {
138 let mut buffer = self.buffer.write();
139 buffer.clear();
140 }
141
142 pub fn capacity(&self) -> usize {
144 self.capacity
145 }
146
147 pub fn since(&self, after_id: u64) -> Vec<LogEvent> {
149 let buffer = self.buffer.read();
150 buffer.iter().filter(|e| e.id > after_id).cloned().collect()
151 }
152
153 pub fn time_range(&self, start_ns: u64, end_ns: u64) -> Vec<LogEvent> {
155 let buffer = self.buffer.read();
156 buffer
157 .iter()
158 .filter(|e| e.timestamp_ns >= start_ns && e.timestamp_ns <= end_ns)
159 .cloned()
160 .collect()
161 }
162}
163
164impl LogCollector for BufferedCollector {
165 fn collect(&self, mut event: LogEvent) {
166 if let Some(ref filter) = self.filter {
168 if !filter.matches(&event) {
169 return;
170 }
171 }
172
173 event.id = self.next_id.fetch_add(1, Ordering::SeqCst);
175
176 {
178 let subscribers = self.subscribers.read();
179 for subscriber in subscribers.iter() {
180 subscriber(&event);
181 }
182 }
183
184 let mut buffer = self.buffer.write();
186 if buffer.len() >= self.capacity {
187 buffer.pop_front();
188 }
189 buffer.push_back(event);
190 }
191
192 fn len(&self) -> usize {
193 self.buffer.read().len()
194 }
195}
196
197impl Default for BufferedCollector {
198 fn default() -> Self {
199 Self::with_default_capacity()
200 }
201}
202
203pub struct NullCollector;
205
206impl LogCollector for NullCollector {
207 fn collect(&self, _event: LogEvent) {
208 }
210
211 fn len(&self) -> usize {
212 0
213 }
214}
215
216pub struct MultiCollector {
218 collectors: Vec<Arc<dyn LogCollector>>,
219}
220
221impl MultiCollector {
222 pub fn new(collectors: Vec<Arc<dyn LogCollector>>) -> Self {
224 Self { collectors }
225 }
226}
227
228impl LogCollector for MultiCollector {
229 fn collect(&self, event: LogEvent) {
230 for collector in &self.collectors {
231 collector.collect(event.clone());
232 }
233 }
234
235 fn len(&self) -> usize {
236 self.collectors.first().map(|c| c.len()).unwrap_or(0)
237 }
238}
239
240pub struct LogContext {
242 collector: Arc<dyn LogCollector>,
243 trace_id: Option<TraceId>,
244 node_id: Option<NodeId>,
245 pipeline_id: Option<String>,
246}
247
248impl LogContext {
249 pub fn new(collector: Arc<dyn LogCollector>) -> Self {
251 Self {
252 collector,
253 trace_id: None,
254 node_id: None,
255 pipeline_id: None,
256 }
257 }
258
259 pub fn with_trace_id(mut self, trace_id: TraceId) -> Self {
261 self.trace_id = Some(trace_id);
262 self
263 }
264
265 pub fn with_node_id(mut self, node_id: NodeId) -> Self {
267 self.node_id = Some(node_id);
268 self
269 }
270
271 pub fn with_pipeline_id(mut self, pipeline_id: impl Into<String>) -> Self {
273 self.pipeline_id = Some(pipeline_id.into());
274 self
275 }
276
277 pub fn for_node(&self, node_id: NodeId) -> Self {
279 Self {
280 collector: Arc::clone(&self.collector),
281 trace_id: self.trace_id,
282 node_id: Some(node_id),
283 pipeline_id: self.pipeline_id.clone(),
284 }
285 }
286
287 pub fn log(&self, mut event: LogEvent) {
289 if event.trace_id.is_none() {
290 event.trace_id = self.trace_id;
291 }
292 if event.node_id.is_none() {
293 event.node_id = self.node_id;
294 }
295 if event.pipeline_id.is_none() {
296 event.pipeline_id = self.pipeline_id.clone();
297 }
298 self.collector.collect(event);
299 }
300
301 pub fn trace(&self, category: LogCategory, message: impl Into<String>) {
303 self.log(LogEvent::trace(category, message));
304 }
305
306 pub fn debug(&self, category: LogCategory, message: impl Into<String>) {
308 self.log(LogEvent::debug(category, message));
309 }
310
311 pub fn info(&self, category: LogCategory, message: impl Into<String>) {
313 self.log(LogEvent::info(category, message));
314 }
315
316 pub fn warn(&self, category: LogCategory, message: impl Into<String>) {
318 self.log(LogEvent::warn(category, message));
319 }
320
321 pub fn error(&self, category: LogCategory, message: impl Into<String>) {
323 self.log(LogEvent::error(category, message));
324 }
325}
326
327impl Clone for LogContext {
328 fn clone(&self) -> Self {
329 Self {
330 collector: Arc::clone(&self.collector),
331 trace_id: self.trace_id,
332 node_id: self.node_id,
333 pipeline_id: self.pipeline_id.clone(),
334 }
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341
342 #[test]
343 fn buffered_collector_basic() {
344 let collector = BufferedCollector::new(100);
345
346 collector.collect(LogEvent::info(LogCategory::System, "Test message"));
347 collector.collect(LogEvent::warn(LogCategory::Node, "Warning"));
348
349 assert_eq!(collector.len(), 2);
350 }
351
352 #[test]
353 fn buffered_collector_capacity() {
354 let collector = BufferedCollector::new(3);
355
356 collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
357 collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
358 collector.collect(LogEvent::info(LogCategory::System, "Event 3"));
359 collector.collect(LogEvent::info(LogCategory::System, "Event 4"));
360
361 assert_eq!(collector.len(), 3);
362
363 let events = collector.all();
364 assert_eq!(events[0].message, "Event 2");
365 assert_eq!(events[2].message, "Event 4");
366 }
367
368 #[test]
369 fn buffered_collector_event_ids() {
370 let collector = BufferedCollector::new(100);
371
372 collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
373 collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
374
375 let events = collector.all();
376 assert_eq!(events[0].id, 1);
377 assert_eq!(events[1].id, 2);
378 }
379
380 #[test]
381 fn buffered_collector_by_trace() {
382 let collector = BufferedCollector::new(100);
383 let trace_id = TraceId::new();
384
385 collector.collect(LogEvent::info(LogCategory::System, "Unrelated"));
386 collector
387 .collect(LogEvent::info(LogCategory::Trace, "Trace event").with_trace_id(trace_id));
388 collector.collect(
389 LogEvent::info(LogCategory::Node, "Node event")
390 .with_trace_id(trace_id)
391 .with_node_id(NodeId::new(1)),
392 );
393
394 let trace_events = collector.by_trace(trace_id);
395 assert_eq!(trace_events.len(), 2);
396 }
397
398 #[test]
399 fn buffered_collector_by_level() {
400 let collector = BufferedCollector::new(100);
401
402 collector.collect(LogEvent::debug(LogCategory::System, "Debug"));
403 collector.collect(LogEvent::info(LogCategory::System, "Info"));
404 collector.collect(LogEvent::warn(LogCategory::System, "Warn"));
405 collector.collect(LogEvent::error(LogCategory::System, "Error"));
406
407 let warnings = collector.by_level(LogLevel::Warn);
408 assert_eq!(warnings.len(), 2);
409 assert!(warnings.iter().all(|e| e.level >= LogLevel::Warn));
410 }
411
412 #[test]
413 fn buffered_collector_recent() {
414 let collector = BufferedCollector::new(100);
415
416 for i in 1..=10 {
417 collector.collect(LogEvent::info(LogCategory::System, format!("Event {}", i)));
418 }
419
420 let recent = collector.recent(3);
421 assert_eq!(recent.len(), 3);
422 assert_eq!(recent[0].message, "Event 10");
423 assert_eq!(recent[2].message, "Event 8");
424 }
425
426 #[test]
427 fn buffered_collector_since() {
428 let collector = BufferedCollector::new(100);
429
430 collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
431 collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
432 collector.collect(LogEvent::info(LogCategory::System, "Event 3"));
433
434 let since = collector.since(1);
435 assert_eq!(since.len(), 2);
436 assert_eq!(since[0].id, 2);
437 }
438
439 #[test]
440 fn log_context_auto_fields() {
441 let collector = Arc::new(BufferedCollector::new(100));
442 let trace_id = TraceId::new();
443
444 let ctx = LogContext::new(collector.clone())
445 .with_trace_id(trace_id)
446 .with_pipeline_id("test_pipeline");
447
448 ctx.info(LogCategory::Trace, "Trace started");
449
450 let events = collector.all();
451 assert_eq!(events.len(), 1);
452 assert_eq!(events[0].trace_id, Some(trace_id));
453 assert_eq!(events[0].pipeline_id, Some("test_pipeline".to_string()));
454 }
455
456 #[test]
457 fn log_context_for_node() {
458 let collector = Arc::new(BufferedCollector::new(100));
459 let trace_id = TraceId::new();
460
461 let ctx = LogContext::new(collector.clone()).with_trace_id(trace_id);
462 let node_ctx = ctx.for_node(NodeId::new(42));
463
464 node_ctx.info(LogCategory::Node, "Node processing");
465
466 let events = collector.all();
467 assert_eq!(events[0].trace_id, Some(trace_id));
468 assert_eq!(events[0].node_id, Some(NodeId::new(42)));
469 }
470
471 #[test]
472 fn subscriber_notification() {
473 use std::sync::atomic::AtomicUsize;
474
475 let collector = BufferedCollector::new(100);
476 let count = Arc::new(AtomicUsize::new(0));
477 let count_clone = Arc::clone(&count);
478
479 collector.subscribe(Arc::new(move |_event| {
480 count_clone.fetch_add(1, Ordering::SeqCst);
481 }));
482
483 collector.collect(LogEvent::info(LogCategory::System, "Event 1"));
484 collector.collect(LogEvent::info(LogCategory::System, "Event 2"));
485
486 assert_eq!(count.load(Ordering::SeqCst), 2);
487 }
488
489 #[test]
490 fn null_collector() {
491 let collector = NullCollector;
492
493 collector.collect(LogEvent::info(LogCategory::System, "Discarded"));
494
495 assert_eq!(collector.len(), 0);
496 }
497}