crate::ix!();
#[instrument(level="trace", skip_all)]
async fn load_ndjson_error_file(
path: &Path
) -> Result<BatchErrorData, BatchErrorProcessingError> {
info!("loading NDJSON error file: {:?}", path);
let file = File::open(path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut responses = Vec::new();
while let Some(line_res) = lines.next_line().await? {
let trimmed = line_res.trim();
if trimmed.is_empty() {
trace!("Skipping empty line in error file: {:?}", path);
continue;
}
trace!("Parsing NDJSON error line: {}", trimmed);
match serde_json::from_str::<BatchResponseRecord>(trimmed) {
Ok(record) => {
responses.push(record);
}
Err(e) => {
warn!(
"Skipping invalid JSON line in error file {:?}: {} => {}",
path, trimmed, e
);
}
}
}
info!(
"Finished loading NDJSON error file: {:?}, found {} valid record(s).",
path, responses.len()
);
Ok(BatchErrorData::new(responses))
}
#[instrument(level="trace", skip_all)]
pub async fn process_error_file(
triple: &BatchFileTriple,
operations: &[BatchErrorFileProcessingOperation],
) -> Result<(), BatchErrorProcessingError> {
let error_file_path = match triple.error() {
Some(e) => e.clone(),
None => {
error!("No error path found in triple => cannot process error file.");
return Err(BatchErrorProcessingError::MissingFilePath);
}
};
info!("processing NDJSON error file {:?} with operations: {:#?}", error_file_path, operations);
let error_data = load_ndjson_error_file(&error_file_path).await?;
for operation in operations {
match operation {
BatchErrorFileProcessingOperation::LogErrors => {
triple.log_errors(&error_data).await?;
}
BatchErrorFileProcessingOperation::RetryFailedRequests => {
triple.retry_failed_requests(&error_data).await?;
}
}
}
Ok(())
}
fn process_error_file_bridge_fn<'a>(
triple: &'a BatchFileTriple,
ops: &'a [BatchErrorFileProcessingOperation],
) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>>
{
Box::pin(async move {
process_error_file(triple, ops).await
})
}
pub const PROCESS_ERROR_FILE_BRIDGE: BatchWorkflowProcessErrorFileFn = process_error_file_bridge_fn;
#[cfg(test)]
mod process_error_file_tests {
use super::*;
#[traced_test]
async fn test_process_error_file() {
info!("Starting test_process_error_file with NDJSON approach.");
let workspace = BatchWorkspace::new_temp().await.unwrap();
let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace);
let mut tmp = NamedTempFile::new().expect("Failed to create NamedTempFile");
let tmp_path = tmp.path().to_path_buf();
triple.set_error_path(Some(tmp_path.clone()));
let line_1 = r#"{"id":"batch_req_error_id_1","custom_id":"error_id_1","response":{"status_code":400,"request_id":"resp_error_id_1","body":{"error":{"message":"Some error occurred","type":"some_test_error","param":null,"code":"SomeErrorCode"},"object":"error"}}}"#;
let line_2 = r#"{"id":"batch_req_error_id_2","custom_id":"error_id_2","response":{"status_code":400,"request_id":"resp_error_id_2","body":{"error":{"message":"Another error","type":"some_test_error","param":null,"code":"AnotherErrorCode"},"object":"error"}}}"#;
writeln!(tmp, "{}", line_1).unwrap();
writeln!(tmp, "{}", line_2).unwrap();
tmp.flush().unwrap();
let ops = vec![
BatchErrorFileProcessingOperation::LogErrors,
BatchErrorFileProcessingOperation::RetryFailedRequests,
];
let result = process_error_file(&triple, &ops).await;
assert!(
result.is_ok(),
"Expected process_error_file to parse NDJSON and succeed."
);
debug!("test_process_error_file passed successfully.");
}
}