1use std::time::{Duration, Instant};
34use tracing::debug;
35
/// Why a batch was (or should be) flushed.
///
/// `Count`, `Size` and `Time` are produced by the batcher's own threshold
/// checks; the remaining variants only appear when a caller supplies them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FlushReason {
    /// The batch is non-empty and its age reached `flush_ms`.
    Time,
    /// The number of buffered items reached `flush_count`.
    Count,
    /// The accumulated byte total reached `flush_bytes`.
    Size,
    /// An explicit flush requested through `force_flush`.
    Manual,
    /// Caller-supplied via `force_flush_with_reason`; never produced by the
    /// batcher itself. Presumably used when draining on shutdown -- confirm
    /// with callers.
    Shutdown,
    /// Caller-supplied via `force_flush_with_reason`; never produced by the
    /// batcher itself. NOTE(review): the meaning is not visible in this file
    /// -- confirm with callers.
    ViewQueue,
}
52
/// Thresholds that decide when a batch is due for flushing.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Batch age, in milliseconds, at which a time-based flush is due.
    pub flush_ms: u64,
    /// Number of buffered items at which a count-based flush is due.
    pub flush_count: usize,
    /// Accumulated byte total at which a size-based flush is due.
    pub flush_bytes: usize,
}

impl Default for BatchConfig {
    /// Returns the stock thresholds: 100 ms, 1000 items, 1 MiB.
    fn default() -> Self {
        const DEFAULT_FLUSH_MS: u64 = 100;
        const DEFAULT_FLUSH_COUNT: usize = 1000;
        const DEFAULT_FLUSH_BYTES: usize = 1 << 20; // 1 MiB

        BatchConfig {
            flush_ms: DEFAULT_FLUSH_MS,
            flush_count: DEFAULT_FLUSH_COUNT,
            flush_bytes: DEFAULT_FLUSH_BYTES,
        }
    }
}
73
/// A batch that has been removed from the batcher, together with the reason
/// it was flushed.
#[derive(Debug)]
pub struct FlushBatch<T> {
    /// The items that were buffered, in insertion order.
    pub items: Vec<T>,
    /// Byte total the batcher had accumulated for `items` at flush time.
    pub total_bytes: usize,
    /// Why the batch was flushed.
    pub reason: FlushReason,
}
81
/// An in-progress batch: the buffered items, a running byte total, and the
/// instant the batch was started.
#[derive(Debug)]
pub struct Batch<T> {
    /// Buffered items, in insertion order.
    pub items: Vec<T>,
    /// Running total of the `size_bytes` values passed to [`Batch::push`].
    /// Saturates at `usize::MAX` instead of overflowing.
    pub total_bytes: usize,
    /// When this batch was created or last emptied by [`Batch::take`].
    pub created_at: Instant,
}

impl<T> Batch<T> {
    /// Creates an empty batch whose age starts counting now.
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
            total_bytes: 0,
            created_at: Instant::now(),
        }
    }

    /// Returns `true` when no items are buffered.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Returns the number of buffered items.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Returns the time elapsed since the batch was created or last taken.
    ///
    /// The clock is not restarted by [`Batch::push`], so time spent empty
    /// counts towards the age.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Appends `item` and adds `size_bytes` to the running byte total.
    ///
    /// The total saturates at `usize::MAX`. A plain `+=` would panic in debug
    /// builds and wrap to a small value in release builds, which would
    /// silently defeat any size threshold based on `total_bytes`.
    pub fn push(&mut self, item: T, size_bytes: usize) {
        self.items.push(item);
        self.total_bytes = self.total_bytes.saturating_add(size_bytes);
    }

    /// Removes and returns all buffered items, resetting the byte total to
    /// zero and restarting the age clock.
    pub fn take(&mut self) -> Vec<T> {
        self.total_bytes = 0;
        self.created_at = Instant::now();
        std::mem::take(&mut self.items)
    }
}

impl<T> Default for Batch<T> {
    /// Equivalent to [`Batch::new`].
    fn default() -> Self {
        Self::new()
    }
}
128
/// Accumulates items into a single in-progress batch and reports when the
/// configured count, size, or time threshold has been reached.
pub struct HybridBatcher<T> {
    /// Flush thresholds.
    config: BatchConfig,
    /// The batch currently being filled.
    batch: Batch<T>,
}
135
136impl<T> HybridBatcher<T> {
137 pub fn new(config: BatchConfig) -> Self {
138 Self {
139 config,
140 batch: Batch::new(),
141 }
142 }
143
144 pub fn push(&mut self, item: T, size_bytes: usize) -> Option<FlushReason> {
146 self.batch.push(item, size_bytes);
147
148 if self.batch.len() >= self.config.flush_count {
150 Some(FlushReason::Count)
151 } else if self.batch.total_bytes >= self.config.flush_bytes {
152 Some(FlushReason::Size)
153 } else {
154 None
155 }
156 }
157
158 #[must_use]
160 pub fn should_flush_time(&self) -> bool {
161 !self.batch.is_empty()
162 && self.batch.age() >= Duration::from_millis(self.config.flush_ms)
163 }
164
165 pub fn take_batch(&mut self) -> Vec<T> {
167 let count = self.batch.len();
168 let bytes = self.batch.total_bytes;
169 let items = self.batch.take();
170 debug!(count, bytes, "Batch taken for flush");
171 items
172 }
173
174 #[must_use]
176 pub fn is_empty(&self) -> bool {
177 self.batch.is_empty()
178 }
179
180 #[must_use]
182 pub fn stats(&self) -> (usize, usize, Duration) {
183 (self.batch.len(), self.batch.total_bytes, self.batch.age())
184 }
185}
186
187impl<T: SizedItem> HybridBatcher<T> {
189 pub fn add(&mut self, item: T) -> Option<FlushReason> {
191 let size = item.size_bytes();
192 self.push(item, size)
193 }
194
195 pub fn add_batch(&mut self, items: Vec<T>) {
197 for item in items {
198 self.add(item);
199 }
200 }
201
202 pub fn take_if_ready(&mut self) -> Option<FlushBatch<T>> {
204 let reason = if self.batch.len() >= self.config.flush_count {
206 Some(FlushReason::Count)
207 } else if self.batch.total_bytes >= self.config.flush_bytes {
208 Some(FlushReason::Size)
209 } else if self.should_flush_time() {
210 Some(FlushReason::Time)
211 } else {
212 None
213 };
214
215 reason.map(|r| {
216 let total_bytes = self.batch.total_bytes;
218 FlushBatch {
219 items: self.batch.take(),
220 total_bytes,
221 reason: r,
222 }
223 })
224 }
225
226 pub fn force_flush(&mut self) -> Option<FlushBatch<T>> {
228 self.force_flush_with_reason(FlushReason::Manual)
229 }
230
231 pub fn force_flush_with_reason(&mut self, reason: FlushReason) -> Option<FlushBatch<T>> {
233 if self.batch.is_empty() {
234 return None;
235 }
236 let total_bytes = self.batch.total_bytes;
238 Some(FlushBatch {
239 items: self.batch.take(),
240 total_bytes,
241 reason,
242 })
243 }
244}
245
246impl<T: BatchableItem> HybridBatcher<T> {
248 #[must_use]
250 pub fn contains(&self, id: &str) -> bool {
251 self.batch.items.iter().any(|item| item.id() == id)
252 }
253}
254
/// An item that can report its own size, so the batcher can account for it
/// without the caller passing a size explicitly.
pub trait SizedItem {
    /// Size of this item in bytes, as counted towards the batch's byte total.
    #[must_use]
    fn size_bytes(&self) -> usize;
}
260
/// A sized item that also exposes a string identifier, which enables id
/// lookups in the pending batch.
pub trait BatchableItem: SizedItem {
    /// Identifier that `HybridBatcher::contains` compares against.
    fn id(&self) -> &str;
}
265
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// Minimal item used to drive the batcher in tests.
    #[derive(Debug, Clone)]
    struct TestItem {
        id: String,
        size: usize,
    }

    impl SizedItem for TestItem {
        fn size_bytes(&self) -> usize {
            self.size
        }
    }

    // Implemented so that `contains` can be exercised; this also means `id`
    // is actually read and no dead-code suppression is needed.
    impl BatchableItem for TestItem {
        fn id(&self) -> &str {
            &self.id
        }
    }

    /// Builds a `TestItem` with the given id and reported size.
    fn item(id: &str, size: usize) -> TestItem {
        TestItem {
            id: id.to_string(),
            size,
        }
    }

    #[test]
    fn test_batch_empty_initially() {
        let batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.is_empty());
        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_batch_tracks_items_and_bytes() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));
        batcher.add(item("c", 150));

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 450);
        assert!(!batcher.is_empty());
    }

    #[test]
    fn test_flush_on_count_threshold() {
        let config = BatchConfig {
            flush_count: 3,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // Below the count threshold: no flush signalled.
        assert!(batcher.add(item("a", 100)).is_none());
        assert!(batcher.add(item("b", 100)).is_none());

        // The third item reaches the threshold.
        let reason = batcher.add(item("c", 100));
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_flush_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // 200 and 400 bytes are both below the 500-byte threshold.
        assert!(batcher.add(item("a", 200)).is_none());
        assert!(batcher.add(item("b", 200)).is_none());

        // 600 bytes crosses it.
        let reason = batcher.add(item("c", 200));
        assert_eq!(reason, Some(FlushReason::Size));
    }

    #[test]
    fn test_flush_on_time_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 10,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Freshly created batch: not old enough yet.
        assert!(!batcher.should_flush_time());

        // Sleep past the 10 ms threshold.
        sleep(Duration::from_millis(15));

        assert!(batcher.should_flush_time());
    }

    #[test]
    fn test_take_if_ready_returns_batch() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // One item is below every threshold.
        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 200));

        // Two items reach the count threshold.
        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Count);

        // Taking the batch leaves the batcher empty.
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_force_flush() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let batch = batcher.force_flush().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Manual);

        assert!(batcher.is_empty());

        // Nothing buffered: a forced flush yields nothing.
        assert!(batcher.force_flush().is_none());

        // A caller-supplied reason is passed through unchanged.
        batcher.add(item("c", 100));
        let batch = batcher
            .force_flush_with_reason(FlushReason::Shutdown)
            .unwrap();
        assert_eq!(batch.reason, FlushReason::Shutdown);
    }

    #[test]
    fn test_take_resets_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let items = batcher.take_batch();
        assert_eq!(items.len(), 2);

        // Count, bytes and the age clock are all reset by the take.
        let (count, bytes, age) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
        assert!(age < Duration::from_millis(10));
    }

    #[test]
    fn test_add_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        let items = vec![item("a", 100), item("b", 200), item("c", 300)];
        batcher.add_batch(items);

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 600);
    }

    #[test]
    fn test_count_beats_size_on_simultaneous_threshold() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 200,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // The second item hits both thresholds (2 items, 200 bytes).
        let reason = batcher.add(item("b", 100));

        // Count is checked first, so it is the reported reason.
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_contains_matches_pending_ids_only() {
        let mut batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());

        assert!(!batcher.contains("a"));

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        assert!(batcher.contains("a"));
        assert!(batcher.contains("b"));
        assert!(!batcher.contains("c"));

        // Once the batch is taken, its ids are no longer pending.
        let taken = batcher.take_batch();
        assert_eq!(taken.len(), 2);
        assert!(!batcher.contains("a"));
    }
}