use crate::error::{ObservabilityError, ObservabilityResult};
use crate::traits::LogLevel;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// A single log record as it travels through the processor pipeline.
///
/// Derives `Serialize`/`Deserialize`, so an entry can be converted to and from
/// any serde-supported format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// UTC time associated with the entry.
pub timestamp: DateTime<Utc>,
/// Severity of the entry.
pub level: LogLevel,
/// Log message text.
pub message: String,
/// Arbitrary JSON value attached to the entry. The processors in this file
/// add keys to it when it is a JSON object.
pub fields: serde_json::Value,
/// Trace/span identifiers correlating this entry with a trace, if any.
pub trace_context: Option<TraceContext>,
/// Origin metadata (module, file, line, target).
pub source: LogSource,
}
/// Origin metadata for a log entry; every field is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogSource {
/// Originating module, if recorded.
pub module: Option<String>,
/// Originating source file, if recorded.
pub file: Option<String>,
/// Line number within `file`, if recorded.
pub line: Option<u32>,
/// Log target, if recorded.
/// NOTE(review): presumably mirrors `log::Record::target` — confirm against the caller.
pub target: Option<String>,
}
/// Identifiers that link a log or metric entry to a trace and span.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TraceCorrelation {
/// Identifier of the trace.
pub trace_id: String,
/// Identifier of the span.
pub span_id: String,
/// Identifier of the parent span, if there is one.
pub parent_span_id: Option<String>,
}
/// Alias for [`TraceCorrelation`]; this is the name used by the `trace_context`
/// fields of [`LogEntry`] and [`MetricsEntry`].
pub type TraceContext = TraceCorrelation;
/// A single step in a log-processing pipeline.
///
/// Implementors must be `Send + Sync` and implement `Debug`.
pub trait LogProcessor: Send + Sync + std::fmt::Debug {
/// Consumes `entry` and returns the (possibly modified) entry, or an error.
fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry>;
/// Static identifier for this processor. [`ProcessorChain::process`] includes
/// it in the error message when the processor fails.
fn name(&self) -> &'static str;
}
/// An ordered list of boxed [`LogProcessor`]s that are applied one after another.
#[derive(Debug)]
pub struct ProcessorChain {
/// Processors in the order they were added.
processors: Vec<Box<dyn LogProcessor>>,
}
impl ProcessorChain {
    /// Creates a chain that contains no processors.
    pub fn new() -> Self {
        let processors = Vec::new();
        Self { processors }
    }

    /// Appends `processor` to the end of the chain and hands the chain back,
    /// so calls can be strung together builder-style.
    pub fn add_processor(mut self, processor: Box<dyn LogProcessor>) -> Self {
        self.processors.push(processor);
        self
    }

    /// Feeds `entry` through every processor in insertion order, passing the
    /// output of each step to the next.
    ///
    /// # Errors
    ///
    /// Stops at the first processor that fails and returns a logging error
    /// whose message names that processor and embeds the underlying error.
    pub fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
        self.processors.iter().try_fold(entry, |current, step| {
            step.process(current).map_err(|cause| {
                ObservabilityError::logging(format!(
                    "Processor '{}' failed: {}",
                    step.name(),
                    cause
                ))
            })
        })
    }

    /// Number of processors currently in the chain.
    pub fn len(&self) -> usize {
        let count = self.processors.len();
        count
    }

    /// Returns `true` when the chain holds no processors.
    pub fn is_empty(&self) -> bool {
        self.processors.is_empty()
    }
}
impl Default for ProcessorChain {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct TimestampProcessor;
impl LogProcessor for TimestampProcessor {
fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
entry.timestamp = Utc::now();
Ok(entry)
}
fn name(&self) -> &'static str {
"timestamp"
}
}
/// Processor holding a fixed set of key/value pairs to add to log entries.
#[derive(Debug)]
pub struct ContextEnricher {
/// Fields to add, keyed by field name.
additional_fields: HashMap<String, serde_json::Value>,
}
impl ContextEnricher {
pub fn new() -> Self {
Self {
additional_fields: HashMap::new(),
}
}
pub fn with_field(
mut self,
key: impl Into<String>,
value: impl Into<serde_json::Value>,
) -> Self {
self.additional_fields.insert(key.into(), value.into());
self
}
}
impl Default for ContextEnricher {
fn default() -> Self {
Self::new()
}
}
impl LogProcessor for ContextEnricher {
    /// Adds every configured field to `entry.fields`, skipping keys the entry
    /// already carries so that entry-supplied values win over configured ones.
    ///
    /// If `entry.fields` is not a JSON object it is first replaced with an empty
    /// object — the same normalisation `StructuredFieldsProcessor` and
    /// `LogKvExtractor` apply — so the configured context is no longer silently
    /// dropped for entries created with e.g. `Value::Null` fields.
    ///
    /// Never fails.
    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
        if !entry.fields.is_object() {
            entry.fields = serde_json::json!({});
        }
        if let serde_json::Value::Object(ref mut map) = entry.fields {
            for (key, value) in &self.additional_fields {
                // Do not overwrite a value that is already present on the entry.
                if !map.contains_key(key) {
                    map.insert(key.clone(), value.clone());
                }
            }
        }
        Ok(entry)
    }

    /// Identifier of this processor: `"context_enricher"`.
    fn name(&self) -> &'static str {
        "context_enricher"
    }
}
/// Processor that mirrors an entry's core attributes into its `fields` object.
#[derive(Debug)]
pub struct StructuredFieldsProcessor;

impl LogProcessor for StructuredFieldsProcessor {
    /// Copies `timestamp` (RFC 3339), `level`, `message`, the optional
    /// `module`/`file`/`line` source attributes and, when present, the trace
    /// identifiers into `entry.fields`.
    ///
    /// A non-object `fields` value is replaced with an empty object first.
    /// Keys already present under these names are overwritten. `source.target`
    /// is not copied. Never fails.
    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
        if !entry.fields.is_object() {
            entry.fields = serde_json::Value::Object(serde_json::Map::new());
        }

        if let Some(map) = entry.fields.as_object_mut() {
            // Small helper so each attribute below is a single line.
            let mut put = |key: &str, value: serde_json::Value| {
                map.insert(key.to_string(), value);
            };

            put("timestamp", serde_json::json!(entry.timestamp.to_rfc3339()));
            put("level", serde_json::json!(entry.level.as_str()));
            put("message", serde_json::json!(entry.message));

            if let Some(module) = &entry.source.module {
                put("module", serde_json::json!(module));
            }
            if let Some(file) = &entry.source.file {
                put("file", serde_json::json!(file));
            }
            if let Some(line) = entry.source.line {
                put("line", serde_json::json!(line));
            }

            if let Some(trace) = &entry.trace_context {
                put("trace_id", serde_json::json!(trace.trace_id));
                put("span_id", serde_json::json!(trace.span_id));
                if let Some(parent) = &trace.parent_span_id {
                    put("parent_span_id", serde_json::json!(parent));
                }
            }
        }

        Ok(entry)
    }

    /// Identifier of this processor: `"structured_fields"`.
    fn name(&self) -> &'static str {
        "structured_fields"
    }
}
/// Processor that rejects entries based on a comparison of their level with a
/// configured threshold.
#[derive(Debug)]
pub struct LevelFilter {
/// Threshold each entry's level is compared against (`entry.level <= min_level` passes).
min_level: LogLevel,
}
impl LevelFilter {
pub fn new(min_level: LogLevel) -> Self {
Self { min_level }
}
}
impl LogProcessor for LevelFilter {
    /// Passes the entry through unchanged when `entry.level <= min_level`.
    ///
    /// NOTE(review): which severities this admits depends on how `LogLevel`
    /// orders its variants (defined elsewhere). The test in this file expects
    /// a filter built with `Info` to accept `Info` and reject `Debug`.
    ///
    /// # Errors
    ///
    /// Returns a logging error with the message `"Log level filtered out"` for
    /// entries that do not satisfy the comparison.
    fn process(&self, entry: LogEntry) -> ObservabilityResult<LogEntry> {
        let passes = entry.level <= self.min_level;
        if !passes {
            return Err(ObservabilityError::logging("Log level filtered out"));
        }
        Ok(entry)
    }

    /// Identifier of this processor: `"level_filter"`.
    fn name(&self) -> &'static str {
        "level_filter"
    }
}
/// Helper for turning the key/value pairs of a `log::Record` into JSON, and a
/// processor that marks entries as having passed through it.
#[derive(Debug)]
pub struct LogKvExtractor;

impl LogKvExtractor {
    /// Creates the (stateless) extractor.
    pub fn new() -> Self {
        LogKvExtractor
    }

    /// Collects the key/value pairs attached to `record` into a JSON object.
    ///
    /// Each pair is converted by `LogKvVisitor`. An error reported while
    /// visiting the pairs is ignored; whatever was collected up to that point
    /// is returned.
    pub fn extract_kv_from_record(record: &log::Record) -> serde_json::Value {
        let mut collected = serde_json::Map::new();
        let source = record.key_values();
        {
            let mut visitor = LogKvVisitor::new(&mut collected);
            // Best effort: a visit error is deliberately discarded.
            let _ = source.visit(&mut visitor);
        }
        serde_json::Value::Object(collected)
    }
}
impl LogProcessor for LogKvExtractor {
    /// Ensures `entry.fields` is a JSON object and sets `"kv_extracted": true`
    /// on it unless that key is already present.
    ///
    /// No key/value extraction happens here: a `LogEntry` carries no
    /// `log::Record`, so this step only adds the marker. Never fails.
    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
        if !entry.fields.is_object() {
            entry.fields = serde_json::Value::Object(serde_json::Map::new());
        }
        if let Some(map) = entry.fields.as_object_mut() {
            // `or_insert` leaves an existing value untouched.
            map.entry("kv_extracted")
                .or_insert(serde_json::Value::Bool(true));
        }
        Ok(entry)
    }

    /// Identifier of this processor: `"log_kv_extractor"`.
    fn name(&self) -> &'static str {
        "log_kv_extractor"
    }
}
impl Default for LogKvExtractor {
fn default() -> Self {
Self::new()
}
}
/// Visitor that writes each visited `log` key/value pair into a borrowed JSON map.
struct LogKvVisitor<'a> {
    /// Destination map; visited pairs are inserted here.
    fields: &'a mut serde_json::Map<String, serde_json::Value>,
}

impl<'a> LogKvVisitor<'a> {
    /// Wraps `fields` so that visited pairs are collected into it.
    fn new(fields: &'a mut serde_json::Map<String, serde_json::Value>) -> Self {
        LogKvVisitor { fields }
    }
}
impl<'a> log::kv::Visitor<'a> for LogKvVisitor<'a> {
    /// Converts one key/value pair to JSON and stores it under the key's name,
    /// replacing any value previously stored under that name.
    ///
    /// Conversions are tried in this order and the first that succeeds wins:
    /// borrowed string, `i64`, `u64`, `f64`, `bool`. A value that matches none
    /// of them is stored as its `Debug` rendering. Always returns `Ok(())`.
    fn visit_pair(
        &mut self,
        key: log::kv::Key,
        value: log::kv::Value,
    ) -> Result<(), log::kv::Error> {
        let json_value = value
            .to_borrowed_str()
            .map(|s| serde_json::json!(s))
            .or_else(|| value.to_i64().map(|i| serde_json::json!(i)))
            .or_else(|| value.to_u64().map(|u| serde_json::json!(u)))
            .or_else(|| value.to_f64().map(|f| serde_json::json!(f)))
            .or_else(|| value.to_bool().map(|b| serde_json::json!(b)))
            .unwrap_or_else(|| serde_json::json!(format!("{:?}", value)));

        let name = key.as_str().to_string();
        self.fields.insert(name, json_value);
        Ok(())
    }
}
/// Variant of [`ContextEnricher`] that can additionally tag entries with an
/// `enhanced_context` marker.
#[derive(Debug)]
pub struct EnhancedContextEnricher {
/// Fields to add, keyed by field name.
additional_fields: HashMap<String, serde_json::Value>,
/// When `true`, `process` sets `"enhanced_context": true` on the entry's
/// fields. Despite the name, this file performs no key/value extraction
/// based on this flag.
extract_kv: bool,
}
impl EnhancedContextEnricher {
pub fn new() -> Self {
Self {
additional_fields: HashMap::new(),
extract_kv: true,
}
}
pub fn with_field(
mut self,
key: impl Into<String>,
value: impl Into<serde_json::Value>,
) -> Self {
self.additional_fields.insert(key.into(), value.into());
self
}
pub fn with_kv_extraction(mut self, extract_kv: bool) -> Self {
self.extract_kv = extract_kv;
self
}
}
impl LogProcessor for EnhancedContextEnricher {
    /// Adds every configured field to `entry.fields`, skipping keys the entry
    /// already carries, and — when the `extract_kv` flag is set — writes
    /// `"enhanced_context": true` (overwriting any existing value for that key).
    ///
    /// If `entry.fields` is not a JSON object it is first replaced with an empty
    /// object — the same normalisation `StructuredFieldsProcessor` and
    /// `LogKvExtractor` apply — so enrichment is no longer silently skipped
    /// when this processor runs on an entry whose fields are e.g. `Value::Null`.
    ///
    /// Never fails.
    fn process(&self, mut entry: LogEntry) -> ObservabilityResult<LogEntry> {
        if !entry.fields.is_object() {
            entry.fields = serde_json::json!({});
        }
        if let serde_json::Value::Object(ref mut map) = entry.fields {
            for (key, value) in &self.additional_fields {
                // Do not overwrite a value that is already present on the entry.
                if !map.contains_key(key) {
                    map.insert(key.clone(), value.clone());
                }
            }
            if self.extract_kv {
                map.insert("enhanced_context".to_string(), serde_json::json!(true));
            }
        }
        Ok(entry)
    }

    /// Identifier of this processor: `"enhanced_context_enricher"`.
    fn name(&self) -> &'static str {
        "enhanced_context_enricher"
    }
}
impl Default for EnhancedContextEnricher {
fn default() -> Self {
Self::new()
}
}
pub fn build_default_processor_chain() -> ProcessorChain {
ProcessorChain::new()
.add_processor(Box::new(TimestampProcessor))
.add_processor(Box::new(LogKvExtractor::new()))
.add_processor(Box::new(EnhancedContextEnricher::new()))
.add_processor(Box::new(StructuredFieldsProcessor))
}
pub fn build_enhanced_processor_chain() -> ProcessorChain {
ProcessorChain::new()
.add_processor(Box::new(TimestampProcessor))
.add_processor(Box::new(LogKvExtractor::new()))
.add_processor(Box::new(
EnhancedContextEnricher::new()
.with_field("sdk_version", env!("CARGO_PKG_VERSION"))
.with_field("architecture", "hexagonal"),
))
.add_processor(Box::new(StructuredFieldsProcessor))
}
/// Creates a [`LogEntry`] timestamped with the current UTC time.
///
/// The entry has no trace context and an entirely empty [`LogSource`];
/// `fields` is stored as given.
pub fn create_log_entry(
    level: LogLevel,
    message: impl Into<String>,
    fields: serde_json::Value,
) -> LogEntry {
    let source = LogSource {
        module: None,
        file: None,
        line: None,
        target: None,
    };
    LogEntry {
        timestamp: Utc::now(),
        level,
        message: message.into(),
        fields,
        trace_context: None,
        source,
    }
}
/// A single metric observation with optional trace and source metadata.
#[derive(Debug, Clone)]
pub struct MetricsEntry {
/// Metric name.
pub name: String,
/// Observed value.
pub value: f64,
/// Kind of metric (counter, histogram or gauge).
pub metric_type: BasicMetricType,
/// UTC time of the observation; `MetricsEntry::new` sets it to `Utc::now()`.
pub timestamp: DateTime<Utc>,
/// Trace/span identifiers correlating this metric with a trace, if any.
pub trace_context: Option<TraceContext>,
/// Origin metadata (module, component, operation).
pub source: MetricsSource,
}
/// The kinds of metric a [`MetricsEntry`] can represent.
#[derive(Debug, Clone, PartialEq)]
pub enum BasicMetricType {
/// Rendered as `"counter"` by `MetricsEntry::to_json`.
Counter,
/// Rendered as `"histogram"` by `MetricsEntry::to_json`.
Histogram,
/// Rendered as `"gauge"` by `MetricsEntry::to_json`.
Gauge,
}
/// Origin metadata for a metric; every field is optional.
#[derive(Debug, Clone)]
pub struct MetricsSource {
/// Originating module, if recorded.
pub module: Option<String>,
/// Originating component, if recorded.
pub component: Option<String>,
/// Operation the metric relates to, if recorded.
pub operation: Option<String>,
}
impl MetricsEntry {
pub fn new(name: impl Into<String>, value: f64, metric_type: BasicMetricType) -> Self {
Self {
name: name.into(),
value,
metric_type,
timestamp: Utc::now(),
trace_context: None,
source: MetricsSource {
module: None,
component: None,
operation: None,
},
}
}
pub fn with_trace_context(mut self, trace_context: TraceContext) -> Self {
self.trace_context = Some(trace_context);
self
}
pub fn with_source(
mut self,
module: Option<String>,
component: Option<String>,
operation: Option<String>,
) -> Self {
self.source = MetricsSource {
module,
component,
operation,
};
self
}
pub fn to_json(&self) -> serde_json::Value {
let mut json = serde_json::json!({
"name": self.name,
"value": self.value,
"type": match self.metric_type {
BasicMetricType::Counter => "counter",
BasicMetricType::Histogram => "histogram",
BasicMetricType::Gauge => "gauge",
},
"timestamp": self.timestamp.to_rfc3339(),
});
if let Some(ref trace_ctx) = self.trace_context {
json["trace_id"] = serde_json::json!(trace_ctx.trace_id);
json["span_id"] = serde_json::json!(trace_ctx.span_id);
if let Some(ref parent) = trace_ctx.parent_span_id {
json["parent_span_id"] = serde_json::json!(parent);
}
}
if let Some(ref module) = self.source.module {
json["module"] = serde_json::json!(module);
}
if let Some(ref component) = self.source.component {
json["component"] = serde_json::json!(component);
}
if let Some(ref operation) = self.source.operation {
json["operation"] = serde_json::json!(operation);
}
json
}
}
/// Shorthand for [`MetricsEntry::new`] with [`BasicMetricType::Counter`].
pub fn create_counter_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
    let kind = BasicMetricType::Counter;
    MetricsEntry::new(name, value, kind)
}
/// Shorthand for [`MetricsEntry::new`] with [`BasicMetricType::Histogram`].
pub fn create_histogram_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
    let kind = BasicMetricType::Histogram;
    MetricsEntry::new(name, value, kind)
}
/// Shorthand for [`MetricsEntry::new`] with [`BasicMetricType::Gauge`].
pub fn create_gauge_metric(name: impl Into<String>, value: f64) -> MetricsEntry {
    let kind = BasicMetricType::Gauge;
    MetricsEntry::new(name, value, kind)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A two-step chain keeps level and message intact and adds the
    /// structured `timestamp` and `level` keys.
    #[test]
    fn test_processor_chain() {
        let input = create_log_entry(
            LogLevel::Info,
            "Test message",
            serde_json::json!({"key": "value"}),
        );
        let pipeline = ProcessorChain::new()
            .add_processor(Box::new(TimestampProcessor))
            .add_processor(Box::new(StructuredFieldsProcessor));

        let output = pipeline.process(input).unwrap();

        assert_eq!(output.level, LogLevel::Info);
        assert_eq!(output.message, "Test message");
        assert!(output.fields.get("timestamp").is_some());
        assert!(output.fields.get("level").is_some());
    }

    /// Configured fields are added and pre-existing fields survive.
    #[test]
    fn test_context_enricher() {
        let input = create_log_entry(
            LogLevel::Info,
            "Test",
            serde_json::json!({"existing": "field"}),
        );
        let enricher = ContextEnricher::new()
            .with_field("service", "test-service")
            .with_field("version", "1.0.0");

        let output = enricher.process(input).unwrap();
        let object = output.fields.as_object().unwrap();

        assert_eq!(object.get("service").unwrap(), "test-service");
        assert_eq!(object.get("version").unwrap(), "1.0.0");
        assert_eq!(object.get("existing").unwrap(), "field");
    }

    /// A filter built with `Info` accepts `Info` and rejects `Debug`.
    #[test]
    fn test_level_filter() {
        let filter = LevelFilter::new(LogLevel::Info);

        let accepted = filter.process(create_log_entry(
            LogLevel::Info,
            "Info message",
            serde_json::json!({}),
        ));
        assert!(accepted.is_ok());

        let rejected = filter.process(create_log_entry(
            LogLevel::Debug,
            "Debug message",
            serde_json::json!({}),
        ));
        assert!(rejected.is_err());
    }
}