Skip to main content

camel_prometheus/
metrics.rs

1use std::time::Duration;
2
3use camel_api::metrics::MetricsCollector;
4use prometheus::{CounterVec, GaugeVec, HistogramVec, Opts, Registry, TextEncoder};
5
6/// Prometheus metrics collector for rust-camel
7///
8/// This struct implements the `MetricsCollector` trait and exposes metrics
9/// in Prometheus format via the `/metrics` endpoint.
10pub struct PrometheusMetrics {
11    registry: Registry,
12    exchanges_total: CounterVec,
13    errors_total: CounterVec,
14    exchange_duration_seconds: HistogramVec,
15    queue_depth: GaugeVec,
16    circuit_breaker_state: GaugeVec,
17}
18
19impl PrometheusMetrics {
20    /// Creates a new PrometheusMetrics instance with all metrics registered
21    pub fn new() -> Self {
22        let registry = Registry::new();
23
24        // Create and register exchanges_total counter
25        let exchanges_total = CounterVec::new(
26            Opts::new("exchanges_total", "Total number of exchanges processed").namespace("camel"),
27            &["route"],
28        )
29        .expect("Failed to create exchanges_total counter");
30        registry
31            .register(Box::new(exchanges_total.clone()))
32            .expect("Failed to register exchanges_total counter");
33
34        // Create and register errors_total counter
35        let errors_total = CounterVec::new(
36            Opts::new("errors_total", "Total number of errors").namespace("camel"),
37            &["route", "error_type"],
38        )
39        .expect("Failed to create errors_total counter");
40        registry
41            .register(Box::new(errors_total.clone()))
42            .expect("Failed to register errors_total counter");
43
44        // Create and register exchange_duration_seconds histogram
45        // Using buckets suitable for typical exchange durations (ms to seconds range)
46        let exchange_duration_seconds = HistogramVec::new(
47            prometheus::HistogramOpts {
48                common_opts: Opts::new(
49                    "exchange_duration_seconds",
50                    "Exchange processing duration in seconds",
51                )
52                .namespace("camel"),
53                buckets: vec![
54                    0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
55                ],
56            },
57            &["route"],
58        )
59        .expect("Failed to create exchange_duration_seconds histogram");
60        registry
61            .register(Box::new(exchange_duration_seconds.clone()))
62            .expect("Failed to register exchange_duration_seconds histogram");
63
64        // Create and register queue_depth gauge
65        let queue_depth = GaugeVec::new(
66            Opts::new("queue_depth", "Current queue depth").namespace("camel"),
67            &["route"],
68        )
69        .expect("Failed to create queue_depth gauge");
70        registry
71            .register(Box::new(queue_depth.clone()))
72            .expect("Failed to register queue_depth gauge");
73
74        // Create and register circuit_breaker_state gauge
75        let circuit_breaker_state = GaugeVec::new(
76            Opts::new(
77                "circuit_breaker_state",
78                "Circuit breaker state (0=closed, 1=open, 2=half_open)",
79            )
80            .namespace("camel"),
81            &["route"],
82        )
83        .expect("Failed to create circuit_breaker_state gauge");
84        registry
85            .register(Box::new(circuit_breaker_state.clone()))
86            .expect("Failed to register circuit_breaker_state gauge");
87
88        Self {
89            registry,
90            exchanges_total,
91            errors_total,
92            exchange_duration_seconds,
93            queue_depth,
94            circuit_breaker_state,
95        }
96    }
97
98    /// Returns a reference to the underlying Prometheus registry
99    pub fn registry(&self) -> &Registry {
100        &self.registry
101    }
102
103    /// Gathers all metrics and returns them in Prometheus text format
104    pub fn gather(&self) -> String {
105        let encoder = TextEncoder::new();
106        let metric_families = self.registry.gather();
107        encoder
108            .encode_to_string(&metric_families)
109            .unwrap_or_else(|e| format!("# Error encoding metrics: {}\n", e))
110    }
111}
112
113impl Default for PrometheusMetrics {
114    fn default() -> Self {
115        Self::new()
116    }
117}
118
119impl MetricsCollector for PrometheusMetrics {
120    fn record_exchange_duration(&self, route_id: &str, duration: Duration) {
121        let duration_secs = duration.as_secs_f64();
122        self.exchange_duration_seconds
123            .with_label_values(&[route_id])
124            .observe(duration_secs);
125    }
126
127    fn increment_errors(&self, route_id: &str, error_type: &str) {
128        self.errors_total
129            .with_label_values(&[route_id, error_type])
130            .inc();
131    }
132
133    fn increment_exchanges(&self, route_id: &str) {
134        self.exchanges_total.with_label_values(&[route_id]).inc();
135    }
136
137    fn set_queue_depth(&self, route_id: &str, depth: usize) {
138        self.queue_depth
139            .with_label_values(&[route_id])
140            .set(depth as f64);
141    }
142
143    fn record_circuit_breaker_change(&self, route_id: &str, _from: &str, to: &str) {
144        // Map state names to numeric values
145        let state_value = |state: &str| -> f64 {
146            match state.to_lowercase().as_str() {
147                "closed" => 0.0,
148                "open" => 1.0,
149                "half_open" | "halfopen" => 2.0,
150                _ => -1.0, // Unknown state
151            }
152        };
153
154        // Set the new state
155        self.circuit_breaker_state
156            .with_label_values(&[route_id])
157            .set(state_value(to));
158    }
159}
160
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Parses the value of the sample named `name` whose label set is exactly `labels`
    /// (in any order) from Prometheus text-format `output`.
    ///
    /// Returns `None` when no such sample exists. Assumes label values contain no commas
    /// or spaces, which holds for every label used in these tests.
    fn sample_value(output: &str, name: &str, labels: &[(&str, &str)]) -> Option<f64> {
        let mut wanted: Vec<String> = labels
            .iter()
            .map(|(key, value)| format!("{key}=\"{value}\""))
            .collect();
        wanted.sort_unstable();

        output
            .lines()
            .filter(|line| !line.starts_with('#'))
            .find_map(|line| {
                let (series, value) = line.rsplit_once(' ')?;
                let (metric, rest) = series.split_once('{')?;
                let mut found: Vec<&str> = rest
                    .strip_suffix('}')?
                    .split(',')
                    .filter(|pair| !pair.is_empty())
                    .collect();
                found.sort_unstable();
                if metric == name && found == wanted {
                    value.parse().ok()
                } else {
                    None
                }
            })
    }

    /// Current value of the circuit-breaker gauge for `route`, if it has been set.
    fn circuit_breaker_gauge(metrics: &PrometheusMetrics, route: &str) -> Option<f64> {
        sample_value(
            &metrics.gather(),
            "camel_circuit_breaker_state",
            &[("route", route)],
        )
    }

    #[test]
    fn test_create_prometheus_metrics() {
        let metrics = PrometheusMetrics::new();
        // Verify registry is accessible
        let _ = metrics.registry();
    }

    #[test]
    fn test_default_implementation() {
        let metrics = PrometheusMetrics::default();
        // Verify registry is accessible
        let _ = metrics.registry();
    }

    #[test]
    fn test_increment_exchanges() {
        let metrics = PrometheusMetrics::new();

        metrics.increment_exchanges("test-route");
        metrics.increment_exchanges("test-route");
        metrics.increment_exchanges("other-route");

        let output = metrics.gather();
        assert!(output.contains("camel_exchanges_total"));
        assert_eq!(
            sample_value(&output, "camel_exchanges_total", &[("route", "test-route")]),
            Some(2.0)
        );
        assert_eq!(
            sample_value(&output, "camel_exchanges_total", &[("route", "other-route")]),
            Some(1.0)
        );
    }

    #[test]
    fn test_increment_errors() {
        let metrics = PrometheusMetrics::new();

        metrics.increment_errors("test-route", "timeout");
        metrics.increment_errors("test-route", "connection_failed");
        metrics.increment_errors("other-route", "timeout");

        let output = metrics.gather();
        assert!(output.contains("camel_errors_total"));
        assert_eq!(
            sample_value(
                &output,
                "camel_errors_total",
                &[("route", "test-route"), ("error_type", "timeout")]
            ),
            Some(1.0)
        );
        assert_eq!(
            sample_value(
                &output,
                "camel_errors_total",
                &[("route", "test-route"), ("error_type", "connection_failed")]
            ),
            Some(1.0)
        );
        assert_eq!(
            sample_value(
                &output,
                "camel_errors_total",
                &[("route", "other-route"), ("error_type", "timeout")]
            ),
            Some(1.0)
        );
    }

    #[test]
    fn test_record_exchange_duration() {
        let metrics = PrometheusMetrics::new();

        metrics.record_exchange_duration("test-route", Duration::from_millis(50));
        metrics.record_exchange_duration("test-route", Duration::from_millis(150));
        metrics.record_exchange_duration("other-route", Duration::from_secs(1));

        let output = metrics.gather();
        assert!(output.contains("camel_exchange_duration_seconds"));
        assert_eq!(
            sample_value(
                &output,
                "camel_exchange_duration_seconds_count",
                &[("route", "test-route")]
            ),
            Some(2.0)
        );
        assert_eq!(
            sample_value(
                &output,
                "camel_exchange_duration_seconds_count",
                &[("route", "other-route")]
            ),
            Some(1.0)
        );

        // 50 ms + 150 ms, recorded in seconds; compare with a tolerance for float rounding.
        let sum = sample_value(
            &output,
            "camel_exchange_duration_seconds_sum",
            &[("route", "test-route")],
        )
        .expect("histogram sum for test-route should be exported");
        assert!((sum - 0.2).abs() < 1e-9, "unexpected duration sum: {sum}");
    }

    #[test]
    fn test_set_queue_depth() {
        let metrics = PrometheusMetrics::new();

        metrics.set_queue_depth("test-route", 5);
        metrics.set_queue_depth("test-route", 10);
        metrics.set_queue_depth("other-route", 3);

        // A gauge keeps only the most recent value per route.
        let output = metrics.gather();
        assert!(output.contains("camel_queue_depth"));
        assert_eq!(
            sample_value(&output, "camel_queue_depth", &[("route", "test-route")]),
            Some(10.0)
        );
        assert_eq!(
            sample_value(&output, "camel_queue_depth", &[("route", "other-route")]),
            Some(3.0)
        );
    }

    #[test]
    fn test_record_circuit_breaker_change() {
        let metrics = PrometheusMetrics::new();

        metrics.record_circuit_breaker_change("test-route", "closed", "open");
        assert_eq!(circuit_breaker_gauge(&metrics, "test-route"), Some(1.0));

        metrics.record_circuit_breaker_change("test-route", "open", "half_open");
        assert_eq!(circuit_breaker_gauge(&metrics, "test-route"), Some(2.0));

        metrics.record_circuit_breaker_change("test-route", "half_open", "closed");
        assert_eq!(circuit_breaker_gauge(&metrics, "test-route"), Some(0.0));

        assert!(metrics.gather().contains("camel_circuit_breaker_state"));
    }

    #[test]
    fn test_circuit_breaker_state_names() {
        let metrics = PrometheusMetrics::new();

        // State names are matched case-insensitively.
        metrics.record_circuit_breaker_change("route-x", "closed", "OPEN");
        assert_eq!(circuit_breaker_gauge(&metrics, "route-x"), Some(1.0));

        metrics.record_circuit_breaker_change("route-x", "open", "HalfOpen");
        assert_eq!(circuit_breaker_gauge(&metrics, "route-x"), Some(2.0));

        // Unrecognised names are exported as the out-of-range sentinel -1.
        metrics.record_circuit_breaker_change("route-x", "half_open", "no-such-state");
        assert_eq!(circuit_breaker_gauge(&metrics, "route-x"), Some(-1.0));
    }

    #[test]
    fn test_gather_returns_prometheus_format() {
        let metrics = PrometheusMetrics::new();

        // Record some metrics
        metrics.increment_exchanges("route-1");
        metrics.increment_errors("route-1", "timeout");
        metrics.set_queue_depth("route-1", 5);

        // Gather metrics
        let output = metrics.gather();

        // Verify output is valid Prometheus text format
        assert!(output.starts_with("# HELP") || output.starts_with("# TYPE"));
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("camel_errors_total"));
        assert!(output.contains("camel_queue_depth"));

        // Verify labels use 'route' not 'route_id'
        assert!(output.contains("route=\"route-1\""));
        assert!(!output.contains("route_id=\"route-1\""));
    }

    #[test]
    fn test_metrics_collector_trait_object() {
        // Verify PrometheusMetrics can be used as a trait object
        let metrics: Arc<dyn MetricsCollector> = Arc::new(PrometheusMetrics::new());

        // All methods should work without panicking
        metrics.increment_exchanges("test-route");
        metrics.increment_errors("test-route", "test-error");
        metrics.record_exchange_duration("test-route", Duration::from_millis(100));
        metrics.set_queue_depth("test-route", 5);
        metrics.record_circuit_breaker_change("test-route", "closed", "open");
    }
}