task_graph_mcp/config/
watcher.rs1use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
11use std::path::{Path, PathBuf};
12use std::sync::mpsc;
13use std::time::Duration;
14use tokio::sync::watch;
15use tracing::{debug, error, info, warn};
16
17#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum ConfigChangeEvent {
20 ConfigYaml(PathBuf),
22 WorkflowYaml(PathBuf),
24 SkillsChanged(PathBuf),
26 BatchChange(Vec<PathBuf>),
28 Error(String),
30}
31
32impl ConfigChangeEvent {
33 pub fn requires_reload(&self) -> bool {
35 !matches!(self, ConfigChangeEvent::Error(_))
36 }
37
38 pub fn affected_paths(&self) -> Vec<&Path> {
40 match self {
41 ConfigChangeEvent::ConfigYaml(p) => vec![p.as_path()],
42 ConfigChangeEvent::WorkflowYaml(p) => vec![p.as_path()],
43 ConfigChangeEvent::SkillsChanged(p) => vec![p.as_path()],
44 ConfigChangeEvent::BatchChange(paths) => paths.iter().map(|p| p.as_path()).collect(),
45 ConfigChangeEvent::Error(_) => vec![],
46 }
47 }
48}
49
50#[derive(Debug, Clone)]
52pub struct WatcherConfig {
53 pub debounce_duration: Duration,
55 pub watch_config: bool,
57 pub watch_skills: bool,
59}
60
61impl Default for WatcherConfig {
62 fn default() -> Self {
63 Self {
64 debounce_duration: Duration::from_millis(500),
65 watch_config: true,
66 watch_skills: true,
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
73pub struct WatchPaths {
74 pub config_dir: Option<PathBuf>,
76 pub skills_dir: Option<PathBuf>,
78}
79
80pub struct ConfigWatcherHandle {
82 pub events: watch::Receiver<Option<ConfigChangeEvent>>,
85 _task_handle: tokio::task::JoinHandle<()>,
87}
88
89impl ConfigWatcherHandle {
90 pub async fn wait_for_change(&mut self) -> Option<ConfigChangeEvent> {
92 loop {
94 if self.events.changed().await.is_err() {
95 return None; }
97 let event = self.events.borrow().clone();
98 if event.is_some() {
99 return event;
100 }
101 }
102 }
103
104 pub fn has_pending_change(&self) -> bool {
106 self.events.borrow().is_some()
107 }
108
109 pub fn latest_event(&self) -> Option<ConfigChangeEvent> {
111 self.events.borrow().clone()
112 }
113}
114
115pub fn start_config_watcher(
144 paths: WatchPaths,
145 config: WatcherConfig,
146) -> Result<ConfigWatcherHandle, notify::Error> {
147 let (event_tx, event_rx) = watch::channel(None);
148 let (notify_tx, notify_rx) = mpsc::channel();
149
150 let mut debouncer = new_debouncer(config.debounce_duration, notify_tx)?;
152
153 let watcher = debouncer.watcher();
155
156 if config.watch_config {
157 if let Some(ref config_dir) = paths.config_dir {
158 if config_dir.exists() {
159 info!("Watching config directory: {}", config_dir.display());
160 watcher.watch(config_dir, notify::RecursiveMode::NonRecursive)?;
161 } else {
162 warn!(
163 "Config directory does not exist, skipping watch: {}",
164 config_dir.display()
165 );
166 }
167 }
168 }
169
170 if config.watch_skills {
171 if let Some(ref skills_dir) = paths.skills_dir {
172 if skills_dir.exists() {
173 info!("Watching skills directory: {}", skills_dir.display());
174 watcher.watch(skills_dir, notify::RecursiveMode::Recursive)?;
175 } else {
176 warn!(
177 "Skills directory does not exist, skipping watch: {}",
178 skills_dir.display()
179 );
180 }
181 }
182 }
183
184 let task_handle = tokio::task::spawn_blocking(move || {
186 let _debouncer = debouncer;
188
189 process_notify_events(notify_rx, event_tx, &paths);
191 });
192
193 Ok(ConfigWatcherHandle {
194 events: event_rx,
195 _task_handle: task_handle,
196 })
197}
198
199fn process_notify_events(
201 rx: mpsc::Receiver<Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>>,
202 tx: watch::Sender<Option<ConfigChangeEvent>>,
203 paths: &WatchPaths,
204) {
205 loop {
206 match rx.recv() {
207 Ok(Ok(events)) => {
208 let change_events = classify_events(events, paths);
209 for event in change_events {
210 debug!("Config change detected: {:?}", event);
211 if tx.send(Some(event)).is_err() {
212 info!("Config watcher receiver dropped, stopping");
214 return;
215 }
216 }
217 }
218 Ok(Err(e)) => {
219 error!("File watcher error: {}", e);
220 let _ = tx.send(Some(ConfigChangeEvent::Error(e.to_string())));
221 }
222 Err(_) => {
223 info!("Config watcher channel closed, stopping");
225 return;
226 }
227 }
228 }
229}
230
231fn classify_events(
233 events: Vec<notify_debouncer_mini::DebouncedEvent>,
234 paths: &WatchPaths,
235) -> Vec<ConfigChangeEvent> {
236 let mut result = Vec::new();
237 let mut changed_paths: Vec<PathBuf> = Vec::new();
238
239 for event in events {
240 if !matches!(
242 event.kind,
243 DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
244 ) {
245 continue;
246 }
247
248 let path = event.path;
249
250 if let Some(event) = classify_path(&path, paths) {
252 match event {
253 ConfigChangeEvent::BatchChange(mut batch_paths) => {
254 changed_paths.append(&mut batch_paths);
255 }
256 _ => {
257 if let Some(p) = event.affected_paths().first() {
259 changed_paths.push(p.to_path_buf());
260 }
261 }
262 }
263 }
264 }
265
266 if changed_paths.len() > 1 {
268 result.push(ConfigChangeEvent::BatchChange(changed_paths));
269 } else if let Some(path) = changed_paths.into_iter().next() {
270 if let Some(event) = classify_path(&path, paths) {
271 result.push(event);
272 }
273 }
274
275 result
276}
277
278fn classify_path(path: &Path, paths: &WatchPaths) -> Option<ConfigChangeEvent> {
280 let extension = path.extension().and_then(|e| e.to_str());
281 let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
282
283 if matches!(extension, Some("yaml") | Some("yml")) {
285 if file_name.starts_with("workflow") {
287 return Some(ConfigChangeEvent::WorkflowYaml(path.to_path_buf()));
288 }
289 if file_name == "config.yaml" || file_name == "prompts.yaml" {
291 return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
292 }
293 if let Some(ref config_dir) = paths.config_dir {
295 if path.starts_with(config_dir) {
296 return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
297 }
298 }
299 }
300
301 if let Some(ref skills_dir) = paths.skills_dir {
303 if path.starts_with(skills_dir) {
304 return Some(ConfigChangeEvent::SkillsChanged(path.to_path_buf()));
305 }
306 }
307
308 None
309}
310
311#[cfg(test)]
312mod tests {
313 use super::*;
314
315 #[test]
316 fn test_classify_config_yaml() {
317 let paths = WatchPaths {
318 config_dir: Some(PathBuf::from("task-graph")),
319 skills_dir: Some(PathBuf::from("task-graph/skills")),
320 };
321
322 let result = classify_path(&PathBuf::from("task-graph/config.yaml"), &paths);
323 assert!(matches!(result, Some(ConfigChangeEvent::ConfigYaml(_))));
324 }
325
326 #[test]
327 fn test_classify_workflow_yaml() {
328 let paths = WatchPaths {
329 config_dir: Some(PathBuf::from("config")),
330 skills_dir: None,
331 };
332
333 let result = classify_path(&PathBuf::from("config/workflow-swarm.yaml"), &paths);
334 assert!(matches!(result, Some(ConfigChangeEvent::WorkflowYaml(_))));
335 }
336
337 #[test]
338 fn test_classify_skills_change() {
339 let paths = WatchPaths {
340 config_dir: None,
341 skills_dir: Some(PathBuf::from("task-graph/skills")),
342 };
343
344 let result = classify_path(
345 &PathBuf::from("task-graph/skills/coordinator/SKILL.md"),
346 &paths,
347 );
348 assert!(matches!(result, Some(ConfigChangeEvent::SkillsChanged(_))));
349 }
350
351 #[test]
352 fn test_classify_unknown_file() {
353 let paths = WatchPaths {
354 config_dir: Some(PathBuf::from("config")),
355 skills_dir: None,
356 };
357
358 let result = classify_path(&PathBuf::from("src/main.rs"), &paths);
359 assert!(result.is_none());
360 }
361
362 #[test]
363 fn test_event_requires_reload() {
364 assert!(ConfigChangeEvent::ConfigYaml(PathBuf::new()).requires_reload());
365 assert!(ConfigChangeEvent::WorkflowYaml(PathBuf::new()).requires_reload());
366 assert!(ConfigChangeEvent::SkillsChanged(PathBuf::new()).requires_reload());
367 assert!(!ConfigChangeEvent::Error("test".to_string()).requires_reload());
368 }
369}