// pollable_map/futures/timeout_set.rs
use crate::common::Timed;
use crate::futures::set::FutureSet;
use futures::stream::FusedStream;
use futures::{Stream, StreamExt};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
10
11pub struct TimeoutFutureSet<S> {
12 duration: Duration,
13 set: FutureSet<Timed<S>>,
14}
15
16impl<S> Deref for TimeoutFutureSet<S> {
17 type Target = FutureSet<Timed<S>>;
18 fn deref(&self) -> &Self::Target {
19 &self.set
20 }
21}
22
23impl<S> DerefMut for TimeoutFutureSet<S> {
24 fn deref_mut(&mut self) -> &mut Self::Target {
25 &mut self.set
26 }
27}
28
29impl<F> TimeoutFutureSet<F>
30where
31 F: Future + Send + Unpin + 'static,
32{
33 pub fn new(duration: Duration) -> Self {
35 Self {
36 duration,
37 set: FutureSet::new(),
38 }
39 }
40
41 pub fn insert(&mut self, future: F) -> bool {
43 self.set.insert(Timed::new(future, self.duration))
44 }
45}
46
47impl<F> Stream for TimeoutFutureSet<F>
48where
49 F: Future + Send + Unpin + 'static,
50{
51 type Item = std::io::Result<F::Output>;
52 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 self.set.poll_next_unpin(cx)
54 }
55
56 fn size_hint(&self) -> (usize, Option<usize>) {
57 self.set.size_hint()
58 }
59}
60
61impl<F> FusedStream for TimeoutFutureSet<F>
62where
63 F: Future + Send + Unpin + 'static,
64{
65 fn is_terminated(&self) -> bool {
66 self.set.is_terminated()
67 }
68}
69
#[cfg(test)]
mod test {
    use super::TimeoutFutureSet;
    use futures::StreamExt;
    use std::time::Duration;

    /// A future that never resolves must come out of the set as a
    /// `TimedOut` I/O error.
    #[test]
    fn timeout_set() {
        let mut list = TimeoutFutureSet::new(Duration::from_millis(100));
        assert!(list.insert(futures::future::pending::<()>()));

        futures::executor::block_on(async move {
            let result = list.next().await;
            // Anything other than `Some(Err(_))` means the timeout did not fire.
            let Some(Err(e)) = result else {
                panic!("expected a timeout error, but the stream yielded a value or ended");
            };

            assert_eq!(e.kind(), std::io::ErrorKind::TimedOut);
        });
    }

    /// A future that is already ready must yield its value untouched, well
    /// inside the configured timeout.
    #[test]
    fn valid_stream() {
        let mut list = TimeoutFutureSet::new(Duration::from_secs(10));
        assert!(list.insert(futures::future::ready(0)));

        futures::executor::block_on(async move {
            let result = list.next().await;
            // Anything other than `Some(Ok(_))` means the ready value was lost.
            let Some(Ok(val)) = result else {
                panic!("expected an Ok value, but the stream yielded an error or ended");
            };

            assert_eq!(val, 0);
        });
    }
}