vmonitor 0.1.0

A simple and lightweight system monitor
Documentation
use crate::api;
use crate::config::Endpoint;
use crate::features::metrics::Metrics;
use futures::{sink::SinkExt, stream::StreamExt};
use futures_util::stream::SplitStream;
use tokio::{
    net::TcpStream,
    sync::{mpsc, watch},
    time::{interval, sleep, Duration},
};
use tokio_tungstenite::{
    tungstenite::{Bytes, Message},
    MaybeTlsStream, WebSocketStream,
};
use tracing::{debug, error, info, warn};

/// Runtime-adjustable settings of the monitor.
///
/// A value of this type travels through a `watch` channel so that a running
/// metrics task can pick up changes requested by the server.
#[derive(Clone)]
struct Config {
    /// Time between two consecutive metrics reports.
    metrics_interval: Duration,
}

impl Config {
    /// Returns the start-up configuration: one metrics report every 10 seconds.
    fn new() -> Self {
        let metrics_interval = Duration::from_secs(10);
        Self { metrics_interval }
    }

    /// Checks that the configuration is usable.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when `metrics_interval` is shorter
    /// than one second.
    fn validate(&self) -> Result<(), String> {
        let shortest_allowed = Duration::from_secs(1);
        if self.metrics_interval >= shortest_allowed {
            Ok(())
        } else {
            Err(String::from("Metrics interval must be at least 1 second"))
        }
    }
}
/// Monitoring agent bound to a single endpoint.
///
/// Besides the endpoint description it owns both halves of a `watch` channel
/// carrying the current `Config`. For every connection it services, `run`
/// hands a clone of the receiver to the metrics task and a clone of the
/// sender to the command handler.
pub struct Monitor {
    /// Endpoint this monitor connects to; `run` reads its `server`, `secret`
    /// and `connection` fields.
    pub endpoint: Endpoint,
    /// Sending half of the configuration channel.
    config_tx: watch::Sender<Config>,
    /// Receiving half of the configuration channel.
    config_rx: watch::Receiver<Config>,
}

/// Work item queued for the writer task spawned in `Monitor::run`, which turns
/// each variant into one outgoing WebSocket message.
enum WriteMessage {
    /// Payload sent as a binary message.
    Data(Vec<u8>),
    /// Payload sent as a pong message; queued by `handle_command` when a ping
    /// is received.
    Pong(Bytes),
    /// Sends a close message, after which the writer task stops.
    // NOTE(review): no code visible in this file constructs this variant.
    Close,
}

impl Monitor {
    /// Creates a monitor for `endpoint`.
    ///
    /// The configuration channel is seeded with `Config::new()`; both of its
    /// halves are stored so they can later be cloned into the per-connection
    /// tasks.
    pub fn new(endpoint: Endpoint) -> Self {
        let initial = Config::new();
        let (sender, receiver) = watch::channel(initial);
        Self {
            endpoint,
            config_tx: sender,
            config_rx: receiver,
        }
    }

    pub async fn run(&self) {
        let mut retry_count = 0;

        loop {
            let endpoint = self.endpoint.clone();
            let strategy = endpoint.connection.unwrap();

            let socket = match api::connect_websocket(
                endpoint.server.as_str(),
                endpoint.secret.as_str(),
                &strategy,
            )
            .await
            {
                Some(socket) => socket,
                None => {
                    return;
                }
            };
            let (mut write, mut read) = socket.split();
            let (tx, mut rx) = mpsc::channel::<WriteMessage>(100);

            let write_task = tokio::spawn(async move {
                while let Some(msg) = rx.recv().await {
                    match msg {
                        WriteMessage::Data(data) => {
                            if let Err(e) = write.send(Message::Binary(Bytes::from(data))).await {
                                eprintln!("Write error: {}", e);
                                break;
                            }
                        }
                        WriteMessage::Pong(data) => {
                            if let Err(e) = write.send(Message::Pong(Bytes::from(data))).await {
                                eprintln!("Write error: {}", e);
                                break;
                            }
                        }
                        WriteMessage::Close => {
                            if let Err(e) = write.send(Message::Close(None)).await {
                                eprintln!("Write error: {}", e);
                            }
                            break;
                        }
                    }
                }
            });
            let send_metrics_tx = tx.clone();
            let metrics_config_rx = self.config_rx.clone();
            let send_metrics_task = tokio::spawn(async move {
                Monitor::send_metrics(send_metrics_tx, metrics_config_rx).await;
            });
            let command_handle_tx = tx.clone();
            let config_tx = self.config_tx.clone();
            let command_handle_task = tokio::spawn(async move {
                Monitor::handle_command(&endpoint, &mut read, command_handle_tx, config_tx).await
            });

            let _ = tokio::try_join!(write_task, send_metrics_task, command_handle_task);

            retry_count += 1;
            if strategy.max_retries >= 0 && retry_count > strategy.max_retries {
                let delay = strategy.base_delay * 2u64.pow(retry_count.min(16) as u32 - 1);
                let delay = delay.min(strategy.max_delay);

                debug!(
                    "Operation failed (attempt {}), retrying in {} seconds",
                    retry_count, delay
                );
                sleep(Duration::from_secs(delay)).await;
            }
        }
    }

    /// Periodically collects metrics and queues them, MessagePack-encoded, for
    /// the writer task.
    ///
    /// The reporting period is taken from `config_rx` and the timer is rebuilt
    /// whenever a new configuration is published. tokio's `interval` completes
    /// its first tick immediately, so a report is also produced right after
    /// start-up and right after each interval change.
    ///
    /// The function returns when `tx` can no longer accept messages, i.e. the
    /// writer side of the queue is gone. Serialization failures are logged and
    /// the report is skipped.
    async fn send_metrics(tx: mpsc::Sender<WriteMessage>, mut config_rx: watch::Receiver<Config>) {
        let mut metrics_interval = interval(config_rx.borrow().metrics_interval);
        let mut metrics = Metrics::new();
        // Once the configuration sender is dropped, `changed()` fails
        // immediately on every call. Polling it again would make this loop
        // spin between ticks, so the branch is disabled from then on and the
        // last known interval stays in effect.
        let mut config_open = true;

        loop {
            tokio::select! {
                result = config_rx.changed(), if config_open => {
                    match result {
                        Ok(()) => {
                            let period = config_rx.borrow().metrics_interval;
                            metrics_interval = interval(period);
                            debug!("Metrics interval updated to {:?}", period);
                        }
                        Err(_) => {
                            config_open = false;
                            debug!("Configuration channel closed, keeping current metrics interval");
                        }
                    }
                }
                _ = metrics_interval.tick() => {
                    let data = metrics.collet_metrics().await;
                    let msg = api::Message {
                        r#type: "metrics".to_string(),
                        data,
                    };
                    match rmp_serde::to_vec_named(&msg) {
                        Ok(binary_data) => {
                            if let Err(e) = tx.send(WriteMessage::Data(binary_data)).await {
                                warn!(error = %e, "Failed to report system data");
                                break;
                            }
                        }
                        Err(e) => {
                            warn!(error = %e, "Failed to serialize system data");
                        }
                    }
                }
            }
        }
    }

    async fn handle_command(
        endpoint: &Endpoint,
        read: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
        tx: mpsc::Sender<WriteMessage>,
        config_tx: watch::Sender<Config>,
    ) {
        let mut metrics = Metrics::new();

        loop {
            let msg = read.next().await;
            let Some(msg) = msg else {
                break;
            };
            let command = match msg {
                Ok(Message::Text(text)) => {
                    debug!(endpoint = %endpoint.name, message = %text, "Received WebSocket message");
                    match serde_json::from_str::<api::Message<serde_json::Value>>(&text) {
                        Ok(value) => {
                            debug!(endpoint = %endpoint.name, json = ?value, "Parsed WebSocket message");
                            Some(value)
                        }
                        Err(e) => {
                            warn!(endpoint = %endpoint.name, error = %e, "Failed to parse WebSocket message as JSON");
                            None
                        }
                    }
                }
                Ok(Message::Binary(binary)) => {
                    debug!(endpoint = %endpoint.name, binary = ?binary, "Received binary message");
                    match rmp_serde::from_slice::<api::Message<serde_json::Value>>(&binary) {
                        Ok(api_msg) => Some(api_msg),
                        Err(e) => {
                            warn!(endpoint = %endpoint.name, error = %e, "Failed to parse as api::Message");
                            None
                        }
                    }
                }
                Ok(Message::Ping(ping)) => {
                    if let Err(e) = tx.send(WriteMessage::Pong(ping)).await {
                        error!(endpoint = %endpoint.name, error = %e, "Failed to send pong response");
                    }
                    None
                }
                _ => None,
            };

            match command {
                Some(value) => match value.r#type.as_str() {
                    "get_info" => {
                        let vm_info = metrics.collect_vm_info();
                        info!(endpoint = %endpoint.name, vm_info = ?vm_info, "Sending VM info response");
                        let response = api::Message {
                            r#type: "vm_info".to_string(),
                            data: vm_info,
                        };
                        if let Ok(msgpack) = rmp_serde::to_vec_named(&response) {
                            if let Err(e) = tx.send(WriteMessage::Data(msgpack)).await {
                                warn!(endpoint = %endpoint.name, error = %e, "Failed to send VM info response");
                            }
                            info!(endpoint = %endpoint.name, "Sent VM info response");
                        }
                    }
                    "update_config" => {
                        if let Ok(probe_config) =
                            serde_json::from_value::<api::ProbeConfig>(value.data)
                        {
                            info!(endpoint = %endpoint.name, config = ?probe_config, "Received server configuration");
                            let new_config = Config {
                                metrics_interval: Duration::from_secs(
                                    probe_config.metrics_interval,
                                ),
                            };
                            if let Err(e) = new_config.validate() {
                                warn!(error = %e, "Invalid configuration received");
                                continue;
                            }
                            if let Err(e) = config_tx.send(new_config) {
                                warn!(error = %e, "Failed to update configuration");
                            } else {
                                info!("Configuration updated successfully");
                            }
                        }
                    }
                    _ => {
                        info!(endpoint = %endpoint.name, message = ?value, "Received unknown message type")
                    }
                },
                None => {}
            }
        }
    }
}