1use anyhow::{Context, Result, anyhow};
14use chrono::Utc;
15use k8s_openapi::api::apps::v1::Deployment;
16use k8s_openapi::api::core::v1::Pod;
17use kube::{
18 Api, Client, Config as KubeConfig,
19 api::{ListParams, LogParams, Patch, PatchParams, PostParams},
20};
21use serde::{Deserialize, Serialize};
22use sha2::{Digest, Sha256};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26
/// Snapshot of the manager's Kubernetes environment, returned by
/// [`K8sManager::status`] and serialized for API consumers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct K8sStatus {
    /// Set from `client.is_some()` — NOTE(review): also `true` for a
    /// kubeconfig-based client, not only when actually running in-cluster.
    pub in_cluster: bool,
    /// Namespace the manager operates in (env var, service-account file, or "default").
    pub namespace: String,
    /// This process's pod name, if known (from HOSTNAME / CODETETHER_POD_NAME).
    pub pod_name: Option<String>,
    /// Owning deployment, if CODETETHER_DEPLOYMENT_NAME was set.
    pub deployment_name: Option<String>,
    /// Desired replica count from the deployment spec, when resolvable.
    pub replicas: Option<i32>,
    /// Available replicas reported by the deployment status, when resolvable.
    pub available_replicas: Option<i32>,
}
43
/// Record of one self-deployment operation (scale, restart, spawn, delete),
/// kept in the manager's in-memory action history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployAction {
    /// Machine-readable tag, e.g. "scale:3", "rolling_restart", "spawn_subagent:<id>".
    pub action: String,
    /// Whether the action succeeded; recorded actions in this file are always `true`
    /// because failures return early with an error instead of being recorded.
    pub success: bool,
    /// Human-readable description of what happened.
    pub message: String,
    /// RFC 3339 timestamp of when the action was recorded.
    pub timestamp: String,
}
52
/// Optional overrides for a sub-agent pod. Every field is serde-defaulted so
/// callers may supply a partial (or empty) JSON object.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubagentPodSpec {
    /// Container image; falls back to the built-in default image when `None`.
    #[serde(default)]
    pub image: Option<String>,
    /// Extra environment variables injected into the container.
    #[serde(default)]
    pub env_vars: HashMap<String, String>,
    /// Extra pod labels, merged over the built-in codetether labels.
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Container entrypoint override (`command` in the pod spec).
    #[serde(default)]
    pub command: Option<Vec<String>>,
    /// Container argument override (`args` in the pod spec).
    #[serde(default)]
    pub args: Option<Vec<String>>,
}
67
/// Observed runtime state of a sub-agent pod, derived from the pod's status
/// and its first container's status.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubagentPodState {
    /// Deterministic pod name produced by [`K8sManager::subagent_pod_name`].
    pub pod_name: String,
    /// Pod phase (e.g. "Pending", "Running"); "Unknown" when absent.
    pub phase: String,
    /// Whether the pod's "Ready" condition reports "True".
    pub ready: bool,
    /// Whether the first container has a terminated state.
    pub terminated: bool,
    /// Exit code of the first container, when terminated.
    pub exit_code: Option<i32>,
    /// Termination reason, falling back to a waiting reason if not terminated.
    pub reason: Option<String>,
    /// Restart count of the first container (clamped to non-negative).
    pub restart_count: u32,
}
79
/// Self-deployment manager: scales/restarts the owning deployment and manages
/// sub-agent pod lifecycles. When no Kubernetes client could be built, the
/// manager still constructs, but API-touching methods return errors.
#[derive(Clone)]
pub struct K8sManager {
    /// `None` when neither in-cluster config nor a kubeconfig was usable.
    client: Option<Client>,
    /// Namespace all API calls are scoped to.
    namespace: String,
    /// This process's pod name, if discoverable from the environment.
    pod_name: Option<String>,
    /// Owning deployment name, required for scale/restart operations.
    deployment_name: Option<String>,
    /// Shared, bounded history of performed actions (see `record_action`).
    actions: Arc<RwLock<Vec<DeployAction>>>,
}
89
90impl std::fmt::Debug for K8sManager {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("K8sManager")
93 .field("namespace", &self.namespace)
94 .field("pod_name", &self.pod_name)
95 .field("deployment_name", &self.deployment_name)
96 .field("connected", &self.client.is_some())
97 .finish()
98 }
99}
100
101impl K8sManager {
    /// Derives a stable, DNS-safe pod name for a sub-agent id.
    ///
    /// The id is lowercased, non-alphanumerics become '-', and a 10-hex-char
    /// SHA-256 suffix keeps names distinct even when two ids sanitize to the
    /// same prefix. The result fits the 63-character Kubernetes name limit.
    /// The mapping is deterministic, so the same id always yields the same
    /// pod name (used by delete/status/log lookups).
    pub fn subagent_pod_name(subagent_id: &str) -> String {
        let mut sanitized: String = subagent_id
            .to_ascii_lowercase()
            .chars()
            .map(|c| if c.is_ascii_alphanumeric() { c } else { '-' })
            .collect();
        // Names must not start or end with '-'.
        sanitized = sanitized.trim_matches('-').to_string();
        if sanitized.is_empty() {
            sanitized = "subagent".to_string();
        }

        // Hash the *original* id so collisions from sanitizing are avoided.
        let mut hasher = Sha256::new();
        hasher.update(subagent_id.as_bytes());
        let hash_hex = hex::encode(hasher.finalize());
        let hash_suffix = &hash_hex[..10];

        const PREFIX: &str = "codetether-subagent-";
        // Budget: 63 total, minus prefix, minus '-' separator, minus suffix.
        let max_name_len = 63usize
            .saturating_sub(PREFIX.len())
            .saturating_sub(1)
            .saturating_sub(hash_suffix.len());
        let mut name_part: String = sanitized.chars().take(max_name_len.max(1)).collect();
        // Truncation may re-expose a trailing '-'; trim again.
        name_part = name_part.trim_matches('-').to_string();
        if name_part.is_empty() {
            name_part = "subagent".to_string();
        }

        format!("{PREFIX}{name_part}-{hash_suffix}")
    }
133
    /// Builds a manager, trying in-cluster configuration first and falling
    /// back to the local kubeconfig. When neither works the manager is still
    /// constructed with `client = None` and K8s features are disabled.
    ///
    /// Namespace resolution order: CODETETHER_K8S_NAMESPACE env var, then the
    /// service-account namespace file, then "default".
    pub async fn new() -> Self {
        // Ensure the rustls crypto provider is installed before any TLS client
        // is created.
        crate::tls::ensure_rustls_crypto_provider();

        let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
            .or_else(|_| Self::read_namespace_file())
            .unwrap_or_else(|_| "default".to_string());

        // NOTE(review): HOSTNAME takes precedence over the explicit
        // CODETETHER_POD_NAME override — confirm this ordering is intentional.
        let pod_name = std::env::var("HOSTNAME")
            .ok()
            .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());

        let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();

        // Client construction is best-effort: any failure is logged at debug
        // level and leaves `client = None` rather than erroring out.
        let client = match KubeConfig::incluster() {
            Ok(config) => match Client::try_from(config) {
                Ok(c) => {
                    tracing::info!(
                        namespace = %namespace,
                        pod = pod_name.as_deref().unwrap_or("-"),
                        "K8s client initialized (in-cluster)"
                    );
                    Some(c)
                }
                Err(e) => {
                    tracing::debug!("Failed to create in-cluster K8s client: {}", e);
                    None
                }
            },
            Err(_) => {
                // Not in a cluster — try the developer's local kubeconfig.
                match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
                {
                    Ok(config) => match Client::try_from(config) {
                        Ok(c) => {
                            tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
                            Some(c)
                        }
                        Err(e) => {
                            tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
                            None
                        }
                    },
                    Err(_) => {
                        tracing::debug!(
                            "Not running in K8s and no kubeconfig found — K8s features disabled"
                        );
                        None
                    }
                }
            }
        };

        Self {
            client,
            namespace,
            pod_name,
            deployment_name,
            actions: Arc::new(RwLock::new(Vec::new())),
        }
    }
199
200 fn read_namespace_file() -> Result<String, std::env::VarError> {
202 std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
203 .map(|s| s.trim().to_string())
204 .map_err(|_| std::env::VarError::NotPresent)
205 }
206
    /// Returns `true` when a Kubernetes API client was successfully built,
    /// i.e. when the K8s-backed operations on this manager can be used.
    pub fn is_available(&self) -> bool {
        self.client.is_some()
    }
211
212 pub async fn status(&self) -> K8sStatus {
214 let (replicas, available) = if let Some(ref client) = self.client {
215 if let Some(ref dep_name) = self.deployment_name {
216 self.get_deployment_replicas(client, dep_name).await
217 } else {
218 (None, None)
219 }
220 } else {
221 (None, None)
222 };
223
224 K8sStatus {
225 in_cluster: self.client.is_some(),
226 namespace: self.namespace.clone(),
227 pod_name: self.pod_name.clone(),
228 deployment_name: self.deployment_name.clone(),
229 replicas,
230 available_replicas: available,
231 }
232 }
233
234 async fn get_deployment_replicas(
235 &self,
236 client: &Client,
237 name: &str,
238 ) -> (Option<i32>, Option<i32>) {
239 let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
240 match deployments.get(name).await {
241 Ok(dep) => {
242 let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
243 let available = dep.status.as_ref().and_then(|s| s.available_replicas);
244 (spec_replicas, available)
245 }
246 Err(e) => {
247 tracing::warn!("Failed to get deployment {}: {}", name, e);
248 (None, None)
249 }
250 }
251 }
252
    /// Scales the owning deployment to `replicas` via a merge patch on
    /// `.spec.replicas`, then records the action in history.
    ///
    /// # Errors
    /// Fails when no K8s client is available, when CODETETHER_DEPLOYMENT_NAME
    /// is unset, or when the API rejects the patch.
    pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        // Minimal merge patch: only the replica count changes.
        let patch = serde_json::json!({
            "spec": {
                "replicas": replicas
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to scale deployment {} to {} replicas",
                    dep_name, replicas
                )
            })?;

        let action = DeployAction {
            action: format!("scale:{}", replicas),
            success: true,
            message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            deployment = %dep_name,
            replicas = replicas,
            "Self-deployment: scaled"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }
302
    /// Triggers a rolling restart of the owning deployment by stamping a
    /// `restartedAt` annotation onto the pod template — changing the template
    /// makes the deployment controller roll new pods (the same mechanism
    /// `kubectl rollout restart` uses).
    ///
    /// # Errors
    /// Fails when no K8s client is available, when CODETETHER_DEPLOYMENT_NAME
    /// is unset, or when the API rejects the patch.
    pub async fn rolling_restart(&self) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        // The timestamp value itself is irrelevant; it only needs to differ
        // from the previous annotation value to force a rollout.
        let patch = serde_json::json!({
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "codetether.run/restartedAt": Utc::now().to_rfc3339()
                        }
                    }
                }
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;

        let action = DeployAction {
            action: "rolling_restart".to_string(),
            success: true,
            message: format!("Triggered rolling restart for deployment '{}'", dep_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");

        self.record_action(action.clone()).await;
        Ok(action)
    }
350
351 pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
353 let client = self
354 .client
355 .as_ref()
356 .ok_or_else(|| anyhow!("K8s client not available"))?;
357
358 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
359
360 let label_selector = self
361 .deployment_name
362 .as_ref()
363 .map(|n| format!("app={}", n))
364 .unwrap_or_else(|| "app=codetether".to_string());
365
366 let list = pods
367 .list(&ListParams::default().labels(&label_selector))
368 .await
369 .context("Failed to list pods")?;
370
371 let infos: Vec<PodInfo> = list
372 .items
373 .iter()
374 .map(|pod| {
375 let name = pod.metadata.name.clone().unwrap_or_default();
376 let phase = pod
377 .status
378 .as_ref()
379 .and_then(|s| s.phase.clone())
380 .unwrap_or_else(|| "Unknown".to_string());
381 let ready = pod
382 .status
383 .as_ref()
384 .and_then(|s| s.conditions.as_ref())
385 .map(|conditions| {
386 conditions
387 .iter()
388 .any(|c| c.type_ == "Ready" && c.status == "True")
389 })
390 .unwrap_or(false);
391 let start_time = pod
392 .status
393 .as_ref()
394 .and_then(|s| s.start_time.as_ref())
395 .map(|t| t.0.to_string());
396
397 PodInfo {
398 name,
399 phase,
400 ready,
401 start_time,
402 }
403 })
404 .collect();
405
406 Ok(infos)
407 }
408
409 #[allow(dead_code)]
410 pub async fn spawn_subagent_pod(
412 &self,
413 subagent_id: &str,
414 image: Option<&str>,
415 env_vars: HashMap<String, String>,
416 ) -> Result<DeployAction> {
417 self.spawn_subagent_pod_with_spec(
418 subagent_id,
419 SubagentPodSpec {
420 image: image.map(ToString::to_string),
421 env_vars,
422 ..SubagentPodSpec::default()
423 },
424 )
425 .await
426 }
427
    /// Creates a one-shot (`restartPolicy: Never`) sub-agent pod named via
    /// [`Self::subagent_pod_name`]. If a pod with that name already exists
    /// (HTTP 409), the stale pod is deleted and creation is retried once.
    ///
    /// # Errors
    /// Fails when no K8s client is available, the manifest cannot be built,
    /// or pod creation fails (including the retry after a 409).
    pub async fn spawn_subagent_pod_with_spec(
        &self,
        subagent_id: &str,
        spec: SubagentPodSpec,
    ) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let image = spec
            .image
            .as_deref()
            .unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
        let pod_name = Self::subagent_pod_name(subagent_id);

        // Caller-provided env vars first, then the identity/namespace vars the
        // sub-agent needs to find itself.
        let mut env_list: Vec<serde_json::Value> = spec
            .env_vars
            .iter()
            .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
            .collect();
        env_list
            .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
        env_list.push(
            serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
        );

        // Built-in labels; the subagent id is sanitized to a legal label value.
        let mut labels = serde_json::json!({
            "app": "codetether",
            "codetether.run/role": "subagent",
            "codetether.run/subagent-id": sanitize_label_value(subagent_id)
        });

        // Caller labels are merged on top and may override the built-ins.
        if let Some(map) = labels.as_object_mut() {
            for (k, v) in &spec.labels {
                map.insert(k.clone(), serde_json::json!(v));
            }
        }

        let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": &self.namespace,
                "labels": labels
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "agent",
                    "image": image,
                    "env": env_list,
                    "command": spec.command,
                    "args": spec.args,
                    "resources": {
                        "requests": { "memory": "256Mi", "cpu": "250m" },
                        "limits": { "memory": "1Gi", "cpu": "1000m" }
                    }
                }]
            }
        }))?;

        match pods.create(&PostParams::default(), &pod_manifest).await {
            Ok(_) => {}
            // 409 Conflict: a pod with this deterministic name already exists.
            Err(kube::Error::Api(api_err)) if api_err.code == 409 => {
                tracing::warn!(
                    pod = %pod_name,
                    subagent_id = %subagent_id,
                    "Sub-agent pod already exists, deleting stale pod and retrying create"
                );
                // Best-effort delete; the short sleep gives the API server time
                // to process the deletion before the retry.
                let _ = pods
                    .delete(&pod_name, &kube::api::DeleteParams::default())
                    .await;
                tokio::time::sleep(std::time::Duration::from_millis(600)).await;
                pods.create(&PostParams::default(), &pod_manifest)
                    .await
                    .with_context(|| {
                        format!("Failed to create sub-agent pod {} after retry", pod_name)
                    })?;
            }
            Err(e) => {
                return Err(e)
                    .with_context(|| format!("Failed to create sub-agent pod {pod_name}"));
            }
        }

        let action = DeployAction {
            action: format!("spawn_subagent:{}", subagent_id),
            success: true,
            message: format!(
                "Created sub-agent pod '{}' in namespace '{}'",
                pod_name, self.namespace
            ),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            pod = %pod_name,
            subagent_id = %subagent_id,
            "Self-deployment: spawned sub-agent pod"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }
537
    /// Deletes a sub-agent's pod by its deterministic name. A 404 (pod already
    /// gone) is treated as success, so the operation is idempotent.
    ///
    /// # Errors
    /// Fails when no K8s client is available or the delete fails for any
    /// reason other than the pod not existing.
    pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
        let pod_name = Self::subagent_pod_name(subagent_id);

        match pods
            .delete(&pod_name, &kube::api::DeleteParams::default())
            .await
        {
            Ok(_) => {}
            // Already gone — nothing to do.
            Err(kube::Error::Api(api_err)) if api_err.code == 404 => {
                tracing::debug!(
                    pod = %pod_name,
                    subagent_id = %subagent_id,
                    "Sub-agent pod already deleted"
                );
            }
            Err(e) => {
                return Err(e).with_context(|| format!("Failed to delete pod {}", pod_name));
            }
        }

        let action = DeployAction {
            action: format!("delete_subagent:{}", subagent_id),
            success: true,
            message: format!("Deleted sub-agent pod '{}'", pod_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        self.record_action(action.clone()).await;
        Ok(action)
    }
575
576 pub async fn get_subagent_pod_state(
578 &self,
579 subagent_id: &str,
580 ) -> Result<Option<SubagentPodState>> {
581 let client = self
582 .client
583 .as_ref()
584 .ok_or_else(|| anyhow!("K8s client not available"))?;
585
586 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
587 let pod_name = Self::subagent_pod_name(subagent_id);
588
589 let pod = match pods.get_opt(&pod_name).await {
590 Ok(p) => p,
591 Err(e) => {
592 tracing::warn!(pod = %pod_name, error = %e, "Failed to fetch sub-agent pod state");
593 return Ok(None);
594 }
595 };
596
597 let Some(pod) = pod else {
598 return Ok(None);
599 };
600
601 let phase = pod
602 .status
603 .as_ref()
604 .and_then(|s| s.phase.clone())
605 .unwrap_or_else(|| "Unknown".to_string());
606 let ready = pod
607 .status
608 .as_ref()
609 .and_then(|s| s.conditions.as_ref())
610 .map(|conditions| {
611 conditions
612 .iter()
613 .any(|c| c.type_ == "Ready" && c.status == "True")
614 })
615 .unwrap_or(false);
616
617 let container_status = pod
618 .status
619 .as_ref()
620 .and_then(|s| s.container_statuses.as_ref())
621 .and_then(|statuses| statuses.first());
622 let terminated = container_status
623 .and_then(|status| status.state.as_ref())
624 .and_then(|state| state.terminated.as_ref())
625 .is_some();
626 let exit_code = container_status
627 .and_then(|status| status.state.as_ref())
628 .and_then(|state| state.terminated.as_ref())
629 .map(|terminated| terminated.exit_code);
630 let reason = container_status
631 .and_then(|status| status.state.as_ref())
632 .and_then(|state| {
633 state
634 .terminated
635 .as_ref()
636 .and_then(|t| t.reason.clone())
637 .or_else(|| state.waiting.as_ref().and_then(|w| w.reason.clone()))
638 });
639 let restart_count = container_status
640 .map(|status| status.restart_count.max(0) as u32)
641 .unwrap_or(0);
642
643 Ok(Some(SubagentPodState {
644 pod_name,
645 phase,
646 ready,
647 terminated,
648 exit_code,
649 reason,
650 restart_count,
651 }))
652 }
653
654 pub async fn subagent_logs(&self, subagent_id: &str, tail_lines: i64) -> Result<String> {
656 let client = self
657 .client
658 .as_ref()
659 .ok_or_else(|| anyhow!("K8s client not available"))?;
660 let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
661 let pod_name = Self::subagent_pod_name(subagent_id);
662 let params = LogParams {
663 tail_lines: Some(tail_lines),
664 ..LogParams::default()
665 };
666 pods.logs(&pod_name, ¶ms)
667 .await
668 .with_context(|| format!("Failed to fetch logs for sub-agent pod {pod_name}"))
669 }
670
671 pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
673 let actions = self.actions.read().await;
674 actions.iter().rev().take(limit).cloned().collect()
675 }
676
677 async fn record_action(&self, action: DeployAction) {
678 let mut actions = self.actions.write().await;
679 actions.push(action);
680 while actions.len() > 1000 {
682 actions.remove(0);
683 }
684 }
685}
686
/// Reduces an arbitrary id to a valid Kubernetes label value: only ASCII
/// alphanumerics plus `-`, `_`, `.` are kept, at most 63 characters, and the
/// result must begin and end with an alphanumeric. Falls back to "subagent"
/// when nothing survives.
fn sanitize_label_value(input: &str) -> String {
    // Collect the first 63 legal characters. Everything retained is ASCII,
    // so byte length equals character count.
    let mut kept = String::new();
    for c in input.chars() {
        if kept.len() == 63 {
            break;
        }
        if c.is_ascii_alphanumeric() || matches!(c, '-' | '_' | '.') {
            kept.push(c);
        }
    }
    // Label values may not start or end with a separator character.
    let trimmed = kept.trim_matches(|c: char| matches!(c, '-' | '_' | '.'));
    if trimmed.is_empty() {
        "subagent".to_string()
    } else {
        trimmed.to_string()
    }
}
702
/// Summary of one pod as returned by [`K8sManager::list_pods`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodInfo {
    /// Pod name (empty string when the API object had no name).
    pub name: String,
    /// Pod phase (e.g. "Pending", "Running"); "Unknown" when absent.
    pub phase: String,
    /// Whether the pod's "Ready" condition reports "True".
    pub ready: bool,
    /// Pod start time rendered as a string, when reported.
    pub start_time: Option<String>,
}
711
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn subagent_pod_name_is_sanitized_and_stable() {
        // Pod names must be valid DNS-1123 labels and deterministic per id.
        let pod_name = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert!(pod_name.starts_with("codetether-subagent-"));
        assert!(pod_name.len() <= 63);
        assert!(
            pod_name
                .chars()
                .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-')
        );

        let pod_name_again = K8sManager::subagent_pod_name("SubTask_ABC/123");
        assert_eq!(pod_name, pod_name_again);
    }

    #[test]
    fn subagent_pod_name_avoids_prefix_collisions() {
        // Ids sharing a sanitized prefix must still differ via the hash suffix.
        let a = K8sManager::subagent_pod_name("subtask-aaaaaaaa-1111");
        let b = K8sManager::subagent_pod_name("subtask-aaaaaaaa-2222");
        assert_ne!(a, b);
    }

    #[tokio::test]
    async fn k8s_manager_initializes_without_cluster() {
        // Construction must succeed (with client = None) outside a cluster.
        let mgr = K8sManager::new().await;
        let status = mgr.status().await;
        // Idiomatic form of the original `assert_eq!(..is_empty(), false)`.
        assert!(!status.namespace.is_empty());
    }
}