crate::ix!();
/// Reads an NDJSON (newline-delimited JSON) batch output file and collects
/// every line that deserializes as a `BatchResponseRecord`.
///
/// Each line is trimmed before use. Lines that are empty after trimming are
/// skipped (logged at `trace`). Lines that fail to deserialize are logged at
/// `warn` and skipped, so one malformed record does not abort the load.
///
/// # Errors
///
/// Returns a `BatchOutputProcessingError` (converted from the underlying I/O
/// error) if the file cannot be opened or if reading the next line fails.
#[instrument(level="trace", skip_all)]
async fn load_ndjson_output_file(
    path: &Path
) -> Result<BatchOutputData, BatchOutputProcessingError> {
    info!("loading NDJSON output file: {:?}", path);

    let handle = File::open(path).await?;
    let mut line_stream = BufReader::new(handle).lines();
    let mut records = Vec::new();

    while let Some(raw_line) = line_stream.next_line().await? {
        let content = raw_line.trim();
        if content.is_empty() {
            trace!("Skipping empty line in output file: {:?}", path);
            continue;
        }

        trace!("Parsing NDJSON output line: {}", content);
        let record = match serde_json::from_str::<BatchResponseRecord>(content) {
            Ok(parsed) => parsed,
            Err(e) => {
                // Best-effort: report the malformed line and move on rather than
                // failing the whole file.
                warn!(
                    "Skipping invalid JSON line in output file {:?}: {} => {}",
                    path, content, e
                );
                continue;
            }
        };
        records.push(record);
    }

    info!(
        "Finished loading NDJSON output file: {:?}, found {} valid record(s).",
        path, records.len()
    );
    Ok(BatchOutputData::new(records))
}
/// Loads the NDJSON output file referenced by `triple` and passes the parsed
/// records to `process_output_data::<T>` together with `workspace` and
/// `expected_content_type`.
///
/// # Errors
///
/// * `BatchOutputProcessingError::MissingFilePath` if `triple` has no output
///   path.
/// * Any error from loading the output file or from `process_output_data`.
#[instrument(level="trace", skip_all)]
pub async fn process_output_file<T>(
    triple: &BatchFileTriple,
    workspace: &dyn BatchWorkspaceInterface,
    expected_content_type: &ExpectedContentType,
) -> Result<(), BatchOutputProcessingError>
where
    T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
{
    trace!("process_output_file => index = {:?}", triple.index());

    // Guard clause: without an output path there is nothing to load.
    let Some(output_path) = triple.output() else {
        error!("No output path found in triple => cannot process output file.");
        return Err(BatchOutputProcessingError::MissingFilePath);
    };

    let output_data = load_ndjson_output_file(&output_path).await?;
    process_output_data::<T>(&output_data, workspace, expected_content_type).await
}
/// Adapts `process_output_file::<T>` to a boxed, pinned, `Send` future with
/// the same signature shape as `default_output_file_bridge_fn`. Presumably
/// this lets it be stored as a `BatchWorkflowProcessOutputFileFn`; verify at
/// call sites.
///
/// All three borrowed arguments share the lifetime `'a`, which also bounds the
/// returned future. `T` is the item type forwarded to `process_output_file`.
pub fn process_output_file_bridge_fn<'a, T>(
    triple: &'a BatchFileTriple,
    workspace: &'a (dyn BatchWorkspaceInterface + 'a),
    ect: &'a ExpectedContentType,
) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>
where
    T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
{
    // `process_output_file` is an `async fn`, so this only builds the future;
    // no work happens until the caller polls it.
    let pending = process_output_file::<T>(triple, workspace, ect);
    Box::pin(pending)
}
pub fn default_output_file_bridge_fn<'a>(
triple: &'a BatchFileTriple,
workspace: &'a (dyn BatchWorkspaceInterface + 'a),
ect: &'a ExpectedContentType,
) -> Pin<Box<dyn Future<Output = Result<(), BatchOutputProcessingError>> + Send + 'a>>
{
Box::pin(async move {
process_output_file::<CamelCaseTokenWithComment>(triple, workspace, ect).await
})
}
pub const DEFAULT_OUTPUT_FILE_BRIDGE: BatchWorkflowProcessOutputFileFn
= default_output_file_bridge_fn;
#[cfg(test)]
mod process_output_file_tests {
    use super::*;

    /// Minimal item type used as the `T` parameter of `process_output_file`.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct OutputFileMockItem {
        pub name: String,
    }

    /// Writes two well-formed NDJSON batch response records to a temporary
    /// file, points a test triple at it, and checks that `process_output_file`
    /// returns `Ok`.
    #[traced_test]
    async fn test_process_output_file_ok() {
        info!("Starting test_process_output_file_ok with NDJSON approach.");

        let workspace: Arc<dyn BatchWorkspaceInterface> =
            BatchWorkspace::new_temp().await.unwrap();
        let mut triple = BatchFileTriple::new_for_test_with_workspace(workspace.clone());
        triple.set_input_path(Some("dummy_input.json".into()));

        // `ndjson_file` stays bound until the end of the test, so the temp file is
        // still present while `process_output_file` reads it.
        let mut ndjson_file = NamedTempFile::new().expect("Failed to create NamedTempFile");
        triple.set_output_path(Some(ndjson_file.path().to_path_buf()));

        let first_record = r#"{"id":"batch_req_mock_item_1","custom_id":"mock_item_1","response":{"status_code":200,"request_id":"resp_req_mock_item_1","body":{"id":"someid123","object":"chat.completion","created":0,"model":"test-model","choices":[{"index":0,"message":{"role":"assistant","content":"{\"name\":\"item-from-output-file\"}"},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}}}"#;
        let second_record = r#"{"id":"batch_req_mock_item_2","custom_id":"mock_item_2","response":{"status_code":200,"request_id":"resp_req_mock_item_2","body":{"id":"someid456","object":"chat.completion","created":0,"model":"test-model-2","choices":[{"index":0,"message":{"role":"assistant","content":"{\"name\":\"another-output-item\"}"},"logprobs":null,"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":10,"total_tokens":20}}}}"#;

        // One JSON document per line (NDJSON), each newline-terminated.
        for record in [first_record, second_record] {
            writeln!(ndjson_file, "{}", record).unwrap();
        }
        ndjson_file.flush().unwrap();

        let outcome = process_output_file::<OutputFileMockItem>(
            &triple,
            workspace.as_ref(),
            &ExpectedContentType::Json,
        )
        .await;

        assert!(
            outcome.is_ok(),
            "Expected process_output_file to succeed with valid NDJSON lines"
        );
        debug!("test_process_output_file_ok passed successfully.");
    }
}