1use anyhow::{Context, Result, anyhow};
14use chrono::Utc;
15use k8s_openapi::api::apps::v1::Deployment;
16use k8s_openapi::api::core::v1::Pod;
17use kube::{
18 Api, Client, Config as KubeConfig,
19 api::{ListParams, LogParams, Patch, PatchParams, PostParams},
20};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
/// Snapshot of the manager's view of its Kubernetes environment,
/// as assembled by `K8sManager::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct K8sStatus {
    /// True when a K8s client was initialized (in-cluster or via kubeconfig —
    /// see `K8sManager::new`; despite the name, a kubeconfig client also sets this).
    pub in_cluster: bool,
    /// Namespace all API operations are scoped to.
    pub namespace: String,
    /// Name of the pod this process runs in, when detected from the environment.
    pub pod_name: Option<String>,
    /// Deployment managing this instance, when configured.
    pub deployment_name: Option<String>,
    /// Desired replica count from the deployment spec, when resolvable.
    pub replicas: Option<i32>,
    /// Currently available replicas from the deployment status, when resolvable.
    pub available_replicas: Option<i32>,
}
43
/// Record of one self-deployment operation (scale, restart, spawn, delete),
/// kept in the manager's bounded in-memory history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployAction {
    /// Machine-readable tag, e.g. "scale:3", "rolling_restart", "spawn_subagent:<id>".
    pub action: String,
    /// Whether the operation completed successfully.
    pub success: bool,
    /// Human-readable description of what happened.
    pub message: String,
    /// RFC 3339 timestamp taken when the action was built.
    pub timestamp: String,
}
52
/// Optional overrides applied when spawning a sub-agent pod.
/// Every field is `#[serde(default)]`, so deserializing `{}` yields the defaults.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubagentPodSpec {
    /// Container image; `None` falls back to the built-in default image.
    #[serde(default)]
    pub image: Option<String>,
    /// Extra environment variables injected into the container.
    #[serde(default)]
    pub env_vars: HashMap<String, String>,
    /// Extra pod labels, merged over (and able to override) the standard labels.
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Container entrypoint override (pod `command` field).
    #[serde(default)]
    pub command: Option<Vec<String>>,
    /// Container argument override (pod `args` field).
    #[serde(default)]
    pub args: Option<Vec<String>>,
}
67
/// Observed lifecycle snapshot of a sub-agent pod, derived from the pod phase
/// and its first container status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubagentPodState {
    /// Pod name as derived by `K8sManager::subagent_pod_name`.
    pub pod_name: String,
    /// Pod phase reported by the API server ("Pending", "Running", …) or "Unknown".
    pub phase: String,
    /// True when the pod's "Ready" condition is "True".
    pub ready: bool,
    /// True when the first container reports a terminated state.
    pub terminated: bool,
    /// Exit code of the terminated container, if any.
    pub exit_code: Option<i32>,
    /// Termination reason, or the waiting reason when not terminated.
    pub reason: Option<String>,
    /// Restart count of the first container (clamped to non-negative).
    pub restart_count: u32,
}
79
/// Handle for self-deployment operations: scaling, rolling restarts, and
/// spawning/inspecting sub-agent pods.
///
/// When neither in-cluster config nor a kubeconfig is usable at startup,
/// `client` is `None` and every API-touching method returns an error (or a
/// degraded value). Cloning is cheap: the action history is shared via `Arc`.
#[derive(Clone)]
pub struct K8sManager {
    // `None` means K8s features are disabled for this process.
    client: Option<Client>,
    // Namespace all API calls are scoped to.
    namespace: String,
    // Our own pod name, when detected from the environment.
    pod_name: Option<String>,
    // Deployment managing this instance; required for scale/restart.
    deployment_name: Option<String>,
    // Shared, bounded history of deploy actions (capped at 1000 entries).
    actions: Arc<RwLock<Vec<DeployAction>>>,
}
89
// Manual `Debug` impl: the client is summarized as a boolean `connected`
// flag instead of being printed directly.
impl std::fmt::Debug for K8sManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("K8sManager")
            .field("namespace", &self.namespace)
            .field("pod_name", &self.pod_name)
            .field("deployment_name", &self.deployment_name)
            .field("connected", &self.client.is_some())
            .finish()
    }
}
100
101impl K8sManager {
102 pub fn subagent_pod_name(subagent_id: &str) -> String {
103 let mut sanitized: String = subagent_id
105 .to_ascii_lowercase()
106 .chars()
107 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
108 .collect();
109 sanitized = sanitized.trim_matches('-').to_string();
110 if sanitized.is_empty() {
111 sanitized = "subagent".to_string();
112 }
113
114 let mut hasher = Sha256::new();
115 hasher.update(subagent_id.as_bytes());
116 let hash_hex = hex::encode(hasher.finalize());
117 let hash_suffix = &hash_hex[..10];
118
119 const PREFIX: &str = "codetether-subagent-";
121 let max_name_len = 63usize
122 .saturating_sub(PREFIX.len())
123 .saturating_sub(1)
124 .saturating_sub(hash_suffix.len());
125 let mut name_part: String = sanitized.chars().take(max_name_len.max(1)).collect();
126 name_part = name_part.trim_matches('-').to_string();
127 if name_part.is_empty() {
128 name_part = "subagent".to_string();
129 }
130
131 format!("{PREFIX}{name_part}-{hash_suffix}")
132 }
133
    /// Build a manager by probing the environment.
    ///
    /// Resolution order:
    /// - namespace: `CODETETHER_K8S_NAMESPACE`, then the in-pod
    ///   service-account namespace file, then `"default"`;
    /// - pod name: `HOSTNAME`, falling back to `CODETETHER_POD_NAME`;
    /// - client: in-cluster config first, then the local kubeconfig.
    ///
    /// Never fails: if no cluster is reachable, the manager is simply
    /// constructed with `client: None` and K8s features are disabled.
    pub async fn new() -> Self {
        let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
            .or_else(|_| Self::read_namespace_file())
            .unwrap_or_else(|_| "default".to_string());

        // NOTE(review): HOSTNAME wins over the explicit CODETETHER_POD_NAME
        // override — confirm this precedence is intended.
        let pod_name = std::env::var("HOSTNAME")
            .ok()
            .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());

        let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();

        let client = match KubeConfig::incluster() {
            Ok(config) => match Client::try_from(config) {
                Ok(c) => {
                    tracing::info!(
                        namespace = %namespace,
                        pod = pod_name.as_deref().unwrap_or("-"),
                        "K8s client initialized (in-cluster)"
                    );
                    Some(c)
                }
                Err(e) => {
                    tracing::debug!("Failed to create in-cluster K8s client: {}", e);
                    None
                }
            },
            // Not running in a pod: fall back to the local kubeconfig
            // (development machines, CI).
            Err(_) => {
                match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
                {
                    Ok(config) => match Client::try_from(config) {
                        Ok(c) => {
                            tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
                            Some(c)
                        }
                        Err(e) => {
                            tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
                            None
                        }
                    },
                    Err(_) => {
                        tracing::debug!(
                            "Not running in K8s and no kubeconfig found — K8s features disabled"
                        );
                        None
                    }
                }
            }
        };

        Self {
            client,
            namespace,
            pod_name,
            deployment_name,
            actions: Arc::new(RwLock::new(Vec::new())),
        }
    }
194
195 fn read_namespace_file() -> Result<String, std::env::VarError> {
197 std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
198 .map(|s| s.trim().to_string())
199 .map_err(|_| std::env::VarError::NotPresent)
200 }
201
    /// Returns true when a usable K8s client was established at construction.
    pub fn is_available(&self) -> bool {
        self.client.is_some()
    }
206
207 pub async fn status(&self) -> K8sStatus {
209 let (replicas, available) = if let Some(ref client) = self.client {
210 if let Some(ref dep_name) = self.deployment_name {
211 self.get_deployment_replicas(client, dep_name).await
212 } else {
213 (None, None)
214 }
215 } else {
216 (None, None)
217 };
218
219 K8sStatus {
220 in_cluster: self.client.is_some(),
221 namespace: self.namespace.clone(),
222 pod_name: self.pod_name.clone(),
223 deployment_name: self.deployment_name.clone(),
224 replicas,
225 available_replicas: available,
226 }
227 }
228
229 async fn get_deployment_replicas(
230 &self,
231 client: &Client,
232 name: &str,
233 ) -> (Option<i32>, Option<i32>) {
234 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
235 match deployments.get(name).await {
236 Ok(dep) => {
237 let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
238 let available = dep.status.as_ref().and_then(|s| s.available_replicas);
239 (spec_replicas, available)
240 }
241 Err(e) => {
242 tracing::warn!("Failed to get deployment {}: {}", name, e);
243 (None, None)
244 }
245 }
246 }
247
248 pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
250 let client = self
251 .client
252 .as_ref()
253 .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
254 let dep_name = self
255 .deployment_name
256 .as_ref()
257 .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;
258
259 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
260
261 let patch = serde_json::json!({
262 "spec": {
263 "replicas": replicas
264 }
265 });
266
267 deployments
268 .patch(
269 dep_name,
270 &PatchParams::apply("codetether"),
271 &Patch::Merge(&patch),
272 )
273 .await
274 .with_context(|| {
275 format!(
276 "Failed to scale deployment {} to {} replicas",
277 dep_name, replicas
278 )
279 })?;
280
281 let action = DeployAction {
282 action: format!("scale:{}", replicas),
283 success: true,
284 message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
285 timestamp: Utc::now().to_rfc3339(),
286 };
287
288 tracing::info!(
289 deployment = %dep_name,
290 replicas = replicas,
291 "Self-deployment: scaled"
292 );
293
294 self.record_action(action.clone()).await;
295 Ok(action)
296 }
297
298 pub async fn rolling_restart(&self) -> Result<DeployAction> {
300 let client = self
301 .client
302 .as_ref()
303 .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
304 let dep_name = self
305 .deployment_name
306 .as_ref()
307 .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;
308
309 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
310
311 let patch = serde_json::json!({
313 "spec": {
314 "template": {
315 "metadata": {
316 "annotations": {
317 "codetether.run/restartedAt": Utc::now().to_rfc3339()
318 }
319 }
320 }
321 }
322 });
323
324 deployments
325 .patch(
326 dep_name,
327 &PatchParams::apply("codetether"),
328 &Patch::Merge(&patch),
329 )
330 .await
331 .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;
332
333 let action = DeployAction {
334 action: "rolling_restart".to_string(),
335 success: true,
336 message: format!("Triggered rolling restart for deployment '{}'", dep_name),
337 timestamp: Utc::now().to_rfc3339(),
338 };
339
340 tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");
341
342 self.record_action(action.clone()).await;
343 Ok(action)
344 }
345
346 pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
348 let client = self
349 .client
350 .as_ref()
351 .ok_or_else(|| anyhow!("K8s client not available"))?;
352
353 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
354
355 let label_selector = self
356 .deployment_name
357 .as_ref()
358 .map(|n| format!("app={}", n))
359 .unwrap_or_else(|| "app=codetether".to_string());
360
361 let list = pods
362 .list(&ListParams::default().labels(&label_selector))
363 .await
364 .context("Failed to list pods")?;
365
366 let infos: Vec<PodInfo> = list
367 .items
368 .iter()
369 .map(|pod| {
370 let name = pod.metadata.name.clone().unwrap_or_default();
371 let phase = pod
372 .status
373 .as_ref()
374 .and_then(|s| s.phase.clone())
375 .unwrap_or_else(|| "Unknown".to_string());
376 let ready = pod
377 .status
378 .as_ref()
379 .and_then(|s| s.conditions.as_ref())
380 .map(|conditions| {
381 conditions
382 .iter()
383 .any(|c| c.type_ == "Ready" && c.status == "True")
384 })
385 .unwrap_or(false);
386 let start_time = pod
387 .status
388 .as_ref()
389 .and_then(|s| s.start_time.as_ref())
390 .map(|t| t.0.to_string());
391
392 PodInfo {
393 name,
394 phase,
395 ready,
396 start_time,
397 }
398 })
399 .collect();
400
401 Ok(infos)
402 }
403
404 pub async fn spawn_subagent_pod(
406 &self,
407 subagent_id: &str,
408 image: Option<&str>,
409 env_vars: HashMap<String, String>,
410 ) -> Result<DeployAction> {
411 self.spawn_subagent_pod_with_spec(
412 subagent_id,
413 SubagentPodSpec {
414 image: image.map(ToString::to_string),
415 env_vars,
416 ..SubagentPodSpec::default()
417 },
418 )
419 .await
420 }
421
    /// Create a one-shot sub-agent pod from `spec`.
    ///
    /// The pod name is derived deterministically from `subagent_id` (see
    /// [`Self::subagent_pod_name`]). If a pod with that name already exists
    /// (HTTP 409), the stale pod is deleted and creation is retried once.
    ///
    /// # Errors
    /// Fails when no K8s client is available, the manifest cannot be built,
    /// or creation fails for a reason other than the handled 409 conflict.
    pub async fn spawn_subagent_pod_with_spec(
        &self,
        subagent_id: &str,
        spec: SubagentPodSpec,
    ) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let image = spec
            .image
            .as_deref()
            .unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
        let pod_name = Self::subagent_pod_name(subagent_id);

        // Caller-provided env vars first, then the two well-known variables
        // the sub-agent uses to identify itself and its namespace.
        let mut env_list: Vec<serde_json::Value> = spec
            .env_vars
            .iter()
            .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
            .collect();
        env_list
            .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
        env_list.push(
            serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
        );

        // Standard labels; the subagent id is sanitized into a valid label value.
        let mut labels = serde_json::json!({
            "app": "codetether",
            "codetether.run/role": "subagent",
            "codetether.run/subagent-id": sanitize_label_value(subagent_id)
        });

        // Caller labels are merged on top and may override the standard ones.
        if let Some(map) = labels.as_object_mut() {
            for (k, v) in &spec.labels {
                map.insert(k.clone(), serde_json::json!(v));
            }
        }

        // One-shot pod: `restartPolicy: Never`; lifecycle is observed via
        // `get_subagent_pod_state` rather than restarted by K8s.
        let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": &self.namespace,
                "labels": labels
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "agent",
                    "image": image,
                    "env": env_list,
                    "command": spec.command,
                    "args": spec.args,
                    "resources": {
                        "requests": { "memory": "256Mi", "cpu": "250m" },
                        "limits": { "memory": "1Gi", "cpu": "1000m" }
                    }
                }]
            }
        }))?;

        match pods.create(&PostParams::default(), &pod_manifest).await {
            Ok(_) => {}
            // Name collision: assume a stale pod from a previous run, replace it.
            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
                tracing::warn!(
                    pod = %pod_name,
                    subagent_id = %subagent_id,
                    "Sub-agent pod already exists, deleting stale pod and retrying create"
                );
                let _ = pods
                    .delete(&pod_name, &kube::api::DeleteParams::default())
                    .await;
                // NOTE(review): 600 ms may be shorter than the pod's graceful
                // termination window, so the retry can still hit 409 — confirm
                // whether a longer wait or forced deletion is needed.
                tokio::time::sleep(std::time::Duration::from_millis(600)).await;
                pods.create(&PostParams::default(), &pod_manifest)
                    .await
                    .with_context(|| {
                        format!("Failed to create sub-agent pod {} after retry", pod_name)
                    })?;
            }
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("Failed to create sub-agent pod {pod_name}"));
            }
        }

        let action = DeployAction {
            action: format!("spawn_subagent:{}", subagent_id),
            success: true,
            message: format!(
                "Created sub-agent pod '{}' in namespace '{}'",
                pod_name, self.namespace
            ),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            pod = %pod_name,
            subagent_id = %subagent_id,
            "Self-deployment: spawned sub-agent pod"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }
531
532 pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
534 let client = self
535 .client
536 .as_ref()
537 .ok_or_else(|| anyhow!("K8s client not available"))?;
538
539 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
540 let pod_name = Self::subagent_pod_name(subagent_id);
541
542 match pods
543 .delete(&pod_name, &kube::api::DeleteParams::default())
544 .await
545 {
546 Ok(_) => {}
547 Err(kube::Error::Api(api_err)) if api_err.code == 404 => {
548 tracing::debug!(
549 pod = %pod_name,
550 subagent_id = %subagent_id,
551 "Sub-agent pod already deleted"
552 );
553 }
554 Err(e) => {
555 return Err(e).with_context(|| format!("Failed to delete pod {}", pod_name));
556 }
557 }
558
559 let action = DeployAction {
560 action: format!("delete_subagent:{}", subagent_id),
561 success: true,
562 message: format!("Deleted sub-agent pod '{}'", pod_name),
563 timestamp: Utc::now().to_rfc3339(),
564 };
565
566 self.record_action(action.clone()).await;
567 Ok(action)
568 }
569
570 pub async fn get_subagent_pod_state(
572 &self,
573 subagent_id: &str,
574 ) -> Result<Option<SubagentPodState>> {
575 let client = self
576 .client
577 .as_ref()
578 .ok_or_else(|| anyhow!("K8s client not available"))?;
579
580 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
581 let pod_name = Self::subagent_pod_name(subagent_id);
582
583 let pod = match pods.get_opt(&pod_name).await {
584 Ok(p) => p,
585 Err(e) => {
586 tracing::warn!(pod = %pod_name, error = %e, "Failed to fetch sub-agent pod state");
587 return Ok(None);
588 }
589 };
590
591 let Some(pod) = pod else {
592 return Ok(None);
593 };
594
595 let phase = pod
596 .status
597 .as_ref()
598 .and_then(|s| s.phase.clone())
599 .unwrap_or_else(|| "Unknown".to_string());
600 let ready = pod
601 .status
602 .as_ref()
603 .and_then(|s| s.conditions.as_ref())
604 .map(|conditions| {
605 conditions
606 .iter()
607 .any(|c| c.type_ == "Ready" && c.status == "True")
608 })
609 .unwrap_or(false);
610
611 let container_status = pod
612 .status
613 .as_ref()
614 .and_then(|s| s.container_statuses.as_ref())
615 .and_then(|statuses| statuses.first());
616 let terminated = container_status
617 .and_then(|status| status.state.as_ref())
618 .and_then(|state| state.terminated.as_ref())
619 .is_some();
620 let exit_code = container_status
621 .and_then(|status| status.state.as_ref())
622 .and_then(|state| state.terminated.as_ref())
623 .map(|terminated| terminated.exit_code);
624 let reason = container_status
625 .and_then(|status| status.state.as_ref())
626 .and_then(|state| {
627 state
628 .terminated
629 .as_ref()
630 .and_then(|t| t.reason.clone())
631 .or_else(|| state.waiting.as_ref().and_then(|w| w.reason.clone()))
632 });
633 let restart_count = container_status
634 .map(|status| status.restart_count.max(0) as u32)
635 .unwrap_or(0);
636
637 Ok(Some(SubagentPodState {
638 pod_name,
639 phase,
640 ready,
641 terminated,
642 exit_code,
643 reason,
644 restart_count,
645 }))
646 }
647
648 pub async fn subagent_logs(&self, subagent_id: &str, tail_lines: i64) -> Result<String> {
650 let client = self
651 .client
652 .as_ref()
653 .ok_or_else(|| anyhow!("K8s client not available"))?;
654 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
655 let pod_name = Self::subagent_pod_name(subagent_id);
656 let params = LogParams {
657 tail_lines: Some(tail_lines),
658 ..LogParams::default()
659 };
660 pods.logs(&pod_name, ¶ms)
661 .await
662 .with_context(|| format!("Failed to fetch logs for sub-agent pod {pod_name}"))
663 }
664
665 pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
667 let actions = self.actions.read().await;
668 actions.iter().rev().take(limit).cloned().collect()
669 }
670
671 async fn record_action(&self, action: DeployAction) {
672 let mut actions = self.actions.write().await;
673 actions.push(action);
674 while actions.len() > 1000 {
676 actions.remove(0);
677 }
678 }
679}
680
/// Coerce an arbitrary string into a valid Kubernetes label value:
/// only alphanumerics plus `-`, `_`, `.`, at most 63 characters, and
/// starting/ending with an alphanumeric. Falls back to `"subagent"` when
/// nothing survives the filtering.
fn sanitize_label_value(input: &str) -> String {
    let is_separator = |c: char| matches!(c, '-' | '_' | '.');

    // Keep only permitted characters, capped at the 63-char label limit.
    let kept: String = input
        .chars()
        .filter(|c| c.is_ascii_alphanumeric() || is_separator(*c))
        .take(63)
        .collect();

    // Label values must begin and end with an alphanumeric character.
    let trimmed = kept.trim_matches(is_separator);
    if trimmed.is_empty() {
        String::from("subagent")
    } else {
        trimmed.to_owned()
    }
}
696
697#[derive(Debug, Clone, Serialize, Deserialize)]
699pub struct PodInfo {
700 pub name: String,
701 pub phase: String,
702 pub ready: bool,
703 pub start_time: Option<String>,
704}
705
#[cfg(test)]
mod tests {
    use super::*;

    /// Pod names must be DNS-1123-safe (lowercase alphanumerics and '-',
    /// ≤ 63 chars) and deterministic for a given id.
    #[test]
    fn subagent_pod_name_is_sanitized_and_stable() {
        let pod_name = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert!(pod_name.starts_with("codetether-subagent-"));
        assert!(pod_name.len() <= 63);
        assert!(
            pod_name
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
        );

        // Same input must always map to the same pod name.
        let pod_name_again = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert_eq!(pod_name, pod_name_again);
    }

    /// Ids that sanitize to similar prefixes must still yield distinct pod
    /// names thanks to the hash suffix.
    #[test]
    fn subagent_pod_name_avoids_prefix_collisions() {
        let a = K8sManager::subagent_pod_name("subtask-aaaaaaaa-1111");
        let b = K8sManager::subagent_pod_name("subtask-aaaaaaaa-2222");
        assert_ne!(a, b);
    }

    /// Construction outside a cluster must not fail and must always resolve
    /// some namespace.
    #[tokio::test]
    async fn k8s_manager_initializes_without_cluster() {
        let mgr = K8sManager::new().await;
        let status = mgr.status().await;
        // Fixed: idiomatic boolean assertion instead of `assert_eq!(_, false)`.
        assert!(!status.namespace.is_empty());
    }
}