#![allow(unused_imports)]
use super::*;
pub(crate) fn collect_history_ids(payload: &Value) -> Vec<String> {
let mut ids = Vec::new();
if let Some(id) = payload
.get("history")
.and_then(|history| history.get("job_id"))
.and_then(Value::as_str)
&& !id.is_empty()
{
ids.push(id.to_string());
}
if let Some(job_ids) = payload
.get("history")
.and_then(|history| history.get("job_ids"))
.and_then(Value::as_array)
{
for id in job_ids.iter().filter_map(Value::as_str) {
if !id.is_empty() && !ids.iter().any(|existing| existing == id) {
ids.push(id.to_string());
}
}
}
ids
}
pub(crate) fn output_files_from_payload(payload: &Value) -> Vec<Value> {
let output = payload.get("output").cloned().unwrap_or_else(|| json!({}));
let mut files = output
.get("files")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
if files.is_empty()
&& let Some(path) = output.get("path").and_then(Value::as_str)
{
files.push(json!({
"index": 0,
"path": path,
"bytes": output.get("bytes").and_then(Value::as_u64).unwrap_or(0),
}));
}
files
}
pub(crate) fn normalize_batch_output(files: Vec<Value>) -> Value {
let indexed_files = files
.into_iter()
.enumerate()
.map(|(index, mut file)| {
if let Value::Object(object) = &mut file {
object
.entry("index".to_string())
.or_insert_with(|| json!(index));
}
file
})
.collect::<Vec<_>>();
let total_bytes = indexed_files
.iter()
.filter_map(|file| file.get("bytes").and_then(Value::as_u64))
.sum::<u64>();
let primary_path = indexed_files
.iter()
.find(|file| file.get("index").and_then(Value::as_u64) == Some(0))
.and_then(|file| file.get("path"))
.cloned()
.unwrap_or(Value::Null);
json!({
"path": primary_path,
"bytes": total_bytes,
"files": indexed_files,
})
}
pub(crate) fn batch_errors_json(errors: &[BatchItemError]) -> Value {
Value::Array(
errors
.iter()
.map(|error| {
json!({
"index": error.index,
"message": error.message,
})
})
.collect(),
)
}
pub(crate) fn batch_error_summary(errors: &[BatchItemError]) -> Option<String> {
if errors.is_empty() {
return None;
}
let first = errors
.first()
.map(|error| error.message.as_str())
.unwrap_or("Unknown batch error.");
if errors.len() == 1 {
Some(first.to_string())
} else {
Some(format!("{} 个子任务失败:{first}", errors.len()))
}
}
pub(crate) fn merge_batch_payloads(
command: &str,
request_count: usize,
payloads: Vec<(usize, Value)>,
errors: Vec<BatchItemError>,
) -> Value {
let first = payloads
.first()
.map(|(_, payload)| payload.clone())
.unwrap_or_else(|| json!({}));
let files = payloads
.iter()
.flat_map(|(batch_index, payload)| {
output_files_from_payload(payload)
.into_iter()
.map(move |mut file| {
if let Value::Object(object) = &mut file {
object.insert("index".to_string(), json!(batch_index));
}
file
})
})
.collect::<Vec<_>>();
let mut history_job_ids = Vec::new();
let mut revised_prompts = Vec::new();
for (_, payload) in &payloads {
history_job_ids.extend(collect_history_ids(payload));
if let Some(prompts) = payload
.get("response")
.and_then(|response| response.get("revised_prompts"))
.and_then(Value::as_array)
{
revised_prompts.extend(prompts.iter().cloned());
}
}
history_job_ids.sort();
history_job_ids.dedup();
let primary_history_job_id = history_job_ids.first().cloned();
let output = normalize_batch_output(files);
let image_count = output
.get("files")
.and_then(Value::as_array)
.map(Vec::len)
.unwrap_or(0);
let mut response = first.get("response").cloned().unwrap_or_else(|| json!({}));
if let Value::Object(response) = &mut response {
response.insert("image_count".to_string(), json!(image_count));
response.insert("batch_count".to_string(), json!(request_count));
response.insert("batch_request_count".to_string(), json!(request_count));
response.insert("revised_prompts".to_string(), json!(revised_prompts));
}
let error_summary = batch_error_summary(&errors);
let ok = image_count > 0;
let status = if ok && errors.is_empty() {
"completed"
} else if ok {
"partial_failed"
} else {
"failed"
};
let mut payload = json!({
"ok": ok,
"status": status,
"command": command,
"provider": first.get("provider").cloned().unwrap_or(Value::Null),
"provider_selection": first.get("provider_selection").cloned().unwrap_or(Value::Null),
"auth": first.get("auth").cloned().unwrap_or(Value::Null),
"request": first.get("request").cloned().unwrap_or(Value::Null),
"response": response,
"output": output,
"history": {
"job_id": primary_history_job_id,
"job_ids": history_job_ids,
},
"batch": {
"mode": "parallel-single-output",
"request_count": request_count,
"success_count": image_count,
"failure_count": errors.len(),
"errors": batch_errors_json(&errors),
},
"events": {
"count": request_count,
}
});
if !errors.is_empty()
&& let Value::Object(object) = &mut payload
{
object.insert(
"error".to_string(),
json!({
"code": if ok { "batch_partial_failed" } else { "batch_failed" },
"message": error_summary.unwrap_or_else(|| "Batch request failed.".to_string()),
"items": batch_errors_json(&errors),
}),
);
}
payload
}
pub(crate) fn cleanup_child_history(payload: &Value, app_job_id: &str) {
for id in collect_history_ids(payload) {
if id != app_job_id {
let _ = delete_history_job(&id);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn payload(path: &str) -> Value {
json!({
"ok": true,
"provider": "mock",
"output": {
"path": path,
"bytes": 10,
},
"history": {
"job_id": format!("child-{path}"),
},
"response": {
"revised_prompts": [],
},
})
}
#[test]
fn merge_batch_payloads_keeps_successful_outputs_with_failed_items() {
let merged = merge_batch_payloads(
"images generate",
3,
vec![(0, payload("/tmp/a.png")), (2, payload("/tmp/c.png"))],
vec![BatchItemError {
index: 1,
message: "upstream rejected candidate B".to_string(),
}],
);
assert_eq!(merged["status"], "partial_failed");
assert_eq!(merged["ok"], true);
let files = merged["output"]["files"].as_array().unwrap();
assert_eq!(files.len(), 2);
assert_eq!(files[0]["index"], 0);
assert_eq!(files[0]["path"], "/tmp/a.png");
assert_eq!(files[1]["index"], 2);
assert_eq!(files[1]["path"], "/tmp/c.png");
assert_eq!(merged["output"]["path"], "/tmp/a.png");
assert_eq!(merged["batch"]["request_count"], 3);
assert_eq!(merged["batch"]["success_count"], 2);
assert_eq!(merged["batch"]["failure_count"], 1);
assert_eq!(merged["batch"]["errors"][0]["index"], 1);
assert_eq!(merged["error"]["message"], "upstream rejected candidate B");
}
#[test]
fn merge_batch_payloads_marks_total_failure_not_ok() {
let merged = merge_batch_payloads(
"images generate",
2,
vec![],
vec![
BatchItemError {
index: 0,
message: "candidate A failed".to_string(),
},
BatchItemError {
index: 1,
message: "candidate B failed".to_string(),
},
],
);
assert_eq!(merged["ok"], false);
assert_eq!(merged["status"], "failed");
assert_eq!(merged["output"]["files"].as_array().unwrap().len(), 0);
assert!(merged["output"]["path"].is_null());
assert_eq!(merged["batch"]["success_count"], 0);
assert_eq!(merged["batch"]["failure_count"], 2);
assert_eq!(merged["error"]["code"], "batch_failed");
assert_eq!(merged["error"]["items"][0]["index"], 0);
let job = job_from_payload(&merged, "job-1", "images generate", json!({}));
assert_eq!(job["status"], "failed");
assert_eq!(terminal_event_type(job["status"].as_str()), "job.failed");
assert!(!terminal_status_runs_storage_upload(job["status"].as_str()));
}
}