batch-mode-batch-executor/
fresh_execute.rs

// ---------------- [ File: batch-mode-batch-executor/src/fresh_execute.rs ]
crate::ix!();

/// Execute a batch from scratch against a language-model client, yielding
/// `Self::Success` (for `BatchFileTriple`, a `BatchExecutionResult`) or an error `E`.
#[async_trait]
pub trait FreshExecute<Client, E> {
    type Success;
    async fn fresh_execute(&mut self, client: &Client)
        -> Result<Self::Success, E>;
}
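
// Hypothetical usage sketch (the `client` and `triple` bindings below are
// placeholders, not crate APIs): given some `client: C` where
// `C: LanguageModelClientInterface<E>` and a `BatchFileTriple` seeded with an
// input file, a fresh run is driven roughly as:
//
//     let result: BatchExecutionResult = triple.fresh_execute(&client).await?;
//     if let Some(outputs) = result.outputs() {
//         // parsed output records downloaded from the completed batch
//     }
//     if let Some(errors) = result.errors() {
//         // parsed error records, if the batch produced an error file
//     }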

#[async_trait]
impl<C, E> FreshExecute<C, E> for BatchFileTriple
where
    C: LanguageModelClientInterface<E>,

    // We no longer require `BatchDownloadError: From<E>` or `BatchProcessingError: From<E>`.
    // Instead, we require the usual `E: From<...>` for each error type that might bubble up:
    E: Debug
        + Display
        + From<BatchProcessingError>
        + From<BatchDownloadError>
        + From<JsonParseError>
        + From<std::io::Error>
        + From<OpenAIClientError>
        + From<BatchMetadataError>,
{
    type Success = BatchExecutionResult;

    async fn fresh_execute(&mut self, client: &C) -> Result<BatchExecutionResult, E> {
        trace!("Inside fresh_execute for triple: {:?}", self);

        assert!(self.input().is_some());
        assert!(self.output().is_none());
        assert!(self.error().is_none());
        assert!(self.associated_metadata().is_none());
        assert!(self.seed_manifest().is_some());

        info!("executing fresh batch processing for triple {:#?}", self);

        let input_filename         = self.effective_input_filename();
        let output_filename        = self.effective_output_filename();
        let error_filename         = self.effective_error_filename();
        let metadata_filename      = self.effective_metadata_filename();
        let seed_manifest_filename = self.effective_seed_manifest_filename();

        info!("input_filename: {:?}",    input_filename);
        info!("output_filename: {:?}",   output_filename);
        info!("error_filename: {:?}",    error_filename);
        info!("metadata_filename: {:?}", metadata_filename);
        info!("seed_manifest_filename: {:?}", seed_manifest_filename);

        assert!(input_filename.exists());
        assert!(!output_filename.exists());
        assert!(!error_filename.exists());
        assert!(!metadata_filename.exists());
        assert!(seed_manifest_filename.exists());

        // Upload file
        let input_file = client.upload_batch_file_path(&input_filename).await?;
        let input_file_id = input_file.id;

        // Create batch
        let batch = client.create_batch(&input_file_id).await?;
        let batch_id = batch.id.clone();

        // ** Save batch_id to metadata file **
        let mut metadata = BatchMetadata::with_input_id_and_batch_id(&input_file_id, &batch_id);
        metadata.save_to_file(&metadata_filename).await?;

        // Wait for completion
        let completed_batch = client.wait_for_batch_completion(&batch_id).await?;

        // Download output file
        let outputs = if let Some(output_file_id) = completed_batch.output_file_id {
            metadata.set_output_file_id(Some(output_file_id));
            metadata.save_to_file(&metadata_filename).await?;
            self.download_output_file(client).await?;
            let outputs = load_output_file(&output_filename).await?;
            Some(outputs)
        } else {
            None
        };

        // Handle errors if any
        let errors = if let Some(error_file_id) = completed_batch.error_file_id {
            metadata.set_error_file_id(Some(error_file_id));
            metadata.save_to_file(&metadata_filename).await?;
            self.download_error_file(client).await?;
            let errors = load_error_file(&error_filename).await?;
            Some(errors)
        } else {
            None
        };

        Ok(BatchExecutionResult::new(outputs, errors))
    }
}

#[cfg(test)]
mod fresh_execute_tests {
    use super::*;
    use std::fs;
    use std::path::Path;
    use tempfile::tempdir;
    use tracing::{debug, error, info, trace, warn};
    use futures::executor::block_on;

    /// A small helper that reproduces how the mock's `create_batch`
    /// forms its batch_id from the input file path.
    fn generate_mock_batch_id_for(input_file_path: &Path) -> String {
        // Exactly as the mock does:
        let input_file_id = format!("mock_file_id_{}", input_file_path.display());
        format!("mock_batch_id_for_{}", input_file_id)
    }
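
    // Worked example (hypothetical path): for "/tmp/work/input.json" the helper
    // above yields "mock_batch_id_for_mock_file_id_/tmp/work/input.json", which
    // is the id the mock client reports after create_batch.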

    /// For "immediate_failure" or "eventual_failure" we want the final batch to end up with status=Failed.
    /// Because the mock by default toggles from InProgress -> Completed, we can forcibly override
    /// the final result to be "Failed" on the second retrieval. We'll do that by storing the
    /// batch as "InProgress" initially with a custom key in `mock_batch_config`.
    fn configure_mock_batch_for_failure(
        mock_client: &MockLanguageModelClient<MockBatchClientError>,
        batch_id: &str,
        is_immediate: bool,
    ) {
        // If "immediate", just set it to Failed right away:
        if is_immediate {
            let mut guard = mock_client.batches().write().unwrap();
            guard.insert(
                batch_id.to_string(),
                Batch {
                    id: batch_id.to_string(),
                    object: "batch".to_string(),
                    endpoint: "/v1/chat/completions".to_string(),
                    errors: None,
                    input_file_id: format!("immediate_fail_for_{batch_id}"),
                    completion_window: "24h".to_string(),
                    status: BatchStatus::Failed,
                    output_file_id: None,
                    error_file_id: None,
                    created_at: 0,
                    in_progress_at: None,
                    expires_at: None,
                    finalizing_at: None,
                    completed_at: None,
                    failed_at: None,
                    expired_at: None,
                    cancelling_at: None,
                    cancelled_at: None,
                    request_counts: None,
                    metadata: None,
                },
            );
        } else {
            // "eventual_failure": start InProgress, then on the second retrieval => fail.
            // We'll store in mock_batch_config: fails_on_attempt_1 => set
            {
                let mut c = mock_client.mock_batch_config().write().unwrap();
                c.fails_on_attempt_1_mut().insert(batch_id.to_string());
            }
            // Also store an initial "InProgress" batch so we can see the toggling
            let mut guard = mock_client.batches().write().unwrap();
            guard.insert(
                batch_id.to_string(),
                Batch {
                    id: batch_id.to_string(),
                    object: "batch".to_string(),
                    endpoint: "/v1/chat/completions".to_string(),
                    errors: None,
                    input_file_id: format!("eventual_fail_for_{batch_id}"),
                    completion_window: "24h".to_string(),
                    status: BatchStatus::InProgress,
                    output_file_id: None,
                    error_file_id: None,
                    created_at: 0,
                    in_progress_at: None,
                    expires_at: None,
                    finalizing_at: None,
                    completed_at: None,
                    failed_at: None,
                    expired_at: None,
                    cancelling_at: None,
                    cancelled_at: None,
                    request_counts: None,
                    metadata: None,
                },
            );
        }
    }

    #[tracing::instrument(level = "trace", skip(mock_client))]
    pub fn configure_mock_batch_for_success(
        mock_client: &MockLanguageModelClient<MockBatchClientError>,
        batch_id: &str,
        want_output: bool,
        want_error: bool,
    ) {
        trace!("Configuring mock batch for success with batch_id='{}', want_output={}, want_error={}", batch_id, want_output, want_error);

        // Force the batch's status = Completed and set output_file_id/error_file_id if requested
        {
            let mut guard = mock_client.batches().write().unwrap();
            match guard.get_mut(batch_id) {
                Some(batch_entry) => {
                    debug!("Found existing batch entry for batch_id='{}'; setting status=Completed.", batch_id);
                    batch_entry.status = BatchStatus::Completed;
                    if want_output {
                        batch_entry.output_file_id = Some("mock_out_file_id".to_string());
                    }
                    if want_error {
                        batch_entry.error_file_id = Some("mock_err_file_id".to_string());
                    }
                }
                None => {
                    warn!("No existing batch entry for batch_id='{}'; inserting a new one with Completed status.", batch_id);
                    guard.insert(
                        batch_id.to_string(),
                        Batch {
                            id: batch_id.to_string(),
                            object: "batch".to_string(),
                            endpoint: "/v1/chat/completions".to_string(),
                            errors: None,
                            input_file_id: "inserted_dummy".to_string(),
                            completion_window: "24h".to_string(),
                            status: BatchStatus::Completed,
                            output_file_id: if want_output {
                                Some("mock_out_file_id".to_string())
                            } else {
                                None
                            },
                            error_file_id: if want_error {
                                Some("mock_err_file_id".to_string())
                            } else {
                                None
                            },
                            created_at: 0,
                            in_progress_at: None,
                            expires_at: None,
                            finalizing_at: None,
                            completed_at: None,
                            failed_at: None,
                            expired_at: None,
                            cancelling_at: None,
                            cancelled_at: None,
                            request_counts: None,
                            metadata: None,
                        },
                    );
                }
            }
        }

        // Insert the corresponding file contents into the mock's "files" map so
        // that calls to `file_content("mock_out_file_id")` or `file_content("mock_err_file_id")`
        // will return valid JSON that can be parsed as BatchResponseRecord.
        {
            let mut files_guard = mock_client.files().write().unwrap();

            if want_output {
                debug!("Inserting mock_out_file_id with a valid BatchResponseRecord JSON line.");
                files_guard.insert(
                    "mock_out_file_id".to_string(),
                    Bytes::from(
    r#"{
      "id": "batch_req_mock_output",
      "custom_id": "mock_out",
      "response": {
        "status_code": 200,
        "request_id": "resp_req_mock_output",
        "body": {
          "id": "success-id",
          "object": "chat.completion",
          "created": 0,
          "model": "test-model",
          "choices": [],
          "usage": {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0
          }
        }
      },
      "error": null
    }"#,
                    ),
                );
            }

            if want_error {
                debug!("Inserting mock_err_file_id with a valid BatchResponseRecord JSON line (status=400).");
                files_guard.insert(
                    "mock_err_file_id".to_string(),
                    Bytes::from(
    r#"{
      "id": "batch_req_mock_error",
      "custom_id": "mock_err",
      "response": {
        "status_code": 400,
        "request_id": "resp_req_mock_error",
        "body": {
          "error": {
            "message": "Some error message",
            "type": "test_error",
            "param": null,
            "code": null
          }
        }
      },
      "error": null
    }"#,
                    ),
                );
            }
        }

        trace!("configure_mock_batch_for_success done for batch_id='{}'", batch_id);
    }

    #[traced_test]
    async fn test_fresh_execute_success_error_only() {
        info!("Beginning test_fresh_execute_success_error_only");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();
        debug!("Constructed mock client: {:?}", mock_client);

        // We'll create a local input file
        let tmp_dir = tempdir().expect("Failed to create temp dir");
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"{}").unwrap();

        // Create the triple
        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        // The real batch_id is `mock_batch_id_for_mock_file_id_{canonical_path}`,
        // so we reproduce it here:
        let final_batch_id = generate_mock_batch_id_for(&input_file_path);

        // We want the final batch to be Completed with *only* an error_file_id => (false, true)
        mock_client.configure_inprogress_then_complete_with(&final_batch_id, false, true);

        // Now call fresh_execute
        let exec_result = triple.fresh_execute(&mock_client).await;
        debug!("Result from fresh_execute: {:?}", exec_result);

        assert!(exec_result.is_ok(), "Should succeed with error-only scenario");
        let result = exec_result.unwrap();
        assert!(result.outputs().is_none(), "No output data expected");
        assert!(result.errors().is_some(), "Should have error data");

        info!("test_fresh_execute_success_error_only passed");
    }

    #[traced_test]
    async fn test_fresh_execute_success_both_output_and_error() {
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        info!("Beginning test_fresh_execute_success_both_output_and_error");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();
        debug!("Mock client: {:?}", mock_client);

        // local input file
        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"{\"test\":\"data\"}").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let final_batch_id = generate_mock_batch_id_for(&input_file_path);

        // We want the final batch to be Completed with *both* output and error => (true, true)
        mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, true);

        info!("Calling fresh_execute for both output and error scenario");
        let exec_result = triple.fresh_execute(&mock_client).await;
        debug!("exec_result: {:?}", exec_result);

        assert!(exec_result.is_ok(), "Should succeed with both output and error");
        let exec_result = exec_result.unwrap();
        assert!(exec_result.outputs().is_some(),  "Expected output data");
        assert!(exec_result.errors().is_some(),   "Expected error data");

        // Confirm the local JSONL files actually got written:
        let out_file = triple.effective_output_filename();
        let err_file = triple.effective_error_filename();
        assert!(out_file.exists(), "Output file should exist on disk");
        assert!(err_file.exists(), "Error file should exist on disk");

        info!("test_fresh_execute_success_both_output_and_error passed");
    }

    #[traced_test]
    async fn test_fresh_execute_immediate_failure() {
        let workspace = BatchWorkspace::new_temp()
            .await
            .expect("expected workspace construction success");

        info!("Beginning test_fresh_execute_immediate_failure");

        // Build the mock client:
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        // Create an input file:
        let tmp_dir = tempdir().unwrap();
        let raw_input_path = tmp_dir.path().join("input.txt");
        fs::write(&raw_input_path, b"some input content").unwrap();

        // Canonicalize so the generated batch_id will match what the mock uses:
        let real_path = std::fs::canonicalize(&raw_input_path).unwrap();
        let final_batch_id = generate_mock_batch_id_for(&real_path);

        // Mark that batch as immediately Failed BEFORE we do fresh_execute:
        mock_client.configure_failure(&final_batch_id, /*is_immediate=*/true);

        // Now reference that same real_path in the triple:
        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            real_path.clone(),
            None,
            None,
        );

        // Because we forcibly set it to "Failed" above, fresh_execute should return an error:
        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from immediate_failure fresh_execute: {:?}", result);

        // We expect an error because the batch was "Failed" from the start
        assert!(
            result.is_err(),
            "fresh_execute should fail if the batch is immediately failed"
        );
        info!("test_fresh_execute_immediate_failure passed");
    }

    #[traced_test]
    async fn test_fresh_execute_eventual_failure() {
        info!("Beginning test_fresh_execute_eventual_failure");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.txt");
        fs::write(&input_file_path, b"some input content").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let final_batch_id = generate_mock_batch_id_for(&input_file_path);
        // Mark that batch as eventually failing on the second retrieval
        configure_mock_batch_for_failure(&mock_client, &final_batch_id, /*is_immediate=*/false);

        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from eventual_failure fresh_execute: {:?}", result);

        assert!(
            result.is_err(),
            "fresh_execute should eventually fail when batch toggles to Failed"
        );
        info!("test_fresh_execute_eventual_failure passed");
    }

    /// Instead of `catch_unwind`, we rely on `#[should_panic(expected = "...")]`.
    #[tokio::test]
    #[should_panic(expected = "assertion failed: input_filename.exists()")]
    async fn test_fresh_execute_missing_input_file() {
        info!("Beginning test_fresh_execute_missing_input_file");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("missing_file.json");
        // We do NOT actually create the file => it doesn't exist.

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        // The code in `fresh_execute` does `assert!(input_filename.exists())`.
        triple.fresh_execute(&mock_client).await.unwrap();
    }

    #[tokio::test]
    #[should_panic(expected = "assertion failed: !metadata_filename.exists()")]
    async fn test_fresh_execute_metadata_already_exists() {
        info!("Beginning test_fresh_execute_metadata_already_exists");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        // We'll have a valid input file:
        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"{}").unwrap();

        // We'll create a triple referencing it:
        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        // Then forcibly create the metadata file so it already exists:
        let meta_path = triple.effective_metadata_filename();
        fs::write(&meta_path, b"some old metadata content").unwrap();

        // Because fresh_execute asserts that the metadata file does NOT exist,
        // we expect a panic:
        triple.fresh_execute(&mock_client).await.unwrap();
    }

    #[traced_test]
    async fn test_fresh_execute_openai_error_on_upload() {
        info!("Beginning test_fresh_execute_openai_error_on_upload");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .fail_on_file_create_openai_error(true) // forcibly cause an OpenAI error
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"[1,2,3]").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        // The first step is to upload the batch file => that triggers an OpenAI error
        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from fresh_execute with forced OpenAI upload error: {:?}", result);

        assert!(
            result.is_err(),
            "Should fail due to forced OpenAI error on upload"
        );
        info!("test_fresh_execute_openai_error_on_upload passed");
    }

    #[traced_test]
    async fn test_fresh_execute_io_error_on_upload() {
        info!("Beginning test_fresh_execute_io_error_on_upload");
        let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .fail_on_file_create_other_error(true) // forcibly cause an I/O error
            .build()
            .unwrap();

        let tmp_dir = tempdir().unwrap();
        let input_file_path = tmp_dir.path().join("input.json");
        fs::write(&input_file_path, b"[4,5,6]").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_file_path.clone(),
            None,
            None,
        );

        let result = triple.fresh_execute(&mock_client).await;
        debug!("Result from fresh_execute with forced I/O error on upload: {:?}", result);

        assert!(
            result.is_err(),
            "Should fail due to forced I/O error on upload"
        );
        info!("test_fresh_execute_io_error_on_upload passed");
    }

    #[traced_test]
    async fn test_fresh_execute_success_output_only() {
        let workspace = BatchWorkspace::new_temp().await.unwrap();
        let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
            .build()
            .unwrap();

        // create local input file
        let tmp_dir = tempdir().unwrap();
        let input_path = tmp_dir.path().join("input.json");
        fs::write(&input_path, b"{\"test\":\"data\"}").unwrap();

        let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
            workspace,
            input_path.clone(),
            None,
            None,
        );

        // The mock batch_id is `mock_batch_id_for_mock_file_id_{input_path}`
        let final_batch_id = format!("mock_batch_id_for_mock_file_id_{}", input_path.display());

        // This sets an InProgress batch and sets planned_completions to (true, false).
        mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, false);

        // Now do fresh_execute => we expect outputs but no errors
        let exec_result = triple.fresh_execute(&mock_client).await;
        assert!(exec_result.is_ok());
        let exec_result = exec_result.unwrap();
        assert!(exec_result.outputs().is_some(), "Should have output data");
        assert!(exec_result.errors().is_none(), "Should have no error data");
    }
}