// batch_mode_process_response/process_output_data.rs
crate::ix!();

4#[instrument(level="trace", skip_all)]
5pub async fn process_output_data<T>(
6 output_data: &BatchOutputData,
7 workspace: &dyn BatchWorkspaceInterface,
8 expected_content_type: &ExpectedContentType,
9) -> Result<(), BatchOutputProcessingError>
10where
11 T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
14{
15 trace!("entering process_output_data, output_data len = {}", output_data.responses().len());
16
17 let mut failed_entries = Vec::new();
18
19 for response_record in output_data.responses() {
20 info!("processing output data record with custom_id={}", response_record.custom_id());
21
22 if let Some(success_body) = response_record.response().body().as_success() {
23 if let Err(e) = handle_successful_response::<T>(success_body, workspace, expected_content_type).await {
24 eprintln!(
25 "Failed to process response for request ID '{}', error: {:?}, response: {:?}",
26 response_record.custom_id(),
27 e,
28 success_body
29 );
30 failed_entries.push(response_record);
31 }
32 }
33 }
34
35 if !failed_entries.is_empty() {
36 warn!("some entries failed, saving them.");
37 save_failed_entries(workspace, &failed_entries).await?;
38 }
39
40 info!("process_output_data completed without fatal errors.");
41 Ok(())
42}
43
#[cfg(test)]
mod process_output_data_tests {
    //! Tests for `process_output_data`.

    use super::*;

    /// Minimal target type for the generic parameter `T`; it only has a
    /// `name` field.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct MockItem {
        pub name: String,
    }

    /// Feeds `process_output_data` a single success record whose message
    /// content is `{"invalid_field":12}` (no `name` field) and checks that
    /// the call still returns `Ok`.
    #[traced_test]
    async fn test_process_output_data_with_deserialization_failure() {

        let workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp().await.unwrap();

        // Assistant message whose JSON payload lacks the field `MockItem` declares.
        let invalid_msg = BatchMessageBuilder::default()
            .role(MessageRole::Assistant)
            .content(
                BatchMessageContentBuilder::default()
                    .content("{\"invalid_field\":12}".to_string())
                    .build()
                    .unwrap(),
            )
            .build()
            .unwrap();

        let choice_fail = BatchChoiceBuilder::default()
            .index(0_u32)
            .finish_reason(FinishReason::Stop)
            .logprobs(None)
            .message(invalid_msg)
            .build()
            .unwrap();

        let success_body_fail = BatchSuccessResponseBodyBuilder::default()
            .id("550e8400-e29b-41d4-a716-446655440000".to_string())
            .object("response".to_string())
            .created(0_u64)
            .model("test-model".to_string())
            .choices(vec![choice_fail])
            .usage(BatchUsage::mock())
            .build()
            .unwrap();

        // Wrapped as a success body so the record reaches the success-handling path.
        let response_content_fail = BatchResponseContentBuilder::default()
            .status_code(200_u16)
            .request_id(ResponseRequestId::new("resp_req_mock_item_2"))
            .body(BatchResponseBody::Success(success_body_fail))
            .build()
            .unwrap();

        let record_fail = BatchResponseRecordBuilder::default()
            .id(BatchRequestId::new("batch_req_mock_item_2"))
            .custom_id(CustomRequestId::new("mock_item_2"))
            .response(response_content_fail)
            .build()
            .unwrap();

        let output_data = BatchOutputData::new(vec![record_fail]);

        let result = process_output_data::<MockItem>(
            &output_data,
            workspace.as_ref(),
            &ExpectedContentType::Json,
        ).await;

        assert!(
            result.is_ok(),
            "Should handle the failing record gracefully by saving a failed entry."
        );
    }
}