mod api;
mod config;
mod monitor;
use futures_util::{SinkExt, StreamExt};
use std::env;
use sysinfo::{Networks, System};
use tokio::signal;
use tokio::time::{interval, Duration};
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, info, warn};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_env_filter("info").init();
let config_path =
env::var("VMONITOR_CONFIG_PATH").unwrap_or_else(|_| "config.toml".to_string());
info!(config_path = %config_path, "Starting application");
let config = match config::AppConfig::from_file(&config_path) {
Ok(cfg) => cfg,
Err(e) => {
error!(error = %e, "Failed to load config");
std::process::exit(1);
}
};
info!("Configuration loaded");
let shutdown_signal = async {
signal::ctrl_c()
.await
.expect("Failed to listen for shutdown signal");
info!("Received shutdown signal");
};
info!("Connecting to WebSocket...");
let mut retry_count = 0;
let socket = loop {
match api::connect_websocket(&config.websocket_url, &config.auth_secret).await {
Some(socket) => break socket,
None => {
if config.connection.max_retries >= 0
&& retry_count >= config.connection.max_retries
{
error!(
"Failed to connect to WebSocket after {} attempts",
retry_count
);
return;
}
retry_count += 1;
let delay = config.connection.base_delay * 2u64.pow(retry_count.min(16) as u32 - 1);
let delay = delay.min(config.connection.max_delay);
warn!(
retry = retry_count,
next_attempt_in = delay,
"WebSocket connection failed, retrying..."
);
tokio::time::sleep(Duration::from_secs(delay)).await;
}
}
};
info!("WebSocket connection established");
let (mut write, mut read) = socket.split();
info!("Waiting for authentication confirmation...");
let auth_success = async {
while let Some(msg) = read.next().await {
match msg {
Ok(msg) => {
if let Message::Text(text) = msg {
match serde_json::from_str::<serde_json::Value>(&text) {
Ok(json) => {
if let Some(msg_type) = json.get("type").and_then(|v| v.as_str()) {
match msg_type {
"auth-success" => {
info!("Authentication successful");
return true;
}
"auth-failed" => {
warn!("Authentication failed");
return false;
}
_ => {
info!(type = %msg_type, "Received unexpected message type during auth");
}
}
}
}
Err(e) => {
warn!(error = %e, "Failed to parse WebSocket message as JSON");
}
}
}
}
Err(e) => {
error!(error = %e, "WebSocket read error during authentication");
return false;
}
}
}
false
};
if !auth_success.await {
error!("Failed to authenticate with WebSocket server");
return;
}
let mut system = System::new();
let mut networks = Networks::new();
let mut system_interval = interval(Duration::from_secs(config.interval.system));
let mut network_interval = interval(Duration::from_secs(config.interval.network));
let collect_task = async {
loop {
tokio::select! {
_ = system_interval.tick() => {
let system_data = monitor::collect_system_info(&mut system);
info!(data = ?system_data, "Collected system information");
let msg = api::ReportMessage {
r#type: "system".to_string(),
data: serde_json::to_string(&system_data).unwrap(),
};
if let Err(e) = write.send(Message::Text(serde_json::to_string(&msg).unwrap().into())).await {
warn!(error = %e, "Failed to report system data");
}
}
_ = network_interval.tick() => {
let network_data = monitor::collect_network_info(&mut networks);
info!(data = ?network_data, "Collected network information");
let msg = api::ReportMessage {
r#type: "network".to_string(),
data: serde_json::to_string(&network_data).unwrap(),
};
if let Err(e) = write.send(Message::Text(serde_json::to_string(&msg).unwrap().into())).await {
warn!(error = %e, "Failed to report network data");
}
}
Some(msg) = read.next() => {
match msg {
Ok(msg) => {
if let Message::Text(text) = msg {
info!(message = %text, "Received WebSocket message");
match serde_json::from_str::<serde_json::Value>(&text) {
Ok(json) => {
info!(json = ?json, "Parsed WebSocket message");
}
Err(e) => {
warn!(error = %e, "Failed to parse WebSocket message as JSON");
}
}
}
}
Err(e) => {
error!(error = %e, "WebSocket read error");
break; }
}
}
}
}
};
tokio::select! {
_ = shutdown_signal => {
info!("Shutting down...");
}
_ = collect_task => {
warn!("Collect task exited unexpectedly");
}
}
}