sync_engine/batching/
hybrid_batcher.rs

// Copyright (c) 2025-2026 Adrian Robinson. Licensed under the AGPL-3.0.
// See the LICENSE file in the project root for the full license text.
3
//! Hybrid batching for efficient writes.
//!
//! The [`HybridBatcher`] collects items and flushes them in batches based on
//! configurable thresholds: item count, total bytes, or elapsed time —
//! whichever is reached first.
//!
//! # Example
//!
//! ```
//! use sync_engine::{HybridBatcher, BatchConfig, SizedItem};
//!
//! #[derive(Clone)]
//! struct Item { data: String }
//! impl SizedItem for Item {
//!     fn size_bytes(&self) -> usize { self.data.len() }
//! }
//!
//! let config = BatchConfig {
//!     flush_ms: 100,
//!     flush_count: 10,
//!     flush_bytes: 1024,
//! };
//!
//! let mut batcher: HybridBatcher<Item> = HybridBatcher::new(config);
//! assert!(batcher.is_empty());
//!
//! batcher.add(Item { data: "hello".into() });
//! assert!(!batcher.is_empty());
//! ```
32
33use std::time::{Duration, Instant};
34use tracing::debug;
35
/// Why a batch was (or should be) flushed.
///
/// Reported by [`HybridBatcher`] when a flush threshold is met and recorded on
/// every [`FlushBatch`] so callers can tell flushes apart.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FlushReason {
    /// The pending batch reached the configured age threshold.
    Time,
    /// The pending batch reached the configured item count.
    Count,
    /// The pending batch reached the configured byte total.
    Size,
    /// The caller explicitly requested a flush.
    Manual,
    /// Flush performed while shutting down.
    Shutdown,
    /// View-queue flush (meaning defined by the caller that issues it).
    ViewQueue,
}
52
/// Thresholds that decide when a [`HybridBatcher`] flushes.
///
/// A flush becomes due as soon as any one of the three limits is reached.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Age, in milliseconds, at which a non-empty batch becomes due for a
    /// time-based flush (even if it is small).
    pub flush_ms: u64,
    /// Number of pending items at which a count-based flush becomes due.
    pub flush_count: usize,
    /// Accumulated byte total at which a size-based flush becomes due.
    pub flush_bytes: usize,
}

impl Default for BatchConfig {
    /// 100 ms, 1000 items, or 1 MiB — whichever comes first.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        BatchConfig {
            flush_ms: 100,
            flush_count: 1000,
            flush_bytes: ONE_MIB,
        }
    }
}
73
74/// A batch of items ready for flush
75#[derive(Debug)]
76pub struct FlushBatch<T> {
77    pub items: Vec<T>,
78    pub total_bytes: usize,
79    pub reason: FlushReason,
80}
81
/// A batch of items pending flush.
///
/// Tracks the queued items, the sum of their reported sizes, and the instant
/// the current batch started (used for time-based flushing).
#[derive(Debug)]
pub struct Batch<T> {
    /// Items queued so far, in insertion order.
    pub items: Vec<T>,
    /// Sum of the `size_bytes` values passed to [`Batch::push`].
    pub total_bytes: usize,
    /// When the current batch started: the arrival of its first item, or the
    /// most recent construction / [`Batch::take`] while it is still empty.
    pub created_at: Instant,
}

impl<T> Batch<T> {
    /// Creates an empty batch.
    pub fn new() -> Self {
        Self {
            items: Vec::new(),
            total_bytes: 0,
            created_at: Instant::now(),
        }
    }

    /// Returns `true` if no items are queued.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }

    /// Number of queued items.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Time elapsed since `created_at`.
    pub fn age(&self) -> Duration {
        self.created_at.elapsed()
    }

    /// Appends `item` and adds `size_bytes` to the running total.
    ///
    /// If the batch was empty, the age clock restarts. Time-based flushing then
    /// measures how long the oldest pending item has waited, not how long the
    /// batcher sat idle. Without this, the first item after an idle period looks
    /// overdue and gets flushed on its own, which defeats batching.
    pub fn push(&mut self, item: T, size_bytes: usize) {
        if self.items.is_empty() {
            self.created_at = Instant::now();
        }
        self.items.push(item);
        self.total_bytes += size_bytes;
    }

    /// Drains all queued items and resets the byte total and the age clock.
    pub fn take(&mut self) -> Vec<T> {
        self.total_bytes = 0;
        self.created_at = Instant::now();
        std::mem::take(&mut self.items)
    }
}

impl<T> Default for Batch<T> {
    fn default() -> Self {
        Self::new()
    }
}
128
129/// Hybrid batcher that flushes based on time, count, or size thresholds.
130/// Whichever threshold is hit first triggers the flush.
131pub struct HybridBatcher<T> {
132    config: BatchConfig,
133    batch: Batch<T>,
134}
135
136impl<T> HybridBatcher<T> {
137    pub fn new(config: BatchConfig) -> Self {
138        Self {
139            config,
140            batch: Batch::new(),
141        }
142    }
143
144    /// Add an item to the batch, returns flush reason if threshold hit
145    pub fn push(&mut self, item: T, size_bytes: usize) -> Option<FlushReason> {
146        self.batch.push(item, size_bytes);
147
148        // Check thresholds in priority order
149        if self.batch.len() >= self.config.flush_count {
150            Some(FlushReason::Count)
151        } else if self.batch.total_bytes >= self.config.flush_bytes {
152            Some(FlushReason::Size)
153        } else {
154            None
155        }
156    }
157
158    /// Check if time threshold exceeded
159    #[must_use]
160    pub fn should_flush_time(&self) -> bool {
161        !self.batch.is_empty() 
162            && self.batch.age() >= Duration::from_millis(self.config.flush_ms)
163    }
164
165    /// Take the current batch for flushing
166    pub fn take_batch(&mut self) -> Vec<T> {
167        let count = self.batch.len();
168        let bytes = self.batch.total_bytes;
169        let items = self.batch.take();
170        debug!(count, bytes, "Batch taken for flush");
171        items
172    }
173
174    /// Check if batch is empty
175    #[must_use]
176    pub fn is_empty(&self) -> bool {
177        self.batch.is_empty()
178    }
179
180    /// Get current batch stats
181    #[must_use]
182    pub fn stats(&self) -> (usize, usize, Duration) {
183        (self.batch.len(), self.batch.total_bytes, self.batch.age())
184    }
185}
186
187/// Extension methods for SyncItem batching (avoids orphan rules)
188impl<T: SizedItem> HybridBatcher<T> {
189    /// Add an item using its intrinsic size
190    pub fn add(&mut self, item: T) -> Option<FlushReason> {
191        let size = item.size_bytes();
192        self.push(item, size)
193    }
194    
195    /// Add multiple items at once
196    pub fn add_batch(&mut self, items: Vec<T>) {
197        for item in items {
198            self.add(item);
199        }
200    }
201    
202    /// Take the batch if any threshold is ready
203    pub fn take_if_ready(&mut self) -> Option<FlushBatch<T>> {
204        // Check count/size threshold
205        let reason = if self.batch.len() >= self.config.flush_count {
206            Some(FlushReason::Count)
207        } else if self.batch.total_bytes >= self.config.flush_bytes {
208            Some(FlushReason::Size)
209        } else if self.should_flush_time() {
210            Some(FlushReason::Time)
211        } else {
212            None
213        };
214        
215        reason.map(|r| {
216            // Capture bytes BEFORE take() resets it
217            let total_bytes = self.batch.total_bytes;
218            FlushBatch {
219                items: self.batch.take(),
220                total_bytes,
221                reason: r,
222            }
223        })
224    }
225    
226    /// Force flush regardless of thresholds (for manual flush or shutdown)
227    pub fn force_flush(&mut self) -> Option<FlushBatch<T>> {
228        self.force_flush_with_reason(FlushReason::Manual)
229    }
230    
231    /// Force flush with a specific reason
232    pub fn force_flush_with_reason(&mut self, reason: FlushReason) -> Option<FlushBatch<T>> {
233        if self.batch.is_empty() {
234            return None;
235        }
236        // Capture bytes BEFORE take() resets it
237        let total_bytes = self.batch.total_bytes;
238        Some(FlushBatch {
239            items: self.batch.take(),
240            total_bytes,
241            reason,
242        })
243    }
244}
245
246/// Extension methods for batchable items (have ID for contains check)
247impl<T: BatchableItem> HybridBatcher<T> {
248    /// Check if an item with the given ID is in the pending batch.
249    #[must_use]
250    pub fn contains(&self, id: &str) -> bool {
251        self.batch.items.iter().any(|item| item.id() == id)
252    }
253}
254
/// An item that can report how many bytes it contributes to a batch.
pub trait SizedItem {
    /// Size of this item in bytes, counted toward the byte flush threshold.
    #[must_use]
    fn size_bytes(&self) -> usize;
}

/// A [`SizedItem`] that also exposes a string identifier, which enables
/// membership checks on the pending batch.
pub trait BatchableItem: SizedItem {
    /// Identifier that the batcher's `contains` compares against.
    fn id(&self) -> &str;
}
265
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;

    // Simple test item. It implements `BatchableItem` so that `id` is
    // genuinely read (no dead-code allowance needed) and `contains` is testable.
    #[derive(Debug, Clone)]
    struct TestItem {
        id: String,
        size: usize,
    }

    impl SizedItem for TestItem {
        fn size_bytes(&self) -> usize {
            self.size
        }
    }

    impl BatchableItem for TestItem {
        fn id(&self) -> &str {
            &self.id
        }
    }

    fn item(id: &str, size: usize) -> TestItem {
        TestItem { id: id.to_string(), size }
    }

    #[test]
    fn test_batch_empty_initially() {
        let batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(batcher.is_empty());
        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
    }

    #[test]
    fn test_batch_tracks_items_and_bytes() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));
        batcher.add(item("c", 150));

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 450);
        assert!(!batcher.is_empty());
    }

    #[test]
    fn test_flush_on_count_threshold() {
        let config = BatchConfig {
            flush_count: 3,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // First two don't trigger
        assert!(batcher.add(item("a", 100)).is_none());
        assert!(batcher.add(item("b", 100)).is_none());

        // Third triggers count threshold
        let reason = batcher.add(item("c", 100));
        assert_eq!(reason, Some(FlushReason::Count));
    }

    #[test]
    fn test_flush_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        // The first two stay under 500 bytes
        assert!(batcher.add(item("a", 200)).is_none());
        assert!(batcher.add(item("b", 200)).is_none());

        // This pushes over 500 bytes
        let reason = batcher.add(item("c", 200));
        assert_eq!(reason, Some(FlushReason::Size));
    }

    #[test]
    fn test_flush_on_time_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 10, // 10ms
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Not ready yet
        assert!(!batcher.should_flush_time());

        // Wait for threshold
        sleep(Duration::from_millis(15));

        // Now ready
        assert!(batcher.should_flush_time());
    }

    #[test]
    fn test_take_if_ready_returns_batch() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 1_000_000,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // Not ready yet
        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 200));

        // Now ready (count threshold)
        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Count);

        // Batcher should be empty now
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_take_if_ready_on_size_threshold() {
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 500,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 300));
        assert!(batcher.take_if_ready().is_none());

        batcher.add(item("b", 300));
        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 600);
        assert_eq!(batch.reason, FlushReason::Size);
        assert!(batcher.is_empty());
    }

    #[test]
    fn test_take_if_ready_on_time_threshold() {
        // A zero-millisecond window makes any non-empty batch immediately due,
        // so this exercises the `Time` path without sleeping.
        let config = BatchConfig {
            flush_count: 1000,
            flush_bytes: 1_000_000,
            flush_ms: 0,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));
        let batch = batcher.take_if_ready().unwrap();
        assert_eq!(batch.reason, FlushReason::Time);
        assert_eq!(batch.items.len(), 1);
        assert_eq!(batch.total_bytes, 100);
    }

    #[test]
    fn test_force_flush() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        // Force flush regardless of thresholds
        let batch = batcher.force_flush().unwrap();
        assert_eq!(batch.items.len(), 2);
        assert_eq!(batch.total_bytes, 300);
        assert_eq!(batch.reason, FlushReason::Manual);

        // Should be empty now
        assert!(batcher.is_empty());

        // Force flush on empty returns None
        assert!(batcher.force_flush().is_none());

        // Test force_flush_with_reason
        batcher.add(item("c", 100));
        let batch = batcher.force_flush_with_reason(FlushReason::Shutdown).unwrap();
        assert_eq!(batch.reason, FlushReason::Shutdown);
    }

    #[test]
    fn test_take_resets_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        batcher.add(item("a", 100));
        batcher.add(item("b", 200));

        let items = batcher.take_batch();
        assert_eq!(items.len(), 2);

        // Stats should be reset
        let (count, bytes, age) = batcher.stats();
        assert_eq!(count, 0);
        assert_eq!(bytes, 0);
        assert!(age < Duration::from_millis(10)); // Timer reset
    }

    #[test]
    fn test_add_batch() {
        let mut batcher = HybridBatcher::new(BatchConfig::default());

        let items = vec![item("a", 100), item("b", 200), item("c", 300)];
        batcher.add_batch(items);

        let (count, bytes, _) = batcher.stats();
        assert_eq!(count, 3);
        assert_eq!(bytes, 600);
    }

    #[test]
    fn test_contains_tracks_pending_items() {
        let mut batcher: HybridBatcher<TestItem> = HybridBatcher::new(BatchConfig::default());
        assert!(!batcher.contains("a"));

        batcher.add(item("a", 10));
        batcher.add(item("b", 20));
        assert!(batcher.contains("a"));
        assert!(batcher.contains("b"));
        assert!(!batcher.contains("c"));

        // Items leave the pending set once the batch is taken.
        batcher.take_batch();
        assert!(!batcher.contains("a"));
        assert!(!batcher.contains("b"));
    }

    #[test]
    fn test_count_beats_size_on_simultaneous_threshold() {
        let config = BatchConfig {
            flush_count: 2,
            flush_bytes: 200,
            flush_ms: 10_000,
        };
        let mut batcher = HybridBatcher::new(config);

        batcher.add(item("a", 100));

        // This hits both count (2) AND size (200) thresholds
        let reason = batcher.add(item("b", 100));

        // Count should win (checked first)
        assert_eq!(reason, Some(FlushReason::Count));
    }
}