// laminar_core/subscription/batcher.rs
//! Notification batching for throughput optimization.
//!
//! Accumulates multiple [`ChangeEvent`]s before delivering them as a
//! [`ChangeEventBatch`], reducing per-event dispatch overhead for
//! high-throughput scenarios.
//!
//! # Flush Triggers
//!
//! A batch is flushed when either:
//! - The batch reaches `max_batch_size` events (size trigger)
//! - The time since the last flush exceeds `max_batch_delay` (time trigger)
//!
//! When batching is disabled (`enabled: false`), events pass through
//! immediately as single-event batches.

use std::collections::HashMap;
use std::time::{Duration, Instant};

use crate::subscription::event::{ChangeEvent, ChangeEventBatch};

// ---------------------------------------------------------------------------
// BatchConfig
// ---------------------------------------------------------------------------

/// Tuning knobs for notification batching.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Flush as soon as a buffer holds this many events.
    pub max_batch_size: usize,
    /// Flush once this much time has passed since the last flush for a
    /// source, even if the batch is not full.
    pub max_batch_delay: Duration,
    /// Master switch: when `false`, every event is delivered immediately
    /// as a one-element batch.
    pub enabled: bool,
}

/// Defaults: batches of up to 64 events, a 100 µs flush window, and
/// batching turned off (pass-through delivery).
impl Default for BatchConfig {
    fn default() -> Self {
        BatchConfig {
            max_batch_size: 64,
            max_batch_delay: Duration::from_micros(100),
            enabled: false,
        }
    }
}

// ---------------------------------------------------------------------------
// NotificationBatcher
// ---------------------------------------------------------------------------

51/// Accumulates events per source and flushes as batches.
52///
53/// Used by the Ring 1 dispatcher to reduce per-event overhead when
54/// delivering to subscribers. Each source has its own buffer, so
55/// events from different sources are never mixed.
56pub struct NotificationBatcher {
57    /// Buffered events per `source_id`.
58    buffers: HashMap<u32, Vec<ChangeEvent>>,
59    /// Last flush time per `source_id`.
60    last_flush: HashMap<u32, Instant>,
61    /// Configuration.
62    config: BatchConfig,
63}
64
65impl NotificationBatcher {
66    /// Creates a new batcher with the given configuration.
67    #[must_use]
68    pub fn new(config: BatchConfig) -> Self {
69        Self {
70            buffers: HashMap::new(),
71            last_flush: HashMap::new(),
72            config,
73        }
74    }
75
76    /// Adds an event to the batcher.
77    ///
78    /// Returns `Some(ChangeEventBatch)` if the batch is ready to deliver
79    /// (size or time trigger), or `None` if the event was buffered.
80    ///
81    /// When batching is disabled, always returns a single-event batch.
82    pub fn add(
83        &mut self,
84        source_id: u32,
85        source_name: &str,
86        event: ChangeEvent,
87    ) -> Option<ChangeEventBatch> {
88        if !self.config.enabled {
89            let seq = event.sequence().unwrap_or(0);
90            return Some(ChangeEventBatch::new(
91                source_name.to_string(),
92                vec![event],
93                seq,
94                seq,
95            ));
96        }
97
98        let buffer = self.buffers.entry(source_id).or_default();
99        buffer.push(event);
100
101        let now = Instant::now();
102        let last = self.last_flush.entry(source_id).or_insert(now);
103
104        if buffer.len() >= self.config.max_batch_size
105            || now.duration_since(*last) >= self.config.max_batch_delay
106        {
107            *last = now;
108            let events = std::mem::take(buffer);
109            let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
110            let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
111            Some(ChangeEventBatch::new(
112                source_name.to_string(),
113                events,
114                first_seq,
115                last_seq,
116            ))
117        } else {
118            None
119        }
120    }
121
122    /// Forces flush of all pending batches.
123    ///
124    /// Returns a vec of `(source_id, batch)` for all non-empty buffers.
125    pub fn flush_all(&mut self) -> Vec<(u32, ChangeEventBatch)> {
126        let mut results = Vec::new();
127
128        for (&source_id, buffer) in &mut self.buffers {
129            if !buffer.is_empty() {
130                let events = std::mem::take(buffer);
131                let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
132                let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
133                results.push((
134                    source_id,
135                    ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
136                ));
137                self.last_flush.insert(source_id, Instant::now());
138            }
139        }
140
141        results
142    }
143
144    /// Flushes batches that have exceeded the `max_batch_delay`.
145    ///
146    /// Returns a vec of `(source_id, batch)` for expired buffers.
147    pub fn flush_expired(&mut self) -> Vec<(u32, ChangeEventBatch)> {
148        let now = Instant::now();
149        let mut results = Vec::new();
150
151        let expired: Vec<u32> = self
152            .buffers
153            .iter()
154            .filter(|(_, buf)| !buf.is_empty())
155            .filter(|(id, _)| {
156                let last = self.last_flush.get(id).copied().unwrap_or(now);
157                now.duration_since(last) >= self.config.max_batch_delay
158            })
159            .map(|(&id, _)| id)
160            .collect();
161
162        for source_id in expired {
163            if let Some(buffer) = self.buffers.get_mut(&source_id) {
164                if !buffer.is_empty() {
165                    let events = std::mem::take(buffer);
166                    let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
167                    let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
168                    self.last_flush.insert(source_id, now);
169                    results.push((
170                        source_id,
171                        ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
172                    ));
173                }
174            }
175        }
176
177        results
178    }
179
180    /// Returns the number of buffered events across all sources.
181    #[must_use]
182    pub fn buffered_count(&self) -> usize {
183        self.buffers.values().map(Vec::len).sum()
184    }
185
186    /// Returns the configuration.
187    #[must_use]
188    pub fn config(&self) -> &BatchConfig {
189        &self.config
190    }
191}

// ===========================================================================
// Tests
// ===========================================================================

#[cfg(test)]
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;
205    fn make_event(seq: u64) -> ChangeEvent {
206        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
207        #[allow(clippy::cast_possible_wrap)]
208        let array = Int64Array::from(vec![seq as i64]);
209        let batch = Arc::new(RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap());
210        #[allow(clippy::cast_possible_wrap)]
211        ChangeEvent::insert(batch, 1000 + seq as i64, seq)
212    }
213
214    #[test]
215    fn test_batcher_immediate_when_disabled() {
216        let config = BatchConfig {
217            enabled: false,
218            ..Default::default()
219        };
220        let mut batcher = NotificationBatcher::new(config);
221
222        let result = batcher.add(0, "mv_a", make_event(1));
223        assert!(result.is_some());
224        let batch = result.unwrap();
225        assert_eq!(batch.len(), 1);
226        assert_eq!(batch.first_sequence, 1);
227        assert_eq!(batch.last_sequence, 1);
228        assert_eq!(batch.source, "mv_a");
229    }
230
231    #[test]
232    fn test_batcher_size_trigger() {
233        let config = BatchConfig {
234            max_batch_size: 3,
235            max_batch_delay: Duration::from_secs(60),
236            enabled: true,
237        };
238        let mut batcher = NotificationBatcher::new(config);
239
240        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
241        assert!(batcher.add(0, "mv_a", make_event(2)).is_none());
242        assert_eq!(batcher.buffered_count(), 2);
243
244        let batch = batcher.add(0, "mv_a", make_event(3));
245        assert!(batch.is_some());
246        let batch = batch.unwrap();
247        assert_eq!(batch.len(), 3);
248        assert_eq!(batch.first_sequence, 1);
249        assert_eq!(batch.last_sequence, 3);
250        assert_eq!(batcher.buffered_count(), 0);
251    }
252
253    #[test]
254    fn test_batcher_timeout_trigger() {
255        let config = BatchConfig {
256            max_batch_size: 1000,
257            max_batch_delay: Duration::from_millis(1),
258            enabled: true,
259        };
260        let mut batcher = NotificationBatcher::new(config);
261
262        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
263        std::thread::sleep(Duration::from_millis(5));
264
265        let batch = batcher.add(0, "mv_a", make_event(2));
266        assert!(batch.is_some());
267        assert_eq!(batch.unwrap().len(), 2);
268    }
269
270    #[test]
271    fn test_batcher_flush_all() {
272        let config = BatchConfig {
273            max_batch_size: 100,
274            max_batch_delay: Duration::from_secs(60),
275            enabled: true,
276        };
277        let mut batcher = NotificationBatcher::new(config);
278
279        batcher.add(0, "mv_a", make_event(1));
280        batcher.add(0, "mv_a", make_event(2));
281        batcher.add(1, "mv_b", make_event(3));
282
283        let flushed = batcher.flush_all();
284        assert_eq!(flushed.len(), 2);
285
286        let total_events: usize = flushed.iter().map(|(_, b)| b.len()).sum();
287        assert_eq!(total_events, 3);
288        assert_eq!(batcher.buffered_count(), 0);
289    }
290
291    #[test]
292    fn test_batcher_flush_expired() {
293        let config = BatchConfig {
294            max_batch_size: 100,
295            max_batch_delay: Duration::from_millis(1),
296            enabled: true,
297        };
298        let mut batcher = NotificationBatcher::new(config);
299
300        batcher.add(0, "mv_a", make_event(1));
301        batcher.add(0, "mv_a", make_event(2));
302
303        std::thread::sleep(Duration::from_millis(5));
304
305        let expired = batcher.flush_expired();
306        assert_eq!(expired.len(), 1);
307        assert_eq!(expired[0].1.len(), 2);
308        assert_eq!(batcher.buffered_count(), 0);
309    }
310
311    #[test]
312    fn test_batcher_multiple_sources() {
313        let config = BatchConfig {
314            max_batch_size: 2,
315            max_batch_delay: Duration::from_secs(60),
316            enabled: true,
317        };
318        let mut batcher = NotificationBatcher::new(config);
319
320        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
321        assert!(batcher.add(1, "mv_b", make_event(2)).is_none());
322        let batch = batcher.add(0, "mv_a", make_event(3));
323        assert!(batch.is_some());
324        assert_eq!(batch.unwrap().len(), 2);
325        assert_eq!(batcher.buffered_count(), 1);
326    }
327}