Skip to main content

codetether_agent/swarm/
kubernetes_executor.rs

1//! Shared protocol types for Kubernetes-backed swarm execution.
2
3use super::subtask::SubTaskResult;
4use anyhow::{Result, anyhow};
5use base64::{Engine as _, engine::general_purpose::STANDARD};
6use serde::{Deserialize, Serialize};
7use std::collections::HashSet;
8
/// Name of the environment variable associated with a remote subtask payload.
///
/// NOTE(review): presumably holds the base64 string produced by `encode_payload`;
/// the reader/writer of this variable is not in this file — confirm against callers.
pub const SWARM_SUBTASK_PAYLOAD_ENV: &str = "CODETETHER_SWARM_SUBTASK_PAYLOAD";
/// Marker that starts a probe log line; the trailing space is part of the prefix.
///
/// `latest_probe_from_logs` strips this prefix and parses the rest of the line as JSON.
pub const SWARM_SUBTASK_PROBE_PREFIX: &str = "CT_SUBTASK_PROBE ";
/// Marker that starts a result log line; the trailing space is part of the prefix.
///
/// `result_from_logs` strips this prefix and parses the rest of the line as JSON.
pub const SWARM_SUBTASK_RESULT_PREFIX: &str = "CT_SUBTASK_RESULT ";
12
/// Description of a single subtask, serializable with serde.
///
/// `encode_payload` turns this into base64-encoded JSON and `decode_payload`
/// reverses that. Field meanings below are inferred from names where the file
/// does not show their use.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteSubtaskPayload {
    /// Identifier of the swarm this subtask belongs to (inferred from name).
    pub swarm_id: String,
    /// Identifier of the subtask (inferred from name).
    pub subtask_id: String,
    /// Display name of the subtask (inferred from name).
    pub subtask_name: String,
    /// Specialty label for the subtask — TODO confirm accepted values.
    pub specialty: String,
    /// Instruction text for the subtask (inferred from name).
    pub instruction: String,
    /// Additional context text accompanying the instruction (inferred from name).
    pub context: String,
    /// Provider identifier, e.g. the test in this file uses `"zai"`.
    pub provider: String,
    /// Model identifier, e.g. the test in this file uses `"glm-5"`.
    pub model: String,
    /// Step limit — presumably the maximum number of agent steps; confirm with the executor.
    pub max_steps: usize,
    /// Timeout — presumably in seconds, per the field name; confirm with the executor.
    pub timeout_secs: u64,
    /// Optional working directory path; `None` when not specified.
    pub working_dir: Option<String>,
    /// Read-only flag. Deserializes to `false` when the key is absent.
    #[serde(default)]
    pub read_only: bool,
    /// Probe interval — presumably in seconds, per the field name.
    /// Deserializes to the value of `default_probe_interval_secs()` when the key is absent.
    #[serde(default = "default_probe_interval_secs")]
    pub probe_interval_secs: u64,
}
31
/// Serde fallback for `RemoteSubtaskPayload::probe_interval_secs`.
///
/// Referenced by name from the field's `#[serde(default = "...")]` attribute, so
/// the function name must stay in sync with that string.
fn default_probe_interval_secs() -> u64 {
    const DEFAULT_PROBE_INTERVAL_SECS: u64 = 10;
    DEFAULT_PROBE_INTERVAL_SECS
}
35
/// Probe record for a subtask, serializable with serde.
///
/// `latest_probe_from_logs` deserializes this from the JSON that follows
/// `SWARM_SUBTASK_PROBE_PREFIX` on a log line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoteBranchProbe {
    /// Identifier of the subtask this probe describes (inferred from name).
    pub subtask_id: String,
    /// Compile status flag — presumably `true` when the branch compiled; confirm with the producer.
    pub compile_ok: bool,
    /// Changed file paths reported by the probe; `probe_changed_files_set` turns these into a set.
    pub changed_files: Vec<String>,
    /// Changed line count reported by the probe — TODO confirm how it is measured.
    pub changed_lines: u32,
}
43
44pub fn encode_payload(payload: &RemoteSubtaskPayload) -> Result<String> {
45    let json = serde_json::to_vec(payload)?;
46    Ok(STANDARD.encode(json))
47}
48
49pub fn decode_payload(payload_b64: &str) -> Result<RemoteSubtaskPayload> {
50    let bytes = STANDARD
51        .decode(payload_b64.as_bytes())
52        .map_err(|e| anyhow!("Invalid base64 payload: {e}"))?;
53    let payload = serde_json::from_slice::<RemoteSubtaskPayload>(&bytes)
54        .map_err(|e| anyhow!("Invalid remote payload JSON: {e}"))?;
55    Ok(payload)
56}
57
58pub fn latest_probe_from_logs(logs: &str) -> Option<RemoteBranchProbe> {
59    logs.lines()
60        .rev()
61        .find_map(|line| line.strip_prefix(SWARM_SUBTASK_PROBE_PREFIX))
62        .and_then(|json| serde_json::from_str::<RemoteBranchProbe>(json).ok())
63}
64
65pub fn result_from_logs(logs: &str) -> Option<SubTaskResult> {
66    logs.lines()
67        .rev()
68        .find_map(|line| line.strip_prefix(SWARM_SUBTASK_RESULT_PREFIX))
69        .and_then(|json| serde_json::from_str::<SubTaskResult>(json).ok())
70}
71
72pub fn probe_changed_files_set(probe: &RemoteBranchProbe) -> HashSet<String> {
73    probe.changed_files.iter().cloned().collect()
74}
75
#[cfg(test)]
mod tests {
    use super::*;

    /// Fully populated payload fixture for the round-trip test.
    fn sample_payload() -> RemoteSubtaskPayload {
        RemoteSubtaskPayload {
            swarm_id: String::from("swarm-1"),
            subtask_id: String::from("subtask-1"),
            subtask_name: String::from("name"),
            specialty: String::from("specialist"),
            instruction: String::from("do work"),
            context: String::from("ctx"),
            provider: String::from("zai"),
            model: String::from("glm-5"),
            max_steps: 20,
            timeout_secs: 180,
            working_dir: Some(String::from("/workspace")),
            read_only: true,
            probe_interval_secs: 5,
        }
    }

    /// Probe fixture reporting one changed file and a successful compile.
    fn sample_probe() -> RemoteBranchProbe {
        RemoteBranchProbe {
            subtask_id: String::from("subtask-1"),
            compile_ok: true,
            changed_files: vec![String::from("src/main.rs")],
            changed_lines: 42,
        }
    }

    /// Successful result fixture with no error and no artifacts.
    fn sample_result() -> SubTaskResult {
        SubTaskResult {
            subtask_id: String::from("subtask-1"),
            subagent_id: String::from("agent-subtask-1"),
            success: true,
            result: String::from("ok"),
            steps: 3,
            tool_calls: 2,
            execution_time_ms: 1000,
            error: None,
            artifacts: Vec::new(),
            retry_count: 0,
        }
    }

    // Encoding then decoding must preserve the payload's fields.
    #[test]
    fn payload_roundtrip() {
        let encoded = encode_payload(&sample_payload()).expect("encode");
        let decoded = decode_payload(&encoded).expect("decode");

        assert_eq!(decoded.subtask_id, "subtask-1");
        assert_eq!(decoded.provider, "zai");
        assert!(decoded.read_only);
    }

    // Probe and result lines interleaved with unrelated lines must both be found.
    #[test]
    fn parse_probe_and_result_from_logs() {
        let probe_json = serde_json::to_string(&sample_probe()).expect("probe json");
        let result_json = serde_json::to_string(&sample_result()).expect("result json");

        let mut logs = String::new();
        logs.push_str("line\n");
        logs.push_str(SWARM_SUBTASK_PROBE_PREFIX);
        logs.push_str(&probe_json);
        logs.push_str("\nline\n");
        logs.push_str(SWARM_SUBTASK_RESULT_PREFIX);
        logs.push_str(&result_json);
        logs.push('\n');

        let parsed_probe = latest_probe_from_logs(&logs).expect("probe");
        let parsed_result = result_from_logs(&logs).expect("result");

        assert!(parsed_probe.compile_ok);
        assert_eq!(parsed_probe.changed_lines, 42);
        assert_eq!(parsed_result.subtask_id, "subtask-1");
        assert!(parsed_result.success);
    }
}