batch_mode_process_response/
process_output_file.rs

1// ---------------- [ File: batch-mode-process-response/src/process_output_file.rs ]
2crate::ix!();
3
4/**
5 * Loads the output file at `path` in NDJSON format (one JSON object per line).
6 * Each line is parsed into a `BatchResponseRecord`. Invalid lines are skipped
7 * with a warning, so that partial data doesn't abort the entire read.
8 */
9#[instrument(level="trace", skip_all)]
10async fn load_ndjson_output_file(
11    path: &Path
12) -> Result<BatchOutputData, BatchOutputProcessingError> {
13    info!("loading NDJSON output file: {:?}", path);
14
15    // Attempt to open the file
16    let file = File::open(path).await.map_err(BatchOutputProcessingError::from)?;
17    let reader = BufReader::new(file);
18    let mut lines = reader.lines();
19
20    let mut responses = Vec::new();
21    while let Some(line_res) = lines.next_line().await? {
22        let trimmed = line_res.trim();
23        if trimmed.is_empty() {
24            trace!("Skipping empty line in output file: {:?}", path);
25            continue;
26        }
27
28        trace!("Parsing NDJSON output line: {}", trimmed);
29        match serde_json::from_str::<BatchResponseRecord>(trimmed) {
30            Ok(record) => {
31                responses.push(record);
32            }
33            Err(e) => {
34                warn!(
35                    "Skipping invalid JSON line in output file {:?}: {} => {}",
36                    path, trimmed, e
37                );
38                // We skip this line instead of failing the entire load
39            }
40        }
41    }
42
43    info!(
44        "Finished loading NDJSON output file: {:?}, found {} valid record(s).",
45        path, responses.len()
46    );
47
48    Ok(BatchOutputData::new(responses))
49}
50
51/**
52 * The core async function to process the output file for a given triple.
53 * Now we do NDJSON line-by-line parsing, just like the older batch-mode approach.
54 */
55#[instrument(level="trace", skip_all)]
56pub async fn process_output_file<T>(
57    triple:                &BatchFileTriple,
58    workspace:             &dyn BatchWorkspaceInterface,
59    expected_content_type: &ExpectedContentType,
60) -> Result<(), BatchOutputProcessingError> 
61where
62    T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
63{
64    trace!("process_output_file => index = {:?}", triple.index());
65
66    // 1) Identify the path to the output file, which must be NDJSON (one record per line).
67    let output_path = match triple.output() {
68        Some(p) => p.clone(),
69        None => {
70            error!("No output path found in triple => cannot process output file.");
71            return Err(BatchOutputProcessingError::MissingFilePath);
72        }
73    };
74
75    // 2) Load as NDJSON => gather into a BatchOutputData
76    let output_data = load_ndjson_output_file(&output_path).await?;
77
78    // 3) Process all records
79    process_output_data::<T>(&output_data, workspace, expected_content_type).await
80}
81
82/**
83 * The bridging function EXACTLY matches the `BatchWorkflowProcessOutputFileFn` type:
84 * 
85 *   for<'a> fn(
86 *       &'a BatchFileTriple,
87 *       &'a (dyn BatchWorkspaceInterface + 'a),
88 *       &'a ExpectedContentType,
89 *   ) -> Pin<Box<dyn Future<Output=Result<(),BatchOutputProcessingError>> + Send + 'a>>
90 */
91pub fn process_output_file_bridge_fn<'a, T>(
92    triple:    &'a BatchFileTriple,
93    workspace: &'a (dyn BatchWorkspaceInterface + 'a),
94    ect:       &'a ExpectedContentType,
95) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>> 
96where
97    T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
98{
99    Box::pin(async move {
100        process_output_file::<T>(triple, workspace, ect).await
101    })
102}
103
104/**
105 * A default bridging function, if the user doesn't specify a particular T type.
106 * We'll parse each line as a `CamelCaseTokenWithComment` or whichever default type we prefer.
107 */
108pub fn default_output_file_bridge_fn<'a>(
109    triple:    &'a BatchFileTriple,
110    workspace: &'a (dyn BatchWorkspaceInterface + 'a),
111    ect:       &'a ExpectedContentType,
112) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>
113{
114    Box::pin(async move {
115        // Use CamelCaseTokenWithComment (or any other default T) as the type:
116        process_output_file::<CamelCaseTokenWithComment>(triple, workspace, ect).await
117    })
118}
119
120/**
121 * The const pointer the macro references.
122 */
123pub const DEFAULT_OUTPUT_FILE_BRIDGE: BatchWorkflowProcessOutputFileFn 
124    = default_output_file_bridge_fn;
125
126
#[cfg(test)]
mod process_output_file_tests {
    use super::*;

    /// Minimal item type used as `T`; its single field matches the JSON carried
    /// in the mock assistant messages below.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct OutputFileMockItem {
        pub name: String,
    }

    // Valid `BatchResponseRecord` lines whose assistant `content` deserializes
    // into `OutputFileMockItem`.
    const VALID_LINE_1: &str = r#"{"id":"batch_req_mock_item_1","custom_id":"mock_item_1","response":{"status_code":200,"request_id":"resp_req_mock_item_1","body":{"id":"someid123","object":"chat.completion","created":0,"model":"test-model","choices":[{"index":0,"message":{"role":"assistant","content":"{\"name\":\"item-from-output-file\"}"},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}}}"#;
    const VALID_LINE_2: &str = r#"{"id":"batch_req_mock_item_2","custom_id":"mock_item_2","response":{"status_code":200,"request_id":"resp_req_mock_item_2","body":{"id":"someid456","object":"chat.completion","created":0,"model":"test-model-2","choices":[{"index":0,"message":{"role":"assistant","content":"{\"name\":\"another-output-item\"}"},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":10,"total_tokens":20}}}}"#;

    /// Builds a temp workspace plus a triple whose output path points at a new
    /// `NamedTempFile` containing `contents`.
    ///
    /// The temp file (deleted on drop) and the workspace are returned so the
    /// caller keeps them alive for the duration of the test.
    async fn setup_triple_with_output(
        contents: &str,
    ) -> (Arc<dyn BatchWorkspaceInterface>, BatchFileTriple, NamedTempFile) {
        let workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp().await.unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace.clone());
        triple.set_input_path(Some("dummy_input.json".into()));

        // A NamedTempFile avoids littering permanent files.
        let mut tmpfile = NamedTempFile::new().expect("Failed to create NamedTempFile");
        triple.set_output_path(Some(tmpfile.path().to_path_buf()));

        write!(tmpfile, "{}", contents).unwrap();
        tmpfile.flush().unwrap();

        (workspace, triple, tmpfile)
    }

    #[traced_test]
    async fn test_process_output_file_ok() {
        info!("Starting test_process_output_file_ok with NDJSON approach.");

        // Two valid records, each on its own line (NDJSON).
        let contents = format!("{}\n{}\n", VALID_LINE_1, VALID_LINE_2);
        let (workspace, triple, _tmpfile) = setup_triple_with_output(&contents).await;

        let result = process_output_file::<OutputFileMockItem>(
            &triple,
            workspace.as_ref(),
            &ExpectedContentType::Json,
        ).await;

        assert!(
            result.is_ok(),
            "Expected process_output_file to succeed with valid NDJSON lines"
        );
        debug!("test_process_output_file_ok passed successfully.");
    }

    #[traced_test]
    async fn test_process_output_file_skips_blank_and_invalid_lines() {
        info!("Starting test_process_output_file_skips_blank_and_invalid_lines.");

        // Blank, whitespace-only, non-JSON and truncated-JSON lines must be
        // skipped without aborting the load; the one valid record is processed.
        let contents = format!(
            "\n   \nthis is not json\n{}\n{{\"id\":\n",
            VALID_LINE_1
        );
        let (workspace, triple, _tmpfile) = setup_triple_with_output(&contents).await;

        let result = process_output_file::<OutputFileMockItem>(
            &triple,
            workspace.as_ref(),
            &ExpectedContentType::Json,
        ).await;

        assert!(
            result.is_ok(),
            "Expected invalid/blank NDJSON lines to be skipped, not to fail processing"
        );
        debug!("test_process_output_file_skips_blank_and_invalid_lines passed successfully.");
    }
}
170}