Skip to main content

camel_api/
metrics.rs

1use std::time::Duration;
2
/// Sink for metrics produced by the Camel runtime.
///
/// Implement this trait to forward measurements to a monitoring backend such
/// as Prometheus or OpenTelemetry. The `Send + Sync` supertraits allow one
/// collector to be shared between threads, for example behind an
/// `Arc<dyn MetricsCollector>`.
pub trait MetricsCollector: Send + Sync {
    /// Records the time spent processing an exchange.
    ///
    /// * `route_id` - route the measurement is attributed to.
    /// * `duration` - processing time to record.
    fn record_exchange_duration(&self, route_id: &str, duration: Duration);

    /// Increments the error counter.
    ///
    /// * `route_id` - route the error is attributed to.
    /// * `error_type` - label describing the kind of error.
    fn increment_errors(&self, route_id: &str, error_type: &str);

    /// Increments the exchange counter.
    ///
    /// * `route_id` - route the exchange is attributed to.
    fn increment_exchanges(&self, route_id: &str);

    /// Updates the queue depth to `depth`.
    ///
    /// * `route_id` - route whose queue depth is being reported.
    /// * `depth` - the new queue depth.
    fn set_queue_depth(&self, route_id: &str, depth: usize);

    /// Records a circuit breaker state change.
    ///
    /// * `route_id` - route the circuit breaker is attributed to.
    /// * `from` - name of the state being left.
    /// * `to` - name of the state being entered.
    fn record_circuit_breaker_change(&self, route_id: &str, from: &str, to: &str);

    /// Records a single histogram observation (e.g. cost, latency
    /// distribution).
    ///
    /// * `_name` - name of the histogram.
    /// * `_value` - the observed value.
    /// * `_labels` - key/value label pairs attached to the observation.
    ///
    /// The provided implementation does nothing, so implementors that do not
    /// override this method keep compiling (backward-compatible default).
    fn record_histogram(&self, _name: &str, _value: f64, _labels: &[(&str, &str)]) {
        // Deliberately empty: the default discards the observation.
    }
}
25
/// Metrics collector that discards every measurement.
///
/// Intended as the default collector when no metrics backend is configured.
/// It is a zero-sized unit struct, so it derives the cheap standard traits:
/// `Debug` for diagnostics, `Clone`/`Copy` for free duplication and `Default`
/// so it can be created with `NoOpMetrics::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpMetrics;
28
29impl MetricsCollector for NoOpMetrics {
30    fn record_exchange_duration(&self, _route_id: &str, _duration: Duration) {}
31    fn increment_errors(&self, _route_id: &str, _error_type: &str) {}
32    fn increment_exchanges(&self, _route_id: &str) {}
33    fn set_queue_depth(&self, _route_id: &str, _depth: usize) {}
34    fn record_circuit_breaker_change(&self, _route_id: &str, _from: &str, _to: &str) {}
35}
36
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Calls every `MetricsCollector` method exactly once through dynamic
    /// dispatch, including `record_histogram`.
    fn exercise_all(metrics: &dyn MetricsCollector) {
        metrics.record_exchange_duration("test-route", Duration::from_millis(100));
        metrics.increment_errors("test-route", "test-error");
        metrics.increment_exchanges("test-route");
        metrics.set_queue_depth("test-route", 5);
        metrics.record_circuit_breaker_change("test-route", "closed", "open");
        metrics.record_histogram("test-histogram", 1.5, &[("route", "test-route")]);
    }

    /// `NoOpMetrics` must be usable as a trait object and none of its methods
    /// may panic.
    #[test]
    fn test_noop_metrics_implements_trait() {
        let metrics_arc: Arc<dyn MetricsCollector> = Arc::new(NoOpMetrics);

        // All methods should execute without panicking.
        exercise_all(&*metrics_arc);
    }

    /// A custom collector receives exactly the values passed through the
    /// trait object.
    #[test]
    fn test_custom_metrics_collector() {
        /// Test collector that stores what it observes in atomics so the test
        /// can assert on it afterwards.
        #[derive(Default)]
        struct TestMetrics {
            exchange_count: AtomicU64,
            error_count: AtomicU64,
            total_millis: AtomicU64,
            queue_depth: AtomicUsize,
            breaker_changes: AtomicU64,
        }

        impl MetricsCollector for TestMetrics {
            fn record_exchange_duration(&self, _route_id: &str, duration: Duration) {
                // `as_millis` yields `u128`; saturate instead of truncating.
                let millis = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
                self.total_millis.fetch_add(millis, Ordering::SeqCst);
            }

            fn increment_errors(&self, _route_id: &str, _error_type: &str) {
                self.error_count.fetch_add(1, Ordering::SeqCst);
            }

            fn increment_exchanges(&self, _route_id: &str) {
                self.exchange_count.fetch_add(1, Ordering::SeqCst);
            }

            fn set_queue_depth(&self, _route_id: &str, depth: usize) {
                self.queue_depth.store(depth, Ordering::SeqCst);
            }

            fn record_circuit_breaker_change(&self, _route_id: &str, _from: &str, _to: &str) {
                self.breaker_changes.fetch_add(1, Ordering::SeqCst);
            }
        }

        // Keep a concrete handle for inspecting the counters while the
        // collector itself is driven through a trait object.
        let test_metrics = Arc::new(TestMetrics::default());
        let shared = Arc::clone(&test_metrics);
        let metrics_arc: Arc<dyn MetricsCollector> = shared;

        exercise_all(&*metrics_arc);

        assert_eq!(test_metrics.total_millis.load(Ordering::SeqCst), 100);
        assert_eq!(test_metrics.error_count.load(Ordering::SeqCst), 1);
        assert_eq!(test_metrics.exchange_count.load(Ordering::SeqCst), 1);
        assert_eq!(test_metrics.queue_depth.load(Ordering::SeqCst), 5);
        assert_eq!(test_metrics.breaker_changes.load(Ordering::SeqCst), 1);
    }
}