xerv_core/logging/
mod.rs

1//! Structured logging for trace execution.
2//!
3//! This module provides a comprehensive logging system for XERV with:
4//!
5//! - **Correlation IDs**: Every log event can be associated with trace_id, node_id, and pipeline_id
6//! - **Structured Events**: Events contain typed fields for filtering and aggregation
7//! - **Buffered Collection**: Thread-safe ring buffer for in-memory log storage
8//! - **Flexible Filtering**: Query logs by level, category, trace, node, pipeline, time range, and message content
9//! - **Real-time Subscribers**: Register callbacks for immediate event notifications
10//!
11//! # Architecture
12//!
13//! ```text
14//! ┌─────────────┐     ┌──────────────┐     ┌───────────────┐
15//! │ LogEvent    │────>│ LogCollector │────>│ Subscribers   │
16//! │ (with IDs)  │     │ (buffer)     │     │ (callbacks)   │
17//! └─────────────┘     └──────────────┘     └───────────────┘
18//!                            │
19//!                            v
20//!                     ┌──────────────┐
21//!                     │ LogFilter    │
22//!                     │ (query)      │
23//!                     └──────────────┘
24//! ```
25//!
26//! # Example
27//!
28//! ```ignore
29//! use xerv_core::logging::{LogEvent, LogCategory, LogLevel, BufferedCollector, LogContext};
30//! use std::sync::Arc;
31//!
32//! // Create a collector
33//! let collector = Arc::new(BufferedCollector::with_default_capacity());
34//!
35//! // Create a context for a specific trace
36//! let ctx = LogContext::new(collector.clone())
37//!     .with_trace_id(trace_id)
38//!     .with_pipeline_id("order_pipeline");
39//!
40//! // Log events with automatic correlation
41//! ctx.info(LogCategory::Trace, "Trace started");
42//!
43//! // Create a node-specific context
44//! let node_ctx = ctx.for_node(NodeId::new(1));
45//! node_ctx.debug(LogCategory::Node, "Processing input");
46//!
47//! // Query logs
48//! let errors = collector.by_level(LogLevel::Error);
49//! let trace_logs = collector.by_trace(trace_id);
50//! ```
51
52mod collector;
53mod event;
54mod filter;
55
56pub use collector::{
57    BufferedCollector, DEFAULT_BUFFER_CAPACITY, LogCollector, LogContext, MultiCollector,
58    NullCollector,
59};
60pub use event::{LogCategory, LogEvent, LogEventBuilder, LogLevel};
61pub use filter::{LogFilter, LogFilterBuilder};
62
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{NodeId, TraceId};
    use std::sync::Arc;

    /// End-to-end pass over the logging API: context creation, correlated
    /// emission, and every query/filter combination the module re-exports.
    #[test]
    fn integration_test_logging_workflow() {
        let sink = Arc::new(BufferedCollector::with_default_capacity());

        // A fake trace run against the "order_processing" pipeline.
        let tid = TraceId::new();
        let root = LogContext::new(sink.clone())
            .with_trace_id(tid)
            .with_pipeline_id("order_processing");

        root.info(LogCategory::Trace, "Trace started");

        // Three nodes, each emitting a debug "started" and an info "completed".
        (1..=3).for_each(|n| {
            let scoped = root.for_node(NodeId::new(n));
            scoped.debug(LogCategory::Node, format!("Node {} started", n));
            scoped.info(LogCategory::Node, format!("Node {} completed", n));
        });

        root.info(LogCategory::Trace, "Trace completed");

        // 1 trace-start + 3 * 2 node events + 1 trace-complete = 8 events.
        assert_eq!(sink.len(), 8);

        // All eight events carry the same trace id.
        assert_eq!(sink.by_trace(tid).len(), 8);

        // Exact-level filter: only the three debug "started" lines match.
        assert_eq!(sink.query(&LogFilter::new().level(LogLevel::Debug)).len(), 3);

        // Category filter: all six node-scoped events match.
        assert_eq!(
            sink.query(&LogFilter::new().category(LogCategory::Node)).len(),
            6
        );

        // Composite filter: info-or-above Trace-category events for this trace
        // — just the start and complete lines.
        let combined = LogFilter::new()
            .trace_id(tid)
            .min_level(LogLevel::Info)
            .category(LogCategory::Trace);
        assert_eq!(sink.query(&combined).len(), 2);
    }

    /// A registered subscriber callback fires once per collected event and can
    /// inspect the event's level.
    #[test]
    fn integration_test_subscriber() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let sink = BufferedCollector::with_default_capacity();
        let errors_seen = Arc::new(AtomicUsize::new(0));

        // Callback that counts only Error-or-worse events.
        let counter = Arc::clone(&errors_seen);
        sink.subscribe(Arc::new(move |ev| {
            if ev.level >= LogLevel::Error {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        }));

        sink.collect(LogEvent::info(LogCategory::System, "Info message"));
        sink.collect(LogEvent::warn(LogCategory::System, "Warning message"));
        sink.collect(LogEvent::error(LogCategory::System, "Error message"));
        sink.collect(LogEvent::error(LogCategory::Node, "Another error"));

        // Exactly the two error events were counted.
        assert_eq!(errors_seen.load(Ordering::SeqCst), 2);
    }

    /// `format_line` renders level, category, all three correlation ids, the
    /// message, and every attached field.
    #[test]
    fn integration_test_event_formatting() {
        let tid = TraceId::new();
        let ev = LogEvent::warn(LogCategory::Node, "Node timeout")
            .with_trace_id(tid)
            .with_node_id(NodeId::new(42))
            .with_pipeline_id("my_pipeline")
            .with_field("timeout_ms", "5000")
            .with_field_i64("retry_count", 3);

        let rendered = ev.format_line();

        // Every fixed fragment must appear somewhere in the formatted line.
        for expected in [
            "[WARN]",
            "[node]",
            "node=42",
            "pipeline=my_pipeline",
            "Node timeout",
            "timeout_ms",
            "retry_count",
        ] {
            assert!(rendered.contains(expected));
        }
        // The trace id is runtime-generated, so it is checked separately.
        assert!(rendered.contains(&format!("trace={}", tid)));
    }

    /// `LogFilter` survives a JSON round-trip with all of its fields intact.
    #[test]
    fn integration_test_filter_serialization() {
        let tid = TraceId::new();
        let original = LogFilter::new()
            .min_level(LogLevel::Warn)
            .trace_id(tid)
            .category(LogCategory::Node)
            .limit(100);

        // Serialize, then parse back and compare field by field.
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: LogFilter = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.min_level, Some(LogLevel::Warn));
        assert_eq!(decoded.trace_id, Some(tid));
        assert_eq!(decoded.categories, vec![LogCategory::Node]);
        assert_eq!(decoded.limit, Some(100));
    }
}