use crate::config::Config;
use crate::error::{Error, Result};
use arc_swap::ArcSwap;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::{Arc, RwLock, Weak};
use std::thread;
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ConfigChangeEvent {
Reloaded {
path: PathBuf,
timestamp: SystemTime,
},
ReloadFailed {
path: PathBuf,
error: String,
timestamp: SystemTime,
},
FileModified {
path: PathBuf,
timestamp: SystemTime,
},
FileDeleted {
path: PathBuf,
timestamp: SystemTime,
},
}
const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(100);
type Handler = Arc<dyn Fn(&ConfigChangeEvent) + Send + Sync + 'static>;
pub(crate) struct HandlerList {
handlers: ArcSwap<Vec<(u64, Handler)>>,
next_id: AtomicU64,
}
impl HandlerList {
fn new() -> Self {
Self {
handlers: ArcSwap::from_pointee(Vec::new()),
next_id: AtomicU64::new(0),
}
}
fn dispatch(&self, event: &ConfigChangeEvent) {
let snapshot = self.handlers.load();
for (_id, handler) in snapshot.iter() {
let h = Arc::clone(handler);
let _ = catch_unwind(AssertUnwindSafe(move || {
h(event);
}));
}
}
fn register(&self, handler: Handler) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
self.handlers.rcu(|current| {
let mut next = Vec::with_capacity(current.len() + 1);
next.extend(current.iter().cloned());
next.push((id, Arc::clone(&handler)));
next
});
id
}
fn unregister(&self, id: u64) {
self.handlers.rcu(|current| {
current
.iter()
.filter(|(other_id, _)| *other_id != id)
.cloned()
.collect::<Vec<_>>()
});
}
}
#[must_use = "dropping the Subscription immediately unregisters the handler; bind to a name (`let _sub = ...`) or call `.forget()` to keep the handler alive"]
pub struct Subscription {
list: Weak<HandlerList>,
id: u64,
}
impl Subscription {
pub fn forget(mut self) {
self.list = Weak::new();
}
}
impl Drop for Subscription {
fn drop(&mut self) {
if let Some(list) = self.list.upgrade() {
list.unregister(self.id);
}
}
}
impl std::fmt::Debug for Subscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription")
.field("id", &self.id)
.field("alive", &(self.list.strong_count() > 0))
.finish()
}
}
pub struct HotReloadConfig {
current: Arc<RwLock<Config>>,
file_path: PathBuf,
last_modified: SystemTime,
handlers: Arc<HandlerList>,
bridges: Vec<Subscription>,
poll_interval: Duration,
debounce: Duration,
polling_fallback_enabled: bool,
}
impl HotReloadConfig {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let config = Config::from_file(&path)?;
let last_modified = std::fs::metadata(&path)
.map_err(|e| Error::io(path.display().to_string(), e))?
.modified()
.map_err(|e| Error::io(path.display().to_string(), e))?;
Ok(Self {
current: Arc::new(RwLock::new(config)),
file_path: path,
last_modified,
handlers: Arc::new(HandlerList::new()),
bridges: Vec::new(),
poll_interval: Duration::from_secs(1),
debounce: DEFAULT_DEBOUNCE,
polling_fallback_enabled: false,
})
}
pub fn with_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
}
pub fn with_debounce(mut self, debounce: Duration) -> Self {
self.debounce = debounce;
self
}
pub fn with_polling_fallback(mut self) -> Self {
self.polling_fallback_enabled = true;
self
}
pub fn on_change<F>(&self, handler: F) -> Subscription
where
F: Fn(&ConfigChangeEvent) + Send + Sync + 'static,
{
let id = self.handlers.register(Arc::new(handler));
Subscription {
list: Arc::downgrade(&self.handlers),
id,
}
}
#[deprecated(
since = "1.0.0",
note = "use `on_change` for lock-free dispatch; this method continues to work but pays an mpsc allocation per event"
)]
pub fn with_change_notifications(mut self) -> (Self, Receiver<ConfigChangeEvent>) {
let (tx, rx) = mpsc::channel();
let bridge = self.on_change(move |event| {
let _ = tx.send(event.clone());
});
self.bridges.push(bridge);
(self, rx)
}
pub fn config(&self) -> Arc<RwLock<Config>> {
Arc::clone(&self.current)
}
pub fn snapshot(&self) -> Result<Config> {
Config::from_file(&self.file_path)
}
pub fn reload(&mut self) -> Result<bool> {
let metadata = std::fs::metadata(&self.file_path)
.map_err(|e| Error::io(self.file_path.display().to_string(), e))?;
let modified = metadata
.modified()
.map_err(|e| Error::io(self.file_path.display().to_string(), e))?;
if modified <= self.last_modified {
return Ok(false);
}
match Config::from_file(&self.file_path) {
Ok(new_config) => {
{
let mut config = self.current.write().map_err(|_| {
Error::concurrency("Failed to acquire write lock".to_string())
})?;
*config = new_config;
}
self.last_modified = modified;
self.handlers.dispatch(&ConfigChangeEvent::Reloaded {
path: self.file_path.clone(),
timestamp: SystemTime::now(),
});
Ok(true)
}
Err(e) => {
self.handlers.dispatch(&ConfigChangeEvent::ReloadFailed {
path: self.file_path.clone(),
error: e.to_string(),
timestamp: SystemTime::now(),
});
Err(e)
}
}
}
pub fn start_watching(self) -> HotReloadHandle {
#[cfg(feature = "hot-reload")]
{
self.start_watching_event_driven()
}
#[cfg(not(feature = "hot-reload"))]
{
self.start_watching_polling()
}
}
pub fn file_path(&self) -> &Path {
&self.file_path
}
pub fn last_modified(&self) -> SystemTime {
self.last_modified
}
#[cfg(not(feature = "hot-reload"))]
fn start_watching_polling(self) -> HotReloadHandle {
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = Arc::clone(&stop);
let current = Arc::clone(&self.current);
let file_path = self.file_path.clone();
let handlers = Arc::clone(&self.handlers);
let poll_interval = self.poll_interval;
let mut last_modified = self.last_modified;
let handle = thread::spawn(move || {
while !stop_clone.load(Ordering::Relaxed) {
if let Ok(metadata) = std::fs::metadata(&file_path) {
if let Ok(modified) = metadata.modified() {
if modified > last_modified {
handlers.dispatch(&ConfigChangeEvent::FileModified {
path: file_path.clone(),
timestamp: SystemTime::now(),
});
match Config::from_file(&file_path) {
Ok(new_config) => {
if let Ok(mut config) = current.write() {
*config = new_config;
last_modified = modified;
handlers.dispatch(&ConfigChangeEvent::Reloaded {
path: file_path.clone(),
timestamp: SystemTime::now(),
});
}
}
Err(e) => {
handlers.dispatch(&ConfigChangeEvent::ReloadFailed {
path: file_path.clone(),
error: e.to_string(),
timestamp: SystemTime::now(),
});
}
}
}
}
}
thread::sleep(poll_interval);
}
});
HotReloadHandle {
handle: Some(handle),
stop,
handlers: self.handlers,
_bridges: self.bridges,
}
}
#[cfg(feature = "hot-reload")]
fn start_watching_event_driven(self) -> HotReloadHandle {
use notify::{Event, RecursiveMode, Watcher};
let stop = Arc::new(AtomicBool::new(false));
let current = Arc::clone(&self.current);
let file_path = self.file_path.clone();
let handlers = Arc::clone(&self.handlers);
let debounce = self.debounce;
let poll_interval = self.poll_interval;
let polling_fallback = self.polling_fallback_enabled;
let initial_modified = self.last_modified;
let (tx, rx) = mpsc::channel::<Event>();
let watcher_dir = file_path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| PathBuf::from("."));
let watcher_result = notify::RecommendedWatcher::new(
move |res: notify::Result<Event>| {
if let Ok(event) = res {
let _ = tx.send(event);
}
},
notify::Config::default(),
)
.and_then(|mut w| {
w.watch(&watcher_dir, RecursiveMode::NonRecursive)?;
Ok(w)
});
let watcher = match watcher_result {
Ok(w) => Some(w),
Err(e) => {
handlers.dispatch(&ConfigChangeEvent::ReloadFailed {
path: file_path.clone(),
error: format!(
"notify watcher construction failed: {e}; falling back to polling"
),
timestamp: SystemTime::now(),
});
None
}
};
let target_file = file_path.clone();
let handlers_for_worker = Arc::clone(&handlers);
let current_for_worker = Arc::clone(¤t);
let stop_for_worker = Arc::clone(&stop);
let mut last_modified_seen = initial_modified;
let handle = thread::spawn(move || {
while !stop_for_worker.load(Ordering::Relaxed) {
let first = match rx.recv_timeout(poll_interval) {
Ok(ev) => Some(ev),
Err(mpsc::RecvTimeoutError::Timeout) => None,
Err(mpsc::RecvTimeoutError::Disconnected) => break,
};
let mut relevant = false;
if let Some(ev) = first {
relevant |= event_targets_path(&ev, &target_file);
let deadline = std::time::Instant::now() + debounce;
loop {
let remaining =
deadline.saturating_duration_since(std::time::Instant::now());
if remaining.is_zero() {
break;
}
match rx.recv_timeout(remaining) {
Ok(ev) => relevant |= event_targets_path(&ev, &target_file),
Err(_) => break,
}
}
} else if !polling_fallback {
continue;
}
let metadata = std::fs::metadata(&target_file);
match metadata {
Ok(meta) => {
let modified = meta.modified().ok();
let is_newer = match modified {
Some(m) => m > last_modified_seen,
None => true,
};
if !relevant && !is_newer {
continue;
}
handlers_for_worker.dispatch(&ConfigChangeEvent::FileModified {
path: target_file.clone(),
timestamp: SystemTime::now(),
});
match Config::from_file(&target_file) {
Ok(new_config) => {
if let Ok(mut cfg) = current_for_worker.write() {
*cfg = new_config;
if let Some(m) = modified {
last_modified_seen = m;
}
handlers_for_worker.dispatch(&ConfigChangeEvent::Reloaded {
path: target_file.clone(),
timestamp: SystemTime::now(),
});
}
}
Err(e) => {
handlers_for_worker.dispatch(&ConfigChangeEvent::ReloadFailed {
path: target_file.clone(),
error: e.to_string(),
timestamp: SystemTime::now(),
});
}
}
}
Err(_) => {
handlers_for_worker.dispatch(&ConfigChangeEvent::FileDeleted {
path: target_file.clone(),
timestamp: SystemTime::now(),
});
}
}
}
});
HotReloadHandle {
handle: Some(handle),
stop,
handlers: self.handlers,
_bridges: self.bridges,
_watcher: watcher,
}
}
}
#[cfg(feature = "hot-reload")]
fn event_targets_path(event: ¬ify::Event, target: &Path) -> bool {
use notify::EventKind;
if !matches!(
event.kind,
EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) | EventKind::Any
) {
return false;
}
let target_canon = std::fs::canonicalize(target).ok();
event.paths.iter().any(|p| {
if p == target {
return true;
}
if let (Some(tc), Ok(pc)) = (&target_canon, std::fs::canonicalize(p)) {
return *tc == pc;
}
false
})
}
pub struct HotReloadHandle {
handle: Option<thread::JoinHandle<()>>,
stop: Arc<AtomicBool>,
handlers: Arc<HandlerList>,
_bridges: Vec<Subscription>,
#[cfg(feature = "hot-reload")]
_watcher: Option<notify::RecommendedWatcher>,
}
impl HotReloadHandle {
pub fn on_change<F>(&self, handler: F) -> Subscription
where
F: Fn(&ConfigChangeEvent) + Send + Sync + 'static,
{
let id = self.handlers.register(Arc::new(handler));
Subscription {
list: Arc::downgrade(&self.handlers),
id,
}
}
pub fn stop(mut self) -> Result<()> {
self.stop.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
handle
.join()
.map_err(|_| Error::concurrency("Failed to join background thread".to_string()))?;
}
Ok(())
}
}
impl Drop for HotReloadHandle {
fn drop(&mut self) {
self.stop.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use std::io::Write;
use std::sync::atomic::AtomicUsize;
use tempfile::TempDir;
fn write_conf(path: &Path, body: &str) {
let mut f = File::create(path).unwrap();
f.write_all(body.as_bytes()).unwrap();
f.flush().unwrap();
f.sync_all().unwrap();
}
#[test]
fn test_hot_reload_basic() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot_config = HotReloadConfig::from_file(&config_path).unwrap();
{
let config = hot_config.config();
let config_read = config.read().unwrap();
assert_eq!(
config_read.get("key").unwrap().as_string().unwrap(),
"value1"
);
}
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
let reloaded = hot_config.reload().unwrap();
assert!(reloaded);
{
let config = hot_config.config();
let config_read = config.read().unwrap();
assert_eq!(
config_read.get("key").unwrap().as_string().unwrap(),
"value2"
);
}
}
#[test]
#[allow(deprecated)]
fn test_hot_reload_notifications_deprecated_bridge() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let (mut hot_config, receiver) = HotReloadConfig::from_file(&config_path)
.unwrap()
.with_change_notifications();
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot_config.reload().unwrap();
let event = receiver.try_recv().unwrap();
match event {
ConfigChangeEvent::Reloaded { path, .. } => assert_eq!(path, config_path),
_ => panic!("Expected Reloaded event"),
}
}
#[test]
fn test_on_change_single_handler() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot = HotReloadConfig::from_file(&config_path).unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let c = Arc::clone(&counter);
let _sub = hot.on_change(move |event| {
if matches!(event, ConfigChangeEvent::Reloaded { .. }) {
c.fetch_add(1, Ordering::Relaxed);
}
});
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot.reload().unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn test_on_change_multiple_handlers_fire_in_order() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot = HotReloadConfig::from_file(&config_path).unwrap();
let order: Arc<std::sync::Mutex<Vec<u8>>> = Arc::new(std::sync::Mutex::new(Vec::new()));
let o1 = Arc::clone(&order);
let _s1 = hot.on_change(move |_e| o1.lock().unwrap().push(1));
let o2 = Arc::clone(&order);
let _s2 = hot.on_change(move |_e| o2.lock().unwrap().push(2));
let o3 = Arc::clone(&order);
let _s3 = hot.on_change(move |_e| o3.lock().unwrap().push(3));
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot.reload().unwrap();
let final_order = order.lock().unwrap().clone();
assert_eq!(final_order, vec![1u8, 2, 3]);
}
#[test]
fn test_on_change_drop_unregisters() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot = HotReloadConfig::from_file(&config_path).unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let c = Arc::clone(&counter);
let sub = hot.on_change(move |_e| {
c.fetch_add(1, Ordering::Relaxed);
});
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot.reload().unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 1);
drop(sub);
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value3\n");
hot.reload().unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 1);
}
#[test]
fn test_on_change_panic_isolation() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot = HotReloadConfig::from_file(&config_path).unwrap();
let after_panic = Arc::new(AtomicUsize::new(0));
let _s_panic = hot.on_change(|_e| {
panic!("handler-side panic; should be swallowed");
});
let after = Arc::clone(&after_panic);
let _s_after = hot.on_change(move |_e| {
after.fetch_add(1, Ordering::Relaxed);
});
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot.reload().unwrap();
assert_eq!(after_panic.load(Ordering::Relaxed), 1);
}
#[test]
fn test_on_change_forget_keeps_handler_alive() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot = HotReloadConfig::from_file(&config_path).unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let c = Arc::clone(&counter);
hot.on_change(move |_e| {
c.fetch_add(1, Ordering::Relaxed);
})
.forget();
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot.reload().unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 1);
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value3\n");
hot.reload().unwrap();
assert_eq!(counter.load(Ordering::Relaxed), 2);
}
#[test]
fn test_handle_on_change_after_start_watching() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let hot = HotReloadConfig::from_file(&config_path)
.unwrap()
.with_poll_interval(Duration::from_millis(50))
.with_debounce(Duration::from_millis(25));
let handle = hot.start_watching();
let counter = Arc::new(AtomicUsize::new(0));
let c = Arc::clone(&counter);
let _sub = handle.on_change(move |_e| {
c.fetch_add(1, Ordering::Relaxed);
});
thread::sleep(Duration::from_millis(150));
write_conf(&config_path, "key=value2\n");
thread::sleep(Duration::from_millis(500));
assert!(
counter.load(Ordering::Relaxed) >= 1,
"handle.on_change handler never fired"
);
handle.stop().unwrap();
}
#[test]
fn test_automatic_watching() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let counter = Arc::new(AtomicUsize::new(0));
let hot = HotReloadConfig::from_file(&config_path)
.unwrap()
.with_poll_interval(Duration::from_millis(50))
.with_debounce(Duration::from_millis(25));
let c = Arc::clone(&counter);
let _sub = hot.on_change(move |event| {
if matches!(event, ConfigChangeEvent::Reloaded { .. }) {
c.fetch_add(1, Ordering::Relaxed);
}
});
let config_ref = hot.config();
let handle = hot.start_watching();
thread::sleep(Duration::from_millis(100));
write_conf(&config_path, "key=value2\n");
thread::sleep(Duration::from_millis(500));
{
let config_read = config_ref.read().unwrap();
assert_eq!(
config_read.get("key").unwrap().as_string().unwrap(),
"value2"
);
}
assert!(
counter.load(Ordering::Relaxed) >= 1,
"expected at least one Reloaded event"
);
handle.stop().unwrap();
}
}