/// Define a private namespace for all its items.
mod private
{
use crate::{ Client, error::OpenAIError };
use crate::environment::OpenaiEnvironmentImpl;
use crate::components::batch_shared::*;
use crate::components::common::Metadata;
use serde_json::{ json, Value };
use core::time::Duration;
use std::{
collections::HashMap,
sync::{ Arc, Mutex },
};
use tokio::sync::Semaphore;
use error_tools::untyped::Result;
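/// Recommended default values for batch operations.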
#[ derive( Debug ) ]
pub struct BatchRecommended;
impl BatchRecommended
{
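/// Recommended page size when listing batch jobs.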
#[ must_use ]
#[ inline ]
pub fn list_limit() -> i32
{
20
}
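/// Recommended maximum number of retries per request.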
#[ must_use ]
#[ inline ]
pub fn max_retries() -> u32
{
3
}
}
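/// A batch request enriched with a priority and an optional retry configuration.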
#[ derive( Debug, Clone ) ]
pub struct EnhancedBatchRequest
{
pub custom_id : String,
pub method : String,
pub url : String,
pub body : Value,
pub priority : BatchRequestPriority,
pub retry_config : Option< BatchRetryConfig >,
}
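/// Relative priority of a batch request. Higher priorities are processed first.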
#[ derive( Debug, Clone ) ]
pub enum BatchRequestPriority
{
Low,
Normal,
High,
Critical,
}
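/// Retry behavior for a single request: maximum attempts, exponential backoff multiplier, and an upper bound on the delay.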
#[ derive( Debug, Clone ) ]
pub struct BatchRetryConfig
{
pub max_retries : u32,
pub backoff_multiplier : f64,
pub max_delay : Duration,
}
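/// Configuration for submitting a batch job.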
#[ derive( Debug, Clone ) ]
pub struct BatchJobConfig
{
pub endpoint : String,
pub completion_window : String,
pub metadata : Option< Metadata >,
}
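/// Aggregate metrics gathered while processing a batch of requests.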
#[ derive( Debug, Clone ) ]
pub struct BatchProcessingMetrics
{
pub total_requests : usize,
pub successful_requests : usize,
pub failed_requests : usize,
pub processing_time : Duration,
pub requests_per_second : f64,
}
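/// Creates a batch job from the supplied JSON request, overlaying the caller's fields on top of default values.
///
/// This is a simulated implementation: it constructs the `Batch` locally rather than calling the API.
///
/// # Errors
///
/// Currently always succeeds; the `Result` is kept for API compatibility.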
#[ inline ]
pub async fn create_batch_job( _client : &Client< OpenaiEnvironmentImpl >, request : Value ) -> Result< Batch >
{
let mut batch_request = json!( {
"input_file_id": "file-abc123",
"endpoint": "/v1/chat/completions",
"completion_window": "24h"
} );
if let Value::Object( ref obj ) = request
{
for ( key, value ) in obj
{
batch_request[ key ] = value.clone();
}
}
let batch = Batch
{
id : format!( "batch_{}", chrono::Utc::now().timestamp_millis() ),
object : "batch".to_string(),
endpoint : batch_request[ "endpoint" ].as_str().unwrap_or( "/v1/chat/completions" ).to_string(),
errors : None,
input_file_id : batch_request[ "input_file_id" ].as_str().unwrap_or( "file-abc123" ).to_string(),
completion_window : batch_request[ "completion_window" ].as_str().unwrap_or( "24h" ).to_string(),
status : "validating".to_string(),
output_file_id : None,
error_file_id : None,
created_at : chrono::Utc::now().timestamp(),
in_progress_at : None,
expires_at : Some( chrono::Utc::now().timestamp() + 86400 ),
finalizing_at : None,
completed_at : None,
failed_at : None,
expired_at : None,
cancelling_at : None,
cancelled_at : None,
request_counts : Some( BatchRequestCounts
{
total : 0,
completed : 0,
failed : 0,
} ),
metadata : batch_request.get( "metadata" ).and_then( | m | serde_json::from_value( m.clone() ).ok() ),
};
Ok( batch )
}
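/// Returns the status of the batch with the given id.
///
/// This is a simulated implementation that fabricates an in-progress batch.
///
/// # Errors
///
/// Currently always succeeds.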
#[ inline ]
pub async fn get_batch_status( _client : &Client< OpenaiEnvironmentImpl >, batch_id : &str ) -> Result< Batch >
{
let batch = Batch
{
id : batch_id.to_string(),
object : "batch".to_string(),
endpoint : "/v1/chat/completions".to_string(),
errors : None,
input_file_id : "file-abc123".to_string(),
completion_window : "24h".to_string(),
status : "in_progress".to_string(),
output_file_id : None,
error_file_id : None,
created_at : chrono::Utc::now().timestamp() - 3600,
in_progress_at : Some( chrono::Utc::now().timestamp() - 1800 ),
expires_at : Some( chrono::Utc::now().timestamp() + 82800 ),
finalizing_at : None,
completed_at : None,
failed_at : None,
expired_at : None,
cancelling_at : None,
cancelled_at : None,
request_counts : Some( BatchRequestCounts
{
total : 100,
completed : 75,
failed : 5,
} ),
metadata : None,
};
Ok( batch )
}
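/// Cancels a batch job, transitioning it through `cancelling` to `cancelled`.
///
/// # Errors
///
/// Returns an error if the batch status cannot be retrieved.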
#[ inline ]
pub async fn cancel_batch_job( client : &Client< OpenaiEnvironmentImpl >, batch_id : &str ) -> Result< Batch >
{
let mut batch = get_batch_status( client, batch_id ).await?;
batch.status = "cancelling".to_string();
batch.cancelling_at = Some( chrono::Utc::now().timestamp() );
// Simulate the brief transition from "cancelling" to "cancelled".
tokio::time::sleep( Duration::from_millis( 100 ) ).await;
batch.status = "cancelled".to_string();
batch.cancelled_at = Some( chrono::Utc::now().timestamp() );
Ok( batch )
}
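/// Lists up to `limit` batch jobs, newest first. The `_after` cursor is currently ignored.
///
/// This is a simulated implementation that fabricates batches in a mix of states.
///
/// # Errors
///
/// Currently always succeeds.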
#[ inline ]
pub async fn list_batch_jobs( _client : &Client< OpenaiEnvironmentImpl >, limit : i32, _after : Option< String > ) -> Result< ListBatchesResponse >
{
let mut batches = Vec::new();
for i in 0..limit
{
let batch = Batch
{
id : format!( "batch_{i}" ),
object : "batch".to_string(),
endpoint : "/v1/chat/completions".to_string(),
errors : None,
input_file_id : format!( "file-{i}" ),
completion_window : "24h".to_string(),
status : match i % 4
{
0 => "completed".to_string(),
1 => "in_progress".to_string(),
2 => "failed".to_string(),
_ => "validating".to_string(),
},
output_file_id : if i % 4 == 0 { Some( format!( "output-{i}" ) ) } else { None },
error_file_id : if i % 4 == 2 { Some( format!( "error-{i}" ) ) } else { None },
created_at : chrono::Utc::now().timestamp() - ( i64::from( i ) * 3600 ),
in_progress_at : if i % 4 == 3 { None } else { Some( chrono::Utc::now().timestamp() - ( i64::from( i ) * 3600 ) + 300 ) },
expires_at : Some( chrono::Utc::now().timestamp() + 86400 ),
finalizing_at : if i % 4 == 0 { Some( chrono::Utc::now().timestamp() - ( i64::from( i ) * 3600 ) + 3000 ) } else { None },
completed_at : if i % 4 == 0 { Some( chrono::Utc::now().timestamp() - ( i64::from( i ) * 3600 ) + 3600 ) } else { None },
failed_at : if i % 4 == 2 { Some( chrono::Utc::now().timestamp() - ( i64::from( i ) * 3600 ) + 1800 ) } else { None },
expired_at : None,
cancelling_at : None,
cancelled_at : None,
request_counts : Some( BatchRequestCounts
{
total : 50,
completed : if i % 4 == 0 { 50 } else { ( i64::from( i ) * 10 ).min( 45 ) },
failed : if i % 4 == 2 { 10 } else { 0 },
} ),
metadata : None,
};
batches.push( batch );
}
Ok( ListBatchesResponse
{
first_id : batches.first().map( | b | b.id.clone() ),
last_id : batches.last().map( | b | b.id.clone() ),
data : batches,
has_more : false,
object : "list".to_string(),
} )
}
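/// Processes requests in priority order, retrying failures with exponential backoff, and returns aggregate metrics.
///
/// Execution is simulated: each request succeeds with a probability derived from its priority.
///
/// # Errors
///
/// Currently always succeeds.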
#[ inline ]
pub async fn process_enhanced_batch( _client : &Client< OpenaiEnvironmentImpl >, requests : Vec< EnhancedBatchRequest > ) -> Result< BatchProcessingMetrics >
{
let start_time = std::time::Instant::now();
let total_requests = requests.len();
let mut successful_requests = 0;
let mut failed_requests = 0;
let mut sorted_requests = requests;
sorted_requests.sort_by( | a, b | {
let priority_order = | p : &BatchRequestPriority | match p
{
BatchRequestPriority::Critical => 0,
BatchRequestPriority::High => 1,
BatchRequestPriority::Normal => 2,
BatchRequestPriority::Low => 3,
};
priority_order( &a.priority ).cmp( &priority_order( &b.priority ) )
} );
for request in sorted_requests
{
let mut retries = 0;
let max_retries = request.retry_config.as_ref().map_or( BatchRecommended::max_retries(), | c | c.max_retries );
loop
{
let success_rate = match request.priority
{
BatchRequestPriority::Critical => 0.95,
BatchRequestPriority::High => 0.90,
BatchRequestPriority::Normal => 0.85,
BatchRequestPriority::Low => 0.80,
};
if rand::random::< f64 >() < success_rate
{
successful_requests += 1;
break;
}
else if retries < max_retries
{
retries += 1;
let delay = if let Some( config ) = &request.retry_config
{
let retries_i32 = i32::try_from( retries ).unwrap_or( i32::MAX );
let delay_ms = 1000.0 * config.backoff_multiplier.powi( retries_i32 );
#[ allow( clippy::cast_possible_truncation, clippy::cast_sign_loss, clippy::cast_precision_loss ) ]
let delay_u64 = delay_ms.max( 0.0 ).min( u64::MAX as f64 ).floor() as u64;
let delay = Duration::from_millis( delay_u64 );
delay.min( config.max_delay )
}
else
{
// Default exponential backoff: 1s, 2s, 4s, ... (saturating to avoid overflow).
Duration::from_millis( 1000_u64.saturating_mul( 2_u64.saturating_pow( retries ) ) )
};
tokio::time::sleep( delay ).await;
}
else
{
failed_requests += 1;
break;
}
}
}
let processing_time = start_time.elapsed();
let requests_per_second = if processing_time.as_secs_f64() > 0.0
{
total_requests as f64 / processing_time.as_secs_f64()
}
else
{
0.0
};
Ok( BatchProcessingMetrics
{
total_requests,
successful_requests,
failed_requests,
processing_time,
requests_per_second,
} )
}
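/// Splits the requests into chunks of `chunk_size` and processes each chunk, returning per-chunk metrics.
///
/// # Errors
///
/// Propagates any error from processing a chunk.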
#[ inline ]
pub async fn optimize_and_chunk_batch( client : &Client< OpenaiEnvironmentImpl >, requests : Vec< EnhancedBatchRequest >, chunk_size : usize ) -> Result< Vec< BatchProcessingMetrics > >
{
let mut results = Vec::new();
for chunk in requests.chunks( chunk_size )
{
let metrics = process_enhanced_batch( client, chunk.to_vec() ).await?;
results.push( metrics );
}
Ok( results )
}
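/// Processes multiple batch jobs concurrently, bounded by `max_concurrent` via a semaphore.
///
/// # Errors
///
/// Returns an error if any spawned task panics or is cancelled.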
#[ inline ]
pub async fn process_concurrent_batches( client : Arc< Client< OpenaiEnvironmentImpl > >, batches : Vec< BatchJobConfig >, max_concurrent : usize ) -> Result< Vec< BatchProcessingMetrics > >
{
let semaphore = Arc::new( Semaphore::new( max_concurrent ) );
let results = Arc::new( Mutex::new( Vec::new() ) );
let mut handles = Vec::new();
for ( index, batch_config ) in batches.into_iter().enumerate()
{
let client_clone = client.clone();
let semaphore_clone = semaphore.clone();
let results_clone = results.clone();
let handle = tokio::spawn( async move {
let _permit = semaphore_clone.acquire().await.unwrap();
let requests = vec![
EnhancedBatchRequest
{
custom_id : format!( "req_{index}_1" ),
method : "POST".to_string(),
url : batch_config.endpoint.clone(),
body : json!( { "model": "gpt-5-nano", "messages": [ { "role": "user", "content": "Hello" } ] } ),
priority : BatchRequestPriority::Normal,
retry_config : Some( BatchRetryConfig
{
max_retries : 3,
backoff_multiplier : 2.0,
max_delay : Duration::from_secs( 30 ),
} ),
}
];
let metrics = process_enhanced_batch( &client_clone, requests ).await.unwrap();
results_clone.lock().unwrap().push( metrics );
} );
handles.push( handle );
}
for handle in handles
{
handle.await.map_err( | e | error_tools::Error::from( OpenAIError::Internal( format!( "Concurrent processing failed: {e}" ) ) ) )?;
}
let final_results = results.lock().unwrap().clone();
Ok( final_results )
}
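/// Registers webhook monitoring for a batch.
///
/// This is a simulated implementation: the monitoring configuration is built but not sent anywhere.
///
/// # Errors
///
/// Currently always succeeds.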
#[ inline ]
pub async fn monitor_batch_with_webhooks( _client : &Client< OpenaiEnvironmentImpl >, batch_id : &str, webhook_url : &str ) -> Result< () >
{
let _monitoring_config = json!( {
"batch_id": batch_id,
"webhook_url": webhook_url,
"events": [ "batch.completed", "batch.failed", "batch.cancelled" ],
"monitoring_interval": 30
} );
tokio::time::sleep( Duration::from_millis( 100 ) ).await;
Ok( () )
}
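/// Aggregates a batch's request counts and timing into `BatchProcessingMetrics`.
///
/// # Errors
///
/// Returns an error if the batch status cannot be retrieved.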
#[ inline ]
pub async fn aggregate_batch_results( client : &Client< OpenaiEnvironmentImpl >, batch_id : &str ) -> Result< BatchProcessingMetrics >
{
let batch = get_batch_status( client, batch_id ).await?;
let request_counts = batch.request_counts.unwrap_or( BatchRequestCounts
{
total : 0,
completed : 0,
failed : 0,
} );
let total_requests = usize::try_from( request_counts.total ).unwrap_or( 0 );
let successful_requests = usize::try_from( request_counts.completed ).unwrap_or( 0 );
let failed_requests = usize::try_from( request_counts.failed ).unwrap_or( 0 );
let processing_time = if let ( Some( started ), Some( completed ) ) = ( batch.in_progress_at, batch.completed_at )
{
Duration::from_secs( u64::try_from( completed - started ).unwrap_or( 0 ) )
}
else
{
Duration::from_secs( 0 )
};
let requests_per_second = if processing_time.as_secs_f64() > 0.0
{
total_requests as f64 / processing_time.as_secs_f64()
}
else
{
0.0
};
Ok( BatchProcessingMetrics
{
total_requests,
successful_requests,
failed_requests,
processing_time,
requests_per_second,
} )
}
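/// Resubmits a failed batch as a new job, recording the retry lineage in the metadata.
///
/// # Errors
///
/// Returns an error if the original batch cannot be retrieved or the new job cannot be created.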
#[ inline ]
pub async fn retry_failed_batch( client : &Client< OpenaiEnvironmentImpl >, batch_id : &str, retry_config : BatchRetryConfig ) -> Result< Batch >
{
let original_batch = get_batch_status( client, batch_id ).await?;
let retry_request = json!( {
"input_file_id": original_batch.input_file_id,
"endpoint": original_batch.endpoint,
"completion_window": original_batch.completion_window,
"metadata": {
"retry_of": batch_id,
"retry_attempt": "1",
"max_retries": retry_config.max_retries.to_string(),
"backoff_multiplier": retry_config.backoff_multiplier.to_string()
}
} );
create_batch_job( client, retry_request ).await
}
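/// Reorders requests by endpoint and then by priority before processing, to improve locality.
///
/// # Errors
///
/// Propagates any error from `process_enhanced_batch`.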
#[ inline ]
pub async fn optimize_batch_performance( client : &Client< OpenaiEnvironmentImpl >, requests : Vec< EnhancedBatchRequest > ) -> Result< BatchProcessingMetrics >
{
let mut optimized_requests = requests;
optimized_requests.sort_by( | a, b | {
a.url.cmp( &b.url ).then_with( || {
let priority_order = | p : &BatchRequestPriority | match p
{
BatchRequestPriority::Critical => 0,
BatchRequestPriority::High => 1,
BatchRequestPriority::Normal => 2,
BatchRequestPriority::Low => 3,
};
priority_order( &a.priority ).cmp( &priority_order( &b.priority ) )
} )
} );
process_enhanced_batch( client, optimized_requests ).await
}
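/// Processes requests with a simple in-memory cache keyed by a checksum of the request body.
///
/// Execution is simulated: uncached requests succeed with a fixed probability.
///
/// # Errors
///
/// Currently always succeeds.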
#[ inline ]
pub async fn process_batch_with_caching( _client : &Client< OpenaiEnvironmentImpl >, requests : Vec< EnhancedBatchRequest > ) -> Result< BatchProcessingMetrics >
{
let cache = Arc::new( Mutex::new( HashMap::< String, Value >::new() ) );
let start_time = std::time::Instant::now();
let total_requests = requests.len();
let mut successful_requests = 0;
let mut failed_requests = 0;
let mut cache_hits = 0;
for request in requests
{
// Derive a cheap cache key: a checksum combining the body length with a
// wrapping sum of its character codes (wrapping avoids overflow panics in debug builds).
let body_str = serde_json::to_string( &request.body ).unwrap_or_default();
let char_sum = body_str.chars().fold( 0_u32, | acc, c | acc.wrapping_add( u32::from( c ) ) );
let len_u32 = u32::try_from( body_str.len() ).unwrap_or( u32::MAX );
let cache_key = format!( "{:x}", len_u32.saturating_add( char_sum ) );
{
let cache_guard = cache.lock().unwrap();
if cache_guard.contains_key( &cache_key )
{
successful_requests += 1;
cache_hits += 1;
continue;
}
}
// Simulate a 90% success rate for uncached requests.
if rand::random::< f64 >() < 0.9
{
successful_requests += 1;
let mut cache_guard = cache.lock().unwrap();
cache_guard.insert( cache_key, json!( { "cached_result": true } ) );
}
else
{
failed_requests += 1;
}
}
// Cache efficiency is computed here but not yet surfaced in the returned metrics.
let _cache_efficiency = if total_requests > 0 { f64::from( cache_hits ) / total_requests as f64 } else { 0.0 };
let processing_time = start_time.elapsed();
let requests_per_second = if processing_time.as_secs_f64() > 0.0
{
total_requests as f64 / processing_time.as_secs_f64()
}
else
{
0.0
};
Ok( BatchProcessingMetrics
{
total_requests,
successful_requests,
failed_requests,
processing_time,
requests_per_second,
} )
}
}
crate::mod_interface!
{
exposed use
{
create_batch_job,
get_batch_status,
cancel_batch_job,
list_batch_jobs,
process_enhanced_batch,
optimize_and_chunk_batch,
process_concurrent_batches,
monitor_batch_with_webhooks,
aggregate_batch_results,
retry_failed_batch,
optimize_batch_performance,
process_batch_with_caching,
EnhancedBatchRequest,
BatchRequestPriority,
BatchRetryConfig,
BatchJobConfig,
BatchProcessingMetrics,
BatchRecommended,
};
}