pollable_map/futures/
timeout_set.rs

1use crate::common::Timed;
2use crate::futures::set::FutureSet;
3use futures::stream::FusedStream;
4use futures::{Stream, StreamExt};
5use std::future::Future;
6use std::ops::{Deref, DerefMut};
7use std::pin::Pin;
8use std::task::{Context, Poll};
9use std::time::Duration;
10
11pub struct TimeoutFutureSet<S> {
12    duration: Duration,
13    set: FutureSet<Timed<S>>,
14}
15
16impl<S> Deref for TimeoutFutureSet<S> {
17    type Target = FutureSet<Timed<S>>;
18    fn deref(&self) -> &Self::Target {
19        &self.set
20    }
21}
22
23impl<S> DerefMut for TimeoutFutureSet<S> {
24    fn deref_mut(&mut self) -> &mut Self::Target {
25        &mut self.set
26    }
27}
28
29impl<F> TimeoutFutureSet<F>
30where
31    F: Future + Send + Unpin + 'static,
32{
33    /// Create an empty [`TimeoutFutureSet`]
34    pub fn new(duration: Duration) -> Self {
35        Self {
36            duration,
37            set: FutureSet::new(),
38        }
39    }
40
41    /// Insert a future into the set of futures.
42    pub fn insert(&mut self, future: F) -> bool {
43        self.set.insert(Timed::new(future, self.duration))
44    }
45}
46
47impl<F> Stream for TimeoutFutureSet<F>
48where
49    F: Future + Send + Unpin + 'static,
50{
51    type Item = std::io::Result<F::Output>;
52    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53        self.set.poll_next_unpin(cx)
54    }
55
56    fn size_hint(&self) -> (usize, Option<usize>) {
57        self.set.size_hint()
58    }
59}
60
61impl<F> FusedStream for TimeoutFutureSet<F>
62where
63    F: Future + Send + Unpin + 'static,
64{
65    fn is_terminated(&self) -> bool {
66        self.set.is_terminated()
67    }
68}
69
70#[cfg(test)]
71mod test {
72    use crate::futures::timeout_set::TimeoutFutureSet;
73    use futures::StreamExt;
74    use std::time::Duration;
75
76    #[test]
77    fn timeout_set() {
78        let mut list = TimeoutFutureSet::new(Duration::from_millis(100));
79        assert!(list.insert(futures::future::pending::<()>()));
80
81        futures::executor::block_on(async move {
82            let result = list.next().await;
83            let Some(Err(e)) = result else {
84                unreachable!("result is err");
85            };
86
87            assert_eq!(e.kind(), std::io::ErrorKind::TimedOut);
88        });
89    }
90
91    #[test]
92    fn valid_stream() {
93        let mut list = TimeoutFutureSet::new(Duration::from_secs(10));
94        assert!(list.insert(futures::future::ready(0)));
95
96        futures::executor::block_on(async move {
97            let result = list.next().await;
98            let Some(Ok(val)) = result else {
99                unreachable!("result is err");
100            };
101
102            assert_eq!(val, 0);
103        });
104    }
105}