batch_mode_process_response/
process_output_data.rs1crate::ix!();
3
4#[instrument(level = "trace", skip_all)]
5pub async fn process_output_data<T>(
6 output_data: &BatchOutputData,
7 workspace: &dyn BatchWorkspaceInterface,
8 expected_content_type: &ExpectedContentType,
9) -> Result<(), BatchOutputProcessingError>
10where
11 T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
12{
13 trace!(
14 "Starting process_output_data with {} response record(s)",
15 output_data.responses().len()
16 );
17
18 let mut failed_entries = Vec::new();
19
20 for response_record in output_data.responses() {
21 let custom_id = response_record.custom_id();
22 trace!("Processing response record custom_id={}", custom_id);
23
24 match response_record.response().body().as_success() {
25 Some(success_body) => {
26 trace!("Successfully extracted body for custom_id={}", custom_id);
27
28 match workspace.load_seed_by_custom_id(custom_id).await {
29 Ok(seed_box) => {
30 trace!(
31 "Successfully loaded seed named '{}' for custom_id={}",
32 seed_box.name(),
33 custom_id
34 );
35
36 match handle_successful_response::<T>(
37 success_body,
38 workspace,
39 expected_content_type,
40 seed_box.as_ref(),
41 )
42 .await
43 {
44 Ok(()) => {
45 debug!("Successfully handled response for custom_id={}", custom_id);
46 }
47 Err(e) => {
48 error!(
49 "Error processing successful response for custom_id={}: {:?}",
50 custom_id, e
51 );
52 debug!("Problematic success_body for custom_id={}: {:?}", custom_id, success_body);
53 failed_entries.push(response_record);
54 }
55 }
56 }
57 Err(e) => {
58 error!(
59 "Failed to load seed for custom_id={}, error: {:?}",
60 custom_id, e
61 );
62 failed_entries.push(response_record);
63 }
64 }
65 }
66 None => {
67 warn!("No success body for custom_id={}", custom_id);
68 failed_entries.push(response_record);
69 }
70 }
71 }
72
73 if !failed_entries.is_empty() {
74 warn!(
75 "{} response record(s) failed, invoking save_failed_entries",
76 failed_entries.len()
77 );
78
79 if let Err(e) = save_failed_entries(workspace, &failed_entries).await {
80 error!(
81 "Failed to save {} failed entries: {:?}",
82 failed_entries.len(),
83 e
84 );
85 return Err(e.into());
86 } else {
87 info!("Successfully saved {} failed entries.", failed_entries.len());
88 }
89 }
90
91 info!(
92 "Finished process_output_data ({} succeeded, {} failed)",
93 output_data.responses().len() - failed_entries.len(),
94 failed_entries.len()
95 );
96
97 Ok(())
98}
99
#[cfg(test)]
mod process_output_data_tests {
    use super::*;
    use std::fs;
    use tokio::runtime::Runtime;

    /// Minimal target type for the test: a single required `name` field.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct MockItem {
        pub name: String,
    }

    /// Feeds `process_output_data` one HTTP-200 record whose message content
    /// is JSON without a `name` field, and checks that the call still
    /// returns `Ok`.
    #[traced_test]
    async fn test_process_output_data_with_deserialization_failure() {
        let workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp().await.unwrap();

        // Assistant message whose JSON payload has no `name` field.
        let bad_message = BatchMessageBuilder::default()
            .role(MessageRole::Assistant)
            .content(
                BatchMessageContentBuilder::default()
                    .content("{\"invalid_field\":12}".to_string())
                    .build()
                    .unwrap(),
            )
            .build()
            .unwrap();

        // Success body holding a single choice that wraps the bad message.
        let body = BatchSuccessResponseBodyBuilder::default()
            .id("550e8400-e29b-41d4-a716-446655440000".to_string())
            .object("response".to_string())
            .created(0_u64)
            .model("test-model".to_string())
            .choices(vec![
                BatchChoiceBuilder::default()
                    .index(0_u32)
                    .finish_reason(FinishReason::Stop)
                    .logprobs(None)
                    .message(bad_message)
                    .build()
                    .unwrap(),
            ])
            .usage(BatchUsage::mock())
            .build()
            .unwrap();

        // Full response record: status 200, success body, custom id `mock_item_2`.
        let record = BatchResponseRecordBuilder::default()
            .id(BatchRequestId::new("batch_req_mock_item_2"))
            .custom_id(CustomRequestId::new("mock_item_2"))
            .response(
                BatchResponseContentBuilder::default()
                    .status_code(200_u16)
                    .request_id(ResponseRequestId::new("resp_req_mock_item_2"))
                    .body(BatchResponseBody::Success(body))
                    .build()
                    .unwrap(),
            )
            .build()
            .unwrap();

        let output_data = BatchOutputData::new(vec![record]);

        let outcome = process_output_data::<MockItem>(
            &output_data,
            workspace.as_ref(),
            &ExpectedContentType::Json,
        )
        .await;

        assert!(
            outcome.is_ok(),
            "Should handle the failing record gracefully by saving a failed entry."
        );
    }
}