Skip to main content

codetether_agent/swarm/
remote_subtask.rs

1//! Remote subtask runner for Kubernetes-backed swarm execution.
2
3use super::executor::{AgentLoopExit, run_agent_loop};
4use super::kubernetes_executor::{
5    RemoteBranchProbe, SWARM_SUBTASK_PROBE_PREFIX, SWARM_SUBTASK_RESULT_PREFIX, decode_payload,
6};
7use super::subtask::SubTaskResult;
8use crate::cli::SwarmSubagentArgs;
9use crate::provider::ProviderRegistry;
10use crate::tool::ToolRegistry;
11use anyhow::{Context, Result, anyhow};
12use std::path::PathBuf;
13use std::process::Command;
14use std::sync::{
15    Arc,
16    atomic::{AtomicBool, Ordering},
17};
18use std::thread;
19use std::time::{Duration, Instant};
20
21pub async fn run_swarm_subagent(args: SwarmSubagentArgs) -> Result<()> {
22    let payload_b64 = if let Some(payload) = args.payload_base64 {
23        payload
24    } else {
25        std::env::var(&args.payload_env)
26            .with_context(|| format!("Missing swarm payload env var {}", args.payload_env))?
27    };
28    let payload = decode_payload(&payload_b64)?;
29
30    let working_dir = payload
31        .working_dir
32        .as_ref()
33        .map(PathBuf::from)
34        .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
35    std::env::set_current_dir(&working_dir)
36        .with_context(|| format!("Failed to set working dir to {}", working_dir.display()))?;
37
38    let registry = ProviderRegistry::from_vault().await?;
39    let provider = registry
40        .get(&payload.provider)
41        .ok_or_else(|| anyhow!("Provider '{}' unavailable in remote pod", payload.provider))?;
42
43    let tool_registry =
44        ToolRegistry::with_provider_arc(Arc::clone(&provider), payload.model.clone());
45    // Remote pod is autonomous; interactive question tool is intentionally excluded.
46    let tool_defs = tool_registry
47        .definitions()
48        .into_iter()
49        .filter(|t| t.name != "question")
50        .collect();
51
52    let specialty = if payload.specialty.is_empty() {
53        "generalist".to_string()
54    } else {
55        payload.specialty.clone()
56    };
57    let system_prompt = format!(
58        "You are a {specialty} specialist sub-agent (ID: {}). \
59Use tools to execute the task and summarize concrete outputs.",
60        payload.subtask_id
61    );
62    let user_prompt = if payload.context.trim().is_empty() {
63        payload.instruction.clone()
64    } else {
65        format!(
66            "{}\n\nContext from dependencies:\n{}",
67            payload.instruction, payload.context
68        )
69    };
70
71    let done = Arc::new(AtomicBool::new(false));
72    let done_for_probe = Arc::clone(&done);
73    let subtask_id_for_probe = payload.subtask_id.clone();
74    let probe_interval = payload.probe_interval_secs.max(3);
75    let probe_thread = thread::spawn(move || {
76        emit_probe(&subtask_id_for_probe);
77        while !done_for_probe.load(Ordering::Relaxed) {
78            thread::sleep(Duration::from_secs(probe_interval));
79            if done_for_probe.load(Ordering::Relaxed) {
80                break;
81            }
82            emit_probe(&subtask_id_for_probe);
83        }
84    });
85
86    let started = Instant::now();
87    let run_result = run_agent_loop(
88        provider,
89        &payload.model,
90        &system_prompt,
91        &user_prompt,
92        tool_defs,
93        tool_registry,
94        payload.max_steps,
95        payload.timeout_secs,
96        None,
97        payload.subtask_id.clone(),
98        None,
99        Some(working_dir.clone()),
100    )
101    .await;
102    done.store(true, Ordering::Relaxed);
103    let _ = probe_thread.join();
104
105    let result = match run_result {
106        Ok((output, steps, tool_calls, exit_reason)) => {
107            let (success, error) = match exit_reason {
108                AgentLoopExit::Completed => (true, None),
109                AgentLoopExit::MaxStepsReached => (
110                    false,
111                    Some(format!("Sub-agent hit max steps ({})", payload.max_steps)),
112                ),
113                AgentLoopExit::TimedOut => (
114                    false,
115                    Some(format!(
116                        "Sub-agent timed out after {}s",
117                        payload.timeout_secs
118                    )),
119                ),
120            };
121            SubTaskResult {
122                subtask_id: payload.subtask_id.clone(),
123                subagent_id: format!("agent-{}", payload.subtask_id),
124                success,
125                result: output,
126                steps,
127                tool_calls,
128                execution_time_ms: started.elapsed().as_millis() as u64,
129                error,
130                artifacts: Vec::new(),
131
132                retry_count: 0,
133            }
134        }
135        Err(error) => SubTaskResult {
136            subtask_id: payload.subtask_id.clone(),
137            subagent_id: format!("agent-{}", payload.subtask_id),
138            success: false,
139            result: String::new(),
140            steps: 0,
141            tool_calls: 0,
142            execution_time_ms: started.elapsed().as_millis() as u64,
143            error: Some(error.to_string()),
144            artifacts: Vec::new(),
145
146            retry_count: 0,
147        },
148    };
149
150    println!(
151        "{}{}",
152        SWARM_SUBTASK_RESULT_PREFIX,
153        serde_json::to_string(&result)?
154    );
155    Ok(())
156}
157
158fn emit_probe(subtask_id: &str) {
159    let changed_files = collect_changed_files().unwrap_or_default();
160    let changed_lines = collect_changed_lines().unwrap_or(0);
161    let probe = RemoteBranchProbe {
162        subtask_id: subtask_id.to_string(),
163        compile_ok: run_cargo_check().unwrap_or(false),
164        changed_files: changed_files.into_iter().collect(),
165        changed_lines,
166    };
167    if let Ok(json) = serde_json::to_string(&probe) {
168        println!("{SWARM_SUBTASK_PROBE_PREFIX}{json}");
169    }
170}
171
172fn run_cargo_check() -> Result<bool> {
173    let output = Command::new("cargo")
174        .args(["check", "--quiet"])
175        .output()
176        .context("Failed to execute cargo check in remote subtask")?;
177    Ok(output.status.success())
178}
179
180fn collect_changed_files() -> Result<std::collections::HashSet<String>> {
181    let output = Command::new("git")
182        .args(["diff", "--name-only"])
183        .output()
184        .context("Failed to collect changed files in remote subtask")?;
185    if !output.status.success() {
186        return Ok(Default::default());
187    }
188    Ok(String::from_utf8_lossy(&output.stdout)
189        .lines()
190        .filter(|line| !line.trim().is_empty())
191        .map(ToString::to_string)
192        .collect())
193}
194
195fn collect_changed_lines() -> Result<u32> {
196    let output = Command::new("git")
197        .args(["diff", "--numstat"])
198        .output()
199        .context("Failed to collect changed lines in remote subtask")?;
200    if !output.status.success() {
201        return Ok(0);
202    }
203    let mut total = 0u32;
204    for line in String::from_utf8_lossy(&output.stdout).lines() {
205        let parts: Vec<&str> = line.split('\t').collect();
206        if parts.len() < 2 {
207            continue;
208        }
209        total += parts[0].parse::<u32>().unwrap_or(0);
210        total += parts[1].parse::<u32>().unwrap_or(0);
211    }
212    Ok(total)
213}