async_listen/
listen_ext.rs

1use std::io;
2use std::time::Duration;
3
4use async_std::stream::Stream;
5
6use crate::log;
7use crate::sleep;
8use crate::backpressure::{self, Token};
9use crate::byte_stream::ByteStream;
10
11
12/// An extension trait that provides necessary adapters for turning
13/// a stream of `accept()` events into a full-featured connection listener
14///
15pub trait ListenExt: Stream {
16    /// Log errors which aren't transient using user-specified function
17    ///
18    /// The the warning in this context is any error which isn't transient.
19    /// There are no fatal errors (ones which don't allow listener to
20    /// procceed in the future) on any known platform so any error is
21    /// considered a warning. See
22    /// [`is_transient_error`](fn.is_transient_error.html) for more info.
23    ///
24    /// `stream.log_warnings(user_func)` is equivalent of:
25    ///
26    /// ```ignore
27    /// stream.inspect(|res| res.map_err(|e| {
28    ///     if !is_transient_error(e) {
29    ///         user_func(e);
30    ///     }
31    /// })
32    /// ```
33    ///
34    /// # Example
35    ///
36    /// ```no_run
37    /// # use async_std::net::TcpListener;
38    /// # use async_std::prelude::*;
39    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
40    /// #
41    /// use async_listen::ListenExt;
42    ///
43    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
44    /// let mut incoming = listener.incoming()
45    ///     .log_warnings(|e| eprintln!("Listening error: {}", e));
46    ///
47    /// while let Some(stream) = incoming.next().await {
48    ///     // ...
49    /// }
50    /// #
51    /// # Ok(()) }) }
52    /// ```
53    fn log_warnings<I, F>(self, f: F)
54        -> log::LogWarnings<Self, F>
55        where Self: Stream<Item=Result<I, io::Error>> + Sized,
56              F: FnMut(&io::Error),
57    {
58        log::LogWarnings::new(self, f)
59    }
60
61    /// Handle errors and return infallible stream
62    ///
63    /// There are two types of errors:
64    ///
65    /// * [`transient`](fn.is_transient_error.html) which may be ignored
66    /// * warnings which keep socket in accept queue after the error
67    ///
68    /// We ignore transient errors entirely, and timeout for a `sleep_amount`
69    /// on stick ones.
70    ///
71    /// One example of warning is `EMFILE: too many open files`. In this
72    /// case, if we sleep for some amount, so there is a chance that other
73    /// connection or some file descriptor is closed in the meantime and we
74    /// can accept another connection.
75    ///
76    /// Also in the case of warnings, it's usually a good idea to log them
77    /// (i.e. so file descritor limit or max connection is adjusted by user).
78    /// Use [`log_warnings`](#method.log_warnings) to do this.
79    ///
80    /// `while let Some(s) = stream.handle_errors(d).next().await {...}`
81    /// is equivalent of:
82    ///
83    /// ```ignore
84    /// while let Some(res) = stream.next().await? {
85    ///     let s = match res {
86    ///         Ok(s) => s,
87    ///         Err(e) => {
88    ///             if !is_traisient_error(e) {
89    ///                 task::sleep(d);
90    ///             }
91    ///             continue;
92    ///         }
93    ///     };
94    ///     # ...
95    /// }
96    /// ```
97    ///
98    /// # Example
99    ///
100    fn handle_errors<I>(self, sleep_on_warning: Duration)
101        -> sleep::HandleErrors<Self>
102        where Self: Stream<Item=Result<I, io::Error>> + Sized,
103    {
104        sleep::HandleErrors::new(self, sleep_on_warning)
105    }
106
107    /// Apply a fixed backpressure to the the stream
108    ///
109    /// The output stream yields pairs of (token, stream). The token must
110    /// be kept alive as long as connection is still alive.
111    ///
112    /// See [`backpressure_wrapper`](#method.backpressure_wrapper) method for
113    /// a simple way of handling backpressure in a common case.
114    ///
115    /// `stream.backpressure(10)` is equivalent of:
116    /// ```ignore
117    /// let (tx, rx) = backpressure::new(10);
118    /// stream
119    ///     .apply_backpressure(rx)
120    ///     .map(|conn| (tx.token(), conn))
121    /// ```
122    ///
123    /// # Example
124    ///
125    /// ```no_run
126    /// # use std::time::Duration;
127    /// # use async_std::net::{TcpListener, TcpStream};
128    /// # use async_std::prelude::*;
129    /// # use async_std::task;
130    /// # fn main() -> std::io::Result<()> { task::block_on(async {
131    /// #
132    /// use async_listen::ListenExt;
133    ///
134    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
135    /// let mut incoming = listener.incoming()
136    ///     .handle_errors(Duration::from_millis(100))
137    ///     .backpressure(100);
138    ///
139    /// while let Some((token, stream)) = incoming.next().await {
140    ///     task::spawn(async {
141    ///         connection_loop(stream).await;
142    ///         drop(token);
143    ///     });
144    /// }
145    /// # async fn connection_loop(_stream: TcpStream) {
146    /// # }
147    /// #
148    /// # Ok(()) }) }
149    /// ```
150    ///
151    /// *Note:* the `drop` there is not needed you can use either:
152    ///
153    /// * `let _token = token;` inside `async` block, or
154    /// * `connection_loop(&token, stream)`,
155    ///
156    /// To achieve the same result. But `drop(token)` makes it explicit that
157    /// token is dropped only at that point, which is an important property to
158    /// achieve.
159    fn backpressure<I>(self, limit: usize)
160        -> backpressure::BackpressureToken<Self>
161        where Self: Stream<Item=I> + Sized,
162    {
163        let (_tx, rx) = backpressure::new(limit);
164        return backpressure::BackpressureToken::new(self, rx);
165    }
166
167    /// Apply a backpressure object to a stream
168    ///
169    /// This method is different from [`backpressure`](#method.backpressure) in
170    /// two ways:
171    ///
172    /// 1. It doesn't modify stream output
173    /// 2. External backpressure object may be used to change limit at runtime
174    ///
175    /// With the greater power comes greater responsibility, though. Here are
176    /// some things to remember when using the method:
177    ///
178    /// 1. You must create a token for each connection (see example).
179    /// 2. Token *should* be created before yielding to a main loop, otherwise
180    ///    limit can be exhausted at times.
181    /// 2. Token should be kept alive as long as the connection is alive.
182    ///
183    /// See [`backpressure_wrapper`](#method.backpressure_wrapper) method for
184    /// a simple way of handling backpressure in a common case.
185    ///
186    /// # Example
187    ///
188    /// ```no_run
189    /// # use std::time::Duration;
190    /// # use async_std::net::{TcpListener, TcpStream};
191    /// # use async_std::prelude::*;
192    /// # use async_std::task;
193    /// # fn main() -> std::io::Result<()> { task::block_on(async {
194    /// #
195    /// use async_listen::ListenExt;
196    /// use async_listen::backpressure;
197    ///
198    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
199    /// let (tx, rx) = backpressure::new(10);
200    /// let mut incoming = listener.incoming()
201    ///     .handle_errors(Duration::from_millis(100))
202    ///     .apply_backpressure(rx);
203    ///
204    /// while let Some(stream) = incoming.next().await {
205    ///     let token = tx.token();  // should be created before spawn
206    ///     task::spawn(async {
207    ///         connection_loop(stream).await;
208    ///         drop(token);  // should be dropped after
209    ///     });
210    /// }
211    /// # async fn connection_loop(_stream: TcpStream) {
212    /// # }
213    /// #
214    /// # Ok(()) }) }
215    /// ```
216    ///
217    /// *Note:* the `drop` there is not needed you can use either:
218    ///
219    /// * `let _token = token;` inside `async` block, or
220    /// * `connection_loop(&token, stream)`,
221
222    /// To achieve the same result. But `drop(token)` makes it explicit that
223    /// token is dropped only at that point, which is an important property to
224    /// achieve. Also don't create token in async block as it makes
225    /// backpressure enforcing unreliable.
226    fn apply_backpressure<I>(self, backpressure: backpressure::Receiver)
227        -> backpressure::Backpressure<Self>
228        where Self: Stream<Item=I> + Sized,
229    {
230        return backpressure::Backpressure::new(self, backpressure);
231    }
232
233    /// Apply a backpressure object to a stream and yield ByteStream
234    ///
235    /// This method simplifies backpressure handling by hiding the token
236    /// inside the [`ByteStream`](struct.ByteStream.html) structure, so
237    /// it's lifetime is tied to the lifetime of the structure
238    ///
239    /// The wrapper works for `TcpListener` and `UdpListener` and returns
240    /// the same `ByteStream` structure on both of them. This helps working
241    /// with both kinds of sockets in a uniform way.
242    ///
243    /// Wrapping streams might incur tiny performance cost (although, this cast
244    /// is much smaller than cost of system calls involved in working with
245    /// sockets nevertheless). See [`backpressure`](#method.backpressure) and
246    /// [`apply_backpressure`](#method.apply_backpressure) for a wrapper-less
247    /// way of applying backpressure.
248    ///
249    /// # Example
250    ///
251    /// ```no_run
252    /// # use std::time::Duration;
253    /// # use async_std::net::{TcpListener, TcpStream};
254    /// # use async_std::prelude::*;
255    /// # use async_std::task;
256    /// # fn main() -> std::io::Result<()> { task::block_on(async {
257    /// #
258    /// use async_listen::{ListenExt, ByteStream, backpressure};
259    ///
260    /// let listener = TcpListener::bind("127.0.0.1:0").await?;
261    /// let (_, rx) = backpressure::new(10);
262    /// let mut incoming = listener.incoming()
263    ///     .handle_errors(Duration::from_millis(100))
264    ///     .backpressure_wrapper(rx);
265    ///
266    /// while let Some(stream) = incoming.next().await {
267    ///     task::spawn(connection_loop(stream));
268    /// }
269    /// # async fn connection_loop(_stream: ByteStream) {
270    /// # }
271    /// #
272    /// # Ok(()) }) }
273    /// ```
274    ///
275    /// # Notes
276    ///
277    /// The following examples are equivalent:
278    ///
279    /// ```ignore
280    /// let (_, bp) = backpressure::new(100);
281    /// stream.backpressure_wrapper(bp)
282    /// ```
283    ///
284    /// ```ignore
285    /// let (tx, rx) = backpressure::new(100);
286    /// stream.apply_backpressure(rx)
287    ///     .map(|stream| ByteStream::from((tx.token(), stream)))
288    /// ```
289    ///
290    /// ```ignore
291    /// stream.backpressure(100)
292    ///     .map(ByteStream::from)
293    /// ```
294    ///
295    fn backpressure_wrapper<I>(self, backpressure: backpressure::Receiver)
296        -> backpressure::BackpressureWrapper<Self>
297        where Self: Stream<Item=I> + Sized,
298              ByteStream: From<(Token, I)>,
299    {
300        return backpressure::BackpressureWrapper::new(self, backpressure);
301    }
302}
303
304impl<T: Stream> ListenExt for T {}