1use std::time::Duration;
2
3use camel_api::metrics::MetricsCollector;
4use prometheus::{CounterVec, GaugeVec, HistogramVec, Opts, Registry, TextEncoder};
5
6pub struct PrometheusMetrics {
11 registry: Registry,
12 exchanges_total: CounterVec,
13 errors_total: CounterVec,
14 exchange_duration_seconds: HistogramVec,
15 queue_depth: GaugeVec,
16 circuit_breaker_state: GaugeVec,
17}
18
19impl PrometheusMetrics {
20 pub fn new() -> Self {
22 let registry = Registry::new();
23
24 let exchanges_total = CounterVec::new(
26 Opts::new("exchanges_total", "Total number of exchanges processed").namespace("camel"),
27 &["route"],
28 )
29 .expect("Failed to create exchanges_total counter");
30 registry
31 .register(Box::new(exchanges_total.clone()))
32 .expect("Failed to register exchanges_total counter");
33
34 let errors_total = CounterVec::new(
36 Opts::new("errors_total", "Total number of errors").namespace("camel"),
37 &["route", "error_type"],
38 )
39 .expect("Failed to create errors_total counter");
40 registry
41 .register(Box::new(errors_total.clone()))
42 .expect("Failed to register errors_total counter");
43
44 let exchange_duration_seconds = HistogramVec::new(
47 prometheus::HistogramOpts {
48 common_opts: Opts::new(
49 "exchange_duration_seconds",
50 "Exchange processing duration in seconds",
51 )
52 .namespace("camel"),
53 buckets: vec![
54 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
55 ],
56 },
57 &["route"],
58 )
59 .expect("Failed to create exchange_duration_seconds histogram");
60 registry
61 .register(Box::new(exchange_duration_seconds.clone()))
62 .expect("Failed to register exchange_duration_seconds histogram");
63
64 let queue_depth = GaugeVec::new(
66 Opts::new("queue_depth", "Current queue depth").namespace("camel"),
67 &["route"],
68 )
69 .expect("Failed to create queue_depth gauge");
70 registry
71 .register(Box::new(queue_depth.clone()))
72 .expect("Failed to register queue_depth gauge");
73
74 let circuit_breaker_state = GaugeVec::new(
76 Opts::new(
77 "circuit_breaker_state",
78 "Circuit breaker state (0=closed, 1=open, 2=half_open)",
79 )
80 .namespace("camel"),
81 &["route"],
82 )
83 .expect("Failed to create circuit_breaker_state gauge");
84 registry
85 .register(Box::new(circuit_breaker_state.clone()))
86 .expect("Failed to register circuit_breaker_state gauge");
87
88 Self {
89 registry,
90 exchanges_total,
91 errors_total,
92 exchange_duration_seconds,
93 queue_depth,
94 circuit_breaker_state,
95 }
96 }
97
98 pub fn registry(&self) -> &Registry {
100 &self.registry
101 }
102
103 pub fn gather(&self) -> String {
105 let encoder = TextEncoder::new();
106 let metric_families = self.registry.gather();
107 encoder
108 .encode_to_string(&metric_families)
109 .unwrap_or_else(|e| format!("# Error encoding metrics: {}\n", e))
110 }
111}
112
113impl Default for PrometheusMetrics {
114 fn default() -> Self {
115 Self::new()
116 }
117}
118
119impl MetricsCollector for PrometheusMetrics {
120 fn record_exchange_duration(&self, route_id: &str, duration: Duration) {
121 let duration_secs = duration.as_secs_f64();
122 self.exchange_duration_seconds
123 .with_label_values(&[route_id])
124 .observe(duration_secs);
125 }
126
127 fn increment_errors(&self, route_id: &str, error_type: &str) {
128 self.errors_total
129 .with_label_values(&[route_id, error_type])
130 .inc();
131 }
132
133 fn increment_exchanges(&self, route_id: &str) {
134 self.exchanges_total.with_label_values(&[route_id]).inc();
135 }
136
137 fn set_queue_depth(&self, route_id: &str, depth: usize) {
138 self.queue_depth
139 .with_label_values(&[route_id])
140 .set(depth as f64);
141 }
142
143 fn record_circuit_breaker_change(&self, route_id: &str, _from: &str, to: &str) {
144 let state_value = |state: &str| -> f64 {
146 match state.to_lowercase().as_str() {
147 "closed" => 0.0,
148 "open" => 1.0,
149 "half_open" | "halfopen" => 2.0,
150 _ => -1.0, }
152 };
153
154 self.circuit_breaker_state
156 .with_label_values(&[route_id])
157 .set(state_value(to));
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164 use std::sync::Arc;
165
166 #[test]
167 fn test_create_prometheus_metrics() {
168 let metrics = PrometheusMetrics::new();
169 let _ = metrics.registry();
171 }
172
173 #[test]
174 fn test_default_implementation() {
175 let metrics = PrometheusMetrics::default();
176 let _ = metrics.registry();
178 }
179
180 #[test]
181 fn test_increment_exchanges() {
182 let metrics = PrometheusMetrics::new();
183
184 metrics.increment_exchanges("test-route");
186 metrics.increment_exchanges("test-route");
187 metrics.increment_exchanges("other-route");
188
189 let output = metrics.gather();
191 assert!(output.contains("camel_exchanges_total"));
192 assert!(output.contains("test-route"));
193 assert!(output.contains("other-route"));
194 }
195
196 #[test]
197 fn test_increment_errors() {
198 let metrics = PrometheusMetrics::new();
199
200 metrics.increment_errors("test-route", "timeout");
202 metrics.increment_errors("test-route", "connection_failed");
203 metrics.increment_errors("other-route", "timeout");
204
205 let output = metrics.gather();
207 assert!(output.contains("camel_errors_total"));
208 assert!(output.contains("timeout"));
209 assert!(output.contains("connection_failed"));
210 }
211
212 #[test]
213 fn test_record_exchange_duration() {
214 let metrics = PrometheusMetrics::new();
215
216 metrics.record_exchange_duration("test-route", Duration::from_millis(50));
218 metrics.record_exchange_duration("test-route", Duration::from_millis(150));
219 metrics.record_exchange_duration("other-route", Duration::from_secs(1));
220
221 let output = metrics.gather();
223 assert!(output.contains("camel_exchange_duration_seconds"));
224 assert!(output.contains("test-route"));
225 }
226
227 #[test]
228 fn test_set_queue_depth() {
229 let metrics = PrometheusMetrics::new();
230
231 metrics.set_queue_depth("test-route", 5);
233 metrics.set_queue_depth("test-route", 10);
234 metrics.set_queue_depth("other-route", 3);
235
236 let output = metrics.gather();
238 assert!(output.contains("camel_queue_depth"));
239 }
240
241 #[test]
242 fn test_record_circuit_breaker_change() {
243 let metrics = PrometheusMetrics::new();
244
245 metrics.record_circuit_breaker_change("test-route", "closed", "open");
247 metrics.record_circuit_breaker_change("test-route", "open", "half_open");
248 metrics.record_circuit_breaker_change("test-route", "half_open", "closed");
249
250 let output = metrics.gather();
252 assert!(output.contains("camel_circuit_breaker_state"));
253 }
254
255 #[test]
256 fn test_gather_returns_prometheus_format() {
257 let metrics = PrometheusMetrics::new();
258
259 metrics.increment_exchanges("route-1");
261 metrics.increment_errors("route-1", "timeout");
262 metrics.set_queue_depth("route-1", 5);
263
264 let output = metrics.gather();
266
267 assert!(output.starts_with("# HELP") || output.starts_with("# TYPE"));
269 assert!(output.contains("camel_exchanges_total"));
270 assert!(output.contains("camel_errors_total"));
271 assert!(output.contains("camel_queue_depth"));
272
273 assert!(output.contains("route=\"route-1\""));
275 assert!(!output.contains("route_id=\"route-1\""));
276 }
277
278 #[test]
279 fn test_metrics_collector_trait_object() {
280 let metrics: Arc<dyn MetricsCollector> = Arc::new(PrometheusMetrics::new());
282
283 metrics.increment_exchanges("test-route");
285 metrics.increment_errors("test-route", "test-error");
286 metrics.record_exchange_duration("test-route", Duration::from_millis(100));
287 metrics.set_queue_depth("test-route", 5);
288 metrics.record_circuit_breaker_change("test-route", "closed", "open");
289 }
290}