exocore_core/futures/
batching_stream.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::{Stream, StreamExt};
7
8/// Wraps a stream to batch all available capped number of items.
9///
10/// This stream doesn't block wait for a certain duration before sending
11/// available items, but will consume the underlying stream until it would
12/// block, or until the maximum number of items is collected.
13pub struct BatchingStream<S>
14where
15    S: Stream + Unpin,
16{
17    inner: S,
18    inner_done: bool,
19    max_items: usize,
20}
21
22impl<S> BatchingStream<S>
23where
24    S: Stream + Unpin,
25{
26    pub fn new(inner: S, max_items: usize) -> BatchingStream<S> {
27        BatchingStream {
28            inner,
29            inner_done: false,
30            max_items,
31        }
32    }
33}
34
35impl<S> Stream for BatchingStream<S>
36where
37    S: Stream + Unpin,
38{
39    type Item = Vec<S::Item>;
40
41    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42        if self.inner_done {
43            return Poll::Ready(None);
44        }
45
46        let max_items = self.max_items;
47        let mut pinned_inner = Pin::new(&mut self.inner);
48        let mut buf = Vec::new();
49        for _ in 0..max_items {
50            match pinned_inner.poll_next_unpin(cx) {
51                Poll::Ready(Some(item)) => {
52                    buf.push(item);
53                }
54                Poll::Ready(None) => {
55                    self.inner_done = true;
56                    break;
57                }
58                Poll::Pending => {
59                    break;
60                }
61            }
62        }
63
64        if !buf.is_empty() {
65            Poll::Ready(Some(buf))
66        } else if self.inner_done {
67            Poll::Ready(None)
68        } else {
69            Poll::Pending
70        }
71    }
72}
73
74#[cfg(test)]
75mod tests {
76    use futures::{channel::mpsc, SinkExt};
77
78    use super::*;
79    use crate::futures::block_on;
80
81    #[test]
82    fn should_batch_items() {
83        let (mut sender, receiver) = mpsc::channel(15);
84        let mut batched_receiver = BatchingStream::new(receiver, 10);
85
86        block_on(async {
87            for _i in 0u8..15 {
88                sender.send(()).await.unwrap();
89            }
90        });
91
92        let result = block_on(async { batched_receiver.next().await });
93        assert_eq!(result, Some(vec![(); 10]));
94
95        let result = block_on(async { batched_receiver.next().await });
96        assert_eq!(result, Some(vec![(); 5]));
97
98        drop(sender);
99
100        let result = block_on(async { batched_receiver.next().await });
101        assert_eq!(result, None);
102    }
103}