1use crate::error::{ObservabilityError, ObservabilityResult};
7use crate::traits::LogLevel;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct LogEntry {
15 pub timestamp: DateTime<Utc>,
16 pub level: LogLevel,
17 pub message: String,
18 pub fields: serde_json::Value,
19 pub trace_context: Option<TraceContext>,
20 pub source: LogSource,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct LogSource {
26 pub module: Option<String>,
27 pub file: Option<String>,
28 pub line: Option<u32>,
29 pub target: Option<String>,
30}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct TraceCorrelation {
38 pub trace_id: String,
39 pub span_id: String,
40 pub parent_span_id: Option<String>,
41}
42
43pub type TraceContext = TraceCorrelation;
45
46pub trait LogProcessor: Send + Sync + std::fmt::Debug {
48 fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry>;
50
51 fn name(&self) -> &'static str;
53}
54
55#[derive(Debug)]
57pub struct ProcessorChain {
58 processors: Vec<Box<dyn LogProcessor>>,
59}
60
61impl ProcessorChain {
62 pub fn new() -> Self {
64 Self {
65 processors: Vec::new(),
66 }
67 }
68
69 pub fn add_processor(mut self, processor: Box<dyn LogProcessor>) -> Self {
71 self.processors.push(processor);
72 self
73 }
74
75 pub fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
77 let mut processed = entry;
78
79 for processor in &self.processors {
80 processed = processor.process(processed).map_err(|e| {
81 ObservabilityError::logging(format!(
82 "Processor '{}' failed: {}",
83 processor.name(),
84 e
85 ))
86 })?;
87 }
88
89 Ok(processed)
90 }
91
92 pub fn len(&self) -> usize {
94 self.processors.len()
95 }
96
97 pub fn is_empty(&self) -> bool {
99 self.processors.is_empty()
100 }
101}
102
103impl Default for ProcessorChain {
104 fn default() -> Self {
105 Self::new()
106 }
107}
108
109#[derive(Debug)]
113pub struct TimestampProcessor;
114
115impl LogProcessor for TimestampProcessor {
116 fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
117 entry.timestamp = Utc::now();
118 Ok(entry)
119 }
120
121 fn name(&self) -> &'static str {
122 "timestamp"
123 }
124}
125
126#[derive(Debug)]
128pub struct ContextEnricher {
129 additional_fields: HashMap<String, serde_json::Value>,
130}
131
132impl ContextEnricher {
133 pub fn new() -> Self {
134 Self {
135 additional_fields: HashMap::new(),
136 }
137 }
138
139 pub fn with_field(
140 mut self,
141 key: impl Into<String>,
142 value: impl Into<serde_json::Value>,
143 ) -> Self {
144 self.additional_fields.insert(key.into(), value.into());
145 self
146 }
147}
148
149impl Default for ContextEnricher {
150 fn default() -> Self {
151 Self::new()
152 }
153}
154
155impl LogProcessor for ContextEnricher {
156 fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
157 if let serde_json::Value::Object(ref mut map) = entry.fields {
159 for (key, value) in &self.additional_fields {
160 if !map.contains_key(key) {
161 map.insert(key.clone(), value.clone());
162 }
163 }
164 }
165
166 Ok(entry)
167 }
168
169 fn name(&self) -> &'static str {
170 "context_enricher"
171 }
172}
173
174#[derive(Debug)]
176pub struct StructuredFieldsProcessor;
177
178impl LogProcessor for StructuredFieldsProcessor {
179 fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
180 if !entry.fields.is_object() {
182 entry.fields = serde_json::json!({});
183 }
184
185 if let serde_json::Value::Object(ref mut map) = entry.fields {
187 map.insert(
188 "timestamp".to_string(),
189 serde_json::json!(entry.timestamp.to_rfc3339()),
190 );
191 map.insert("level".to_string(), serde_json::json!(entry.level.as_str()));
192 map.insert("message".to_string(), serde_json::json!(entry.message));
193
194 if let Some(ref module) = entry.source.module {
196 map.insert("module".to_string(), serde_json::json!(module));
197 }
198 if let Some(ref file) = entry.source.file {
199 map.insert("file".to_string(), serde_json::json!(file));
200 }
201 if let Some(line) = entry.source.line {
202 map.insert("line".to_string(), serde_json::json!(line));
203 }
204
205 if let Some(ref trace_ctx) = entry.trace_context {
207 map.insert(
208 "trace_id".to_string(),
209 serde_json::json!(trace_ctx.trace_id),
210 );
211 map.insert("span_id".to_string(), serde_json::json!(trace_ctx.span_id));
212 if let Some(ref parent) = trace_ctx.parent_span_id {
213 map.insert("parent_span_id".to_string(), serde_json::json!(parent));
214 }
215 }
216 }
217
218 Ok(entry)
219 }
220
221 fn name(&self) -> &'static str {
222 "structured_fields"
223 }
224}
225
226#[derive(Debug)]
228pub struct LevelFilter {
229 min_level: LogLevel,
230}
231
232impl LevelFilter {
233 pub fn new(min_level: LogLevel) -> Self {
234 Self { min_level }
235 }
236}
237
238impl LogProcessor for LevelFilter {
239 fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
240 if entry.level <= self.min_level {
241 Ok(entry)
242 } else {
243 Err(ObservabilityError::logging("Log level filtered out"))
244 }
245 }
246
247 fn name(&self) -> &'static str {
248 "level_filter"
249 }
250}
251
252#[derive(Debug)]
257pub struct LogKvExtractor;
258
259impl LogKvExtractor {
260 pub fn new() -> Self {
261 Self
262 }
263
264 pub fn extract_kv_from_record(record: &log::Record) -> serde_json::Value {
269 let mut fields = serde_json::Map::new();
270
271 let key_values = record.key_values();
273 let mut visitor = LogKvVisitor::new(&mut fields);
274 let _ = key_values.visit(&mut visitor);
275
276 serde_json::Value::Object(fields)
277 }
278}
279
280impl LogProcessor for LogKvExtractor {
281 fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
282 let mut entry = entry;
287 if !entry.fields.is_object() {
288 entry.fields = serde_json::json!({});
289 }
290
291 if let serde_json::Value::Object(ref mut map) = entry.fields {
293 if !map.contains_key("kv_extracted") {
294 map.insert("kv_extracted".to_string(), serde_json::json!(true));
295 }
296 }
297
298 Ok(entry)
299 }
300
301 fn name(&self) -> &'static str {
302 "log_kv_extractor"
303 }
304}
305
306impl Default for LogKvExtractor {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
312struct LogKvVisitor<'a> {
314 fields: &'a mut serde_json::Map<String, serde_json::Value>,
315}
316
317impl<'a> LogKvVisitor<'a> {
318 fn new(fields: &'a mut serde_json::Map<String, serde_json::Value>) -> Self {
319 Self { fields }
320 }
321}
322
323impl<'a> log::kv::Visitor<'a> for LogKvVisitor<'a> {
324 fn visit_pair(
325 &mut self,
326 key: log::kv::Key,
327 value: log::kv::Value,
328 ) -> Result<(), log::kv::Error> {
329 let key_str = key.as_str();
330
331 let json_value = match value.to_borrowed_str() {
333 Some(s) => serde_json::json!(s),
334 None => {
335 if let Some(i) = value.to_i64() {
337 serde_json::json!(i)
338 } else if let Some(u) = value.to_u64() {
339 serde_json::json!(u)
340 } else if let Some(f) = value.to_f64() {
341 serde_json::json!(f)
342 } else if let Some(b) = value.to_bool() {
343 serde_json::json!(b)
344 } else {
345 serde_json::json!(format!("{:?}", value))
347 }
348 }
349 };
350
351 self.fields.insert(key_str.to_string(), json_value);
352 Ok(())
353 }
354}
355
356#[derive(Debug)]
358pub struct EnhancedContextEnricher {
359 additional_fields: HashMap<String, serde_json::Value>,
360 extract_kv: bool,
361}
362
363impl EnhancedContextEnricher {
364 pub fn new() -> Self {
365 Self {
366 additional_fields: HashMap::new(),
367 extract_kv: true,
368 }
369 }
370
371 pub fn with_field(
372 mut self,
373 key: impl Into<String>,
374 value: impl Into<serde_json::Value>,
375 ) -> Self {
376 self.additional_fields.insert(key.into(), value.into());
377 self
378 }
379
380 pub fn with_kv_extraction(mut self, extract_kv: bool) -> Self {
381 self.extract_kv = extract_kv;
382 self
383 }
384}
385
386impl LogProcessor for EnhancedContextEnricher {
387 fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
388 if let serde_json::Value::Object(ref mut map) = entry.fields {
390 for (key, value) in &self.additional_fields {
391 if !map.contains_key(key) {
392 map.insert(key.clone(), value.clone());
393 }
394 }
395
396 if self.extract_kv {
398 map.insert("enhanced_context".to_string(), serde_json::json!(true));
399 }
400 }
401
402 Ok(entry)
403 }
404
405 fn name(&self) -> &'static str {
406 "enhanced_context_enricher"
407 }
408}
409
410impl Default for EnhancedContextEnricher {
411 fn default() -> Self {
412 Self::new()
413 }
414}
415
416pub fn build_default_processor_chain() -> ProcessorChain {
420 ProcessorChain::new()
421 .add_processor(Box::new(TimestampProcessor))
422 .add_processor(Box::new(LogKvExtractor::new()))
423 .add_processor(Box::new(EnhancedContextEnricher::new()))
424 .add_processor(Box::new(StructuredFieldsProcessor))
425}
426
427pub fn build_enhanced_processor_chain() -> ProcessorChain {
429 ProcessorChain::new()
430 .add_processor(Box::new(TimestampProcessor))
431 .add_processor(Box::new(LogKvExtractor::new()))
432 .add_processor(Box::new(
433 EnhancedContextEnricher::new()
434 .with_field("sdk_version", env!("CARGO_PKG_VERSION"))
435 .with_field("architecture", "hexagonal"),
436 ))
437 .add_processor(Box::new(StructuredFieldsProcessor))
438}
439
440pub fn create_log_entry(
442 level: LogLevel,
443 message: impl Into<String>,
444 fields: serde_json::Value,
445) -> LogEntry {
446 LogEntry {
447 timestamp: Utc::now(),
448 level,
449 message: message.into(),
450 fields,
451 trace_context: None,
452 source: LogSource {
453 module: None,
454 file: None,
455 line: None,
456 target: None,
457 },
458 }
459}
460
461#[derive(Debug, Clone)]
463pub struct MetricsEntry {
464 pub name: String,
465 pub value: f64,
466 pub metric_type: BasicMetricType,
467 pub timestamp: DateTime<Utc>,
468 pub trace_context: Option<TraceContext>,
469 pub source: MetricsSource,
470}
471
/// Kind of a metric.
///
/// The enum carries no data, so it is `Copy` and can be compared with `==`
/// and used as a hash-map or hash-set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum BasicMetricType {
    /// Counter metric.
    Counter,
    /// Histogram metric.
    Histogram,
    /// Gauge metric.
    Gauge,
}
482
/// Origin information for a metric measurement.
///
/// Every field is optional, so `MetricsSource::default()` yields a source
/// with nothing recorded.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MetricsSource {
    /// Module that produced the measurement, if known.
    pub module: Option<String>,
    /// Component that produced the measurement, if known.
    pub component: Option<String>,
    /// Operation being measured, if known.
    pub operation: Option<String>,
}
490
491impl MetricsEntry {
492 pub fn new(name: impl Into<String>, value: f64, metric_type: BasicMetricType) -> Self {
494 Self {
495 name: name.into(),
496 value,
497 metric_type,
498 timestamp: Utc::now(),
499 trace_context: None,
500 source: MetricsSource {
501 module: None,
502 component: None,
503 operation: None,
504 },
505 }
506 }
507
508 pub fn with_trace_context(mut self, trace_context: TraceContext) -> Self {
510 self.trace_context = Some(trace_context);
511 self
512 }
513
514 pub fn with_source(
516 mut self,
517 module: Option<String>,
518 component: Option<String>,
519 operation: Option<String>,
520 ) -> Self {
521 self.source = MetricsSource {
522 module,
523 component,
524 operation,
525 };
526 self
527 }
528
529 pub fn to_json(&self) -> serde_json::Value {
531 let mut json = serde_json::json!({
532 "name": self.name,
533 "value": self.value,
534 "type": match self.metric_type {
535 BasicMetricType::Counter => "counter",
536 BasicMetricType::Histogram => "histogram",
537 BasicMetricType::Gauge => "gauge",
538 },
539 "timestamp": self.timestamp.to_rfc3339(),
540 });
541
542 if let Some(ref trace_ctx) = self.trace_context {
544 json["trace_id"] = serde_json::json!(trace_ctx.trace_id);
545 json["span_id"] = serde_json::json!(trace_ctx.span_id);
546 if let Some(ref parent) = trace_ctx.parent_span_id {
547 json["parent_span_id"] = serde_json::json!(parent);
548 }
549 }
550
551 if let Some(ref module) = self.source.module {
553 json["module"] = serde_json::json!(module);
554 }
555 if let Some(ref component) = self.source.component {
556 json["component"] = serde_json::json!(component);
557 }
558 if let Some(ref operation) = self.source.operation {
559 json["operation"] = serde_json::json!(operation);
560 }
561
562 json
563 }
564}
565
566pub fn create_counter_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
568 MetricsEntry::new(name, value, BasicMetricType::Counter)
569}
570
571pub fn create_histogram_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
573 MetricsEntry::new(name, value, BasicMetricType::Histogram)
574}
575
576pub fn create_gauge_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
578 MetricsEntry::new(name, value, BasicMetricType::Gauge)
579}
580
#[cfg(test)]
mod tests {
    use super::*;

    /// A two-stage chain keeps level and message and adds the structured keys.
    #[test]
    fn test_processor_chain() {
        let input = create_log_entry(
            LogLevel::Info,
            "Test message",
            serde_json::json!({"key": "value"}),
        );

        let output = ProcessorChain::new()
            .add_processor(Box::new(TimestampProcessor))
            .add_processor(Box::new(StructuredFieldsProcessor))
            .process(input)
            .unwrap();

        assert_eq!(output.level, LogLevel::Info);
        assert_eq!(output.message, "Test message");
        assert!(output.fields.get("timestamp").is_some());
        assert!(output.fields.get("level").is_some());
    }

    /// Configured fields are added alongside the entry's existing field.
    #[test]
    fn test_context_enricher() {
        let input = create_log_entry(
            LogLevel::Info,
            "Test",
            serde_json::json!({"existing": "field"}),
        );

        let output = ContextEnricher::new()
            .with_field("service", "test-service")
            .with_field("version", "1.0.0")
            .process(input)
            .unwrap();

        let fields = output.fields.as_object().unwrap();
        let expected = [
            ("service", "test-service"),
            ("version", "1.0.0"),
            ("existing", "field"),
        ];
        for &(key, value) in expected.iter() {
            assert_eq!(fields.get(key).unwrap(), value);
        }
    }

    /// With an `Info` threshold, `Info` passes and `Debug` is rejected.
    #[test]
    fn test_level_filter() {
        let filter = LevelFilter::new(LogLevel::Info);

        let accepted = create_log_entry(LogLevel::Info, "Info message", serde_json::json!({}));
        assert!(filter.process(accepted).is_ok());

        let rejected = create_log_entry(LogLevel::Debug, "Debug message", serde_json::json!({}));
        assert!(filter.process(rejected).is_err());
    }
}