use rust_network_mgr::config::{load_config, validate_config};
use rust_network_mgr::network::NetworkMonitor;
use rust_network_mgr::nftables::NftablesManager;
use rust_network_mgr::socket::SocketHandler;
use rust_network_mgr::types::{AppConfig, AppError, ControlCommand, NetworkEvent, NetworkState, Result};
use std::collections::HashMap; use std::net::IpAddr; use std::path::Path;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::signal::unix::{signal, SignalKind};
const EVENT_CHANNEL_SIZE: usize = 100;
const COMMAND_CHANNEL_SIZE: usize = 10;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
tracing::info!("Starting Rust Network Manager...");
let app_state = Arc::new(Mutex::new(AppState::new()));
let initial_config = load_initial_config(None)?;
let (network_tx, mut network_rx) = mpsc::channel::<NetworkEvent>(EVENT_CHANNEL_SIZE);
let (control_tx, mut control_rx) = mpsc::channel::<ControlCommand>(COMMAND_CHANNEL_SIZE);
let network_monitor = NetworkMonitor::new(network_tx);
let mut nftables_manager = NftablesManager::new(initial_config.clone());
let socket_handler = SocketHandler::new(initial_config.socket_path.as_deref(), control_tx.clone()).await?;
nftables_manager.load_rules()?;
{
let state = app_state.lock().await.network_state.clone();
nftables_manager.apply_rules(&state).await.map_err(|e| {
tracing::error!("Failed to apply initial NFTables rules: {}", e);
AppError::Nftables(format!("Initial apply failed: {}", e))
})?;
}
tracing::info!("Initial setup complete.");
let monitor_handle = tokio::spawn(async move {
if let Err(e) = network_monitor.start().await {
tracing::error!("Network monitor failed: {}", e);
}
tracing::info!("Network monitor task finished.");
});
let socket_handle = tokio::spawn(async move {
socket_handler.start().await;
tracing::info!("Socket handler task finished.");
});
let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
tracing::info!("Entering main event loop...");
loop {
tokio::select! {
Some(event) = network_rx.recv() => {
tracing::debug!("Received network event: {:?}", event);
let mut state_guard = app_state.lock().await;
handle_network_event(&mut state_guard.network_state, event);
let current_state = state_guard.network_state.clone();
drop(state_guard);
if let Err(e) = nftables_manager.apply_rules(¤t_state).await {
tracing::error!("Failed to apply NFTables rules after network event: {}", e);
}
}
Some(command) = control_rx.recv() => {
tracing::info!("Received control command: {:?}", command);
match command {
ControlCommand::Reload => {
tracing::info!("Reload command received. Reloading configuration...");
match load_initial_config(None) { Ok(new_config) => {
nftables_manager = NftablesManager::new(new_config);
if let Err(e) = nftables_manager.load_rules() {
tracing::error!("Failed to load NFTables rules during reload: {}", e);
} else {
let state_guard = app_state.lock().await;
let current_state = state_guard.network_state.clone();
drop(state_guard);
if let Err(e) = nftables_manager.apply_rules(¤t_state).await {
tracing::error!("Failed to apply NFTables rules after reload: {}", e);
}
tracing::info!("Reload complete.");
}
}
Err(e) => {
tracing::error!("Failed to reload configuration: {}", e);
}
}
}
ControlCommand::Status => {
let state_guard = app_state.lock().await;
tracing::info!("Current Network State: {:?}", state_guard.network_state);
}
ControlCommand::Ping => {
tracing::debug!("Ping command processed.");
}
ControlCommand::Shutdown => {
tracing::info!("Shutdown command received. Initiating graceful shutdown...");
break; }
}
}
_ = sigint.recv() => {
tracing::info!("Received SIGINT. Initiating graceful shutdown...");
if let Err(e) = control_tx.send(ControlCommand::Shutdown).await {
tracing::error!("Failed to send shutdown command internally: {}", e);
break; }
}
_ = sigterm.recv() => {
tracing::info!("Received SIGTERM. Initiating graceful shutdown...");
if let Err(e) = control_tx.send(ControlCommand::Shutdown).await {
tracing::error!("Failed to send shutdown command internally: {}", e);
break; }
}
else => {
tracing::info!("All channels closed or signal handlers errored. Shutting down.");
break;
}
}
}
tracing::info!("Shutting down tasks...");
if let Err(e) = monitor_handle.await {
tracing::error!("Error joining monitor task: {:?}", e);
}
if let Err(e) = socket_handle.await {
tracing::error!("Error joining socket task: {:?}", e);
}
tracing::info!("Rust Network Manager shut down gracefully.");
Ok(())
}
fn load_initial_config(path: Option<&Path>) -> Result<AppConfig> {
let config = load_config(path)?;
validate_config(&config)?;
tracing::info!("Configuration loaded and validated successfully.");
Ok(config)
}
pub struct AppState {
network_state: NetworkState,
}
impl AppState {
fn new() -> Self {
AppState {
network_state: NetworkState::new(),
}
}
}
fn handle_network_event(state: &mut NetworkState, event: NetworkEvent) {
match event {
NetworkEvent::IpAdded(if_name, ip) => {
let if_name_clone = if_name.clone(); let ips = state.interface_ips.entry(if_name).or_default();
if !ips.contains(&ip) {
ips.push(ip);
tracing::debug!("State updated: Added IP {} to {}", ip, if_name_clone);
}
}
NetworkEvent::IpRemoved(if_name, ip) => {
let if_name_clone = if_name.clone(); if let Some(ips) = state.interface_ips.get_mut(&if_name) {
if let Some(pos) = ips.iter().position(|&x| x == ip) {
ips.remove(pos);
tracing::debug!("State updated: Removed IP {} from {}", ip, &if_name_clone);
if ips.is_empty() {
state.interface_ips.remove(&if_name_clone);
tracing::debug!("Removed interface {} from state as it has no IPs.", if_name_clone);
}
}
}
}
}
}