batch_mode_batch_executor/
fresh_execute.rs

1// ---------------- [ File: batch-mode-batch-executor/src/fresh_execute.rs ]
2crate::ix!();
3
4#[async_trait]
5pub trait FreshExecute<Client,E> {
6    type Success;
7    async fn fresh_execute(&mut self, client: &Client) 
8        -> Result<Self::Success, E>;
9}
10
11#[async_trait]
12impl<C,E> FreshExecute<C,E> for BatchFileTriple
13where 
14    C: LanguageModelClientInterface<E>,
15
16    // We no longer require “BatchDownloadError: From<E>” or “BatchProcessingError: From<E>”.
17    // Instead, we do the normal “E: From<…>” for each error type that might bubble up:
18    E
19    : Debug 
20    + Display 
21    + From<BatchProcessingError>
22    + From<BatchDownloadError>
23    + From<JsonParseError>
24    + From<std::io::Error>
25    + From<OpenAIClientError>
26    + From<BatchMetadataError>,
27{
28    type Success = BatchExecutionResult;
29
30    async fn fresh_execute(&mut self, client: &C) -> Result<BatchExecutionResult, E> 
31    {
32        trace!("Inside fresh_execute for triple: {:?}", self);
33
34        assert!(self.input().is_some());
35        assert!(self.output().is_none());
36        assert!(self.error().is_none());
37        assert!(self.associated_metadata().is_none());
38
39        info!("executing fresh batch processing for triple {:#?}", self);
40
41        let input_filename    = self.effective_input_filename();
42        let output_filename   = self.effective_output_filename();
43        let error_filename    = self.effective_error_filename();
44        let metadata_filename = self.effective_metadata_filename();
45
46        info!("input_filename: {:?}",    input_filename);
47        info!("output_filename: {:?}",   output_filename);
48        info!("error_filename: {:?}",    error_filename);
49        info!("metadata_filename: {:?}", metadata_filename);
50
51        assert!(input_filename.exists());
52        assert!(!output_filename.exists());
53        assert!(!error_filename.exists());
54        assert!(!metadata_filename.exists());
55
56        // Upload file
57        let input_file = client.upload_batch_file_path(&input_filename).await?;
58        let input_file_id = input_file.id;
59
60        // Create batch
61        let batch = client.create_batch(&input_file_id).await?;
62        let batch_id = batch.id.clone();
63
64        // ** Save batch_id to metadata file **
65        let mut metadata = BatchMetadata::with_input_id_and_batch_id(&input_file_id, &batch_id);
66        metadata.save_to_file(&metadata_filename).await?;
67
68        // Wait for completion
69        let completed_batch = client.wait_for_batch_completion(&batch_id).await?;
70
71        // Download output file
72        let outputs = if let Some(output_file_id) = completed_batch.output_file_id {
73            metadata.set_output_file_id(Some(output_file_id));
74            metadata.save_to_file(&metadata_filename).await?;
75            self.download_output_file(client).await?;
76            let outputs = load_output_file(&output_filename).await?;
77            Some(outputs)
78        } else {
79            None
80        };
81
82        // Handle errors if any
83        let errors = if let Some(error_file_id) = completed_batch.error_file_id {
84            metadata.set_error_file_id(Some(error_file_id));
85            metadata.save_to_file(&metadata_filename).await?;
86            self.download_error_file(client).await?;
87            let errors = load_error_file(&error_filename).await?;
88            Some(errors)
89        } else {
90            None
91        };
92
93        Ok(BatchExecutionResult::new(outputs, errors))
94    }
95}
96
#[cfg(test)]
mod fresh_execute_tests {
    use super::*;
    use std::fs;
    use std::path::Path;
    use tempfile::tempdir;
    use tracing::{debug, info, trace, warn};

    /// File id the success helper assigns to a mock output file.
    const MOCK_OUT_FILE_ID: &str = "mock_out_file_id";
    /// File id the success helper assigns to a mock error file.
    const MOCK_ERR_FILE_ID: &str = "mock_err_file_id";

    /// Reproduces how the mock client derives a batch id from an input path:
    /// the upload gets file id `mock_file_id_{path}`, and `create_batch` turns
    /// that into `mock_batch_id_for_{file_id}`.
    ///
    /// The path is rendered with `Path::display`, so pass exactly the path the
    /// triple was constructed with (NOTE: assumes `effective_input_filename`
    /// hands that path to the upload unchanged; confirm if the ids ever
    /// stop matching).
    fn generate_mock_batch_id_for(input_file_path: &Path) -> String {
        let input_file_id = format!("mock_file_id_{}", input_file_path.display());
        format!("mock_batch_id_for_{}", input_file_id)
    }

    /// Builds a `Batch` record for seeding the mock's batch store.
    ///
    /// Only the id, input file id, status and output/error file ids vary
    /// between callers; every timestamp, count and metadata field is empty.
    fn mock_batch(
        batch_id: &str,
        input_file_id: String,
        status: BatchStatus,
        output_file_id: Option<String>,
        error_file_id: Option<String>,
    ) -> Batch {
        Batch {
            id: batch_id.to_string(),
            object: "batch".to_string(),
            endpoint: "/v1/chat/completions".to_string(),
            errors: None,
            input_file_id,
            completion_window: "24h".to_string(),
            status,
            output_file_id,
            error_file_id,
            created_at: 0,
            in_progress_at: None,
            expires_at: None,
            finalizing_at: None,
            completed_at: None,
            failed_at: None,
            expired_at: None,
            cancelling_at: None,
            cancelled_at: None,
            request_counts: None,
            metadata: None,
        }
    }

    /// Seeds the mock so that `batch_id` ends in a failure.
    ///
    /// - `is_immediate == true`: the batch is stored with status `Failed`
    ///   right away.
    /// - `is_immediate == false`: the id is registered in the mock config's
    ///   `fails_on_attempt_1` set and the batch is stored as `InProgress`.
    ///   The mock presumably uses that set to report `Failed` on a later
    ///   retrieval.
    fn configure_mock_batch_for_failure(
        mock_client: &MockLanguageModelClient<MockBatchClientError>,
        batch_id: &str,
        is_immediate: bool,
    ) {
        if is_immediate {
            let mut guard = mock_client.batches().write().unwrap();
            guard.insert(
                batch_id.to_string(),
                mock_batch(
                    batch_id,
                    format!("immediate_fail_for_{batch_id}"),
                    BatchStatus::Failed,
                    None,
                    None,
                ),
            );
        } else {
            // Release the config lock before taking the batches lock.
            {
                let mut c = mock_client.mock_batch_config().write().unwrap();
                c.fails_on_attempt_1_mut().insert(batch_id.to_string());
            }
            let mut guard = mock_client.batches().write().unwrap();
            guard.insert(
                batch_id.to_string(),
                mock_batch(
                    batch_id,
                    format!("eventual_fail_for_{batch_id}"),
                    BatchStatus::InProgress,
                    None,
                    None,
                ),
            );
        }
    }

    /// Forces `batch_id` to `Completed` in the mock. Optionally it attaches an
    /// output file id and/or an error file id and stores a matching JSON
    /// record under each id in the mock's file store.
    ///
    /// If the batch is not in the store yet, a new `Completed` entry is
    /// inserted.
    #[allow(dead_code)] // no test in this module calls it; kept as a manual alternative to `configure_inprogress_then_complete_with`
    #[tracing::instrument(level = "trace", skip(mock_client))]
    pub fn configure_mock_batch_for_success(
        mock_client: &MockLanguageModelClient<MockBatchClientError>,
        batch_id: &str,
        want_output: bool,
        want_error: bool,
    ) {
        trace!("Configuring mock batch for success with batch_id='{}', want_output={}, want_error={}", batch_id, want_output, want_error);

        {
            let mut guard = mock_client.batches().write().unwrap();
            match guard.get_mut(batch_id) {
                Some(batch_entry) => {
                    debug!("Found existing batch entry for batch_id='{}'; setting status=Completed.", batch_id);
                    batch_entry.status = BatchStatus::Completed;
                    if want_output {
                        batch_entry.output_file_id = Some(MOCK_OUT_FILE_ID.to_string());
                    }
                    if want_error {
                        batch_entry.error_file_id = Some(MOCK_ERR_FILE_ID.to_string());
                    }
                }
                None => {
                    warn!("No existing batch entry for batch_id='{}'; inserting a new one with Completed status.", batch_id);
                    guard.insert(
                        batch_id.to_string(),
                        mock_batch(
                            batch_id,
                            "inserted_dummy".to_string(),
                            BatchStatus::Completed,
                            want_output.then(|| MOCK_OUT_FILE_ID.to_string()),
                            want_error.then(|| MOCK_ERR_FILE_ID.to_string()),
                        ),
                    );
                }
            }
        }

        // Each record is a single JSON object on ONE line. That parses whether
        // the loader reads the file as one JSON document or line by line (JSONL).
        {
            let mut files_guard = mock_client.files().write().unwrap();

            if want_output {
                debug!("Inserting mock_out_file_id with a valid BatchResponseRecord JSON line.");
                files_guard.insert(
                    MOCK_OUT_FILE_ID.to_string(),
                    Bytes::from(concat!(
                        r#"{"id":"batch_req_mock_output","custom_id":"mock_out","#,
                        r#""response":{"status_code":200,"request_id":"resp_req_mock_output","#,
                        r#""body":{"id":"success-id","object":"chat.completion","created":0,"#,
                        r#""model":"test-model","choices":[],"#,
                        r#""usage":{"prompt_tokens":0,"completion_tokens":0,"total_tokens":0}}},"#,
                        r#""error":null}"#,
                    )),
                );
            }

            if want_error {
                debug!("Inserting mock_err_file_id with a valid BatchResponseRecord JSON line (status=400).");
                files_guard.insert(
                    MOCK_ERR_FILE_ID.to_string(),
                    Bytes::from(concat!(
                        r#"{"id":"batch_req_mock_error","custom_id":"mock_err","#,
                        r#""response":{"status_code":400,"request_id":"resp_req_mock_error","#,
                        r#""body":{"error":{"message":"Some error message","type":"test_error","#,
                        r#""param":null,"code":null}}},"#,
                        r#""error":null}"#,
                    )),
                );
            }
        }

        trace!("configure_mock_batch_for_success done for batch_id='{}'", batch_id);
    }

    #[traced_test]
    async fn test_fresh_execute_success_error_only() {
        info!("Beginning test_fresh_execute_success_error_only");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();
        debug!("Constructed mock client: {:?}", mock_client);

        let tmp_dir = tempdir().expect("Failed to create temp dir");
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"{}").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        // The triple is built with `input_file_path` as-is, so derive the mock
        // batch id from that same (non-canonicalized) path.
        let final_batch_id = generate_mock_batch_id_for(&input_file_path);

        // Final state: Completed with *only* an error file => (false, true).
        mock_client.configure_inprogress_then_complete_with(&final_batch_id, false, true);

        let exec_result = triple.fresh_execute(&mock_client).await;
        debug!("Result from fresh_execute: {:?}", exec_result);

        assert!(exec_result.is_ok(), "Should succeed with error-only scenario");
        let result = exec_result.unwrap();
        assert!(result.outputs().is_none(), "No output data expected");
        assert!(result.errors().is_some(), "Should have error data");

        info!("test_fresh_execute_success_error_only passed");
    }

    #[traced_test]
    async fn test_fresh_execute_success_both_output_and_error() {
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        info!("Beginning test_fresh_execute_success_both_output_and_error");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();
        debug!("Mock client: {:?}", mock_client);

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"{\"test\":\"data\"}").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let final_batch_id = generate_mock_batch_id_for(&input_file_path);

        // Final state: Completed with *both* output and error files => (true, true).
        mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, true);

        info!("Calling fresh_execute for both output and error scenario");
        let exec_result = triple.fresh_execute(&mock_client).await;
        debug!("exec_result: {:?}", exec_result);

        assert!(exec_result.is_ok(), "Should succeed with both output and error");
        let exec_result = exec_result.unwrap();
        assert!(exec_result.outputs().is_some(), "Expected output data");
        assert!(exec_result.errors().is_some(), "Expected error data");

        // The downloaded files must also have been written locally.
        let out_file = triple.effective_output_filename();
        let err_file = triple.effective_error_filename();
        assert!(out_file.exists(), "Output file should exist on disk");
        assert!(err_file.exists(), "Error file should exist on disk");

        info!("test_fresh_execute_success_both_output_and_error passed");
    }

    #[traced_test]
    async fn test_fresh_execute_immediate_failure() {
        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("expected workspace construction success");

        info!("Beginning test_fresh_execute_immediate_failure");

        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let raw_input_path = tmp_dir.path().join("input.txt");
        fs::write(&raw_input_path, b"some input content").unwrap();

        // Canonicalize once and use that same path for both the batch id and
        // the triple, so the id we configure matches the one the mock derives.
        let real_path = std::fs::canonicalize(&raw_input_path).unwrap();
        let final_batch_id = generate_mock_batch_id_for(&real_path);

        // Mark the batch as Failed BEFORE fresh_execute runs.
        mock_client.configure_failure(&final_batch_id, /*is_immediate=*/true);

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            real_path.clone(),
            None,
            None,
        );

        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from immediate_failure fresh_execute: {:?}", result);

        assert!(
            result.is_err(),
            "fresh_execute should fail if the batch is immediately failed"
        );
        info!("test_fresh_execute_immediate_failure passed");
    }

    #[traced_test]
    async fn test_fresh_execute_eventual_failure() {
        info!("Beginning test_fresh_execute_eventual_failure");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.txt");
        fs::write(&input_file_path, b"some input content").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let final_batch_id = generate_mock_batch_id_for(&input_file_path);
        // Start InProgress and flag the id so the mock fails it later.
        configure_mock_batch_for_failure(&mock_client, &final_batch_id, /*is_immediate=*/false);

        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from eventual_failure fresh_execute: {:?}", result);

        assert!(
            result.is_err(),
            "fresh_execute should eventually fail when batch toggles to Failed"
        );
        info!("test_fresh_execute_eventual_failure passed");
    }

    /// `fresh_execute` asserts the input file exists; the expected text is
    /// the default `assert!` panic message for that expression.
    #[tokio::test]
    #[should_panic(expected = "assertion failed: input_filename.exists()")]
    async fn test_fresh_execute_missing_input_file() {
        info!("Beginning test_fresh_execute_missing_input_file");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        // Deliberately never created on disk.
        let input_file_path = tmp_dir.path().join("missing_file.json");

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        triple.fresh_execute(&mock_client).await.unwrap();
    }

    /// `fresh_execute` asserts no metadata file exists yet; pre-creating one
    /// must trip that assertion.
    #[tokio::test]
    #[should_panic(expected = "assertion failed: !metadata_filename.exists()")]
    async fn test_fresh_execute_metadata_already_exists() {
        info!("Beginning test_fresh_execute_metadata_already_exists");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"{}").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let meta_path = triple.effective_metadata_filename();
        fs::write(&meta_path, b"some old metadata content").unwrap();

        triple.fresh_execute(&mock_client).await.unwrap();
    }

    #[traced_test]
    async fn test_fresh_execute_openai_error_on_upload() {
        info!("Beginning test_fresh_execute_openai_error_on_upload");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .fail_on_file_create_openai_error(true) // forcibly cause an OpenAI error
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"[1,2,3]").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        // The upload is the first client call, so the forced error surfaces there.
        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from fresh_execute with forced OpenAI upload error: {:?}", result);

        assert!(
            result.is_err(),
            "Should fail due to forced OpenAI error on upload"
        );
        info!("test_fresh_execute_openai_error_on_upload passed");
    }

    #[traced_test]
    async fn test_fresh_execute_io_error_on_upload() {
        info!("Beginning test_fresh_execute_io_error_on_upload");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .fail_on_file_create_other_error(true) // forcibly cause an IO error
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"[4,5,6]").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from fresh_execute with forced IO error on upload: {:?}", result);

        assert!(
            result.is_err(),
            "Should fail due to forced I/O error on upload"
        );
        info!("test_fresh_execute_io_error_on_upload passed");
    }

    #[traced_test]
    async fn test_fresh_execute_success_output_only() {
        let workspace = BatchWorkspace::new_temp().await.unwrap();
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_path = tmp_dir.path().join("input.json");
        fs::write(&input_path, b"{\"test\":\"data\"}").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_path.clone(),
            None,
            None,
        );

        let final_batch_id = generate_mock_batch_id_for(&input_path);

        // InProgress first, then Completed with output only => (true, false).
        mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, false);

        let exec_result = triple.fresh_execute(&mock_client).await;
        assert!(exec_result.is_ok());
        let exec_result = exec_result.unwrap();
        assert!(exec_result.outputs().is_some(), "Should have output data");
        assert!(exec_result.errors().is_none(), "Should have no error data");
    }
}