mod common;
use common::{
ServerProcess, create_test_compute_node, create_test_file, create_test_workflow, start_server,
};
use rstest::rstest;
use torc::client::{apis, config::TorcConfig, workflow_manager::WorkflowManager};
use torc::models;
use torc::models::JobStatus;
#[rstest]
fn test_list_required_existing_files_missing_user_files(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_missing_user_files");
let workflow_id = workflow.id.unwrap();
let user_file1 = create_test_file(config, workflow_id, "input1", "/data/input1.txt");
let user_file2 = create_test_file(config, workflow_id, "input2", "/data/input2.txt");
let user_file3 = create_test_file(config, workflow_id, "input3", "/data/input3.txt");
let output_file = create_test_file(config, workflow_id, "output", "/data/output.txt");
let mut job = models::JobModel::new(
workflow_id,
"test_job".to_string(),
"cat /data/input1.txt /data/input2.txt > /data/output.txt".to_string(),
);
job.input_file_ids = Some(vec![
user_file1.id.unwrap(),
user_file2.id.unwrap(),
user_file3.id.unwrap(),
]);
job.output_file_ids = Some(vec![output_file.id.unwrap()]);
let _created_job = apis::jobs_api::create_job(config, job).expect("Failed to create job");
apis::workflows_api::initialize_jobs(config, workflow_id, None, None)
.expect("Failed to initialize jobs");
let response = apis::workflows_api::list_required_existing_files(config, workflow_id)
.expect("Failed to list required existing files");
let missing_file_ids = response.files;
assert!(missing_file_ids.contains(&user_file1.id.unwrap()));
assert!(missing_file_ids.contains(&user_file2.id.unwrap()));
assert!(missing_file_ids.contains(&user_file3.id.unwrap()));
assert!(!missing_file_ids.contains(&output_file.id.unwrap()));
assert_eq!(missing_file_ids.len(), 3);
}
#[rstest]
fn test_list_required_existing_files_missing_job_outputs(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_missing_job_outputs");
let workflow_id = workflow.id.unwrap();
let input_file = create_test_file(config, workflow_id, "input", "/data/input.txt");
let output_file1 = create_test_file(config, workflow_id, "output1", "/data/output1.txt");
let output_file2 = create_test_file(config, workflow_id, "output2", "/data/output2.txt");
let output_file3 = create_test_file(config, workflow_id, "output3", "/data/output3.txt");
let mut job1 = models::JobModel::new(
workflow_id,
"producer_job1".to_string(),
"echo 'data1' > /data/output1.txt && echo 'data2' > /data/output2.txt".to_string(),
);
job1.input_file_ids = Some(vec![input_file.id.unwrap()]);
job1.output_file_ids = Some(vec![output_file1.id.unwrap(), output_file2.id.unwrap()]);
let created_job1 = apis::jobs_api::create_job(config, job1).expect("Failed to create job1");
let job1_id = created_job1.id.unwrap();
let mut job2 = models::JobModel::new(
workflow_id,
"producer_job2".to_string(),
"echo 'data3' > /data/output3.txt".to_string(),
);
job2.input_file_ids = Some(vec![input_file.id.unwrap()]);
job2.output_file_ids = Some(vec![output_file3.id.unwrap()]);
let created_job2 = apis::jobs_api::create_job(config, job2).expect("Failed to create job2");
let job2_id = created_job2.id.unwrap();
let torc_config = TorcConfig::default();
let manager = WorkflowManager::new(config.clone(), torc_config, workflow);
manager.initialize(true).expect("Failed to start workflow");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let job1_result = models::ResultModel::new(
job1_id,
workflow_id,
1, 1, compute_node_id,
0, 1.0, chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
apis::jobs_api::complete_job(
config,
job1_id,
job1_result.status,
1, job1_result,
)
.expect("Failed to complete job1");
let job2_result = models::ResultModel::new(
job2_id,
workflow_id,
1, 1, compute_node_id,
0, 1.0, chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
apis::jobs_api::complete_job(
config,
job2_id,
job2_result.status,
1, job2_result,
)
.expect("Failed to complete job2");
let job1_status = apis::jobs_api::get_job(config, job1_id).expect("Failed to get job1");
let job2_status = apis::jobs_api::get_job(config, job2_id).expect("Failed to get job2");
assert_eq!(job1_status.status.unwrap(), JobStatus::Completed);
assert_eq!(job2_status.status.unwrap(), JobStatus::Completed);
let files_response = apis::files_api::list_files(
config,
workflow_id,
None, None, None, None, None, None, None, None, )
.expect("Failed to list files");
let _existing_file_ids: Vec<i64> = files_response.items.iter().filter_map(|f| f.id).collect();
let response = apis::workflows_api::list_required_existing_files(config, workflow_id)
.expect("Failed to list required existing files");
let missing_file_ids = response.files;
assert!(missing_file_ids.contains(&output_file1.id.unwrap()));
assert!(missing_file_ids.contains(&output_file2.id.unwrap()));
assert!(missing_file_ids.contains(&output_file3.id.unwrap()));
let expected_missing: Vec<i64> = vec![
output_file1.id.unwrap(),
output_file2.id.unwrap(),
output_file3.id.unwrap(),
];
for expected_id in expected_missing {
assert!(
missing_file_ids.contains(&expected_id),
"Expected file ID {} to be in missing files list, but it wasn't. Missing IDs: {:?}",
expected_id,
missing_file_ids
);
}
}
#[rstest]
fn test_list_required_existing_files_combined_scenario(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_combined_missing_files");
let workflow_id = workflow.id.unwrap();
let user_input1 = create_test_file(config, workflow_id, "user_input1", "/data/user_input1.txt");
let user_input2 = create_test_file(config, workflow_id, "user_input2", "/data/user_input2.txt");
let intermediate1 = create_test_file(
config,
workflow_id,
"intermediate1",
"/data/intermediate1.txt",
);
let intermediate2 = create_test_file(
config,
workflow_id,
"intermediate2",
"/data/intermediate2.txt",
);
let final_output = create_test_file(
config,
workflow_id,
"final_output",
"/data/final_output.txt",
);
let mut job1 = models::JobModel::new(
workflow_id,
"stage1_job".to_string(),
"process inputs".to_string(),
);
job1.input_file_ids = Some(vec![user_input1.id.unwrap(), user_input2.id.unwrap()]);
job1.output_file_ids = Some(vec![intermediate1.id.unwrap(), intermediate2.id.unwrap()]);
let created_job1 = apis::jobs_api::create_job(config, job1).expect("Failed to create job1");
let job1_id = created_job1.id.unwrap();
let mut job2 = models::JobModel::new(
workflow_id,
"stage2_job".to_string(),
"process intermediate".to_string(),
);
job2.input_file_ids = Some(vec![intermediate1.id.unwrap(), intermediate2.id.unwrap()]);
job2.output_file_ids = Some(vec![final_output.id.unwrap()]);
let created_job2 = apis::jobs_api::create_job(config, job2).expect("Failed to create job2");
let job2_id = created_job2.id.unwrap();
let torc_config = TorcConfig::default();
let manager = WorkflowManager::new(config.clone(), torc_config, workflow);
manager.initialize(true).expect("Failed to start workflow");
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let job1_result = models::ResultModel::new(
job1_id,
workflow_id,
1,
1, compute_node_id,
0,
1.0,
chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
apis::jobs_api::complete_job(config, job1_id, job1_result.status, 1, job1_result)
.expect("Failed to complete job1");
let job2_result = models::ResultModel::new(
job2_id,
workflow_id,
1,
1, compute_node_id,
0,
1.0,
chrono::Utc::now().to_rfc3339(),
JobStatus::Completed,
);
apis::jobs_api::complete_job(config, job2_id, job2_result.status, 1, job2_result)
.expect("Failed to complete job2");
let response = apis::workflows_api::list_required_existing_files(config, workflow_id)
.expect("Failed to list required existing files");
let missing_file_ids = response.files;
assert!(missing_file_ids.contains(&user_input1.id.unwrap()));
assert!(missing_file_ids.contains(&user_input2.id.unwrap()));
assert!(missing_file_ids.contains(&intermediate1.id.unwrap()));
assert!(missing_file_ids.contains(&intermediate2.id.unwrap()));
assert!(missing_file_ids.contains(&final_output.id.unwrap()));
assert_eq!(missing_file_ids.len(), 5);
}