batch_mode_process_response/
process_output_data.rs

1// ---------------- [ File: batch-mode-process-response/src/process_output_data.rs ]
2crate::ix!();
3
4#[instrument(level="trace", skip_all)]
5pub async fn process_output_data<T>(
6    output_data:           &BatchOutputData,
7    workspace:             &dyn BatchWorkspaceInterface,
8    expected_content_type: &ExpectedContentType,
9) -> Result<(), BatchOutputProcessingError> 
10where
11    // `'static + Send + Sync` ensures T can be held in `Arc<dyn ... + Send + Sync + 'static>`,
12    // and that the future is `Send`.
13    T: 'static + Send + Sync + DeserializeOwned + Named + GetTargetPathForAIExpansion,
14{
15    trace!("entering process_output_data, output_data len = {}", output_data.responses().len());
16
17    let mut failed_entries = Vec::new();
18
19    for response_record in output_data.responses() {
20        info!("processing output data record with custom_id={}", response_record.custom_id());
21
22        if let Some(success_body) = response_record.response().body().as_success() {
23            if let Err(e) = handle_successful_response::<T>(success_body, workspace, expected_content_type).await {
24                eprintln!(
25                    "Failed to process response for request ID '{}', error: {:?}, response: {:?}",
26                    response_record.custom_id(),
27                    e,
28                    success_body
29                );
30                failed_entries.push(response_record);
31            }
32        }
33    }
34
35    if !failed_entries.is_empty() {
36        warn!("some entries failed, saving them.");
37        save_failed_entries(workspace, &failed_entries).await?;
38    }
39
40    info!("process_output_data completed without fatal errors.");
41    Ok(())
42}
43
44// =======================
45// src/process_output_data.rs (RELEVANT TEST ONLY)
46// =======================
47
#[cfg(test)]
mod process_output_data_tests {
    use super::*;
    use std::fs;
    use tokio::runtime::Runtime;

    /// Target type for the test; `name` is a required field, so a JSON payload
    /// without it cannot be deserialized into this struct.
    #[derive(Debug, Clone, Deserialize, Serialize, NamedItem)]
    pub struct MockItem {
        pub name: String,
    }

    /// A record whose content cannot become a `MockItem` must not make
    /// `process_output_data` return an error.
    #[traced_test]
    async fn test_process_output_data_with_deserialization_failure() {

        // Scratch workspace for anything the function under test writes.
        let temp_workspace: Arc<dyn BatchWorkspaceInterface> = BatchWorkspace::new_temp().await.unwrap();

        // Assistant message whose JSON payload lacks the `name` field of `MockItem`.
        let bad_content = BatchMessageContentBuilder::default()
            .content("{\"invalid_field\":12}".to_string())
            .build()
            .unwrap();

        let bad_message = BatchMessageBuilder::default()
            .role(MessageRole::Assistant)
            .content(bad_content)
            .build()
            .unwrap();

        // Wrap the message layer by layer: choice -> success body -> response -> record.
        let bad_choice = BatchChoiceBuilder::default()
            .index(0_u32)
            .finish_reason(FinishReason::Stop)
            .logprobs(None)
            .message(bad_message)
            .build()
            .unwrap();

        let bad_body = BatchSuccessResponseBodyBuilder::default()
            .id("550e8400-e29b-41d4-a716-446655440000".to_string())
            .object("response".to_string())
            .created(0_u64)
            .model("test-model".to_string())
            .choices(vec![bad_choice])
            .usage(BatchUsage::mock())
            .build()
            .unwrap();

        let bad_response = BatchResponseContentBuilder::default()
            .status_code(200_u16)
            .request_id(ResponseRequestId::new("resp_req_mock_item_2"))
            .body(BatchResponseBody::Success(bad_body))
            .build()
            .unwrap();

        let bad_record = BatchResponseRecordBuilder::default()
            .id(BatchRequestId::new("batch_req_mock_item_2"))
            .custom_id(CustomRequestId::new("mock_item_2"))
            .response(bad_response)
            .build()
            .unwrap();

        let batch_output = BatchOutputData::new(vec![bad_record]);

        // Run the function under test against the single bad record.
        let outcome = process_output_data::<MockItem>(
            &batch_output,
            temp_workspace.as_ref(),
            &ExpectedContentType::Json,
        ).await;

        // The payload is expected to fail deserialization into `MockItem`; that
        // failure should be absorbed (recorded as a failed entry) rather than
        // surfacing as an error or a panic.
        assert!(
            outcome.is_ok(),
            "Should handle the failing record gracefully by saving a failed entry."
        );
    }
}