crate::ix!();
/// Asynchronous "fresh" execution of `self` against a client.
///
/// `Client` is the type of the client borrowed for the duration of the call
/// and `E` is the error type surfaced on failure. The implementation for
/// `BatchFileTriple` in this file uploads an input file, creates a batch,
/// waits for it to complete and loads whatever output/error files result.
#[async_trait]
pub trait FreshExecute<Client,E> {
/// Value produced by a successful execution.
type Success;
/// Runs the execution with mutable access to `self`, borrowing `client`.
///
/// Resolves to `Self::Success` on success, or to an `E` on failure.
async fn fresh_execute(&mut self, client: &Client)
-> Result<Self::Success, E>;
}
/// Fresh execution of a [`BatchFileTriple`]: upload the input file, create a
/// batch, persist its metadata, wait for completion, then download and load
/// whichever of the output / error files the completed batch reports.
#[async_trait]
impl<C, E> FreshExecute<C, E> for BatchFileTriple
where
    C: LanguageModelClientInterface<E>,
    E: Debug
        + Display
        + From<BatchProcessingError>
        + From<BatchDownloadError>
        + From<JsonParseError>
        + From<std::io::Error>
        + From<OpenAIClientError>
        + From<BatchMetadataError>,
{
    type Success = BatchExecutionResult;

    /// Executes this triple from scratch against `client`.
    ///
    /// # Panics
    ///
    /// Panics (via `assert!`) unless the triple has an input and a seed
    /// manifest but no output, error or associated metadata, and unless the
    /// same holds for the corresponding files on disk.
    ///
    /// # Errors
    ///
    /// Propagates, converted into `E`, any failure from uploading the input,
    /// creating or waiting on the batch, saving metadata, downloading result
    /// files, or loading them.
    async fn fresh_execute(&mut self, client: &C) -> Result<BatchExecutionResult, E> {
        trace!("Inside fresh_execute for triple: {:?}", self);

        // In-memory preconditions. The assert expressions are part of the
        // panic message, so they must not be reworded.
        assert!(self.input().is_some());
        assert!(self.output().is_none());
        assert!(self.error().is_none());
        assert!(self.associated_metadata().is_none());
        assert!(self.seed_manifest().is_some());

        info!("executing fresh batch processing for triple {:#?}", self);

        let input_filename = self.effective_input_filename();
        let output_filename = self.effective_output_filename();
        let error_filename = self.effective_error_filename();
        let metadata_filename = self.effective_metadata_filename();
        let seed_manifest_filename = self.effective_seed_manifest_filename();

        info!("input_filename: {:?}", input_filename);
        info!("output_filename: {:?}", output_filename);
        info!("error_filename: {:?}", error_filename);
        info!("metadata_filename: {:?}", metadata_filename);
        info!("seed_manifest_filename: {:?}", seed_manifest_filename);

        // On-disk preconditions mirroring the in-memory ones above.
        assert!(input_filename.exists());
        assert!(!output_filename.exists());
        assert!(!error_filename.exists());
        assert!(!metadata_filename.exists());
        assert!(seed_manifest_filename.exists());

        // Upload the input, then create a batch that refers to it.
        let uploaded = client.upload_batch_file_path(&input_filename).await?;
        let input_file_id = uploaded.id;
        let created = client.create_batch(&input_file_id).await?;
        let batch_id = created.id.clone();

        // Write the metadata before waiting, so the ids are on disk even if
        // the wait below returns an error.
        let mut batch_metadata =
            BatchMetadata::with_input_id_and_batch_id(&input_file_id, &batch_id);
        batch_metadata.save_to_file(&metadata_filename).await?;

        let finished = client.wait_for_batch_completion(&batch_id).await?;

        // For each result file: record its id in the metadata, re-save the
        // metadata, and only then download and load the file.
        let outputs = match finished.output_file_id {
            Some(output_file_id) => {
                batch_metadata.set_output_file_id(Some(output_file_id));
                batch_metadata.save_to_file(&metadata_filename).await?;
                self.download_output_file(client).await?;
                Some(load_output_file(&output_filename).await?)
            }
            None => None,
        };

        let errors = match finished.error_file_id {
            Some(error_file_id) => {
                batch_metadata.set_error_file_id(Some(error_file_id));
                batch_metadata.save_to_file(&metadata_filename).await?;
                self.download_error_file(client).await?;
                Some(load_error_file(&error_filename).await?)
            }
            None => None,
        };

        Ok(BatchExecutionResult::new(outputs, errors))
    }
}
#[cfg(test)]
mod fresh_execute_tests {
use super::*;
use std::fs;
use std::path::Path;
use tempfile::tempdir;
use tracing::{debug, error, info, trace, warn};
use futures::executor::block_on;
/// Builds the batch id string the tests use for the given input path:
/// `mock_batch_id_for_` + `mock_file_id_` + the path's display form.
///
/// NOTE(review): presumably this mirrors the id scheme of the mock client;
/// confirm against the mock implementation.
fn generate_mock_batch_id_for(input_file_path: &Path) -> String {
    format!(
        "mock_batch_id_for_mock_file_id_{}",
        input_file_path.display()
    )
}
/// Registers a batch under `batch_id` in the mock client's batch table so
/// that executing it ends in failure.
///
/// * `is_immediate == true`: the batch is inserted already in
///   `BatchStatus::Failed`.
/// * `is_immediate == false`: `batch_id` is first added to the mock config's
///   `fails_on_attempt_1` set, then the batch is inserted as
///   `BatchStatus::InProgress`.
///   NOTE(review): presumably the mock flips such a batch to `Failed` on its
///   first poll; confirm against the mock implementation.
fn configure_mock_batch_for_failure(
    mock_client: &MockLanguageModelClient<MockBatchClientError>,
    batch_id: &str,
    is_immediate: bool,
) {
    // The two scenarios differ only in the input-file id, the initial status
    // and the extra config entry for the eventual-failure case.
    let (input_file_id, status) = if is_immediate {
        (format!("immediate_fail_for_{batch_id}"), BatchStatus::Failed)
    } else {
        {
            // Scoped so the config lock is released before the batch table
            // is locked below.
            let mut config = mock_client.mock_batch_config().write().unwrap();
            config.fails_on_attempt_1_mut().insert(batch_id.to_string());
        }
        (
            format!("eventual_fail_for_{batch_id}"),
            BatchStatus::InProgress,
        )
    };

    let failing_batch = Batch {
        id: batch_id.to_string(),
        object: "batch".to_string(),
        endpoint: "/v1/chat/completions".to_string(),
        errors: None,
        input_file_id,
        completion_window: "24h".to_string(),
        status,
        output_file_id: None,
        error_file_id: None,
        created_at: 0,
        in_progress_at: None,
        expires_at: None,
        finalizing_at: None,
        completed_at: None,
        failed_at: None,
        expired_at: None,
        cancelling_at: None,
        cancelled_at: None,
        request_counts: None,
        metadata: None,
    };

    let mut batches = mock_client.batches().write().unwrap();
    batches.insert(batch_id.to_string(), failing_batch);
}
/// Marks `batch_id` as `BatchStatus::Completed` in the mock client and
/// registers the mock files the completed batch points at.
///
/// If a batch with this id already exists it is updated in place: status is
/// set to `Completed` and the output / error file ids are set only when the
/// corresponding flag is `true` (they are never cleared). Otherwise a new
/// completed batch is inserted. Afterwards `mock_out_file_id` and/or
/// `mock_err_file_id` are stored in the mock's file table according to
/// `want_output` / `want_error`.
#[tracing::instrument(level = "trace", skip(mock_client))]
pub fn configure_mock_batch_for_success(
    mock_client: &MockLanguageModelClient<MockBatchClientError>,
    batch_id: &str,
    want_output: bool,
    want_error: bool,
) {
    // Payload bytes are reproduced verbatim. The continuation lines of these
    // raw strings sit at column 0 on purpose: any leading whitespace would
    // become part of the stored file content.
    const OUTPUT_FILE_BODY: &str = r#"{
"id": "batch_req_mock_output",
"custom_id": "mock_out",
"response": {
"status_code": 200,
"request_id": "resp_req_mock_output",
"body": {
"id": "success-id",
"object": "chat.completion",
"created": 0,
"model": "test-model",
"choices": [],
"usage": {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0
}
}
},
"error": null
}"#;
    const ERROR_FILE_BODY: &str = r#"{
"id": "batch_req_mock_error",
"custom_id": "mock_err",
"response": {
"status_code": 400,
"request_id": "resp_req_mock_error",
"body": {
"error": {
"message": "Some error message",
"type": "test_error",
"param": null,
"code": null
}
}
},
"error": null
}"#;

    trace!("Configuring mock batch for success with batch_id='{}', want_output={}, want_error={}", batch_id, want_output, want_error);

    // Step 1: update or create the batch entry. The block scope releases the
    // batch-table lock before the file table is locked.
    {
        let mut batches = mock_client.batches().write().unwrap();
        if let Some(existing) = batches.get_mut(batch_id) {
            debug!("Found existing batch entry for batch_id='{}'; setting status=Completed.", batch_id);
            existing.status = BatchStatus::Completed;
            if want_output {
                existing.output_file_id = Some("mock_out_file_id".to_string());
            }
            if want_error {
                existing.error_file_id = Some("mock_err_file_id".to_string());
            }
        } else {
            warn!("No existing batch entry for batch_id='{}'; inserting a new one with Completed status.", batch_id);
            let completed = Batch {
                id: batch_id.to_string(),
                object: "batch".to_string(),
                endpoint: "/v1/chat/completions".to_string(),
                errors: None,
                input_file_id: "inserted_dummy".to_string(),
                completion_window: "24h".to_string(),
                status: BatchStatus::Completed,
                output_file_id: want_output.then(|| "mock_out_file_id".to_string()),
                error_file_id: want_error.then(|| "mock_err_file_id".to_string()),
                created_at: 0,
                in_progress_at: None,
                expires_at: None,
                finalizing_at: None,
                completed_at: None,
                failed_at: None,
                expired_at: None,
                cancelling_at: None,
                cancelled_at: None,
                request_counts: None,
                metadata: None,
            };
            batches.insert(batch_id.to_string(), completed);
        }
    }

    // Step 2: register the file contents the batch's file ids refer to.
    {
        let mut files = mock_client.files().write().unwrap();
        if want_output {
            debug!("Inserting mock_out_file_id with a valid BatchResponseRecord JSON line.");
            files.insert("mock_out_file_id".to_string(), Bytes::from(OUTPUT_FILE_BODY));
        }
        if want_error {
            debug!("Inserting mock_err_file_id with a valid BatchResponseRecord JSON line (status=400).");
            files.insert("mock_err_file_id".to_string(), Bytes::from(ERROR_FILE_BODY));
        }
    }

    trace!("configure_mock_batch_for_success done for batch_id='{}'", batch_id);
}
/// A batch configured with `(want_output, want_error) = (false, true)` must
/// succeed and report error data but no output data.
/// NOTE(review): flag meaning is inferred from the assertions below and the
/// mock method's name; confirm against the mock.
#[traced_test]
async fn test_fresh_execute_success_error_only() {
    info!("Beginning test_fresh_execute_success_error_only");

    let workspace = BatchWorkspace::new_temp()
        .await
        .expect("expected workspace construction success");
    let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
        .build()
        .unwrap();
    debug!("Constructed mock client: {:?}", mock_client);

    // `scratch` must stay bound until the end of the test: dropping the
    // TempDir removes the directory holding the input file.
    let scratch = tempdir().expect("Failed to create temp dir");
    let input_path = scratch.path().join("input.json");
    fs::write(&input_path, b"{}").unwrap();

    let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
        workspace,
        input_path.clone(),
        None,
        None,
    );

    let batch_id = generate_mock_batch_id_for(&input_path);
    mock_client.configure_inprogress_then_complete_with(&batch_id, false, true);

    let outcome = triple.fresh_execute(&mock_client).await;
    debug!("Result from fresh_execute: {:?}", outcome);
    assert!(outcome.is_ok(), "Should succeed with error-only scenario");

    let report = outcome.unwrap();
    assert!(report.outputs().is_none(), "No output data expected");
    assert!(report.errors().is_some(), "Should have error data");

    info!("test_fresh_execute_success_error_only passed");
}
/// A batch configured with both flags set must succeed, report both output
/// and error data, and leave both result files on disk.
#[traced_test]
async fn test_fresh_execute_success_both_output_and_error() {
    let workspace = BatchWorkspace::new_temp()
        .await
        .expect("expected workspace construction success");
    info!("Beginning test_fresh_execute_success_both_output_and_error");

    let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
        .build()
        .unwrap();
    debug!("Mock client: {:?}", mock_client);

    // Keep the TempDir alive for the whole test so the input file survives.
    let scratch = tempdir().unwrap();
    let input_path = scratch.path().join("input.json");
    fs::write(&input_path, b"{\"test\":\"data\"}").unwrap();

    let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
        workspace,
        input_path.clone(),
        None,
        None,
    );

    let batch_id = generate_mock_batch_id_for(&input_path);
    mock_client.configure_inprogress_then_complete_with(&batch_id, true, true);

    info!("Calling fresh_execute for both output and error scenario");
    let outcome = triple.fresh_execute(&mock_client).await;
    debug!("exec_result: {:?}", outcome);
    assert!(outcome.is_ok(), "Should succeed with both output and error");

    let report = outcome.unwrap();
    assert!(report.outputs().is_some(), "Expected output data");
    assert!(report.errors().is_some(), "Expected error data");

    // Both downloaded files must now exist at the triple's effective paths.
    let output_on_disk = triple.effective_output_filename();
    let error_on_disk = triple.effective_error_filename();
    assert!(output_on_disk.exists(), "Output file should exist on disk");
    assert!(error_on_disk.exists(), "Error file should exist on disk");

    info!("test_fresh_execute_success_both_output_and_error passed");
}
/// When the mock is configured (via `configure_failure(id, true)`) for the
/// batch id derived from the canonicalized input path, `fresh_execute` must
/// return an error.
#[traced_test]
async fn test_fresh_execute_immediate_failure() {
    let workspace = BatchWorkspace::new_temp()
        .await
        .expect("expected workspace construction success");
    info!("Beginning test_fresh_execute_immediate_failure");

    let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
        .build()
        .unwrap();

    // Keep the TempDir alive for the whole test so the input file survives.
    let scratch = tempdir().unwrap();
    let written_path = scratch.path().join("input.txt");
    fs::write(&written_path, b"some input content").unwrap();

    // The batch id is derived from the canonical path, and the same canonical
    // path is what the triple is built with.
    let canonical_path = fs::canonicalize(&written_path).unwrap();
    let batch_id = generate_mock_batch_id_for(&canonical_path);
    mock_client.configure_failure(&batch_id, true);

    let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
        workspace,
        canonical_path,
        None,
        None,
    );

    let outcome = triple.fresh_execute(&mock_client).await;
    debug!("Result from immediate_failure fresh_execute: {:?}", outcome);
    assert!(
        outcome.is_err(),
        "fresh_execute should fail if the batch is immediately failed"
    );

    info!("test_fresh_execute_immediate_failure passed");
}
/// When the batch is registered as in-progress and listed in the mock's
/// `fails_on_attempt_1` set (see `configure_mock_batch_for_failure`),
/// `fresh_execute` must return an error.
#[traced_test]
async fn test_fresh_execute_eventual_failure() {
    info!("Beginning test_fresh_execute_eventual_failure");

    let workspace = BatchWorkspace::new_temp()
        .await
        .expect("expected workspace construction success");
    let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
        .build()
        .unwrap();

    // Keep the TempDir alive for the whole test so the input file survives.
    let scratch = tempdir().unwrap();
    let input_path = scratch.path().join("input.txt");
    fs::write(&input_path, b"some input content").unwrap();

    let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
        workspace,
        input_path.clone(),
        None,
        None,
    );

    let batch_id = generate_mock_batch_id_for(&input_path);
    configure_mock_batch_for_failure(&mock_client, &batch_id, false);

    let outcome = triple.fresh_execute(&mock_client).await;
    debug!("Result from eventual_failure fresh_execute: {:?}", outcome);
    assert!(
        outcome.is_err(),
        "fresh_execute should eventually fail when batch toggles to Failed"
    );

    info!("test_fresh_execute_eventual_failure passed");
}
/// `fresh_execute` must panic on its `input_filename.exists()` assertion when
/// the input path was never written to disk.
#[tokio::test]
#[should_panic(expected = "assertion failed: input_filename.exists()")]
async fn test_fresh_execute_missing_input_file() {
    info!("Beginning test_fresh_execute_missing_input_file");

    let workspace = BatchWorkspace::new_temp()
        .await
        .expect("expected workspace construction success");
    let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
        .build()
        .unwrap();

    // Deliberately never created: only the path is computed.
    let scratch = tempdir().unwrap();
    let absent_input = scratch.path().join("missing_file.json");

    let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
        workspace,
        absent_input,
        None,
        None,
    );

    triple.fresh_execute(&mock_client).await.unwrap();
}
/// `fresh_execute` must panic on its `!metadata_filename.exists()` assertion
/// when a metadata file is already present at the triple's effective
/// metadata path.
#[tokio::test]
#[should_panic(expected = "assertion failed: !metadata_filename.exists()")]
async fn test_fresh_execute_metadata_already_exists() {
    info!("Beginning test_fresh_execute_metadata_already_exists");

    let workspace = BatchWorkspace::new_temp()
        .await
        .expect("expected workspace construction success");
    let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
        .build()
        .unwrap();

    // Keep the TempDir alive for the whole test so the input file survives.
    let scratch = tempdir().unwrap();
    let input_path = scratch.path().join("input.json");
    fs::write(&input_path, b"{}").unwrap();

    let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
        workspace,
        input_path,
        None,
        None,
    );

    // Plant a stale metadata file where fresh_execute expects none.
    let stale_metadata = triple.effective_metadata_filename();
    fs::write(&stale_metadata, b"some old metadata content").unwrap();

    triple.fresh_execute(&mock_client).await.unwrap();
}
#[traced_test]
async fn test_fresh_execute_openai_error_on_upload() {
info!("Beginning test_fresh_execute_openai_error_on_upload");
let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
.fail_on_file_create_openai_error(true) .build()
.unwrap();
let tmp_dir = tempdir().unwrap();
let input_file_path = tmp_dir.path().join("input.json");
fs::write(&input_file_path, b"[1,2,3]").unwrap();
let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
workspace,
input_file_path.clone(),
None,
None,
);
let result = triple.fresh_execute(&mock_client).await;
debug!("Result from fresh_execute with forced OpenAI upload error: {:?}", result);
assert!(
result.is_err(),
"Should fail due to forced OpenAI error on upload"
);
info!("test_fresh_execute_openai_error_on_upload passed");
}
#[traced_test]
async fn test_fresh_execute_io_error_on_upload() {
info!("Beginning test_fresh_execute_io_error_on_upload");
let workspace = BatchWorkspace::new_temp().await.expect("expected workspace construction success");
let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
.fail_on_file_create_other_error(true) .build()
.unwrap();
let tmp_dir = tempdir().unwrap();
let input_file_path = tmp_dir.path().join("input.json");
fs::write(&input_file_path, b"[4,5,6]").unwrap();
let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
workspace,
input_file_path.clone(),
None,
None,
);
let result = triple.fresh_execute(&mock_client).await;
debug!("Result from fresh_execute with forced IO error on upload: {:?}", result);
assert!(
result.is_err(),
"Should fail due to forced I/O error on upload"
);
info!("test_fresh_execute_io_error_on_upload passed");
}
#[traced_test]
async fn test_fresh_execute_success_output_only() {
let workspace = BatchWorkspace::new_temp().await.unwrap();
let mock_client = MockLanguageModelClientBuilder::<MockBatchClientError>::default()
.build()
.unwrap();
let tmp_dir = tempdir().unwrap();
let input_path = tmp_dir.path().join("input.json");
fs::write(&input_path, b"{\"test\":\"data\"}").unwrap();
let mut triple = BatchFileTriple::new_for_test_with_in_out_err_paths(
workspace,
input_path.clone(),
None,
None,
);
let final_batch_id = format!("mock_batch_id_for_mock_file_id_{}", input_path.display());
mock_client.configure_inprogress_then_complete_with(&final_batch_id, true, false);
let exec_result = triple.fresh_execute(&mock_client).await;
assert!(exec_result.is_ok());
let exec_result = exec_result.unwrap();
assert!(exec_result.outputs().is_some(), "Should have output data");
assert!(exec_result.errors().is_none(), "Should have no error data");
}
}