pollable_map/stream/
set.rs

1use futures::{Stream, StreamExt};
2use std::pin::Pin;
3
4use super::StreamMap;
5use futures::stream::FusedStream;
6use std::task::{Context, Poll};
7
8pub struct StreamSet<S> {
9    id: i64,
10    map: StreamMap<i64, S>,
11}
12
13impl<S> Default for StreamSet<S>
14where
15    S: Stream + Send + Unpin + 'static,
16{
17    fn default() -> Self {
18        Self::new()
19    }
20}
21
22impl<S> StreamSet<S>
23where
24    S: Stream + Send + Unpin + 'static,
25{
26    /// Creates an empty ['StreamSet`]
27    pub fn new() -> Self {
28        Self {
29            id: 0,
30            map: StreamMap::default(),
31        }
32    }
33
34    /// Insert a stream into the set of streams.
35    pub fn insert(&mut self, stream: S) -> bool {
36        self.id = self.id.wrapping_add(1);
37        self.map.insert(self.id, stream)
38    }
39
40    /// An iterator visiting all streams in arbitrary order.
41    pub fn iter(&self) -> impl Iterator<Item = &S> {
42        self.map.iter().map(|(_, st)| st)
43    }
44
45    /// An iterator visiting all streams mutably in arbitrary order.
46    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut S> {
47        self.map.iter_mut().map(|(_, st)| st)
48    }
49
50    /// An iterator visiting all streams pinned valued in arbitrary order
51    pub fn iter_pin(&mut self) -> impl Iterator<Item = Pin<&mut S>> {
52        self.map.iter_pin().map(|(_, st)| st)
53    }
54
55    /// Clears the set.
56    pub fn clear(&mut self) {
57        self.map.clear();
58    }
59
60    /// Returns the number of streams in the set.
61    pub fn len(&self) -> usize {
62        self.map.len()
63    }
64
65    /// Return `true` map contains no elements.
66    pub fn is_empty(&self) -> bool {
67        self.map.is_empty()
68    }
69}
70
71impl<S> FromIterator<S> for StreamSet<S>
72where
73    S: Stream + Send + Unpin + 'static,
74{
75    fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
76        let mut maps = Self::new();
77        for st in iter {
78            maps.insert(st);
79        }
80        maps
81    }
82}
83
84impl<S> Stream for StreamSet<S>
85where
86    S: Stream + Send + Unpin + 'static,
87{
88    type Item = S::Item;
89
90    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
91        self.map
92            .poll_next_unpin(cx)
93            .map(|output| output.map(|(_, item)| item))
94    }
95
96    fn size_hint(&self) -> (usize, Option<usize>) {
97        self.map.size_hint()
98    }
99}
100
101impl<S> FusedStream for StreamSet<S>
102where
103    S: Stream + Send + Unpin + 'static,
104{
105    fn is_terminated(&self) -> bool {
106        self.map.is_terminated()
107    }
108}
109
#[cfg(test)]
mod test {
    use super::StreamSet;
    use futures::StreamExt;

    /// Two single-item streams are inserted; both inserts must report `true`
    /// and the set must then yield `0` followed by `1`.
    #[test]
    fn valid_stream_set() {
        let mut set = StreamSet::new();

        let first = futures::stream::once(async { 0 }).boxed();
        let second = futures::stream::once(async { 1 }).boxed();
        assert!(set.insert(first));
        assert!(set.insert(second));

        futures::executor::block_on(async move {
            assert_eq!(set.next().await, Some(0));
            assert_eq!(set.next().await, Some(1));
        });
    }
}