access_queue/
lib.rs

//! [`AccessQueue`], which allows only a certain number of simultaneous accesses to a guarded item.
//!
//! This can be useful for implementing backpressure: when accessing the item through the
//! [`Access`] future, tasks will wait to access the item until others have completed, limiting the
//! number of accesses that occur at the same time.
6#![deny(warnings, missing_debug_implementations, missing_docs, rust_2018_idioms)]
7use std::future::Future;
8use std::mem::ManuallyDrop;
9use std::ops::Deref;
10use std::pin::Pin;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::SeqCst;
13use std::task::{Context, Poll};
14
15use futures_core::ready;
16use event_listener::{Event, EventListener};
17
18#[cfg(test)]
19mod tests;
20
21/// The AccessQueue which guards access to some item.
22#[derive(Debug)]
23pub struct AccessQueue<T> {
24    count: AtomicUsize,
25    event: Event,
26    inner: T,
27}
28
29impl<T> AccessQueue<T> {
30    /// Construct a new `AccessQueue`, which guards the `inner` value and allows only `count`
31    /// concurrent accesses to occur simultaneously.
32    pub fn new(inner: T, count: usize) -> AccessQueue<T> {
33        AccessQueue {
34            count: AtomicUsize::new(count),
35            event: Event::new(),
36            inner,
37        }
38    }
39
40    /// Block `amt` accesses.
41    ///
42    /// This reduces the number of concurrent accesses to the guarded item that are allowed. Until
43    /// `release` is called, this many accesses are blocked from occurring.
44    ///
45    /// This function returns `true` if it successfully blocked these accesses, and `false` if it
46    /// could not. Blocking only fails if there are not as many accesses left in the queue as the
47    /// caller has attempted to block.
48    pub fn block(&self, amt: usize) -> bool {
49        let mut current = self.count.load(SeqCst);
50        while current >= amt {
51            match self.count.compare_exchange_weak(current, current - amt, SeqCst, SeqCst) {
52                Ok(_)   => return true,
53                Err(n)  => current = n,
54            }
55        }
56        false
57    }
58
59    /// Release `amt` additional accesses.
60    ///
61    /// This increases the number of concurrent accesses to the guarded item that are alloewd. It
62    /// can be paired with `block` to raise and lower the limit.
63    pub fn release(&self, amt: usize) {
64        self.count.fetch_add(amt, SeqCst);
65        self.event.notify_additional(amt);
66    }
67
68    /// Wait in the queue to access the guarded item.
69    pub fn access(&self) -> Access<'_, T> {
70        Access {
71            queue: self,
72            listener: None,
73        }
74    }
75
76    /// Skip the access queue and get a reference to the inner item.
77    ///
78    /// This does not modify the number of simultaneous accesses allowed. It can be useful if the
79    /// AccessQueue is only limited certain patterns of use on the inner item.
80    pub fn skip_queue(&self) -> &T {
81        &self.inner
82    }
83
84    /// Get the inner item mutably.
85    ///
86    /// This requires mutable access to the AccessQueue, guaranteeing that no simultaneous accesses
87    /// are occurring.
88    pub fn get_mut(&mut self) -> &mut T {
89        &mut self.inner
90    }
91}
92
93/// A `Future` of a queued access to the inner item.
94///
95/// This can be constructed from [`AccessQueue::access`]. It is a `Future`, and it resolves to an
96/// [`AccessGuard`], which dereferences to the inner item guarded by the access queue.
97#[derive(Debug)]
98pub struct Access<'a, T> {
99    queue: &'a AccessQueue<T>,
100    listener: Option<EventListener>,
101}
102
103impl<'a, T> Access<'a, T> {
104    /// Access the guarded item without waiting in the `AccessQueue`.
105    ///
106    /// This can be used to access the item without following the limitations on the number of
107    /// allowed concurrent accesses.
108    pub fn skip_queue(&self) -> &T {
109        self.queue.skip_queue()
110    }
111}
112
113impl<'a, T> Future for Access<'a, T> {
114    type Output = AccessGuard<'a, T>;
115
116    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
117        if let Some(listener) = &mut self.listener {
118            ready!(Pin::new(listener).poll(ctx));
119            self.listener = None;
120        }
121
122        while !self.queue.block(1) {
123            match &mut self.listener {
124                Some(listener)  => {
125                    ready!(Pin::new(listener).poll(ctx));
126                    self.listener = None;
127                }
128                None            => {
129                    let mut listener = self.queue.event.listen();
130                    if let Poll::Pending = Pin::new(&mut listener).poll(ctx) {
131                        self.listener = Some(listener);
132                        return Poll::Pending
133                    }
134                }
135            }
136        }
137
138        Poll::Ready(AccessGuard { queue: self.queue })
139    }
140}
141
142/// A resolved access to the guarded item.
143#[derive(Debug)]
144pub struct AccessGuard<'a, T> {
145    queue: &'a AccessQueue<T>,
146}
147
148impl<'a, T> AccessGuard<'a, T> {
149    /// Hold this guard indefinitely, without ever releasing it.
150    ///
151    /// Normaly, when an `AccessGuard` drops, it releases one access in the `AccessQueue` so that
152    /// another `Access` can resolve. If this method is called, instead it is downgraded into a
153    /// normal reference and the access is never released.
154    #[inline]
155    pub fn hold_indefinitely(self) -> &'a T {
156        ManuallyDrop::new(self).queue.skip_queue()
157    }
158
159    /// Release the current access and re-enqueue to wait for another access.
160    #[inline]
161    pub fn reenqueue(self) -> Access<'a, T> {
162        self.queue.release(1);
163        self.hold_and_reenqueue().1
164    }
165
166    /// Hold this access indefinitely while also re-enqueuing
167    #[inline]
168    pub fn hold_and_reenqueue(self) -> (&'a T, Access<'a, T>) {
169        let this = ManuallyDrop::new(self);
170        (this.queue.skip_queue(), this.queue.access())
171    }
172}
173
174impl<'a, T> Deref for AccessGuard<'a, T> {
175    type Target = T;
176
177    fn deref(&self) -> &T {
178        self.queue.skip_queue()
179    }
180}
181
182impl<'a, T> Drop for AccessGuard<'a, T> {
183    fn drop(&mut self) {
184        self.queue.release(1);
185    }
186}
187
188#[allow(dead_code)]
189fn is_send_sync() where AccessQueue<()>: Send + Sync { }