pollable_map/futures/
set.rs1use std::future::Future;
2use std::pin::Pin;
3
4use super::FutureMap;
5use futures::stream::FusedStream;
6use futures::{Stream, StreamExt};
7use std::task::{Context, Poll};
8
9pub struct FutureSet<S> {
10 id: i64,
11 map: FutureMap<i64, S>,
12}
13
14impl<S> Default for FutureSet<S>
15where
16 S: Future + Send + Unpin + 'static,
17{
18 fn default() -> Self {
19 Self::new()
20 }
21}
22
23impl<S> FutureSet<S>
24where
25 S: Future + Send + Unpin + 'static,
26{
27 pub fn new() -> Self {
29 Self {
30 id: 0,
31 map: FutureMap::default(),
32 }
33 }
34
35 pub fn insert(&mut self, fut: S) -> bool {
37 self.id = self.id.wrapping_add(1);
38 self.map.insert(self.id, fut)
39 }
40
41 pub fn iter(&self) -> impl Iterator<Item = &S> {
43 self.map.iter().map(|(_, st)| st)
44 }
45
46 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut S> {
48 self.map.iter_mut().map(|(_, st)| st)
49 }
50
51 pub fn iter_pin(&mut self) -> impl Iterator<Item = Pin<&mut S>> {
53 self.map.iter_pin().map(|(_, st)| st)
54 }
55
56 pub fn clear(&mut self) {
58 self.map.clear();
59 }
60
61 pub fn len(&self) -> usize {
63 self.map.len()
64 }
65
66 pub fn is_empty(&self) -> bool {
68 self.map.is_empty()
69 }
70}
71
72impl<S> FromIterator<S> for FutureSet<S>
73where
74 S: Future + Send + Unpin + 'static,
75{
76 fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
77 let mut maps = Self::new();
78 for st in iter {
79 maps.insert(st);
80 }
81 maps
82 }
83}
84
85impl<S> Stream for FutureSet<S>
86where
87 S: Future + Send + Unpin + 'static,
88{
89 type Item = S::Output;
90
91 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
92 self.map
93 .poll_next_unpin(cx)
94 .map(|output| output.map(|(_, item)| item))
95 }
96
97 fn size_hint(&self) -> (usize, Option<usize>) {
98 self.map.size_hint()
99 }
100}
101
102impl<S> FusedStream for FutureSet<S>
103where
104 S: Future + Send + Unpin + 'static,
105{
106 fn is_terminated(&self) -> bool {
107 self.map.is_terminated()
108 }
109}
110
#[cfg(test)]
mod test {
    use super::FutureSet;
    use futures::StreamExt;

    /// Two ready futures are accepted by the set and their outputs are
    /// yielded by the stream as `0` followed by `1`.
    #[test]
    fn valid_future_set() {
        let mut set = FutureSet::new();
        for value in 0..2 {
            assert!(set.insert(futures::future::ready(value)));
        }

        futures::executor::block_on(async move {
            for expected in 0..2 {
                let yielded = set.next().await;
                assert_eq!(yielded, Some(expected));
            }
        });
    }
}