use crate::config::GatewayConfig;
use crate::error::{GatewayError, Result};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
const DEBOUNCE_MS: u64 = 500;
pub struct FileWatcher {
config_path: PathBuf,
watch_directory: Option<PathBuf>,
last_config: Arc<RwLock<Option<GatewayConfig>>>,
reload_count: Arc<std::sync::atomic::AtomicU64>,
}
#[derive(Debug, Clone)]
pub struct ReloadEvent {
pub trigger_path: PathBuf,
pub config: std::result::Result<GatewayConfig, String>,
pub timestamp: Instant,
}
impl FileWatcher {
pub fn new(config_path: impl AsRef<Path>) -> Self {
Self {
config_path: config_path.as_ref().to_path_buf(),
watch_directory: None,
last_config: Arc::new(RwLock::new(None)),
reload_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn with_directory(mut self, dir: impl AsRef<Path>) -> Self {
self.watch_directory = Some(dir.as_ref().to_path_buf());
self
}
pub fn config_path(&self) -> &Path {
&self.config_path
}
pub fn watch_directory(&self) -> Option<&Path> {
self.watch_directory.as_deref()
}
pub fn reload_count(&self) -> u64 {
self.reload_count.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn last_config(&self) -> Option<GatewayConfig> {
self.last_config.read().unwrap().clone()
}
pub fn load_config(&self) -> Result<GatewayConfig> {
let content = std::fs::read_to_string(&self.config_path).map_err(|e| {
GatewayError::Config(format!(
"Failed to read config file {}: {}",
self.config_path.display(),
e
))
})?;
let config = GatewayConfig::from_hcl(&content)?;
config.validate()?;
let mut last = self.last_config.write().unwrap();
*last = Some(config.clone());
Ok(config)
}
pub fn watch(&self) -> Result<mpsc::Receiver<ReloadEvent>> {
let (event_tx, event_rx) = mpsc::channel();
let (notify_tx, notify_rx) = mpsc::channel();
let config_path = self.config_path.clone();
let watch_dir = self.watch_directory.clone();
let last_config = self.last_config.clone();
let reload_count = self.reload_count.clone();
let mut watcher: RecommendedWatcher = Watcher::new(notify_tx, notify::Config::default())
.map_err(|e| GatewayError::Other(format!("Failed to create file watcher: {}", e)))?;
let watch_path = config_path.parent().unwrap_or_else(|| Path::new("."));
watcher
.watch(watch_path, RecursiveMode::NonRecursive)
.map_err(|e| {
GatewayError::Other(format!("Failed to watch {}: {}", watch_path.display(), e))
})?;
if let Some(ref dir) = watch_dir {
if dir.exists() {
watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
GatewayError::Other(format!(
"Failed to watch directory {}: {}",
dir.display(),
e
))
})?;
}
}
std::thread::spawn(move || {
let _watcher = watcher; let mut last_event_time = Instant::now();
loop {
match notify_rx.recv() {
Ok(Ok(event)) => {
if !is_relevant_event(&event) {
continue;
}
let now = Instant::now();
if now.duration_since(last_event_time) < Duration::from_millis(DEBOUNCE_MS)
{
continue;
}
last_event_time = now;
let trigger_path = event
.paths
.first()
.cloned()
.unwrap_or_else(|| config_path.clone());
tracing::info!(
path = %trigger_path.display(),
"Config file change detected, reloading"
);
let content = match std::fs::read_to_string(&config_path) {
Ok(c) => c,
Err(e) => {
let _ = event_tx.send(ReloadEvent {
trigger_path,
config: Err(format!("Failed to read config: {}", e)),
timestamp: now,
});
continue;
}
};
let config_result = GatewayConfig::from_hcl(&content).and_then(|c| {
c.validate()?;
Ok(c)
});
match &config_result {
Ok(config) => {
let mut last = last_config.write().unwrap();
*last = Some(config.clone());
reload_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::info!("Configuration reloaded successfully");
}
Err(e) => {
tracing::error!(
error = %e,
"Config reload failed, keeping previous config"
);
}
}
let _ = event_tx.send(ReloadEvent {
trigger_path,
config: config_result.map_err(|e| e.to_string()),
timestamp: now,
});
}
Ok(Err(e)) => {
tracing::warn!(error = %e, "File watcher error");
}
Err(_) => {
break;
}
}
}
});
Ok(event_rx)
}
}
fn is_relevant_event(event: &Event) -> bool {
matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
)
}
pub fn is_config_file(path: &Path) -> bool {
path.extension().map(|ext| ext == "hcl").unwrap_or(false)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_file_watcher() {
let watcher = FileWatcher::new("/etc/gateway/config.hcl");
assert_eq!(watcher.config_path(), Path::new("/etc/gateway/config.hcl"));
assert!(watcher.watch_directory().is_none());
assert_eq!(watcher.reload_count(), 0);
}
#[test]
fn test_with_directory() {
let watcher =
FileWatcher::new("/etc/gateway/config.hcl").with_directory("/etc/gateway/conf.d");
assert_eq!(
watcher.watch_directory(),
Some(Path::new("/etc/gateway/conf.d"))
);
}
#[test]
fn test_last_config_initially_none() {
let watcher = FileWatcher::new("/nonexistent.hcl");
assert!(watcher.last_config().is_none());
}
#[test]
fn test_load_config_missing_file() {
let watcher = FileWatcher::new("/nonexistent/gateway.hcl");
let result = watcher.load_config();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Failed to read"));
}
#[test]
fn test_load_config_valid() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.hcl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
let config = watcher.load_config().unwrap();
assert!(config.entrypoints.contains_key("web"));
assert!(watcher.last_config().is_some());
}
#[test]
fn test_load_config_invalid_hcl() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.hcl");
std::fs::write(&config_path, "not valid hcl {{{").unwrap();
let watcher = FileWatcher::new(&config_path);
let result = watcher.load_config();
assert!(result.is_err());
}
#[test]
fn test_load_config_stores_last_good() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.hcl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
assert!(watcher.last_config().is_none());
watcher.load_config().unwrap();
assert!(watcher.last_config().is_some());
}
#[test]
fn test_is_config_file_hcl() {
assert!(is_config_file(Path::new("gateway.hcl")));
}
#[test]
fn test_is_config_file_toml_rejected() {
assert!(!is_config_file(Path::new("gateway.toml")));
}
#[test]
fn test_is_config_file_other() {
assert!(!is_config_file(Path::new("readme.md")));
assert!(!is_config_file(Path::new("binary.exe")));
assert!(!is_config_file(Path::new("noext")));
assert!(!is_config_file(Path::new("config.yaml")));
assert!(!is_config_file(Path::new("config.yml")));
}
#[test]
fn test_reload_event_success() {
let event = ReloadEvent {
trigger_path: PathBuf::from("/etc/gateway.hcl"),
config: Ok(GatewayConfig::default()),
timestamp: Instant::now(),
};
assert!(event.config.is_ok());
}
#[test]
fn test_reload_event_failure() {
let event = ReloadEvent {
trigger_path: PathBuf::from("/etc/gateway.hcl"),
config: Err("parse error".to_string()),
timestamp: Instant::now(),
};
assert!(event.config.is_err());
assert_eq!(event.config.unwrap_err(), "parse error");
}
#[test]
fn test_watch_creates_watcher() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.hcl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
let rx = watcher.watch();
assert!(rx.is_ok());
}
#[test]
fn test_watch_detects_file_change() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.hcl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
let rx = watcher.watch().unwrap();
std::thread::sleep(Duration::from_millis(100));
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:8080"
}
"#,
)
.unwrap();
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(event) => {
assert!(event.config.is_ok());
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
#[test]
fn test_watch_invalid_config_keeps_last_good() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.hcl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
watcher.load_config().unwrap(); let rx = watcher.watch().unwrap();
std::thread::sleep(Duration::from_millis(100));
std::fs::write(&config_path, "invalid {{{{").unwrap();
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(event) => {
assert!(event.config.is_err());
assert!(watcher.last_config().is_some());
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
#[test]
fn test_is_relevant_event() {
let modify = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![],
attrs: Default::default(),
};
assert!(is_relevant_event(&modify));
let create = Event {
kind: EventKind::Create(notify::event::CreateKind::File),
paths: vec![],
attrs: Default::default(),
};
assert!(is_relevant_event(&create));
let access = Event {
kind: EventKind::Access(notify::event::AccessKind::Read),
paths: vec![],
attrs: Default::default(),
};
assert!(!is_relevant_event(&access));
}
#[test]
fn test_reload_count_initial() {
let watcher = FileWatcher::new("/tmp/test.hcl");
assert_eq!(watcher.reload_count(), 0);
}
}