use super::*;
impl SchedulerRuntime {
    /// Returns the id a retry of `job` should record as its `redone_from` origin: the job's
    /// existing `redone_from` when set, otherwise the job's own id.
    fn retry_lineage_root_id(job: &Job) -> u32 {
        job.redone_from.unwrap_or(job.id)
    }

    /// Follows the `retried_from` links of the stored job specs, starting at `job_id`, and
    /// returns the first id whose spec has no `retried_from` parent (or has no spec at all).
    ///
    /// Every retry that resolves to the same root shares one retry budget.
    ///
    /// NOTE(review): assumes the `retried_from` chain is acyclic; a cycle would make this
    /// loop forever.
    fn retry_budget_root_id(&self, job_id: u32) -> u32 {
        let mut current_job_id = job_id;
        while let Some(parent_id) = self
            .scheduler
            .get_job_spec(current_job_id)
            .and_then(|spec| spec.retried_from)
        {
            current_job_id = parent_id;
        }
        current_job_id
    }

    /// Counts the automatic retries already spent against the budget rooted at
    /// `root_job_id`. A retry is any stored spec with a `retried_from` parent whose budget
    /// root resolves to `root_job_id`.
    ///
    /// The spec at index `i` of `job_specs()` is treated as job id `i + 1`.
    /// NOTE(review): this presumes job ids are assigned densely starting at 1 — confirm.
    fn retries_used_for_budget_root(&self, root_job_id: u32) -> u32 {
        let used = self
            .scheduler
            .job_specs()
            .iter()
            .enumerate()
            .filter(|(index, spec)| {
                // Job ids are `u32`; presumably the spec table never exceeds that range.
                let spec_job_id = (*index + 1) as u32;
                spec.retried_from.is_some()
                    && self.retry_budget_root_id(spec_job_id) == root_job_id
            })
            .count();
        used as u32
    }

    /// Decides whether `job` may be retried automatically. This requires all of:
    /// - the job is still `Running`;
    /// - it allows at least one retry;
    /// - the retries already used by its budget root are below `job.max_retries`.
    fn should_retry_job(&self, job: &Job) -> bool {
        if job.state != JobState::Running {
            return false;
        }
        if job.max_retries == 0 {
            return false;
        }
        let root_job_id = self.retry_budget_root_id(job.id);
        self.retries_used_for_budget_root(root_job_id) < job.max_retries
    }

    /// Builds a fresh job that repeats `original_job`. It copies the job's execution
    /// settings, dependencies, grouping and notification config. It records
    /// `redone_from = <lineage root>` and `retried_from = original_job.id`.
    fn build_retry_job(&self, original_job: &Job) -> Job {
        let retry_root_id = Self::retry_lineage_root_id(original_job);
        let depends_on_ids = original_job.all_dependency_ids();
        // Capture the single dependency (if that is all there is) before `depends_on_ids`
        // is moved into the builder, so the id list does not need to be cloned.
        let sole_dependency = (depends_on_ids.len() == 1).then(|| depends_on_ids[0]);

        let mut builder = Job::builder();
        if let Some(ref script) = original_job.script {
            builder = builder.script((**script).clone());
        }
        if let Some(ref command) = original_job.command {
            builder = builder.command(command.clone());
        }
        builder = builder.gpus(original_job.gpus);
        builder = builder.gpu_sharing_mode(original_job.gpu_sharing_mode);
        builder = builder.priority(original_job.priority);
        builder = builder.conda_env(original_job.conda_env.as_ref().map(|s| s.to_string()));
        builder = builder.time_limit(original_job.time_limit);
        builder = builder.memory_limit_mb(original_job.memory_limit_mb);
        builder = builder.gpu_memory_limit_mb(original_job.gpu_memory_limit_mb);
        builder = builder.depends_on_ids(depends_on_ids);
        builder = builder.dependency_mode(original_job.dependency_mode);
        builder = builder
            .auto_cancel_on_dependency_failure(original_job.auto_cancel_on_dependency_failure);
        if let Some(dependency_id) = sole_dependency {
            builder = builder.depends_on(Some(dependency_id));
        }
        builder = builder.run_dir(original_job.run_dir.clone());
        builder = builder.task_id(original_job.task_id);
        builder = builder.max_retries(original_job.max_retries);
        builder = builder.auto_close_tmux(original_job.auto_close_tmux);
        builder = builder.parameters_compact(original_job.parameters.clone());
        builder = builder.group_id_uuid(original_job.group_id);
        builder = builder.max_concurrent(original_job.max_concurrent);
        builder = builder.project(original_job.project.as_ref().map(|s| s.to_string()));
        builder = builder.notifications(original_job.notifications.clone());
        builder = builder.redone_from(Some(retry_root_id));
        builder = builder.retried_from(Some(original_job.id));
        builder = builder.submitted_by(original_job.submitted_by.to_string());
        builder.build()
    }

    /// Moves a running job into `final_state` (`Failed` or `Timeout`). If the job failed
    /// and still has retry budget, an automatic retry is submitted first.
    ///
    /// Returns:
    /// - `None` when any of these hold:
    ///   - `final_state` is neither `Failed` nor `Timeout`;
    ///   - the job does not exist;
    ///   - the job is not `Running`;
    ///   - the scheduler refused the final transition.
    /// - `Some(Some(retry_id))` when a retry was submitted and the original job was failed.
    /// - `Some(None)` when the job was moved to `final_state` without a retry.
    ///
    /// Only `Failed` is eligible for retry; `Timeout` always goes straight to its terminal
    /// state.
    pub(super) async fn finalize_job_with_retry(
        &mut self,
        job_id: u32,
        final_state: JobState,
    ) -> Option<Option<u32>> {
        // Check the argument first so the job is not looked up for states we ignore.
        if !matches!(final_state, JobState::Failed | JobState::Timeout) {
            return None;
        }
        let original_job = self.scheduler.get_job(job_id)?;
        if original_job.state != JobState::Running {
            return None;
        }
        if final_state == JobState::Failed && self.should_retry_job(&original_job) {
            let retry_job = self.build_retry_job(&original_job);
            match self.submit_job(retry_job).await {
                Ok((new_job_id, _run_name, _stored_job)) => {
                    // Point the original's dependents at the retry.
                    self.scheduler
                        .retarget_dependents_to_retry(job_id, new_job_id);
                    // Only `Failed` reaches this branch (see the guard above). The original is
                    // failed "without propagation" — presumably so the retargeted dependents
                    // are left alone.
                    if self.scheduler.fail_job_without_propagation(job_id) {
                        self.mark_dirty();
                        return Some(Some(new_job_id));
                    }
                    // A retry job now exists and dependents were retargeted, but the original
                    // could not be transitioned. Persist the mutated state and report it
                    // instead of falling through silently.
                    self.mark_dirty();
                    tracing::warn!(
                        job_id,
                        retry_job_id = new_job_id,
                        "Retry job submitted but original job could not be marked failed; falling back to final terminal state"
                    );
                }
                Err(error) => {
                    tracing::error!(
                        job_id,
                        desired_state = %final_state,
                        error = %error,
                        "Automatic retry submission failed; falling back to final terminal state"
                    );
                }
            }
        }
        let transitioned = match final_state {
            JobState::Failed => self.scheduler.fail_job(job_id),
            JobState::Timeout => self.scheduler.timeout_job(job_id),
            _ => false,
        };
        if transitioned {
            self.mark_dirty();
            Some(None)
        } else {
            None
        }
    }
}