use anyhow::Result;
use k8s_openapi::api::batch::v1::Job;
use kube::{
api::{Api, ListParams},
Client,
};
use orign::mutation::Mutation;
use orign::query::Query;
use sea_orm::DatabaseConnection;
use tracing::info;
pub async fn execute_report_jobs(db: &DatabaseConnection, namespace: &str) -> Result<()> {
loop {
match _execute_report_jobs(db, namespace).await {
Ok(_) => {
}
Err(e) => {
eprintln!("Error while reporting jobs: {}", e);
}
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}
pub async fn _execute_report_jobs(db: &DatabaseConnection, namespace: &str) -> Result<()> {
let client = Client::try_default().await?;
let jobs_api: Api<Job> = Api::namespaced(client, namespace);
let lp = ListParams::default().labels("type=trainingjob");
let job_list = jobs_api.list(&lp).await?;
info!("job_list: {:?}", job_list);
for kjob in job_list.items {
let job_name = kjob.metadata.name.clone().unwrap_or_default();
let job_phase = if let Some(status) = &kjob.status {
if let Some(conditions) = &status.conditions {
if conditions
.iter()
.any(|cond| cond.type_ == "Complete" && cond.status == "True")
{
"Complete".to_string()
} else if conditions
.iter()
.any(|cond| cond.type_ == "Failed" && cond.status == "True")
{
"Failed".to_string()
} else {
"Active".to_string()
}
} else {
"Unknown".to_string()
}
} else {
"Unknown".to_string()
};
if let Some(job_id_label) = kjob
.metadata
.labels
.as_ref()
.and_then(|labels| labels.get("id").cloned())
{
if let Ok(Some(db_job)) = Query::find_training_job_by_id(db, &job_id_label).await {
Mutation::update_training_job_status(db, &db_job.id, &job_phase).await?;
println!(
"Job '{}' => Updated training job '{}' status to '{}'",
job_name, db_job.id, job_phase
);
} else {
println!(
"Job '{}' => no matching training job for job_id '{}'",
job_name, job_id_label
);
}
} else {
println!("Job '{}' => no 'id' label found.", job_name);
}
}
Ok(())
}