use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;
use crate::metrics::labels::{ErrorLabels, QueueLabels};
/// Handles to the Prometheus metrics registered by `PrometheusMetrics::new`,
/// together with the `Registry` they are registered in.
///
/// Each metric field is the value whose clone was passed to
/// `Registry::register`; the `inc_*` / `dec_*` methods update these fields and
/// `encode` serializes the registry.
pub struct PrometheusMetrics {
/// Registry that every metric below is registered in; read by `encode`.
registry: Registry,
/// `rabbitmq_backup_messages_read` — "Total messages read during backup", per queue label set.
messages_read: Family<QueueLabels, Counter>,
/// `rabbitmq_backup_bytes_read` — "Total bytes read during backup", per queue label set.
bytes_read: Family<QueueLabels, Counter>,
/// `rabbitmq_backup_segments_written` — "Total segments written", per queue label set.
segments_written: Family<QueueLabels, Counter>,
/// `rabbitmq_backup_segments_bytes` — "Total segment bytes written", per queue label set.
segments_bytes: Family<QueueLabels, Counter>,
/// `rabbitmq_backup_checkpoint_syncs` — "Total checkpoint sync operations" (unlabelled).
checkpoint_syncs: Counter,
/// `rabbitmq_backup_errors` — "Total errors by type", keyed by `ErrorLabels`.
errors: Family<ErrorLabels, Counter>,
/// `rabbitmq_restore_messages_published` — "Total messages published during restore".
messages_published: Family<QueueLabels, Counter>,
/// `rabbitmq_restore_messages_confirmed` — "Total messages confirmed during restore".
messages_confirmed: Family<QueueLabels, Counter>,
/// `rabbitmq_restore_messages_failed` — "Total messages failed during restore".
messages_failed: Family<QueueLabels, Counter>,
/// `rabbitmq_backup_amqp_connections_active` — "Active AMQP connections" (unlabelled gauge).
amqp_connections_active: Gauge,
/// `rabbitmq_backup_stream_connections_active` — "Active stream protocol connections"
/// (unlabelled gauge).
stream_connections_active: Gauge,
}
impl PrometheusMetrics {
pub fn new() -> Self {
let mut registry = Registry::default();
let messages_read = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_backup_messages_read",
"Total messages read during backup",
messages_read.clone(),
);
let bytes_read = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_backup_bytes_read",
"Total bytes read during backup",
bytes_read.clone(),
);
let segments_written = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_backup_segments_written",
"Total segments written",
segments_written.clone(),
);
let segments_bytes = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_backup_segments_bytes",
"Total segment bytes written",
segments_bytes.clone(),
);
let checkpoint_syncs = Counter::default();
registry.register(
"rabbitmq_backup_checkpoint_syncs",
"Total checkpoint sync operations",
checkpoint_syncs.clone(),
);
let errors = Family::<ErrorLabels, Counter>::default();
registry.register(
"rabbitmq_backup_errors",
"Total errors by type",
errors.clone(),
);
let messages_published = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_restore_messages_published",
"Total messages published during restore",
messages_published.clone(),
);
let messages_confirmed = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_restore_messages_confirmed",
"Total messages confirmed during restore",
messages_confirmed.clone(),
);
let messages_failed = Family::<QueueLabels, Counter>::default();
registry.register(
"rabbitmq_restore_messages_failed",
"Total messages failed during restore",
messages_failed.clone(),
);
let amqp_connections_active = Gauge::default();
registry.register(
"rabbitmq_backup_amqp_connections_active",
"Active AMQP connections",
amqp_connections_active.clone(),
);
let stream_connections_active = Gauge::default();
registry.register(
"rabbitmq_backup_stream_connections_active",
"Active stream protocol connections",
stream_connections_active.clone(),
);
Self {
registry,
messages_read,
bytes_read,
segments_written,
segments_bytes,
checkpoint_syncs,
errors,
messages_published,
messages_confirmed,
messages_failed,
amqp_connections_active,
stream_connections_active,
}
}
pub fn inc_messages_read(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
self.messages_read
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc_by(count);
}
pub fn inc_bytes_read(&self, queue: &str, vhost: &str, queue_type: &str, bytes: u64) {
self.bytes_read
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc_by(bytes);
}
pub fn inc_segments_written(&self, queue: &str, vhost: &str, queue_type: &str) {
self.segments_written
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc();
}
pub fn inc_segments_bytes(&self, queue: &str, vhost: &str, queue_type: &str, bytes: u64) {
self.segments_bytes
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc_by(bytes);
}
pub fn inc_checkpoint_syncs(&self) {
self.checkpoint_syncs.inc();
}
pub fn inc_errors(&self, queue: &str, vhost: &str, error_type: &str) {
self.errors
.get_or_create(&ErrorLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
error_type: error_type.to_string(),
})
.inc();
}
pub fn inc_messages_published(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
self.messages_published
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc_by(count);
}
pub fn inc_messages_confirmed(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
self.messages_confirmed
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc_by(count);
}
pub fn inc_messages_failed(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
self.messages_failed
.get_or_create(&QueueLabels {
queue: queue.to_string(),
vhost: vhost.to_string(),
queue_type: queue_type.to_string(),
})
.inc_by(count);
}
pub fn inc_amqp_connections(&self) {
self.amqp_connections_active.inc();
}
pub fn dec_amqp_connections(&self) {
self.amqp_connections_active.dec();
}
pub fn inc_stream_connections(&self) {
self.stream_connections_active.inc();
}
pub fn dec_stream_connections(&self) {
self.stream_connections_active.dec();
}
pub fn encode(&self) -> String {
let mut buf = String::new();
prometheus_client::encoding::text::encode(&mut buf, &self.registry)
.expect("Prometheus encoding should not fail");
buf
}
}
impl Default for PrometheusMetrics {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns `true` when `output` contains a sample line for `metric` that
    /// carries a label set and ends with `value`, i.e. `metric{...} value`.
    fn has_labeled_sample(output: &str, metric: &str, value: &str) -> bool {
        let prefix = format!("{}{{", metric);
        let suffix = format!("}} {}", value);
        output
            .lines()
            .any(|line| line.starts_with(&prefix) && line.ends_with(&suffix))
    }

    #[test]
    fn test_metrics_encode() {
        let metrics = PrometheusMetrics::new();
        metrics.inc_messages_read("orders", "/", "classic", 42);
        metrics.inc_errors("orders", "/", "amqp");

        let output = metrics.encode();
        assert!(output.contains("rabbitmq_backup_messages_read"));
        assert!(output.contains("rabbitmq_backup_errors"));
        assert!(output.contains("42"));

        // A bare `contains("42")` would match any stray "42" in the output, so
        // also pin the values to their sample lines. The text encoder appends
        // `_total` to counter names.
        assert!(
            has_labeled_sample(&output, "rabbitmq_backup_messages_read_total", "42"),
            "missing messages_read sample with value 42 in:\n{}",
            output
        );
        assert!(
            has_labeled_sample(&output, "rabbitmq_backup_errors_total", "1"),
            "missing errors sample with value 1 in:\n{}",
            output
        );
        assert!(output.contains("orders"));
    }

    #[test]
    fn test_metrics_connection_gauge() {
        let metrics = PrometheusMetrics::new();
        metrics.inc_amqp_connections();
        metrics.inc_amqp_connections();
        metrics.dec_amqp_connections();

        let output = metrics.encode();
        assert!(output.contains("rabbitmq_backup_amqp_connections_active"));

        // inc + inc + dec must leave exactly one active connection.
        assert!(
            output
                .lines()
                .any(|line| line == "rabbitmq_backup_amqp_connections_active 1"),
            "expected the AMQP gauge to read 1 in:\n{}",
            output
        );
    }
}