task-graph-mcp 0.5.0

MCP server for agent task workflows with phases, prompts, gates, and multi-agent coordination
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
//! File watcher for configuration files.
//!
//! Watches for changes to:
//! - `config/*.yaml` files (workflow definitions, config)
//! - `task-graph/skills/` directory (custom skills)
//!
//! Emits reload events through a tokio watch channel when changes are detected.
//! Uses debouncing to coalesce rapid file changes.

use notify_debouncer_mini::{DebouncedEventKind, new_debouncer};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};

/// Event types emitted when configuration files change.
///
/// Values are produced by the path classifier in this module and published to
/// consumers through the watcher's `tokio::sync::watch` channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConfigChangeEvent {
    /// A general configuration YAML file changed (for example `config.yaml` or
    /// `prompts.yaml`). Carries the path of the changed file.
    ConfigYaml(PathBuf),
    /// A workflow or overlay YAML file changed (file name starts with
    /// `workflow` or `overlay`). Carries the path of the changed file.
    WorkflowYaml(PathBuf),
    /// A path inside the skills directory changed (file added, modified, or removed).
    SkillsChanged(PathBuf),
    /// More than one relevant path changed within a single debounced batch.
    BatchChange(Vec<PathBuf>),
    /// The watcher reported an error; the payload is the error rendered as a string.
    Error(String),
}

impl ConfigChangeEvent {
    /// Returns true if this event requires a config reload.
    pub fn requires_reload(&self) -> bool {
        !matches!(self, ConfigChangeEvent::Error(_))
    }

    /// Get the affected paths for this event.
    pub fn affected_paths(&self) -> Vec<&Path> {
        match self {
            ConfigChangeEvent::ConfigYaml(p) => vec![p.as_path()],
            ConfigChangeEvent::WorkflowYaml(p) => vec![p.as_path()],
            ConfigChangeEvent::SkillsChanged(p) => vec![p.as_path()],
            ConfigChangeEvent::BatchChange(paths) => paths.iter().map(|p| p.as_path()).collect(),
            ConfigChangeEvent::Error(_) => vec![],
        }
    }
}

/// Configuration for the file watcher.
///
/// The `Default` implementation enables both watch groups with a 500 ms debounce.
#[derive(Debug, Clone)]
pub struct WatcherConfig {
    /// Debounce duration for coalescing rapid changes; passed to the debouncer
    /// when the watcher is started.
    pub debounce_duration: Duration,
    /// Whether to watch the config directories (project, install, and user
    /// level) for YAML changes.
    pub watch_config: bool,
    /// Whether to watch the skills directory.
    pub watch_skills: bool,
}

impl Default for WatcherConfig {
    fn default() -> Self {
        Self {
            debounce_duration: Duration::from_millis(500),
            watch_config: true,
            watch_skills: true,
        }
    }
}

/// Paths to watch for configuration changes.
///
/// Every field is optional; a `None` entry is simply not watched. When the
/// watcher starts, a directory that does not exist is skipped with a log
/// message instead of producing an error. The same paths are later used to
/// classify changed files by prefix, so they are compared as given (no
/// canonicalization is performed in this module).
#[derive(Debug, Clone)]
pub struct WatchPaths {
    /// Project config directory (typically `task-graph/` or `.task-graph/`).
    /// Watched non-recursively.
    pub config_dir: Option<PathBuf>,
    /// Install/package config directory (typically `config/`).
    /// Watched non-recursively.
    pub install_dir: Option<PathBuf>,
    /// User-level config directory (typically `~/.task-graph/`).
    /// Watched non-recursively.
    pub user_dir: Option<PathBuf>,
    /// Skills directory (typically `task-graph/skills/`).
    /// Watched recursively.
    pub skills_dir: Option<PathBuf>,
}

/// Handle to control the config watcher.
///
/// Returned by `start_config_watcher`. It bundles the receiving end of the
/// event channel with the join handle of the background task that feeds it.
pub struct ConfigWatcherHandle {
    /// Receiver for config change events.
    /// Cloning this receiver will allow multiple consumers to receive events.
    /// The initial value is `None`; a `tokio::sync::watch` channel retains only
    /// the most recently sent value, so a slow consumer may observe just the
    /// latest of several events.
    pub events: watch::Receiver<Option<ConfigChangeEvent>>,
    /// Join handle of the blocking task that forwards watcher events.
    ///
    /// NOTE(review): dropping a tokio `JoinHandle` detaches the task rather than
    /// cancelling it, so dropping this field does not by itself stop the
    /// watcher. The task returns on its own when sending an event fails because
    /// every receiver has been dropped, or when the notify channel closes.
    _task_handle: tokio::task::JoinHandle<()>,
}

impl ConfigWatcherHandle {
    /// Suspends until the channel holds a new, non-empty event and returns a
    /// clone of it.
    ///
    /// Returns `None` once the sending side has gone away. Notifications whose
    /// current value is `None` are skipped.
    pub async fn wait_for_change(&mut self) -> Option<ConfigChangeEvent> {
        while self.events.changed().await.is_ok() {
            // Clone out of the channel first so the borrow guard is released
            // before the value is inspected or the next await.
            let snapshot = self.events.borrow().clone();
            if let Some(event) = snapshot {
                return Some(event);
            }
        }
        // `changed()` failed: the sender was dropped.
        None
    }

    /// Non-blocking check for whether the channel currently holds an event.
    ///
    /// NOTE(review): this inspects the latest stored value only; it stays
    /// `true` after the first event even if that event was already consumed
    /// through `wait_for_change`.
    pub fn has_pending_change(&self) -> bool {
        matches!(*self.events.borrow(), Some(_))
    }

    /// Returns a clone of the most recently published event, if any, without
    /// waiting and without marking it as seen.
    pub fn latest_event(&self) -> Option<ConfigChangeEvent> {
        self.events.borrow().as_ref().cloned()
    }
}

/// Starts the configuration file watcher.
///
/// Returns a handle that provides:
/// - A watch channel receiver for config change events
/// - Automatic cleanup when dropped
///
/// # Arguments
/// * `paths` - Directories to watch
/// * `config` - Watcher configuration
///
/// # Example
/// ```ignore
/// let paths = WatchPaths {
///     config_dir: Some(PathBuf::from("./task-graph")),
///     skills_dir: Some(PathBuf::from("./task-graph/skills")),
/// };
/// let handle = start_config_watcher(paths, WatcherConfig::default())?;
///
/// // In an async context:
/// tokio::spawn(async move {
///     let mut events = handle.events;
///     while events.changed().await.is_ok() {
///         if let Some(event) = events.borrow().clone() {
///             println!("Config changed: {:?}", event);
///         }
///     }
/// });
/// ```
pub fn start_config_watcher(
    paths: WatchPaths,
    config: WatcherConfig,
) -> Result<ConfigWatcherHandle, notify::Error> {
    let (event_tx, event_rx) = watch::channel(None);
    let (notify_tx, notify_rx) = mpsc::channel();

    // Create the debounced watcher
    let mut debouncer = new_debouncer(config.debounce_duration, notify_tx)?;

    // Set up watches for each configured path
    let watcher = debouncer.watcher();

    if config.watch_config
        && let Some(ref config_dir) = paths.config_dir
    {
        if config_dir.exists() {
            info!("Watching config directory: {}", config_dir.display());
            watcher.watch(config_dir, notify::RecursiveMode::NonRecursive)?;
        } else {
            warn!(
                "Config directory does not exist, skipping watch: {}",
                config_dir.display()
            );
        }
    }

    if config.watch_config
        && let Some(ref install_dir) = paths.install_dir
    {
        if install_dir.exists() {
            info!(
                "Watching install config directory: {}",
                install_dir.display()
            );
            watcher.watch(install_dir, notify::RecursiveMode::NonRecursive)?;
        } else {
            warn!(
                "Install config directory does not exist, skipping watch: {}",
                install_dir.display()
            );
        }
    }

    if config.watch_config
        && let Some(ref user_dir) = paths.user_dir
    {
        if user_dir.exists() {
            info!("Watching user config directory: {}", user_dir.display());
            watcher.watch(user_dir, notify::RecursiveMode::NonRecursive)?;
        } else {
            debug!(
                "User config directory does not exist, skipping watch: {}",
                user_dir.display()
            );
        }
    }

    if config.watch_skills
        && let Some(ref skills_dir) = paths.skills_dir
    {
        if skills_dir.exists() {
            info!("Watching skills directory: {}", skills_dir.display());
            watcher.watch(skills_dir, notify::RecursiveMode::Recursive)?;
        } else {
            warn!(
                "Skills directory does not exist, skipping watch: {}",
                skills_dir.display()
            );
        }
    }

    // Spawn the event processing task
    let task_handle = tokio::task::spawn_blocking(move || {
        // Keep the debouncer alive
        let _debouncer = debouncer;

        // Process events from the notify channel
        process_notify_events(notify_rx, event_tx, &paths);
    });

    Ok(ConfigWatcherHandle {
        events: event_rx,
        _task_handle: task_handle,
    })
}

/// Process events from the notify debouncer and convert to ConfigChangeEvents.
fn process_notify_events(
    rx: mpsc::Receiver<Result<Vec<notify_debouncer_mini::DebouncedEvent>, notify::Error>>,
    tx: watch::Sender<Option<ConfigChangeEvent>>,
    paths: &WatchPaths,
) {
    loop {
        match rx.recv() {
            Ok(Ok(events)) => {
                let change_events = classify_events(events, paths);
                for event in change_events {
                    debug!("Config change detected: {:?}", event);
                    if tx.send(Some(event)).is_err() {
                        // Receiver dropped, exit
                        info!("Config watcher receiver dropped, stopping");
                        return;
                    }
                }
            }
            Ok(Err(e)) => {
                error!("File watcher error: {}", e);
                let _ = tx.send(Some(ConfigChangeEvent::Error(e.to_string())));
            }
            Err(_) => {
                // Channel closed, exit
                info!("Config watcher channel closed, stopping");
                return;
            }
        }
    }
}

/// Classify debounced events into ConfigChangeEvents.
fn classify_events(
    events: Vec<notify_debouncer_mini::DebouncedEvent>,
    paths: &WatchPaths,
) -> Vec<ConfigChangeEvent> {
    let mut result = Vec::new();
    let mut changed_paths: Vec<PathBuf> = Vec::new();

    for event in events {
        // Only process data change events (not just any access)
        if !matches!(
            event.kind,
            DebouncedEventKind::Any | DebouncedEventKind::AnyContinuous
        ) {
            continue;
        }

        let path = event.path;

        // Classify the path
        if let Some(event) = classify_path(&path, paths) {
            match event {
                ConfigChangeEvent::BatchChange(mut batch_paths) => {
                    changed_paths.append(&mut batch_paths);
                }
                _ => {
                    // For non-batch events, check if we should batch them
                    if let Some(p) = event.affected_paths().first() {
                        changed_paths.push(p.to_path_buf());
                    }
                }
            }
        }
    }

    // If we have multiple paths, create a batch event
    if changed_paths.len() > 1 {
        result.push(ConfigChangeEvent::BatchChange(changed_paths));
    } else if let Some(path) = changed_paths.into_iter().next()
        && let Some(event) = classify_path(&path, paths)
    {
        result.push(event);
    }

    result
}

/// Classify a single path into a ConfigChangeEvent.
fn classify_path(path: &Path, paths: &WatchPaths) -> Option<ConfigChangeEvent> {
    let extension = path.extension().and_then(|e| e.to_str());
    let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("");

    // Check if it's a YAML file
    if matches!(extension, Some("yaml") | Some("yml")) {
        // Check if it's a workflow or overlay file
        if file_name.starts_with("workflow") || file_name.starts_with("overlay") {
            return Some(ConfigChangeEvent::WorkflowYaml(path.to_path_buf()));
        }
        // Check if it's config.yaml or prompts.yaml
        if file_name == "config.yaml" || file_name == "prompts.yaml" {
            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
        }
        // Other YAML files in config dir are treated as config
        if let Some(ref config_dir) = paths.config_dir
            && path.starts_with(config_dir)
        {
            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
        }
        // YAML files in install dir are also config
        if let Some(ref install_dir) = paths.install_dir
            && path.starts_with(install_dir)
        {
            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
        }
        // YAML files in user dir are also config
        if let Some(ref user_dir) = paths.user_dir
            && path.starts_with(user_dir)
        {
            return Some(ConfigChangeEvent::ConfigYaml(path.to_path_buf()));
        }
    }

    // Check if it's in the skills directory
    if let Some(ref skills_dir) = paths.skills_dir
        && path.starts_with(skills_dir)
    {
        return Some(ConfigChangeEvent::SkillsChanged(path.to_path_buf()));
    }

    None
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `WatchPaths` fixture with only the project config and skills
    /// directories optionally set; install and user directories stay unset.
    fn fixture(config_dir: Option<&str>, skills_dir: Option<&str>) -> WatchPaths {
        WatchPaths {
            config_dir: config_dir.map(PathBuf::from),
            install_dir: None,
            user_dir: None,
            skills_dir: skills_dir.map(PathBuf::from),
        }
    }

    /// `config.yaml` inside the config directory is a config change.
    #[test]
    fn test_classify_config_yaml() {
        let paths = fixture(Some("task-graph"), Some("task-graph/skills"));

        let outcome = classify_path(Path::new("task-graph/config.yaml"), &paths);
        assert!(matches!(outcome, Some(ConfigChangeEvent::ConfigYaml(_))));
    }

    /// A `workflow-*` YAML file is a workflow change.
    #[test]
    fn test_classify_workflow_yaml() {
        let paths = fixture(Some("config"), None);

        let outcome = classify_path(Path::new("config/workflow-swarm.yaml"), &paths);
        assert!(matches!(outcome, Some(ConfigChangeEvent::WorkflowYaml(_))));
    }

    /// A non-YAML file below the skills directory is a skills change.
    #[test]
    fn test_classify_skills_change() {
        let paths = fixture(None, Some("task-graph/skills"));

        let outcome = classify_path(
            Path::new("task-graph/skills/coordinator/SKILL.md"),
            &paths,
        );
        assert!(matches!(outcome, Some(ConfigChangeEvent::SkillsChanged(_))));
    }

    /// An `overlay-*` YAML file is treated like a workflow file.
    #[test]
    fn test_classify_overlay_yaml() {
        let paths = fixture(Some("config"), None);

        let outcome = classify_path(Path::new("config/overlay-git.yaml"), &paths);
        assert!(matches!(outcome, Some(ConfigChangeEvent::WorkflowYaml(_))));
    }

    /// Files outside every watched location and without a YAML extension are ignored.
    #[test]
    fn test_classify_unknown_file() {
        let paths = fixture(Some("config"), None);

        let outcome = classify_path(Path::new("src/main.rs"), &paths);
        assert!(outcome.is_none());
    }

    /// Only the error variant opts out of a reload.
    #[test]
    fn test_event_requires_reload() {
        let reloading = [
            ConfigChangeEvent::ConfigYaml(PathBuf::new()),
            ConfigChangeEvent::WorkflowYaml(PathBuf::new()),
            ConfigChangeEvent::SkillsChanged(PathBuf::new()),
        ];
        for event in reloading.iter() {
            assert!(event.requires_reload());
        }
        assert!(!ConfigChangeEvent::Error("test".to_string()).requires_reload());
    }
}