pollable_map/stream/
timeout_set.rs

1use crate::common::Timed;
2use crate::stream::set::StreamSet;
3use futures::{Stream, StreamExt};
4use std::ops::{Deref, DerefMut};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9pub struct TimeoutStreamSet<S> {
10    duration: Duration,
11    set: StreamSet<Timed<S>>,
12}
13
14impl<S> Deref for TimeoutStreamSet<S> {
15    type Target = StreamSet<Timed<S>>;
16    fn deref(&self) -> &Self::Target {
17        &self.set
18    }
19}
20
21impl<S> DerefMut for TimeoutStreamSet<S> {
22    fn deref_mut(&mut self) -> &mut Self::Target {
23        &mut self.set
24    }
25}
26
27impl<S> TimeoutStreamSet<S>
28where
29    S: Stream + Send + Unpin + 'static,
30{
31    /// Create an empty ['TimeoutStreamSet']
32    pub fn new(duration: Duration) -> Self {
33        Self {
34            duration,
35            set: StreamSet::new(),
36        }
37    }
38
39    /// Insert a stream into the set of streams.
40    pub fn insert(&mut self, stream: S) -> bool {
41        self.set.insert(Timed::new(stream, self.duration))
42    }
43}
44
45impl<S> Stream for TimeoutStreamSet<S>
46where
47    S: Stream + Send + Unpin + 'static,
48{
49    type Item = std::io::Result<S::Item>;
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        self.set.poll_next_unpin(cx)
52    }
53
54    fn size_hint(&self) -> (usize, Option<usize>) {
55        self.set.size_hint()
56    }
57}