mod common;
use common::{
ServerProcess, create_test_compute_node, create_test_user_data, create_test_workflow,
run_cli_with_json, start_server,
};
use rstest::rstest;
use serde_json::json;
use torc::client::apis;
use torc::models;
#[rstest]
fn test_user_data_add_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_user_data_add_workflow");
let workflow_id = workflow.id.unwrap();
let test_data = r#"{"key": "value", "number": 42}"#;
let args = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"test_data",
"--data",
test_data,
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data create command");
assert!(json_output.get("id").is_some());
assert_eq!(json_output.get("workflow_id").unwrap(), &json!(workflow_id));
assert_eq!(json_output.get("name").unwrap(), &json!("test_data"));
assert_eq!(json_output.get("is_ephemeral").unwrap(), &json!(false));
let expected_data: serde_json::Value = serde_json::from_str(test_data).unwrap();
assert_eq!(json_output.get("data").unwrap(), &expected_data);
}
#[rstest]
fn test_user_data_add_ephemeral(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_ephemeral_workflow");
let workflow_id = workflow.id.unwrap();
let args = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"ephemeral_data",
"--data",
r#"{"temp": true}"#,
"--ephemeral",
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data create with ephemeral flag");
assert_eq!(json_output.get("is_ephemeral").unwrap(), &json!(true));
assert_eq!(json_output.get("name").unwrap(), &json!("ephemeral_data"));
}
#[rstest]
fn test_user_data_add_with_job_associations(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_job_associations_workflow");
let workflow_id = workflow.id.unwrap();
let producer_job = models::JobModel::new(
workflow_id,
"producer_job".to_string(),
"echo 'producing data'".to_string(),
);
let producer_job =
apis::jobs_api::create_job(config, producer_job).expect("Failed to create producer job");
let producer_job_id = producer_job.id.unwrap();
let consumer_job = models::JobModel::new(
workflow_id,
"consumer_job".to_string(),
"echo 'consuming data'".to_string(),
);
let consumer_job =
apis::jobs_api::create_job(config, consumer_job).expect("Failed to create consumer job");
let consumer_job_id = consumer_job.id.unwrap();
let args = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"job_data",
"--data",
r#"{"processed_by": "jobs"}"#,
"--producer-job-id",
&producer_job_id.to_string(),
"--consumer-job-id",
&consumer_job_id.to_string(),
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data create with job associations");
assert_eq!(json_output.get("name").unwrap(), &json!("job_data"));
assert_eq!(json_output.get("workflow_id").unwrap(), &json!(workflow_id));
let user_data_id = json_output.get("id").unwrap().as_i64().unwrap();
let producer_list_args = [
"user-data",
"list",
&workflow_id.to_string(),
"--producer-job-id",
&producer_job_id.to_string(),
];
let producer_list_output = run_cli_with_json(&producer_list_args, start_server, None)
.expect("Failed to list user-data by producer job ID");
let producer_user_data_array = producer_list_output
.get("user_data")
.unwrap()
.as_array()
.unwrap();
assert!(
!producer_user_data_array.is_empty(),
"Should find at least one user data associated with producer job"
);
let found_in_producer_list = producer_user_data_array
.iter()
.any(|item| item.get("id").unwrap().as_i64().unwrap() == user_data_id);
assert!(
found_in_producer_list,
"Created user data should be found when listing by producer job ID"
);
let consumer_list_args = [
"user-data",
"list",
&workflow_id.to_string(),
"--consumer-job-id",
&consumer_job_id.to_string(),
];
let consumer_list_output = run_cli_with_json(&consumer_list_args, start_server, None)
.expect("Failed to list user-data by consumer job ID");
let consumer_user_data_array = consumer_list_output
.get("user_data")
.unwrap()
.as_array()
.unwrap();
assert!(
!consumer_user_data_array.is_empty(),
"Should find at least one user data associated with consumer job"
);
let found_in_consumer_list = consumer_user_data_array
.iter()
.any(|item| item.get("id").unwrap().as_i64().unwrap() == user_data_id);
assert!(
found_in_consumer_list,
"Created user data should be found when listing by consumer job ID"
);
for user_data_array in [&producer_user_data_array, &consumer_user_data_array] {
for user_data in user_data_array.iter() {
if user_data.get("id").unwrap().as_i64().unwrap() == user_data_id {
assert_eq!(user_data.get("name").unwrap(), &json!("job_data"));
assert_eq!(user_data.get("workflow_id").unwrap(), &json!(workflow_id));
let expected_data = json!({"processed_by": "jobs"});
assert_eq!(user_data.get("data").unwrap(), &expected_data);
}
}
}
}
#[rstest]
fn test_user_data_list_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_user_data_list_workflow");
let workflow_id = workflow.id.unwrap();
let _user_data1 =
create_test_user_data(config, workflow_id, "data1", json!({"value": 1}), false);
let _user_data2 =
create_test_user_data(config, workflow_id, "data2", json!({"value": 2}), true);
let args = [
"user-data",
"list",
&workflow_id.to_string(),
"--limit",
"10",
];
let json_output =
run_cli_with_json(&args, start_server, None).expect("Failed to run user-data list command");
assert!(
json_output.is_object(),
"User data list should return an object"
);
assert!(
json_output.get("user_data").is_some(),
"Response should have 'user_data' field"
);
let user_data_array = json_output.get("user_data").unwrap().as_array().unwrap();
assert!(
user_data_array.len() >= 2,
"Should have at least 2 user data records"
);
for user_data in user_data_array {
assert!(user_data.get("id").is_some());
assert!(user_data.get("workflow_id").is_some());
assert!(user_data.get("name").is_some());
assert!(user_data.get("is_ephemeral").is_some());
assert!(user_data.get("data").is_some());
}
}
#[rstest]
fn test_user_data_list_with_filters(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_filter_workflow");
let workflow_id = workflow.id.unwrap();
let _ephemeral_data = create_test_user_data(
config,
workflow_id,
"ephemeral_item",
json!({"temporary": true}),
true,
);
let _persistent_data = create_test_user_data(
config,
workflow_id,
"persistent_item",
json!({"permanent": true}),
false,
);
let args = [
"user-data",
"list",
&workflow_id.to_string(),
"--is-ephemeral",
"true",
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data list with ephemeral filter");
let user_data_array = json_output.get("user_data").unwrap().as_array().unwrap();
assert!(!user_data_array.is_empty());
for user_data in user_data_array {
assert_eq!(user_data.get("is_ephemeral").unwrap(), &json!(true));
}
let args_name_filter = [
"user-data",
"list",
&workflow_id.to_string(),
"--name",
"persistent_item",
];
let json_output_name = run_cli_with_json(&args_name_filter, start_server, None)
.expect("Failed to run user-data list with name filter");
let filtered_array = json_output_name
.get("user_data")
.unwrap()
.as_array()
.unwrap();
assert!(!filtered_array.is_empty());
for user_data in filtered_array {
assert_eq!(user_data.get("name").unwrap(), &json!("persistent_item"));
}
}
#[rstest]
fn test_user_data_list_pagination(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_pagination_workflow");
let workflow_id = workflow.id.unwrap();
for i in 0..5 {
let _user_data = create_test_user_data(
config,
workflow_id,
&format!("pagination_data_{}", i),
json!({"index": i}),
false,
);
}
let args = [
"user-data",
"list",
&workflow_id.to_string(),
"--limit",
"3",
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run paginated user-data list");
let user_data_array = json_output.get("user_data").unwrap().as_array().unwrap();
assert!(user_data_array.len() <= 3, "Should respect limit parameter");
assert!(
!user_data_array.is_empty(),
"Should have at least one record"
);
let args_with_offset = [
"user-data",
"list",
&workflow_id.to_string(),
"--limit",
"2",
"--offset",
"2",
];
let json_output_offset = run_cli_with_json(&args_with_offset, start_server, None)
.expect("Failed to run user-data list with offset");
let user_data_with_offset = json_output_offset
.get("user_data")
.unwrap()
.as_array()
.unwrap();
assert!(
!user_data_with_offset.is_empty(),
"Should have user data with offset"
);
}
#[rstest]
fn test_user_data_list_sorting(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_sorting_workflow");
let workflow_id = workflow.id.unwrap();
let _data_a = create_test_user_data(
config,
workflow_id,
"aaa_data",
json!({"order": "first"}),
false,
);
let _data_b = create_test_user_data(
config,
workflow_id,
"bbb_data",
json!({"order": "second"}),
false,
);
let _data_c = create_test_user_data(
config,
workflow_id,
"ccc_data",
json!({"order": "third"}),
false,
);
let args = [
"user-data",
"list",
&workflow_id.to_string(),
"--sort-by",
"name",
];
let json_output =
run_cli_with_json(&args, start_server, None).expect("Failed to run sorted user-data list");
let user_data_array = json_output.get("user_data").unwrap().as_array().unwrap();
assert!(user_data_array.len() >= 3);
let args_reverse = [
"user-data",
"list",
&workflow_id.to_string(),
"--sort-by",
"name",
"--reverse-sort",
];
let json_output_reverse = run_cli_with_json(&args_reverse, start_server, None)
.expect("Failed to run reverse sorted user-data list");
let user_data_array_reverse = json_output_reverse
.get("user_data")
.unwrap()
.as_array()
.unwrap();
assert!(user_data_array_reverse.len() >= 3);
}
#[rstest]
fn test_user_data_get_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_user_data_get_workflow");
let workflow_id = workflow.id.unwrap();
let user_data = create_test_user_data(
config,
workflow_id,
"test_get_data",
json!({"test": "value", "nested": {"key": 123}}),
false,
);
let user_data_id = user_data.id.unwrap();
let args = ["user-data", "get", &user_data_id.to_string()];
let json_output =
run_cli_with_json(&args, start_server, None).expect("Failed to run user-data get command");
assert_eq!(json_output.get("id").unwrap(), &json!(user_data_id));
assert_eq!(json_output.get("workflow_id").unwrap(), &json!(workflow_id));
assert_eq!(json_output.get("name").unwrap(), &json!("test_get_data"));
assert_eq!(json_output.get("is_ephemeral").unwrap(), &json!(false));
let expected_data = json!({"test": "value", "nested": {"key": 123}});
assert_eq!(json_output.get("data").unwrap(), &expected_data);
}
#[rstest]
fn test_user_data_update_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_user_data_update_workflow");
let workflow_id = workflow.id.unwrap();
let user_data = create_test_user_data(
config,
workflow_id,
"test_update_data",
json!({"original": "data"}),
false,
);
let user_data_id = user_data.id.unwrap();
let new_data = r#"{"updated": "value", "count": 99}"#;
let args = [
"user-data",
"update",
&user_data_id.to_string(),
"--name",
"updated_data_name",
"--data",
new_data,
"--ephemeral",
"true",
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data update command");
assert_eq!(json_output.get("id").unwrap(), &json!(user_data_id));
assert_eq!(
json_output.get("name").unwrap(),
&json!("updated_data_name")
);
assert_eq!(json_output.get("is_ephemeral").unwrap(), &json!(true));
let expected_data: serde_json::Value = serde_json::from_str(new_data).unwrap();
assert_eq!(json_output.get("data").unwrap(), &expected_data);
assert_eq!(json_output.get("workflow_id").unwrap(), &json!(workflow_id));
}
#[rstest]
fn test_user_data_update_partial_fields(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_partial_update_workflow");
let workflow_id = workflow.id.unwrap();
let user_data = create_test_user_data(
config,
workflow_id,
"partial_update_data",
json!({"keep": "this"}),
true,
);
let user_data_id = user_data.id.unwrap();
let args = [
"user-data",
"update",
&user_data_id.to_string(),
"--name",
"only_name_updated",
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run partial user-data update");
assert_eq!(
json_output.get("name").unwrap(),
&json!("only_name_updated")
);
assert_eq!(json_output.get("data").unwrap(), &json!({"keep": "this"}));
assert_eq!(json_output.get("is_ephemeral").unwrap(), &json!(true));
}
#[rstest]
fn test_user_data_remove_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_user_data_remove_workflow");
let workflow_id = workflow.id.unwrap();
let user_data = create_test_user_data(
config,
workflow_id,
"test_remove_data",
json!({"will_be": "removed"}),
false,
);
let user_data_id = user_data.id.unwrap();
let args = ["user-data", "delete", &user_data_id.to_string()];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data delete command");
assert_eq!(json_output.get("id").unwrap(), &json!(user_data_id));
assert_eq!(json_output.get("workflow_id").unwrap(), &json!(workflow_id));
assert_eq!(json_output.get("name").unwrap(), &json!("test_remove_data"));
let get_result = apis::user_data_api::get_user_data(config, user_data_id);
assert!(get_result.is_err(), "User data should be deleted");
}
#[rstest]
fn test_user_data_delete_workflow_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_delete_workflow_data");
let workflow_id = workflow.id.unwrap();
let _data1 = create_test_user_data(config, workflow_id, "data1", json!({"value": 1}), false);
let _data2 = create_test_user_data(config, workflow_id, "data2", json!({"value": 2}), true);
let args = ["user-data", "delete-all", &workflow_id.to_string()];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data delete command");
assert!(json_output.get("message").is_some());
assert_eq!(json_output.get("deleted_count").unwrap(), &json!(2));
let response = apis::user_data_api::list_user_data(
config,
workflow_id,
None,
None,
None,
Some(10),
None,
None,
None,
None,
)
.expect("Failed to list user data after delete-all");
assert!(response.items.is_empty(), "All user data should be deleted");
}
#[rstest]
fn test_user_data_list_missing_command_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_missing_data_workflow");
let workflow_id = workflow.id.unwrap();
let consumer_job = models::JobModel::new(
workflow_id,
"consumer_job".to_string(),
"echo 'consuming data'".to_string(),
);
let consumer_job =
apis::jobs_api::create_job(config, consumer_job).expect("Failed to create consumer job");
let consumer_job_id = consumer_job.id.unwrap();
let user_created_data = create_test_user_data(
config,
workflow_id,
"expected_input_data",
json!({"input": "data"}),
false,
);
let user_data_id_1 = user_created_data.id.unwrap();
let args_consumer_association = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"input_for_consumer",
"--data",
r#"{"consumed_by": "consumer_job"}"#,
"--consumer-job-id",
&consumer_job_id.to_string(),
];
let consumer_associated_data =
run_cli_with_json(&args_consumer_association, start_server, None)
.expect("Failed to create consumer association");
let consumer_data_id = consumer_associated_data
.get("id")
.unwrap()
.as_i64()
.unwrap();
apis::user_data_api::delete_user_data(config, user_data_id_1)
.expect("Failed to delete user data to simulate missing input");
let producer_job = models::JobModel::new(
workflow_id,
"producer_job".to_string(),
"echo 'producing output'".to_string(),
);
let producer_job =
apis::jobs_api::create_job(config, producer_job).expect("Failed to create producer job");
let producer_job_id = producer_job.id.unwrap();
let job_produced_data = create_test_user_data(
config,
workflow_id,
"job_output_data",
json!({"output": "result"}),
false,
);
let user_data_id_2 = job_produced_data.id.unwrap();
let args_producer_association = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"output_from_producer",
"--data",
r#"{"produced_by": "producer_job"}"#,
"--producer-job-id",
&producer_job_id.to_string(),
];
let producer_associated_data =
run_cli_with_json(&args_producer_association, start_server, None)
.expect("Failed to create producer association");
let producer_data_id = producer_associated_data
.get("id")
.unwrap()
.as_i64()
.unwrap();
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let job_result = models::ResultModel::new(
producer_job_id,
workflow_id,
1, 1, compute_node_id,
0, 0.1, chrono::Utc::now().to_rfc3339(),
models::JobStatus::Completed,
);
let _ = apis::results_api::create_result(config, job_result);
apis::user_data_api::delete_user_data(config, user_data_id_2)
.expect("Failed to delete produced data to simulate missing output");
let args = ["user-data", "list-missing", &workflow_id.to_string()];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data list-missing command");
assert!(json_output.get("user_data").is_some());
let user_data_array = json_output.get("user_data").unwrap();
assert!(user_data_array.is_array());
let missing_ids: Vec<i64> = user_data_array
.as_array()
.unwrap()
.iter()
.map(|v| v.as_i64().unwrap())
.collect();
for item in user_data_array.as_array().unwrap() {
assert!(item.is_i64() || item.is_u64());
}
println!("Missing user data IDs: {:?}", missing_ids);
println!(
"Successfully retrieved missing data response with {} IDs",
missing_ids.len()
);
let _ = apis::user_data_api::delete_user_data(config, consumer_data_id);
let _ = apis::user_data_api::delete_user_data(config, producer_data_id);
}
#[rstest]
fn test_user_data_complex_json_data(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_complex_json_workflow");
let workflow_id = workflow.id.unwrap();
let complex_data = r#"{
"metadata": {
"version": "1.0.0",
"created_by": "test_user"
},
"data": {
"items": [
{"id": 1, "name": "item1", "active": true},
{"id": 2, "name": "item2", "active": false}
],
"config": {
"timeout": 30,
"retry_count": 3,
"flags": ["debug", "verbose"]
}
},
"stats": {
"count": 2,
"size_mb": 1.5
}
}"#;
let args = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"complex_data",
"--data",
complex_data,
];
let json_output = run_cli_with_json(&args, start_server, None)
.expect("Failed to run user-data create with complex JSON");
assert_eq!(json_output.get("name").unwrap(), &json!("complex_data"));
let expected_data: serde_json::Value = serde_json::from_str(complex_data).unwrap();
assert_eq!(json_output.get("data").unwrap(), &expected_data);
let user_data_id = json_output.get("id").unwrap().as_i64().unwrap();
let get_args = ["user-data", "get", &user_data_id.to_string()];
let get_output =
run_cli_with_json(&get_args, start_server, None).expect("Failed to get complex user data");
assert_eq!(get_output.get("data").unwrap(), &expected_data);
}
#[rstest]
fn test_user_data_error_handling(start_server: &ServerProcess) {
let args = ["user-data", "get", "999999"];
let result = run_cli_with_json(&args, start_server, None);
assert!(
result.is_err(),
"Should fail when getting non-existent user data"
);
let args = ["user-data", "update", "999999", "--name", "should_fail"];
let result = run_cli_with_json(&args, start_server, None);
assert!(
result.is_err(),
"Should fail when updating non-existent user data"
);
let args = ["user-data", "delete", "999999"];
let result = run_cli_with_json(&args, start_server, None);
assert!(
result.is_err(),
"Should fail when removing non-existent user data"
);
}
#[rstest]
fn test_user_data_invalid_json(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_invalid_json_workflow");
let workflow_id = workflow.id.unwrap();
let invalid_json = r#"{"key": "value", "incomplete": }"#;
let args = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"invalid_data",
"--data",
invalid_json,
];
let result = run_cli_with_json(&args, start_server, None);
assert!(result.is_err(), "Should fail with invalid JSON data");
}
#[rstest]
fn test_user_data_empty_null_data(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_empty_data_workflow");
let workflow_id = workflow.id.unwrap();
let args_empty = [
"user-data",
"create",
&workflow_id.to_string(),
"--name",
"empty_data",
"--data",
"{}",
];
let json_output_empty = run_cli_with_json(&args_empty, start_server, None)
.expect("Failed to run user-data create with empty object");
assert_eq!(json_output_empty.get("name").unwrap(), &json!("empty_data"));
assert_eq!(json_output_empty.get("data").unwrap(), &json!({}));
}
#[rstest]
fn test_api_list_missing_user_data(start_server: &ServerProcess) {
let config = &start_server.config;
let workflow = create_test_workflow(config, "test_missing_user_data_api");
let workflow_id = workflow.id.unwrap();
let mut user_data1 = models::UserDataModel::new(workflow_id, "input_data_1".to_string());
user_data1.data = Some(json!({"type": "input"}));
let user_data1 = apis::user_data_api::create_user_data(config, user_data1, None, None)
.expect("Failed to create user_data1");
let user_data1_id = user_data1.id.unwrap();
let missing_input_user_data =
models::UserDataModel::new(workflow_id, "missing_input".to_string());
let missing_input_user_data =
apis::user_data_api::create_user_data(config, missing_input_user_data, None, None)
.expect("Failed to create missing input placeholder");
let missing_input_id = missing_input_user_data.id.unwrap();
let mut job1 = models::JobModel::new(
workflow_id,
"consumer_job".to_string(),
"echo 'consuming'".to_string(),
);
job1.input_user_data_ids = Some(vec![user_data1_id, missing_input_id]);
let job1 = apis::jobs_api::create_job(config, job1).expect("Failed to create consumer job");
let _job1_id = job1.id.unwrap();
apis::workflows_api::initialize_jobs(config, workflow_id, None, None)
.expect("Failed to initialize jobs");
let response = apis::workflows_api::list_missing_user_data(config, workflow_id)
.expect("Failed to call list_missing_user_data");
assert_eq!(
response.user_data.len(),
1,
"Should report 1 missing user data (missing input), found: {:?}",
response.user_data
);
assert!(
response.user_data.contains(&missing_input_id),
"Missing list should contain missing_input_id {}, found: {:?}",
missing_input_id,
response.user_data
);
let missing_output_user_data =
models::UserDataModel::new(workflow_id, "missing_output".to_string());
let missing_output_user_data =
apis::user_data_api::create_user_data(config, missing_output_user_data, None, None)
.expect("Failed to create missing output placeholder");
let missing_output_id = missing_output_user_data.id.unwrap();
let mut job2 = models::JobModel::new(
workflow_id,
"producer_job".to_string(),
"echo 'producing'".to_string(),
);
job2.output_user_data_ids = Some(vec![missing_output_id]);
let job2 = apis::jobs_api::create_job(config, job2).expect("Failed to create producer job");
let job2_id = job2.id.unwrap();
let response = apis::workflows_api::list_missing_user_data(config, workflow_id)
.expect("Failed to call list_missing_user_data after creating producer");
assert_eq!(
response.user_data.len(),
1,
"Should still report 1 missing (the input, producer job not done yet), found: {:?}",
response.user_data
);
let compute_node = create_test_compute_node(config, workflow_id);
let compute_node_id = compute_node.id.unwrap();
let run_id = 0;
apis::jobs_api::manage_status_change(config, job2_id, models::JobStatus::Running, run_id)
.expect("Failed to set job2 to running");
let result = models::ResultModel::new(
job2_id,
workflow_id,
run_id,
1, compute_node_id,
0,
0.5,
chrono::Utc::now().to_rfc3339(),
models::JobStatus::Completed,
);
apis::jobs_api::complete_job(
config,
job2_id,
models::JobStatus::Completed,
run_id,
result,
)
.expect("Failed to complete job2");
let response = apis::workflows_api::list_missing_user_data(config, workflow_id).expect(
"Failed to call list_missing_user_data after job completion without creating output",
);
assert_eq!(
response.user_data.len(),
2,
"Should report 2 missing user data (missing input + missing output from completed job), found: {:?}",
response.user_data
);
assert!(
response.user_data.contains(&missing_input_id),
"Missing list should contain missing_input_id {}",
missing_input_id
);
assert!(
response.user_data.contains(&missing_output_id),
"Missing list should contain missing_output_id {} (output from completed job that was never created)",
missing_output_id
);
let missing_output_user_data_2 =
models::UserDataModel::new(workflow_id, "missing_output_2".to_string());
let missing_output_user_data_2 =
apis::user_data_api::create_user_data(config, missing_output_user_data_2, None, None)
.expect("Failed to create missing output placeholder 2");
let missing_output_id_2 = missing_output_user_data_2.id.unwrap();
let missing_output_user_data_3 =
models::UserDataModel::new(workflow_id, "missing_output_3".to_string());
let missing_output_user_data_3 =
apis::user_data_api::create_user_data(config, missing_output_user_data_3, None, None)
.expect("Failed to create missing output placeholder 3");
let missing_output_id_3 = missing_output_user_data_3.id.unwrap();
let mut job3 = models::JobModel::new(
workflow_id,
"producer_job_2".to_string(),
"echo 'producing multiple outputs'".to_string(),
);
job3.output_user_data_ids = Some(vec![missing_output_id_2, missing_output_id_3]);
let job3 = apis::jobs_api::create_job(config, job3).expect("Failed to create producer job 2");
let job3_id = job3.id.unwrap();
apis::jobs_api::manage_status_change(config, job3_id, models::JobStatus::Running, run_id)
.expect("Failed to set job3 to running");
let result3 = models::ResultModel::new(
job3_id,
workflow_id,
run_id,
1, compute_node_id,
0,
0.3,
chrono::Utc::now().to_rfc3339(),
models::JobStatus::Completed,
);
apis::jobs_api::complete_job(
config,
job3_id,
models::JobStatus::Completed,
run_id,
result3,
)
.expect("Failed to complete job3");
let response = apis::workflows_api::list_missing_user_data(config, workflow_id)
.expect("Failed to call list_missing_user_data after second completed job without output");
assert_eq!(
response.user_data.len(),
4,
"Should report 4 missing user data (1 input + 3 outputs from completed jobs), found: {:?}",
response.user_data
);
assert!(
response.user_data.contains(&missing_output_id_2),
"Missing list should contain missing_output_id_2 {} (never created by completed job)",
missing_output_id_2
);
assert!(
response.user_data.contains(&missing_output_id_3),
"Missing list should contain missing_output_id_3 {} (never created by completed job)",
missing_output_id_3
);
let missing_input_user_data_2 =
models::UserDataModel::new(workflow_id, "missing_input_2".to_string());
let missing_input_user_data_2 =
apis::user_data_api::create_user_data(config, missing_input_user_data_2, None, None)
.expect("Failed to create missing input placeholder 2");
let missing_input_id_2 = missing_input_user_data_2.id.unwrap();
let mut job4 = models::JobModel::new(
workflow_id,
"consumer_job_2".to_string(),
"echo 'consuming multiple inputs'".to_string(),
);
job4.input_user_data_ids = Some(vec![user_data1_id, missing_input_id_2]);
let _job4 = apis::jobs_api::create_job(config, job4).expect("Failed to create consumer job 2");
let response = apis::workflows_api::list_missing_user_data(config, workflow_id)
.expect("Failed to call list_missing_user_data after second consumer job");
assert_eq!(
response.user_data.len(),
5,
"Should report 5 missing user data (2 inputs + 3 outputs), found: {:?}",
response.user_data
);
assert!(
response.user_data.contains(&missing_input_id_2),
"Missing list should contain missing_input_id_2 {} (referenced but never created)",
missing_input_id_2
);
}