Skip to main content

codetether_agent/tool/
k8s_tool.rs

1//! Kubernetes tool: manage K8s resources from the agent.
2//!
3//! Exposes cluster introspection, pod/deployment management, scaling,
4//! rolling restarts, sub-agent pod lifecycle, and log retrieval through
5//! a single `kubernetes` tool with an `action` parameter.
6
7use super::{Tool, ToolResult};
8use crate::k8s::K8sManager;
9use anyhow::{Result, anyhow};
10use async_trait::async_trait;
11use serde::Deserialize;
12use serde_json::{Value, json};
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::OnceCell;
16
17#[derive(Debug, Clone, Deserialize)]
18struct K8sInput {
19    action: K8sAction,
20    #[serde(default)]
21    namespace: Option<String>,
22    #[serde(default)]
23    deployment: Option<String>,
24    #[serde(default)]
25    replicas: Option<i32>,
26    #[serde(default)]
27    subagent_id: Option<String>,
28    #[serde(default)]
29    image: Option<String>,
30    #[serde(default)]
31    env_vars: Option<HashMap<String, String>>,
32    #[serde(default)]
33    labels: Option<HashMap<String, String>>,
34    #[serde(default)]
35    command: Option<Vec<String>>,
36    #[serde(default)]
37    args: Option<Vec<String>>,
38    #[serde(default = "default_tail_lines")]
39    tail_lines: i64,
40    #[serde(default)]
41    label_selector: Option<String>,
42}
43
44fn default_tail_lines() -> i64 {
45    100
46}
47
48#[derive(Debug, Clone, Deserialize, PartialEq, Eq)]
49#[serde(rename_all = "snake_case")]
50enum K8sAction {
51    Status,
52    ListPods,
53    Scale,
54    RollingRestart,
55    SpawnPod,
56    DeletePod,
57    PodState,
58    Logs,
59    RecentActions,
60}
61
62pub struct K8sTool {
63    manager: Arc<OnceCell<K8sManager>>,
64}
65
66impl K8sTool {
67    pub fn new() -> Self {
68        Self {
69            manager: Arc::new(OnceCell::new()),
70        }
71    }
72
73    /// Create a K8sTool with a pre-initialized manager.
74    pub fn with_manager(manager: K8sManager) -> Self {
75        let cell = OnceCell::new();
76        let _ = cell.set(manager);
77        Self {
78            manager: Arc::new(cell),
79        }
80    }
81
82    async fn get_manager(&self) -> &K8sManager {
83        self.manager
84            .get_or_init(|| async { K8sManager::new().await })
85            .await
86    }
87}
88
89impl Default for K8sTool {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95#[async_trait]
96impl Tool for K8sTool {
97    fn id(&self) -> &str {
98        "kubernetes"
99    }
100    fn name(&self) -> &str {
101        "kubernetes"
102    }
103
104    fn description(&self) -> &str {
105        "Manage Kubernetes resources: check cluster status, list pods, scale deployments, trigger rolling restarts, spawn/delete sub-agent pods, fetch pod logs, and view recent management actions. Works both in-cluster and with local kubeconfig."
106    }
107
108    fn parameters(&self) -> Value {
109        json!({
110            "type": "object",
111            "properties": {
112                "action": {
113                    "type": "string",
114                    "enum": ["status","list_pods","scale","rolling_restart",
115                              "spawn_pod","delete_pod","pod_state","logs","recent_actions"],
116                    "description": "The Kubernetes action to perform."
117                },
118                "namespace": {
119                    "type": "string",
120                    "description": "Namespace override for status, list_pods, scale, rolling_restart, spawn_pod, delete_pod, pod_state, and logs. Ignored by recent_actions."
121                },
122                "deployment": {
123                    "type": "string",
124                    "description": "Deployment override for status, scale, rolling_restart, and list_pods only. Ignored by pod and recent_actions operations."
125                },
126                "replicas": {
127                    "type": "integer",
128                    "description": "Number of replicas (required for scale)."
129                },
130                "subagent_id": {
131                    "type": "string",
132                    "description": "Sub-agent identifier (for spawn_pod, delete_pod, pod_state, logs)."
133                },
134                "image": {
135                    "type": "string",
136                    "description": "Container image for spawned sub-agent pods."
137                },
138                "env_vars": {
139                    "type": "object",
140                    "additionalProperties": {"type": "string"},
141                    "description": "Environment variables for spawned sub-agent pods."
142                },
143                "labels": {
144                    "type": "object",
145                    "additionalProperties": {"type": "string"},
146                    "description": "Labels for spawned sub-agent pods."
147                },
148                "command": {
149                    "type": "array", "items": {"type": "string"},
150                    "description": "Command override for spawned sub-agent pods."
151                },
152                "args": {
153                    "type": "array", "items": {"type": "string"},
154                    "description": "Args override for spawned sub-agent pods."
155                },
156                "tail_lines": {
157                    "type": "integer",
158                    "description": "Number of tail lines for logs (default: 100).",
159                    "default": 100
160                },
161                "label_selector": {
162                    "type": "string",
163                    "description": "Label selector for list_pods (e.g. 'app=myapp')."
164                }
165            },
166            "required": ["action"]
167        })
168    }
169
170    async fn execute(&self, input: Value) -> Result<ToolResult> {
171        let params: K8sInput = serde_json::from_value(input)?;
172        let manager = self.get_manager().await;
173
174        if !manager.is_available() {
175            return Ok(ToolResult::error(
176                "Kubernetes is not available. Ensure the agent is running inside a cluster or KUBECONFIG is set.",
177            ));
178        }
179
180        match params.action.clone() {
181            K8sAction::Status => Self::exec_status(manager, params).await,
182            K8sAction::ListPods => Self::exec_list_pods(manager, params).await,
183            K8sAction::Scale => Self::exec_scale(manager, params).await,
184            K8sAction::RollingRestart => Self::exec_rolling_restart(manager, params).await,
185            K8sAction::SpawnPod => Self::exec_spawn_pod(manager, params).await,
186            K8sAction::DeletePod => Self::exec_delete_pod(manager, params).await,
187            K8sAction::PodState => Self::exec_pod_state(manager, params).await,
188            K8sAction::Logs => Self::exec_logs(manager, params).await,
189            K8sAction::RecentActions => Self::exec_recent_actions(manager).await,
190        }
191    }
192}
193
194// ---------------------------------------------------------------------------
195// Action handlers
196// ---------------------------------------------------------------------------
197
198impl K8sTool {
199    async fn exec_status(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
200        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
201        let status = target.status().await;
202        let output = serde_json::to_string_pretty(&json!({
203            "in_cluster": status.in_cluster,
204            "namespace": status.namespace,
205            "pod_name": status.pod_name,
206            "deployment_name": status.deployment_name,
207            "replicas": status.replicas,
208            "available_replicas": status.available_replicas,
209        }))?;
210        Ok(ToolResult::success(output))
211    }
212
213    async fn exec_list_pods(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
214        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
215        let pods = target
216            .list_pods_with_selector(params.label_selector.as_deref())
217            .await?;
218        let pods_json: Vec<Value> = pods
219            .iter()
220            .map(|p| {
221                json!({
222                    "name": p.name,
223                    "phase": p.phase,
224                    "ready": p.ready,
225                    "start_time": p.start_time,
226                })
227            })
228            .collect();
229
230        let output = json!({
231            "namespace": target.status().await.namespace,
232            "pods": pods_json,
233            "count": pods_json.len(),
234        });
235        Ok(ToolResult::success(serde_json::to_string_pretty(&output)?))
236    }
237
238    async fn exec_scale(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
239        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
240        let replicas = params
241            .replicas
242            .ok_or_else(|| anyhow!("'replicas' is required for the scale action"))?;
243        let action = target.scale(replicas).await?;
244        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
245            "action": action.action,
246            "success": action.success,
247            "message": action.message,
248            "timestamp": action.timestamp,
249        }))?))
250    }
251
252    async fn exec_rolling_restart(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
253        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
254        let action = target.rolling_restart().await?;
255        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
256            "action": action.action,
257            "success": action.success,
258            "message": action.message,
259            "timestamp": action.timestamp,
260        }))?))
261    }
262
263    async fn exec_spawn_pod(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
264        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
265        let subagent_id = params
266            .subagent_id
267            .as_deref()
268            .ok_or_else(|| anyhow!("'subagent_id' is required for the spawn_pod action"))?;
269
270        let spec = crate::k8s::SubagentPodSpec {
271            image: params.image,
272            env_vars: params.env_vars.unwrap_or_default(),
273            labels: params.labels.unwrap_or_default(),
274            command: params.command,
275            args: params.args,
276        };
277
278        let action = target
279            .spawn_subagent_pod_with_spec(subagent_id, spec)
280            .await?;
281        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
282            "action": action.action,
283            "success": action.success,
284            "message": action.message,
285            "timestamp": action.timestamp,
286        }))?))
287    }
288
289    async fn exec_delete_pod(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
290        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
291        let subagent_id = params
292            .subagent_id
293            .as_deref()
294            .ok_or_else(|| anyhow!("'subagent_id' is required for the delete_pod action"))?;
295        let action = target.delete_subagent_pod(subagent_id).await?;
296        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
297            "action": action.action,
298            "success": action.success,
299            "message": action.message,
300            "timestamp": action.timestamp,
301        }))?))
302    }
303
304    async fn exec_pod_state(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
305        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
306        let subagent_id = params
307            .subagent_id
308            .as_deref()
309            .ok_or_else(|| anyhow!("'subagent_id' is required for the pod_state action"))?;
310
311        let state = target
312            .get_subagent_pod_state(subagent_id)
313            .await?
314            .ok_or_else(|| anyhow!("Pod for sub-agent '{}' not found", subagent_id))?;
315
316        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
317            "pod_name": state.pod_name,
318            "phase": state.phase,
319            "ready": state.ready,
320            "terminated": state.terminated,
321            "exit_code": state.exit_code,
322            "reason": state.reason,
323            "restart_count": state.restart_count,
324        }))?))
325    }
326
327    async fn exec_logs(manager: &K8sManager, params: K8sInput) -> Result<ToolResult> {
328        let target = manager.scoped(params.namespace.as_deref(), params.deployment.as_deref());
329        let subagent_id = params
330            .subagent_id
331            .as_deref()
332            .ok_or_else(|| anyhow!("'subagent_id' is required for the logs action"))?;
333        let logs = target.subagent_logs(subagent_id, params.tail_lines).await?;
334        Ok(ToolResult::success(logs))
335    }
336
337    async fn exec_recent_actions(manager: &K8sManager) -> Result<ToolResult> {
338        let actions = manager.recent_actions(20).await;
339        let actions_json: Vec<Value> = actions
340            .iter()
341            .map(|a| {
342                json!({
343                    "action": a.action,
344                    "success": a.success,
345                    "message": a.message,
346                    "timestamp": a.timestamp,
347                })
348            })
349            .collect();
350
351        Ok(ToolResult::success(serde_json::to_string_pretty(&json!({
352            "actions": actions_json,
353            "count": actions_json.len(),
354        }))?))
355    }
356}