codetether_agent/swarm/
remote_subtask.rs1use super::executor::{AgentLoopExit, run_agent_loop};
4use super::kubernetes_executor::{
5 RemoteBranchProbe, SWARM_SUBTASK_PROBE_PREFIX, SWARM_SUBTASK_RESULT_PREFIX, decode_payload,
6};
7use super::subtask::SubTaskResult;
8use crate::cli::SwarmSubagentArgs;
9use crate::provider::ProviderRegistry;
10use crate::tool::ToolRegistry;
11use anyhow::{Context, Result, anyhow};
12use std::path::PathBuf;
13use std::process::Command;
14use std::sync::{
15 Arc,
16 atomic::{AtomicBool, Ordering},
17};
18use std::thread;
19use std::time::{Duration, Instant};
20
21pub async fn run_swarm_subagent(args: SwarmSubagentArgs) -> Result<()> {
22 let payload_b64 = if let Some(payload) = args.payload_base64 {
23 payload
24 } else {
25 std::env::var(&args.payload_env)
26 .with_context(|| format!("Missing swarm payload env var {}", args.payload_env))?
27 };
28 let payload = decode_payload(&payload_b64)?;
29
30 let working_dir = payload
31 .working_dir
32 .as_ref()
33 .map(PathBuf::from)
34 .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
35 std::env::set_current_dir(&working_dir)
36 .with_context(|| format!("Failed to set working dir to {}", working_dir.display()))?;
37
38 let registry = ProviderRegistry::from_vault().await?;
39 let provider = registry
40 .get(&payload.provider)
41 .ok_or_else(|| anyhow!("Provider '{}' unavailable in remote pod", payload.provider))?;
42
43 let tool_registry =
44 ToolRegistry::with_provider_arc(Arc::clone(&provider), payload.model.clone());
45 let tool_defs = tool_registry
47 .definitions()
48 .into_iter()
49 .filter(|t| t.name != "question")
50 .collect();
51
52 let specialty = if payload.specialty.is_empty() {
53 "generalist".to_string()
54 } else {
55 payload.specialty.clone()
56 };
57 let system_prompt = format!(
58 "You are a {specialty} specialist sub-agent (ID: {}). \
59Use tools to execute the task and summarize concrete outputs.",
60 payload.subtask_id
61 );
62 let user_prompt = if payload.context.trim().is_empty() {
63 payload.instruction.clone()
64 } else {
65 format!(
66 "{}\n\nContext from dependencies:\n{}",
67 payload.instruction, payload.context
68 )
69 };
70
71 let done = Arc::new(AtomicBool::new(false));
72 let done_for_probe = Arc::clone(&done);
73 let subtask_id_for_probe = payload.subtask_id.clone();
74 let probe_interval = payload.probe_interval_secs.max(3);
75 let probe_thread = thread::spawn(move || {
76 emit_probe(&subtask_id_for_probe);
77 while !done_for_probe.load(Ordering::Relaxed) {
78 thread::sleep(Duration::from_secs(probe_interval));
79 if done_for_probe.load(Ordering::Relaxed) {
80 break;
81 }
82 emit_probe(&subtask_id_for_probe);
83 }
84 });
85
86 let started = Instant::now();
87 let run_result = run_agent_loop(
88 provider,
89 &payload.model,
90 &system_prompt,
91 &user_prompt,
92 tool_defs,
93 tool_registry,
94 payload.max_steps,
95 payload.timeout_secs,
96 None,
97 payload.subtask_id.clone(),
98 None,
99 Some(working_dir.clone()),
100 )
101 .await;
102 done.store(true, Ordering::Relaxed);
103 let _ = probe_thread.join();
104
105 let result = match run_result {
106 Ok((output, steps, tool_calls, exit_reason)) => {
107 let (success, error) = match exit_reason {
108 AgentLoopExit::Completed => (true, None),
109 AgentLoopExit::MaxStepsReached => (
110 false,
111 Some(format!("Sub-agent hit max steps ({})", payload.max_steps)),
112 ),
113 AgentLoopExit::TimedOut => (
114 false,
115 Some(format!(
116 "Sub-agent timed out after {}s",
117 payload.timeout_secs
118 )),
119 ),
120 };
121 SubTaskResult {
122 subtask_id: payload.subtask_id.clone(),
123 subagent_id: format!("agent-{}", payload.subtask_id),
124 success,
125 result: output,
126 steps,
127 tool_calls,
128 execution_time_ms: started.elapsed().as_millis() as u64,
129 error,
130 artifacts: Vec::new(),
131 }
132 }
133 Err(error) => SubTaskResult {
134 subtask_id: payload.subtask_id.clone(),
135 subagent_id: format!("agent-{}", payload.subtask_id),
136 success: false,
137 result: String::new(),
138 steps: 0,
139 tool_calls: 0,
140 execution_time_ms: started.elapsed().as_millis() as u64,
141 error: Some(error.to_string()),
142 artifacts: Vec::new(),
143 },
144 };
145
146 println!(
147 "{}{}",
148 SWARM_SUBTASK_RESULT_PREFIX,
149 serde_json::to_string(&result)?
150 );
151 Ok(())
152}
153
154fn emit_probe(subtask_id: &str) {
155 let changed_files = collect_changed_files().unwrap_or_default();
156 let changed_lines = collect_changed_lines().unwrap_or(0);
157 let probe = RemoteBranchProbe {
158 subtask_id: subtask_id.to_string(),
159 compile_ok: run_cargo_check().unwrap_or(false),
160 changed_files: changed_files.into_iter().collect(),
161 changed_lines,
162 };
163 if let Ok(json) = serde_json::to_string(&probe) {
164 println!("{SWARM_SUBTASK_PROBE_PREFIX}{json}");
165 }
166}
167
168fn run_cargo_check() -> Result<bool> {
169 let output = Command::new("cargo")
170 .args(["check", "--quiet"])
171 .output()
172 .context("Failed to execute cargo check in remote subtask")?;
173 Ok(output.status.success())
174}
175
176fn collect_changed_files() -> Result<std::collections::HashSet<String>> {
177 let output = Command::new("git")
178 .args(["diff", "--name-only"])
179 .output()
180 .context("Failed to collect changed files in remote subtask")?;
181 if !output.status.success() {
182 return Ok(Default::default());
183 }
184 Ok(String::from_utf8_lossy(&output.stdout)
185 .lines()
186 .filter(|line| !line.trim().is_empty())
187 .map(ToString::to_string)
188 .collect())
189}
190
191fn collect_changed_lines() -> Result<u32> {
192 let output = Command::new("git")
193 .args(["diff", "--numstat"])
194 .output()
195 .context("Failed to collect changed lines in remote subtask")?;
196 if !output.status.success() {
197 return Ok(0);
198 }
199 let mut total = 0u32;
200 for line in String::from_utf8_lossy(&output.stdout).lines() {
201 let parts: Vec<&str> = line.split('\t').collect();
202 if parts.len() < 2 {
203 continue;
204 }
205 total += parts[0].parse::<u32>().unwrap_or(0);
206 total += parts[1].parse::<u32>().unwrap_or(0);
207 }
208 Ok(total)
209}