1crate::ix!();
3
4#[async_trait]
5pub trait FreshExecute<Client,E> {
6 type Success;
7 async fn fresh_execute(&mut self, client: &Client)
8 -> Result<Self::Success, E>;
9}
10
11#[async_trait]
12impl<C,E> FreshExecute<C,E> for BatchFileTriple
13where
14 C: LanguageModelClientInterface<E>,
15
16 E
19 : Debug
20 + Display
21 + From<BatchProcessingError>
22 + From<BatchDownloadError>
23 + From<JsonParseError>
24 + From<std::io::Error>
25 + From<OpenAIClientError>
26 + From<BatchMetadataError>,
27{
28 type Success = BatchExecutionResult;
29
30 async fn fresh_execute(&mut self, client: &C) -> Result<BatchExecutionResult, E>
31 {
32 trace!("Inside fresh_execute for triple: {:?}", self);
33
34 assert!(self.input().is_some());
35 assert!(self.output().is_none());
36 assert!(self.error().is_none());
37 assert!(self.associated_metadata().is_none());
38
39 info!("executing fresh batch processing for triple {:#?}", self);
40
41 let input_filename = self.effective_input_filename();
42 let output_filename = self.effective_output_filename();
43 let error_filename = self.effective_error_filename();
44 let metadata_filename = self.effective_metadata_filename();
45
46 info!("input_filename: {:?}", input_filename);
47 info!("output_filename: {:?}", output_filename);
48 info!("error_filename: {:?}", error_filename);
49 info!("metadata_filename: {:?}", metadata_filename);
50
51 assert!(input_filename.exists());
52 assert!(!output_filename.exists());
53 assert!(!error_filename.exists());
54 assert!(!metadata_filename.exists());
55
56 let input_file = client.upload_batch_file_path(&input_filename).await?;
58 let input_file_id = input_file.id;
59
60 let batch = client.create_batch(&input_file_id).await?;
62 let batch_id = batch.id.clone();
63
64 let mut metadata = BatchMetadata::with_input_id_and_batch_id(&input_file_id, &batch_id);
66 metadata.save_to_file(&metadata_filename).await?;
67
68 let completed_batch = client.wait_for_batch_completion(&batch_id).await?;
70
71 let outputs = if let Some(output_file_id) = completed_batch.output_file_id {
73 metadata.set_output_file_id(Some(output_file_id));
74 metadata.save_to_file(&metadata_filename).await?;
75 self.download_output_file(client).await?;
76 let outputs = load_output_file(&output_filename).await?;
77 Some(outputs)
78 } else {
79 None
80 };
81
82 let errors = if let Some(error_file_id) = completed_batch.error_file_id {
84 metadata.set_error_file_id(Some(error_file_id));
85 metadata.save_to_file(&metadata_filename).await?;
86 self.download_error_file(client).await?;
87 let errors = load_error_file(&error_filename).await?;
88 Some(errors)
89 } else {
90 None
91 };
92
93 Ok(BatchExecutionResult::new(outputs, errors))
94 }
95}
96
97#[cfg(test)]
98mod fresh_execute_tests {
99 use super::*;
100 use std::fs;
101 use std::path::Path;
102 use tempfile::tempdir;
103 use tracing::{debug, error, info, trace, warn};
104 use futures::executor::block_on;
105
/// Builds the batch id these tests configure on the mock client for
/// `input_file_path`.
///
/// The id has the shape `mock_batch_id_for_mock_file_id_<path>`, where
/// `<path>` is the path's `display()` rendering.
fn generate_mock_batch_id_for(input_file_path: &Path) -> String {
    format!(
        "mock_batch_id_for_mock_file_id_{}",
        input_file_path.display()
    )
}
113
114 fn configure_mock_batch_for_failure(
119 mock_client: &MockLanguageModelClient<MockBatchClientError>,
120 batch_id: &str,
121 is_immediate: bool,
122 ) {
123 if is_immediate {
125 let mut guard = mock_client.batches().write().unwrap();
126 guard.insert(
127 batch_id.to_string(),
128 Batch {
129 id: batch_id.to_string(),
130 object: "batch".to_string(),
131 endpoint: "/v1/chat/completions".to_string(),
132 errors: None,
133 input_file_id: format!("immediate_fail_for_{batch_id}"),
134 completion_window: "24h".to_string(),
135 status: BatchStatus::Failed,
136 output_file_id: None,
137 error_file_id: None,
138 created_at: 0,
139 in_progress_at: None,
140 expires_at: None,
141 finalizing_at: None,
142 completed_at: None,
143 failed_at: None,
144 expired_at: None,
145 cancelling_at: None,
146 cancelled_at: None,
147 request_counts: None,
148 metadata: None,
149 },
150 );
151 } else {
152 {
155 let mut c = mock_client.mock_batch_config().write().unwrap();
156 c.fails_on_attempt_1_mut().insert(batch_id.to_string());
157 }
158 let mut guard = mock_client.batches().write().unwrap();
160 guard.insert(
161 batch_id.to_string(),
162 Batch {
163 id: batch_id.to_string(),
164 object: "batch".to_string(),
165 endpoint: "/v1/chat/completions".to_string(),
166 errors: None,
167 input_file_id: format!("eventual_fail_for_{batch_id}"),
168 completion_window: "24h".to_string(),
169 status: BatchStatus::InProgress,
170 output_file_id: None,
171 error_file_id: None,
172 created_at: 0,
173 in_progress_at: None,
174 expires_at: None,
175 finalizing_at: None,
176 completed_at: None,
177 failed_at: None,
178 expired_at: None,
179 cancelling_at: None,
180 cancelled_at: None,
181 request_counts: None,
182 metadata: None,
183 },
184 );
185 }
186 }
187
188 #[tracing::instrument(level = "trace", skip(mock_client))]
189 pub fn configure_mock_batch_for_success(
190 mock_client: &MockLanguageModelClient<MockBatchClientError>,
191 batch_id: &str,
192 want_output: bool,
193 want_error: bool,
194 ) {
195 trace!("Configuring mock batch for success with batch_id='{}', want_output={}, want_error={}", batch_id, want_output, want_error);
196
197 {
199 let mut guard = mock_client.batches().write().unwrap();
200 match guard.get_mut(batch_id) {
201 Some(batch_entry) => {
202 debug!("Found existing batch entry for batch_id='{}'; setting status=Completed.", batch_id);
203 batch_entry.status = BatchStatus::Completed;
204 if want_output {
205 batch_entry.output_file_id = Some("mock_out_file_id".to_string());
206 }
207 if want_error {
208 batch_entry.error_file_id = Some("mock_err_file_id".to_string());
209 }
210 }
211 None => {
212 warn!("No existing batch entry for batch_id='{}'; inserting a new one with Completed status.", batch_id);
213 guard.insert(
214 batch_id.to_string(),
215 Batch {
216 id: batch_id.to_string(),
217 object: "batch".to_string(),
218 endpoint: "/v1/chat/completions".to_string(),
219 errors: None,
220 input_file_id: "inserted_dummy".to_string(),
221 completion_window: "24h".to_string(),
222 status: BatchStatus::Completed,
223 output_file_id: if want_output {
224 Some("mock_out_file_id".to_string())
225 } else {
226 None
227 },
228 error_file_id: if want_error {
229 Some("mock_err_file_id".to_string())
230 } else {
231 None
232 },
233 created_at: 0,
234 in_progress_at: None,
235 expires_at: None,
236 finalizing_at: None,
237 completed_at: None,
238 failed_at: None,
239 expired_at: None,
240 cancelling_at: None,
241 cancelled_at: None,
242 request_counts: None,
243 metadata: None,
244 },
245 );
246 }
247 }
248 }
249
250 {
254 let mut files_guard = mock_client.files().write().unwrap();
255
256 if want_output {
257 debug!("Inserting mock_out_file_id with a valid BatchResponseRecord JSON line.");
258 files_guard.insert(
259 "mock_out_file_id".to_string(),
260 Bytes::from(
261 r#"{
262 "id": "batch_req_mock_output",
263 "custom_id": "mock_out",
264 "response": {
265 "status_code": 200,
266 "request_id": "resp_req_mock_output",
267 "body": {
268 "id": "success-id",
269 "object": "chat.completion",
270 "created": 0,
271 "model": "test-model",
272 "choices": [],
273 "usage": {
274 "prompt_tokens": 0,
275 "completion_tokens": 0,
276 "total_tokens": 0
277 }
278 }
279 },
280 "error": null
281 }"#,
282 ),
283 );
284 }
285
286 if want_error {
287 debug!("Inserting mock_err_file_id with a valid BatchResponseRecord JSON line (status=400).");
288 files_guard.insert(
289 "mock_err_file_id".to_string(),
290 Bytes::from(
291 r#"{
292 "id": "batch_req_mock_error",
293 "custom_id": "mock_err",
294 "response": {
295 "status_code": 400,
296 "request_id": "resp_req_mock_error",
297 "body": {
298 "error": {
299 "message": "Some error message",
300 "type": "test_error",
301 "param": null,
302 "code": null
303 }
304 }
305 },
306 "error": null
307 }"#,
308 ),
309 );
310 }
311 }
312
313 trace!("configure_mock_batch_for_success done for batch_id='{}'", batch_id);
314 }
315
316 #[traced_test]
317 async fn test_fresh_execute_success_error_only() {
318 info!("Beginning test_fresh_execute_success_error_only");
319 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
320 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
321 .build()
322 .unwrap();
323 debug!("Constructed mock client: {:?}", mock_client);
324
325 let tmp_dir = tempdir().expect("Failed to create temp dir");
327 let input_file_path = tmp_dir.path().join("input.json");
328 fs::write(&input_file_path, b"{}").unwrap();
329
330 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
332 workspace,
333 input_file_path.clone(),
334 None,
335 None,
336 );
337
338 let final_batch_id = generate_mock_batch_id_for(&input_file_path);
341
342 mock_client.configure_inprogress_then_complete_with(&final_batch_id, false, true);
345
346 let exec_result = triple.fresh_execute(&mock_client).await;
348 debug!("Result from fresh_execute: {:?}", exec_result);
349
350 assert!(exec_result.is_ok(), "Should succeed with error-only scenario");
351 let result = exec_result.unwrap();
352 assert!(result.outputs().is_none(), "No output data expected");
353 assert!(result.errors().is_some(), "Should have error data");
354
355 info!("test_fresh_execute_success_error_only passed");
356 }
357
358 #[traced_test]
359 async fn test_fresh_execute_success_both_output_and_error() {
360 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
361 info!("Beginning test_fresh_execute_success_both_output_and_error");
362 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
363 .build()
364 .unwrap();
365 debug!("Mock client: {:?}", mock_client);
366
367 let tmp_dir = tempdir().unwrap();
369 let input_file_path = tmp_dir.path().join("input.json");
370 fs::write(&input_file_path, b"{\"test\":\"data\"}").unwrap();
371
372 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
373 workspace,
374 input_file_path.clone(),
375 None,
376 None,
377 );
378
379 let final_batch_id = generate_mock_batch_id_for(&input_file_path);
380
381 mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, true);
383
384 info!("Calling fresh_execute for both output and error scenario");
385 let exec_result = triple.fresh_execute(&mock_client).await;
386 debug!("exec_result: {:?}", exec_result);
387
388 assert!(exec_result.is_ok(), "Should succeed with both output and error");
389 let exec_result = exec_result.unwrap();
390 assert!(exec_result.outputs().is_some(), "Expected output data");
391 assert!(exec_result.errors().is_some(), "Expected error data");
392
393 let out_file = triple.effective_output_filename();
395 let err_file = triple.effective_error_filename();
396 assert!(out_file.exists(), "Output file should exist on disk");
397 assert!(err_file.exists(), "Error file should exist on disk");
398
399 info!("test_fresh_execute_success_both_output_and_error passed");
400 }
401
402 #[traced_test]
403 async fn test_fresh_execute_immediate_failure() {
404 let workspace = BatchWorkspace::new_temp()
405 .await
406 .expect("expected workspace construction success");
407
408 info!("Beginning test_fresh_execute_immediate_failure");
409
410 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
412 .build()
413 .unwrap();
414
415 let tmp_dir = tempdir().unwrap();
417 let raw_input_path = tmp_dir.path().join("input.txt");
418 fs::write(&raw_input_path, b"some input content").unwrap();
419
420 let real_path = std::fs::canonicalize(&raw_input_path).unwrap();
422 let final_batch_id = generate_mock_batch_id_for(&real_path);
423
424 mock_client.configure_failure(&final_batch_id, true);
426
427 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
429 workspace,
430 real_path.clone(),
431 None,
432 None,
433 );
434
435 let result = triple.fresh_execute(&mock_client).await;
437 debug!("Result from immediate_failure fresh_execute: {:?}", result);
438
439 assert!(
441 result.is_err(),
442 "fresh_execute should fail if the batch is immediately failed"
443 );
444 info!("test_fresh_execute_immediate_failure passed");
445 }
446
447 #[traced_test]
448 async fn test_fresh_execute_eventual_failure() {
449 info!("Beginning test_fresh_execute_eventual_failure");
450 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
451 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
452 .build()
453 .unwrap();
454
455 let tmp_dir = tempdir().unwrap();
456 let input_file_path = tmp_dir.path().join("input.txt");
457 fs::write(&input_file_path, b"some input content").unwrap();
458
459 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
460 workspace,
461 input_file_path.clone(),
462 None,
463 None,
464 );
465
466 let final_batch_id = generate_mock_batch_id_for(&input_file_path);
467 configure_mock_batch_for_failure(&mock_client, &final_batch_id, false);
469
470 let result = triple.fresh_execute(&mock_client).await;
471 debug!("Result from eventual_failure fresh_execute: {:?}", result);
472
473 assert!(
474 result.is_err(),
475 "fresh_execute should eventually fail when batch toggles to Failed"
476 );
477 info!("test_fresh_execute_eventual_failure passed");
478 }
479
480 #[tokio::test]
482 #[should_panic(expected = "assertion failed: input_filename.exists()")]
483 async fn test_fresh_execute_missing_input_file() {
484 info!("Beginning test_fresh_execute_missing_input_file");
485 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
486 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
487 .build()
488 .unwrap();
489
490 let tmp_dir = tempdir().unwrap();
491 let input_file_path = tmp_dir.path().join("missing_file.json");
492 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
495 workspace,
496 input_file_path.clone(),
497 None,
498 None,
499 );
500
501 triple.fresh_execute(&mock_client).await.unwrap();
503 }
504
505 #[tokio::test]
506 #[should_panic(expected = "assertion failed: !metadata_filename.exists()")]
507 async fn test_fresh_execute_metadata_already_exists() {
508 info!("Beginning test_fresh_execute_metadata_already_exists");
509 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
510 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
511 .build()
512 .unwrap();
513
514 let tmp_dir = tempdir().unwrap();
516 let input_file_path = tmp_dir.path().join("input.json");
517 fs::write(&input_file_path, b"{}").unwrap();
518
519 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
521 workspace,
522 input_file_path.clone(),
523 None,
524 None,
525 );
526
527 let meta_path = triple.effective_metadata_filename();
529 fs::write(&meta_path, b"some old metadata content").unwrap();
530
531 triple.fresh_execute(&mock_client).await.unwrap();
534 }
535
536 #[traced_test]
537 async fn test_fresh_execute_openai_error_on_upload() {
538 info!("Beginning test_fresh_execute_openai_error_on_upload");
539 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
540 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
541 .fail_on_file_create_openai_error(true) .build()
543 .unwrap();
544
545 let tmp_dir = tempdir().unwrap();
546 let input_file_path = tmp_dir.path().join("input.json");
547 fs::write(&input_file_path, b"[1,2,3]").unwrap();
548
549 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
550 workspace,
551 input_file_path.clone(),
552 None,
553 None,
554 );
555
556 let result = triple.fresh_execute(&mock_client).await;
558 debug!("Result from fresh_execute with forced OpenAI upload error: {:?}", result);
559
560 assert!(
561 result.is_err(),
562 "Should fail due to forced OpenAI error on upload"
563 );
564 info!("test_fresh_execute_openai_error_on_upload passed");
565 }
566
567 #[traced_test]
568 async fn test_fresh_execute_io_error_on_upload() {
569 info!("Beginning test_fresh_execute_io_error_on_upload");
570 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
571 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
572 .fail_on_file_create_other_error(true) .build()
574 .unwrap();
575
576 let tmp_dir = tempdir().unwrap();
577 let input_file_path = tmp_dir.path().join("input.json");
578 fs::write(&input_file_path, b"[4,5,6]").unwrap();
579
580 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
581 workspace,
582 input_file_path.clone(),
583 None,
584 None,
585 );
586
587 let result = triple.fresh_execute(&mock_client).await;
588 debug!("Result from fresh_execute with forced IO error on upload: {:?}", result);
589
590 assert!(
591 result.is_err(),
592 "Should fail due to forced I/O error on upload"
593 );
594 info!("test_fresh_execute_io_error_on_upload passed");
595 }
596
597 #[traced_test]
598 async fn test_fresh_execute_success_output_only() {
599 let workspace = BatchWorkspace::new_temp().await.unwrap();
600 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
601 .build()
602 .unwrap();
603
604 let tmp_dir = tempdir().unwrap();
606 let input_path = tmp_dir.path().join("input.json");
607 fs::write(&input_path, b"{\"test\":\"data\"}").unwrap();
608
609 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
610 workspace,
611 input_path.clone(),
612 None,
613 None,
614 );
615
616 let final_batch_id = format!("mock_batch_id_for_mock_file_id_{}", input_path.display());
618
619 mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, false);
621
622 let exec_result = triple.fresh_execute(&mock_client).await;
624 assert!(exec_result.is_ok());
625 let exec_result = exec_result.unwrap();
626 assert!(exec_result.outputs().is_some(), "Should have output data");
627 assert!(exec_result.errors().is_none(), "Should have no error data");
628 }
629}