laminar_core/subscription/
batcher.rs1use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use crate::subscription::event::{ChangeEvent, ChangeEventBatch};
20
/// Tuning knobs for [`NotificationBatcher`].
///
/// The size and delay limits only take effect while `enabled` is `true`;
/// with batching disabled every event is handed back immediately as a
/// single-event batch.
///
/// `PartialEq`/`Eq` are derived so configurations can be compared directly
/// (for example in tests or when detecting a configuration change).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchConfig {
    /// Number of buffered events for one source at which a batch is emitted.
    pub max_batch_size: usize,
    /// Time since a source's last flush after which its buffered events are
    /// emitted even if `max_batch_size` has not been reached.
    pub max_batch_delay: Duration,
    /// Master switch: `false` bypasses buffering entirely.
    pub enabled: bool,
}
36
37impl Default for BatchConfig {
38 fn default() -> Self {
39 Self {
40 max_batch_size: 64,
41 max_batch_delay: Duration::from_micros(100),
42 enabled: false,
43 }
44 }
45}
46
47pub struct NotificationBatcher {
57 buffers: HashMap<u32, Vec<ChangeEvent>>,
59 last_flush: HashMap<u32, Instant>,
61 config: BatchConfig,
63}
64
65impl NotificationBatcher {
66 #[must_use]
68 pub fn new(config: BatchConfig) -> Self {
69 Self {
70 buffers: HashMap::new(),
71 last_flush: HashMap::new(),
72 config,
73 }
74 }
75
76 pub fn add(
83 &mut self,
84 source_id: u32,
85 source_name: &str,
86 event: ChangeEvent,
87 ) -> Option<ChangeEventBatch> {
88 if !self.config.enabled {
89 let seq = event.sequence().unwrap_or(0);
90 return Some(ChangeEventBatch::new(
91 source_name.to_string(),
92 vec![event],
93 seq,
94 seq,
95 ));
96 }
97
98 let buffer = self.buffers.entry(source_id).or_default();
99 buffer.push(event);
100
101 let now = Instant::now();
102 let last = self.last_flush.entry(source_id).or_insert(now);
103
104 if buffer.len() >= self.config.max_batch_size
105 || now.duration_since(*last) >= self.config.max_batch_delay
106 {
107 *last = now;
108 let events = std::mem::take(buffer);
109 let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
110 let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
111 Some(ChangeEventBatch::new(
112 source_name.to_string(),
113 events,
114 first_seq,
115 last_seq,
116 ))
117 } else {
118 None
119 }
120 }
121
122 pub fn flush_all(&mut self) -> Vec<(u32, ChangeEventBatch)> {
126 let mut results = Vec::new();
127
128 for (&source_id, buffer) in &mut self.buffers {
129 if !buffer.is_empty() {
130 let events = std::mem::take(buffer);
131 let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
132 let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
133 results.push((
134 source_id,
135 ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
136 ));
137 self.last_flush.insert(source_id, Instant::now());
138 }
139 }
140
141 results
142 }
143
144 pub fn flush_expired(&mut self) -> Vec<(u32, ChangeEventBatch)> {
148 let now = Instant::now();
149 let mut results = Vec::new();
150
151 let expired: Vec<u32> = self
152 .buffers
153 .iter()
154 .filter(|(_, buf)| !buf.is_empty())
155 .filter(|(id, _)| {
156 let last = self.last_flush.get(id).copied().unwrap_or(now);
157 now.duration_since(last) >= self.config.max_batch_delay
158 })
159 .map(|(&id, _)| id)
160 .collect();
161
162 for source_id in expired {
163 if let Some(buffer) = self.buffers.get_mut(&source_id) {
164 if !buffer.is_empty() {
165 let events = std::mem::take(buffer);
166 let first_seq = events.first().and_then(ChangeEvent::sequence).unwrap_or(0);
167 let last_seq = events.last().and_then(ChangeEvent::sequence).unwrap_or(0);
168 self.last_flush.insert(source_id, now);
169 results.push((
170 source_id,
171 ChangeEventBatch::new(String::new(), events, first_seq, last_seq),
172 ));
173 }
174 }
175 }
176
177 results
178 }
179
180 #[must_use]
182 pub fn buffered_count(&self) -> usize {
183 self.buffers.values().map(Vec::len).sum()
184 }
185
186 #[must_use]
188 pub fn config(&self) -> &BatchConfig {
189 &self.config
190 }
191}
192
#[cfg(test)]
// `make_event` reuses small `u64` sequence numbers as `i64` values; the
// values used in these tests are far too small to wrap.
#[allow(clippy::cast_possible_wrap)]
mod tests {
    use super::*;
    use arrow_array::{Int64Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Builds an insert event with sequence number `seq`, carrying a
    /// one-row `Int64` record batch whose only value is `seq`.
    fn make_event(seq: u64) -> ChangeEvent {
        let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
        let array = Int64Array::from(vec![seq as i64]);
        let batch = Arc::new(RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap());
        // NOTE(review): the second argument is presumably the event
        // timestamp — confirm against `ChangeEvent::insert`.
        ChangeEvent::insert(batch, 1000 + seq as i64, seq)
    }

    /// Creates a batcher with batching enabled and the given limits.
    fn enabled_batcher(max_batch_size: usize, max_batch_delay: Duration) -> NotificationBatcher {
        NotificationBatcher::new(BatchConfig {
            max_batch_size,
            max_batch_delay,
            enabled: true,
        })
    }

    #[test]
    fn test_batcher_immediate_when_disabled() {
        let mut batcher = NotificationBatcher::new(BatchConfig {
            enabled: false,
            ..Default::default()
        });

        let batch = batcher
            .add(0, "mv_a", make_event(1))
            .expect("a disabled batcher must emit every event immediately");
        assert_eq!(batch.len(), 1);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 1);
        assert_eq!(batch.source, "mv_a");
        // Pass-through mode must not leave anything behind in the buffers.
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_size_trigger() {
        let mut batcher = enabled_batcher(3, Duration::from_secs(60));

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        assert!(batcher.add(0, "mv_a", make_event(2)).is_none());
        assert_eq!(batcher.buffered_count(), 2);

        let batch = batcher
            .add(0, "mv_a", make_event(3))
            .expect("third event reaches max_batch_size");
        assert_eq!(batch.len(), 3);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 3);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_timeout_trigger() {
        let mut batcher = enabled_batcher(1000, Duration::from_millis(1));

        // The first event opens the delay window, so it is only buffered.
        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        std::thread::sleep(Duration::from_millis(5));

        let batch = batcher
            .add(0, "mv_a", make_event(2))
            .expect("max_batch_delay has elapsed");
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 2);
    }

    #[test]
    fn test_batcher_flush_all() {
        let mut batcher = enabled_batcher(100, Duration::from_secs(60));

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        assert!(batcher.add(0, "mv_a", make_event(2)).is_none());
        assert!(batcher.add(1, "mv_b", make_event(3)).is_none());

        let flushed = batcher.flush_all();
        assert_eq!(flushed.len(), 2);

        let total_events: usize = flushed.iter().map(|(_, b)| b.len()).sum();
        assert_eq!(total_events, 3);
        assert_eq!(batcher.buffered_count(), 0);

        // Results come back in unspecified order, so index them by source id.
        let by_source: HashMap<u32, ChangeEventBatch> = flushed.into_iter().collect();
        assert_eq!(by_source[&0].len(), 2);
        assert_eq!(by_source[&0].first_sequence, 1);
        assert_eq!(by_source[&0].last_sequence, 2);
        assert_eq!(by_source[&1].len(), 1);
        assert_eq!(by_source[&1].first_sequence, 3);
        assert_eq!(by_source[&1].last_sequence, 3);
        // The batcher does not know source names when flushing by id.
        assert!(by_source[&0].source.is_empty());
    }

    #[test]
    fn test_batcher_flush_expired() {
        let mut batcher = enabled_batcher(100, Duration::from_millis(1));

        // Return values are ignored on purpose: on a slow machine the second
        // `add` may itself cross the 1 ms delay, which is not under test here.
        batcher.add(0, "mv_a", make_event(1));
        batcher.add(0, "mv_a", make_event(2));

        std::thread::sleep(Duration::from_millis(5));

        let expired = batcher.flush_expired();
        assert_eq!(expired.len(), 1);
        assert_eq!(expired[0].0, 0);
        assert_eq!(expired[0].1.len(), 2);
        assert_eq!(expired[0].1.first_sequence, 1);
        assert_eq!(expired[0].1.last_sequence, 2);
        assert_eq!(batcher.buffered_count(), 0);
    }

    #[test]
    fn test_batcher_multiple_sources() {
        let mut batcher = enabled_batcher(2, Duration::from_secs(60));

        assert!(batcher.add(0, "mv_a", make_event(1)).is_none());
        assert!(batcher.add(1, "mv_b", make_event(2)).is_none());

        let batch = batcher
            .add(0, "mv_a", make_event(3))
            .expect("source 0 reaches max_batch_size");
        assert_eq!(batch.len(), 2);
        assert_eq!(batch.first_sequence, 1);
        assert_eq!(batch.last_sequence, 3);
        // Source 1 is unaffected by source 0's flush.
        assert_eq!(batcher.buffered_count(), 1);
    }
}