sync_engine/batching/
hybrid_batcher.rs

1// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
2// See LICENSE file in the project root for full license text.
3
4//! Hybrid batching for efficient writes.
5//!
6//! The [`HybridBatcher`] collects items and flushes them in batches based on
7//! configurable thresholds: item count, total bytes, or elapsed time.
8//!
9//! # Example
10//!
11//! ```
12//! use sync_engine::{HybridBatcher, BatchConfig, SizedItem};
13//!
14//! #[derive(Clone)]
15//! struct Item { data: String }
16//! impl SizedItem for Item {
17//!     fn size_bytes(&self) -> usize { self.data.len() }
18//! }
19//!
20//! let config = BatchConfig {
21//!     flush_ms: 100,
22//!     flush_count: 10,
23//!     flush_bytes: 1024,
24//! };
25//!
26//! let mut batcher: HybridBatcher<Item> = HybridBatcher::new(config);
27//! assert!(batcher.is_empty());
28//!
29//! batcher.add(Item { data: "hello".into() });
30//! assert!(!batcher.is_empty());
31//! ```
32
33use std::time::{Duration, Instant};
34use tracing::debug;
35
/// Why a batch was (or should be) flushed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
    /// The pending batch has been waiting at least `flush_ms`.
    Time,
    /// The number of pending items reached `flush_count`.
    Count,
    /// The accumulated byte total reached `flush_bytes`.
    Size,
    /// The caller explicitly asked for a flush (e.g. via `force_flush`).
    Manual,
    /// Flush requested by the caller as part of shutting down
    /// (passed explicitly through `force_flush_with_reason`).
    Shutdown,
}
50
/// Thresholds that decide when a [`HybridBatcher`] should flush.
///
/// Whichever limit is reached first triggers the flush.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum batch age in milliseconds before a time-based flush,
    /// applied even when the batch is still small.
    pub flush_ms: u64,
    /// Number of pending items that triggers a count-based flush.
    pub flush_count: usize,
    /// Accumulated byte size that triggers a size-based flush.
    pub flush_bytes: usize,
}

impl Default for BatchConfig {
    /// 100 ms, 1000 items, or 1 MiB — whichever comes first.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        Self {
            flush_ms: 100,
            flush_count: 1_000,
            flush_bytes: ONE_MIB,
        }
    }
}
71
72/// A batch of items ready for flush
73#[derive(Debug)]
74pub struct FlushBatch<T> {
75    pub items: Vec<T>,
76    pub total_bytes: usize,
77    pub reason: FlushReason,
78}
79
/// A batch of items pending flush.
///
/// `created_at` marks the start of the current age window. It is restarted
/// when the first item is pushed into an empty batch and whenever the batch is
/// drained with [`Batch::take`]. As a result, [`Batch::age`] measures how long
/// the oldest pending item has been waiting, not how long the batch sat idle.
#[derive(Debug)]
pub struct Batch<T> {
    /// Items accumulated since the last [`Batch::take`], in push order.
    pub items: Vec<T>,
    /// Sum of the `size_bytes` values passed to [`Batch::push`] since the last take.
    pub total_bytes: usize,
    /// Start of the current age window (see the type-level docs).
    pub created_at: Instant,
}

impl<T> Batch<T> {
    /// Create an empty batch whose age window starts now.
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
            total_bytes: 0,
            created_at: Instant::now(),
        }
    }

    /// Returns `true` if no items are pending.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Number of pending items.
    #[must_use]
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Time elapsed since `created_at`.
    #[must_use]
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Append `item`, adding `size_bytes` to the running byte total.
    ///
    /// If the batch was empty, the age window restarts now. Time spent idle
    /// with nothing pending must not count toward a time-based flush;
    /// otherwise the first item after an idle period would look overdue.
    pub fn push(&mut self, item: T, size_bytes: usize) {
        if self.items.is_empty() {
            self.created_at = Instant::now();
        }
        self.items.push(item);
        self.total_bytes += size_bytes;
    }

    /// Drain all pending items, resetting the byte total and the age window.
    pub fn take(&mut self) -> Vec<T> {
        self.total_bytes = 0;
        self.created_at = Instant::now();
        std::mem::take(&mut self.items)
    }
}

impl<T> Default for Batch<T> {
    fn default() -> Self {
        Self::new()
    }
}
126
127/// Hybrid batcher that flushes based on time, count, or size thresholds.
128/// Whichever threshold is hit first triggers the flush.
129pub struct HybridBatcher<T> {
130    config: BatchConfig,
131    batch: Batch<T>,
132}
133
134impl<T> HybridBatcher<T> {
135    pub fn new(config: BatchConfig) -> Self {
136        Self {
137            config,
138            batch: Batch::new(),
139        }
140    }
141
142    /// Add an item to the batch, returns flush reason if threshold hit
143    pub fn push(&mut self, item: T, size_bytes: usize) -> Option<FlushReason> {
144        self.batch.push(item, size_bytes);
145
146        // Check thresholds in priority order
147        if self.batch.len() >= self.config.flush_count {
148            Some(FlushReason::Count)
149        } else if self.batch.total_bytes >= self.config.flush_bytes {
150            Some(FlushReason::Size)
151        } else {
152            None
153        }
154    }
155
156    /// Check if time threshold exceeded
157    #[must_use]
158    pub fn should_flush_time(&self) -> bool {
159        !self.batch.is_empty() 
160            && self.batch.age() >= Duration::from_millis(self.config.flush_ms)
161    }
162
163    /// Take the current batch for flushing
164    pub fn take_batch(&mut self) -> Vec<T> {
165        let count = self.batch.len();
166        let bytes = self.batch.total_bytes;
167        let items = self.batch.take();
168        debug!(count, bytes, "Batch taken for flush");
169        items
170    }
171
172    /// Check if batch is empty
173    #[must_use]
174    pub fn is_empty(&self) -> bool {
175        self.batch.is_empty()
176    }
177
178    /// Get current batch stats
179    #[must_use]
180    pub fn stats(&self) -> (usize, usize, Duration) {
181        (self.batch.len(), self.batch.total_bytes, self.batch.age())
182    }
183}
184
185/// Extension methods for SyncItem batching (avoids orphan rules)
186impl<T: SizedItem> HybridBatcher<T> {
187    /// Add an item using its intrinsic size
188    pub fn add(&mut self, item: T) -> Option<FlushReason> {
189        let size = item.size_bytes();
190        self.push(item, size)
191    }
192    
193    /// Add multiple items at once
194    pub fn add_batch(&mut self, items: Vec<T>) {
195        for item in items {
196            self.add(item);
197        }
198    }
199    
200    /// Take the batch if any threshold is ready
201    pub fn take_if_ready(&mut self) -> Option<FlushBatch<T>> {
202        // Check count/size threshold
203        let reason = if self.batch.len() >= self.config.flush_count {
204            Some(FlushReason::Count)
205        } else if self.batch.total_bytes >= self.config.flush_bytes {
206            Some(FlushReason::Size)
207        } else if self.should_flush_time() {
208            Some(FlushReason::Time)
209        } else {
210            None
211        };
212        
213        reason.map(|r| {
214            // Capture bytes BEFORE take() resets it
215            let total_bytes = self.batch.total_bytes;
216            FlushBatch {
217                items: self.batch.take(),
218                total_bytes,
219                reason: r,
220            }
221        })
222    }
223    
224    /// Force flush regardless of thresholds (for manual flush or shutdown)
225    pub fn force_flush(&mut self) -> Option<FlushBatch<T>> {
226        self.force_flush_with_reason(FlushReason::Manual)
227    }
228    
229    /// Force flush with a specific reason
230    pub fn force_flush_with_reason(&mut self, reason: FlushReason) -> Option<FlushBatch<T>> {
231        if self.batch.is_empty() {
232            return None;
233        }
234        // Capture bytes BEFORE take() resets it
235        let total_bytes = self.batch.total_bytes;
236        Some(FlushBatch {
237            items: self.batch.take(),
238            total_bytes,
239            reason,
240        })
241    }
242}
243
244/// Extension methods for batchable items (have ID for contains check)
245impl<T: BatchableItem> HybridBatcher<T> {
246    /// Check if an item with the given ID is in the pending batch.
247    #[must_use]
248    pub fn contains(&self, id: &str) -> bool {
249        self.batch.items.iter().any(|item| item.id() == id)
250    }
251}
252
/// An item that can report its own size, used for size-based batching.
pub trait SizedItem {
    /// Size of this item in bytes, as counted toward `flush_bytes`.
    #[must_use]
    fn size_bytes(&self) -> usize;
}

/// A sized item that also exposes a string identifier, enabling
/// `HybridBatcher::contains` lookups.
pub trait BatchableItem: SizedItem {
    /// Identifier compared against the argument of `contains`.
    fn id(&self) -> &str;
}
263
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    /// Simple test item: `id` backs `BatchableItem::id`, `size` backs `SizedItem`.
    #[derive(Debug, Clone)]
    struct TestItem {
        id: String,
        size: usize,
    }

    impl SizedItem for TestItem {
        fn size_bytes(&self) -> usize {
            self.size
        }
    }

    impl BatchableItem for TestItem {
        fn id(&self) -> &str {
            &self.id
        }
    }

    fn item(id: &str, size: usize) -> TestItem {
        TestItem { id: id.to_string(), size }
    }

    #[test]
    fn test_batch_empty_initially() {
        let batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.is_empty());
        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_batch_tracks_items_and_bytes() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));
        batcher.add(item("c", 150));

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 450);
        assert!(!batcher.is_empty());
    }

    #[test]
    fn test_flush_on_count_threshold() {
        let config = BatchConfig {
            flush_count: 3,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // First two don't trigger
        assert!(batcher.add(item("a", 100)).is_none());
        assert!(batcher.add(item("b", 100)).is_none());

        // Third triggers count threshold
        let reason = batcher.add(item("c", 100));
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_flush_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // 200 and 400 bytes stay below the 500-byte threshold
        assert!(batcher.add(item("a", 200)).is_none());
        assert!(batcher.add(item("b", 200)).is_none());

        // This pushes over 500 bytes
        let reason = batcher.add(item("c", 200));
        assert_eq!(reason, Some(FlushReason::Size));
    }

    #[test]
    fn test_flush_on_time_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 10, // 10ms
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Not ready yet
        assert!(!batcher.should_flush_time());

        // Wait for threshold
        sleep(Duration::from_millis(15));

        // Now ready
        assert!(batcher.should_flush_time());
    }

    #[test]
    fn test_take_if_ready_returns_batch() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Not ready yet
        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 200));

        // Now ready (count threshold)
        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Count);

        // Batcher should be empty now
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_force_flush() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        // Force flush regardless of thresholds
        let batch = batcher.force_flush().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Manual);

        // Should be empty now
        assert!(batcher.is_empty());

        // Force flush on empty returns None
        assert!(batcher.force_flush().is_none());

        // Test force_flush_with_reason
        batcher.add(item("c", 100));
        let batch = batcher.force_flush_with_reason(FlushReason::Shutdown).unwrap();
        assert_eq!(batch.reason, FlushReason::Shutdown);
    }

    #[test]
    fn test_take_resets_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let items = batcher.take_batch();
        assert_eq!(items.len(), 2);

        // Stats should be reset
        let (count, bytes, age) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
        assert!(age < Duration::from_millis(10)); // Timer reset
    }

    #[test]
    fn test_add_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        let items = vec![item("a", 100), item("b", 200), item("c", 300)];
        batcher.add_batch(items);

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 600);
    }

    #[test]
    fn test_count_beats_size_on_simultaneous_threshold() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 200,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // This hits both count (2) AND size (200) thresholds
        let reason = batcher.add(item("b", 100));

        // Count should win (checked first)
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_contains_tracks_pending_ids() {
        let mut batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(!batcher.contains("a"));

        batcher.add(item("a", 10));
        batcher.add(item("b", 20));
        assert!(batcher.contains("a"));
        assert!(batcher.contains("b"));
        assert!(!batcher.contains("c"));

        // Once drained, items are no longer pending
        let _ = batcher.take_batch();
        assert!(!batcher.contains("a"));
        assert!(!batcher.contains("b"));
    }
}
462        assert_eq!(reason, Some(FlushReason::Count));
463    }
464}