futures_bounded/
stream_set.rs

1use futures_util::stream::BoxStream;
2use futures_util::Stream;
3use std::task::{ready, Context, Poll};
4
5use crate::{Delay, PushError, StreamMap, Timeout};
6
7/// Represents a set of [Stream]s.
8///
9/// Each stream must finish within the specified time and the list never outgrows its capacity.
10pub struct StreamSet<O> {
11    id: u32,
12    inner: StreamMap<u32, O>,
13}
14
15impl<O> StreamSet<O> {
16    pub fn new(make_delay: impl Fn() -> Delay + Send + Sync + 'static, capacity: usize) -> Self {
17        Self {
18            id: 0,
19            inner: StreamMap::new(make_delay, capacity),
20        }
21    }
22}
23
24impl<O> StreamSet<O>
25where
26    O: Send + 'static,
27{
28    /// Push a stream into the list.
29    ///
30    /// This method adds the given stream to the list.
31    /// If the length of the list is equal to the capacity, this method returns a error that contains the passed stream.
32    /// In that case, the stream is not added to the set.
33    pub fn try_push<F>(&mut self, stream: F) -> Result<(), BoxStream<O>>
34    where
35        F: Stream<Item = O> + Send + 'static,
36    {
37        self.id = self.id.wrapping_add(1);
38
39        match self.inner.try_push(self.id, stream) {
40            Ok(()) => Ok(()),
41            Err(PushError::BeyondCapacity(w)) => Err(w),
42            Err(PushError::Replaced(_)) => unreachable!("we never reuse IDs"),
43        }
44    }
45
46    pub fn len(&self) -> usize {
47        self.inner.len()
48    }
49
50    pub fn is_empty(&self) -> bool {
51        self.inner.is_empty()
52    }
53
54    pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> {
55        self.inner.poll_ready_unpin(cx)
56    }
57
58    pub fn poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<O, Timeout>>> {
59        let (_, res) = ready!(self.inner.poll_next_unpin(cx));
60
61        Poll::Ready(res)
62    }
63}