1use anyhow::{Context, Result, anyhow};
14use chrono::Utc;
15use k8s_openapi::api::apps::v1::Deployment;
16use k8s_openapi::api::core::v1::Pod;
17use kube::{
18 Api, Client, Config as KubeConfig,
19 api::{ListParams, LogParams, Patch, PatchParams, PostParams},
20};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct K8sStatus {
30 pub in_cluster: bool,
32 pub namespace: String,
34 pub pod_name: Option<String>,
36 pub deployment_name: Option<String>,
38 pub replicas: Option<i32>,
40 pub available_replicas: Option<i32>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct DeployAction {
47 pub action: String,
48 pub success: bool,
49 pub message: String,
50 pub timestamp: String,
51}
52
53#[derive(Debug, Clone, Default, Serialize, Deserialize)]
55pub struct SubagentPodSpec {
56 #[serde(default)]
57 pub image: Option<String>,
58 #[serde(default)]
59 pub env_vars: HashMap<String, String>,
60 #[serde(default)]
61 pub labels: HashMap<String, String>,
62 #[serde(default)]
63 pub command: Option<Vec<String>>,
64 #[serde(default)]
65 pub args: Option<Vec<String>>,
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct SubagentPodState {
71 pub pod_name: String,
72 pub phase: String,
73 pub ready: bool,
74 pub terminated: bool,
75 pub exit_code: Option<i32>,
76 pub reason: Option<String>,
77 pub restart_count: u32,
78}
79
80#[derive(Clone)]
82pub struct K8sManager {
83 client: Option<Client>,
84 namespace: String,
85 pod_name: Option<String>,
86 deployment_name: Option<String>,
87 actions: Arc<RwLock<Vec<DeployAction>>>,
88}
89
90impl std::fmt::Debug for K8sManager {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("K8sManager")
93 .field("namespace", &self.namespace)
94 .field("pod_name", &self.pod_name)
95 .field("deployment_name", &self.deployment_name)
96 .field("connected", &self.client.is_some())
97 .finish()
98 }
99}
100
101impl K8sManager {
102 pub fn subagent_pod_name(subagent_id: &str) -> String {
103 let mut sanitized: String = subagent_id
105 .to_ascii_lowercase()
106 .chars()
107 .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
108 .collect();
109 sanitized = sanitized.trim_matches('-').to_string();
110 if sanitized.is_empty() {
111 sanitized = "subagent".to_string();
112 }
113
114 let mut hasher = Sha256::new();
115 hasher.update(subagent_id.as_bytes());
116 let hash_hex = hex::encode(hasher.finalize());
117 let hash_suffix = &hash_hex[..10];
118
119 const PREFIX: &str = "codetether-subagent-";
121 let max_name_len = 63usize
122 .saturating_sub(PREFIX.len())
123 .saturating_sub(1)
124 .saturating_sub(hash_suffix.len());
125 let mut name_part: String = sanitized.chars().take(max_name_len.max(1)).collect();
126 name_part = name_part.trim_matches('-').to_string();
127 if name_part.is_empty() {
128 name_part = "subagent".to_string();
129 }
130
131 format!("{PREFIX}{name_part}-{hash_suffix}")
132 }
133
134 pub async fn new() -> Self {
137 crate::tls::ensure_rustls_crypto_provider();
141
142 let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
143 .or_else(|_| Self::read_namespace_file())
144 .unwrap_or_else(|_| "default".to_string());
145
146 let pod_name = std::env::var("HOSTNAME")
147 .ok()
148 .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());
149
150 let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();
151
152 let client = match KubeConfig::incluster() {
153 Ok(config) => match Client::try_from(config) {
154 Ok(c) => {
155 tracing::info!(
156 namespace = %namespace,
157 pod = pod_name.as_deref().unwrap_or("-"),
158 "K8s client initialized (in-cluster)"
159 );
160 Some(c)
161 }
162 Err(e) => {
163 tracing::debug!("Failed to create in-cluster K8s client: {}", e);
164 None
165 }
166 },
167 Err(_) => {
168 match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
170 {
171 Ok(config) => match Client::try_from(config) {
172 Ok(c) => {
173 tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
174 Some(c)
175 }
176 Err(e) => {
177 tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
178 None
179 }
180 },
181 Err(_) => {
182 tracing::debug!(
183 "Not running in K8s and no kubeconfig found — K8s features disabled"
184 );
185 None
186 }
187 }
188 }
189 };
190
191 Self {
192 client,
193 namespace,
194 pod_name,
195 deployment_name,
196 actions: Arc::new(RwLock::new(Vec::new())),
197 }
198 }
199
200 fn read_namespace_file() -> Result<String, std::env::VarError> {
202 std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
203 .map(|s| s.trim().to_string())
204 .map_err(|_| std::env::VarError::NotPresent)
205 }
206
207 pub fn is_available(&self) -> bool {
209 self.client.is_some()
210 }
211
212 pub fn scoped(&self, namespace: Option<&str>, deployment_name: Option<&str>) -> Self {
214 let mut scoped = self.clone();
215 if let Some(namespace) = namespace.filter(|value| !value.trim().is_empty()) {
216 scoped.namespace = namespace.to_string();
217 }
218 if let Some(deployment_name) = deployment_name.filter(|value| !value.trim().is_empty()) {
219 scoped.deployment_name = Some(deployment_name.to_string());
220 }
221 scoped
222 }
223
224 pub async fn status(&self) -> K8sStatus {
226 let (replicas, available) = if let Some(ref client) = self.client {
227 if let Some(ref dep_name) = self.deployment_name {
228 self.get_deployment_replicas(client, dep_name).await
229 } else {
230 (None, None)
231 }
232 } else {
233 (None, None)
234 };
235
236 K8sStatus {
237 in_cluster: self.client.is_some(),
238 namespace: self.namespace.clone(),
239 pod_name: self.pod_name.clone(),
240 deployment_name: self.deployment_name.clone(),
241 replicas,
242 available_replicas: available,
243 }
244 }
245
246 async fn get_deployment_replicas(
247 &self,
248 client: &Client,
249 name: &str,
250 ) -> (Option<i32>, Option<i32>) {
251 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
252 match deployments.get(name).await {
253 Ok(dep) => {
254 let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
255 let available = dep.status.as_ref().and_then(|s| s.available_replicas);
256 (spec_replicas, available)
257 }
258 Err(e) => {
259 tracing::warn!("Failed to get deployment {}: {}", name, e);
260 (None, None)
261 }
262 }
263 }
264
265 pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
267 let client = self
268 .client
269 .as_ref()
270 .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
271 let dep_name = self
272 .deployment_name
273 .as_ref()
274 .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;
275
276 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
277
278 let patch = serde_json::json!({
279 "spec": {
280 "replicas": replicas
281 }
282 });
283
284 deployments
285 .patch(
286 dep_name,
287 &PatchParams::apply("codetether"),
288 &Patch::Merge(&patch),
289 )
290 .await
291 .with_context(|| {
292 format!(
293 "Failed to scale deployment {} to {} replicas",
294 dep_name, replicas
295 )
296 })?;
297
298 let action = DeployAction {
299 action: format!("scale:{}", replicas),
300 success: true,
301 message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
302 timestamp: Utc::now().to_rfc3339(),
303 };
304
305 tracing::info!(
306 deployment = %dep_name,
307 replicas = replicas,
308 "Self-deployment: scaled"
309 );
310
311 self.record_action(action.clone()).await;
312 Ok(action)
313 }
314
315 pub async fn rolling_restart(&self) -> Result<DeployAction> {
317 let client = self
318 .client
319 .as_ref()
320 .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
321 let dep_name = self
322 .deployment_name
323 .as_ref()
324 .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;
325
326 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
327
328 let patch = serde_json::json!({
330 "spec": {
331 "template": {
332 "metadata": {
333 "annotations": {
334 "codetether.run/restartedAt": Utc::now().to_rfc3339()
335 }
336 }
337 }
338 }
339 });
340
341 deployments
342 .patch(
343 dep_name,
344 &PatchParams::apply("codetether"),
345 &Patch::Merge(&patch),
346 )
347 .await
348 .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;
349
350 let action = DeployAction {
351 action: "rolling_restart".to_string(),
352 success: true,
353 message: format!("Triggered rolling restart for deployment '{}'", dep_name),
354 timestamp: Utc::now().to_rfc3339(),
355 };
356
357 tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");
358
359 self.record_action(action.clone()).await;
360 Ok(action)
361 }
362
363 pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
365 self.list_pods_with_selector(None).await
366 }
367
368 pub async fn list_pods_with_selector(
370 &self,
371 label_selector: Option<&str>,
372 ) -> Result<Vec<PodInfo>> {
373 let client = self
374 .client
375 .as_ref()
376 .ok_or_else(|| anyhow!("K8s client not available"))?;
377
378 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
379
380 let label_selector = label_selector
381 .filter(|value| !value.trim().is_empty())
382 .map(ToString::to_string)
383 .or_else(|| self.deployment_name.as_ref().map(|n| format!("app={}", n)))
384 .unwrap_or_else(|| "app=codetether".to_string());
385
386 let list = pods
387 .list(&ListParams::default().labels(&label_selector))
388 .await
389 .context("Failed to list pods")?;
390
391 let infos: Vec<PodInfo> = list
392 .items
393 .iter()
394 .map(|pod| {
395 let name = pod.metadata.name.clone().unwrap_or_default();
396 let phase = pod
397 .status
398 .as_ref()
399 .and_then(|s| s.phase.clone())
400 .unwrap_or_else(|| "Unknown".to_string());
401 let ready = pod
402 .status
403 .as_ref()
404 .and_then(|s| s.conditions.as_ref())
405 .map(|conditions| {
406 conditions
407 .iter()
408 .any(|c| c.type_ == "Ready" && c.status == "True")
409 })
410 .unwrap_or(false);
411 let start_time = pod
412 .status
413 .as_ref()
414 .and_then(|s| s.start_time.as_ref())
415 .map(|t| t.0.to_string());
416
417 PodInfo {
418 name,
419 phase,
420 ready,
421 start_time,
422 }
423 })
424 .collect();
425
426 Ok(infos)
427 }
428
429 #[allow(dead_code)]
430 pub async fn spawn_subagent_pod(
432 &self,
433 subagent_id: &str,
434 image: Option<&str>,
435 env_vars: HashMap<String, String>,
436 ) -> Result<DeployAction> {
437 self.spawn_subagent_pod_with_spec(
438 subagent_id,
439 SubagentPodSpec {
440 image: image.map(ToString::to_string),
441 env_vars,
442 ..SubagentPodSpec::default()
443 },
444 )
445 .await
446 }
447
448 pub async fn spawn_subagent_pod_with_spec(
450 &self,
451 subagent_id: &str,
452 spec: SubagentPodSpec,
453 ) -> Result<DeployAction> {
454 let client = self
455 .client
456 .as_ref()
457 .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;
458
459 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
460
461 let image = spec
462 .image
463 .as_deref()
464 .unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
465 let pod_name = Self::subagent_pod_name(subagent_id);
466
467 let mut env_list: Vec<serde_json::Value> = spec
468 .env_vars
469 .iter()
470 .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
471 .collect();
472 env_list
473 .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
474 env_list.push(
475 serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
476 );
477
478 let mut labels = serde_json::json!({
479 "app": "codetether",
480 "codetether.run/role": "subagent",
481 "codetether.run/subagent-id": sanitize_label_value(subagent_id)
482 });
483
484 if let Some(map) = labels.as_object_mut() {
485 for (k, v) in &spec.labels {
486 map.insert(k.clone(), serde_json::json!(v));
487 }
488 }
489
490 let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
491 "apiVersion": "v1",
492 "kind": "Pod",
493 "metadata": {
494 "name": pod_name,
495 "namespace": &self.namespace,
496 "labels": labels
497 },
498 "spec": {
499 "restartPolicy": "Never",
500 "containers": [{
501 "name": "agent",
502 "image": image,
503 "env": env_list,
504 "command": spec.command,
505 "args": spec.args,
506 "resources": {
507 "requests": { "memory": "256Mi", "cpu": "250m" },
508 "limits": { "memory": "1Gi", "cpu": "1000m" }
509 }
510 }]
511 }
512 }))?;
513
514 match pods.create(&PostParams::default(), &pod_manifest).await {
515 Ok(_) => {}
516 Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
517 tracing::warn!(
518 pod = %pod_name,
519 subagent_id = %subagent_id,
520 "Sub-agent pod already exists, deleting stale pod and retrying create"
521 );
522 let _ = pods
523 .delete(&pod_name, &kube::api::DeleteParams::default())
524 .await;
525 tokio::time::sleep(std::time::Duration::from_millis(600)).await;
526 pods.create(&PostParams::default(), &pod_manifest)
527 .await
528 .with_context(|| {
529 format!("Failed to create sub-agent pod {} after retry", pod_name)
530 })?;
531 }
532 Err(e) => {
533 return Err(e)
534 .with_context(|| format!("Failed to create sub-agent pod {pod_name}"));
535 }
536 }
537
538 let action = DeployAction {
539 action: format!("spawn_subagent:{}", subagent_id),
540 success: true,
541 message: format!(
542 "Created sub-agent pod '{}' in namespace '{}'",
543 pod_name, self.namespace
544 ),
545 timestamp: Utc::now().to_rfc3339(),
546 };
547
548 tracing::info!(
549 pod = %pod_name,
550 subagent_id = %subagent_id,
551 "Self-deployment: spawned sub-agent pod"
552 );
553
554 self.record_action(action.clone()).await;
555 Ok(action)
556 }
557
558 pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
560 let client = self
561 .client
562 .as_ref()
563 .ok_or_else(|| anyhow!("K8s client not available"))?;
564
565 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
566 let pod_name = Self::subagent_pod_name(subagent_id);
567
568 match pods
569 .delete(&pod_name, &kube::api::DeleteParams::default())
570 .await
571 {
572 Ok(_) => {}
573 Err(kube::Error::Api(api_err)) if api_err.code == 404 => {
574 tracing::debug!(
575 pod = %pod_name,
576 subagent_id = %subagent_id,
577 "Sub-agent pod already deleted"
578 );
579 }
580 Err(e) => {
581 return Err(e).with_context(|| format!("Failed to delete pod {}", pod_name));
582 }
583 }
584
585 let action = DeployAction {
586 action: format!("delete_subagent:{}", subagent_id),
587 success: true,
588 message: format!("Deleted sub-agent pod '{}'", pod_name),
589 timestamp: Utc::now().to_rfc3339(),
590 };
591
592 self.record_action(action.clone()).await;
593 Ok(action)
594 }
595
596 pub async fn get_subagent_pod_state(
598 &self,
599 subagent_id: &str,
600 ) -> Result<Option<SubagentPodState>> {
601 let client = self
602 .client
603 .as_ref()
604 .ok_or_else(|| anyhow!("K8s client not available"))?;
605
606 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
607 let pod_name = Self::subagent_pod_name(subagent_id);
608
609 let pod = match pods.get_opt(&pod_name).await {
610 Ok(p) => p,
611 Err(e) => {
612 tracing::warn!(pod = %pod_name, error = %e, "Failed to fetch sub-agent pod state");
613 return Ok(None);
614 }
615 };
616
617 let Some(pod) = pod else {
618 return Ok(None);
619 };
620
621 let phase = pod
622 .status
623 .as_ref()
624 .and_then(|s| s.phase.clone())
625 .unwrap_or_else(|| "Unknown".to_string());
626 let ready = pod
627 .status
628 .as_ref()
629 .and_then(|s| s.conditions.as_ref())
630 .map(|conditions| {
631 conditions
632 .iter()
633 .any(|c| c.type_ == "Ready" && c.status == "True")
634 })
635 .unwrap_or(false);
636
637 let container_status = pod
638 .status
639 .as_ref()
640 .and_then(|s| s.container_statuses.as_ref())
641 .and_then(|statuses| statuses.first());
642 let terminated = container_status
643 .and_then(|status| status.state.as_ref())
644 .and_then(|state| state.terminated.as_ref())
645 .is_some();
646 let exit_code = container_status
647 .and_then(|status| status.state.as_ref())
648 .and_then(|state| state.terminated.as_ref())
649 .map(|terminated| terminated.exit_code);
650 let reason = container_status
651 .and_then(|status| status.state.as_ref())
652 .and_then(|state| {
653 state
654 .terminated
655 .as_ref()
656 .and_then(|t| t.reason.clone())
657 .or_else(|| state.waiting.as_ref().and_then(|w| w.reason.clone()))
658 });
659 let restart_count = container_status
660 .map(|status| status.restart_count.max(0) as u32)
661 .unwrap_or(0);
662
663 Ok(Some(SubagentPodState {
664 pod_name,
665 phase,
666 ready,
667 terminated,
668 exit_code,
669 reason,
670 restart_count,
671 }))
672 }
673
674 pub async fn subagent_logs(&self, subagent_id: &str, tail_lines: i64) -> Result<String> {
676 let client = self
677 .client
678 .as_ref()
679 .ok_or_else(|| anyhow!("K8s client not available"))?;
680 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
681 let pod_name = Self::subagent_pod_name(subagent_id);
682 let params = LogParams {
683 tail_lines: Some(tail_lines),
684 ..LogParams::default()
685 };
686 pods.logs(&pod_name, ¶ms)
687 .await
688 .with_context(|| format!("Failed to fetch logs for sub-agent pod {pod_name}"))
689 }
690
691 pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
693 let actions = self.actions.read().await;
694 actions.iter().rev().take(limit).cloned().collect()
695 }
696
697 async fn record_action(&self, action: DeployAction) {
698 let mut actions = self.actions.write().await;
699 actions.push(action);
700 while actions.len() > 1000 {
702 actions.remove(0);
703 }
704 }
705}
706
/// Turns an arbitrary id into a valid Kubernetes label value.
///
/// Label values may hold at most 63 characters from `[A-Za-z0-9._-]` and must begin and end
/// with an alphanumeric character. This function:
/// - drops disallowed characters rather than replacing them;
/// - skips leading separators *before* truncating, so they don't use up the length budget;
/// - trims any trailing separators the cut leaves behind.
///
/// An id with no usable characters maps to `"subagent"`.
fn sanitize_label_value(input: &str) -> String {
    // Maximum length of a Kubernetes label value.
    const MAX_LABEL_VALUE_LEN: usize = 63;

    // Characters allowed inside a label value but not at either end.
    fn is_separator(c: char) -> bool {
        matches!(c, '-' | '_' | '.')
    }

    let value: String = input
        .chars()
        .filter(|&c| c.is_ascii_alphanumeric() || is_separator(c))
        .skip_while(|&c| is_separator(c))
        .take(MAX_LABEL_VALUE_LEN)
        .collect();
    let value = value.trim_end_matches(is_separator);

    if value.is_empty() {
        "subagent".to_string()
    } else {
        value.to_string()
    }
}
722
723#[derive(Debug, Clone, Serialize, Deserialize)]
725pub struct PodInfo {
726 pub name: String,
727 pub phase: String,
728 pub ready: bool,
729 pub start_time: Option<String>,
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager with no client, without touching the environment or a cluster.
    fn offline_manager(namespace: &str, deployment_name: Option<&str>) -> K8sManager {
        K8sManager {
            client: None,
            namespace: namespace.to_string(),
            pod_name: None,
            deployment_name: deployment_name.map(ToString::to_string),
            actions: Arc::new(RwLock::new(Vec::new())),
        }
    }

    fn action(tag: &str) -> DeployAction {
        DeployAction {
            action: tag.to_string(),
            success: true,
            message: String::new(),
            timestamp: String::new(),
        }
    }

    #[test]
    fn subagent_pod_name_is_sanitized_and_stable() {
        let pod_name = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert!(pod_name.starts_with("codetether-subagent-"));
        assert!(pod_name.len() <= 63);
        assert!(
            pod_name
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
        );

        let pod_name_again = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert_eq!(pod_name, pod_name_again);
    }

    #[test]
    fn subagent_pod_name_avoids_prefix_collisions() {
        let a = K8sManager::subagent_pod_name("subtask-aaaaaaaa-1111");
        let b = K8sManager::subagent_pod_name("subtask-aaaaaaaa-2222");
        assert_ne!(a, b);
    }

    #[test]
    fn subagent_pod_name_is_bounded_for_degenerate_ids() {
        let long = "x".repeat(500);
        for id in ["", "///", long.as_str()] {
            let name = K8sManager::subagent_pod_name(id);
            assert!(name.len() <= 63, "{name} exceeds 63 characters");
            assert!(name.starts_with("codetether-subagent-"));
            assert!(!name.ends_with('-'));
        }
        assert!(K8sManager::subagent_pod_name("").starts_with("codetether-subagent-subagent-"));
    }

    #[test]
    fn sanitize_label_value_filters_invalid_characters() {
        assert_eq!(sanitize_label_value("SubTask_ABC/123"), "SubTask_ABC123");
        assert_eq!(sanitize_label_value(""), "subagent");
        assert_eq!(sanitize_label_value("--__.."), "subagent");
        assert!(sanitize_label_value(&"a".repeat(100)).len() <= 63);
    }

    #[test]
    fn scoped_ignores_blank_overrides() {
        let base = offline_manager("base-ns", Some("base-dep"));

        let unchanged = base.scoped(Some("  "), Some(""));
        assert_eq!(unchanged.namespace, "base-ns");
        assert_eq!(unchanged.deployment_name.as_deref(), Some("base-dep"));

        let overridden = base.scoped(Some("other-ns"), Some("other-dep"));
        assert_eq!(overridden.namespace, "other-ns");
        assert_eq!(overridden.deployment_name.as_deref(), Some("other-dep"));
    }

    #[tokio::test]
    async fn record_action_keeps_only_the_most_recent_entries() {
        let mgr = offline_manager("ns", None);
        for i in 0..1005 {
            mgr.record_action(action(&format!("a{i}"))).await;
        }
        let recent = mgr.recent_actions(usize::MAX).await;
        assert_eq!(recent.len(), 1000);
        assert_eq!(recent[0].action, "a1004");
        assert_eq!(recent[999].action, "a5");
        assert_eq!(mgr.recent_actions(2).await.len(), 2);
    }

    #[tokio::test]
    async fn offline_manager_reports_status_and_rejects_cluster_calls() {
        let mgr = offline_manager("ns", Some("dep"));
        let status = mgr.status().await;
        assert!(!status.in_cluster);
        assert_eq!(status.namespace, "ns");
        assert_eq!(status.replicas, None);
        assert_eq!(status.available_replicas, None);

        assert!(!mgr.is_available());
        assert!(mgr.scale(1).await.is_err());
        assert!(mgr.list_pods().await.is_err());
        assert!(mgr.recent_actions(10).await.is_empty());
    }

    #[tokio::test]
    async fn k8s_manager_initializes_without_cluster() {
        let mgr = K8sManager::new().await;
        let status = mgr.status().await;
        assert!(!status.namespace.is_empty());
    }
}