rabbitmq-backup-core 0.1.0

Core engine for RabbitMQ backup and restore operations
Documentation
//! Prometheus metrics registry.
//!
//! Follows kafka-backup's pattern: `prometheus-client` with `Family<Labels, Counter/Gauge>`.

use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

use crate::metrics::labels::{ErrorLabels, QueueLabels};

/// Central metrics registry for all backup/restore counters and gauges.
pub struct PrometheusMetrics {
    registry: Registry,

    // Backup metrics
    messages_read: Family<QueueLabels, Counter>,
    bytes_read: Family<QueueLabels, Counter>,
    segments_written: Family<QueueLabels, Counter>,
    segments_bytes: Family<QueueLabels, Counter>,
    checkpoint_syncs: Counter,
    errors: Family<ErrorLabels, Counter>,

    // Restore metrics
    messages_published: Family<QueueLabels, Counter>,
    messages_confirmed: Family<QueueLabels, Counter>,
    messages_failed: Family<QueueLabels, Counter>,

    // Connection metrics
    amqp_connections_active: Gauge,
    stream_connections_active: Gauge,
}

impl PrometheusMetrics {
    pub fn new() -> Self {
        let mut registry = Registry::default();

        let messages_read = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_backup_messages_read",
            "Total messages read during backup",
            messages_read.clone(),
        );

        let bytes_read = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_backup_bytes_read",
            "Total bytes read during backup",
            bytes_read.clone(),
        );

        let segments_written = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_backup_segments_written",
            "Total segments written",
            segments_written.clone(),
        );

        let segments_bytes = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_backup_segments_bytes",
            "Total segment bytes written",
            segments_bytes.clone(),
        );

        let checkpoint_syncs = Counter::default();
        registry.register(
            "rabbitmq_backup_checkpoint_syncs",
            "Total checkpoint sync operations",
            checkpoint_syncs.clone(),
        );

        let errors = Family::<ErrorLabels, Counter>::default();
        registry.register(
            "rabbitmq_backup_errors",
            "Total errors by type",
            errors.clone(),
        );

        let messages_published = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_restore_messages_published",
            "Total messages published during restore",
            messages_published.clone(),
        );

        let messages_confirmed = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_restore_messages_confirmed",
            "Total messages confirmed during restore",
            messages_confirmed.clone(),
        );

        let messages_failed = Family::<QueueLabels, Counter>::default();
        registry.register(
            "rabbitmq_restore_messages_failed",
            "Total messages failed during restore",
            messages_failed.clone(),
        );

        let amqp_connections_active = Gauge::default();
        registry.register(
            "rabbitmq_backup_amqp_connections_active",
            "Active AMQP connections",
            amqp_connections_active.clone(),
        );

        let stream_connections_active = Gauge::default();
        registry.register(
            "rabbitmq_backup_stream_connections_active",
            "Active stream protocol connections",
            stream_connections_active.clone(),
        );

        Self {
            registry,
            messages_read,
            bytes_read,
            segments_written,
            segments_bytes,
            checkpoint_syncs,
            errors,
            messages_published,
            messages_confirmed,
            messages_failed,
            amqp_connections_active,
            stream_connections_active,
        }
    }

    // -- Backup metrics --

    pub fn inc_messages_read(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
        self.messages_read
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc_by(count);
    }

    pub fn inc_bytes_read(&self, queue: &str, vhost: &str, queue_type: &str, bytes: u64) {
        self.bytes_read
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc_by(bytes);
    }

    pub fn inc_segments_written(&self, queue: &str, vhost: &str, queue_type: &str) {
        self.segments_written
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc();
    }

    pub fn inc_segments_bytes(&self, queue: &str, vhost: &str, queue_type: &str, bytes: u64) {
        self.segments_bytes
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc_by(bytes);
    }

    pub fn inc_checkpoint_syncs(&self) {
        self.checkpoint_syncs.inc();
    }

    pub fn inc_errors(&self, queue: &str, vhost: &str, error_type: &str) {
        self.errors
            .get_or_create(&ErrorLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                error_type: error_type.to_string(),
            })
            .inc();
    }

    // -- Restore metrics --

    pub fn inc_messages_published(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
        self.messages_published
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc_by(count);
    }

    pub fn inc_messages_confirmed(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
        self.messages_confirmed
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc_by(count);
    }

    pub fn inc_messages_failed(&self, queue: &str, vhost: &str, queue_type: &str, count: u64) {
        self.messages_failed
            .get_or_create(&QueueLabels {
                queue: queue.to_string(),
                vhost: vhost.to_string(),
                queue_type: queue_type.to_string(),
            })
            .inc_by(count);
    }

    // -- Connection metrics --

    pub fn inc_amqp_connections(&self) {
        self.amqp_connections_active.inc();
    }

    pub fn dec_amqp_connections(&self) {
        self.amqp_connections_active.dec();
    }

    pub fn inc_stream_connections(&self) {
        self.stream_connections_active.inc();
    }

    pub fn dec_stream_connections(&self) {
        self.stream_connections_active.dec();
    }

    /// Encode all metrics in Prometheus text format.
    pub fn encode(&self) -> String {
        let mut buf = String::new();
        prometheus_client::encoding::text::encode(&mut buf, &self.registry)
            .expect("Prometheus encoding should not fail");
        buf
    }
}

impl Default for PrometheusMetrics {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_encode() {
        let metrics = PrometheusMetrics::new();
        metrics.inc_messages_read("orders", "/", "classic", 42);
        metrics.inc_errors("orders", "/", "amqp");

        let output = metrics.encode();
        assert!(output.contains("rabbitmq_backup_messages_read"));
        assert!(output.contains("rabbitmq_backup_errors"));
        assert!(output.contains("42"));
    }

    #[test]
    fn test_metrics_connection_gauge() {
        let metrics = PrometheusMetrics::new();
        metrics.inc_amqp_connections();
        metrics.inc_amqp_connections();
        metrics.dec_amqp_connections();

        let output = metrics.encode();
        assert!(output.contains("rabbitmq_backup_amqp_connections_active"));
    }
}