Skip to main content

observability_core/
domain.rs

1//! Domain layer for observability core - Pure business logic
2//!
3//! This module contains ONLY pure business logic with no external dependencies.
4//! No WASM, HTTP, or framework-specific code should be here.
5
6use crate::error::{ObservabilityError, ObservabilityResult};
7use crate::traits::LogLevel;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11
12/// Core log entry - pure data structure
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct LogEntry {
15    pub timestamp: DateTime<Utc>,
16    pub level: LogLevel,
17    pub message: String,
18    pub fields: serde_json::Value,
19    pub trace_context: Option<TraceContext>,
20    pub source: LogSource,
21}
22
23/// Source of the log entry
24#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct LogSource {
26    pub module: Option<String>,
27    pub file: Option<String>,
28    pub line: Option<u32>,
29    pub target: Option<String>,
30}
31
32/// Serializable trace identifiers for log/metric correlation.
33///
34/// Distinct from [`crate::context::TraceContext`] which is the runtime
35/// tracing context with sampling decisions and W3C conversion methods.
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct TraceCorrelation {
38    pub trace_id: String,
39    pub span_id: String,
40    pub parent_span_id: Option<String>,
41}
42
43/// Backwards-compatible alias.
44pub type TraceContext = TraceCorrelation;
45
46/// Core processor interface - transforms log entries
47pub trait LogProcessor: Send + Sync + std::fmt::Debug {
48    /// Process and transform a log entry
49    fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry>;
50
51    /// Get processor name for debugging
52    fn name(&self) -> &'static str;
53}
54
55/// Processor chain pattern (inspired by structlog)
56#[derive(Debug)]
57pub struct ProcessorChain {
58    processors: Vec<Box<dyn LogProcessor>>,
59}
60
61impl ProcessorChain {
62    /// Create a new empty processor chain
63    pub fn new() -> Self {
64        Self {
65            processors: Vec::new(),
66        }
67    }
68
69    /// Add a processor to the chain
70    pub fn add_processor(mut self, processor: Box<dyn LogProcessor>) -> Self {
71        self.processors.push(processor);
72        self
73    }
74
75    /// Process an entry through the entire chain
76    pub fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
77        let mut processed = entry;
78
79        for processor in &self.processors {
80            processed = processor.process(processed).map_err(|e| {
81                ObservabilityError::logging(format!(
82                    "Processor '{}' failed: {}",
83                    processor.name(),
84                    e
85                ))
86            })?;
87        }
88
89        Ok(processed)
90    }
91
92    /// Get number of processors in the chain
93    pub fn len(&self) -> usize {
94        self.processors.len()
95    }
96
97    /// Check if chain is empty
98    pub fn is_empty(&self) -> bool {
99        self.processors.is_empty()
100    }
101}
102
103impl Default for ProcessorChain {
104    fn default() -> Self {
105        Self::new()
106    }
107}
108
109// ==================== BUILT-IN PROCESSORS ====================
110
111/// Processor that adds timestamps
112#[derive(Debug)]
113pub struct TimestampProcessor;
114
115impl LogProcessor for TimestampProcessor {
116    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
117        entry.timestamp = Utc::now();
118        Ok(entry)
119    }
120
121    fn name(&self) -> &'static str {
122        "timestamp"
123    }
124}
125
126/// Processor that enriches with context
127#[derive(Debug)]
128pub struct ContextEnricher {
129    additional_fields: HashMap<String, serde_json::Value>,
130}
131
132impl ContextEnricher {
133    pub fn new() -> Self {
134        Self {
135            additional_fields: HashMap::new(),
136        }
137    }
138
139    pub fn with_field(
140        mut self,
141        key: impl Into<String>,
142        value: impl Into<serde_json::Value>,
143    ) -> Self {
144        self.additional_fields.insert(key.into(), value.into());
145        self
146    }
147}
148
149impl Default for ContextEnricher {
150    fn default() -> Self {
151        Self::new()
152    }
153}
154
155impl LogProcessor for ContextEnricher {
156    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
157        // Add additional context fields
158        if let serde_json::Value::Object(ref mut map) = entry.fields {
159            for (key, value) in &self.additional_fields {
160                if !map.contains_key(key) {
161                    map.insert(key.clone(), value.clone());
162                }
163            }
164        }
165
166        Ok(entry)
167    }
168
169    fn name(&self) -> &'static str {
170        "context_enricher"
171    }
172}
173
174/// Processor that structures fields
175#[derive(Debug)]
176pub struct StructuredFieldsProcessor;
177
178impl LogProcessor for StructuredFieldsProcessor {
179    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
180        // Ensure fields is always an object
181        if !entry.fields.is_object() {
182            entry.fields = serde_json::json!({});
183        }
184
185        // Add standard fields
186        if let serde_json::Value::Object(ref mut map) = entry.fields {
187            map.insert(
188                "timestamp".to_string(),
189                serde_json::json!(entry.timestamp.to_rfc3339()),
190            );
191            map.insert("level".to_string(), serde_json::json!(entry.level.as_str()));
192            map.insert("message".to_string(), serde_json::json!(entry.message));
193
194            // Add source information if available
195            if let Some(ref module) = entry.source.module {
196                map.insert("module".to_string(), serde_json::json!(module));
197            }
198            if let Some(ref file) = entry.source.file {
199                map.insert("file".to_string(), serde_json::json!(file));
200            }
201            if let Some(line) = entry.source.line {
202                map.insert("line".to_string(), serde_json::json!(line));
203            }
204
205            // Add trace context if available
206            if let Some(ref trace_ctx) = entry.trace_context {
207                map.insert(
208                    "trace_id".to_string(),
209                    serde_json::json!(trace_ctx.trace_id),
210                );
211                map.insert("span_id".to_string(), serde_json::json!(trace_ctx.span_id));
212                if let Some(ref parent) = trace_ctx.parent_span_id {
213                    map.insert("parent_span_id".to_string(), serde_json::json!(parent));
214                }
215            }
216        }
217
218        Ok(entry)
219    }
220
221    fn name(&self) -> &'static str {
222        "structured_fields"
223    }
224}
225
226/// Filter processor that filters out logs below certain level
227#[derive(Debug)]
228pub struct LevelFilter {
229    min_level: LogLevel,
230}
231
232impl LevelFilter {
233    pub fn new(min_level: LogLevel) -> Self {
234        Self { min_level }
235    }
236}
237
238impl LogProcessor for LevelFilter {
239    fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
240        if entry.level <= self.min_level {
241            Ok(entry)
242        } else {
243            Err(ObservabilityError::logging("Log level filtered out"))
244        }
245    }
246
247    fn name(&self) -> &'static str {
248        "level_filter"
249    }
250}
251
252/// Processor that extracts structured fields from log::kv
253///
254/// This processor supports the log::info!("msg"; "key" => value) syntax
255/// by extracting key-value pairs from log::Record and adding them to LogEntry fields
256#[derive(Debug)]
257pub struct LogKvExtractor;
258
259impl LogKvExtractor {
260    pub fn new() -> Self {
261        Self
262    }
263
264    /// Extract key-value pairs from log::Record
265    ///
266    /// This function would be called by StandardLogAdapter when processing log::Record
267    /// but we'll implement the interface here for the processor chain
268    pub fn extract_kv_from_record(record: &log::Record) -> serde_json::Value {
269        let mut fields = serde_json::Map::new();
270
271        // Extract key-value pairs using log::kv
272        let key_values = record.key_values();
273        let mut visitor = LogKvVisitor::new(&mut fields);
274        let _ = key_values.visit(&mut visitor);
275
276        serde_json::Value::Object(fields)
277    }
278}
279
280impl LogProcessor for LogKvExtractor {
281    fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
282        // In the processor chain context, we assume fields are already extracted
283        // by StandardLogAdapter, so this processor mainly ensures proper structure
284
285        // Ensure fields is always an object for consistency
286        let mut entry = entry;
287        if !entry.fields.is_object() {
288            entry.fields = serde_json::json!({});
289        }
290
291        // Add metadata about kv extraction
292        if let serde_json::Value::Object(ref mut map) = entry.fields {
293            if !map.contains_key("kv_extracted") {
294                map.insert("kv_extracted".to_string(), serde_json::json!(true));
295            }
296        }
297
298        Ok(entry)
299    }
300
301    fn name(&self) -> &'static str {
302        "log_kv_extractor"
303    }
304}
305
306impl Default for LogKvExtractor {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
312/// Visitor for extracting log::kv key-value pairs
313struct LogKvVisitor<'a> {
314    fields: &'a mut serde_json::Map<String, serde_json::Value>,
315}
316
317impl<'a> LogKvVisitor<'a> {
318    fn new(fields: &'a mut serde_json::Map<String, serde_json::Value>) -> Self {
319        Self { fields }
320    }
321}
322
323impl<'a> log::kv::Visitor<'a> for LogKvVisitor<'a> {
324    fn visit_pair(
325        &mut self,
326        key: log::kv::Key,
327        value: log::kv::Value,
328    ) -> Result<(), log::kv::Error> {
329        let key_str = key.as_str();
330
331        // Convert log::kv::Value to serde_json::Value
332        let json_value = match value.to_borrowed_str() {
333            Some(s) => serde_json::json!(s),
334            None => {
335                // Handle other value types
336                if let Some(i) = value.to_i64() {
337                    serde_json::json!(i)
338                } else if let Some(u) = value.to_u64() {
339                    serde_json::json!(u)
340                } else if let Some(f) = value.to_f64() {
341                    serde_json::json!(f)
342                } else if let Some(b) = value.to_bool() {
343                    serde_json::json!(b)
344                } else {
345                    // Fallback to debug representation
346                    serde_json::json!(format!("{:?}", value))
347                }
348            }
349        };
350
351        self.fields.insert(key_str.to_string(), json_value);
352        Ok(())
353    }
354}
355
356/// Enhanced processor that combines context enrichment with kv extraction
357#[derive(Debug)]
358pub struct EnhancedContextEnricher {
359    additional_fields: HashMap<String, serde_json::Value>,
360    extract_kv: bool,
361}
362
363impl EnhancedContextEnricher {
364    pub fn new() -> Self {
365        Self {
366            additional_fields: HashMap::new(),
367            extract_kv: true,
368        }
369    }
370
371    pub fn with_field(
372        mut self,
373        key: impl Into<String>,
374        value: impl Into<serde_json::Value>,
375    ) -> Self {
376        self.additional_fields.insert(key.into(), value.into());
377        self
378    }
379
380    pub fn with_kv_extraction(mut self, extract_kv: bool) -> Self {
381        self.extract_kv = extract_kv;
382        self
383    }
384}
385
386impl LogProcessor for EnhancedContextEnricher {
387    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
388        // Add additional context fields (same as ContextEnricher)
389        if let serde_json::Value::Object(ref mut map) = entry.fields {
390            for (key, value) in &self.additional_fields {
391                if !map.contains_key(key) {
392                    map.insert(key.clone(), value.clone());
393                }
394            }
395
396            // Add extraction metadata
397            if self.extract_kv {
398                map.insert("enhanced_context".to_string(), serde_json::json!(true));
399            }
400        }
401
402        Ok(entry)
403    }
404
405    fn name(&self) -> &'static str {
406        "enhanced_context_enricher"
407    }
408}
409
410impl Default for EnhancedContextEnricher {
411    fn default() -> Self {
412        Self::new()
413    }
414}
415
416// ==================== UTILITY FUNCTIONS ====================
417
418/// Build a default processor chain with common processors
419pub fn build_default_processor_chain() -> ProcessorChain {
420    ProcessorChain::new()
421        .add_processor(Box::new(TimestampProcessor))
422        .add_processor(Box::new(LogKvExtractor::new()))
423        .add_processor(Box::new(EnhancedContextEnricher::new()))
424        .add_processor(Box::new(StructuredFieldsProcessor))
425}
426
427/// Build an enhanced processor chain with additional features
428pub fn build_enhanced_processor_chain() -> ProcessorChain {
429    ProcessorChain::new()
430        .add_processor(Box::new(TimestampProcessor))
431        .add_processor(Box::new(LogKvExtractor::new()))
432        .add_processor(Box::new(
433            EnhancedContextEnricher::new()
434                .with_field("sdk_version", env!("CARGO_PKG_VERSION"))
435                .with_field("architecture", "hexagonal"),
436        ))
437        .add_processor(Box::new(StructuredFieldsProcessor))
438}
439
440/// Create a log entry from basic components
441pub fn create_log_entry(
442    level: LogLevel,
443    message: impl Into<String>,
444    fields: serde_json::Value,
445) -> LogEntry {
446    LogEntry {
447        timestamp: Utc::now(),
448        level,
449        message: message.into(),
450        fields,
451        trace_context: None,
452        source: LogSource {
453            module: None,
454            file: None,
455            line: None,
456            target: None,
457        },
458    }
459}
460
461/// Basic metric entry for correlation with logs
462#[derive(Debug, Clone)]
463pub struct MetricsEntry {
464    pub name: String,
465    pub value: f64,
466    pub metric_type: BasicMetricType,
467    pub timestamp: DateTime<Utc>,
468    pub trace_context: Option<TraceContext>,
469    pub source: MetricsSource,
470}
471
472/// Simple metric types supported by core
473#[derive(Debug, Clone, PartialEq)]
474pub enum BasicMetricType {
475    /// Counter metric (monotonically increasing)
476    Counter,
477    /// Histogram/timing metric (distribution of values)
478    Histogram,
479    /// Gauge metric (current value)
480    Gauge,
481}
482
483/// Source of the metric
484#[derive(Debug, Clone)]
485pub struct MetricsSource {
486    pub module: Option<String>,
487    pub component: Option<String>,
488    pub operation: Option<String>,
489}
490
491impl MetricsEntry {
492    /// Create a new metrics entry
493    pub fn new(name: impl Into<String>, value: f64, metric_type: BasicMetricType) -> Self {
494        Self {
495            name: name.into(),
496            value,
497            metric_type,
498            timestamp: Utc::now(),
499            trace_context: None,
500            source: MetricsSource {
501                module: None,
502                component: None,
503                operation: None,
504            },
505        }
506    }
507
508    /// Add trace context for correlation
509    pub fn with_trace_context(mut self, trace_context: TraceContext) -> Self {
510        self.trace_context = Some(trace_context);
511        self
512    }
513
514    /// Add source information
515    pub fn with_source(
516        mut self,
517        module: Option<String>,
518        component: Option<String>,
519        operation: Option<String>,
520    ) -> Self {
521        self.source = MetricsSource {
522            module,
523            component,
524            operation,
525        };
526        self
527    }
528
529    /// Convert to JSON for transport
530    pub fn to_json(&self) -> serde_json::Value {
531        let mut json = serde_json::json!({
532            "name": self.name,
533            "value": self.value,
534            "type": match self.metric_type {
535                BasicMetricType::Counter => "counter",
536                BasicMetricType::Histogram => "histogram",
537                BasicMetricType::Gauge => "gauge",
538            },
539            "timestamp": self.timestamp.to_rfc3339(),
540        });
541
542        // Add trace context if present
543        if let Some(ref trace_ctx) = self.trace_context {
544            json["trace_id"] = serde_json::json!(trace_ctx.trace_id);
545            json["span_id"] = serde_json::json!(trace_ctx.span_id);
546            if let Some(ref parent) = trace_ctx.parent_span_id {
547                json["parent_span_id"] = serde_json::json!(parent);
548            }
549        }
550
551        // Add source info if present
552        if let Some(ref module) = self.source.module {
553            json["module"] = serde_json::json!(module);
554        }
555        if let Some(ref component) = self.source.component {
556            json["component"] = serde_json::json!(component);
557        }
558        if let Some(ref operation) = self.source.operation {
559            json["operation"] = serde_json::json!(operation);
560        }
561
562        json
563    }
564}
565
566/// Convenience function to create a counter metric
567pub fn create_counter_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
568    MetricsEntry::new(name, value, BasicMetricType::Counter)
569}
570
571/// Convenience function to create a histogram metric
572pub fn create_histogram_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
573    MetricsEntry::new(name, value, BasicMetricType::Histogram)
574}
575
576/// Convenience function to create a gauge metric
577pub fn create_gauge_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
578    MetricsEntry::new(name, value, BasicMetricType::Gauge)
579}
580
581#[cfg(test)]
582mod tests {
583    use super::*;
584
585    #[test]
586    fn test_processor_chain() {
587        let chain = ProcessorChain::new()
588            .add_processor(Box::new(TimestampProcessor))
589            .add_processor(Box::new(StructuredFieldsProcessor));
590
591        let entry = create_log_entry(
592            LogLevel::Info,
593            "Test message",
594            serde_json::json!({"key": "value"}),
595        );
596
597        let processed = chain.process(entry).unwrap();
598
599        assert_eq!(processed.level, LogLevel::Info);
600        assert_eq!(processed.message, "Test message");
601        assert!(processed.fields.get("timestamp").is_some());
602        assert!(processed.fields.get("level").is_some());
603    }
604
605    #[test]
606    fn test_context_enricher() {
607        let enricher = ContextEnricher::new()
608            .with_field("service", "test-service")
609            .with_field("version", "1.0.0");
610
611        let entry = create_log_entry(
612            LogLevel::Info,
613            "Test",
614            serde_json::json!({"existing": "field"}),
615        );
616
617        let processed = enricher.process(entry).unwrap();
618
619        let fields = processed.fields.as_object().unwrap();
620        assert_eq!(fields.get("service").unwrap(), "test-service");
621        assert_eq!(fields.get("version").unwrap(), "1.0.0");
622        assert_eq!(fields.get("existing").unwrap(), "field");
623    }
624
625    #[test]
626    fn test_level_filter() {
627        let filter = LevelFilter::new(LogLevel::Info);
628
629        let info_entry = create_log_entry(LogLevel::Info, "Info message", serde_json::json!({}));
630        let debug_entry = create_log_entry(LogLevel::Debug, "Debug message", serde_json::json!({}));
631
632        assert!(filter.process(info_entry).is_ok());
633        assert!(filter.process(debug_entry).is_err());
634    }
635}