use crate::config::Config;
use crate::error::{Error, Result};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime};
/// Notification describing what happened to the watched configuration file.
///
/// Events are delivered through the channel handed out by
/// `HotReloadConfig::with_change_notifications`. Every `timestamp` is the
/// wall-clock time (`SystemTime::now()`) at which the event was created, not
/// the file's modification time.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum ConfigChangeEvent {
/// The file was parsed successfully and the shared configuration was replaced.
Reloaded {
/// Path of the configuration file that was reloaded.
path: PathBuf,
/// When the event was created.
timestamp: SystemTime,
},
/// A reload attempt failed; the shared configuration was left untouched.
///
/// The event-driven watcher also uses this variant to report that the
/// file-system watcher itself could not be set up.
ReloadFailed {
/// Path of the configuration file involved.
path: PathBuf,
/// Human-readable description of the failure.
error: String,
/// When the event was created.
timestamp: SystemTime,
},
/// A background watcher detected a change and is about to attempt a reload.
FileModified {
/// Path of the configuration file that changed.
path: PathBuf,
/// When the event was created.
timestamp: SystemTime,
},
/// The event-driven watcher could not read the file's metadata, which it
/// reports as the file having been deleted.
FileDeleted {
/// Path of the configuration file that could not be inspected.
path: PathBuf,
/// When the event was created.
timestamp: SystemTime,
},
}
/// Debounce window installed by `HotReloadConfig::from_file`; it can be
/// overridden with `HotReloadConfig::with_debounce`.
const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(100);
/// A configuration loaded from a file that can be re-read when the file changes.
///
/// The parsed configuration lives behind an `Arc<RwLock<_>>` so that callers
/// and the background watcher thread can share it.
pub struct HotReloadConfig {
// Shared, lock-protected configuration that reloads overwrite in place.
current: Arc<RwLock<Config>>,
// File the configuration is loaded from.
file_path: PathBuf,
// Modification time recorded at construction and after each successful `reload`.
last_modified: SystemTime,
// Present only after `with_change_notifications` has been called.
event_sender: Option<Sender<ConfigChangeEvent>>,
// Sleep between checks (polling watcher) / receive timeout (event-driven watcher).
poll_interval: Duration,
// How long the event-driven watcher keeps collecting events after the first one.
debounce: Duration,
// Read only by the event-driven watcher: also check the mtime when no event arrived.
polling_fallback_enabled: bool,
}
impl HotReloadConfig {
/// Loads the configuration at `path` and prepares it for hot reloading.
///
/// The new instance uses a one-second poll interval, the default debounce
/// window, no change notifications and no polling fallback.
///
/// The file's modification time is sampled *before* the file is parsed. If
/// the file is rewritten while it is being loaded, the recorded timestamp is
/// therefore older than the file's real one and the next check reloads it.
/// Sampling after the load (as was done previously) could pair a new
/// timestamp with stale content and silently miss that change.
///
/// # Errors
///
/// Returns the error from `Config::from_file` if the file cannot be loaded,
/// or an I/O error if its modification time cannot be read.
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
    let path = path.as_ref().to_path_buf();
    let read_mtime = |p: &Path| std::fs::metadata(p).and_then(|meta| meta.modified());

    // Sample first, but do not fail yet: a load error must keep taking
    // precedence over a metadata error, exactly as before.
    let stamp_before_load = read_mtime(path.as_path());
    let config = Config::from_file(&path)?;

    // If the early sample failed but the load worked, fall back to a fresh
    // sample so that this constructor succeeds wherever it used to.
    let last_modified = stamp_before_load
        .or_else(|_| read_mtime(path.as_path()))
        .map_err(|e| Error::io(path.display().to_string(), e))?;

    Ok(Self {
        current: Arc::new(RwLock::new(config)),
        file_path: path,
        last_modified,
        event_sender: None,
        poll_interval: Duration::from_secs(1),
        debounce: DEFAULT_DEBOUNCE,
        polling_fallback_enabled: false,
    })
}
pub fn with_poll_interval(mut self, interval: Duration) -> Self {
self.poll_interval = interval;
self
}
pub fn with_debounce(mut self, debounce: Duration) -> Self {
self.debounce = debounce;
self
}
pub fn with_polling_fallback(mut self) -> Self {
self.polling_fallback_enabled = true;
self
}
pub fn with_change_notifications(mut self) -> (Self, Receiver<ConfigChangeEvent>) {
let (sender, receiver) = mpsc::channel();
self.event_sender = Some(sender);
(self, receiver)
}
/// Returns another handle to the shared, lock-protected configuration.
///
/// The returned `Arc` points at the same value that reloads overwrite, so
/// readers observe new settings after a successful reload.
pub fn config(&self) -> Arc<RwLock<Config>> {
    let shared = &self.current;
    Arc::clone(shared)
}
/// Produces an independent `Config` by parsing the backing file again.
///
/// Note that this reads from disk and does not consult the in-memory value
/// behind [`config`](Self::config); the result reflects whatever the file
/// contains at the time of the call.
///
/// # Errors
///
/// Propagates any error returned by `Config::from_file`.
pub fn snapshot(&self) -> Result<Config> {
    let source = &self.file_path;
    Config::from_file(source)
}
pub fn reload(&mut self) -> Result<bool> {
let metadata = std::fs::metadata(&self.file_path)
.map_err(|e| Error::io(self.file_path.display().to_string(), e))?;
let modified = metadata
.modified()
.map_err(|e| Error::io(self.file_path.display().to_string(), e))?;
if modified <= self.last_modified {
return Ok(false);
}
match Config::from_file(&self.file_path) {
Ok(new_config) => {
{
let mut config = self.current.write().map_err(|_| {
Error::concurrency("Failed to acquire write lock".to_string())
})?;
*config = new_config;
}
self.last_modified = modified;
if let Some(sender) = &self.event_sender {
let _ = sender.send(ConfigChangeEvent::Reloaded {
path: self.file_path.clone(),
timestamp: SystemTime::now(),
});
}
Ok(true)
}
Err(e) => {
if let Some(sender) = &self.event_sender {
let _ = sender.send(ConfigChangeEvent::ReloadFailed {
path: self.file_path.clone(),
error: e.to_string(),
timestamp: SystemTime::now(),
});
}
Err(e)
}
}
}
/// Consumes the configuration and starts a background thread that keeps the
/// shared value up to date.
///
/// With the `hot-reload` feature enabled the event-driven watcher is used;
/// without it the polling watcher is used. The returned handle stops the
/// thread when `stop` is called on it or when it is dropped.
pub fn start_watching(self) -> HotReloadHandle {
    #[cfg(feature = "hot-reload")]
    let handle = self.start_watching_event_driven();
    #[cfg(not(feature = "hot-reload"))]
    let handle = self.start_watching_polling();
    handle
}
/// Path of the file this configuration is loaded from.
pub fn file_path(&self) -> &Path {
    self.file_path.as_path()
}
/// Modification time recorded when the file was first loaded, or at the most
/// recent successful call to `reload`.
pub fn last_modified(&self) -> SystemTime {
self.last_modified
}
#[cfg(not(feature = "hot-reload"))]
fn start_watching_polling(self) -> HotReloadHandle {
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = Arc::clone(&stop);
let current = Arc::clone(&self.current);
let file_path = self.file_path.clone();
let event_sender = self.event_sender.clone();
let poll_interval = self.poll_interval;
let mut last_modified = self.last_modified;
let handle = thread::spawn(move || {
while !stop_clone.load(Ordering::Relaxed) {
if let Ok(metadata) = std::fs::metadata(&file_path) {
if let Ok(modified) = metadata.modified() {
if modified > last_modified {
if let Some(sender) = &event_sender {
let _ = sender.send(ConfigChangeEvent::FileModified {
path: file_path.clone(),
timestamp: SystemTime::now(),
});
}
match Config::from_file(&file_path) {
Ok(new_config) => {
if let Ok(mut config) = current.write() {
*config = new_config;
last_modified = modified;
if let Some(sender) = &event_sender {
let _ = sender.send(ConfigChangeEvent::Reloaded {
path: file_path.clone(),
timestamp: SystemTime::now(),
});
}
}
}
Err(e) => {
if let Some(sender) = &event_sender {
let _ = sender.send(ConfigChangeEvent::ReloadFailed {
path: file_path.clone(),
error: e.to_string(),
timestamp: SystemTime::now(),
});
}
}
}
}
}
}
thread::sleep(poll_interval);
}
});
HotReloadHandle {
handle: Some(handle),
stop,
}
}
/// Spawns a thread that reloads the shared configuration in response to
/// file-system events delivered by the `notify` crate.
///
/// The file's parent directory is watched non-recursively. After the first
/// event the worker keeps draining events for the debounce window, then
/// stats the file and reloads it if one of the events targeted it or its
/// modification time is newer than the last one loaded. It emits
/// `FileModified` followed by `Reloaded` or `ReloadFailed`, and
/// `FileDeleted` when the file's metadata cannot be read.
///
/// Polling (an mtime check every poll interval, without waiting for an
/// event) is used when it was requested via `with_polling_fallback`, and
/// also when the watcher could not be created. In that second case the event
/// channel has no sender left, so the worker must not treat the disconnect
/// as a reason to exit; otherwise the announced fallback would never run.
#[cfg(feature = "hot-reload")]
fn start_watching_event_driven(self) -> HotReloadHandle {
    use notify::{Event, RecursiveMode, Watcher};

    let HotReloadConfig {
        current,
        file_path,
        last_modified,
        event_sender,
        poll_interval,
        debounce,
        polling_fallback_enabled,
    } = self;

    let stop = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel::<Event>();

    // `Path::parent` yields `Some("")` for a bare relative file name such as
    // "app.conf"; treat that like "no parent" and watch the current directory.
    let watcher_dir = file_path
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .map(Path::to_path_buf)
        .unwrap_or_else(|| PathBuf::from("."));

    let watcher_result = notify::RecommendedWatcher::new(
        move |res: notify::Result<Event>| {
            if let Ok(event) = res {
                let _ = tx.send(event);
            }
        },
        notify::Config::default(),
    )
    .and_then(|mut w| {
        w.watch(&watcher_dir, RecursiveMode::NonRecursive)?;
        Ok(w)
    });

    let watcher = match watcher_result {
        Ok(w) => Some(w),
        Err(e) => {
            if let Some(sender) = &event_sender {
                let _ = sender.send(ConfigChangeEvent::ReloadFailed {
                    path: file_path.clone(),
                    error: format!(
                        "notify watcher construction failed: {e}; falling back to polling"
                    ),
                    timestamp: SystemTime::now(),
                });
            }
            None
        }
    };

    // Without a watcher nothing will ever arrive on `rx`, so polling is the
    // only way left to notice changes.
    let polling_fallback = polling_fallback_enabled || watcher.is_none();
    let stop_for_worker = Arc::clone(&stop);
    let mut last_modified_seen = last_modified;

    let handle = thread::spawn(move || {
        let emit = |event: ConfigChangeEvent| {
            if let Some(sender) = &event_sender {
                let _ = sender.send(event);
            }
        };

        while !stop_for_worker.load(Ordering::Relaxed) {
            let first = match rx.recv_timeout(poll_interval) {
                Ok(ev) => Some(ev),
                Err(mpsc::RecvTimeoutError::Timeout) => None,
                Err(mpsc::RecvTimeoutError::Disconnected) => {
                    if !polling_fallback {
                        break;
                    }
                    // A disconnected channel returns immediately, so pace
                    // the loop explicitly before polling.
                    thread::sleep(poll_interval);
                    None
                }
            };

            let mut relevant = false;
            if let Some(ev) = first {
                relevant |= event_targets_path(&ev, &file_path);
                // Coalesce the burst of events a single save usually produces.
                let deadline = std::time::Instant::now() + debounce;
                loop {
                    let remaining =
                        deadline.saturating_duration_since(std::time::Instant::now());
                    if remaining.is_zero() {
                        break;
                    }
                    match rx.recv_timeout(remaining) {
                        Ok(ev) => relevant |= event_targets_path(&ev, &file_path),
                        Err(_) => break,
                    }
                }
            } else if !polling_fallback {
                continue;
            }

            let modified = match std::fs::metadata(&file_path) {
                Ok(meta) => meta.modified().ok(),
                Err(_) => {
                    emit(ConfigChangeEvent::FileDeleted {
                        path: file_path.clone(),
                        timestamp: SystemTime::now(),
                    });
                    continue;
                }
            };

            // An unreadable mtime is treated as "changed" so nothing is missed.
            let is_newer = modified.map_or(true, |m| m > last_modified_seen);
            if !relevant && !is_newer {
                continue;
            }

            emit(ConfigChangeEvent::FileModified {
                path: file_path.clone(),
                timestamp: SystemTime::now(),
            });

            match Config::from_file(&file_path) {
                Ok(new_config) => {
                    // Release the write lock before notifying listeners. A
                    // poisoned lock is skipped silently, as before.
                    let swapped = match current.write() {
                        Ok(mut cfg) => {
                            *cfg = new_config;
                            true
                        }
                        Err(_) => false,
                    };
                    if swapped {
                        if let Some(m) = modified {
                            last_modified_seen = m;
                        }
                        emit(ConfigChangeEvent::Reloaded {
                            path: file_path.clone(),
                            timestamp: SystemTime::now(),
                        });
                    }
                }
                Err(e) => emit(ConfigChangeEvent::ReloadFailed {
                    path: file_path.clone(),
                    error: e.to_string(),
                    timestamp: SystemTime::now(),
                }),
            }
        }
    });

    HotReloadHandle {
        handle: Some(handle),
        stop,
        _watcher: watcher,
    }
}
}
/// Reports whether `event` is a create/modify/remove (or unspecified) event
/// that names `target` among its paths.
///
/// Paths are compared textually first and then in resolved form, so that a
/// relative or symlinked `target` still matches the path reported by the
/// watcher.
#[cfg(feature = "hot-reload")]
fn event_targets_path(event: &notify::Event, target: &Path) -> bool {
    use notify::EventKind;

    if !matches!(
        event.kind,
        EventKind::Modify(_) | EventKind::Create(_) | EventKind::Remove(_) | EventKind::Any
    ) {
        return false;
    }

    let target_resolved = comparable_path(target);
    event.paths.iter().any(|candidate| {
        if candidate == target {
            return true;
        }
        match (&target_resolved, comparable_path(candidate)) {
            (Some(wanted), Some(found)) => *wanted == found,
            _ => false,
        }
    })
}

/// Resolves `path` for comparison; if the file itself is gone, resolves its
/// parent directory and re-attaches the file name.
///
/// `canonicalize` fails for a path that no longer exists, which would
/// otherwise make every remove event for a relative or symlinked target
/// impossible to match.
#[cfg(feature = "hot-reload")]
fn comparable_path(path: &Path) -> Option<PathBuf> {
    if let Ok(resolved) = std::fs::canonicalize(path) {
        return Some(resolved);
    }
    let name = path.file_name()?;
    let dir = match path.parent() {
        Some(dir) if !dir.as_os_str().is_empty() => dir,
        // A bare file name has an empty parent; it lives in the current directory.
        _ => Path::new("."),
    };
    std::fs::canonicalize(dir).ok().map(|dir| dir.join(name))
}
/// Handle to the background watcher thread started by
/// `HotReloadConfig::start_watching`.
///
/// The thread is asked to stop and is joined when `stop` is called or when
/// the handle is dropped.
pub struct HotReloadHandle {
// Join handle of the worker; `None` once the thread has been joined.
handle: Option<thread::JoinHandle<()>>,
// Flag the worker checks at the top of every loop iteration.
stop: Arc<AtomicBool>,
// Stored so the watcher stays alive as long as the handle; `None` when it
// could not be created.
#[cfg(feature = "hot-reload")]
_watcher: Option<notify::RecommendedWatcher>,
}
impl HotReloadHandle {
pub fn stop(mut self) -> Result<()> {
self.stop.store(true, Ordering::Relaxed);
if let Some(handle) = self.handle.take() {
handle
.join()
.map_err(|_| Error::concurrency("Failed to join background thread".to_string()))?;
}
Ok(())
}
}
impl Drop for HotReloadHandle {
    /// Signals the watcher thread to stop and joins it, ignoring a join
    /// failure. After an explicit `stop` the join handle has already been
    /// taken, so only the flag is set again.
    fn drop(&mut self) {
        self.stop.store(true, Ordering::Relaxed);
        if let Some(worker) = self.handle.take() {
            // Nothing useful can be done with a join error while dropping.
            worker.join().ok();
        }
    }
}
// Tests for manual reloads, change notifications and the background watcher.
// They rely on real files in a temporary directory and on short sleeps.
#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
// Creates (or truncates) `path`, writes `body` and syncs it to disk so that
// the new content and modification time are visible to the code under test.
fn write_conf(path: &Path, body: &str) {
let mut f = File::create(path).unwrap();
f.write_all(body.as_bytes()).unwrap();
f.flush().unwrap();
f.sync_all().unwrap();
}
// A manual `reload` after rewriting the file must report `true` and expose
// the new value through the shared handle.
#[test]
fn test_hot_reload_basic() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let mut hot_config = HotReloadConfig::from_file(&config_path).unwrap();
{
let config = hot_config.config();
let config_read = config.read().unwrap();
assert_eq!(
config_read.get("key").unwrap().as_string().unwrap(),
"value1"
);
}
// NOTE(review): assumes the file system's mtime resolution is finer than
// 10 ms; on a coarser one the second write may not look newer — confirm.
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
let reloaded = hot_config.reload().unwrap();
assert!(reloaded);
{
let config = hot_config.config();
let config_read = config.read().unwrap();
assert_eq!(
config_read.get("key").unwrap().as_string().unwrap(),
"value2"
);
}
}
// A successful manual `reload` must put a `Reloaded` event for the right
// path on the notification channel.
#[test]
fn test_hot_reload_notifications() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let (mut hot_config, receiver) = HotReloadConfig::from_file(&config_path)
.unwrap()
.with_change_notifications();
// Same mtime-resolution assumption as in `test_hot_reload_basic`.
thread::sleep(Duration::from_millis(10));
write_conf(&config_path, "key=value2\n");
hot_config.reload().unwrap();
let event = receiver.try_recv().unwrap();
match event {
ConfigChangeEvent::Reloaded { path, .. } => assert_eq!(path, config_path),
_ => panic!("Expected Reloaded event"),
}
}
// The background watcher must pick up a rewrite on its own and report at
// least one `Reloaded` event.
#[test]
fn test_automatic_watching() {
let temp_dir = TempDir::new().unwrap();
let config_path = temp_dir.path().join("test.conf");
write_conf(&config_path, "key=value1\n");
let (hot_config, receiver) = HotReloadConfig::from_file(&config_path)
.unwrap()
.with_poll_interval(Duration::from_millis(50))
.with_debounce(Duration::from_millis(25))
.with_change_notifications();
let config_ref = hot_config.config();
let handle = hot_config.start_watching();
// Give the watcher thread time to start before changing the file.
thread::sleep(Duration::from_millis(100));
write_conf(&config_path, "key=value2\n");
// Wait several poll intervals so the change is noticed and reloaded.
thread::sleep(Duration::from_millis(500));
{
let config_read = config_ref.read().unwrap();
assert_eq!(
config_read.get("key").unwrap().as_string().unwrap(),
"value2"
);
}
// Drain whatever the watcher has reported so far without blocking.
let mut events = Vec::new();
while let Ok(ev) = receiver.try_recv() {
events.push(ev);
}
assert!(
!events.is_empty(),
"expected at least one ConfigChangeEvent"
);
let has_reloaded = events
.iter()
.any(|e| matches!(e, ConfigChangeEvent::Reloaded { .. }));
assert!(has_reloaded, "expected at least one Reloaded event");
handle.stop().unwrap();
}
}