1use anyhow::{Context, Result};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use uuid::Uuid;
17
18use super::{Tool, ToolResult};
19use crate::cli::go_ralph::{execute_go_ralph, format_go_ralph_result};
20use crate::okr::{KeyResult, Okr, OkrRepository, OkrRun};
21
/// Lifecycle phase of a background `go` pipeline run.
///
/// Serialized as an internally tagged object: the variant name is written in
/// snake_case under the `"phase"` key, alongside the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "phase", rename_all = "snake_case")]
pub enum GoRunPhase {
    /// The run has been registered; its background task has not yet marked it as running.
    Starting,
    /// The background task has started executing the pipeline.
    Running,
    /// The pipeline call returned successfully.
    Completed {
        /// Number of stories reported as passed.
        passed: usize,
        /// Total number of stories in the run.
        total: usize,
        /// Whether the pipeline reported every story as passed.
        all_passed: bool,
        /// Feature branch name reported by the pipeline result.
        feature_branch: String,
        /// Human-readable summary produced by `format_go_ralph_result`.
        summary: String,
    },
    /// The pipeline call returned an error.
    Failed {
        /// The error rendered with `to_string()`.
        error: String,
    },
}
41
/// Snapshot of a `go` pipeline run launched by this tool, as stored in `ACTIVE_GO_RUNS`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActiveGoRun {
    /// String form of the OKR UUID; also the key the run is registered under.
    pub okr_id: String,
    /// Task description supplied by the caller.
    pub task: String,
    /// Model name returned by provider resolution.
    pub model: String,
    /// Launch time as an RFC 3339 UTC timestamp.
    pub started_at: String,
    /// Current directory of the process at launch (`"."` if it could not be read).
    pub working_dir: String,
    /// PRD file name, `prd_<run id>.json` with the id's dashes replaced by underscores.
    pub prd_filename: String,
    /// Progress notes file name, `progress_<run id>.txt` with the same id formatting.
    pub progress_filename: String,
    /// Most recently recorded phase of the run.
    pub phase: GoRunPhase,
}
54
/// Process-wide registry of `go` runs, keyed by the OKR id string.
///
/// Lazily initialized to an empty map on first access. The code visible in this file
/// inserts and updates entries but never removes them, so completed and failed runs
/// stay listed for the lifetime of the process.
static ACTIVE_GO_RUNS: std::sync::LazyLock<Mutex<HashMap<String, ActiveGoRun>>> =
    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
58
/// Parameters accepted by the `go` tool, deserialized from the caller's JSON arguments.
///
/// Only `action` is mandatory; every other field defaults to `None` when omitted.
#[derive(Deserialize)]
struct GoParams {
    /// Which action to run: `"execute"`, `"watch"` or `"status"`.
    action: String,
    /// Task description; required by `execute`.
    #[serde(default)]
    task: Option<String>,
    /// Iteration cap for the Ralph loop; `execute` falls back to 10.
    #[serde(default)]
    max_iterations: Option<usize>,
    /// Concurrency cap for stories; `execute` falls back to 3.
    #[serde(default)]
    max_concurrent_stories: Option<usize>,
    /// Model string; `execute` falls back to `"zai/glm-5"`.
    #[serde(default)]
    model: Option<String>,
    /// OKR id; required by `status`, optional for `watch`.
    #[serde(default)]
    okr_id: Option<String>,
}
73
/// Tool that launches and monitors autonomous `go` pipeline runs.
pub struct GoTool {
    /// Optional hook invoked with a markdown message once a background run
    /// completes or fails.
    completion_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
}

impl GoTool {
    /// Creates a tool that sends no completion notifications.
    pub fn new() -> Self {
        let completion_callback = None;
        Self {
            completion_callback,
        }
    }

    /// Creates a tool that hands each run's final message to `cb`.
    pub fn with_callback(cb: Arc<dyn Fn(String) + Send + Sync + 'static>) -> Self {
        let mut tool = Self::new();
        tool.completion_callback = Some(cb);
        tool
    }
}
94
95#[async_trait]
96impl Tool for GoTool {
97 fn id(&self) -> &str {
98 "go"
99 }
100
101 fn name(&self) -> &str {
102 "Go"
103 }
104
105 fn description(&self) -> &str {
106 r#"Autonomous task execution pipeline. Creates an OKR, generates a PRD from the task
107description using an LLM, runs the Ralph autonomous agent loop to implement all user stories,
108and maps results back to OKR outcomes.
109
110This is the programmatic equivalent of the `/go` TUI command. OKR approval is automatic
111(no interactive gate) since this is called by MCP clients.
112
113The pipeline runs in the background. Use action "watch" to monitor progress.
114
115Actions:
116- execute: Launch the autonomous pipeline (OKR → PRD → Ralph → results). Returns immediately.
117- watch: Watch a running pipeline's progress by okr_id. Shows PRD status, progress notes, and phase.
118- status: Check final status of an OKR run by okr_id
119
120Required for execute: task
121Optional for execute: max_iterations (default 10), max_concurrent_stories (default 3), model
122Required for watch/status: okr_id"#
123 }
124
125 fn parameters(&self) -> Value {
126 json!({
127 "type": "object",
128 "properties": {
129 "action": {
130 "type": "string",
131 "enum": ["execute", "watch", "status"],
132 "description": "Action to perform. Use 'execute' to start, 'watch' to monitor progress, 'status' for OKR results."
133 },
134 "task": {
135 "type": "string",
136 "description": "Task description for autonomous execution"
137 },
138 "max_iterations": {
139 "type": "integer",
140 "description": "Maximum Ralph iterations (default: 10)"
141 },
142 "max_concurrent_stories": {
143 "type": "integer",
144 "description": "Maximum concurrent stories in Ralph (default: 3)"
145 },
146 "model": {
147 "type": "string",
148 "description": "Model to use for PRD generation and Ralph execution"
149 },
150 "okr_id": {
151 "type": "string",
152 "description": "OKR ID for watch/status actions"
153 }
154 },
155 "required": ["action"]
156 })
157 }
158
159 async fn execute(&self, params: Value) -> Result<ToolResult> {
160 let p: GoParams = serde_json::from_value(params).context("Invalid params")?;
161
162 match p.action.as_str() {
163 "execute" => self.execute_go(p).await,
164 "watch" => self.watch_go(p).await,
165 "status" => self.check_status(p).await,
166 _ => Ok(ToolResult::structured_error(
167 "INVALID_ACTION",
168 "go",
169 &format!(
170 "Unknown action: '{}'. Valid actions: execute, watch, status",
171 p.action
172 ),
173 None,
174 Some(json!({
175 "action": "execute",
176 "task": "implement feature X with tests"
177 })),
178 )),
179 }
180 }
181}
182
183impl GoTool {
184 async fn execute_go(&self, p: GoParams) -> Result<ToolResult> {
185 let task = match p.task {
186 Some(t) if !t.trim().is_empty() => t,
187 _ => {
188 return Ok(ToolResult::structured_error(
189 "MISSING_FIELD",
190 "go",
191 "task is required for execute action",
192 Some(vec!["task"]),
193 Some(json!({
194 "action": "execute",
195 "task": "implement user authentication with OAuth2"
196 })),
197 ));
198 }
199 };
200
201 let max_iterations = p.max_iterations.unwrap_or(10);
202 let max_concurrent = p.max_concurrent_stories.unwrap_or(3);
203
204 let registry = Arc::new(
206 crate::provider::ProviderRegistry::from_vault()
207 .await
208 .context("Failed to load providers from Vault")?,
209 );
210
211 let model = p.model.unwrap_or_else(|| "zai/glm-5".to_string());
213 let (provider, resolved_model) = resolve_provider(®istry, &model)?;
214
215 let okr_id = Uuid::new_v4();
217 let okr_id_str = okr_id.to_string();
218 let mut okr = create_default_okr(okr_id, &task);
219 let mut run = OkrRun::new(
220 okr_id,
221 format!("Go {}", chrono::Local::now().format("%Y-%m-%d %H:%M")),
222 );
223 let _ = run.submit_for_approval();
224 run.record_decision(crate::okr::ApprovalDecision::approve(
225 run.id,
226 "Auto-approved via MCP go tool",
227 ));
228
229 let run_id = run.id;
230 let prd_filename = format!("prd_{}.json", run_id.to_string().replace('-', "_"));
231 let progress_filename = format!("progress_{}.txt", run_id.to_string().replace('-', "_"));
232
233 if let Ok(repo) = OkrRepository::from_config().await {
235 let _ = repo.create_okr(okr.clone()).await;
236 let _ = repo.create_run(run.clone()).await;
237 }
238
239 let working_dir = std::env::current_dir()
240 .map(|p| p.display().to_string())
241 .unwrap_or_else(|_| ".".into());
242
243 let active_run = ActiveGoRun {
245 okr_id: okr_id_str.clone(),
246 task: task.clone(),
247 model: resolved_model.clone(),
248 started_at: chrono::Utc::now().to_rfc3339(),
249 working_dir: working_dir.clone(),
250 prd_filename: prd_filename.clone(),
251 progress_filename: progress_filename.clone(),
252 phase: GoRunPhase::Starting,
253 };
254 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
255 runs.insert(okr_id_str.clone(), active_run);
256 }
257
258 tracing::info!(
259 task = %task,
260 okr_id = %okr_id,
261 model = %resolved_model,
262 max_iterations,
263 "Starting /go autonomous pipeline via MCP (background)"
264 );
265
266 let bg_okr_id = okr_id_str.clone();
268 let bg_task = task.clone();
269 let bg_model = resolved_model.clone();
270 let bg_callback = self.completion_callback.clone();
271 tokio::spawn(async move {
272 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
274 if let Some(r) = runs.get_mut(&bg_okr_id) {
275 r.phase = GoRunPhase::Running;
276 }
277 }
278
279 let bus = crate::bus::AgentBus::new().into_arc();
281 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
282
283 match execute_go_ralph(
284 &bg_task,
285 &mut okr,
286 &mut run,
287 provider,
288 &bg_model,
289 max_iterations,
290 Some(bus),
291 max_concurrent,
292 Some(registry),
293 )
294 .await
295 {
296 Ok(result) => {
297 if let Ok(repo) = OkrRepository::from_config().await {
299 let _ = repo.update_run(run).await;
300 }
301
302 let summary = format_go_ralph_result(&result, &bg_task);
303
304 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
305 if let Some(r) = runs.get_mut(&bg_okr_id) {
306 r.phase = GoRunPhase::Completed {
307 passed: result.passed,
308 total: result.total,
309 all_passed: result.all_passed,
310 feature_branch: result.feature_branch,
311 summary,
312 };
313 }
314 }
315
316 tracing::info!(
317 okr_id = %bg_okr_id,
318 passed = result.passed,
319 total = result.total,
320 "Go pipeline completed"
321 );
322
323 if let Some(ref cb) = bg_callback {
325 let phase_str = {
326 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
327 if let Some(r) = runs.get(&bg_okr_id) {
328 if let GoRunPhase::Completed {
329 passed,
330 total,
331 all_passed,
332 ref feature_branch,
333 ref summary,
334 } = r.phase
335 {
336 format!(
337 "# Go Pipeline Completed {}\n\n\
338 **OKR ID:** `{}`\n\
339 **Result:** {}/{} stories passed\n\
340 **Branch:** `{}`\n\n{}",
341 if all_passed { "✅" } else { "⚠️" },
342 bg_okr_id,
343 passed,
344 total,
345 feature_branch,
346 summary
347 )
348 } else {
349 format!("Go pipeline completed for OKR `{}`", bg_okr_id)
350 }
351 } else {
352 format!("Go pipeline completed for OKR `{}`", bg_okr_id)
353 }
354 };
355 cb(phase_str);
356 }
357 }
358 Err(err) => {
359 run.status = crate::okr::OkrRunStatus::Failed;
361 if let Ok(repo) = OkrRepository::from_config().await {
362 let _ = repo.update_run(run).await;
363 }
364
365 let error_msg = err.to_string();
366 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
367 if let Some(r) = runs.get_mut(&bg_okr_id) {
368 r.phase = GoRunPhase::Failed {
369 error: error_msg.clone(),
370 };
371 }
372 }
373
374 tracing::error!(
375 okr_id = %bg_okr_id,
376 error = %error_msg,
377 "Go pipeline failed"
378 );
379
380 if let Some(ref cb) = bg_callback {
382 cb(format!(
383 "# Go Pipeline Failed ❌\n\n**OKR ID:** `{}`\n**Error:** {}",
384 bg_okr_id, error_msg
385 ));
386 }
387 }
388 }
389 });
390
391 let output = format!(
393 "# Go Pipeline Launched\n\n\
394 **OKR ID:** `{okr_id_str}`\n\
395 **Task:** {task}\n\
396 **Model:** {resolved_model}\n\
397 **Max Iterations:** {max_iterations}\n\
398 **Working Directory:** {working_dir}\n\n\
399 The autonomous pipeline is now running in the background.\n\n\
400 ## Monitor Progress\n\n\
401 Use the `go` tool with action `watch` to monitor this pipeline:\n\n\
402 ```json\n\
403 {{\"action\": \"watch\", \"okr_id\": \"{okr_id_str}\"}}\n\
404 ```\n\n\
405 The pipeline will:\n\
406 1. Generate a PRD from the task description\n\
407 2. Run the Ralph loop to implement all user stories\n\
408 3. Run quality checks (typecheck, lint, test, build)\n\
409 4. Map results back to OKR outcomes\n\n\
410 PRD file: `{prd_filename}`\n\
411 Progress file: `{progress_filename}`"
412 );
413
414 Ok(ToolResult::success(output)
415 .with_metadata("okr_id", json!(okr_id_str))
416 .with_metadata("phase", json!("starting"))
417 .with_metadata("prd_filename", json!(prd_filename))
418 .with_metadata("progress_filename", json!(progress_filename))
419 .with_metadata(
420 "watch_hint",
421 json!({
422 "tool": "go",
423 "args": {"action": "watch", "okr_id": okr_id_str}
424 }),
425 ))
426 }
427
428 async fn watch_go(&self, p: GoParams) -> Result<ToolResult> {
429 let okr_id_str = match p.okr_id {
430 Some(id) if !id.trim().is_empty() => id,
431 _ => {
432 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
434 if runs.is_empty() {
435 return Ok(ToolResult::success(
436 "No active go pipelines. Use `go(action: \"execute\", task: \"...\")` to start one.",
437 ));
438 }
439
440 let mut output = String::from("# Active Go Pipelines\n\n");
441 for (id, run) in runs.iter() {
442 let phase_str = match &run.phase {
443 GoRunPhase::Starting => "Starting".to_string(),
444 GoRunPhase::Running => "Running".to_string(),
445 GoRunPhase::Completed { passed, total, .. } => {
446 format!("Completed ({passed}/{total} passed)")
447 }
448 GoRunPhase::Failed { error } => {
449 format!("Failed: {}", &error[..error.len().min(80)])
450 }
451 };
452 output.push_str(&format!(
453 "- **{id}**: {phase_str}\n Task: {}\n Started: {}\n\n",
454 run.task, run.started_at
455 ));
456 }
457 output.push_str("Use `go(action: \"watch\", okr_id: \"<id>\")` for details.");
458 return Ok(ToolResult::success(output));
459 }
460 };
461
462 let active_run = {
464 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
465 runs.get(&okr_id_str).cloned()
466 };
467
468 let Some(run) = active_run else {
469 return Ok(ToolResult::error(format!(
470 "No active go pipeline found for OKR `{okr_id_str}`.\n\n\
471 Use `go(action: \"watch\")` with no okr_id to list active pipelines,\n\
472 or `go(action: \"status\", okr_id: \"...\")` to check completed runs."
473 )));
474 };
475
476 let mut output = format!(
477 "# Go Pipeline Status\n\n\
478 **OKR ID:** `{}`\n\
479 **Task:** {}\n\
480 **Model:** {}\n\
481 **Started:** {}\n\
482 **Working Directory:** {}\n",
483 run.okr_id, run.task, run.model, run.started_at, run.working_dir
484 );
485
486 match &run.phase {
488 GoRunPhase::Starting => {
489 output.push_str("\n**Phase:** Starting (generating PRD...)\n");
490 }
491 GoRunPhase::Running => {
492 output.push_str("\n**Phase:** Running Ralph loop\n");
493 }
494 GoRunPhase::Completed {
495 passed,
496 total,
497 all_passed,
498 feature_branch,
499 summary,
500 } => {
501 output.push_str(&format!(
502 "\n**Phase:** Completed {}\n\
503 **Result:** {passed}/{total} stories passed\n\
504 **Feature Branch:** `{feature_branch}`\n\n\
505 ## Summary\n\n{summary}\n",
506 if *all_passed { "✅" } else { "⚠️" }
507 ));
508 }
509 GoRunPhase::Failed { error } => {
510 output.push_str(&format!("\n**Phase:** Failed ❌\n**Error:** {error}\n"));
511 }
512 }
513
514 if let Ok(prd_content) = std::fs::read_to_string(&run.prd_filename) {
516 if let Ok(prd) = serde_json::from_str::<Value>(&prd_content) {
517 if let Some(stories) = prd.get("user_stories").and_then(|s| s.as_array()) {
518 output.push_str("\n## Stories\n\n");
519 let mut passed_count = 0;
520 for story in stories {
521 let id = story.get("id").and_then(|v| v.as_str()).unwrap_or("?");
522 let title = story.get("title").and_then(|v| v.as_str()).unwrap_or("?");
523 let passes = story
524 .get("passes")
525 .and_then(|v| v.as_bool())
526 .unwrap_or(false);
527 let icon = if passes {
528 passed_count += 1;
529 "✅"
530 } else {
531 "⏳"
532 };
533 output.push_str(&format!("- {icon} **{id}**: {title}\n"));
534 }
535 output.push_str(&format!(
536 "\n**Progress:** {passed_count}/{} stories passed\n",
537 stories.len()
538 ));
539 }
540 }
541 }
542
543 if let Ok(progress) = std::fs::read_to_string(&run.progress_filename) {
545 if !progress.trim().is_empty() {
546 let lines: Vec<&str> = progress.lines().collect();
548 let start = lines.len().saturating_sub(30);
549 let tail: String = lines[start..].join("\n");
550 output.push_str(&format!(
551 "\n## Progress Notes (last {} lines)\n\n```\n{tail}\n```\n",
552 lines.len().min(30)
553 ));
554 }
555 }
556
557 if matches!(run.phase, GoRunPhase::Starting | GoRunPhase::Running) {
559 output.push_str(&format!(
560 "\n---\n*Pipeline still running. Call `go(action: \"watch\", okr_id: \"{}\")` again to check progress.*\n",
561 run.okr_id
562 ));
563 }
564
565 Ok(ToolResult::success(output)
566 .with_metadata("okr_id", json!(run.okr_id))
567 .with_metadata(
568 "phase",
569 json!(serde_json::to_value(&run.phase).unwrap_or(json!("unknown"))),
570 ))
571 }
572
573 async fn check_status(&self, p: GoParams) -> Result<ToolResult> {
574 let okr_id_str = match p.okr_id {
575 Some(id) if !id.trim().is_empty() => id,
576 _ => {
577 return Ok(ToolResult::structured_error(
578 "MISSING_FIELD",
579 "go",
580 "okr_id is required for status action",
581 Some(vec!["okr_id"]),
582 Some(json!({
583 "action": "status",
584 "okr_id": "uuid-of-okr"
585 })),
586 ));
587 }
588 };
589
590 let okr_id: Uuid = okr_id_str
591 .parse()
592 .context("Invalid UUID format for okr_id")?;
593
594 let repo = OkrRepository::from_config()
595 .await
596 .context("Failed to load OKR repository")?;
597
598 let okr = match repo.get_okr(okr_id).await? {
599 Some(okr) => okr,
600 None => {
601 return Ok(ToolResult::error(format!("OKR not found: {okr_id}")));
602 }
603 };
604
605 let runs = repo.list_runs().await.unwrap_or_default();
606 let runs: Vec<_> = runs.into_iter().filter(|r| r.okr_id == okr_id).collect();
607 let latest_run = runs.last();
608
609 let kr_status: Vec<String> = okr
610 .key_results
611 .iter()
612 .map(|kr| {
613 format!(
614 " - {}: {:.0}/{:.0} {} ({:.0}%)",
615 kr.title,
616 kr.current_value,
617 kr.target_value,
618 kr.unit,
619 kr.progress() * 100.0
620 )
621 })
622 .collect();
623
624 let run_info = if let Some(run) = latest_run {
625 format!(
626 "\nLatest Run: {} ({:?})\n Iterations: {}\n Outcomes: {}",
627 run.name,
628 run.status,
629 run.iterations,
630 run.outcomes.len()
631 )
632 } else {
633 "\nNo runs found.".to_string()
634 };
635
636 let output = format!(
637 "# Go Status\n\n**OKR:** {}\n**Status:** {:?}\n**Progress:** {:.0}%\n\n## Key Results\n{}\n{}",
638 okr.title,
639 okr.status,
640 okr.progress() * 100.0,
641 kr_status.join("\n"),
642 run_info
643 );
644
645 Ok(ToolResult::success(output).with_metadata("okr_id", json!(okr_id.to_string())))
646 }
647}
648
649fn create_default_okr(okr_id: Uuid, task: &str) -> Okr {
651 let title = if task.len() > 60 {
652 format!("Go: {}…", &task[..57])
653 } else {
654 format!("Go: {task}")
655 };
656
657 let mut okr = Okr::new(title, format!("Autonomous execution: {task}"));
658 okr.id = okr_id;
659
660 okr.add_key_result(KeyResult::new(okr_id, "All stories complete", 100.0, "%"));
661 okr.add_key_result(KeyResult::new(okr_id, "Quality gates pass", 1.0, "count"));
662 okr.add_key_result(KeyResult::new(okr_id, "No critical errors", 0.0, "count"));
663
664 okr
665}
666
667fn resolve_provider(
670 registry: &crate::provider::ProviderRegistry,
671 model: &str,
672) -> Result<(Arc<dyn crate::provider::Provider>, String)> {
673 let (provider_name, model_name) = crate::provider::parse_model_string(model);
674
675 if let Some(provider_name) = provider_name {
677 if let Some(provider) = registry.get(provider_name) {
678 return Ok((provider, model_name.to_string()));
679 }
680 }
681
682 let fallbacks = [
684 "zai",
685 "openai",
686 "github-copilot",
687 "anthropic",
688 "openrouter",
689 "novita",
690 "moonshotai",
691 "google",
692 ];
693
694 for name in fallbacks {
695 if let Some(provider) = registry.get(name) {
696 return Ok((provider, model.to_string()));
697 }
698 }
699
700 if let Some(name) = registry.list().into_iter().next() {
702 if let Some(provider) = registry.get(name) {
703 return Ok((provider, model.to_string()));
704 }
705 }
706
707 anyhow::bail!("No provider available for model '{model}' and no fallback providers found")
708}