Skip to main content

codetether_agent/swarm/
remote_subtask.rs

1//! Remote subtask runner for Kubernetes-backed swarm execution.
2
3use super::executor::{AgentLoopExit, run_agent_loop};
4use super::kubernetes_executor::{
5    RemoteBranchProbe, SWARM_SUBTASK_PROBE_PREFIX, SWARM_SUBTASK_RESULT_PREFIX, decode_payload,
6};
7use super::subtask::SubTaskResult;
8use crate::cli::SwarmSubagentArgs;
9use crate::provider::ProviderRegistry;
10use crate::tool::ToolRegistry;
11use anyhow::{Context, Result, anyhow};
12use std::path::PathBuf;
13use std::process::Command;
14use std::sync::{
15    Arc,
16    atomic::{AtomicBool, Ordering},
17};
18use std::thread;
19use std::time::{Duration, Instant};
20
21pub async fn run_swarm_subagent(args: SwarmSubagentArgs) -> Result<()> {
22    let payload_b64 = if let Some(payload) = args.payload_base64 {
23        payload
24    } else {
25        std::env::var(&args.payload_env)
26            .with_context(|| format!("Missing swarm payload env var {}", args.payload_env))?
27    };
28    let payload = decode_payload(&payload_b64)?;
29
30    let working_dir = payload
31        .working_dir
32        .as_ref()
33        .map(PathBuf::from)
34        .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
35    std::env::set_current_dir(&working_dir)
36        .with_context(|| format!("Failed to set working dir to {}", working_dir.display()))?;
37
38    let registry = ProviderRegistry::from_vault().await?;
39    let provider = registry
40        .get(&payload.provider)
41        .ok_or_else(|| anyhow!("Provider '{}' unavailable in remote pod", payload.provider))?;
42
43    let tool_registry =
44        ToolRegistry::with_provider_arc(Arc::clone(&provider), payload.model.clone());
45    // Remote pod is autonomous; interactive question tool is intentionally excluded.
46    let tool_defs = tool_registry
47        .definitions()
48        .into_iter()
49        .filter(|t| t.name != "question")
50        .collect();
51
52    let specialty = if payload.specialty.is_empty() {
53        "generalist".to_string()
54    } else {
55        payload.specialty.clone()
56    };
57    let system_prompt = format!(
58        "You are a {specialty} specialist sub-agent (ID: {}). \
59Use tools to execute the task and summarize concrete outputs.",
60        payload.subtask_id
61    );
62    let user_prompt = if payload.context.trim().is_empty() {
63        payload.instruction.clone()
64    } else {
65        format!(
66            "{}\n\nContext from dependencies:\n{}",
67            payload.instruction, payload.context
68        )
69    };
70
71    let done = Arc::new(AtomicBool::new(false));
72    let done_for_probe = Arc::clone(&done);
73    let subtask_id_for_probe = payload.subtask_id.clone();
74    let probe_interval = payload.probe_interval_secs.max(3);
75    let probe_thread = thread::spawn(move || {
76        emit_probe(&subtask_id_for_probe);
77        while !done_for_probe.load(Ordering::Relaxed) {
78            thread::sleep(Duration::from_secs(probe_interval));
79            if done_for_probe.load(Ordering::Relaxed) {
80                break;
81            }
82            emit_probe(&subtask_id_for_probe);
83        }
84    });
85
86    let started = Instant::now();
87    let run_result = run_agent_loop(
88        provider,
89        &payload.model,
90        &system_prompt,
91        &user_prompt,
92        tool_defs,
93        tool_registry,
94        payload.max_steps,
95        payload.timeout_secs,
96        None,
97        payload.subtask_id.clone(),
98        None,
99        Some(working_dir.clone()),
100    )
101    .await;
102    done.store(true, Ordering::Relaxed);
103    let _ = probe_thread.join();
104
105    let result = match run_result {
106        Ok((output, steps, tool_calls, exit_reason)) => {
107            let (success, error) = match exit_reason {
108                AgentLoopExit::Completed => (true, None),
109                AgentLoopExit::MaxStepsReached => (
110                    false,
111                    Some(format!("Sub-agent hit max steps ({})", payload.max_steps)),
112                ),
113                AgentLoopExit::TimedOut => (
114                    false,
115                    Some(format!(
116                        "Sub-agent timed out after {}s",
117                        payload.timeout_secs
118                    )),
119                ),
120            };
121            SubTaskResult {
122                subtask_id: payload.subtask_id.clone(),
123                subagent_id: format!("agent-{}", payload.subtask_id),
124                success,
125                result: output,
126                steps,
127                tool_calls,
128                execution_time_ms: started.elapsed().as_millis() as u64,
129                error,
130                artifacts: Vec::new(),
131            }
132        }
133        Err(error) => SubTaskResult {
134            subtask_id: payload.subtask_id.clone(),
135            subagent_id: format!("agent-{}", payload.subtask_id),
136            success: false,
137            result: String::new(),
138            steps: 0,
139            tool_calls: 0,
140            execution_time_ms: started.elapsed().as_millis() as u64,
141            error: Some(error.to_string()),
142            artifacts: Vec::new(),
143        },
144    };
145
146    println!(
147        "{}{}",
148        SWARM_SUBTASK_RESULT_PREFIX,
149        serde_json::to_string(&result)?
150    );
151    Ok(())
152}
153
154fn emit_probe(subtask_id: &str) {
155    let changed_files = collect_changed_files().unwrap_or_default();
156    let changed_lines = collect_changed_lines().unwrap_or(0);
157    let probe = RemoteBranchProbe {
158        subtask_id: subtask_id.to_string(),
159        compile_ok: run_cargo_check().unwrap_or(false),
160        changed_files: changed_files.into_iter().collect(),
161        changed_lines,
162    };
163    if let Ok(json) = serde_json::to_string(&probe) {
164        println!("{SWARM_SUBTASK_PROBE_PREFIX}{json}");
165    }
166}
167
168fn run_cargo_check() -> Result<bool> {
169    let output = Command::new("cargo")
170        .args(["check", "--quiet"])
171        .output()
172        .context("Failed to execute cargo check in remote subtask")?;
173    Ok(output.status.success())
174}
175
176fn collect_changed_files() -> Result<std::collections::HashSet<String>> {
177    let output = Command::new("git")
178        .args(["diff", "--name-only"])
179        .output()
180        .context("Failed to collect changed files in remote subtask")?;
181    if !output.status.success() {
182        return Ok(Default::default());
183    }
184    Ok(String::from_utf8_lossy(&output.stdout)
185        .lines()
186        .filter(|line| !line.trim().is_empty())
187        .map(ToString::to_string)
188        .collect())
189}
190
191fn collect_changed_lines() -> Result<u32> {
192    let output = Command::new("git")
193        .args(["diff", "--numstat"])
194        .output()
195        .context("Failed to collect changed lines in remote subtask")?;
196    if !output.status.success() {
197        return Ok(0);
198    }
199    let mut total = 0u32;
200    for line in String::from_utf8_lossy(&output.stdout).lines() {
201        let parts: Vec<&str> = line.split('\t').collect();
202        if parts.len() < 2 {
203            continue;
204        }
205        total += parts[0].parse::<u32>().unwrap_or(0);
206        total += parts[1].parse::<u32>().unwrap_or(0);
207    }
208    Ok(total)
209}