Skip to main content

task_graph_mcp/config/
watcher.rs

1//! File watcher for configuration files.
2//!
3//! Watches for changes to:
4//! - `config/*.yaml` files (workflow definitions, config)
5//! - `task-graph/skills/` directory (custom skills)
6//!
7//! Emits reload events through a tokio watch channel when changes are detected.
8//! Uses debouncing to coalesce rapid file changes.
9
10use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
11use std::path::{Path, PathBuf};
12use std::sync::mpsc;
13use std::time::Duration;
14use tokio::sync::watch;
15use tracing::{debug, error, info, warn};
16
17/// Event types emitted when configuration files change.
18#[derive(Debug, Clone, PartialEq, Eq)]
19pub enum ConfigChangeEvent {
20    /// A config YAML file changed (config.yaml, prompts.yaml, workflow-*.yaml)
21    ConfigYaml(PathBuf),
22    /// A workflow YAML file changed
23    WorkflowYaml(PathBuf),
24    /// Skills directory changed (file added, modified, or removed)
25    SkillsChanged(PathBuf),
26    /// Multiple files changed in quick succession
27    BatchChange(Vec<PathBuf>),
28    /// Watcher encountered an error
29    Error(String),
30}
31
32impl ConfigChangeEvent {
33    /// Returns true if this event requires a config reload.
34    pub fn requires_reload(&self) -> bool {
35        !matches!(self, ConfigChangeEvent::Error(_))
36    }
37
38    /// Get the affected paths for this event.
39    pub fn affected_paths(&self) -> Vec<&Path> {
40        match self {
41            ConfigChangeEvent::ConfigYaml(p) => vec![p.as_path()],
42            ConfigChangeEvent::WorkflowYaml(p) => vec![p.as_path()],
43            ConfigChangeEvent::SkillsChanged(p) => vec![p.as_path()],
44            ConfigChangeEvent::BatchChange(paths) => paths.iter().map(|p| p.as_path()).collect(),
45            ConfigChangeEvent::Error(_) => vec![],
46        }
47    }
48}
49
50/// Configuration for the file watcher.
51#[derive(Debug, Clone)]
52pub struct WatcherConfig {
53    /// Debounce duration for coalescing rapid changes.
54    pub debounce_duration: Duration,
55    /// Whether to watch config YAML files.
56    pub watch_config: bool,
57    /// Whether to watch skills directory.
58    pub watch_skills: bool,
59}
60
61impl Default for WatcherConfig {
62    fn default() -> Self {
63        Self {
64            debounce_duration: Duration::from_millis(500),
65            watch_config: true,
66            watch_skills: true,
67        }
68    }
69}
70
71/// Paths to watch for configuration changes.
72#[derive(Debug, Clone)]
73pub struct WatchPaths {
74    /// Config directory (typically `config/` or `task-graph/`)
75    pub config_dir: Option<PathBuf>,
76    /// Skills directory (typically `task-graph/skills/`)
77    pub skills_dir: Option<PathBuf>,
78}
79
80/// Handle to control the config watcher.
81pub struct ConfigWatcherHandle {
82    /// Receiver for config change events.
83    /// Cloning this receiver will allow multiple consumers to receive events.
84    pub events: watch::Receiver<Option<ConfigChangeEvent>>,
85    /// Handle to the watcher task (dropping this will stop the watcher).
86    _task_handle: tokio::task::JoinHandle<()>,
87}
88
89impl ConfigWatcherHandle {
90    /// Wait for the next config change event.
91    pub async fn wait_for_change(&mut self) -> Option<ConfigChangeEvent> {
92        // Skip the initial None value
93        loop {
94            if self.events.changed().await.is_err() {
95                return None; // Sender dropped
96            }
97            let event = self.events.borrow().clone();
98            if event.is_some() {
99                return event;
100            }
101        }
102    }
103
104    /// Check if there's a pending change without blocking.
105    pub fn has_pending_change(&self) -> bool {
106        self.events.borrow().is_some()
107    }
108
109    /// Get the latest event without waiting.
110    pub fn latest_event(&self) -> Option<ConfigChangeEvent> {
111        self.events.borrow().clone()
112    }
113}
114
115/// Starts the configuration file watcher.
116///
117/// Returns a handle that provides:
118/// - A watch channel receiver for config change events
119/// - Automatic cleanup when dropped
120///
121/// # Arguments
122/// * `paths` - Directories to watch
123/// * `config` - Watcher configuration
124///
125/// # Example
126/// ```ignore
127/// let paths = WatchPaths {
128///     config_dir: Some(PathBuf::from("./task-graph")),
129///     skills_dir: Some(PathBuf::from("./task-graph/skills")),
130/// };
131/// let handle = start_config_watcher(paths, WatcherConfig::default())?;
132///
133/// // In an async context:
134/// tokio::spawn(async move {
135///     let mut events = handle.events;
136///     while events.changed().await.is_ok() {
137///         if let Some(event) = events.borrow().clone() {
138///             println!("Config changed: {:?}", event);
139///         }
140///     }
141/// });
142/// ```
143pub fn start_config_watcher(
144    paths: WatchPaths,
145    config: WatcherConfig,
146) -> Result<ConfigWatcherHandle, notify::Error> {
147    let (event_tx, event_rx) = watch::channel(None);
148    let (notify_tx, notify_rx) = mpsc::channel();
149
150    // Create the debounced watcher
151    let mut debouncer = new_debouncer(config.debounce_duration, notify_tx)?;
152
153    // Set up watches for each configured path
154    let watcher = debouncer.watcher();
155
156    if config.watch_config
157        && let Some(ref config_dir) = paths.config_dir
158    {
159        if config_dir.exists() {
160            info!("Watching config directory: {}", config_dir.display());
161            watcher.watch(config_dir, notify::RecursiveMode::NonRecursive)?;
162        } else {
163            warn!(
164                "Config directory does not exist, skipping watch: {}",
165                config_dir.display()
166            );
167        }
168    }
169
170    if config.watch_skills
171        && let Some(ref skills_dir) = paths.skills_dir
172    {
173        if skills_dir.exists() {
174            info!("Watching skills directory: {}", skills_dir.display());
175            watcher.watch(skills_dir, notify::RecursiveMode::Recursive)?;
176        } else {
177            warn!(
178                "Skills directory does not exist, skipping watch: {}",
179                skills_dir.display()
180            );
181        }
182    }
183
184    // Spawn the event processing task
185    let task_handle = tokio::task::spawn_blocking(move || {
186        // Keep the debouncer alive
187        let _debouncer = debouncer;
188
189        // Process events from the notify channel
190        process_notify_events(notify_rx, event_tx, &paths);
191    });
192
193    Ok(ConfigWatcherHandle {
194        events: event_rx,
195        _task_handle: task_handle,
196    })
197}
198
199/// Process events from the notify debouncer and convert to ConfigChangeEvents.
200fn process_notify_events(
201    rx: mpsc::Receiver<Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>>,
202    tx: watch::Sender<Option<ConfigChangeEvent>>,
203    paths: &WatchPaths,
204) {
205    loop {
206        match rx.recv() {
207            Ok(Ok(events)) => {
208                let change_events = classify_events(events, paths);
209                for event in change_events {
210                    debug!("Config change detected: {:?}", event);
211                    if tx.send(Some(event)).is_err() {
212                        // Receiver dropped, exit
213                        info!("Config watcher receiver dropped, stopping");
214                        return;
215                    }
216                }
217            }
218            Ok(Err(e)) => {
219                error!("File watcher error: {}", e);
220                let _ = tx.send(Some(ConfigChangeEvent::Error(e.to_string())));
221            }
222            Err(_) => {
223                // Channel closed, exit
224                info!("Config watcher channel closed, stopping");
225                return;
226            }
227        }
228    }
229}
230
231/// Classify debounced events into ConfigChangeEvents.
232fn classify_events(
233    events: Vec<notify_debouncer_mini::DebouncedEvent>,
234    paths: &WatchPaths,
235) -> Vec<ConfigChangeEvent> {
236    let mut result = Vec::new();
237    let mut changed_paths: Vec<PathBuf> = Vec::new();
238
239    for event in events {
240        // Only process data change events (not just any access)
241        if !matches!(
242            event.kind,
243            DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
244        ) {
245            continue;
246        }
247
248        let path = event.path;
249
250        // Classify the path
251        if let Some(event) = classify_path(&path, paths) {
252            match event {
253                ConfigChangeEvent::BatchChange(mut batch_paths) => {
254                    changed_paths.append(&mut batch_paths);
255                }
256                _ => {
257                    // For non-batch events, check if we should batch them
258                    if let Some(p) = event.affected_paths().first() {
259                        changed_paths.push(p.to_path_buf());
260                    }
261                }
262            }
263        }
264    }
265
266    // If we have multiple paths, create a batch event
267    if changed_paths.len() > 1 {
268        result.push(ConfigChangeEvent::BatchChange(changed_paths));
269    } else if let Some(path) = changed_paths.into_iter().next()
270        && let Some(event) = classify_path(&path, paths)
271    {
272        result.push(event);
273    }
274
275    result
276}
277
278/// Classify a single path into a ConfigChangeEvent.
279fn classify_path(path: &Path, paths: &WatchPaths) -> Option<ConfigChangeEvent> {
280    let extension = path.extension().and_then(|e| e.to_str());
281    let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");
282
283    // Check if it's a YAML file
284    if matches!(extension, Some("yaml") | Some("yml")) {
285        // Check if it's a workflow or overlay file
286        if file_name.starts_with("workflow") || file_name.starts_with("overlay") {
287            return Some(ConfigChangeEvent::WorkflowYaml(path.to_path_buf()));
288        }
289        // Check if it's config.yaml or prompts.yaml
290        if file_name == "config.yaml" || file_name == "prompts.yaml" {
291            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
292        }
293        // Other YAML files in config dir are treated as config
294        if let Some(ref config_dir) = paths.config_dir
295            && path.starts_with(config_dir)
296        {
297            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
298        }
299    }
300
301    // Check if it's in the skills directory
302    if let Some(ref skills_dir) = paths.skills_dir
303        && path.starts_with(skills_dir)
304    {
305        return Some(ConfigChangeEvent::SkillsChanged(path.to_path_buf()));
306    }
307
308    None
309}
310
311#[cfg(test)]
312mod tests {
313    use super::*;
314
315    #[test]
316    fn test_classify_config_yaml() {
317        let paths = WatchPaths {
318            config_dir: Some(PathBuf::from("task-graph")),
319            skills_dir: Some(PathBuf::from("task-graph/skills")),
320        };
321
322        let result = classify_path(&PathBuf::from("task-graph/config.yaml"), &paths);
323        assert!(matches!(result, Some(ConfigChangeEvent::ConfigYaml(_))));
324    }
325
326    #[test]
327    fn test_classify_workflow_yaml() {
328        let paths = WatchPaths {
329            config_dir: Some(PathBuf::from("config")),
330            skills_dir: None,
331        };
332
333        let result = classify_path(&PathBuf::from("config/workflow-swarm.yaml"), &paths);
334        assert!(matches!(result, Some(ConfigChangeEvent::WorkflowYaml(_))));
335    }
336
337    #[test]
338    fn test_classify_skills_change() {
339        let paths = WatchPaths {
340            config_dir: None,
341            skills_dir: Some(PathBuf::from("task-graph/skills")),
342        };
343
344        let result = classify_path(
345            &PathBuf::from("task-graph/skills/coordinator/SKILL.md"),
346            &paths,
347        );
348        assert!(matches!(result, Some(ConfigChangeEvent::SkillsChanged(_))));
349    }
350
351    #[test]
352    fn test_classify_overlay_yaml() {
353        let paths = WatchPaths {
354            config_dir: Some(PathBuf::from("config")),
355            skills_dir: None,
356        };
357
358        let result = classify_path(&PathBuf::from("config/overlay-git.yaml"), &paths);
359        assert!(matches!(result, Some(ConfigChangeEvent::WorkflowYaml(_))));
360    }
361
362    #[test]
363    fn test_classify_unknown_file() {
364        let paths = WatchPaths {
365            config_dir: Some(PathBuf::from("config")),
366            skills_dir: None,
367        };
368
369        let result = classify_path(&PathBuf::from("src/main.rs"), &paths);
370        assert!(result.is_none());
371    }
372
373    #[test]
374    fn test_event_requires_reload() {
375        assert!(ConfigChangeEvent::ConfigYaml(PathBuf::new()).requires_reload());
376        assert!(ConfigChangeEvent::WorkflowYaml(PathBuf::new()).requires_reload());
377        assert!(ConfigChangeEvent::SkillsChanged(PathBuf::new()).requires_reload());
378        assert!(!ConfigChangeEvent::Error("test".to_string()).requires_reload());
379    }
380}