use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
/// A change reported by the configuration watcher.
///
/// Every variant except [`ConfigChangeEvent::Error`] carries the path (or
/// paths) that the watcher reported as modified.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigChangeEvent {
    /// A configuration YAML file changed.
    ConfigYaml(PathBuf),
    /// A workflow YAML file changed.
    WorkflowYaml(PathBuf),
    /// A path inside the skills directory changed.
    SkillsChanged(PathBuf),
    /// More than one relevant path changed in a single notification batch.
    BatchChange(Vec<PathBuf>),
    /// The underlying file watcher reported an error; holds its message text.
    Error(String),
}

impl ConfigChangeEvent {
    /// Returns `true` for every variant that describes a file change and
    /// `false` for [`ConfigChangeEvent::Error`].
    pub fn requires_reload(&self) -> bool {
        match self {
            Self::Error(_) => false,
            Self::ConfigYaml(_)
            | Self::WorkflowYaml(_)
            | Self::SkillsChanged(_)
            | Self::BatchChange(_) => true,
        }
    }

    /// Borrows the paths carried by this event.
    ///
    /// Single-path variants yield exactly one entry, `BatchChange` yields its
    /// paths in stored order, and `Error` yields an empty vector.
    pub fn affected_paths(&self) -> Vec<&Path> {
        match self {
            Self::ConfigYaml(path) | Self::WorkflowYaml(path) | Self::SkillsChanged(path) => {
                vec![path.as_path()]
            }
            Self::BatchChange(batch) => batch.iter().map(PathBuf::as_path).collect(),
            Self::Error(_) => Vec::new(),
        }
    }
}
/// Tunables for the configuration watcher.
#[derive(Debug, Clone)]
pub struct WatcherConfig {
    /// Debounce window passed to the file-event debouncer.
    pub debounce_duration: Duration,
    /// When `true`, the config directory (if one is provided) is watched.
    pub watch_config: bool,
    /// When `true`, the skills directory (if one is provided) is watched.
    pub watch_skills: bool,
}

impl Default for WatcherConfig {
    /// Watches both directories with a 500 ms debounce window.
    fn default() -> Self {
        const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(500);

        WatcherConfig {
            watch_skills: true,
            watch_config: true,
            debounce_duration: DEFAULT_DEBOUNCE,
        }
    }
}
/// Directories the configuration watcher observes and classifies against.
///
/// Either entry may be `None`, in which case no watch is registered for it
/// and it is ignored when changed paths are classified.
#[derive(Debug, Clone)]
pub struct WatchPaths {
/// Directory holding configuration YAML files; watched non-recursively.
pub config_dir: Option<PathBuf>,
/// Root of the skills tree; watched recursively.
pub skills_dir: Option<PathBuf>,
}
/// Handle to a running configuration watcher, as returned by
/// `start_config_watcher`.
pub struct ConfigWatcherHandle {
/// Receiving side of the watch channel. The channel starts out holding
/// `None` and afterwards holds the most recently published event.
pub events: watch::Receiver<Option<ConfigChangeEvent>>,
// Join handle of the blocking task that drains file-system notifications.
// It is stored here but never read within this file.
_task_handle: tokio::task::JoinHandle<()>,
}
impl ConfigWatcherHandle {
    /// Waits until the watcher publishes a new event and returns it.
    ///
    /// Returns `None` once the sending side of the channel has gone away,
    /// i.e. the watcher task has stopped.
    pub async fn wait_for_change(&mut self) -> Option<ConfigChangeEvent> {
        loop {
            if self.events.changed().await.is_err() {
                return None;
            }
            // `borrow_and_update` marks the value we hand out as seen. With a
            // plain `borrow`, an event published between `changed()` returning
            // and this read would be returned now *and* again by the next
            // call, because it would still count as unseen.
            //
            // The clone is bound to a local so the channel's read guard is
            // released before the next `.await`.
            let latest = self.events.borrow_and_update().clone();
            if latest.is_some() {
                return latest;
            }
        }
    }

    /// Returns `true` when the channel currently holds an event.
    ///
    /// This only inspects the stored value, not whether it has already been
    /// observed, so it keeps returning `true` after an event has been read via
    /// [`wait_for_change`](Self::wait_for_change) or
    /// [`latest_event`](Self::latest_event).
    pub fn has_pending_change(&self) -> bool {
        self.events.borrow().is_some()
    }

    /// Returns a copy of the most recently published event, or `None` if
    /// nothing has been published yet. Does not mark the value as seen.
    pub fn latest_event(&self) -> Option<ConfigChangeEvent> {
        self.events.borrow().clone()
    }
}
pub fn start_config_watcher(
paths: WatchPaths,
config: WatcherConfig,
) -> Result<ConfigWatcherHandle, notify::Error> {
let (event_tx, event_rx) = watch::channel(None);
let (notify_tx, notify_rx) = mpsc::channel();
let mut debouncer = new_debouncer(config.debounce_duration, notify_tx)?;
let watcher = debouncer.watcher();
if config.watch_config
&& let Some(ref config_dir) = paths.config_dir
{
if config_dir.exists() {
info!("Watching config directory: {}", config_dir.display());
watcher.watch(config_dir, notify::RecursiveMode::NonRecursive)?;
} else {
warn!(
"Config directory does not exist, skipping watch: {}",
config_dir.display()
);
}
}
if config.watch_skills
&& let Some(ref skills_dir) = paths.skills_dir
{
if skills_dir.exists() {
info!("Watching skills directory: {}", skills_dir.display());
watcher.watch(skills_dir, notify::RecursiveMode::Recursive)?;
} else {
warn!(
"Skills directory does not exist, skipping watch: {}",
skills_dir.display()
);
}
}
let task_handle = tokio::task::spawn_blocking(move || {
let _debouncer = debouncer;
process_notify_events(notify_rx, event_tx, &paths);
});
Ok(ConfigWatcherHandle {
events: event_rx,
_task_handle: task_handle,
})
}
fn process_notify_events(
rx: mpsc::Receiver<Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>>,
tx: watch::Sender<Option<ConfigChangeEvent>>,
paths: &WatchPaths,
) {
loop {
match rx.recv() {
Ok(Ok(events)) => {
let change_events = classify_events(events, paths);
for event in change_events {
debug!("Config change detected: {:?}", event);
if tx.send(Some(event)).is_err() {
info!("Config watcher receiver dropped, stopping");
return;
}
}
}
Ok(Err(e)) => {
error!("File watcher error: {}", e);
let _ = tx.send(Some(ConfigChangeEvent::Error(e.to_string())));
}
Err(_) => {
info!("Config watcher channel closed, stopping");
return;
}
}
}
}
/// Converts one batch of debounced notifications into at most one
/// [`ConfigChangeEvent`].
///
/// Every notification whose path `classify_path` recognises contributes its
/// path(s). More than one collected path produces a single `BatchChange`;
/// exactly one path is classified again and reported as its specific variant;
/// no recognised path produces an empty vector.
fn classify_events(
    events: Vec<notify_debouncer_mini::DebouncedEvent>,
    paths: &WatchPaths,
) -> Vec<ConfigChangeEvent> {
    let mut touched: Vec<PathBuf> = Vec::new();

    for raw in events {
        let relevant = matches!(
            raw.kind,
            DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
        );
        if !relevant {
            continue;
        }

        match classify_path(&raw.path, paths) {
            // Flatten a batch, should the classifier ever produce one.
            Some(ConfigChangeEvent::BatchChange(batch)) => touched.extend(batch),
            // Otherwise keep the first (for single-path variants, the only) path.
            Some(single) => {
                touched.extend(single.affected_paths().first().map(|p| p.to_path_buf()));
            }
            None => {}
        }
    }

    match touched.len() {
        0 => Vec::new(),
        1 => touched
            .first()
            .and_then(|only| classify_path(only, paths))
            .into_iter()
            .collect(),
        _ => vec![ConfigChangeEvent::BatchChange(touched)],
    }
}
/// Maps a changed path to the event it should raise, or `None` when the path
/// is of no interest to the watcher.
///
/// Rules, in order of precedence:
///
/// 1. A `.yaml`/`.yml` file whose name starts with `workflow` is a
///    `WorkflowYaml`.
/// 2. A file named exactly `config.yaml` or `prompts.yaml` is a `ConfigYaml`.
/// 3. Any other `.yaml`/`.yml` file under `config_dir` is a `ConfigYaml`,
///    unless the path also lies under a `skills_dir` that is nested strictly
///    inside `config_dir`. The config directory is watched non-recursively,
///    so such a path belongs to the skills tree and rule 4 applies instead.
/// 4. Any path under `skills_dir` is a `SkillsChanged`.
fn classify_path(path: &Path, paths: &WatchPaths) -> Option<ConfigChangeEvent> {
    let extension = path.extension().and_then(|e| e.to_str());
    let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

    // Watched roots that actually contain `path`.
    let config_root = paths
        .config_dir
        .as_deref()
        .filter(|dir| path.starts_with(dir));
    let skills_root = paths
        .skills_dir
        .as_deref()
        .filter(|dir| path.starts_with(dir));

    if matches!(extension, Some("yaml" | "yml")) {
        if file_name.starts_with("workflow") {
            return Some(ConfigChangeEvent::WorkflowYaml(path.to_path_buf()));
        }
        if file_name == "config.yaml" || file_name == "prompts.yaml" {
            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
        }
        if let Some(config_root) = config_root {
            // When the skills tree lives inside the config directory, a YAML
            // file inside that tree must not be swallowed by the broader
            // config-directory prefix match.
            let owned_by_skills = skills_root
                .is_some_and(|skills| skills != config_root && skills.starts_with(config_root));
            if !owned_by_skills {
                return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
            }
        }
    }

    skills_root.map(|_| ConfigChangeEvent::SkillsChanged(path.to_path_buf()))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `WatchPaths` fixture from optional directory strings.
    fn watch_paths(config_dir: Option<&str>, skills_dir: Option<&str>) -> WatchPaths {
        WatchPaths {
            config_dir: config_dir.map(PathBuf::from),
            skills_dir: skills_dir.map(PathBuf::from),
        }
    }

    // `config.yaml` inside the config directory is a config change.
    #[test]
    fn test_classify_config_yaml() {
        let fixture = watch_paths(Some("task-graph"), Some("task-graph/skills"));
        let classified = classify_path(Path::new("task-graph/config.yaml"), &fixture);
        assert!(matches!(
            classified,
            Some(ConfigChangeEvent::ConfigYaml(_))
        ));
    }

    // A YAML file whose name starts with `workflow` is a workflow change.
    #[test]
    fn test_classify_workflow_yaml() {
        let fixture = watch_paths(Some("config"), None);
        let classified = classify_path(Path::new("config/workflow-swarm.yaml"), &fixture);
        assert!(matches!(
            classified,
            Some(ConfigChangeEvent::WorkflowYaml(_))
        ));
    }

    // A non-YAML file below the skills directory is a skills change.
    #[test]
    fn test_classify_skills_change() {
        let fixture = watch_paths(None, Some("task-graph/skills"));
        let classified = classify_path(
            Path::new("task-graph/skills/coordinator/SKILL.md"),
            &fixture,
        );
        assert!(matches!(
            classified,
            Some(ConfigChangeEvent::SkillsChanged(_))
        ));
    }

    // A path outside every watched directory is ignored.
    #[test]
    fn test_classify_unknown_file() {
        let fixture = watch_paths(Some("config"), None);
        let classified = classify_path(Path::new("src/main.rs"), &fixture);
        assert!(classified.is_none());
    }

    // Every file-change variant asks for a reload; an error does not.
    #[test]
    fn test_event_requires_reload() {
        let reloading = [
            ConfigChangeEvent::ConfigYaml(PathBuf::new()),
            ConfigChangeEvent::WorkflowYaml(PathBuf::new()),
            ConfigChangeEvent::SkillsChanged(PathBuf::new()),
        ];
        for event in &reloading {
            assert!(event.requires_reload());
        }
        assert!(!ConfigChangeEvent::Error("test".to_string()).requires_reload());
    }
}