batch_mode_process_response/
process_output_data.rs

1// ---------------- [ File: batch-mode-process-response/src/process_output_data.rs ]
2crate::ix!();
3
4#[instrument(level = "trace", skip_all)]
5pub async fn process_output_data<T>(
6    output_data: &BatchOutputData,
7    workspace: &dyn BatchWorkspaceInterface,
8    expected_content_type: &ExpectedContentType,
9) -> Result<(), BatchOutputProcessingError>
10where
11    T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
12{
13    trace!(
14        "Starting process_output_data with {} response record(s)",
15        output_data.responses().len()
16    );
17
18    let mut failed_entries = Vec::new();
19
20    for response_record in output_data.responses() {
21        let custom_id = response_record.custom_id();
22        trace!("Processing response record custom_id={}", custom_id);
23
24        match response_record.response().body().as_success() {
25            Some(success_body) => {
26                trace!("Successfully extracted body for custom_id={}", custom_id);
27
28                match workspace.load_seed_by_custom_id(custom_id).await {
29                    Ok(seed_box) => {
30                        trace!(
31                            "Successfully loaded seed named '{}' for custom_id={}",
32                            seed_box.name(),
33                            custom_id
34                        );
35
36                        match handle_successful_response::<T>(
37                            success_body,
38                            workspace,
39                            expected_content_type,
40                            seed_box.as_ref(),
41                        )
42                        .await
43                        {
44                            Ok(()) => {
45                                debug!("Successfully handled response for custom_id={}", custom_id);
46                            }
47                            Err(e) => {
48                                error!(
49                                    "Error processing successful response for custom_id={}: {:?}",
50                                    custom_id, e
51                                );
52                                debug!("Problematic success_body for custom_id={}: {:?}", custom_id, success_body);
53                                failed_entries.push(response_record);
54                            }
55                        }
56                    }
57                    Err(e) => {
58                        error!(
59                            "Failed to load seed for custom_id={}, error: {:?}",
60                            custom_id, e
61                        );
62                        failed_entries.push(response_record);
63                    }
64                }
65            }
66            None => {
67                warn!("No success body for custom_id={}", custom_id);
68                failed_entries.push(response_record);
69            }
70        }
71    }
72
73    if !failed_entries.is_empty() {
74        warn!(
75            "{} response record(s) failed, invoking save_failed_entries",
76            failed_entries.len()
77        );
78
79        if let Err(e) = save_failed_entries(workspace, &failed_entries).await {
80            error!(
81                "Failed to save {} failed entries: {:?}",
82                failed_entries.len(),
83                e
84            );
85            return Err(e.into());
86        } else {
87            info!("Successfully saved {} failed entries.", failed_entries.len());
88        }
89    }
90
91    info!(
92        "Finished process_output_data ({} succeeded, {} failed)",
93        output_data.responses().len() - failed_entries.len(),
94        failed_entries.len()
95    );
96
97    Ok(())
98}
99
100// =======================
// Tests for `process_output_data` (src/process_output_data.rs)
102// =======================
103
#[cfg(test)]
mod process_output_data_tests {
    use super::*;

    /// Minimal target type for `process_output_data::<MockItem>`. Its only field,
    /// `name`, is required, so JSON without a `name` key cannot deserialize into it.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct MockItem {
        pub name: String,
    }

    /// A success response whose assistant message is JSON missing the `name` field
    /// must be routed to the failed-entries path instead of making
    /// `process_output_data` return an error or panic.
    #[traced_test]
    async fn test_process_output_data_with_deserialization_failure() {
        // 1) A fresh temporary workspace to process against.
        let workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp()
            .await
            .expect("failed to create temporary workspace");

        // NOTE(review): nothing in this test stores a seed for `mock_item_2`. If
        // `load_seed_by_custom_id` errors on a fresh temp workspace, the record is
        // rejected at seed loading and deserialization into `MockItem` is never
        // attempted — confirm this test reaches the path its name describes.

        // 2) An assistant message whose JSON content lacks the required `name` field.
        let invalid_msg = BatchMessageBuilder::default()
            .role(MessageRole::Assistant)
            .content(
                BatchMessageContentBuilder::default()
                    .content("{\"invalid_field\":12}".to_string())
                    .build()
                    .unwrap(),
            )
            .build()
            .unwrap();

        // 3) Wrap the message in a single-choice success body, then in a 200 response
        //    and a response record with custom_id `mock_item_2`.
        let choice_fail = BatchChoiceBuilder::default()
            .index(0_u32)
            .finish_reason(FinishReason::Stop)
            .logprobs(None)
            .message(invalid_msg)
            .build()
            .unwrap();

        let success_body_fail = BatchSuccessResponseBodyBuilder::default()
            .id("550e8400-e29b-41d4-a716-446655440000".to_string())
            .object("response".to_string())
            .created(0_u64)
            .model("test-model".to_string())
            .choices(vec![choice_fail])
            .usage(BatchUsage::mock())
            .build()
            .unwrap();

        let response_content_fail = BatchResponseContentBuilder::default()
            .status_code(200_u16)
            .request_id(ResponseRequestId::new("resp_req_mock_item_2"))
            .body(BatchResponseBody::Success(success_body_fail))
            .build()
            .unwrap();

        let record_fail = BatchResponseRecordBuilder::default()
            .id(BatchRequestId::new("batch_req_mock_item_2"))
            .custom_id(CustomRequestId::new("mock_item_2"))
            .response(response_content_fail)
            .build()
            .unwrap();

        let output_data = BatchOutputData::new(vec![record_fail]);

        // 4) Process the batch output.
        let result = process_output_data::<MockItem>(
            &output_data,
            workspace.as_ref(),
            &ExpectedContentType::Json,
        )
        .await;

        // The failing record should be collected and handed to `save_failed_entries`
        // rather than causing an error or panic. `process_output_data` only returns
        // `Err` if saving those entries fails, so overall processing must succeed.
        assert!(
            result.is_ok(),
            "Should handle the failing record gracefully by saving a failed entry."
        );
    }
}