use crate::domain::LogEntry;
use crate::error::ObservabilityResult;
use std::collections::HashMap;
/// Port through which log entries are handed to a delivery mechanism.
///
/// Implementors must be `Send + Sync`.
pub trait TransportPort: Send + Sync {
    /// Delivers a single log entry.
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports for this entry.
    fn transport(&self, entry: &LogEntry) -> ObservabilityResult<()>;

    /// Delivers every element of `entries`, in slice order.
    ///
    /// The default implementation forwards each entry to
    /// [`TransportPort::transport`]. An empty slice yields `Ok(())`.
    ///
    /// # Errors
    ///
    /// Stops at the first entry whose `transport` call fails and returns that
    /// error; entries after the failing one are not attempted.
    fn transport_batch(&self, entries: &[LogEntry]) -> ObservabilityResult<()> {
        entries.iter().try_for_each(|item| self.transport(item))
    }
}
/// Port for rendering a log entry as text.
///
/// Implementors must be `Send + Sync`.
pub trait FormatterPort: Send + Sync {
/// Formats `entry` and returns the rendered `String`.
///
/// The output format is chosen by the implementor; failures are reported
/// through `ObservabilityResult`.
fn format(&self, entry: &LogEntry) -> ObservabilityResult<String>;
}
/// Port for a logging backend that accepts fully built log entries.
///
/// Implementors must be `Send + Sync`.
pub trait StandardLoggingPort: Send + Sync {
/// Performs whatever setup the backend needs; may fail.
///
/// NOTE(review): presumably meant to run before the first
/// `process_standard_log` call — confirm with implementors.
fn initialize(&self) -> ObservabilityResult<()>;
/// Handles one log entry, taking ownership of it.
///
/// Failures are reported through `ObservabilityResult`.
fn process_standard_log(&self, entry: LogEntry) -> ObservabilityResult<()>;
/// Reports whether the backend treats `level` as enabled.
fn enabled(&self, level: &crate::traits::LogLevel) -> bool;
}
/// Port for a key/value context store whose values are JSON.
///
/// Every method takes `&self`, so an implementor that mutates its state needs
/// interior mutability. Implementors must be `Send + Sync`.
pub trait ContextPort: Send + Sync {
/// Returns the context as an owned map from key to JSON value.
fn get_context(&self) -> HashMap<String, serde_json::Value>;
/// Stores `value` under `key`, taking ownership of both.
///
/// What happens when `key` is already present is left to the implementor.
fn add_context(&self, key: String, value: serde_json::Value);
/// Removes the entry stored under `key`.
///
/// The signature gives no way to report a missing key.
fn remove_context(&self, key: &str);
/// Removes every entry from the context.
fn clear_context(&self);
}
/// Port for accumulating log entries into a batch and flushing them.
///
/// Implementors must be `Send + Sync`.
pub trait BatchingPort: Send + Sync {
/// Adds `entry` to the batch, taking ownership of it.
///
/// Failures are reported through `ObservabilityResult`.
fn add_to_batch(&self, entry: LogEntry) -> ObservabilityResult<()>;
/// Flushes the batch; what flushing does with the entries is up to the
/// implementor.
fn flush_batch(&self) -> ObservabilityResult<()>;
/// Returns a size for the batch.
///
/// NOTE(review): presumably the number of entries currently pending —
/// confirm with implementors.
fn batch_size(&self) -> usize;
}
pub trait MetricsPort: Send + Sync {
fn emit_counter_simple(&self, name: &str, value: f64) -> ObservabilityResult<()>;
fn emit_histogram_simple(&self, name: &str, value: f64) -> ObservabilityResult<()>;
fn emit_gauge_simple(&self, name: &str, value: f64) -> ObservabilityResult<()>;
fn is_enabled(&self) -> bool;
fn emit_metrics_batch(
&self,
entries: &[crate::domain::MetricsEntry],
) -> ObservabilityResult<()> {
for entry in entries {
match entry.metric_type {
crate::domain::BasicMetricType::Counter => {
self.emit_counter_simple(&entry.name, entry.value)?;
}
crate::domain::BasicMetricType::Histogram => {
self.emit_histogram_simple(&entry.name, entry.value)?;
}
crate::domain::BasicMetricType::Gauge => {
self.emit_gauge_simple(&entry.name, entry.value)?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::domain::{BasicMetricType, LogEntry, MetricsEntry};
use crate::error::ObservabilityError;
use crate::traits::LogLevel;
use std::sync::Mutex;
/// Builds the fixture entry shared by the tests: `Info` level, message
/// `"test"`, and the JSON payload `{"key": "value"}`.
fn dummy_entry() -> LogEntry {
    let payload = serde_json::json!({"key": "value"});
    crate::domain::create_log_entry(LogLevel::Info, "test", payload)
}
/// Test double that remembers the message of every entry it transports.
struct RecordingTransport {
    /// Messages in the order `transport` received them.
    entries: Mutex<Vec<String>>,
}

impl RecordingTransport {
    /// Creates a transport with nothing recorded yet.
    fn new() -> Self {
        let entries = Mutex::new(Vec::new());
        Self { entries }
    }
}

impl TransportPort for RecordingTransport {
    /// Appends a copy of `entry.message` to the recorded list and returns
    /// `Ok(())`.
    fn transport(&self, entry: &LogEntry) -> ObservabilityResult<()> {
        let mut recorded = self.entries.lock().unwrap();
        recorded.push(entry.message.clone());
        Ok(())
    }
}
/// The default `transport_batch` must call `transport` once per entry.
#[test]
fn test_transport_batch_default_calls_transport_per_entry() {
    let transport = RecordingTransport::new();
    let entries: Vec<LogEntry> = std::iter::repeat_with(dummy_entry).take(3).collect();

    transport.transport_batch(&entries).unwrap();

    let recorded = transport.entries.lock().unwrap();
    assert_eq!(recorded.len(), 3);
}
/// An empty batch succeeds and never reaches `transport`.
#[test]
fn test_transport_batch_empty_is_ok() {
    let transport = RecordingTransport::new();
    let no_entries: &[LogEntry] = &[];

    transport.transport_batch(no_entries).unwrap();

    let recorded = transport.entries.lock().unwrap();
    assert!(recorded.is_empty());
}
/// Test double that stores every emitted metric in a per-kind list.
struct RecordingMetrics {
    /// `(name, value)` pairs received by `emit_counter_simple`.
    counters: Mutex<Vec<(String, f64)>>,
    /// `(name, value)` pairs received by `emit_histogram_simple`.
    histograms: Mutex<Vec<(String, f64)>>,
    /// `(name, value)` pairs received by `emit_gauge_simple`.
    gauges: Mutex<Vec<(String, f64)>>,
}

impl RecordingMetrics {
    /// Creates a recorder whose three lists are empty.
    fn new() -> Self {
        Self {
            counters: Mutex::default(),
            histograms: Mutex::default(),
            gauges: Mutex::default(),
        }
    }

    /// Pushes `(name, value)` onto `sink` and returns `Ok(())`.
    fn record(
        sink: &Mutex<Vec<(String, f64)>>,
        name: &str,
        value: f64,
    ) -> ObservabilityResult<()> {
        sink.lock().unwrap().push((name.to_owned(), value));
        Ok(())
    }
}

impl MetricsPort for RecordingMetrics {
    fn emit_counter_simple(&self, name: &str, value: f64) -> ObservabilityResult<()> {
        Self::record(&self.counters, name, value)
    }

    fn emit_histogram_simple(&self, name: &str, value: f64) -> ObservabilityResult<()> {
        Self::record(&self.histograms, name, value)
    }

    fn emit_gauge_simple(&self, name: &str, value: f64) -> ObservabilityResult<()> {
        Self::record(&self.gauges, name, value)
    }

    /// Always reports metrics as enabled.
    fn is_enabled(&self) -> bool {
        true
    }
}
/// The default `emit_metrics_batch` must send each entry to the emitter that
/// matches its metric type, with the entry's name and value.
#[test]
fn test_metrics_batch_routes_by_metric_type() {
    let metrics = RecordingMetrics::new();
    let batch = vec![
        MetricsEntry::new("req_total", 1.0, BasicMetricType::Counter),
        MetricsEntry::new("latency_ms", 42.0, BasicMetricType::Histogram),
        MetricsEntry::new("queue_depth", 7.0, BasicMetricType::Gauge),
    ];

    metrics.emit_metrics_batch(&batch).unwrap();

    // The three lists sit behind separate mutexes, so holding all guards at
    // once cannot deadlock.
    let counters = metrics.counters.lock().unwrap();
    let histograms = metrics.histograms.lock().unwrap();
    let gauges = metrics.gauges.lock().unwrap();

    assert_eq!(counters.len(), 1);
    assert_eq!(histograms.len(), 1);
    assert_eq!(gauges.len(), 1);
    assert_eq!(counters[0].0, "req_total");
    assert_eq!(histograms[0].1, 42.0);
    assert_eq!(gauges[0].1, 7.0);
}
/// An empty batch succeeds and emits nothing.
///
/// Checking the recorder as well as the `Ok` result catches a default
/// implementation that would emit a spurious metric for an empty slice.
#[test]
fn test_metrics_batch_empty_is_ok() {
    let metrics = RecordingMetrics::new();

    metrics.emit_metrics_batch(&[]).unwrap();

    assert!(metrics.counters.lock().unwrap().is_empty());
    assert!(metrics.histograms.lock().unwrap().is_empty());
    assert!(metrics.gauges.lock().unwrap().is_empty());
}
/// Test double that keeps the context in a mutex-guarded `HashMap`.
struct InMemoryContext {
    fields: Mutex<HashMap<String, serde_json::Value>>,
}

impl InMemoryContext {
    /// Creates a context with no entries.
    fn new() -> Self {
        Self {
            fields: Mutex::default(),
        }
    }
}

impl ContextPort for InMemoryContext {
    /// Returns a clone of the stored map.
    fn get_context(&self) -> HashMap<String, serde_json::Value> {
        let stored = self.fields.lock().unwrap();
        stored.clone()
    }

    /// Inserts `value` under `key`, replacing any value already stored there.
    fn add_context(&self, key: String, value: serde_json::Value) {
        let mut stored = self.fields.lock().unwrap();
        stored.insert(key, value);
    }

    /// Removes `key` if present; an absent key is ignored.
    fn remove_context(&self, key: &str) {
        let mut stored = self.fields.lock().unwrap();
        stored.remove(key);
    }

    /// Empties the stored map.
    fn clear_context(&self) {
        let mut stored = self.fields.lock().unwrap();
        stored.clear();
    }
}
/// Exercises the whole `ContextPort` surface: add, read back, remove, clear.
#[test]
fn test_context_port_add_get_remove_clear() {
    let ctx = InMemoryContext::new();
    assert!(ctx.get_context().is_empty());

    ctx.add_context(String::from("agent"), serde_json::json!("echo"));
    let after_first_add = ctx.get_context();
    assert_eq!(after_first_add.len(), 1);
    assert_eq!(after_first_add["agent"], serde_json::json!("echo"));

    ctx.add_context(String::from("version"), serde_json::json!("1.0"));
    assert_eq!(ctx.get_context().len(), 2);

    ctx.remove_context("agent");
    let after_remove = ctx.get_context();
    assert_eq!(after_remove.len(), 1);
    assert!(!after_remove.contains_key("agent"));

    ctx.clear_context();
    assert!(ctx.get_context().is_empty());
}
/// Test formatter that serializes the entry with `serde_json`.
struct JsonFmt;

impl FormatterPort for JsonFmt {
    /// Returns the JSON text for `entry`; a serde failure is converted into
    /// `ObservabilityError::serialization` carrying the serde error message.
    fn format(&self, entry: &LogEntry) -> ObservabilityResult<String> {
        match serde_json::to_string(entry) {
            Ok(rendered) => Ok(rendered),
            Err(err) => Err(ObservabilityError::serialization(err.to_string())),
        }
    }
}
/// The formatter output must parse as JSON and carry the entry's message.
#[test]
fn test_formatter_port_produces_valid_json() {
    let formatter = JsonFmt;
    let rendered = formatter.format(&dummy_entry()).unwrap();

    let parsed = serde_json::from_str::<serde_json::Value>(&rendered).unwrap();

    assert_eq!(parsed["message"], "test");
}
/// Test double that keeps batched entries in a mutex-guarded `Vec`.
struct InMemoryBatcher {
    batch: Mutex<Vec<LogEntry>>,
}

impl InMemoryBatcher {
    /// Creates a batcher with no entries.
    fn new() -> Self {
        Self {
            batch: Mutex::default(),
        }
    }
}

impl BatchingPort for InMemoryBatcher {
    /// Appends `entry` to the batch and returns `Ok(())`.
    fn add_to_batch(&self, entry: LogEntry) -> ObservabilityResult<()> {
        let mut pending = self.batch.lock().unwrap();
        pending.push(entry);
        Ok(())
    }

    /// Discards every batched entry and returns `Ok(())`.
    fn flush_batch(&self) -> ObservabilityResult<()> {
        let mut pending = self.batch.lock().unwrap();
        pending.clear();
        Ok(())
    }

    /// Returns how many entries are currently batched.
    fn batch_size(&self) -> usize {
        let pending = self.batch.lock().unwrap();
        pending.len()
    }
}
/// The batch size grows with each added entry and returns to zero on flush.
#[test]
fn test_batching_port_add_size_flush() {
    let batcher = InMemoryBatcher::new();
    assert_eq!(batcher.batch_size(), 0);

    for _ in 0..2 {
        batcher.add_to_batch(dummy_entry()).unwrap();
    }
    assert_eq!(batcher.batch_size(), 2);

    batcher.flush_batch().unwrap();
    assert_eq!(batcher.batch_size(), 0);
}
/// Test double whose logging operations all succeed without doing anything.
struct DummyLogging;

impl StandardLoggingPort for DummyLogging {
    /// Does nothing and returns `Ok(())`.
    fn initialize(&self) -> ObservabilityResult<()> {
        Ok(())
    }

    /// Drops the entry and returns `Ok(())`.
    fn process_standard_log(&self, _ignored: LogEntry) -> ObservabilityResult<()> {
        Ok(())
    }

    /// Reports every level as enabled.
    fn enabled(&self, _any_level: &LogLevel) -> bool {
        true
    }
}
/// Calls every `StandardLoggingPort` method once on the dummy backend.
#[test]
fn test_standard_logging_port_initialize_and_enabled() {
    let logging = DummyLogging;

    logging.initialize().unwrap();

    for level in &[LogLevel::Info, LogLevel::Error] {
        assert!(logging.enabled(level));
    }

    logging.process_standard_log(dummy_entry()).unwrap();
}
}