codetether_agent/swarm/
kubernetes_executor.rs1use super::subtask::SubTaskResult;
4use anyhow::{Result, anyhow};
5use base64::{Engine as _, engine::general_purpose::STANDARD};
6use serde::{Deserialize, Serialize};
7use std::collections::HashSet;
8
/// Environment variable name associated with the swarm subtask payload.
///
/// NOTE(review): presumably the variable through which the base64 payload
/// produced by [`encode_payload`] reaches the remote worker; no reader or
/// writer of it is visible in this file, so confirm against the call sites.
pub const SWARM_SUBTASK_PAYLOAD_ENV: &str = "CODETETHER_SWARM_SUBTASK_PAYLOAD";
/// Prefix that marks a log line as carrying a JSON-encoded [`RemoteBranchProbe`].
///
/// The trailing space is part of the prefix; [`latest_probe_from_logs`] strips
/// the whole prefix and parses the remainder of the line as JSON.
pub const SWARM_SUBTASK_PROBE_PREFIX: &str = "CT_SUBTASK_PROBE ";
/// Prefix that marks a log line as carrying a JSON-encoded `SubTaskResult`.
///
/// The trailing space is part of the prefix; [`result_from_logs`] strips the
/// whole prefix and parses the remainder of the line as JSON.
pub const SWARM_SUBTASK_RESULT_PREFIX: &str = "CT_SUBTASK_RESULT ";
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct RemoteSubtaskPayload {
15 pub swarm_id: String,
16 pub subtask_id: String,
17 pub subtask_name: String,
18 pub specialty: String,
19 pub instruction: String,
20 pub context: String,
21 pub provider: String,
22 pub model: String,
23 pub max_steps: usize,
24 pub timeout_secs: u64,
25 pub working_dir: Option<String>,
26 #[serde(default = "default_probe_interval_secs")]
27 pub probe_interval_secs: u64,
28}
29
/// Serde fallback for `RemoteSubtaskPayload::probe_interval_secs`.
///
/// Returns the value used when a serialized payload omits the field.
fn default_probe_interval_secs() -> u64 {
    const DEFAULT_PROBE_INTERVAL_SECS: u64 = 10;
    DEFAULT_PROBE_INTERVAL_SECS
}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RemoteBranchProbe {
36 pub subtask_id: String,
37 pub compile_ok: bool,
38 pub changed_files: Vec<String>,
39 pub changed_lines: u32,
40}
41
42pub fn encode_payload(payload: &RemoteSubtaskPayload) -> Result<String> {
43 let json = serde_json::to_vec(payload)?;
44 Ok(STANDARD.encode(json))
45}
46
47pub fn decode_payload(payload_b64: &str) -> Result<RemoteSubtaskPayload> {
48 let bytes = STANDARD
49 .decode(payload_b64.as_bytes())
50 .map_err(|e| anyhow!("Invalid base64 payload: {e}"))?;
51 let payload = serde_json::from_slice::<RemoteSubtaskPayload>(&bytes)
52 .map_err(|e| anyhow!("Invalid remote payload JSON: {e}"))?;
53 Ok(payload)
54}
55
56pub fn latest_probe_from_logs(logs: &str) -> Option<RemoteBranchProbe> {
57 logs.lines()
58 .rev()
59 .find_map(|line| line.strip_prefix(SWARM_SUBTASK_PROBE_PREFIX))
60 .and_then(|json| serde_json::from_str::<RemoteBranchProbe>(json).ok())
61}
62
63pub fn result_from_logs(logs: &str) -> Option<SubTaskResult> {
64 logs.lines()
65 .rev()
66 .find_map(|line| line.strip_prefix(SWARM_SUBTASK_RESULT_PREFIX))
67 .and_then(|json| serde_json::from_str::<SubTaskResult>(json).ok())
68}
69
70pub fn probe_changed_files_set(probe: &RemoteBranchProbe) -> HashSet<String> {
71 probe.changed_files.iter().cloned().collect()
72}
73
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fully populated payload shared by the payload tests.
    fn sample_payload() -> RemoteSubtaskPayload {
        RemoteSubtaskPayload {
            swarm_id: "swarm-1".to_string(),
            subtask_id: "subtask-1".to_string(),
            subtask_name: "name".to_string(),
            specialty: "specialist".to_string(),
            instruction: "do work".to_string(),
            context: "ctx".to_string(),
            provider: "zai".to_string(),
            model: "glm-5".to_string(),
            max_steps: 20,
            timeout_secs: 180,
            working_dir: Some("/workspace".to_string()),
            probe_interval_secs: 5,
        }
    }

    /// Builds a probe for `subtask-1` with the given changed-line count.
    fn sample_probe(changed_lines: u32) -> RemoteBranchProbe {
        RemoteBranchProbe {
            subtask_id: "subtask-1".to_string(),
            compile_ok: true,
            changed_files: vec!["src/main.rs".to_string()],
            changed_lines,
        }
    }

    #[test]
    fn payload_roundtrip() {
        let payload = sample_payload();
        let encoded = encode_payload(&payload).expect("encode");
        let decoded = decode_payload(&encoded).expect("decode");
        // Check every kind of field (strings, numbers, option, defaulted field),
        // not just a couple of strings.
        assert_eq!(decoded.swarm_id, "swarm-1");
        assert_eq!(decoded.subtask_id, "subtask-1");
        assert_eq!(decoded.provider, "zai");
        assert_eq!(decoded.model, "glm-5");
        assert_eq!(decoded.max_steps, 20);
        assert_eq!(decoded.timeout_secs, 180);
        assert_eq!(decoded.working_dir.as_deref(), Some("/workspace"));
        assert_eq!(decoded.probe_interval_secs, 5);
    }

    #[test]
    fn decode_rejects_invalid_base64() {
        let err = decode_payload("not base64!!").expect_err("invalid base64 must fail");
        assert!(err.to_string().starts_with("Invalid base64 payload"));
    }

    #[test]
    fn decode_rejects_json_that_is_not_a_payload() {
        let encoded = STANDARD.encode("{}");
        let err = decode_payload(&encoded).expect_err("incomplete JSON must fail");
        assert!(err.to_string().starts_with("Invalid remote payload JSON"));
    }

    #[test]
    fn missing_probe_interval_uses_default() {
        // Same shape as the sample payload, minus `probe_interval_secs`.
        let json = serde_json::json!({
            "swarm_id": "swarm-1",
            "subtask_id": "subtask-1",
            "subtask_name": "name",
            "specialty": "specialist",
            "instruction": "do work",
            "context": "ctx",
            "provider": "zai",
            "model": "glm-5",
            "max_steps": 20,
            "timeout_secs": 180,
            "working_dir": null
        });
        let encoded = STANDARD.encode(json.to_string());
        let decoded = decode_payload(&encoded).expect("decode");
        assert_eq!(decoded.probe_interval_secs, default_probe_interval_secs());
        assert_eq!(decoded.working_dir, None);
    }

    #[test]
    fn parse_probe_and_result_from_logs() {
        let probe = sample_probe(42);
        let result = SubTaskResult {
            subtask_id: "subtask-1".to_string(),
            subagent_id: "agent-subtask-1".to_string(),
            success: true,
            result: "ok".to_string(),
            steps: 3,
            tool_calls: 2,
            execution_time_ms: 1000,
            error: None,
            artifacts: Vec::new(),
        };
        let logs = format!(
            "line\n{}{}\nline\n{}{}\n",
            SWARM_SUBTASK_PROBE_PREFIX,
            serde_json::to_string(&probe).expect("probe json"),
            SWARM_SUBTASK_RESULT_PREFIX,
            serde_json::to_string(&result).expect("result json"),
        );
        let parsed_probe = latest_probe_from_logs(&logs).expect("probe");
        let parsed_result = result_from_logs(&logs).expect("result");
        assert!(parsed_probe.compile_ok);
        assert_eq!(parsed_probe.changed_lines, 42);
        assert_eq!(parsed_result.subtask_id, "subtask-1");
        assert!(parsed_result.success);
    }

    #[test]
    fn newest_probe_wins() {
        let logs = format!(
            "{}{}\nnoise\n{}{}\n",
            SWARM_SUBTASK_PROBE_PREFIX,
            serde_json::to_string(&sample_probe(1)).expect("old probe json"),
            SWARM_SUBTASK_PROBE_PREFIX,
            serde_json::to_string(&sample_probe(7)).expect("new probe json"),
        );
        let parsed = latest_probe_from_logs(&logs).expect("probe");
        assert_eq!(parsed.changed_lines, 7);
    }

    #[test]
    fn logs_without_markers_yield_none() {
        assert!(latest_probe_from_logs("").is_none());
        assert!(result_from_logs("").is_none());
        assert!(latest_probe_from_logs("plain line\nanother line\n").is_none());
        assert!(result_from_logs("plain line\nanother line\n").is_none());
    }

    #[test]
    fn changed_files_set_deduplicates() {
        let mut probe = sample_probe(3);
        probe.changed_files.push("src/main.rs".to_string());
        probe.changed_files.push("src/lib.rs".to_string());
        let files = probe_changed_files_set(&probe);
        assert_eq!(files.len(), 2);
        assert!(files.contains("src/main.rs"));
        assert!(files.contains("src/lib.rs"));
    }
}