siumai 0.10.3

A unified LLM interface library for Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
//! ⚡ Batch Processing - High-volume concurrent processing
//!
//! This example demonstrates how to process large numbers of requests efficiently:
//! - Concurrent request handling with proper rate limiting
//! - Progress tracking and monitoring

#![allow(unused_variables)]
#![allow(clippy::let_and_return)]
//! - Error handling and retry strategies at scale
//! - Memory management for large batches
//! - Performance optimization techniques
//!
//! Before running, set your API keys:
//! ```bash
//! export OPENAI_API_KEY="your-key"
//! export ANTHROPIC_API_KEY="your-key"
//! ```
//!
//! Run with:
//! ```bash
//! cargo run --example batch_processing
//! ```

use futures::stream::{self, StreamExt};
use siumai::models;
use siumai::prelude::*;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Semaphore;
use tokio::time::sleep;

/// Entry point: runs every batch-processing demonstration in sequence.
///
/// Each demonstration is awaited to completion before the next one starts and
/// returns nothing, so this function always finishes with `Ok(())`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("⚡ Batch Processing - High-volume concurrent processing\n");

    // Run the demonstrations one after another, from the simplest fan-out
    // to chunked processing.
    demonstrate_basic_batch_processing().await;
    demonstrate_rate_limited_processing().await;
    demonstrate_progress_tracking().await;
    demonstrate_error_handling_at_scale().await;
    demonstrate_memory_efficient_processing().await;

    println!("\n✅ Batch processing examples completed!");
    Ok(())
}

/// Demonstrate basic concurrent batch processing
async fn demonstrate_basic_batch_processing() {
    println!("🔄 Basic Batch Processing:\n");

    if let Ok(client) = create_test_client().await {
        // Create a batch of tasks
        let tasks = create_sample_tasks(10);

        println!("   Processing {} tasks concurrently...", tasks.len());
        let start_time = Instant::now();

        // Process all tasks concurrently
        let futures: Vec<_> = tasks
            .into_iter()
            .enumerate()
            .map(|(i, task)| {
                let client = client.clone();
                async move {
                    let result = process_single_task(client.as_ref(), &task).await;
                    (i, result)
                }
            })
            .collect();

        let results = futures::future::join_all(futures).await;
        let duration = start_time.elapsed();

        // Analyze results
        let successful = results.iter().filter(|(_, r)| r.is_ok()).count();
        let failed = results.len() - successful;

        println!("   📊 Results:");
        println!("      ✅ Successful: {successful}");
        println!("      ❌ Failed: {failed}");
        println!("      ⏱️  Total time: {duration:?}");
        println!(
            "      📈 Throughput: {:.2} tasks/second",
            results.len() as f64 / duration.as_secs_f64()
        );

        println!("   ✅ Basic batch processing completed");
    } else {
        println!("   ⚠️  No client available for batch processing");
    }

    println!();
}

/// Runs a batch through the rate-limited pipeline and reports its throughput.
///
/// Fifteen sample prompts are handed to `process_with_rate_limiting` with at
/// most three requests in flight and a 500 ms pause applied to every request.
/// When no client can be created a warning is printed instead.
async fn demonstrate_rate_limited_processing() {
    println!("⏱️ Rate-Limited Processing:\n");

    match create_test_client().await {
        Ok(client) => {
            let tasks = create_sample_tasks(15);

            println!("   Processing {} tasks with rate limiting...", tasks.len());
            let start_time = Instant::now();

            // At most three requests at once, each preceded by a 500 ms pause.
            let concurrency_cap = 3;
            let per_request_pause = Duration::from_millis(500);
            let results =
                process_with_rate_limiting(client, tasks, concurrency_cap, per_request_pause)
                    .await;
            let duration = start_time.elapsed();

            let failed = results.iter().filter(|outcome| outcome.is_err()).count();
            let successful = results.len() - failed;
            let throughput = results.len() as f64 / duration.as_secs_f64();

            println!("   📊 Rate-Limited Results:");
            println!("      ✅ Successful: {successful}");
            println!("      ❌ Failed: {failed}");
            println!("      ⏱️  Total time: {duration:?}");
            println!(
                "      🐌 Controlled throughput: {:.2} tasks/second",
                throughput
            );

            println!("   ✅ Rate-limited processing completed");
        }
        Err(_) => println!("   ⚠️  No client available for rate-limited processing"),
    }

    println!();
}

/// Processes a batch while a live progress line is printed.
///
/// Twenty sample prompts are passed to `process_with_progress_tracking`;
/// afterwards the success/failure counts and the success rate are printed.
/// When no client can be created a warning is printed instead.
async fn demonstrate_progress_tracking() {
    println!("📊 Progress Tracking:\n");

    match create_test_client().await {
        Err(_) => println!("   ⚠️  No client available for progress tracking"),
        Ok(client) => {
            let tasks = create_sample_tasks(20);

            println!(
                "   Processing {} tasks with progress tracking...",
                tasks.len()
            );

            let results = process_with_progress_tracking(client, tasks).await;

            let failed = results.iter().filter(|outcome| outcome.is_err()).count();
            let successful = results.len() - failed;
            let success_rate = (successful as f64 / results.len() as f64) * 100.0;

            println!("\n   📊 Final Results:");
            println!("      ✅ Successful: {successful}");
            println!("      ❌ Failed: {failed}");
            println!("      📈 Success rate: {:.1}%", success_rate);

            println!("   ✅ Progress tracking completed");
        }
    }

    println!();
}

/// Demonstrate error handling at scale
///
/// Mixes ten ordinary prompts with five oversized "error-prone" prompts, runs
/// them through `process_with_error_handling`, and prints how the final
/// results break down. When no client can be created a warning is printed
/// instead.
///
/// Only final results are available here, so the summary cannot tell how many
/// tasks needed a retry before succeeding. Failures are split by whether the
/// error text mentions "retry".
async fn demonstrate_error_handling_at_scale() {
    println!("🛡️ Error Handling at Scale:\n");

    if let Ok(client) = create_test_client().await {
        // Create tasks with some that will likely fail
        let mut tasks = create_sample_tasks(10);
        tasks.extend(create_error_prone_tasks(5));

        println!(
            "   Processing {} tasks with robust error handling...",
            tasks.len()
        );

        let results = process_with_error_handling(client, tasks).await;

        // Categorize the final results. Every `Err` is a failure; the split
        // is by whether the error message itself mentions retrying.
        let mut successful = 0;
        let mut retry_related = 0;
        let mut failed = 0;

        for result in &results {
            match result {
                Ok(_) => successful += 1,
                Err(e) if e.to_string().contains("retry") => retry_related += 1,
                Err(_) => failed += 1,
            }
        }

        println!("   📊 Error Handling Results:");
        println!("      ✅ Successful: {successful}");
        println!("      🔄 Failed with a retry-related error: {retry_related}");
        println!("      ❌ Permanently failed: {failed}");

        println!("   ✅ Error handling at scale completed");
    } else {
        println!("   ⚠️  No client available for error handling demo");
    }

    println!();
}

/// Demonstrate memory-efficient processing for large batches
///
/// Works through 50 tasks in chunks of 10. The prompts and results of a chunk
/// live only inside one loop iteration, so only the running totals are carried
/// between chunks. Each chunk regenerates its prompts with
/// `create_sample_tasks`, which numbers them from 1 every time. When no client
/// can be created a warning is printed instead.
async fn demonstrate_memory_efficient_processing() {
    println!("💾 Memory-Efficient Processing:\n");

    if let Ok(client) = create_test_client().await {
        let total_tasks = 50;
        let chunk_size = 10;

        println!("   Processing {total_tasks} tasks in chunks of {chunk_size}...");

        let mut total_successful = 0;
        let mut total_failed = 0;
        let start_time = Instant::now();

        // Process in chunks to manage memory
        for chunk_start in (0..total_tasks).step_by(chunk_size) {
            // The last chunk may be shorter when the total is not a multiple
            // of the chunk size.
            let chunk_end = (chunk_start + chunk_size).min(total_tasks);
            let chunk_tasks = create_sample_tasks(chunk_end - chunk_start);

            println!("   Processing chunk {}-{}...", chunk_start + 1, chunk_end);

            let chunk_results = process_chunk(client.clone(), chunk_tasks).await;

            let chunk_successful = chunk_results.iter().filter(|r| r.is_ok()).count();
            let chunk_failed = chunk_results.len() - chunk_successful;

            total_successful += chunk_successful;
            total_failed += chunk_failed;

            // Label both counts so the failure number is not left unmarked.
            println!("      Chunk results: {chunk_successful} ✅, {chunk_failed} ❌");

            // Small delay between chunks to be respectful
            sleep(Duration::from_millis(100)).await;
        }

        let duration = start_time.elapsed();

        println!("\n   📊 Memory-Efficient Results:");
        println!("      ✅ Total successful: {total_successful}");
        println!("      ❌ Total failed: {total_failed}");
        println!("      ⏱️  Total time: {duration:?}");
        println!("      💾 Memory usage: Constant (chunked processing)");

        println!("   ✅ Memory-efficient processing completed");
    } else {
        println!("   ⚠️  No client available for memory-efficient processing");
    }

    println!();
}

/// Builds a shared chat client from whichever API key is available.
///
/// `OPENAI_API_KEY` is tried first, then `ANTHROPIC_API_KEY`.
///
/// # Errors
///
/// Returns the builder's error when construction fails for the provider whose
/// key was found (the other provider is not tried), or
/// `LlmError::AuthenticationError` when neither variable can be read.
async fn create_test_client() -> Result<Arc<dyn ChatCapability>, LlmError> {
    if let Ok(openai_key) = std::env::var("OPENAI_API_KEY") {
        let openai = LlmBuilder::new()
            .openai()
            .api_key(&openai_key)
            .model(models::openai::GPT_4O_MINI)
            .temperature(0.7)
            .build()
            .await?;
        let shared: Arc<dyn ChatCapability> = Arc::new(openai);
        return Ok(shared);
    }

    if let Ok(anthropic_key) = std::env::var("ANTHROPIC_API_KEY") {
        let anthropic = LlmBuilder::new()
            .anthropic()
            .api_key(&anthropic_key)
            .model(models::anthropic::CLAUDE_HAIKU_3_5)
            .temperature(0.7)
            .build()
            .await?;
        let shared: Arc<dyn ChatCapability> = Arc::new(anthropic);
        return Ok(shared);
    }

    Err(LlmError::AuthenticationError(String::from(
        "No API key available",
    )))
}

/// Generates `count` short arithmetic prompts, numbered from 1.
///
/// Prompt `n` asks for the sum of `n` and `n + 1`. A `count` of zero yields an
/// empty vector.
fn create_sample_tasks(count: usize) -> Vec<String> {
    let describe = |n: usize| format!("Task {n}: What is {n}+{}? Answer briefly.", n + 1);
    (1..=count).map(describe).collect()
}

/// Generates `count` oversized prompts intended to provoke failures.
///
/// Every prompt carries the same 1000-character run of `x` after a numbered
/// header. A `count` of zero yields an empty vector.
fn create_error_prone_tasks(count: usize) -> Vec<String> {
    // The payload is identical for every task, so it is built only once.
    let padding = "x".repeat(1000);
    (1..=count)
        .map(|n| format!("Error task {n}: [This might cause an error] Process this: {padding}"))
        .collect()
}

/// Sends one prompt as a single user message and returns the reply text.
///
/// When `content_text()` yields nothing, the default (empty) text is returned.
///
/// # Errors
///
/// Propagates any error returned by `client.chat`.
async fn process_single_task(client: &dyn ChatCapability, task: &str) -> Result<String, LlmError> {
    let reply = client.chat(vec![user!(task)]).await?;
    let text = reply.content_text().unwrap_or_default();
    Ok(text.to_string())
}

/// Process tasks with rate limiting
///
/// Runs every task through `process_single_task` while keeping at most
/// `max_concurrent` requests in flight. Each request first takes a semaphore
/// permit and then sleeps for `delay_between_requests` before being sent, so
/// the delay is a fixed pause per request rather than a global gap between
/// consecutive requests.
///
/// `max_concurrent` is clamped to the range `1..=tasks.len()`. A limit of zero
/// would otherwise never resolve, and a limit above the task count has no
/// effect.
///
/// Results are returned in completion order (`buffer_unordered`), which may
/// differ from the order of `tasks`.
async fn process_with_rate_limiting(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
    max_concurrent: usize,
    delay_between_requests: Duration,
) -> Vec<Result<String, LlmError>> {
    // Zero permits plus a zero-sized buffer would leave every task waiting
    // forever. More slots than tasks are never used, and capping them keeps
    // the permit count small.
    let max_concurrent = max_concurrent.clamp(1, tasks.len().max(1));
    let semaphore = Arc::new(Semaphore::new(max_concurrent));

    stream::iter(tasks)
        .map(|task| {
            let client = client.clone();
            let semaphore = semaphore.clone();
            async move {
                // The permit is held until this task finishes.
                let _permit = semaphore
                    .acquire()
                    .await
                    .expect("semaphore is never closed while tasks are running");
                sleep(delay_between_requests).await;
                process_single_task(client.as_ref(), &task).await
            }
        })
        .buffer_unordered(max_concurrent)
        .collect()
        .await
}

/// Process tasks with progress tracking
async fn process_with_progress_tracking(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
) -> Vec<Result<String, LlmError>> {
    let total = tasks.len();
    let mut completed = 0;

    let futures: Vec<_> = tasks
        .into_iter()
        .map(|task| {
            let client = client.clone();
            async move {
                let result = process_single_task(client.as_ref(), &task).await;
                result
            }
        })
        .collect();

    let mut results = Vec::new();

    for future in futures {
        let result = future.await;
        completed += 1;

        let progress = (completed as f64 / total as f64) * 100.0;
        print!("\r   Progress: {progress:.1}% ({completed}/{total})");
        std::io::Write::flush(&mut std::io::stdout()).unwrap();

        results.push(result);
    }

    println!(); // New line after progress
    results
}

/// Runs every task concurrently, retrying each one on failure.
///
/// A task is attempted up to three times. After a failed first or second
/// attempt the task waits 100 ms times the attempt number before trying again;
/// the error from the third attempt is returned as-is. Every error is retried
/// the same way, whatever its kind. Only the final result of each task is
/// returned, in the same order as `tasks`.
async fn process_with_error_handling(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
) -> Vec<Result<String, LlmError>> {
    const MAX_ATTEMPTS: u64 = 3;
    const BACKOFF_STEP_MS: u64 = 100;

    let with_retries = tasks.into_iter().map(|prompt| {
        let worker = Arc::clone(&client);
        async move {
            let mut attempt = 1;
            loop {
                match process_single_task(worker.as_ref(), &prompt).await {
                    Ok(reply) => break Ok(reply),
                    // Out of attempts: hand the last error back to the caller.
                    Err(final_error) if attempt >= MAX_ATTEMPTS => break Err(final_error),
                    Err(_) => {
                        // Linear backoff before the next attempt.
                        sleep(Duration::from_millis(BACKOFF_STEP_MS * attempt)).await;
                        attempt += 1;
                    }
                }
            }
        }
    });

    futures::future::join_all(with_retries).await
}

/// Runs all tasks of one chunk concurrently and waits for every result.
///
/// Results are returned in the same order as `tasks`.
async fn process_chunk(
    client: Arc<dyn ChatCapability>,
    tasks: Vec<String>,
) -> Vec<Result<String, LlmError>> {
    let requests = tasks.into_iter().map(|prompt| {
        let worker = Arc::clone(&client);
        async move { process_single_task(worker.as_ref(), &prompt).await }
    });

    futures::future::join_all(requests).await
}

/*
🎯 Key Batch Processing Concepts:

Concurrency Control:
- Use Semaphore to limit concurrent requests
- Buffer streams to control memory usage
- Implement proper backpressure handling

Rate Limiting:
- Respect provider rate limits
- Add delays between requests
- Use exponential backoff for retries
- Monitor and adjust based on errors

Error Handling:
- Retry transient errors with backoff
- Categorize and handle different error types
- Implement circuit breakers for persistent failures
- Log errors for monitoring and debugging

Memory Management:
- Process in chunks for large datasets
- Use streaming instead of collecting all results
- Clean up resources promptly
- Monitor memory usage in production

Performance Optimization:
- Balance concurrency vs. rate limits
- Use connection pooling when available
- Implement caching for repeated requests
- Monitor and optimize based on metrics

Best Practices:
1. Start with conservative rate limits
2. Implement comprehensive error handling
3. Monitor progress and performance
4. Use chunking for very large batches
5. Respect provider terms of service
6. Implement proper logging and monitoring

Production Considerations:
- Cost monitoring and budgets
- Error alerting and escalation
- Performance metrics and SLAs
- Graceful shutdown and cleanup
- Resource scaling and management

Next Steps:
- custom_configurations.rs: Optimize for specific use cases
- ../04_providers/: Provider-specific batch optimizations
- ../05_use_cases/: Real-world batch processing applications
*/