// codetether_agent/k8s/mod.rs
//! Kubernetes self-deployment and pod management.
//!
//! Allows the CodeTether agent to manage its own lifecycle on Kubernetes:
//! - Detect whether it is running inside a K8s cluster
//! - Read its own pod/deployment metadata
//! - Scale its own deployment replica count
//! - Perform rolling restarts
//! - Create new pods for swarm sub-agents
//! - Monitor pod health and recover from failures
//!
//! All K8s operations are audit-logged.

use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::Pod;
use kube::{
    Api, Client, Config as KubeConfig,
    api::{ListParams, LogParams, Patch, PatchParams, PostParams},
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

/// Status of the K8s integration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct K8sStatus {
    /// Whether we are running inside a K8s cluster (i.e. a client was created).
    pub in_cluster: bool,
    /// Current namespace.
    pub namespace: String,
    /// This pod's name (if detectable).
    pub pod_name: Option<String>,
    /// Deployment name managing this pod (if detectable).
    pub deployment_name: Option<String>,
    /// Current replica count.
    pub replicas: Option<i32>,
    /// Available replica count.
    pub available_replicas: Option<i32>,
}

/// Result of a self-deployment action, recorded in the audit log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployAction {
    /// Action identifier, e.g. "scale:3", "rolling_restart", "spawn_subagent:<id>".
    pub action: String,
    /// Whether the action succeeded.
    pub success: bool,
    /// Human-readable description of what happened.
    pub message: String,
    /// RFC 3339 timestamp of when the action was recorded.
    pub timestamp: String,
}

/// Kubernetes pod launch options for a sub-agent.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubagentPodSpec {
    /// Container image; a default agent image is used when `None`.
    #[serde(default)]
    pub image: Option<String>,
    /// Extra environment variables injected into the container.
    #[serde(default)]
    pub env_vars: HashMap<String, String>,
    /// Extra pod labels, merged over (and able to override) the default label set.
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Container entrypoint override (K8s `command`).
    #[serde(default)]
    pub command: Option<Vec<String>>,
    /// Container arguments override (K8s `args`).
    #[serde(default)]
    pub args: Option<Vec<String>>,
}

/// Summary of a running/completed sub-agent pod.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubagentPodState {
    /// Pod name (derived deterministically from the sub-agent id).
    pub pod_name: String,
    /// Pod phase as reported by K8s (e.g. "Pending", "Running", "Succeeded"),
    /// or "Unknown" when the status carries no phase.
    pub phase: String,
    /// Whether the pod's "Ready" condition is "True".
    pub ready: bool,
    /// Whether the first container reports a terminated state.
    pub terminated: bool,
    /// Exit code of the first container, if terminated.
    pub exit_code: Option<i32>,
    /// Termination reason, falling back to the waiting reason, if any.
    pub reason: Option<String>,
    /// Restart count of the first container (clamped to non-negative).
    pub restart_count: u32,
}

/// Kubernetes self-deployment manager.
#[derive(Clone)]
pub struct K8sManager {
    /// `None` when no cluster/kubeconfig is reachable; all K8s ops then error.
    client: Option<Client>,
    /// Namespace used for all API calls.
    namespace: String,
    /// This pod's name, if detectable from the environment.
    pod_name: Option<String>,
    /// Deployment managing this pod (from CODETETHER_DEPLOYMENT_NAME).
    deployment_name: Option<String>,
    /// Audit log of deployment actions, bounded to the most recent 1000.
    actions: Arc<RwLock<Vec<DeployAction>>>,
}

90impl std::fmt::Debug for K8sManager {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("K8sManager")
93            .field("namespace", &self.namespace)
94            .field("pod_name", &self.pod_name)
95            .field("deployment_name", &self.deployment_name)
96            .field("connected", &self.client.is_some())
97            .finish()
98    }
99}
100
impl K8sManager {
    /// Derive a deterministic, DNS-1123-compatible pod name for a sub-agent.
    ///
    /// Shape: `codetether-subagent-{sanitized-id}-{hash}` where the hash is
    /// the first 10 hex chars of SHA-256 over the *original* id, so two ids
    /// that sanitize to the same text still get distinct pod names. The
    /// result always fits the 63-character name limit.
    pub fn subagent_pod_name(subagent_id: &str) -> String {
        // DNS-1123 label: lowercase alphanumeric or '-', max 63 chars.
        let mut sanitized: String = subagent_id
            .to_ascii_lowercase()
            .chars()
            .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
            .collect();
        sanitized = sanitized.trim_matches('-').to_string();
        if sanitized.is_empty() {
            sanitized = "subagent".to_string();
        }

        // Hash the raw id (pre-sanitization) so collisions introduced by
        // sanitization/truncation are still disambiguated.
        let mut hasher = Sha256::new();
        hasher.update(subagent_id.as_bytes());
        let hash_hex = hex::encode(hasher.finalize());
        let hash_suffix = &hash_hex[..10];

        // "codetether-subagent-" + "{name}" + "-" + "{hash}"
        const PREFIX: &str = "codetether-subagent-";
        let max_name_len = 63usize
            .saturating_sub(PREFIX.len())
            .saturating_sub(1)
            .saturating_sub(hash_suffix.len());
        let mut name_part: String = sanitized.chars().take(max_name_len.max(1)).collect();
        // Truncation can leave a trailing '-', which DNS-1123 forbids.
        name_part = name_part.trim_matches('-').to_string();
        if name_part.is_empty() {
            name_part = "subagent".to_string();
        }

        format!("{PREFIX}{name_part}-{hash_suffix}")
    }

    /// Attempt to initialize from in-cluster configuration.
    /// Returns a manager even if not running in K8s (with client = None).
    pub async fn new() -> Self {
        // rustls 0.23+ requires selecting a process-level crypto provider.
        // This path is exercised by unit tests (and library users) without
        // going through the binary's startup initialization.
        crate::tls::ensure_rustls_crypto_provider();

        // Namespace: explicit env var, then the service-account file,
        // then "default".
        let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
            .or_else(|_| Self::read_namespace_file())
            .unwrap_or_else(|_| "default".to_string());

        // NOTE(review): HOSTNAME takes precedence over the explicit
        // CODETETHER_POD_NAME override here — confirm that is intended.
        let pod_name = std::env::var("HOSTNAME")
            .ok()
            .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());

        let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();

        // Prefer in-cluster config; fall back to a local kubeconfig for dev;
        // otherwise run with no client (K8s features disabled).
        let client = match KubeConfig::incluster() {
            Ok(config) => match Client::try_from(config) {
                Ok(c) => {
                    tracing::info!(
                        namespace = %namespace,
                        pod = pod_name.as_deref().unwrap_or("-"),
                        "K8s client initialized (in-cluster)"
                    );
                    Some(c)
                }
                Err(e) => {
                    tracing::debug!("Failed to create in-cluster K8s client: {}", e);
                    None
                }
            },
            Err(_) => {
                // Try loading from KUBECONFIG for local development.
                match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
                {
                    Ok(config) => match Client::try_from(config) {
                        Ok(c) => {
                            tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
                            Some(c)
                        }
                        Err(e) => {
                            tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
                            None
                        }
                    },
                    Err(_) => {
                        tracing::debug!(
                            "Not running in K8s and no kubeconfig found — K8s features disabled"
                        );
                        None
                    }
                }
            }
        };

        Self {
            client,
            namespace,
            pod_name,
            deployment_name,
            actions: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Read /var/run/secrets/kubernetes.io/serviceaccount/namespace
    ///
    /// The I/O error is collapsed into `VarError::NotPresent` so this can be
    /// chained with `std::env::var(...).or_else(...)` above.
    fn read_namespace_file() -> Result<String, std::env::VarError> {
        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
            .map(|s| s.trim().to_string())
            .map_err(|_| std::env::VarError::NotPresent)
    }

    /// Whether K8s integration is available.
    pub fn is_available(&self) -> bool {
        self.client.is_some()
    }

    /// Get current status.
    ///
    /// Replica counts are only populated when both a client and a deployment
    /// name are available; lookup failures degrade to `None`, never error.
    pub async fn status(&self) -> K8sStatus {
        let (replicas, available) = if let Some(ref client) = self.client {
            if let Some(ref dep_name) = self.deployment_name {
                self.get_deployment_replicas(client, dep_name).await
            } else {
                (None, None)
            }
        } else {
            (None, None)
        };

        K8sStatus {
            in_cluster: self.client.is_some(),
            namespace: self.namespace.clone(),
            pod_name: self.pod_name.clone(),
            deployment_name: self.deployment_name.clone(),
            replicas,
            available_replicas: available,
        }
    }

    /// Fetch (spec replicas, available replicas) for a deployment.
    /// Errors are logged and mapped to `(None, None)`.
    async fn get_deployment_replicas(
        &self,
        client: &Client,
        name: &str,
    ) -> (Option<i32>, Option<i32>) {
        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
        match deployments.get(name).await {
            Ok(dep) => {
                let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
                let available = dep.status.as_ref().and_then(|s| s.available_replicas);
                (spec_replicas, available)
            }
            Err(e) => {
                tracing::warn!("Failed to get deployment {}: {}", name, e);
                (None, None)
            }
        }
    }

    /// Scale the agent's own deployment.
    ///
    /// # Errors
    /// Fails when no client is available, no deployment name is configured,
    /// or the patch request is rejected by the API server.
    pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        // Merge-patch only the replica count, leaving the rest of the spec alone.
        let patch = serde_json::json!({
            "spec": {
                "replicas": replicas
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to scale deployment {} to {} replicas",
                    dep_name, replicas
                )
            })?;

        let action = DeployAction {
            action: format!("scale:{}", replicas),
            success: true,
            message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            deployment = %dep_name,
            replicas = replicas,
            "Self-deployment: scaled"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// Perform a rolling restart of the agent's deployment.
    ///
    /// Works like `kubectl rollout restart`: patching a pod-template
    /// annotation changes the template hash, which makes the deployment
    /// controller roll new pods.
    pub async fn rolling_restart(&self) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        // Trigger rolling restart by updating the restart annotation.
        let patch = serde_json::json!({
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "codetether.run/restartedAt": Utc::now().to_rfc3339()
                        }
                    }
                }
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;

        let action = DeployAction {
            action: "rolling_restart".to_string(),
            success: true,
            message: format!("Triggered rolling restart for deployment '{}'", dep_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// List pods belonging to our deployment.
    ///
    /// Selects by `app=<deployment_name>` (falling back to `app=codetether`),
    /// so pods are only found if they carry that label.
    pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let label_selector = self
            .deployment_name
            .as_ref()
            .map(|n| format!("app={}", n))
            .unwrap_or_else(|| "app=codetether".to_string());

        let list = pods
            .list(&ListParams::default().labels(&label_selector))
            .await
            .context("Failed to list pods")?;

        let infos: Vec<PodInfo> = list
            .items
            .iter()
            .map(|pod| {
                let name = pod.metadata.name.clone().unwrap_or_default();
                let phase = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.phase.clone())
                    .unwrap_or_else(|| "Unknown".to_string());
                // "Ready" means the pod condition of type "Ready" is "True".
                let ready = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.conditions.as_ref())
                    .map(|conditions| {
                        conditions
                            .iter()
                            .any(|c| c.type_ == "Ready" && c.status == "True")
                    })
                    .unwrap_or(false);
                let start_time = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.start_time.as_ref())
                    .map(|t| t.0.to_string());

                PodInfo {
                    name,
                    phase,
                    ready,
                    start_time,
                }
            })
            .collect();

        Ok(infos)
    }

    #[allow(dead_code)]
    /// Spawn a new pod for a swarm sub-agent.
    ///
    /// Convenience wrapper around [`Self::spawn_subagent_pod_with_spec`] for
    /// callers that only customize the image and environment.
    pub async fn spawn_subagent_pod(
        &self,
        subagent_id: &str,
        image: Option<&str>,
        env_vars: HashMap<String, String>,
    ) -> Result<DeployAction> {
        self.spawn_subagent_pod_with_spec(
            subagent_id,
            SubagentPodSpec {
                image: image.map(ToString::to_string),
                env_vars,
                ..SubagentPodSpec::default()
            },
        )
        .await
    }

    /// Spawn a new pod for a swarm sub-agent with full pod options.
    ///
    /// If a pod with the same (deterministic) name already exists (HTTP 409),
    /// the stale pod is deleted and creation is retried once.
    pub async fn spawn_subagent_pod_with_spec(
        &self,
        subagent_id: &str,
        spec: SubagentPodSpec,
    ) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let image = spec
            .image
            .as_deref()
            .unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
        let pod_name = Self::subagent_pod_name(subagent_id);

        // Caller env vars first, then the identifiers the sub-agent needs.
        let mut env_list: Vec<serde_json::Value> = spec
            .env_vars
            .iter()
            .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
            .collect();
        env_list
            .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
        env_list.push(
            serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
        );

        let mut labels = serde_json::json!({
            "app": "codetether",
            "codetether.run/role": "subagent",
            "codetether.run/subagent-id": sanitize_label_value(subagent_id)
        });

        // Caller-supplied labels are merged last and may override defaults.
        if let Some(map) = labels.as_object_mut() {
            for (k, v) in &spec.labels {
                map.insert(k.clone(), serde_json::json!(v));
            }
        }

        // restartPolicy "Never": a sub-agent runs to completion; recovery is
        // handled by the manager, not by kubelet restarts.
        let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": &self.namespace,
                "labels": labels
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "agent",
                    "image": image,
                    "env": env_list,
                    "command": spec.command,
                    "args": spec.args,
                    "resources": {
                        "requests": { "memory": "256Mi", "cpu": "250m" },
                        "limits": { "memory": "1Gi", "cpu": "1000m" }
                    }
                }]
            }
        }))?;

        match pods.create(&PostParams::default(), &pod_manifest).await {
            Ok(_) => {}
            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
                // Name conflict: delete the stale pod, wait briefly for the
                // API server to process the deletion, then retry once.
                tracing::warn!(
                    pod = %pod_name,
                    subagent_id = %subagent_id,
                    "Sub-agent pod already exists, deleting stale pod and retrying create"
                );
                let _ = pods
                    .delete(&pod_name, &kube::api::DeleteParams::default())
                    .await;
                tokio::time::sleep(std::time::Duration::from_millis(600)).await;
                pods.create(&PostParams::default(), &pod_manifest)
                    .await
                    .with_context(|| {
                        format!("Failed to create sub-agent pod {} after retry", pod_name)
                    })?;
            }
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("Failed to create sub-agent pod {pod_name}"));
            }
        }

        let action = DeployAction {
            action: format!("spawn_subagent:{}", subagent_id),
            success: true,
            message: format!(
                "Created sub-agent pod '{}' in namespace '{}'",
                pod_name, self.namespace
            ),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            pod = %pod_name,
            subagent_id = %subagent_id,
            "Self-deployment: spawned sub-agent pod"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// Delete a sub-agent pod.
    ///
    /// Idempotent: a 404 (already gone) is treated as success.
    pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
        let pod_name = Self::subagent_pod_name(subagent_id);

        match pods
            .delete(&pod_name, &kube::api::DeleteParams::default())
            .await
        {
            Ok(_) => {}
            Err(kube::Error::Api(api_err)) if api_err.code == 404 => {
                tracing::debug!(
                    pod = %pod_name,
                    subagent_id = %subagent_id,
                    "Sub-agent pod already deleted"
                );
            }
            Err(e) => {
                return Err(e).with_context(|| format!("Failed to delete pod {}", pod_name));
            }
        }

        let action = DeployAction {
            action: format!("delete_subagent:{}", subagent_id),
            success: true,
            message: format!("Deleted sub-agent pod '{}'", pod_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        self.record_action(action.clone()).await;
        Ok(action)
    }

    /// Get pod state for a sub-agent.
    ///
    /// Returns `Ok(None)` when the pod does not exist *or* when the lookup
    /// fails (the failure is logged, not surfaced). Container-level fields
    /// are taken from the first container status only.
    pub async fn get_subagent_pod_state(
        &self,
        subagent_id: &str,
    ) -> Result<Option<SubagentPodState>> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
        let pod_name = Self::subagent_pod_name(subagent_id);

        let pod = match pods.get_opt(&pod_name).await {
            Ok(p) => p,
            Err(e) => {
                tracing::warn!(pod = %pod_name, error = %e, "Failed to fetch sub-agent pod state");
                return Ok(None);
            }
        };

        let Some(pod) = pod else {
            return Ok(None);
        };

        let phase = pod
            .status
            .as_ref()
            .and_then(|s| s.phase.clone())
            .unwrap_or_else(|| "Unknown".to_string());
        let ready = pod
            .status
            .as_ref()
            .and_then(|s| s.conditions.as_ref())
            .map(|conditions| {
                conditions
                    .iter()
                    .any(|c| c.type_ == "Ready" && c.status == "True")
            })
            .unwrap_or(false);

        let container_status = pod
            .status
            .as_ref()
            .and_then(|s| s.container_statuses.as_ref())
            .and_then(|statuses| statuses.first());
        let terminated = container_status
            .and_then(|status| status.state.as_ref())
            .and_then(|state| state.terminated.as_ref())
            .is_some();
        let exit_code = container_status
            .and_then(|status| status.state.as_ref())
            .and_then(|state| state.terminated.as_ref())
            .map(|terminated| terminated.exit_code);
        // Prefer the terminated reason; fall back to the waiting reason
        // (e.g. "ImagePullBackOff") so callers see why a pod is stuck.
        let reason = container_status
            .and_then(|status| status.state.as_ref())
            .and_then(|state| {
                state
                    .terminated
                    .as_ref()
                    .and_then(|t| t.reason.clone())
                    .or_else(|| state.waiting.as_ref().and_then(|w| w.reason.clone()))
            });
        let restart_count = container_status
            .map(|status| status.restart_count.max(0) as u32)
            .unwrap_or(0);

        Ok(Some(SubagentPodState {
            pod_name,
            phase,
            ready,
            terminated,
            exit_code,
            reason,
            restart_count,
        }))
    }

    /// Fetch recent logs for a sub-agent pod, limited to `tail_lines` lines.
    pub async fn subagent_logs(&self, subagent_id: &str, tail_lines: i64) -> Result<String> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;
        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
        let pod_name = Self::subagent_pod_name(subagent_id);
        let params = LogParams {
            tail_lines: Some(tail_lines),
            ..LogParams::default()
        };
        pods.logs(&pod_name, &params)
            .await
            .with_context(|| format!("Failed to fetch logs for sub-agent pod {pod_name}"))
    }

    /// Get recent deployment actions, newest first, at most `limit` entries.
    pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
        let actions = self.actions.read().await;
        actions.iter().rev().take(limit).cloned().collect()
    }

    /// Append an action to the audit log.
    async fn record_action(&self, action: DeployAction) {
        let mut actions = self.actions.write().await;
        actions.push(action);
        // Keep bounded.
        while actions.len() > 1000 {
            actions.remove(0);
        }
    }
}

/// Coerce an arbitrary string into a valid Kubernetes label *value*:
/// only ASCII alphanumerics plus `-`, `_`, `.`; at most 63 characters;
/// no leading/trailing punctuation. An empty result becomes "subagent".
fn sanitize_label_value(input: &str) -> String {
    // Keep only allowed characters, then truncate; trimming happens *after*
    // truncation so the 63-char cut cannot leave edge punctuation behind.
    let kept: String = input
        .chars()
        .filter(|&c| c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.'))
        .take(63)
        .collect();
    let trimmed = kept.trim_matches(|c: char| matches!(c, '-' | '_' | '.'));
    if trimmed.is_empty() {
        "subagent".to_string()
    } else {
        trimmed.to_string()
    }
}

/// Summary information about a pod.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodInfo {
    /// Pod name.
    pub name: String,
    /// Pod phase (e.g. "Running"), or "Unknown" when not reported.
    pub phase: String,
    /// Whether the pod's "Ready" condition is "True".
    pub ready: bool,
    /// Pod start time rendered as a string, if reported by K8s.
    pub start_time: Option<String>,
}

712#[cfg(test)]
713mod tests {
714    use super::*;
715
716    #[test]
717    fn subagent_pod_name_is_sanitized_and_stable() {
718        let pod_name = K8sManager::subagent_pod_name("SubTask_ABC/123");
719        assert!(pod_name.starts_with("codetether-subagent-"));
720        assert!(pod_name.len() <= 63);
721        assert!(
722            pod_name
723                .chars()
724                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
725        );
726
727        let pod_name_again = K8sManager::subagent_pod_name("SubTask_ABC/123");
728        assert_eq!(pod_name, pod_name_again);
729    }
730
731    #[test]
732    fn subagent_pod_name_avoids_prefix_collisions() {
733        let a = K8sManager::subagent_pod_name("subtask-aaaaaaaa-1111");
734        let b = K8sManager::subagent_pod_name("subtask-aaaaaaaa-2222");
735        assert_ne!(a, b);
736    }
737
738    #[tokio::test]
739    async fn k8s_manager_initializes_without_cluster() {
740        let mgr = K8sManager::new().await;
741        // In CI/local dev, likely no K8s cluster available.
742        let status = mgr.status().await;
743        assert_eq!(status.namespace.is_empty(), false);
744    }
745}