//! sparktest_api/k8s.rs — Kubernetes job creation, monitoring, and log retrieval.

1use anyhow::{Context, Result};
2use chrono::Utc;
3use k8s_openapi::api::batch::v1::Job;
4use k8s_openapi::api::core::v1::{Container, Pod, PodSpec, PodTemplateSpec};
5use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
6use kube::{
7    api::{Api, ListParams, LogParams, PostParams},
8    Client, Error as KubeError,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::PgPool;
12use tokio::time::{sleep, Duration};
13use tracing::{info, warn};
14use uuid::Uuid;
15
/// Tunables for talking to the cluster: which namespace to operate in,
/// how long to wait on operations, and how much log output to fetch.
#[derive(Debug, Serialize, Deserialize)]
pub struct KubeConfig {
    /// Namespace all Job/Pod API calls are scoped to.
    pub namespace: String,
    /// Operation timeout budget in seconds.
    /// NOTE(review): not currently enforced by the polling loop — confirm intended use.
    pub timeout_seconds: u64,
    /// Cap on log lines fetched per pod (`tail_lines`); `None` means no cap.
    pub max_log_lines: Option<i64>,
}
22
23impl Default for KubeConfig {
24    fn default() -> Self {
25        Self {
26            namespace: "default".to_string(),
27            timeout_seconds: 300,
28            max_log_lines: Some(1000),
29        }
30    }
31}
32
/// Snapshot of a job's logs and status at the moment of retrieval.
#[derive(Debug, Serialize, Deserialize)]
pub struct JobLogs {
    /// Name of the Kubernetes Job the logs belong to.
    pub job_name: String,
    /// Name of the pod the logs were read from (first pod matching the job).
    pub pod_name: String,
    /// Raw log text, or an explanatory placeholder when logs are unavailable.
    pub logs: String,
    /// UTC time at which this snapshot was taken.
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Job status at snapshot time: "completed", "failed", "running", or "unknown".
    pub status: String,
}
41
/// Serializable error shape for surfacing Kubernetes failures to API consumers.
#[derive(Debug, Serialize, Deserialize)]
pub struct KubernetesError {
    /// Coarse error category (currently always "KubernetesError").
    pub error_type: String,
    /// Human-readable error message (`Display` of the underlying error).
    pub message: String,
    /// Optional debug detail (`Debug` of the underlying error).
    pub details: Option<String>,
}
48
49pub async fn create_k8s_job(
50    client: &Client,
51    job_name: &str,
52    image: &str,
53    command: &[String],
54) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
55    let jobs: Api<Job> = Api::namespaced(client.clone(), "default");
56
57    let job = Job {
58        metadata: ObjectMeta {
59            name: Some(job_name.to_string()),
60            labels: Some(std::collections::BTreeMap::from([
61                ("app".to_string(), "sparktest".to_string()),
62                ("component".to_string(), "test-runner".to_string()),
63            ])),
64            ..Default::default()
65        },
66        spec: Some(k8s_openapi::api::batch::v1::JobSpec {
67            template: PodTemplateSpec {
68                metadata: Some(ObjectMeta {
69                    labels: Some(std::collections::BTreeMap::from([
70                        ("job-name".to_string(), job_name.to_string()),
71                        ("app".to_string(), "sparktest".to_string()),
72                    ])),
73                    ..Default::default()
74                }),
75                spec: Some(PodSpec {
76                    containers: vec![Container {
77                        name: job_name.to_string(),
78                        image: Some(image.to_string()),
79                        command: Some(command.to_vec()),
80                        ..Default::default()
81                    }],
82                    restart_policy: Some("Never".to_string()),
83                    ..Default::default()
84                }),
85            },
86            backoff_limit: Some(0),
87            ttl_seconds_after_finished: Some(3600), // Clean up after 1 hour
88            ..Default::default()
89        }),
90        ..Default::default()
91    };
92
93    jobs.create(&PostParams::default(), &job).await?;
94    Ok(())
95}
96
97pub async fn monitor_job_and_update_status(
98    run_id: Uuid,
99    job_name: String,
100    pool: PgPool,
101) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
102    let client = Client::try_default().await?;
103    let jobs: Api<Job> = Api::namespaced(client.clone(), "default");
104
105    let start_time = Utc::now();
106    let mut status = "running".to_string();
107
108    for _ in 0..30 {
109        let job = jobs.get(&job_name).await?;
110        if let Some(s) = &job.status {
111            if let Some(conds) = &s.conditions {
112                if conds
113                    .iter()
114                    .any(|c| c.type_ == "Complete" && c.status == "True")
115                {
116                    status = "succeeded".to_string();
117                    break;
118                } else if conds
119                    .iter()
120                    .any(|c| c.type_ == "Failed" && c.status == "True")
121                {
122                    status = "failed".to_string();
123                    break;
124                }
125            }
126        }
127        sleep(Duration::from_secs(2)).await;
128    }
129
130    let duration = (Utc::now() - start_time).num_seconds() as i32;
131
132    sqlx::query("UPDATE test_runs SET status = $1, duration = $2 WHERE id = $3")
133        .bind(&status)
134        .bind(duration)
135        .bind(run_id)
136        .execute(&pool)
137        .await?;
138
139    Ok(())
140}
141
/// High-level wrapper pairing an authenticated kube [`Client`] with the
/// namespace/log settings used by every API call in this module.
pub struct KubernetesClient {
    // Authenticated API client (in-cluster, kubeconfig, or env-derived).
    client: Client,
    // Namespace, timeout budget, and log-line cap.
    config: KubeConfig,
}
146
147impl KubernetesClient {
148    /// Create a new Kubernetes client with authentication
149    pub async fn new() -> Result<Self> {
150        let client = Self::create_authenticated_client().await?;
151        let config = KubeConfig::default();
152
153        Ok(Self { client, config })
154    }
155
156    /// Create a new Kubernetes client with custom configuration
157    pub async fn new_with_config(config: KubeConfig) -> Result<Self> {
158        let client = Self::create_authenticated_client().await?;
159        Ok(Self { client, config })
160    }
161
162    /// Create authenticated Kubernetes client with fallback mechanisms
163    async fn create_authenticated_client() -> Result<Client> {
164        // Try different authentication methods in order of preference
165
166        // 1. Try in-cluster authentication (for pods running in Kubernetes)
167        if let Ok(client) = Client::try_default().await {
168            info!("Using in-cluster Kubernetes authentication");
169            return Ok(client);
170        }
171
172        // 2. Try kubeconfig from default locations
173        match kube::Config::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await {
174            Ok(config) => {
175                info!("Using kubeconfig authentication");
176                return Ok(Client::try_from(config)?);
177            }
178            Err(e) => {
179                warn!("Failed to load kubeconfig: {}", e);
180            }
181        }
182
183        // 3. Try environment-based configuration
184        if let Ok(config) = Self::config_from_env() {
185            info!("Using environment-based Kubernetes authentication");
186            return Ok(Client::try_from(config)?);
187        }
188
189        // 4. Final fallback - try default client creation
190        Client::try_default()
191            .await
192            .context("Failed to create Kubernetes client with any authentication method")
193    }
194
195    /// Create configuration from environment variables
196    fn config_from_env() -> Result<kube::Config> {
197        // Try to use the service account token method
198        let token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token";
199        let ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
200
201        if std::path::Path::new(token_path).exists() && std::path::Path::new(ca_path).exists() {
202            // We're running inside a Kubernetes cluster with service account
203            return kube::Config::incluster().context("Failed to create in-cluster config");
204        }
205
206        // Fallback to error if we can't create config
207        Err(anyhow::anyhow!("No valid Kubernetes configuration found"))
208    }
209
210    /// Get job logs with comprehensive error handling
211    pub async fn get_job_logs(&self, job_name: &str) -> Result<JobLogs> {
212        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
213
214        // First, get the job to check its status
215        let job = jobs
216            .get(job_name)
217            .await
218            .with_context(|| format!("Failed to get job '{job_name}'"))?;
219
220        let job_status = job
221            .status
222            .as_ref()
223            .and_then(|s| s.conditions.as_ref())
224            .map(|conditions| {
225                if conditions
226                    .iter()
227                    .any(|c| c.type_ == "Complete" && c.status == "True")
228                {
229                    "completed"
230                } else if conditions
231                    .iter()
232                    .any(|c| c.type_ == "Failed" && c.status == "True")
233                {
234                    "failed"
235                } else {
236                    "running"
237                }
238            })
239            .unwrap_or("unknown");
240
241        // Get the pod associated with this job
242        let pod_name = self.get_job_pod_name(job_name).await?;
243
244        // Check if pod is pending and provide helpful message
245        let pod_status = self.get_pod_status(&pod_name).await?;
246
247        // Fetch logs from the pod (handle pending pods gracefully)
248        let logs = match pod_status.as_str() {
249            "Pending" => {
250                let reason = self
251                    .get_pod_pending_reason(&pod_name)
252                    .await
253                    .unwrap_or_else(|_| "Unknown reason".to_string());
254                format!("Pod is pending: {reason}")
255            }
256            _ => self
257                .get_pod_logs(&pod_name)
258                .await
259                .unwrap_or_else(|_| "No logs available yet".to_string()),
260        };
261
262        Ok(JobLogs {
263            job_name: job_name.to_string(),
264            pod_name,
265            logs,
266            timestamp: Utc::now(),
267            status: job_status.to_string(),
268        })
269    }
270
271    /// Get the pod name associated with a job
272    async fn get_job_pod_name(&self, job_name: &str) -> Result<String> {
273        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
274
275        let label_selector = format!("job-name={job_name}");
276        let list_params = ListParams::default().labels(&label_selector);
277
278        let pod_list = pods
279            .list(&list_params)
280            .await
281            .with_context(|| format!("Failed to list pods for job '{job_name}'"))?;
282
283        let pod = pod_list
284            .items
285            .into_iter()
286            .next()
287            .with_context(|| format!("No pods found for job '{job_name}'"))?;
288
289        pod.metadata
290            .name
291            .with_context(|| format!("Pod for job '{job_name}' has no name"))
292    }
293
294    /// Get logs from a specific pod
295    async fn get_pod_logs(&self, pod_name: &str) -> Result<String> {
296        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
297
298        let mut log_params = LogParams::default();
299        if let Some(tail_lines) = self.config.max_log_lines {
300            log_params.tail_lines = Some(tail_lines);
301        }
302        log_params.timestamps = true;
303
304        let logs = pods
305            .logs(pod_name, &log_params)
306            .await
307            .with_context(|| format!("Failed to get logs for pod '{pod_name}'"))?;
308
309        Ok(logs)
310    }
311
312    /// Check if the Kubernetes cluster is accessible
313    pub async fn health_check(&self) -> Result<bool> {
314        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
315
316        match pods.list(&ListParams::default().limit(1)).await {
317            Ok(_) => Ok(true),
318            Err(e) => {
319                warn!("Kubernetes health check failed: {}", e);
320                Ok(false)
321            }
322        }
323    }
324
325    /// Get job status
326    pub async fn get_job_status(&self, job_name: &str) -> Result<String> {
327        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
328
329        let job = jobs
330            .get(job_name)
331            .await
332            .with_context(|| format!("Failed to get job '{job_name}'"))?;
333
334        let status = job
335            .status
336            .as_ref()
337            .and_then(|s| s.conditions.as_ref())
338            .map(|conditions| {
339                if conditions
340                    .iter()
341                    .any(|c| c.type_ == "Complete" && c.status == "True")
342                {
343                    "completed".to_string()
344                } else if conditions
345                    .iter()
346                    .any(|c| c.type_ == "Failed" && c.status == "True")
347                {
348                    "failed".to_string()
349                } else {
350                    "running".to_string()
351                }
352            })
353            .unwrap_or_else(|| "pending".to_string());
354
355        Ok(status)
356    }
357
358    /// Delete a job and its associated pods
359    pub async fn delete_job(&self, job_name: &str) -> Result<()> {
360        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
361
362        // Delete the job (this will also clean up associated pods)
363        let delete_params = kube::api::DeleteParams::default();
364        jobs.delete(job_name, &delete_params)
365            .await
366            .with_context(|| format!("Failed to delete job '{job_name}'"))?;
367
368        info!("Successfully deleted job '{}'", job_name);
369        Ok(())
370    }
371
372    /// Get pod status
373    async fn get_pod_status(&self, pod_name: &str) -> Result<String> {
374        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
375
376        let pod = pods
377            .get(pod_name)
378            .await
379            .with_context(|| format!("Failed to get pod '{pod_name}'"))?;
380
381        let status = pod
382            .status
383            .as_ref()
384            .and_then(|s| s.phase.as_ref())
385            .cloned()
386            .unwrap_or_else(|| "Unknown".to_string());
387
388        Ok(status)
389    }
390
391    /// Get the reason why a pod is pending
392    async fn get_pod_pending_reason(&self, pod_name: &str) -> Result<String> {
393        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
394
395        let pod = pods
396            .get(pod_name)
397            .await
398            .with_context(|| format!("Failed to get pod '{pod_name}'"))?;
399
400        let reason = pod
401            .status
402            .as_ref()
403            .and_then(|s| s.conditions.as_ref())
404            .and_then(|conditions| {
405                conditions
406                    .iter()
407                    .find(|c| c.type_ == "PodScheduled" && c.status == "False")
408                    .and_then(|c| c.reason.as_ref())
409            })
410            .unwrap_or(&"Unknown".to_string())
411            .clone();
412
413        Ok(reason)
414    }
415}
416
417// Convert Kubernetes errors to our custom error type
418impl From<KubeError> for KubernetesError {
419    fn from(error: KubeError) -> Self {
420        KubernetesError {
421            error_type: "KubernetesError".to_string(),
422            message: error.to_string(),
423            details: Some(format!("{error:?}")),
424        }
425    }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Client creation should succeed in-cluster or with a valid kubeconfig;
    /// failure is tolerated because CI typically has neither.
    #[tokio::test]
    async fn test_kubernetes_client_creation() {
        match KubernetesClient::new().await {
            Ok(_) => {
                println!("✅ Kubernetes client created successfully");
            }
            Err(e) => {
                // Expected when not running against a Kubernetes cluster.
                println!(
                    "⚠️ Kubernetes client creation failed (expected in test environment): {e}"
                );
            }
        }
    }

    /// Job names are "test-run-" followed by a hyphenated UUID.
    /// Plain `#[test]`: nothing here is async, so the tokio runtime the
    /// original spun up was unnecessary.
    #[test]
    fn test_job_name_generation() {
        let run_id = uuid::Uuid::new_v4();
        let job_name = format!("test-run-{run_id}");

        assert!(job_name.starts_with("test-run-"));
        assert_eq!(job_name.len(), 45); // "test-run-" (9) + UUID (36)
    }

    // Tests that require a live cluster; kept `#[ignore]`d by default.
    // The redundant nested `#[cfg(test)]` was removed — the enclosing
    // module is already compiled only under `cfg(test)`.
    mod integration_tests {
        use super::*;

        #[ignore] // Remove this when running against a real cluster
        #[tokio::test]
        async fn test_kubernetes_health_check() {
            let client = KubernetesClient::new()
                .await
                .expect("Failed to create client");
            let is_healthy = client.health_check().await.expect("Health check failed");
            assert!(is_healthy);
        }

        #[ignore] // Remove this when running against a real cluster
        #[tokio::test]
        async fn test_job_logs_retrieval() {
            let client = KubernetesClient::new()
                .await
                .expect("Failed to create client");

            // This would test against a real job in the cluster.
            let job_name = "test-job";
            match client.get_job_logs(job_name).await {
                Ok(logs) => {
                    assert!(!logs.logs.is_empty());
                    assert_eq!(logs.job_name, job_name);
                }
                Err(_) => {
                    // Expected if the job doesn't exist.
                }
            }
        }
    }
}