mod common;
use common::{ServerProcess, create_test_compute_node, create_test_workflow, start_server};
use rstest::rstest;
use serial_test::serial;
use torc::client::{apis, workflow_manager::WorkflowManager};
use torc::config::TorcConfig;
use torc::models;
use torc::models::JobStatus;
#[rstest]
#[serial(completion)]
fn test_completion_reversal_resets_downstream_jobs(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_completion_reversal");
let workflow_id = workflow.id.unwrap();
let job1 = models::JobModel::new(
workflow_id,
"job1".to_string(),
"echo 'job1 failed' && exit 1".to_string(),
);
let created_job1 = apis::jobs_api::create_job(config, job1).expect("Failed to create job1");
let job1_id = created_job1.id.unwrap();
let mut job2 = models::JobModel::new(
workflow_id,
"job2".to_string(),
"echo 'job2 success'".to_string(),
);
job2.depends_on_job_ids = Some(vec![job1_id]);
job2.cancel_on_blocking_job_failure = Some(false);
let created_job2 = apis::jobs_api::create_job(config, job2).expect("Failed to create job2");
let job2_id = created_job2.id.unwrap();
let mut job3 = models::JobModel::new(
workflow_id,
"job3".to_string(),
"echo 'job3 success'".to_string(),
);
job3.depends_on_job_ids = Some(vec![job2_id]);
job3.cancel_on_blocking_job_failure = Some(false);
let created_job3 = apis::jobs_api::create_job(config, job3).expect("Failed to create job3");
let job3_id = created_job3.id.unwrap();
let torc_config = TorcConfig::load().unwrap_or_default();
let manager = WorkflowManager::new(config.clone(), torc_config, workflow);
manager.initialize(true).expect("Failed to start workflow");
let job1_initial = apis::jobs_api::get_job(config, job1_id).expect("Failed to get job1");
let job2_initial = apis::jobs_api::get_job(config, job2_id).expect("Failed to get job2");
let job3_initial = apis::jobs_api::get_job(config, job3_id).expect("Failed to get job3");
assert_eq!(job1_initial.status.unwrap(), JobStatus::Ready);
assert_eq!(job2_initial.status.unwrap(), JobStatus::Blocked);
assert_eq!(job3_initial.status.unwrap(), JobStatus::Blocked);
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let job1_result = models::ResultModel::new(
job1_id,
workflow_id,
1, 1, compute_node_id,
1, 1.0, chrono::Utc::now().to_rfc3339(),
JobStatus::Failed,
);
apis::jobs_api::complete_job(
config,
job1_id,
job1_result.status,
1, job1_result,
)
.expect("Failed to complete job1");
let job2_result = models::ResultModel::new(
job2_id,
workflow_id,
1, 1, compute_node_id,
0, 1.0, chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
apis::jobs_api::complete_job(
config,
job2_id,
job2_result.status,
1, job2_result,
)
.expect("Failed to complete job2");
let job3_result = models::ResultModel::new(
job3_id,
workflow_id,
1, 1, compute_node_id,
0, 1.0, chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
apis::jobs_api::complete_job(
config,
job3_id,
job3_result.status,
1, job3_result,
)
.expect("Failed to complete job3");
let job1_completed = apis::jobs_api::get_job(config, job1_id).expect("Failed to get job1");
let job2_completed = apis::jobs_api::get_job(config, job2_id).expect("Failed to get job2");
let job3_completed = apis::jobs_api::get_job(config, job3_id).expect("Failed to get job3");
assert_eq!(job1_completed.status.unwrap(), JobStatus::Failed);
assert_eq!(job2_completed.status.unwrap(), JobStatus::Completed);
assert_eq!(job3_completed.status.unwrap(), JobStatus::Completed);
apis::workflows_api::reset_job_status(
config,
workflow_id,
Some(true), )
.expect("Failed to reset job status");
let job1_final = apis::jobs_api::get_job(config, job1_id).expect("Failed to get job1");
let job2_final = apis::jobs_api::get_job(config, job2_id).expect("Failed to get job2");
let job3_final = apis::jobs_api::get_job(config, job3_id).expect("Failed to get job3");
assert_eq!(
job1_final.status.unwrap(),
JobStatus::Uninitialized,
"job1 should be Uninitialized after reset"
);
assert_eq!(
job2_final.status.unwrap(),
JobStatus::Uninitialized,
"job2 should be Uninitialized due to completion reversal"
);
assert_eq!(
job3_final.status.unwrap(),
JobStatus::Uninitialized,
"job3 should be Uninitialized due to completion reversal"
);
}
#[rstest]
#[serial(completion)]
fn test_completion_reversal_complex_dependencies(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_complex_reversal");
let workflow_id = workflow.id.unwrap();
let job1 = models::JobModel::new(
workflow_id,
"job1".to_string(),
"echo 'job1 failed' && exit 1".to_string(),
);
let created_job1 = apis::jobs_api::create_job(config, job1).expect("Failed to create job1");
let job1_id = created_job1.id.unwrap();
let mut job2 = models::JobModel::new(
workflow_id,
"job2".to_string(),
"echo 'job2 success'".to_string(),
);
job2.depends_on_job_ids = Some(vec![job1_id]);
job2.cancel_on_blocking_job_failure = Some(false);
let created_job2 = apis::jobs_api::create_job(config, job2).expect("Failed to create job2");
let job2_id = created_job2.id.unwrap();
let mut job3 = models::JobModel::new(
workflow_id,
"job3".to_string(),
"echo 'job3 success'".to_string(),
);
job3.depends_on_job_ids = Some(vec![job1_id]);
job3.cancel_on_blocking_job_failure = Some(false);
let created_job3 = apis::jobs_api::create_job(config, job3).expect("Failed to create job3");
let job3_id = created_job3.id.unwrap();
let mut job4 = models::JobModel::new(
workflow_id,
"job4".to_string(),
"echo 'job4 success'".to_string(),
);
job4.depends_on_job_ids = Some(vec![job2_id, job3_id]);
job4.cancel_on_blocking_job_failure = Some(false);
let created_job4 = apis::jobs_api::create_job(config, job4).expect("Failed to create job4");
let job4_id = created_job4.id.unwrap();
let torc_config = TorcConfig::load().unwrap_or_default();
let manager = WorkflowManager::new(config.clone(), torc_config, workflow);
manager.initialize(true).expect("Failed to start workflow");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let jobs_and_results: Vec<(i64, i64, JobStatus)> = vec![
(job1_id, 1, JobStatus::Failed), (job2_id, 0, JobStatus::Completed), (job3_id, 0, JobStatus::Completed), (job4_id, 0, JobStatus::Completed), ];
for (job_id, return_code, status) in jobs_and_results {
let result = models::ResultModel::new(
job_id,
workflow_id,
1, 1, compute_node_id,
return_code,
1.0,
chrono::Utc::now().to_rfc3339(),
status,
);
apis::jobs_api::complete_job(config, job_id, result.status, 1, result)
.unwrap_or_else(|_| panic!("Failed to complete job {}", job_id));
}
assert_eq!(
apis::jobs_api::get_job(config, job1_id)
.expect("Failed to get job1")
.status
.unwrap(),
JobStatus::Failed
);
for job_id in [job2_id, job3_id, job4_id] {
let job = apis::jobs_api::get_job(config, job_id).expect("Failed to get job");
assert_eq!(job.status.unwrap(), JobStatus::Completed);
}
apis::workflows_api::reset_job_status(config, workflow_id, Some(true))
.expect("Failed to reset job status");
for job_id in [job1_id, job2_id, job3_id, job4_id] {
let job = apis::jobs_api::get_job(config, job_id)
.unwrap_or_else(|_| panic!("Failed to get job {}", job_id));
assert_eq!(
job.status.unwrap(),
JobStatus::Uninitialized,
"Job {} should be Uninitialized after completion reversal",
job_id
);
}
}
#[rstest]
#[serial(completion)]
fn test_completion_reversal_selective_reset(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_selective_reversal");
let workflow_id = workflow.id.unwrap();
let job1 = models::JobModel::new(
workflow_id,
"job1".to_string(),
"echo 'job1 failed' && exit 1".to_string(),
);
let created_job1 = apis::jobs_api::create_job(config, job1).expect("Failed to create job1");
let job1_id = created_job1.id.unwrap();
let mut job2 = models::JobModel::new(
workflow_id,
"job2".to_string(),
"echo 'job2 success'".to_string(),
);
job2.depends_on_job_ids = Some(vec![job1_id]);
let created_job2 = apis::jobs_api::create_job(config, job2).expect("Failed to create job2");
let job2_id = created_job2.id.unwrap();
let job3 = models::JobModel::new(
workflow_id,
"job3".to_string(),
"echo 'job3 success'".to_string(),
);
let created_job3 = apis::jobs_api::create_job(config, job3).expect("Failed to create job3");
let job3_id = created_job3.id.unwrap();
let mut job4 = models::JobModel::new(
workflow_id,
"job4".to_string(),
"echo 'job4 success'".to_string(),
);
job4.depends_on_job_ids = Some(vec![job3_id]);
let created_job4 = apis::jobs_api::create_job(config, job4).expect("Failed to create job4");
let job4_id = created_job4.id.unwrap();
let torc_config = TorcConfig::load().unwrap_or_default();
let manager = WorkflowManager::new(config.clone(), torc_config, workflow);
manager.initialize(true).expect("Failed to start workflow");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let jobs_and_results: Vec<(i64, i64, JobStatus)> = vec![
(job1_id, 1, JobStatus::Failed), (job2_id, 0, JobStatus::Completed), (job3_id, 0, JobStatus::Completed), (job4_id, 0, JobStatus::Completed), ];
for (job_id, return_code, status) in jobs_and_results {
let result = models::ResultModel::new(
job_id,
workflow_id,
1, 1, compute_node_id,
return_code,
1.0,
chrono::Utc::now().to_rfc3339(),
status,
);
apis::jobs_api::complete_job(config, job_id, result.status, 1, result)
.unwrap_or_else(|_| panic!("Failed to complete job {}", job_id));
}
apis::workflows_api::reset_job_status(config, workflow_id, Some(true))
.expect("Failed to reset job status");
let job1_final = apis::jobs_api::get_job(config, job1_id).expect("Failed to get job1");
let job2_final = apis::jobs_api::get_job(config, job2_id).expect("Failed to get job2");
assert_eq!(
job1_final.status.unwrap(),
JobStatus::Uninitialized,
"job1 should be Uninitialized (it failed)"
);
assert_eq!(
job2_final.status.unwrap(),
JobStatus::Uninitialized,
"job2 should be Uninitialized (downstream of failed job1)"
);
let job3_final = apis::jobs_api::get_job(config, job3_id).expect("Failed to get job3");
let job4_final = apis::jobs_api::get_job(config, job4_id).expect("Failed to get job4");
assert_eq!(
job3_final.status.unwrap(),
JobStatus::Completed,
"job3 should remain Completed (independent successful job)"
);
assert_eq!(
job4_final.status.unwrap(),
JobStatus::Completed,
"job4 should remain Completed (downstream of successful job3)"
);
}