async_listen/backpressure.rs
1//! Backpressure handling structures
2//!
3//! The usual way to apply backpressure to a stream is using one of the
4//! [`ListenExt`](../trait.ListenExt.html) trait methods:
5//! * [`backpressure`](../trait.ListenExt.html#method.backpressure)
6//! * [`apply_backpressure`](../trait.ListenExt.html#method.apply_backpressure)
7//! * [`backpressure_wrapper`](../trait.ListenExt.html#method.backpressure_wrapper)
8//!
9//! Also take a look at [`backpressure::new`](fn.new.html) for the low-level
10//! interface.
11//!
12use std::fmt;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::{Arc, Mutex, TryLockError};
16
17use async_std::stream::Stream;
18use async_std::future::Future;
19use async_std::task::{Poll, Context, Waker};
20
21use crate::byte_stream::ByteStream;
22
23
/// State shared between the `Sender`, the `Receiver` and every `Token`.
struct Inner {
    /// Number of tokens currently alive: incremented when a token is
    /// created and decremented when a token is dropped.
    active: AtomicUsize,
    /// Number of active tokens at which the receiver stops reporting
    /// capacity; replaced by `Sender::set_limit`.
    limit: AtomicUsize,
    /// Waker stored by the receiver when it has to wait; it is taken and
    /// woken when a token drop or a limit increase may have freed capacity.
    task: Mutex<Option<Waker>>,
}
29
/// A stream adapter that applies backpressure and yields a token
///
/// Every item of the underlying stream is yielded as a `(Token, item)`
/// pair.
///
/// See
/// [`ListenExt::backpressure`](../trait.ListenExt.html#method.backpressure)
/// for more info.
pub struct BackpressureToken<S>(Backpressure<S>);
36
/// A stream adapter that applies backpressure and yields ByteStream
///
/// Every item of the underlying stream is converted, together with a newly
/// created `Token`, into a `ByteStream`.
///
/// See
/// [`ListenExt::backpressure_wrapper`](../trait.ListenExt.html#method.backpressure_wrapper)
/// for more info.
pub struct BackpressureWrapper<S>(Backpressure<S>);
43
/// A stream adapter that applies backpressure
///
/// Items of the underlying stream are yielded unchanged; this adapter does
/// not attach a token to them.
///
/// See
/// [`ListenExt::apply_backpressure`](../trait.ListenExt.html#method.apply_backpressure)
/// for more info.
pub struct Backpressure<S> {
    /// The wrapped stream.
    stream: S,
    /// The throttle that is polled before the wrapped stream is polled.
    backpressure: Receiver,
}
53
/// The throttler of a stream
///
/// It reports whether the number of active tokens is below the limit and
/// is the half that the stream adapters in this module poll.
///
/// See [`new`](fn.new.html) for more details
pub struct Receiver {
    /// Counters and waker slot shared with the `Sender` and the tokens.
    inner: Arc<Inner>,
}
60
/// Future that resolves when there are fewer than `limit` tokens alive
///
/// Returned by [`Receiver::has_capacity`](struct.Receiver.html#method.has_capacity).
pub struct HasCapacity<'a> {
    /// The receiver that is polled on behalf of this future.
    recv: &'a mut Receiver,
}
65
/// The handle that controls backpressure
///
/// It can be used to create tokens, to change the limit and to get metrics.
///
/// See [`new`](fn.new.html) for more details
#[derive(Clone)]
pub struct Sender {
    /// Counters and waker slot shared with the `Receiver` and the tokens.
    inner: Arc<Inner>,
}
75
76/// The token which holds onto a single resource item
77///
78/// # Notes on Cloning
79///
80/// After cloning a `Token`, *both* clones have to be dropped to make
81/// backpressure slot available again.
82#[derive(Clone)]
83pub struct Token {
84 inner: Arc<Inner>,
85}
86
// The adapters only reach the wrapped stream through `Pin::new`, which their
// `Stream` impls can do because they require `S: Unpin`; nothing here is
// pin-projected, so each adapter is declared `Unpin` whenever `S` is.
impl<S: Unpin> Unpin for Backpressure<S> {}
impl<S: Unpin> Unpin for BackpressureToken<S> {}
impl<S: Unpin> Unpin for BackpressureWrapper<S> {}
90
91impl Sender {
92 /// Acquire a backpressure token
93 ///
94 /// The token holds one unit of resource
95 ///
96 /// *Note:* You can always acquire a token, even if capacity limit reached.
97 pub fn token(&self) -> Token {
98 self.inner.active.fetch_add(1, Ordering::SeqCst);
99 Token {
100 inner: self.inner.clone(),
101 }
102 }
103 /// Change the limit for the number of connections
104 ///
105 /// If limit is increased it's applied immediately. If limit is lowered,
106 /// we can't drop connections. So listening stream is paused until
107 /// there are less then new limit tokens alive (i.e. first dropped
108 /// tokens may not unblock the stream).
109 pub fn set_limit(&self, new_limit: usize) {
110 let old_limit = self.inner.limit.swap(new_limit, Ordering::SeqCst);
111 if old_limit < new_limit {
112 match self.inner.task.try_lock() {
113 Ok(mut guard) => {
114 guard.take().map(|w| w.wake());
115 }
116 Err(TryLockError::WouldBlock) => {
117 // This means either another token is currently waking
118 // up a Receiver. Or Receiver is currently running.
119 // Receiver will recheck values after releasing the Mutex.
120 }
121 Err(TryLockError::Poisoned(_)) => {
122 unreachable!("backpressure lock should never be poisoned");
123 }
124 }
125 }
126 }
127
128 /// Returns the number of currently active tokens
129 ///
130 /// Can return a value larger than limit if tokens are created manually.
131 ///
132 /// This can be used for metrics or debugging. You should not rely on
133 /// this value being in sync. There is also no way to wake-up when this
134 /// value is lower than limit, also see
135 /// [`has_capacity`](struct.Receiver.html#method.has_capacity).
136 pub fn get_active_tokens(&self) -> usize {
137 self.inner.active.load(Ordering::Relaxed)
138 }
139}
140
141impl Receiver {
142 /// Handy to create token in Backpressure wrapper
143 fn token(&self) -> Token {
144 self.inner.active.fetch_add(1, Ordering::SeqCst);
145 Token {
146 inner: self.inner.clone(),
147 }
148 }
149
150 /// Return future which resolves when the current number active of tokens
151 /// is less than a limit
152 ///
153 /// If you create tokens in different task than the task that waits
154 /// on `HasCapacity` there is a race condition.
155 pub fn has_capacity(&mut self) -> HasCapacity {
156 HasCapacity { recv: self }
157 }
158
159 fn poll(&mut self, cx: &mut Context) -> Poll<()> {
160 let limit = self.inner.limit.load(Ordering::Acquire);
161 loop {
162 let active = self.inner.active.load(Ordering::Acquire);
163 if active < limit {
164 return Poll::Ready(());
165 }
166 match self.inner.task.try_lock() {
167 Ok(mut guard) => {
168 *guard = Some(cx.waker().clone());
169 break;
170 }
171 Err(TryLockError::WouldBlock) => {
172 // This means either another token is currently waking
173 // up this receiver, retry
174 //
175 // Note: this looks like a busyloop, but we don't have
176 // anything long/slow behind the mutex. And it's only
177 // executed when limit is reached.
178 continue;
179 }
180 Err(TryLockError::Poisoned(_)) => {
181 unreachable!("backpressure lock should never be poisoned");
182 }
183 }
184 }
185 // Reread the limit after lock is unlocked because
186 // token Drop relies on that
187 let active = self.inner.active.load(Ordering::Acquire);
188 if active < limit {
189 Poll::Ready(())
190 } else {
191 Poll::Pending
192 }
193 }
194}
195
196impl Drop for Token {
197 fn drop(&mut self) {
198 // TODO(tailhook) we could use Acquire for old_ref,
199 // but not sure how safe is it to compare it with a limit
200 let old_ref = self.inner.active.fetch_sub(1, Ordering::SeqCst);
201 let limit = self.inner.limit.load(Ordering::SeqCst);
202 if old_ref == limit {
203 match self.inner.task.try_lock() {
204 Ok(mut guard) => {
205 guard.take().map(|w| w.wake());
206 }
207 Err(TryLockError::WouldBlock) => {
208 // This means either another token is currently waking
209 // up a Receiver. Or Receiver is currently running.
210 // Receiver will recheck values after releasing the Mutex.
211 }
212 Err(TryLockError::Poisoned(_)) => {
213 unreachable!("backpressure lock should never be poisoned");
214 }
215 }
216 }
217 }
218}
219
220impl<S> BackpressureToken<S> {
221 pub(crate) fn new(stream: S, backpressure: Receiver)
222 -> BackpressureToken<S>
223 {
224 BackpressureToken(Backpressure::new(stream, backpressure))
225 }
226
227 /// Acquires a reference to the underlying stream that this adapter is
228 /// pulling from.
229 pub fn get_ref(&self) -> &S {
230 self.0.get_ref()
231 }
232
233 /// Acquires a mutable reference to the underlying stream that this
234 /// adapter is pulling from.
235 pub fn get_mut(&mut self) -> &mut S {
236 self.0.get_mut()
237 }
238
239 /// Consumes this adapter, returning the underlying stream.
240 pub fn into_inner(self) -> S {
241 self.0.into_inner()
242 }
243}
244
245impl<S> BackpressureWrapper<S> {
246 pub(crate) fn new(stream: S, backpressure: Receiver)
247 -> BackpressureWrapper<S>
248 {
249 BackpressureWrapper(Backpressure::new(stream, backpressure))
250 }
251
252 /// Acquires a reference to the underlying stream that this adapter is
253 /// pulling from.
254 pub fn get_ref(&self) -> &S {
255 self.0.get_ref()
256 }
257
258 /// Acquires a mutable reference to the underlying stream that this
259 /// adapter is pulling from.
260 pub fn get_mut(&mut self) -> &mut S {
261 self.0.get_mut()
262 }
263
264 /// Consumes this adapter, returning the underlying stream.
265 pub fn into_inner(self) -> S {
266 self.0.into_inner()
267 }
268}
269
270impl<S> Backpressure<S> {
271 pub(crate) fn new(stream: S, backpressure: Receiver) -> Backpressure<S> {
272 Backpressure { stream, backpressure }
273 }
274
275 /// Acquires a reference to the underlying stream that this adapter is
276 /// pulling from.
277 pub fn get_ref(&self) -> &S {
278 &self.stream
279 }
280
281 /// Acquires a mutable reference to the underlying stream that this
282 /// adapter is pulling from.
283 pub fn get_mut(&mut self) -> &mut S {
284 &mut self.stream
285 }
286
287 /// Consumes this adapter, returning the underlying stream.
288 pub fn into_inner(self) -> S {
289 self.stream
290 }
291}
292
293/// Create a new pair of backpressure structures
294///
295/// These structures are called [`Sender`](struct.Sender.html)
296/// and [`Receiver`](struct.Receiver.html) similar to channels.
297/// The `Receiver` should be used to throttle, either by applying
298/// it to a stream or using it directly. The `Sender` is a way to create
299/// throtting tokens (the stream is paused when there are tokens >= limit),
300/// and to change the limit.
301///
302/// See [`ListenExt`](../trait.ListenExt.html) for example usage
303///
304/// # Direct Use Example
305///
306/// ```no_run
307/// # use std::time::Duration;
308/// # use async_std::net::{TcpListener, TcpStream};
309/// # use async_std::prelude::*;
310/// # use async_std::task;
311/// # fn main() -> std::io::Result<()> { task::block_on(async {
312/// #
313/// use async_listen::ListenExt;
314/// use async_listen::backpressure;
315///
316/// let listener = TcpListener::bind("127.0.0.1:0").await?;
317/// let (tx, mut rx) = backpressure::new(10);
318/// let mut incoming = listener.incoming()
319/// .handle_errors(Duration::from_millis(100));
320///
321/// loop {
322/// rx.has_capacity().await;
323/// let conn = match incoming.next().await {
324/// Some(conn) => conn,
325/// None => break,
326/// };
327/// let token = tx.token(); // should be created before spawn
328/// task::spawn(async {
329/// connection_loop(conn).await;
330/// drop(token); // should be dropped after
331/// });
332/// }
333/// # async fn connection_loop(_stream: TcpStream) {
334/// # }
335/// #
336/// # Ok(()) }) }
337/// ```
338///
339pub fn new(initial_limit: usize) -> (Sender, Receiver) {
340 let inner = Arc::new(Inner {
341 limit: AtomicUsize::new(initial_limit),
342 active: AtomicUsize::new(0),
343 task: Mutex::new(None),
344 });
345 return (
346 Sender {
347 inner: inner.clone(),
348 },
349 Receiver {
350 inner: inner.clone(),
351 },
352 )
353}
354
355impl fmt::Debug for Token {
356 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
357 debug("Token", &self.inner, f)
358 }
359}
360
361impl fmt::Debug for Sender {
362 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
363 debug("Sender", &self.inner, f)
364 }
365}
366
367impl fmt::Debug for Receiver {
368 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
369 debug("Receiver", &self.inner, f)
370 }
371}
372
373impl<'a> fmt::Debug for HasCapacity<'a> {
374 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
375 debug("HasCapacity", &self.recv.inner, f)
376 }
377}
378
379fn debug(name: &str, inner: &Arc<Inner>, f: &mut fmt::Formatter)
380 -> fmt::Result
381{
382 let active = inner.active.load(Ordering::Relaxed);
383 let limit = inner.limit.load(Ordering::Relaxed);
384 write!(f, "<{} {}/{}>", name, active, limit)
385}
386
387impl<S: fmt::Debug> fmt::Debug for Backpressure<S> {
388 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
389 f.debug_struct("Backpressure")
390 .field("stream", &self.stream)
391 .field("backpressure", &self.backpressure)
392 .finish()
393 }
394}
395
396impl<S: fmt::Debug> fmt::Debug for BackpressureToken<S> {
397 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398 f.debug_struct("BackpressureToken")
399 .field("stream", &self.0.stream)
400 .field("backpressure", &self.0.backpressure)
401 .finish()
402 }
403}
404
405impl<S: fmt::Debug> fmt::Debug for BackpressureWrapper<S> {
406 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
407 f.debug_struct("BackpressureWrapper")
408 .field("stream", &self.0.stream)
409 .field("backpressure", &self.0.backpressure)
410 .finish()
411 }
412}
413
414impl<I, S> Stream for Backpressure<S>
415 where S: Stream<Item=I> + Unpin
416{
417 type Item = I;
418 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context)
419 -> Poll<Option<Self::Item>>
420 {
421 match self.backpressure.poll(cx) {
422 Poll::Pending => Poll::Pending,
423 Poll::Ready(()) => Pin::new(&mut self.stream).poll_next(cx),
424 }
425 }
426}
427
428impl<'a> Future for HasCapacity<'a> {
429 type Output = ();
430 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
431 self.recv.poll(cx)
432 }
433}
434
435impl<I, S> Stream for BackpressureToken<S>
436 where S: Stream<Item=I> + Unpin
437{
438 type Item = (Token, I);
439 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context)
440 -> Poll<Option<Self::Item>>
441 {
442 Pin::new(&mut self.0)
443 .poll_next(cx)
444 .map(|opt| opt.map(|conn| (self.0.backpressure.token(), conn)))
445 }
446}
447
448impl<I, S> Stream for BackpressureWrapper<S>
449 where S: Stream<Item=I> + Unpin,
450 ByteStream: From<(Token, I)>,
451{
452 type Item = ByteStream;
453 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context)
454 -> Poll<Option<Self::Item>>
455 {
456 Pin::new(&mut self.0)
457 .poll_next(cx)
458 .map(|opt| opt.map(|conn| {
459 ByteStream::from((self.0.backpressure.token(), conn))
460 }))
461 }
462}