pollable_map/stream/
set.rs1use futures::{Stream, StreamExt};
2use std::pin::Pin;
3
4use super::StreamMap;
5use futures::stream::FusedStream;
6use std::task::{Context, Poll};
7
8pub struct StreamSet<S> {
9 id: i64,
10 map: StreamMap<i64, S>,
11}
12
13impl<S> Default for StreamSet<S>
14where
15 S: Stream + Send + Unpin + 'static,
16{
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl<S> StreamSet<S>
23where
24 S: Stream + Send + Unpin + 'static,
25{
26 pub fn new() -> Self {
28 Self {
29 id: 0,
30 map: StreamMap::default(),
31 }
32 }
33
34 pub fn insert(&mut self, stream: S) -> bool {
36 self.id = self.id.wrapping_add(1);
37 self.map.insert(self.id, stream)
38 }
39
40 pub fn iter(&self) -> impl Iterator<Item = &S> {
42 self.map.iter().map(|(_, st)| st)
43 }
44
45 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut S> {
47 self.map.iter_mut().map(|(_, st)| st)
48 }
49
50 pub fn iter_pin(&mut self) -> impl Iterator<Item = Pin<&mut S>> {
52 self.map.iter_pin().map(|(_, st)| st)
53 }
54
55 pub fn clear(&mut self) {
57 self.map.clear();
58 }
59
60 pub fn len(&self) -> usize {
62 self.map.len()
63 }
64
65 pub fn is_empty(&self) -> bool {
67 self.map.is_empty()
68 }
69}
70
71impl<S> FromIterator<S> for StreamSet<S>
72where
73 S: Stream + Send + Unpin + 'static,
74{
75 fn from_iter<I: IntoIterator<Item = S>>(iter: I) -> Self {
76 let mut maps = Self::new();
77 for st in iter {
78 maps.insert(st);
79 }
80 maps
81 }
82}
83
84impl<S> Stream for StreamSet<S>
85where
86 S: Stream + Send + Unpin + 'static,
87{
88 type Item = S::Item;
89
90 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
91 self.map
92 .poll_next_unpin(cx)
93 .map(|output| output.map(|(_, item)| item))
94 }
95
96 fn size_hint(&self) -> (usize, Option<usize>) {
97 self.map.size_hint()
98 }
99}
100
101impl<S> FusedStream for StreamSet<S>
102where
103 S: Stream + Send + Unpin + 'static,
104{
105 fn is_terminated(&self) -> bool {
106 self.map.is_terminated()
107 }
108}
109
110#[cfg(test)]
111mod test {
112 use crate::stream::set::StreamSet;
113 use futures::StreamExt;
114
115 #[test]
116 fn valid_stream_set() {
117 let mut list = StreamSet::new();
118 assert!(list.insert(futures::stream::once(async { 0 }).boxed()));
119 assert!(list.insert(futures::stream::once(async { 1 }).boxed()));
120
121 futures::executor::block_on(async move {
122 let val = list.next().await;
123 assert_eq!(val, Some(0));
124 let val = list.next().await;
125 assert_eq!(val, Some(1));
126 });
127 }
128}