use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use arc_swap::ArcSwap;
use tokio::net::UdpSocket;
use tracing::{error, info, warn};
use crate::cache::DnsCache;
use crate::config::{self, Config};
use crate::dns::Resolver;
/// Process-wide state shared (behind an `Arc`) by the UDP server loop, the
/// config watcher and the other background tasks started from `run`.
pub struct AppState {
    /// Active configuration; replaced atomically when the config is reloaded.
    pub config: ArcSwap<Config>,
    /// DNS answer cache; the same instance is handed to `resolver` at build time.
    pub cache: Arc<DnsCache>,
    /// Resolver used to answer every incoming query.
    pub resolver: Resolver,
    /// Instant at which this state was built.
    pub start_time: std::time::Instant,
    /// Number of UDP packets handed to the packet handler so far.
    pub query_count: std::sync::atomic::AtomicU64,
    /// Path of the configuration file that is polled and reloaded.
    pub config_path: PathBuf,
    /// Modification time of `config_path` as last recorded by the watcher
    /// (`None` when it could not be read).
    pub last_mtime: std::sync::Mutex<Option<std::time::SystemTime>>,
}
/// Builds the shared [`AppState`] from a loaded configuration.
///
/// * `cfg` - configuration to install as the initial active config.
/// * `no_cache` - when `true`, the cache is created disabled regardless of
///   `cfg.server.cache_enabled`; it can never force the cache on.
/// * `config_path` - path of the config file; its current modification time
///   is recorded as the baseline for later change detection.
pub fn build_state(cfg: Config, no_cache: bool, config_path: PathBuf) -> Arc<AppState> {
    let server = &cfg.server;
    let use_cache = !no_cache && server.cache_enabled;
    let shared_cache = Arc::new(DnsCache::new(
        server.cache_size,
        server.cache_ttl,
        use_cache,
    ));
    // The resolver and the state hold handles to the very same cache instance.
    let resolver = Resolver::new(Arc::clone(&shared_cache));
    let last_mtime = std::sync::Mutex::new(mtime(&config_path));

    Arc::new(AppState {
        config: ArcSwap::new(Arc::new(cfg)),
        cache: shared_cache,
        resolver,
        start_time: std::time::Instant::now(),
        query_count: std::sync::atomic::AtomicU64::new(0),
        config_path,
        last_mtime,
    })
}
pub async fn run(cfg: Config, no_cache: bool, config_path: PathBuf) -> Result<()> {
let mgmt_enabled = cfg.server.mgmt_port > 0;
let mgmt_addr = format!("{}:{}", cfg.server.mgmt_host, cfg.server.mgmt_port);
let hot_reload = cfg.server.hot_reload;
let peers = cfg.server.peers.clone();
let bind_addr = format!("{}:{}", cfg.server.host, cfg.server.port);
let state = build_state(cfg, no_cache, config_path);
let socket = UdpSocket::bind(&bind_addr).await?;
info!("DNS listening on udp://{}", bind_addr);
let socket = Arc::new(socket);
if hot_reload {
let state2 = state.clone();
tokio::spawn(async move { watch_config(state2).await });
}
if mgmt_enabled {
let state3 = state.clone();
let mgmt_addr2 = mgmt_addr.clone();
tokio::spawn(async move {
match tokio::net::TcpListener::bind(&mgmt_addr2).await {
Ok(listener) => {
if let Err(e) = crate::mgmt::start_with_listener(state3, listener).await {
error!("Management API error: {}", e);
}
}
Err(e) => error!("Management API bind failed on {}: {}", mgmt_addr2, e),
}
});
}
if !peers.is_empty() {
let state4 = state.clone();
tokio::spawn(async move { crate::sync::reconcile_loop(state4, peers).await });
}
serve_udp(socket, state).await
}
pub async fn serve_udp(socket: Arc<UdpSocket>, state: Arc<AppState>) -> Result<()> {
let mut buf = vec![0u8; 4096];
loop {
match socket.recv_from(&mut buf).await {
Ok((n, src)) => {
let query = buf[..n].to_vec();
let sock = socket.clone();
let st = state.clone();
tokio::spawn(async move { handle_packet(query, src, sock, st).await });
}
Err(e) => {
#[cfg(windows)]
if let Some(raw) = e.raw_os_error() {
if raw == 10054 {
tracing::debug!("UDP WSAECONNRESET — ignored");
continue;
}
}
warn!("UDP recv error: {}", e);
}
}
}
}
/// Resolves a single query datagram and sends the answer back to its sender.
///
/// * `query` - raw bytes of the received datagram.
/// * `src` - address the datagram came from; the response is sent there.
/// * `socket` - the server's UDP socket, used for the reply.
/// * `state` - shared application state (config, resolver, counters).
///
/// Increments `state.query_count` for every packet. An empty response from
/// the resolver means "send nothing". Send failures are logged, not returned.
async fn handle_packet(
    query: Vec<u8>,
    src: SocketAddr,
    socket: Arc<UdpSocket>,
    state: Arc<AppState>,
) {
    state.query_count.fetch_add(1, Ordering::Relaxed);

    // Take an owned `Arc` snapshot rather than an `ArcSwap` guard: the config
    // is held across the resolver's `.await`, and arc-swap guards are meant
    // for short-lived borrows only.
    let config = state.config.load_full();
    let response = state.resolver.resolve(&query, &config).await;
    if response.is_empty() {
        return;
    }

    if let Err(e) = socket.send_to(&response, src).await {
        warn!("Send to {} failed: {}", src, e);
    }
}
/// Polls the config file every 5 seconds and hot-reloads it when its
/// modification time differs from the last recorded one.
///
/// On a successful reload the new config gets `config_version` set to the
/// active version plus one and is published to `state.config`. The cache is
/// then invalidated, the version is persisted, and the config is pushed to
/// the peers (if any) in a detached task. On a failed reload the current
/// config stays active and the new mtime is recorded so the same broken file
/// is not retried on every tick.
///
/// This function loops forever.
pub async fn watch_config(state: Arc<AppState>) {
    loop {
        tokio::time::sleep(Duration::from_secs(5)).await;

        let cur = mtime(&state.config_path);
        let changed = {
            // Scoped so the guard is released before any `.await` below.
            let known = state
                .last_mtime
                .lock()
                .expect("last_mtime mutex poisoned");
            cur != *known
        };
        if !changed {
            continue;
        }

        info!("Config changed — reloading...");
        match config::load(&state.config_path) {
            Ok(mut new_cfg) => {
                let new_version = state.config.load().server.config_version + 1;
                new_cfg.server.config_version = new_version;
                let peers = new_cfg.server.peers.clone();

                // Publish the new config BEFORE flushing the cache. With the
                // opposite order, a query racing the reload could refill the
                // freshly flushed cache with an answer from the old config.
                let new_cfg = Arc::new(new_cfg);
                state.config.store(Arc::clone(&new_cfg));
                state.cache.invalidate();
                info!("Config reloaded — config_version now {}", new_version);

                if let Err(e) = config::persist_version(&state.config_path, new_version) {
                    warn!("Could not persist config_version: {}", e);
                }
                // Re-read the mtime after persisting. NOTE(review): presumably
                // `persist_version` rewrites the config file, which would
                // otherwise look like another external change — confirm.
                *state
                    .last_mtime
                    .lock()
                    .expect("last_mtime mutex poisoned") = mtime(&state.config_path);

                if !peers.is_empty() {
                    // Push exactly the config published above, not whatever
                    // `state.config` holds by the time the task runs.
                    tokio::spawn(async move {
                        crate::sync::push_to_peers(&new_cfg, &peers).await;
                    });
                }
            }
            Err(e) => {
                *state
                    .last_mtime
                    .lock()
                    .expect("last_mtime mutex poisoned") = cur;
                warn!("Config reload failed: {} — keeping current config", e);
            }
        }
    }
}
/// Returns the last-modification time of `path`.
///
/// Yields `None` when the metadata cannot be read (for example because the
/// file does not exist) or when the modification time is unavailable.
///
/// Takes `&Path` so both `&Path` and `&PathBuf` arguments are accepted.
pub fn mtime(path: &std::path::Path) -> Option<std::time::SystemTime> {
    std::fs::metadata(path).and_then(|meta| meta.modified()).ok()
}