1use std::time::{Duration, Instant};
34use tracing::debug;
35
/// Why a batch was (or should be) flushed.
///
/// Derives `Hash` in addition to `Eq` so a reason can key a `HashMap`/`HashSet`
/// (e.g. per-reason flush counters).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FlushReason {
    /// The non-empty batch reached `BatchConfig::flush_ms` of age.
    Time,
    /// The batch reached `BatchConfig::flush_count` items.
    Count,
    /// The batch's accumulated size reached `BatchConfig::flush_bytes`.
    Size,
    /// The caller forced a flush via `HybridBatcher::force_flush`.
    Manual,
    /// Caller-supplied reason for a final flush; in this file it is only produced by
    /// passing it to `HybridBatcher::force_flush_with_reason` (presumably at shutdown).
    Shutdown,
}
50
/// Thresholds that decide when a `HybridBatcher` batch is ready to flush.
///
/// A batch is ready as soon as *any* one of these limits is reached.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Age, in milliseconds, after which a non-empty batch is due for a time flush.
    pub flush_ms: u64,
    /// Item count at which a count flush is due (compared with `>=`).
    pub flush_count: usize,
    /// Accumulated byte size at which a size flush is due (compared with `>=`).
    pub flush_bytes: usize,
}

impl Default for BatchConfig {
    /// 100 ms, 1 000 items, or 1 MiB — whichever is reached first.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        BatchConfig {
            flush_ms: 100,
            flush_count: 1_000,
            flush_bytes: ONE_MIB,
        }
    }
}
71
72#[derive(Debug)]
74pub struct FlushBatch<T> {
75 pub items: Vec<T>,
76 pub total_bytes: usize,
77 pub reason: FlushReason,
78}
79
/// An in-progress batch: buffered items, their summed byte size, and the start
/// of the age clock used for time-based flushing.
#[derive(Debug)]
pub struct Batch<T> {
    /// Items buffered since the last `take`, in insertion order.
    pub items: Vec<T>,
    /// Sum of the `size_bytes` passed to `push` since the last `take`.
    /// Saturates at `usize::MAX` instead of overflowing.
    pub total_bytes: usize,
    /// Start of the age clock. Set on construction, on every `take`, and when the
    /// first item is pushed into an empty batch.
    pub created_at: Instant,
}

impl<T> Batch<T> {
    /// Creates an empty batch whose age clock starts now.
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
            total_bytes: 0,
            created_at: Instant::now(),
        }
    }

    /// Returns `true` when no items are buffered.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Number of buffered items.
    #[must_use]
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Time elapsed since `created_at`.
    #[must_use]
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Appends `item` and adds `size_bytes` to the running byte total.
    ///
    /// If the batch was empty, the age clock restarts here. Time-based flushing
    /// then measures how long the oldest *pending* item has waited, rather than
    /// how long the batch sat idle before anything arrived. Otherwise, an item
    /// arriving after an idle period would be flushed immediately as a batch of one.
    pub fn push(&mut self, item: T, size_bytes: usize) {
        if self.items.is_empty() {
            self.created_at = Instant::now();
        }
        self.items.push(item);
        // `+=` would panic on overflow in debug builds and wrap (making a huge
        // batch look tiny to the size threshold) in release builds.
        self.total_bytes = self.total_bytes.saturating_add(size_bytes);
    }

    /// Moves all buffered items out (in insertion order), resetting the byte
    /// total to zero and restarting the age clock.
    pub fn take(&mut self) -> Vec<T> {
        self.total_bytes = 0;
        self.created_at = Instant::now();
        std::mem::take(&mut self.items)
    }
}

impl<T> Default for Batch<T> {
    /// Equivalent to `Batch::new()`.
    fn default() -> Self {
        Self::new()
    }
}
126
127pub struct HybridBatcher<T> {
130 config: BatchConfig,
131 batch: Batch<T>,
132}
133
134impl<T> HybridBatcher<T> {
135 pub fn new(config: BatchConfig) -> Self {
136 Self {
137 config,
138 batch: Batch::new(),
139 }
140 }
141
142 pub fn push(&mut self, item: T, size_bytes: usize) -> Option<FlushReason> {
144 self.batch.push(item, size_bytes);
145
146 if self.batch.len() >= self.config.flush_count {
148 Some(FlushReason::Count)
149 } else if self.batch.total_bytes >= self.config.flush_bytes {
150 Some(FlushReason::Size)
151 } else {
152 None
153 }
154 }
155
156 #[must_use]
158 pub fn should_flush_time(&self) -> bool {
159 !self.batch.is_empty()
160 && self.batch.age() >= Duration::from_millis(self.config.flush_ms)
161 }
162
163 pub fn take_batch(&mut self) -> Vec<T> {
165 let count = self.batch.len();
166 let bytes = self.batch.total_bytes;
167 let items = self.batch.take();
168 debug!(count, bytes, "Batch taken for flush");
169 items
170 }
171
172 #[must_use]
174 pub fn is_empty(&self) -> bool {
175 self.batch.is_empty()
176 }
177
178 #[must_use]
180 pub fn stats(&self) -> (usize, usize, Duration) {
181 (self.batch.len(), self.batch.total_bytes, self.batch.age())
182 }
183}
184
185impl<T: SizedItem> HybridBatcher<T> {
187 pub fn add(&mut self, item: T) -> Option<FlushReason> {
189 let size = item.size_bytes();
190 self.push(item, size)
191 }
192
193 pub fn add_batch(&mut self, items: Vec<T>) {
195 for item in items {
196 self.add(item);
197 }
198 }
199
200 pub fn take_if_ready(&mut self) -> Option<FlushBatch<T>> {
202 let reason = if self.batch.len() >= self.config.flush_count {
204 Some(FlushReason::Count)
205 } else if self.batch.total_bytes >= self.config.flush_bytes {
206 Some(FlushReason::Size)
207 } else if self.should_flush_time() {
208 Some(FlushReason::Time)
209 } else {
210 None
211 };
212
213 reason.map(|r| {
214 let total_bytes = self.batch.total_bytes;
216 FlushBatch {
217 items: self.batch.take(),
218 total_bytes,
219 reason: r,
220 }
221 })
222 }
223
224 pub fn force_flush(&mut self) -> Option<FlushBatch<T>> {
226 self.force_flush_with_reason(FlushReason::Manual)
227 }
228
229 pub fn force_flush_with_reason(&mut self, reason: FlushReason) -> Option<FlushBatch<T>> {
231 if self.batch.is_empty() {
232 return None;
233 }
234 let total_bytes = self.batch.total_bytes;
236 Some(FlushBatch {
237 items: self.batch.take(),
238 total_bytes,
239 reason,
240 })
241 }
242}
243
244impl<T: BatchableItem> HybridBatcher<T> {
246 #[must_use]
248 pub fn contains(&self, id: &str) -> bool {
249 self.batch.items.iter().any(|item| item.id() == id)
250 }
251}
252
/// An item that can report how many bytes it contributes to a batch.
///
/// `HybridBatcher::add` uses this size to accumulate the batch total that is
/// compared against `BatchConfig::flush_bytes`.
pub trait SizedItem {
    /// Byte size this item counts for in the batch's running total.
    #[must_use]
    fn size_bytes(&self) -> usize;
}

/// A sized item that also carries a string identifier, which enables
/// `HybridBatcher::contains` lookups.
pub trait BatchableItem: SizedItem {
    /// Identifier compared by `HybridBatcher::contains`.
    fn id(&self) -> &str;
}
263
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    #[derive(Debug, Clone)]
    struct TestItem {
        id: String,
        size: usize,
    }

    impl SizedItem for TestItem {
        fn size_bytes(&self) -> usize {
            self.size
        }
    }

    // Implementing `BatchableItem` lets the tests exercise `contains` and reads
    // the `id` field, so no `#[allow(dead_code)]` is needed.
    impl BatchableItem for TestItem {
        fn id(&self) -> &str {
            &self.id
        }
    }

    fn item(id: &str, size: usize) -> TestItem {
        TestItem { id: id.to_string(), size }
    }

    #[test]
    fn test_batch_empty_initially() {
        let batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.is_empty());
        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_batch_tracks_items_and_bytes() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));
        batcher.add(item("c", 150));

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 450);
        assert!(!batcher.is_empty());
    }

    #[test]
    fn test_flush_on_count_threshold() {
        let config = BatchConfig {
            flush_count: 3,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        assert!(batcher.add(item("a", 100)).is_none());
        assert!(batcher.add(item("b", 100)).is_none());

        let reason = batcher.add(item("c", 100));
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_flush_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        assert!(batcher.add(item("a", 200)).is_none());
        assert!(batcher.add(item("b", 200)).is_none());

        let reason = batcher.add(item("c", 200));
        assert_eq!(reason, Some(FlushReason::Size));
    }

    #[test]
    fn test_flush_on_time_threshold() {
        // 50 ms leaves a wide margin between construction and the first check,
        // so a slow or loaded CI machine does not cross the threshold early.
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 50,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        assert!(!batcher.should_flush_time());

        sleep(Duration::from_millis(60));

        assert!(batcher.should_flush_time());
    }

    #[test]
    fn test_take_if_ready_returns_batch() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 200));

        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Count);

        assert!(batcher.is_empty());
    }

    #[test]
    fn test_force_flush() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let batch = batcher.force_flush().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Manual);

        assert!(batcher.is_empty());

        assert!(batcher.force_flush().is_none());

        batcher.add(item("c", 100));
        let batch = batcher.force_flush_with_reason(FlushReason::Shutdown).unwrap();
        assert_eq!(batch.reason, FlushReason::Shutdown);
    }

    #[test]
    fn test_take_resets_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let items = batcher.take_batch();
        assert_eq!(items.len(), 2);

        let (count, bytes, age) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
        assert!(age < Duration::from_millis(10));
    }

    #[test]
    fn test_add_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        let items = vec![item("a", 100), item("b", 200), item("c", 300)];
        batcher.add_batch(items);

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 600);
    }

    #[test]
    fn test_count_beats_size_on_simultaneous_threshold() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 200,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        let reason = batcher.add(item("b", 100));

        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_contains_tracks_pending_items() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 10));
        batcher.add(item("b", 20));

        assert!(batcher.contains("a"));
        assert!(batcher.contains("b"));
        assert!(!batcher.contains("z"));

        assert!(batcher.force_flush().is_some());
        assert!(!batcher.contains("a"));
    }
}