Skip to main content

codetether_agent/tool/
k8s_tool.rs

1//! Kubernetes tool: manage K8s resources from the agent.
2//!
3//! Exposes cluster introspection, pod/deployment management, scaling,
4//! rolling restarts, sub-agent pod lifecycle, and log retrieval through
5//! a single `kubernetes` tool with an `action` parameter.
6
7use super::{Tool, ToolResult};
8use crate::k8s::K8sManager;
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use serde::Deserialize;
12use serde_json::{Value, json};
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::OnceCell;
16
17#[derive(Debug, Clone, Deserialize)]
18struct K8sInput {
19    action: K8sAction,
20    #[serde(default)]
21    namespace: Option<String>,
22    #[serde(default)]
23    deployment: Option<String>,
24    #[serde(default)]
25    replicas: Option<i32>,
26    #[serde(default)]
27    subagent_id: Option<String>,
28    #[serde(default)]
29    image: Option<String>,
30    #[serde(default)]
31    env_vars: Option<HashMap<String, String>>,
32    #[serde(default)]
33    labels: Option<HashMap<String, String>>,
34    #[serde(default)]
35    command: Option<Vec<String>>,
36    #[serde(default)]
37    args: Option<Vec<String>>,
38    #[serde(default = "default_tail_lines")]
39    tail_lines: i64,
40    #[serde(default)]
41    label_selector: Option<String>,
42}
43
/// Fallback used by serde for `tail_lines` when the caller omits it.
fn default_tail_lines() -> i64 {
    const DEFAULT_TAIL_LINES: i64 = 100;
    DEFAULT_TAIL_LINES
}
47
48#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
49#[serde(rename_all = "snake_case")]
50enum K8sAction {
51    Status,
52    ListPods,
53    Scale,
54    RollingRestart,
55    SpawnPod,
56    DeletePod,
57    PodState,
58    Logs,
59    RecentActions,
60}
61
62pub struct K8sTool {
63    manager: Arc<OnceCell<K8sManager>>,
64}
65
66impl K8sTool {
67    pub fn new() -> Self {
68        Self {
69            manager: Arc::new(OnceCell::new()),
70        }
71    }
72
73    /// Create a K8sTool with a pre-initialized manager.
74    pub fn with_manager(manager: K8sManager) -> Self {
75        let cell = OnceCell::new();
76        let _ = cell.set(manager);
77        Self {
78            manager: Arc::new(cell),
79        }
80    }
81
82    async fn get_manager(&self) -> &K8sManager {
83        self.manager
84            .get_or_init(|| async { K8sManager::new().await })
85            .await
86    }
87}
88
89impl Default for K8sTool {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95#[async_trait]
96impl Tool for K8sTool {
97    fn id(&self) -> &str {
98        "kubernetes"
99    }
100    fn name(&self) -> &str {
101        "kubernetes"
102    }
103
104    fn description(&self) -> &str {
105        "Manage Kubernetes resources: check cluster status, list pods, scale deployments, trigger rolling restarts, spawn/delete sub-agent pods, fetch pod logs, and view recent management actions. Works both in-cluster and with local kubeconfig."
106    }
107
108    fn parameters(&self) -> Value {
109        json!({
110            "type": "object",
111            "properties": {
112                "action": {
113                    "type": "string",
114                    "enum": ["status","list_pods","scale","rolling_restart",
115                              "spawn_pod","delete_pod","pod_state","logs","recent_actions"],
116                    "description": "The Kubernetes action to perform."
117                },
118                "replicas": {
119                    "type": "integer",
120                    "description": "Number of replicas (required for scale)."
121                },
122                "subagent_id": {
123                    "type": "string",
124                    "description": "Sub-agent identifier (for spawn_pod, delete_pod, pod_state, logs)."
125                },
126                "image": {
127                    "type": "string",
128                    "description": "Container image for spawned sub-agent pods."
129                },
130                "env_vars": {
131                    "type": "object",
132                    "additionalProperties": {"type": "string"},
133                    "description": "Environment variables for spawned sub-agent pods."
134                },
135                "labels": {
136                    "type": "object",
137                    "additionalProperties": {"type": "string"},
138                    "description": "Labels for spawned sub-agent pods."
139                },
140                "command": {
141                    "type": "array", "items": {"type": "string"},
142                    "description": "Command override for spawned sub-agent pods."
143                },
144                "args": {
145                    "type": "array", "items": {"type": "string"},
146                    "description": "Args override for spawned sub-agent pods."
147                },
148                "tail_lines": {
149                    "type": "integer",
150                    "description": "Number of tail lines for logs (default: 100).",
151                    "default": 100
152                },
153                "label_selector": {
154                    "type": "string",
155                    "description": "Label selector for list_pods (e.g. 'app=myapp')."
156                }
157            },
158            "required": ["action"]
159        })
160    }
161
162    async fn execute(&self, input: Value) -> Result<ToolResult> {
163        let params: K8sInput = serde_json::from_value(input)?;
164        let manager = self.get_manager().await;
165
166        if !manager.is_available() {
167            return Ok(ToolResult::error(
168                "Kubernetes is not available. Ensure the agent is running inside a cluster or KUBECONFIG is set.",
169            ));
170        }
171
172        match params.action {
173            K8sAction::Status => Self::exec_status(manager).await,
174            K8sAction::ListPods => Self::exec_list_pods(manager).await,
175            K8sAction::Scale => Self::exec_scale(manager, params).await,
176            K8sAction::RollingRestart => Self::exec_rolling_restart(manager).await,
177            K8sAction::SpawnPod => Self::exec_spawn_pod(manager, params).await,
178            K8sAction::DeletePod => Self::exec_delete_pod(manager, params).await,
179            K8sAction::PodState => Self::exec_pod_state(manager, params).await,
180            K8sAction::Logs => Self::exec_logs(manager, params).await,
181            K8sAction::RecentActions => Self::exec_recent_actions(manager).await,
182        }
183    }
184}
185
186// ---------------------------------------------------------------------------
187// Action handlers
188// ---------------------------------------------------------------------------
189
190impl K8sTool {
191    async fn exec_status(manager: &K8sManager) -> Result<ToolResult> {
192        let status = manager.status().await;
193        let output = serde_json::to_string_pretty(&json!({
194            "in_cluster": status.in_cluster,
195            "namespace": status.namespace,
196            "pod_name": status.pod_name,
197            "deployment_name": status.deployment_name,
198            "replicas": status.replicas,
199            "available_replicas": status.available_replicas,
200        }))?;
201        Ok(ToolResult::success(output))
202    }
203
204    async fn exec_list_pods(manager: &K8sManager) -> Result<ToolResult> {
205        let pods = manager.list_pods().await?;
206        let pods_json: Vec<Value> = pods
207            .iter()
208            .map(|p| {
209                json!({
210                    "name": p.name,
211                    "phase": p.phase,
212                    "ready": p.ready,
213                    "start_time": p.start_time,
214                })
215            })
216            .collect();
217
218        let output = json!({
219            "namespace": manager.status().await.namespace,
220            "pods": pods_json,
221            "count": pods_json.len(),
222        });
223        Ok(ToolResult::success(serde_json::to_string_pretty(&output)?))
224    }
225
226    async fn exec_scale(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
227        let replicas = params
228            .replicas
229            .ok_or_else(|| anyhow!("'replicas' is required for the scale action"))?;
230        let action = manager.scale(replicas).await?;
231        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
232            "action": action.action,
233            "success": action.success,
234            "message": action.message,
235            "timestamp": action.timestamp,
236        }))?))
237    }
238
239    async fn exec_rolling_restart(manager: &K8sManager) -> Result<ToolResult> {
240        let action = manager.rolling_restart().await?;
241        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
242            "action": action.action,
243            "success": action.success,
244            "message": action.message,
245            "timestamp": action.timestamp,
246        }))?))
247    }
248
249    async fn exec_spawn_pod(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
250        let subagent_id = params
251            .subagent_id
252            .as_deref()
253            .ok_or_else(|| anyhow!("'subagent_id' is required for the spawn_pod action"))?;
254
255        let spec = crate::k8s::SubagentPodSpec {
256            image: params.image,
257            env_vars: params.env_vars.unwrap_or_default(),
258            labels: params.labels.unwrap_or_default(),
259            command: params.command,
260            args: params.args,
261        };
262
263        let action = manager
264            .spawn_subagent_pod_with_spec(subagent_id, spec)
265            .await?;
266        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
267            "action": action.action,
268            "success": action.success,
269            "message": action.message,
270            "timestamp": action.timestamp,
271        }))?))
272    }
273
274    async fn exec_delete_pod(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
275        let subagent_id = params
276            .subagent_id
277            .as_deref()
278            .ok_or_else(|| anyhow!("'subagent_id' is required for the delete_pod action"))?;
279        let action = manager.delete_subagent_pod(subagent_id).await?;
280        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
281            "action": action.action,
282            "success": action.success,
283            "message": action.message,
284            "timestamp": action.timestamp,
285        }))?))
286    }
287
288    async fn exec_pod_state(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
289        let subagent_id = params
290            .subagent_id
291            .as_deref()
292            .ok_or_else(|| anyhow!("'subagent_id' is required for the pod_state action"))?;
293
294        let state = manager
295            .get_subagent_pod_state(subagent_id)
296            .await?
297            .ok_or_else(|| anyhow!("Pod for sub-agent '{}' not found", subagent_id))?;
298
299        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
300            "pod_name": state.pod_name,
301            "phase": state.phase,
302            "ready": state.ready,
303            "terminated": state.terminated,
304            "exit_code": state.exit_code,
305            "reason": state.reason,
306            "restart_count": state.restart_count,
307        }))?))
308    }
309
310    async fn exec_logs(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
311        let subagent_id = params
312            .subagent_id
313            .as_deref()
314            .ok_or_else(|| anyhow!("'subagent_id' is required for the logs action"))?;
315        let logs = manager
316            .subagent_logs(subagent_id, params.tail_lines)
317            .await?;
318        Ok(ToolResult::success(logs))
319    }
320
321    async fn exec_recent_actions(manager: &K8sManager) -> Result<ToolResult> {
322        let actions = manager.recent_actions(20).await;
323        let actions_json: Vec<Value> = actions
324            .iter()
325            .map(|a| {
326                json!({
327                    "action": a.action,
328                    "success": a.success,
329                    "message": a.message,
330                    "timestamp": a.timestamp,
331                })
332            })
333            .collect();
334
335        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
336            "actions": actions_json,
337            "count": actions_json.len(),
338        }))?))
339    }
340}