Skip to main content

codetether_agent/tool/
go.rs

1//! Go Tool - Autonomous task execution via OKR → PRD → Ralph pipeline
2//!
3//! Exposes the `/go` command as an MCP-callable tool for programmatic
4//! autonomous work execution. Creates an OKR, generates a PRD from the
5//! task description, runs the Ralph loop, and maps results back to the OKR.
6//!
7//! The `execute` action spawns the pipeline in a background task and returns
8//! immediately so the MCP client can monitor progress via `go_watch`.
9
10use anyhow::{Context, Result};
11use async_trait::async_trait;
12use serde::{Deserialize, Serialize};
13use serde_json::{Value, json};
14use std::collections::HashMap;
15use std::sync::{Arc, Mutex};
16use uuid::Uuid;
17
18use super::{Tool, ToolResult};
19use crate::cli::go_ralph::{execute_go_ralph, format_go_ralph_result};
20use crate::okr::{KeyResult, Okr, OkrRepository, OkrRun};
21use crate::util;
22
23// ─── Active execution tracking ──────────────────────────────────────────
24
25/// Phase of a running go pipeline
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(tag = "phase", rename_all = "snake_case")]
28pub enum GoRunPhase {
29    Starting,
30    Running,
31    Completed {
32        passed: usize,
33        total: usize,
34        all_passed: bool,
35        feature_branch: String,
36        summary: String,
37    },
38    Failed {
39        error: String,
40    },
41}
42
43/// Metadata for an active go run
44#[derive(Debug, Clone, Serialize, Deserialize)]
45pub struct ActiveGoRun {
46    pub okr_id: String,
47    pub task: String,
48    pub model: String,
49    pub started_at: String,
50    pub working_dir: String,
51    pub prd_filename: String,
52    pub progress_filename: String,
53    pub phase: GoRunPhase,
54}
55
56/// Global registry of active (and recently completed) go runs.
57pub static ACTIVE_GO_RUNS: std::sync::LazyLock<Mutex<HashMap<String, ActiveGoRun>>> =
58    std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
59
60#[derive(Deserialize)]
61struct GoParams {
62    action: String,
63    #[serde(default)]
64    task: Option<String>,
65    #[serde(default)]
66    max_iterations: Option<usize>,
67    #[serde(default)]
68    max_concurrent_stories: Option<usize>,
69    #[serde(default)]
70    model: Option<String>,
71    #[serde(default)]
72    okr_id: Option<String>,
73}
74
/// Tool that launches and monitors the autonomous go pipeline.
pub struct GoTool {
    /// Optional hook called with a markdown message once the background
    /// pipeline has completed or failed. The A2A worker uses it to stream the
    /// final OKR result back to the calling LLM.
    completion_callback: Option<Arc<dyn Fn(String) + Send + Sync + 'static>>,
}
80
81impl Default for GoTool {
82    fn default() -> Self {
83        Self::new()
84    }
85}
86
87impl GoTool {
88    pub fn new() -> Self {
89        Self {
90            completion_callback: None,
91        }
92    }
93
94    /// Construct with a completion callback that fires when the background pipeline finishes.
95    pub fn with_callback(cb: Arc<dyn Fn(String) + Send + Sync + 'static>) -> Self {
96        Self {
97            completion_callback: Some(cb),
98        }
99    }
100}
101
102#[async_trait]
103impl Tool for GoTool {
104    fn id(&self) -> &str {
105        "go"
106    }
107
108    fn name(&self) -> &str {
109        "Go"
110    }
111
112    fn description(&self) -> &str {
113        r#"Autonomous task execution pipeline. Creates an OKR, generates a PRD from the task 
114description using an LLM, runs the Ralph autonomous agent loop to implement all user stories, 
115and maps results back to OKR outcomes.
116
117This is the programmatic equivalent of the `/go` TUI command. OKR approval is automatic 
118(no interactive gate) since this is called by MCP clients.
119
120The pipeline runs in the background. Use action "watch" to monitor progress.
121
122Actions:
123- execute: Launch the autonomous pipeline (OKR → PRD → Ralph → results). Returns immediately.
124- watch: Watch a running pipeline's progress by okr_id. Shows PRD status, progress notes, and phase.
125- status: Check final status of an OKR run by okr_id
126
127Required for execute: task
128Optional for execute: max_iterations (default 10), max_concurrent_stories (default 3), model
129Required for watch/status: okr_id"#
130    }
131
132    fn parameters(&self) -> Value {
133        json!({
134            "type": "object",
135            "properties": {
136                "action": {
137                    "type": "string",
138                    "enum": ["execute", "watch", "status"],
139                    "description": "Action to perform. Use 'execute' to start, 'watch' to monitor progress, 'status' for OKR results."
140                },
141                "task": {
142                    "type": "string",
143                    "description": "Task description for autonomous execution"
144                },
145                "max_iterations": {
146                    "type": "integer",
147                    "description": "Maximum Ralph iterations (default: 10)"
148                },
149                "max_concurrent_stories": {
150                    "type": "integer",
151                    "description": "Maximum concurrent stories in Ralph (default: 3)"
152                },
153                "model": {
154                    "type": "string",
155                    "description": "Model to use for PRD generation and Ralph execution"
156                },
157                "okr_id": {
158                    "type": "string",
159                    "description": "OKR ID for watch/status actions"
160                }
161            },
162            "required": ["action"]
163        })
164    }
165
166    async fn execute(&self, params: Value) -> Result<ToolResult> {
167        let p: GoParams = serde_json::from_value(params).context("Invalid params")?;
168
169        match p.action.as_str() {
170            "execute" => self.execute_go(p).await,
171            "watch" => self.watch_go(p).await,
172            "status" => self.check_status(p).await,
173            _ => Ok(ToolResult::structured_error(
174                "INVALID_ACTION",
175                "go",
176                &format!(
177                    "Unknown action: '{}'. Valid actions: execute, watch, status",
178                    p.action
179                ),
180                None,
181                Some(json!({
182                    "action": "execute",
183                    "task": "implement feature X with tests"
184                })),
185            )),
186        }
187    }
188}
189
190impl GoTool {
191    async fn execute_go(&self, p: GoParams) -> Result<ToolResult> {
192        let task = match p.task {
193            Some(t) if !t.trim().is_empty() => t,
194            _ => {
195                return Ok(ToolResult::structured_error(
196                    "MISSING_FIELD",
197                    "go",
198                    "task is required for execute action",
199                    Some(vec!["task"]),
200                    Some(json!({
201                        "action": "execute",
202                        "task": "implement user authentication with OAuth2"
203                    })),
204                ));
205            }
206        };
207
208        let max_iterations = p.max_iterations.unwrap_or(10);
209        let max_concurrent = p.max_concurrent_stories.unwrap_or(3);
210
211        // Load provider registry from Vault
212        let registry = Arc::new(
213            crate::provider::ProviderRegistry::from_vault()
214                .await
215                .context("Failed to load providers from Vault")?,
216        );
217
218        // Resolve model and provider
219        let model = p.model.unwrap_or_else(|| "zai/glm-5".to_string());
220        let (provider, resolved_model) = resolve_provider(&registry, &model)?;
221
222        // Create OKR with default template
223        let okr_id = Uuid::new_v4();
224        let okr_id_str = okr_id.to_string();
225        let mut okr = create_default_okr(okr_id, &task);
226        let mut run = OkrRun::new(
227            okr_id,
228            format!("Go {}", chrono::Local::now().format("%Y-%m-%d %H:%M")),
229        );
230        let _ = run.submit_for_approval();
231        run.record_decision(crate::okr::ApprovalDecision::approve(
232            run.id,
233            "Auto-approved via MCP go tool",
234        ));
235
236        let run_id = run.id;
237        let prd_filename = format!("prd_{}.json", run_id.to_string().replace('-', "_"));
238        let progress_filename = format!("progress_{}.txt", run_id.to_string().replace('-', "_"));
239
240        // Persist OKR before execution
241        if let Ok(repo) = OkrRepository::from_config().await {
242            let _ = repo.create_okr(okr.clone()).await;
243            let _ = repo.create_run(run.clone()).await;
244        }
245
246        let working_dir = std::env::current_dir()
247            .map(|p| p.display().to_string())
248            .unwrap_or_else(|_| ".".into());
249
250        // Register active run
251        let active_run = ActiveGoRun {
252            okr_id: okr_id_str.clone(),
253            task: task.clone(),
254            model: resolved_model.clone(),
255            started_at: chrono::Utc::now().to_rfc3339(),
256            working_dir: working_dir.clone(),
257            prd_filename: prd_filename.clone(),
258            progress_filename: progress_filename.clone(),
259            phase: GoRunPhase::Starting,
260        };
261        if let Ok(mut runs) = ACTIVE_GO_RUNS.lock() {
262            runs.insert(okr_id_str.clone(), active_run);
263        }
264
265        tracing::info!(
266            task = %task,
267            okr_id = %okr_id,
268            model = %resolved_model,
269            max_iterations,
270            "Starting /go autonomous pipeline via MCP (background)"
271        );
272
273        // Spawn the pipeline in a background task
274        let bg_okr_id = okr_id_str.clone();
275        let bg_task = task.clone();
276        let bg_model = resolved_model.clone();
277        let bg_callback = self.completion_callback.clone();
278        tokio::spawn(async move {
279            // Update phase to Running
280            if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
281                && let Some(r) = runs.get_mut(&bg_okr_id)
282            {
283                r.phase = GoRunPhase::Running;
284            }
285
286            // Create bus with S3 sink for training data archival
287            let bus = crate::bus::AgentBus::new().into_arc();
288            crate::bus::s3_sink::spawn_bus_s3_sink(bus.clone());
289
290            match execute_go_ralph(
291                &bg_task,
292                &mut okr,
293                &mut run,
294                provider,
295                &bg_model,
296                max_iterations,
297                Some(bus),
298                max_concurrent,
299                Some(registry),
300            )
301            .await
302            {
303                Ok(result) => {
304                    // Persist final state
305                    if let Ok(repo) = OkrRepository::from_config().await {
306                        let _ = repo.update_run(run).await;
307                    }
308
309                    let summary = format_go_ralph_result(&result, &bg_task);
310
311                    if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
312                        && let Some(r) = runs.get_mut(&bg_okr_id)
313                    {
314                        r.phase = GoRunPhase::Completed {
315                            passed: result.passed,
316                            total: result.total,
317                            all_passed: result.all_passed,
318                            feature_branch: result.feature_branch,
319                            summary,
320                        };
321                    }
322
323                    tracing::info!(
324                        okr_id = %bg_okr_id,
325                        passed = result.passed,
326                        total = result.total,
327                        "Go pipeline completed"
328                    );
329
330                    // Notify the LLM session that the pipeline finished
331                    if let Some(ref cb) = bg_callback {
332                        let phase_str = {
333                            let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
334                            if let Some(r) = runs.get(&bg_okr_id) {
335                                if let GoRunPhase::Completed {
336                                    passed,
337                                    total,
338                                    all_passed,
339                                    ref feature_branch,
340                                    ref summary,
341                                } = r.phase
342                                {
343                                    format!(
344                                        "# Go Pipeline Completed {}\n\n\
345                                         **OKR ID:** `{}`\n\
346                                         **Result:** {}/{} stories passed\n\
347                                         **Branch:** `{}`\n\n{}",
348                                        if all_passed { "✅" } else { "⚠️" },
349                                        bg_okr_id,
350                                        passed,
351                                        total,
352                                        feature_branch,
353                                        summary
354                                    )
355                                } else {
356                                    format!("Go pipeline completed for OKR `{}`", bg_okr_id)
357                                }
358                            } else {
359                                format!("Go pipeline completed for OKR `{}`", bg_okr_id)
360                            }
361                        };
362                        cb(phase_str);
363                    }
364                }
365                Err(err) => {
366                    // Mark run as failed
367                    run.status = crate::okr::OkrRunStatus::Failed;
368                    if let Ok(repo) = OkrRepository::from_config().await {
369                        let _ = repo.update_run(run).await;
370                    }
371
372                    let error_msg = err.to_string();
373                    if let Ok(mut runs) = ACTIVE_GO_RUNS.lock()
374                        && let Some(r) = runs.get_mut(&bg_okr_id)
375                    {
376                        r.phase = GoRunPhase::Failed {
377                            error: error_msg.clone(),
378                        };
379                    }
380
381                    tracing::error!(
382                        okr_id = %bg_okr_id,
383                        error = %error_msg,
384                        "Go pipeline failed"
385                    );
386
387                    // Notify the LLM session that the pipeline failed
388                    if let Some(ref cb) = bg_callback {
389                        cb(format!(
390                            "# Go Pipeline Failed ❌\n\n**OKR ID:** `{}`\n**Error:** {}",
391                            bg_okr_id, error_msg
392                        ));
393                    }
394                }
395            }
396        });
397
398        // Return immediately with launch confirmation + watch instructions
399        let output = format!(
400            "# Go Pipeline Launched\n\n\
401             **OKR ID:** `{okr_id_str}`\n\
402             **Task:** {task}\n\
403             **Model:** {resolved_model}\n\
404             **Max Iterations:** {max_iterations}\n\
405             **Working Directory:** {working_dir}\n\n\
406             The autonomous pipeline is now running in the background.\n\n\
407             ## Monitor Progress\n\n\
408             Use the `go` tool with action `watch` to monitor this pipeline:\n\n\
409             ```json\n\
410             {{\"action\": \"watch\", \"okr_id\": \"{okr_id_str}\"}}\n\
411             ```\n\n\
412             The pipeline will:\n\
413             1. Generate a PRD from the task description\n\
414             2. Run the Ralph loop to implement all user stories\n\
415             3. Run quality checks (typecheck, lint, test, build)\n\
416             4. Map results back to OKR outcomes\n\n\
417             PRD file: `{prd_filename}`\n\
418             Progress file: `{progress_filename}`"
419        );
420
421        Ok(ToolResult::success(output)
422            .with_metadata("okr_id", json!(okr_id_str))
423            .with_metadata("phase", json!("starting"))
424            .with_metadata("prd_filename", json!(prd_filename))
425            .with_metadata("progress_filename", json!(progress_filename))
426            .with_metadata(
427                "watch_hint",
428                json!({
429                    "tool": "go",
430                    "args": {"action": "watch", "okr_id": okr_id_str}
431                }),
432            ))
433    }
434
435    async fn watch_go(&self, p: GoParams) -> Result<ToolResult> {
436        let okr_id_str = match p.okr_id {
437            Some(id) if !id.trim().is_empty() => id,
438            _ => {
439                // If no OKR ID given, list all active runs
440                let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
441                if runs.is_empty() {
442                    return Ok(ToolResult::success(
443                        "No active go pipelines. Use `go(action: \"execute\", task: \"...\")` to start one.",
444                    ));
445                }
446
447                let mut output = String::from("# Active Go Pipelines\n\n");
448                for (id, run) in runs.iter() {
449                    let phase_str = match &run.phase {
450                        GoRunPhase::Starting => "Starting".to_string(),
451                        GoRunPhase::Running => "Running".to_string(),
452                        GoRunPhase::Completed { passed, total, .. } => {
453                            format!("Completed ({passed}/{total} passed)")
454                        }
455                        GoRunPhase::Failed { error } => {
456                            format!("Failed: {}", crate::util::truncate_bytes_safe(&error, 80))
457                        }
458                    };
459                    output.push_str(&format!(
460                        "- **{id}**: {phase_str}\n  Task: {}\n  Started: {}\n\n",
461                        run.task, run.started_at
462                    ));
463                }
464                output.push_str("Use `go(action: \"watch\", okr_id: \"<id>\")` for details.");
465                return Ok(ToolResult::success(output));
466            }
467        };
468
469        // Look up the active run
470        let active_run = {
471            let runs = ACTIVE_GO_RUNS.lock().unwrap_or_else(|e| e.into_inner());
472            runs.get(&okr_id_str).cloned()
473        };
474
475        let Some(run) = active_run else {
476            return Ok(ToolResult::error(format!(
477                "No active go pipeline found for OKR `{okr_id_str}`.\n\n\
478                 Use `go(action: \"watch\")` with no okr_id to list active pipelines,\n\
479                 or `go(action: \"status\", okr_id: \"...\")` to check completed runs."
480            )));
481        };
482
483        let mut output = format!(
484            "# Go Pipeline Status\n\n\
485             **OKR ID:** `{}`\n\
486             **Task:** {}\n\
487             **Model:** {}\n\
488             **Started:** {}\n\
489             **Working Directory:** {}\n",
490            run.okr_id, run.task, run.model, run.started_at, run.working_dir
491        );
492
493        // Phase
494        match &run.phase {
495            GoRunPhase::Starting => {
496                output.push_str("\n**Phase:** Starting (generating PRD...)\n");
497            }
498            GoRunPhase::Running => {
499                output.push_str("\n**Phase:** Running Ralph loop\n");
500            }
501            GoRunPhase::Completed {
502                passed,
503                total,
504                all_passed,
505                feature_branch,
506                summary,
507            } => {
508                output.push_str(&format!(
509                    "\n**Phase:** Completed {}\n\
510                     **Result:** {passed}/{total} stories passed\n\
511                     **Feature Branch:** `{feature_branch}`\n\n\
512                     ## Summary\n\n{summary}\n",
513                    if *all_passed { "✅" } else { "⚠️" }
514                ));
515            }
516            GoRunPhase::Failed { error } => {
517                output.push_str(&format!("\n**Phase:** Failed ❌\n**Error:** {error}\n"));
518            }
519        }
520
521        // Read PRD file for story-level progress
522        if let Ok(prd_content) = std::fs::read_to_string(&run.prd_filename)
523            && let Ok(prd) = serde_json::from_str::<Value>(&prd_content)
524            && let Some(stories) = prd.get("user_stories").and_then(|s| s.as_array())
525        {
526            output.push_str("\n## Stories\n\n");
527            let mut passed_count = 0;
528            for story in stories {
529                let id = story.get("id").and_then(|v| v.as_str()).unwrap_or("?");
530                let title = story.get("title").and_then(|v| v.as_str()).unwrap_or("?");
531                let passes = story
532                    .get("passes")
533                    .and_then(|v| v.as_bool())
534                    .unwrap_or(false);
535                let icon = if passes {
536                    passed_count += 1;
537                    "✅"
538                } else {
539                    "⏳"
540                };
541                output.push_str(&format!("- {icon} **{id}**: {title}\n"));
542            }
543            output.push_str(&format!(
544                "\n**Progress:** {passed_count}/{} stories passed\n",
545                stories.len()
546            ));
547        }
548
549        // Read progress file
550        if let Ok(progress) = std::fs::read_to_string(&run.progress_filename)
551            && !progress.trim().is_empty()
552        {
553            // Show last 30 lines to avoid overwhelming output
554            let lines: Vec<&str> = progress.lines().collect();
555            let start = lines.len().saturating_sub(30);
556            let tail: String = lines[start..].join("\n");
557            output.push_str(&format!(
558                "\n## Progress Notes (last {} lines)\n\n```\n{tail}\n```\n",
559                lines.len().min(30)
560            ));
561        }
562
563        // Hint for next action
564        if matches!(run.phase, GoRunPhase::Starting | GoRunPhase::Running) {
565            output.push_str(&format!(
566                "\n---\n*Pipeline still running. Call `go(action: \"watch\", okr_id: \"{}\")` again to check progress.*\n",
567                run.okr_id
568            ));
569        }
570
571        Ok(ToolResult::success(output)
572            .with_metadata("okr_id", json!(run.okr_id))
573            .with_metadata(
574                "phase",
575                json!(serde_json::to_value(&run.phase).unwrap_or(json!("unknown"))),
576            ))
577    }
578
579    async fn check_status(&self, p: GoParams) -> Result<ToolResult> {
580        let okr_id_str = match p.okr_id {
581            Some(id) if !id.trim().is_empty() => id,
582            _ => {
583                return Ok(ToolResult::structured_error(
584                    "MISSING_FIELD",
585                    "go",
586                    "okr_id is required for status action",
587                    Some(vec!["okr_id"]),
588                    Some(json!({
589                        "action": "status",
590                        "okr_id": "uuid-of-okr"
591                    })),
592                ));
593            }
594        };
595
596        let okr_id: Uuid = okr_id_str
597            .parse()
598            .context("Invalid UUID format for okr_id")?;
599
600        let repo = OkrRepository::from_config()
601            .await
602            .context("Failed to load OKR repository")?;
603
604        let okr = match repo.get_okr(okr_id).await? {
605            Some(okr) => okr,
606            None => {
607                return Ok(ToolResult::error(format!("OKR not found: {okr_id}")));
608            }
609        };
610
611        let runs = repo.list_runs().await.unwrap_or_default();
612        let runs: Vec<_> = runs.into_iter().filter(|r| r.okr_id == okr_id).collect();
613        let latest_run = runs.last();
614
615        let kr_status: Vec<String> = okr
616            .key_results
617            .iter()
618            .map(|kr| {
619                format!(
620                    "  - {}: {:.0}/{:.0} {} ({:.0}%)",
621                    kr.title,
622                    kr.current_value,
623                    kr.target_value,
624                    kr.unit,
625                    kr.progress() * 100.0
626                )
627            })
628            .collect();
629
630        let run_info = if let Some(run) = latest_run {
631            format!(
632                "\nLatest Run: {} ({:?})\n  Iterations: {}\n  Outcomes: {}",
633                run.name,
634                run.status,
635                run.iterations,
636                run.outcomes.len()
637            )
638        } else {
639            "\nNo runs found.".to_string()
640        };
641
642        let output = format!(
643            "# Go Status\n\n**OKR:** {}\n**Status:** {:?}\n**Progress:** {:.0}%\n\n## Key Results\n{}\n{}",
644            okr.title,
645            okr.status,
646            okr.progress() * 100.0,
647            kr_status.join("\n"),
648            run_info
649        );
650
651        Ok(ToolResult::success(output).with_metadata("okr_id", json!(okr_id.to_string())))
652    }
653}
654
655/// Create a default OKR with standard key results for a task
656fn create_default_okr(okr_id: Uuid, task: &str) -> Okr {
657    let title = if task.len() > 60 {
658        format!("Go: {}…", &task[..57])
659    } else {
660        format!("Go: {task}")
661    };
662
663    let mut okr = Okr::new(title, format!("Autonomous execution: {task}"));
664    okr.id = okr_id;
665
666    okr.add_key_result(KeyResult::new(okr_id, "All stories complete", 100.0, "%"));
667    okr.add_key_result(KeyResult::new(okr_id, "Quality gates pass", 1.0, "count"));
668    okr.add_key_result(KeyResult::new(okr_id, "No critical errors", 0.0, "count"));
669
670    okr
671}
672
673/// Resolve provider and model string from the registry.
674/// Uses the same fallback strategy as the TUI autochat.
675fn resolve_provider(
676    registry: &crate::provider::ProviderRegistry,
677    model: &str,
678) -> Result<(Arc<dyn crate::provider::Provider>, String)> {
679    let (provider_name, model_name) = crate::provider::parse_model_string(model);
680
681    // Try explicit provider/model pair
682    if let Some(provider_name) = provider_name {
683        let normalized_provider = if provider_name == "zhipuai" {
684            "zai"
685        } else {
686            provider_name
687        };
688        if let Some(provider) = registry.get(normalized_provider) {
689            return Ok((provider, model_name.to_string()));
690        }
691        let available = registry.list().join(", ");
692        anyhow::bail!(
693            "Provider '{}' selected explicitly but is unavailable. Available providers: {}",
694            normalized_provider,
695            available
696        );
697    }
698
699    // Fallback provider order (same as TUI)
700    let fallbacks = [
701        "zai",
702        "openai",
703        "github-copilot",
704        "anthropic",
705        "openrouter",
706        "novita",
707        "moonshotai",
708        "google",
709    ];
710
711    for name in fallbacks {
712        if let Some(provider) = registry.get(name) {
713            return Ok((provider, model.to_string()));
714        }
715    }
716
717    // Last resort: first available provider
718    if let Some(name) = registry.list().into_iter().next()
719        && let Some(provider) = registry.get(name)
720    {
721        return Ok((provider, model.to_string()));
722    }
723
724    anyhow::bail!("No provider available for model '{model}' and no fallback providers found")
725}