#![allow(unused_variables)]
#![allow(clippy::let_and_return)]
use futures::stream::{self, StreamExt};
use siumai::models;
use siumai::prelude::*;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tokio::time::sleep;
/// Entry point: prints a banner, runs each batch-processing demonstration in
/// turn, and prints a closing line.
///
/// The demonstrations are awaited one after another, so their console output
/// never interleaves. Returns `Ok(())` once all of them have run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("⚡ Batch Processing - High-volume concurrent processing\n");

    // Strategy 1: launch everything at once.
    demonstrate_basic_batch_processing().await;
    // Strategy 2: bounded concurrency with a delay before each request.
    demonstrate_rate_limited_processing().await;
    // Strategy 3: live progress output.
    demonstrate_progress_tracking().await;
    // Strategy 4: retries for failing tasks.
    demonstrate_error_handling_at_scale().await;
    // Strategy 5: fixed-size chunks.
    demonstrate_memory_efficient_processing().await;

    println!("\n✅ Batch processing examples completed!");
    Ok(())
}
/// Demonstrates the simplest batch strategy: start every task at once and wait
/// for all of them with `join_all`.
///
/// Prints the success/failure counts, the elapsed wall-clock time and the
/// resulting throughput. When no client can be created, only a notice is
/// printed.
async fn demonstrate_basic_batch_processing() {
    println!("🔄 Basic Batch Processing:\n");

    // Guard clause: without a client there is nothing to demonstrate.
    let client = match create_test_client().await {
        Ok(client) => client,
        Err(_) => {
            println!(" ⚠️ No client available for batch processing");
            println!();
            return;
        }
    };

    let tasks = create_sample_tasks(10);
    println!(" Processing {} tasks concurrently...", tasks.len());

    let started = Instant::now();
    // One future per task; each result is paired with the index of its task.
    let pending = tasks.into_iter().enumerate().map(|(index, task)| {
        let client = client.clone();
        async move {
            let outcome = process_single_task(client.as_ref(), &task).await;
            (index, outcome)
        }
    });
    let outcomes = futures::future::join_all(pending).await;
    let duration = started.elapsed();

    let successful = outcomes
        .iter()
        .filter(|(_, outcome)| outcome.is_ok())
        .count();
    let failed = outcomes.len() - successful;
    let throughput = outcomes.len() as f64 / duration.as_secs_f64();

    println!(" 📊 Results:");
    println!(" ✅ Successful: {successful}");
    println!(" ❌ Failed: {failed}");
    println!(" ⏱️ Total time: {duration:?}");
    println!(" 📈 Throughput: {:.2} tasks/second", throughput);
    println!(" ✅ Basic batch processing completed");
    println!();
}
/// Demonstrates throttled batch processing: 15 tasks are run through
/// `process_with_rate_limiting` with at most 3 in flight and a 500 ms pause
/// before each request.
///
/// Prints the success/failure counts, the elapsed time and the achieved
/// throughput. When no client can be created, only a notice is printed.
async fn demonstrate_rate_limited_processing() {
    println!("⏱️ Rate-Limited Processing:\n");

    let client = match create_test_client().await {
        Ok(client) => client,
        Err(_) => {
            println!(" ⚠️ No client available for rate-limited processing");
            println!();
            return;
        }
    };

    let tasks = create_sample_tasks(15);
    println!(" Processing {} tasks with rate limiting...", tasks.len());

    // Throttling knobs for this demo.
    let max_concurrent = 3;
    let delay_between_requests = Duration::from_millis(500);

    let started = Instant::now();
    let outcomes =
        process_with_rate_limiting(client, tasks, max_concurrent, delay_between_requests).await;
    let duration = started.elapsed();

    let successful = outcomes.iter().filter(|outcome| outcome.is_ok()).count();
    let failed = outcomes.len() - successful;
    let throughput = outcomes.len() as f64 / duration.as_secs_f64();

    println!(" 📊 Rate-Limited Results:");
    println!(" ✅ Successful: {successful}");
    println!(" ❌ Failed: {failed}");
    println!(" ⏱️ Total time: {duration:?}");
    println!(" 🐌 Controlled throughput: {:.2} tasks/second", throughput);
    println!(" ✅ Rate-limited processing completed");
    println!();
}
/// Demonstrates batch processing with live progress output: 20 tasks are run
/// through `process_with_progress_tracking`, then a summary with the success
/// rate is printed.
///
/// When no client can be created, only a notice is printed.
async fn demonstrate_progress_tracking() {
    println!("📊 Progress Tracking:\n");

    let client = match create_test_client().await {
        Ok(client) => client,
        Err(_) => {
            println!(" ⚠️ No client available for progress tracking");
            println!();
            return;
        }
    };

    let tasks = create_sample_tasks(20);
    println!(
        " Processing {} tasks with progress tracking...",
        tasks.len()
    );

    let outcomes = process_with_progress_tracking(client, tasks).await;

    let successful = outcomes.iter().filter(|outcome| outcome.is_ok()).count();
    let failed = outcomes.len() - successful;
    let success_rate = (successful as f64 / outcomes.len() as f64) * 100.0;

    // The leading newline separates the summary from the progress line.
    println!("\n 📊 Final Results:");
    println!(" ✅ Successful: {successful}");
    println!(" ❌ Failed: {failed}");
    println!(" 📈 Success rate: {:.1}%", success_rate);
    println!(" ✅ Progress tracking completed");
    println!();
}
/// Demonstrates resilient batch processing: 10 ordinary prompts plus 5
/// deliberately awkward ones are run through `process_with_error_handling`,
/// and the outcomes are summarised.
///
/// Every `Err` in the result set is reported as a failure. The result type
/// (`Result<String, LlmError>`) carries no attempt count, so a success that
/// needed a retry cannot be told apart from a first-try success. For that
/// reason no "retried and succeeded" figure is printed.
///
/// When no client can be created, only a notice is printed.
async fn demonstrate_error_handling_at_scale() {
    println!("🛡️ Error Handling at Scale:\n");
    if let Ok(client) = create_test_client().await {
        let mut tasks = create_sample_tasks(10);
        tasks.extend(create_error_prone_tasks(5));
        println!(
            " Processing {} tasks with robust error handling...",
            tasks.len()
        );

        let results = process_with_error_handling(client, tasks).await;

        // Counted the same way as in the other demos: an `Err` is a failure,
        // whatever its message says.
        let successful = results.iter().filter(|r| r.is_ok()).count();
        let failed = results.len() - successful;

        println!(" 📊 Error Handling Results:");
        println!(" ✅ Successful: {successful}");
        println!(" ❌ Permanently failed: {failed}");
        println!(" ✅ Error handling at scale completed");
    } else {
        println!(" ⚠️ No client available for error handling demo");
    }
    println!();
}
/// Demonstrates chunked processing: 50 tasks are handled in chunks of 10, so
/// only one chunk's prompts and results are held at a time.
///
/// Each chunk is generated, processed with `process_chunk`, tallied and then
/// followed by a 100 ms pause. A summary is printed after the last chunk.
/// When no client can be created, only a notice is printed.
async fn demonstrate_memory_efficient_processing() {
    println!("💾 Memory-Efficient Processing:\n");

    let client = match create_test_client().await {
        Ok(client) => client,
        Err(_) => {
            println!(" ⚠️ No client available for memory-efficient processing");
            println!();
            return;
        }
    };

    let total_tasks: usize = 50;
    let chunk_size: usize = 10;
    println!(" Processing {total_tasks} tasks in chunks of {chunk_size}...");

    let mut total_successful = 0;
    let mut total_failed = 0;
    let started = Instant::now();

    let mut chunk_start = 0;
    while chunk_start < total_tasks {
        // The final chunk is clipped to the total.
        let chunk_end = total_tasks.min(chunk_start + chunk_size);
        let chunk_tasks = create_sample_tasks(chunk_end - chunk_start);
        println!(" Processing chunk {}-{}...", chunk_start + 1, chunk_end);

        let chunk_results = process_chunk(client.clone(), chunk_tasks).await;
        let chunk_successful = chunk_results
            .iter()
            .filter(|outcome| outcome.is_ok())
            .count();
        let chunk_failed = chunk_results.len() - chunk_successful;
        total_successful += chunk_successful;
        total_failed += chunk_failed;
        println!(" Chunk results: {chunk_successful} ✅, {chunk_failed} ❌");

        // Short pause after every chunk, the last one included.
        sleep(Duration::from_millis(100)).await;
        chunk_start += chunk_size;
    }

    let duration = started.elapsed();
    println!("\n 📊 Memory-Efficient Results:");
    println!(" ✅ Total successful: {total_successful}");
    println!(" ❌ Total failed: {total_failed}");
    println!(" ⏱️ Total time: {duration:?}");
    println!(" 💾 Memory usage: Constant (chunked processing)");
    println!(" ✅ Memory-efficient processing completed");
    println!();
}
/// Builds a shared chat client from whichever API key is present in the
/// environment.
///
/// `OPENAI_API_KEY` is checked first, then `ANTHROPIC_API_KEY`. Both clients
/// are configured with a temperature of 0.7.
///
/// # Errors
///
/// Returns `LlmError::AuthenticationError` when neither variable is set, and
/// propagates any error from building the client.
async fn create_test_client() -> Result<Arc<dyn ChatCapability>, LlmError> {
    // OpenAI wins when both keys are set.
    if let Ok(openai_key) = std::env::var("OPENAI_API_KEY") {
        let openai = LlmBuilder::new()
            .openai()
            .api_key(&openai_key)
            .model(models::openai::GPT_4O_MINI)
            .temperature(0.7)
            .build()
            .await?;
        let shared: Arc<dyn ChatCapability> = Arc::new(openai);
        return Ok(shared);
    }

    if let Ok(anthropic_key) = std::env::var("ANTHROPIC_API_KEY") {
        let anthropic = LlmBuilder::new()
            .anthropic()
            .api_key(&anthropic_key)
            .model(models::anthropic::CLAUDE_HAIKU_3_5)
            .temperature(0.7)
            .build()
            .await?;
        let shared: Arc<dyn ChatCapability> = Arc::new(anthropic);
        return Ok(shared);
    }

    Err(LlmError::AuthenticationError(
        "No API key available".to_string(),
    ))
}
/// Generates `count` simple arithmetic prompts, numbered from 1.
///
/// Prompt `i` asks for the sum of `i` and `i + 1`. An empty vector is
/// returned when `count` is 0.
fn create_sample_tasks(count: usize) -> Vec<String> {
    let mut prompts = Vec::with_capacity(count);
    for number in 1..=count {
        let next = number + 1;
        prompts.push(format!(
            "Task {}: What is {}+{}? Answer briefly.",
            number, number, next
        ));
    }
    prompts
}
/// Generates `count` oversized prompts, numbered from 1.
///
/// Each prompt carries a marker sentence followed by 1000 `x` characters.
/// An empty vector is returned when `count` is 0.
fn create_error_prone_tasks(count: usize) -> Vec<String> {
    // The padding is the same for every prompt, so build it once.
    let padding = "x".repeat(1000);
    let mut prompts = Vec::with_capacity(count);
    for number in 1..=count {
        prompts.push(format!(
            "Error task {}: [This might cause an error] Process this: {}",
            number, padding
        ));
    }
    prompts
}
/// Sends `task` to the model as a single user message and returns the text of
/// the reply.
///
/// A reply with no text content yields an empty string.
///
/// # Errors
///
/// Propagates any error returned by `client.chat`.
async fn process_single_task(client: &dyn ChatCapability, task: &str) -> Result<String, LlmError> {
    let reply = client.chat(vec![user!(task)]).await?;
    let text = reply.content_text().unwrap_or_default();
    Ok(text.to_string())
}
/// Processes `tasks` with bounded concurrency and a fixed pause before every
/// request.
///
/// # Parameters
///
/// * `client` - shared chat client used for every request.
/// * `tasks` - prompts to process.
/// * `max_concurrent` - upper bound on requests in flight. A value of 0 is
///   treated as 1, because a zero limit could never start a task and the
///   returned future would never complete.
/// * `delay_between_requests` - how long each task sleeps, after acquiring its
///   permit, before sending its request.
///
/// # Returns
///
/// One `Result` per task. The stream is driven with `buffer_unordered`, so the
/// results arrive in completion order, which is not necessarily the order of
/// `tasks`.
async fn process_with_rate_limiting(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
    max_concurrent: usize,
    delay_between_requests: Duration,
) -> Vec<Result<String, LlmError>> {
    // Guard against a zero limit: `Semaphore::new(0)` never grants a permit
    // and `buffer_unordered(0)` never polls the source stream.
    let max_concurrent = max_concurrent.max(1);
    let semaphore = Arc::new(Semaphore::new(max_concurrent));

    stream::iter(tasks)
        .map(|task| {
            let client = client.clone();
            let semaphore = semaphore.clone();
            async move {
                // The permit is held until this task finishes, so the sleep
                // counts against the concurrency budget as well.
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("semaphore is local to this function and is never closed");
                sleep(delay_between_requests).await;
                process_single_task(client.as_ref(), &task).await
            }
        })
        .buffer_unordered(max_concurrent)
        .collect()
        .await
}
async fn process_with_progress_tracking(
client: Arc<dyn ChatCapability>,
tasks: Vec<String>,
) -> Vec<Result<String, LlmError>> {
let total = tasks.len();
let mut completed = 0;
let futures: Vec<_> = tasks
.into_iter()
.map(|task| {
let client = client.clone();
async move {
let result = process_single_task(client.as_ref(), &task).await;
result
}
})
.collect();
let mut results = Vec::new();
for future in futures {
let result = future.await;
completed += 1;
let progress = (completed as f64 / total as f64) * 100.0;
print!("\r Progress: {progress:.1}% ({completed}/{total})");
std::io::Write::flush(&mut std::io::stdout()).unwrap();
results.push(result);
}
println!(); results
}
/// Processes all `tasks` concurrently, retrying each failing task.
///
/// Every task gets up to three attempts. After a failed first attempt it waits
/// 100 ms, and after a failed second attempt 200 ms. The error from the third
/// attempt is returned as that task's result.
///
/// # Returns
///
/// One `Result` per task, in the same order as `tasks`.
async fn process_with_error_handling(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
) -> Vec<Result<String, LlmError>> {
    const MAX_ATTEMPTS: u64 = 3;

    let pending = tasks.into_iter().map(|prompt| {
        let worker = client.clone();
        async move {
            let mut attempt: u64 = 1;
            loop {
                match process_single_task(worker.as_ref(), &prompt).await {
                    Ok(answer) => break Ok(answer),
                    // Out of attempts: surface the last error.
                    Err(error) if attempt >= MAX_ATTEMPTS => break Err(error),
                    Err(_) => {
                        // The wait grows linearly with the attempt number.
                        sleep(Duration::from_millis(100 * attempt)).await;
                        attempt += 1;
                    }
                }
            }
        }
    });

    futures::future::join_all(pending).await
}
/// Processes one chunk of `tasks` concurrently and waits for all of them.
///
/// # Returns
///
/// One `Result` per task, in the same order as `tasks`.
async fn process_chunk(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
) -> Vec<Result<String, LlmError>> {
    // Each future owns its own handle to the shared client.
    let pending = tasks.into_iter().map(|prompt| {
        let worker = client.clone();
        async move { process_single_task(worker.as_ref(), &prompt).await }
    });
    futures::future::join_all(pending).await
}