rust_rabbit/
metrics.rs

1//! Metrics and observability module for RustRabbit
2//!
3//! This module provides Prometheus metrics integration for monitoring:
4//! - Message throughput (published/consumed)
5//! - Connection health and pool status
6//! - Error rates and retry attempts
7//! - Processing latency and queue depths
8
9use prometheus::{
10    Gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Registry,
11};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
/// Metrics collector for RustRabbit operations
///
/// Bundles the registry together with a handle to every metric that is
/// registered in it by `with_registry`. The struct derives `Clone`, so it can
/// be handed to several components.
#[derive(Debug, Clone)]
pub struct RustRabbitMetrics {
    /// Registry in which all metrics below are registered.
    registry: Arc<Registry>,

    // Message metrics
    /// `rustrabbit_messages_published_total`, labelled by queue, exchange and routing key.
    messages_published_total: IntCounterVec,
    /// `rustrabbit_messages_consumed_total`, labelled by queue, consumer tag and status.
    messages_consumed_total: IntCounterVec,
    /// `rustrabbit_messages_failed_total`, labelled by queue, error type and retry attempt.
    messages_failed_total: IntCounterVec,
    /// `rustrabbit_messages_retried_total`, labelled by queue and retry reason.
    messages_retried_total: IntCounterVec,

    // Processing metrics
    /// `rustrabbit_message_processing_duration_seconds`, labelled by queue and consumer tag.
    message_processing_duration: HistogramVec,
    /// `rustrabbit_message_publish_duration_seconds`, labelled by queue and exchange.
    message_publish_duration: HistogramVec,

    // Connection metrics
    /// `rustrabbit_connections_total`: number of connections in the pool.
    connections_total: IntGauge,
    /// `rustrabbit_connections_healthy`: number of healthy connections.
    connections_healthy: IntGauge,
    /// `rustrabbit_connections_unhealthy`: number of unhealthy connections.
    connections_unhealthy: IntGauge,
    /// `rustrabbit_connection_attempts_total`: count of connection attempts.
    connection_attempts_total: IntCounter,
    /// `rustrabbit_connection_failures_total`: count of connection failures.
    connection_failures_total: IntCounter,

    // Queue metrics
    /// `rustrabbit_queue_depth`, labelled by queue.
    queue_depth: IntGaugeVec,
    /// `rustrabbit_consumer_count`, labelled by queue.
    consumer_count: IntGaugeVec,

    // Health metrics
    /// `rustrabbit_health_check_duration_seconds`: time spent in health checks.
    health_check_duration: Histogram,
    /// `rustrabbit_last_health_check_timestamp`: Unix time (seconds) of the last health check.
    last_health_check_timestamp: Gauge,
}
45
46impl RustRabbitMetrics {
47    /// Create a new metrics instance with default registry
48    pub fn new() -> Result<Self, prometheus::Error> {
49        let registry = Arc::new(Registry::new());
50        Self::with_registry(registry)
51    }
52
53    /// Create a new metrics instance with custom registry
54    pub fn with_registry(registry: Arc<Registry>) -> Result<Self, prometheus::Error> {
55        let messages_published_total = IntCounterVec::new(
56            prometheus::Opts::new(
57                "rustrabbit_messages_published_total",
58                "Total number of messages published",
59            ),
60            &["queue", "exchange", "routing_key"],
61        )?;
62
63        let messages_consumed_total = IntCounterVec::new(
64            prometheus::Opts::new(
65                "rustrabbit_messages_consumed_total",
66                "Total number of messages consumed",
67            ),
68            &["queue", "consumer_tag", "status"],
69        )?;
70
71        let messages_failed_total = IntCounterVec::new(
72            prometheus::Opts::new(
73                "rustrabbit_messages_failed_total",
74                "Total number of failed message processing attempts",
75            ),
76            &["queue", "error_type", "retry_attempt"],
77        )?;
78
79        let messages_retried_total = IntCounterVec::new(
80            prometheus::Opts::new(
81                "rustrabbit_messages_retried_total",
82                "Total number of message retry attempts",
83            ),
84            &["queue", "retry_reason"],
85        )?;
86
87        let message_processing_duration = HistogramVec::new(
88            prometheus::HistogramOpts::new(
89                "rustrabbit_message_processing_duration_seconds",
90                "Time spent processing messages",
91            )
92            .buckets(vec![
93                0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
94            ]),
95            &["queue", "consumer_tag"],
96        )?;
97
98        let message_publish_duration = HistogramVec::new(
99            prometheus::HistogramOpts::new(
100                "rustrabbit_message_publish_duration_seconds",
101                "Time spent publishing messages",
102            )
103            .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]),
104            &["queue", "exchange"],
105        )?;
106
107        let connections_total = IntGauge::with_opts(prometheus::Opts::new(
108            "rustrabbit_connections_total",
109            "Total number of connections in the pool",
110        ))?;
111
112        let connections_healthy = IntGauge::with_opts(prometheus::Opts::new(
113            "rustrabbit_connections_healthy",
114            "Number of healthy connections",
115        ))?;
116
117        let connections_unhealthy = IntGauge::with_opts(prometheus::Opts::new(
118            "rustrabbit_connections_unhealthy",
119            "Number of unhealthy connections",
120        ))?;
121
122        let connection_attempts_total = IntCounter::with_opts(prometheus::Opts::new(
123            "rustrabbit_connection_attempts_total",
124            "Total number of connection attempts",
125        ))?;
126
127        let connection_failures_total = IntCounter::with_opts(prometheus::Opts::new(
128            "rustrabbit_connection_failures_total",
129            "Total number of connection failures",
130        ))?;
131
132        let queue_depth = IntGaugeVec::new(
133            prometheus::Opts::new(
134                "rustrabbit_queue_depth",
135                "Number of messages waiting in queue",
136            ),
137            &["queue"],
138        )?;
139
140        let consumer_count = IntGaugeVec::new(
141            prometheus::Opts::new(
142                "rustrabbit_consumer_count",
143                "Number of active consumers per queue",
144            ),
145            &["queue"],
146        )?;
147
148        let health_check_duration = Histogram::with_opts(
149            prometheus::HistogramOpts::new(
150                "rustrabbit_health_check_duration_seconds",
151                "Time spent performing health checks",
152            )
153            .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0]),
154        )?;
155
156        let last_health_check_timestamp = Gauge::with_opts(prometheus::Opts::new(
157            "rustrabbit_last_health_check_timestamp",
158            "Timestamp of last health check (Unix seconds)",
159        ))?;
160
161        // Register all metrics
162        registry.register(Box::new(messages_published_total.clone()))?;
163        registry.register(Box::new(messages_consumed_total.clone()))?;
164        registry.register(Box::new(messages_failed_total.clone()))?;
165        registry.register(Box::new(messages_retried_total.clone()))?;
166        registry.register(Box::new(message_processing_duration.clone()))?;
167        registry.register(Box::new(message_publish_duration.clone()))?;
168        registry.register(Box::new(connections_total.clone()))?;
169        registry.register(Box::new(connections_healthy.clone()))?;
170        registry.register(Box::new(connections_unhealthy.clone()))?;
171        registry.register(Box::new(connection_attempts_total.clone()))?;
172        registry.register(Box::new(connection_failures_total.clone()))?;
173        registry.register(Box::new(queue_depth.clone()))?;
174        registry.register(Box::new(consumer_count.clone()))?;
175        registry.register(Box::new(health_check_duration.clone()))?;
176        registry.register(Box::new(last_health_check_timestamp.clone()))?;
177
178        Ok(Self {
179            registry,
180            messages_published_total,
181            messages_consumed_total,
182            messages_failed_total,
183            messages_retried_total,
184            message_processing_duration,
185            message_publish_duration,
186            connections_total,
187            connections_healthy,
188            connections_unhealthy,
189            connection_attempts_total,
190            connection_failures_total,
191            queue_depth,
192            consumer_count,
193            health_check_duration,
194            last_health_check_timestamp,
195        })
196    }
197
198    /// Get the Prometheus registry for exposing metrics
199    pub fn registry(&self) -> Arc<Registry> {
200        self.registry.clone()
201    }
202
203    // Message metrics
204
205    /// Record a published message
206    pub fn record_message_published(&self, queue: &str, exchange: &str, routing_key: &str) {
207        self.messages_published_total
208            .with_label_values(&[queue, exchange, routing_key])
209            .inc();
210    }
211
212    /// Record a consumed message
213    pub fn record_message_consumed(&self, queue: &str, consumer_tag: &str, success: bool) {
214        let status = if success { "success" } else { "failed" };
215        self.messages_consumed_total
216            .with_label_values(&[queue, consumer_tag, status])
217            .inc();
218    }
219
220    /// Record a failed message processing
221    pub fn record_message_failed(&self, queue: &str, error_type: &str, retry_attempt: u32) {
222        self.messages_failed_total
223            .with_label_values(&[queue, error_type, &retry_attempt.to_string()])
224            .inc();
225    }
226
227    /// Record a message retry
228    pub fn record_message_retry(&self, queue: &str, retry_reason: &str) {
229        self.messages_retried_total
230            .with_label_values(&[queue, retry_reason])
231            .inc();
232    }
233
234    /// Record message processing duration
235    pub fn record_processing_duration(&self, queue: &str, consumer_tag: &str, duration: Duration) {
236        self.message_processing_duration
237            .with_label_values(&[queue, consumer_tag])
238            .observe(duration.as_secs_f64());
239    }
240
241    /// Record message publish duration
242    pub fn record_publish_duration(&self, queue: &str, exchange: &str, duration: Duration) {
243        self.message_publish_duration
244            .with_label_values(&[queue, exchange])
245            .observe(duration.as_secs_f64());
246    }
247
248    // Connection metrics
249
250    /// Update connection pool metrics
251    pub fn update_connection_pool(&self, total: i64, healthy: i64, unhealthy: i64) {
252        self.connections_total.set(total);
253        self.connections_healthy.set(healthy);
254        self.connections_unhealthy.set(unhealthy);
255    }
256
257    /// Record a connection attempt
258    pub fn record_connection_attempt(&self) {
259        self.connection_attempts_total.inc();
260    }
261
262    /// Record a connection failure
263    pub fn record_connection_failure(&self) {
264        self.connection_failures_total.inc();
265    }
266
267    // Queue metrics
268
269    /// Update queue depth
270    pub fn update_queue_depth(&self, queue: &str, depth: i64) {
271        self.queue_depth.with_label_values(&[queue]).set(depth);
272    }
273
274    /// Update consumer count
275    pub fn update_consumer_count(&self, queue: &str, count: i64) {
276        self.consumer_count.with_label_values(&[queue]).set(count);
277    }
278
279    // Health metrics
280
281    /// Record health check duration
282    pub fn record_health_check_duration(&self, duration: Duration) {
283        self.health_check_duration.observe(duration.as_secs_f64());
284        self.last_health_check_timestamp.set(
285            std::time::SystemTime::now()
286                .duration_since(std::time::UNIX_EPOCH)
287                .unwrap_or_default()
288                .as_secs_f64(),
289        );
290    }
291}
292
293impl Default for RustRabbitMetrics {
294    fn default() -> Self {
295        Self::new().expect("Failed to create default metrics")
296    }
297}
298
/// Timer helper for measuring operation duration
///
/// The timer starts when it is created; [`MetricsTimer::elapsed`] can be
/// called any number of times and always measures from that starting point.
#[derive(Debug, Clone, Copy)]
pub struct MetricsTimer {
    /// Instant captured when the timer was created.
    start: Instant,
}

impl Default for MetricsTimer {
    /// Equivalent to [`MetricsTimer::new`]: the timer starts immediately.
    fn default() -> Self {
        Self::new()
    }
}

impl MetricsTimer {
    /// Start a new timer
    #[must_use]
    pub fn new() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Get elapsed duration since timer started
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }
}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built collector already exposes its label-free metrics.
    #[test]
    fn test_metrics_creation() {
        let metrics = RustRabbitMetrics::new().unwrap();
        assert!(!metrics.registry().gather().is_empty());
    }

    /// Registering the same metric names twice in one registry must fail.
    #[test]
    fn test_duplicate_registration_is_rejected() {
        let registry = Arc::new(Registry::new());
        assert!(RustRabbitMetrics::with_registry(Arc::clone(&registry)).is_ok());
        assert!(RustRabbitMetrics::with_registry(registry).is_err());
    }

    /// Each `record_*` call must land in the series with the expected labels.
    #[test]
    fn test_message_metrics() {
        let metrics = RustRabbitMetrics::new().unwrap();

        metrics.record_message_published("test-queue", "test-exchange", "test-key");
        metrics.record_message_consumed("test-queue", "test-consumer", true);
        metrics.record_message_consumed("test-queue", "test-consumer", false);
        metrics.record_message_failed("test-queue", "timeout", 1);
        metrics.record_message_retry("test-queue", "transient-error");
        metrics.record_processing_duration(
            "test-queue",
            "test-consumer",
            Duration::from_millis(5),
        );
        metrics.record_publish_duration("test-queue", "test-exchange", Duration::from_millis(2));

        assert_eq!(
            metrics
                .messages_published_total
                .with_label_values(&["test-queue", "test-exchange", "test-key"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_consumed_total
                .with_label_values(&["test-queue", "test-consumer", "success"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_consumed_total
                .with_label_values(&["test-queue", "test-consumer", "failed"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_failed_total
                .with_label_values(&["test-queue", "timeout", "1"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .messages_retried_total
                .with_label_values(&["test-queue", "transient-error"])
                .get(),
            1
        );
        assert_eq!(
            metrics
                .message_processing_duration
                .with_label_values(&["test-queue", "test-consumer"])
                .get_sample_count(),
            1
        );
        assert_eq!(
            metrics
                .message_publish_duration
                .with_label_values(&["test-queue", "test-exchange"])
                .get_sample_count(),
            1
        );

        let metric_families = metrics.registry().gather();
        assert!(!metric_families.is_empty());
    }

    /// Pool gauges are overwritten and connection counters incremented.
    #[test]
    fn test_connection_metrics() {
        let metrics = RustRabbitMetrics::new().unwrap();

        metrics.update_connection_pool(5, 4, 1);
        metrics.record_connection_attempt();
        metrics.record_connection_failure();

        assert_eq!(metrics.connections_total.get(), 5);
        assert_eq!(metrics.connections_healthy.get(), 4);
        assert_eq!(metrics.connections_unhealthy.get(), 1);
        assert_eq!(metrics.connection_attempts_total.get(), 1);
        assert_eq!(metrics.connection_failures_total.get(), 1);

        let metric_families = metrics.registry().gather();
        assert!(!metric_families.is_empty());
    }

    /// Queue gauges are tracked per queue name.
    #[test]
    fn test_queue_metrics() {
        let metrics = RustRabbitMetrics::new().unwrap();

        metrics.update_queue_depth("test-queue", 42);
        metrics.update_consumer_count("test-queue", 3);

        assert_eq!(
            metrics.queue_depth.with_label_values(&["test-queue"]).get(),
            42
        );
        assert_eq!(
            metrics
                .consumer_count
                .with_label_values(&["test-queue"])
                .get(),
            3
        );
    }

    /// A health check records one observation and stamps the timestamp gauge.
    #[test]
    fn test_health_check_metrics() {
        let metrics = RustRabbitMetrics::new().unwrap();

        metrics.record_health_check_duration(Duration::from_millis(3));

        assert_eq!(metrics.health_check_duration.get_sample_count(), 1);
        assert!(metrics.last_health_check_timestamp.get() > 0.0);
    }

    #[test]
    fn test_timer() {
        let timer = MetricsTimer::new();
        std::thread::sleep(Duration::from_millis(1));
        let elapsed = timer.elapsed();
        assert!(elapsed >= Duration::from_millis(1));
    }
}
366}