async_listen/
backpressure.rs

1//! Backpressure handling structures
2//!
3//! The usual way to apply backpressure to a stream is using one of the
4//! [`ListenExt`](../trait.ListenExt.html) trait methods:
5//! * [`backpressure`](../trait.ListenExt.html#method.backpressure)
6//! * [`apply_backpressure`](../trait.ListenExt.html#method.apply_backpressure)
7//! * [`backpressure_wrapper`](../trait.ListenExt.html#method.backpressure_wrapper)
8//!
9//! Also take a look at [`backpressure::new`](fn.new.html) for the low-level
10//! interface.
11//!
12use std::fmt;
13use std::pin::Pin;
14use std::sync::atomic::{AtomicUsize, Ordering};
15use std::sync::{Arc, Mutex, TryLockError};
16
17use async_std::stream::Stream;
18use async_std::future::Future;
19use async_std::task::{Poll, Context, Waker};
20
21use crate::byte_stream::ByteStream;
22
23
/// State shared between the `Sender`, the `Receiver` and every `Token`.
struct Inner {
    /// Number of live tokens at which the receiver stops reporting capacity.
    limit: AtomicUsize,
    /// Number of tokens currently alive.
    active: AtomicUsize,
    /// Waker of the task parked in `Receiver::poll`, if any.
    task: Mutex<Option<Waker>>,
}
29
30/// A stream adapter that applies backpressure
31///
32/// See
33/// [`ListenExt::backpressure`](../trait.ListenExt.html#method.backpressure)
34/// for more info.
35pub struct BackpressureToken<S>(Backpressure<S>);
36
37/// A stream adapter that applies backpressure and yields ByteStream
38///
39/// See
40/// [`ListenExt::backpressure_wrapper`](../trait.ListenExt.html#method.backpressure_wrapper)
41/// for more info.
42pub struct BackpressureWrapper<S>(Backpressure<S>);
43
44/// A stream adapter that applies backpressure and yields a token
45///
46/// See
47/// [`ListenExt::apply_backpressure`](../trait.ListenExt.html#method.apply_backpressure)
48/// for more info.
49pub struct Backpressure<S> {
50    stream: S,
51    backpressure: Receiver,
52}
53
54/// The throttler of a stream
55///
56/// See [`new`](fn.new.html) for more details
57pub struct Receiver {
58    inner: Arc<Inner>,
59}
60
61/// Future that resolves when there is less that limit tokens alive
62pub struct HasCapacity<'a> {
63    recv: &'a mut Receiver,
64}
65
66/// The handle that controls backpressure
67///
68/// It can be used to create tokens, changing limit and getting metrics.
69///
70/// See [`new`](fn.new.html) for more details
71#[derive(Clone)]
72pub struct Sender {
73    inner: Arc<Inner>,
74}
75
76/// The token which holds onto a single resource item
77///
78/// # Notes on Cloning
79///
80/// After cloning a `Token`, *both* clones have to be dropped to make
81/// backpressure slot available again.
82#[derive(Clone)]
83pub struct Token {
84    inner: Arc<Inner>,
85}
86
87impl<S: Unpin> Unpin for Backpressure<S> {}
88impl<S: Unpin> Unpin for BackpressureToken<S> {}
89impl<S: Unpin> Unpin for BackpressureWrapper<S> {}
90
91impl Sender {
92    /// Acquire a backpressure token
93    ///
94    /// The token holds one unit of resource
95    ///
96    /// *Note:* You can always acquire a token, even if capacity limit reached.
97    pub fn token(&self) -> Token {
98        self.inner.active.fetch_add(1, Ordering::SeqCst);
99        Token {
100            inner: self.inner.clone(),
101        }
102    }
103    /// Change the limit for the number of connections
104    ///
105    /// If limit is increased it's applied immediately. If limit is lowered,
106    /// we can't drop connections. So listening stream is paused until
107    /// there are less then new limit tokens alive (i.e. first dropped
108    /// tokens may not unblock the stream).
109    pub fn set_limit(&self, new_limit: usize) {
110        let old_limit = self.inner.limit.swap(new_limit, Ordering::SeqCst);
111        if old_limit < new_limit {
112            match self.inner.task.try_lock() {
113                Ok(mut guard) => {
114                    guard.take().map(|w| w.wake());
115                }
116                Err(TryLockError::WouldBlock) => {
117                    // This means either another token is currently waking
118                    // up a Receiver. Or Receiver is currently running.
119                    // Receiver will recheck values after releasing the Mutex.
120                }
121                Err(TryLockError::Poisoned(_)) => {
122                    unreachable!("backpressure lock should never be poisoned");
123                }
124            }
125        }
126    }
127
128    /// Returns the number of currently active tokens
129    ///
130    /// Can return a value larger than limit if tokens are created manually.
131    ///
132    /// This can be used for metrics or debugging. You should not rely on
133    /// this value being in sync. There is also no way to wake-up when this
134    /// value is lower than limit, also see
135    /// [`has_capacity`](struct.Receiver.html#method.has_capacity).
136    pub fn get_active_tokens(&self) -> usize {
137        self.inner.active.load(Ordering::Relaxed)
138    }
139}
140
141impl Receiver {
142    /// Handy to create token in Backpressure wrapper
143    fn token(&self) -> Token {
144        self.inner.active.fetch_add(1, Ordering::SeqCst);
145        Token {
146            inner: self.inner.clone(),
147        }
148    }
149
150    /// Return future which resolves when the current number active of tokens
151    /// is less than a limit
152    ///
153    /// If you create tokens in different task than the task that waits
154    /// on `HasCapacity` there is a race condition.
155    pub fn has_capacity(&mut self) -> HasCapacity {
156        HasCapacity { recv: self }
157    }
158
159    fn poll(&mut self, cx: &mut Context) -> Poll<()> {
160        let limit = self.inner.limit.load(Ordering::Acquire);
161        loop {
162            let active = self.inner.active.load(Ordering::Acquire);
163            if active < limit {
164                return Poll::Ready(());
165            }
166            match self.inner.task.try_lock() {
167                Ok(mut guard) => {
168                    *guard = Some(cx.waker().clone());
169                    break;
170                }
171                Err(TryLockError::WouldBlock) => {
172                    // This means either another token is currently waking
173                    // up this receiver, retry
174                    //
175                    // Note: this looks like a busyloop, but we don't have
176                    // anything long/slow behind the mutex. And it's only
177                    // executed when limit is reached.
178                    continue;
179                }
180                Err(TryLockError::Poisoned(_)) => {
181                    unreachable!("backpressure lock should never be poisoned");
182                }
183            }
184        }
185        // Reread the limit after lock is unlocked because
186        // token Drop relies on that
187        let active = self.inner.active.load(Ordering::Acquire);
188        if active < limit {
189            Poll::Ready(())
190        } else {
191            Poll::Pending
192        }
193    }
194}
195
196impl Drop for Token {
197    fn drop(&mut self) {
198        // TODO(tailhook) we could use Acquire for old_ref,
199        // but not sure how safe is it to compare it with a limit
200        let old_ref = self.inner.active.fetch_sub(1, Ordering::SeqCst);
201        let limit = self.inner.limit.load(Ordering::SeqCst);
202        if old_ref == limit {
203            match self.inner.task.try_lock() {
204                Ok(mut guard) => {
205                    guard.take().map(|w| w.wake());
206                }
207                Err(TryLockError::WouldBlock) => {
208                    // This means either another token is currently waking
209                    // up a Receiver. Or Receiver is currently running.
210                    // Receiver will recheck values after releasing the Mutex.
211                }
212                Err(TryLockError::Poisoned(_)) => {
213                    unreachable!("backpressure lock should never be poisoned");
214                }
215            }
216        }
217    }
218}
219
220impl<S> BackpressureToken<S> {
221    pub(crate) fn new(stream: S, backpressure: Receiver)
222        -> BackpressureToken<S>
223    {
224        BackpressureToken(Backpressure::new(stream, backpressure))
225    }
226
227    /// Acquires a reference to the underlying stream that this adapter is
228    /// pulling from.
229    pub fn get_ref(&self) -> &S {
230        self.0.get_ref()
231    }
232
233    /// Acquires a mutable reference to the underlying stream that this
234    /// adapter is pulling from.
235    pub fn get_mut(&mut self) -> &mut S {
236        self.0.get_mut()
237    }
238
239    /// Consumes this adapter, returning the underlying stream.
240    pub fn into_inner(self) -> S {
241        self.0.into_inner()
242    }
243}
244
245impl<S> BackpressureWrapper<S> {
246    pub(crate) fn new(stream: S, backpressure: Receiver)
247        -> BackpressureWrapper<S>
248    {
249        BackpressureWrapper(Backpressure::new(stream, backpressure))
250    }
251
252    /// Acquires a reference to the underlying stream that this adapter is
253    /// pulling from.
254    pub fn get_ref(&self) -> &S {
255        self.0.get_ref()
256    }
257
258    /// Acquires a mutable reference to the underlying stream that this
259    /// adapter is pulling from.
260    pub fn get_mut(&mut self) -> &mut S {
261        self.0.get_mut()
262    }
263
264    /// Consumes this adapter, returning the underlying stream.
265    pub fn into_inner(self) -> S {
266        self.0.into_inner()
267    }
268}
269
270impl<S> Backpressure<S> {
271    pub(crate) fn new(stream: S, backpressure: Receiver) -> Backpressure<S> {
272        Backpressure { stream, backpressure }
273    }
274
275    /// Acquires a reference to the underlying stream that this adapter is
276    /// pulling from.
277    pub fn get_ref(&self) -> &S {
278        &self.stream
279    }
280
281    /// Acquires a mutable reference to the underlying stream that this
282    /// adapter is pulling from.
283    pub fn get_mut(&mut self) -> &mut S {
284        &mut self.stream
285    }
286
287    /// Consumes this adapter, returning the underlying stream.
288    pub fn into_inner(self) -> S {
289        self.stream
290    }
291}
292
293/// Create a new pair of backpressure structures
294///
295/// These structures are called [`Sender`](struct.Sender.html)
296/// and [`Receiver`](struct.Receiver.html) similar to channels.
297/// The `Receiver` should be used to throttle, either by applying
298/// it to a stream or using it directly. The `Sender` is a way to create
299/// throtting tokens (the stream is paused when there are tokens >= limit),
300/// and to change the limit.
301///
302/// See [`ListenExt`](../trait.ListenExt.html) for example usage
303///
304/// # Direct Use Example
305///
306/// ```no_run
307/// # use std::time::Duration;
308/// # use async_std::net::{TcpListener, TcpStream};
309/// # use async_std::prelude::*;
310/// # use async_std::task;
311/// # fn main() -> std::io::Result<()> { task::block_on(async {
312/// #
313/// use async_listen::ListenExt;
314/// use async_listen::backpressure;
315///
316/// let listener = TcpListener::bind("127.0.0.1:0").await?;
317/// let (tx, mut rx) = backpressure::new(10);
318/// let mut incoming = listener.incoming()
319///     .handle_errors(Duration::from_millis(100));
320///
321/// loop {
322///     rx.has_capacity().await;
323///     let conn = match incoming.next().await {
324///         Some(conn) => conn,
325///         None => break,
326///     };
327///     let token = tx.token();  // should be created before spawn
328///     task::spawn(async {
329///         connection_loop(conn).await;
330///         drop(token);  // should be dropped after
331///     });
332/// }
333/// # async fn connection_loop(_stream: TcpStream) {
334/// # }
335/// #
336/// # Ok(()) }) }
337/// ```
338///
339pub fn new(initial_limit: usize) -> (Sender, Receiver) {
340    let inner = Arc::new(Inner {
341        limit: AtomicUsize::new(initial_limit),
342        active: AtomicUsize::new(0),
343        task: Mutex::new(None),
344    });
345    return (
346        Sender {
347            inner: inner.clone(),
348        },
349        Receiver {
350            inner: inner.clone(),
351        },
352    )
353}
354
355impl fmt::Debug for Token {
356    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
357        debug("Token", &self.inner, f)
358    }
359}
360
361impl fmt::Debug for Sender {
362    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
363        debug("Sender", &self.inner, f)
364    }
365}
366
367impl fmt::Debug for Receiver {
368    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
369        debug("Receiver", &self.inner, f)
370    }
371}
372
373impl<'a> fmt::Debug for HasCapacity<'a> {
374    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
375        debug("HasCapacity", &self.recv.inner, f)
376    }
377}
378
379fn debug(name: &str, inner: &Arc<Inner>, f: &mut fmt::Formatter)
380    -> fmt::Result
381{
382    let active = inner.active.load(Ordering::Relaxed);
383    let limit = inner.limit.load(Ordering::Relaxed);
384    write!(f, "<{} {}/{}>", name, active, limit)
385}
386
387impl<S: fmt::Debug> fmt::Debug for Backpressure<S> {
388    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
389        f.debug_struct("Backpressure")
390            .field("stream", &self.stream)
391            .field("backpressure", &self.backpressure)
392            .finish()
393    }
394}
395
396impl<S: fmt::Debug> fmt::Debug for BackpressureToken<S> {
397    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
398        f.debug_struct("BackpressureToken")
399            .field("stream", &self.0.stream)
400            .field("backpressure", &self.0.backpressure)
401            .finish()
402    }
403}
404
405impl<S: fmt::Debug> fmt::Debug for BackpressureWrapper<S> {
406    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
407        f.debug_struct("BackpressureWrapper")
408            .field("stream", &self.0.stream)
409            .field("backpressure", &self.0.backpressure)
410            .finish()
411    }
412}
413
414impl<I, S> Stream for Backpressure<S>
415    where S: Stream<Item=I> + Unpin
416{
417    type Item = I;
418    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context)
419        -> Poll<Option<Self::Item>>
420    {
421        match self.backpressure.poll(cx) {
422            Poll::Pending => Poll::Pending,
423            Poll::Ready(()) => Pin::new(&mut self.stream).poll_next(cx),
424        }
425    }
426}
427
428impl<'a> Future for HasCapacity<'a> {
429    type Output = ();
430    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
431        self.recv.poll(cx)
432    }
433}
434
435impl<I, S> Stream for BackpressureToken<S>
436    where S: Stream<Item=I> + Unpin
437{
438    type Item = (Token, I);
439    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context)
440        -> Poll<Option<Self::Item>>
441    {
442        Pin::new(&mut self.0)
443        .poll_next(cx)
444        .map(|opt| opt.map(|conn| (self.0.backpressure.token(), conn)))
445    }
446}
447
448impl<I, S> Stream for BackpressureWrapper<S>
449    where S: Stream<Item=I> + Unpin,
450          ByteStream: From<(Token, I)>,
451{
452    type Item = ByteStream;
453    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context)
454        -> Poll<Option<Self::Item>>
455    {
456        Pin::new(&mut self.0)
457        .poll_next(cx)
458        .map(|opt| opt.map(|conn| {
459            ByteStream::from((self.0.backpressure.token(), conn))
460        }))
461    }
462}