svc-agent 0.21.0

An agent library.
Documentation
use std::collections::HashMap;
use std::time::{Duration, Instant};

use chrono::{DateTime, Utc};
use log::error;
use tokio::sync::{
    mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
    oneshot,
};

use crate::mqtt::ExtraTags;
use crate::mqtt::{IncomingMessage, PublishableMessage};

/// State owned by the background counting task.
///
/// `start_loop` drains `cmd_rx` and applies every received command to `counters`.
struct QueueCounter {
    /// Receiving half of the command channel created in `QueueCounterHandle::start`.
    cmd_rx: UnboundedReceiver<TimestampedCommand>,
    /// Counters keyed by message tags; each entry also remembers when it was last updated.
    counters: HashMap<ExtraTags, QueuesCounterInstant>,
}

/// Message counters accumulated for one set of tags.
///
/// Every field starts at zero (derived `Default`). The type is plain data, so it also
/// derives `PartialEq`/`Eq` to let callers and tests compare snapshots by value.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct QueuesCounter {
    /// Number of incoming requests counted.
    pub incoming_requests: u64,
    /// Number of incoming responses counted.
    pub incoming_responses: u64,
    /// Number of incoming events counted.
    pub incoming_events: u64,
    /// Number of outgoing requests counted.
    pub outgoing_requests: u64,
    /// Number of outgoing responses counted.
    pub outgoing_responses: u64,
    /// Number of outgoing events counted.
    pub outgoing_events: u64,
    /// Sum of the payload lengths reported with incoming requests.
    pub incoming_bytes: u64,
}

/// A [`QueuesCounter`] together with the moment it was last modified.
///
/// The timestamp is what the eviction logic inspects to decide whether the entry is stale.
/// `Debug` is derived so the type can be logged and used inside other `Debug` types.
#[derive(Debug, Clone)]
pub struct QueuesCounterInstant {
    /// The accumulated counters.
    pub result: QueuesCounter,
    /// Monotonic time of the most recent update to `result`.
    updated_at: Instant,
}

impl Default for QueuesCounterInstant {
    fn default() -> Self {
        Self {
            updated_at: Instant::now(),
            result: Default::default(),
        }
    }
}

/// Instructions sent from `QueueCounterHandle` to the counting task.
#[derive(Debug)]
enum Command {
    /// An incoming request was seen: its tags and its payload length.
    IncomingRequest(ExtraTags, u64),
    /// An incoming response with the given tags was seen.
    IncomingResponse(ExtraTags),
    /// An incoming event with the given tags was seen.
    IncomingEvent(ExtraTags),
    /// An outgoing request with the given tags was seen.
    OutgoingRequest(ExtraTags),
    /// An outgoing response with the given tags was seen.
    OutgoingResponse(ExtraTags),
    /// An outgoing event with the given tags was seen.
    OutgoingEvent(ExtraTags),
    /// Asks for a snapshot of all counters; the snapshot is delivered through the sender.
    GetThroughput(oneshot::Sender<HashMap<ExtraTags, QueuesCounter>>),
}

/// A [`Command`] paired with the UTC time at which it was sent.
#[derive(Debug)]
struct TimestampedCommand {
    /// The action the counting task should perform.
    command: Command,
    /// Wall-clock time captured by `send_command` when the command was enqueued.
    // Nothing in the visible code reads this field (it only appears in the derived `Debug`
    // output), hence the lint suppression below.
    #[allow(dead_code)]
    timestamp: DateTime<Utc>,
}

/// Clonable handle used to report messages to the counting task and to query its stats.
#[derive(Clone)]
pub struct QueueCounterHandle {
    /// Sending half of the command channel consumed by the counting task.
    cmd_tx: UnboundedSender<TimestampedCommand>,
}

impl QueueCounterHandle {
    pub(crate) fn start() -> Self {
        let (cmd_tx, cmd_rx) = unbounded_channel();

        let mut counter = QueueCounter {
            cmd_rx,
            counters: HashMap::new(),
        };
        tokio::spawn(async move { counter.start_loop().await });
        Self { cmd_tx }
    }

    pub(crate) fn add_incoming_message(&self, msg: &IncomingMessage<String>) {
        let command = match msg {
            IncomingMessage::Event(ev) => {
                let tags = ev.properties().tags().to_owned();
                Command::IncomingEvent(tags)
            }
            IncomingMessage::Request(req) => {
                let method = req.properties().method().to_owned();
                let mut tags = req.properties().tags().to_owned();
                tags.set_method(&method);
                let bytes = req.payload().len() as u64;
                Command::IncomingRequest(tags, bytes)
            }
            IncomingMessage::Response(resp) => {
                let tags = resp.properties().tags().to_owned();
                Command::IncomingResponse(tags)
            }
        };

        self.send_command(command);
    }

    pub(crate) fn add_outgoing_message(&self, dump: &PublishableMessage) {
        let command = match dump {
            PublishableMessage::Event(ev) => {
                let tags = ev.tags().to_owned();
                Command::OutgoingEvent(tags)
            }
            PublishableMessage::Request(req) => {
                let tags = req.tags().to_owned();
                Command::OutgoingRequest(tags)
            }
            PublishableMessage::Response(resp) => {
                let tags = resp.tags().to_owned();
                Command::OutgoingResponse(tags)
            }
        };
        self.send_command(command);
    }

    fn send_command(&self, command: Command) {
        if let Err(e) = self.cmd_tx.send(TimestampedCommand {
            timestamp: Utc::now(),
            command,
        }) {
            error!("Failed to send command, reason = {:?}", e);
        }
    }

    pub async fn get_stats(&self) -> Result<HashMap<ExtraTags, QueuesCounter>, String> {
        let (resp_tx, resp_rx) = oneshot::channel();
        let command = Command::GetThroughput(resp_tx);

        self.send_command(command);

        resp_rx
            .await
            .map_err(|e| format!("get_stats went wrong: {:?}", e))
    }
}

/// Counters that have gone at least this long without an update are removed by
/// `evict_old_counters`.
const EVICTION_PERIOD: Duration = Duration::from_secs(600);
/// `start_loop` runs an eviction sweep only when more than this much time has passed since
/// the previous one; the check happens each time a command is received.
const EVICTION_CHECK_PERIOD: Duration = Duration::from_secs(5);

impl QueueCounter {
    /// Receives commands until `recv` yields `None`, applying each one to the counters.
    ///
    /// Before a command is handled, stale counters are evicted if more than
    /// `EVICTION_CHECK_PERIOD` has elapsed since the previous sweep.
    async fn start_loop(&mut self) {
        let mut last_eviction_check = Instant::now();

        while let Some(incoming) = self.cmd_rx.recv().await {
            if last_eviction_check.elapsed() > EVICTION_CHECK_PERIOD {
                self.evict_old_counters();
                last_eviction_check = Instant::now();
            }

            match incoming.command {
                Command::GetThroughput(reply_to) => {
                    let snapshot = self.fold();
                    if reply_to.send(snapshot).is_err() {
                        error!("The receiving end was dropped before this was called");
                    }
                }
                Command::IncomingRequest(tags, bytes) => self.bump_counter(tags, |stats| {
                    stats.incoming_requests += 1;
                    stats.incoming_bytes += bytes;
                }),
                Command::IncomingResponse(tags) => {
                    self.bump_counter(tags, |stats| stats.incoming_responses += 1)
                }
                Command::IncomingEvent(tags) => {
                    self.bump_counter(tags, |stats| stats.incoming_events += 1)
                }
                Command::OutgoingRequest(tags) => {
                    self.bump_counter(tags, |stats| stats.outgoing_requests += 1)
                }
                Command::OutgoingResponse(tags) => {
                    self.bump_counter(tags, |stats| stats.outgoing_responses += 1)
                }
                Command::OutgoingEvent(tags) => {
                    self.bump_counter(tags, |stats| stats.outgoing_events += 1)
                }
            }
        }
    }

    /// Applies `update` to the counters stored under `tags`, creating a default entry when
    /// none exists, and then records the current time as the entry's last update.
    fn bump_counter(&mut self, tags: ExtraTags, update: impl FnOnce(&mut QueuesCounter)) {
        let entry = self.counters.entry(tags).or_default();
        update(&mut entry.result);
        entry.updated_at = Instant::now();
    }

    /// Drops every counter whose last update is `EVICTION_PERIOD` or more in the past.
    fn evict_old_counters(&mut self) {
        self.counters
            .retain(|_tags, counter| counter.updated_at.elapsed() < EVICTION_PERIOD);
    }

    /// Produces an owned snapshot of all counters without their update timestamps.
    fn fold(&self) -> HashMap<ExtraTags, QueuesCounter> {
        let mut snapshot = HashMap::with_capacity(self.counters.len());
        for (tags, counter) in &self.counters {
            snapshot.insert(tags.clone(), counter.result.clone());
        }
        snapshot
    }
}