// batch_mode_process_response/process_error_file.rs
crate::ix!();

4#[instrument(level="trace", skip_all)]
10async fn load_ndjson_error_file(
11 path: &Path
12) -> Result<BatchErrorData, BatchErrorProcessingError> {
13 info!("loading NDJSON error file: {:?}", path);
14
15 let file = File::open(path).await?;
16 let reader = BufReader::new(file);
17 let mut lines = reader.lines();
18
19 let mut responses = Vec::new();
20 while let Some(line_res) = lines.next_line().await? {
21 let trimmed = line_res.trim();
22 if trimmed.is_empty() {
23 trace!("Skipping empty line in error file: {:?}", path);
24 continue;
25 }
26
27 trace!("Parsing NDJSON error line: {}", trimmed);
28 match serde_json::from_str::<BatchResponseRecord>(trimmed) {
29 Ok(record) => {
30 responses.push(record);
31 }
32 Err(e) => {
33 warn!(
34 "Skipping invalid JSON line in error file {:?}: {} => {}",
35 path, trimmed, e
36 );
37 }
38 }
39 }
40
41 info!(
42 "Finished loading NDJSON error file: {:?}, found {} valid record(s).",
43 path, responses.len()
44 );
45
46 Ok(BatchErrorData::new(responses))
47}
48
49#[instrument(level="trace", skip_all)]
54pub async fn process_error_file(
55 triple: &BatchFileTriple,
56 operations: &[BatchErrorFileProcessingOperation],
57) -> Result<(), BatchErrorProcessingError> {
58
59 let error_file_path = match triple.error() {
60 Some(e) => e.clone(),
61 None => {
62 error!("No error path found in triple => cannot process error file.");
63 return Err(BatchErrorProcessingError::MissingFilePath);
64 }
65 };
66
67 info!("processing NDJSON error file {:?} with operations: {:#?}", error_file_path, operations);
68
69 let error_data = load_ndjson_error_file(&error_file_path).await?;
71
72 for operation in operations {
74 match operation {
75 BatchErrorFileProcessingOperation::LogErrors => {
76 triple.log_errors(&error_data).await?;
77 }
78 BatchErrorFileProcessingOperation::RetryFailedRequests => {
79 triple.retry_failed_requests(&error_data).await?;
80 }
81 }
82 }
83
84 Ok(())
85}
86
87fn process_error_file_bridge_fn<'a>(
96 triple: &'a BatchFileTriple,
97 ops: &'a [BatchErrorFileProcessingOperation],
98) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>>
99{
100 Box::pin(async move {
101 process_error_file(triple, ops).await
102 })
103}
104
105pub const PROCESS_ERROR_FILE_BRIDGE: BatchWorkflowProcessErrorFileFn = process_error_file_bridge_fn;
110
111
#[cfg(test)]
mod process_error_file_tests {
    use super::*;

    /// Writes two NDJSON error records to a temporary file, points a test
    /// triple at that file, and checks that running both processing
    /// operations returns `Ok`.
    #[traced_test]
    async fn test_process_error_file() {
        info!("Starting test_process_error_file with NDJSON approach.");

        let workspace = BatchWorkspace::new_temp().await.unwrap();
        let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace);

        let mut tmp = NamedTempFile::new().expect("Failed to create NamedTempFile");
        triple.set_error_path(Some(tmp.path().to_path_buf()));

        // One JSON object per line, as expected by the NDJSON loader.
        let ndjson_lines = [
            r#"{"id":"batch_req_error_id_1","custom_id":"error_id_1","response":{"status_code":400,"request_id":"resp_error_id_1","body":{"error":{"message":"Some error occurred","type":"some_test_error","param":null,"code":"SomeErrorCode"},"object":"error"}}}"#,
            r#"{"id":"batch_req_error_id_2","custom_id":"error_id_2","response":{"status_code":400,"request_id":"resp_error_id_2","body":{"error":{"message":"Another error","type":"some_test_error","param":null,"code":"AnotherErrorCode"},"object":"error"}}}"#,
        ];
        for line in &ndjson_lines {
            writeln!(tmp, "{}", line).unwrap();
        }
        // Flush so the contents are on disk before the file is reopened by path.
        tmp.flush().unwrap();

        let ops = [
            BatchErrorFileProcessingOperation::LogErrors,
            BatchErrorFileProcessingOperation::RetryFailedRequests,
        ];

        let result = process_error_file(&triple, &ops).await;
        assert!(
            result.is_ok(),
            "Expected process_error_file to parse NDJSON and succeed."
        );

        debug!("test_process_error_file passed successfully.");
    }
}