1use super::{get_bool, get_i32, get_string, get_string_array, make_tool_with_prompts};
4use crate::config::workflows::WorkflowsConfig;
5use crate::config::{AppConfig, Prompts, ServerPaths, StatesConfig};
6use crate::db::Database;
7use crate::error::ToolError;
8use crate::format::{OutputFormat, ToolResult, format_workers_markdown};
9use anyhow::Result;
10use rmcp::model::Tool;
11use serde_json::{Value, json};
12
13pub struct ConnectOptions<'a> {
15 pub db: &'a Database,
16 pub server_paths: &'a ServerPaths,
17 pub config: &'a AppConfig,
18 pub workflows: &'a WorkflowsConfig,
20}
21
22pub fn get_tools(prompts: &Prompts) -> Vec<Tool> {
23 vec![
24 make_tool_with_prompts(
25 "connect",
26 "Connect as a worker. Call this FIRST before using other tools. Returns worker_id (save it for all subsequent calls). Tags enable task affinity matching.",
27 json!({
28 "worker_id": {
29 "type": "string",
30 "description": "Only use if assigned a unique name (e.g., 'worker-17', 'coordinator'). Avoid generic names like 'claude'. Leave empty for an auto-generated petname."
31 },
32 "tags": {
33 "type": "array",
34 "items": { "type": "string" },
35 "description": "Freeform tags for capabilities, roles, etc."
36 },
37 "force": {
38 "type": "boolean",
39 "description": "Force reconnection if worker ID already exists (default: false). Use for stuck worker recovery."
40 },
41 "db_path": {
42 "type": "string",
43 "description": "Override database file path (same as TASK_GRAPH_DB_PATH env var). Note: Can only be set before server starts."
44 },
45 "media_dir": {
46 "type": "string",
47 "description": "Override media directory path (same as TASK_GRAPH_MEDIA_DIR env var). Note: Can only be set before server starts."
48 },
49 "log_dir": {
50 "type": "string",
51 "description": "Override log directory path (same as TASK_GRAPH_LOG_DIR env var). Note: Can only be set before server starts."
52 },
53 "config_path": {
54 "type": "string",
55 "description": "Override config file path (same as TASK_GRAPH_CONFIG_PATH env var). Note: Can only be set before server starts."
56 },
57 "workflow": {
58 "type": "string",
59 "description": "Named workflow to use (e.g., 'swarm' for workflow-swarm.yaml). If not specified, uses default workflows.yaml."
60 },
61 "overlays": {
62 "type": "array",
63 "items": { "type": "string" },
64 "description": "Overlay names to apply on top of the workflow, in order (e.g., ['git', 'user-request']). Use list_workflows to see available overlays."
65 }
66 }),
67 vec![],
68 prompts,
69 ),
70 make_tool_with_prompts(
71 "disconnect",
72 "Disconnect a worker, releasing all claims and locks.",
73 json!({
74 "worker_id": {
75 "type": "string",
76 "description": "The worker's ID"
77 },
78 "final_status": {
79 "type": "string",
80 "enum": ["pending", "completed", "cancelled", "failed"],
81 "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
82 }
83 }),
84 vec!["worker_id"],
85 prompts,
86 ),
87 make_tool_with_prompts(
88 "list_agents",
89 "List all connected workers with their current status, claim counts, and what they're working on. Automatically evicts stale workers (no heartbeat within timeout).",
90 json!({
91 "tags": {
92 "type": "array",
93 "items": { "type": "string" },
94 "description": "Filter workers that have ALL of these tags"
95 },
96 "file": {
97 "type": "string",
98 "description": "Filter workers that have claimed this file"
99 },
100 "task": {
101 "type": "string",
102 "description": "Filter workers related to this task ID"
103 },
104 "depth": {
105 "type": "integer",
106 "description": "Task relationship depth (-3 to 3). Negative: ancestors, positive: descendants. Used with 'task' filter."
107 },
108 "stale_timeout": {
109 "type": "integer",
110 "description": "Seconds without heartbeat before a worker is considered stale and evicted. Set to 0 to disable auto-cleanup. Default: 300 (5 minutes)."
111 }
112 }),
113 vec![],
114 prompts,
115 ),
116 make_tool_with_prompts(
117 "cleanup_stale",
118 "Evict stale workers that haven't sent a heartbeat within the timeout period. Releases their task claims and file locks.",
119 json!({
120 "timeout": {
121 "type": "integer",
122 "description": "Seconds without heartbeat before a worker is considered stale. Default: 300 (5 minutes)."
123 },
124 "final_status": {
125 "type": "string",
126 "enum": ["pending", "completed", "cancelled", "failed"],
127 "description": "Status to set released tasks to (default: config disconnect_status, typically 'pending'). Must be an untimed status."
128 }
129 }),
130 vec![],
131 prompts,
132 ),
133 make_tool_with_prompts(
134 "add_overlay",
135 "Dynamically add an overlay to a connected worker's active overlay stack. The overlay is applied on top of existing overlays and persisted.",
136 json!({
137 "worker_id": {
138 "type": "string",
139 "description": "The worker's ID"
140 },
141 "overlay": {
142 "type": "string",
143 "description": "Name of the overlay to add (e.g., 'git', 'troubleshooting'). Use list_workflows to see available overlays."
144 }
145 }),
146 vec!["worker_id", "overlay"],
147 prompts,
148 ),
149 make_tool_with_prompts(
150 "remove_overlay",
151 "Dynamically remove an overlay from a connected worker's active overlay stack. The change is persisted immediately.",
152 json!({
153 "worker_id": {
154 "type": "string",
155 "description": "The worker's ID"
156 },
157 "overlay": {
158 "type": "string",
159 "description": "Name of the overlay to remove (must be currently active on this worker)"
160 }
161 }),
162 vec!["worker_id", "overlay"],
163 prompts,
164 ),
165 ]
166}
167
168pub fn connect(opts: ConnectOptions<'_>, args: Value) -> Result<Value> {
169 let ConnectOptions {
170 db,
171 server_paths,
172 config,
173 workflows,
174 } = opts;
175
176 let states_config = &config.states;
177 let phases_config = &config.phases;
178 let deps_config = &config.deps;
179 let tags_config = &config.tags;
180 let ids_config = &config.ids;
181
182 let worker_id = get_string(&args, "worker_id");
183 let tags = get_string_array(&args, "tags").unwrap_or_default();
184 let force = get_bool(&args, "force").unwrap_or(false);
185 let workflow = get_string(&args, "workflow");
186
187 let tag_warnings = tags_config.validate_tags(&tags)?;
189
190 let mut path_notes: Vec<String> = Vec::new();
192
193 if let Some(requested_db) = get_string(&args, "db_path")
194 && server_paths.db_path.to_string_lossy() != requested_db
195 {
196 path_notes.push(format!(
197 "db_path: requested '{}' but server is using '{}' (set TASK_GRAPH_DB_PATH before starting server)",
198 requested_db,
199 server_paths.db_path.display()
200 ));
201 }
202
203 if let Some(requested_media) = get_string(&args, "media_dir")
204 && server_paths.media_dir.to_string_lossy() != requested_media
205 {
206 path_notes.push(format!(
207 "media_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_MEDIA_DIR before starting server)",
208 requested_media,
209 server_paths.media_dir.display()
210 ));
211 }
212
213 if let Some(requested_log) = get_string(&args, "log_dir")
214 && server_paths.log_dir.to_string_lossy() != requested_log
215 {
216 path_notes.push(format!(
217 "log_dir: requested '{}' but server is using '{}' (set TASK_GRAPH_LOG_DIR before starting server)",
218 requested_log,
219 server_paths.log_dir.display()
220 ));
221 }
222
223 if let Some(requested_config) = get_string(&args, "config_path") {
224 let current_config = server_paths
225 .config_path
226 .as_ref()
227 .map(|p| p.to_string_lossy().to_string());
228 if current_config.as_deref() != Some(&requested_config) {
229 path_notes.push(format!(
230 "config_path: requested '{}' but server is using '{}' (set TASK_GRAPH_CONFIG_PATH before starting server)",
231 requested_config,
232 current_config.unwrap_or_else(|| "default locations".to_string())
233 ));
234 }
235 }
236
237 let overlays = get_string_array(&args, "overlays").unwrap_or_default();
238 let worker = db.register_worker(worker_id, tags, force, ids_config, workflow, overlays)?;
239
240 let timed_states: Vec<&str> = states_config
242 .definitions
243 .iter()
244 .filter(|(_, def)| def.timed)
245 .map(|(name, _)| name.as_str())
246 .collect();
247
248 let terminal_states: Vec<&str> = states_config
249 .definitions
250 .iter()
251 .filter(|(_, def)| def.exits.is_empty())
252 .map(|(name, _)| name.as_str())
253 .collect();
254
255 let mut response = json!({
256 "version": env!("CARGO_PKG_VERSION"),
257 "worker_id": &worker.id,
258 "tags": worker.tags,
259 "max_claims": worker.max_claims,
260 "registered_at": worker.registered_at,
261 "workflow": worker.workflow,
262 "paths": {
263 "db_path": server_paths.db_path.to_string_lossy(),
264 "media_dir": server_paths.media_dir.to_string_lossy(),
265 "log_dir": server_paths.log_dir.to_string_lossy(),
266 "config_path": server_paths.config_path.as_ref().map(|p| p.to_string_lossy().to_string())
267 },
268 "config": {
269 "states": states_config.state_names(),
270 "initial_state": &states_config.initial,
271 "timed_states": timed_states,
272 "terminal_states": terminal_states,
273 "blocking_states": &states_config.blocking_states,
274 "phases": phases_config.phase_names(),
275 "dependency_types": deps_config.dep_type_names(),
276 "known_tags": tags_config.tag_names()
277 }
278 });
279
280 if !path_notes.is_empty() {
281 response["path_warnings"] = json!(path_notes);
282 }
283
284 if !tag_warnings.is_empty() {
285 response["tag_warnings"] = json!(tag_warnings);
286 }
287
288 if let Some(role_name) = workflows.match_role(&worker.tags) {
290 let mut role_info = json!({
291 "role": &role_name,
292 });
293
294 if let Some(role_def) = workflows.get_role(&role_name) {
296 if let Some(ref desc) = role_def.description {
297 role_info["description"] = json!(desc);
298 }
299 if let Some(max) = role_def.max_claims {
300 role_info["max_claims"] = json!(max);
301 }
302 if let Some(can_assign) = role_def.can_assign {
303 role_info["can_assign"] = json!(can_assign);
304 }
305 }
306
307 response["role"] = role_info;
308
309 let prompts = workflows.get_role_prompts(&role_name);
311 if !prompts.is_empty() {
312 response["role_prompts"] = json!(prompts);
313 }
314 }
315
316 if let Some(ref desc) = workflows.description {
318 response["workflow_description"] = json!(desc);
319 }
320
321 if !worker.overlays.is_empty() {
323 response["overlays"] = json!(worker.overlays);
324 }
325
326 Ok(response)
327}
328
329pub fn disconnect(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
330 let worker_id =
331 get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
332
333 let final_status =
335 get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
336
337 if states_config.is_timed_state(&final_status) {
339 return Err(ToolError::invalid_value(
340 "final_status",
341 &format!(
342 "must be an untimed status, got '{}'. Valid statuses: {:?}",
343 final_status,
344 states_config.untimed_state_names()
345 ),
346 )
347 .into());
348 }
349
350 let _ = db.release_worker_locks(&worker_id);
352
353 let summary = db.unregister_worker(&worker_id, &final_status)?;
355
356 Ok(json!({
357 "success": true,
358 "tasks_released": summary.tasks_released,
359 "files_released": summary.files_released,
360 "final_status": summary.final_status
361 }))
362}
363
364pub fn list_agents(
365 db: &Database,
366 states_config: &StatesConfig,
367 format: OutputFormat,
368 args: Value,
369) -> Result<ToolResult> {
370 let tags = get_string_array(&args, "tags");
372 let file = get_string(&args, "file");
373 let task = get_string(&args, "task");
374 let depth = get_i32(&args, "depth").unwrap_or(0).clamp(-3, 3);
375
376 let stale_timeout = get_i32(&args, "stale_timeout").unwrap_or(300);
378 let cleanup_summary = if stale_timeout > 0 {
379 let final_status = states_config.disconnect_state.clone();
380 db.cleanup_stale_workers(stale_timeout as i64, &final_status)
381 .ok()
382 } else {
383 None
384 };
385
386 let workers =
388 db.list_workers_filtered(tags.as_ref(), file.as_deref(), task.as_deref(), depth)?;
389
390 let now = std::time::SystemTime::now()
392 .duration_since(std::time::UNIX_EPOCH)
393 .map(|d| d.as_millis() as i64)
394 .unwrap_or(0);
395
396 match format {
397 OutputFormat::Markdown => {
398 let mut output = String::new();
399 if let Some(ref summary) = cleanup_summary
400 && summary.workers_evicted > 0
401 {
402 output.push_str(&format!(
403 "**Evicted {} stale worker(s)**: {} (released {} task(s), {} file(s))\n\n",
404 summary.workers_evicted,
405 summary.evicted_worker_ids.join(", "),
406 summary.tasks_released,
407 summary.files_released
408 ));
409 }
410 output.push_str(&format_workers_markdown(&workers));
411 Ok(ToolResult::Raw(output))
412 }
413 OutputFormat::Json => {
414 let mut result = json!({
415 "workers": workers.iter().map(|w| json!({
416 "id": w.id,
417 "tags": w.tags,
418 "max_claims": w.max_claims,
419 "claim_count": w.claim_count,
420 "current_thought": w.current_thought,
421 "registered_at": w.registered_at,
422 "last_heartbeat": w.last_heartbeat,
423 "heartbeat_age_ms": now - w.last_heartbeat,
424 "workflow": w.workflow
425 })).collect::<Vec<_>>()
426 });
427
428 if let Some(summary) = cleanup_summary
429 && summary.workers_evicted > 0
430 {
431 result["cleanup"] = json!({
432 "workers_evicted": summary.workers_evicted,
433 "evicted_worker_ids": summary.evicted_worker_ids,
434 "tasks_released": summary.tasks_released,
435 "files_released": summary.files_released
436 });
437 }
438
439 Ok(ToolResult::Json(result))
440 }
441 }
442}
443
444pub fn cleanup_stale(db: &Database, states_config: &StatesConfig, args: Value) -> Result<Value> {
445 let timeout = get_i32(&args, "timeout").unwrap_or(300) as i64;
447
448 let final_status =
450 get_string(&args, "final_status").unwrap_or_else(|| states_config.disconnect_state.clone());
451
452 if states_config.is_timed_state(&final_status) {
454 return Err(ToolError::invalid_value(
455 "final_status",
456 &format!(
457 "must be an untimed status, got '{}'. Valid statuses: {:?}",
458 final_status,
459 states_config.untimed_state_names()
460 ),
461 )
462 .into());
463 }
464
465 let summary = db.cleanup_stale_workers(timeout, &final_status)?;
466
467 Ok(json!({
468 "workers_evicted": summary.workers_evicted,
469 "evicted_worker_ids": summary.evicted_worker_ids,
470 "tasks_released": summary.tasks_released,
471 "files_released": summary.files_released,
472 "final_status": summary.final_status
473 }))
474}
475
476pub fn add_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
477 let worker_id =
478 get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
479 let overlay_name =
480 get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
481
482 if !config.workflows.named_overlays.contains_key(&overlay_name) {
484 let available: Vec<&String> = config.workflows.named_overlays.keys().collect();
485 return Err(ToolError::invalid_value(
486 "overlay",
487 &format!(
488 "unknown overlay '{}'. Available overlays: {:?}",
489 overlay_name, available
490 ),
491 )
492 .into());
493 }
494
495 let worker = db
497 .get_worker(&worker_id)?
498 .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
499
500 if worker.overlays.contains(&overlay_name) {
502 return Err(ToolError::invalid_value(
503 "overlay",
504 &format!(
505 "overlay '{}' is already active on worker '{}'",
506 overlay_name, worker_id
507 ),
508 )
509 .into());
510 }
511
512 let mut new_overlays = worker.overlays.clone();
514 new_overlays.push(overlay_name);
515
516 let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
518
519 let base = resolve_base_workflow(&updated_worker, config);
521 let mut merged = (*base).clone();
522 for name in &updated_worker.overlays {
523 if let Some(overlay) = config.workflows.named_overlays.get(name) {
524 merged.apply_overlay(overlay);
525 }
526 }
527 merged.active_overlays = updated_worker.overlays.clone();
528
529 let overlay_diff = merged.compute_overlay_diff(&base);
530
531 let mut response = json!({
532 "success": true,
533 "worker_id": updated_worker.id,
534 "overlays": updated_worker.overlays,
535 "overlay_diff": overlay_diff,
536 });
537
538 if let Some(role_name) = merged.match_role(&updated_worker.tags) {
540 response["role"] = json!(role_name);
541 let prompts = merged.get_role_prompts(&role_name);
542 if !prompts.is_empty() {
543 response["role_prompts"] = json!(prompts);
544 }
545 }
546
547 Ok(response)
548}
549
550pub fn remove_overlay(db: &Database, config: &AppConfig, args: Value) -> Result<Value> {
551 let worker_id =
552 get_string(&args, "worker_id").ok_or_else(|| ToolError::missing_field("worker_id"))?;
553 let overlay_name =
554 get_string(&args, "overlay").ok_or_else(|| ToolError::missing_field("overlay"))?;
555
556 let worker = db
558 .get_worker(&worker_id)?
559 .ok_or_else(|| ToolError::agent_not_found(&worker_id))?;
560
561 if !worker.overlays.contains(&overlay_name) {
563 return Err(ToolError::invalid_value(
564 "overlay",
565 &format!(
566 "overlay '{}' is not active on worker '{}'. Active overlays: {:?}",
567 overlay_name, worker_id, worker.overlays
568 ),
569 )
570 .into());
571 }
572
573 let new_overlays: Vec<String> = worker
575 .overlays
576 .into_iter()
577 .filter(|o| o != &overlay_name)
578 .collect();
579
580 let updated_worker = db.update_worker_overlays(&worker_id, new_overlays)?;
582
583 let base = resolve_base_workflow(&updated_worker, config);
585 let mut merged = (*base).clone();
586 for name in &updated_worker.overlays {
587 if let Some(overlay) = config.workflows.named_overlays.get(name) {
588 merged.apply_overlay(overlay);
589 }
590 }
591 merged.active_overlays = updated_worker.overlays.clone();
592
593 let overlay_diff = merged.compute_overlay_diff(&base);
594
595 let mut response = json!({
596 "success": true,
597 "worker_id": updated_worker.id,
598 "overlays": updated_worker.overlays,
599 "overlay_diff": overlay_diff,
600 });
601
602 if let Some(role_name) = merged.match_role(&updated_worker.tags) {
604 response["role"] = json!(role_name);
605 let prompts = merged.get_role_prompts(&role_name);
606 if !prompts.is_empty() {
607 response["role_prompts"] = json!(prompts);
608 }
609 }
610
611 Ok(response)
612}
613
614fn resolve_base_workflow(
616 worker: &crate::types::Worker,
617 config: &AppConfig,
618) -> std::sync::Arc<WorkflowsConfig> {
619 if let Some(ref workflow_name) = worker.workflow {
620 config
621 .workflows
622 .get_named_workflow(workflow_name)
623 .map(std::sync::Arc::clone)
624 } else {
625 None
626 }
627 .or_else(|| {
628 config
629 .workflows
630 .get_default_workflow()
631 .map(std::sync::Arc::clone)
632 })
633 .unwrap_or_else(|| std::sync::Arc::clone(&config.workflows))
634}