pollable_map/stream/
timeout_map.rs

1use crate::common::Timed;
2use crate::stream::StreamMap;
3use futures::{Stream, StreamExt};
4use std::ops::{Deref, DerefMut};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use std::time::Duration;
8
9pub struct TimeoutStreamMap<K, S> {
10    duration: Duration,
11    map: StreamMap<K, Timed<S>>,
12}
13
14impl<K, S> Deref for TimeoutStreamMap<K, S> {
15    type Target = StreamMap<K, Timed<S>>;
16    fn deref(&self) -> &Self::Target {
17        &self.map
18    }
19}
20
21impl<K, S> DerefMut for TimeoutStreamMap<K, S> {
22    fn deref_mut(&mut self) -> &mut Self::Target {
23        &mut self.map
24    }
25}
26
27impl<K, S> TimeoutStreamMap<K, S>
28where
29    K: Clone + PartialEq + Send + Unpin + 'static,
30    S: Stream + Send + Unpin + 'static,
31{
32    /// Create an empty [`TimeoutStreamMap`]
33    pub fn new(duration: Duration) -> Self {
34        Self {
35            duration,
36            map: StreamMap::new(),
37        }
38    }
39
40    /// Insert a stream into the map with a unique key.
41    /// The function will return true if the map does not have the key present,
42    /// otherwise it will return false
43    pub fn insert(&mut self, key: K, stream: S) -> bool {
44        self.map.insert(key, Timed::new(stream, self.duration))
45    }
46}
47
48impl<K, S> Stream for TimeoutStreamMap<K, S>
49where
50    K: Clone + PartialEq + Send + Unpin + 'static,
51    S: Stream + Send + Unpin + 'static,
52{
53    type Item = (K, std::io::Result<S::Item>);
54    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
55        self.map.poll_next_unpin(cx)
56    }
57
58    fn size_hint(&self) -> (usize, Option<usize>) {
59        self.map.size_hint()
60    }
61}