exocore_core/futures/
batching_stream.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::{Stream, StreamExt};
7
8pub struct BatchingStream<S>
14where
15 S: Stream + Unpin,
16{
17 inner: S,
18 inner_done: bool,
19 max_items: usize,
20}
21
22impl<S> BatchingStream<S>
23where
24 S: Stream + Unpin,
25{
26 pub fn new(inner: S, max_items: usize) -> BatchingStream<S> {
27 BatchingStream {
28 inner,
29 inner_done: false,
30 max_items,
31 }
32 }
33}
34
35impl<S> Stream for BatchingStream<S>
36where
37 S: Stream + Unpin,
38{
39 type Item = Vec<S::Item>;
40
41 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
42 if self.inner_done {
43 return Poll::Ready(None);
44 }
45
46 let max_items = self.max_items;
47 let mut pinned_inner = Pin::new(&mut self.inner);
48 let mut buf = Vec::new();
49 for _ in 0..max_items {
50 match pinned_inner.poll_next_unpin(cx) {
51 Poll::Ready(Some(item)) => {
52 buf.push(item);
53 }
54 Poll::Ready(None) => {
55 self.inner_done = true;
56 break;
57 }
58 Poll::Pending => {
59 break;
60 }
61 }
62 }
63
64 if !buf.is_empty() {
65 Poll::Ready(Some(buf))
66 } else if self.inner_done {
67 Poll::Ready(None)
68 } else {
69 Poll::Pending
70 }
71 }
72}
73
#[cfg(test)]
mod tests {
    use futures::{channel::mpsc, SinkExt};

    use super::*;
    use crate::futures::block_on;

    /// Sends 15 items through a channel wrapped with a batch limit of 10 and
    /// expects a batch of 10, then a batch of 5, then the end of the stream
    /// once the sender has been dropped.
    #[test]
    fn should_batch_items() {
        let (mut tx, rx) = mpsc::channel(15);
        let mut batches = BatchingStream::new(rx, 10);

        // Queue every item up front so that all of them are immediately ready
        // when the batching stream gets polled.
        block_on(async {
            for _ in 0u8..15 {
                tx.send(()).await.unwrap();
            }
        });

        // First poll stops at the limit of 10 items.
        assert_eq!(
            block_on(async { batches.next().await }),
            Some(vec![(); 10])
        );

        // Second poll drains the 5 remaining items.
        assert_eq!(
            block_on(async { batches.next().await }),
            Some(vec![(); 5])
        );

        // Closing the channel ends the batching stream.
        drop(tx);
        assert_eq!(block_on(async { batches.next().await }), None);
    }
}
103}