use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use anyhow::Result;
use tokio::{
sync::broadcast,
time::{MissedTickBehavior, interval},
};
use tracing::{debug, info, warn};
use crate::{
config::{self, runtime::RuntimeConfig},
gateway::live_config::detect_restart_fields,
};
/// Time between two consecutive checks of the watched config file; each tick
/// of the watcher loop re-hashes the file once.
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Notification broadcast to subscribers when the configuration changes.
///
/// In the code visible in this file, the watcher only ever emits
/// [`ConfigChange::FullReload`] and [`ConfigChange::RequiresRestart`]; the
/// other variants are declared here but not constructed here.
#[derive(Debug, Clone)]
pub enum ConfigChange {
/// An agent entry changed.
/// NOTE(review): the `String` is presumably the agent's name or id — confirm against the producer.
AgentUpdated(String),
/// A channel entry changed.
/// NOTE(review): payload presumably identifies the channel — confirm.
ChannelUpdated(String),
/// A model entry changed.
/// NOTE(review): payload presumably identifies the model — confirm.
ModelUpdated(String),
/// A skill entry changed.
/// NOTE(review): payload presumably identifies the skill — confirm.
SkillUpdated(String),
/// A plugin entry changed.
/// NOTE(review): payload presumably identifies the plugin — confirm.
PluginUpdated(String),
/// Session-related configuration changed.
SessionUpdated,
/// Cron-related configuration changed.
CronUpdated,
/// Hooks-related configuration changed.
HooksUpdated,
/// The file was re-read successfully and no restart-only field changed;
/// carries the complete newly loaded configuration.
FullReload(Arc<RuntimeConfig>),
/// The new configuration differs in fields that cannot be applied live;
/// carries the list returned by `detect_restart_fields`.
RequiresRestart(Vec<String>),
}
pub struct FileWatcher {
path: PathBuf,
last_hash: u64,
last_config: Option<RuntimeConfig>,
tx: broadcast::Sender<ConfigChange>,
}
impl FileWatcher {
pub fn new(path: PathBuf) -> (Self, broadcast::Receiver<ConfigChange>) {
let (tx, rx) = broadcast::channel(64);
let hash = hash_file(&path).unwrap_or(0);
let last_config = config::load_from(path.clone()).ok();
(
Self {
path,
last_hash: hash,
last_config,
tx,
},
rx,
)
}
pub fn subscribe(&self) -> broadcast::Receiver<ConfigChange> {
self.tx.subscribe()
}
pub async fn run(&mut self) {
let mut ticker = interval(POLL_INTERVAL);
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
info!(path = %self.path.display(), "hot-reload watcher started");
loop {
ticker.tick().await;
let new_hash = match hash_file(&self.path) {
Ok(h) => h,
Err(e) => {
warn!(path = %self.path.display(), "cannot hash config file: {e}");
continue;
}
};
if new_hash == self.last_hash {
continue;
}
self.last_hash = new_hash;
debug!(path = %self.path.display(), "config file changed");
self.process_change().await;
}
}
async fn process_change(&mut self) {
match config::load_from(self.path.clone()) {
Ok(new_cfg) => {
let restart_fields = match self.last_config {
Some(ref old) => detect_restart_fields(&old.gateway, &new_cfg.gateway),
None => vec![],
};
self.last_config = Some(new_cfg.clone());
if !restart_fields.is_empty() {
warn!(?restart_fields, "config change requires gateway restart");
let _ = self.tx.send(ConfigChange::RequiresRestart(restart_fields));
return;
}
info!("config hot-reloaded successfully");
let _ = self.tx.send(ConfigChange::FullReload(Arc::new(new_cfg)));
}
Err(e) => {
warn!("hot-reload failed (config error): {e:#}");
}
}
}
}
/// Reads the entire file at `path` and returns a 64-bit hash of its bytes.
///
/// The value is produced by `DefaultHasher` and is meant for change detection
/// only (comparing two reads of the same file); it is not a cryptographic or
/// persistable digest.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be read.
fn hash_file(path: &Path) -> Result<u64> {
    let mut state = DefaultHasher::new();
    let bytes = std::fs::read(path)?;
    Hash::hash(&bytes, &mut state);
    Ok(state.finish())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The hash captured at construction must differ from the hash of the
    /// file after its content has been rewritten.
    #[tokio::test]
    async fn detects_file_change() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg_path = tmp.path().join("test.json5");
        std::fs::write(&cfg_path, r#"{}"#).expect("write");

        let (watcher, _rx) = FileWatcher::new(cfg_path.clone());
        let before = watcher.last_hash;

        std::fs::write(&cfg_path, r#"{"agents": {}}"#).expect("modify");
        let after = hash_file(&cfg_path).expect("hash");

        assert_ne!(before, after, "file hash should change after modification");
    }

    /// Hashing an unchanged file twice yields the same value.
    #[test]
    fn hash_stable_for_same_content() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg_path = tmp.path().join("cfg.json5");
        std::fs::write(&cfg_path, r#"{"gateway": {}}"#).expect("write");

        let first = hash_file(&cfg_path).expect("h1");
        let second = hash_file(&cfg_path).expect("h2");

        assert_eq!(first, second);
    }

    /// Reprocessing a file whose content is unchanged produces a `FullReload`.
    #[tokio::test]
    async fn emits_full_reload_when_no_restart_fields_changed() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg_path = tmp.path().join("cfg.json5");
        std::fs::write(&cfg_path, r#"{}"#).expect("write initial");

        let (mut watcher, mut rx) = FileWatcher::new(cfg_path.clone());
        std::fs::write(&cfg_path, r#"{}"#).expect("write again");
        watcher.process_change().await;

        let received = rx.try_recv();
        assert!(
            matches!(received, Ok(ConfigChange::FullReload(_))),
            "expected FullReload, got {received:?}"
        );
    }

    /// Two consecutive reloads of the same file: the second one must still
    /// end in a `FullReload`.
    ///
    /// NOTE(review): the file stays `{}` throughout, so no restart-relevant
    /// field is actually changed by this test despite its name.
    #[tokio::test]
    async fn second_save_after_restart_fields_does_not_re_trigger() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let cfg_path = tmp.path().join("cfg.json5");
        std::fs::write(&cfg_path, r#"{}"#).expect("write initial");

        let (mut watcher, mut rx) = FileWatcher::new(cfg_path.clone());

        // First reload: its notification is drained and discarded.
        watcher.process_change().await;
        let _ = rx.try_recv();

        // Second reload: this is the notification under test.
        watcher.process_change().await;
        let received = rx.try_recv();
        assert!(
            matches!(received, Ok(ConfigChange::FullReload(_))),
            "expected FullReload on second save, got {received:?}"
        );
    }
}