1use anyhow::{Context, Result};
2use chrono::Utc;
3use k8s_openapi::api::batch::v1::Job;
4use k8s_openapi::api::core::v1::{Container, Pod, PodSpec, PodTemplateSpec};
5use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
6use kube::{
7 api::{Api, ListParams, LogParams, PostParams},
8 Client, Error as KubeError,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::PgPool;
12use tokio::time::{sleep, Duration};
13use tracing::{info, warn};
14use uuid::Uuid;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct KubeConfig {
18 pub namespace: String,
19 pub timeout_seconds: u64,
20 pub max_log_lines: Option<i64>,
21}
22
23impl Default for KubeConfig {
24 fn default() -> Self {
25 Self {
26 namespace: "default".to_string(),
27 timeout_seconds: 300,
28 max_log_lines: Some(1000),
29 }
30 }
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34pub struct JobLogs {
35 pub job_name: String,
36 pub pod_name: String,
37 pub logs: String,
38 pub timestamp: chrono::DateTime<chrono::Utc>,
39 pub status: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct KubernetesError {
44 pub error_type: String,
45 pub message: String,
46 pub details: Option<String>,
47}
48
49pub async fn create_k8s_job(
50 client: &Client,
51 job_name: &str,
52 image: &str,
53 command: &[String],
54) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
55 let jobs: Api<Job> = Api::namespaced(client.clone(), "default");
56
57 let job = Job {
58 metadata: ObjectMeta {
59 name: Some(job_name.to_string()),
60 labels: Some(std::collections::BTreeMap::from([
61 ("app".to_string(), "sparktest".to_string()),
62 ("component".to_string(), "test-runner".to_string()),
63 ])),
64 ..Default::default()
65 },
66 spec: Some(k8s_openapi::api::batch::v1::JobSpec {
67 template: PodTemplateSpec {
68 metadata: Some(ObjectMeta {
69 labels: Some(std::collections::BTreeMap::from([
70 ("job-name".to_string(), job_name.to_string()),
71 ("app".to_string(), "sparktest".to_string()),
72 ])),
73 ..Default::default()
74 }),
75 spec: Some(PodSpec {
76 containers: vec![Container {
77 name: job_name.to_string(),
78 image: Some(image.to_string()),
79 command: Some(command.to_vec()),
80 ..Default::default()
81 }],
82 restart_policy: Some("Never".to_string()),
83 ..Default::default()
84 }),
85 },
86 backoff_limit: Some(0),
87 ttl_seconds_after_finished: Some(3600), ..Default::default()
89 }),
90 ..Default::default()
91 };
92
93 jobs.create(&PostParams::default(), &job).await?;
94 Ok(())
95}
96
97pub async fn monitor_job_and_update_status(
98 run_id: Uuid,
99 job_name: String,
100 pool: PgPool,
101) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
102 let client = Client::try_default().await?;
103 let jobs: Api<Job> = Api::namespaced(client.clone(), "default");
104
105 let start_time = Utc::now();
106 let mut status = "running".to_string();
107
108 for _ in 0..30 {
109 let job = jobs.get(&job_name).await?;
110 if let Some(s) = &job.status {
111 if let Some(conds) = &s.conditions {
112 if conds
113 .iter()
114 .any(|c| c.type_ == "Complete" && c.status == "True")
115 {
116 status = "succeeded".to_string();
117 break;
118 } else if conds
119 .iter()
120 .any(|c| c.type_ == "Failed" && c.status == "True")
121 {
122 status = "failed".to_string();
123 break;
124 }
125 }
126 }
127 sleep(Duration::from_secs(2)).await;
128 }
129
130 let duration = (Utc::now() - start_time).num_seconds() as i32;
131
132 sqlx::query("UPDATE test_runs SET status = $1, duration = $2 WHERE id = $3")
133 .bind(&status)
134 .bind(duration)
135 .bind(run_id)
136 .execute(&pool)
137 .await?;
138
139 Ok(())
140}
141
142pub struct KubernetesClient {
143 client: Client,
144 config: KubeConfig,
145}
146
147impl KubernetesClient {
148 pub async fn new() -> Result<Self> {
150 let client = Self::create_authenticated_client().await?;
151 let config = KubeConfig::default();
152
153 Ok(Self { client, config })
154 }
155
156 pub async fn new_with_config(config: KubeConfig) -> Result<Self> {
158 let client = Self::create_authenticated_client().await?;
159 Ok(Self { client, config })
160 }
161
162 async fn create_authenticated_client() -> Result<Client> {
164 if let Ok(client) = Client::try_default().await {
168 info!("Using in-cluster Kubernetes authentication");
169 return Ok(client);
170 }
171
172 match kube::Config::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await {
174 Ok(config) => {
175 info!("Using kubeconfig authentication");
176 return Ok(Client::try_from(config)?);
177 }
178 Err(e) => {
179 warn!("Failed to load kubeconfig: {}", e);
180 }
181 }
182
183 if let Ok(config) = Self::config_from_env() {
185 info!("Using environment-based Kubernetes authentication");
186 return Ok(Client::try_from(config)?);
187 }
188
189 Client::try_default()
191 .await
192 .context("Failed to create Kubernetes client with any authentication method")
193 }
194
195 fn config_from_env() -> Result<kube::Config> {
197 let token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token";
199 let ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
200
201 if std::path::Path::new(token_path).exists() && std::path::Path::new(ca_path).exists() {
202 return kube::Config::incluster().context("Failed to create in-cluster config");
204 }
205
206 Err(anyhow::anyhow!("No valid Kubernetes configuration found"))
208 }
209
210 pub async fn get_job_logs(&self, job_name: &str) -> Result<JobLogs> {
212 let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
213
214 let job = jobs
216 .get(job_name)
217 .await
218 .with_context(|| format!("Failed to get job '{}'", job_name))?;
219
220 let job_status = job
221 .status
222 .as_ref()
223 .and_then(|s| s.conditions.as_ref())
224 .map(|conditions| {
225 if conditions
226 .iter()
227 .any(|c| c.type_ == "Complete" && c.status == "True")
228 {
229 "completed"
230 } else if conditions
231 .iter()
232 .any(|c| c.type_ == "Failed" && c.status == "True")
233 {
234 "failed"
235 } else {
236 "running"
237 }
238 })
239 .unwrap_or("unknown");
240
241 let pod_name = self.get_job_pod_name(job_name).await?;
243
244 let pod_status = self.get_pod_status(&pod_name).await?;
246
247 let logs = match pod_status.as_str() {
249 "Pending" => {
250 let reason = self
251 .get_pod_pending_reason(&pod_name)
252 .await
253 .unwrap_or_else(|_| "Unknown reason".to_string());
254 format!("Pod is pending: {}", reason)
255 }
256 _ => self
257 .get_pod_logs(&pod_name)
258 .await
259 .unwrap_or_else(|_| "No logs available yet".to_string()),
260 };
261
262 Ok(JobLogs {
263 job_name: job_name.to_string(),
264 pod_name,
265 logs,
266 timestamp: Utc::now(),
267 status: job_status.to_string(),
268 })
269 }
270
271 async fn get_job_pod_name(&self, job_name: &str) -> Result<String> {
273 let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
274
275 let label_selector = format!("job-name={}", job_name);
276 let list_params = ListParams::default().labels(&label_selector);
277
278 let pod_list = pods
279 .list(&list_params)
280 .await
281 .with_context(|| format!("Failed to list pods for job '{}'", job_name))?;
282
283 let pod = pod_list
284 .items
285 .into_iter()
286 .next()
287 .with_context(|| format!("No pods found for job '{}'", job_name))?;
288
289 pod.metadata
290 .name
291 .with_context(|| format!("Pod for job '{}' has no name", job_name))
292 }
293
294 async fn get_pod_logs(&self, pod_name: &str) -> Result<String> {
296 let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
297
298 let mut log_params = LogParams::default();
299 if let Some(tail_lines) = self.config.max_log_lines {
300 log_params.tail_lines = Some(tail_lines);
301 }
302 log_params.timestamps = true;
303
304 let logs = pods
305 .logs(pod_name, &log_params)
306 .await
307 .with_context(|| format!("Failed to get logs for pod '{}'", pod_name))?;
308
309 Ok(logs)
310 }
311
312 pub async fn health_check(&self) -> Result<bool> {
314 let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
315
316 match pods.list(&ListParams::default().limit(1)).await {
317 Ok(_) => Ok(true),
318 Err(e) => {
319 warn!("Kubernetes health check failed: {}", e);
320 Ok(false)
321 }
322 }
323 }
324
325 pub async fn get_job_status(&self, job_name: &str) -> Result<String> {
327 let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
328
329 let job = jobs
330 .get(job_name)
331 .await
332 .with_context(|| format!("Failed to get job '{}'", job_name))?;
333
334 let status = job
335 .status
336 .as_ref()
337 .and_then(|s| s.conditions.as_ref())
338 .map(|conditions| {
339 if conditions
340 .iter()
341 .any(|c| c.type_ == "Complete" && c.status == "True")
342 {
343 "completed".to_string()
344 } else if conditions
345 .iter()
346 .any(|c| c.type_ == "Failed" && c.status == "True")
347 {
348 "failed".to_string()
349 } else {
350 "running".to_string()
351 }
352 })
353 .unwrap_or_else(|| "pending".to_string());
354
355 Ok(status)
356 }
357
358 pub async fn delete_job(&self, job_name: &str) -> Result<()> {
360 let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
361
362 let delete_params = kube::api::DeleteParams::default();
364 jobs.delete(job_name, &delete_params)
365 .await
366 .with_context(|| format!("Failed to delete job '{}'", job_name))?;
367
368 info!("Successfully deleted job '{}'", job_name);
369 Ok(())
370 }
371
372 async fn get_pod_status(&self, pod_name: &str) -> Result<String> {
374 let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
375
376 let pod = pods
377 .get(pod_name)
378 .await
379 .with_context(|| format!("Failed to get pod '{}'", pod_name))?;
380
381 let status = pod
382 .status
383 .as_ref()
384 .and_then(|s| s.phase.as_ref())
385 .map(|phase| phase.clone())
386 .unwrap_or_else(|| "Unknown".to_string());
387
388 Ok(status)
389 }
390
391 async fn get_pod_pending_reason(&self, pod_name: &str) -> Result<String> {
393 let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
394
395 let pod = pods
396 .get(pod_name)
397 .await
398 .with_context(|| format!("Failed to get pod '{}'", pod_name))?;
399
400 let reason = pod
401 .status
402 .as_ref()
403 .and_then(|s| s.conditions.as_ref())
404 .and_then(|conditions| {
405 conditions
406 .iter()
407 .find(|c| c.type_ == "PodScheduled" && c.status == "False")
408 .and_then(|c| c.reason.as_ref())
409 })
410 .unwrap_or(&"Unknown".to_string())
411 .clone();
412
413 Ok(reason)
414 }
415}
416
417impl From<KubeError> for KubernetesError {
419 fn from(error: KubeError) -> Self {
420 KubernetesError {
421 error_type: "KubernetesError".to_string(),
422 message: error.to_string(),
423 details: Some(format!("{:?}", error)),
424 }
425 }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Client creation depends on the environment, so this only exercises the
    /// code path and reports the outcome; it never fails.
    #[tokio::test]
    async fn test_kubernetes_client_creation() {
        match KubernetesClient::new().await {
            Ok(_) => {
                println!("✅ Kubernetes client created successfully");
            }
            Err(e) => {
                println!(
                    "⚠️ Kubernetes client creation failed (expected in test environment): {}",
                    e
                );
            }
        }
    }

    // Pure string formatting: no async runtime needed.
    #[test]
    fn test_job_name_generation() {
        let run_id = uuid::Uuid::new_v4();
        let job_name = format!("test-run-{}", run_id);

        assert!(job_name.starts_with("test-run-"));
        // "test-run-" (9 chars) + hyphenated UUID (36 chars).
        assert_eq!(job_name.len(), 45);
    }

    // Already compiled only under the parent's `#[cfg(test)]`.
    mod integration_tests {
        use super::*;

        #[ignore = "requires a reachable Kubernetes cluster"]
        #[tokio::test]
        async fn test_kubernetes_health_check() {
            let client = KubernetesClient::new()
                .await
                .expect("Failed to create client");
            let is_healthy = client.health_check().await.expect("Health check failed");
            assert!(is_healthy);
        }

        #[ignore = "requires a reachable Kubernetes cluster with a `test-job` job"]
        #[tokio::test]
        async fn test_job_logs_retrieval() {
            let client = KubernetesClient::new()
                .await
                .expect("Failed to create client");

            let job_name = "test-job";
            match client.get_job_logs(job_name).await {
                Ok(logs) => {
                    assert!(!logs.logs.is_empty());
                    assert_eq!(logs.job_name, job_name);
                }
                // The job may not exist in the target cluster; report and move on.
                Err(e) => {
                    println!("Skipping log assertions for '{}': {}", job_name, e);
                }
            }
        }
    }
}