sparktest_api/
k8s.rs

1use anyhow::{Context, Result};
2use chrono::Utc;
3use k8s_openapi::api::batch::v1::Job;
4use k8s_openapi::api::core::v1::{Container, Pod, PodSpec, PodTemplateSpec};
5use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
6use kube::{
7    api::{Api, ListParams, LogParams, PostParams},
8    Client, Error as KubeError,
9};
10use serde::{Deserialize, Serialize};
11use sqlx::PgPool;
12use tokio::time::{sleep, Duration};
13use tracing::{info, warn};
14use uuid::Uuid;
15
16#[derive(Debug, Serialize, Deserialize)]
17pub struct KubeConfig {
18    pub namespace: String,
19    pub timeout_seconds: u64,
20    pub max_log_lines: Option<i64>,
21}
22
23impl Default for KubeConfig {
24    fn default() -> Self {
25        Self {
26            namespace: "default".to_string(),
27            timeout_seconds: 300,
28            max_log_lines: Some(1000),
29        }
30    }
31}
32
33#[derive(Debug, Serialize, Deserialize)]
34pub struct JobLogs {
35    pub job_name: String,
36    pub pod_name: String,
37    pub logs: String,
38    pub timestamp: chrono::DateTime<chrono::Utc>,
39    pub status: String,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub struct KubernetesError {
44    pub error_type: String,
45    pub message: String,
46    pub details: Option<String>,
47}
48
49pub async fn create_k8s_job(
50    client: &Client,
51    job_name: &str,
52    image: &str,
53    command: &[String],
54) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
55    let jobs: Api<Job> = Api::namespaced(client.clone(), "default");
56
57    let job = Job {
58        metadata: ObjectMeta {
59            name: Some(job_name.to_string()),
60            labels: Some(std::collections::BTreeMap::from([
61                ("app".to_string(), "sparktest".to_string()),
62                ("component".to_string(), "test-runner".to_string()),
63            ])),
64            ..Default::default()
65        },
66        spec: Some(k8s_openapi::api::batch::v1::JobSpec {
67            template: PodTemplateSpec {
68                metadata: Some(ObjectMeta {
69                    labels: Some(std::collections::BTreeMap::from([
70                        ("job-name".to_string(), job_name.to_string()),
71                        ("app".to_string(), "sparktest".to_string()),
72                    ])),
73                    ..Default::default()
74                }),
75                spec: Some(PodSpec {
76                    containers: vec![Container {
77                        name: job_name.to_string(),
78                        image: Some(image.to_string()),
79                        command: Some(command.to_vec()),
80                        ..Default::default()
81                    }],
82                    restart_policy: Some("Never".to_string()),
83                    ..Default::default()
84                }),
85            },
86            backoff_limit: Some(0),
87            ttl_seconds_after_finished: Some(3600), // Clean up after 1 hour
88            ..Default::default()
89        }),
90        ..Default::default()
91    };
92
93    jobs.create(&PostParams::default(), &job).await?;
94    Ok(())
95}
96
97pub async fn monitor_job_and_update_status(
98    run_id: Uuid,
99    job_name: String,
100    pool: PgPool,
101) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
102    let client = Client::try_default().await?;
103    let jobs: Api<Job> = Api::namespaced(client.clone(), "default");
104
105    let start_time = Utc::now();
106    let mut status = "running".to_string();
107
108    for _ in 0..30 {
109        let job = jobs.get(&job_name).await?;
110        if let Some(s) = &job.status {
111            if let Some(conds) = &s.conditions {
112                if conds
113                    .iter()
114                    .any(|c| c.type_ == "Complete" && c.status == "True")
115                {
116                    status = "succeeded".to_string();
117                    break;
118                } else if conds
119                    .iter()
120                    .any(|c| c.type_ == "Failed" && c.status == "True")
121                {
122                    status = "failed".to_string();
123                    break;
124                }
125            }
126        }
127        sleep(Duration::from_secs(2)).await;
128    }
129
130    let duration = (Utc::now() - start_time).num_seconds() as i32;
131
132    sqlx::query("UPDATE test_runs SET status = $1, duration = $2 WHERE id = $3")
133        .bind(&status)
134        .bind(duration)
135        .bind(run_id)
136        .execute(&pool)
137        .await?;
138
139    Ok(())
140}
141
142pub struct KubernetesClient {
143    client: Client,
144    config: KubeConfig,
145}
146
147impl KubernetesClient {
148    /// Create a new Kubernetes client with authentication
149    pub async fn new() -> Result<Self> {
150        let client = Self::create_authenticated_client().await?;
151        let config = KubeConfig::default();
152
153        Ok(Self { client, config })
154    }
155
156    /// Create a new Kubernetes client with custom configuration
157    pub async fn new_with_config(config: KubeConfig) -> Result<Self> {
158        let client = Self::create_authenticated_client().await?;
159        Ok(Self { client, config })
160    }
161
162    /// Create authenticated Kubernetes client with fallback mechanisms
163    async fn create_authenticated_client() -> Result<Client> {
164        // Try different authentication methods in order of preference
165
166        // 1. Try in-cluster authentication (for pods running in Kubernetes)
167        if let Ok(client) = Client::try_default().await {
168            info!("Using in-cluster Kubernetes authentication");
169            return Ok(client);
170        }
171
172        // 2. Try kubeconfig from default locations
173        match kube::Config::from_kubeconfig(&kube::config::KubeConfigOptions::default()).await {
174            Ok(config) => {
175                info!("Using kubeconfig authentication");
176                return Ok(Client::try_from(config)?);
177            }
178            Err(e) => {
179                warn!("Failed to load kubeconfig: {}", e);
180            }
181        }
182
183        // 3. Try environment-based configuration
184        if let Ok(config) = Self::config_from_env() {
185            info!("Using environment-based Kubernetes authentication");
186            return Ok(Client::try_from(config)?);
187        }
188
189        // 4. Final fallback - try default client creation
190        Client::try_default()
191            .await
192            .context("Failed to create Kubernetes client with any authentication method")
193    }
194
195    /// Create configuration from environment variables
196    fn config_from_env() -> Result<kube::Config> {
197        // Try to use the service account token method
198        let token_path = "/var/run/secrets/kubernetes.io/serviceaccount/token";
199        let ca_path = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
200
201        if std::path::Path::new(token_path).exists() && std::path::Path::new(ca_path).exists() {
202            // We're running inside a Kubernetes cluster with service account
203            return kube::Config::incluster().context("Failed to create in-cluster config");
204        }
205
206        // Fallback to error if we can't create config
207        Err(anyhow::anyhow!("No valid Kubernetes configuration found"))
208    }
209
210    /// Get job logs with comprehensive error handling
211    pub async fn get_job_logs(&self, job_name: &str) -> Result<JobLogs> {
212        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
213
214        // First, get the job to check its status
215        let job = jobs
216            .get(job_name)
217            .await
218            .with_context(|| format!("Failed to get job '{}'", job_name))?;
219
220        let job_status = job
221            .status
222            .as_ref()
223            .and_then(|s| s.conditions.as_ref())
224            .map(|conditions| {
225                if conditions
226                    .iter()
227                    .any(|c| c.type_ == "Complete" && c.status == "True")
228                {
229                    "completed"
230                } else if conditions
231                    .iter()
232                    .any(|c| c.type_ == "Failed" && c.status == "True")
233                {
234                    "failed"
235                } else {
236                    "running"
237                }
238            })
239            .unwrap_or("unknown");
240
241        // Get the pod associated with this job
242        let pod_name = self.get_job_pod_name(job_name).await?;
243
244        // Check if pod is pending and provide helpful message
245        let pod_status = self.get_pod_status(&pod_name).await?;
246
247        // Fetch logs from the pod (handle pending pods gracefully)
248        let logs = match pod_status.as_str() {
249            "Pending" => {
250                let reason = self
251                    .get_pod_pending_reason(&pod_name)
252                    .await
253                    .unwrap_or_else(|_| "Unknown reason".to_string());
254                format!("Pod is pending: {}", reason)
255            }
256            _ => self
257                .get_pod_logs(&pod_name)
258                .await
259                .unwrap_or_else(|_| "No logs available yet".to_string()),
260        };
261
262        Ok(JobLogs {
263            job_name: job_name.to_string(),
264            pod_name,
265            logs,
266            timestamp: Utc::now(),
267            status: job_status.to_string(),
268        })
269    }
270
271    /// Get the pod name associated with a job
272    async fn get_job_pod_name(&self, job_name: &str) -> Result<String> {
273        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
274
275        let label_selector = format!("job-name={}", job_name);
276        let list_params = ListParams::default().labels(&label_selector);
277
278        let pod_list = pods
279            .list(&list_params)
280            .await
281            .with_context(|| format!("Failed to list pods for job '{}'", job_name))?;
282
283        let pod = pod_list
284            .items
285            .into_iter()
286            .next()
287            .with_context(|| format!("No pods found for job '{}'", job_name))?;
288
289        pod.metadata
290            .name
291            .with_context(|| format!("Pod for job '{}' has no name", job_name))
292    }
293
294    /// Get logs from a specific pod
295    async fn get_pod_logs(&self, pod_name: &str) -> Result<String> {
296        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
297
298        let mut log_params = LogParams::default();
299        if let Some(tail_lines) = self.config.max_log_lines {
300            log_params.tail_lines = Some(tail_lines);
301        }
302        log_params.timestamps = true;
303
304        let logs = pods
305            .logs(pod_name, &log_params)
306            .await
307            .with_context(|| format!("Failed to get logs for pod '{}'", pod_name))?;
308
309        Ok(logs)
310    }
311
312    /// Check if the Kubernetes cluster is accessible
313    pub async fn health_check(&self) -> Result<bool> {
314        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
315
316        match pods.list(&ListParams::default().limit(1)).await {
317            Ok(_) => Ok(true),
318            Err(e) => {
319                warn!("Kubernetes health check failed: {}", e);
320                Ok(false)
321            }
322        }
323    }
324
325    /// Get job status
326    pub async fn get_job_status(&self, job_name: &str) -> Result<String> {
327        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
328
329        let job = jobs
330            .get(job_name)
331            .await
332            .with_context(|| format!("Failed to get job '{}'", job_name))?;
333
334        let status = job
335            .status
336            .as_ref()
337            .and_then(|s| s.conditions.as_ref())
338            .map(|conditions| {
339                if conditions
340                    .iter()
341                    .any(|c| c.type_ == "Complete" && c.status == "True")
342                {
343                    "completed".to_string()
344                } else if conditions
345                    .iter()
346                    .any(|c| c.type_ == "Failed" && c.status == "True")
347                {
348                    "failed".to_string()
349                } else {
350                    "running".to_string()
351                }
352            })
353            .unwrap_or_else(|| "pending".to_string());
354
355        Ok(status)
356    }
357
358    /// Delete a job and its associated pods
359    pub async fn delete_job(&self, job_name: &str) -> Result<()> {
360        let jobs: Api<Job> = Api::namespaced(self.client.clone(), &self.config.namespace);
361
362        // Delete the job (this will also clean up associated pods)
363        let delete_params = kube::api::DeleteParams::default();
364        jobs.delete(job_name, &delete_params)
365            .await
366            .with_context(|| format!("Failed to delete job '{}'", job_name))?;
367
368        info!("Successfully deleted job '{}'", job_name);
369        Ok(())
370    }
371
372    /// Get pod status
373    async fn get_pod_status(&self, pod_name: &str) -> Result<String> {
374        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
375
376        let pod = pods
377            .get(pod_name)
378            .await
379            .with_context(|| format!("Failed to get pod '{}'", pod_name))?;
380
381        let status = pod
382            .status
383            .as_ref()
384            .and_then(|s| s.phase.as_ref())
385            .map(|phase| phase.clone())
386            .unwrap_or_else(|| "Unknown".to_string());
387
388        Ok(status)
389    }
390
391    /// Get the reason why a pod is pending
392    async fn get_pod_pending_reason(&self, pod_name: &str) -> Result<String> {
393        let pods: Api<Pod> = Api::namespaced(self.client.clone(), &self.config.namespace);
394
395        let pod = pods
396            .get(pod_name)
397            .await
398            .with_context(|| format!("Failed to get pod '{}'", pod_name))?;
399
400        let reason = pod
401            .status
402            .as_ref()
403            .and_then(|s| s.conditions.as_ref())
404            .and_then(|conditions| {
405                conditions
406                    .iter()
407                    .find(|c| c.type_ == "PodScheduled" && c.status == "False")
408                    .and_then(|c| c.reason.as_ref())
409            })
410            .unwrap_or(&"Unknown".to_string())
411            .clone();
412
413        Ok(reason)
414    }
415}
416
417// Convert Kubernetes errors to our custom error type
418impl From<KubeError> for KubernetesError {
419    fn from(error: KubeError) -> Self {
420        KubernetesError {
421            error_type: "KubernetesError".to_string(),
422            message: error.to_string(),
423            details: Some(format!("{:?}", error)),
424        }
425    }
426}
427
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test for client construction. Passes whether or not a cluster
    /// or kubeconfig is reachable; the outcome is only printed.
    #[tokio::test]
    async fn test_kubernetes_client_creation() {
        if let Err(e) = KubernetesClient::new().await {
            // Expected when not running in a Kubernetes cluster.
            println!(
                "⚠️ Kubernetes client creation failed (expected in test environment): {}",
                e
            );
        } else {
            println!("✅ Kubernetes client created successfully");
        }
    }

    /// Job names for test runs are `test-run-` followed by a hyphenated UUID.
    #[tokio::test]
    async fn test_job_name_generation() {
        let job_name = format!("test-run-{}", uuid::Uuid::new_v4());

        assert!(job_name.starts_with("test-run-"));
        // "test-run-" (9) + UUID (36)
        assert_eq!(job_name.len(), 45);
    }

    #[cfg(test)]
    mod integration_tests {
        use super::*;

        // These tests need a real Kubernetes cluster. They are ignored by
        // default and show how the functionality can be exercised.

        #[ignore] // Remove this when running against a real cluster
        #[tokio::test]
        async fn test_kubernetes_health_check() {
            let client = KubernetesClient::new()
                .await
                .expect("Failed to create client");

            let is_healthy = client.health_check().await.expect("Health check failed");
            assert!(is_healthy);
        }

        #[ignore] // Remove this when running against a real cluster
        #[tokio::test]
        async fn test_job_logs_retrieval() {
            let client = KubernetesClient::new()
                .await
                .expect("Failed to create client");

            // This would test against a real job in the cluster; an error is
            // expected (and ignored) if the job doesn't exist.
            let job_name = "test-job";
            if let Ok(logs) = client.get_job_logs(job_name).await {
                assert!(!logs.logs.is_empty());
                assert_eq!(logs.job_name, job_name);
            }
        }
    }
}