1use std::time::{Duration, Instant};
31use tracing::debug;
32
/// The trigger that caused (or would cause) a batch to be flushed.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq)]
pub enum FlushReason {
    /// The batch's age reached the configured `flush_ms`.
    Time,
    /// The number of buffered items reached the configured `flush_count`.
    Count,
    /// The accumulated byte size reached the configured `flush_bytes`.
    Size,
    /// The caller explicitly requested a flush.
    Manual,
    /// Caller-supplied reason for a flush at shutdown; the batcher never
    /// produces this on its own.
    Shutdown,
}
47
/// Thresholds that decide when a batch is ready to flush.
///
/// A batch is considered ready as soon as any one of the thresholds is reached.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Batch age, in milliseconds, at which a time-based flush becomes due.
    pub flush_ms: u64,
    /// Number of buffered items at which a count-based flush becomes due.
    pub flush_count: usize,
    /// Accumulated byte size at which a size-based flush becomes due.
    pub flush_bytes: usize,
}

impl Default for BatchConfig {
    /// 100 ms, 1000 items, or 1 MiB — whichever comes first.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        let flush_ms = 100;
        let flush_count = 1_000;
        Self {
            flush_ms,
            flush_count,
            flush_bytes: ONE_MIB,
        }
    }
}
68
69#[derive(Debug)]
71pub struct FlushBatch<T> {
72 pub items: Vec<T>,
73 pub total_bytes: usize,
74 pub reason: FlushReason,
75}
76
/// An in-memory accumulation of items, their total byte size, and the instant
/// from which the batch's age is measured.
#[derive(Debug)]
pub struct Batch<T> {
    /// Buffered items, in insertion order.
    pub items: Vec<T>,
    /// Sum of the `size_bytes` passed to [`Batch::push`] since the last
    /// [`Batch::take`], saturating at `usize::MAX`.
    pub total_bytes: usize,
    /// Reference point for [`Batch::age`]. It is set on construction, on
    /// [`Batch::take`], and when the first item is pushed into an empty batch.
    pub created_at: Instant,
}

impl<T> Batch<T> {
    /// Creates an empty batch whose age starts now.
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
            total_bytes: 0,
            created_at: Instant::now(),
        }
    }

    /// Returns `true` if no items are buffered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Number of buffered items.
    #[must_use]
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Time elapsed since `created_at`.
    ///
    /// While the batch holds items, this is how long the oldest item has been waiting.
    #[must_use]
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Appends `item` and adds `size_bytes` to `total_bytes`.
    ///
    /// If the batch was empty, `created_at` is reset first. Time-based flushing
    /// then measures how long buffered data has waited, not how long the batch
    /// sat idle before anything arrived.
    pub fn push(&mut self, item: T, size_bytes: usize) {
        if self.items.is_empty() {
            self.created_at = Instant::now();
        }
        self.items.push(item);
        // Saturate rather than overflow. Overflow would panic in debug builds,
        // or wrap to a small value in release and suppress size-based flushing.
        self.total_bytes = self.total_bytes.saturating_add(size_bytes);
    }

    /// Removes and returns all buffered items, resetting `total_bytes` to 0 and
    /// `created_at` to now.
    pub fn take(&mut self) -> Vec<T> {
        self.total_bytes = 0;
        self.created_at = Instant::now();
        std::mem::take(&mut self.items)
    }
}

impl<T> Default for Batch<T> {
    fn default() -> Self {
        Self::new()
    }
}
123
124pub struct HybridBatcher<T> {
127 config: BatchConfig,
128 batch: Batch<T>,
129}
130
131impl<T> HybridBatcher<T> {
132 pub fn new(config: BatchConfig) -> Self {
133 Self {
134 config,
135 batch: Batch::new(),
136 }
137 }
138
139 pub fn push(&mut self, item: T, size_bytes: usize) -> Option<FlushReason> {
141 self.batch.push(item, size_bytes);
142
143 if self.batch.len() >= self.config.flush_count {
145 Some(FlushReason::Count)
146 } else if self.batch.total_bytes >= self.config.flush_bytes {
147 Some(FlushReason::Size)
148 } else {
149 None
150 }
151 }
152
153 #[must_use]
155 pub fn should_flush_time(&self) -> bool {
156 !self.batch.is_empty()
157 && self.batch.age() >= Duration::from_millis(self.config.flush_ms)
158 }
159
160 pub fn take_batch(&mut self) -> Vec<T> {
162 let count = self.batch.len();
163 let bytes = self.batch.total_bytes;
164 let items = self.batch.take();
165 debug!(count, bytes, "Batch taken for flush");
166 items
167 }
168
169 #[must_use]
171 pub fn is_empty(&self) -> bool {
172 self.batch.is_empty()
173 }
174
175 #[must_use]
177 pub fn stats(&self) -> (usize, usize, Duration) {
178 (self.batch.len(), self.batch.total_bytes, self.batch.age())
179 }
180}
181
182impl<T: SizedItem> HybridBatcher<T> {
184 pub fn add(&mut self, item: T) -> Option<FlushReason> {
186 let size = item.size_bytes();
187 self.push(item, size)
188 }
189
190 pub fn add_batch(&mut self, items: Vec<T>) {
192 for item in items {
193 self.add(item);
194 }
195 }
196
197 pub fn take_if_ready(&mut self) -> Option<FlushBatch<T>> {
199 let reason = if self.batch.len() >= self.config.flush_count {
201 Some(FlushReason::Count)
202 } else if self.batch.total_bytes >= self.config.flush_bytes {
203 Some(FlushReason::Size)
204 } else if self.should_flush_time() {
205 Some(FlushReason::Time)
206 } else {
207 None
208 };
209
210 reason.map(|r| {
211 let total_bytes = self.batch.total_bytes;
213 FlushBatch {
214 items: self.batch.take(),
215 total_bytes,
216 reason: r,
217 }
218 })
219 }
220
221 pub fn force_flush(&mut self) -> Option<FlushBatch<T>> {
223 self.force_flush_with_reason(FlushReason::Manual)
224 }
225
226 pub fn force_flush_with_reason(&mut self, reason: FlushReason) -> Option<FlushBatch<T>> {
228 if self.batch.is_empty() {
229 return None;
230 }
231 let total_bytes = self.batch.total_bytes;
233 Some(FlushBatch {
234 items: self.batch.take(),
235 total_bytes,
236 reason,
237 })
238 }
239}
240
241impl<T: BatchableItem> HybridBatcher<T> {
243 #[must_use]
245 pub fn contains(&self, id: &str) -> bool {
246 self.batch.items.iter().any(|item| item.id() == id)
247 }
248}
249
/// Something whose byte footprint can be reported for size-based batching.
pub trait SizedItem {
    /// Bytes this value contributes toward a batch's `flush_bytes` threshold.
    #[must_use]
    fn size_bytes(&self) -> usize;
}

/// A sized item that also exposes a string identifier for lookups.
pub trait BatchableItem
where
    Self: SizedItem,
{
    /// Identifier compared against the `id` argument of `HybridBatcher::contains`.
    fn id(&self) -> &str;
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// Minimal item used throughout these tests: an id plus a declared byte size.
    #[derive(Debug, Clone)]
    struct TestItem {
        id: String,
        size: usize,
    }

    impl SizedItem for TestItem {
        fn size_bytes(&self) -> usize {
            self.size
        }
    }

    // Implementing `BatchableItem` lets the tests exercise `HybridBatcher::contains`,
    // and reads the `id` field so it needs no dead-code allowance.
    impl BatchableItem for TestItem {
        fn id(&self) -> &str {
            &self.id
        }
    }

    fn item(id: &str, size: usize) -> TestItem {
        TestItem { id: id.to_string(), size }
    }

    #[test]
    fn test_batch_empty_initially() {
        let batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.is_empty());
        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_batch_tracks_items_and_bytes() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));
        batcher.add(item("c", 150));

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 450);
        assert!(!batcher.is_empty());
    }

    #[test]
    fn test_flush_on_count_threshold() {
        let config = BatchConfig {
            flush_count: 3,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // Below the count threshold: no signal yet.
        assert!(batcher.add(item("a", 100)).is_none());
        assert!(batcher.add(item("b", 100)).is_none());

        // The third item reaches flush_count.
        let reason = batcher.add(item("c", 100));
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_flush_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // 200 and 400 bytes are still below the 500-byte threshold.
        assert!(batcher.add(item("a", 200)).is_none());
        assert!(batcher.add(item("b", 200)).is_none());

        // 600 bytes crosses it.
        let reason = batcher.add(item("c", 200));
        assert_eq!(reason, Some(FlushReason::Size));
    }

    #[test]
    fn test_flush_on_time_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 10,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Freshly buffered: not yet old enough.
        assert!(!batcher.should_flush_time());

        // Wait past the 10 ms threshold.
        sleep(Duration::from_millis(15));

        assert!(batcher.should_flush_time());
    }

    #[test]
    fn test_take_if_ready_returns_batch() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // One item is below every threshold.
        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 200));

        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Count);

        // Taking the batch leaves the batcher empty.
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_take_if_ready_on_empty_batcher_returns_none() {
        let mut batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.take_if_ready().is_none());
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_force_flush() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let batch = batcher.force_flush().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Manual);

        assert!(batcher.is_empty());

        // Forcing a flush of an empty batcher yields nothing.
        assert!(batcher.force_flush().is_none());

        // A caller-supplied reason is carried through.
        batcher.add(item("c", 100));
        let batch = batcher.force_flush_with_reason(FlushReason::Shutdown).unwrap();
        assert_eq!(batch.reason, FlushReason::Shutdown);
    }

    #[test]
    fn test_take_resets_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let items = batcher.take_batch();
        assert_eq!(items.len(), 2);

        // Count, bytes and age all restart after a take.
        let (count, bytes, age) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
        assert!(age < Duration::from_millis(10));
    }

    #[test]
    fn test_add_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        let items = vec![item("a", 100), item("b", 200), item("c", 300)];
        batcher.add_batch(items);

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 600);
    }

    #[test]
    fn test_count_beats_size_on_simultaneous_threshold() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 200,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // The second item hits both flush_count (2) and flush_bytes (200).
        let reason = batcher.add(item("b", 100));

        // Count is checked first, so it wins.
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_contains_finds_buffered_items_by_id() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 10));
        batcher.add(item("b", 20));

        assert!(batcher.contains("a"));
        assert!(batcher.contains("b"));
        assert!(!batcher.contains("z"));

        // Once flushed, items are no longer found.
        assert!(batcher.force_flush().is_some());
        assert!(!batcher.contains("a"));
    }
}