ai_lib_rust/batch/
collector.rs1use std::collections::VecDeque;
4use std::sync::{Arc, RwLock};
5use std::time::{Duration, Instant};
6
7#[derive(Debug, Clone)]
8pub struct BatchConfig { pub max_batch_size: usize, pub max_wait_time: Duration, pub auto_flush: bool }
9impl Default for BatchConfig { fn default() -> Self { Self { max_batch_size: 10, max_wait_time: Duration::from_secs(5), auto_flush: true } } }
10impl BatchConfig {
11 pub fn new() -> Self { Self::default() }
12 pub fn with_max_batch_size(mut self, s: usize) -> Self { self.max_batch_size = s; self }
13 pub fn with_auto_flush(mut self, a: bool) -> Self { self.auto_flush = a; self }
14}
15
16#[derive(Debug, Clone)]
17pub struct BatchItem<T> { pub data: T, pub added_at: Instant, pub request_id: Option<String>, pub priority: i32 }
18impl<T> BatchItem<T> {
19 pub fn new(data: T) -> Self { Self { data, added_at: Instant::now(), request_id: None, priority: 0 } }
20 pub fn with_request_id(mut self, id: impl Into<String>) -> Self { self.request_id = Some(id.into()); self }
21 pub fn with_priority(mut self, p: i32) -> Self { self.priority = p; self }
22}
23
24pub struct BatchCollector<T> { config: BatchConfig, items: Arc<RwLock<VecDeque<BatchItem<T>>>>, batch_start: Arc<RwLock<Option<Instant>>> }
25
26impl<T: Clone> BatchCollector<T> {
27 pub fn new(config: BatchConfig) -> Self { Self { config, items: Arc::new(RwLock::new(VecDeque::new())), batch_start: Arc::new(RwLock::new(None)) } }
28
29 pub fn add(&self, item: BatchItem<T>) -> BatchAddResult {
30 let mut items = self.items.write().unwrap();
31 let mut start = self.batch_start.write().unwrap();
32 if items.is_empty() { *start = Some(Instant::now()); }
33 items.push_back(item);
34 let count = items.len();
35 if self.config.auto_flush && count >= self.config.max_batch_size { BatchAddResult::ShouldFlush { count } } else { BatchAddResult::Added { count } }
36 }
37
38 pub fn add_data(&self, data: T) -> BatchAddResult { self.add(BatchItem::new(data)) }
39
40 pub fn should_flush(&self) -> bool {
41 let items = self.items.read().unwrap();
42 let start = self.batch_start.read().unwrap();
43 if items.is_empty() { return false; }
44 if items.len() >= self.config.max_batch_size { return true; }
45 if let Some(s) = *start { if s.elapsed() >= self.config.max_wait_time { return true; } }
46 false
47 }
48
49 pub fn drain(&self) -> Vec<BatchItem<T>> {
50 let mut items = self.items.write().unwrap();
51 let mut start = self.batch_start.write().unwrap();
52 *start = None;
53 items.drain(..).collect()
54 }
55
56 pub fn len(&self) -> usize { self.items.read().unwrap().len() }
57 pub fn is_empty(&self) -> bool { self.len() == 0 }
58 pub fn clear(&self) { self.items.write().unwrap().clear(); *self.batch_start.write().unwrap() = None; }
59}
60
61#[derive(Debug, Clone, PartialEq, Eq)]
62pub enum BatchAddResult { Added { count: usize }, ShouldFlush { count: usize } }
63impl BatchAddResult { pub fn should_flush(&self) -> bool { matches!(self, BatchAddResult::ShouldFlush { .. }) } pub fn count(&self) -> usize { match self { BatchAddResult::Added { count } | BatchAddResult::ShouldFlush { count } => *count } } }