1use crate::error::{EventualiError, Result};
6use crate::observability::{
7 correlation::{CorrelationId, CorrelationContext},
8 telemetry::TraceContext,
9 ObservabilityConfig,
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use tracing::{Event, Subscriber};
16use tracing_subscriber::{
17 fmt::format::FmtSpan,
18 layer::{Context, SubscriberExt},
19 registry::LookupSpan,
20 Layer, Registry,
21};
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
25pub enum LogLevel {
26 Error,
27 Warn,
28 Info,
29 Debug,
30 Trace,
31}
32
33impl From<LogLevel> for tracing::Level {
34 fn from(level: LogLevel) -> Self {
35 match level {
36 LogLevel::Error => tracing::Level::ERROR,
37 LogLevel::Warn => tracing::Level::WARN,
38 LogLevel::Info => tracing::Level::INFO,
39 LogLevel::Debug => tracing::Level::DEBUG,
40 LogLevel::Trace => tracing::Level::TRACE,
41 }
42 }
43}
44
45impl From<&tracing::Level> for LogLevel {
46 fn from(level: &tracing::Level) -> Self {
47 match *level {
48 tracing::Level::ERROR => LogLevel::Error,
49 tracing::Level::WARN => LogLevel::Warn,
50 tracing::Level::INFO => LogLevel::Info,
51 tracing::Level::DEBUG => LogLevel::Debug,
52 tracing::Level::TRACE => LogLevel::Trace,
53 }
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct LogContext {
60 pub correlation_id: Option<CorrelationId>,
61 pub trace_id: Option<String>,
62 pub span_id: Option<String>,
63 pub user_id: Option<String>,
64 pub session_id: Option<String>,
65 pub request_id: Option<String>,
66 pub service_name: String,
67 pub operation: Option<String>,
68 pub attributes: HashMap<String, String>,
69}
70
71impl LogContext {
72 pub fn new(service_name: impl Into<String>) -> Self {
74 Self {
75 correlation_id: None,
76 trace_id: None,
77 span_id: None,
78 user_id: None,
79 session_id: None,
80 request_id: None,
81 service_name: service_name.into(),
82 operation: None,
83 attributes: HashMap::new(),
84 }
85 }
86
87 pub fn from_correlation_context(context: &CorrelationContext) -> Self {
89 Self {
90 correlation_id: Some(context.correlation_id.clone()),
91 trace_id: context.trace_id.clone(),
92 span_id: context.span_id.clone(),
93 user_id: context.user_id.clone(),
94 session_id: context.session_id.clone(),
95 request_id: context.request_id.clone(),
96 service_name: context.service.clone(),
97 operation: Some(context.operation.clone()),
98 attributes: context.attributes.clone(),
99 }
100 }
101
102 pub fn from_trace_context(trace_context: &TraceContext, service_name: impl Into<String>) -> Self {
104 Self {
105 correlation_id: Some(trace_context.correlation_id.clone()),
106 trace_id: None, span_id: None, user_id: None,
109 session_id: None,
110 request_id: None,
111 service_name: service_name.into(),
112 operation: Some(trace_context.operation.clone()),
113 attributes: trace_context.attributes.clone(),
114 }
115 }
116
117 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
119 self.attributes.insert(key.into(), value.into());
120 self
121 }
122
123 pub fn with_user_id(mut self, user_id: impl Into<String>) -> Self {
125 self.user_id = Some(user_id.into());
126 self
127 }
128
129 pub fn with_session_id(mut self, session_id: impl Into<String>) -> Self {
131 self.session_id = Some(session_id.into());
132 self
133 }
134
135 pub fn with_operation(mut self, operation: impl Into<String>) -> Self {
137 self.operation = Some(operation.into());
138 self
139 }
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct LogEntry {
145 pub timestamp: chrono::DateTime<chrono::Utc>,
146 pub level: LogLevel,
147 pub message: String,
148 pub context: LogContext,
149 pub module: Option<String>,
150 pub file: Option<String>,
151 pub line: Option<u32>,
152 pub target: Option<String>,
153 pub fields: HashMap<String, serde_json::Value>,
154}
155
156impl LogEntry {
157 pub fn new(level: LogLevel, message: impl Into<String>, context: LogContext) -> Self {
159 Self {
160 timestamp: chrono::Utc::now(),
161 level,
162 message: message.into(),
163 context,
164 module: None,
165 file: None,
166 line: None,
167 target: None,
168 fields: HashMap::new(),
169 }
170 }
171
172 pub fn with_metadata(
174 mut self,
175 module: Option<&str>,
176 file: Option<&str>,
177 line: Option<u32>,
178 target: Option<&str>,
179 ) -> Self {
180 self.module = module.map(|s| s.to_string());
181 self.file = file.map(|s| s.to_string());
182 self.line = line;
183 self.target = target.map(|s| s.to_string());
184 self
185 }
186
187 pub fn with_field(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
189 self.fields.insert(key.into(), value);
190 self
191 }
192
193 pub fn to_json(&self) -> Result<String> {
195 serde_json::to_string(self)
196 .map_err(|e| EventualiError::ObservabilityError(format!("Failed to serialize log entry: {e}")))
197 }
198
199 pub fn to_json_pretty(&self) -> Result<String> {
201 serde_json::to_string_pretty(self)
202 .map_err(|e| EventualiError::ObservabilityError(format!("Failed to serialize log entry: {e}")))
203 }
204}
205
206#[derive(Debug)]
208pub struct StructuredLogger {
209 config: ObservabilityConfig,
210 entries: Arc<RwLock<Vec<LogEntry>>>,
211 #[allow(dead_code)] correlation_logger: CorrelationLogger,
213}
214
215impl StructuredLogger {
216 pub fn new(config: &ObservabilityConfig) -> Result<Self> {
218 Ok(Self {
219 config: config.clone(),
220 entries: Arc::new(RwLock::new(Vec::new())),
221 correlation_logger: CorrelationLogger::new(config.service_name.clone()),
222 })
223 }
224
225 pub async fn initialize(&self) -> Result<()> {
227 if self.config.structured_logging {
229 let subscriber = Registry::default()
230 .with(
231 tracing_subscriber::fmt::layer()
232 .with_target(true)
233 .with_thread_ids(true)
234 .with_thread_names(true)
235 .with_span_events(FmtSpan::CLOSE)
236 )
237 .with(ObservabilityLayer::new(self.entries.clone()));
238
239 tracing::subscriber::set_global_default(subscriber)
240 .map_err(|e| EventualiError::ObservabilityError(format!("Failed to set tracing subscriber: {e}")))?;
241 }
242
243 tracing::info!(
244 structured_logging = self.config.structured_logging,
245 service_name = %self.config.service_name,
246 "Structured logger initialized"
247 );
248
249 Ok(())
250 }
251
252 pub fn log_with_context(&self, level: LogLevel, message: &str, trace_context: &TraceContext) {
254 let context = LogContext::from_trace_context(trace_context, &self.config.service_name);
255 let entry = LogEntry::new(level, message, context);
256
257 if let Ok(mut entries) = self.entries.try_write() {
259 entries.push(entry.clone());
260 }
261
262 match level {
264 LogLevel::Error => tracing::error!(
265 correlation_id = %trace_context.correlation_id,
266 operation = %trace_context.operation,
267 message = message
268 ),
269 LogLevel::Warn => tracing::warn!(
270 correlation_id = %trace_context.correlation_id,
271 operation = %trace_context.operation,
272 message = message
273 ),
274 LogLevel::Info => tracing::info!(
275 correlation_id = %trace_context.correlation_id,
276 operation = %trace_context.operation,
277 message = message
278 ),
279 LogLevel::Debug => tracing::debug!(
280 correlation_id = %trace_context.correlation_id,
281 operation = %trace_context.operation,
282 message = message
283 ),
284 LogLevel::Trace => tracing::trace!(
285 correlation_id = %trace_context.correlation_id,
286 operation = %trace_context.operation,
287 message = message
288 ),
289 }
290 }
291
292 pub fn log_with_correlation(&self, level: LogLevel, message: &str, context: &CorrelationContext) {
294 let log_context = LogContext::from_correlation_context(context);
295 let entry = LogEntry::new(level, message, log_context);
296
297 if let Ok(mut entries) = self.entries.try_write() {
298 entries.push(entry);
299 }
300
301 self.correlation_logger.log(level, message, Some(context));
302 }
303
304 pub fn log(&self, level: LogLevel, message: &str) {
306 let context = LogContext::new(&self.config.service_name);
307 let entry = LogEntry::new(level, message, context);
308
309 if let Ok(mut entries) = self.entries.try_write() {
310 entries.push(entry);
311 }
312
313 self.correlation_logger.log(level, message, None);
314 }
315
316 pub async fn get_recent_entries(&self, limit: usize) -> Vec<LogEntry> {
318 let entries = self.entries.read().await;
319 let start = if entries.len() > limit { entries.len() - limit } else { 0 };
320 entries[start..].to_vec()
321 }
322
323 pub async fn clear_entries(&self) {
325 self.entries.write().await.clear();
326 }
327
328 pub async fn get_all_entries(&self) -> Vec<LogEntry> {
330 self.entries.read().await.clone()
331 }
332
333 pub async fn export_logs(&self, file_path: &str) -> Result<()> {
335 let entries = self.get_all_entries().await;
336 let json = serde_json::to_string_pretty(&entries)
337 .map_err(|e| EventualiError::ObservabilityError(format!("Failed to serialize logs: {e}")))?;
338
339 tokio::fs::write(file_path, json).await
340 .map_err(|e| EventualiError::ObservabilityError(format!("Failed to write log file: {e}")))?;
341
342 Ok(())
343 }
344
345 pub async fn shutdown(&self) -> Result<()> {
347 tracing::info!("Structured logger shut down successfully");
348 Ok(())
349 }
350}
351
352#[derive(Debug)]
354pub struct CorrelationLogger {
355 service_name: String,
356}
357
358impl CorrelationLogger {
359 pub fn new(service_name: String) -> Self {
360 Self { service_name }
361 }
362
363 pub fn log(&self, level: LogLevel, message: &str, context: Option<&CorrelationContext>) {
365 if let Some(ctx) = context {
366 match level {
367 LogLevel::Error => tracing::error!(
368 correlation_id = %ctx.correlation_id,
369 operation = %ctx.operation,
370 service = %ctx.service,
371 user_id = ?ctx.user_id,
372 session_id = ?ctx.session_id,
373 message = message
374 ),
375 LogLevel::Warn => tracing::warn!(
376 correlation_id = %ctx.correlation_id,
377 operation = %ctx.operation,
378 service = %ctx.service,
379 user_id = ?ctx.user_id,
380 session_id = ?ctx.session_id,
381 message = message
382 ),
383 LogLevel::Info => tracing::info!(
384 correlation_id = %ctx.correlation_id,
385 operation = %ctx.operation,
386 service = %ctx.service,
387 user_id = ?ctx.user_id,
388 session_id = ?ctx.session_id,
389 message = message
390 ),
391 LogLevel::Debug => tracing::debug!(
392 correlation_id = %ctx.correlation_id,
393 operation = %ctx.operation,
394 service = %ctx.service,
395 user_id = ?ctx.user_id,
396 session_id = ?ctx.session_id,
397 message = message
398 ),
399 LogLevel::Trace => tracing::trace!(
400 correlation_id = %ctx.correlation_id,
401 operation = %ctx.operation,
402 service = %ctx.service,
403 user_id = ?ctx.user_id,
404 session_id = ?ctx.session_id,
405 message = message
406 ),
407 }
408 } else {
409 match level {
410 LogLevel::Error => tracing::error!(service = %self.service_name, message = message),
411 LogLevel::Warn => tracing::warn!(service = %self.service_name, message = message),
412 LogLevel::Info => tracing::info!(service = %self.service_name, message = message),
413 LogLevel::Debug => tracing::debug!(service = %self.service_name, message = message),
414 LogLevel::Trace => tracing::trace!(service = %self.service_name, message = message),
415 }
416 }
417 }
418}
419
420#[derive(Debug)]
422pub struct ObservabilityLogger {
423 structured_logger: Arc<StructuredLogger>,
424 #[allow(dead_code)] correlation_logger: CorrelationLogger,
426}
427
428impl ObservabilityLogger {
429 pub fn new(structured_logger: Arc<StructuredLogger>, service_name: String) -> Self {
430 Self {
431 structured_logger,
432 correlation_logger: CorrelationLogger::new(service_name),
433 }
434 }
435
436 pub async fn log_with_observability(
438 &self,
439 level: LogLevel,
440 message: &str,
441 trace_context: Option<&TraceContext>,
442 correlation_context: Option<&CorrelationContext>,
443 ) {
444 if let Some(trace_ctx) = trace_context {
445 self.structured_logger.log_with_context(level, message, trace_ctx);
446 } else if let Some(corr_ctx) = correlation_context {
447 self.structured_logger.log_with_correlation(level, message, corr_ctx);
448 } else {
449 self.structured_logger.log(level, message);
450 }
451 }
452}
453
454#[derive(Debug)]
456pub struct LogAggregator {
457 entries: Arc<RwLock<Vec<LogEntry>>>,
458}
459
460impl LogAggregator {
461 pub fn new() -> Self {
462 Self {
463 entries: Arc::new(RwLock::new(Vec::new())),
464 }
465 }
466
467 pub async fn add_entry(&self, entry: LogEntry) {
469 self.entries.write().await.push(entry);
470 }
471
472 pub async fn get_entries_by_correlation(&self, correlation_id: &CorrelationId) -> Vec<LogEntry> {
474 let entries = self.entries.read().await;
475 entries
476 .iter()
477 .filter(|entry| {
478 entry.context.correlation_id.as_ref() == Some(correlation_id)
479 })
480 .cloned()
481 .collect()
482 }
483
484 pub async fn get_entries_by_operation(&self, operation: &str) -> Vec<LogEntry> {
486 let entries = self.entries.read().await;
487 entries
488 .iter()
489 .filter(|entry| {
490 entry.context.operation.as_deref() == Some(operation)
491 })
492 .cloned()
493 .collect()
494 }
495
496 pub async fn get_error_entries(&self) -> Vec<LogEntry> {
498 let entries = self.entries.read().await;
499 entries
500 .iter()
501 .filter(|entry| entry.level == LogLevel::Error)
502 .cloned()
503 .collect()
504 }
505
506 pub async fn get_statistics(&self) -> LogStatistics {
508 let entries = self.entries.read().await;
509
510 let mut stats = LogStatistics {
511 total_entries: entries.len() as u64,
512 ..Default::default()
513 };
514
515 for entry in entries.iter() {
516 match entry.level {
517 LogLevel::Error => stats.error_count += 1,
518 LogLevel::Warn => stats.warn_count += 1,
519 LogLevel::Info => stats.info_count += 1,
520 LogLevel::Debug => stats.debug_count += 1,
521 LogLevel::Trace => stats.trace_count += 1,
522 }
523
524 if let Some(operation) = &entry.context.operation {
525 *stats.operations.entry(operation.clone()).or_insert(0) += 1;
526 }
527
528 if let Some(correlation_id) = &entry.context.correlation_id {
529 stats.unique_correlations.insert(correlation_id.to_string());
530 }
531 }
532
533 stats.unique_correlation_count = stats.unique_correlations.len() as u64;
534 stats
535 }
536}
537
538impl Default for LogAggregator {
539 fn default() -> Self {
540 Self::new()
541 }
542}
543
544#[derive(Debug, Clone, Serialize, Deserialize)]
546#[derive(Default)]
547pub struct LogStatistics {
548 pub total_entries: u64,
549 pub error_count: u64,
550 pub warn_count: u64,
551 pub info_count: u64,
552 pub debug_count: u64,
553 pub trace_count: u64,
554 pub unique_correlation_count: u64,
555 pub operations: HashMap<String, u64>,
556 #[serde(skip)]
557 pub unique_correlations: std::collections::HashSet<String>,
558}
559
560
561struct ObservabilityLayer {
563 entries: Arc<RwLock<Vec<LogEntry>>>,
564}
565
566impl ObservabilityLayer {
567 fn new(entries: Arc<RwLock<Vec<LogEntry>>>) -> Self {
568 Self { entries }
569 }
570}
571
572impl<S> Layer<S> for ObservabilityLayer
573where
574 S: Subscriber + for<'lookup> LookupSpan<'lookup>,
575{
576 fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) {
577 let metadata = event.metadata();
578 let level = LogLevel::from(metadata.level());
579
580 let mut visitor = EventVisitor::new();
582 event.record(&mut visitor);
583
584 let context = LogContext::new("eventuali-core")
585 .with_attribute("target", metadata.target())
586 .with_attribute("module", metadata.module_path().unwrap_or("unknown"));
587
588 let entry = LogEntry::new(level, visitor.message.unwrap_or_default(), context)
589 .with_metadata(
590 metadata.module_path(),
591 metadata.file(),
592 metadata.line(),
593 Some(metadata.target()),
594 );
595
596 if let Ok(mut entries) = self.entries.try_write() {
597 entries.push(entry);
598 }
599 }
600}
601
602struct EventVisitor {
604 message: Option<String>,
605 fields: HashMap<String, serde_json::Value>,
606}
607
608impl EventVisitor {
609 fn new() -> Self {
610 Self {
611 message: None,
612 fields: HashMap::new(),
613 }
614 }
615}
616
617impl tracing::field::Visit for EventVisitor {
618 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
619 if field.name() == "message" {
620 self.message = Some(format!("{value:?}"));
621 } else {
622 self.fields.insert(field.name().to_string(), serde_json::Value::String(format!("{value:?}")));
623 }
624 }
625
626 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
627 if field.name() == "message" {
628 self.message = Some(value.to_string());
629 } else {
630 self.fields.insert(field.name().to_string(), serde_json::Value::String(value.to_string()));
631 }
632 }
633}
634
635#[cfg(test)]
636mod tests {
637 use super::*;
638
639 #[test]
640 fn test_log_level_conversion() {
641 assert_eq!(tracing::Level::from(LogLevel::Error), tracing::Level::ERROR);
642 assert_eq!(tracing::Level::from(LogLevel::Info), tracing::Level::INFO);
643 assert_eq!(LogLevel::from(&tracing::Level::WARN), LogLevel::Warn);
644 }
645
646 #[test]
647 fn test_log_context_creation() {
648 let context = LogContext::new("test-service")
649 .with_attribute("key1", "value1")
650 .with_user_id("user123")
651 .with_operation("test_operation");
652
653 assert_eq!(context.service_name, "test-service");
654 assert_eq!(context.user_id, Some("user123".to_string()));
655 assert_eq!(context.operation, Some("test_operation".to_string()));
656 assert_eq!(context.attributes.get("key1"), Some(&"value1".to_string()));
657 }
658
659 #[test]
660 fn test_log_entry_creation() {
661 let context = LogContext::new("test-service");
662 let entry = LogEntry::new(LogLevel::Info, "Test message", context)
663 .with_field("field1", serde_json::Value::String("value1".to_string()));
664
665 assert_eq!(entry.level, LogLevel::Info);
666 assert_eq!(entry.message, "Test message");
667 assert_eq!(entry.fields.get("field1"), Some(&serde_json::Value::String("value1".to_string())));
668 }
669
670 #[tokio::test]
671 async fn test_structured_logger_creation() {
672 let config = ObservabilityConfig {
673 structured_logging: false, ..ObservabilityConfig::default()
675 };
676 let logger = StructuredLogger::new(&config).unwrap();
677
678 assert_eq!(logger.config.service_name, "eventuali");
679 }
680
681 #[tokio::test]
682 async fn test_log_aggregator() {
683 let aggregator = LogAggregator::new();
684 let context = LogContext::new("test-service").with_operation("test_op");
685 let entry1 = LogEntry::new(LogLevel::Info, "Message 1", context.clone());
686 let entry2 = LogEntry::new(LogLevel::Error, "Message 2", context);
687
688 aggregator.add_entry(entry1).await;
689 aggregator.add_entry(entry2).await;
690
691 let stats = aggregator.get_statistics().await;
692 assert_eq!(stats.total_entries, 2);
693 assert_eq!(stats.info_count, 1);
694 assert_eq!(stats.error_count, 1);
695 assert_eq!(stats.operations.get("test_op"), Some(&2));
696 }
697}