async_lease/
lib.rs

1//! An asynchronous, atomic option type intended for use with methods that move `self`.
2//!
3//! This module provides `Lease`, a type that acts similarly to an asynchronous `Mutex`, with one
4//! major difference: it expects you to move the leased item _by value_, and then _return it_ when
5//! you are done. You can think of a `Lease` as an atomic, asynchronous `Option` type, in which we
6//! can `take` the value only if no-one else has currently taken it, and where we are notified when
7//! the value has returned so we can try to take it again.
8//!
9//! This type is intended for use with methods that take `self` by value, and _eventually_, at some
10//! later point in time, return that `Self` for future use. This tends to happen particularly often
11//! in future-related contexts. For example, consider the following method for a hypothetical,
12//! non-pipelined connection type:
13//!
14//! ```rust,ignore
15//! impl Connection {
16//!     fn get(self, key: i64) -> impl Future<Item = (i64, Self), Error = Error>;
17//! }
18//! ```
19//!
20//! Let's say you want to expose an interface that does _not_ consume `self`, but instead has a
21//! `poll_ready` method that checks whether the connection is ready to receive another request:
22//!
23//! ```rust,ignore
24//! impl MyConnection {
25//!     fn poll_ready(&mut self) -> Poll<(), Error = Error>;
26//!     fn call(&mut self, key: i64) -> impl Future<Item = i64, Error = Error>;
27//! }
28//! ```
29//!
30//! `Lease` allows you to do this. Specifically, `poll_ready` attempts to acquire the lease using
31//! `Lease::poll_acquire`, and `call` _transfers_ that lease into the returned future. When the
32//! future eventually resolves, we _restore_ the leased value so that `poll_ready` returns `Ready`
33//! again to anyone else who may want to take the value. The example above would thus look like
34//! this:
35//!
36//! ```rust,ignore
37//! impl MyConnection {
38//!     fn poll_ready(&mut self) -> Poll<(), Error = Error> {
39//!         self.lease.poll_acquire()
40//!     }
41//!
42//!     fn call(&mut self, key: i64) -> impl Future<Item = i64, Error = Error> {
43//!         // We want to transfer the lease into the future
44//!         // and leave behind an unacquired lease.
45//!         let mut lease = self.lease.transfer();
46//!         lease.take().get(key).map(move |(v, connection)| {
47//!             // Give back the connection for other callers.
48//!             // After this, `poll_ready` may return `Ok(Ready)` again.
49//!             lease.restore(connection);
50//!             // And yield just the value.
51//!             v
52//!         })
53//!     }
54//! }
55//! ```
56#![deny(
57    unused_extern_crates,
58    missing_debug_implementations,
59    missing_docs,
60    unreachable_pub
61)]
62#![cfg_attr(test, deny(warnings))]
63
64use futures::Async;
65use std::cell::UnsafeCell;
66use std::ops::{Deref, DerefMut};
67use std::sync::Arc;
68use tokio_sync::semaphore;
69
70/// A handle to a leasable value.
71///
72/// Use `poll_acquire` to acquire the lease, `take` to grab the leased value, and `restore` to
73/// return the leased value when you're done with it.
74///
75/// The code will panic if you attempt to access the `S` behind a `Lease` through `Deref` or
76/// `DerefMut` without having acquired the lease through `poll_acquire` first, or if you have
77/// called `take`.
78///
79/// The code will also panic if you attempt to drop a `Lease` without first restoring the leased
80/// value.
81#[derive(Debug)]
82pub struct Lease<T> {
83    inner: Arc<State<T>>,
84    permit: semaphore::Permit,
85}
86
87// As long as T: Send, it's fine to send Lease<T> to other threads.
88// If T was not Send, sending a Lease<T> would be bad, since you can access T through Lease<T>.
89unsafe impl<T> Send for Lease<T> where T: Send {}
90
91#[derive(Debug)]
92struct State<T> {
93    c: UnsafeCell<Option<T>>,
94    s: semaphore::Semaphore,
95}
96
97impl<T> Lease<T> {
98    fn option(&mut self) -> &mut Option<T> {
99        unsafe { &mut *self.inner.c.get() }
100    }
101
102    /// Try to acquire the lease.
103    ///
104    /// If the lease is not available, the current task is notified once it is.
105    pub fn poll_acquire(&mut self) -> Async<()> {
106        self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
107            // The semaphore was closed. but, we never explicitly close it, and we have a
108            // handle to it through the Arc, which means that this can never happen.
109            unreachable!()
110        })
111    }
112
113    /// Release the lease (if it has been acquired).
114    ///
115    /// This provides a way of "undoing" a call to `poll_ready` should you decide you don't need
116    /// the lease after all, or if you only needed access by reference.
117    ///
118    /// This method will panic if you attempt to release the lease after you have called `take`.
119    pub fn release(&mut self) {
120        if self.permit.is_acquired() {
121            // We need this check in case the reason we get here is that we already hit this
122            // assertion, and `release` is being called _again_ because `self` is being dropped.
123            if !::std::thread::panicking() {
124                assert!(
125                    self.option().is_some(),
126                    "attempted to release the lease without restoring the value"
127                );
128            }
129            self.permit.release(&self.inner.s);
130        }
131    }
132
133    /// Leave behind a non-acquired lease in place of this one, and return this acquired lease.
134    ///
135    /// This allows you to move a previously acquired lease into another context (like a `Future`)
136    /// where you will later `restore` the leased value.
137    ///
138    /// This method will panic if you attempt to call it without first having acquired the lease.
139    pub fn transfer(&mut self) -> Self {
140        assert!(self.permit.is_acquired());
141        let mut transferred = self.clone();
142        ::std::mem::swap(self, &mut transferred);
143        transferred
144    }
145
146    /// Take the leased value.
147    ///
148    /// This method will panic if you attempt to call it without first having acquired the lease.
149    ///
150    /// Note that you _must_ call `restore` on this lease before you drop it to return the leased
151    /// value to other waiting clients. If you do not, dropping the lease will panic.
152    pub fn take(&mut self) -> T {
153        assert!(self.permit.is_acquired());
154        self.option()
155            .take()
156            .expect("attempted to call take(), but leased value has not been restored")
157    }
158
159    /// Restore the leased value.
160    ///
161    /// This method will panic if you attempt to call it without first having acquired the lease.
162    ///
163    /// Note that once you return the leased value, the lease is no longer considered acquired.
164    pub fn restore(&mut self, state: T) {
165        assert!(self.permit.is_acquired());
166        unsafe { *self.inner.c.get() = Some(state) };
167        // Finally, we can now release the permit since we're done with the connection
168        self.release();
169    }
170}
171
172impl<T> Drop for Lease<T> {
173    fn drop(&mut self) {
174        self.release();
175    }
176}
177
178impl<T> Deref for Lease<T> {
179    type Target = T;
180    fn deref(&self) -> &Self::Target {
181        assert!(self.permit.is_acquired());
182        let s = unsafe { &*self.inner.c.get() };
183        s.as_ref()
184            .expect("attempted to deref lease after calling take()")
185    }
186}
187
188impl<T> DerefMut for Lease<T> {
189    fn deref_mut(&mut self) -> &mut Self::Target {
190        assert!(self.permit.is_acquired());
191        let s = unsafe { &mut *self.inner.c.get() };
192        s.as_mut()
193            .expect("attempted to deref_mut lease after calling take()")
194    }
195}
196
197impl<T> From<T> for Lease<T> {
198    fn from(s: T) -> Self {
199        Self {
200            inner: Arc::new(State {
201                c: UnsafeCell::new(Some(s)),
202                s: semaphore::Semaphore::new(1),
203            }),
204            permit: semaphore::Permit::new(),
205        }
206    }
207}
208
209impl<T> Clone for Lease<T> {
210    fn clone(&self) -> Self {
211        Self {
212            inner: self.inner.clone(),
213            permit: semaphore::Permit::new(),
214        }
215    }
216}
217
218impl<T> Default for Lease<T>
219where
220    T: Default,
221{
222    fn default() -> Self {
223        Self::from(T::default())
224    }
225}