timeout_iterator/
asynchronous.rs1use crate::error::Error;
2use core::pin::Pin;
3use futures::stream::Stream;
4use futures::task::{Context, Poll};
5use pin_project::pin_project;
6use std::time::Duration;
7use tokio::time::timeout;
8use tokio_stream::StreamExt;
9
10#[pin_project]
11pub struct TimeoutStream<R: Stream> {
12 #[pin]
13 source: R,
14 buffer: Vec<R::Item>,
15}
16
17impl<R: Stream> TimeoutStream<R> {
18 pub async fn with_stream(source: R) -> Result<TimeoutStream<R>, Error> {
22 Ok(TimeoutStream {
23 source,
24 buffer: Vec::new(),
25 })
26 }
27
28 pub async fn peek_timeout(self: Pin<&mut Self>, duration: Duration) -> Result<&R::Item, Error> {
29 match timeout(duration, self.peek()).await {
31 Ok(Some(item)) => Ok(item),
32 Ok(None) => Err(Error::Disconnected),
33 Err(_) => Err(Error::TimedOut),
34 }
35 }
36
37 pub async fn next_timeout(
38 mut self: Pin<&mut Self>,
39 duration: Duration,
40 ) -> Result<R::Item, Error> {
41 match timeout(duration, self.next()).await {
43 Ok(Some(item)) => Ok(item),
44 Ok(None) => Err(Error::Disconnected),
45 Err(_) => Err(Error::TimedOut),
46 }
47 }
48
49 pub async fn peek(mut self: Pin<&mut Self>) -> Option<&R::Item> {
50 if self.as_mut().project().buffer.is_empty() {
51 match self.next().await {
52 Some(item) => self.as_mut().project().buffer.push(item),
53 None => return None,
54 }
55 }
56 self.project().buffer.first()
57 }
58}
59
60impl<R: Stream> Stream for TimeoutStream<R> {
61 type Item = R::Item;
62
63 fn poll_next(
64 self: Pin<&mut Self>,
65 cx: &mut Context<'_>,
66 ) -> Poll<Option<<Self as Stream>::Item>> {
67 if !self.buffer.is_empty() {
68 return Poll::Ready(Some(self.project().buffer.remove(0)));
69 }
70
71 self.project().source.poll_next(cx)
72 }
73}
74
#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use futures::stream::iter;
    use std::io::BufRead;

    /// Five lines, "1" through "5", shared by the line-based tests.
    const REALISTIC_MESSAGE: &str = r"1
2
3
4
5";

    /// Plain `next` yields every line in order.
    #[tokio::test]
    async fn iterates() {
        let lines_iterator = iter((Box::new(REALISTIC_MESSAGE.as_bytes())).lines());

        let mut ti = TimeoutStream::with_stream(lines_iterator).await.unwrap();

        assert_eq!(ti.next().await.unwrap().unwrap(), "1");
        assert_eq!(ti.next().await.unwrap().unwrap(), "2");
        assert_eq!(ti.next().await.unwrap().unwrap(), "3");
        assert_eq!(ti.next().await.unwrap().unwrap(), "4");
        assert_eq!(ti.next().await.unwrap().unwrap(), "5");
    }

    /// `next_timeout` on a drained stream reports `Disconnected`.
    #[tokio::test]
    async fn next_timeout() {
        let lines_iterator = iter((Box::new(REALISTIC_MESSAGE.as_bytes())).lines());

        let mut pinned_stream = Box::pin(TimeoutStream::with_stream(lines_iterator).await.unwrap());
        let mut ti = pinned_stream.as_mut();

        assert_eq!(ti.next().await.unwrap().unwrap(), "1");
        assert_eq!(ti.next().await.unwrap().unwrap(), "2");
        assert_eq!(ti.next().await.unwrap().unwrap(), "3");
        assert_eq!(ti.next().await.unwrap().unwrap(), "4");
        assert_eq!(ti.next().await.unwrap().unwrap(), "5");

        // The source has ended, so this is a disconnect, not a timeout.
        let exhausted_result = ti.as_mut().next_timeout(Duration::from_secs(1)).await;
        assert_matches!(exhausted_result, Err(Error::Disconnected));
    }

    /// `peek_timeout` shows the upcoming item without consuming it.
    #[tokio::test]
    async fn peek_timeout_doesnt_remove() {
        let lines_iterator = iter((Box::new(REALISTIC_MESSAGE.as_bytes())).lines());

        let mut ti = Box::pin(TimeoutStream::with_stream(lines_iterator).await.unwrap());

        assert_eq!(ti.next().await.unwrap().unwrap(), "1");
        assert_eq!(ti.next().await.unwrap().unwrap(), "2");
        assert_eq!(
            ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap()
                .as_ref()
                .unwrap(),
            "3"
        );
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "3");
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "4");
        assert_eq!(
            ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap()
                .as_ref()
                .unwrap(),
            "5"
        );
        assert_eq!(
            ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap()
                .as_ref()
                .unwrap(),
            "5"
        );
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "5");

        // The source has ended, so this is a disconnect, not a timeout.
        let exhausted_result = ti.as_mut().next_timeout(Duration::from_secs(1)).await;
        assert_matches!(exhausted_result, Err(Error::Disconnected));
    }

    /// `peek` and `peek_timeout` agree and neither consumes the item.
    #[tokio::test]
    async fn peek_doesnt_remove() {
        let lines_iterator = iter((Box::new(REALISTIC_MESSAGE.as_bytes())).lines());

        let mut ti = Box::pin(TimeoutStream::with_stream(lines_iterator).await.unwrap());

        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "1");
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "2");
        assert_eq!(ti.as_mut().peek().await.unwrap().as_ref().unwrap(), "3");
        assert_eq!(
            ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap()
                .as_ref()
                .unwrap(),
            "3"
        );
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "3");
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "4");
        assert_eq!(
            ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap()
                .as_ref()
                .unwrap(),
            "5"
        );
        assert_eq!(ti.as_mut().peek().await.unwrap().as_ref().unwrap(), "5");
        assert_eq!(ti.as_mut().next().await.unwrap().unwrap(), "5");

        // The source has ended, so this is a disconnect, not a timeout.
        let exhausted_result = ti.as_mut().next_timeout(Duration::from_secs(1)).await;
        assert_matches!(exhausted_result, Err(Error::Disconnected));
    }

    /// The adapter works for plain (non-`Result`) items as well.
    #[tokio::test]
    async fn item_iterator() {
        let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];

        let mut ti = Box::pin(
            TimeoutStream::with_stream(iter(numbers.into_iter()))
                .await
                .unwrap(),
        );

        assert_eq!(ti.as_mut().next().await.unwrap(), 1);
        assert_eq!(ti.as_mut().next().await.unwrap(), 2);
        assert_eq!(
            *ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap(),
            3
        );
        assert_eq!(ti.as_mut().next().await.unwrap(), 3);
        assert_eq!(ti.as_mut().next().await.unwrap(), 4);
        assert_eq!(
            *ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap(),
            5
        );
        assert_eq!(
            *ti.as_mut()
                .peek_timeout(Duration::from_secs(1))
                .await
                .unwrap(),
            5
        );
        assert_eq!(ti.as_mut().next().await.unwrap(), 5);

        // The source has ended, so this is a disconnect, not a timeout.
        let exhausted_result = ti.as_mut().next_timeout(Duration::from_secs(1)).await;
        assert_matches!(exhausted_result, Err(Error::Disconnected));
    }

    /// A stream that never yields anything reports the end of the stream from
    /// every accessor.
    #[tokio::test]
    async fn exhausted_stream_reports_disconnected() {
        let mut ti = Box::pin(
            TimeoutStream::with_stream(iter(Vec::<u32>::new()))
                .await
                .unwrap(),
        );

        assert!(ti.as_mut().peek().await.is_none());
        assert_matches!(
            ti.as_mut().peek_timeout(Duration::from_secs(1)).await,
            Err(Error::Disconnected)
        );
        assert_matches!(
            ti.as_mut().next_timeout(Duration::from_secs(1)).await,
            Err(Error::Disconnected)
        );
    }

    /// A timed-out `next_timeout`/`peek_timeout` must not lose the item that
    /// arrives afterwards.
    #[tokio::test]
    async fn timedout_future_doesnt_drop_item() {
        let numbers: Vec<u32> = vec![1, 2, 3, 4, 5];

        let throttled_numbers =
            Box::pin(iter(numbers.into_iter()).throttle(Duration::from_secs(1)));

        let mut pinned_stream =
            Box::pin(TimeoutStream::with_stream(throttled_numbers).await.unwrap());
        let mut ti = pinned_stream.as_mut();

        assert_eq!(ti.as_mut().next().await.unwrap(), 1);
        assert_matches!(
            ti.as_mut()
                .next_timeout(Duration::from_millis(500))
                .await
                .unwrap_err(),
            Error::TimedOut
        );
        assert_eq!(ti.as_mut().next().await.unwrap(), 2);
        assert_matches!(
            ti.as_mut()
                .peek_timeout(Duration::from_millis(500))
                .await
                .unwrap_err(),
            Error::TimedOut
        );
        assert_eq!(*ti.as_mut().peek().await.unwrap(), 3);
        assert_eq!(ti.as_mut().next().await.unwrap(), 3);
        assert_matches!(
            ti.as_mut()
                .next_timeout(Duration::from_millis(500))
                .await
                .unwrap_err(),
            Error::TimedOut
        );
        assert_eq!(ti.as_mut().next().await.unwrap(), 4);

        // Six short timeouts in a row while the next item is still throttled.
        for _ in 0..6 {
            assert_matches!(
                ti.as_mut()
                    .next_timeout(Duration::from_millis(100))
                    .await
                    .unwrap_err(),
                Error::TimedOut
            );
        }
        assert_eq!(ti.as_mut().next().await.unwrap(), 5);

        // The throttle delay and this timeout are both one second long, so this
        // only asserts that an error is returned, not which one.
        let final_result = ti.as_mut().next_timeout(Duration::from_secs(1)).await;
        assert!(final_result.is_err());
    }
}