1use prometheus::{
10 Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
11};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15#[derive(Debug, Clone)]
17pub struct RustRabbitMetrics {
18 registry: Arc<Registry>,
19
20 messages_published_total: IntCounterVec,
22 messages_consumed_total: IntCounterVec,
23 messages_failed_total: IntCounterVec,
24 messages_retried_total: IntCounterVec,
25
26 message_processing_duration: HistogramVec,
28 message_publish_duration: HistogramVec,
29
30 connections_total: IntGauge,
32 connections_healthy: IntGauge,
33 connections_unhealthy: IntGauge,
34 connection_attempts_total: IntCounter,
35 connection_failures_total: IntCounter,
36
37 queue_depth: IntGaugeVec,
39 consumer_count: IntGaugeVec,
40
41 health_check_duration: Histogram,
43 last_health_check_timestamp: Gauge,
44}
45
46impl RustRabbitMetrics {
47 pub fn new() -> Result<Self, prometheus::Error> {
49 let registry = Arc::new(Registry::new());
50 Self::with_registry(registry)
51 }
52
53 pub fn with_registry(registry: Arc<Registry>) -> Result<Self, prometheus::Error> {
55 let messages_published_total = IntCounterVec::new(
56 prometheus::Opts::new(
57 "rustrabbit_messages_published_total",
58 "Total number of messages published",
59 ),
60 &["queue", "exchange", "routing_key"],
61 )?;
62
63 let messages_consumed_total = IntCounterVec::new(
64 prometheus::Opts::new(
65 "rustrabbit_messages_consumed_total",
66 "Total number of messages consumed",
67 ),
68 &["queue", "consumer_tag", "status"],
69 )?;
70
71 let messages_failed_total = IntCounterVec::new(
72 prometheus::Opts::new(
73 "rustrabbit_messages_failed_total",
74 "Total number of failed message processing attempts",
75 ),
76 &["queue", "error_type", "retry_attempt"],
77 )?;
78
79 let messages_retried_total = IntCounterVec::new(
80 prometheus::Opts::new(
81 "rustrabbit_messages_retried_total",
82 "Total number of message retry attempts",
83 ),
84 &["queue", "retry_reason"],
85 )?;
86
87 let message_processing_duration = HistogramVec::new(
88 prometheus::HistogramOpts::new(
89 "rustrabbit_message_processing_duration_seconds",
90 "Time spent processing messages",
91 )
92 .buckets(vec![
93 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
94 ]),
95 &["queue", "consumer_tag"],
96 )?;
97
98 let message_publish_duration = HistogramVec::new(
99 prometheus::HistogramOpts::new(
100 "rustrabbit_message_publish_duration_seconds",
101 "Time spent publishing messages",
102 )
103 .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]),
104 &["queue", "exchange"],
105 )?;
106
107 let connections_total = IntGauge::with_opts(prometheus::Opts::new(
108 "rustrabbit_connections_total",
109 "Total number of connections in the pool",
110 ))?;
111
112 let connections_healthy = IntGauge::with_opts(prometheus::Opts::new(
113 "rustrabbit_connections_healthy",
114 "Number of healthy connections",
115 ))?;
116
117 let connections_unhealthy = IntGauge::with_opts(prometheus::Opts::new(
118 "rustrabbit_connections_unhealthy",
119 "Number of unhealthy connections",
120 ))?;
121
122 let connection_attempts_total = IntCounter::with_opts(prometheus::Opts::new(
123 "rustrabbit_connection_attempts_total",
124 "Total number of connection attempts",
125 ))?;
126
127 let connection_failures_total = IntCounter::with_opts(prometheus::Opts::new(
128 "rustrabbit_connection_failures_total",
129 "Total number of connection failures",
130 ))?;
131
132 let queue_depth = IntGaugeVec::new(
133 prometheus::Opts::new(
134 "rustrabbit_queue_depth",
135 "Number of messages waiting in queue",
136 ),
137 &["queue"],
138 )?;
139
140 let consumer_count = IntGaugeVec::new(
141 prometheus::Opts::new(
142 "rustrabbit_consumer_count",
143 "Number of active consumers per queue",
144 ),
145 &["queue"],
146 )?;
147
148 let health_check_duration = Histogram::with_opts(
149 prometheus::HistogramOpts::new(
150 "rustrabbit_health_check_duration_seconds",
151 "Time spent performing health checks",
152 )
153 .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]),
154 )?;
155
156 let last_health_check_timestamp = Gauge::with_opts(prometheus::Opts::new(
157 "rustrabbit_last_health_check_timestamp",
158 "Timestamp of last health check (Unix seconds)",
159 ))?;
160
161 registry.register(Box::new(messages_published_total.clone()))?;
163 registry.register(Box::new(messages_consumed_total.clone()))?;
164 registry.register(Box::new(messages_failed_total.clone()))?;
165 registry.register(Box::new(messages_retried_total.clone()))?;
166 registry.register(Box::new(message_processing_duration.clone()))?;
167 registry.register(Box::new(message_publish_duration.clone()))?;
168 registry.register(Box::new(connections_total.clone()))?;
169 registry.register(Box::new(connections_healthy.clone()))?;
170 registry.register(Box::new(connections_unhealthy.clone()))?;
171 registry.register(Box::new(connection_attempts_total.clone()))?;
172 registry.register(Box::new(connection_failures_total.clone()))?;
173 registry.register(Box::new(queue_depth.clone()))?;
174 registry.register(Box::new(consumer_count.clone()))?;
175 registry.register(Box::new(health_check_duration.clone()))?;
176 registry.register(Box::new(last_health_check_timestamp.clone()))?;
177
178 Ok(Self {
179 registry,
180 messages_published_total,
181 messages_consumed_total,
182 messages_failed_total,
183 messages_retried_total,
184 message_processing_duration,
185 message_publish_duration,
186 connections_total,
187 connections_healthy,
188 connections_unhealthy,
189 connection_attempts_total,
190 connection_failures_total,
191 queue_depth,
192 consumer_count,
193 health_check_duration,
194 last_health_check_timestamp,
195 })
196 }
197
198 pub fn registry(&self) -> Arc<Registry> {
200 self.registry.clone()
201 }
202
203 pub fn record_message_published(&self, queue: &str, exchange: &str, routing_key: &str) {
207 self.messages_published_total
208 .with_label_values(&[queue, exchange, routing_key])
209 .inc();
210 }
211
212 pub fn record_message_consumed(&self, queue: &str, consumer_tag: &str, success: bool) {
214 let status = if success { "success" } else { "failed" };
215 self.messages_consumed_total
216 .with_label_values(&[queue, consumer_tag, status])
217 .inc();
218 }
219
220 pub fn record_message_failed(&self, queue: &str, error_type: &str, retry_attempt: u32) {
222 self.messages_failed_total
223 .with_label_values(&[queue, error_type, &retry_attempt.to_string()])
224 .inc();
225 }
226
227 pub fn record_message_retry(&self, queue: &str, retry_reason: &str) {
229 self.messages_retried_total
230 .with_label_values(&[queue, retry_reason])
231 .inc();
232 }
233
234 pub fn record_processing_duration(&self, queue: &str, consumer_tag: &str, duration: Duration) {
236 self.message_processing_duration
237 .with_label_values(&[queue, consumer_tag])
238 .observe(duration.as_secs_f64());
239 }
240
241 pub fn record_publish_duration(&self, queue: &str, exchange: &str, duration: Duration) {
243 self.message_publish_duration
244 .with_label_values(&[queue, exchange])
245 .observe(duration.as_secs_f64());
246 }
247
248 pub fn update_connection_pool(&self, total: i64, healthy: i64, unhealthy: i64) {
252 self.connections_total.set(total);
253 self.connections_healthy.set(healthy);
254 self.connections_unhealthy.set(unhealthy);
255 }
256
257 pub fn record_connection_attempt(&self) {
259 self.connection_attempts_total.inc();
260 }
261
262 pub fn record_connection_failure(&self) {
264 self.connection_failures_total.inc();
265 }
266
267 pub fn update_queue_depth(&self, queue: &str, depth: i64) {
271 self.queue_depth.with_label_values(&[queue]).set(depth);
272 }
273
274 pub fn update_consumer_count(&self, queue: &str, count: i64) {
276 self.consumer_count.with_label_values(&[queue]).set(count);
277 }
278
279 pub fn record_health_check_duration(&self, duration: Duration) {
283 self.health_check_duration.observe(duration.as_secs_f64());
284 self.last_health_check_timestamp.set(
285 std::time::SystemTime::now()
286 .duration_since(std::time::UNIX_EPOCH)
287 .unwrap_or_default()
288 .as_secs_f64(),
289 );
290 }
291}
292
293impl Default for RustRabbitMetrics {
294 fn default() -> Self {
295 Self::new().expect("Failed to create default metrics")
296 }
297}
298
/// Minimal stopwatch for measuring how long an operation took.
///
/// The timer starts when it is created; [`MetricsTimer::elapsed`] reports the
/// time since then and can be called any number of times.
#[derive(Debug, Clone, Copy)]
pub struct MetricsTimer {
    /// Monotonic instant captured when the timer was created.
    start: Instant,
}

impl Default for MetricsTimer {
    /// Same as [`MetricsTimer::new`]: the timer starts counting immediately.
    fn default() -> Self {
        Self::new()
    }
}

impl MetricsTimer {
    /// Starts a new timer at the current instant.
    #[must_use]
    pub fn new() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Returns the time that has passed since the timer was created.
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built instance already exports its unlabelled collectors.
    #[test]
    fn test_metrics_creation() {
        let metrics = RustRabbitMetrics::new().unwrap();
        assert!(!metrics.registry().gather().is_empty());
    }

    /// Each `record_message_*` call must bump exactly the labelled child it
    /// names. Checking `gather()` for non-emptiness alone would pass even if
    /// the recorders did nothing, because it is already non-empty right after
    /// construction.
    #[test]
    fn test_message_metrics() {
        let metrics = RustRabbitMetrics::new().unwrap();

        metrics.record_message_published("test-queue", "test-exchange", "test-key");
        metrics.record_message_consumed("test-queue", "test-consumer", true);
        metrics.record_message_failed("test-queue", "timeout", 1);
        metrics.record_message_retry("test-queue", "transient-error");

        assert_eq!(
            metrics
                .messages_published_total
                .with_label_values(&["test-queue", "test-exchange", "test-key"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_consumed_total
                .with_label_values(&["test-queue", "test-consumer", "success"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_failed_total
                .with_label_values(&["test-queue", "timeout", "1"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_retried_total
                .with_label_values(&["test-queue", "transient-error"])
                .get(),
            1
        );

        let metric_families = metrics.registry().gather();
        assert!(!metric_families.is_empty());
    }

    /// Pool gauges take the supplied values and the attempt/failure counters
    /// each advance by one per call.
    #[test]
    fn test_connection_metrics() {
        let metrics = RustRabbitMetrics::new().unwrap();

        metrics.update_connection_pool(5, 4, 1);
        metrics.record_connection_attempt();
        metrics.record_connection_failure();

        assert_eq!(metrics.connections_total.get(), 5);
        assert_eq!(metrics.connections_healthy.get(), 4);
        assert_eq!(metrics.connections_unhealthy.get(), 1);
        assert_eq!(metrics.connection_attempts_total.get(), 1);
        assert_eq!(metrics.connection_failures_total.get(), 1);

        let metric_families = metrics.registry().gather();
        assert!(!metric_families.is_empty());
    }

    /// The timer must report at least the time the thread slept.
    #[test]
    fn test_timer() {
        let timer = MetricsTimer::new();
        std::thread::sleep(Duration::from_millis(1));
        let elapsed = timer.elapsed();
        assert!(elapsed >= Duration::from_millis(1));
    }
}