futures_turnstyle/lib.rs
1// Copyright (c) 2018 Nuclear Furnace
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in all
11// copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19// SOFTWARE.
20//! Turnstyles are a way to provide futures-aware gated access in a sequential fashion.
21//!
22//! Callers will "join" the queue, and receive a future that will complete once they make it
23//! through the turnstyle. The turnstyle is controlled externally by some coordinator, and can
24//! "turn" it to let the next waiter in line through. Thus, you can have multiple waiters, which
25//! can join the line at any time, and allow the coordinator to continually admit them through.
26//!
27//! This can be used to synchronize access to a resource, or to provide a synchronization point
28//! to repeatedly calculation.
29//!
30//! One example is a network daemon that reloads its configuration and reestablishes all of its
31//! listening sockets. Normally, you might join (using `select`) both the listening socket
32//! future and a close future so that you can shutdown the socket when its no longer required.
33//!
34//! With a turnstyle, you can join the queue every time you reload the configuration, and then
35//! share that future to all of the newly created listeners. Once the new listeners are ready, you
36//! also do one turn on the turnstyle, which signals the last waiter in line -- a future shared
37//! with all of the old listeners -- that they can now shutdown. That same turnstyle can perform
38//! this over and over without issue.
39//!
40//! Turnstyles internally protect themselves via a `Mutex` but are fast enough in normal cases that
41//! you can `join` or `turn` from within a future without fear of stalling the executor. If you're
42//! joining at an extremely high frequency, you could potentially cause performance degradation.
43extern crate futures;
44
45use futures::{prelude::*, sync::oneshot};
46use std::{
47 collections::VecDeque,
48 sync::{Arc, Mutex, atomic::AtomicUsize, atomic::Ordering::SeqCst},
49};
50
51/// A future that waits to be notified, based on its place in line.
52pub struct Waiter {
53 inner: oneshot::Receiver<usize>,
54}
55
56impl Future for Waiter {
57 type Error = ();
58 type Item = usize;
59
60 fn poll(&mut self) -> Poll<Self::Item, Self::Error> { self.inner.poll().map_err(|_| ()) }
61}
62
63/// An ordered queue of waiting participants.
64///
65/// Every turn of the turnstyle, the next participant in queue is notified and removed from the
66/// queue. If the queue is empty, `turn` is a noop. Waiters receive their all-time position
67/// through the turnstyle as their item i.e. the first waiter receives 0, the second receives 1,
68/// etc.
69///
70/// `Turnstyle`s can be cloned and are safe to share across threads. When a `Turnstyle` is
71/// dropped, all of its waiters will be notified.
72#[derive(Clone)]
73pub struct Turnstyle {
74 waiters: Arc<Mutex<VecDeque<(usize, oneshot::Sender<usize>)>>>,
75 version: Arc<AtomicUsize>,
76}
77
78impl Turnstyle {
79 /// Creates a new, empty turnstyle.
80 pub fn new() -> Turnstyle {
81 Turnstyle {
82 waiters: Arc::new(Mutex::new(VecDeque::new())),
83 version: Arc::new(AtomicUsize::new(0)),
84 }
85 }
86
87 /// Joins the waiting queue.
88 ///
89 /// Returns a tuple of (`usize`, `Waiter`) to the caller. The `usize` represents the waiter's
90 /// ticket in the queue since creation of the turnstyle, and the `Waiter` is a future that will
91 /// complete when the turnstyle turns and reaches the caller's position in the queue.
92 pub fn join(&self) -> (usize, Waiter) {
93 let version = self.version.fetch_add(1, SeqCst);
94 let (tx, rx) = oneshot::channel();
95 {
96 let mut waiters = self.waiters.lock().expect("turnstyle unable to join line");
97 waiters.push_back((version, tx));
98 }
99
100 (version, Waiter { inner: rx })
101 }
102
103 /// Turns once, letting a single waiter through.
104 ///
105 /// The `Waiter` is notified by the future completing. The function returns `true` if a waiter
106 /// was found/notified, `false` otherwise.
107 pub fn turn(&self) -> bool {
108 let waiter = {
109 let mut waiters = self.waiters.lock().unwrap();
110 waiters.pop_front()
111 };
112
113 if let Some((v, w)) = waiter {
114 // We drop the result of the send here because our waiter may have disappeared. This
115 // is fine, because we don't care about notifying _at least_ one waiter, only about
116 // making sure we've removed one waiter from the queue, whether we truly notified
117 // someone or not.
118 let _ = w.send(v);
119 true
120 } else {
121 false
122 }
123 }
124}
125
126impl Drop for Turnstyle {
127 fn drop(&mut self) {
128 while self.turn() {}
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::Turnstyle;
135 use futures::{future, Future, Async};
136
137 #[test]
138 fn single_waiter() {
139 future::lazy(|| {
140 let ts = Turnstyle::new();
141
142 let (_, mut w) = ts.join();
143 assert!(!w.poll().unwrap().is_ready());
144
145 ts.turn();
146 assert!(w.poll().unwrap().is_ready());
147
148 future::ok::<_, ()>(())
149 }).wait()
150 .unwrap();
151 }
152
153 #[test]
154 fn multiple_waiters() {
155 future::lazy(|| {
156 let ts = Turnstyle::new();
157 let (_, mut w1) = ts.join();
158 let (_, mut w2) = ts.join();
159 let (_, mut w3) = ts.join();
160
161 assert!(!w1.poll().unwrap().is_ready());
162 assert!(!w2.poll().unwrap().is_ready());
163 assert!(!w2.poll().unwrap().is_ready());
164
165 ts.turn();
166 assert!(w1.poll().unwrap().is_ready());
167 assert!(!w2.poll().unwrap().is_ready());
168 assert!(!w3.poll().unwrap().is_ready());
169
170 ts.turn();
171 assert!(w2.poll().unwrap().is_ready());
172 assert!(!w3.poll().unwrap().is_ready());
173
174 ts.turn();
175 assert!(w3.poll().unwrap().is_ready());
176
177 future::ok::<_, ()>(())
178 }).wait()
179 .unwrap();
180 }
181
182 #[test]
183 fn versions() {
184 future::lazy(|| {
185 let ts = Turnstyle::new();
186 let (v1, mut w1) = ts.join();
187 let (v2, mut w2) = ts.join();
188 let (v3, mut w3) = ts.join();
189
190 assert_eq!(v1, 0);
191 assert_eq!(v2, 1);
192 assert_eq!(v3, 2);
193
194 ts.turn();
195 ts.turn();
196 ts.turn();
197
198 if let Async::Ready(w1v) = w1.poll().unwrap() {
199 assert_eq!(w1v, 0);
200 } else {
201 panic!("waiter 1 was not ready");
202 }
203
204 if let Async::Ready(w2v) = w2.poll().unwrap() {
205 assert_eq!(w2v, 1);
206 } else {
207 panic!("waiter 2 was not ready");
208 }
209
210 if let Async::Ready(w3v) = w3.poll().unwrap() {
211 assert_eq!(w3v, 2);
212 } else {
213 panic!("waiter 3 was not ready");
214 }
215
216 future::ok::<_, ()>(())
217 }).wait()
218 .unwrap();
219 }
220
221 #[test]
222 fn on_drop() {
223 future::lazy(|| {
224 let ts = Turnstyle::new();
225 let (_, mut w1) = ts.join();
226 let (_, mut w2) = ts.join();
227 let (_, mut w3) = ts.join();
228
229 assert!(!w1.poll().unwrap().is_ready());
230 assert!(!w2.poll().unwrap().is_ready());
231 assert!(!w2.poll().unwrap().is_ready());
232
233 drop(ts);
234
235 assert!(w1.poll().unwrap().is_ready());
236 assert!(w2.poll().unwrap().is_ready());
237 assert!(w3.poll().unwrap().is_ready());
238
239 future::ok::<_, ()>(())
240 }).wait()
241 .unwrap();
242 }
243}