1use std::marker::PhantomData;
2
3use futures::{Stream, Future, IntoFuture, Async};
4use futures::stream::BufferUnordered;
5
6
7struct SwallowErrors<F: Future<Item=(), Error=()>, E>(F, PhantomData<E>);
9
10struct MapSwallowErrors<S: Stream>(S)
13 where S::Item: IntoFuture<Item=(), Error=()>;
14
15
16
17pub struct Listen<S: Stream>
23 where S::Item: IntoFuture<Item=(), Error=()>,
24{
25 buffer: BufferUnordered<MapSwallowErrors<S>>,
26}
27
28pub fn new<S: Stream>(stream: S, limit: usize) -> Listen<S>
29 where S::Item: IntoFuture<Item=(), Error=()>,
30{
31 Listen {
32 buffer: MapSwallowErrors(stream).buffer_unordered(limit),
33 }
34}
35
36impl<S: Stream> Future for Listen<S>
37 where S::Item: IntoFuture<Item=(), Error=()>,
38{
39 type Item = ();
40 type Error = S::Error;
41 fn poll(&mut self) -> Result<Async<()>, S::Error> {
42 loop {
43 match self.buffer.poll()? {
44 Async::Ready(Some(())) => continue,
46 Async::NotReady => return Ok(Async::NotReady),
48 Async::Ready(None) => return Ok(Async::Ready(())),
50 }
51 }
52 }
53}
54
55impl<F: Future<Item=(), Error=()>, E> Future for SwallowErrors<F, E> {
56 type Item = ();
57 type Error = E; #[inline(always)]
60 fn poll(&mut self) -> Result<Async<F::Item>, E> {
61 match self.0.poll() {
62 Ok(x) => Ok(x),
63 Err(()) => Ok(Async::Ready(())),
64 }
65 }
66}
67
68impl<S: Stream> Stream for MapSwallowErrors<S>
69 where S::Item: IntoFuture<Item=(), Error=()>,
70{
71 type Item = SwallowErrors<<S::Item as IntoFuture>::Future, S::Error>;
72 type Error = S::Error;
73
74 #[inline(always)]
75 fn poll(&mut self) -> Result<Async<Option<Self::Item>>, S::Error> {
76 match self.0.poll()? {
77 Async::Ready(None) => Ok(Async::Ready(None)),
78 Async::Ready(Some(f)) => {
79 Ok(Async::Ready(Some(
80 SwallowErrors(f.into_future(), PhantomData)
81 )))
82 }
83 Async::NotReady => Ok(Async::NotReady),
84 }
85 }
86}