futures_bounded/
stream_set.rs1use futures_util::stream::BoxStream;
2use futures_util::Stream;
3use std::task::{ready, Context, Poll};
4
5use crate::{Delay, PushError, StreamMap, Timeout};
6
7pub struct StreamSet<O> {
11 id: u32,
12 inner: StreamMap<u32, O>,
13}
14
15impl<O> StreamSet<O> {
16 pub fn new(make_delay: impl Fn() -> Delay + Send + Sync + 'static, capacity: usize) -> Self {
17 Self {
18 id: 0,
19 inner: StreamMap::new(make_delay, capacity),
20 }
21 }
22}
23
24impl<O> StreamSet<O>
25where
26 O: Send + 'static,
27{
28 pub fn try_push<F>(&mut self, stream: F) -> Result<(), BoxStream<O>>
34 where
35 F: Stream<Item = O> + Send + 'static,
36 {
37 self.id = self.id.wrapping_add(1);
38
39 match self.inner.try_push(self.id, stream) {
40 Ok(()) => Ok(()),
41 Err(PushError::BeyondCapacity(w)) => Err(w),
42 Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
43 }
44 }
45
46 pub fn len(&self) -> usize {
47 self.inner.len()
48 }
49
50 pub fn is_empty(&self) -> bool {
51 self.inner.is_empty()
52 }
53
54 pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
55 self.inner.poll_ready_unpin(cx)
56 }
57
58 pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<O, Timeout>>> {
59 let (_, res) = ready!(self.inner.poll_next_unpin(cx));
60
61 Poll::Ready(res)
62 }
63}