1#[cfg(test)]
2#[macro_use]
3extern crate doc_comment;
4
5#[cfg(test)]
6doctest!("../README.md");
7
8
9use core::mem;
10use core::pin::Pin;
11use futures::stream::{Fuse, FusedStream, Stream};
12use futures::Future;
13use futures::task::{Context, Poll};
14use futures::StreamExt;
15#[cfg(feature = "sink")]
16use futures_sink::Sink;
17use pin_utils::{unsafe_pinned, unsafe_unpinned};
18
19use std::time::Duration;
20use futures_timer::Delay;
21
22pub trait ChunksTimeoutStreamExt: Stream {
23 fn chunks_timeout(self, capacity: usize, duration: Duration) -> ChunksTimeout<Self>
24 where
25 Self: Sized,
26 {
27 ChunksTimeout::new(self, capacity, duration)
28 }
29}
30impl<T: ?Sized> ChunksTimeoutStreamExt for T where T: Stream {}
31
32#[derive(Debug)]
33#[must_use = "streams do nothing unless polled"]
34pub struct ChunksTimeout<St: Stream> {
35 stream: Fuse<St>,
36 items: Vec<St::Item>,
37 cap: usize,
38 clock: Option<Delay>,
40 duration: Duration,
41}
42
43impl<St: Unpin + Stream> Unpin for ChunksTimeout<St> {}
44
45impl<St: Stream> ChunksTimeout<St>
46where
47 St: Stream,
48{
49 unsafe_unpinned!(items: Vec<St::Item>);
50 unsafe_pinned!(clock: Option<Delay>);
51 unsafe_pinned!(stream: Fuse<St>);
52
53 pub fn new(stream: St, capacity: usize, duration: Duration) -> ChunksTimeout<St> {
54 assert!(capacity > 0);
55
56 ChunksTimeout {
57 stream: stream.fuse(),
58 items: Vec::with_capacity(capacity),
59 cap: capacity,
60 clock: None,
61 duration,
62 }
63 }
64
65 fn take(mut self: Pin<&mut Self>) -> Vec<St::Item> {
66 let cap = self.cap;
67 mem::replace(self.as_mut().items(), Vec::with_capacity(cap))
68 }
69
70 pub fn get_ref(&self) -> &St {
73 self.stream.get_ref()
74 }
75
76 pub fn get_mut(&mut self) -> &mut St {
82 self.stream.get_mut()
83 }
84
85 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
91 self.stream().get_pin_mut()
92 }
93
94 pub fn into_inner(self) -> St {
99 self.stream.into_inner()
100 }
101}
102
103impl<St: Stream> Stream for ChunksTimeout<St> {
104 type Item = Vec<St::Item>;
105
106 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
107 loop {
108 match self.as_mut().stream().poll_next(cx) {
109 Poll::Ready(item) => match item {
110 Some(item) => {
114 if self.items.is_empty() {
115 *self.as_mut().clock() =
116 Some(Delay::new(self.duration));
117 }
118 self.as_mut().items().push(item);
119 if self.items.len() >= self.cap {
120 *self.as_mut().clock() = None;
121 return Poll::Ready(Some(self.as_mut().take()));
122 } else {
123 continue;
125 }
126 }
127
128 None => {
131 let last = if self.items.is_empty() {
132 None
133 } else {
134 let full_buf = mem::replace(self.as_mut().items(), Vec::new());
135 Some(full_buf)
136 };
137
138 return Poll::Ready(last);
139 }
140 },
141 Poll::Pending => {}
143 }
144
145 match self.as_mut().clock().as_pin_mut().map(|clock| clock.poll(cx)) {
146 Some(Poll::Ready(())) => {
147 *self.as_mut().clock() = None;
148 return Poll::Ready(Some(self.as_mut().take()));
149 }
150 Some(Poll::Pending) => {}
151 None => {
152 debug_assert!(
153 self.as_mut().items().is_empty(),
154 "Inner buffer is empty, but clock is available."
155 );
156 }
157 }
158
159 return Poll::Pending;
160 }
161 }
162
163 fn size_hint(&self) -> (usize, Option<usize>) {
164 let chunk_len = if self.items.is_empty() { 0 } else { 1 };
165 let (lower, upper) = self.stream.size_hint();
166 let lower = lower.saturating_add(chunk_len);
167 let upper = match upper {
168 Some(x) => x.checked_add(chunk_len),
169 None => None,
170 };
171 (lower, upper)
172 }
173}
174
175impl<St: FusedStream> FusedStream for ChunksTimeout<St> {
176 fn is_terminated(&self) -> bool {
177 self.stream.is_terminated() & self.items.is_empty()
178 }
179}
180
181#[cfg(feature = "sink")]
183impl<S, Item> Sink<Item> for ChunksTimeout<S>
184where
185 S: Stream + Sink<Item>,
186{
187 type Error = S::Error;
188
189 delegate_sink!(stream, Item);
190}
191
192#[cfg(test)]
193mod tests {
194 use super::*;
195 use futures::future;
196 use futures::{stream, FutureExt, StreamExt, TryFutureExt};
197 use std::iter;
198 use std::time::{Duration, Instant};
199
200 #[test]
201 fn messages_pass_through() {
202 let v = stream::iter(iter::once(5))
203 .chunks_timeout(5, Duration::new(1, 0))
204 .collect::<Vec<_>>();
205 tokio::run(
206 v.then(|x| {
207 assert_eq!(vec![vec![5]], x);
208 future::ready(())
209 })
210 .unit_error()
211 .boxed()
212 .compat(),
213 );
214 }
215
216 #[test]
217 fn message_chunks() {
218 let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
219 let stream = stream::iter(iter);
220
221 let chunk_stream = ChunksTimeout::new(stream, 5, Duration::new(1, 0));
222
223 let v = chunk_stream.collect::<Vec<_>>();
224 tokio::run(
225 v.then(|res| {
226 assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], res);
227 future::ready(())
228 })
229 .unit_error()
230 .boxed()
231 .compat(),
232 );
233 }
234
235 #[test]
236 fn message_early_exit() {
237 let iter = vec![1, 2, 3, 4].into_iter();
238 let stream = stream::iter(iter);
239
240 let chunk_stream = ChunksTimeout::new(stream, 5, Duration::new(1, 0));
241
242 let v = chunk_stream.collect::<Vec<_>>();
243 tokio::run(
244 v.then(|res| {
245 assert_eq!(vec![vec![1, 2, 3, 4]], res);
246 future::ready(())
247 })
248 .unit_error()
249 .boxed()
250 .compat(),
251 );
252 }
253
254 #[test]
256 fn message_timeout() {
257 let iter = vec![1, 2, 3, 4].into_iter();
258 let stream0 = stream::iter(iter);
259
260 let iter = vec![5].into_iter();
261 let stream1 = stream::iter(iter).then(move |n| {
262 Delay::new(Duration::from_millis(300))
263 .map(move |_| n)
264 });
265
266 let iter = vec![6, 7, 8].into_iter();
267 let stream2 = stream::iter(iter);
268
269 let stream = stream0.chain(stream1).chain(stream2);
270 let chunk_stream = ChunksTimeout::new(stream, 5, Duration::from_millis(100));
271
272 let now = Instant::now();
273 let min_times = [Duration::from_millis(80), Duration::from_millis(150)];
274 let max_times = [Duration::from_millis(280), Duration::from_millis(350)];
275 let results = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8]];
276 let mut i = 0;
277
278 let v = chunk_stream
279 .map(move |s| {
280 let now2 = Instant::now();
281 println!("{}: {:?} {:?}", i, now2 - now, s);
282 assert!((now2 - now) < max_times[i]);
283 assert!((now2 - now) > min_times[i]);
284 i += 1;
285 s
286 })
287 .collect::<Vec<_>>();
288
289 tokio::run(
290 v.then(move |res| {
291 assert_eq!(res, results);
292 future::ready(())
293 })
294 .unit_error()
295 .boxed()
296 .compat(),
297 );
298 }
299}