Skip to main content

task_graph_mcp/tools/
agents.rs

1//! Worker connection and management tools.
2
3use super::{get_bool, get_i32, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::workflows::WorkflowsConfig;
5use crate::config::{AppConfig, Prompts, ServerPaths, StatesConfig};
6use crate::db::Database;
7use crate::error::ToolError;
8use crate::format::{OutputFormat, ToolResult, format_workers_markdown};
9use anyhow::Result;
10use rmcp::model::Tool;
11use serde_json::{Value, json};
12
/// Options for connecting a worker to the task graph.
///
/// Bundles the borrowed server state that `connect` destructures and reads.
pub struct ConnectOptions<'a> {
    /// Database handle; `connect` uses it to clean up stale workers, expire a
    /// force-replaced worker, and register the new worker.
    pub db: &'a Database,
    /// Paths the server is currently using; echoed in the connect response and
    /// compared against any path overrides the caller requested.
    pub server_paths: &'a ServerPaths,
    /// Application configuration; `connect` reads its `states`, `phases`,
    /// `deps`, `tags`, and `ids` sections.
    pub config: &'a AppConfig,
    /// Per-connect workflow (may differ from config.workflows for named workflows).
    pub workflows: &'a WorkflowsConfig,
}
21
22pub fn get_tools(prompts: &Prompts) -> Vec<Tool> {
23    vec![
24        make_tool_with_prompts(
25            "connect",
26            "Connect as a worker. Call this FIRST before using other tools. Returns worker_id (save it for all subsequent calls). Tags enable task affinity matching.",
27            json!({
28                "worker_id": {
29                    "type": "string",
30                    "description": "Only use if assigned a unique name (e.g., 'worker-17', 'coordinator'). Avoid generic names like 'claude'. Leave empty for an auto-generated petname."
31                },
32                "tags": {
33                    "type": "array",
34                    "items": { "type": "string" },
35                    "description": "Worker capability tags for task affinity matching. Tasks use needed_tags/wanted_tags to prefer or require workers with matching tags."
36                },
37                "force": {
38                    "type": "boolean",
39                    "description": "Force reconnection if worker ID already exists (default: false). Use for stuck worker recovery."
40                },
41                "db_path": {
42                    "type": "string",
43                    "description": "Override database file path (same as TASK_GRAPH_DB_PATH env var). Note: Can only be set before server starts."
44                },
45                "media_dir": {
46                    "type": "string",
47                    "description": "Override media directory path (same as TASK_GRAPH_MEDIA_DIR env var). Note: Can only be set before server starts."
48                },
49                "log_dir": {
50                    "type": "string",
51                    "description": "Override log directory path (same as TASK_GRAPH_LOG_DIR env var). Note: Can only be set before server starts."
52                },
53                "config_path": {
54                    "type": "string",
55                    "description": "Override config file path (same as TASK_GRAPH_CONFIG_PATH env var). Note: Can only be set before server starts."
56                },
57                "workflow": {
58                    "type": "string",
59                    "description": "Named workflow to use (e.g., 'swarm' for workflow-swarm.yaml). If not specified, uses default workflows.yaml."
60                },
61                "overlays": {
62                    "type": "array",
63                    "items": { "type": "string" },
64                    "description": "Overlay names to apply on top of the workflow, in order (e.g., ['git', 'user-request']). Use list_workflows to see available overlays."
65                },
66                "max_claims": {
67                    "type": "integer",
68                    "description": "Maximum concurrent task claims (default: 1). Use 0 for unlimited."
69                }
70            }),
71            vec![],
72            prompts,
73        ),
74        make_tool_with_prompts(
75            "disconnect",
76            "Disconnect a worker, releasing all claims and locks.",
77            json!({
78                "worker_id": {
79                    "type": "string",
80                    "description": "The worker's ID"
81                },
82                "final_status": {
83                    "type": "string",
84                    "enum": ["pending", "completed", "cancelled", "failed"],
85                    "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
86                }
87            }),
88            vec!["worker_id"],
89            prompts,
90        ),
91        make_tool_with_prompts(
92            "list_agents",
93            "List all connected workers with their current status, claim counts, and what they're working on. Automatically evicts stale workers (no heartbeat within timeout).",
94            json!({
95                "tags": {
96                    "type": "array",
97                    "items": { "type": "string" },
98                    "description": "Filter workers that have ALL of these capability/affinity tags"
99                },
100                "file": {
101                    "type": "string",
102                    "description": "Filter workers that have claimed this file"
103                },
104                "task": {
105                    "type": "string",
106                    "description": "Filter workers related to this task ID"
107                },
108                "depth": {
109                    "type": "integer",
110                    "description": "Task relationship depth (-3 to 3). Negative: ancestors, positive: descendants. Used with 'task' filter."
111                },
112                "stale_timeout": {
113                    "type": "integer",
114                    "description": "Seconds without heartbeat before a worker is considered stale and evicted. Set to 0 to disable auto-cleanup. Default: 300 (5 minutes)."
115                }
116            }),
117            vec![],
118            prompts,
119        ),
120        make_tool_with_prompts(
121            "cleanup_stale",
122            "Evict stale workers that haven't sent a heartbeat within the timeout period. Releases their task claims and file locks. If worker_id is provided, force-expire that specific worker regardless of staleness.",
123            json!({
124                "worker_id": {
125                    "type": "string",
126                    "description": "If provided, force-expire this specific worker regardless of staleness. When omitted, expire all stale workers based on timeout."
127                },
128                "timeout": {
129                    "type": "integer",
130                    "description": "Seconds without heartbeat before a worker is considered stale. Default: 300 (5 minutes). Ignored when worker_id is provided."
131                },
132                "final_status": {
133                    "type": "string",
134                    "enum": ["pending", "completed", "cancelled", "failed"],
135                    "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
136                }
137            }),
138            vec![],
139            prompts,
140        ),
141        make_tool_with_prompts(
142            "add_overlay",
143            "Dynamically add an overlay to a connected worker's active overlay stack. The overlay is applied on top of existing overlays and persisted.",
144            json!({
145                "worker_id": {
146                    "type": "string",
147                    "description": "The worker's ID"
148                },
149                "overlay": {
150                    "type": "string",
151                    "description": "Name of the overlay to add (e.g., 'git', 'troubleshooting'). Use list_workflows to see available overlays."
152                }
153            }),
154            vec!["worker_id", "overlay"],
155            prompts,
156        ),
157        make_tool_with_prompts(
158            "remove_overlay",
159            "Dynamically remove an overlay from a connected worker's active overlay stack. The change is persisted immediately.",
160            json!({
161                "worker_id": {
162                    "type": "string",
163                    "description": "The worker's ID"
164                },
165                "overlay": {
166                    "type": "string",
167                    "description": "Name of the overlay to remove (must be currently active on this worker)"
168                }
169            }),
170            vec!["worker_id", "overlay"],
171            prompts,
172        ),
173    ]
174}
175
176pub fn connect(opts: ConnectOptions<'_>, args: Value) -> Result<Value> {
177    let ConnectOptions {
178        db,
179        server_paths,
180        config,
181        workflows,
182    } = opts;
183
184    let states_config = &config.states;
185    let phases_config = &config.phases;
186    let deps_config = &config.deps;
187    let tags_config = &config.tags;
188    let ids_config = &config.ids;
189
190    let worker_id = get_string(&args, "worker_id");
191    let tags = get_string_array(&args, "tags").unwrap_or_default();
192    let force = get_bool(&args, "force").unwrap_or(false);
193    let workflow = get_string(&args, "workflow");
194    let max_claims = get_i32(&args, "max_claims");
195
196    // Worker tags are accepted silently (no validation warnings).
197    // They participate in task affinity matching (needed_tags/wanted_tags)
198    // and workflow role matching, but unknown worker tags should not warn
199    // since workers may register capabilities not predefined in tag definitions.
200    // Task tag validation (on create/update) continues to warn as configured.
201
202    // Check for path override requests (informational - paths are set at server startup)
203    let mut path_notes: Vec<String> = Vec::new();
204
205    if let Some(requested_db) = get_string(&args, "db_path")
206        && server_paths.db_path.to_string_lossy() != requested_db
207    {
208        path_notes.push(format!(
209                "db_path: requested '{}' but server is using '{}' (set TASK_GRAPH_DB_PATH before starting server)",
210                requested_db,
211                server_paths.db_path.display()
212            ));
213    }
214
215    if let Some(requested_media) = get_string(&args, "media_dir")
216        && server_paths.media_dir.to_string_lossy() != requested_media
217    {
218        path_notes.push(format!(
219                "media_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_MEDIA_DIR before starting server)",
220                requested_media,
221                server_paths.media_dir.display()
222            ));
223    }
224
225    if let Some(requested_log) = get_string(&args, "log_dir")
226        && server_paths.log_dir.to_string_lossy() != requested_log
227    {
228        path_notes.push(format!(
229                "log_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_LOG_DIR before starting server)",
230                requested_log,
231                server_paths.log_dir.display()
232            ));
233    }
234
235    if let Some(requested_config) = get_string(&args, "config_path") {
236        let current_config = server_paths
237            .config_path
238            .as_ref()
239            .map(|p| p.to_string_lossy().to_string());
240        if current_config.as_deref() != Some(&requested_config) {
241            path_notes.push(format!(
242                "config_path: requested '{}' but server is using '{}' (set TASK_GRAPH_CONFIG_PATH before starting server)",
243                requested_config,
244                current_config.unwrap_or_else(|| "default locations".to_string())
245            ));
246        }
247    }
248
249    // Run stale worker cleanup before registering (catches orphaned workers from crashed sessions)
250    let disconnect_status = states_config.disconnect_state.clone();
251    let _ = db.cleanup_stale_workers(300, &disconnect_status);
252
253    // If force-reconnecting, expire the old worker first to release its claimed tasks
254    if force
255        && let Some(ref wid) = worker_id
256        && db.get_worker(wid)?.is_some()
257    {
258        let _ = db.expire_worker(wid, &disconnect_status);
259    }
260
261    let overlays = get_string_array(&args, "overlays").unwrap_or_default();
262    let worker = db.register_worker(
263        worker_id, tags, force, ids_config, workflow, overlays, max_claims,
264    )?;
265
266    // Build config summary for the response
267    let timed_states: Vec<&str> = states_config
268        .definitions
269        .iter()
270        .filter(|(_, def)| def.timed)
271        .map(|(name, _)| name.as_str())
272        .collect();
273
274    let terminal_states: Vec<&str> = states_config
275        .definitions
276        .iter()
277        .filter(|(_, def)| def.exits.is_empty())
278        .map(|(name, _)| name.as_str())
279        .collect();
280
281    let mut response = json!({
282        "version": env!("CARGO_PKG_VERSION"),
283        "worker_id": &worker.id,
284        "tags": worker.tags,
285        "max_claims": worker.max_claims,
286        "registered_at": crate::types::ms_to_iso(worker.registered_at),
287        "workflow": worker.workflow,
288        "paths": {
289            "db_path": server_paths.db_path.to_string_lossy(),
290            "media_dir": server_paths.media_dir.to_string_lossy(),
291            "log_dir": server_paths.log_dir.to_string_lossy(),
292            "config_path": server_paths.config_path.as_ref().map(|p| p.to_string_lossy().to_string())
293        },
294        "config": {
295            "states": states_config.state_names(),
296            "initial_state": &states_config.initial,
297            "timed_states": timed_states,
298            "terminal_states": terminal_states,
299            "blocking_states": &states_config.blocking_states,
300            "phases": phases_config.phase_names(),
301            "dependency_types": deps_config.dep_type_names(),
302            "known_tags": tags_config.tag_names()
303        }
304    });
305
306    // Hint about available MCP resources so agents discover them
307    response["hints"] = json!({
308        "resources": "MCP resources are available for live data. Use config://current for active configuration, query://tasks/ready for claimable tasks, docs://skills/list for skill documentation."
309    });
310
311    if !path_notes.is_empty() {
312        response["path_warnings"] = json!(path_notes);
313    }
314
315    // Deliver workflow-specific role information and prompts
316    if let Some(role_name) = workflows.match_role(&worker.tags) {
317        let mut role_info = json!({
318            "role": &role_name,
319        });
320
321        // Include role definition details
322        if let Some(role_def) = workflows.get_role(&role_name) {
323            if let Some(ref desc) = role_def.description {
324                role_info["description"] = json!(desc);
325            }
326            if let Some(max) = role_def.max_claims {
327                role_info["max_claims"] = json!(max);
328            }
329            if let Some(can_assign) = role_def.can_assign {
330                role_info["can_assign"] = json!(can_assign);
331            }
332        }
333
334        response["role"] = role_info;
335
336        // Include role-specific prompts
337        let prompts = workflows.get_role_prompts(&role_name);
338        if !prompts.is_empty() {
339            response["role_prompts"] = json!(prompts);
340        }
341    }
342
343    // Include workflow description if available
344    if let Some(ref desc) = workflows.description {
345        response["workflow_description"] = json!(desc);
346    }
347
348    // Include overlay information if overlays were applied
349    if !worker.overlays.is_empty() {
350        response["overlays"] = json!(worker.overlays);
351    }
352
353    Ok(response)
354}
355
356pub fn disconnect(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
357    let worker_id =
358        get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
359
360    // Get final_status from args or fall back to config
361    let final_status =
362        get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
363
364    // Validate final_status is untimed
365    if states_config.is_timed_state(&final_status) {
366        return Err(ToolError::invalid_value(
367            "final_status",
368            &format!(
369                "must be an untimed status, got '{}'. Valid statuses: {:?}",
370                final_status,
371                states_config.untimed_state_names()
372            ),
373        )
374        .into());
375    }
376
377    // Release worker locks before unregistering (close claim_sequence records)
378    let _ = db.release_worker_locks(&worker_id);
379
380    // Unregister and get summary
381    let summary = db.unregister_worker(&worker_id, &final_status)?;
382
383    Ok(json!({
384        "success": true,
385        "tasks_released": summary.tasks_released,
386        "files_released": summary.files_released,
387        "final_status": summary.final_status
388    }))
389}
390
391pub fn list_agents(
392    db: &Database,
393    states_config: &StatesConfig,
394    format: OutputFormat,
395    args: Value,
396) -> Result<ToolResult> {
397    // Extract filter parameters
398    let tags = get_string_array(&args, "tags");
399    let file = get_string(&args, "file");
400    let task = get_string(&args, "task");
401    let depth = get_i32(&args, "depth").unwrap_or(0).clamp(-3, 3);
402
403    // Auto-cleanup stale workers (default 5 minutes, 0 to disable)
404    let stale_timeout = get_i32(&args, "stale_timeout").unwrap_or(300);
405    let cleanup_summary = if stale_timeout > 0 {
406        let final_status = states_config.disconnect_state.clone();
407        db.cleanup_stale_workers(stale_timeout as i64, &final_status)
408            .ok()
409    } else {
410        None
411    };
412
413    // Get workers with filters
414    let workers = db.list_workers_filtered(
415        tags.as_ref(),
416        file.as_deref(),
417        task.as_deref(),
418        depth,
419        states_config,
420    )?;
421
422    // Get current time for heartbeat age calculation
423    let now = std::time::SystemTime::now()
424        .duration_since(std::time::UNIX_EPOCH)
425        .map(|d| d.as_millis() as i64)
426        .unwrap_or(0);
427
428    match format {
429        OutputFormat::Markdown => {
430            let mut output = String::new();
431            if let Some(ref summary) = cleanup_summary
432                && summary.workers_evicted > 0
433            {
434                output.push_str(&format!(
435                    "**Evicted {} stale worker(s)**: {} (released {} task(s), {} file(s))\n\n",
436                    summary.workers_evicted,
437                    summary.evicted_worker_ids.join(", "),
438                    summary.tasks_released,
439                    summary.files_released
440                ));
441            }
442            output.push_str(&format_workers_markdown(&workers));
443            Ok(ToolResult::Raw(output))
444        }
445        OutputFormat::Json => {
446            let mut result = json!({
447                "workers": workers.iter().map(|w| json!({
448                    "id": w.id,
449                    "tags": w.tags,
450                    "max_claims": w.max_claims,
451                    "claim_count": w.claim_count,
452                    "current_thought": w.current_thought,
453                    "registered_at": crate::types::ms_to_iso(w.registered_at),
454                    "last_heartbeat": crate::types::ms_to_iso(w.last_heartbeat),
455                    "heartbeat_age_ms": now - w.last_heartbeat,
456                    "workflow": w.workflow
457                })).collect::<Vec<_>>()
458            });
459
460            if let Some(summary) = cleanup_summary
461                && summary.workers_evicted > 0
462            {
463                result["cleanup"] = json!({
464                    "workers_evicted": summary.workers_evicted,
465                    "evicted_worker_ids": summary.evicted_worker_ids,
466                    "tasks_released": summary.tasks_released,
467                    "files_released": summary.files_released
468                });
469            }
470
471            Ok(ToolResult::Json(result))
472        }
473    }
474}
475
476pub fn cleanup_stale(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
477    // Get final_status from args or fall back to config
478    let final_status =
479        get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
480
481    // Validate final_status is untimed
482    if states_config.is_timed_state(&final_status) {
483        return Err(ToolError::invalid_value(
484            "final_status",
485            &format!(
486                "must be an untimed status, got '{}'. Valid statuses: {:?}",
487                final_status,
488                states_config.untimed_state_names()
489            ),
490        )
491        .into());
492    }
493
494    // If worker_id is provided, force-expire that specific worker regardless of staleness
495    if let Some(worker_id) = get_string(&args, "worker_id") {
496        let summary = db.expire_worker(&worker_id, &final_status)?;
497
498        return Ok(json!({
499            "workers_evicted": 1,
500            "evicted_worker_ids": [worker_id],
501            "tasks_released": summary.tasks_released,
502            "files_released": summary.files_released,
503            "final_status": summary.final_status
504        }));
505    }
506
507    // Default: expire all stale workers based on timeout (default 5 minutes)
508    let timeout = get_i32(&args, "timeout").unwrap_or(300) as i64;
509    let summary = db.cleanup_stale_workers(timeout, &final_status)?;
510
511    Ok(json!({
512        "workers_evicted": summary.workers_evicted,
513        "evicted_worker_ids": summary.evicted_worker_ids,
514        "tasks_released": summary.tasks_released,
515        "files_released": summary.files_released,
516        "final_status": summary.final_status
517    }))
518}
519
520pub fn add_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
521    let worker_id =
522        get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
523    let overlay_name =
524        get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
525
526    // Validate overlay exists in named_overlays
527    if !config.workflows.named_overlays.contains_key(&overlay_name) {
528        let available: Vec<&String> = config.workflows.named_overlays.keys().collect();
529        return Err(ToolError::invalid_value(
530            "overlay",
531            &format!(
532                "unknown overlay '{}'. Available overlays: {:?}",
533                overlay_name, available
534            ),
535        )
536        .into());
537    }
538
539    // Get current worker
540    let worker = db
541        .get_worker(&worker_id)?
542        .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
543
544    // Check not already active
545    if worker.overlays.contains(&overlay_name) {
546        return Err(ToolError::invalid_value(
547            "overlay",
548            &format!(
549                "overlay '{}' is already active on worker '{}'",
550                overlay_name, worker_id
551            ),
552        )
553        .into());
554    }
555
556    // Build new overlays list (append)
557    let mut new_overlays = worker.overlays.clone();
558    new_overlays.push(overlay_name);
559
560    // Persist
561    let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
562
563    // Build merged workflow to compute diff
564    let base = resolve_base_workflow(&updated_worker, config);
565    let mut merged = (*base).clone();
566    for name in &updated_worker.overlays {
567        if let Some(overlay) = config.workflows.named_overlays.get(name) {
568            merged.apply_overlay(overlay);
569        }
570    }
571    merged.active_overlays = updated_worker.overlays.clone();
572
573    let overlay_diff = merged.compute_overlay_diff(&base);
574
575    let mut response = json!({
576        "success": true,
577        "worker_id": updated_worker.id,
578        "overlays": updated_worker.overlays,
579        "overlay_diff": overlay_diff,
580    });
581
582    // Include role info if applicable
583    if let Some(role_name) = merged.match_role(&updated_worker.tags) {
584        response["role"] = json!(role_name);
585        let prompts = merged.get_role_prompts(&role_name);
586        if !prompts.is_empty() {
587            response["role_prompts"] = json!(prompts);
588        }
589    }
590
591    Ok(response)
592}
593
594pub fn remove_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
595    let worker_id =
596        get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
597    let overlay_name =
598        get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
599
600    // Get current worker
601    let worker = db
602        .get_worker(&worker_id)?
603        .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
604
605    // Check overlay is currently active
606    if !worker.overlays.contains(&overlay_name) {
607        return Err(ToolError::invalid_value(
608            "overlay",
609            &format!(
610                "overlay '{}' is not active on worker '{}'. Active overlays: {:?}",
611                overlay_name, worker_id, worker.overlays
612            ),
613        )
614        .into());
615    }
616
617    // Build new overlays list (remove)
618    let new_overlays: Vec<String> = worker
619        .overlays
620        .into_iter()
621        .filter(|o| o != &overlay_name)
622        .collect();
623
624    // Persist
625    let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
626
627    // Build merged workflow to compute diff
628    let base = resolve_base_workflow(&updated_worker, config);
629    let mut merged = (*base).clone();
630    for name in &updated_worker.overlays {
631        if let Some(overlay) = config.workflows.named_overlays.get(name) {
632            merged.apply_overlay(overlay);
633        }
634    }
635    merged.active_overlays = updated_worker.overlays.clone();
636
637    let overlay_diff = merged.compute_overlay_diff(&base);
638
639    let mut response = json!({
640        "success": true,
641        "worker_id": updated_worker.id,
642        "overlays": updated_worker.overlays,
643        "overlay_diff": overlay_diff,
644    });
645
646    // Include role info if applicable
647    if let Some(role_name) = merged.match_role(&updated_worker.tags) {
648        response["role"] = json!(role_name);
649        let prompts = merged.get_role_prompts(&role_name);
650        if !prompts.is_empty() {
651            response["role_prompts"] = json!(prompts);
652        }
653    }
654
655    Ok(response)
656}
657
658/// Resolve the base workflow for a worker (before overlays).
659fn resolve_base_workflow(
660    worker: &crate::types::Worker,
661    config: &AppConfig,
662) -> std::sync::Arc<WorkflowsConfig> {
663    if let Some(ref workflow_name) = worker.workflow {
664        config
665            .workflows
666            .get_named_workflow(workflow_name)
667            .map(std::sync::Arc::clone)
668    } else {
669        None
670    }
671    .or_else(|| {
672        config
673            .workflows
674            .get_default_workflow()
675            .map(std::sync::Arc::clone)
676    })
677    .unwrap_or_else(|| std::sync::Arc::clone(&config.workflows))
678}