1use super::{get_bool, get_i32, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::workflows::WorkflowsConfig;
5use crate::config::{AppConfig, Prompts, ServerPaths, StatesConfig};
6use crate::db::Database;
7use crate::error::ToolError;
8use crate::format::{OutputFormat, ToolResult, format_workers_markdown};
9use anyhow::Result;
10use rmcp::model::Tool;
11use serde_json::{Value, json};
12
13pub struct ConnectOptions<'a> {
15 pub db: &'a Database,
16 pub server_paths: &'a ServerPaths,
17 pub config: &'a AppConfig,
18 pub workflows: &'a WorkflowsConfig,
20}
21
22pub fn get_tools(prompts: &Prompts) -> Vec<Tool> {
23 vec![
24 make_tool_with_prompts(
25 "connect",
26 "Connect as a worker. Call this FIRST before using other tools. Returns worker_id (save it for all subsequent calls). Tags enable task affinity matching.",
27 json!({
28 "worker_id": {
29 "type": "string",
30 "description": "Only use if assigned a unique name (e.g., 'worker-17', 'coordinator'). Avoid generic names like 'claude'. Leave empty for an auto-generated petname."
31 },
32 "tags": {
33 "type": "array",
34 "items": { "type": "string" },
35 "description": "Worker capability tags for task affinity matching. Tasks use needed_tags/wanted_tags to prefer or require workers with matching tags."
36 },
37 "force": {
38 "type": "boolean",
39 "description": "Force reconnection if worker ID already exists (default: false). Use for stuck worker recovery."
40 },
41 "db_path": {
42 "type": "string",
43 "description": "Override database file path (same as TASK_GRAPH_DB_PATH env var). Note: Can only be set before server starts."
44 },
45 "media_dir": {
46 "type": "string",
47 "description": "Override media directory path (same as TASK_GRAPH_MEDIA_DIR env var). Note: Can only be set before server starts."
48 },
49 "log_dir": {
50 "type": "string",
51 "description": "Override log directory path (same as TASK_GRAPH_LOG_DIR env var). Note: Can only be set before server starts."
52 },
53 "config_path": {
54 "type": "string",
55 "description": "Override config file path (same as TASK_GRAPH_CONFIG_PATH env var). Note: Can only be set before server starts."
56 },
57 "workflow": {
58 "type": "string",
59 "description": "Named workflow to use (e.g., 'swarm' for workflow-swarm.yaml). If not specified, uses default workflows.yaml."
60 },
61 "overlays": {
62 "type": "array",
63 "items": { "type": "string" },
64 "description": "Overlay names to apply on top of the workflow, in order (e.g., ['git', 'user-request']). Use list_workflows to see available overlays."
65 },
66 "max_claims": {
67 "type": "integer",
68 "description": "Maximum concurrent task claims (default: 1). Use 0 for unlimited."
69 }
70 }),
71 vec![],
72 prompts,
73 ),
74 make_tool_with_prompts(
75 "disconnect",
76 "Disconnect a worker, releasing all claims and locks.",
77 json!({
78 "worker_id": {
79 "type": "string",
80 "description": "The worker's ID"
81 },
82 "final_status": {
83 "type": "string",
84 "enum": ["pending", "completed", "cancelled", "failed"],
85 "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
86 }
87 }),
88 vec!["worker_id"],
89 prompts,
90 ),
91 make_tool_with_prompts(
92 "list_agents",
93 "List all connected workers with their current status, claim counts, and what they're working on. Automatically evicts stale workers (no heartbeat within timeout).",
94 json!({
95 "tags": {
96 "type": "array",
97 "items": { "type": "string" },
98 "description": "Filter workers that have ALL of these capability/affinity tags"
99 },
100 "file": {
101 "type": "string",
102 "description": "Filter workers that have claimed this file"
103 },
104 "task": {
105 "type": "string",
106 "description": "Filter workers related to this task ID"
107 },
108 "depth": {
109 "type": "integer",
110 "description": "Task relationship depth (-3 to 3). Negative: ancestors, positive: descendants. Used with 'task' filter."
111 },
112 "stale_timeout": {
113 "type": "integer",
114 "description": "Seconds without heartbeat before a worker is considered stale and evicted. Set to 0 to disable auto-cleanup. Default: 300 (5 minutes)."
115 }
116 }),
117 vec![],
118 prompts,
119 ),
120 make_tool_with_prompts(
121 "cleanup_stale",
122 "Evict stale workers that haven't sent a heartbeat within the timeout period. Releases their task claims and file locks. If worker_id is provided, force-expire that specific worker regardless of staleness.",
123 json!({
124 "worker_id": {
125 "type": "string",
126 "description": "If provided, force-expire this specific worker regardless of staleness. When omitted, expire all stale workers based on timeout."
127 },
128 "timeout": {
129 "type": "integer",
130 "description": "Seconds without heartbeat before a worker is considered stale. Default: 300 (5 minutes). Ignored when worker_id is provided."
131 },
132 "final_status": {
133 "type": "string",
134 "enum": ["pending", "completed", "cancelled", "failed"],
135 "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
136 }
137 }),
138 vec![],
139 prompts,
140 ),
141 make_tool_with_prompts(
142 "add_overlay",
143 "Dynamically add an overlay to a connected worker's active overlay stack. The overlay is applied on top of existing overlays and persisted.",
144 json!({
145 "worker_id": {
146 "type": "string",
147 "description": "The worker's ID"
148 },
149 "overlay": {
150 "type": "string",
151 "description": "Name of the overlay to add (e.g., 'git', 'troubleshooting'). Use list_workflows to see available overlays."
152 }
153 }),
154 vec!["worker_id", "overlay"],
155 prompts,
156 ),
157 make_tool_with_prompts(
158 "remove_overlay",
159 "Dynamically remove an overlay from a connected worker's active overlay stack. The change is persisted immediately.",
160 json!({
161 "worker_id": {
162 "type": "string",
163 "description": "The worker's ID"
164 },
165 "overlay": {
166 "type": "string",
167 "description": "Name of the overlay to remove (must be currently active on this worker)"
168 }
169 }),
170 vec!["worker_id", "overlay"],
171 prompts,
172 ),
173 ]
174}
175
176pub fn connect(opts: ConnectOptions<'_>, args: Value) -> Result<Value> {
177 let ConnectOptions {
178 db,
179 server_paths,
180 config,
181 workflows,
182 } = opts;
183
184 let states_config = &config.states;
185 let phases_config = &config.phases;
186 let deps_config = &config.deps;
187 let tags_config = &config.tags;
188 let ids_config = &config.ids;
189
190 let worker_id = get_string(&args, "worker_id");
191 let tags = get_string_array(&args, "tags").unwrap_or_default();
192 let force = get_bool(&args, "force").unwrap_or(false);
193 let workflow = get_string(&args, "workflow");
194 let max_claims = get_i32(&args, "max_claims");
195
196 let mut path_notes: Vec<String> = Vec::new();
204
205 if let Some(requested_db) = get_string(&args, "db_path")
206 && server_paths.db_path.to_string_lossy() != requested_db
207 {
208 path_notes.push(format!(
209 "db_path: requested '{}' but server is using '{}' (set TASK_GRAPH_DB_PATH before starting server)",
210 requested_db,
211 server_paths.db_path.display()
212 ));
213 }
214
215 if let Some(requested_media) = get_string(&args, "media_dir")
216 && server_paths.media_dir.to_string_lossy() != requested_media
217 {
218 path_notes.push(format!(
219 "media_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_MEDIA_DIR before starting server)",
220 requested_media,
221 server_paths.media_dir.display()
222 ));
223 }
224
225 if let Some(requested_log) = get_string(&args, "log_dir")
226 && server_paths.log_dir.to_string_lossy() != requested_log
227 {
228 path_notes.push(format!(
229 "log_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_LOG_DIR before starting server)",
230 requested_log,
231 server_paths.log_dir.display()
232 ));
233 }
234
235 if let Some(requested_config) = get_string(&args, "config_path") {
236 let current_config = server_paths
237 .config_path
238 .as_ref()
239 .map(|p| p.to_string_lossy().to_string());
240 if current_config.as_deref() != Some(&requested_config) {
241 path_notes.push(format!(
242 "config_path: requested '{}' but server is using '{}' (set TASK_GRAPH_CONFIG_PATH before starting server)",
243 requested_config,
244 current_config.unwrap_or_else(|| "default locations".to_string())
245 ));
246 }
247 }
248
249 let disconnect_status = states_config.disconnect_state.clone();
251 let _ = db.cleanup_stale_workers(300, &disconnect_status);
252
253 if force
255 && let Some(ref wid) = worker_id
256 && db.get_worker(wid)?.is_some()
257 {
258 let _ = db.expire_worker(wid, &disconnect_status);
259 }
260
261 let overlays = get_string_array(&args, "overlays").unwrap_or_default();
262 let worker = db.register_worker(
263 worker_id, tags, force, ids_config, workflow, overlays, max_claims,
264 )?;
265
266 let timed_states: Vec<&str> = states_config
268 .definitions
269 .iter()
270 .filter(|(_, def)| def.timed)
271 .map(|(name, _)| name.as_str())
272 .collect();
273
274 let terminal_states: Vec<&str> = states_config
275 .definitions
276 .iter()
277 .filter(|(_, def)| def.exits.is_empty())
278 .map(|(name, _)| name.as_str())
279 .collect();
280
281 let mut response = json!({
282 "version": env!("CARGO_PKG_VERSION"),
283 "worker_id": &worker.id,
284 "tags": worker.tags,
285 "max_claims": worker.max_claims,
286 "registered_at": crate::types::ms_to_iso(worker.registered_at),
287 "workflow": worker.workflow,
288 "paths": {
289 "db_path": server_paths.db_path.to_string_lossy(),
290 "media_dir": server_paths.media_dir.to_string_lossy(),
291 "log_dir": server_paths.log_dir.to_string_lossy(),
292 "config_path": server_paths.config_path.as_ref().map(|p| p.to_string_lossy().to_string())
293 },
294 "config": {
295 "states": states_config.state_names(),
296 "initial_state": &states_config.initial,
297 "timed_states": timed_states,
298 "terminal_states": terminal_states,
299 "blocking_states": &states_config.blocking_states,
300 "phases": phases_config.phase_names(),
301 "dependency_types": deps_config.dep_type_names(),
302 "known_tags": tags_config.tag_names()
303 }
304 });
305
306 response["hints"] = json!({
308 "resources": "MCP resources are available for live data. Use config://current for active configuration, query://tasks/ready for claimable tasks, docs://skills/list for skill documentation."
309 });
310
311 if !path_notes.is_empty() {
312 response["path_warnings"] = json!(path_notes);
313 }
314
315 if let Some(role_name) = workflows.match_role(&worker.tags) {
317 let mut role_info = json!({
318 "role": &role_name,
319 });
320
321 if let Some(role_def) = workflows.get_role(&role_name) {
323 if let Some(ref desc) = role_def.description {
324 role_info["description"] = json!(desc);
325 }
326 if let Some(max) = role_def.max_claims {
327 role_info["max_claims"] = json!(max);
328 }
329 if let Some(can_assign) = role_def.can_assign {
330 role_info["can_assign"] = json!(can_assign);
331 }
332 }
333
334 response["role"] = role_info;
335
336 let prompts = workflows.get_role_prompts(&role_name);
338 if !prompts.is_empty() {
339 response["role_prompts"] = json!(prompts);
340 }
341 }
342
343 if let Some(ref desc) = workflows.description {
345 response["workflow_description"] = json!(desc);
346 }
347
348 if !worker.overlays.is_empty() {
350 response["overlays"] = json!(worker.overlays);
351 }
352
353 Ok(response)
354}
355
356pub fn disconnect(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
357 let worker_id =
358 get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
359
360 let final_status =
362 get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
363
364 if states_config.is_timed_state(&final_status) {
366 return Err(ToolError::invalid_value(
367 "final_status",
368 &format!(
369 "must be an untimed status, got '{}'. Valid statuses: {:?}",
370 final_status,
371 states_config.untimed_state_names()
372 ),
373 )
374 .into());
375 }
376
377 let _ = db.release_worker_locks(&worker_id);
379
380 let summary = db.unregister_worker(&worker_id, &final_status)?;
382
383 Ok(json!({
384 "success": true,
385 "tasks_released": summary.tasks_released,
386 "files_released": summary.files_released,
387 "final_status": summary.final_status
388 }))
389}
390
391pub fn list_agents(
392 db: &Database,
393 states_config: &StatesConfig,
394 format: OutputFormat,
395 args: Value,
396) -> Result<ToolResult> {
397 let tags = get_string_array(&args, "tags");
399 let file = get_string(&args, "file");
400 let task = get_string(&args, "task");
401 let depth = get_i32(&args, "depth").unwrap_or(0).clamp(-3, 3);
402
403 let stale_timeout = get_i32(&args, "stale_timeout").unwrap_or(300);
405 let cleanup_summary = if stale_timeout > 0 {
406 let final_status = states_config.disconnect_state.clone();
407 db.cleanup_stale_workers(stale_timeout as i64, &final_status)
408 .ok()
409 } else {
410 None
411 };
412
413 let workers = db.list_workers_filtered(
415 tags.as_ref(),
416 file.as_deref(),
417 task.as_deref(),
418 depth,
419 states_config,
420 )?;
421
422 let now = std::time::SystemTime::now()
424 .duration_since(std::time::UNIX_EPOCH)
425 .map(|d| d.as_millis() as i64)
426 .unwrap_or(0);
427
428 match format {
429 OutputFormat::Markdown => {
430 let mut output = String::new();
431 if let Some(ref summary) = cleanup_summary
432 && summary.workers_evicted > 0
433 {
434 output.push_str(&format!(
435 "**Evicted {} stale worker(s)**: {} (released {} task(s), {} file(s))\n\n",
436 summary.workers_evicted,
437 summary.evicted_worker_ids.join(", "),
438 summary.tasks_released,
439 summary.files_released
440 ));
441 }
442 output.push_str(&format_workers_markdown(&workers));
443 Ok(ToolResult::Raw(output))
444 }
445 OutputFormat::Json => {
446 let mut result = json!({
447 "workers": workers.iter().map(|w| json!({
448 "id": w.id,
449 "tags": w.tags,
450 "max_claims": w.max_claims,
451 "claim_count": w.claim_count,
452 "current_thought": w.current_thought,
453 "registered_at": crate::types::ms_to_iso(w.registered_at),
454 "last_heartbeat": crate::types::ms_to_iso(w.last_heartbeat),
455 "heartbeat_age_ms": now - w.last_heartbeat,
456 "workflow": w.workflow
457 })).collect::<Vec<_>>()
458 });
459
460 if let Some(summary) = cleanup_summary
461 && summary.workers_evicted > 0
462 {
463 result["cleanup"] = json!({
464 "workers_evicted": summary.workers_evicted,
465 "evicted_worker_ids": summary.evicted_worker_ids,
466 "tasks_released": summary.tasks_released,
467 "files_released": summary.files_released
468 });
469 }
470
471 Ok(ToolResult::Json(result))
472 }
473 }
474}
475
476pub fn cleanup_stale(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
477 let final_status =
479 get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
480
481 if states_config.is_timed_state(&final_status) {
483 return Err(ToolError::invalid_value(
484 "final_status",
485 &format!(
486 "must be an untimed status, got '{}'. Valid statuses: {:?}",
487 final_status,
488 states_config.untimed_state_names()
489 ),
490 )
491 .into());
492 }
493
494 if let Some(worker_id) = get_string(&args, "worker_id") {
496 let summary = db.expire_worker(&worker_id, &final_status)?;
497
498 return Ok(json!({
499 "workers_evicted": 1,
500 "evicted_worker_ids": [worker_id],
501 "tasks_released": summary.tasks_released,
502 "files_released": summary.files_released,
503 "final_status": summary.final_status
504 }));
505 }
506
507 let timeout = get_i32(&args, "timeout").unwrap_or(300) as i64;
509 let summary = db.cleanup_stale_workers(timeout, &final_status)?;
510
511 Ok(json!({
512 "workers_evicted": summary.workers_evicted,
513 "evicted_worker_ids": summary.evicted_worker_ids,
514 "tasks_released": summary.tasks_released,
515 "files_released": summary.files_released,
516 "final_status": summary.final_status
517 }))
518}
519
520pub fn add_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
521 let worker_id =
522 get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
523 let overlay_name =
524 get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
525
526 if !config.workflows.named_overlays.contains_key(&overlay_name) {
528 let available: Vec<&String> = config.workflows.named_overlays.keys().collect();
529 return Err(ToolError::invalid_value(
530 "overlay",
531 &format!(
532 "unknown overlay '{}'. Available overlays: {:?}",
533 overlay_name, available
534 ),
535 )
536 .into());
537 }
538
539 let worker = db
541 .get_worker(&worker_id)?
542 .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
543
544 if worker.overlays.contains(&overlay_name) {
546 return Err(ToolError::invalid_value(
547 "overlay",
548 &format!(
549 "overlay '{}' is already active on worker '{}'",
550 overlay_name, worker_id
551 ),
552 )
553 .into());
554 }
555
556 let mut new_overlays = worker.overlays.clone();
558 new_overlays.push(overlay_name);
559
560 let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
562
563 let base = resolve_base_workflow(&updated_worker, config);
565 let mut merged = (*base).clone();
566 for name in &updated_worker.overlays {
567 if let Some(overlay) = config.workflows.named_overlays.get(name) {
568 merged.apply_overlay(overlay);
569 }
570 }
571 merged.active_overlays = updated_worker.overlays.clone();
572
573 let overlay_diff = merged.compute_overlay_diff(&base);
574
575 let mut response = json!({
576 "success": true,
577 "worker_id": updated_worker.id,
578 "overlays": updated_worker.overlays,
579 "overlay_diff": overlay_diff,
580 });
581
582 if let Some(role_name) = merged.match_role(&updated_worker.tags) {
584 response["role"] = json!(role_name);
585 let prompts = merged.get_role_prompts(&role_name);
586 if !prompts.is_empty() {
587 response["role_prompts"] = json!(prompts);
588 }
589 }
590
591 Ok(response)
592}
593
594pub fn remove_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
595 let worker_id =
596 get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
597 let overlay_name =
598 get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
599
600 let worker = db
602 .get_worker(&worker_id)?
603 .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
604
605 if !worker.overlays.contains(&overlay_name) {
607 return Err(ToolError::invalid_value(
608 "overlay",
609 &format!(
610 "overlay '{}' is not active on worker '{}'. Active overlays: {:?}",
611 overlay_name, worker_id, worker.overlays
612 ),
613 )
614 .into());
615 }
616
617 let new_overlays: Vec<String> = worker
619 .overlays
620 .into_iter()
621 .filter(|o| o != &overlay_name)
622 .collect();
623
624 let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
626
627 let base = resolve_base_workflow(&updated_worker, config);
629 let mut merged = (*base).clone();
630 for name in &updated_worker.overlays {
631 if let Some(overlay) = config.workflows.named_overlays.get(name) {
632 merged.apply_overlay(overlay);
633 }
634 }
635 merged.active_overlays = updated_worker.overlays.clone();
636
637 let overlay_diff = merged.compute_overlay_diff(&base);
638
639 let mut response = json!({
640 "success": true,
641 "worker_id": updated_worker.id,
642 "overlays": updated_worker.overlays,
643 "overlay_diff": overlay_diff,
644 });
645
646 if let Some(role_name) = merged.match_role(&updated_worker.tags) {
648 response["role"] = json!(role_name);
649 let prompts = merged.get_role_prompts(&role_name);
650 if !prompts.is_empty() {
651 response["role_prompts"] = json!(prompts);
652 }
653 }
654
655 Ok(response)
656}
657
658fn resolve_base_workflow(
660 worker: &crate::types::Worker,
661 config: &AppConfig,
662) -> std::sync::Arc<WorkflowsConfig> {
663 if let Some(ref workflow_name) = worker.workflow {
664 config
665 .workflows
666 .get_named_workflow(workflow_name)
667 .map(std::sync::Arc::clone)
668 } else {
669 None
670 }
671 .or_else(|| {
672 config
673 .workflows
674 .get_default_workflow()
675 .map(std::sync::Arc::clone)
676 })
677 .unwrap_or_else(|| std::sync::Arc::clone(&config.workflows))
678}