// batch_mode_process_response/process_output_file.rs
crate::ix!();

4#[instrument(level="trace", skip_all)]
10async fn load_ndjson_output_file(
11 path: &Path
12) -> Result<BatchOutputData, BatchOutputProcessingError> {
13 info!("loading NDJSON output file: {:?}", path);
14
15 let file = File::open(path).await.map_err(BatchOutputProcessingError::from)?;
17 let reader = BufReader::new(file);
18 let mut lines = reader.lines();
19
20 let mut responses = Vec::new();
21 while let Some(line_res) = lines.next_line().await? {
22 let trimmed = line_res.trim();
23 if trimmed.is_empty() {
24 trace!("Skipping empty line in output file: {:?}", path);
25 continue;
26 }
27
28 trace!("Parsing NDJSON output line: {}", trimmed);
29 match serde_json::from_str::<BatchResponseRecord>(trimmed) {
30 Ok(record) => {
31 responses.push(record);
32 }
33 Err(e) => {
34 warn!(
35 "Skipping invalid JSON line in output file {:?}: {} => {}",
36 path, trimmed, e
37 );
38 }
40 }
41 }
42
43 info!(
44 "Finished loading NDJSON output file: {:?}, found {} valid record(s).",
45 path, responses.len()
46 );
47
48 Ok(BatchOutputData::new(responses))
49}
50
51#[instrument(level="trace", skip_all)]
56pub async fn process_output_file<T>(
57 triple: &BatchFileTriple,
58 workspace: &dyn BatchWorkspaceInterface,
59 expected_content_type: &ExpectedContentType,
60) -> Result<(), BatchOutputProcessingError>
61where
62 T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
63{
64 trace!("process_output_file => index = {:?}", triple.index());
65
66 let output_path = match triple.output() {
68 Some(p) => p.clone(),
69 None => {
70 error!("No output path found in triple => cannot process output file.");
71 return Err(BatchOutputProcessingError::MissingFilePath);
72 }
73 };
74
75 let output_data = load_ndjson_output_file(&output_path).await?;
77
78 process_output_data::<T>(&output_data, workspace, expected_content_type).await
80}
81
82pub fn process_output_file_bridge_fn<'a, T>(
92 triple: &'a BatchFileTriple,
93 workspace: &'a (dyn BatchWorkspaceInterface + 'a),
94 ect: &'a ExpectedContentType,
95) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>
96where
97 T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
98{
99 Box::pin(async move {
100 process_output_file::<T>(triple, workspace, ect).await
101 })
102}
103
104pub fn default_output_file_bridge_fn<'a>(
109 triple: &'a BatchFileTriple,
110 workspace: &'a (dyn BatchWorkspaceInterface + 'a),
111 ect: &'a ExpectedContentType,
112) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>
113{
114 Box::pin(async move {
115 process_output_file::<CamelCaseTokenWithComment>(triple, workspace, ect).await
117 })
118}
119
120pub const DEFAULT_OUTPUT_FILE_BRIDGE: BatchWorkflowProcessOutputFileFn
124 = default_output_file_bridge_fn;
125
126
#[cfg(test)]
mod process_output_file_tests {
    use super::*;

    /// Minimal item type the test deserializes from each response's content.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct OutputFileMockItem {
        pub name: String,
    }

    /// Writes two valid NDJSON response lines to a temp file, points a test
    /// triple at it, and expects `process_output_file` to succeed.
    #[traced_test]
    async fn test_process_output_file_ok() {
        info!("Starting test_process_output_file_ok with NDJSON approach.");

        // Two well-formed batch response records, one per NDJSON line.
        let first_record = r#"{"id":"batch_req_mock_item_1","custom_id":"mock_item_1","response":{"status_code":200,"request_id":"resp_req_mock_item_1","body":{"id":"someid123","object":"chat.completion","created":0,"model":"test-model","choices":[{"index":0,"message":{"role":"assistant","content":"{\"name\":\"item-from-output-file\"}"},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}}}"#;
        let second_record = r#"{"id":"batch_req_mock_item_2","custom_id":"mock_item_2","response":{"status_code":200,"request_id":"resp_req_mock_item_2","body":{"id":"someid456","object":"chat.completion","created":0,"model":"test-model-2","choices":[{"index":0,"message":{"role":"assistant","content":"{\"name\":\"another-output-item\"}"},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":10,"total_tokens":20}}}}"#;

        let ws: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp().await.unwrap();

        // The temp file stays in scope until the end of the test.
        let mut ndjson_file = NamedTempFile::new().expect("Failed to create NamedTempFile");
        let ndjson_path = ndjson_file.path().to_path_buf();
        write!(ndjson_file, "{}\n{}\n", first_record, second_record).unwrap();
        ndjson_file.flush().unwrap();

        let mut file_triple = BatchFileTriple::new_for_test_with_workspace(ws.clone());
        file_triple.set_input_path(Some("dummy_input.json".into()));
        file_triple.set_output_path(Some(ndjson_path));

        let outcome = process_output_file::<OutputFileMockItem>(
            &file_triple,
            ws.as_ref(),
            &ExpectedContentType::Json,
        )
        .await;

        assert!(
            outcome.is_ok(),
            "Expected process_output_file to succeed with valid NDJSON lines"
        );
        debug!("test_process_output_file_ok passed successfully.");
    }
}