1crate::ix!();
3
4#[async_trait]
5pub trait FreshExecute<Client,E> {
6 type Success;
7 async fn fresh_execute(&mut self, client: &Client)
8 -> Result<Self::Success, E>;
9}
10
11#[async_trait]
12impl<C,E> FreshExecute<C,E> for BatchFileTriple
13where
14 C: LanguageModelClientInterface<E>,
15
16 E
19 : Debug
20 + Display
21 + From<BatchProcessingError>
22 + From<BatchDownloadError>
23 + From<JsonParseError>
24 + From<std::io::Error>
25 + From<OpenAIClientError>
26 + From<BatchMetadataError>,
27{
28 type Success = BatchExecutionResult;
29
30 async fn fresh_execute(&mut self, client: &C) -> Result<BatchExecutionResult, E>
31 {
32 trace!("Inside fresh_execute for triple: {:?}", self);
33
34 assert!(self.input().is_some());
35 assert!(self.output().is_none());
36 assert!(self.error().is_none());
37 assert!(self.associated_metadata().is_none());
38 assert!(self.seed_manifest().is_some());
39
40 info!("executing fresh batch processing for triple {:#?}", self);
41
42 let input_filename = self.effective_input_filename();
43 let output_filename = self.effective_output_filename();
44 let error_filename = self.effective_error_filename();
45 let metadata_filename = self.effective_metadata_filename();
46 let seed_manifest_filename = self.effective_seed_manifest_filename();
47
48 info!("input_filename: {:?}", input_filename);
49 info!("output_filename: {:?}", output_filename);
50 info!("error_filename: {:?}", error_filename);
51 info!("metadata_filename: {:?}", metadata_filename);
52 info!("seed_manifest_filename: {:?}", seed_manifest_filename);
53
54 assert!(input_filename.exists());
55 assert!(!output_filename.exists());
56 assert!(!error_filename.exists());
57 assert!(!metadata_filename.exists());
58 assert!(seed_manifest_filename.exists());
59
60 let input_file = client.upload_batch_file_path(&input_filename).await?;
62 let input_file_id = input_file.id;
63
64 let batch = client.create_batch(&input_file_id).await?;
66 let batch_id = batch.id.clone();
67
68 let mut metadata = BatchMetadata::with_input_id_and_batch_id(&input_file_id, &batch_id);
70 metadata.save_to_file(&metadata_filename).await?;
71
72 let completed_batch = client.wait_for_batch_completion(&batch_id).await?;
74
75 let outputs = if let Some(output_file_id) = completed_batch.output_file_id {
77 metadata.set_output_file_id(Some(output_file_id));
78 metadata.save_to_file(&metadata_filename).await?;
79 self.download_output_file(client).await?;
80 let outputs = load_output_file(&output_filename).await?;
81 Some(outputs)
82 } else {
83 None
84 };
85
86 let errors = if let Some(error_file_id) = completed_batch.error_file_id {
88 metadata.set_error_file_id(Some(error_file_id));
89 metadata.save_to_file(&metadata_filename).await?;
90 self.download_error_file(client).await?;
91 let errors = load_error_file(&error_filename).await?;
92 Some(errors)
93 } else {
94 None
95 };
96
97 Ok(BatchExecutionResult::new(outputs, errors))
98 }
99}
100
101#[cfg(test)]
102mod fresh_execute_tests {
103 use super::*;
104 use std::fs;
105 use std::path::Path;
106 use tempfile::tempdir;
107 use tracing::{debug, error, info, trace, warn};
108 use futures::executor::block_on;
109
110 fn generate_mock_batch_id_for(input_file_path: &Path) -> String {
113 let input_file_id = format!("mock_file_id_{}", input_file_path.display());
115 format!("mock_batch_id_for_{}", input_file_id)
116 }
117
118 fn configure_mock_batch_for_failure(
123 mock_client: &MockLanguageModelClient<MockBatchClientError>,
124 batch_id: &str,
125 is_immediate: bool,
126 ) {
127 if is_immediate {
129 let mut guard = mock_client.batches().write().unwrap();
130 guard.insert(
131 batch_id.to_string(),
132 Batch {
133 id: batch_id.to_string(),
134 object: "batch".to_string(),
135 endpoint: "/v1/chat/completions".to_string(),
136 errors: None,
137 input_file_id: format!("immediate_fail_for_{batch_id}"),
138 completion_window: "24h".to_string(),
139 status: BatchStatus::Failed,
140 output_file_id: None,
141 error_file_id: None,
142 created_at: 0,
143 in_progress_at: None,
144 expires_at: None,
145 finalizing_at: None,
146 completed_at: None,
147 failed_at: None,
148 expired_at: None,
149 cancelling_at: None,
150 cancelled_at: None,
151 request_counts: None,
152 metadata: None,
153 },
154 );
155 } else {
156 {
159 let mut c = mock_client.mock_batch_config().write().unwrap();
160 c.fails_on_attempt_1_mut().insert(batch_id.to_string());
161 }
162 let mut guard = mock_client.batches().write().unwrap();
164 guard.insert(
165 batch_id.to_string(),
166 Batch {
167 id: batch_id.to_string(),
168 object: "batch".to_string(),
169 endpoint: "/v1/chat/completions".to_string(),
170 errors: None,
171 input_file_id: format!("eventual_fail_for_{batch_id}"),
172 completion_window: "24h".to_string(),
173 status: BatchStatus::InProgress,
174 output_file_id: None,
175 error_file_id: None,
176 created_at: 0,
177 in_progress_at: None,
178 expires_at: None,
179 finalizing_at: None,
180 completed_at: None,
181 failed_at: None,
182 expired_at: None,
183 cancelling_at: None,
184 cancelled_at: None,
185 request_counts: None,
186 metadata: None,
187 },
188 );
189 }
190 }
191
192 #[tracing::instrument(level = "trace", skip(mock_client))]
193 pub fn configure_mock_batch_for_success(
194 mock_client: &MockLanguageModelClient<MockBatchClientError>,
195 batch_id: &str,
196 want_output: bool,
197 want_error: bool,
198 ) {
199 trace!("Configuring mock batch for success with batch_id='{}', want_output={}, want_error={}", batch_id, want_output, want_error);
200
201 {
203 let mut guard = mock_client.batches().write().unwrap();
204 match guard.get_mut(batch_id) {
205 Some(batch_entry) => {
206 debug!("Found existing batch entry for batch_id='{}'; setting status=Completed.", batch_id);
207 batch_entry.status = BatchStatus::Completed;
208 if want_output {
209 batch_entry.output_file_id = Some("mock_out_file_id".to_string());
210 }
211 if want_error {
212 batch_entry.error_file_id = Some("mock_err_file_id".to_string());
213 }
214 }
215 None => {
216 warn!("No existing batch entry for batch_id='{}'; inserting a new one with Completed status.", batch_id);
217 guard.insert(
218 batch_id.to_string(),
219 Batch {
220 id: batch_id.to_string(),
221 object: "batch".to_string(),
222 endpoint: "/v1/chat/completions".to_string(),
223 errors: None,
224 input_file_id: "inserted_dummy".to_string(),
225 completion_window: "24h".to_string(),
226 status: BatchStatus::Completed,
227 output_file_id: if want_output {
228 Some("mock_out_file_id".to_string())
229 } else {
230 None
231 },
232 error_file_id: if want_error {
233 Some("mock_err_file_id".to_string())
234 } else {
235 None
236 },
237 created_at: 0,
238 in_progress_at: None,
239 expires_at: None,
240 finalizing_at: None,
241 completed_at: None,
242 failed_at: None,
243 expired_at: None,
244 cancelling_at: None,
245 cancelled_at: None,
246 request_counts: None,
247 metadata: None,
248 },
249 );
250 }
251 }
252 }
253
254 {
258 let mut files_guard = mock_client.files().write().unwrap();
259
260 if want_output {
261 debug!("Inserting mock_out_file_id with a valid BatchResponseRecord JSON line.");
262 files_guard.insert(
263 "mock_out_file_id".to_string(),
264 Bytes::from(
265 r#"{
266 "id": "batch_req_mock_output",
267 "custom_id": "mock_out",
268 "response": {
269 "status_code": 200,
270 "request_id": "resp_req_mock_output",
271 "body": {
272 "id": "success-id",
273 "object": "chat.completion",
274 "created": 0,
275 "model": "test-model",
276 "choices": [],
277 "usage": {
278 "prompt_tokens": 0,
279 "completion_tokens": 0,
280 "total_tokens": 0
281 }
282 }
283 },
284 "error": null
285 }"#,
286 ),
287 );
288 }
289
290 if want_error {
291 debug!("Inserting mock_err_file_id with a valid BatchResponseRecord JSON line (status=400).");
292 files_guard.insert(
293 "mock_err_file_id".to_string(),
294 Bytes::from(
295 r#"{
296 "id": "batch_req_mock_error",
297 "custom_id": "mock_err",
298 "response": {
299 "status_code": 400,
300 "request_id": "resp_req_mock_error",
301 "body": {
302 "error": {
303 "message": "Some error message",
304 "type": "test_error",
305 "param": null,
306 "code": null
307 }
308 }
309 },
310 "error": null
311 }"#,
312 ),
313 );
314 }
315 }
316
317 trace!("configure_mock_batch_for_success done for batch_id='{}'", batch_id);
318 }
319
320 #[traced_test]
321 async fn test_fresh_execute_success_error_only() {
322 info!("Beginning test_fresh_execute_success_error_only");
323 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
324 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
325 .build()
326 .unwrap();
327 debug!("Constructed mock client: {:?}", mock_client);
328
329 let tmp_dir = tempdir().expect("Failed to create temp dir");
331 let input_file_path = tmp_dir.path().join("input.json");
332 fs::write(&input_file_path, b"{}").unwrap();
333
334 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
336 workspace,
337 input_file_path.clone(),
338 None,
339 None,
340 );
341
342 let final_batch_id = generate_mock_batch_id_for(&input_file_path);
345
346 mock_client.configure_inprogress_then_complete_with(&final_batch_id, false, true);
349
350 let exec_result = triple.fresh_execute(&mock_client).await;
352 debug!("Result from fresh_execute: {:?}", exec_result);
353
354 assert!(exec_result.is_ok(), "Should succeed with error-only scenario");
355 let result = exec_result.unwrap();
356 assert!(result.outputs().is_none(), "No output data expected");
357 assert!(result.errors().is_some(), "Should have error data");
358
359 info!("test_fresh_execute_success_error_only passed");
360 }
361
362 #[traced_test]
363 async fn test_fresh_execute_success_both_output_and_error() {
364 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
365 info!("Beginning test_fresh_execute_success_both_output_and_error");
366 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
367 .build()
368 .unwrap();
369 debug!("Mock client: {:?}", mock_client);
370
371 let tmp_dir = tempdir().unwrap();
373 let input_file_path = tmp_dir.path().join("input.json");
374 fs::write(&input_file_path, b"{\"test\":\"data\"}").unwrap();
375
376 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
377 workspace,
378 input_file_path.clone(),
379 None,
380 None,
381 );
382
383 let final_batch_id = generate_mock_batch_id_for(&input_file_path);
384
385 mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, true);
387
388 info!("Calling fresh_execute for both output and error scenario");
389 let exec_result = triple.fresh_execute(&mock_client).await;
390 debug!("exec_result: {:?}", exec_result);
391
392 assert!(exec_result.is_ok(), "Should succeed with both output and error");
393 let exec_result = exec_result.unwrap();
394 assert!(exec_result.outputs().is_some(), "Expected output data");
395 assert!(exec_result.errors().is_some(), "Expected error data");
396
397 let out_file = triple.effective_output_filename();
399 let err_file = triple.effective_error_filename();
400 assert!(out_file.exists(), "Output file should exist on disk");
401 assert!(err_file.exists(), "Error file should exist on disk");
402
403 info!("test_fresh_execute_success_both_output_and_error passed");
404 }
405
406 #[traced_test]
407 async fn test_fresh_execute_immediate_failure() {
408 let workspace = BatchWorkspace::new_temp()
409 .await
410 .expect("expected workspace construction success");
411
412 info!("Beginning test_fresh_execute_immediate_failure");
413
414 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
416 .build()
417 .unwrap();
418
419 let tmp_dir = tempdir().unwrap();
421 let raw_input_path = tmp_dir.path().join("input.txt");
422 fs::write(&raw_input_path, b"some input content").unwrap();
423
424 let real_path = std::fs::canonicalize(&raw_input_path).unwrap();
426 let final_batch_id = generate_mock_batch_id_for(&real_path);
427
428 mock_client.configure_failure(&final_batch_id, true);
430
431 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
433 workspace,
434 real_path.clone(),
435 None,
436 None,
437 );
438
439 let result = triple.fresh_execute(&mock_client).await;
441 debug!("Result from immediate_failure fresh_execute: {:?}", result);
442
443 assert!(
445 result.is_err(),
446 "fresh_execute should fail if the batch is immediately failed"
447 );
448 info!("test_fresh_execute_immediate_failure passed");
449 }
450
451 #[traced_test]
452 async fn test_fresh_execute_eventual_failure() {
453 info!("Beginning test_fresh_execute_eventual_failure");
454 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
455 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
456 .build()
457 .unwrap();
458
459 let tmp_dir = tempdir().unwrap();
460 let input_file_path = tmp_dir.path().join("input.txt");
461 fs::write(&input_file_path, b"some input content").unwrap();
462
463 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
464 workspace,
465 input_file_path.clone(),
466 None,
467 None,
468 );
469
470 let final_batch_id = generate_mock_batch_id_for(&input_file_path);
471 configure_mock_batch_for_failure(&mock_client, &final_batch_id, false);
473
474 let result = triple.fresh_execute(&mock_client).await;
475 debug!("Result from eventual_failure fresh_execute: {:?}", result);
476
477 assert!(
478 result.is_err(),
479 "fresh_execute should eventually fail when batch toggles to Failed"
480 );
481 info!("test_fresh_execute_eventual_failure passed");
482 }
483
484 #[tokio::test]
486 #[should_panic(expected = "assertion failed: input_filename.exists()")]
487 async fn test_fresh_execute_missing_input_file() {
488 info!("Beginning test_fresh_execute_missing_input_file");
489 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
490 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
491 .build()
492 .unwrap();
493
494 let tmp_dir = tempdir().unwrap();
495 let input_file_path = tmp_dir.path().join("missing_file.json");
496 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
499 workspace,
500 input_file_path.clone(),
501 None,
502 None,
503 );
504
505 triple.fresh_execute(&mock_client).await.unwrap();
507 }
508
509 #[tokio::test]
510 #[should_panic(expected = "assertion failed: !metadata_filename.exists()")]
511 async fn test_fresh_execute_metadata_already_exists() {
512 info!("Beginning test_fresh_execute_metadata_already_exists");
513 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
514 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
515 .build()
516 .unwrap();
517
518 let tmp_dir = tempdir().unwrap();
520 let input_file_path = tmp_dir.path().join("input.json");
521 fs::write(&input_file_path, b"{}").unwrap();
522
523 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
525 workspace,
526 input_file_path.clone(),
527 None,
528 None,
529 );
530
531 let meta_path = triple.effective_metadata_filename();
533 fs::write(&meta_path, b"some old metadata content").unwrap();
534
535 triple.fresh_execute(&mock_client).await.unwrap();
538 }
539
540 #[traced_test]
541 async fn test_fresh_execute_openai_error_on_upload() {
542 info!("Beginning test_fresh_execute_openai_error_on_upload");
543 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
544 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
545 .fail_on_file_create_openai_error(true) .build()
547 .unwrap();
548
549 let tmp_dir = tempdir().unwrap();
550 let input_file_path = tmp_dir.path().join("input.json");
551 fs::write(&input_file_path, b"[1,2,3]").unwrap();
552
553 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
554 workspace,
555 input_file_path.clone(),
556 None,
557 None,
558 );
559
560 let result = triple.fresh_execute(&mock_client).await;
562 debug!("Result from fresh_execute with forced OpenAI upload error: {:?}", result);
563
564 assert!(
565 result.is_err(),
566 "Should fail due to forced OpenAI error on upload"
567 );
568 info!("test_fresh_execute_openai_error_on_upload passed");
569 }
570
571 #[traced_test]
572 async fn test_fresh_execute_io_error_on_upload() {
573 info!("Beginning test_fresh_execute_io_error_on_upload");
574 let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
575 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
576 .fail_on_file_create_other_error(true) .build()
578 .unwrap();
579
580 let tmp_dir = tempdir().unwrap();
581 let input_file_path = tmp_dir.path().join("input.json");
582 fs::write(&input_file_path, b"[4,5,6]").unwrap();
583
584 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
585 workspace,
586 input_file_path.clone(),
587 None,
588 None,
589 );
590
591 let result = triple.fresh_execute(&mock_client).await;
592 debug!("Result from fresh_execute with forced IO error on upload: {:?}", result);
593
594 assert!(
595 result.is_err(),
596 "Should fail due to forced I/O error on upload"
597 );
598 info!("test_fresh_execute_io_error_on_upload passed");
599 }
600
601 #[traced_test]
602 async fn test_fresh_execute_success_output_only() {
603 let workspace = BatchWorkspace::new_temp().await.unwrap();
604 let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
605 .build()
606 .unwrap();
607
608 let tmp_dir = tempdir().unwrap();
610 let input_path = tmp_dir.path().join("input.json");
611 fs::write(&input_path, b"{\"test\":\"data\"}").unwrap();
612
613 let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
614 workspace,
615 input_path.clone(),
616 None,
617 None,
618 );
619
620 let final_batch_id = format!("mock_batch_id_for_mock_file_id_{}", input_path.display());
622
623 mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, false);
625
626 let exec_result = triple.fresh_execute(&mock_client).await;
628 assert!(exec_result.is_ok());
629 let exec_result = exec_result.unwrap();
630 assert!(exec_result.outputs().is_some(), "Should have output data");
631 assert!(exec_result.errors().is_none(), "Should have no error data");
632 }
633}