eventuali_core/observability/
logging.rs

1//! Structured logging module with correlation ID support
2//!
3//! Provides comprehensive logging capabilities with structured output and correlation tracking.
4
5use crate::error::{EventualiError, Result};
6use crate::observability::{
7    correlation::{CorrelationId, CorrelationContext},
8    telemetry::TraceContext,
9    ObservabilityConfig,
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tracing::{Event, Subscriber};
16use tracing_subscriber::{
17    fmt::format::FmtSpan,
18    layer::{Context, SubscriberExt},
19    registry::LookupSpan,
20    Layer, Registry,
21};
22
23/// Log levels for structured logging
24#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum LogLevel {
26    Error,
27    Warn,
28    Info,
29    Debug,
30    Trace,
31}
32
33impl From<LogLevel> for tracing::Level {
34    fn from(level: LogLevel) -> Self {
35        match level {
36            LogLevel::Error => tracing::Level::ERROR,
37            LogLevel::Warn => tracing::Level::WARN,
38            LogLevel::Info => tracing::Level::INFO,
39            LogLevel::Debug => tracing::Level::DEBUG,
40            LogLevel::Trace => tracing::Level::TRACE,
41        }
42    }
43}
44
45impl From<&tracing::Level> for LogLevel {
46    fn from(level: &tracing::Level) -> Self {
47        match *level {
48            tracing::Level::ERROR => LogLevel::Error,
49            tracing::Level::WARN => LogLevel::Warn,
50            tracing::Level::INFO => LogLevel::Info,
51            tracing::Level::DEBUG => LogLevel::Debug,
52            tracing::Level::TRACE => LogLevel::Trace,
53        }
54    }
55}
56
57/// Context for structured logging
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct LogContext {
60    pub correlation_id: Option<CorrelationId>,
61    pub trace_id: Option<String>,
62    pub span_id: Option<String>,
63    pub user_id: Option<String>,
64    pub session_id: Option<String>,
65    pub request_id: Option<String>,
66    pub service_name: String,
67    pub operation: Option<String>,
68    pub attributes: HashMap<String, String>,
69}
70
71impl LogContext {
72    /// Create a new log context
73    pub fn new(service_name: impl Into<String>) -> Self {
74        Self {
75            correlation_id: None,
76            trace_id: None,
77            span_id: None,
78            user_id: None,
79            session_id: None,
80            request_id: None,
81            service_name: service_name.into(),
82            operation: None,
83            attributes: HashMap::new(),
84        }
85    }
86
87    /// Create log context from correlation context
88    pub fn from_correlation_context(context: &CorrelationContext) -> Self {
89        Self {
90            correlation_id: Some(context.correlation_id.clone()),
91            trace_id: context.trace_id.clone(),
92            span_id: context.span_id.clone(),
93            user_id: context.user_id.clone(),
94            session_id: context.session_id.clone(),
95            request_id: context.request_id.clone(),
96            service_name: context.service.clone(),
97            operation: Some(context.operation.clone()),
98            attributes: context.attributes.clone(),
99        }
100    }
101
102    /// Create log context from trace context
103    pub fn from_trace_context(trace_context: &TraceContext, service_name: impl Into<String>) -> Self {
104        Self {
105            correlation_id: Some(trace_context.correlation_id.clone()),
106            trace_id: None, // Could extract from span
107            span_id: None,  // Could extract from span
108            user_id: None,
109            session_id: None,
110            request_id: None,
111            service_name: service_name.into(),
112            operation: Some(trace_context.operation.clone()),
113            attributes: trace_context.attributes.clone(),
114        }
115    }
116
117    /// Add an attribute to the context
118    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
119        self.attributes.insert(key.into(), value.into());
120        self
121    }
122
123    /// Set user ID
124    pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
125        self.user_id = Some(user_id.into());
126        self
127    }
128
129    /// Set session ID
130    pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
131        self.session_id = Some(session_id.into());
132        self
133    }
134
135    /// Set operation
136    pub fn with_operation(mut self, operation: impl Into<String>) -> Self {
137        self.operation = Some(operation.into());
138        self
139    }
140}
141
142/// Structured log entry
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct LogEntry {
145    pub timestamp: chrono::DateTime<chrono::Utc>,
146    pub level: LogLevel,
147    pub message: String,
148    pub context: LogContext,
149    pub module: Option<String>,
150    pub file: Option<String>,
151    pub line: Option<u32>,
152    pub target: Option<String>,
153    pub fields: HashMap<String, serde_json::Value>,
154}
155
156impl LogEntry {
157    /// Create a new log entry
158    pub fn new(level: LogLevel, message: impl Into<String>, context: LogContext) -> Self {
159        Self {
160            timestamp: chrono::Utc::now(),
161            level,
162            message: message.into(),
163            context,
164            module: None,
165            file: None,
166            line: None,
167            target: None,
168            fields: HashMap::new(),
169        }
170    }
171
172    /// Add metadata to the log entry
173    pub fn with_metadata(
174        mut self,
175        module: Option<&str>,
176        file: Option<&str>,
177        line: Option<u32>,
178        target: Option<&str>,
179    ) -> Self {
180        self.module = module.map(|s| s.to_string());
181        self.file = file.map(|s| s.to_string());
182        self.line = line;
183        self.target = target.map(|s| s.to_string());
184        self
185    }
186
187    /// Add a field to the log entry
188    pub fn with_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
189        self.fields.insert(key.into(), value);
190        self
191    }
192
193    /// Convert to JSON string
194    pub fn to_json(&self) -> Result<String> {
195        serde_json::to_string(self)
196            .map_err(|e| EventualiError::ObservabilityError(format!("Failed to serialize log entry: {e}")))
197    }
198
199    /// Convert to pretty JSON string
200    pub fn to_json_pretty(&self) -> Result<String> {
201        serde_json::to_string_pretty(self)
202            .map_err(|e| EventualiError::ObservabilityError(format!("Failed to serialize log entry: {e}")))
203    }
204}
205
206/// Main structured logger
207#[derive(Debug)]
208pub struct StructuredLogger {
209    config: ObservabilityConfig,
210    entries: Arc<RwLock<Vec<LogEntry>>>,
211    #[allow(dead_code)] // Correlation logger for request tracing (initialized but not currently used in main logger)
212    correlation_logger: CorrelationLogger,
213}
214
215impl StructuredLogger {
216    /// Create a new structured logger
217    pub fn new(config: &ObservabilityConfig) -> Result<Self> {
218        Ok(Self {
219            config: config.clone(),
220            entries: Arc::new(RwLock::new(Vec::new())),
221            correlation_logger: CorrelationLogger::new(config.service_name.clone()),
222        })
223    }
224
225    /// Initialize the structured logger
226    pub async fn initialize(&self) -> Result<()> {
227        // Set up tracing subscriber if structured logging is enabled
228        if self.config.structured_logging {
229            let subscriber = Registry::default()
230                .with(
231                    tracing_subscriber::fmt::layer()
232                        .with_target(true)
233                        .with_thread_ids(true)
234                        .with_thread_names(true)
235                        .with_span_events(FmtSpan::CLOSE)
236                )
237                .with(ObservabilityLayer::new(self.entries.clone()));
238
239            tracing::subscriber::set_global_default(subscriber)
240                .map_err(|e| EventualiError::ObservabilityError(format!("Failed to set tracing subscriber: {e}")))?;
241        }
242
243        tracing::info!(
244            structured_logging = self.config.structured_logging,
245            service_name = %self.config.service_name,
246            "Structured logger initialized"
247        );
248
249        Ok(())
250    }
251
252    /// Log a message with full context
253    pub fn log_with_context(&self, level: LogLevel, message: &str, trace_context: &TraceContext) {
254        let context = LogContext::from_trace_context(trace_context, &self.config.service_name);
255        let entry = LogEntry::new(level, message, context);
256        
257        // Store the entry
258        if let Ok(mut entries) = self.entries.try_write() {
259            entries.push(entry.clone());
260        }
261
262        // Also log through tracing
263        match level {
264            LogLevel::Error => tracing::error!(
265                correlation_id = %trace_context.correlation_id,
266                operation = %trace_context.operation,
267                message = message
268            ),
269            LogLevel::Warn => tracing::warn!(
270                correlation_id = %trace_context.correlation_id,
271                operation = %trace_context.operation,
272                message = message
273            ),
274            LogLevel::Info => tracing::info!(
275                correlation_id = %trace_context.correlation_id,
276                operation = %trace_context.operation,
277                message = message
278            ),
279            LogLevel::Debug => tracing::debug!(
280                correlation_id = %trace_context.correlation_id,
281                operation = %trace_context.operation,
282                message = message
283            ),
284            LogLevel::Trace => tracing::trace!(
285                correlation_id = %trace_context.correlation_id,
286                operation = %trace_context.operation,
287                message = message
288            ),
289        }
290    }
291
292    /// Log with correlation context
293    pub fn log_with_correlation(&self, level: LogLevel, message: &str, context: &CorrelationContext) {
294        let log_context = LogContext::from_correlation_context(context);
295        let entry = LogEntry::new(level, message, log_context);
296        
297        if let Ok(mut entries) = self.entries.try_write() {
298            entries.push(entry);
299        }
300
301        self.correlation_logger.log(level, message, Some(context));
302    }
303
304    /// Log a simple message
305    pub fn log(&self, level: LogLevel, message: &str) {
306        let context = LogContext::new(&self.config.service_name);
307        let entry = LogEntry::new(level, message, context);
308        
309        if let Ok(mut entries) = self.entries.try_write() {
310            entries.push(entry);
311        }
312
313        self.correlation_logger.log(level, message, None);
314    }
315
316    /// Get recent log entries
317    pub async fn get_recent_entries(&self, limit: usize) -> Vec<LogEntry> {
318        let entries = self.entries.read().await;
319        let start = if entries.len() > limit { entries.len() - limit } else { 0 };
320        entries[start..].to_vec()
321    }
322
323    /// Clear stored log entries
324    pub async fn clear_entries(&self) {
325        self.entries.write().await.clear();
326    }
327
328    /// Get all log entries
329    pub async fn get_all_entries(&self) -> Vec<LogEntry> {
330        self.entries.read().await.clone()
331    }
332
333    /// Export logs to JSON file
334    pub async fn export_logs(&self, file_path: &str) -> Result<()> {
335        let entries = self.get_all_entries().await;
336        let json = serde_json::to_string_pretty(&entries)
337            .map_err(|e| EventualiError::ObservabilityError(format!("Failed to serialize logs: {e}")))?;
338
339        tokio::fs::write(file_path, json).await
340            .map_err(|e| EventualiError::ObservabilityError(format!("Failed to write log file: {e}")))?;
341
342        Ok(())
343    }
344
345    /// Shutdown the logger
346    pub async fn shutdown(&self) -> Result<()> {
347        tracing::info!("Structured logger shut down successfully");
348        Ok(())
349    }
350}
351
352/// Logger specifically for correlation tracking
353#[derive(Debug)]
354pub struct CorrelationLogger {
355    service_name: String,
356}
357
358impl CorrelationLogger {
359    pub fn new(service_name: String) -> Self {
360        Self { service_name }
361    }
362
363    /// Log with optional correlation context
364    pub fn log(&self, level: LogLevel, message: &str, context: Option<&CorrelationContext>) {
365        if let Some(ctx) = context {
366            match level {
367                LogLevel::Error => tracing::error!(
368                    correlation_id = %ctx.correlation_id,
369                    operation = %ctx.operation,
370                    service = %ctx.service,
371                    user_id = ?ctx.user_id,
372                    session_id = ?ctx.session_id,
373                    message = message
374                ),
375                LogLevel::Warn => tracing::warn!(
376                    correlation_id = %ctx.correlation_id,
377                    operation = %ctx.operation,
378                    service = %ctx.service,
379                    user_id = ?ctx.user_id,
380                    session_id = ?ctx.session_id,
381                    message = message
382                ),
383                LogLevel::Info => tracing::info!(
384                    correlation_id = %ctx.correlation_id,
385                    operation = %ctx.operation,
386                    service = %ctx.service,
387                    user_id = ?ctx.user_id,
388                    session_id = ?ctx.session_id,
389                    message = message
390                ),
391                LogLevel::Debug => tracing::debug!(
392                    correlation_id = %ctx.correlation_id,
393                    operation = %ctx.operation,
394                    service = %ctx.service,
395                    user_id = ?ctx.user_id,
396                    session_id = ?ctx.session_id,
397                    message = message
398                ),
399                LogLevel::Trace => tracing::trace!(
400                    correlation_id = %ctx.correlation_id,
401                    operation = %ctx.operation,
402                    service = %ctx.service,
403                    user_id = ?ctx.user_id,
404                    session_id = ?ctx.session_id,
405                    message = message
406                ),
407            }
408        } else {
409            match level {
410                LogLevel::Error => tracing::error!(service = %self.service_name, message = message),
411                LogLevel::Warn => tracing::warn!(service = %self.service_name, message = message),
412                LogLevel::Info => tracing::info!(service = %self.service_name, message = message),
413                LogLevel::Debug => tracing::debug!(service = %self.service_name, message = message),
414                LogLevel::Trace => tracing::trace!(service = %self.service_name, message = message),
415            }
416        }
417    }
418}
419
420/// Observability-aware logger that combines structured logging with telemetry
421#[derive(Debug)]
422pub struct ObservabilityLogger {
423    structured_logger: Arc<StructuredLogger>,
424    #[allow(dead_code)] // Correlation logger for request tracing (initialized but not currently used in observability logger)
425    correlation_logger: CorrelationLogger,
426}
427
428impl ObservabilityLogger {
429    pub fn new(structured_logger: Arc<StructuredLogger>, service_name: String) -> Self {
430        Self {
431            structured_logger,
432            correlation_logger: CorrelationLogger::new(service_name),
433        }
434    }
435
436    /// Log with full observability context
437    pub async fn log_with_observability(
438        &self,
439        level: LogLevel,
440        message: &str,
441        trace_context: Option<&TraceContext>,
442        correlation_context: Option<&CorrelationContext>,
443    ) {
444        if let Some(trace_ctx) = trace_context {
445            self.structured_logger.log_with_context(level, message, trace_ctx);
446        } else if let Some(corr_ctx) = correlation_context {
447            self.structured_logger.log_with_correlation(level, message, corr_ctx);
448        } else {
449            self.structured_logger.log(level, message);
450        }
451    }
452}
453
454/// Log aggregator for collecting and analyzing logs
455#[derive(Debug)]
456pub struct LogAggregator {
457    entries: Arc<RwLock<Vec<LogEntry>>>,
458}
459
460impl LogAggregator {
461    pub fn new() -> Self {
462        Self {
463            entries: Arc::new(RwLock::new(Vec::new())),
464        }
465    }
466
467    /// Add a log entry
468    pub async fn add_entry(&self, entry: LogEntry) {
469        self.entries.write().await.push(entry);
470    }
471
472    /// Get entries by correlation ID
473    pub async fn get_entries_by_correlation(&self, correlation_id: &CorrelationId) -> Vec<LogEntry> {
474        let entries = self.entries.read().await;
475        entries
476            .iter()
477            .filter(|entry| {
478                entry.context.correlation_id.as_ref() == Some(correlation_id)
479            })
480            .cloned()
481            .collect()
482    }
483
484    /// Get entries by operation
485    pub async fn get_entries_by_operation(&self, operation: &str) -> Vec<LogEntry> {
486        let entries = self.entries.read().await;
487        entries
488            .iter()
489            .filter(|entry| {
490                entry.context.operation.as_deref() == Some(operation)
491            })
492            .cloned()
493            .collect()
494    }
495
496    /// Get error entries
497    pub async fn get_error_entries(&self) -> Vec<LogEntry> {
498        let entries = self.entries.read().await;
499        entries
500            .iter()
501            .filter(|entry| entry.level == LogLevel::Error)
502            .cloned()
503            .collect()
504    }
505
506    /// Generate aggregated statistics
507    pub async fn get_statistics(&self) -> LogStatistics {
508        let entries = self.entries.read().await;
509        
510        let mut stats = LogStatistics {
511            total_entries: entries.len() as u64,
512            ..Default::default()
513        };
514
515        for entry in entries.iter() {
516            match entry.level {
517                LogLevel::Error => stats.error_count += 1,
518                LogLevel::Warn => stats.warn_count += 1,
519                LogLevel::Info => stats.info_count += 1,
520                LogLevel::Debug => stats.debug_count += 1,
521                LogLevel::Trace => stats.trace_count += 1,
522            }
523
524            if let Some(operation) = &entry.context.operation {
525                *stats.operations.entry(operation.clone()).or_insert(0) += 1;
526            }
527
528            if let Some(correlation_id) = &entry.context.correlation_id {
529                stats.unique_correlations.insert(correlation_id.to_string());
530            }
531        }
532
533        stats.unique_correlation_count = stats.unique_correlations.len() as u64;
534        stats
535    }
536}
537
538impl Default for LogAggregator {
539    fn default() -> Self {
540        Self::new()
541    }
542}
543
544/// Statistics about collected logs
545#[derive(Debug, Clone, Serialize, Deserialize)]
546#[derive(Default)]
547pub struct LogStatistics {
548    pub total_entries: u64,
549    pub error_count: u64,
550    pub warn_count: u64,
551    pub info_count: u64,
552    pub debug_count: u64,
553    pub trace_count: u64,
554    pub unique_correlation_count: u64,
555    pub operations: HashMap<String, u64>,
556    #[serde(skip)]
557    pub unique_correlations: std::collections::HashSet<String>,
558}
559
560
561/// Custom tracing layer for capturing structured logs
562struct ObservabilityLayer {
563    entries: Arc<RwLock<Vec<LogEntry>>>,
564}
565
566impl ObservabilityLayer {
567    fn new(entries: Arc<RwLock<Vec<LogEntry>>>) -> Self {
568        Self { entries }
569    }
570}
571
572impl<S> Layer<S> for ObservabilityLayer
573where
574    S: Subscriber + for<'lookup> LookupSpan<'lookup>,
575{
576    fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
577        let metadata = event.metadata();
578        let level = LogLevel::from(metadata.level());
579        
580        // Extract message and fields from the event
581        let mut visitor = EventVisitor::new();
582        event.record(&mut visitor);
583        
584        let context = LogContext::new("eventuali-core")
585            .with_attribute("target", metadata.target())
586            .with_attribute("module", metadata.module_path().unwrap_or("unknown"));
587
588        let entry = LogEntry::new(level, visitor.message.unwrap_or_default(), context)
589            .with_metadata(
590                metadata.module_path(),
591                metadata.file(),
592                metadata.line(),
593                Some(metadata.target()),
594            );
595
596        if let Ok(mut entries) = self.entries.try_write() {
597            entries.push(entry);
598        }
599    }
600}
601
602/// Visitor for extracting event data
603struct EventVisitor {
604    message: Option<String>,
605    fields: HashMap<String, serde_json::Value>,
606}
607
608impl EventVisitor {
609    fn new() -> Self {
610        Self {
611            message: None,
612            fields: HashMap::new(),
613        }
614    }
615}
616
617impl tracing::field::Visit for EventVisitor {
618    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
619        if field.name() == "message" {
620            self.message = Some(format!("{value:?}"));
621        } else {
622            self.fields.insert(field.name().to_string(), serde_json::Value::String(format!("{value:?}")));
623        }
624    }
625
626    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
627        if field.name() == "message" {
628            self.message = Some(value.to_string());
629        } else {
630            self.fields.insert(field.name().to_string(), serde_json::Value::String(value.to_string()));
631        }
632    }
633}
634
635#[cfg(test)]
636mod tests {
637    use super::*;
638
639    #[test]
640    fn test_log_level_conversion() {
641        assert_eq!(tracing::Level::from(LogLevel::Error), tracing::Level::ERROR);
642        assert_eq!(tracing::Level::from(LogLevel::Info), tracing::Level::INFO);
643        assert_eq!(LogLevel::from(&tracing::Level::WARN), LogLevel::Warn);
644    }
645
646    #[test]
647    fn test_log_context_creation() {
648        let context = LogContext::new("test-service")
649            .with_attribute("key1", "value1")
650            .with_user_id("user123")
651            .with_operation("test_operation");
652
653        assert_eq!(context.service_name, "test-service");
654        assert_eq!(context.user_id, Some("user123".to_string()));
655        assert_eq!(context.operation, Some("test_operation".to_string()));
656        assert_eq!(context.attributes.get("key1"), Some(&"value1".to_string()));
657    }
658
659    #[test]
660    fn test_log_entry_creation() {
661        let context = LogContext::new("test-service");
662        let entry = LogEntry::new(LogLevel::Info, "Test message", context)
663            .with_field("field1", serde_json::Value::String("value1".to_string()));
664
665        assert_eq!(entry.level, LogLevel::Info);
666        assert_eq!(entry.message, "Test message");
667        assert_eq!(entry.fields.get("field1"), Some(&serde_json::Value::String("value1".to_string())));
668    }
669
670    #[tokio::test]
671    async fn test_structured_logger_creation() {
672        let config = ObservabilityConfig {
673            structured_logging: false, // Disable to avoid global subscriber conflicts
674            ..ObservabilityConfig::default()
675        };
676        let logger = StructuredLogger::new(&config).unwrap();
677        
678        assert_eq!(logger.config.service_name, "eventuali");
679    }
680
681    #[tokio::test]
682    async fn test_log_aggregator() {
683        let aggregator = LogAggregator::new();
684        let context = LogContext::new("test-service").with_operation("test_op");
685        let entry1 = LogEntry::new(LogLevel::Info, "Message 1", context.clone());
686        let entry2 = LogEntry::new(LogLevel::Error, "Message 2", context);
687
688        aggregator.add_entry(entry1).await;
689        aggregator.add_entry(entry2).await;
690
691        let stats = aggregator.get_statistics().await;
692        assert_eq!(stats.total_entries, 2);
693        assert_eq!(stats.info_count, 1);
694        assert_eq!(stats.error_count, 1);
695        assert_eq!(stats.operations.get("test_op"), Some(&2));
696    }
697}