futures_bounded/
futures_tuple_set.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::task::{ready, Context, Poll};
4
5use futures_util::future::BoxFuture;
6
7use crate::{Delay, FuturesMap, PushError, Timeout};
8
9/// Represents a list of tuples of a [Future] and an associated piece of data.
10///
11/// Each future must finish within the specified time and the list never outgrows its capacity.
12pub struct FuturesTupleSet<O, D> {
13    id: u32,
14    inner: FuturesMap<u32, O>,
15    data: HashMap<u32, D>,
16}
17
18impl<O, D> FuturesTupleSet<O, D> {
19    pub fn new(make_delay: impl Fn() -> Delay + Send + Sync + 'static, capacity: usize) -> Self {
20        Self {
21            id: 0,
22            inner: FuturesMap::new(make_delay, capacity),
23            data: HashMap::new(),
24        }
25    }
26}
27
28impl<O, D> FuturesTupleSet<O, D>
29where
30    O: 'static,
31{
32    /// Push a future into the list.
33    ///
34    /// This method adds the given future to the list.
35    /// If the length of the list is equal to the capacity, this method returns a error that contains the passed future.
36    /// In that case, the future is not added to the set.
37    pub fn try_push<F>(&mut self, future: F, data: D) -> Result<(), (BoxFuture<O>, D)>
38    where
39        F: Future<Output = O> + Send + 'static,
40    {
41        self.id = self.id.wrapping_add(1);
42
43        match self.inner.try_push(self.id, future) {
44            Ok(()) => {}
45            Err(PushError::BeyondCapacity(w)) => return Err((w, data)),
46            Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
47        }
48        self.data.insert(self.id, data);
49
50        Ok(())
51    }
52
53    pub fn len(&self) -> usize {
54        self.inner.len()
55    }
56
57    pub fn is_empty(&self) -> bool {
58        self.inner.is_empty()
59    }
60
61    pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
62        self.inner.poll_ready_unpin(cx)
63    }
64
65    pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(Result<O, Timeout>, D)> {
66        let (id, res) = ready!(self.inner.poll_unpin(cx));
67        let data = self.data.remove(&id).expect("must have data for future");
68
69        Poll::Ready((res, data))
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::future::poll_fn;
    use futures_util::FutureExt;
    use std::future::ready;
    use std::time::Duration;

    /// Each future resolves to the same number that was pushed as its data, so a result paired
    /// with the wrong data would make the comparison fail.
    #[test]
    fn tracks_associated_data_of_future() {
        let mut set = FuturesTupleSet::new(|| Delay::futures_timer(Duration::from_secs(10)), 10);

        for value in [1, 2] {
            let _ = set.try_push(ready(value), value);
        }

        // Both futures are already complete, so each poll must yield immediately.
        for _ in 0..2 {
            let (res, data) = poll_fn(|cx| set.poll_unpin(cx)).now_or_never().unwrap();
            assert_eq!(res.unwrap(), data);
        }
    }
}