tk_listen/
listen.rs

1use std::marker::PhantomData;
2
3use futures::{Stream, Future, IntoFuture, Async};
4use futures::stream::BufferUnordered;
5
6
7/// This is basically `.then(|_| Ok(()))`, but we can't name a closure
8struct SwallowErrors<F: Future<Item=(), Error=()>,  E>(F, PhantomData<E>);
9
10/// This is basically `map(|f| f.then(|_| Ok(())))`,
11/// but we can't name a closure
12struct MapSwallowErrors<S: Stream>(S)
13    where S::Item: IntoFuture<Item=(), Error=()>;
14
15
16
17/// A structure returned by `ListenExt::listen`
18///
19/// This is a future that returns when incoming stream has been closed and
20/// all connections (futures) have been processed. It uses `BufferUnordered`
21/// inside to do heavy lifting.
22pub struct Listen<S: Stream>
23    where S::Item: IntoFuture<Item=(), Error=()>,
24{
25    buffer: BufferUnordered<MapSwallowErrors<S>>,
26}
27
28pub fn new<S: Stream>(stream: S, limit: usize) -> Listen<S>
29    where S::Item: IntoFuture<Item=(), Error=()>,
30{
31    Listen {
32        buffer: MapSwallowErrors(stream).buffer_unordered(limit),
33    }
34}
35
36impl<S: Stream> Future for Listen<S>
37    where S::Item: IntoFuture<Item=(), Error=()>,
38{
39    type Item = ();
40    type Error = S::Error;
41    fn poll(&mut self) -> Result<Async<()>, S::Error> {
42        loop {
43            match self.buffer.poll()? {
44                // Some future just finished, let's check for next one
45                Async::Ready(Some(())) => continue,
46                // No future ready
47                Async::NotReady => return Ok(Async::NotReady),
48                // Stream is done
49                Async::Ready(None) => return Ok(Async::Ready(())),
50            }
51        }
52    }
53}
54
55impl<F: Future<Item=(), Error=()>, E> Future for SwallowErrors<F, E> {
56    type Item = ();
57    type Error = E;  // actually void
58
59    #[inline(always)]
60    fn poll(&mut self) -> Result<Async<F::Item>, E> {
61        match self.0.poll() {
62            Ok(x) => Ok(x),
63            Err(()) => Ok(Async::Ready(())),
64        }
65    }
66}
67
68impl<S: Stream> Stream for MapSwallowErrors<S>
69    where S::Item: IntoFuture<Item=(), Error=()>,
70{
71    type Item = SwallowErrors<<S::Item as IntoFuture>::Future, S::Error>;
72    type Error = S::Error;
73
74    #[inline(always)]
75    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, S::Error> {
76        match self.0.poll()? {
77            Async::Ready(None) => Ok(Async::Ready(None)),
78            Async::Ready(Some(f)) => {
79                Ok(Async::Ready(Some(
80                    SwallowErrors(f.into_future(), PhantomData)
81                )))
82            }
83            Async::NotReady => Ok(Async::NotReady),
84        }
85    }
86}