1use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
11use std::path::{Path, PathBuf};
12use std::sync::mpsc;
13use std::time::Duration;
14use tokio::sync::watch;
15use tracing::{debug, error, info, warn};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum ConfigChangeEvent {
20 ConfigYaml(PathBuf),
22 WorkflowYaml(PathBuf),
24 SkillsChanged(PathBuf),
26 BatchChange(Vec<PathBuf>),
28 Error(String),
30}
31
32impl ConfigChangeEvent {
33 pub fn requires_reload(&self) -> bool {
35 !matches!(self, ConfigChangeEvent::Error(_))
36 }
37
38 pub fn affected_paths(&self) -> Vec<&Path> {
40 match self {
41 ConfigChangeEvent::ConfigYaml(p) => vec![p.as_path()],
42 ConfigChangeEvent::WorkflowYaml(p) => vec![p.as_path()],
43 ConfigChangeEvent::SkillsChanged(p) => vec![p.as_path()],
44 ConfigChangeEvent::BatchChange(paths) => paths.iter().map(|p| p.as_path()).collect(),
45 ConfigChangeEvent::Error(_) => vec![],
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct WatcherConfig {
53 pub debounce_duration: Duration,
55 pub watch_config: bool,
57 pub watch_skills: bool,
59}
60
61impl Default for WatcherConfig {
62 fn default() -> Self {
63 Self {
64 debounce_duration: Duration::from_millis(500),
65 watch_config: true,
66 watch_skills: true,
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct WatchPaths {
74 pub config_dir: Option<PathBuf>,
76 pub install_dir: Option<PathBuf>,
78 pub user_dir: Option<PathBuf>,
80 pub skills_dir: Option<PathBuf>,
82}
83
84pub struct ConfigWatcherHandle {
86 pub events: watch::Receiver<Option<ConfigChangeEvent>>,
89 _task_handle: tokio::task::JoinHandle<()>,
91}
92
93impl ConfigWatcherHandle {
94 pub async fn wait_for_change(&mut self) -> Option<ConfigChangeEvent> {
96 loop {
98 if self.events.changed().await.is_err() {
99 return None; }
101 let event = self.events.borrow().clone();
102 if event.is_some() {
103 return event;
104 }
105 }
106 }
107
108 pub fn has_pending_change(&self) -> bool {
110 self.events.borrow().is_some()
111 }
112
113 pub fn latest_event(&self) -> Option<ConfigChangeEvent> {
115 self.events.borrow().clone()
116 }
117}
118
119pub fn start_config_watcher(
148 paths: WatchPaths,
149 config: WatcherConfig,
150) -> Result<ConfigWatcherHandle, notify::Error> {
151 let (event_tx, event_rx) = watch::channel(None);
152 let (notify_tx, notify_rx) = mpsc::channel();
153
154 let mut debouncer = new_debouncer(config.debounce_duration, notify_tx)?;
156
157 let watcher = debouncer.watcher();
159
160 if config.watch_config
161 && let Some(ref config_dir) = paths.config_dir
162 {
163 if config_dir.exists() {
164 info!("Watching config directory: {}", config_dir.display());
165 watcher.watch(config_dir, notify::RecursiveMode::NonRecursive)?;
166 } else {
167 warn!(
168 "Config directory does not exist, skipping watch: {}",
169 config_dir.display()
170 );
171 }
172 }
173
174 if config.watch_config
175 && let Some(ref install_dir) = paths.install_dir
176 {
177 if install_dir.exists() {
178 info!(
179 "Watching install config directory: {}",
180 install_dir.display()
181 );
182 watcher.watch(install_dir, notify::RecursiveMode::NonRecursive)?;
183 } else {
184 warn!(
185 "Install config directory does not exist, skipping watch: {}",
186 install_dir.display()
187 );
188 }
189 }
190
191 if config.watch_config
192 && let Some(ref user_dir) = paths.user_dir
193 {
194 if user_dir.exists() {
195 info!("Watching user config directory: {}", user_dir.display());
196 watcher.watch(user_dir, notify::RecursiveMode::NonRecursive)?;
197 } else {
198 debug!(
199 "User config directory does not exist, skipping watch: {}",
200 user_dir.display()
201 );
202 }
203 }
204
205 if config.watch_skills
206 && let Some(ref skills_dir) = paths.skills_dir
207 {
208 if skills_dir.exists() {
209 info!("Watching skills directory: {}", skills_dir.display());
210 watcher.watch(skills_dir, notify::RecursiveMode::Recursive)?;
211 } else {
212 warn!(
213 "Skills directory does not exist, skipping watch: {}",
214 skills_dir.display()
215 );
216 }
217 }
218
219 let task_handle = tokio::task::spawn_blocking(move || {
221 let _debouncer = debouncer;
223
224 process_notify_events(notify_rx, event_tx, &paths);
226 });
227
228 Ok(ConfigWatcherHandle {
229 events: event_rx,
230 _task_handle: task_handle,
231 })
232}
233
234fn process_notify_events(
236 rx: mpsc::Receiver<Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>>,
237 tx: watch::Sender<Option<ConfigChangeEvent>>,
238 paths: &WatchPaths,
239) {
240 loop {
241 match rx.recv() {
242 Ok(Ok(events)) => {
243 let change_events = classify_events(events, paths);
244 for event in change_events {
245 debug!("Config change detected: {:?}", event);
246 if tx.send(Some(event)).is_err() {
247 info!("Config watcher receiver dropped, stopping");
249 return;
250 }
251 }
252 }
253 Ok(Err(e)) => {
254 error!("File watcher error: {}", e);
255 let _ = tx.send(Some(ConfigChangeEvent::Error(e.to_string())));
256 }
257 Err(_) => {
258 info!("Config watcher channel closed, stopping");
260 return;
261 }
262 }
263 }
264}
265
266fn classify_events(
268 events: Vec<notify_debouncer_mini::DebouncedEvent>,
269 paths: &WatchPaths,
270) -> Vec<ConfigChangeEvent> {
271 let mut result = Vec::new();
272 let mut changed_paths: Vec<PathBuf> = Vec::new();
273
274 for event in events {
275 if !matches!(
277 event.kind,
278 DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
279 ) {
280 continue;
281 }
282
283 let path = event.path;
284
285 if let Some(event) = classify_path(&path, paths) {
287 match event {
288 ConfigChangeEvent::BatchChange(mut batch_paths) => {
289 changed_paths.append(&mut batch_paths);
290 }
291 _ => {
292 if let Some(p) = event.affected_paths().first() {
294 changed_paths.push(p.to_path_buf());
295 }
296 }
297 }
298 }
299 }
300
301 if changed_paths.len() > 1 {
303 result.push(ConfigChangeEvent::BatchChange(changed_paths));
304 } else if let Some(path) = changed_paths.into_iter().next()
305 && let Some(event) = classify_path(&path, paths)
306 {
307 result.push(event);
308 }
309
310 result
311}
312
313fn classify_path(path: &Path, paths: &WatchPaths) -> Option<ConfigChangeEvent> {
315 let extension = path.extension().and_then(|e| e.to_str());
316 let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
317
318 if matches!(extension, Some("yaml") | Some("yml")) {
320 if file_name.starts_with("workflow") || file_name.starts_with("overlay") {
322 return Some(ConfigChangeEvent::WorkflowYaml(path.to_path_buf()));
323 }
324 if file_name == "config.yaml" || file_name == "prompts.yaml" {
326 return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
327 }
328 if let Some(ref config_dir) = paths.config_dir
330 && path.starts_with(config_dir)
331 {
332 return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
333 }
334 if let Some(ref install_dir) = paths.install_dir
336 && path.starts_with(install_dir)
337 {
338 return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
339 }
340 if let Some(ref user_dir) = paths.user_dir
342 && path.starts_with(user_dir)
343 {
344 return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
345 }
346 }
347
348 if let Some(ref skills_dir) = paths.skills_dir
350 && path.starts_with(skills_dir)
351 {
352 return Some(ConfigChangeEvent::SkillsChanged(path.to_path_buf()));
353 }
354
355 None
356}
357
358#[cfg(test)]
359mod tests {
360 use super::*;
361
362 #[test]
363 fn test_classify_config_yaml() {
364 let paths = WatchPaths {
365 config_dir: Some(PathBuf::from("task-graph")),
366 install_dir: None,
367 user_dir: None,
368 skills_dir: Some(PathBuf::from("task-graph/skills")),
369 };
370
371 let result = classify_path(&PathBuf::from("task-graph/config.yaml"), &paths);
372 assert!(matches!(result, Some(ConfigChangeEvent::ConfigYaml(_))));
373 }
374
375 #[test]
376 fn test_classify_workflow_yaml() {
377 let paths = WatchPaths {
378 config_dir: Some(PathBuf::from("config")),
379 install_dir: None,
380 user_dir: None,
381 skills_dir: None,
382 };
383
384 let result = classify_path(&PathBuf::from("config/workflow-swarm.yaml"), &paths);
385 assert!(matches!(result, Some(ConfigChangeEvent::WorkflowYaml(_))));
386 }
387
388 #[test]
389 fn test_classify_skills_change() {
390 let paths = WatchPaths {
391 config_dir: None,
392 install_dir: None,
393 user_dir: None,
394 skills_dir: Some(PathBuf::from("task-graph/skills")),
395 };
396
397 let result = classify_path(
398 &PathBuf::from("task-graph/skills/coordinator/SKILL.md"),
399 &paths,
400 );
401 assert!(matches!(result, Some(ConfigChangeEvent::SkillsChanged(_))));
402 }
403
404 #[test]
405 fn test_classify_overlay_yaml() {
406 let paths = WatchPaths {
407 config_dir: Some(PathBuf::from("config")),
408 install_dir: None,
409 user_dir: None,
410 skills_dir: None,
411 };
412
413 let result = classify_path(&PathBuf::from("config/overlay-git.yaml"), &paths);
414 assert!(matches!(result, Some(ConfigChangeEvent::WorkflowYaml(_))));
415 }
416
417 #[test]
418 fn test_classify_unknown_file() {
419 let paths = WatchPaths {
420 config_dir: Some(PathBuf::from("config")),
421 install_dir: None,
422 user_dir: None,
423 skills_dir: None,
424 };
425
426 let result = classify_path(&PathBuf::from("src/main.rs"), &paths);
427 assert!(result.is_none());
428 }
429
430 #[test]
431 fn test_event_requires_reload() {
432 assert!(ConfigChangeEvent::ConfigYaml(PathBuf::new()).requires_reload());
433 assert!(ConfigChangeEvent::WorkflowYaml(PathBuf::new()).requires_reload());
434 assert!(ConfigChangeEvent::SkillsChanged(PathBuf::new()).requires_reload());
435 assert!(!ConfigChangeEvent::Error("test".to_string()).requires_reload());
436 }
437}