1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// use anyhow::Result;
// use k8s_openapi::api::batch::v1::Job;
// use kube::{
// api::{Api, ListParams},
// Client,
// };
// use nebulous::mutation::Mutation;
// use nebulous::query::Query;
// use sea_orm::DatabaseConnection;
// use tracing::info;
// pub async fn execute_report_jobs(db: &DatabaseConnection, namespace: &str) -> Result<()> {
// loop {
// match _execute_report_jobs(db, namespace).await {
// Ok(_) => {
// // Successfully executed
// }
// Err(e) => {
// eprintln!("Error while reporting jobs: {}", e);
// // Errors won't break the loop
// }
// }
// // Sleep some amount of time before the next iteration
// tokio::time::sleep(std::time::Duration::from_secs(5)).await;
// }
// }
// pub async fn _execute_report_jobs(db: &DatabaseConnection, namespace: &str) -> Result<()> {
// // 1) Create a Kubernetes client
// let client = Client::try_default().await?;
// // 2) Set up an Api for fetching jobs in your namespace
// let jobs_api: Api<Job> = Api::namespaced(client, namespace);
// // 3) Filter jobs with the label "type=trainingjob" (as an example)
// let lp = ListParams::default().labels("type=trainingjob");
// let job_list = jobs_api.list(&lp).await?;
// info!("job_list: {:?}", job_list);
// // 4) For each Kubernetes Job, look up "id" from labels, then update your DB record
// for kjob in job_list.items {
// let job_name = kjob.metadata.name.clone().unwrap_or_default();
// // Here’s one way to parse a user-friendly status from the "conditions" field
// let job_phase = if let Some(status) = &kjob.status {
// if let Some(conditions) = &status.conditions {
// if conditions
// .iter()
// .any(|cond| cond.type_ == "Complete" && cond.status == "True")
// {
// "Complete".to_string()
// } else if conditions
// .iter()
// .any(|cond| cond.type_ == "Failed" && cond.status == "True")
// {
// "Failed".to_string()
// } else {
// "Active".to_string()
// }
// } else {
// "Unknown".to_string()
// }
// } else {
// "Unknown".to_string()
// };
// // Look for a "job_id" label (or "id", depending on how you labeled it)
// if let Some(job_id_label) = kjob
// .metadata
// .labels
// .as_ref()
// .and_then(|labels| labels.get("id").cloned())
// {
// // Find the corresponding training_job by ID in the DB
// if let Ok(Some(db_job)) = Query::find_training_job_by_id(db, &job_id_label).await {
// // Update the training job's status
// Mutation::update_training_job_status(db, &db_job.id, &job_phase).await?;
// println!(
// "Job '{}' => Updated training job '{}' status to '{}'",
// job_name, db_job.id, job_phase
// );
// } else {
// println!(
// "Job '{}' => no matching training job for job_id '{}'",
// job_name, job_id_label
// );
// }
// } else {
// println!("Job '{}' => no 'id' label found.", job_name);
// }
// }
// Ok(())
// }