sync_engine/batching/
hybrid_batcher.rs

1//! Hybrid batching for efficient writes.
2//!
3//! The [`HybridBatcher`] collects items and flushes them in batches based on
4//! configurable thresholds: item count, total bytes, or elapsed time.
5//!
6//! # Example
7//!
8//! ```
9//! use sync_engine::{HybridBatcher, BatchConfig, SizedItem};
10//!
11//! #[derive(Clone)]
12//! struct Item { data: String }
13//! impl SizedItem for Item {
14//!     fn size_bytes(&self) -> usize { self.data.len() }
15//! }
16//!
17//! let config = BatchConfig {
18//!     flush_ms: 100,
19//!     flush_count: 10,
20//!     flush_bytes: 1024,
21//! };
22//!
23//! let mut batcher: HybridBatcher<Item> = HybridBatcher::new(config);
24//! assert!(batcher.is_empty());
25//!
26//! batcher.add(Item { data: "hello".into() });
27//! assert!(!batcher.is_empty());
28//! ```
29
30use std::time::{Duration, Instant};
31use tracing::debug;
32
/// Why a batch was handed off for flushing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// The batch reached the configured age limit (`flush_ms`).
    Time,
    /// The batch reached the configured item limit (`flush_count`).
    Count,
    /// The batch reached the configured byte limit (`flush_bytes`).
    Size,
    /// A caller explicitly requested a flush.
    Manual,
    /// A flush performed during shutdown.
    Shutdown,
}
47
/// Thresholds that decide when a [`HybridBatcher`] should flush.
///
/// A flush is due as soon as *any* one of these limits is reached.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Age, in milliseconds, after which a non-empty batch is flushed
    /// no matter how small it is.
    pub flush_ms: u64,
    /// Number of buffered items that triggers a flush.
    pub flush_count: usize,
    /// Total buffered size, in bytes, that triggers a flush.
    pub flush_bytes: usize,
}

impl Default for BatchConfig {
    /// 100 ms, 1000 items, or 1 MiB — whichever comes first.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        Self {
            flush_bytes: ONE_MIB,
            flush_count: 1000,
            flush_ms: 100,
        }
    }
}
68
69/// A batch of items ready for flush
70#[derive(Debug)]
71pub struct FlushBatch<T> {
72    pub items: Vec<T>,
73    pub total_bytes: usize,
74    pub reason: FlushReason,
75}
76
/// A batch of items pending flush.
///
/// Tracks the buffered items, the running sum of their reported sizes, and the
/// instant the current batch started, so callers can apply an age limit.
#[derive(Debug)]
pub struct Batch<T> {
    /// Buffered items, in insertion order.
    pub items: Vec<T>,
    /// Sum of the `size_bytes` values passed to [`Batch::push`] since the last take.
    pub total_bytes: usize,
    /// When the current batch started: the first `push` into an empty batch,
    /// or the last `new`/`take` if nothing has been pushed since.
    pub created_at: Instant,
}

impl<T> Batch<T> {
    /// Create an empty batch whose age clock starts now.
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
            total_bytes: 0,
            created_at: Instant::now(),
        }
    }

    /// `true` when no items are buffered.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Number of buffered items.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Time elapsed since `created_at`.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Append an item and add its reported size to `total_bytes`.
    ///
    /// Pushing into an empty batch restarts the age clock. A batch's age
    /// therefore measures how long its oldest item has been waiting, not how
    /// long the batch sat idle before that item arrived. Otherwise the first
    /// item after an idle period would immediately look overdue.
    pub fn push(&mut self, item: T, size_bytes: usize) {
        if self.items.is_empty() {
            self.created_at = Instant::now();
        }
        self.items.push(item);
        // Saturate instead of panicking (debug) or wrapping (release) on absurd sizes.
        self.total_bytes = self.total_bytes.saturating_add(size_bytes);
    }

    /// Drain all buffered items, resetting the byte count and the age clock.
    pub fn take(&mut self) -> Vec<T> {
        self.total_bytes = 0;
        self.created_at = Instant::now();
        std::mem::take(&mut self.items)
    }
}

impl<T> Default for Batch<T> {
    fn default() -> Self {
        Self::new()
    }
}
123
124/// Hybrid batcher that flushes based on time, count, or size thresholds.
125/// Whichever threshold is hit first triggers the flush.
126pub struct HybridBatcher<T> {
127    config: BatchConfig,
128    batch: Batch<T>,
129}
130
131impl<T> HybridBatcher<T> {
132    pub fn new(config: BatchConfig) -> Self {
133        Self {
134            config,
135            batch: Batch::new(),
136        }
137    }
138
139    /// Add an item to the batch, returns flush reason if threshold hit
140    pub fn push(&mut self, item: T, size_bytes: usize) -> Option<FlushReason> {
141        self.batch.push(item, size_bytes);
142
143        // Check thresholds in priority order
144        if self.batch.len() >= self.config.flush_count {
145            Some(FlushReason::Count)
146        } else if self.batch.total_bytes >= self.config.flush_bytes {
147            Some(FlushReason::Size)
148        } else {
149            None
150        }
151    }
152
153    /// Check if time threshold exceeded
154    #[must_use]
155    pub fn should_flush_time(&self) -> bool {
156        !self.batch.is_empty() 
157            && self.batch.age() >= Duration::from_millis(self.config.flush_ms)
158    }
159
160    /// Take the current batch for flushing
161    pub fn take_batch(&mut self) -> Vec<T> {
162        let count = self.batch.len();
163        let bytes = self.batch.total_bytes;
164        let items = self.batch.take();
165        debug!(count, bytes, "Batch taken for flush");
166        items
167    }
168
169    /// Check if batch is empty
170    #[must_use]
171    pub fn is_empty(&self) -> bool {
172        self.batch.is_empty()
173    }
174
175    /// Get current batch stats
176    #[must_use]
177    pub fn stats(&self) -> (usize, usize, Duration) {
178        (self.batch.len(), self.batch.total_bytes, self.batch.age())
179    }
180}
181
182/// Extension methods for SyncItem batching (avoids orphan rules)
183impl<T: SizedItem> HybridBatcher<T> {
184    /// Add an item using its intrinsic size
185    pub fn add(&mut self, item: T) -> Option<FlushReason> {
186        let size = item.size_bytes();
187        self.push(item, size)
188    }
189    
190    /// Add multiple items at once
191    pub fn add_batch(&mut self, items: Vec<T>) {
192        for item in items {
193            self.add(item);
194        }
195    }
196    
197    /// Take the batch if any threshold is ready
198    pub fn take_if_ready(&mut self) -> Option<FlushBatch<T>> {
199        // Check count/size threshold
200        let reason = if self.batch.len() >= self.config.flush_count {
201            Some(FlushReason::Count)
202        } else if self.batch.total_bytes >= self.config.flush_bytes {
203            Some(FlushReason::Size)
204        } else if self.should_flush_time() {
205            Some(FlushReason::Time)
206        } else {
207            None
208        };
209        
210        reason.map(|r| {
211            // Capture bytes BEFORE take() resets it
212            let total_bytes = self.batch.total_bytes;
213            FlushBatch {
214                items: self.batch.take(),
215                total_bytes,
216                reason: r,
217            }
218        })
219    }
220    
221    /// Force flush regardless of thresholds (for manual flush or shutdown)
222    pub fn force_flush(&mut self) -> Option<FlushBatch<T>> {
223        self.force_flush_with_reason(FlushReason::Manual)
224    }
225    
226    /// Force flush with a specific reason
227    pub fn force_flush_with_reason(&mut self, reason: FlushReason) -> Option<FlushBatch<T>> {
228        if self.batch.is_empty() {
229            return None;
230        }
231        // Capture bytes BEFORE take() resets it
232        let total_bytes = self.batch.total_bytes;
233        Some(FlushBatch {
234            items: self.batch.take(),
235            total_bytes,
236            reason,
237        })
238    }
239}
240
241/// Extension methods for batchable items (have ID for contains check)
242impl<T: BatchableItem> HybridBatcher<T> {
243    /// Check if an item with the given ID is in the pending batch.
244    #[must_use]
245    pub fn contains(&self, id: &str) -> bool {
246        self.batch.items.iter().any(|item| item.id() == id)
247    }
248}
249
/// An item that can report how many bytes it contributes to a batch.
pub trait SizedItem {
    /// Size of this item in bytes, as counted against `flush_bytes`.
    #[must_use]
    fn size_bytes(&self) -> usize;
}

/// A [`SizedItem`] that also exposes a string identifier, so it can be
/// looked up in a pending batch.
pub trait BatchableItem: SizedItem {
    /// Identifier used when checking whether an item is pending.
    fn id(&self) -> &str;
}
260
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// Minimal batch item: an ID plus a caller-chosen byte size.
    #[derive(Debug, Clone)]
    struct TestItem {
        id: String,
        size: usize,
    }

    impl SizedItem for TestItem {
        fn size_bytes(&self) -> usize {
            self.size
        }
    }

    // Implementing `BatchableItem` lets the tests exercise
    // `HybridBatcher::contains` and keeps the `id` field in use, so no
    // `dead_code` allowance is needed.
    impl BatchableItem for TestItem {
        fn id(&self) -> &str {
            &self.id
        }
    }

    fn item(id: &str, size: usize) -> TestItem {
        TestItem {
            id: id.to_string(),
            size,
        }
    }

    #[test]
    fn test_batch_empty_initially() {
        let batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.is_empty());
        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_batch_tracks_items_and_bytes() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));
        batcher.add(item("c", 150));

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 450);
        assert!(!batcher.is_empty());
    }

    #[test]
    fn test_flush_on_count_threshold() {
        let config = BatchConfig {
            flush_count: 3,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // First two don't trigger
        assert!(batcher.add(item("a", 100)).is_none());
        assert!(batcher.add(item("b", 100)).is_none());

        // Third triggers count threshold
        let reason = batcher.add(item("c", 100));
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_flush_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // 200 and then 400 total bytes stay below the 500-byte limit.
        assert!(batcher.add(item("a", 200)).is_none());
        assert!(batcher.add(item("b", 200)).is_none());

        // 600 total bytes reaches the limit.
        let reason = batcher.add(item("c", 200));
        assert_eq!(reason, Some(FlushReason::Size));
    }

    #[test]
    fn test_flush_on_time_threshold() {
        // A long limit makes the "not yet due" check immune to scheduler delays.
        let slow = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(slow);
        batcher.add(item("a", 100));
        assert!(!batcher.should_flush_time());

        // `sleep` waits at least the requested duration, so sleeping past a
        // short limit makes the positive check deterministic.
        let fast = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 5,
        };
        let mut batcher = HybridBatcher::new(fast);
        batcher.add(item("a", 100));
        sleep(Duration::from_millis(10));
        assert!(batcher.should_flush_time());

        let batch = batcher.take_if_ready().expect("age threshold reached");
        assert_eq!(batch.reason, FlushReason::Time);
        assert_eq!(batch.items.len(), 1);
        assert_eq!(batch.total_bytes, 100);
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_time_threshold_ignores_empty_batch() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 0,
        };
        let mut batcher: HybridBatcher<TestItem> = HybridBatcher::new(config);
        assert!(!batcher.should_flush_time());
        assert!(batcher.take_if_ready().is_none());
    }

    #[test]
    fn test_take_if_ready_returns_batch() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Not ready yet
        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 200));

        // Now ready (count threshold)
        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Count);

        // Batcher should be empty now
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_force_flush() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        // Force flush regardless of thresholds
        let batch = batcher.force_flush().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Manual);

        // Should be empty now
        assert!(batcher.is_empty());

        // Force flush on empty returns None
        assert!(batcher.force_flush().is_none());

        // Test force_flush_with_reason
        batcher.add(item("c", 100));
        let batch = batcher.force_flush_with_reason(FlushReason::Shutdown).unwrap();
        assert_eq!(batch.reason, FlushReason::Shutdown);
    }

    #[test]
    fn test_take_resets_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        // Backdate the batch (the test module can reach the private field) so
        // that the timer reset below is actually observable.
        if let Some(past) = Instant::now().checked_sub(Duration::from_secs(5)) {
            batcher.batch.created_at = past;
        }

        let items = batcher.take_batch();
        assert_eq!(items.len(), 2);

        // Stats should be reset
        let (count, bytes, age) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
        assert!(age < Duration::from_secs(1)); // Timer reset
    }

    #[test]
    fn test_add_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        let items = vec![item("a", 100), item("b", 200), item("c", 300)];
        batcher.add_batch(items);

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 600);
    }

    #[test]
    fn test_count_beats_size_on_simultaneous_threshold() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 200,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // This hits both count (2) AND size (200) thresholds
        let reason = batcher.add(item("b", 100));

        // Count should win (checked first)
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_contains_tracks_pending_items() {
        let mut batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(!batcher.contains("a"));

        batcher.add(item("a", 10));
        batcher.add(item("b", 20));
        assert!(batcher.contains("a"));
        assert!(batcher.contains("b"));
        assert!(!batcher.contains("c"));

        // Once drained, items are no longer pending.
        let drained = batcher.take_batch();
        assert_eq!(drained.len(), 2);
        assert!(!batcher.contains("a"));
    }
}