Skip to main content

codetether_agent/tool/
go.rs

1//! Go Tool - Autonomous task execution via OKR → PRD → Ralph pipeline
2//!
3//! Exposes the `/go` command as an MCP-callable tool for programmatic
4//! autonomous work execution. Creates an OKR, generates a PRD from the
5//! task description, runs the Ralph loop, and maps results back to the OKR.
6//!
//! The `execute` action spawns the pipeline in a background task and returns
//! immediately so the MCP client can monitor progress with the `watch` action.
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use uuid::Uuid;
17
18use super::{Tool, ToolResult};
19use crate::cli::go_ralph::{execute_go_ralph, format_go_ralph_result};
20use crate::okr::{KeyResult, Okr, OkrRepository, OkrRun};
21
22// ─── Active execution tracking ──────────────────────────────────────────
23
24/// Phase of a running go pipeline
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(tag = "phase", rename_all = "snake_case")]
27pub enum GoRunPhase {
28    Starting,
29    Running,
30    Completed {
31        passed: usize,
32        total: usize,
33        all_passed: bool,
34        feature_branch: String,
35        summary: String,
36    },
37    Failed {
38        error: String,
39    },
40}
41
42/// Metadata for an active go run
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct ActiveGoRun {
45    pub okr_id: String,
46    pub task: String,
47    pub model: String,
48    pub started_at: String,
49    pub working_dir: String,
50    pub prd_filename: String,
51    pub progress_filename: String,
52    pub phase: GoRunPhase,
53}
54
55/// Global registry of active (and recently completed) go runs.
56static ACTIVE_GO_RUNS: std::sync::LazyLock<Mutex<HashMap<String, ActiveGoRun>>> =
57    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
58
59#[derive(Deserialize)]
60struct GoParams {
61    action: String,
62    #[serde(default)]
63    task: Option<String>,
64    #[serde(default)]
65    max_iterations: Option<usize>,
66    #[serde(default)]
67    max_concurrent_stories: Option<usize>,
68    #[serde(default)]
69    model: Option<String>,
70    #[serde(default)]
71    okr_id: Option<String>,
72}
73
/// Callback that receives a markdown summary once a background pipeline ends.
type GoCompletionCallback = Arc<dyn Fn(String) + Send + Sync + 'static>;

/// MCP tool that launches and monitors autonomous go pipelines.
pub struct GoTool {
    /// Invoked with the final report when a background pipeline completes or
    /// fails; the A2A worker uses it to stream the OKR result back to the LLM.
    completion_callback: Option<GoCompletionCallback>,
}
79
80impl GoTool {
81    pub fn new() -> Self {
82        Self {
83            completion_callback: None,
84        }
85    }
86
87    /// Construct with a completion callback that fires when the background pipeline finishes.
88    pub fn with_callback(cb: Arc<dyn Fn(String) + Send + Sync + 'static>) -> Self {
89        Self {
90            completion_callback: Some(cb),
91        }
92    }
93}
94
95#[async_trait]
96impl Tool for GoTool {
97    fn id(&self) -> &str {
98        "go"
99    }
100
101    fn name(&self) -> &str {
102        "Go"
103    }
104
105    fn description(&self) -> &str {
106        r#"Autonomous task execution pipeline. Creates an OKR, generates a PRD from the task 
107description using an LLM, runs the Ralph autonomous agent loop to implement all user stories, 
108and maps results back to OKR outcomes.
109
110This is the programmatic equivalent of the `/go` TUI command. OKR approval is automatic 
111(no interactive gate) since this is called by MCP clients.
112
113The pipeline runs in the background. Use action "watch" to monitor progress.
114
115Actions:
116- execute: Launch the autonomous pipeline (OKR → PRD → Ralph → results). Returns immediately.
117- watch: Watch a running pipeline's progress by okr_id. Shows PRD status, progress notes, and phase.
118- status: Check final status of an OKR run by okr_id
119
120Required for execute: task
121Optional for execute: max_iterations (default 10), max_concurrent_stories (default 3), model
122Required for watch/status: okr_id"#
123    }
124
125    fn parameters(&self) -> Value {
126        json!({
127            "type": "object",
128            "properties": {
129                "action": {
130                    "type": "string",
131                    "enum": ["execute", "watch", "status"],
132                    "description": "Action to perform. Use 'execute' to start, 'watch' to monitor progress, 'status' for OKR results."
133                },
134                "task": {
135                    "type": "string",
136                    "description": "Task description for autonomous execution"
137                },
138                "max_iterations": {
139                    "type": "integer",
140                    "description": "Maximum Ralph iterations (default: 10)"
141                },
142                "max_concurrent_stories": {
143                    "type": "integer",
144                    "description": "Maximum concurrent stories in Ralph (default: 3)"
145                },
146                "model": {
147                    "type": "string",
148                    "description": "Model to use for PRD generation and Ralph execution"
149                },
150                "okr_id": {
151                    "type": "string",
152                    "description": "OKR ID for watch/status actions"
153                }
154            },
155            "required": ["action"]
156        })
157    }
158
159    async fn execute(&self, params: Value) -> Result<ToolResult> {
160        let p: GoParams = serde_json::from_value(params).context("Invalid params")?;
161
162        match p.action.as_str() {
163            "execute" => self.execute_go(p).await,
164            "watch" => self.watch_go(p).await,
165            "status" => self.check_status(p).await,
166            _ => Ok(ToolResult::structured_error(
167                "INVALID_ACTION",
168                "go",
169                &format!(
170                    "Unknown action: '{}'. Valid actions: execute, watch, status",
171                    p.action
172                ),
173                None,
174                Some(json!({
175                    "action": "execute",
176                    "task": "implement feature X with tests"
177                })),
178            )),
179        }
180    }
181}
182
183impl GoTool {
184    async fn execute_go(&self, p: GoParams) -> Result<ToolResult> {
185        let task = match p.task {
186            Some(t) if !t.trim().is_empty() => t,
187            _ => {
188                return Ok(ToolResult::structured_error(
189                    "MISSING_FIELD",
190                    "go",
191                    "task is required for execute action",
192                    Some(vec!["task"]),
193                    Some(json!({
194                        "action": "execute",
195                        "task": "implement user authentication with OAuth2"
196                    })),
197                ));
198            }
199        };
200
201        let max_iterations = p.max_iterations.unwrap_or(10);
202        let max_concurrent = p.max_concurrent_stories.unwrap_or(3);
203
204        // Load provider registry from Vault
205        let registry = Arc::new(
206            crate::provider::ProviderRegistry::from_vault()
207                .await
208                .context("Failed to load providers from Vault")?,
209        );
210
211        // Resolve model and provider
212        let model = p.model.unwrap_or_else(|| "zai/glm-5".to_string());
213        let (provider, resolved_model) = resolve_provider(&registry, &model)?;
214
215        // Create OKR with default template
216        let okr_id = Uuid::new_v4();
217        let okr_id_str = okr_id.to_string();
218        let mut okr = create_default_okr(okr_id, &task);
219        let mut run = OkrRun::new(
220            okr_id,
221            format!("Go {}", chrono::Local::now().format("%Y-%m-%d %H:%M")),
222        );
223        let _ = run.submit_for_approval();
224        run.record_decision(crate::okr::ApprovalDecision::approve(
225            run.id,
226            "Auto-approved via MCP go tool",
227        ));
228
229        let run_id = run.id;
230        let prd_filename = format!("prd_{}.json", run_id.to_string().replace('-', "_"));
231        let progress_filename = format!("progress_{}.txt", run_id.to_string().replace('-', "_"));
232
233        // Persist OKR before execution
234        if let Ok(repo) = OkrRepository::from_config().await {
235            let _ = repo.create_okr(okr.clone()).await;
236            let _ = repo.create_run(run.clone()).await;
237        }
238
239        let working_dir = std::env::current_dir()
240            .map(|p| p.display().to_string())
241            .unwrap_or_else(|_| ".".into());
242
243        // Register active run
244        let active_run = ActiveGoRun {
245            okr_id: okr_id_str.clone(),
246            task: task.clone(),
247            model: resolved_model.clone(),
248            started_at: chrono::Utc::now().to_rfc3339(),
249            working_dir: working_dir.clone(),
250            prd_filename: prd_filename.clone(),
251            progress_filename: progress_filename.clone(),
252            phase: GoRunPhase::Starting,
253        };
254        if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
255            runs.insert(okr_id_str.clone(), active_run);
256        }
257
258        tracing::info!(
259            task = %task,
260            okr_id = %okr_id,
261            model = %resolved_model,
262            max_iterations,
263            "Starting /go autonomous pipeline via MCP (background)"
264        );
265
266        // Spawn the pipeline in a background task
267        let bg_okr_id = okr_id_str.clone();
268        let bg_task = task.clone();
269        let bg_model = resolved_model.clone();
270        let bg_callback = self.completion_callback.clone();
271        tokio::spawn(async move {
272            // Update phase to Running
273            if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
274                if let Some(r) = runs.get_mut(&bg_okr_id) {
275                    r.phase = GoRunPhase::Running;
276                }
277            }
278
279            // Create bus with S3 sink for training data archival
280            let bus = crate::bus::AgentBus::new().into_arc();
281            crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
282
283            match execute_go_ralph(
284                &bg_task,
285                &mut okr,
286                &mut run,
287                provider,
288                &bg_model,
289                max_iterations,
290                Some(bus),
291                max_concurrent,
292                Some(registry),
293            )
294            .await
295            {
296                Ok(result) => {
297                    // Persist final state
298                    if let Ok(repo) = OkrRepository::from_config().await {
299                        let _ = repo.update_run(run).await;
300                    }
301
302                    let summary = format_go_ralph_result(&result, &bg_task);
303
304                    if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
305                        if let Some(r) = runs.get_mut(&bg_okr_id) {
306                            r.phase = GoRunPhase::Completed {
307                                passed: result.passed,
308                                total: result.total,
309                                all_passed: result.all_passed,
310                                feature_branch: result.feature_branch,
311                                summary,
312                            };
313                        }
314                    }
315
316                    tracing::info!(
317                        okr_id = %bg_okr_id,
318                        passed = result.passed,
319                        total = result.total,
320                        "Go pipeline completed"
321                    );
322
323                    // Notify the LLM session that the pipeline finished
324                    if let Some(ref cb) = bg_callback {
325                        let phase_str = {
326                            let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
327                            if let Some(r) = runs.get(&bg_okr_id) {
328                                if let GoRunPhase::Completed {
329                                    passed,
330                                    total,
331                                    all_passed,
332                                    ref feature_branch,
333                                    ref summary,
334                                } = r.phase
335                                {
336                                    format!(
337                                        "# Go Pipeline Completed {}\n\n\
338                                         **OKR ID:** `{}`\n\
339                                         **Result:** {}/{} stories passed\n\
340                                         **Branch:** `{}`\n\n{}",
341                                        if all_passed { "✅" } else { "⚠️" },
342                                        bg_okr_id,
343                                        passed,
344                                        total,
345                                        feature_branch,
346                                        summary
347                                    )
348                                } else {
349                                    format!("Go pipeline completed for OKR `{}`", bg_okr_id)
350                                }
351                            } else {
352                                format!("Go pipeline completed for OKR `{}`", bg_okr_id)
353                            }
354                        };
355                        cb(phase_str);
356                    }
357                }
358                Err(err) => {
359                    // Mark run as failed
360                    run.status = crate::okr::OkrRunStatus::Failed;
361                    if let Ok(repo) = OkrRepository::from_config().await {
362                        let _ = repo.update_run(run).await;
363                    }
364
365                    let error_msg = err.to_string();
366                    if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
367                        if let Some(r) = runs.get_mut(&bg_okr_id) {
368                            r.phase = GoRunPhase::Failed {
369                                error: error_msg.clone(),
370                            };
371                        }
372                    }
373
374                    tracing::error!(
375                        okr_id = %bg_okr_id,
376                        error = %error_msg,
377                        "Go pipeline failed"
378                    );
379
380                    // Notify the LLM session that the pipeline failed
381                    if let Some(ref cb) = bg_callback {
382                        cb(format!(
383                            "# Go Pipeline Failed ❌\n\n**OKR ID:** `{}`\n**Error:** {}",
384                            bg_okr_id, error_msg
385                        ));
386                    }
387                }
388            }
389        });
390
391        // Return immediately with launch confirmation + watch instructions
392        let output = format!(
393            "# Go Pipeline Launched\n\n\
394             **OKR ID:** `{okr_id_str}`\n\
395             **Task:** {task}\n\
396             **Model:** {resolved_model}\n\
397             **Max Iterations:** {max_iterations}\n\
398             **Working Directory:** {working_dir}\n\n\
399             The autonomous pipeline is now running in the background.\n\n\
400             ## Monitor Progress\n\n\
401             Use the `go` tool with action `watch` to monitor this pipeline:\n\n\
402             ```json\n\
403             {{\"action\": \"watch\", \"okr_id\": \"{okr_id_str}\"}}\n\
404             ```\n\n\
405             The pipeline will:\n\
406             1. Generate a PRD from the task description\n\
407             2. Run the Ralph loop to implement all user stories\n\
408             3. Run quality checks (typecheck, lint, test, build)\n\
409             4. Map results back to OKR outcomes\n\n\
410             PRD file: `{prd_filename}`\n\
411             Progress file: `{progress_filename}`"
412        );
413
414        Ok(ToolResult::success(output)
415            .with_metadata("okr_id", json!(okr_id_str))
416            .with_metadata("phase", json!("starting"))
417            .with_metadata("prd_filename", json!(prd_filename))
418            .with_metadata("progress_filename", json!(progress_filename))
419            .with_metadata(
420                "watch_hint",
421                json!({
422                    "tool": "go",
423                    "args": {"action": "watch", "okr_id": okr_id_str}
424                }),
425            ))
426    }
427
428    async fn watch_go(&self, p: GoParams) -> Result<ToolResult> {
429        let okr_id_str = match p.okr_id {
430            Some(id) if !id.trim().is_empty() => id,
431            _ => {
432                // If no OKR ID given, list all active runs
433                let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
434                if runs.is_empty() {
435                    return Ok(ToolResult::success(
436                        "No active go pipelines. Use `go(action: \"execute\", task: \"...\")` to start one.",
437                    ));
438                }
439
440                let mut output = String::from("# Active Go Pipelines\n\n");
441                for (id, run) in runs.iter() {
442                    let phase_str = match &run.phase {
443                        GoRunPhase::Starting => "Starting".to_string(),
444                        GoRunPhase::Running => "Running".to_string(),
445                        GoRunPhase::Completed { passed, total, .. } => {
446                            format!("Completed ({passed}/{total} passed)")
447                        }
448                        GoRunPhase::Failed { error } => {
449                            format!("Failed: {}", &error[..error.len().min(80)])
450                        }
451                    };
452                    output.push_str(&format!(
453                        "- **{id}**: {phase_str}\n  Task: {}\n  Started: {}\n\n",
454                        run.task, run.started_at
455                    ));
456                }
457                output.push_str("Use `go(action: \"watch\", okr_id: \"<id>\")` for details.");
458                return Ok(ToolResult::success(output));
459            }
460        };
461
462        // Look up the active run
463        let active_run = {
464            let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
465            runs.get(&okr_id_str).cloned()
466        };
467
468        let Some(run) = active_run else {
469            return Ok(ToolResult::error(format!(
470                "No active go pipeline found for OKR `{okr_id_str}`.\n\n\
471                 Use `go(action: \"watch\")` with no okr_id to list active pipelines,\n\
472                 or `go(action: \"status\", okr_id: \"...\")` to check completed runs."
473            )));
474        };
475
476        let mut output = format!(
477            "# Go Pipeline Status\n\n\
478             **OKR ID:** `{}`\n\
479             **Task:** {}\n\
480             **Model:** {}\n\
481             **Started:** {}\n\
482             **Working Directory:** {}\n",
483            run.okr_id, run.task, run.model, run.started_at, run.working_dir
484        );
485
486        // Phase
487        match &run.phase {
488            GoRunPhase::Starting => {
489                output.push_str("\n**Phase:** Starting (generating PRD...)\n");
490            }
491            GoRunPhase::Running => {
492                output.push_str("\n**Phase:** Running Ralph loop\n");
493            }
494            GoRunPhase::Completed {
495                passed,
496                total,
497                all_passed,
498                feature_branch,
499                summary,
500            } => {
501                output.push_str(&format!(
502                    "\n**Phase:** Completed {}\n\
503                     **Result:** {passed}/{total} stories passed\n\
504                     **Feature Branch:** `{feature_branch}`\n\n\
505                     ## Summary\n\n{summary}\n",
506                    if *all_passed { "✅" } else { "⚠️" }
507                ));
508            }
509            GoRunPhase::Failed { error } => {
510                output.push_str(&format!("\n**Phase:** Failed ❌\n**Error:** {error}\n"));
511            }
512        }
513
514        // Read PRD file for story-level progress
515        if let Ok(prd_content) = std::fs::read_to_string(&run.prd_filename) {
516            if let Ok(prd) = serde_json::from_str::<Value>(&prd_content) {
517                if let Some(stories) = prd.get("user_stories").and_then(|s| s.as_array()) {
518                    output.push_str("\n## Stories\n\n");
519                    let mut passed_count = 0;
520                    for story in stories {
521                        let id = story.get("id").and_then(|v| v.as_str()).unwrap_or("?");
522                        let title = story.get("title").and_then(|v| v.as_str()).unwrap_or("?");
523                        let passes = story
524                            .get("passes")
525                            .and_then(|v| v.as_bool())
526                            .unwrap_or(false);
527                        let icon = if passes {
528                            passed_count += 1;
529                            "✅"
530                        } else {
531                            "⏳"
532                        };
533                        output.push_str(&format!("- {icon} **{id}**: {title}\n"));
534                    }
535                    output.push_str(&format!(
536                        "\n**Progress:** {passed_count}/{} stories passed\n",
537                        stories.len()
538                    ));
539                }
540            }
541        }
542
543        // Read progress file
544        if let Ok(progress) = std::fs::read_to_string(&run.progress_filename) {
545            if !progress.trim().is_empty() {
546                // Show last 30 lines to avoid overwhelming output
547                let lines: Vec<&str> = progress.lines().collect();
548                let start = lines.len().saturating_sub(30);
549                let tail: String = lines[start..].join("\n");
550                output.push_str(&format!(
551                    "\n## Progress Notes (last {} lines)\n\n```\n{tail}\n```\n",
552                    lines.len().min(30)
553                ));
554            }
555        }
556
557        // Hint for next action
558        if matches!(run.phase, GoRunPhase::Starting | GoRunPhase::Running) {
559            output.push_str(&format!(
560                "\n---\n*Pipeline still running. Call `go(action: \"watch\", okr_id: \"{}\")` again to check progress.*\n",
561                run.okr_id
562            ));
563        }
564
565        Ok(ToolResult::success(output)
566            .with_metadata("okr_id", json!(run.okr_id))
567            .with_metadata(
568                "phase",
569                json!(serde_json::to_value(&run.phase).unwrap_or(json!("unknown"))),
570            ))
571    }
572
573    async fn check_status(&self, p: GoParams) -> Result<ToolResult> {
574        let okr_id_str = match p.okr_id {
575            Some(id) if !id.trim().is_empty() => id,
576            _ => {
577                return Ok(ToolResult::structured_error(
578                    "MISSING_FIELD",
579                    "go",
580                    "okr_id is required for status action",
581                    Some(vec!["okr_id"]),
582                    Some(json!({
583                        "action": "status",
584                        "okr_id": "uuid-of-okr"
585                    })),
586                ));
587            }
588        };
589
590        let okr_id: Uuid = okr_id_str
591            .parse()
592            .context("Invalid UUID format for okr_id")?;
593
594        let repo = OkrRepository::from_config()
595            .await
596            .context("Failed to load OKR repository")?;
597
598        let okr = match repo.get_okr(okr_id).await? {
599            Some(okr) => okr,
600            None => {
601                return Ok(ToolResult::error(format!("OKR not found: {okr_id}")));
602            }
603        };
604
605        let runs = repo.list_runs().await.unwrap_or_default();
606        let runs: Vec<_> = runs.into_iter().filter(|r| r.okr_id == okr_id).collect();
607        let latest_run = runs.last();
608
609        let kr_status: Vec<String> = okr
610            .key_results
611            .iter()
612            .map(|kr| {
613                format!(
614                    "  - {}: {:.0}/{:.0} {} ({:.0}%)",
615                    kr.title,
616                    kr.current_value,
617                    kr.target_value,
618                    kr.unit,
619                    kr.progress() * 100.0
620                )
621            })
622            .collect();
623
624        let run_info = if let Some(run) = latest_run {
625            format!(
626                "\nLatest Run: {} ({:?})\n  Iterations: {}\n  Outcomes: {}",
627                run.name,
628                run.status,
629                run.iterations,
630                run.outcomes.len()
631            )
632        } else {
633            "\nNo runs found.".to_string()
634        };
635
636        let output = format!(
637            "# Go Status\n\n**OKR:** {}\n**Status:** {:?}\n**Progress:** {:.0}%\n\n## Key Results\n{}\n{}",
638            okr.title,
639            okr.status,
640            okr.progress() * 100.0,
641            kr_status.join("\n"),
642            run_info
643        );
644
645        Ok(ToolResult::success(output).with_metadata("okr_id", json!(okr_id.to_string())))
646    }
647}
648
649/// Create a default OKR with standard key results for a task
650fn create_default_okr(okr_id: Uuid, task: &str) -> Okr {
651    let title = if task.len() > 60 {
652        format!("Go: {}…", &task[..57])
653    } else {
654        format!("Go: {task}")
655    };
656
657    let mut okr = Okr::new(title, format!("Autonomous execution: {task}"));
658    okr.id = okr_id;
659
660    okr.add_key_result(KeyResult::new(okr_id, "All stories complete", 100.0, "%"));
661    okr.add_key_result(KeyResult::new(okr_id, "Quality gates pass", 1.0, "count"));
662    okr.add_key_result(KeyResult::new(okr_id, "No critical errors", 0.0, "count"));
663
664    okr
665}
666
667/// Resolve provider and model string from the registry.
668/// Uses the same fallback strategy as the TUI autochat.
669fn resolve_provider(
670    registry: &crate::provider::ProviderRegistry,
671    model: &str,
672) -> Result<(Arc<dyn crate::provider::Provider>, String)> {
673    let (provider_name, model_name) = crate::provider::parse_model_string(model);
674
675    // Try explicit provider/model pair
676    if let Some(provider_name) = provider_name {
677        if let Some(provider) = registry.get(provider_name) {
678            return Ok((provider, model_name.to_string()));
679        }
680    }
681
682    // Fallback provider order (same as TUI)
683    let fallbacks = [
684        "zai",
685        "openai",
686        "github-copilot",
687        "anthropic",
688        "openrouter",
689        "novita",
690        "moonshotai",
691        "google",
692    ];
693
694    for name in fallbacks {
695        if let Some(provider) = registry.get(name) {
696            return Ok((provider, model.to_string()));
697        }
698    }
699
700    // Last resort: first available provider
701    if let Some(name) = registry.list().into_iter().next() {
702        if let Some(provider) = registry.get(name) {
703            return Ok((provider, model.to_string()));
704        }
705    }
706
707    anyhow::bail!("No provider available for model '{model}' and no fallback providers found")
708}