Skip to main content

codetether_agent/tool/
go.rs

1//! Go Tool - Autonomous task execution via OKR → PRD → Ralph pipeline
2//!
3//! Exposes the `/go` command as an MCP-callable tool for programmatic
4//! autonomous work execution. Creates an OKR, generates a PRD from the
5//! task description, runs the Ralph loop, and maps results back to the OKR.
6//!
7//! The `execute` action spawns the pipeline in a background task and returns
//! immediately so the MCP client can monitor progress via the `go` tool's `watch` action.
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use uuid::Uuid;
17
18use super::{Tool, ToolResult};
19use crate::cli::go_ralph::{execute_go_ralph, format_go_ralph_result};
20use crate::okr::{KeyResult, Okr, OkrRepository, OkrRun};
21
22// ─── Active execution tracking ──────────────────────────────────────────
23
24/// Phase of a running go pipeline
25#[derive(Debug, Clone, Serialize, Deserialize)]
26#[serde(tag = "phase", rename_all = "snake_case")]
27pub enum GoRunPhase {
28    Starting,
29    Running,
30    Completed {
31        passed: usize,
32        total: usize,
33        all_passed: bool,
34        feature_branch: String,
35        summary: String,
36    },
37    Failed {
38        error: String,
39    },
40}
41
42/// Metadata for an active go run
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct ActiveGoRun {
45    pub okr_id: String,
46    pub task: String,
47    pub model: String,
48    pub started_at: String,
49    pub working_dir: String,
50    pub prd_filename: String,
51    pub progress_filename: String,
52    pub phase: GoRunPhase,
53}
54
55/// Global registry of active (and recently completed) go runs.
56pub static ACTIVE_GO_RUNS: std::sync::LazyLock<Mutex<HashMap<String, ActiveGoRun>>> =
57    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
58
59#[derive(Deserialize)]
60struct GoParams {
61    action: String,
62    #[serde(default)]
63    task: Option<String>,
64    #[serde(default)]
65    max_iterations: Option<usize>,
66    #[serde(default)]
67    max_concurrent_stories: Option<usize>,
68    #[serde(default)]
69    model: Option<String>,
70    #[serde(default)]
71    okr_id: Option<String>,
72}
73
/// MCP-callable front end for the autonomous `/go` pipeline.
pub struct GoTool {
    /// Hook run with a Markdown report once the background pipeline finishes or fails.
    ///
    /// The A2A worker installs one to stream the final OKR outcome back to the calling LLM;
    /// `None` means results are only observable through the `watch` / `status` actions.
    completion_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
}
79
80impl Default for GoTool {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl GoTool {
87    pub fn new() -> Self {
88        Self {
89            completion_callback: None,
90        }
91    }
92
93    /// Construct with a completion callback that fires when the background pipeline finishes.
94    pub fn with_callback(cb: Arc<dyn Fn(String) + Send + Sync + 'static>) -> Self {
95        Self {
96            completion_callback: Some(cb),
97        }
98    }
99}
100
101#[async_trait]
102impl Tool for GoTool {
103    fn id(&self) -> &str {
104        "go"
105    }
106
107    fn name(&self) -> &str {
108        "Go"
109    }
110
111    fn description(&self) -> &str {
112        r#"Autonomous task execution pipeline. Creates an OKR, generates a PRD from the task 
113description using an LLM, runs the Ralph autonomous agent loop to implement all user stories, 
114and maps results back to OKR outcomes.
115
116This is the programmatic equivalent of the `/go` TUI command. OKR approval is automatic 
117(no interactive gate) since this is called by MCP clients.
118
119The pipeline runs in the background. Use action "watch" to monitor progress.
120
121Actions:
122- execute: Launch the autonomous pipeline (OKR → PRD → Ralph → results). Returns immediately.
123- watch: Watch a running pipeline's progress by okr_id. Shows PRD status, progress notes, and phase.
124- status: Check final status of an OKR run by okr_id
125
126Required for execute: task
127Optional for execute: max_iterations (default 10), max_concurrent_stories (default 3), model
128Required for watch/status: okr_id"#
129    }
130
131    fn parameters(&self) -> Value {
132        json!({
133            "type": "object",
134            "properties": {
135                "action": {
136                    "type": "string",
137                    "enum": ["execute", "watch", "status"],
138                    "description": "Action to perform. Use 'execute' to start, 'watch' to monitor progress, 'status' for OKR results."
139                },
140                "task": {
141                    "type": "string",
142                    "description": "Task description for autonomous execution"
143                },
144                "max_iterations": {
145                    "type": "integer",
146                    "description": "Maximum Ralph iterations (default: 10)"
147                },
148                "max_concurrent_stories": {
149                    "type": "integer",
150                    "description": "Maximum concurrent stories in Ralph (default: 3)"
151                },
152                "model": {
153                    "type": "string",
154                    "description": "Model to use for PRD generation and Ralph execution"
155                },
156                "okr_id": {
157                    "type": "string",
158                    "description": "OKR ID for watch/status actions"
159                }
160            },
161            "required": ["action"]
162        })
163    }
164
165    async fn execute(&self, params: Value) -> Result<ToolResult> {
166        let p: GoParams = serde_json::from_value(params).context("Invalid params")?;
167
168        match p.action.as_str() {
169            "execute" => self.execute_go(p).await,
170            "watch" => self.watch_go(p).await,
171            "status" => self.check_status(p).await,
172            _ => Ok(ToolResult::structured_error(
173                "INVALID_ACTION",
174                "go",
175                &format!(
176                    "Unknown action: '{}'. Valid actions: execute, watch, status",
177                    p.action
178                ),
179                None,
180                Some(json!({
181                    "action": "execute",
182                    "task": "implement feature X with tests"
183                })),
184            )),
185        }
186    }
187}
188
189impl GoTool {
190    async fn execute_go(&self, p: GoParams) -> Result<ToolResult> {
191        let task = match p.task {
192            Some(t) if !t.trim().is_empty() => t,
193            _ => {
194                return Ok(ToolResult::structured_error(
195                    "MISSING_FIELD",
196                    "go",
197                    "task is required for execute action",
198                    Some(vec!["task"]),
199                    Some(json!({
200                        "action": "execute",
201                        "task": "implement user authentication with OAuth2"
202                    })),
203                ));
204            }
205        };
206
207        let max_iterations = p.max_iterations.unwrap_or(10);
208        let max_concurrent = p.max_concurrent_stories.unwrap_or(3);
209
210        // Load provider registry from Vault
211        let registry = Arc::new(
212            crate::provider::ProviderRegistry::from_vault()
213                .await
214                .context("Failed to load providers from Vault")?,
215        );
216
217        // Resolve model and provider
218        let model = p.model.unwrap_or_else(|| "zai/glm-5".to_string());
219        let (provider, resolved_model) = resolve_provider(&registry, &model)?;
220
221        // Create OKR with default template
222        let okr_id = Uuid::new_v4();
223        let okr_id_str = okr_id.to_string();
224        let mut okr = create_default_okr(okr_id, &task);
225        let mut run = OkrRun::new(
226            okr_id,
227            format!("Go {}", chrono::Local::now().format("%Y-%m-%d %H:%M")),
228        );
229        let _ = run.submit_for_approval();
230        run.record_decision(crate::okr::ApprovalDecision::approve(
231            run.id,
232            "Auto-approved via MCP go tool",
233        ));
234
235        let run_id = run.id;
236        let prd_filename = format!("prd_{}.json", run_id.to_string().replace('-', "_"));
237        let progress_filename = format!("progress_{}.txt", run_id.to_string().replace('-', "_"));
238
239        // Persist OKR before execution
240        if let Ok(repo) = OkrRepository::from_config().await {
241            let _ = repo.create_okr(okr.clone()).await;
242            let _ = repo.create_run(run.clone()).await;
243        }
244
245        let working_dir = std::env::current_dir()
246            .map(|p| p.display().to_string())
247            .unwrap_or_else(|_| ".".into());
248
249        // Register active run
250        let active_run = ActiveGoRun {
251            okr_id: okr_id_str.clone(),
252            task: task.clone(),
253            model: resolved_model.clone(),
254            started_at: chrono::Utc::now().to_rfc3339(),
255            working_dir: working_dir.clone(),
256            prd_filename: prd_filename.clone(),
257            progress_filename: progress_filename.clone(),
258            phase: GoRunPhase::Starting,
259        };
260        if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
261            runs.insert(okr_id_str.clone(), active_run);
262        }
263
264        tracing::info!(
265            task = %task,
266            okr_id = %okr_id,
267            model = %resolved_model,
268            max_iterations,
269            "Starting /go autonomous pipeline via MCP (background)"
270        );
271
272        // Spawn the pipeline in a background task
273        let bg_okr_id = okr_id_str.clone();
274        let bg_task = task.clone();
275        let bg_model = resolved_model.clone();
276        let bg_callback = self.completion_callback.clone();
277        tokio::spawn(async move {
278            // Update phase to Running
279            if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
280                && let Some(r) = runs.get_mut(&bg_okr_id)
281            {
282                r.phase = GoRunPhase::Running;
283            }
284
285            // Create bus with S3 sink for training data archival
286            let bus = crate::bus::AgentBus::new().into_arc();
287            crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
288
289            match execute_go_ralph(
290                &bg_task,
291                &mut okr,
292                &mut run,
293                provider,
294                &bg_model,
295                max_iterations,
296                Some(bus),
297                max_concurrent,
298                Some(registry),
299            )
300            .await
301            {
302                Ok(result) => {
303                    // Persist final state
304                    if let Ok(repo) = OkrRepository::from_config().await {
305                        let _ = repo.update_run(run).await;
306                    }
307
308                    let summary = format_go_ralph_result(&result, &bg_task);
309
310                    if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
311                        && let Some(r) = runs.get_mut(&bg_okr_id)
312                    {
313                        r.phase = GoRunPhase::Completed {
314                            passed: result.passed,
315                            total: result.total,
316                            all_passed: result.all_passed,
317                            feature_branch: result.feature_branch,
318                            summary,
319                        };
320                    }
321
322                    tracing::info!(
323                        okr_id = %bg_okr_id,
324                        passed = result.passed,
325                        total = result.total,
326                        "Go pipeline completed"
327                    );
328
329                    // Notify the LLM session that the pipeline finished
330                    if let Some(ref cb) = bg_callback {
331                        let phase_str = {
332                            let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
333                            if let Some(r) = runs.get(&bg_okr_id) {
334                                if let GoRunPhase::Completed {
335                                    passed,
336                                    total,
337                                    all_passed,
338                                    ref feature_branch,
339                                    ref summary,
340                                } = r.phase
341                                {
342                                    format!(
343                                        "# Go Pipeline Completed {}\n\n\
344                                         **OKR ID:** `{}`\n\
345                                         **Result:** {}/{} stories passed\n\
346                                         **Branch:** `{}`\n\n{}",
347                                        if all_passed { "✅" } else { "⚠️" },
348                                        bg_okr_id,
349                                        passed,
350                                        total,
351                                        feature_branch,
352                                        summary
353                                    )
354                                } else {
355                                    format!("Go pipeline completed for OKR `{}`", bg_okr_id)
356                                }
357                            } else {
358                                format!("Go pipeline completed for OKR `{}`", bg_okr_id)
359                            }
360                        };
361                        cb(phase_str);
362                    }
363                }
364                Err(err) => {
365                    // Mark run as failed
366                    run.status = crate::okr::OkrRunStatus::Failed;
367                    if let Ok(repo) = OkrRepository::from_config().await {
368                        let _ = repo.update_run(run).await;
369                    }
370
371                    let error_msg = err.to_string();
372                    if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
373                        && let Some(r) = runs.get_mut(&bg_okr_id)
374                    {
375                        r.phase = GoRunPhase::Failed {
376                            error: error_msg.clone(),
377                        };
378                    }
379
380                    tracing::error!(
381                        okr_id = %bg_okr_id,
382                        error = %error_msg,
383                        "Go pipeline failed"
384                    );
385
386                    // Notify the LLM session that the pipeline failed
387                    if let Some(ref cb) = bg_callback {
388                        cb(format!(
389                            "# Go Pipeline Failed ❌\n\n**OKR ID:** `{}`\n**Error:** {}",
390                            bg_okr_id, error_msg
391                        ));
392                    }
393                }
394            }
395        });
396
397        // Return immediately with launch confirmation + watch instructions
398        let output = format!(
399            "# Go Pipeline Launched\n\n\
400             **OKR ID:** `{okr_id_str}`\n\
401             **Task:** {task}\n\
402             **Model:** {resolved_model}\n\
403             **Max Iterations:** {max_iterations}\n\
404             **Working Directory:** {working_dir}\n\n\
405             The autonomous pipeline is now running in the background.\n\n\
406             ## Monitor Progress\n\n\
407             Use the `go` tool with action `watch` to monitor this pipeline:\n\n\
408             ```json\n\
409             {{\"action\": \"watch\", \"okr_id\": \"{okr_id_str}\"}}\n\
410             ```\n\n\
411             The pipeline will:\n\
412             1. Generate a PRD from the task description\n\
413             2. Run the Ralph loop to implement all user stories\n\
414             3. Run quality checks (typecheck, lint, test, build)\n\
415             4. Map results back to OKR outcomes\n\n\
416             PRD file: `{prd_filename}`\n\
417             Progress file: `{progress_filename}`"
418        );
419
420        Ok(ToolResult::success(output)
421            .with_metadata("okr_id", json!(okr_id_str))
422            .with_metadata("phase", json!("starting"))
423            .with_metadata("prd_filename", json!(prd_filename))
424            .with_metadata("progress_filename", json!(progress_filename))
425            .with_metadata(
426                "watch_hint",
427                json!({
428                    "tool": "go",
429                    "args": {"action": "watch", "okr_id": okr_id_str}
430                }),
431            ))
432    }
433
434    async fn watch_go(&self, p: GoParams) -> Result<ToolResult> {
435        let okr_id_str = match p.okr_id {
436            Some(id) if !id.trim().is_empty() => id,
437            _ => {
438                // If no OKR ID given, list all active runs
439                let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
440                if runs.is_empty() {
441                    return Ok(ToolResult::success(
442                        "No active go pipelines. Use `go(action: \"execute\", task: \"...\")` to start one.",
443                    ));
444                }
445
446                let mut output = String::from("# Active Go Pipelines\n\n");
447                for (id, run) in runs.iter() {
448                    let phase_str = match &run.phase {
449                        GoRunPhase::Starting => "Starting".to_string(),
450                        GoRunPhase::Running => "Running".to_string(),
451                        GoRunPhase::Completed { passed, total, .. } => {
452                            format!("Completed ({passed}/{total} passed)")
453                        }
454                        GoRunPhase::Failed { error } => {
455                            format!("Failed: {}", crate::util::truncate_bytes_safe(&error, 80))
456                        }
457                    };
458                    output.push_str(&format!(
459                        "- **{id}**: {phase_str}\n  Task: {}\n  Started: {}\n\n",
460                        run.task, run.started_at
461                    ));
462                }
463                output.push_str("Use `go(action: \"watch\", okr_id: \"<id>\")` for details.");
464                return Ok(ToolResult::success(output));
465            }
466        };
467
468        // Look up the active run
469        let active_run = {
470            let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
471            runs.get(&okr_id_str).cloned()
472        };
473
474        let Some(run) = active_run else {
475            return Ok(ToolResult::error(format!(
476                "No active go pipeline found for OKR `{okr_id_str}`.\n\n\
477                 Use `go(action: \"watch\")` with no okr_id to list active pipelines,\n\
478                 or `go(action: \"status\", okr_id: \"...\")` to check completed runs."
479            )));
480        };
481
482        let mut output = format!(
483            "# Go Pipeline Status\n\n\
484             **OKR ID:** `{}`\n\
485             **Task:** {}\n\
486             **Model:** {}\n\
487             **Started:** {}\n\
488             **Working Directory:** {}\n",
489            run.okr_id, run.task, run.model, run.started_at, run.working_dir
490        );
491
492        // Phase
493        match &run.phase {
494            GoRunPhase::Starting => {
495                output.push_str("\n**Phase:** Starting (generating PRD...)\n");
496            }
497            GoRunPhase::Running => {
498                output.push_str("\n**Phase:** Running Ralph loop\n");
499            }
500            GoRunPhase::Completed {
501                passed,
502                total,
503                all_passed,
504                feature_branch,
505                summary,
506            } => {
507                output.push_str(&format!(
508                    "\n**Phase:** Completed {}\n\
509                     **Result:** {passed}/{total} stories passed\n\
510                     **Feature Branch:** `{feature_branch}`\n\n\
511                     ## Summary\n\n{summary}\n",
512                    if *all_passed { "✅" } else { "⚠️" }
513                ));
514            }
515            GoRunPhase::Failed { error } => {
516                output.push_str(&format!("\n**Phase:** Failed ❌\n**Error:** {error}\n"));
517            }
518        }
519
520        // Read PRD file for story-level progress
521        if let Ok(prd_content) = std::fs::read_to_string(&run.prd_filename)
522            && let Ok(prd) = serde_json::from_str::<Value>(&prd_content)
523            && let Some(stories) = prd.get("user_stories").and_then(|s| s.as_array())
524        {
525            output.push_str("\n## Stories\n\n");
526            let mut passed_count = 0;
527            for story in stories {
528                let id = story.get("id").and_then(|v| v.as_str()).unwrap_or("?");
529                let title = story.get("title").and_then(|v| v.as_str()).unwrap_or("?");
530                let passes = story
531                    .get("passes")
532                    .and_then(|v| v.as_bool())
533                    .unwrap_or(false);
534                let icon = if passes {
535                    passed_count += 1;
536                    "✅"
537                } else {
538                    "⏳"
539                };
540                output.push_str(&format!("- {icon} **{id}**: {title}\n"));
541            }
542            output.push_str(&format!(
543                "\n**Progress:** {passed_count}/{} stories passed\n",
544                stories.len()
545            ));
546        }
547
548        // Read progress file
549        if let Ok(progress) = std::fs::read_to_string(&run.progress_filename)
550            && !progress.trim().is_empty()
551        {
552            // Show last 30 lines to avoid overwhelming output
553            let lines: Vec<&str> = progress.lines().collect();
554            let start = lines.len().saturating_sub(30);
555            let tail: String = lines[start..].join("\n");
556            output.push_str(&format!(
557                "\n## Progress Notes (last {} lines)\n\n```\n{tail}\n```\n",
558                lines.len().min(30)
559            ));
560        }
561
562        // Hint for next action
563        if matches!(run.phase, GoRunPhase::Starting | GoRunPhase::Running) {
564            output.push_str(&format!(
565                "\n---\n*Pipeline still running. Call `go(action: \"watch\", okr_id: \"{}\")` again to check progress.*\n",
566                run.okr_id
567            ));
568        }
569
570        Ok(ToolResult::success(output)
571            .with_metadata("okr_id", json!(run.okr_id))
572            .with_metadata(
573                "phase",
574                json!(serde_json::to_value(&run.phase).unwrap_or(json!("unknown"))),
575            ))
576    }
577
578    async fn check_status(&self, p: GoParams) -> Result<ToolResult> {
579        let okr_id_str = match p.okr_id {
580            Some(id) if !id.trim().is_empty() => id,
581            _ => {
582                return Ok(ToolResult::structured_error(
583                    "MISSING_FIELD",
584                    "go",
585                    "okr_id is required for status action",
586                    Some(vec!["okr_id"]),
587                    Some(json!({
588                        "action": "status",
589                        "okr_id": "uuid-of-okr"
590                    })),
591                ));
592            }
593        };
594
595        let okr_id: Uuid = okr_id_str
596            .parse()
597            .context("Invalid UUID format for okr_id")?;
598
599        let repo = OkrRepository::from_config()
600            .await
601            .context("Failed to load OKR repository")?;
602
603        let okr = match repo.get_okr(okr_id).await? {
604            Some(okr) => okr,
605            None => {
606                return Ok(ToolResult::error(format!("OKR not found: {okr_id}")));
607            }
608        };
609
610        let runs = repo.list_runs().await.unwrap_or_default();
611        let runs: Vec<_> = runs.into_iter().filter(|r| r.okr_id == okr_id).collect();
612        let latest_run = runs.last();
613
614        let kr_status: Vec<String> = okr
615            .key_results
616            .iter()
617            .map(|kr| {
618                format!(
619                    "  - {}: {:.0}/{:.0} {} ({:.0}%)",
620                    kr.title,
621                    kr.current_value,
622                    kr.target_value,
623                    kr.unit,
624                    kr.progress() * 100.0
625                )
626            })
627            .collect();
628
629        let run_info = if let Some(run) = latest_run {
630            format!(
631                "\nLatest Run: {} ({:?})\n  Iterations: {}\n  Outcomes: {}",
632                run.name,
633                run.status,
634                run.iterations,
635                run.outcomes.len()
636            )
637        } else {
638            "\nNo runs found.".to_string()
639        };
640
641        let output = format!(
642            "# Go Status\n\n**OKR:** {}\n**Status:** {:?}\n**Progress:** {:.0}%\n\n## Key Results\n{}\n{}",
643            okr.title,
644            okr.status,
645            okr.progress() * 100.0,
646            kr_status.join("\n"),
647            run_info
648        );
649
650        Ok(ToolResult::success(output).with_metadata("okr_id", json!(okr_id.to_string())))
651    }
652}
653
654/// Create a default OKR with standard key results for a task
655fn create_default_okr(okr_id: Uuid, task: &str) -> Okr {
656    let title = if task.len() > 60 {
657        format!("Go: {}…", &task[..57])
658    } else {
659        format!("Go: {task}")
660    };
661
662    let mut okr = Okr::new(title, format!("Autonomous execution: {task}"));
663    okr.id = okr_id;
664
665    okr.add_key_result(KeyResult::new(okr_id, "All stories complete", 100.0, "%"));
666    okr.add_key_result(KeyResult::new(okr_id, "Quality gates pass", 1.0, "count"));
667    okr.add_key_result(KeyResult::new(okr_id, "No critical errors", 0.0, "count"));
668
669    okr
670}
671
672/// Resolve provider and model string from the registry.
673/// Uses the same fallback strategy as the TUI autochat.
674fn resolve_provider(
675    registry: &crate::provider::ProviderRegistry,
676    model: &str,
677) -> Result<(Arc<dyn crate::provider::Provider>, String)> {
678    let (provider_name, model_name) = crate::provider::parse_model_string(model);
679
680    // Try explicit provider/model pair
681    if let Some(provider_name) = provider_name {
682        let normalized_provider = if provider_name == "zhipuai" {
683            "zai"
684        } else {
685            provider_name
686        };
687        if let Some(provider) = registry.get(normalized_provider) {
688            return Ok((provider, model_name.to_string()));
689        }
690        let available = registry.list().join(", ");
691        anyhow::bail!(
692            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
693            normalized_provider,
694            available
695        );
696    }
697
698    // Fallback provider order (same as TUI)
699    let fallbacks = [
700        "zai",
701        "openai",
702        "github-copilot",
703        "anthropic",
704        "openrouter",
705        "novita",
706        "moonshotai",
707        "google",
708    ];
709
710    for name in fallbacks {
711        if let Some(provider) = registry.get(name) {
712            return Ok((provider, model.to_string()));
713        }
714    }
715
716    // Last resort: first available provider
717    if let Some(name) = registry.list().into_iter().next()
718        && let Some(provider) = registry.get(name)
719    {
720        return Ok((provider, model.to_string()));
721    }
722
723    anyhow::bail!("No provider available for model '{model}' and no fallback providers found")
724}