tokio_batch/
lib.rs

1#[cfg(test)]
2#[macro_use]
3extern crate doc_comment;
4
5#[cfg(test)]
6doctest!("../README.md");
7
8
9use core::mem;
10use core::pin::Pin;
11use futures::stream::{Fuse, FusedStream, Stream};
12use futures::Future;
13use futures::task::{Context, Poll};
14use futures::StreamExt;
15#[cfg(feature = "sink")]
16use futures_sink::Sink;
17use pin_utils::{unsafe_pinned, unsafe_unpinned};
18
19use std::time::Duration;
20use futures_timer::Delay;
21
22pub trait ChunksTimeoutStreamExt: Stream {
23    fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self>
24    where
25        Self: Sized,
26    {
27        ChunksTimeout::new(self, capacity, duration)
28    }
29}
30impl<T: ?Sized> ChunksTimeoutStreamExt for T where T: Stream {}
31
32#[derive(Debug)]
33#[must_use = "streams do nothing unless polled"]
34pub struct ChunksTimeout<St: Stream> {
35    stream: Fuse<St>,
36    items: Vec<St::Item>,
37    cap: usize,
38    // https://github.com/rust-lang-nursery/futures-rs/issues/1475
39    clock: Option<Delay>,
40    duration: Duration,
41}
42
43impl<St: Unpin + Stream> Unpin for ChunksTimeout<St> {}
44
45impl<St: Stream> ChunksTimeout<St>
46where
47    St: Stream,
48{
49    unsafe_unpinned!(items: Vec<St::Item>);
50    unsafe_pinned!(clock: Option<Delay>);
51    unsafe_pinned!(stream: Fuse<St>);
52
53    pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout<St> {
54        assert!(capacity > 0);
55
56        ChunksTimeout {
57            stream: stream.fuse(),
58            items: Vec::with_capacity(capacity),
59            cap: capacity,
60            clock: None,
61            duration,
62        }
63    }
64
65    fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> {
66        let cap = self.cap;
67        mem::replace(self.as_mut().items(), Vec::with_capacity(cap))
68    }
69
70    /// Acquires a reference to the underlying stream that this combinator is
71    /// pulling from.
72    pub fn get_ref(&self) -> &St {
73        self.stream.get_ref()
74    }
75
76    /// Acquires a mutable reference to the underlying stream that this
77    /// combinator is pulling from.
78    ///
79    /// Note that care must be taken to avoid tampering with the state of the
80    /// stream which may otherwise confuse this combinator.
81    pub fn get_mut(&mut self) -> &mut St {
82        self.stream.get_mut()
83    }
84
85    /// Acquires a pinned mutable reference to the underlying stream that this
86    /// combinator is pulling from.
87    ///
88    /// Note that care must be taken to avoid tampering with the state of the
89    /// stream which may otherwise confuse this combinator.
90    pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
91        self.stream().get_pin_mut()
92    }
93
94    /// Consumes this combinator, returning the underlying stream.
95    ///
96    /// Note that this may discard intermediate state of this combinator, so
97    /// care should be taken to avoid losing resources when this is called.
98    pub fn into_inner(self) -> St {
99        self.stream.into_inner()
100    }
101}
102
103impl<St: Stream> Stream for ChunksTimeout<St> {
104    type Item = Vec<St::Item>;
105
106    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107        loop {
108            match self.as_mut().stream().poll_next(cx) {
109                Poll::Ready(item) => match item {
110                    // Push the item into the buffer and check whether it is full.
111                    // If so, replace our buffer with a new and empty one and return
112                    // the full one.
113                    Some(item) => {
114                        if self.items.is_empty() {
115                            *self.as_mut().clock() =
116                                Some(Delay::new(self.duration));
117                        }
118                        self.as_mut().items().push(item);
119                        if self.items.len() >= self.cap {
120                            *self.as_mut().clock() = None;
121                            return Poll::Ready(Some(self.as_mut().take()));
122                        } else {
123                            // Continue the loop
124                            continue;
125                        }
126                    }
127
128                    // Since the underlying stream ran out of values, return what we
129                    // have buffered, if we have anything.
130                    None => {
131                        let last = if self.items.is_empty() {
132                            None
133                        } else {
134                            let full_buf = mem::replace(self.as_mut().items(), Vec::new());
135                            Some(full_buf)
136                        };
137
138                        return Poll::Ready(last);
139                    }
140                },
141                // Don't return here, as we need to need check the clock.
142                Poll::Pending => {}
143            }
144
145            match self.as_mut().clock().as_pin_mut().map(|clock| clock.poll(cx)) {
146                Some(Poll::Ready(())) => {
147                    *self.as_mut().clock() = None;
148                    return Poll::Ready(Some(self.as_mut().take()));
149                }
150                Some(Poll::Pending) => {}
151                None => {
152                    debug_assert!(
153                        self.as_mut().items().is_empty(),
154                        "Inner buffer is empty, but clock is available."
155                    );
156                }
157            }
158
159            return Poll::Pending;
160        }
161    }
162
163    fn size_hint(&self) -> (usize, Option<usize>) {
164        let chunk_len = if self.items.is_empty() { 0 } else { 1 };
165        let (lower, upper) = self.stream.size_hint();
166        let lower = lower.saturating_add(chunk_len);
167        let upper = match upper {
168            Some(x) => x.checked_add(chunk_len),
169            None => None,
170        };
171        (lower, upper)
172    }
173}
174
175impl<St: FusedStream> FusedStream for ChunksTimeout<St> {
176    fn is_terminated(&self) -> bool {
177        self.stream.is_terminated() & self.items.is_empty()
178    }
179}
180
// Forwarding impl of Sink from the underlying stream.
//
// The four methods are written out by hand: no `delegate_sink!` macro is
// defined in or imported into this file, so relying on it cannot compile.
// NOTE(review): assumes `Fuse<S>` implements `Sink<Item>` with
// `Error = S::Error` when the `sink` feature is enabled - confirm against the
// `futures` version in use.
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for ChunksTimeout<S>
where
    S: Stream + Sink<Item>,
{
    type Error = S::Error;

    /// Forwards to the wrapped stream's `poll_ready`.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.stream().poll_ready(cx)
    }

    /// Forwards to the wrapped stream's `start_send`.
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        self.stream().start_send(item)
    }

    /// Forwards to the wrapped stream's `poll_flush`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.stream().poll_flush(cx)
    }

    /// Forwards to the wrapped stream's `poll_close`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.stream().poll_close(cx)
    }
}
191
#[cfg(test)]
mod tests {
    use super::*;
    use futures::future;
    use futures::{stream, FutureExt, StreamExt, TryFutureExt};
    use std::iter;
    use std::time::{Duration, Instant};

    /// Runs `fut` to completion on the tokio runtime, going through the
    /// futures 0.1 compatibility layer.
    fn run_on_tokio<F>(fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        tokio::run(fut.unit_error().boxed().compat());
    }

    /// A single item is passed through as a single one-element chunk.
    #[test]
    fn messages_pass_through() {
        let collected = stream::iter(iter::once(5))
            .chunks_timeout(5, Duration::from_secs(1))
            .collect::<Vec<_>>();

        run_on_tokio(collected.then(|x| {
            assert_eq!(vec![vec![5]], x);
            future::ready(())
        }));
    }

    /// Ten immediately available items with capacity five give two full chunks.
    #[test]
    fn message_chunks() {
        let source = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter());
        let collected =
            ChunksTimeout::new(source, 5, Duration::from_secs(1)).collect::<Vec<_>>();

        run_on_tokio(collected.then(|res| {
            assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], res);
            future::ready(())
        }));
    }

    /// A stream ending before the chunk is full flushes the partial chunk.
    #[test]
    fn message_early_exit() {
        let source = stream::iter(vec![1, 2, 3, 4].into_iter());
        let collected =
            ChunksTimeout::new(source, 5, Duration::from_secs(1)).collect::<Vec<_>>();

        run_on_tokio(collected.then(|res| {
            assert_eq!(vec![vec![1, 2, 3, 4]], res);
            future::ready(())
        }));
    }

    // TODO: use the `tokio-test` and `futures-test-preview` crates
    /// Four immediate items, one item delayed by 300 ms, then three immediate
    /// items, chunked with capacity 5 and a 100 ms timeout. Each chunk must
    /// arrive inside its expected time window.
    #[test]
    fn message_timeout() {
        let head = stream::iter(vec![1, 2, 3, 4].into_iter());
        let delayed = stream::iter(vec![5].into_iter())
            .then(move |n| Delay::new(Duration::from_millis(300)).map(move |_| n));
        let tail = stream::iter(vec![6, 7, 8].into_iter());

        let source = head.chain(delayed).chain(tail);
        let chunk_stream = ChunksTimeout::new(source, 5, Duration::from_millis(100));

        let started = Instant::now();
        let min_times = [Duration::from_millis(80), Duration::from_millis(150)];
        let max_times = [Duration::from_millis(280), Duration::from_millis(350)];
        let expected = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]];
        let mut chunk_idx = 0;

        let collected = chunk_stream
            .map(move |chunk| {
                let elapsed = Instant::now() - started;
                println!("{}: {:?} {:?}", chunk_idx, elapsed, chunk);
                assert!(elapsed < max_times[chunk_idx]);
                assert!(elapsed > min_times[chunk_idx]);
                chunk_idx += 1;
                chunk
            })
            .collect::<Vec<_>>();

        run_on_tokio(collected.then(move |res| {
            assert_eq!(res, expected);
            future::ready(())
        }));
    }
}