Skip to main content

task_graph_mcp/config/
watcher.rs

1//! File watcher for configuration files.
2//!
3//! Watches for changes to:
4//! - `config/*.yaml` files (workflow definitions, config)
5//! - `task-graph/skills/` directory (custom skills)
6//!
7//! Emits reload events through a tokio watch channel when changes are detected.
8//! Uses debouncing to coalesce rapid file changes.
9
10use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
11use std::path::{Path, PathBuf};
12use std::sync::mpsc;
13use std::time::Duration;
14use tokio::sync::watch;
15use tracing::{debug, error, info, warn};
16
/// A change notification produced by the configuration file watcher.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigChangeEvent {
    /// A general configuration YAML file changed (for example `config.yaml`
    /// or `prompts.yaml`).
    ConfigYaml(PathBuf),
    /// A workflow definition YAML file changed.
    WorkflowYaml(PathBuf),
    /// Something inside the skills directory changed (file added, modified,
    /// or removed).
    SkillsChanged(PathBuf),
    /// Several files changed in quick succession.
    BatchChange(Vec<PathBuf>),
    /// The watcher itself reported an error; the payload is its message.
    Error(String),
}

impl ConfigChangeEvent {
    /// Reports whether a consumer should reload configuration for this event.
    ///
    /// Every variant except [`ConfigChangeEvent::Error`] asks for a reload.
    pub fn requires_reload(&self) -> bool {
        match self {
            Self::Error(_) => false,
            Self::ConfigYaml(_)
            | Self::WorkflowYaml(_)
            | Self::SkillsChanged(_)
            | Self::BatchChange(_) => true,
        }
    }

    /// Lists the file-system paths this event refers to.
    ///
    /// Single-file variants yield exactly one path, a batch yields all of its
    /// paths in order, and an error yields none.
    pub fn affected_paths(&self) -> Vec<&Path> {
        match self {
            Self::ConfigYaml(path) | Self::WorkflowYaml(path) | Self::SkillsChanged(path) => {
                vec![path.as_path()]
            }
            Self::BatchChange(batch) => batch.iter().map(PathBuf::as_path).collect(),
            Self::Error(_) => Vec::new(),
        }
    }
}
49
/// Tunable settings for the configuration file watcher.
#[derive(Debug, Clone)]
pub struct WatcherConfig {
    /// Window used by the debouncer to merge bursts of file events.
    pub debounce_duration: Duration,
    /// Enables watching of the config YAML directories.
    pub watch_config: bool,
    /// Enables watching of the skills directory.
    pub watch_skills: bool,
}

impl Default for WatcherConfig {
    /// Watches both config and skills with a 500 ms debounce window.
    fn default() -> Self {
        let debounce_duration = Duration::from_millis(500);
        WatcherConfig {
            debounce_duration,
            watch_config: true,
            watch_skills: true,
        }
    }
}
70
/// Paths to watch for configuration changes.
///
/// Every directory is optional; a `None` entry is simply not watched. The
/// `Default` value has all four entries unset, so callers can name only the
/// directories they care about:
///
/// ```ignore
/// let paths = WatchPaths {
///     config_dir: Some(PathBuf::from("./task-graph")),
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone, Default)]
pub struct WatchPaths {
    /// Project config directory (typically `task-graph/` or `.task-graph/`)
    pub config_dir: Option<PathBuf>,
    /// Install/package config directory (typically `config/`)
    pub install_dir: Option<PathBuf>,
    /// User-level config directory (typically `~/.task-graph/`)
    pub user_dir: Option<PathBuf>,
    /// Skills directory (typically `task-graph/skills/`)
    pub skills_dir: Option<PathBuf>,
}
83
/// Handle to control the config watcher.
///
/// Bundles the receiving end of the event channel with the join handle of the
/// background task that feeds it.
pub struct ConfigWatcherHandle {
    /// Receiver for config change events.
    ///
    /// The channel starts out holding `None`; once the watcher publishes an
    /// event the channel holds the most recent one.
    /// Cloning this receiver will allow multiple consumers to receive events.
    pub events: watch::Receiver<Option<ConfigChangeEvent>>,
    /// Join handle of the blocking watcher task.
    ///
    /// NOTE(review): dropping a tokio `JoinHandle` detaches the task rather
    /// than cancelling it, so dropping this field does not by itself stop the
    /// watcher. The task ends on its own once it observes that every receiver
    /// has been dropped or that the notify channel has closed.
    _task_handle: tokio::task::JoinHandle<()>,
}
92
93impl ConfigWatcherHandle {
94    /// Wait for the next config change event.
95    pub async fn wait_for_change(&mut self) -> Option<ConfigChangeEvent> {
96        // Skip the initial None value
97        loop {
98            if self.events.changed().await.is_err() {
99                return None; // Sender dropped
100            }
101            let event = self.events.borrow().clone();
102            if event.is_some() {
103                return event;
104            }
105        }
106    }
107
108    /// Check if there's a pending change without blocking.
109    pub fn has_pending_change(&self) -> bool {
110        self.events.borrow().is_some()
111    }
112
113    /// Get the latest event without waiting.
114    pub fn latest_event(&self) -> Option<ConfigChangeEvent> {
115        self.events.borrow().clone()
116    }
117}
118
119/// Starts the configuration file watcher.
120///
121/// Returns a handle that provides:
122/// - A watch channel receiver for config change events
123/// - Automatic cleanup when dropped
124///
125/// # Arguments
126/// * `paths` - Directories to watch
127/// * `config` - Watcher configuration
128///
129/// # Example
130/// ```ignore
131/// let paths = WatchPaths {
132///     config_dir: Some(PathBuf::from("./task-graph")),
133///     skills_dir: Some(PathBuf::from("./task-graph/skills")),
134/// };
135/// let handle = start_config_watcher(paths, WatcherConfig::default())?;
136///
137/// // In an async context:
138/// tokio::spawn(async move {
139///     let mut events = handle.events;
140///     while events.changed().await.is_ok() {
141///         if let Some(event) = events.borrow().clone() {
142///             println!("Config changed: {:?}", event);
143///         }
144///     }
145/// });
146/// ```
147pub fn start_config_watcher(
148    paths: WatchPaths,
149    config: WatcherConfig,
150) -> Result<ConfigWatcherHandle, notify::Error> {
151    let (event_tx, event_rx) = watch::channel(None);
152    let (notify_tx, notify_rx) = mpsc::channel();
153
154    // Create the debounced watcher
155    let mut debouncer = new_debouncer(config.debounce_duration, notify_tx)?;
156
157    // Set up watches for each configured path
158    let watcher = debouncer.watcher();
159
160    if config.watch_config
161        && let Some(ref config_dir) = paths.config_dir
162    {
163        if config_dir.exists() {
164            info!("Watching config directory: {}", config_dir.display());
165            watcher.watch(config_dir, notify::RecursiveMode::NonRecursive)?;
166        } else {
167            warn!(
168                "Config directory does not exist, skipping watch: {}",
169                config_dir.display()
170            );
171        }
172    }
173
174    if config.watch_config
175        && let Some(ref install_dir) = paths.install_dir
176    {
177        if install_dir.exists() {
178            info!(
179                "Watching install config directory: {}",
180                install_dir.display()
181            );
182            watcher.watch(install_dir, notify::RecursiveMode::NonRecursive)?;
183        } else {
184            warn!(
185                "Install config directory does not exist, skipping watch: {}",
186                install_dir.display()
187            );
188        }
189    }
190
191    if config.watch_config
192        && let Some(ref user_dir) = paths.user_dir
193    {
194        if user_dir.exists() {
195            info!("Watching user config directory: {}", user_dir.display());
196            watcher.watch(user_dir, notify::RecursiveMode::NonRecursive)?;
197        } else {
198            debug!(
199                "User config directory does not exist, skipping watch: {}",
200                user_dir.display()
201            );
202        }
203    }
204
205    if config.watch_skills
206        && let Some(ref skills_dir) = paths.skills_dir
207    {
208        if skills_dir.exists() {
209            info!("Watching skills directory: {}", skills_dir.display());
210            watcher.watch(skills_dir, notify::RecursiveMode::Recursive)?;
211        } else {
212            warn!(
213                "Skills directory does not exist, skipping watch: {}",
214                skills_dir.display()
215            );
216        }
217    }
218
219    // Spawn the event processing task
220    let task_handle = tokio::task::spawn_blocking(move || {
221        // Keep the debouncer alive
222        let _debouncer = debouncer;
223
224        // Process events from the notify channel
225        process_notify_events(notify_rx, event_tx, &paths);
226    });
227
228    Ok(ConfigWatcherHandle {
229        events: event_rx,
230        _task_handle: task_handle,
231    })
232}
233
234/// Process events from the notify debouncer and convert to ConfigChangeEvents.
235fn process_notify_events(
236    rx: mpsc::Receiver<Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>>,
237    tx: watch::Sender<Option<ConfigChangeEvent>>,
238    paths: &WatchPaths,
239) {
240    loop {
241        match rx.recv() {
242            Ok(Ok(events)) => {
243                let change_events = classify_events(events, paths);
244                for event in change_events {
245                    debug!("Config change detected: {:?}", event);
246                    if tx.send(Some(event)).is_err() {
247                        // Receiver dropped, exit
248                        info!("Config watcher receiver dropped, stopping");
249                        return;
250                    }
251                }
252            }
253            Ok(Err(e)) => {
254                error!("File watcher error: {}", e);
255                let _ = tx.send(Some(ConfigChangeEvent::Error(e.to_string())));
256            }
257            Err(_) => {
258                // Channel closed, exit
259                info!("Config watcher channel closed, stopping");
260                return;
261            }
262        }
263    }
264}
265
266/// Classify debounced events into ConfigChangeEvents.
267fn classify_events(
268    events: Vec<notify_debouncer_mini::DebouncedEvent>,
269    paths: &WatchPaths,
270) -> Vec<ConfigChangeEvent> {
271    let mut result = Vec::new();
272    let mut changed_paths: Vec<PathBuf> = Vec::new();
273
274    for event in events {
275        // Only process data change events (not just any access)
276        if !matches!(
277            event.kind,
278            DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
279        ) {
280            continue;
281        }
282
283        let path = event.path;
284
285        // Classify the path
286        if let Some(event) = classify_path(&path, paths) {
287            match event {
288                ConfigChangeEvent::BatchChange(mut batch_paths) => {
289                    changed_paths.append(&mut batch_paths);
290                }
291                _ => {
292                    // For non-batch events, check if we should batch them
293                    if let Some(p) = event.affected_paths().first() {
294                        changed_paths.push(p.to_path_buf());
295                    }
296                }
297            }
298        }
299    }
300
301    // If we have multiple paths, create a batch event
302    if changed_paths.len() > 1 {
303        result.push(ConfigChangeEvent::BatchChange(changed_paths));
304    } else if let Some(path) = changed_paths.into_iter().next()
305        && let Some(event) = classify_path(&path, paths)
306    {
307        result.push(event);
308    }
309
310    result
311}
312
313/// Classify a single path into a ConfigChangeEvent.
314fn classify_path(path: &Path, paths: &WatchPaths) -> Option<ConfigChangeEvent> {
315    let extension = path.extension().and_then(|e| e.to_str());
316    let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
317
318    // Check if it's a YAML file
319    if matches!(extension, Some("yaml") | Some("yml")) {
320        // Check if it's a workflow or overlay file
321        if file_name.starts_with("workflow") || file_name.starts_with("overlay") {
322            return Some(ConfigChangeEvent::WorkflowYaml(path.to_path_buf()));
323        }
324        // Check if it's config.yaml or prompts.yaml
325        if file_name == "config.yaml" || file_name == "prompts.yaml" {
326            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
327        }
328        // Other YAML files in config dir are treated as config
329        if let Some(ref config_dir) = paths.config_dir
330            && path.starts_with(config_dir)
331        {
332            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
333        }
334        // YAML files in install dir are also config
335        if let Some(ref install_dir) = paths.install_dir
336            && path.starts_with(install_dir)
337        {
338            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
339        }
340        // YAML files in user dir are also config
341        if let Some(ref user_dir) = paths.user_dir
342            && path.starts_with(user_dir)
343        {
344            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
345        }
346    }
347
348    // Check if it's in the skills directory
349    if let Some(ref skills_dir) = paths.skills_dir
350        && path.starts_with(skills_dir)
351    {
352        return Some(ConfigChangeEvent::SkillsChanged(path.to_path_buf()));
353    }
354
355    None
356}
357
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `WatchPaths` with only the project config and skills
    /// directories optionally set; install and user directories stay unset.
    fn watch_paths(config_dir: Option<&str>, skills_dir: Option<&str>) -> WatchPaths {
        WatchPaths {
            config_dir: config_dir.map(PathBuf::from),
            install_dir: None,
            user_dir: None,
            skills_dir: skills_dir.map(PathBuf::from),
        }
    }

    /// `config.yaml` in the project config directory is a config change.
    #[test]
    fn test_classify_config_yaml() {
        let dirs = watch_paths(Some("task-graph"), Some("task-graph/skills"));
        let classified = classify_path(Path::new("task-graph/config.yaml"), &dirs);
        assert!(matches!(classified, Some(ConfigChangeEvent::ConfigYaml(_))));
    }

    /// Files named `workflow-*.yaml` are workflow changes.
    #[test]
    fn test_classify_workflow_yaml() {
        let dirs = watch_paths(Some("config"), None);
        let classified = classify_path(Path::new("config/workflow-swarm.yaml"), &dirs);
        assert!(matches!(classified, Some(ConfigChangeEvent::WorkflowYaml(_))));
    }

    /// Non-YAML files below the skills directory are skills changes.
    #[test]
    fn test_classify_skills_change() {
        let dirs = watch_paths(None, Some("task-graph/skills"));
        let classified = classify_path(
            Path::new("task-graph/skills/coordinator/SKILL.md"),
            &dirs,
        );
        assert!(matches!(classified, Some(ConfigChangeEvent::SkillsChanged(_))));
    }

    /// Files named `overlay-*.yaml` are treated like workflow files.
    #[test]
    fn test_classify_overlay_yaml() {
        let dirs = watch_paths(Some("config"), None);
        let classified = classify_path(Path::new("config/overlay-git.yaml"), &dirs);
        assert!(matches!(classified, Some(ConfigChangeEvent::WorkflowYaml(_))));
    }

    /// Paths outside every watched directory with no YAML extension are ignored.
    #[test]
    fn test_classify_unknown_file() {
        let dirs = watch_paths(Some("config"), None);
        let classified = classify_path(Path::new("src/main.rs"), &dirs);
        assert!(classified.is_none());
    }

    /// Every file-change variant asks for a reload; errors do not.
    #[test]
    fn test_event_requires_reload() {
        let config_event = ConfigChangeEvent::ConfigYaml(PathBuf::new());
        let workflow_event = ConfigChangeEvent::WorkflowYaml(PathBuf::new());
        let skills_event = ConfigChangeEvent::SkillsChanged(PathBuf::new());
        let error_event = ConfigChangeEvent::Error("test".to_string());

        assert!(config_event.requires_reload());
        assert!(workflow_event.requires_reload());
        assert!(skills_event.requires_reload());
        assert!(!error_event.requires_reload());
    }
}
437}