owned_future/
lib.rs

1//!
2//! This tiny crate contains helpers to turn a borrowed future into an owned one.
3//!
4//! # Motivation
5//!
6//! Take [`tokio::sync::Notify`] as an example. A common paradigm is to call [`Notify::notified`]
7//! before a relevant threading update/check, then perform the update/check, and then wait on the
8//! resulting [`Notified`] future. Doing this guarantees the `Notified` is watching for calls to
9//! [`Notify::notify_waiters`] prior to the update/check. This paradigm would be useful when dealing
10//! with thread spawning (i.e. calling `notified` and moving the resulting future into the thread),
11//! but this isn't possible with `notified` as it borrows the `Notify`.
12//!
13//! ```compile_fail
14//! use std::sync::Arc;
15//! use tokio::sync::Notify;
16//!
17//! let notify = Arc::new(Notify::new());
18//!
19//! // Spawn a thread that waits to be notified
20//! {
21//!     // Copy the Arc
22//!     let notify = notify.clone();
23//!
24//!     // Start listening before we spawn
25//!     let notified = notify.notified();
26//!
27//!     // Spawn the thread
28//!     tokio::spawn(async move {
29//!         // Wait for our listen to complete
30//!         notified.await; // <-- fails because we can't move `notified`
31//!     });
32//! }
33//!
34//! // Notify the waiting threads
35//! notify.notify_waiters();
36//! ```
37//!
38//! At present, there's no easy way to do this kind of borrow-then-move. While there are many
39//! crates available to help turn this problem into a self-borrowing one, those solutions require
40//! `unsafe` code with complicated covariance implications. This crate is instead able to solve this
41//! simple case with no `unsafe`, and more complex cases are solved with only 1-2 lines of `unsafe`
42//! code with no covariance meddling. Here is the solution to the above problem:
43//!
44//!
45//! ```
46//! # #[tokio::main(flavor = "current_thread")]
47//! # async fn main() {
48//! use std::sync::Arc;
49//! use tokio::sync::Notify;
50//! use owned_future::make;
51//!
52//! // Make the constructor for our future
53//! let get_notified = owned_future::get!(fn(n: &mut Arc<Notify>) -> () {
54//!     n.notified()
55//! });
56//!
57//! let notify = Arc::new(Notify::new());
58//!
59//! // Spawn a thread that waits to be notified
60//! {
61//!     // Copy the Arc
62//!     let notify = notify.clone();
63//!
64//!     // Start listening before we spawn
65//!     let notified = make(notify, get_notified);
66//!
67//!     // Spawn the thread
68//!     tokio::spawn(async move {
69//!         // wait for our listen to complete
70//!         notified.await;
71//!     });
72//! }
73//!
74//! // notify the waiting threads
75//! notify.notify_waiters();
76//! # }
77//! ```
78//!
79//! # Technical Details
80//!
//! So how does this work exactly? Rust doesn't usually let you move a borrowed value, but there's
//! one exception: pinned `async` blocks. Once a value has been moved into an `async` block, and
//! the block has been transformed into a `Pin`ned `Future`, the borrow can be taken during the
//! execution of the future, while the pointer anchoring the future can still be freely moved
//! around. All that may sound a little confusing, but essentially what this crate does is a
//! prettied up version of this:
87//!
88//! ```skip
89//! // Copy the Arc
90//! let notify = notify.clone();
91//!
92//! let mut wrapped_notified = Box::pin(async move {
93//!     let notified = notify.notified();
94//!
95//!     // This prevents us from driving the future to completion on the first poll
96//!     force_pause().await;
97//!
//!     notified.await
99//! });
100//!
101//! // Drive the future up to just past our `force_pause`.
102//! // This will start listening before we spawn
//! wrapped_notified.poll_once();
104//!
105//! // Spawn the thread
106//! tokio::spawn(async move {
107//!     // wait for our listen to complete
108//!     wrapped_notified.await;
109//! });
110//!
111//! // notify the waiting threads
112//! notify.notify_waiters();
113//! ```
114//!
115//! The more complex wrappers have a little bit more machinery to handle auxiliary values and
116//! errors, and the `Async*` helpers do a little bit of pin-projection and poll handling, but
117//! ultimately the core logic boils down to something like the above.
118//!
119//! [`tokio::sync::Notify`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
120//! [`Notify::notified`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notified
121//! [`Notified`]: https://docs.rs/tokio/latest/tokio/sync/futures/struct.Notified.html
122//! [`Notify::notify_waiters`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_waiters
123
124#![no_std]
125
126extern crate alloc;
127
128mod funcs;
129mod macros;
130mod traits;
131
132pub use funcs::*;
133pub use traits::*;
134
#[cfg(test)]
mod tests {
    //! Smoke tests for the constructor macros.
    //!
    //! Every test follows the same shape: build a constructor with one of the macros, use it to
    //! create `WAITERS` listeners on a shared `Notify`, call `notify_waiters`, and then wait until
    //! every listener has completed.

    use alloc::sync::Arc;

    use tokio::sync::{Barrier, Notify};

    use super::*;

    /// Number of listeners each test creates before notifying.
    const WAITERS: usize = 10;

    /// Builds the barrier the spawning tests rendezvous on: one slot per waiter, plus one for the
    /// test body itself.
    fn rendezvous() -> Arc<Barrier> {
        Arc::new(Barrier::new(WAITERS + 1))
    }

    /// Listeners built with `get!` are moved into spawned tasks; the test only finishes once all
    /// of them have been woken and reached the barrier.
    #[tokio::test]
    async fn get() {
        let make_listener = get!(fn(n: &mut Arc<Notify>) -> () {
            n.notified()
        });

        let notify = Arc::new(Notify::new());
        let barrier = rendezvous();

        for _ in 0..WAITERS {
            // The listener is created here, before the task is spawned.
            let listener = make_listener.apply_to(notify.clone());
            let barrier = barrier.clone();
            tokio::spawn(async move {
                listener.await;
                barrier.wait().await;
            });
        }

        notify.notify_waiters();
        barrier.wait().await;
    }

    /// Same scenario as `get`, but through the fallible `try_get!` constructor, which also hands
    /// back an auxiliary value (here `()`) alongside the listener.
    #[tokio::test]
    async fn try_get() {
        let make_listener = try_get!(fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
            Ok((n.notified(), ()))
        });

        let notify = Arc::new(Notify::new());
        let barrier = rendezvous();

        for _ in 0..WAITERS {
            // The constructor above always returns `Ok`, so the error arm cannot be taken.
            let Ok((listener, ())) = make_listener.apply_to(notify.clone()) else {
                unreachable!()
            };
            let barrier = barrier.clone();
            tokio::spawn(async move {
                listener.await;
                barrier.wait().await;
            });
        }

        notify.notify_waiters();
        barrier.wait().await;
    }

    /// Exercises `async_try_get!`. The listeners are collected and awaited on the test task itself
    /// instead of being spawned.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_try_get() {
        use alloc::vec;

        let make_listener = async_try_get!(async fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
            Ok((n.notified(), ()))
        });

        let notify = Arc::new(Notify::new());

        let mut pending = vec![];
        for _ in 0..WAITERS {
            // The constructor above always returns `Ok`, so the error arm cannot be taken.
            let Ok((listener, ())) = make_listener.apply_to(notify.clone()).await else {
                unreachable!()
            };
            pending.push(async move {
                listener.await;
            });
        }

        notify.notify_waiters();
        for waiter in pending {
            waiter.await;
        }
    }

    /// Exercises `async_send_try_get!`, whose constructor returns a boxed `Send` future that is
    /// then moved into spawned tasks.
    ///
    /// NOTE(review): unlike `async_try_get`, this test is not gated on the `async` feature —
    /// confirm that `async_send_try_get!` is available without it.
    #[tokio::test]
    async fn async_send_try_get() {
        let make_listener = async_send_try_get!(fn<'a, 'b>(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
            async move {
                let notified = n.notified();
                let boxed: Pin<Box<dyn 'b + Send + Future<Output = ()>>> =
                    Box::pin(async move { notified.await });
                Ok((boxed, ()))
            }
        });

        let notify = Arc::new(Notify::new());
        let barrier = rendezvous();

        for _ in 0..WAITERS {
            // The constructor above always returns `Ok`, so the error arm cannot be taken.
            let Ok((listener, ())) = make_listener.apply_to(notify.clone()).await else {
                unreachable!()
            };
            let barrier = barrier.clone();
            tokio::spawn(async move {
                listener.await;
                barrier.wait().await;
            });
        }

        notify.notify_waiters();
        barrier.wait().await;
    }
}