
ai_lib_rust/batch/executor.rs

//! Batch executor.

use std::time::{Duration, Instant};
use super::collector::BatchItem;

/// Aggregated outcome of a batch run: indexed successes and failures plus timing.
#[derive(Debug, Clone)]
pub struct BatchResult<T, E> {
    pub successes: Vec<(usize, T)>,
    pub failures: Vec<(usize, E)>,
    pub execution_time: Duration,
    pub total_processed: usize,
}

impl<T, E> BatchResult<T, E> {
    pub fn new() -> Self {
        Self { successes: Vec::new(), failures: Vec::new(), execution_time: Duration::ZERO, total_processed: 0 }
    }
    pub fn add_success(&mut self, i: usize, r: T) { self.successes.push((i, r)); }
    pub fn add_failure(&mut self, i: usize, e: E) { self.failures.push((i, e)); }
    pub fn all_succeeded(&self) -> bool { self.failures.is_empty() }
    pub fn success_count(&self) -> usize { self.successes.len() }
    pub fn failure_count(&self) -> usize { self.failures.len() }
    /// Fraction of processed items that succeeded; 0.0 when nothing was processed.
    pub fn success_rate(&self) -> f64 {
        if self.total_processed == 0 { 0.0 } else { self.successes.len() as f64 / self.total_processed as f64 }
    }
}

impl<T, E> Default for BatchResult<T, E> {
    fn default() -> Self { Self::new() }
}

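// Sketch only (not part of the original file): BatchResult bookkeeping on its
// own, outside any executor, using illustrative values.
#[allow(dead_code)]
fn example_batch_result() {
    let mut result: BatchResult<&str, BatchError> = BatchResult::new();
    result.add_success(0, "ok");
    result.add_failure(1, BatchError::new("failed", 1));
    result.total_processed = 2;
    assert!(!result.all_succeeded());
    assert_eq!(result.success_rate(), 0.5);
}
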
/// Error produced for a single batch item, tagged with the item's index.
#[derive(Debug, Clone)]
pub struct BatchError {
    pub message: String,
    pub index: usize,
    pub retryable: bool,
}

impl BatchError {
    pub fn new(msg: impl Into<String>, idx: usize) -> Self {
        Self { message: msg.into(), index: idx, retryable: false }
    }
    /// Marks the error as retryable.
    pub fn retryable(mut self) -> Self { self.retryable = true; self }
}

impl std::fmt::Display for BatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Batch error at {}: {}", self.index, self.message)
    }
}

impl std::error::Error for BatchError {}

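// Sketch only (not part of the original file): constructing a retryable error
// and checking its Display output.
#[allow(dead_code)]
fn example_batch_error() {
    let err = BatchError::new("rate limited", 3).retryable();
    assert!(err.retryable);
    assert_eq!(err.to_string(), "Batch error at 3: rate limited");
}
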
/// How items are scheduled: all at once, one at a time, or with a concurrency cap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BatchStrategy {
    Parallel,
    Sequential,
    Concurrent { max_concurrency: usize },
}

impl Default for BatchStrategy {
    fn default() -> Self { BatchStrategy::Concurrent { max_concurrency: 5 } }
}

#[derive(Debug, Clone)]
pub struct BatchExecutorConfig {
    pub strategy: BatchStrategy,
    pub continue_on_error: bool,
    pub item_timeout: Option<Duration>,
    pub max_retries: u32,
}

impl Default for BatchExecutorConfig {
    fn default() -> Self {
        Self { strategy: BatchStrategy::default(), continue_on_error: true, item_timeout: Some(Duration::from_secs(60)), max_retries: 2 }
    }
}

impl BatchExecutorConfig {
    pub fn new() -> Self { Self::default() }
    pub fn with_strategy(mut self, s: BatchStrategy) -> Self { self.strategy = s; self }
    pub fn with_continue_on_error(mut self, c: bool) -> Self { self.continue_on_error = c; self }
}

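// Sketch only (not part of the original file): a config that keeps the default
// timeout and retry settings but runs items one at a time and stops at the
// first failure.
#[allow(dead_code)]
fn strict_sequential_config() -> BatchExecutorConfig {
    BatchExecutorConfig::new()
        .with_strategy(BatchStrategy::Sequential)
        .with_continue_on_error(false)
}
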
pub struct BatchExecutor {
    config: BatchExecutorConfig,
}

impl BatchExecutor {
    pub fn new() -> Self { Self { config: BatchExecutorConfig::default() } }
    pub fn with_config(config: BatchExecutorConfig) -> Self { Self { config } }
    pub fn config(&self) -> &BatchExecutorConfig { &self.config }

    /// Runs the items one at a time, recording each success or failure by index.
    /// Stops at the first failure unless `continue_on_error` is set; note that
    /// `total_processed` is set to the full batch size even when execution stops early.
    pub async fn execute_sequential<T, R, E, F, Fut>(
        &self,
        items: Vec<BatchItem<T>>,
        executor_fn: F,
    ) -> BatchResult<R, BatchError>
    where
        F: Fn(T) -> Fut,
        Fut: std::future::Future<Output = std::result::Result<R, E>>,
        E: std::fmt::Display,
    {
        let start = Instant::now();
        let total = items.len();
        let mut result = BatchResult::new();
        for (i, item) in items.into_iter().enumerate() {
            match executor_fn(item.data).await {
                Ok(r) => result.add_success(i, r),
                Err(e) => {
                    result.add_failure(i, BatchError::new(e.to_string(), i));
                    if !self.config.continue_on_error {
                        break;
                    }
                }
            }
        }
        result.execution_time = start.elapsed();
        result.total_processed = total;
        result
    }
}

impl Default for BatchExecutor {
    fn default() -> Self { Self::new() }
}
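
// Sketch only (not part of the original file): end-to-end use of the sequential
// path. Assumes the caller already has a Vec<BatchItem<u32>> from the collector
// module; only BatchItem's `data` field is relied on, and the "fail on even
// inputs" rule in the closure is purely illustrative.
#[allow(dead_code)]
async fn example_sequential_run(items: Vec<BatchItem<u32>>) -> BatchResult<u32, BatchError> {
    let executor = BatchExecutor::with_config(
        BatchExecutorConfig::new().with_strategy(BatchStrategy::Sequential),
    );
    executor
        .execute_sequential(items, |n| async move {
            if n % 2 == 0 { Err(format!("even input {} rejected", n)) } else { Ok(n * 10) }
        })
        .await
}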