1use std::time::Duration;
2
3use camel_api::metrics::MetricsCollector;
4use prometheus::{CounterVec, GaugeVec, HistogramVec, Opts, Registry, TextEncoder};
5
6pub struct PrometheusMetrics {
11 registry: Registry,
12 exchanges_total: CounterVec,
13 errors_total: CounterVec,
14 exchange_duration_seconds: HistogramVec,
15 queue_depth: GaugeVec,
16 circuit_breaker_state: GaugeVec,
17}
18
19impl PrometheusMetrics {
20 pub fn new() -> Self {
33 let registry = Registry::new();
34
35 let exchanges_total = CounterVec::new(
37 Opts::new("exchanges_total", "Total number of exchanges processed").namespace("camel"),
38 &["route"],
39 )
40 .expect("Failed to create exchanges_total counter"); registry
42 .register(Box::new(exchanges_total.clone()))
43 .expect("Failed to register exchanges_total counter"); let errors_total = CounterVec::new(
47 Opts::new("errors_total", "Total number of errors").namespace("camel"),
48 &["route", "error_type"],
49 )
50 .expect("Failed to create errors_total counter"); registry
52 .register(Box::new(errors_total.clone()))
53 .expect("Failed to register errors_total counter"); let exchange_duration_seconds = HistogramVec::new(
58 prometheus::HistogramOpts {
59 common_opts: Opts::new(
60 "exchange_duration_seconds",
61 "Exchange processing duration in seconds",
62 )
63 .namespace("camel"),
64 buckets: vec![
65 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
66 ],
67 },
68 &["route"],
69 )
70 .expect("Failed to create exchange_duration_seconds histogram"); registry
72 .register(Box::new(exchange_duration_seconds.clone()))
73 .expect("Failed to register exchange_duration_seconds histogram"); let queue_depth = GaugeVec::new(
77 Opts::new("queue_depth", "Current queue depth").namespace("camel"),
78 &["route"],
79 )
80 .expect("Failed to create queue_depth gauge"); registry
82 .register(Box::new(queue_depth.clone()))
83 .expect("Failed to register queue_depth gauge"); let circuit_breaker_state = GaugeVec::new(
87 Opts::new(
88 "circuit_breaker_state",
89 "Circuit breaker state (0=closed, 1=open, 2=half_open)",
90 )
91 .namespace("camel"),
92 &["route"],
93 )
94 .expect("Failed to create circuit_breaker_state gauge"); registry
96 .register(Box::new(circuit_breaker_state.clone()))
97 .expect("Failed to register circuit_breaker_state gauge"); Self {
100 registry,
101 exchanges_total,
102 errors_total,
103 exchange_duration_seconds,
104 queue_depth,
105 circuit_breaker_state,
106 }
107 }
108
109 pub fn registry(&self) -> &Registry {
111 &self.registry
112 }
113
114 pub fn gather(&self) -> String {
116 let encoder = TextEncoder::new();
117 let metric_families = self.registry.gather();
118 encoder
119 .encode_to_string(&metric_families)
120 .unwrap_or_else(|e| format!("# Error encoding metrics: {}\n", e))
121 }
122}
123
124impl Default for PrometheusMetrics {
125 fn default() -> Self {
126 Self::new()
127 }
128}
129
130impl MetricsCollector for PrometheusMetrics {
131 fn record_exchange_duration(&self, route_id: &str, duration: Duration) {
132 let duration_secs = duration.as_secs_f64();
133 self.exchange_duration_seconds
134 .with_label_values(&[route_id])
135 .observe(duration_secs);
136 }
137
138 fn increment_errors(&self, route_id: &str, error_type: &str) {
139 self.errors_total
140 .with_label_values(&[route_id, error_type])
141 .inc();
142 }
143
144 fn increment_exchanges(&self, route_id: &str) {
145 self.exchanges_total.with_label_values(&[route_id]).inc();
146 }
147
148 fn set_queue_depth(&self, route_id: &str, depth: usize) {
149 self.queue_depth
150 .with_label_values(&[route_id])
151 .set(depth as f64);
152 }
153
154 fn record_circuit_breaker_change(&self, route_id: &str, _from: &str, to: &str) {
155 let state_value = |state: &str| -> f64 {
157 match state.to_lowercase().as_str() {
158 "closed" => 0.0,
159 "open" => 1.0,
160 "half_open" | "halfopen" => 2.0,
161 _ => -1.0, }
163 };
164
165 self.circuit_breaker_state
167 .with_label_values(&[route_id])
168 .set(state_value(to));
169 }
170}
171
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Asserts that `haystack` contains every entry of `needles`.
    fn assert_contains_all(haystack: &str, needles: &[&str]) {
        for needle in needles {
            assert!(haystack.contains(needle));
        }
    }

    #[test]
    fn test_create_prometheus_metrics() {
        // Construction must not panic and the registry must be reachable.
        let collector = PrometheusMetrics::new();
        let _ = collector.registry();
    }

    #[test]
    fn test_default_implementation() {
        // `Default` must behave like `new`: no panic, registry reachable.
        let collector = PrometheusMetrics::default();
        let _ = collector.registry();
    }

    #[test]
    fn test_increment_exchanges() {
        let collector = PrometheusMetrics::new();

        collector.increment_exchanges("test-route");
        collector.increment_exchanges("test-route");
        collector.increment_exchanges("other-route");

        let rendered = collector.gather();
        assert_contains_all(
            &rendered,
            &["camel_exchanges_total", "test-route", "other-route"],
        );
    }

    #[test]
    fn test_increment_errors() {
        let collector = PrometheusMetrics::new();

        collector.increment_errors("test-route", "timeout");
        collector.increment_errors("test-route", "connection_failed");
        collector.increment_errors("other-route", "timeout");

        let rendered = collector.gather();
        assert_contains_all(
            &rendered,
            &["camel_errors_total", "timeout", "connection_failed"],
        );
    }

    #[test]
    fn test_record_exchange_duration() {
        let collector = PrometheusMetrics::new();

        collector.record_exchange_duration("test-route", Duration::from_millis(50));
        collector.record_exchange_duration("test-route", Duration::from_millis(150));
        collector.record_exchange_duration("other-route", Duration::from_secs(1));

        let rendered = collector.gather();
        assert_contains_all(&rendered, &["camel_exchange_duration_seconds", "test-route"]);
    }

    #[test]
    fn test_set_queue_depth() {
        let collector = PrometheusMetrics::new();

        collector.set_queue_depth("test-route", 5);
        collector.set_queue_depth("test-route", 10);
        collector.set_queue_depth("other-route", 3);

        let rendered = collector.gather();
        assert!(rendered.contains("camel_queue_depth"));
    }

    #[test]
    fn test_record_circuit_breaker_change() {
        let collector = PrometheusMetrics::new();

        // Walk the breaker through open, half-open and back to closed.
        collector.record_circuit_breaker_change("test-route", "closed", "open");
        collector.record_circuit_breaker_change("test-route", "open", "half_open");
        collector.record_circuit_breaker_change("test-route", "half_open", "closed");

        let rendered = collector.gather();
        assert!(rendered.contains("camel_circuit_breaker_state"));
    }

    #[test]
    fn test_gather_returns_prometheus_format() {
        let collector = PrometheusMetrics::new();

        collector.increment_exchanges("route-1");
        collector.increment_errors("route-1", "timeout");
        collector.set_queue_depth("route-1", 5);

        let rendered = collector.gather();

        // The output must open with a metadata comment line.
        assert!(rendered.starts_with("# HELP") || rendered.starts_with("# TYPE"));
        assert_contains_all(
            &rendered,
            &["camel_exchanges_total", "camel_errors_total", "camel_queue_depth"],
        );

        // The label is named `route`, not `route_id`.
        assert!(rendered.contains("route=\"route-1\""));
        assert!(!rendered.contains("route_id=\"route-1\""));
    }

    #[test]
    fn test_metrics_collector_trait_object() {
        // Every trait method must be callable through a shared trait object.
        let collector: Arc<dyn MetricsCollector> = Arc::new(PrometheusMetrics::new());

        collector.increment_exchanges("test-route");
        collector.increment_errors("test-route", "test-error");
        collector.record_exchange_duration("test-route", Duration::from_millis(100));
        collector.set_queue_depth("test-route", 5);
        collector.record_circuit_breaker_change("test-route", "closed", "open");
    }
}