Skip to main content

codetether_agent/k8s/
mod.rs

1//! Kubernetes self-deployment and pod management.
2//!
3//! Allows the CodeTether agent to manage its own lifecycle on Kubernetes:
4//! - Detect whether it is running inside a K8s cluster
5//! - Read its own pod/deployment metadata
6//! - Scale its own deployment replica count
7//! - Perform rolling restarts
8//! - Create new pods for swarm sub-agents
9//! - Monitor pod health and recover from failures
10//!
11//! All K8s operations are audit-logged.
12
13use anyhow::{Context, Result, anyhow};
14use chrono::Utc;
15use k8s_openapi::api::apps::v1::Deployment;
16use k8s_openapi::api::core::v1::Pod;
17use kube::{
18    Api, Client, Config as KubeConfig,
19    api::{ListParams, LogParams, Patch, PatchParams, PostParams},
20};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
/// Status of the K8s integration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct K8sStatus {
    /// Whether a K8s API client is available. NOTE(review): `status()` sets
    /// this from `client.is_some()`, so it is also true for a local
    /// kubeconfig-based client, not only when actually in-cluster.
    pub in_cluster: bool,
    /// Current namespace used for all API calls.
    pub namespace: String,
    /// This pod's name (if detectable from the environment).
    pub pod_name: Option<String>,
    /// Deployment name managing this pod (if detectable).
    pub deployment_name: Option<String>,
    /// Desired replica count from the deployment spec (when known).
    pub replicas: Option<i32>,
    /// Available replica count from the deployment status (when known).
    pub available_replicas: Option<i32>,
}
43
/// Result of a self-deployment action, recorded in the in-memory audit log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployAction {
    /// Machine-readable tag, e.g. "scale:3", "rolling_restart",
    /// "spawn_subagent:<id>", "delete_subagent:<id>".
    pub action: String,
    /// Whether the action succeeded. In this module only successful actions
    /// are recorded (failures return an error before `record_action`).
    pub success: bool,
    /// Human-readable description of what happened.
    pub message: String,
    /// RFC 3339 timestamp of when the action was recorded.
    pub timestamp: String,
}
52
/// Kubernetes pod launch options for a sub-agent.
///
/// All fields are optional; `spawn_subagent_pod_with_spec` applies defaults
/// for anything left unset.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubagentPodSpec {
    /// Container image; defaults to the published codetether-agent image.
    #[serde(default)]
    pub image: Option<String>,
    /// Extra environment variables injected into the container.
    #[serde(default)]
    pub env_vars: HashMap<String, String>,
    /// Extra pod labels; inserted after the defaults, so these can override
    /// the built-in "app"/"codetether.run/*" labels.
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Container entrypoint override (pod `command`).
    #[serde(default)]
    pub command: Option<Vec<String>>,
    /// Container arguments override (pod `args`).
    #[serde(default)]
    pub args: Option<Vec<String>>,
}
67
/// Summary of a running/completed sub-agent pod.
///
/// Derived from the pod's first container status only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubagentPodState {
    /// Deterministic pod name derived from the sub-agent id.
    pub pod_name: String,
    /// Pod phase string (e.g. "Pending", "Running"); "Unknown" when absent.
    pub phase: String,
    /// True when a "Ready" condition with status "True" is present.
    pub ready: bool,
    /// True when the first container reports a terminated state.
    pub terminated: bool,
    /// Exit code of the first container, when terminated.
    pub exit_code: Option<i32>,
    /// Termination reason, falling back to the waiting-state reason.
    pub reason: Option<String>,
    /// Restart count of the first container (clamped to non-negative).
    pub restart_count: u32,
}
79
/// Kubernetes self-deployment manager.
///
/// Cheap to clone: the audit log is shared via `Arc`, everything else is
/// small identity metadata.
#[derive(Clone)]
pub struct K8sManager {
    /// K8s API client; `None` means K8s features are disabled entirely.
    client: Option<Client>,
    /// Namespace used for every API call.
    namespace: String,
    /// This pod's name, resolved from the environment at startup.
    pod_name: Option<String>,
    /// Owning deployment, from CODETETHER_DEPLOYMENT_NAME.
    deployment_name: Option<String>,
    /// Bounded in-memory audit log of deployment actions (shared by clones).
    actions: Arc<RwLock<Vec<DeployAction>>>,
}
89
// Hand-written Debug — presumably because `Client` has no useful `Debug`;
// the client is collapsed into a boolean "connected" field.
impl std::fmt::Debug for K8sManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("K8sManager")
            .field("namespace", &self.namespace)
            .field("pod_name", &self.pod_name)
            .field("deployment_name", &self.deployment_name)
            .field("connected", &self.client.is_some())
            .finish()
    }
}
100
101impl K8sManager {
102    pub fn subagent_pod_name(subagent_id: &str) -> String {
103        // DNS-1123 label: lowercase alphanumeric or '-', max 63 chars.
104        let mut sanitized: String = subagent_id
105            .to_ascii_lowercase()
106            .chars()
107            .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
108            .collect();
109        sanitized = sanitized.trim_matches('-').to_string();
110        if sanitized.is_empty() {
111            sanitized = "subagent".to_string();
112        }
113
114        let mut hasher = Sha256::new();
115        hasher.update(subagent_id.as_bytes());
116        let hash_hex = hex::encode(hasher.finalize());
117        let hash_suffix = &hash_hex[..10];
118
119        // "codetether-subagent-" + "{name}" + "-" + "{hash}"
120        const PREFIX: &str = "codetether-subagent-";
121        let max_name_len = 63usize
122            .saturating_sub(PREFIX.len())
123            .saturating_sub(1)
124            .saturating_sub(hash_suffix.len());
125        let mut name_part: String = sanitized.chars().take(max_name_len.max(1)).collect();
126        name_part = name_part.trim_matches('-').to_string();
127        if name_part.is_empty() {
128            name_part = "subagent".to_string();
129        }
130
131        format!("{PREFIX}{name_part}-{hash_suffix}")
132    }
133
134    /// Attempt to initialize from in-cluster configuration.
135    /// Returns a manager even if not running in K8s (with client = None).
136    pub async fn new() -> Self {
137        let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
138            .or_else(|_| Self::read_namespace_file())
139            .unwrap_or_else(|_| "default".to_string());
140
141        let pod_name = std::env::var("HOSTNAME")
142            .ok()
143            .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());
144
145        let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();
146
147        let client = match KubeConfig::incluster() {
148            Ok(config) => match Client::try_from(config) {
149                Ok(c) => {
150                    tracing::info!(
151                        namespace = %namespace,
152                        pod = pod_name.as_deref().unwrap_or("-"),
153                        "K8s client initialized (in-cluster)"
154                    );
155                    Some(c)
156                }
157                Err(e) => {
158                    tracing::debug!("Failed to create in-cluster K8s client: {}", e);
159                    None
160                }
161            },
162            Err(_) => {
163                // Try loading from KUBECONFIG for local development.
164                match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
165                {
166                    Ok(config) => match Client::try_from(config) {
167                        Ok(c) => {
168                            tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
169                            Some(c)
170                        }
171                        Err(e) => {
172                            tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
173                            None
174                        }
175                    },
176                    Err(_) => {
177                        tracing::debug!(
178                            "Not running in K8s and no kubeconfig found — K8s features disabled"
179                        );
180                        None
181                    }
182                }
183            }
184        };
185
186        Self {
187            client,
188            namespace,
189            pod_name,
190            deployment_name,
191            actions: Arc::new(RwLock::new(Vec::new())),
192        }
193    }
194
195    /// Read /var/run/secrets/kubernetes.io/serviceaccount/namespace
196    fn read_namespace_file() -> Result<String, std::env::VarError> {
197        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
198            .map(|s| s.trim().to_string())
199            .map_err(|_| std::env::VarError::NotPresent)
200    }
201
    /// Whether K8s integration is available (an API client was initialized,
    /// either in-cluster or from a kubeconfig).
    pub fn is_available(&self) -> bool {
        self.client.is_some()
    }
206
207    /// Get current status.
208    pub async fn status(&self) -> K8sStatus {
209        let (replicas, available) = if let Some(ref client) = self.client {
210            if let Some(ref dep_name) = self.deployment_name {
211                self.get_deployment_replicas(client, dep_name).await
212            } else {
213                (None, None)
214            }
215        } else {
216            (None, None)
217        };
218
219        K8sStatus {
220            in_cluster: self.client.is_some(),
221            namespace: self.namespace.clone(),
222            pod_name: self.pod_name.clone(),
223            deployment_name: self.deployment_name.clone(),
224            replicas,
225            available_replicas: available,
226        }
227    }
228
229    async fn get_deployment_replicas(
230        &self,
231        client: &Client,
232        name: &str,
233    ) -> (Option<i32>, Option<i32>) {
234        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
235        match deployments.get(name).await {
236            Ok(dep) => {
237                let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
238                let available = dep.status.as_ref().and_then(|s| s.available_replicas);
239                (spec_replicas, available)
240            }
241            Err(e) => {
242                tracing::warn!("Failed to get deployment {}: {}", name, e);
243                (None, None)
244            }
245        }
246    }
247
248    /// Scale the agent's own deployment.
249    pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
250        let client = self
251            .client
252            .as_ref()
253            .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
254        let dep_name = self
255            .deployment_name
256            .as_ref()
257            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;
258
259        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
260
261        let patch = serde_json::json!({
262            "spec": {
263                "replicas": replicas
264            }
265        });
266
267        deployments
268            .patch(
269                dep_name,
270                &PatchParams::apply("codetether"),
271                &Patch::Merge(&patch),
272            )
273            .await
274            .with_context(|| {
275                format!(
276                    "Failed to scale deployment {} to {} replicas",
277                    dep_name, replicas
278                )
279            })?;
280
281        let action = DeployAction {
282            action: format!("scale:{}", replicas),
283            success: true,
284            message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
285            timestamp: Utc::now().to_rfc3339(),
286        };
287
288        tracing::info!(
289            deployment = %dep_name,
290            replicas = replicas,
291            "Self-deployment: scaled"
292        );
293
294        self.record_action(action.clone()).await;
295        Ok(action)
296    }
297
298    /// Perform a rolling restart of the agent's deployment.
299    pub async fn rolling_restart(&self) -> Result<DeployAction> {
300        let client = self
301            .client
302            .as_ref()
303            .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
304        let dep_name = self
305            .deployment_name
306            .as_ref()
307            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;
308
309        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
310
311        // Trigger rolling restart by updating the restart annotation.
312        let patch = serde_json::json!({
313            "spec": {
314                "template": {
315                    "metadata": {
316                        "annotations": {
317                            "codetether.run/restartedAt": Utc::now().to_rfc3339()
318                        }
319                    }
320                }
321            }
322        });
323
324        deployments
325            .patch(
326                dep_name,
327                &PatchParams::apply("codetether"),
328                &Patch::Merge(&patch),
329            )
330            .await
331            .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;
332
333        let action = DeployAction {
334            action: "rolling_restart".to_string(),
335            success: true,
336            message: format!("Triggered rolling restart for deployment '{}'", dep_name),
337            timestamp: Utc::now().to_rfc3339(),
338        };
339
340        tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");
341
342        self.record_action(action.clone()).await;
343        Ok(action)
344    }
345
346    /// List pods belonging to our deployment.
347    pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
348        let client = self
349            .client
350            .as_ref()
351            .ok_or_else(|| anyhow!("K8s client not available"))?;
352
353        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
354
355        let label_selector = self
356            .deployment_name
357            .as_ref()
358            .map(|n| format!("app={}", n))
359            .unwrap_or_else(|| "app=codetether".to_string());
360
361        let list = pods
362            .list(&ListParams::default().labels(&label_selector))
363            .await
364            .context("Failed to list pods")?;
365
366        let infos: Vec<PodInfo> = list
367            .items
368            .iter()
369            .map(|pod| {
370                let name = pod.metadata.name.clone().unwrap_or_default();
371                let phase = pod
372                    .status
373                    .as_ref()
374                    .and_then(|s| s.phase.clone())
375                    .unwrap_or_else(|| "Unknown".to_string());
376                let ready = pod
377                    .status
378                    .as_ref()
379                    .and_then(|s| s.conditions.as_ref())
380                    .map(|conditions| {
381                        conditions
382                            .iter()
383                            .any(|c| c.type_ == "Ready" && c.status == "True")
384                    })
385                    .unwrap_or(false);
386                let start_time = pod
387                    .status
388                    .as_ref()
389                    .and_then(|s| s.start_time.as_ref())
390                    .map(|t| t.0.to_string());
391
392                PodInfo {
393                    name,
394                    phase,
395                    ready,
396                    start_time,
397                }
398            })
399            .collect();
400
401        Ok(infos)
402    }
403
404    /// Spawn a new pod for a swarm sub-agent.
405    pub async fn spawn_subagent_pod(
406        &self,
407        subagent_id: &str,
408        image: Option<&str>,
409        env_vars: HashMap<String, String>,
410    ) -> Result<DeployAction> {
411        self.spawn_subagent_pod_with_spec(
412            subagent_id,
413            SubagentPodSpec {
414                image: image.map(ToString::to_string),
415                env_vars,
416                ..SubagentPodSpec::default()
417            },
418        )
419        .await
420    }
421
    /// Spawn a new pod for a swarm sub-agent with full pod options.
    ///
    /// The pod name is derived deterministically from `subagent_id`
    /// (see [`Self::subagent_pod_name`]). The sub-agent id and namespace are
    /// always injected as env vars on top of `spec.env_vars`. If a pod with
    /// the same name already exists (HTTP 409), the stale pod is deleted and
    /// creation is retried once.
    pub async fn spawn_subagent_pod_with_spec(
        &self,
        subagent_id: &str,
        spec: SubagentPodSpec,
    ) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        // Default to the agent's own published image when none is given.
        let image = spec
            .image
            .as_deref()
            .unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
        let pod_name = Self::subagent_pod_name(subagent_id);

        // Caller-supplied env vars first, then the two identity vars the
        // sub-agent needs to find itself and its namespace.
        let mut env_list: Vec<serde_json::Value> = spec
            .env_vars
            .iter()
            .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
            .collect();
        env_list
            .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
        env_list.push(
            serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
        );

        // Base labels; the subagent-id is sanitized to a valid label value.
        let mut labels = serde_json::json!({
            "app": "codetether",
            "codetether.run/role": "subagent",
            "codetether.run/subagent-id": sanitize_label_value(subagent_id)
        });

        // User labels are inserted after the defaults, so they may override
        // the built-in keys.
        if let Some(map) = labels.as_object_mut() {
            for (k, v) in &spec.labels {
                map.insert(k.clone(), serde_json::json!(v));
            }
        }

        // restartPolicy "Never": a finished sub-agent pod stays around for
        // state/log inspection instead of being restarted by the kubelet.
        let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": &self.namespace,
                "labels": labels
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "agent",
                    "image": image,
                    "env": env_list,
                    "command": spec.command,
                    "args": spec.args,
                    "resources": {
                        "requests": { "memory": "256Mi", "cpu": "250m" },
                        "limits": { "memory": "1Gi", "cpu": "1000m" }
                    }
                }]
            }
        }))?;

        match pods.create(&PostParams::default(), &pod_manifest).await {
            Ok(_) => {}
            // 409 Conflict: a pod with this name already exists (e.g. a
            // previous run of the same sub-agent). Delete it and retry once;
            // the delete error is deliberately ignored (best-effort), and the
            // short sleep gives the API server time to process the deletion.
            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
                tracing::warn!(
                    pod = %pod_name,
                    subagent_id = %subagent_id,
                    "Sub-agent pod already exists, deleting stale pod and retrying create"
                );
                let _ = pods
                    .delete(&pod_name, &kube::api::DeleteParams::default())
                    .await;
                tokio::time::sleep(std::time::Duration::from_millis(600)).await;
                pods.create(&PostParams::default(), &pod_manifest)
                    .await
                    .with_context(|| {
                        format!("Failed to create sub-agent pod {} after retry", pod_name)
                    })?;
            }
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("Failed to create sub-agent pod {pod_name}"));
            }
        }

        let action = DeployAction {
            action: format!("spawn_subagent:{}", subagent_id),
            success: true,
            message: format!(
                "Created sub-agent pod '{}' in namespace '{}'",
                pod_name, self.namespace
            ),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            pod = %pod_name,
            subagent_id = %subagent_id,
            "Self-deployment: spawned sub-agent pod"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }
531
532    /// Delete a sub-agent pod.
533    pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
534        let client = self
535            .client
536            .as_ref()
537            .ok_or_else(|| anyhow!("K8s client not available"))?;
538
539        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
540        let pod_name = Self::subagent_pod_name(subagent_id);
541
542        match pods
543            .delete(&pod_name, &kube::api::DeleteParams::default())
544            .await
545        {
546            Ok(_) => {}
547            Err(kube::Error::Api(api_err)) if api_err.code == 404 => {
548                tracing::debug!(
549                    pod = %pod_name,
550                    subagent_id = %subagent_id,
551                    "Sub-agent pod already deleted"
552                );
553            }
554            Err(e) => {
555                return Err(e).with_context(|| format!("Failed to delete pod {}", pod_name));
556            }
557        }
558
559        let action = DeployAction {
560            action: format!("delete_subagent:{}", subagent_id),
561            success: true,
562            message: format!("Deleted sub-agent pod '{}'", pod_name),
563            timestamp: Utc::now().to_rfc3339(),
564        };
565
566        self.record_action(action.clone()).await;
567        Ok(action)
568    }
569
570    /// Get pod state for a sub-agent.
571    pub async fn get_subagent_pod_state(
572        &self,
573        subagent_id: &str,
574    ) -> Result<Option<SubagentPodState>> {
575        let client = self
576            .client
577            .as_ref()
578            .ok_or_else(|| anyhow!("K8s client not available"))?;
579
580        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
581        let pod_name = Self::subagent_pod_name(subagent_id);
582
583        let pod = match pods.get_opt(&pod_name).await {
584            Ok(p) => p,
585            Err(e) => {
586                tracing::warn!(pod = %pod_name, error = %e, "Failed to fetch sub-agent pod state");
587                return Ok(None);
588            }
589        };
590
591        let Some(pod) = pod else {
592            return Ok(None);
593        };
594
595        let phase = pod
596            .status
597            .as_ref()
598            .and_then(|s| s.phase.clone())
599            .unwrap_or_else(|| "Unknown".to_string());
600        let ready = pod
601            .status
602            .as_ref()
603            .and_then(|s| s.conditions.as_ref())
604            .map(|conditions| {
605                conditions
606                    .iter()
607                    .any(|c| c.type_ == "Ready" && c.status == "True")
608            })
609            .unwrap_or(false);
610
611        let container_status = pod
612            .status
613            .as_ref()
614            .and_then(|s| s.container_statuses.as_ref())
615            .and_then(|statuses| statuses.first());
616        let terminated = container_status
617            .and_then(|status| status.state.as_ref())
618            .and_then(|state| state.terminated.as_ref())
619            .is_some();
620        let exit_code = container_status
621            .and_then(|status| status.state.as_ref())
622            .and_then(|state| state.terminated.as_ref())
623            .map(|terminated| terminated.exit_code);
624        let reason = container_status
625            .and_then(|status| status.state.as_ref())
626            .and_then(|state| {
627                state
628                    .terminated
629                    .as_ref()
630                    .and_then(|t| t.reason.clone())
631                    .or_else(|| state.waiting.as_ref().and_then(|w| w.reason.clone()))
632            });
633        let restart_count = container_status
634            .map(|status| status.restart_count.max(0) as u32)
635            .unwrap_or(0);
636
637        Ok(Some(SubagentPodState {
638            pod_name,
639            phase,
640            ready,
641            terminated,
642            exit_code,
643            reason,
644            restart_count,
645        }))
646    }
647
648    /// Fetch recent logs for a sub-agent pod.
649    pub async fn subagent_logs(&self, subagent_id: &str, tail_lines: i64) -> Result<String> {
650        let client = self
651            .client
652            .as_ref()
653            .ok_or_else(|| anyhow!("K8s client not available"))?;
654        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
655        let pod_name = Self::subagent_pod_name(subagent_id);
656        let params = LogParams {
657            tail_lines: Some(tail_lines),
658            ..LogParams::default()
659        };
660        pods.logs(&pod_name, &params)
661            .await
662            .with_context(|| format!("Failed to fetch logs for sub-agent pod {pod_name}"))
663    }
664
665    /// Get recent deployment actions.
666    pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
667        let actions = self.actions.read().await;
668        actions.iter().rev().take(limit).cloned().collect()
669    }
670
671    async fn record_action(&self, action: DeployAction) {
672        let mut actions = self.actions.write().await;
673        actions.push(action);
674        // Keep bounded.
675        while actions.len() > 1000 {
676            actions.remove(0);
677        }
678    }
679}
680
/// Sanitize an arbitrary string into a valid Kubernetes label value:
/// keep only ASCII alphanumerics plus `-`, `_`, `.`, cap at 63 chars, and
/// strip non-alphanumeric characters from both ends. Empty results fall
/// back to "subagent".
fn sanitize_label_value(input: &str) -> String {
    let kept: String = input
        .chars()
        .filter(|&c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.'))
        .take(63)
        .collect();
    let trimmed = kept.trim_matches(|c: char| c == '-' || c == '_' || c == '.');
    if trimmed.is_empty() {
        "subagent".to_string()
    } else {
        trimmed.to_string()
    }
}
696
/// Summary information about a pod, as returned by [`K8sManager::list_pods`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodInfo {
    /// Pod name (empty string when the metadata carries no name).
    pub name: String,
    /// Pod phase, or "Unknown" when the status reports none.
    pub phase: String,
    /// True when a "Ready" condition with status "True" is present.
    pub ready: bool,
    /// Pod start time rendered as a string, when reported.
    pub start_time: Option<String>,
}
705
#[cfg(test)]
mod tests {
    use super::*;

    /// Pod names must be DNS-1123 safe and stable for a given id.
    #[test]
    fn subagent_pod_name_is_sanitized_and_stable() {
        let pod_name = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert!(pod_name.starts_with("codetether-subagent-"));
        assert!(pod_name.len() <= 63);
        assert!(
            pod_name
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
        );

        // Deterministic: the same id must always map to the same pod name.
        assert_eq!(pod_name, K8sManager::subagent_pod_name("SubTask_ABC/123"));
    }

    /// Ids sharing a long common prefix must still map to distinct names —
    /// the hash suffix disambiguates after truncation.
    #[test]
    fn subagent_pod_name_avoids_prefix_collisions() {
        let a = K8sManager::subagent_pod_name("subtask-aaaaaaaa-1111");
        let b = K8sManager::subagent_pod_name("subtask-aaaaaaaa-2222");
        assert_ne!(a, b);
    }

    /// The manager must construct cleanly even with no cluster reachable.
    #[tokio::test]
    async fn k8s_manager_initializes_without_cluster() {
        let mgr = K8sManager::new().await;
        // In CI/local dev, likely no K8s cluster available.
        let status = mgr.status().await;
        assert!(!status.namespace.is_empty());
    }
}
739}