async_listen/listen_ext.rs
1use std::io;
2use std::time::Duration;
3
4use async_std::stream::Stream;
5
6use crate::log;
7use crate::sleep;
8use crate::backpressure::{self, Token};
9use crate::byte_stream::ByteStream;
10
11
12/// An extension trait that provides necessary adapters for turning
13/// a stream of `accept()` events into a full-featured connection listener
14///
15pub trait ListenExt: Stream {
16 /// Log errors which aren't transient using user-specified function
17 ///
18 /// The the warning in this context is any error which isn't transient.
19 /// There are no fatal errors (ones which don't allow listener to
20 /// procceed in the future) on any known platform so any error is
21 /// considered a warning. See
22 /// [`is_transient_error`](fn.is_transient_error.html) for more info.
23 ///
24 /// `stream.log_warnings(user_func)` is equivalent of:
25 ///
26 /// ```ignore
27 /// stream.inspect(|res| res.map_err(|e| {
28 /// if !is_transient_error(e) {
29 /// user_func(e);
30 /// }
31 /// })
32 /// ```
33 ///
34 /// # Example
35 ///
36 /// ```no_run
37 /// # use async_std::net::TcpListener;
38 /// # use async_std::prelude::*;
39 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
40 /// #
41 /// use async_listen::ListenExt;
42 ///
43 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
44 /// let mut incoming = listener.incoming()
45 /// .log_warnings(|e| eprintln!("Listening error: {}", e));
46 ///
47 /// while let Some(stream) = incoming.next().await {
48 /// // ...
49 /// }
50 /// #
51 /// # Ok(()) }) }
52 /// ```
53 fn log_warnings<I, F>(self, f: F)
54 -> log::LogWarnings<Self, F>
55 where Self: Stream<Item=Result<I, io::Error>> + Sized,
56 F: FnMut(&io::Error),
57 {
58 log::LogWarnings::new(self, f)
59 }
60
61 /// Handle errors and return infallible stream
62 ///
63 /// There are two types of errors:
64 ///
65 /// * [`transient`](fn.is_transient_error.html) which may be ignored
66 /// * warnings which keep socket in accept queue after the error
67 ///
68 /// We ignore transient errors entirely, and timeout for a `sleep_amount`
69 /// on stick ones.
70 ///
71 /// One example of warning is `EMFILE: too many open files`. In this
72 /// case, if we sleep for some amount, so there is a chance that other
73 /// connection or some file descriptor is closed in the meantime and we
74 /// can accept another connection.
75 ///
76 /// Also in the case of warnings, it's usually a good idea to log them
77 /// (i.e. so file descritor limit or max connection is adjusted by user).
78 /// Use [`log_warnings`](#method.log_warnings) to do this.
79 ///
80 /// `while let Some(s) = stream.handle_errors(d).next().await {...}`
81 /// is equivalent of:
82 ///
83 /// ```ignore
84 /// while let Some(res) = stream.next().await? {
85 /// let s = match res {
86 /// Ok(s) => s,
87 /// Err(e) => {
88 /// if !is_traisient_error(e) {
89 /// task::sleep(d);
90 /// }
91 /// continue;
92 /// }
93 /// };
94 /// # ...
95 /// }
96 /// ```
97 ///
98 /// # Example
99 ///
100 fn handle_errors<I>(self, sleep_on_warning: Duration)
101 -> sleep::HandleErrors<Self>
102 where Self: Stream<Item=Result<I, io::Error>> + Sized,
103 {
104 sleep::HandleErrors::new(self, sleep_on_warning)
105 }
106
107 /// Apply a fixed backpressure to the the stream
108 ///
109 /// The output stream yields pairs of (token, stream). The token must
110 /// be kept alive as long as connection is still alive.
111 ///
112 /// See [`backpressure_wrapper`](#method.backpressure_wrapper) method for
113 /// a simple way of handling backpressure in a common case.
114 ///
115 /// `stream.backpressure(10)` is equivalent of:
116 /// ```ignore
117 /// let (tx, rx) = backpressure::new(10);
118 /// stream
119 /// .apply_backpressure(rx)
120 /// .map(|conn| (tx.token(), conn))
121 /// ```
122 ///
123 /// # Example
124 ///
125 /// ```no_run
126 /// # use std::time::Duration;
127 /// # use async_std::net::{TcpListener, TcpStream};
128 /// # use async_std::prelude::*;
129 /// # use async_std::task;
130 /// # fn main() -> std::io::Result<()> { task::block_on(async {
131 /// #
132 /// use async_listen::ListenExt;
133 ///
134 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
135 /// let mut incoming = listener.incoming()
136 /// .handle_errors(Duration::from_millis(100))
137 /// .backpressure(100);
138 ///
139 /// while let Some((token, stream)) = incoming.next().await {
140 /// task::spawn(async {
141 /// connection_loop(stream).await;
142 /// drop(token);
143 /// });
144 /// }
145 /// # async fn connection_loop(_stream: TcpStream) {
146 /// # }
147 /// #
148 /// # Ok(()) }) }
149 /// ```
150 ///
151 /// *Note:* the `drop` there is not needed you can use either:
152 ///
153 /// * `let _token = token;` inside `async` block, or
154 /// * `connection_loop(&token, stream)`,
155 ///
156 /// To achieve the same result. But `drop(token)` makes it explicit that
157 /// token is dropped only at that point, which is an important property to
158 /// achieve.
159 fn backpressure<I>(self, limit: usize)
160 -> backpressure::BackpressureToken<Self>
161 where Self: Stream<Item=I> + Sized,
162 {
163 let (_tx, rx) = backpressure::new(limit);
164 return backpressure::BackpressureToken::new(self, rx);
165 }
166
167 /// Apply a backpressure object to a stream
168 ///
169 /// This method is different from [`backpressure`](#method.backpressure) in
170 /// two ways:
171 ///
172 /// 1. It doesn't modify stream output
173 /// 2. External backpressure object may be used to change limit at runtime
174 ///
175 /// With the greater power comes greater responsibility, though. Here are
176 /// some things to remember when using the method:
177 ///
178 /// 1. You must create a token for each connection (see example).
179 /// 2. Token *should* be created before yielding to a main loop, otherwise
180 /// limit can be exhausted at times.
181 /// 2. Token should be kept alive as long as the connection is alive.
182 ///
183 /// See [`backpressure_wrapper`](#method.backpressure_wrapper) method for
184 /// a simple way of handling backpressure in a common case.
185 ///
186 /// # Example
187 ///
188 /// ```no_run
189 /// # use std::time::Duration;
190 /// # use async_std::net::{TcpListener, TcpStream};
191 /// # use async_std::prelude::*;
192 /// # use async_std::task;
193 /// # fn main() -> std::io::Result<()> { task::block_on(async {
194 /// #
195 /// use async_listen::ListenExt;
196 /// use async_listen::backpressure;
197 ///
198 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
199 /// let (tx, rx) = backpressure::new(10);
200 /// let mut incoming = listener.incoming()
201 /// .handle_errors(Duration::from_millis(100))
202 /// .apply_backpressure(rx);
203 ///
204 /// while let Some(stream) = incoming.next().await {
205 /// let token = tx.token(); // should be created before spawn
206 /// task::spawn(async {
207 /// connection_loop(stream).await;
208 /// drop(token); // should be dropped after
209 /// });
210 /// }
211 /// # async fn connection_loop(_stream: TcpStream) {
212 /// # }
213 /// #
214 /// # Ok(()) }) }
215 /// ```
216 ///
217 /// *Note:* the `drop` there is not needed you can use either:
218 ///
219 /// * `let _token = token;` inside `async` block, or
220 /// * `connection_loop(&token, stream)`,
221
222 /// To achieve the same result. But `drop(token)` makes it explicit that
223 /// token is dropped only at that point, which is an important property to
224 /// achieve. Also don't create token in async block as it makes
225 /// backpressure enforcing unreliable.
226 fn apply_backpressure<I>(self, backpressure: backpressure::Receiver)
227 -> backpressure::Backpressure<Self>
228 where Self: Stream<Item=I> + Sized,
229 {
230 return backpressure::Backpressure::new(self, backpressure);
231 }
232
233 /// Apply a backpressure object to a stream and yield ByteStream
234 ///
235 /// This method simplifies backpressure handling by hiding the token
236 /// inside the [`ByteStream`](struct.ByteStream.html) structure, so
237 /// it's lifetime is tied to the lifetime of the structure
238 ///
239 /// The wrapper works for `TcpListener` and `UdpListener` and returns
240 /// the same `ByteStream` structure on both of them. This helps working
241 /// with both kinds of sockets in a uniform way.
242 ///
243 /// Wrapping streams might incur tiny performance cost (although, this cast
244 /// is much smaller than cost of system calls involved in working with
245 /// sockets nevertheless). See [`backpressure`](#method.backpressure) and
246 /// [`apply_backpressure`](#method.apply_backpressure) for a wrapper-less
247 /// way of applying backpressure.
248 ///
249 /// # Example
250 ///
251 /// ```no_run
252 /// # use std::time::Duration;
253 /// # use async_std::net::{TcpListener, TcpStream};
254 /// # use async_std::prelude::*;
255 /// # use async_std::task;
256 /// # fn main() -> std::io::Result<()> { task::block_on(async {
257 /// #
258 /// use async_listen::{ListenExt, ByteStream, backpressure};
259 ///
260 /// let listener = TcpListener::bind("127.0.0.1:0").await?;
261 /// let (_, rx) = backpressure::new(10);
262 /// let mut incoming = listener.incoming()
263 /// .handle_errors(Duration::from_millis(100))
264 /// .backpressure_wrapper(rx);
265 ///
266 /// while let Some(stream) = incoming.next().await {
267 /// task::spawn(connection_loop(stream));
268 /// }
269 /// # async fn connection_loop(_stream: ByteStream) {
270 /// # }
271 /// #
272 /// # Ok(()) }) }
273 /// ```
274 ///
275 /// # Notes
276 ///
277 /// The following examples are equivalent:
278 ///
279 /// ```ignore
280 /// let (_, bp) = backpressure::new(100);
281 /// stream.backpressure_wrapper(bp)
282 /// ```
283 ///
284 /// ```ignore
285 /// let (tx, rx) = backpressure::new(100);
286 /// stream.apply_backpressure(rx)
287 /// .map(|stream| ByteStream::from((tx.token(), stream)))
288 /// ```
289 ///
290 /// ```ignore
291 /// stream.backpressure(100)
292 /// .map(ByteStream::from)
293 /// ```
294 ///
295 fn backpressure_wrapper<I>(self, backpressure: backpressure::Receiver)
296 -> backpressure::BackpressureWrapper<Self>
297 where Self: Stream<Item=I> + Sized,
298 ByteStream: From<(Token, I)>,
299 {
300 return backpressure::BackpressureWrapper::new(self, backpressure);
301 }
302}
303
304impl<T: Stream> ListenExt for T {}