use crate::config::GatewayConfig;
use crate::error::{GatewayError, Result};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
/// Debounce window, in milliseconds, applied by the watcher thread to
/// config-related filesystem notifications.
const DEBOUNCE_MS: u64 = 500;
/// Loads the gateway configuration from a main file, optionally merged with
/// fragments from a directory, and can watch both for changes.
pub struct FileWatcher {
/// Path of the main configuration file.
config_path: PathBuf,
/// Optional directory whose config files are appended to the main file.
watch_directory: Option<PathBuf>,
/// Last configuration that was read, parsed and validated successfully;
/// `None` until the first success. Shared with the watcher thread.
last_config: Arc<RwLock<Option<GatewayConfig>>>,
/// Number of successful reloads performed by the watcher thread.
reload_count: Arc<std::sync::atomic::AtomicU64>,
}
/// Outcome of one reload attempt, sent on the channel returned by
/// `FileWatcher::watch`.
#[derive(Debug, Clone)]
pub struct ReloadEvent {
/// First path reported by the triggering filesystem event, or the main
/// config path when the event carried no paths.
pub trigger_path: PathBuf,
/// The freshly loaded configuration, or the error rendered as a string.
pub config: std::result::Result<GatewayConfig, String>,
/// Monotonic timestamp recorded by the watcher thread for this reload.
pub timestamp: Instant,
}
impl FileWatcher {
pub fn new(config_path: impl AsRef<Path>) -> Self {
Self {
config_path: config_path.as_ref().to_path_buf(),
watch_directory: None,
last_config: Arc::new(RwLock::new(None)),
reload_count: Arc::new(std::sync::atomic::AtomicU64::new(0)),
}
}
pub fn with_directory(mut self, dir: impl AsRef<Path>) -> Self {
self.watch_directory = Some(dir.as_ref().to_path_buf());
self
}
pub fn config_path(&self) -> &Path {
&self.config_path
}
pub fn watch_directory(&self) -> Option<&Path> {
self.watch_directory.as_deref()
}
pub fn reload_count(&self) -> u64 {
self.reload_count.load(std::sync::atomic::Ordering::Relaxed)
}
pub fn last_config(&self) -> Option<GatewayConfig> {
self.last_config.read().unwrap().clone()
}
pub fn load_config(&self) -> Result<GatewayConfig> {
let content = read_combined_config(&self.config_path, self.watch_directory.as_deref())?;
let config = GatewayConfig::from_acl(&content)?;
config.validate()?;
let mut last = self.last_config.write().unwrap();
*last = Some(config.clone());
Ok(config)
}
pub fn watch(&self) -> Result<mpsc::Receiver<ReloadEvent>> {
let (event_tx, event_rx) = mpsc::channel();
let (notify_tx, notify_rx) = mpsc::channel();
let config_path = self.config_path.clone();
let watch_dir = self.watch_directory.clone();
let last_config = self.last_config.clone();
let reload_count = self.reload_count.clone();
let mut watcher: RecommendedWatcher = Watcher::new(notify_tx, notify::Config::default())
.map_err(|e| GatewayError::Other(format!("Failed to create file watcher: {}", e)))?;
let watch_path = config_path.parent().unwrap_or_else(|| Path::new("."));
watcher
.watch(watch_path, RecursiveMode::NonRecursive)
.map_err(|e| {
GatewayError::Other(format!("Failed to watch {}: {}", watch_path.display(), e))
})?;
if let Some(ref dir) = watch_dir {
if dir.exists() {
watcher.watch(dir, RecursiveMode::Recursive).map_err(|e| {
GatewayError::Other(format!(
"Failed to watch directory {}: {}",
dir.display(),
e
))
})?;
}
}
std::thread::spawn(move || {
let _watcher = watcher; let mut last_event_time = Instant::now();
loop {
match notify_rx.recv() {
Ok(Ok(event)) => {
if !is_relevant_config_event(&event, &config_path, watch_dir.as_deref()) {
continue;
}
let now = Instant::now();
if now.duration_since(last_event_time) < Duration::from_millis(DEBOUNCE_MS)
{
continue;
}
last_event_time = now;
let trigger_path = event
.paths
.first()
.cloned()
.unwrap_or_else(|| config_path.clone());
tracing::info!(
path = %trigger_path.display(),
"Config file change detected, reloading"
);
let content = match read_combined_config(&config_path, watch_dir.as_deref())
{
Ok(c) => c,
Err(e) => {
let _ = event_tx.send(ReloadEvent {
trigger_path,
config: Err(e.to_string()),
timestamp: now,
});
continue;
}
};
let config_result = GatewayConfig::from_acl(&content).and_then(|c| {
c.validate()?;
Ok(c)
});
match &config_result {
Ok(config) => {
let mut last = last_config.write().unwrap();
*last = Some(config.clone());
reload_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
tracing::info!("Configuration reloaded successfully");
}
Err(e) => {
tracing::error!(
error = %e,
"Config reload failed, keeping previous config"
);
}
}
let _ = event_tx.send(ReloadEvent {
trigger_path,
config: config_result.map_err(|e| e.to_string()),
timestamp: now,
});
}
Ok(Err(e)) => {
tracing::warn!(error = %e, "File watcher error");
}
Err(_) => {
break;
}
}
}
});
Ok(event_rx)
}
}
fn is_relevant_event(event: &Event) -> bool {
matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_)
)
}
/// Returns `true` when `event` is a create/modify/remove event and at least
/// one of its paths is the main config file or a config file under
/// `watch_dir`.
fn is_relevant_config_event(event: &Event, config_path: &Path, watch_dir: Option<&Path>) -> bool {
    if !is_relevant_event(event) {
        return false;
    }
    for candidate in &event.paths {
        if is_watched_config_path(candidate, config_path, watch_dir) {
            return true;
        }
    }
    false
}
/// Decides whether `path` is one of the files this watcher cares about.
///
/// A path is watched when it refers to the main config file, or when it has
/// the config extension and lies inside `watch_dir`. Directory containment is
/// first checked textually and then, if that fails, on canonicalized paths so
/// that events reported with a different spelling of the directory (absolute
/// vs. relative, or through a symlinked prefix) are still recognised — the
/// same tolerance the main file already gets from `paths_equivalent`.
fn is_watched_config_path(path: &Path, config_path: &Path, watch_dir: Option<&Path>) -> bool {
    if paths_equivalent(path, config_path) {
        return true;
    }
    if !is_config_file(path) {
        return false;
    }
    watch_dir.is_some_and(|dir| path.starts_with(dir) || parent_resolves_inside(path, dir))
}

/// Returns `true` when the canonicalized parent directory of `path` lies
/// inside the canonicalized `dir`.
///
/// The parent is resolved rather than `path` itself so that the check also
/// works for files that were just removed and can no longer be canonicalized.
fn parent_resolves_inside(path: &Path, dir: &Path) -> bool {
    let parent = match path.parent() {
        Some(parent) if !parent.as_os_str().is_empty() => parent,
        // A bare relative file name lives in the current directory.
        Some(_) => Path::new("."),
        None => return false,
    };
    match (std::fs::canonicalize(parent), std::fs::canonicalize(dir)) {
        (Ok(parent), Ok(dir)) => parent.starts_with(dir),
        _ => false,
    }
}
/// Builds the full configuration text: the main file followed by every
/// fragment found under `watch_dir`, each separated by a blank line.
///
/// # Errors
///
/// Fails if the main file does not use the `.acl` extension, or if the main
/// file, the directory, or any fragment cannot be read.
fn read_combined_config(config_path: &Path, watch_dir: Option<&Path>) -> Result<String> {
    if !is_config_file(config_path) {
        let message = "Gateway config files must use .acl extension".to_string();
        return Err(GatewayError::Config(message));
    }

    let mut combined = read_config_file(config_path)?;
    let fragments = collect_config_files(watch_dir, config_path)?;
    for fragment in &fragments {
        let text = read_config_file(fragment)?;
        combined.push_str("\n\n");
        combined.push_str(&text);
    }
    Ok(combined)
}
/// Reads `path` as UTF-8 text, turning any I/O failure into a
/// `GatewayError::Config` that names the offending file.
fn read_config_file(path: &Path) -> Result<String> {
    match std::fs::read_to_string(path) {
        Ok(text) => Ok(text),
        Err(io_err) => Err(GatewayError::Config(format!(
            "Failed to read config file {}: {}",
            path.display(),
            io_err
        ))),
    }
}
/// Lists the config fragments under `watch_dir` in sorted order, excluding
/// the main config file itself.
///
/// Returns an empty list when no directory is configured or it does not
/// exist.
fn collect_config_files(watch_dir: Option<&Path>, config_path: &Path) -> Result<Vec<PathBuf>> {
    let mut found = Vec::new();
    if let Some(dir) = watch_dir.filter(|dir| dir.exists()) {
        collect_config_files_recursive(dir, config_path, &mut found)?;
        // Sorting gives fragments a stable, name-based merge order.
        found.sort();
    }
    Ok(found)
}
/// Walks `dir` depth-first and appends every regular config file to `paths`,
/// skipping any entry that refers to the main config file.
///
/// # Errors
///
/// Fails if a directory cannot be listed or an entry cannot be inspected.
fn collect_config_files_recursive(
    dir: &Path,
    config_path: &Path,
    paths: &mut Vec<PathBuf>,
) -> Result<()> {
    let listing = match std::fs::read_dir(dir) {
        Ok(listing) => listing,
        Err(e) => {
            return Err(GatewayError::Config(format!(
                "Failed to read config directory {}: {}",
                dir.display(),
                e
            )));
        }
    };

    for item in listing {
        let item = item.map_err(|e| {
            GatewayError::Config(format!(
                "Failed to read config directory entry {}: {}",
                dir.display(),
                e
            ))
        })?;
        let candidate = item.path();
        let kind = item.file_type().map_err(|e| {
            GatewayError::Config(format!(
                "Failed to inspect config path {}: {}",
                candidate.display(),
                e
            ))
        })?;

        if kind.is_dir() {
            collect_config_files_recursive(&candidate, config_path, paths)?;
            continue;
        }

        let wanted = kind.is_file()
            && is_config_file(&candidate)
            && !paths_equivalent(&candidate, config_path);
        if wanted {
            paths.push(candidate);
        }
    }
    Ok(())
}
/// Returns `true` when both paths are equal as written, or when both can be
/// canonicalized and resolve to the same location.
///
/// If either path cannot be canonicalized (for example because it does not
/// exist) and the paths differ textually, they are treated as different.
fn paths_equivalent(left: &Path, right: &Path) -> bool {
    left == right
        || std::fs::canonicalize(left)
            .and_then(|resolved_left| {
                std::fs::canonicalize(right).map(|resolved_right| resolved_left == resolved_right)
            })
            .unwrap_or(false)
}
/// Returns `true` when `path` has the extension `acl` (compared exactly, so
/// the check is case-sensitive).
pub fn is_config_file(path: &Path) -> bool {
    path.extension().is_some_and(|ext| ext == "acl")
}
// Unit tests for the file watcher: construction and accessors, loading and
// merging of config files, extension filtering, event filtering, and the
// background watch thread.
#[cfg(test)]
mod tests {
use super::*;
// A new watcher stores the path, has no fragment directory and no reloads.
#[test]
fn test_new_file_watcher() {
let watcher = FileWatcher::new("/etc/gateway/config.acl");
assert_eq!(watcher.config_path(), Path::new("/etc/gateway/config.acl"));
assert!(watcher.watch_directory().is_none());
assert_eq!(watcher.reload_count(), 0);
}
// `with_directory` records the fragment directory.
#[test]
fn test_with_directory() {
let watcher =
FileWatcher::new("/etc/gateway/config.acl").with_directory("/etc/gateway/conf.d");
assert_eq!(
watcher.watch_directory(),
Some(Path::new("/etc/gateway/conf.d"))
);
}
// No configuration is stored before the first successful load.
#[test]
fn test_last_config_initially_none() {
let watcher = FileWatcher::new("/nonexistent.acl");
assert!(watcher.last_config().is_none());
}
// A missing main file is reported as a read failure.
#[test]
fn test_load_config_missing_file() {
let watcher = FileWatcher::new("/nonexistent/gateway.acl");
let result = watcher.load_config();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Failed to read"));
}
// A valid main file loads and is remembered as the last configuration.
#[test]
fn test_load_config_valid() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
let config = watcher.load_config().unwrap();
assert!(config.entrypoints.contains_key("web"));
assert!(watcher.last_config().is_some());
}
// Fragments in the directory are merged with the main file into one config.
#[test]
fn test_load_config_with_directory_merges_acl_fragments() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
let conf_dir = dir.path().join("conf.d");
std::fs::create_dir(&conf_dir).unwrap();
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
std::fs::write(
conf_dir.join("10-service.acl"),
r#"
services "backend" {
load_balancer {
servers {
url = "http://127.0.0.1:8001"
}
}
}
"#,
)
.unwrap();
std::fs::write(
conf_dir.join("20-router.acl"),
r#"
routers "api" {
rule = "PathPrefix(`/api`)"
service = "backend"
entrypoints = ["web"]
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path).with_directory(&conf_dir);
let config = watcher.load_config().unwrap();
assert!(config.entrypoints.contains_key("web"));
assert!(config.services.contains_key("backend"));
assert!(config.routers.contains_key("api"));
}
// Files without the `.acl` extension in the directory are not read, so the
// unparsable `notes.txt` does not break loading.
#[test]
fn test_load_config_ignores_non_acl_fragments() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
let conf_dir = dir.path().join("conf.d");
std::fs::create_dir(&conf_dir).unwrap();
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
std::fs::write(conf_dir.join("notes.txt"), "this is not acl {{{").unwrap();
let watcher = FileWatcher::new(&config_path).with_directory(&conf_dir);
let config = watcher.load_config().unwrap();
assert_eq!(config.entrypoints.len(), 1);
assert!(config.routers.is_empty());
}
// The main file itself must use the `.acl` extension.
#[test]
fn test_load_config_rejects_non_acl_main_extension() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.txt");
std::fs::write(&config_path, "entrypoints \"web\" {}").unwrap();
let watcher = FileWatcher::new(&config_path);
let result = watcher.load_config();
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains(".acl"));
}
// Unparsable content in the main file yields an error.
#[test]
fn test_load_config_invalid_acl() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
std::fs::write(&config_path, "not valid acl {{{").unwrap();
let watcher = FileWatcher::new(&config_path);
let result = watcher.load_config();
assert!(result.is_err());
}
// `last_config` goes from `None` to `Some` after a successful load.
#[test]
fn test_load_config_stores_last_good() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
assert!(watcher.last_config().is_none());
watcher.load_config().unwrap();
assert!(watcher.last_config().is_some());
}
// `.acl` is the accepted extension.
#[test]
fn test_is_config_file_acl() {
assert!(is_config_file(Path::new("gateway.acl")));
}
// `.toml` files are not config files for this watcher.
#[test]
fn test_is_config_file_toml_rejected() {
assert!(!is_config_file(Path::new("gateway.toml")));
}
// Other extensions, and files without an extension, are rejected.
#[test]
fn test_is_config_file_other() {
assert!(!is_config_file(Path::new("readme.md")));
assert!(!is_config_file(Path::new("binary.exe")));
assert!(!is_config_file(Path::new("noext")));
assert!(!is_config_file(Path::new("config.yaml")));
assert!(!is_config_file(Path::new("config.yml")));
}
// A `ReloadEvent` can carry a successfully loaded configuration.
#[test]
fn test_reload_event_success() {
let event = ReloadEvent {
trigger_path: PathBuf::from("/etc/gateway.acl"),
config: Ok(GatewayConfig::default()),
timestamp: Instant::now(),
};
assert!(event.config.is_ok());
}
// A `ReloadEvent` can carry an error message instead of a configuration.
#[test]
fn test_reload_event_failure() {
let event = ReloadEvent {
trigger_path: PathBuf::from("/etc/gateway.acl"),
config: Err("parse error".to_string()),
timestamp: Instant::now(),
};
assert!(event.config.is_err());
assert_eq!(event.config.unwrap_err(), "parse error");
}
// Starting a watch on an existing file succeeds.
#[test]
fn test_watch_creates_watcher() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
let rx = watcher.watch();
assert!(rx.is_ok());
}
// Rewrites the watched file and, if a reload event arrives within two
// seconds, checks that it carries a valid configuration.
// NOTE(review): a timeout is accepted without failing the test — presumably
// because filesystem notifications are not delivered reliably in every
// environment; confirm this leniency is intended.
#[test]
fn test_watch_detects_file_change() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
let rx = watcher.watch().unwrap();
std::thread::sleep(Duration::from_millis(100));
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:8080"
}
"#,
)
.unwrap();
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(event) => {
assert!(event.config.is_ok());
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
// Overwrites a previously loaded file with invalid content and, if a reload
// event arrives, checks that it reports an error while the last good
// configuration is still available. A timeout is tolerated here as well.
#[test]
fn test_watch_invalid_config_keeps_last_good() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
std::fs::write(
&config_path,
r#"
entrypoints "web" {
address = "0.0.0.0:80"
}
"#,
)
.unwrap();
let watcher = FileWatcher::new(&config_path);
watcher.load_config().unwrap(); let rx = watcher.watch().unwrap();
std::thread::sleep(Duration::from_millis(100));
std::fs::write(&config_path, "invalid {{{{").unwrap();
match rx.recv_timeout(Duration::from_secs(2)) {
Ok(event) => {
assert!(event.config.is_err());
assert!(watcher.last_config().is_some());
}
Err(mpsc::RecvTimeoutError::Timeout) => {
}
Err(e) => panic!("Unexpected error: {:?}", e),
}
}
// Modify and create events are relevant; access events are not.
#[test]
fn test_is_relevant_event() {
let modify = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![],
attrs: Default::default(),
};
assert!(is_relevant_event(&modify));
let create = Event {
kind: EventKind::Create(notify::event::CreateKind::File),
paths: vec![],
attrs: Default::default(),
};
assert!(is_relevant_event(&create));
let access = Event {
kind: EventKind::Access(notify::event::AccessKind::Read),
paths: vec![],
attrs: Default::default(),
};
assert!(!is_relevant_event(&access));
}
// Events for the main file and for `.acl` files inside the fragment
// directory are accepted; an unrelated file next to the main file is not.
#[test]
fn test_is_relevant_config_event_filters_paths() {
let dir = tempfile::tempdir().unwrap();
let config_path = dir.path().join("gateway.acl");
let conf_dir = dir.path().join("conf.d");
std::fs::create_dir(&conf_dir).unwrap();
std::fs::write(&config_path, "").unwrap();
let event_for_main = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![config_path.clone()],
attrs: Default::default(),
};
assert!(is_relevant_config_event(
&event_for_main,
&config_path,
Some(&conf_dir)
));
let event_for_fragment = Event {
kind: EventKind::Create(notify::event::CreateKind::File),
paths: vec![conf_dir.join("api.acl")],
attrs: Default::default(),
};
assert!(is_relevant_config_event(
&event_for_fragment,
&config_path,
Some(&conf_dir)
));
let event_for_unrelated_file = Event {
kind: EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)),
paths: vec![dir.path().join("notes.txt")],
attrs: Default::default(),
};
assert!(!is_relevant_config_event(
&event_for_unrelated_file,
&config_path,
Some(&conf_dir)
));
}
// The reload counter starts at zero.
#[test]
fn test_reload_count_initial() {
let watcher = FileWatcher::new("/tmp/test.acl");
assert_eq!(watcher.reload_count(), 0);
}
}