pollable_map/stream/
timeout_set.rs1use crate::common::Timed;
2use crate::stream::set::StreamSet;
3use futures::{Stream, StreamExt};
4use std::ops::{Deref, DerefMut};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9pub struct TimeoutStreamSet<S> {
10 duration: Duration,
11 set: StreamSet<Timed<S>>,
12}
13
14impl<S> Deref for TimeoutStreamSet<S> {
15 type Target = StreamSet<Timed<S>>;
16 fn deref(&self) -> &Self::Target {
17 &self.set
18 }
19}
20
21impl<S> DerefMut for TimeoutStreamSet<S> {
22 fn deref_mut(&mut self) -> &mut Self::Target {
23 &mut self.set
24 }
25}
26
27impl<S> TimeoutStreamSet<S>
28where
29 S: Stream + Send + Unpin + 'static,
30{
31 pub fn new(duration: Duration) -> Self {
33 Self {
34 duration,
35 set: StreamSet::new(),
36 }
37 }
38
39 pub fn insert(&mut self, stream: S) -> bool {
41 self.set.insert(Timed::new(stream, self.duration))
42 }
43}
44
45impl<S> Stream for TimeoutStreamSet<S>
46where
47 S: Stream + Send + Unpin + 'static,
48{
49 type Item = std::io::Result<S::Item>;
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 self.set.poll_next_unpin(cx)
52 }
53
54 fn size_hint(&self) -> (usize, Option<usize>) {
55 self.set.size_hint()
56 }
57}