batch_mode_process_response/
process_error_file.rs

1// ---------------- [ File: batch-mode-process-response/src/process_error_file.rs ]
2crate::ix!();
3
4/**
5 * Loads the error file at `path` in NDJSON format (one JSON object per line).
6 * Each line is parsed into a `BatchResponseRecord`. Invalid lines are skipped
7 * with a warning.
8 */
9#[instrument(level="trace", skip_all)]
10async fn load_ndjson_error_file(
11    path: &Path
12) -> Result<BatchErrorData, BatchErrorProcessingError> {
13    info!("loading NDJSON error file: {:?}", path);
14
15    let file = File::open(path).await?;
16    let reader = BufReader::new(file);
17    let mut lines = reader.lines();
18
19    let mut responses = Vec::new();
20    while let Some(line_res) = lines.next_line().await? {
21        let trimmed = line_res.trim();
22        if trimmed.is_empty() {
23            trace!("Skipping empty line in error file: {:?}", path);
24            continue;
25        }
26
27        trace!("Parsing NDJSON error line: {}", trimmed);
28        match serde_json::from_str::<BatchResponseRecord>(trimmed) {
29            Ok(record) => {
30                responses.push(record);
31            }
32            Err(e) => {
33                warn!(
34                    "Skipping invalid JSON line in error file {:?}: {} => {}",
35                    path, trimmed, e
36                );
37            }
38        }
39    }
40
41    info!(
42        "Finished loading NDJSON error file: {:?}, found {} valid record(s).",
43        path, responses.len()
44    );
45
46    Ok(BatchErrorData::new(responses))
47}
48
49/**
50 * This is the real async function that processes the error file for a given triple,
51 * using the list of error operations. Now uses NDJSON approach line by line.
52 */
53#[instrument(level="trace", skip_all)]
54pub async fn process_error_file(
55    triple:     &BatchFileTriple,
56    operations: &[BatchErrorFileProcessingOperation],
57) -> Result<(), BatchErrorProcessingError> {
58
59    let error_file_path = match triple.error() {
60        Some(e) => e.clone(),
61        None => {
62            error!("No error path found in triple => cannot process error file.");
63            return Err(BatchErrorProcessingError::MissingFilePath);
64        }
65    };
66
67    info!("processing NDJSON error file {:?} with operations: {:#?}", error_file_path, operations);
68
69    // 1) Load error file line-by-line, parse as `BatchResponseRecord`, accumulate.
70    let error_data = load_ndjson_error_file(&error_file_path).await?;
71
72    // 2) Perform each requested operation
73    for operation in operations {
74        match operation {
75            BatchErrorFileProcessingOperation::LogErrors => {
76                triple.log_errors(&error_data).await?;
77            }
78            BatchErrorFileProcessingOperation::RetryFailedRequests => {
79                triple.retry_failed_requests(&error_data).await?;
80            }
81        }
82    }
83
84    Ok(())
85}
86
87/**
88 * The bridging function matching `BatchWorkflowProcessErrorFileFn`: 
89 * 
90 *  for<'a> fn(
91 *      &'a BatchFileTriple,
92 *      &'a [BatchErrorFileProcessingOperation],
93 *  ) -> Pin<Box<dyn Future<Output=Result<(),BatchErrorProcessingError>> + Send + 'a>>
94 */
95fn process_error_file_bridge_fn<'a>(
96    triple: &'a BatchFileTriple,
97    ops: &'a [BatchErrorFileProcessingOperation],
98) -> Pin<Box<dyn Future<Output = Result<(), BatchErrorProcessingError>> + Send + 'a>>
99{
100    Box::pin(async move {
101        process_error_file(triple, ops).await
102    })
103}
104
105/**
106 * We expose a CONST of type `BatchWorkflowProcessErrorFileFn`, so passing `&PROCESS_ERROR_FILE_BRIDGE`
107 * exactly matches the trait's needed function pointer type.
108 */
109pub const PROCESS_ERROR_FILE_BRIDGE: BatchWorkflowProcessErrorFileFn = process_error_file_bridge_fn;
110
111
#[cfg(test)]
mod process_error_file_tests {
    use super::*;

    // Writes two well-formed NDJSON error records to a temporary file, points a
    // test triple at it, and checks that `process_error_file` succeeds when asked
    // to both log the errors and retry the failed requests.
    #[traced_test]
    async fn test_process_error_file() {
        info!("Starting test_process_error_file with NDJSON approach.");

        let workspace = BatchWorkspace::new_temp().await.unwrap();
        let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace);

        // A NamedTempFile is removed on drop, so the test leaves nothing behind.
        let mut error_file = NamedTempFile::new().expect("Failed to create NamedTempFile");
        triple.set_error_path(Some(error_file.path().to_path_buf()));

        // Two records, each a valid BatchResponseRecord with status_code=400.
        let ndjson_lines = [
            r#"{"id":"batch_req_error_id_1","custom_id":"error_id_1","response":{"status_code":400,"request_id":"resp_error_id_1","body":{"error":{"message":"Some error occurred","type":"some_test_error","param":null,"code":"SomeErrorCode"},"object":"error"}}}"#,
            r#"{"id":"batch_req_error_id_2","custom_id":"error_id_2","response":{"status_code":400,"request_id":"resp_error_id_2","body":{"error":{"message":"Another error","type":"some_test_error","param":null,"code":"AnotherErrorCode"},"object":"error"}}}"#,
        ];
        for ndjson_line in ndjson_lines.iter() {
            writeln!(error_file, "{}", ndjson_line).unwrap();
        }
        // Make sure the bytes are on disk before the async reader opens the file.
        error_file.flush().unwrap();

        let requested_ops = [
            BatchErrorFileProcessingOperation::LogErrors,
            BatchErrorFileProcessingOperation::RetryFailedRequests,
        ];

        let outcome = process_error_file(&triple, &requested_ops).await;
        assert!(
            outcome.is_ok(),
            "Expected process_error_file to parse NDJSON and succeed."
        );

        debug!("test_process_error_file passed successfully.");
    }
}