praxis_llm/buffer_utils/
batching.rs1use tokio::time::{interval, Duration, Interval};
2
3pub struct EventBatcher<T> {
6 batch: Vec<T>,
7 ticker: Interval,
8 window_ms: u64,
9}
10
11impl<T> EventBatcher<T> {
12 pub fn new(window_ms: u64) -> Self {
14 Self {
15 batch: Vec::new(),
16 ticker: interval(Duration::from_millis(window_ms)),
17 window_ms,
18 }
19 }
20
21 pub fn push(&mut self, event: T) {
23 self.batch.push(event);
24 }
25
26 pub fn should_flush_now(&self) -> bool {
29 !self.batch.is_empty()
32 }
33
34 pub fn take(&mut self) -> Vec<T> {
36 std::mem::take(&mut self.batch)
37 }
38
39 pub fn len(&self) -> usize {
41 self.batch.len()
42 }
43
44 pub fn is_empty(&self) -> bool {
46 self.batch.is_empty()
47 }
48
49 pub fn ticker(&mut self) -> &mut Interval {
51 &mut self.ticker
52 }
53
54 pub fn window_ms(&self) -> u64 {
56 self.window_ms
57 }
58}
59
60#[cfg(test)]
61mod tests {
62 use super::*;
63
64 #[tokio::test]
65 async fn test_batcher_basic() {
66 let mut batcher = EventBatcher::<i32>::new(50);
67
68 batcher.push(1);
69 batcher.push(2);
70 batcher.push(3);
71
72 assert_eq!(batcher.len(), 3);
73 assert!(!batcher.is_empty());
74
75 let batch = batcher.take();
76 assert_eq!(batch, vec![1, 2, 3]);
77 assert!(batcher.is_empty());
78 }
79}
80