//! Kubernetes self-deployment support: inspecting the running deployment,
//! scaling it, triggering rolling restarts, and managing short-lived
//! sub-agent pods.

use anyhow::{Context, Result, anyhow};
use chrono::Utc;
use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::Pod;
use kube::{
    Api, Client, Config as KubeConfig,
    api::{ListParams, Patch, PatchParams, PostParams},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

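/// Point-in-time snapshot of the manager's Kubernetes context, including
/// replica counts when a deployment name is configured.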
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct K8sStatus {
    /// True when a client was successfully constructed (in-cluster or via kubeconfig).
    pub in_cluster: bool,
    /// Namespace the manager operates in (env var, service-account file, or "default").
    pub namespace: String,
    /// This pod's name, if known.
    pub pod_name: Option<String>,
    /// Deployment targeted by self-deployment operations, if configured.
    pub deployment_name: Option<String>,
    /// Desired replica count from the deployment spec.
    pub replicas: Option<i32>,
    /// Replicas currently reporting available.
    pub available_replicas: Option<i32>,
}

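/// Record of one self-deployment operation, kept in a bounded in-memory history.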
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployAction {
    pub action: String,
    pub success: bool,
    pub message: String,
    pub timestamp: String,
}

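/// Manages the agent's own Kubernetes footprint: deployment status, scaling,
/// rolling restarts, and short-lived sub-agent pods. Construction never fails;
/// when no cluster is reachable the manager stays inert and operations return
/// errors.
///
/// Illustrative usage (doc-test ignored; requires a reachable cluster):
///
/// ```ignore
/// let mgr = K8sManager::new().await;
/// if mgr.is_available() {
///     mgr.scale(3).await?;
/// }
/// ```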
#[derive(Clone)]
pub struct K8sManager {
    /// None when neither in-cluster config nor a kubeconfig is available.
    client: Option<Client>,
    namespace: String,
    pod_name: Option<String>,
    deployment_name: Option<String>,
    /// Bounded history of recent self-deployment actions.
    actions: Arc<RwLock<Vec<DeployAction>>>,
}

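// Hand-written Debug: the client field is summarized as a `connected` flag.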
impl std::fmt::Debug for K8sManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("K8sManager")
            .field("namespace", &self.namespace)
            .field("pod_name", &self.pod_name)
            .field("deployment_name", &self.deployment_name)
            .field("connected", &self.client.is_some())
            .finish()
    }
}

impl K8sManager {
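    /// Builds the manager. Configuration is read from the environment
    /// (`CODETETHER_K8S_NAMESPACE`, `HOSTNAME`/`CODETETHER_POD_NAME`,
    /// `CODETETHER_DEPLOYMENT_NAME`); the client prefers in-cluster config and
    /// falls back to the local kubeconfig, disabling K8s features if neither works.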
    pub async fn new() -> Self {
        let namespace = std::env::var("CODETETHER_K8S_NAMESPACE")
            .or_else(|_| Self::read_namespace_file())
            .unwrap_or_else(|_| "default".to_string());

        let pod_name = std::env::var("HOSTNAME")
            .ok()
            .or_else(|| std::env::var("CODETETHER_POD_NAME").ok());

        let deployment_name = std::env::var("CODETETHER_DEPLOYMENT_NAME").ok();

        let client = match KubeConfig::incluster() {
            Ok(config) => match Client::try_from(config) {
                Ok(c) => {
                    tracing::info!(
                        namespace = %namespace,
                        pod = pod_name.as_deref().unwrap_or("-"),
                        "K8s client initialized (in-cluster)"
                    );
                    Some(c)
                }
                Err(e) => {
                    tracing::debug!("Failed to create in-cluster K8s client: {}", e);
                    None
                }
            },
            Err(_) => {
                match KubeConfig::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await
                {
                    Ok(config) => match Client::try_from(config) {
                        Ok(c) => {
                            tracing::info!(namespace = %namespace, "K8s client initialized (kubeconfig)");
                            Some(c)
                        }
                        Err(e) => {
                            tracing::debug!("Failed to create K8s client from kubeconfig: {}", e);
                            None
                        }
                    },
                    Err(_) => {
                        tracing::debug!(
                            "Not running in K8s and no kubeconfig found — K8s features disabled"
                        );
                        None
                    }
                }
            }
        };

        Self {
            client,
            namespace,
            pod_name,
            deployment_name,
            actions: Arc::new(RwLock::new(Vec::new())),
        }
    }

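    /// Reads the namespace from the service-account file mounted into every pod,
    /// mapping errors to `VarError` so it composes with the `env::var` fallback above.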
    fn read_namespace_file() -> Result<String, std::env::VarError> {
        std::fs::read_to_string("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
            .map(|s| s.trim().to_string())
            .map_err(|_| std::env::VarError::NotPresent)
    }

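    /// True when a Kubernetes client was successfully constructed.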
    pub fn is_available(&self) -> bool {
        self.client.is_some()
    }

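    /// Current context plus live replica counts when a deployment is configured.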
    pub async fn status(&self) -> K8sStatus {
        let (replicas, available) = if let Some(ref client) = self.client {
            if let Some(ref dep_name) = self.deployment_name {
                self.get_deployment_replicas(client, dep_name).await
            } else {
                (None, None)
            }
        } else {
            (None, None)
        };

        K8sStatus {
            in_cluster: self.client.is_some(),
            namespace: self.namespace.clone(),
            pod_name: self.pod_name.clone(),
            deployment_name: self.deployment_name.clone(),
            replicas,
            available_replicas: available,
        }
    }

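    /// Fetches `(spec.replicas, status.availableReplicas)` for the named
    /// deployment, logging and returning `(None, None)` on failure.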
    async fn get_deployment_replicas(
        &self,
        client: &Client,
        name: &str,
    ) -> (Option<i32>, Option<i32>) {
        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);
        match deployments.get(name).await {
            Ok(dep) => {
                let spec_replicas = dep.spec.as_ref().and_then(|s| s.replicas);
                let available = dep.status.as_ref().and_then(|s| s.available_replicas);
                (spec_replicas, available)
            }
            Err(e) => {
                tracing::warn!("Failed to get deployment {}: {}", name, e);
                (None, None)
            }
        }
    }

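    /// Scales the managed deployment by merge-patching `spec.replicas`
    /// (the effect of `kubectl scale deployment <name> --replicas=N`).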
    pub async fn scale(&self, replicas: i32) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot scale"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        let patch = serde_json::json!({
            "spec": {
                "replicas": replicas
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| {
                format!(
                    "Failed to scale deployment {} to {} replicas",
                    dep_name, replicas
                )
            })?;

        let action = DeployAction {
            action: format!("scale:{}", replicas),
            success: true,
            message: format!("Scaled deployment '{}' to {} replicas", dep_name, replicas),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            deployment = %dep_name,
            replicas = replicas,
            "Self-deployment: scaled"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }

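    /// Triggers a rolling restart by bumping a pod-template annotation; changing
    /// the template causes the Deployment controller to roll new pods, the same
    /// trick `kubectl rollout restart` uses.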
    pub async fn rolling_restart(&self) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot restart"))?;
        let dep_name = self
            .deployment_name
            .as_ref()
            .ok_or_else(|| anyhow!("Deployment name not set — set CODETETHER_DEPLOYMENT_NAME"))?;

        let deployments: Api<Deployment> = Api::namespaced(client.clone(), &self.namespace);

        let patch = serde_json::json!({
            "spec": {
                "template": {
                    "metadata": {
                        "annotations": {
                            "codetether.run/restartedAt": Utc::now().to_rfc3339()
                        }
                    }
                }
            }
        });

        deployments
            .patch(
                dep_name,
                &PatchParams::apply("codetether"),
                &Patch::Merge(&patch),
            )
            .await
            .with_context(|| format!("Failed to trigger rolling restart for {}", dep_name))?;

        let action = DeployAction {
            action: "rolling_restart".to_string(),
            success: true,
            message: format!("Triggered rolling restart for deployment '{}'", dep_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(deployment = %dep_name, "Self-deployment: rolling restart");

        self.record_action(action.clone()).await;
        Ok(action)
    }

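    /// Lists pods labeled `app=<deployment name>` (falling back to
    /// `app=codetether`) in the manager's namespace.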
    pub async fn list_pods(&self) -> Result<Vec<PodInfo>> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let label_selector = self
            .deployment_name
            .as_ref()
            .map(|n| format!("app={}", n))
            .unwrap_or_else(|| "app=codetether".to_string());

        let list = pods
            .list(&ListParams::default().labels(&label_selector))
            .await
            .context("Failed to list pods")?;

        let infos: Vec<PodInfo> = list
            .items
            .iter()
            .map(|pod| {
                let name = pod.metadata.name.clone().unwrap_or_default();
                let phase = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.phase.clone())
                    .unwrap_or_else(|| "Unknown".to_string());
                let ready = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.conditions.as_ref())
                    .map(|conditions| {
                        conditions
                            .iter()
                            .any(|c| c.type_ == "Ready" && c.status == "True")
                    })
                    .unwrap_or(false);
                let start_time = pod
                    .status
                    .as_ref()
                    .and_then(|s| s.start_time.as_ref())
                    .map(|t| t.0.to_string());

                PodInfo {
                    name,
                    phase,
                    ready,
                    start_time,
                }
            })
            .collect();

        Ok(infos)
    }

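    /// Creates a one-shot sub-agent pod (`restartPolicy: Never`) named from the
    /// first eight characters of `subagent_id`, running `image` (or the default
    /// agent image) with the provided environment plus the sub-agent ID and namespace.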
    pub async fn spawn_subagent_pod(
        &self,
        subagent_id: &str,
        image: Option<&str>,
        env_vars: HashMap<String, String>,
    ) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available — cannot spawn sub-agent pod"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);

        let image = image.unwrap_or("ghcr.io/rileyseaburg/codetether-agent:latest");
        // Byte-index slice: assumes ASCII IDs (e.g. UUIDs); a multi-byte character
        // at the boundary would panic.
        let pod_name = format!(
            "codetether-subagent-{}",
            &subagent_id[..8.min(subagent_id.len())]
        );

        let mut env_list: Vec<serde_json::Value> = env_vars
            .iter()
            .map(|(k, v)| serde_json::json!({ "name": k, "value": v }))
            .collect();
        env_list
            .push(serde_json::json!({ "name": "CODETETHER_SUBAGENT_ID", "value": subagent_id }));
        env_list.push(
            serde_json::json!({ "name": "CODETETHER_K8S_NAMESPACE", "value": &self.namespace }),
        );

        let pod_manifest: Pod = serde_json::from_value(serde_json::json!({
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": pod_name,
                "namespace": &self.namespace,
                "labels": {
                    "app": "codetether",
                    "codetether.run/role": "subagent",
                    "codetether.run/subagent-id": subagent_id
                }
            },
            "spec": {
                "restartPolicy": "Never",
                "containers": [{
                    "name": "agent",
                    "image": image,
                    "env": env_list,
                    "resources": {
                        "requests": { "memory": "256Mi", "cpu": "250m" },
                        "limits": { "memory": "1Gi", "cpu": "1000m" }
                    }
                }]
            }
        }))?;

        pods.create(&PostParams::default(), &pod_manifest)
            .await
            .with_context(|| format!("Failed to create sub-agent pod {}", pod_name))?;

        let action = DeployAction {
            action: format!("spawn_subagent:{}", subagent_id),
            success: true,
            message: format!(
                "Created sub-agent pod '{}' in namespace '{}'",
                pod_name, self.namespace
            ),
            timestamp: Utc::now().to_rfc3339(),
        };

        tracing::info!(
            pod = %pod_name,
            subagent_id = %subagent_id,
            "Self-deployment: spawned sub-agent pod"
        );

        self.record_action(action.clone()).await;
        Ok(action)
    }

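    /// Deletes the pod created by `spawn_subagent_pod`; the name is derived from
    /// `subagent_id` the same way, so the same ID must be passed.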
    pub async fn delete_subagent_pod(&self, subagent_id: &str) -> Result<DeployAction> {
        let client = self
            .client
            .as_ref()
            .ok_or_else(|| anyhow!("K8s client not available"))?;

        let pods: Api<Pod> = Api::namespaced(client.clone(), &self.namespace);
        let pod_name = format!(
            "codetether-subagent-{}",
            &subagent_id[..8.min(subagent_id.len())]
        );

        pods.delete(&pod_name, &kube::api::DeleteParams::default())
            .await
            .with_context(|| format!("Failed to delete pod {}", pod_name))?;

        let action = DeployAction {
            action: format!("delete_subagent:{}", subagent_id),
            success: true,
            message: format!("Deleted sub-agent pod '{}'", pod_name),
            timestamp: Utc::now().to_rfc3339(),
        };

        self.record_action(action.clone()).await;
        Ok(action)
    }

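    /// Returns up to `limit` recorded actions, most recent first.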
    pub async fn recent_actions(&self, limit: usize) -> Vec<DeployAction> {
        let actions = self.actions.read().await;
        actions.iter().rev().take(limit).cloned().collect()
    }

    async fn record_action(&self, action: DeployAction) {
        let mut actions = self.actions.write().await;
        actions.push(action);
        // Cap the in-memory history, dropping the oldest entries first.
        while actions.len() > 1000 {
            actions.remove(0);
        }
    }
}

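/// Summary of a pod as returned by [`K8sManager::list_pods`].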
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PodInfo {
    pub name: String,
    pub phase: String,
    pub ready: bool,
    pub start_time: Option<String>,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn k8s_manager_initializes_without_cluster() {
        let mgr = K8sManager::new().await;
        let status = mgr.status().await;
        // Falls back to "default" when no env var or namespace file is present.
        assert!(!status.namespace.is_empty());
    }
}