1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
//! An asynchronous, atomic option type intended for use with methods that move `self`. //! //! This module provides `Lease`, a type that acts similarly to an asynchronous `Mutex`, with one //! major difference: it expects you to move the leased item _by value_, and then _return it_ when //! you are done. You can think of a `Lease` as an atomic, asynchronous `Option` type, in which we //! can `take` the value only if no-one else has currently taken it, and where we are notified when //! the value has returned so we can try to take it again. //! //! This type is intended for use with methods that take `self` by value, and _eventually_, at some //! later point in time, return that `Self` for future use. This tends to happen particularly often //! in future-related contexts. For example, consider the following method for a hypothetical, //! non-pipelined connection type: //! //! ```rust,ignore //! impl Connection { //! fn get(self, key: i64) -> impl Future<Item = (i64, Self), Error = Error>; //! } //! ``` //! //! Let's say you want to expose an interface that does _not_ consume `self`, but instead has a //! `poll_ready` method that checks whether the connection is ready to receive another request: //! //! ```rust,ignore //! impl MyConnection { //! fn poll_ready(&mut self) -> Poll<(), Error = Error>; //! fn call(&mut self, key: i64) -> impl Future<Item = i64, Error = Error>; //! } //! ``` //! //! `Lease` allows you to do this. Specifically, `poll_ready` attempts to acquire the lease using //! `Lease::poll_acquire`, and `call` _transfers_ that lease into the returned future. When the //! future eventually resolves, we _restore_ the leased value so that `poll_ready` returns `Ready` //! again to anyone else who may want to take the value. The example above would thus look like //! this: //! //! ```rust,no_run //! # use std::future::Future; //! # use std::task::{Poll, Context}; //! # struct X; impl X { fn get(self, _: i64) -> impl Future<Output = (i64, Self)> { async move { (0, self) } } } //! # struct MyConnection { lease: async_lease::Lease<X> } //! impl MyConnection { //! fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<()> { //! self.lease.poll_acquire(cx) //! } //! //! fn call(&mut self, key: i64) -> impl Future<Output = i64> { //! // We want to transfer the lease into the future //! // and leave behind an unacquired lease. //! let mut lease = self.lease.transfer(); //! let fut = lease.take().get(key); //! async move { //! let (v, connection) = fut.await; //! //! // Give back the connection for other callers. //! // After this, `poll_ready` may return `Ok(Ready)` again. //! lease.restore(connection); //! //! // And yield just the value. //! v //! } //! } //! } //! ``` #![warn( unused_extern_crates, missing_debug_implementations, missing_docs, unreachable_pub )] use std::cell::UnsafeCell; use std::ops::{Deref, DerefMut}; use std::sync::Arc; use std::task::{Context, Poll}; use tokio_sync::semaphore; /// A handle to a leasable value. /// /// Use `poll_acquire` to acquire the lease, `take` to grab the leased value, and `restore` to /// return the leased value when you're done with it. /// /// The code will panic if you attempt to access the `S` behind a `Lease` through `Deref` or /// `DerefMut` without having acquired the lease through `poll_acquire` first, or if you have /// called `take`. /// /// The code will also panic if you attempt to drop a `Lease` without first restoring the leased /// value. #[derive(Debug)] pub struct Lease<T> { inner: Arc<State<T>>, permit: semaphore::Permit, } // As long as T: Send, it's fine to send Lease<T> to other threads. // If T was not Send, sending a Lease<T> would be bad, since you can access T through Lease<T>. unsafe impl<T> Send for Lease<T> where T: Send {} #[derive(Debug)] struct State<T> { c: UnsafeCell<Option<T>>, s: semaphore::Semaphore, } impl<T> Lease<T> { fn option(&mut self) -> &mut Option<T> { unsafe { &mut *self.inner.c.get() } } /// Try to acquire the lease. /// /// If the lease is not available, the current task is notified once it is. pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<()> { match self.permit.poll_acquire(cx, &self.inner.s) { Poll::Ready(Ok(())) => Poll::Ready(()), Poll::Ready(Err(_)) => { // The semaphore was closed. but, we never explicitly close it, and we have a // handle to it through the Arc, which means that this can never happen. unreachable!() } Poll::Pending => Poll::Pending, } } /// Release the lease (if it has been acquired). /// /// This provides a way of "undoing" a call to `poll_ready` should you decide you don't need /// the lease after all, or if you only needed access by reference. /// /// This method will panic if you attempt to release the lease after you have called `take`. pub fn release(&mut self) { if self.permit.is_acquired() { // We need this check in case the reason we get here is that we already hit this // assertion, and `release` is being called _again_ because `self` is being dropped. if !::std::thread::panicking() { assert!( self.option().is_some(), "attempted to release the lease without restoring the value" ); } self.permit.release(&self.inner.s); } } /// Leave behind a non-acquired lease in place of this one, and return this acquired lease. /// /// This allows you to move a previously acquired lease into another context (like a `Future`) /// where you will later `restore` the leased value. /// /// This method will panic if you attempt to call it without first having acquired the lease. pub fn transfer(&mut self) -> Self { assert!(self.permit.is_acquired()); let mut transferred = self.clone(); ::std::mem::swap(self, &mut transferred); transferred } /// Take the leased value. /// /// This method will panic if you attempt to call it without first having acquired the lease. /// /// Note that you _must_ call `restore` on this lease before you drop it to return the leased /// value to other waiting clients. If you do not, dropping the lease will panic. pub fn take(&mut self) -> T { assert!(self.permit.is_acquired()); self.option() .take() .expect("attempted to call take(), but leased value has not been restored") } /// Restore the leased value. /// /// This method will panic if you attempt to call it without first having acquired the lease. /// /// Note that once you return the leased value, the lease is no longer considered acquired. pub fn restore(&mut self, state: T) { assert!(self.permit.is_acquired()); unsafe { *self.inner.c.get() = Some(state) }; // Finally, we can now release the permit since we're done with the connection self.release(); } } impl<T> Drop for Lease<T> { fn drop(&mut self) { self.release(); } } impl<T> Deref for Lease<T> { type Target = T; fn deref(&self) -> &Self::Target { assert!(self.permit.is_acquired()); let s = unsafe { &*self.inner.c.get() }; s.as_ref() .expect("attempted to deref lease after calling take()") } } impl<T> DerefMut for Lease<T> { fn deref_mut(&mut self) -> &mut Self::Target { assert!(self.permit.is_acquired()); let s = unsafe { &mut *self.inner.c.get() }; s.as_mut() .expect("attempted to deref_mut lease after calling take()") } } impl<T> From<T> for Lease<T> { fn from(s: T) -> Self { Self { inner: Arc::new(State { c: UnsafeCell::new(Some(s)), s: semaphore::Semaphore::new(1), }), permit: semaphore::Permit::new(), } } } impl<T> Clone for Lease<T> { fn clone(&self) -> Self { Self { inner: self.inner.clone(), permit: semaphore::Permit::new(), } } } impl<T> Default for Lease<T> where T: Default, { fn default() -> Self { Self::from(T::default()) } }