1use anyhow::{Context, Result};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use uuid::Uuid;
17
18use super::{Tool, ToolResult};
19use crate::cli::go_ralph::{execute_go_ralph, format_go_ralph_result};
20use crate::okr::{KeyResult, Okr, OkrRepository, OkrRun};
21
/// Lifecycle phase of a background `go` pipeline run.
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `phase` key, next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum GoRunPhase {
    /// The run has been registered but its background task has not yet
    /// marked itself as running.
    Starting,
    /// The background task is executing the pipeline.
    Running,
    /// The pipeline returned successfully (individual stories may still have failed).
    Completed {
        /// Number of stories that passed.
        passed: usize,
        /// Total number of stories.
        total: usize,
        /// Whether the pipeline reported every story as passed.
        all_passed: bool,
        /// Feature branch reported by the pipeline result.
        feature_branch: String,
        /// Formatted, human-readable result summary.
        summary: String,
    },
    /// The pipeline returned an error.
    Failed {
        /// Rendered error message.
        error: String,
    },
}
41
/// Snapshot of a `go` pipeline run tracked in [`ACTIVE_GO_RUNS`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveGoRun {
    /// OKR id (UUID rendered as a string); also the registry key.
    pub okr_id: String,
    /// Task description the pipeline was launched with.
    pub task: String,
    /// Model name after provider resolution.
    pub model: String,
    /// Launch time as an RFC 3339 UTC timestamp.
    pub started_at: String,
    /// Process working directory at launch (`"."` if it could not be read).
    pub working_dir: String,
    /// PRD file name derived from the run id (`prd_<run id>.json`).
    pub prd_filename: String,
    /// Progress-notes file name derived from the run id (`progress_<run id>.txt`).
    pub progress_filename: String,
    /// Current lifecycle phase of the run.
    pub phase: GoRunPhase,
}
54
/// Process-wide registry of `go` pipeline runs, keyed by OKR id string.
///
/// Entries are inserted when a pipeline is launched and updated as it changes
/// phase. NOTE(review): nothing in the visible code removes entries, so
/// finished runs appear to stay listed for the life of the process — confirm.
pub static ACTIVE_GO_RUNS: std::sync::LazyLock<Mutex<HashMap<String, ActiveGoRun>>> =
    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
58
/// Arguments accepted by the `go` tool, deserialized from the caller's JSON.
#[derive(Deserialize)]
struct GoParams {
    /// Action to dispatch on: `execute`, `watch`, or `status`.
    action: String,
    /// Task description; required (non-blank) for `execute`.
    #[serde(default)]
    task: Option<String>,
    /// Iteration cap for the Ralph loop; `execute` falls back to 10 when absent.
    #[serde(default)]
    max_iterations: Option<usize>,
    /// Concurrent story cap; `execute` falls back to 3 when absent.
    #[serde(default)]
    max_concurrent_stories: Option<usize>,
    /// Model string; `execute` falls back to `zai/glm-5` when absent.
    #[serde(default)]
    model: Option<String>,
    /// OKR id used by `watch` and `status`.
    #[serde(default)]
    okr_id: Option<String>,
}
73
/// Tool that launches and monitors autonomous `go` pipelines.
pub struct GoTool {
    /// Optional hook called with a markdown message when a background
    /// pipeline finishes, whether it succeeded or failed.
    completion_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
}
79
80impl Default for GoTool {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl GoTool {
87 pub fn new() -> Self {
88 Self {
89 completion_callback: None,
90 }
91 }
92
93 pub fn with_callback(cb: Arc<dyn Fn(String) + Send + Sync + 'static>) -> Self {
95 Self {
96 completion_callback: Some(cb),
97 }
98 }
99}
100
101#[async_trait]
102impl Tool for GoTool {
103 fn id(&self) -> &str {
104 "go"
105 }
106
107 fn name(&self) -> &str {
108 "Go"
109 }
110
111 fn description(&self) -> &str {
112 r#"Autonomous task execution pipeline. Creates an OKR, generates a PRD from the task
113description using an LLM, runs the Ralph autonomous agent loop to implement all user stories,
114and maps results back to OKR outcomes.
115
116This is the programmatic equivalent of the `/go` TUI command. OKR approval is automatic
117(no interactive gate) since this is called by MCP clients.
118
119The pipeline runs in the background. Use action "watch" to monitor progress.
120
121Actions:
122- execute: Launch the autonomous pipeline (OKR → PRD → Ralph → results). Returns immediately.
123- watch: Watch a running pipeline's progress by okr_id. Shows PRD status, progress notes, and phase.
124- status: Check final status of an OKR run by okr_id
125
126Required for execute: task
127Optional for execute: max_iterations (default 10), max_concurrent_stories (default 3), model
128Required for watch/status: okr_id"#
129 }
130
131 fn parameters(&self) -> Value {
132 json!({
133 "type": "object",
134 "properties": {
135 "action": {
136 "type": "string",
137 "enum": ["execute", "watch", "status"],
138 "description": "Action to perform. Use 'execute' to start, 'watch' to monitor progress, 'status' for OKR results."
139 },
140 "task": {
141 "type": "string",
142 "description": "Task description for autonomous execution"
143 },
144 "max_iterations": {
145 "type": "integer",
146 "description": "Maximum Ralph iterations (default: 10)"
147 },
148 "max_concurrent_stories": {
149 "type": "integer",
150 "description": "Maximum concurrent stories in Ralph (default: 3)"
151 },
152 "model": {
153 "type": "string",
154 "description": "Model to use for PRD generation and Ralph execution"
155 },
156 "okr_id": {
157 "type": "string",
158 "description": "OKR ID for watch/status actions"
159 }
160 },
161 "required": ["action"]
162 })
163 }
164
165 async fn execute(&self, params: Value) -> Result<ToolResult> {
166 let p: GoParams = serde_json::from_value(params).context("Invalid params")?;
167
168 match p.action.as_str() {
169 "execute" => self.execute_go(p).await,
170 "watch" => self.watch_go(p).await,
171 "status" => self.check_status(p).await,
172 _ => Ok(ToolResult::structured_error(
173 "INVALID_ACTION",
174 "go",
175 &format!(
176 "Unknown action: '{}'. Valid actions: execute, watch, status",
177 p.action
178 ),
179 None,
180 Some(json!({
181 "action": "execute",
182 "task": "implement feature X with tests"
183 })),
184 )),
185 }
186 }
187}
188
189impl GoTool {
190 async fn execute_go(&self, p: GoParams) -> Result<ToolResult> {
191 let task = match p.task {
192 Some(t) if !t.trim().is_empty() => t,
193 _ => {
194 return Ok(ToolResult::structured_error(
195 "MISSING_FIELD",
196 "go",
197 "task is required for execute action",
198 Some(vec!["task"]),
199 Some(json!({
200 "action": "execute",
201 "task": "implement user authentication with OAuth2"
202 })),
203 ));
204 }
205 };
206
207 let max_iterations = p.max_iterations.unwrap_or(10);
208 let max_concurrent = p.max_concurrent_stories.unwrap_or(3);
209
210 let registry = Arc::new(
212 crate::provider::ProviderRegistry::from_vault()
213 .await
214 .context("Failed to load providers from Vault")?,
215 );
216
217 let model = p.model.unwrap_or_else(|| "zai/glm-5".to_string());
219 let (provider, resolved_model) = resolve_provider(®istry, &model)?;
220
221 let okr_id = Uuid::new_v4();
223 let okr_id_str = okr_id.to_string();
224 let mut okr = create_default_okr(okr_id, &task);
225 let mut run = OkrRun::new(
226 okr_id,
227 format!("Go {}", chrono::Local::now().format("%Y-%m-%d %H:%M")),
228 );
229 let _ = run.submit_for_approval();
230 run.record_decision(crate::okr::ApprovalDecision::approve(
231 run.id,
232 "Auto-approved via MCP go tool",
233 ));
234
235 let run_id = run.id;
236 let prd_filename = format!("prd_{}.json", run_id.to_string().replace('-', "_"));
237 let progress_filename = format!("progress_{}.txt", run_id.to_string().replace('-', "_"));
238
239 if let Ok(repo) = OkrRepository::from_config().await {
241 let _ = repo.create_okr(okr.clone()).await;
242 let _ = repo.create_run(run.clone()).await;
243 }
244
245 let working_dir = std::env::current_dir()
246 .map(|p| p.display().to_string())
247 .unwrap_or_else(|_| ".".into());
248
249 let active_run = ActiveGoRun {
251 okr_id: okr_id_str.clone(),
252 task: task.clone(),
253 model: resolved_model.clone(),
254 started_at: chrono::Utc::now().to_rfc3339(),
255 working_dir: working_dir.clone(),
256 prd_filename: prd_filename.clone(),
257 progress_filename: progress_filename.clone(),
258 phase: GoRunPhase::Starting,
259 };
260 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
261 runs.insert(okr_id_str.clone(), active_run);
262 }
263
264 tracing::info!(
265 task = %task,
266 okr_id = %okr_id,
267 model = %resolved_model,
268 max_iterations,
269 "Starting /go autonomous pipeline via MCP (background)"
270 );
271
272 let bg_okr_id = okr_id_str.clone();
274 let bg_task = task.clone();
275 let bg_model = resolved_model.clone();
276 let bg_callback = self.completion_callback.clone();
277 tokio::spawn(async move {
278 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
280 && let Some(r) = runs.get_mut(&bg_okr_id)
281 {
282 r.phase = GoRunPhase::Running;
283 }
284
285 let bus = crate::bus::AgentBus::new().into_arc();
287 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
288
289 match execute_go_ralph(
290 &bg_task,
291 &mut okr,
292 &mut run,
293 provider,
294 &bg_model,
295 max_iterations,
296 Some(bus),
297 max_concurrent,
298 Some(registry),
299 )
300 .await
301 {
302 Ok(result) => {
303 if let Ok(repo) = OkrRepository::from_config().await {
305 let _ = repo.update_run(run).await;
306 }
307
308 let summary = format_go_ralph_result(&result, &bg_task);
309
310 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
311 && let Some(r) = runs.get_mut(&bg_okr_id)
312 {
313 r.phase = GoRunPhase::Completed {
314 passed: result.passed,
315 total: result.total,
316 all_passed: result.all_passed,
317 feature_branch: result.feature_branch,
318 summary,
319 };
320 }
321
322 tracing::info!(
323 okr_id = %bg_okr_id,
324 passed = result.passed,
325 total = result.total,
326 "Go pipeline completed"
327 );
328
329 if let Some(ref cb) = bg_callback {
331 let phase_str = {
332 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
333 if let Some(r) = runs.get(&bg_okr_id) {
334 if let GoRunPhase::Completed {
335 passed,
336 total,
337 all_passed,
338 ref feature_branch,
339 ref summary,
340 } = r.phase
341 {
342 format!(
343 "# Go Pipeline Completed {}\n\n\
344 **OKR ID:** `{}`\n\
345 **Result:** {}/{} stories passed\n\
346 **Branch:** `{}`\n\n{}",
347 if all_passed { "✅" } else { "⚠️" },
348 bg_okr_id,
349 passed,
350 total,
351 feature_branch,
352 summary
353 )
354 } else {
355 format!("Go pipeline completed for OKR `{}`", bg_okr_id)
356 }
357 } else {
358 format!("Go pipeline completed for OKR `{}`", bg_okr_id)
359 }
360 };
361 cb(phase_str);
362 }
363 }
364 Err(err) => {
365 run.status = crate::okr::OkrRunStatus::Failed;
367 if let Ok(repo) = OkrRepository::from_config().await {
368 let _ = repo.update_run(run).await;
369 }
370
371 let error_msg = err.to_string();
372 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
373 && let Some(r) = runs.get_mut(&bg_okr_id)
374 {
375 r.phase = GoRunPhase::Failed {
376 error: error_msg.clone(),
377 };
378 }
379
380 tracing::error!(
381 okr_id = %bg_okr_id,
382 error = %error_msg,
383 "Go pipeline failed"
384 );
385
386 if let Some(ref cb) = bg_callback {
388 cb(format!(
389 "# Go Pipeline Failed ❌\n\n**OKR ID:** `{}`\n**Error:** {}",
390 bg_okr_id, error_msg
391 ));
392 }
393 }
394 }
395 });
396
397 let output = format!(
399 "# Go Pipeline Launched\n\n\
400 **OKR ID:** `{okr_id_str}`\n\
401 **Task:** {task}\n\
402 **Model:** {resolved_model}\n\
403 **Max Iterations:** {max_iterations}\n\
404 **Working Directory:** {working_dir}\n\n\
405 The autonomous pipeline is now running in the background.\n\n\
406 ## Monitor Progress\n\n\
407 Use the `go` tool with action `watch` to monitor this pipeline:\n\n\
408 ```json\n\
409 {{\"action\": \"watch\", \"okr_id\": \"{okr_id_str}\"}}\n\
410 ```\n\n\
411 The pipeline will:\n\
412 1. Generate a PRD from the task description\n\
413 2. Run the Ralph loop to implement all user stories\n\
414 3. Run quality checks (typecheck, lint, test, build)\n\
415 4. Map results back to OKR outcomes\n\n\
416 PRD file: `{prd_filename}`\n\
417 Progress file: `{progress_filename}`"
418 );
419
420 Ok(ToolResult::success(output)
421 .with_metadata("okr_id", json!(okr_id_str))
422 .with_metadata("phase", json!("starting"))
423 .with_metadata("prd_filename", json!(prd_filename))
424 .with_metadata("progress_filename", json!(progress_filename))
425 .with_metadata(
426 "watch_hint",
427 json!({
428 "tool": "go",
429 "args": {"action": "watch", "okr_id": okr_id_str}
430 }),
431 ))
432 }
433
434 async fn watch_go(&self, p: GoParams) -> Result<ToolResult> {
435 let okr_id_str = match p.okr_id {
436 Some(id) if !id.trim().is_empty() => id,
437 _ => {
438 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
440 if runs.is_empty() {
441 return Ok(ToolResult::success(
442 "No active go pipelines. Use `go(action: \"execute\", task: \"...\")` to start one.",
443 ));
444 }
445
446 let mut output = String::from("# Active Go Pipelines\n\n");
447 for (id, run) in runs.iter() {
448 let phase_str = match &run.phase {
449 GoRunPhase::Starting => "Starting".to_string(),
450 GoRunPhase::Running => "Running".to_string(),
451 GoRunPhase::Completed { passed, total, .. } => {
452 format!("Completed ({passed}/{total} passed)")
453 }
454 GoRunPhase::Failed { error } => {
455 format!("Failed: {}", crate::util::truncate_bytes_safe(&error, 80))
456 }
457 };
458 output.push_str(&format!(
459 "- **{id}**: {phase_str}\n Task: {}\n Started: {}\n\n",
460 run.task, run.started_at
461 ));
462 }
463 output.push_str("Use `go(action: \"watch\", okr_id: \"<id>\")` for details.");
464 return Ok(ToolResult::success(output));
465 }
466 };
467
468 let active_run = {
470 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
471 runs.get(&okr_id_str).cloned()
472 };
473
474 let Some(run) = active_run else {
475 return Ok(ToolResult::error(format!(
476 "No active go pipeline found for OKR `{okr_id_str}`.\n\n\
477 Use `go(action: \"watch\")` with no okr_id to list active pipelines,\n\
478 or `go(action: \"status\", okr_id: \"...\")` to check completed runs."
479 )));
480 };
481
482 let mut output = format!(
483 "# Go Pipeline Status\n\n\
484 **OKR ID:** `{}`\n\
485 **Task:** {}\n\
486 **Model:** {}\n\
487 **Started:** {}\n\
488 **Working Directory:** {}\n",
489 run.okr_id, run.task, run.model, run.started_at, run.working_dir
490 );
491
492 match &run.phase {
494 GoRunPhase::Starting => {
495 output.push_str("\n**Phase:** Starting (generating PRD...)\n");
496 }
497 GoRunPhase::Running => {
498 output.push_str("\n**Phase:** Running Ralph loop\n");
499 }
500 GoRunPhase::Completed {
501 passed,
502 total,
503 all_passed,
504 feature_branch,
505 summary,
506 } => {
507 output.push_str(&format!(
508 "\n**Phase:** Completed {}\n\
509 **Result:** {passed}/{total} stories passed\n\
510 **Feature Branch:** `{feature_branch}`\n\n\
511 ## Summary\n\n{summary}\n",
512 if *all_passed { "✅" } else { "⚠️" }
513 ));
514 }
515 GoRunPhase::Failed { error } => {
516 output.push_str(&format!("\n**Phase:** Failed ❌\n**Error:** {error}\n"));
517 }
518 }
519
520 if let Ok(prd_content) = std::fs::read_to_string(&run.prd_filename)
522 && let Ok(prd) = serde_json::from_str::<Value>(&prd_content)
523 && let Some(stories) = prd.get("user_stories").and_then(|s| s.as_array())
524 {
525 output.push_str("\n## Stories\n\n");
526 let mut passed_count = 0;
527 for story in stories {
528 let id = story.get("id").and_then(|v| v.as_str()).unwrap_or("?");
529 let title = story.get("title").and_then(|v| v.as_str()).unwrap_or("?");
530 let passes = story
531 .get("passes")
532 .and_then(|v| v.as_bool())
533 .unwrap_or(false);
534 let icon = if passes {
535 passed_count += 1;
536 "✅"
537 } else {
538 "⏳"
539 };
540 output.push_str(&format!("- {icon} **{id}**: {title}\n"));
541 }
542 output.push_str(&format!(
543 "\n**Progress:** {passed_count}/{} stories passed\n",
544 stories.len()
545 ));
546 }
547
548 if let Ok(progress) = std::fs::read_to_string(&run.progress_filename)
550 && !progress.trim().is_empty()
551 {
552 let lines: Vec<&str> = progress.lines().collect();
554 let start = lines.len().saturating_sub(30);
555 let tail: String = lines[start..].join("\n");
556 output.push_str(&format!(
557 "\n## Progress Notes (last {} lines)\n\n```\n{tail}\n```\n",
558 lines.len().min(30)
559 ));
560 }
561
562 if matches!(run.phase, GoRunPhase::Starting | GoRunPhase::Running) {
564 output.push_str(&format!(
565 "\n---\n*Pipeline still running. Call `go(action: \"watch\", okr_id: \"{}\")` again to check progress.*\n",
566 run.okr_id
567 ));
568 }
569
570 Ok(ToolResult::success(output)
571 .with_metadata("okr_id", json!(run.okr_id))
572 .with_metadata(
573 "phase",
574 json!(serde_json::to_value(&run.phase).unwrap_or(json!("unknown"))),
575 ))
576 }
577
578 async fn check_status(&self, p: GoParams) -> Result<ToolResult> {
579 let okr_id_str = match p.okr_id {
580 Some(id) if !id.trim().is_empty() => id,
581 _ => {
582 return Ok(ToolResult::structured_error(
583 "MISSING_FIELD",
584 "go",
585 "okr_id is required for status action",
586 Some(vec!["okr_id"]),
587 Some(json!({
588 "action": "status",
589 "okr_id": "uuid-of-okr"
590 })),
591 ));
592 }
593 };
594
595 let okr_id: Uuid = okr_id_str
596 .parse()
597 .context("Invalid UUID format for okr_id")?;
598
599 let repo = OkrRepository::from_config()
600 .await
601 .context("Failed to load OKR repository")?;
602
603 let okr = match repo.get_okr(okr_id).await? {
604 Some(okr) => okr,
605 None => {
606 return Ok(ToolResult::error(format!("OKR not found: {okr_id}")));
607 }
608 };
609
610 let runs = repo.list_runs().await.unwrap_or_default();
611 let runs: Vec<_> = runs.into_iter().filter(|r| r.okr_id == okr_id).collect();
612 let latest_run = runs.last();
613
614 let kr_status: Vec<String> = okr
615 .key_results
616 .iter()
617 .map(|kr| {
618 format!(
619 " - {}: {:.0}/{:.0} {} ({:.0}%)",
620 kr.title,
621 kr.current_value,
622 kr.target_value,
623 kr.unit,
624 kr.progress() * 100.0
625 )
626 })
627 .collect();
628
629 let run_info = if let Some(run) = latest_run {
630 format!(
631 "\nLatest Run: {} ({:?})\n Iterations: {}\n Outcomes: {}",
632 run.name,
633 run.status,
634 run.iterations,
635 run.outcomes.len()
636 )
637 } else {
638 "\nNo runs found.".to_string()
639 };
640
641 let output = format!(
642 "# Go Status\n\n**OKR:** {}\n**Status:** {:?}\n**Progress:** {:.0}%\n\n## Key Results\n{}\n{}",
643 okr.title,
644 okr.status,
645 okr.progress() * 100.0,
646 kr_status.join("\n"),
647 run_info
648 );
649
650 Ok(ToolResult::success(output).with_metadata("okr_id", json!(okr_id.to_string())))
651 }
652}
653
654fn create_default_okr(okr_id: Uuid, task: &str) -> Okr {
656 let title = if task.len() > 60 {
657 format!("Go: {}…", &task[..57])
658 } else {
659 format!("Go: {task}")
660 };
661
662 let mut okr = Okr::new(title, format!("Autonomous execution: {task}"));
663 okr.id = okr_id;
664
665 okr.add_key_result(KeyResult::new(okr_id, "All stories complete", 100.0, "%"));
666 okr.add_key_result(KeyResult::new(okr_id, "Quality gates pass", 1.0, "count"));
667 okr.add_key_result(KeyResult::new(okr_id, "No critical errors", 0.0, "count"));
668
669 okr
670}
671
672fn resolve_provider(
675 registry: &crate::provider::ProviderRegistry,
676 model: &str,
677) -> Result<(Arc<dyn crate::provider::Provider>, String)> {
678 let (provider_name, model_name) = crate::provider::parse_model_string(model);
679
680 if let Some(provider_name) = provider_name {
682 let normalized_provider = if provider_name == "zhipuai" {
683 "zai"
684 } else {
685 provider_name
686 };
687 if let Some(provider) = registry.get(normalized_provider) {
688 return Ok((provider, model_name.to_string()));
689 }
690 let available = registry.list().join(", ");
691 anyhow::bail!(
692 "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
693 normalized_provider,
694 available
695 );
696 }
697
698 let fallbacks = [
700 "zai",
701 "openai",
702 "github-copilot",
703 "anthropic",
704 "openrouter",
705 "novita",
706 "moonshotai",
707 "google",
708 ];
709
710 for name in fallbacks {
711 if let Some(provider) = registry.get(name) {
712 return Ok((provider, model.to_string()));
713 }
714 }
715
716 if let Some(name) = registry.list().into_iter().next()
718 && let Some(provider) = registry.get(name)
719 {
720 return Ok((provider, model.to_string()));
721 }
722
723 anyhow::bail!("No provider available for model '{model}' and no fallback providers found")
724}