1use anyhow::{Context, Result};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use uuid::Uuid;
17
18use super::{Tool, ToolResult};
19use crate::cli::go_ralph::{execute_go_ralph, format_go_ralph_result};
20use crate::okr::{KeyResult, Okr, OkrRepository, OkrRun};
21use crate::util;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(tag = "phase", rename_all = "snake_case")]
28pub enum GoRunPhase {
29 Starting,
30 Running,
31 Completed {
32 passed: usize,
33 total: usize,
34 all_passed: bool,
35 feature_branch: String,
36 summary: String,
37 },
38 Failed {
39 error: String,
40 },
41}
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ActiveGoRun {
46 pub okr_id: String,
47 pub task: String,
48 pub model: String,
49 pub started_at: String,
50 pub working_dir: String,
51 pub prd_filename: String,
52 pub progress_filename: String,
53 pub phase: GoRunPhase,
54}
55
56pub static ACTIVE_GO_RUNS: std::sync::LazyLock<Mutex<HashMap<String, ActiveGoRun>>> =
58 std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
59
60#[derive(Deserialize)]
61struct GoParams {
62 action: String,
63 #[serde(default)]
64 task: Option<String>,
65 #[serde(default)]
66 max_iterations: Option<usize>,
67 #[serde(default)]
68 max_concurrent_stories: Option<usize>,
69 #[serde(default)]
70 model: Option<String>,
71 #[serde(default)]
72 okr_id: Option<String>,
73}
74
/// Tool that launches and monitors autonomous `go` pipelines.
///
/// The default value carries no completion callback.
#[derive(Default)]
pub struct GoTool {
    /// Invoked with a Markdown report once a background pipeline finishes,
    /// whether it succeeded or failed.
    completion_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
}

impl GoTool {
    /// Creates a tool that does not notify anyone when a pipeline finishes.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates a tool that calls `cb` with the final report of each pipeline.
    pub fn with_callback(cb: Arc<dyn Fn(String) + Send + Sync + 'static>) -> Self {
        let completion_callback = Some(cb);
        Self {
            completion_callback,
        }
    }
}
101
102#[async_trait]
103impl Tool for GoTool {
104 fn id(&self) -> &str {
105 "go"
106 }
107
108 fn name(&self) -> &str {
109 "Go"
110 }
111
112 fn description(&self) -> &str {
113 r#"Autonomous task execution pipeline. Creates an OKR, generates a PRD from the task
114description using an LLM, runs the Ralph autonomous agent loop to implement all user stories,
115and maps results back to OKR outcomes.
116
117This is the programmatic equivalent of the `/go` TUI command. OKR approval is automatic
118(no interactive gate) since this is called by MCP clients.
119
120The pipeline runs in the background. Use action "watch" to monitor progress.
121
122Actions:
123- execute: Launch the autonomous pipeline (OKR → PRD → Ralph → results). Returns immediately.
124- watch: Watch a running pipeline's progress by okr_id. Shows PRD status, progress notes, and phase.
125- status: Check final status of an OKR run by okr_id
126
127Required for execute: task
128Optional for execute: max_iterations (default 10), max_concurrent_stories (default 3), model
129Required for watch/status: okr_id"#
130 }
131
132 fn parameters(&self) -> Value {
133 json!({
134 "type": "object",
135 "properties": {
136 "action": {
137 "type": "string",
138 "enum": ["execute", "watch", "status"],
139 "description": "Action to perform. Use 'execute' to start, 'watch' to monitor progress, 'status' for OKR results."
140 },
141 "task": {
142 "type": "string",
143 "description": "Task description for autonomous execution"
144 },
145 "max_iterations": {
146 "type": "integer",
147 "description": "Maximum Ralph iterations (default: 10)"
148 },
149 "max_concurrent_stories": {
150 "type": "integer",
151 "description": "Maximum concurrent stories in Ralph (default: 3)"
152 },
153 "model": {
154 "type": "string",
155 "description": "Model to use for PRD generation and Ralph execution"
156 },
157 "okr_id": {
158 "type": "string",
159 "description": "OKR ID for watch/status actions"
160 }
161 },
162 "required": ["action"]
163 })
164 }
165
166 async fn execute(&self, params: Value) -> Result<ToolResult> {
167 let p: GoParams = serde_json::from_value(params).context("Invalid params")?;
168
169 match p.action.as_str() {
170 "execute" => self.execute_go(p).await,
171 "watch" => self.watch_go(p).await,
172 "status" => self.check_status(p).await,
173 _ => Ok(ToolResult::structured_error(
174 "INVALID_ACTION",
175 "go",
176 &format!(
177 "Unknown action: '{}'. Valid actions: execute, watch, status",
178 p.action
179 ),
180 None,
181 Some(json!({
182 "action": "execute",
183 "task": "implement feature X with tests"
184 })),
185 )),
186 }
187 }
188}
189
190impl GoTool {
191 async fn execute_go(&self, p: GoParams) -> Result<ToolResult> {
192 let task = match p.task {
193 Some(t) if !t.trim().is_empty() => t,
194 _ => {
195 return Ok(ToolResult::structured_error(
196 "MISSING_FIELD",
197 "go",
198 "task is required for execute action",
199 Some(vec!["task"]),
200 Some(json!({
201 "action": "execute",
202 "task": "implement user authentication with OAuth2"
203 })),
204 ));
205 }
206 };
207
208 let max_iterations = p.max_iterations.unwrap_or(10);
209 let max_concurrent = p.max_concurrent_stories.unwrap_or(3);
210
211 let registry = Arc::new(
213 crate::provider::ProviderRegistry::from_vault()
214 .await
215 .context("Failed to load providers from Vault")?,
216 );
217
218 let model = p.model.unwrap_or_else(|| "zai/glm-5".to_string());
220 let (provider, resolved_model) = resolve_provider(®istry, &model)?;
221
222 let okr_id = Uuid::new_v4();
224 let okr_id_str = okr_id.to_string();
225 let mut okr = create_default_okr(okr_id, &task);
226 let mut run = OkrRun::new(
227 okr_id,
228 format!("Go {}", chrono::Local::now().format("%Y-%m-%d %H:%M")),
229 );
230 let _ = run.submit_for_approval();
231 run.record_decision(crate::okr::ApprovalDecision::approve(
232 run.id,
233 "Auto-approved via MCP go tool",
234 ));
235
236 let run_id = run.id;
237 let prd_filename = format!("prd_{}.json", run_id.to_string().replace('-', "_"));
238 let progress_filename = format!("progress_{}.txt", run_id.to_string().replace('-', "_"));
239
240 if let Ok(repo) = OkrRepository::from_config().await {
242 let _ = repo.create_okr(okr.clone()).await;
243 let _ = repo.create_run(run.clone()).await;
244 }
245
246 let working_dir = std::env::current_dir()
247 .map(|p| p.display().to_string())
248 .unwrap_or_else(|_| ".".into());
249
250 let active_run = ActiveGoRun {
252 okr_id: okr_id_str.clone(),
253 task: task.clone(),
254 model: resolved_model.clone(),
255 started_at: chrono::Utc::now().to_rfc3339(),
256 working_dir: working_dir.clone(),
257 prd_filename: prd_filename.clone(),
258 progress_filename: progress_filename.clone(),
259 phase: GoRunPhase::Starting,
260 };
261 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
262 runs.insert(okr_id_str.clone(), active_run);
263 }
264
265 tracing::info!(
266 task = %task,
267 okr_id = %okr_id,
268 model = %resolved_model,
269 max_iterations,
270 "Starting /go autonomous pipeline via MCP (background)"
271 );
272
273 let bg_okr_id = okr_id_str.clone();
275 let bg_task = task.clone();
276 let bg_model = resolved_model.clone();
277 let bg_callback = self.completion_callback.clone();
278 tokio::spawn(async move {
279 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
281 && let Some(r) = runs.get_mut(&bg_okr_id)
282 {
283 r.phase = GoRunPhase::Running;
284 }
285
286 let bus = crate::bus::AgentBus::new().into_arc();
288 crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
289
290 match execute_go_ralph(
291 &bg_task,
292 &mut okr,
293 &mut run,
294 provider,
295 &bg_model,
296 max_iterations,
297 Some(bus),
298 max_concurrent,
299 Some(registry),
300 )
301 .await
302 {
303 Ok(result) => {
304 if let Ok(repo) = OkrRepository::from_config().await {
306 let _ = repo.update_run(run).await;
307 }
308
309 let summary = format_go_ralph_result(&result, &bg_task);
310
311 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
312 && let Some(r) = runs.get_mut(&bg_okr_id)
313 {
314 r.phase = GoRunPhase::Completed {
315 passed: result.passed,
316 total: result.total,
317 all_passed: result.all_passed,
318 feature_branch: result.feature_branch,
319 summary,
320 };
321 }
322
323 tracing::info!(
324 okr_id = %bg_okr_id,
325 passed = result.passed,
326 total = result.total,
327 "Go pipeline completed"
328 );
329
330 if let Some(ref cb) = bg_callback {
332 let phase_str = {
333 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
334 if let Some(r) = runs.get(&bg_okr_id) {
335 if let GoRunPhase::Completed {
336 passed,
337 total,
338 all_passed,
339 ref feature_branch,
340 ref summary,
341 } = r.phase
342 {
343 format!(
344 "# Go Pipeline Completed {}\n\n\
345 **OKR ID:** `{}`\n\
346 **Result:** {}/{} stories passed\n\
347 **Branch:** `{}`\n\n{}",
348 if all_passed { "✅" } else { "⚠️" },
349 bg_okr_id,
350 passed,
351 total,
352 feature_branch,
353 summary
354 )
355 } else {
356 format!("Go pipeline completed for OKR `{}`", bg_okr_id)
357 }
358 } else {
359 format!("Go pipeline completed for OKR `{}`", bg_okr_id)
360 }
361 };
362 cb(phase_str);
363 }
364 }
365 Err(err) => {
366 run.status = crate::okr::OkrRunStatus::Failed;
368 if let Ok(repo) = OkrRepository::from_config().await {
369 let _ = repo.update_run(run).await;
370 }
371
372 let error_msg = err.to_string();
373 if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
374 && let Some(r) = runs.get_mut(&bg_okr_id)
375 {
376 r.phase = GoRunPhase::Failed {
377 error: error_msg.clone(),
378 };
379 }
380
381 tracing::error!(
382 okr_id = %bg_okr_id,
383 error = %error_msg,
384 "Go pipeline failed"
385 );
386
387 if let Some(ref cb) = bg_callback {
389 cb(format!(
390 "# Go Pipeline Failed ❌\n\n**OKR ID:** `{}`\n**Error:** {}",
391 bg_okr_id, error_msg
392 ));
393 }
394 }
395 }
396 });
397
398 let output = format!(
400 "# Go Pipeline Launched\n\n\
401 **OKR ID:** `{okr_id_str}`\n\
402 **Task:** {task}\n\
403 **Model:** {resolved_model}\n\
404 **Max Iterations:** {max_iterations}\n\
405 **Working Directory:** {working_dir}\n\n\
406 The autonomous pipeline is now running in the background.\n\n\
407 ## Monitor Progress\n\n\
408 Use the `go` tool with action `watch` to monitor this pipeline:\n\n\
409 ```json\n\
410 {{\"action\": \"watch\", \"okr_id\": \"{okr_id_str}\"}}\n\
411 ```\n\n\
412 The pipeline will:\n\
413 1. Generate a PRD from the task description\n\
414 2. Run the Ralph loop to implement all user stories\n\
415 3. Run quality checks (typecheck, lint, test, build)\n\
416 4. Map results back to OKR outcomes\n\n\
417 PRD file: `{prd_filename}`\n\
418 Progress file: `{progress_filename}`"
419 );
420
421 Ok(ToolResult::success(output)
422 .with_metadata("okr_id", json!(okr_id_str))
423 .with_metadata("phase", json!("starting"))
424 .with_metadata("prd_filename", json!(prd_filename))
425 .with_metadata("progress_filename", json!(progress_filename))
426 .with_metadata(
427 "watch_hint",
428 json!({
429 "tool": "go",
430 "args": {"action": "watch", "okr_id": okr_id_str}
431 }),
432 ))
433 }
434
435 async fn watch_go(&self, p: GoParams) -> Result<ToolResult> {
436 let okr_id_str = match p.okr_id {
437 Some(id) if !id.trim().is_empty() => id,
438 _ => {
439 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
441 if runs.is_empty() {
442 return Ok(ToolResult::success(
443 "No active go pipelines. Use `go(action: \"execute\", task: \"...\")` to start one.",
444 ));
445 }
446
447 let mut output = String::from("# Active Go Pipelines\n\n");
448 for (id, run) in runs.iter() {
449 let phase_str = match &run.phase {
450 GoRunPhase::Starting => "Starting".to_string(),
451 GoRunPhase::Running => "Running".to_string(),
452 GoRunPhase::Completed { passed, total, .. } => {
453 format!("Completed ({passed}/{total} passed)")
454 }
455 GoRunPhase::Failed { error } => {
456 format!("Failed: {}", crate::util::truncate_bytes_safe(&error, 80))
457 }
458 };
459 output.push_str(&format!(
460 "- **{id}**: {phase_str}\n Task: {}\n Started: {}\n\n",
461 run.task, run.started_at
462 ));
463 }
464 output.push_str("Use `go(action: \"watch\", okr_id: \"<id>\")` for details.");
465 return Ok(ToolResult::success(output));
466 }
467 };
468
469 let active_run = {
471 let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
472 runs.get(&okr_id_str).cloned()
473 };
474
475 let Some(run) = active_run else {
476 return Ok(ToolResult::error(format!(
477 "No active go pipeline found for OKR `{okr_id_str}`.\n\n\
478 Use `go(action: \"watch\")` with no okr_id to list active pipelines,\n\
479 or `go(action: \"status\", okr_id: \"...\")` to check completed runs."
480 )));
481 };
482
483 let mut output = format!(
484 "# Go Pipeline Status\n\n\
485 **OKR ID:** `{}`\n\
486 **Task:** {}\n\
487 **Model:** {}\n\
488 **Started:** {}\n\
489 **Working Directory:** {}\n",
490 run.okr_id, run.task, run.model, run.started_at, run.working_dir
491 );
492
493 match &run.phase {
495 GoRunPhase::Starting => {
496 output.push_str("\n**Phase:** Starting (generating PRD...)\n");
497 }
498 GoRunPhase::Running => {
499 output.push_str("\n**Phase:** Running Ralph loop\n");
500 }
501 GoRunPhase::Completed {
502 passed,
503 total,
504 all_passed,
505 feature_branch,
506 summary,
507 } => {
508 output.push_str(&format!(
509 "\n**Phase:** Completed {}\n\
510 **Result:** {passed}/{total} stories passed\n\
511 **Feature Branch:** `{feature_branch}`\n\n\
512 ## Summary\n\n{summary}\n",
513 if *all_passed { "✅" } else { "⚠️" }
514 ));
515 }
516 GoRunPhase::Failed { error } => {
517 output.push_str(&format!("\n**Phase:** Failed ❌\n**Error:** {error}\n"));
518 }
519 }
520
521 if let Ok(prd_content) = std::fs::read_to_string(&run.prd_filename)
523 && let Ok(prd) = serde_json::from_str::<Value>(&prd_content)
524 && let Some(stories) = prd.get("user_stories").and_then(|s| s.as_array())
525 {
526 output.push_str("\n## Stories\n\n");
527 let mut passed_count = 0;
528 for story in stories {
529 let id = story.get("id").and_then(|v| v.as_str()).unwrap_or("?");
530 let title = story.get("title").and_then(|v| v.as_str()).unwrap_or("?");
531 let passes = story
532 .get("passes")
533 .and_then(|v| v.as_bool())
534 .unwrap_or(false);
535 let icon = if passes {
536 passed_count += 1;
537 "✅"
538 } else {
539 "⏳"
540 };
541 output.push_str(&format!("- {icon} **{id}**: {title}\n"));
542 }
543 output.push_str(&format!(
544 "\n**Progress:** {passed_count}/{} stories passed\n",
545 stories.len()
546 ));
547 }
548
549 if let Ok(progress) = std::fs::read_to_string(&run.progress_filename)
551 && !progress.trim().is_empty()
552 {
553 let lines: Vec<&str> = progress.lines().collect();
555 let start = lines.len().saturating_sub(30);
556 let tail: String = lines[start..].join("\n");
557 output.push_str(&format!(
558 "\n## Progress Notes (last {} lines)\n\n```\n{tail}\n```\n",
559 lines.len().min(30)
560 ));
561 }
562
563 if matches!(run.phase, GoRunPhase::Starting | GoRunPhase::Running) {
565 output.push_str(&format!(
566 "\n---\n*Pipeline still running. Call `go(action: \"watch\", okr_id: \"{}\")` again to check progress.*\n",
567 run.okr_id
568 ));
569 }
570
571 Ok(ToolResult::success(output)
572 .with_metadata("okr_id", json!(run.okr_id))
573 .with_metadata(
574 "phase",
575 json!(serde_json::to_value(&run.phase).unwrap_or(json!("unknown"))),
576 ))
577 }
578
579 async fn check_status(&self, p: GoParams) -> Result<ToolResult> {
580 let okr_id_str = match p.okr_id {
581 Some(id) if !id.trim().is_empty() => id,
582 _ => {
583 return Ok(ToolResult::structured_error(
584 "MISSING_FIELD",
585 "go",
586 "okr_id is required for status action",
587 Some(vec!["okr_id"]),
588 Some(json!({
589 "action": "status",
590 "okr_id": "uuid-of-okr"
591 })),
592 ));
593 }
594 };
595
596 let okr_id: Uuid = okr_id_str
597 .parse()
598 .context("Invalid UUID format for okr_id")?;
599
600 let repo = OkrRepository::from_config()
601 .await
602 .context("Failed to load OKR repository")?;
603
604 let okr = match repo.get_okr(okr_id).await? {
605 Some(okr) => okr,
606 None => {
607 return Ok(ToolResult::error(format!("OKR not found: {okr_id}")));
608 }
609 };
610
611 let runs = repo.list_runs().await.unwrap_or_default();
612 let runs: Vec<_> = runs.into_iter().filter(|r| r.okr_id == okr_id).collect();
613 let latest_run = runs.last();
614
615 let kr_status: Vec<String> = okr
616 .key_results
617 .iter()
618 .map(|kr| {
619 format!(
620 " - {}: {:.0}/{:.0} {} ({:.0}%)",
621 kr.title,
622 kr.current_value,
623 kr.target_value,
624 kr.unit,
625 kr.progress() * 100.0
626 )
627 })
628 .collect();
629
630 let run_info = if let Some(run) = latest_run {
631 format!(
632 "\nLatest Run: {} ({:?})\n Iterations: {}\n Outcomes: {}",
633 run.name,
634 run.status,
635 run.iterations,
636 run.outcomes.len()
637 )
638 } else {
639 "\nNo runs found.".to_string()
640 };
641
642 let output = format!(
643 "# Go Status\n\n**OKR:** {}\n**Status:** {:?}\n**Progress:** {:.0}%\n\n## Key Results\n{}\n{}",
644 okr.title,
645 okr.status,
646 okr.progress() * 100.0,
647 kr_status.join("\n"),
648 run_info
649 );
650
651 Ok(ToolResult::success(output).with_metadata("okr_id", json!(okr_id.to_string())))
652 }
653}
654
655fn create_default_okr(okr_id: Uuid, task: &str) -> Okr {
657 let title = if task.len() > 60 {
658 format!("Go: {}…", &task[..57])
659 } else {
660 format!("Go: {task}")
661 };
662
663 let mut okr = Okr::new(title, format!("Autonomous execution: {task}"));
664 okr.id = okr_id;
665
666 okr.add_key_result(KeyResult::new(okr_id, "All stories complete", 100.0, "%"));
667 okr.add_key_result(KeyResult::new(okr_id, "Quality gates pass", 1.0, "count"));
668 okr.add_key_result(KeyResult::new(okr_id, "No critical errors", 0.0, "count"));
669
670 okr
671}
672
673fn resolve_provider(
676 registry: &crate::provider::ProviderRegistry,
677 model: &str,
678) -> Result<(Arc<dyn crate::provider::Provider>, String)> {
679 let (provider_name, model_name) = crate::provider::parse_model_string(model);
680
681 if let Some(provider_name) = provider_name {
683 let normalized_provider = if provider_name == "zhipuai" {
684 "zai"
685 } else {
686 provider_name
687 };
688 if let Some(provider) = registry.get(normalized_provider) {
689 return Ok((provider, model_name.to_string()));
690 }
691 let available = registry.list().join(", ");
692 anyhow::bail!(
693 "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
694 normalized_provider,
695 available
696 );
697 }
698
699 let fallbacks = [
701 "zai",
702 "openai",
703 "github-copilot",
704 "anthropic",
705 "openrouter",
706 "novita",
707 "moonshotai",
708 "google",
709 ];
710
711 for name in fallbacks {
712 if let Some(provider) = registry.get(name) {
713 return Ok((provider, model.to_string()));
714 }
715 }
716
717 if let Some(name) = registry.list().into_iter().next()
719 && let Some(provider) = registry.get(name)
720 {
721 return Ok((provider, model.to_string()));
722 }
723
724 anyhow::bail!("No provider available for model '{model}' and no fallback providers found")
725}