async_lease/lib.rs
1//! An asynchronous, atomic option type intended for use with methods that move `self`.
2//!
3//! This module provides `Lease`, a type that acts similarly to an asynchronous `Mutex`, with one
4//! major difference: it expects you to move the leased item _by value_, and then _return it_ when
5//! you are done. You can think of a `Lease` as an atomic, asynchronous `Option` type, in which we
6//! can `take` the value only if no-one else has currently taken it, and where we are notified when
7//! the value has returned so we can try to take it again.
8//!
9//! This type is intended for use with methods that take `self` by value, and _eventually_, at some
10//! later point in time, return that `Self` for future use. This tends to happen particularly often
11//! in future-related contexts. For example, consider the following method for a hypothetical,
12//! non-pipelined connection type:
13//!
14//! ```rust,ignore
15//! impl Connection {
16//! fn get(self, key: i64) -> impl Future<Item = (i64, Self), Error = Error>;
17//! }
18//! ```
19//!
20//! Let's say you want to expose an interface that does _not_ consume `self`, but instead has a
21//! `poll_ready` method that checks whether the connection is ready to receive another request:
22//!
23//! ```rust,ignore
24//! impl MyConnection {
//!     fn poll_ready(&mut self) -> Poll<(), Error>;
26//! fn call(&mut self, key: i64) -> impl Future<Item = i64, Error = Error>;
27//! }
28//! ```
29//!
30//! `Lease` allows you to do this. Specifically, `poll_ready` attempts to acquire the lease using
31//! `Lease::poll_acquire`, and `call` _transfers_ that lease into the returned future. When the
32//! future eventually resolves, we _restore_ the leased value so that `poll_ready` returns `Ready`
33//! again to anyone else who may want to take the value. The example above would thus look like
34//! this:
35//!
36//! ```rust,ignore
37//! impl MyConnection {
//!     fn poll_ready(&mut self) -> Poll<(), Error> {
//!         // `poll_acquire` is infallible and yields an `Async<()>`, so wrap it in `Ok`.
//!         Ok(self.lease.poll_acquire())
//!     }
41//!
42//! fn call(&mut self, key: i64) -> impl Future<Item = i64, Error = Error> {
43//! // We want to transfer the lease into the future
44//! // and leave behind an unacquired lease.
45//! let mut lease = self.lease.transfer();
46//! lease.take().get(key).map(move |(v, connection)| {
47//! // Give back the connection for other callers.
48//! // After this, `poll_ready` may return `Ok(Ready)` again.
49//! lease.restore(connection);
50//! // And yield just the value.
51//! v
52//! })
53//! }
54//! }
55//! ```
56#![deny(
57 unused_extern_crates,
58 missing_debug_implementations,
59 missing_docs,
60 unreachable_pub
61)]
62#![cfg_attr(test, deny(warnings))]
63
64use futures::Async;
65use std::cell::UnsafeCell;
66use std::ops::{Deref, DerefMut};
67use std::sync::Arc;
68use tokio_sync::semaphore;
69
/// A handle to a leasable value.
///
/// Use `poll_acquire` to acquire the lease, `take` to grab the leased value, and `restore` to
/// return the leased value when you're done with it.
///
/// The code will panic if you attempt to access the `T` behind a `Lease` through `Deref` or
/// `DerefMut` without having acquired the lease through `poll_acquire` first, or if you have
/// called `take` and not yet restored the value.
///
/// The code will also panic if you attempt to drop an acquired `Lease` without first restoring
/// the leased value.
#[derive(Debug)]
pub struct Lease<T> {
    // Shared slot and semaphore; every clone of this lease points at the same `State`.
    inner: Arc<State<T>>,
    // This handle's own permit on `inner.s`; the lease counts as acquired while the permit is.
    permit: semaphore::Permit,
}
86
// `Lease<T>` is not `Send` automatically: `State<T>` holds an `UnsafeCell`, so it is not `Sync`,
// which in turn makes `Arc<State<T>>` not `Send`.
//
// SAFETY: As long as T: Send, it's fine to send Lease<T> to other threads. The cell is only
// dereferenced by a handle whose permit is acquired, so handles on different threads do not
// touch it at the same time.
// If T was not Send, sending a Lease<T> would be bad, since you can access T through Lease<T>.
unsafe impl<T> Send for Lease<T> where T: Send {}
90
/// State shared (through an `Arc`) by every `Lease` handle cloned from the same origin.
#[derive(Debug)]
struct State<T> {
    // The leased value. `None` while a holder has `take`n it and not yet `restore`d it.
    c: UnsafeCell<Option<T>>,
    // Semaphore whose permit a handle must hold before it touches `c`.
    s: semaphore::Semaphore,
}
96
97impl<T> Lease<T> {
98 fn option(&mut self) -> &mut Option<T> {
99 unsafe { &mut *self.inner.c.get() }
100 }
101
102 /// Try to acquire the lease.
103 ///
104 /// If the lease is not available, the current task is notified once it is.
105 pub fn poll_acquire(&mut self) -> Async<()> {
106 self.permit.poll_acquire(&self.inner.s).unwrap_or_else(|_| {
107 // The semaphore was closed. but, we never explicitly close it, and we have a
108 // handle to it through the Arc, which means that this can never happen.
109 unreachable!()
110 })
111 }
112
113 /// Release the lease (if it has been acquired).
114 ///
115 /// This provides a way of "undoing" a call to `poll_ready` should you decide you don't need
116 /// the lease after all, or if you only needed access by reference.
117 ///
118 /// This method will panic if you attempt to release the lease after you have called `take`.
119 pub fn release(&mut self) {
120 if self.permit.is_acquired() {
121 // We need this check in case the reason we get here is that we already hit this
122 // assertion, and `release` is being called _again_ because `self` is being dropped.
123 if !::std::thread::panicking() {
124 assert!(
125 self.option().is_some(),
126 "attempted to release the lease without restoring the value"
127 );
128 }
129 self.permit.release(&self.inner.s);
130 }
131 }
132
133 /// Leave behind a non-acquired lease in place of this one, and return this acquired lease.
134 ///
135 /// This allows you to move a previously acquired lease into another context (like a `Future`)
136 /// where you will later `restore` the leased value.
137 ///
138 /// This method will panic if you attempt to call it without first having acquired the lease.
139 pub fn transfer(&mut self) -> Self {
140 assert!(self.permit.is_acquired());
141 let mut transferred = self.clone();
142 ::std::mem::swap(self, &mut transferred);
143 transferred
144 }
145
146 /// Take the leased value.
147 ///
148 /// This method will panic if you attempt to call it without first having acquired the lease.
149 ///
150 /// Note that you _must_ call `restore` on this lease before you drop it to return the leased
151 /// value to other waiting clients. If you do not, dropping the lease will panic.
152 pub fn take(&mut self) -> T {
153 assert!(self.permit.is_acquired());
154 self.option()
155 .take()
156 .expect("attempted to call take(), but leased value has not been restored")
157 }
158
159 /// Restore the leased value.
160 ///
161 /// This method will panic if you attempt to call it without first having acquired the lease.
162 ///
163 /// Note that once you return the leased value, the lease is no longer considered acquired.
164 pub fn restore(&mut self, state: T) {
165 assert!(self.permit.is_acquired());
166 unsafe { *self.inner.c.get() = Some(state) };
167 // Finally, we can now release the permit since we're done with the connection
168 self.release();
169 }
170}
171
172impl<T> Drop for Lease<T> {
173 fn drop(&mut self) {
174 self.release();
175 }
176}
177
178impl<T> Deref for Lease<T> {
179 type Target = T;
180 fn deref(&self) -> &Self::Target {
181 assert!(self.permit.is_acquired());
182 let s = unsafe { &*self.inner.c.get() };
183 s.as_ref()
184 .expect("attempted to deref lease after calling take()")
185 }
186}
187
188impl<T> DerefMut for Lease<T> {
189 fn deref_mut(&mut self) -> &mut Self::Target {
190 assert!(self.permit.is_acquired());
191 let s = unsafe { &mut *self.inner.c.get() };
192 s.as_mut()
193 .expect("attempted to deref_mut lease after calling take()")
194 }
195}
196
197impl<T> From<T> for Lease<T> {
198 fn from(s: T) -> Self {
199 Self {
200 inner: Arc::new(State {
201 c: UnsafeCell::new(Some(s)),
202 s: semaphore::Semaphore::new(1),
203 }),
204 permit: semaphore::Permit::new(),
205 }
206 }
207}
208
209impl<T> Clone for Lease<T> {
210 fn clone(&self) -> Self {
211 Self {
212 inner: self.inner.clone(),
213 permit: semaphore::Permit::new(),
214 }
215 }
216}
217
218impl<T> Default for Lease<T>
219where
220 T: Default,
221{
222 fn default() -> Self {
223 Self::from(T::default())
224 }
225}