codetether_agent/swarm/
kubernetes_executor.rs1use super::subtask::SubTaskResult;
4use anyhow::{Result, anyhow};
5use base64::{Engine as _, engine::general_purpose::STANDARD};
6use serde::{Deserialize, Serialize};
7use std::collections::HashSet;
8
/// Marker that starts a log line carrying a JSON-serialized [`RemoteBranchProbe`].
///
/// The trailing space is part of the marker: [`latest_probe_from_logs`] strips
/// the whole prefix and parses the remainder of the line as JSON.
pub const SWARM_SUBTASK_PROBE_PREFIX: &str = "CT_SUBTASK_PROBE ";

/// Marker that starts a log line carrying a JSON-serialized `SubTaskResult`.
///
/// The trailing space is part of the marker: [`result_from_logs`] strips the
/// whole prefix and parses the remainder of the line as JSON.
pub const SWARM_SUBTASK_RESULT_PREFIX: &str = "CT_SUBTASK_RESULT ";

/// Environment variable name associated with the remote subtask payload.
///
/// NOTE(review): presumably the variable that carries the base64 text produced
/// by [`encode_payload`]; the code that reads or sets it is not in this file.
pub const SWARM_SUBTASK_PAYLOAD_ENV: &str = "CODETETHER_SWARM_SUBTASK_PAYLOAD";
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct RemoteSubtaskPayload {
15 pub swarm_id: String,
16 pub subtask_id: String,
17 pub subtask_name: String,
18 pub specialty: String,
19 pub instruction: String,
20 pub context: String,
21 pub provider: String,
22 pub model: String,
23 pub max_steps: usize,
24 pub timeout_secs: u64,
25 pub working_dir: Option<String>,
26 #[serde(default)]
27 pub read_only: bool,
28 #[serde(default = "default_probe_interval_secs")]
29 pub probe_interval_secs: u64,
30}
31
/// Serde fallback for `RemoteSubtaskPayload::probe_interval_secs`, used when
/// the field is missing from the incoming JSON.
fn default_probe_interval_secs() -> u64 {
    // Named so the unit (seconds) is visible at the point of definition.
    const DEFAULT_PROBE_INTERVAL_SECS: u64 = 10;
    DEFAULT_PROBE_INTERVAL_SECS
}
35
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub struct RemoteBranchProbe {
38 pub subtask_id: String,
39 pub compile_ok: bool,
40 pub changed_files: Vec<String>,
41 pub changed_lines: u32,
42}
43
44pub fn encode_payload(payload: &RemoteSubtaskPayload) -> Result<String> {
45 let json = serde_json::to_vec(payload)?;
46 Ok(STANDARD.encode(json))
47}
48
49pub fn decode_payload(payload_b64: &str) -> Result<RemoteSubtaskPayload> {
50 let bytes = STANDARD
51 .decode(payload_b64.as_bytes())
52 .map_err(|e| anyhow!("Invalid base64 payload: {e}"))?;
53 let payload = serde_json::from_slice::<RemoteSubtaskPayload>(&bytes)
54 .map_err(|e| anyhow!("Invalid remote payload JSON: {e}"))?;
55 Ok(payload)
56}
57
58pub fn latest_probe_from_logs(logs: &str) -> Option<RemoteBranchProbe> {
59 logs.lines()
60 .rev()
61 .find_map(|line| line.strip_prefix(SWARM_SUBTASK_PROBE_PREFIX))
62 .and_then(|json| serde_json::from_str::<RemoteBranchProbe>(json).ok())
63}
64
65pub fn result_from_logs(logs: &str) -> Option<SubTaskResult> {
66 logs.lines()
67 .rev()
68 .find_map(|line| line.strip_prefix(SWARM_SUBTASK_RESULT_PREFIX))
69 .and_then(|json| serde_json::from_str::<SubTaskResult>(json).ok())
70}
71
72pub fn probe_changed_files_set(probe: &RemoteBranchProbe) -> HashSet<String> {
73 probe.changed_files.iter().cloned().collect()
74}
75
#[cfg(test)]
mod tests {
    use super::*;

    /// Fully populated payload fixture for the round-trip test.
    fn sample_payload() -> RemoteSubtaskPayload {
        RemoteSubtaskPayload {
            swarm_id: "swarm-1".to_owned(),
            subtask_id: "subtask-1".to_owned(),
            subtask_name: "name".to_owned(),
            specialty: "specialist".to_owned(),
            instruction: "do work".to_owned(),
            context: "ctx".to_owned(),
            provider: "zai".to_owned(),
            model: "glm-5".to_owned(),
            max_steps: 20,
            timeout_secs: 180,
            working_dir: Some("/workspace".to_owned()),
            read_only: true,
            probe_interval_secs: 5,
        }
    }

    /// Probe fixture embedded in the synthetic log text.
    fn sample_probe() -> RemoteBranchProbe {
        RemoteBranchProbe {
            subtask_id: "subtask-1".to_owned(),
            compile_ok: true,
            changed_files: vec!["src/main.rs".to_owned()],
            changed_lines: 42,
        }
    }

    /// Result fixture embedded in the synthetic log text.
    fn sample_result() -> SubTaskResult {
        SubTaskResult {
            subtask_id: "subtask-1".to_owned(),
            subagent_id: "agent-subtask-1".to_owned(),
            success: true,
            result: "ok".to_owned(),
            steps: 3,
            tool_calls: 2,
            execution_time_ms: 1000,
            error: None,
            artifacts: Vec::new(),
            retry_count: 0,
        }
    }

    /// Encoding then decoding a payload must preserve its fields.
    #[test]
    fn payload_roundtrip() {
        let encoded = encode_payload(&sample_payload()).expect("encode");
        let decoded = decode_payload(&encoded).expect("decode");

        assert_eq!(decoded.subtask_id, "subtask-1");
        assert_eq!(decoded.provider, "zai");
        assert!(decoded.read_only);
    }

    /// Probe and result lines interleaved with unrelated lines must both be
    /// recovered from the same log text.
    #[test]
    fn parse_probe_and_result_from_logs() {
        let probe_json = serde_json::to_string(&sample_probe()).expect("probe json");
        let result_json = serde_json::to_string(&sample_result()).expect("result json");
        let logs = format!(
            "line\n{}{}\nline\n{}{}\n",
            SWARM_SUBTASK_PROBE_PREFIX, probe_json, SWARM_SUBTASK_RESULT_PREFIX, result_json,
        );

        let parsed_probe = latest_probe_from_logs(&logs).expect("probe");
        assert!(parsed_probe.compile_ok);
        assert_eq!(parsed_probe.changed_lines, 42);

        let parsed_result = result_from_logs(&logs).expect("result");
        assert_eq!(parsed_result.subtask_id, "subtask-1");
        assert!(parsed_result.success);
    }
}