Skip to main content

task_graph_mcp/tools/
agents.rs

1//! Worker connection and management tools.
2
3use super::{get_bool, get_i32, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::workflows::WorkflowsConfig;
5use crate::config::{AppConfig, Prompts, ServerPaths, StatesConfig};
6use crate::db::Database;
7use crate::error::ToolError;
8use crate::format::{OutputFormat, ToolResult, format_workers_markdown};
9use anyhow::Result;
10use rmcp::model::Tool;
11use serde_json::{Value, json};
12
13/// Options for connecting a worker to the task graph.
14pub struct ConnectOptions<'a> {
15    pub db: &'a Database,
16    pub server_paths: &'a ServerPaths,
17    pub config: &'a AppConfig,
18    /// Per-connect workflow (may differ from config.workflows for named workflows).
19    pub workflows: &'a WorkflowsConfig,
20}
21
22pub fn get_tools(prompts: &Prompts) -> Vec<Tool> {
23    vec![
24        make_tool_with_prompts(
25            "connect",
26            "Connect as a worker. Call this FIRST before using other tools. Returns worker_id (save it for all subsequent calls). Tags enable task affinity matching.",
27            json!({
28                "worker_id": {
29                    "type": "string",
30                    "description": "Only use if assigned a unique name (e.g., 'worker-17', 'coordinator'). Avoid generic names like 'claude'. Leave empty for an auto-generated petname."
31                },
32                "tags": {
33                    "type": "array",
34                    "items": { "type": "string" },
35                    "description": "Freeform tags for capabilities, roles, etc."
36                },
37                "force": {
38                    "type": "boolean",
39                    "description": "Force reconnection if worker ID already exists (default: false). Use for stuck worker recovery."
40                },
41                "db_path": {
42                    "type": "string",
43                    "description": "Override database file path (same as TASK_GRAPH_DB_PATH env var). Note: Can only be set before server starts."
44                },
45                "media_dir": {
46                    "type": "string",
47                    "description": "Override media directory path (same as TASK_GRAPH_MEDIA_DIR env var). Note: Can only be set before server starts."
48                },
49                "log_dir": {
50                    "type": "string",
51                    "description": "Override log directory path (same as TASK_GRAPH_LOG_DIR env var). Note: Can only be set before server starts."
52                },
53                "config_path": {
54                    "type": "string",
55                    "description": "Override config file path (same as TASK_GRAPH_CONFIG_PATH env var). Note: Can only be set before server starts."
56                },
57                "workflow": {
58                    "type": "string",
59                    "description": "Named workflow to use (e.g., 'swarm' for workflow-swarm.yaml). If not specified, uses default workflows.yaml."
60                },
61                "overlays": {
62                    "type": "array",
63                    "items": { "type": "string" },
64                    "description": "Overlay names to apply on top of the workflow, in order (e.g., ['git', 'user-request']). Use list_workflows to see available overlays."
65                }
66            }),
67            vec![],
68            prompts,
69        ),
70        make_tool_with_prompts(
71            "disconnect",
72            "Disconnect a worker, releasing all claims and locks.",
73            json!({
74                "worker_id": {
75                    "type": "string",
76                    "description": "The worker's ID"
77                },
78                "final_status": {
79                    "type": "string",
80                    "enum": ["pending", "completed", "cancelled", "failed"],
81                    "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
82                }
83            }),
84            vec!["worker_id"],
85            prompts,
86        ),
87        make_tool_with_prompts(
88            "list_agents",
89            "List all connected workers with their current status, claim counts, and what they're working on. Automatically evicts stale workers (no heartbeat within timeout).",
90            json!({
91                "tags": {
92                    "type": "array",
93                    "items": { "type": "string" },
94                    "description": "Filter workers that have ALL of these tags"
95                },
96                "file": {
97                    "type": "string",
98                    "description": "Filter workers that have claimed this file"
99                },
100                "task": {
101                    "type": "string",
102                    "description": "Filter workers related to this task ID"
103                },
104                "depth": {
105                    "type": "integer",
106                    "description": "Task relationship depth (-3 to 3). Negative: ancestors, positive: descendants. Used with 'task' filter."
107                },
108                "stale_timeout": {
109                    "type": "integer",
110                    "description": "Seconds without heartbeat before a worker is considered stale and evicted. Set to 0 to disable auto-cleanup. Default: 300 (5 minutes)."
111                }
112            }),
113            vec![],
114            prompts,
115        ),
116        make_tool_with_prompts(
117            "cleanup_stale",
118            "Evict stale workers that haven't sent a heartbeat within the timeout period. Releases their task claims and file locks.",
119            json!({
120                "timeout": {
121                    "type": "integer",
122                    "description": "Seconds without heartbeat before a worker is considered stale. Default: 300 (5 minutes)."
123                },
124                "final_status": {
125                    "type": "string",
126                    "enum": ["pending", "completed", "cancelled", "failed"],
127                    "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
128                }
129            }),
130            vec![],
131            prompts,
132        ),
133        make_tool_with_prompts(
134            "add_overlay",
135            "Dynamically add an overlay to a connected worker's active overlay stack. The overlay is applied on top of existing overlays and persisted.",
136            json!({
137                "worker_id": {
138                    "type": "string",
139                    "description": "The worker's ID"
140                },
141                "overlay": {
142                    "type": "string",
143                    "description": "Name of the overlay to add (e.g., 'git', 'troubleshooting'). Use list_workflows to see available overlays."
144                }
145            }),
146            vec!["worker_id", "overlay"],
147            prompts,
148        ),
149        make_tool_with_prompts(
150            "remove_overlay",
151            "Dynamically remove an overlay from a connected worker's active overlay stack. The change is persisted immediately.",
152            json!({
153                "worker_id": {
154                    "type": "string",
155                    "description": "The worker's ID"
156                },
157                "overlay": {
158                    "type": "string",
159                    "description": "Name of the overlay to remove (must be currently active on this worker)"
160                }
161            }),
162            vec!["worker_id", "overlay"],
163            prompts,
164        ),
165    ]
166}
167
168pub fn connect(opts: ConnectOptions<'_>, args: Value) -> Result<Value> {
169    let ConnectOptions {
170        db,
171        server_paths,
172        config,
173        workflows,
174    } = opts;
175
176    let states_config = &config.states;
177    let phases_config = &config.phases;
178    let deps_config = &config.deps;
179    let tags_config = &config.tags;
180    let ids_config = &config.ids;
181
182    let worker_id = get_string(&args, "worker_id");
183    let tags = get_string_array(&args, "tags").unwrap_or_default();
184    let force = get_bool(&args, "force").unwrap_or(false);
185    let workflow = get_string(&args, "workflow");
186
187    // Validate tags if provided
188    let tag_warnings = tags_config.validate_tags(&tags)?;
189
190    // Check for path override requests (informational - paths are set at server startup)
191    let mut path_notes: Vec<String> = Vec::new();
192
193    if let Some(requested_db) = get_string(&args, "db_path")
194        && server_paths.db_path.to_string_lossy() != requested_db
195    {
196        path_notes.push(format!(
197                "db_path: requested '{}' but server is using '{}' (set TASK_GRAPH_DB_PATH before starting server)",
198                requested_db,
199                server_paths.db_path.display()
200            ));
201    }
202
203    if let Some(requested_media) = get_string(&args, "media_dir")
204        && server_paths.media_dir.to_string_lossy() != requested_media
205    {
206        path_notes.push(format!(
207                "media_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_MEDIA_DIR before starting server)",
208                requested_media,
209                server_paths.media_dir.display()
210            ));
211    }
212
213    if let Some(requested_log) = get_string(&args, "log_dir")
214        && server_paths.log_dir.to_string_lossy() != requested_log
215    {
216        path_notes.push(format!(
217                "log_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_LOG_DIR before starting server)",
218                requested_log,
219                server_paths.log_dir.display()
220            ));
221    }
222
223    if let Some(requested_config) = get_string(&args, "config_path") {
224        let current_config = server_paths
225            .config_path
226            .as_ref()
227            .map(|p| p.to_string_lossy().to_string());
228        if current_config.as_deref() != Some(&requested_config) {
229            path_notes.push(format!(
230                "config_path: requested '{}' but server is using '{}' (set TASK_GRAPH_CONFIG_PATH before starting server)",
231                requested_config,
232                current_config.unwrap_or_else(|| "default locations".to_string())
233            ));
234        }
235    }
236
237    let overlays = get_string_array(&args, "overlays").unwrap_or_default();
238    let worker = db.register_worker(worker_id, tags, force, ids_config, workflow, overlays)?;
239
240    // Build config summary for the response
241    let timed_states: Vec<&str> = states_config
242        .definitions
243        .iter()
244        .filter(|(_, def)| def.timed)
245        .map(|(name, _)| name.as_str())
246        .collect();
247
248    let terminal_states: Vec<&str> = states_config
249        .definitions
250        .iter()
251        .filter(|(_, def)| def.exits.is_empty())
252        .map(|(name, _)| name.as_str())
253        .collect();
254
255    let mut response = json!({
256        "version": env!("CARGO_PKG_VERSION"),
257        "worker_id": &worker.id,
258        "tags": worker.tags,
259        "max_claims": worker.max_claims,
260        "registered_at": worker.registered_at,
261        "workflow": worker.workflow,
262        "paths": {
263            "db_path": server_paths.db_path.to_string_lossy(),
264            "media_dir": server_paths.media_dir.to_string_lossy(),
265            "log_dir": server_paths.log_dir.to_string_lossy(),
266            "config_path": server_paths.config_path.as_ref().map(|p| p.to_string_lossy().to_string())
267        },
268        "config": {
269            "states": states_config.state_names(),
270            "initial_state": &states_config.initial,
271            "timed_states": timed_states,
272            "terminal_states": terminal_states,
273            "blocking_states": &states_config.blocking_states,
274            "phases": phases_config.phase_names(),
275            "dependency_types": deps_config.dep_type_names(),
276            "known_tags": tags_config.tag_names()
277        }
278    });
279
280    if !path_notes.is_empty() {
281        response["path_warnings"] = json!(path_notes);
282    }
283
284    if !tag_warnings.is_empty() {
285        response["tag_warnings"] = json!(tag_warnings);
286    }
287
288    // Deliver workflow-specific role information and prompts
289    if let Some(role_name) = workflows.match_role(&worker.tags) {
290        let mut role_info = json!({
291            "role": &role_name,
292        });
293
294        // Include role definition details
295        if let Some(role_def) = workflows.get_role(&role_name) {
296            if let Some(ref desc) = role_def.description {
297                role_info["description"] = json!(desc);
298            }
299            if let Some(max) = role_def.max_claims {
300                role_info["max_claims"] = json!(max);
301            }
302            if let Some(can_assign) = role_def.can_assign {
303                role_info["can_assign"] = json!(can_assign);
304            }
305        }
306
307        response["role"] = role_info;
308
309        // Include role-specific prompts
310        let prompts = workflows.get_role_prompts(&role_name);
311        if !prompts.is_empty() {
312            response["role_prompts"] = json!(prompts);
313        }
314    }
315
316    // Include workflow description if available
317    if let Some(ref desc) = workflows.description {
318        response["workflow_description"] = json!(desc);
319    }
320
321    // Include overlay information if overlays were applied
322    if !worker.overlays.is_empty() {
323        response["overlays"] = json!(worker.overlays);
324    }
325
326    Ok(response)
327}
328
329pub fn disconnect(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
330    let worker_id =
331        get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
332
333    // Get final_status from args or fall back to config
334    let final_status =
335        get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
336
337    // Validate final_status is untimed
338    if states_config.is_timed_state(&final_status) {
339        return Err(ToolError::invalid_value(
340            "final_status",
341            &format!(
342                "must be an untimed status, got '{}'. Valid statuses: {:?}",
343                final_status,
344                states_config.untimed_state_names()
345            ),
346        )
347        .into());
348    }
349
350    // Release worker locks before unregistering (close claim_sequence records)
351    let _ = db.release_worker_locks(&worker_id);
352
353    // Unregister and get summary
354    let summary = db.unregister_worker(&worker_id, &final_status)?;
355
356    Ok(json!({
357        "success": true,
358        "tasks_released": summary.tasks_released,
359        "files_released": summary.files_released,
360        "final_status": summary.final_status
361    }))
362}
363
364pub fn list_agents(
365    db: &Database,
366    states_config: &StatesConfig,
367    format: OutputFormat,
368    args: Value,
369) -> Result<ToolResult> {
370    // Extract filter parameters
371    let tags = get_string_array(&args, "tags");
372    let file = get_string(&args, "file");
373    let task = get_string(&args, "task");
374    let depth = get_i32(&args, "depth").unwrap_or(0).clamp(-3, 3);
375
376    // Auto-cleanup stale workers (default 5 minutes, 0 to disable)
377    let stale_timeout = get_i32(&args, "stale_timeout").unwrap_or(300);
378    let cleanup_summary = if stale_timeout > 0 {
379        let final_status = states_config.disconnect_state.clone();
380        db.cleanup_stale_workers(stale_timeout as i64, &final_status)
381            .ok()
382    } else {
383        None
384    };
385
386    // Get workers with filters
387    let workers =
388        db.list_workers_filtered(tags.as_ref(), file.as_deref(), task.as_deref(), depth)?;
389
390    // Get current time for heartbeat age calculation
391    let now = std::time::SystemTime::now()
392        .duration_since(std::time::UNIX_EPOCH)
393        .map(|d| d.as_millis() as i64)
394        .unwrap_or(0);
395
396    match format {
397        OutputFormat::Markdown => {
398            let mut output = String::new();
399            if let Some(ref summary) = cleanup_summary
400                && summary.workers_evicted > 0
401            {
402                output.push_str(&format!(
403                    "**Evicted {} stale worker(s)**: {} (released {} task(s), {} file(s))\n\n",
404                    summary.workers_evicted,
405                    summary.evicted_worker_ids.join(", "),
406                    summary.tasks_released,
407                    summary.files_released
408                ));
409            }
410            output.push_str(&format_workers_markdown(&workers));
411            Ok(ToolResult::Raw(output))
412        }
413        OutputFormat::Json => {
414            let mut result = json!({
415                "workers": workers.iter().map(|w| json!({
416                    "id": w.id,
417                    "tags": w.tags,
418                    "max_claims": w.max_claims,
419                    "claim_count": w.claim_count,
420                    "current_thought": w.current_thought,
421                    "registered_at": w.registered_at,
422                    "last_heartbeat": w.last_heartbeat,
423                    "heartbeat_age_ms": now - w.last_heartbeat,
424                    "workflow": w.workflow
425                })).collect::<Vec<_>>()
426            });
427
428            if let Some(summary) = cleanup_summary
429                && summary.workers_evicted > 0
430            {
431                result["cleanup"] = json!({
432                    "workers_evicted": summary.workers_evicted,
433                    "evicted_worker_ids": summary.evicted_worker_ids,
434                    "tasks_released": summary.tasks_released,
435                    "files_released": summary.files_released
436                });
437            }
438
439            Ok(ToolResult::Json(result))
440        }
441    }
442}
443
444pub fn cleanup_stale(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
445    // Default timeout: 5 minutes
446    let timeout = get_i32(&args, "timeout").unwrap_or(300) as i64;
447
448    // Get final_status from args or fall back to config
449    let final_status =
450        get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
451
452    // Validate final_status is untimed
453    if states_config.is_timed_state(&final_status) {
454        return Err(ToolError::invalid_value(
455            "final_status",
456            &format!(
457                "must be an untimed status, got '{}'. Valid statuses: {:?}",
458                final_status,
459                states_config.untimed_state_names()
460            ),
461        )
462        .into());
463    }
464
465    let summary = db.cleanup_stale_workers(timeout, &final_status)?;
466
467    Ok(json!({
468        "workers_evicted": summary.workers_evicted,
469        "evicted_worker_ids": summary.evicted_worker_ids,
470        "tasks_released": summary.tasks_released,
471        "files_released": summary.files_released,
472        "final_status": summary.final_status
473    }))
474}
475
476pub fn add_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
477    let worker_id =
478        get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
479    let overlay_name =
480        get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
481
482    // Validate overlay exists in named_overlays
483    if !config.workflows.named_overlays.contains_key(&overlay_name) {
484        let available: Vec<&String> = config.workflows.named_overlays.keys().collect();
485        return Err(ToolError::invalid_value(
486            "overlay",
487            &format!(
488                "unknown overlay '{}'. Available overlays: {:?}",
489                overlay_name, available
490            ),
491        )
492        .into());
493    }
494
495    // Get current worker
496    let worker = db
497        .get_worker(&worker_id)?
498        .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
499
500    // Check not already active
501    if worker.overlays.contains(&overlay_name) {
502        return Err(ToolError::invalid_value(
503            "overlay",
504            &format!(
505                "overlay '{}' is already active on worker '{}'",
506                overlay_name, worker_id
507            ),
508        )
509        .into());
510    }
511
512    // Build new overlays list (append)
513    let mut new_overlays = worker.overlays.clone();
514    new_overlays.push(overlay_name);
515
516    // Persist
517    let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
518
519    // Build merged workflow to compute diff
520    let base = resolve_base_workflow(&updated_worker, config);
521    let mut merged = (*base).clone();
522    for name in &updated_worker.overlays {
523        if let Some(overlay) = config.workflows.named_overlays.get(name) {
524            merged.apply_overlay(overlay);
525        }
526    }
527    merged.active_overlays = updated_worker.overlays.clone();
528
529    let overlay_diff = merged.compute_overlay_diff(&base);
530
531    let mut response = json!({
532        "success": true,
533        "worker_id": updated_worker.id,
534        "overlays": updated_worker.overlays,
535        "overlay_diff": overlay_diff,
536    });
537
538    // Include role info if applicable
539    if let Some(role_name) = merged.match_role(&updated_worker.tags) {
540        response["role"] = json!(role_name);
541        let prompts = merged.get_role_prompts(&role_name);
542        if !prompts.is_empty() {
543            response["role_prompts"] = json!(prompts);
544        }
545    }
546
547    Ok(response)
548}
549
550pub fn remove_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
551    let worker_id =
552        get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
553    let overlay_name =
554        get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
555
556    // Get current worker
557    let worker = db
558        .get_worker(&worker_id)?
559        .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
560
561    // Check overlay is currently active
562    if !worker.overlays.contains(&overlay_name) {
563        return Err(ToolError::invalid_value(
564            "overlay",
565            &format!(
566                "overlay '{}' is not active on worker '{}'. Active overlays: {:?}",
567                overlay_name, worker_id, worker.overlays
568            ),
569        )
570        .into());
571    }
572
573    // Build new overlays list (remove)
574    let new_overlays: Vec<String> = worker
575        .overlays
576        .into_iter()
577        .filter(|o| o != &overlay_name)
578        .collect();
579
580    // Persist
581    let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
582
583    // Build merged workflow to compute diff
584    let base = resolve_base_workflow(&updated_worker, config);
585    let mut merged = (*base).clone();
586    for name in &updated_worker.overlays {
587        if let Some(overlay) = config.workflows.named_overlays.get(name) {
588            merged.apply_overlay(overlay);
589        }
590    }
591    merged.active_overlays = updated_worker.overlays.clone();
592
593    let overlay_diff = merged.compute_overlay_diff(&base);
594
595    let mut response = json!({
596        "success": true,
597        "worker_id": updated_worker.id,
598        "overlays": updated_worker.overlays,
599        "overlay_diff": overlay_diff,
600    });
601
602    // Include role info if applicable
603    if let Some(role_name) = merged.match_role(&updated_worker.tags) {
604        response["role"] = json!(role_name);
605        let prompts = merged.get_role_prompts(&role_name);
606        if !prompts.is_empty() {
607            response["role_prompts"] = json!(prompts);
608        }
609    }
610
611    Ok(response)
612}
613
614/// Resolve the base workflow for a worker (before overlays).
615fn resolve_base_workflow(
616    worker: &crate::types::Worker,
617    config: &AppConfig,
618) -> std::sync::Arc<WorkflowsConfig> {
619    if let Some(ref workflow_name) = worker.workflow {
620        config
621            .workflows
622            .get_named_workflow(workflow_name)
623            .map(std::sync::Arc::clone)
624    } else {
625        None
626    }
627    .or_else(|| {
628        config
629            .workflows
630            .get_default_workflow()
631            .map(std::sync::Arc::clone)
632    })
633    .unwrap_or_else(|| std::sync::Arc::clone(&config.workflows))
634}