use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::time::Duration;
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use camel_api::CamelError;
use crate::context::RuntimeExecutionHandle;
use crate::hot_reload::application::{
compute_reload_actions_from_runtime_snapshot, execute_reload_actions,
};
use crate::lifecycle::application::route_definition::RouteDefinition;
pub struct ReloadWatcher;
impl ReloadWatcher {
pub async fn watch_and_reload<F>(
watch_dirs: Vec<PathBuf>,
controller: RuntimeExecutionHandle,
discover_fn: F,
shutdown: Option<CancellationToken>,
drain_timeout: Duration,
debounce: Duration,
) -> Result<(), CamelError>
where
F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
{
watch_and_reload(
watch_dirs,
controller,
discover_fn,
shutdown,
drain_timeout,
debounce,
)
.await
}
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
resolve_watch_dirs(patterns)
}
}
/// Outcome of one wait on the watcher channel.
enum WatchSignal {
    /// The shutdown token was cancelled.
    Shutdown,
    /// The watcher channel was closed.
    Closed,
    /// A message delivered by the watcher: either an event or a watcher error.
    Message(notify::Result<Event>),
}

/// Waits for the next message on `rx`, giving priority to `shutdown` (if any).
async fn next_watch_signal(
    rx: &mut mpsc::Receiver<notify::Result<Event>>,
    shutdown: Option<&CancellationToken>,
) -> WatchSignal {
    let received = match shutdown {
        Some(token) => tokio::select! {
            // `biased` polls the cancellation branch first, so a pending
            // shutdown always wins over a queued file event.
            biased;
            _ = token.cancelled() => return WatchSignal::Shutdown,
            received = rx.recv() => received,
        },
        None => rx.recv().await,
    };
    match received {
        Some(message) => WatchSignal::Message(message),
        None => WatchSignal::Closed,
    }
}

/// Watches `watch_dirs` for YAML file changes and reloads routes when one occurs.
///
/// Each cycle of the loop:
/// 1. waits for a watcher event accepted by `is_reload_event`;
/// 2. drains further watcher messages until a fixed deadline of `debounce`
///    after that first event;
/// 3. calls `discover_fn` to obtain the new route definitions, compares them
///    with the routes and source hashes reported by `controller`, and executes
///    the resulting reload actions with `drain_timeout`.
///
/// Failures in discovery, in listing runtime routes, or in individual reload
/// actions are logged as warnings and do not stop the loop.
///
/// # Returns
///
/// `Ok(())` once `shutdown` is cancelled (checked at the start of every cycle,
/// while waiting for events, and during the debounce window) or once the
/// watcher channel closes.
///
/// # Errors
///
/// Returns `CamelError::RouteError` if the file watcher cannot be created or a
/// directory in `watch_dirs` cannot be registered with it.
pub async fn watch_and_reload<F>(
    watch_dirs: Vec<PathBuf>,
    controller: RuntimeExecutionHandle,
    discover_fn: F,
    shutdown: Option<CancellationToken>,
    drain_timeout: Duration,
    debounce: Duration,
) -> Result<(), CamelError>
where
    F: Fn() -> Result<Vec<RouteDefinition>, CamelError> + Send + 'static,
{
    let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(64);
    // `watcher` must stay alive for the whole function: it owns the callback
    // (and therefore the sender) that feeds `rx`.
    let mut watcher = RecommendedWatcher::new(
        move |res| {
            // A send error means the receiver is gone; nothing left to notify.
            let _ = tx.blocking_send(res);
        },
        notify::Config::default(),
    )
    .map_err(|e| CamelError::RouteError(format!("Failed to create file watcher: {e}")))?;

    if watch_dirs.is_empty() {
        tracing::warn!("hot-reload: no directories to watch");
    }
    for dir in &watch_dirs {
        watcher
            .watch(dir, RecursiveMode::NonRecursive)
            .map_err(|e| {
                CamelError::RouteError(format!("Failed to watch directory {dir:?}: {e}"))
            })?;
        tracing::info!("hot-reload: watching {:?}", dir);
    }

    let is_cancelled = || shutdown.as_ref().map(|t| t.is_cancelled()).unwrap_or(false);

    loop {
        if is_cancelled() {
            tracing::info!("hot-reload: shutdown requested — stopping watcher");
            return Ok(());
        }

        // Phase 1: block until a reload-relevant event arrives.
        loop {
            match next_watch_signal(&mut rx, shutdown.as_ref()).await {
                WatchSignal::Shutdown => {
                    tracing::info!("hot-reload: shutdown requested — stopping watcher");
                    return Ok(());
                }
                WatchSignal::Closed => return Ok(()),
                WatchSignal::Message(Err(e)) => {
                    tracing::warn!("hot-reload: watcher error: {e}");
                }
                WatchSignal::Message(Ok(event)) => {
                    if is_reload_event(&event) {
                        break;
                    }
                }
            }
        }

        // Phase 2: debounce. Swallow everything that arrives before the fixed
        // deadline so a burst of writes results in a single reload. Shutdown is
        // honoured here too, so cancellation is not delayed by the debounce
        // window and no reload is started after it.
        let deadline = tokio::time::Instant::now() + debounce;
        loop {
            let waited =
                tokio::time::timeout_at(deadline, next_watch_signal(&mut rx, shutdown.as_ref()))
                    .await;
            match waited {
                Ok(WatchSignal::Message(_)) => continue,
                Ok(WatchSignal::Closed) => return Ok(()),
                Ok(WatchSignal::Shutdown) => {
                    tracing::info!("hot-reload: shutdown requested — stopping watcher");
                    return Ok(());
                }
                Err(_elapsed) => break,
            }
        }

        // Phase 3: discover, diff against the runtime, apply.
        tracing::info!("hot-reload: file change detected — reloading routes");
        let new_defs = match discover_fn() {
            Ok(defs) => defs,
            Err(e) => {
                tracing::warn!("hot-reload: route discovery failed: {e}");
                continue;
            }
        };
        let runtime_route_ids = match controller.runtime_route_ids().await {
            Ok(route_ids) => route_ids,
            Err(err) => {
                tracing::warn!(
                    error = %err,
                    "hot-reload: failed to list runtime routes; skipping this reload cycle"
                );
                continue;
            }
        };

        // Snapshot the source hashes up front so the diff below can use a
        // synchronous lookup closure.
        let mut runtime_hashes: std::collections::HashMap<String, u64> =
            std::collections::HashMap::new();
        for id in &runtime_route_ids {
            if let Some(hash) = controller.route_source_hash(id).await {
                runtime_hashes.insert(id.clone(), hash);
            }
        }

        let actions = compute_reload_actions_from_runtime_snapshot(
            &new_defs,
            &runtime_route_ids,
            &|route_id: &str| runtime_hashes.get(route_id).copied(),
        );
        if actions.is_empty() {
            tracing::debug!("hot-reload: no route changes detected");
            continue;
        }

        tracing::info!("hot-reload: applying {} reload action(s)", actions.len());
        let errors = execute_reload_actions(actions, new_defs, &controller, drain_timeout).await;
        for err in &errors {
            tracing::warn!(
                "hot-reload: error on route '{}' ({}): {}",
                err.route_id,
                err.action,
                err.error
            );
        }
    }
}
/// Returns `true` when `event` should trigger a route reload.
///
/// An event qualifies when its kind is a create, modify, or remove, and at
/// least one of its paths has the extension `yaml` or `yml` (compared
/// case-sensitively).
fn is_reload_event(event: &Event) -> bool {
    let relevant_kind = matches!(
        event.kind,
        EventKind::Create(_) | EventKind::Modify(_) | EventKind::Remove(_)
    );
    if !relevant_kind {
        return false;
    }

    event.paths.iter().any(|path| {
        matches!(
            path.extension().and_then(|ext| ext.to_str()),
            Some("yaml" | "yml")
        )
    })
}
/// Resolves path/glob patterns to the unique set of existing directories to watch.
///
/// For each pattern:
/// - a pattern that is itself an existing directory is used as-is;
/// - otherwise trailing components are stripped while the remaining path still
///   contains a glob metacharacter (`*`, `?`, `[`);
/// - if what remains is an existing regular file, its parent directory is used
///   instead, so that the result is always a directory and the watch keeps
///   working when the file is replaced or renamed;
/// - an empty result (e.g. for `"*.yaml"`) maps to `"."`;
/// - paths that do not exist are dropped.
///
/// The returned list contains no duplicates and is sorted, so the order is
/// deterministic.
pub fn resolve_watch_dirs(patterns: &[String]) -> Vec<PathBuf> {
    let has_glob =
        |p: &Path| p.to_string_lossy().contains(|c: char| matches!(c, '*' | '?' | '['));

    let mut dirs: HashSet<PathBuf> = HashSet::new();
    for pattern in patterns {
        let path = Path::new(pattern.as_str());
        let dir = if path.is_dir() {
            path.to_path_buf()
        } else {
            // Walk up until no glob metacharacter is left in the path.
            let mut candidate = path;
            while has_glob(candidate) {
                match candidate.parent() {
                    Some(parent) => candidate = parent,
                    None => break,
                }
            }
            // A literal file is not a directory to watch: use its parent.
            if candidate.is_file() {
                if let Some(parent) = candidate.parent() {
                    candidate = parent;
                }
            }
            candidate.to_path_buf()
        };

        if dir.as_os_str().is_empty() {
            // A bare relative pattern such as "*.yaml" lives in the current directory.
            dirs.insert(PathBuf::from("."));
        } else if dir.exists() {
            dirs.insert(dir);
        }
    }

    let mut resolved: Vec<PathBuf> = dirs.into_iter().collect();
    resolved.sort();
    resolved
}