Skip to main content

camel_prometheus/
metrics.rs

1use std::time::Duration;
2
3use camel_api::metrics::MetricsCollector;
4use prometheus::{CounterVec, GaugeVec, HistogramVec, Opts, Registry, TextEncoder};
5
6/// Prometheus metrics collector for rust-camel
7///
8/// This struct implements the `MetricsCollector` trait and exposes metrics
9/// in Prometheus format via the `/metrics` endpoint.
10pub struct PrometheusMetrics {
11    registry: Registry,
12    exchanges_total: CounterVec,
13    errors_total: CounterVec,
14    exchange_duration_seconds: HistogramVec,
15    queue_depth: GaugeVec,
16    circuit_breaker_state: GaugeVec,
17}
18
19impl PrometheusMetrics {
20    /// Creates a new PrometheusMetrics instance with all metrics registered.
21    ///
22    /// # Panics
23    ///
24    /// Panics if metric creation or registration fails. This can only happen if:
25    /// - A metric name is invalid (must match `^[a-zA-Z_:][a-zA-Z0-9_:]*$`). All names are
26    ///   hardcoded below and comply with this requirement by convention.
27    /// - A metric is registered twice. This is impossible here because each call creates a
28    ///   fresh [`Registry`].
29    ///
30    /// Since both conditions are static invariants enforced by code review, these `expect()`
31    /// calls are intentional and will never fail in practice.
32    pub fn new() -> Self {
33        let registry = Registry::new();
34
35        // Create and register exchanges_total counter
36        let exchanges_total = CounterVec::new(
37            Opts::new("exchanges_total", "Total number of exchanges processed").namespace("camel"),
38            &["route"],
39        )
40        .expect("Failed to create exchanges_total counter"); // allow-unwrap
41        registry
42            .register(Box::new(exchanges_total.clone()))
43            .expect("Failed to register exchanges_total counter"); // allow-unwrap
44
45        // Create and register errors_total counter
46        let errors_total = CounterVec::new(
47            Opts::new("errors_total", "Total number of errors").namespace("camel"),
48            &["route", "error_type"],
49        )
50        .expect("Failed to create errors_total counter"); // allow-unwrap
51        registry
52            .register(Box::new(errors_total.clone()))
53            .expect("Failed to register errors_total counter"); // allow-unwrap
54
55        // Create and register exchange_duration_seconds histogram
56        // Using buckets suitable for typical exchange durations (ms to seconds range)
57        let exchange_duration_seconds = HistogramVec::new(
58            prometheus::HistogramOpts {
59                common_opts: Opts::new(
60                    "exchange_duration_seconds",
61                    "Exchange processing duration in seconds",
62                )
63                .namespace("camel"),
64                buckets: vec![
65                    0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
66                ],
67            },
68            &["route"],
69        )
70        .expect("Failed to create exchange_duration_seconds histogram"); // allow-unwrap
71        registry
72            .register(Box::new(exchange_duration_seconds.clone()))
73            .expect("Failed to register exchange_duration_seconds histogram"); // allow-unwrap
74
75        // Create and register queue_depth gauge
76        let queue_depth = GaugeVec::new(
77            Opts::new("queue_depth", "Current queue depth").namespace("camel"),
78            &["route"],
79        )
80        .expect("Failed to create queue_depth gauge"); // allow-unwrap
81        registry
82            .register(Box::new(queue_depth.clone()))
83            .expect("Failed to register queue_depth gauge"); // allow-unwrap
84
85        // Create and register circuit_breaker_state gauge
86        let circuit_breaker_state = GaugeVec::new(
87            Opts::new(
88                "circuit_breaker_state",
89                "Circuit breaker state (0=closed, 1=open, 2=half_open)",
90            )
91            .namespace("camel"),
92            &["route"],
93        )
94        .expect("Failed to create circuit_breaker_state gauge"); // allow-unwrap
95        registry
96            .register(Box::new(circuit_breaker_state.clone()))
97            .expect("Failed to register circuit_breaker_state gauge"); // allow-unwrap
98
99        Self {
100            registry,
101            exchanges_total,
102            errors_total,
103            exchange_duration_seconds,
104            queue_depth,
105            circuit_breaker_state,
106        }
107    }
108
109    /// Returns a reference to the underlying Prometheus registry
110    pub fn registry(&self) -> &Registry {
111        &self.registry
112    }
113
114    /// Gathers all metrics and returns them in Prometheus text format
115    pub fn gather(&self) -> String {
116        let encoder = TextEncoder::new();
117        let metric_families = self.registry.gather();
118        encoder
119            .encode_to_string(&metric_families)
120            .unwrap_or_else(|e| format!("# Error encoding metrics: {}\n", e))
121    }
122}
123
124impl Default for PrometheusMetrics {
125    fn default() -> Self {
126        Self::new()
127    }
128}
129
130impl MetricsCollector for PrometheusMetrics {
131    fn record_exchange_duration(&self, route_id: &str, duration: Duration) {
132        let duration_secs = duration.as_secs_f64();
133        self.exchange_duration_seconds
134            .with_label_values(&[route_id])
135            .observe(duration_secs);
136    }
137
138    fn increment_errors(&self, route_id: &str, error_type: &str) {
139        self.errors_total
140            .with_label_values(&[route_id, error_type])
141            .inc();
142    }
143
144    fn increment_exchanges(&self, route_id: &str) {
145        self.exchanges_total.with_label_values(&[route_id]).inc();
146    }
147
148    fn set_queue_depth(&self, route_id: &str, depth: usize) {
149        self.queue_depth
150            .with_label_values(&[route_id])
151            .set(depth as f64);
152    }
153
154    fn record_circuit_breaker_change(&self, route_id: &str, _from: &str, to: &str) {
155        // Map state names to numeric values
156        let state_value = |state: &str| -> f64 {
157            match state.to_lowercase().as_str() {
158                "closed" => 0.0,
159                "open" => 1.0,
160                "half_open" | "halfopen" => 2.0,
161                _ => -1.0, // Unknown state
162            }
163        };
164
165        // Set the new state
166        self.circuit_breaker_state
167            .with_label_values(&[route_id])
168            .set(state_value(to));
169    }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    const TEST_ROUTE: &str = r#"route="test-route""#;
    const OTHER_ROUTE: &str = r#"route="other-route""#;

    /// Returns the value of the first `name{...} value` sample line whose label set
    /// contains every `key="value"` pair in `labels`, regardless of label order.
    ///
    /// Whole-line matching avoids the false positives of plain substring checks
    /// (e.g. `} 1` also matching `} 10`).
    fn sample_value<'a>(output: &'a str, name: &str, labels: &[&str]) -> Option<&'a str> {
        output.lines().find_map(|line| {
            let (label_set, value) = line
                .strip_prefix(name)?
                .strip_prefix('{')?
                .split_once("} ")?;
            labels
                .iter()
                .all(|wanted| label_set.split(',').any(|pair| pair == *wanted))
                .then_some(value)
        })
    }

    /// Current `camel_circuit_breaker_state` value for `test-route`, if exported.
    fn breaker_state(metrics: &PrometheusMetrics) -> Option<String> {
        let output = metrics.gather();
        sample_value(&output, "camel_circuit_breaker_state", &[TEST_ROUTE]).map(str::to_owned)
    }

    #[test]
    fn test_create_prometheus_metrics() {
        let metrics = PrometheusMetrics::new();
        metrics.increment_exchanges("test-route");
        // The exposed registry is the one that backs `gather()`.
        assert!(!metrics.registry().gather().is_empty());
    }

    #[test]
    fn test_default_implementation() {
        let metrics = PrometheusMetrics::default();
        metrics.increment_exchanges("test-route");
        assert!(!metrics.registry().gather().is_empty());
        assert_eq!(
            sample_value(&metrics.gather(), "camel_exchanges_total", &[TEST_ROUTE]),
            Some("1")
        );
    }

    #[test]
    fn test_increment_exchanges() {
        let metrics = PrometheusMetrics::new();

        metrics.increment_exchanges("test-route");
        metrics.increment_exchanges("test-route");
        metrics.increment_exchanges("other-route");

        let output = metrics.gather();
        assert_eq!(
            sample_value(&output, "camel_exchanges_total", &[TEST_ROUTE]),
            Some("2")
        );
        assert_eq!(
            sample_value(&output, "camel_exchanges_total", &[OTHER_ROUTE]),
            Some("1")
        );
    }

    #[test]
    fn test_increment_errors() {
        let metrics = PrometheusMetrics::new();

        metrics.increment_errors("test-route", "timeout");
        metrics.increment_errors("test-route", "connection_failed");
        metrics.increment_errors("other-route", "timeout");
        metrics.increment_errors("other-route", "timeout");

        let output = metrics.gather();
        let timeout = r#"error_type="timeout""#;
        let connection_failed = r#"error_type="connection_failed""#;
        assert_eq!(
            sample_value(&output, "camel_errors_total", &[TEST_ROUTE, timeout]),
            Some("1")
        );
        assert_eq!(
            sample_value(&output, "camel_errors_total", &[TEST_ROUTE, connection_failed]),
            Some("1")
        );
        assert_eq!(
            sample_value(&output, "camel_errors_total", &[OTHER_ROUTE, timeout]),
            Some("2")
        );
        assert_eq!(
            sample_value(&output, "camel_errors_total", &[OTHER_ROUTE, connection_failed]),
            None
        );
    }

    #[test]
    fn test_record_exchange_duration() {
        let metrics = PrometheusMetrics::new();

        metrics.record_exchange_duration("test-route", Duration::from_millis(50));
        metrics.record_exchange_duration("test-route", Duration::from_millis(150));
        metrics.record_exchange_duration("other-route", Duration::from_secs(1));

        let output = metrics.gather();
        let count = "camel_exchange_duration_seconds_count";
        let bucket = "camel_exchange_duration_seconds_bucket";
        assert_eq!(sample_value(&output, count, &[TEST_ROUTE]), Some("2"));
        assert_eq!(sample_value(&output, count, &[OTHER_ROUTE]), Some("1"));
        // Buckets are cumulative: 50 ms falls under le=0.1, 150 ms only under le=0.25.
        assert_eq!(
            sample_value(&output, bucket, &[TEST_ROUTE, r#"le="0.1""#]),
            Some("1")
        );
        assert_eq!(
            sample_value(&output, bucket, &[TEST_ROUTE, r#"le="0.25""#]),
            Some("2")
        );
    }

    #[test]
    fn test_set_queue_depth() {
        let metrics = PrometheusMetrics::new();

        metrics.set_queue_depth("test-route", 5);
        metrics.set_queue_depth("test-route", 10);
        metrics.set_queue_depth("other-route", 3);

        // A gauge reports the latest value, not a sum.
        let output = metrics.gather();
        assert_eq!(
            sample_value(&output, "camel_queue_depth", &[TEST_ROUTE]),
            Some("10")
        );
        assert_eq!(
            sample_value(&output, "camel_queue_depth", &[OTHER_ROUTE]),
            Some("3")
        );
    }

    #[test]
    fn test_record_circuit_breaker_change() {
        let metrics = PrometheusMetrics::new();

        metrics.record_circuit_breaker_change("test-route", "closed", "open");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("1"));

        metrics.record_circuit_breaker_change("test-route", "open", "half_open");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("2"));

        metrics.record_circuit_breaker_change("test-route", "half_open", "closed");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("0"));
    }

    #[test]
    fn test_circuit_breaker_state_names_are_case_insensitive() {
        let metrics = PrometheusMetrics::new();

        metrics.record_circuit_breaker_change("test-route", "closed", "OPEN");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("1"));

        metrics.record_circuit_breaker_change("test-route", "open", "HalfOpen");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("2"));

        metrics.record_circuit_breaker_change("test-route", "half_open", "Closed");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("0"));
    }

    #[test]
    fn test_unknown_circuit_breaker_state_maps_to_minus_one() {
        let metrics = PrometheusMetrics::new();

        metrics.record_circuit_breaker_change("test-route", "closed", "bogus");
        assert_eq!(breaker_state(&metrics).as_deref(), Some("-1"));
    }

    #[test]
    fn test_gather_returns_prometheus_format() {
        let metrics = PrometheusMetrics::new();

        metrics.increment_exchanges("route-1");
        metrics.increment_errors("route-1", "timeout");
        metrics.set_queue_depth("route-1", 5);

        let output = metrics.gather();

        // Valid Prometheus text format starts with metadata comments.
        assert!(output.starts_with("# HELP") || output.starts_with("# TYPE"));
        assert!(output.contains("camel_exchanges_total"));
        assert!(output.contains("camel_errors_total"));
        assert!(output.contains("camel_queue_depth"));

        // Labels use 'route', not 'route_id'.
        assert!(output.contains("route=\"route-1\""));
        assert!(!output.contains("route_id=\"route-1\""));
    }

    #[test]
    fn test_metrics_collector_trait_object() {
        // Keep a concrete handle so the recordings made through the trait object
        // can be verified afterwards.
        let concrete = Arc::new(PrometheusMetrics::new());
        let metrics: Arc<dyn MetricsCollector> = Arc::clone(&concrete);

        metrics.increment_exchanges("test-route");
        metrics.increment_errors("test-route", "test-error");
        metrics.record_exchange_duration("test-route", Duration::from_millis(100));
        metrics.set_queue_depth("test-route", 5);
        metrics.record_circuit_breaker_change("test-route", "closed", "open");

        let output = concrete.gather();
        assert_eq!(
            sample_value(&output, "camel_exchanges_total", &[TEST_ROUTE]),
            Some("1")
        );
        assert_eq!(
            sample_value(
                &output,
                "camel_errors_total",
                &[TEST_ROUTE, r#"error_type="test-error""#]
            ),
            Some("1")
        );
        assert_eq!(
            sample_value(&output, "camel_exchange_duration_seconds_count", &[TEST_ROUTE]),
            Some("1")
        );
        assert_eq!(
            sample_value(&output, "camel_queue_depth", &[TEST_ROUTE]),
            Some("5")
        );
        assert_eq!(
            sample_value(&output, "camel_circuit_breaker_state", &[TEST_ROUTE]),
            Some("1")
        );
    }
}