pollable_map/stream/
timeout_map.rs1use crate::common::Timed;
2use crate::stream::StreamMap;
3use futures::{Stream, StreamExt};
4use std::ops::{Deref, DerefMut};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9pub struct TimeoutStreamMap<K, S> {
10 duration: Duration,
11 map: StreamMap<K, Timed<S>>,
12}
13
14impl<K, S> Deref for TimeoutStreamMap<K, S> {
15 type Target = StreamMap<K, Timed<S>>;
16 fn deref(&self) -> &Self::Target {
17 &self.map
18 }
19}
20
21impl<K, S> DerefMut for TimeoutStreamMap<K, S> {
22 fn deref_mut(&mut self) -> &mut Self::Target {
23 &mut self.map
24 }
25}
26
27impl<K, S> TimeoutStreamMap<K, S>
28where
29 K: Clone + PartialEq + Send + Unpin + 'static,
30 S: Stream + Send + Unpin + 'static,
31{
32 pub fn new(duration: Duration) -> Self {
34 Self {
35 duration,
36 map: StreamMap::new(),
37 }
38 }
39
40 pub fn insert(&mut self, key: K, stream: S) -> bool {
44 self.map.insert(key, Timed::new(stream, self.duration))
45 }
46}
47
48impl<K, S> Stream for TimeoutStreamMap<K, S>
49where
50 K: Clone + PartialEq + Send + Unpin + 'static,
51 S: Stream + Send + Unpin + 'static,
52{
53 type Item = (K, std::io::Result<S::Item>);
54 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55 self.map.poll_next_unpin(cx)
56 }
57
58 fn size_hint(&self) -> (usize, Option<usize>) {
59 self.map.size_hint()
60 }
61}