Skip to main content

observability_core/
ports.rs

1//! Ports layer - Abstract interfaces for hexagonal architecture
2//!
3//! This module defines the ports (interfaces) that the core domain uses
4//! to interact with the external world. Adapters implement these ports.
5
6use crate::domain::LogEntry;
7use crate::error::ObservabilityResult;
8use std::collections::HashMap;
9
10/// Port for transporting log entries to external systems
11///
12/// This is the interface that different adapters implement:
13/// - WasmStdoutAdapter (writes to WASM stdout)
14/// - HttpTransportAdapter (sends via HTTP)
15/// - BatchingTransportAdapter (batches entries)
16pub trait TransportPort: Send + Sync {
17    /// Transport a single log entry
18    fn transport(&self, entry: &LogEntry) -> ObservabilityResult<()>;
19
20    /// Transport multiple log entries (for batching)
21    fn transport_batch(&self, entries: &[LogEntry]) -> ObservabilityResult<()> {
22        // Default implementation - transport one by one
23        for entry in entries {
24            self.transport(entry)?;
25        }
26        Ok(())
27    }
28}
29
30/// Port for formatting log entries
31///
32/// Different formatters can be plugged in:
33/// - JsonFormatter (structured JSON output)
34/// - PlainTextFormatter (human-readable text)
35/// - CompactFormatter (single-line JSON)
36pub trait FormatterPort: Send + Sync {
37    /// Format a log entry into a string
38    fn format(&self, entry: &LogEntry) -> ObservabilityResult<String>;
39}
40
41/// Port for standard logging integration
42///
43/// This port allows us to hook into standard Rust logging infrastructure:
44/// - log::Log implementation
45/// - tracing::Subscriber implementation
46pub trait StandardLoggingPort: Send + Sync {
47    /// Initialize the logging system (called once during agent startup)
48    fn initialize(&self) -> ObservabilityResult<()>;
49
50    /// Process a log entry from standard logging macros
51    fn process_standard_log(&self, entry: LogEntry) -> ObservabilityResult<()>;
52
53    /// Check if logging is enabled for this level
54    fn enabled(&self, level: &crate::traits::LogLevel) -> bool;
55}
56
57/// Port for accessing context information
58///
59/// This allows the logging system to enrich entries with context:
60/// - Agent ID, request ID, trace ID
61/// - User-defined context fields
62/// - Thread/task local context
63pub trait ContextPort: Send + Sync {
64    /// Get current context fields
65    fn get_context(&self) -> HashMap<String, serde_json::Value>;
66
67    /// Add a context field
68    fn add_context(&self, key: String, value: serde_json::Value);
69
70    /// Remove a context field
71    fn remove_context(&self, key: &str);
72
73    /// Clear all context
74    fn clear_context(&self);
75}
76
77/// Port for batching log entries
78///
79/// This allows different batching strategies:
80/// - TimeBasedBatcher (flush every N seconds)
81/// - SizeBasedBatcher (flush when buffer reaches N entries)
82/// - HybridBatcher (combination of time and size)
83pub trait BatchingPort: Send + Sync {
84    /// Add an entry to the batch
85    fn add_to_batch(&self, entry: LogEntry) -> ObservabilityResult<()>;
86
87    /// Force flush the current batch
88    fn flush_batch(&self) -> ObservabilityResult<()>;
89
90    /// Get current batch size
91    fn batch_size(&self) -> usize;
92}
93
94/// Port for metrics collection (basic interface for correlation).
95pub trait MetricsPort: Send + Sync {
96    /// Emit a simple counter metric
97    fn emit_counter_simple(&self, name: &str, value: f64) -> ObservabilityResult<()>;
98
99    /// Emit a simple histogram/timing metric
100    fn emit_histogram_simple(&self, name: &str, value: f64) -> ObservabilityResult<()>;
101
102    /// Emit a simple gauge metric
103    fn emit_gauge_simple(&self, name: &str, value: f64) -> ObservabilityResult<()>;
104
105    /// Check if metrics collection is enabled
106    fn is_enabled(&self) -> bool;
107
108    /// Batch emit multiple metrics (optional, has default implementation)
109    fn emit_metrics_batch(
110        &self,
111        entries: &[crate::domain::MetricsEntry],
112    ) -> ObservabilityResult<()> {
113        for entry in entries {
114            match entry.metric_type {
115                crate::domain::BasicMetricType::Counter => {
116                    self.emit_counter_simple(&entry.name, entry.value)?;
117                }
118                crate::domain::BasicMetricType::Histogram => {
119                    self.emit_histogram_simple(&entry.name, entry.value)?;
120                }
121                crate::domain::BasicMetricType::Gauge => {
122                    self.emit_gauge_simple(&entry.name, entry.value)?;
123                }
124            }
125        }
126        Ok(())
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use super::*;
133    use crate::domain::{BasicMetricType, LogEntry, MetricsEntry};
134    use crate::error::ObservabilityError;
135    use crate::traits::LogLevel;
136    use std::sync::Mutex;
137
138    fn dummy_entry() -> LogEntry {
139        crate::domain::create_log_entry(LogLevel::Info, "test", serde_json::json!({"key": "value"}))
140    }
141
142    // --- TransportPort default batch ---
143
144    struct RecordingTransport {
145        entries: Mutex<Vec<String>>,
146    }
147
148    impl RecordingTransport {
149        fn new() -> Self {
150            Self {
151                entries: Mutex::new(Vec::new()),
152            }
153        }
154    }
155
156    impl TransportPort for RecordingTransport {
157        fn transport(&self, entry: &LogEntry) -> ObservabilityResult<()> {
158            self.entries.lock().unwrap().push(entry.message.clone());
159            Ok(())
160        }
161    }
162
163    #[test]
164    fn test_transport_batch_default_calls_transport_per_entry() {
165        let transport = RecordingTransport::new();
166        let entries: Vec<LogEntry> = (0..3).map(|_| dummy_entry()).collect();
167        transport.transport_batch(&entries).unwrap();
168        assert_eq!(transport.entries.lock().unwrap().len(), 3);
169    }
170
171    #[test]
172    fn test_transport_batch_empty_is_ok() {
173        let transport = RecordingTransport::new();
174        transport.transport_batch(&[]).unwrap();
175        assert!(transport.entries.lock().unwrap().is_empty());
176    }
177
178    // --- MetricsPort default batch ---
179
180    struct RecordingMetrics {
181        counters: Mutex<Vec<(String, f64)>>,
182        histograms: Mutex<Vec<(String, f64)>>,
183        gauges: Mutex<Vec<(String, f64)>>,
184    }
185
186    impl RecordingMetrics {
187        fn new() -> Self {
188            Self {
189                counters: Mutex::new(Vec::new()),
190                histograms: Mutex::new(Vec::new()),
191                gauges: Mutex::new(Vec::new()),
192            }
193        }
194    }
195
196    impl MetricsPort for RecordingMetrics {
197        fn emit_counter_simple(&self, name: &str, value: f64) -> ObservabilityResult<()> {
198            self.counters
199                .lock()
200                .unwrap()
201                .push((name.to_string(), value));
202            Ok(())
203        }
204
205        fn emit_histogram_simple(&self, name: &str, value: f64) -> ObservabilityResult<()> {
206            self.histograms
207                .lock()
208                .unwrap()
209                .push((name.to_string(), value));
210            Ok(())
211        }
212
213        fn emit_gauge_simple(&self, name: &str, value: f64) -> ObservabilityResult<()> {
214            self.gauges.lock().unwrap().push((name.to_string(), value));
215            Ok(())
216        }
217
218        fn is_enabled(&self) -> bool {
219            true
220        }
221    }
222
223    #[test]
224    fn test_metrics_batch_routes_by_metric_type() {
225        let metrics = RecordingMetrics::new();
226        let entries = vec![
227            MetricsEntry::new("req_total", 1.0, BasicMetricType::Counter),
228            MetricsEntry::new("latency_ms", 42.0, BasicMetricType::Histogram),
229            MetricsEntry::new("queue_depth", 7.0, BasicMetricType::Gauge),
230        ];
231
232        metrics.emit_metrics_batch(&entries).unwrap();
233
234        assert_eq!(metrics.counters.lock().unwrap().len(), 1);
235        assert_eq!(metrics.histograms.lock().unwrap().len(), 1);
236        assert_eq!(metrics.gauges.lock().unwrap().len(), 1);
237        assert_eq!(metrics.counters.lock().unwrap()[0].0, "req_total");
238        assert_eq!(metrics.histograms.lock().unwrap()[0].1, 42.0);
239        assert_eq!(metrics.gauges.lock().unwrap()[0].1, 7.0);
240    }
241
242    #[test]
243    fn test_metrics_batch_empty_is_ok() {
244        let metrics = RecordingMetrics::new();
245        metrics.emit_metrics_batch(&[]).unwrap();
246    }
247
248    // --- ContextPort ---
249
250    struct InMemoryContext {
251        fields: Mutex<HashMap<String, serde_json::Value>>,
252    }
253
254    impl InMemoryContext {
255        fn new() -> Self {
256            Self {
257                fields: Mutex::new(HashMap::new()),
258            }
259        }
260    }
261
262    impl ContextPort for InMemoryContext {
263        fn get_context(&self) -> HashMap<String, serde_json::Value> {
264            self.fields.lock().unwrap().clone()
265        }
266        fn add_context(&self, key: String, value: serde_json::Value) {
267            self.fields.lock().unwrap().insert(key, value);
268        }
269        fn remove_context(&self, key: &str) {
270            self.fields.lock().unwrap().remove(key);
271        }
272        fn clear_context(&self) {
273            self.fields.lock().unwrap().clear();
274        }
275    }
276
277    #[test]
278    fn test_context_port_add_get_remove_clear() {
279        let ctx = InMemoryContext::new();
280        assert!(ctx.get_context().is_empty());
281
282        ctx.add_context("agent".into(), serde_json::json!("echo"));
283        assert_eq!(ctx.get_context().len(), 1);
284        assert_eq!(ctx.get_context()["agent"], serde_json::json!("echo"));
285
286        ctx.add_context("version".into(), serde_json::json!("1.0"));
287        assert_eq!(ctx.get_context().len(), 2);
288
289        ctx.remove_context("agent");
290        assert_eq!(ctx.get_context().len(), 1);
291        assert!(!ctx.get_context().contains_key("agent"));
292
293        ctx.clear_context();
294        assert!(ctx.get_context().is_empty());
295    }
296
297    // --- FormatterPort ---
298
299    struct JsonFmt;
300
301    impl FormatterPort for JsonFmt {
302        fn format(&self, entry: &LogEntry) -> ObservabilityResult<String> {
303            serde_json::to_string(entry)
304                .map_err(|e| ObservabilityError::serialization(e.to_string()))
305        }
306    }
307
308    #[test]
309    fn test_formatter_port_produces_valid_json() {
310        let fmt = JsonFmt;
311        let entry = dummy_entry();
312        let output = fmt.format(&entry).unwrap();
313        let parsed: serde_json::Value = serde_json::from_str(&output).unwrap();
314        assert_eq!(parsed["message"], "test");
315    }
316
317    // --- BatchingPort ---
318
319    struct InMemoryBatcher {
320        batch: Mutex<Vec<LogEntry>>,
321    }
322
323    impl InMemoryBatcher {
324        fn new() -> Self {
325            Self {
326                batch: Mutex::new(Vec::new()),
327            }
328        }
329    }
330
331    impl BatchingPort for InMemoryBatcher {
332        fn add_to_batch(&self, entry: LogEntry) -> ObservabilityResult<()> {
333            self.batch.lock().unwrap().push(entry);
334            Ok(())
335        }
336        fn flush_batch(&self) -> ObservabilityResult<()> {
337            self.batch.lock().unwrap().clear();
338            Ok(())
339        }
340        fn batch_size(&self) -> usize {
341            self.batch.lock().unwrap().len()
342        }
343    }
344
345    #[test]
346    fn test_batching_port_add_size_flush() {
347        let batcher = InMemoryBatcher::new();
348        assert_eq!(batcher.batch_size(), 0);
349
350        batcher.add_to_batch(dummy_entry()).unwrap();
351        batcher.add_to_batch(dummy_entry()).unwrap();
352        assert_eq!(batcher.batch_size(), 2);
353
354        batcher.flush_batch().unwrap();
355        assert_eq!(batcher.batch_size(), 0);
356    }
357
358    // --- StandardLoggingPort ---
359
360    struct DummyLogging;
361
362    impl StandardLoggingPort for DummyLogging {
363        fn initialize(&self) -> ObservabilityResult<()> {
364            Ok(())
365        }
366        fn process_standard_log(&self, _entry: LogEntry) -> ObservabilityResult<()> {
367            Ok(())
368        }
369        fn enabled(&self, _level: &LogLevel) -> bool {
370            true
371        }
372    }
373
374    #[test]
375    fn test_standard_logging_port_initialize_and_enabled() {
376        let logging = DummyLogging;
377        logging.initialize().unwrap();
378        assert!(logging.enabled(&LogLevel::Info));
379        assert!(logging.enabled(&LogLevel::Error));
380        logging.process_standard_log(dummy_entry()).unwrap();
381    }
382}