pollable_map/stream/
set.rs1use futures::{Stream, StreamExt};
2use std::pin::Pin;
3
4use super::StreamMap;
5use std::task::{Context, Poll};
6
7pub struct StreamSet<S> {
8 id: i64,
9 map: StreamMap<i64, S>,
10}
11
12impl<S> Default for StreamSet<S>
13where
14 S: Stream + Send + Unpin + 'static,
15{
16 fn default() -> Self {
17 Self::new()
18 }
19}
20
21impl<S> StreamSet<S>
22where
23 S: Stream + Send + Unpin + 'static,
24{
25 pub fn new() -> Self {
27 Self {
28 id: 0,
29 map: StreamMap::default(),
30 }
31 }
32
33 pub fn insert(&mut self, stream: S) -> bool {
35 let id = self.id.wrapping_add(1);
36 self.map.insert(id, stream)
37 }
38
39 pub fn iter(&self) -> impl Iterator<Item = &S> {
41 self.map.iter().map(|(_, st)| st)
42 }
43
44 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut S> {
46 self.map.iter_mut().map(|(_, st)| st)
47 }
48
49 pub fn iter_pin(&mut self) -> impl Iterator<Item = Pin<&mut S>> {
51 self.map.iter_pin().map(|(_, st)| st)
52 }
53
54 pub fn clear(&mut self) {
56 self.map.clear();
57 }
58
59 pub fn len(&self) -> usize {
61 self.map.len()
62 }
63
64 pub fn is_empty(&self) -> bool {
66 self.map.is_empty()
67 }
68}
69
70impl<S> FromIterator<S> for StreamSet<S>
71where
72 S: Stream + Send + Unpin + 'static,
73{
74 fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
75 let mut maps = Self::new();
76 for st in iter {
77 maps.insert(st);
78 }
79 maps
80 }
81}
82
83impl<S> Stream for StreamSet<S>
84where
85 S: Stream + Send + Unpin + 'static,
86{
87 type Item = S::Item;
88
89 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90 self.map
91 .poll_next_unpin(cx)
92 .map(|output| output.map(|(_, item)| item))
93 }
94
95 fn size_hint(&self) -> (usize, Option<usize>) {
96 self.map.size_hint()
97 }
98}