access_queue/lib.rs
1//! [`AccessQueue`], which allows only a certain number of simultaneous accesses to a guarded item.
2//!
3//! This can be useful for implementing backpressure: when accessing the item through the
4//! [`Access`] future, tasks will wait to access the item until others have completed, limiting the
5//! number of accesses that occur at the same time.
6#![deny(warnings, missing_debug_implementations, missing_docs, rust_2018_idioms)]
7use std::future::Future;
8use std::mem::ManuallyDrop;
9use std::ops::Deref;
10use std::pin::Pin;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::SeqCst;
13use std::task::{Context, Poll};
14
15use futures_core::ready;
16use event_listener::{Event, EventListener};
17
18#[cfg(test)]
19mod tests;
20
21/// The AccessQueue which guards access to some item.
22#[derive(Debug)]
23pub struct AccessQueue<T> {
24 count: AtomicUsize,
25 event: Event,
26 inner: T,
27}
28
29impl<T> AccessQueue<T> {
30 /// Construct a new `AccessQueue`, which guards the `inner` value and allows only `count`
31 /// concurrent accesses to occur simultaneously.
32 pub fn new(inner: T, count: usize) -> AccessQueue<T> {
33 AccessQueue {
34 count: AtomicUsize::new(count),
35 event: Event::new(),
36 inner,
37 }
38 }
39
40 /// Block `amt` accesses.
41 ///
42 /// This reduces the number of concurrent accesses to the guarded item that are allowed. Until
43 /// `release` is called, this many accesses are blocked from occurring.
44 ///
45 /// This function returns `true` if it successfully blocked these accesses, and `false` if it
46 /// could not. Blocking only fails if there are not as many accesses left in the queue as the
47 /// caller has attempted to block.
48 pub fn block(&self, amt: usize) -> bool {
49 let mut current = self.count.load(SeqCst);
50 while current >= amt {
51 match self.count.compare_exchange_weak(current, current - amt, SeqCst, SeqCst) {
52 Ok(_) => return true,
53 Err(n) => current = n,
54 }
55 }
56 false
57 }
58
59 /// Release `amt` additional accesses.
60 ///
61 /// This increases the number of concurrent accesses to the guarded item that are alloewd. It
62 /// can be paired with `block` to raise and lower the limit.
63 pub fn release(&self, amt: usize) {
64 self.count.fetch_add(amt, SeqCst);
65 self.event.notify_additional(amt);
66 }
67
68 /// Wait in the queue to access the guarded item.
69 pub fn access(&self) -> Access<'_, T> {
70 Access {
71 queue: self,
72 listener: None,
73 }
74 }
75
76 /// Skip the access queue and get a reference to the inner item.
77 ///
78 /// This does not modify the number of simultaneous accesses allowed. It can be useful if the
79 /// AccessQueue is only limited certain patterns of use on the inner item.
80 pub fn skip_queue(&self) -> &T {
81 &self.inner
82 }
83
84 /// Get the inner item mutably.
85 ///
86 /// This requires mutable access to the AccessQueue, guaranteeing that no simultaneous accesses
87 /// are occurring.
88 pub fn get_mut(&mut self) -> &mut T {
89 &mut self.inner
90 }
91}
92
93/// A `Future` of a queued access to the inner item.
94///
95/// This can be constructed from [`AccessQueue::access`]. It is a `Future`, and it resolves to an
96/// [`AccessGuard`], which dereferences to the inner item guarded by the access queue.
#[derive(Debug)]
pub struct Access<'a, T> {
    // The queue this future is waiting to access.
    queue: &'a AccessQueue<T>,
    // Registered waiter on the queue's event; `None` until `poll` first finds
    // no access available.
    listener: Option<EventListener>,
}
102
103impl<'a, T> Access<'a, T> {
104 /// Access the guarded item without waiting in the `AccessQueue`.
105 ///
106 /// This can be used to access the item without following the limitations on the number of
107 /// allowed concurrent accesses.
108 pub fn skip_queue(&self) -> &T {
109 self.queue.skip_queue()
110 }
111}
112
113impl<'a, T> Future for Access<'a, T> {
114 type Output = AccessGuard<'a, T>;
115
116 fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
117 if let Some(listener) = &mut self.listener {
118 ready!(Pin::new(listener).poll(ctx));
119 self.listener = None;
120 }
121
122 while !self.queue.block(1) {
123 match &mut self.listener {
124 Some(listener) => {
125 ready!(Pin::new(listener).poll(ctx));
126 self.listener = None;
127 }
128 None => {
129 let mut listener = self.queue.event.listen();
130 if let Poll::Pending = Pin::new(&mut listener).poll(ctx) {
131 self.listener = Some(listener);
132 return Poll::Pending
133 }
134 }
135 }
136 }
137
138 Poll::Ready(AccessGuard { queue: self.queue })
139 }
140}
141
142/// A resolved access to the guarded item.
#[derive(Debug)]
pub struct AccessGuard<'a, T> {
    // The queue the access was claimed from; one access is released back to
    // it when the guard drops (unless the guard is held indefinitely).
    queue: &'a AccessQueue<T>,
}
147
impl<'a, T> AccessGuard<'a, T> {
    /// Hold this guard indefinitely, without ever releasing it.
    ///
    /// Normally, when an `AccessGuard` drops, it releases one access in the `AccessQueue` so that
    /// another `Access` can resolve. If this method is called, instead it is downgraded into a
    /// normal reference and the access is never released.
    #[inline]
    pub fn hold_indefinitely(self) -> &'a T {
        // ManuallyDrop suppresses this guard's Drop impl, so the held access
        // is never returned to the queue.
        ManuallyDrop::new(self).queue.skip_queue()
    }

    /// Release the current access and re-enqueue to wait for another access.
    #[inline]
    pub fn reenqueue(self) -> Access<'a, T> {
        // Give the access back first; `hold_and_reenqueue` then forgets `self`
        // (so Drop does not release a second time) and builds a fresh Access.
        self.queue.release(1);
        self.hold_and_reenqueue().1
    }

    /// Hold this access indefinitely while also re-enqueuing
    #[inline]
    pub fn hold_and_reenqueue(self) -> (&'a T, Access<'a, T>) {
        // Suppress Drop so the held access is never released, and hand back
        // both a reference to the item and a new place in the queue.
        let this = ManuallyDrop::new(self);
        (this.queue.skip_queue(), this.queue.access())
    }
}
173
174impl<'a, T> Deref for AccessGuard<'a, T> {
175 type Target = T;
176
177 fn deref(&self) -> &T {
178 self.queue.skip_queue()
179 }
180}
181
impl<'a, T> Drop for AccessGuard<'a, T> {
    fn drop(&mut self) {
        // Return this guard's access to the queue and notify one waiter.
        self.queue.release(1);
    }
}
187
// Compile-time assertion: this uncalled function fails to type-check unless
// `AccessQueue<()>` is both `Send` and `Sync`.
#[allow(dead_code)]
fn is_send_sync() where AccessQueue<()>: Send + Sync { }