// timeout_iterator/asynchronous.rs

1use crate::error::Error;
2use core::pin::Pin;
3use futures::stream::Stream;
4use futures::task::{Context, Poll};
5use pin_project::pin_project;
6use std::time::Duration;
7use tokio::time::timeout;
8use tokio_stream::StreamExt;
9
10#[pin_project]
11pub struct TimeoutStream<R: Stream> {
12    #[pin]
13    source: R,
14    buffer: Vec<R::Item>,
15}
16
17impl<R: Stream> TimeoutStream<R> {
18    /**
19     * Use this constructor
20     */
21    pub async fn with_stream(source: R) -> Result<TimeoutStream<R>, Error> {
22        Ok(TimeoutStream {
23            source,
24            buffer: Vec::new(),
25        })
26    }
27
28    pub async fn peek_timeout(self: Pin<&mut Self>, duration: Duration) -> Result<&R::Item, Error> {
29        // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
30        match timeout(duration, self.peek()).await {
31            Ok(Some(item)) => Ok(item),
32            Ok(None) => Err(Error::Disconnected),
33            Err(_) => Err(Error::TimedOut),
34        }
35    }
36
37    pub async fn next_timeout(
38        mut self: Pin<&mut Self>,
39        duration: Duration,
40    ) -> Result<R::Item, Error> {
41        // Wrap the future with a `Timeout` set to expire in 10 milliseconds.
42        match timeout(duration, self.next()).await {
43            Ok(Some(item)) => Ok(item),
44            Ok(None) => Err(Error::Disconnected),
45            Err(_) => Err(Error::TimedOut),
46        }
47    }
48
49    pub async fn peek(mut self: Pin<&mut Self>) -> Option<&R::Item> {
50        if self.as_mut().project().buffer.is_empty() {
51            match self.next().await {
52                Some(item) => self.as_mut().project().buffer.push(item),
53                None => return None,
54            }
55        }
56        self.project().buffer.first()
57    }
58}
59
60impl<R: Stream> Stream for TimeoutStream<R> {
61    type Item = R::Item;
62
63    fn poll_next(
64        self: Pin<&mut Self>,
65        cx: &mut Context<'_>,
66    ) -> Poll<Option<<Self as Stream>::Item>> {
67        if !self.buffer.is_empty() {
68            return Poll::Ready(Some(self.project().buffer.remove(0)));
69        }
70
71        self.project().source.poll_next(cx)
72    }
73}
74
#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use futures::stream::iter;
    use std::io::BufRead;

    /// Fixture for the line-based tests: the digits 1 through 5, one per line.
    const REALISTIC_MESSAGE: &str = "1\n2\n3\n4\n5";

    /// Timeout used where the awaited item (or end of stream) is expected promptly.
    const ONE_SECOND: Duration = Duration::from_secs(1);

    /// Streams the fixture line by line; every item is an `io::Result<String>`.
    fn line_stream() -> impl Stream<Item = std::io::Result<String>> + Unpin {
        iter(Box::new(REALISTIC_MESSAGE.as_bytes()).lines())
    }

    /// Builds a `TimeoutStream` around `source` through the public constructor.
    async fn wrap<S: Stream>(source: S) -> TimeoutStream<S> {
        TimeoutStream::with_stream(source).await.unwrap()
    }

    #[tokio::test]
    async fn iterates() {
        let mut ti = wrap(line_stream()).await;

        for expected in ["1", "2", "3", "4", "5"] {
            assert_eq!(ti.next().await.unwrap().unwrap(), expected);
        }
    }

    #[tokio::test]
    async fn next_timeout() {
        let mut pinned_stream = Box::pin(wrap(line_stream()).await);
        let mut ti = pinned_stream.as_mut();

        for expected in ["1", "2", "3", "4", "5"] {
            assert_eq!(ti.next().await.unwrap().unwrap(), expected);
        }

        // The source is exhausted, so a timed `next` must report an error.
        assert!(ti.as_mut().next_timeout(ONE_SECOND).await.is_err());
    }

    #[tokio::test]
    async fn peek_timeout_doesnt_remove() {
        let mut ti = Box::pin(wrap(line_stream()).await);

        for expected in ["1", "2"] {
            assert_eq!(ti.next().await.unwrap().unwrap(), expected);
        }

        let peeked = ti.as_mut().peek_timeout(ONE_SECOND).await.unwrap();
        assert_eq!(peeked.as_ref().unwrap(), "3");

        for expected in ["3", "4"] {
            assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), expected);
        }

        // Peeking twice in a row must keep returning the same item.
        for _ in 0..2 {
            let peeked = ti.as_mut().peek_timeout(ONE_SECOND).await.unwrap();
            assert_eq!(peeked.as_ref().unwrap(), "5");
        }
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "5");

        assert!(ti.as_mut().next_timeout(ONE_SECOND).await.is_err());
    }

    #[tokio::test]
    async fn peek_doesnt_remove() {
        let mut ti = Box::pin(wrap(line_stream()).await);

        for expected in ["1", "2"] {
            assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), expected);
        }

        // An untimed peek followed by a timed peek must see the same item.
        assert_eq!(ti.as_mut().peek().await.unwrap().as_ref().unwrap(), "3");
        let peeked = ti.as_mut().peek_timeout(ONE_SECOND).await.unwrap();
        assert_eq!(peeked.as_ref().unwrap(), "3");

        for expected in ["3", "4"] {
            assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), expected);
        }

        // Same check in the opposite order: timed peek first, untimed peek second.
        let peeked = ti.as_mut().peek_timeout(ONE_SECOND).await.unwrap();
        assert_eq!(peeked.as_ref().unwrap(), "5");
        assert_eq!(ti.as_mut().peek().await.unwrap().as_ref().unwrap(), "5");
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "5");

        assert!(ti.as_mut().next_timeout(ONE_SECOND).await.is_err());
    }

    #[tokio::test]
    async fn item_iterator() {
        let mut ti = Box::pin(wrap(iter(vec![1_u32, 2, 3, 4, 5])).await);

        for expected in [1, 2] {
            assert_eq!(ti.as_mut().next().await.unwrap(), expected);
        }

        assert_eq!(*ti.as_mut().peek_timeout(ONE_SECOND).await.unwrap(), 3);

        for expected in [3, 4] {
            assert_eq!(ti.as_mut().next().await.unwrap(), expected);
        }

        // Peeking twice in a row must keep returning the same item.
        for _ in 0..2 {
            assert_eq!(*ti.as_mut().peek_timeout(ONE_SECOND).await.unwrap(), 5);
        }
        assert_eq!(ti.as_mut().next().await.unwrap(), 5);

        assert!(ti.as_mut().next_timeout(ONE_SECOND).await.is_err());
    }

    #[tokio::test]
    async fn timedout_future_doesnt_drop_item() {
        let half_second = Duration::from_millis(500);
        let tenth_of_second = Duration::from_millis(100);

        // The source releases at most one item per second.
        let throttled_numbers =
            Box::pin(iter(vec![1_u32, 2, 3, 4, 5]).throttle(Duration::from_secs(1)));

        let mut pinned_stream = Box::pin(wrap(throttled_numbers).await);
        let mut ti = pinned_stream.as_mut();

        assert_eq!(ti.as_mut().next().await.unwrap(), 1);

        let failure = ti.as_mut().next_timeout(half_second).await.unwrap_err();
        assert_matches!(failure, Error::TimedOut);
        assert_eq!(ti.as_mut().next().await.unwrap(), 2);

        let failure = ti.as_mut().peek_timeout(half_second).await.unwrap_err();
        assert_matches!(failure, Error::TimedOut);
        assert_eq!(*ti.as_mut().peek().await.unwrap(), 3);
        assert_eq!(ti.as_mut().next().await.unwrap(), 3);

        let failure = ti.as_mut().next_timeout(half_second).await.unwrap_err();
        assert_matches!(failure, Error::TimedOut);
        assert_eq!(ti.as_mut().next().await.unwrap(), 4);

        // Six short timeouts in a row must all expire without swallowing the next item.
        for _ in 0..6 {
            let failure = ti.as_mut().next_timeout(tenth_of_second).await.unwrap_err();
            assert_matches!(failure, Error::TimedOut);
        }
        assert_eq!(ti.as_mut().next().await.unwrap(), 5);

        assert!(ti.as_mut().next_timeout(ONE_SECOND).await.is_err());
    }
}