pollable_map/stream/
set.rs

1use futures::{Stream, StreamExt};
2use std::pin::Pin;
3
4use super::StreamMap;
5use std::task::{Context, Poll};
6
7pub struct StreamSet<S> {
8    id: i64,
9    map: StreamMap<i64, S>,
10}
11
12impl<S> Default for StreamSet<S>
13where
14    S: Stream + Send + Unpin + 'static,
15{
16    fn default() -> Self {
17        Self::new()
18    }
19}
20
21impl<S> StreamSet<S>
22where
23    S: Stream + Send + Unpin + 'static,
24{
25    /// Creates an empty ['StreamSet`]
26    pub fn new() -> Self {
27        Self {
28            id: 0,
29            map: StreamMap::default(),
30        }
31    }
32
33    /// Insert a stream into the set of streams.
34    pub fn insert(&mut self, stream: S) -> bool {
35        let id = self.id.wrapping_add(1);
36        self.map.insert(id, stream)
37    }
38
39    /// An iterator visiting all streams in arbitrary order.
40    pub fn iter(&self) -> impl Iterator<Item = &S> {
41        self.map.iter().map(|(_, st)| st)
42    }
43
44    /// An iterator visiting all streams mutably in arbitrary order.
45    pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut S> {
46        self.map.iter_mut().map(|(_, st)| st)
47    }
48
49    /// An iterator visiting all streams pinned valued in arbitrary order
50    pub fn iter_pin(&mut self) -> impl Iterator<Item = Pin<&mut S>> {
51        self.map.iter_pin().map(|(_, st)| st)
52    }
53
54    /// Clears the set.
55    pub fn clear(&mut self) {
56        self.map.clear();
57    }
58
59    /// Returns the number of streams in the set.
60    pub fn len(&self) -> usize {
61        self.map.len()
62    }
63
64    /// Return `true` map contains no elements.
65    pub fn is_empty(&self) -> bool {
66        self.map.is_empty()
67    }
68}
69
70impl<S> FromIterator<S> for StreamSet<S>
71where
72    S: Stream + Send + Unpin + 'static,
73{
74    fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
75        let mut maps = Self::new();
76        for st in iter {
77            maps.insert(st);
78        }
79        maps
80    }
81}
82
83impl<S> Stream for StreamSet<S>
84where
85    S: Stream + Send + Unpin + 'static,
86{
87    type Item = S::Item;
88
89    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90        self.map
91            .poll_next_unpin(cx)
92            .map(|output| output.map(|(_, item)| item))
93    }
94
95    fn size_hint(&self) -> (usize, Option<usize>) {
96        self.map.size_hint()
97    }
98}