multistream_batch/
multi_buf_batch.rs

//! This module provides `MultiBufBatch`, which buffers items into multiple internal batches keyed by
//! a batch stream key until one of the batches is ready. The accumulated items of a ready batch can
//! then be consumed in one go, together with its stream key, through a `Drain` iterator.
3use linked_hash_map::LinkedHashMap;
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::time::{Duration, Instant};
7use std::vec::Drain;
8
/// Items collected so far for a single stream key, together with the moment the batch was started.
///
/// The item buffer is either freshly allocated or taken over from the buffer cache.
#[derive(Debug)]
struct OutstandingBatch<I>
where
    I: Debug,
{
    /// Buffered items, in the order they were pushed.
    items: Vec<I>,
    /// Creation time of this batch; compared against `max_duration` when polling.
    created: Instant,
}
15
16impl<I: Debug> OutstandingBatch<I> {
17    fn new() -> OutstandingBatch<I> {
18        OutstandingBatch {
19            items: Vec::new(),
20            created: Instant::now(),
21        }
22    }
23
24    fn from_cache(mut items: Vec<I>) -> OutstandingBatch<I> {
25        // Make sure nothing is left after undrained
26        items.clear();
27
28        OutstandingBatch {
29            items,
30            created: Instant::now(),
31        }
32    }
33}
34
/// Result of a [`MultiBufBatch::poll`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollResult<K: Debug> {
    /// The batch for stream key `K` has reached one of its limits and is ready to be consumed.
    Ready(K),
    /// No outstanding batch has reached any of its limits yet.
    ///
    /// Holds the `Duration` after which the oldest outstanding batch will reach the
    /// `max_duration` limit, or `None` if there are no outstanding batches.
    NotReady(Option<Duration>),
}
45
/// Usage statistics of a [`MultiBufBatch`], as returned by [`MultiBufBatch::stats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Stats {
    /// Number of outstanding batches (batches currently holding buffered items).
    pub outstanding: usize,
    /// Number of cached item buffers available for reuse (not used by outstanding batches).
    pub cached_buffers: usize,
}
54
55/// Collects items into multiple batches based on stream key.
56/// A batch may become ready after collecting `max_size` number of items or until `max_duration` has elapsed
57/// since first item was appended to the batch.
58///
59/// Batch item buffers are cached and reused to avoid allocations.
60///
61/// This base implementation does not handle actual awaiting for batch duration timeout.
62#[derive(Debug)]
63pub struct MultiBufBatch<K: Debug + Ord + Hash, I: Debug> {
64    max_size: usize,
65    max_duration: Duration,
66    // Cache of empty batch item buffers
67    cache: Vec<Vec<I>>,
68    // Batches that have items in them but has not yet reached any limit in order of insertion
69    outstanding: LinkedHashMap<K, OutstandingBatch<I>>,
70    // Batch with key K is ready to be consumed due to reaching max_size limit
71    full: Option<K>,
72}
73
74impl<K, I> MultiBufBatch<K, I>
75where
76    K: Debug + Ord + Hash + Clone,
77    I: Debug,
78{
79    /// Crates new instance with given maximum batch size (`max_size`) and maximum duration (`max_duration`) that
80    /// batch can last since first item appended to it.
81    ///
82    /// Panics if `max_size == 0`.
83    pub fn new(max_size: usize, max_duration: Duration) -> MultiBufBatch<K, I> {
84        assert!(max_size > 0, "MultiBufBatch::new bad max_size");
85
86        MultiBufBatch {
87            max_size,
88            max_duration,
89            cache: Default::default(),
90            outstanding: Default::default(),
91            full: Default::default(),
92        }
93    }
94
95    /// Checks if batch has reached one of its limits.
96    ///
97    /// Returns:
98    /// * `PollResult::Ready(K)` - batch for stream key `K` has reached one of its limit and is ready to be consumed,
99    /// * `PollResult::NotReady(None)` - batch is not ready yet and has no items appeded yet,
100    /// * `PollResult::NotReady(Some(duration))` - batch is not ready yet but it will be ready after time duration due to duration limit.
101    pub fn poll(&self) -> PollResult<K> {
102        // Check oldest full batch first to make sure that following call to append won't fail
103        if let Some(key) = &self.full {
104            return PollResult::Ready(key.clone());
105        }
106
107        // Check oldest outstanding batch
108        if let Some((key, batch)) = self.outstanding.front() {
109            let since_start = Instant::now().duration_since(batch.created);
110
111            if since_start >= self.max_duration {
112                return PollResult::Ready(key.clone());
113            }
114
115            return PollResult::NotReady(Some(self.max_duration - since_start));
116        }
117
118        return PollResult::NotReady(None);
119    }
120
121    /// Appends item to batch with given stream key.
122    ///
123    /// It is an contract error to append batch that is ready according to `self.poll()`.
124    ///
125    /// Panics if batch has already reached its `max_size` limit.
126    pub fn append(&mut self, key: K, item: I) {
127        assert!(
128            self.full.is_none(),
129            "MultiBufBatch::append unconsumed full batch"
130        );
131
132        // Look up batch in outstanding or crate one using cached or new items buffer
133        if let Some(batch) = self.outstanding.get_mut(&key) {
134            assert!(
135                batch.items.len() < self.max_size,
136                "MultiBufBatch::append on full batch"
137            );
138
139            batch.items.push(item);
140
141            // Mark as full
142            if batch.items.len() >= self.max_size {
143                self.full = Some(key);
144            }
145        } else {
146            let mut batch = if let Some(items) = self.cache.pop() {
147                OutstandingBatch::from_cache(items)
148            } else {
149                OutstandingBatch::new()
150            };
151
152            batch.items.push(item);
153            self.outstanding.insert(key, batch);
154        }
155    }
156
157    /// Moves outstanding batch item buffer to cache and returns its `&mut` reference.
158    fn move_to_cache(&mut self, key: &K) -> Option<&mut Vec<I>> {
159        // If consuming full key clear it
160        if self.full.as_ref().filter(|fkey| *fkey == key).is_some() {
161            self.full.take();
162        }
163
164        // Move items from outstanding to cache
165        let items = self.outstanding.remove(key)?.items;
166        self.cache.push(items);
167        self.cache.last_mut()
168    }
169
170    /// Lists keys of outstanding batches.
171    pub fn outstanding(&self) -> impl Iterator<Item = &K> {
172        self.outstanding.keys()
173    }
174
175    /// Starts new batch dropping all buffered items.
176    pub fn clear(&mut self, key: &K) {
177        self.move_to_cache(key).map(|items| items.clear());
178    }
179
180    /// Consumes batch by draining items from internal buffer.
181    pub fn drain(&mut self, key: &K) -> Option<Drain<I>> {
182        self.move_to_cache(key).map(|items| items.drain(0..))
183    }
184
185    /// Flushes all outstanding batches starting from oldest.
186    pub fn flush(&mut self) -> Vec<(K, Vec<I>)> {
187        let cache = &mut self.cache;
188        let outstanding = &mut self.outstanding;
189
190        outstanding
191            .entries()
192            .map(|entry| {
193                let key = entry.key().clone();
194
195                // Move to cache
196                let items = entry.remove().items;
197                cache.push(items);
198                let items = cache.last_mut().unwrap();
199
200                // Move items out preserving capacity
201                let items = items.split_off(0);
202
203                (key, items)
204            })
205            .collect()
206    }
207
208    /// Returns slice of internal item buffer of given outstanding batch.
209    pub fn get(&self, key: &K) -> Option<&[I]> {
210        self.outstanding
211            .get(key)
212            .map(|batch| batch.items.as_slice())
213    }
214
215    /// Drops cached batch buffers.
216    pub fn clear_cache(&mut self) {
217        self.cache.clear();
218    }
219
220    /// Provides usage statistics.
221    pub fn stats(&self) -> Stats {
222        Stats {
223            outstanding: self.outstanding.len(),
224            cached_buffers: self.cache.len(),
225        }
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use std::time::Duration;

    /// Batcher used by most tests: at most 4 items per batch, 10 seconds per batch.
    fn batcher() -> MultiBufBatch<i32, i32> {
        MultiBufBatch::new(4, Duration::from_secs(10))
    }

    /// Drains batch `key` and collects its items, panicking if the batch does not exist.
    fn take(batch: &mut MultiBufBatch<i32, i32>, key: i32) -> Vec<i32> {
        batch.drain(&key).unwrap().collect()
    }

    #[test]
    fn test_batch_poll() {
        let mut batch = batcher();

        // Nothing appended: no outstanding batches
        assert_matches!(batch.poll(), PollResult::NotReady(None));

        batch.append(0, 1);

        // One outstanding batch that will expire later
        assert_matches!(batch.poll(), PollResult::NotReady(Some(_)));

        for item in 2..=4 {
            batch.append(0, item);
        }

        // Reaching max_size makes the batch ready
        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [1, 2, 3, 4]);

        // Draining removed the only outstanding batch
        assert_matches!(batch.poll(), PollResult::NotReady(None));
    }

    #[test]
    fn test_batch_max_size() {
        let mut batch = batcher();

        for item in 1..=4 {
            batch.append(0, item);
        }
        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [1, 2, 3, 4]);

        // A drained key can be filled again
        for item in 5..=8 {
            batch.append(0, item);
        }
        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [5, 6, 7, 8]);

        // Interleaved keys: key 1 fills up first
        let interleaved = [(1, 1), (0, 9), (1, 2), (0, 10), (1, 3), (0, 11), (1, 4)];
        for &(key, item) in interleaved.iter() {
            batch.append(key, item);
        }
        assert_matches!(batch.poll(), PollResult::Ready(1));
        assert_eq!(take(&mut batch, 1), [1, 2, 3, 4]);

        batch.append(0, 12);
        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [9, 10, 11, 12]);
    }

    #[test]
    fn test_batch_max_duration() {
        let mut batch = MultiBufBatch::new(4, Duration::from_millis(100));

        batch.append(0, 1);
        batch.append(0, 2);

        let ready_after = if let PollResult::NotReady(Some(remaining)) = batch.poll() {
            remaining
        } else {
            panic!("expected NotReady with instant")
        };

        // Waiting out the remaining duration makes the batch ready
        std::thread::sleep(ready_after);

        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [1, 2]);

        for item in 3..=6 {
            batch.append(0, item);
        }
        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [3, 4, 5, 6]);
    }

    #[test]
    fn test_drain_stream() {
        let mut batch = batcher();

        for item in 1..=3 {
            batch.append(0, item);
        }
        for item in 1..=2 {
            batch.append(1, item);
        }

        // Batches can be drained explicitly before reaching any limit
        assert_eq!(take(&mut batch, 1), [1, 2]);
        assert_eq!(take(&mut batch, 0), [1, 2, 3]);

        for item in 5..=8 {
            batch.append(0, item);
        }
        assert_matches!(batch.poll(), PollResult::Ready(0));
        assert_eq!(take(&mut batch, 0), [5, 6, 7, 8]);
    }

    #[test]
    fn test_flush() {
        let mut batch = batcher();

        for &(key, item) in [(0, 1), (1, 1), (0, 2), (1, 2), (0, 3)].iter() {
            batch.append(key, item);
        }

        let batches = batch.flush();

        // Batches come out oldest first
        assert_eq!(batches[0].0, 0);
        assert_eq!(batches[0].1.as_slice(), [1, 2, 3]);

        assert_eq!(batches[1].0, 1);
        assert_eq!(batches[1].1.as_slice(), [1, 2]);
    }
}