rotonda 0.4.0

composable, programmable BGP engine
Documentation
use std::{
    fmt::{Debug, Display},
    sync::{atomic::Ordering::SeqCst, Arc},
    time::Duration,
};

use log::{debug, info, trace, warn};

use crate::common::status_reporter::{
    sr_log, AnyStatusReporter, Chainable, Named, TargetStatusReporter,
};

use super::{config::Destination, metrics::MqttMetrics};

#[derive(Debug, Default)]
pub struct MqttStatusReporter {
    name: String,
    metrics: Arc<MqttMetrics>,
}

impl MqttStatusReporter {
    pub fn new<T: Display>(name: T, metrics: Arc<MqttMetrics>) -> Self {
        Self {
            name: format!("{}", name),
            metrics,
        }
    }

    pub fn metrics(&self) -> Arc<MqttMetrics> {
        self.metrics.clone()
    }

    pub fn connecting(&self, broker_address: &Destination) {
        sr_log!(debug: self, "Connecting to MQTT server {}", broker_address);
    }

    pub fn connected(&self, broker_address: &Destination) {
        sr_log!(info: self, "Connected to MQTT server at {}", broker_address);
        self.metrics
            .connection_established_state
            .store(true, SeqCst);
    }

    pub fn disconnected(&self, broker_address: &Destination) {
        sr_log!(info: self, "Disconnected from MQTT server at {}", broker_address);
        self.metrics
            .connection_established_state
            .store(false, SeqCst);
    }

    pub fn connection_error<T: Display>(&self, err: T) {
        sr_log!(warn: self, "MQTT connection error: {}", err);
        self.metrics.connection_error_count.fetch_add(1, SeqCst);
    }

    pub fn reconnecting(&self, connect_retry_secs: Duration) {
        sr_log!(
            info: self,
            "Reconnecting in {} seconds",
            connect_retry_secs.as_secs()
        );
        self.metrics
            .connection_established_state
            .store(false, SeqCst);
        self.metrics.connection_lost_count.fetch_add(1, SeqCst);
    }

    pub fn publishing<T: Display, C: Display>(&self, topic: T, content: C) {
        sr_log!(
            trace: self,
            "Publishing message {} to topic {}",
            content, topic
        );
    }

    pub fn publish_ok(&self, topic: String) {
        sr_log!(
            debug: self,
            "Published message to topic {}",
            topic
        );

        let metrics = self.metrics.topic_metrics(Arc::new(topic));
        metrics.publish_counts.fetch_add(1, SeqCst);
    }

    pub fn publish_error<T: Display>(&self, err: T) {
        sr_log!(warn: self, "Publishing failed: {}", err);
        self.metrics.publish_error_count.fetch_add(1, SeqCst);
    }

    pub fn inflight_update(&self, inflight: u16) {
        self.metrics.in_flight_count.store(inflight, SeqCst);
    }
}

impl TargetStatusReporter for MqttStatusReporter {}

impl AnyStatusReporter for MqttStatusReporter {
    fn metrics(&self) -> Option<Arc<dyn crate::metrics::Source>> {
        Some(self.metrics.clone())
    }
}

impl Chainable for MqttStatusReporter {
    fn add_child<T: Display>(&self, child_name: T) -> Self {
        Self::new(self.link_names(child_name), self.metrics.clone())
    }
}

impl Named for MqttStatusReporter {
    fn name(&self) -> &str {
        &self.name
    }
}