
codetether_agent/k8s/mod.rs

//! Kubernetes self-deployment and pod management.
//!
//! Allows the CodeTether agent to manage its own lifecycle on Kubernetes:
//! - Detect whether it is running inside a K8s cluster
//! - Read its own pod/deployment metadata
//! - Scale its own deployment replica count
//! - Perform rolling restarts
//! - Create new pods for swarm sub-agents
//! - Monitor pod health and recover from failures
//!
//! All K8s operations are audit-logged.
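//!
//! A minimal usage sketch (the `crate::k8s` path is assumed from this file's
//! location; a Tokio runtime is required):
//!
//! ```ignore
//! let mgr = crate::k8s::K8sManager::new().await;
//! if mgr.is_available() {
//!     let status = mgr.status().await;
//!     tracing::info!(namespace = %status.namespace, "K8s integration active");
//! }
//! ```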

use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::Pod;
use kube::{
    Api, Client, Config as KubeConfig,
    api::{ListParams, Patch, PatchParams, PostParams},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Status of the K8s integration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct K8sStatus {
    /// Whether a K8s API connection is available (in-cluster or via kubeconfig).
    pub in_cluster: bool,
    /// Current namespace.
    pub namespace: String,
    /// This pod's name (if detectable).
    pub pod_name: Option<String>,
    /// Deployment name managing this pod (if detectable).
    pub deployment_name: Option<String>,
    /// Desired replica count (from the deployment spec).
    pub replicas: Option<i32>,
    /// Available replica count.
    pub available_replicas: Option<i32>,
}

/// Result of a self-deployment action.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployAction {
    pub action: String,
    pub success: bool,
    pub message: String,
    pub timestamp: String,
}

/// Kubernetes self-deployment manager.
#[derive(Clone)]
pub struct K8sManager {
    client: Option<Client>,
    namespace: String,
    pod_name: Option<String>,
    deployment_name: Option<String>,
    actions: Arc<RwLock<Vec<DeployAction>>>,
}

impl std::fmt::Debug for K8sManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("K8sManager")
            .field("namespace", &self.namespace)
            .field("pod_name", &self.pod_name)
            .field("deployment_name", &self.deployment_name)
            .field("connected", &self.client.is_some())
            .finish()
    }
}

impl K8sManager {
    /// Attempt to initialize from in-cluster configuration, falling back to
    /// the local kubeconfig for development.
    /// Always returns a manager; `client` is `None` if neither is available.
    pub async fn new() -> Self {
        let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
            .or_else(|_| Self::read_namespace_file())
            .unwrap_or_else(|_| "default".to_string());

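        // In Kubernetes a container's hostname defaults to the pod name, so
        // HOSTNAME usually identifies this pod; CODETETHER_POD_NAME is only
        // consulted as a fallback.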
        let pod_name = std::env::var("HOSTNAME")
            .ok()
            .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());

        let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();

        let client = match KubeConfig::incluster() {
            Ok(config) => match Client::try_from(config) {
                Ok(c) => {
                    tracing::info!(
                        namespace = %namespace,
                        pod = pod_name.as_deref().unwrap_or("-"),
                        "K8s client initialized (in-cluster)"
                    );
                    Some(c)
                }
                Err(e) => {
                    tracing::debug!("Failed to create in-cluster K8s client: {}", e);
                    None
                }
            },
            Err(_) => {
                // Try loading from KUBECONFIG for local development.
                match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
                {
                    Ok(config) => match Client::try_from(config) {
                        Ok(c) => {
                            tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
                            Some(c)
                        }
                        Err(e) => {
                            tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
                            None
                        }
                    },
                    Err(_) => {
                        tracing::debug!(
                            "Not running in K8s and no kubeconfig found — K8s features disabled"
                        );
                        None
                    }
                }
            }
        };

        Self {
            client,
            namespace,
            pod_name,
            deployment_name,
            actions: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Read /var/run/secrets/kubernetes.io/serviceaccount/namespace
    fn read_namespace_file() -> Result<String, std::env::VarError> {
        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
            .map(|s| s.trim().to_string())
            .map_err(|_| std::env::VarError::NotPresent)
    }

    /// Whether K8s integration is available.
    pub fn is_available(&self) -> bool {
        self.client.is_some()
    }

    /// Get current status.
    pub async fn status(&self) -> K8sStatus {
        let (replicas, available) = if let Some(ref client) = self.client {
            if let Some(ref dep_name) = self.deployment_name {
                self.get_deployment_replicas(client, dep_name).await
            } else {
                (None, None)
            }
        } else {
            (None, None)
        };

        K8sStatus {
            in_cluster: self.client.is_some(),
            namespace: self.namespace.clone(),
            pod_name: self.pod_name.clone(),
            deployment_name: self.deployment_name.clone(),
            replicas,
            available_replicas: available,
        }
    }

    async fn get_deployment_replicas(
        &self,
        client: &Client,
        name: &str,
    ) -> (Option<i32>, Option<i32>) {
        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
        match deployments.get(name).await {
            Ok(dep) => {
                let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
                let available = dep.status.as_ref().and_then(|s| s.available_replicas);
                (spec_replicas, available)
            }
            Err(e) => {
                tracing::warn!("Failed to get deployment {}: {}", name, e);
                (None, None)
            }
        }
    }

    /// Scale the agent's own deployment.
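    ///
    /// Usage sketch (the replica count is illustrative; fails if no client or
    /// deployment name is configured):
    ///
    /// ```ignore
    /// let action = mgr.scale(3).await?;
    /// assert!(action.success);
    /// ```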
    pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

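        // A JSON merge patch; the effect matches
        // `kubectl scale deployment/<name> --replicas=<replicas>`.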
        let patch = serde_json::json!({
            "spec": {
                "replicas": replicas
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to scale deployment {} to {} replicas",
                    dep_name, replicas
                )
            })?;

        let action = DeployAction {
            action: format!("scale:{}", replicas),
            success: true,
            message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            deployment = %dep_name,
            replicas = replicas,
            "Self-deployment: scaled"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// Perform a rolling restart of the agent's deployment.
    pub async fn rolling_restart(&self) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        // Trigger rolling restart by updating the restart annotation.
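        // (Same mechanism as `kubectl rollout restart`, which bumps the
        // `kubectl.kubernetes.io/restartedAt` annotation: any change to the
        // pod template causes the Deployment controller to roll new pods.)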
        let patch = serde_json::json!({
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "codetether.run/restartedAt": Utc::now().to_rfc3339()
                        }
                    }
                }
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;

        let action = DeployAction {
            action: "rolling_restart".to_string(),
            success: true,
            message: format!("Triggered rolling restart for deployment '{}'", dep_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// List pods belonging to our deployment.
    pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

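        // NOTE: assumes our pods are labeled `app=<deployment-name>` (or
        // `app=codetether` when no deployment name is configured); adjust if
        // your manifests use different labels.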
        let label_selector = self
            .deployment_name
            .as_ref()
            .map(|n| format!("app={}", n))
            .unwrap_or_else(|| "app=codetether".to_string());

        let list = pods
            .list(&ListParams::default().labels(&label_selector))
            .await
            .context("Failed to list pods")?;

        let infos: Vec<PodInfo> = list
            .items
            .iter()
            .map(|pod| {
                let name = pod.metadata.name.clone().unwrap_or_default();
                let phase = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.phase.clone())
                    .unwrap_or_else(|| "Unknown".to_string());
                let ready = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.conditions.as_ref())
                    .map(|conditions| {
                        conditions
                            .iter()
                            .any(|c| c.type_ == "Ready" && c.status == "True")
                    })
                    .unwrap_or(false);
                let start_time = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.start_time.as_ref())
                    .map(|t| t.0.to_string());

                PodInfo {
                    name,
                    phase,
                    ready,
                    start_time,
                }
            })
            .collect();

        Ok(infos)
    }

    /// Spawn a new pod for a swarm sub-agent.
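    ///
    /// Usage sketch (the id and env var are illustrative; `None` selects the
    /// default image):
    ///
    /// ```ignore
    /// let mut env = std::collections::HashMap::new();
    /// env.insert("RUST_LOG".to_string(), "info".to_string());
    /// mgr.spawn_subagent_pod("a1b2c3d4e5f6", None, env).await?;
    /// ```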
    pub async fn spawn_subagent_pod(
        &self,
        subagent_id: &str,
        image: Option<&str>,
        env_vars: HashMap<String, String>,
    ) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let image = image.unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
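        // Keep the pod name short and a valid DNS-1123 name by using only the
        // first 8 bytes of the sub-agent id (ids are assumed to be ASCII;
        // slicing a multi-byte UTF-8 id at byte 8 would panic).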
        let pod_name = format!(
            "codetether-subagent-{}",
            &subagent_id[..8.min(subagent_id.len())]
        );

        let mut env_list: Vec<serde_json::Value> = env_vars
            .iter()
            .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
            .collect();
        env_list
            .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
        env_list.push(
            serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
        );

        let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": &self.namespace,
                "labels": {
                    "app": "codetether",
                    "codetether.run/role": "subagent",
                    "codetether.run/subagent-id": subagent_id
                }
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "agent",
                    "image": image,
                    "env": env_list,
                    "resources": {
                        "requests": { "memory": "256Mi", "cpu": "250m" },
                        "limits": { "memory": "1Gi", "cpu": "1000m" }
                    }
                }]
            }
        }))?;

        pods.create(&PostParams::default(), &pod_manifest)
            .await
            .with_context(|| format!("Failed to create sub-agent pod {}", pod_name))?;

        let action = DeployAction {
            action: format!("spawn_subagent:{}", subagent_id),
            success: true,
            message: format!(
                "Created sub-agent pod '{}' in namespace '{}'",
                pod_name, self.namespace
            ),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            pod = %pod_name,
            subagent_id = %subagent_id,
            "Self-deployment: spawned sub-agent pod"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// Delete a sub-agent pod.
    pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
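        // Re-derive the pod name exactly as spawn_subagent_pod does so the
        // delete targets the pod created for this sub-agent.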
        let pod_name = format!(
            "codetether-subagent-{}",
            &subagent_id[..8.min(subagent_id.len())]
        );

        pods.delete(&pod_name, &kube::api::DeleteParams::default())
            .await
            .with_context(|| format!("Failed to delete pod {}", pod_name))?;

        let action = DeployAction {
            action: format!("delete_subagent:{}", subagent_id),
            success: true,
            message: format!("Deleted sub-agent pod '{}'", pod_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// Get recent deployment actions, newest first.
    pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
        let actions = self.actions.read().await;
        actions.iter().rev().take(limit).cloned().collect()
    }

    async fn record_action(&self, action: DeployAction) {
        let mut actions = self.actions.write().await;
        actions.push(action);
        // Keep bounded.
        while actions.len() > 1000 {
            actions.remove(0);
        }
    }
}

/// Summary information about a pod.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodInfo {
    pub name: String,
    pub phase: String,
    pub ready: bool,
    pub start_time: Option<String>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn k8s_manager_initializes_without_cluster() {
        let mgr = K8sManager::new().await;
        // In CI/local dev, likely no K8s cluster available.
        let status = mgr.status().await;
        assert!(!status.namespace.is_empty());
    }
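
    // A sketch of a bounded-log test: record_action and the actions field are
    // private but reachable from this child module; no cluster is required.
    #[tokio::test]
    async fn action_log_is_bounded_and_newest_first() {
        let mgr = K8sManager::new().await;
        for i in 0..1005usize {
            mgr.record_action(DeployAction {
                action: format!("test:{}", i),
                success: true,
                message: String::new(),
                timestamp: Utc::now().to_rfc3339(),
            })
            .await;
        }
        let recent = mgr.recent_actions(2).await;
        assert_eq!(recent.len(), 2);
        // recent_actions returns newest first.
        assert_eq!(recent[0].action, "test:1004");
        // record_action trims the log to at most 1000 entries.
        assert_eq!(mgr.actions.read().await.len(), 1000);
    }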
}