futures_turnstyle/
lib.rs

1// Copyright (c) 2018 Nuclear Furnace
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in all
11// copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
19// SOFTWARE.
20//! Turnstyles are a way to provide futures-aware gated access in a sequential fashion.
21//!
22//! Callers will "join" the queue, and receive a future that will complete once they make it
23//! through the turnstyle.  The turnstyle is controlled externally by some coordinator, and can
24//! "turn" it to let the next waiter in line through.  Thus, you can have multiple waiters, which
25//! can join the line at any time, and allow the coordinator to continually admit them through.
26//!
27//! This can be used to synchronize access to a resource, or to provide a synchronization point
28//! to repeatedly calculation.
29//!
30//! One example is a network daemon that reloads its configuration and reestablishes all of its
31//! listening sockets.  Normally, you might join (using `select`) both the listening socket
32//! future and a close future so that you can shutdown the socket when its no longer required.
33//!
34//! With a turnstyle, you can join the queue every time you reload the configuration, and then
35//! share that future to all of the newly created listeners.  Once the new listeners are ready, you
36//! also do one turn on the turnstyle, which signals the last waiter in line -- a future shared
37//! with all of the old listeners -- that they can now shutdown.  That same turnstyle can perform
38//! this over and over without issue.
39//!
40//! Turnstyles internally protect themselves via a `Mutex` but are fast enough in normal cases that
41//! you can `join` or `turn` from within a future without fear of stalling the executor.  If you're
42//! joining at an extremely high frequency, you could potentially cause performance degradation.
43extern crate futures;
44
45use futures::{prelude::*, sync::oneshot};
46use std::{
47    collections::VecDeque,
48    sync::{Arc, Mutex, atomic::AtomicUsize, atomic::Ordering::SeqCst},
49};
50
51/// A future that waits to be notified, based on its place in line.
52pub struct Waiter {
53    inner: oneshot::Receiver<usize>,
54}
55
56impl Future for Waiter {
57    type Error = ();
58    type Item = usize;
59
60    fn poll(&mut self) -> Poll<Self::Item, Self::Error> { self.inner.poll().map_err(|_| ()) }
61}
62
63/// An ordered queue of waiting participants.
64///
65/// Every turn of the turnstyle, the next participant in queue is notified and removed from the
66/// queue.  If the queue is empty, `turn` is a noop.  Waiters receive their all-time position
67/// through the turnstyle as their item i.e. the first waiter receives 0, the second receives 1,
68/// etc.
69///
70/// `Turnstyle`s can be cloned and are safe to share across threads.  When a `Turnstyle` is
71/// dropped, all of its waiters will be notified.
72#[derive(Clone)]
73pub struct Turnstyle {
74    waiters: Arc<Mutex<VecDeque<(usize, oneshot::Sender<usize>)>>>,
75    version: Arc<AtomicUsize>,
76}
77
78impl Turnstyle {
79    /// Creates a new, empty turnstyle.
80    pub fn new() -> Turnstyle {
81        Turnstyle {
82            waiters: Arc::new(Mutex::new(VecDeque::new())),
83            version: Arc::new(AtomicUsize::new(0)),
84        }
85    }
86
87    /// Joins the waiting queue.
88    ///
89    /// Returns a tuple of (`usize`, `Waiter`) to the caller.  The `usize` represents the waiter's
90    /// ticket in the queue since creation of the turnstyle, and the `Waiter` is a future that will
91    /// complete when the turnstyle turns and reaches the caller's position in the queue.
92    pub fn join(&self) -> (usize, Waiter) {
93        let version = self.version.fetch_add(1, SeqCst);
94        let (tx, rx) = oneshot::channel();
95        {
96            let mut waiters = self.waiters.lock().expect("turnstyle unable to join line");
97            waiters.push_back((version, tx));
98        }
99
100        (version, Waiter { inner: rx })
101    }
102
103    /// Turns once, letting a single waiter through.
104    ///
105    /// The `Waiter` is notified by the future completing.  The function returns `true` if a waiter
106    /// was found/notified, `false` otherwise.
107    pub fn turn(&self) -> bool {
108        let waiter = {
109            let mut waiters = self.waiters.lock().unwrap();
110            waiters.pop_front()
111        };
112
113        if let Some((v, w)) = waiter {
114            // We drop the result of the send here because our waiter may have disappeared.  This
115            // is fine, because we don't care about notifying _at least_ one waiter, only about
116            // making sure we've removed one waiter from the queue, whether we truly notified
117            // someone or not.
118            let _ = w.send(v);
119            true
120        } else {
121            false
122        }
123    }
124}
125
126impl Drop for Turnstyle {
127    fn drop(&mut self) {
128        while self.turn() {}
129    }
130}
131
132#[cfg(test)]
133mod tests {
134    use super::Turnstyle;
135    use futures::{future, Future, Async};
136
137    #[test]
138    fn single_waiter() {
139        future::lazy(|| {
140            let ts = Turnstyle::new();
141
142            let (_, mut w) = ts.join();
143            assert!(!w.poll().unwrap().is_ready());
144
145            ts.turn();
146            assert!(w.poll().unwrap().is_ready());
147
148            future::ok::<_, ()>(())
149        }).wait()
150            .unwrap();
151    }
152
153    #[test]
154    fn multiple_waiters() {
155        future::lazy(|| {
156            let ts = Turnstyle::new();
157            let (_, mut w1) = ts.join();
158            let (_, mut w2) = ts.join();
159            let (_, mut w3) = ts.join();
160
161            assert!(!w1.poll().unwrap().is_ready());
162            assert!(!w2.poll().unwrap().is_ready());
163            assert!(!w2.poll().unwrap().is_ready());
164
165            ts.turn();
166            assert!(w1.poll().unwrap().is_ready());
167            assert!(!w2.poll().unwrap().is_ready());
168            assert!(!w3.poll().unwrap().is_ready());
169
170            ts.turn();
171            assert!(w2.poll().unwrap().is_ready());
172            assert!(!w3.poll().unwrap().is_ready());
173
174            ts.turn();
175            assert!(w3.poll().unwrap().is_ready());
176
177            future::ok::<_, ()>(())
178        }).wait()
179            .unwrap();
180    }
181
182    #[test]
183    fn versions() {
184        future::lazy(|| {
185            let ts = Turnstyle::new();
186            let (v1, mut w1) = ts.join();
187            let (v2, mut w2) = ts.join();
188            let (v3, mut w3) = ts.join();
189
190            assert_eq!(v1, 0);
191            assert_eq!(v2, 1);
192            assert_eq!(v3, 2);
193
194            ts.turn();
195            ts.turn();
196            ts.turn();
197
198            if let Async::Ready(w1v) = w1.poll().unwrap() {
199                assert_eq!(w1v, 0);
200            } else {
201                panic!("waiter 1 was not ready");
202            }
203
204            if let Async::Ready(w2v) = w2.poll().unwrap() {
205                assert_eq!(w2v, 1);
206            } else {
207                panic!("waiter 2 was not ready");
208            }
209
210            if let Async::Ready(w3v) = w3.poll().unwrap() {
211                assert_eq!(w3v, 2);
212            } else {
213                panic!("waiter 3 was not ready");
214            }
215
216            future::ok::<_, ()>(())
217        }).wait()
218            .unwrap();
219    }
220
221    #[test]
222    fn on_drop() {
223        future::lazy(|| {
224            let ts = Turnstyle::new();
225            let (_, mut w1) = ts.join();
226            let (_, mut w2) = ts.join();
227            let (_, mut w3) = ts.join();
228
229            assert!(!w1.poll().unwrap().is_ready());
230            assert!(!w2.poll().unwrap().is_ready());
231            assert!(!w2.poll().unwrap().is_ready());
232
233            drop(ts);
234
235            assert!(w1.poll().unwrap().is_ready());
236            assert!(w2.poll().unwrap().is_ready());
237            assert!(w3.poll().unwrap().is_ready());
238
239            future::ok::<_, ()>(())
240        }).wait()
241            .unwrap();
242    }
243}