ai_lib_rust/batch/
executor.rs1use std::time::{Duration, Instant};
4use super::collector::BatchItem;
5
6#[derive(Debug, Clone)]
7pub struct BatchResult<T, E> { pub successes: Vec<(usize, T)>, pub failures: Vec<(usize, E)>, pub execution_time: Duration, pub total_processed: usize }
8
9impl<T, E> BatchResult<T, E> {
10 pub fn new() -> Self { Self { successes: Vec::new(), failures: Vec::new(), execution_time: Duration::ZERO, total_processed: 0 } }
11 pub fn add_success(&mut self, i: usize, r: T) { self.successes.push((i, r)); }
12 pub fn add_failure(&mut self, i: usize, e: E) { self.failures.push((i, e)); }
13 pub fn all_succeeded(&self) -> bool { self.failures.is_empty() }
14 pub fn success_count(&self) -> usize { self.successes.len() }
15 pub fn failure_count(&self) -> usize { self.failures.len() }
16 pub fn success_rate(&self) -> f64 { if self.total_processed == 0 { 0.0 } else { self.successes.len() as f64 / self.total_processed as f64 } }
17}
18impl<T, E> Default for BatchResult<T, E> { fn default() -> Self { Self::new() } }
19
20#[derive(Debug, Clone)]
21pub struct BatchError { pub message: String, pub index: usize, pub retryable: bool }
22impl BatchError { pub fn new(msg: impl Into<String>, idx: usize) -> Self { Self { message: msg.into(), index: idx, retryable: false } } pub fn retryable(mut self) -> Self { self.retryable = true; self } }
23impl std::fmt::Display for BatchError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Batch error at {}: {}", self.index, self.message) } }
24impl std::error::Error for BatchError {}
25
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum BatchStrategy { Parallel, Sequential, Concurrent { max_concurrency: usize } }
28impl Default for BatchStrategy { fn default() -> Self { BatchStrategy::Concurrent { max_concurrency: 5 } } }
29
30#[derive(Debug, Clone)]
31pub struct BatchExecutorConfig { pub strategy: BatchStrategy, pub continue_on_error: bool, pub item_timeout: Option<Duration>, pub max_retries: u32 }
32impl Default for BatchExecutorConfig { fn default() -> Self { Self { strategy: BatchStrategy::default(), continue_on_error: true, item_timeout: Some(Duration::from_secs(60)), max_retries: 2 } } }
33impl BatchExecutorConfig {
34 pub fn new() -> Self { Self::default() }
35 pub fn with_strategy(mut self, s: BatchStrategy) -> Self { self.strategy = s; self }
36 pub fn with_continue_on_error(mut self, c: bool) -> Self { self.continue_on_error = c; self }
37}
38
39pub struct BatchExecutor { config: BatchExecutorConfig }
40impl BatchExecutor {
41 pub fn new() -> Self { Self { config: BatchExecutorConfig::default() } }
42 pub fn with_config(config: BatchExecutorConfig) -> Self { Self { config } }
43 pub fn config(&self) -> &BatchExecutorConfig { &self.config }
44
45 pub async fn execute_sequential<T, R, E, F, Fut>(&self, items: Vec<BatchItem<T>>, executor_fn: F) -> BatchResult<R, BatchError>
46 where F: Fn(T) -> Fut, Fut: std::future::Future<Output = std::result::Result<R, E>>, E: std::fmt::Display {
47 let start = Instant::now();
48 let total = items.len();
49 let mut result = BatchResult::new();
50 for (i, item) in items.into_iter().enumerate() {
51 match executor_fn(item.data).await {
52 Ok(r) => result.add_success(i, r),
53 Err(e) => { result.add_failure(i, BatchError::new(e.to_string(), i)); if !self.config.continue_on_error { break; } }
54 }
55 }
56 result.execution_time = start.elapsed();
57 result.total_processed = total;
58 result
59 }
60}
61impl Default for BatchExecutor { fn default() -> Self { Self::new() } }