Skip to main content

codetether_agent/swarm/
kubernetes_executor.rs

1//! Shared protocol types for Kubernetes-backed swarm execution.
2
3use super::subtask::SubTaskResult;
4use anyhow::{Result, anyhow};
5use base64::{Engine as _, engine::general_purpose::STANDARD};
6use serde::{Deserialize, Serialize};
7use std::collections::HashSet;
8
9pub const SWARM_SUBTASK_PAYLOAD_ENV: &str = "CODETETHER_SWARM_SUBTASK_PAYLOAD";
10pub const SWARM_SUBTASK_PROBE_PREFIX: &str = "CT_SUBTASK_PROBE ";
11pub const SWARM_SUBTASK_RESULT_PREFIX: &str = "CT_SUBTASK_RESULT ";
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct RemoteSubtaskPayload {
15    pub swarm_id: String,
16    pub subtask_id: String,
17    pub subtask_name: String,
18    pub specialty: String,
19    pub instruction: String,
20    pub context: String,
21    pub provider: String,
22    pub model: String,
23    pub max_steps: usize,
24    pub timeout_secs: u64,
25    pub working_dir: Option<String>,
26    #[serde(default = "default_probe_interval_secs")]
27    pub probe_interval_secs: u64,
28}
29
30fn default_probe_interval_secs() -> u64 {
31    10
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct RemoteBranchProbe {
36    pub subtask_id: String,
37    pub compile_ok: bool,
38    pub changed_files: Vec<String>,
39    pub changed_lines: u32,
40}
41
42pub fn encode_payload(payload: &RemoteSubtaskPayload) -> Result<String> {
43    let json = serde_json::to_vec(payload)?;
44    Ok(STANDARD.encode(json))
45}
46
47pub fn decode_payload(payload_b64: &str) -> Result<RemoteSubtaskPayload> {
48    let bytes = STANDARD
49        .decode(payload_b64.as_bytes())
50        .map_err(|e| anyhow!("Invalid base64 payload: {e}"))?;
51    let payload = serde_json::from_slice::<RemoteSubtaskPayload>(&bytes)
52        .map_err(|e| anyhow!("Invalid remote payload JSON: {e}"))?;
53    Ok(payload)
54}
55
56pub fn latest_probe_from_logs(logs: &str) -> Option<RemoteBranchProbe> {
57    logs.lines()
58        .rev()
59        .find_map(|line| line.strip_prefix(SWARM_SUBTASK_PROBE_PREFIX))
60        .and_then(|json| serde_json::from_str::<RemoteBranchProbe>(json).ok())
61}
62
63pub fn result_from_logs(logs: &str) -> Option<SubTaskResult> {
64    logs.lines()
65        .rev()
66        .find_map(|line| line.strip_prefix(SWARM_SUBTASK_RESULT_PREFIX))
67        .and_then(|json| serde_json::from_str::<SubTaskResult>(json).ok())
68}
69
70pub fn probe_changed_files_set(probe: &RemoteBranchProbe) -> HashSet<String> {
71    probe.changed_files.iter().cloned().collect()
72}
73
74#[cfg(test)]
75mod tests {
76    use super::*;
77
78    #[test]
79    fn payload_roundtrip() {
80        let payload = RemoteSubtaskPayload {
81            swarm_id: "swarm-1".to_string(),
82            subtask_id: "subtask-1".to_string(),
83            subtask_name: "name".to_string(),
84            specialty: "specialist".to_string(),
85            instruction: "do work".to_string(),
86            context: "ctx".to_string(),
87            provider: "zai".to_string(),
88            model: "glm-5".to_string(),
89            max_steps: 20,
90            timeout_secs: 180,
91            working_dir: Some("/workspace".to_string()),
92            probe_interval_secs: 5,
93        };
94        let encoded = encode_payload(&payload).expect("encode");
95        let decoded = decode_payload(&encoded).expect("decode");
96        assert_eq!(decoded.subtask_id, "subtask-1");
97        assert_eq!(decoded.provider, "zai");
98    }
99
100    #[test]
101    fn parse_probe_and_result_from_logs() {
102        let probe = RemoteBranchProbe {
103            subtask_id: "subtask-1".to_string(),
104            compile_ok: true,
105            changed_files: vec!["src/main.rs".to_string()],
106            changed_lines: 42,
107        };
108        let result = SubTaskResult {
109            subtask_id: "subtask-1".to_string(),
110            subagent_id: "agent-subtask-1".to_string(),
111            success: true,
112            result: "ok".to_string(),
113            steps: 3,
114            tool_calls: 2,
115            execution_time_ms: 1000,
116            error: None,
117            artifacts: Vec::new(),
118        };
119        let logs = format!(
120            "line\n{}{}\nline\n{}{}\n",
121            SWARM_SUBTASK_PROBE_PREFIX,
122            serde_json::to_string(&probe).expect("probe json"),
123            SWARM_SUBTASK_RESULT_PREFIX,
124            serde_json::to_string(&result).expect("result json"),
125        );
126        let parsed_probe = latest_probe_from_logs(&logs).expect("probe");
127        let parsed_result = result_from_logs(&logs).expect("result");
128        assert!(parsed_probe.compile_ok);
129        assert_eq!(parsed_probe.changed_lines, 42);
130        assert_eq!(parsed_result.subtask_id, "subtask-1");
131        assert!(parsed_result.success);
132    }
133}