owned_future/
lib.rs

1//!
2//! This tiny crate contains helpers to turn a borrowed future into an owned one.
3//!
4//! # Motivation
5//!
6//! Take [`tokio::sync::Notify`] as an example. It's often useful to call [`Notify::notified`] from
7//! the main thread and then pass it to a spawned thread. Doing this guarantees the resulting
8//! [`Notified`] is watching for calls to [`Notify::notify_waiters`] prior to the thread being
9//! spawned. However this isn't possible with as `notified` borrows the `Notify`.
10//!
11//! ```compile_fail
12//! use std::sync::Arc;
13//! use tokio::sync::Notify;
14//!
15//! let notify = Arc::new(Notify::new());
16//!
17//! // Spawn a thread that waits to be notified
18//! {
19//!     let notify = notify.clone();
20//!     // Start listening before we spawn
21//!     let notified = notify.notified();
22//!     tokio::spawn(async move {
23//!         // wait for our listen to complete
24//!         notified.await; // <-- fails because we can't move `notified`
25//!     });
26//! }
27//!
28//! // notify the waiting threads
29//! notify.notify_waiters();
30//! ```
31//!
32//! At present, there's no way to do this kind of borrow and then move, and while there are many
33//! crates available to help turn this problem into a self-borrowing one, those solutions require
34//! `unsafe` code with complicated covariance implications. This crate is instead able to solve this
35//! simple case with no `unsafe`, and only 1-2 lines of `unsafe` code for more complex cases with no
36//! covariance problems. Here is the solution to the above problem:
37//!
38//!
39//! ```
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! use std::sync::Arc;
43//! use tokio::sync::Notify;
44//! use owned_future::make;
45//!
46//! let notify = Arc::new(Notify::new());
47//!
48//! // Spawn a thread that waits to be notified
49//! {
50//!     // Start listening before we spawn
51//!     let get_notified = owned_future::get!(fn(n: &mut Arc<Notify>) -> () {
52//!         n.notified()
53//!     });
54//!     let notified = make(notify.clone(), get_notified);
55//!     tokio::spawn(async move {
56//!         // wait for our listen to complete
57//!         notified.await;
58//!     });
59//! }
60//!
61//! // notify the waiting threads
62//! notify.notify_waiters();
63//! # }
64//! ```
65//!
66//! # Technical Details
67//!
68//! So how does this work exactly? Well, while rust usually doesn't let you move a borrowed value,
69//! there's one exception. Pinned futures. Once `async` code has been transformed into a `Pin`ed
70//! future, it can invoke the borrow operation, but still be freely moved around. Essentially what
71//! this crate does is a prettied up version of this:
72//!
73//! ```skip
74//! let mut wrapped_notified = Box::pin(async move {
75//!     let notified = notify.notified();
76//!
77//!     // This prevents us from driving the future to completion on the first poll
78//!     force_pause().await;
79//!
80//!     future.await
81//! });
82//!
83//! // Drive the future up to just past our `force_pause`
84//! wrapped_notified.poll_once()
85//!
86//! tokio::spawn(async move {
87//!     // wait for our listen to complete
88//!     wrapped_notified.await;
89//! });
90//! ```
91//!
92//! The more complex wrappers have a little bit more machinery to handle auxiliary values and
93//! errors, and the `Async*` helpers need a little bit of pin-projection and poll handling, but
94//! ultimately the core logic boils down to something like the above.
95//!
96//! [`tokio::sync::Notify`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
97//! [`Notify::notified`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notified
98//! [`Notified`]: https://docs.rs/tokio/latest/tokio/sync/futures/struct.Notified.html
99//! [`Notify::notify_waiters`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_waiters
100
101#![no_std]
102
103extern crate alloc;
104
105mod funcs;
106mod macros;
107mod traits;
108
109pub use funcs::*;
110pub use traits::*;
111
112#[cfg(test)]
113mod tests {
114    use alloc::sync::Arc;
115
116    use tokio::sync::{Barrier, Notify};
117
118    use super::*;
119
120    #[tokio::test]
121    async fn get() {
122        let notify = Arc::new(Notify::new());
123        let barrier = Arc::new(Barrier::new(11));
124
125        let get_notified = get!(fn(n: &mut Arc<Notify>) -> () {
126            n.notified()
127        });
128
129        for _ in 0..10 {
130            let barrier = barrier.clone();
131            let notified = get_notified.apply_to(notify.clone());
132            tokio::spawn(async move {
133                notified.await;
134                barrier.wait().await;
135            });
136        }
137
138        notify.notify_waiters();
139        barrier.wait().await;
140    }
141
142    #[tokio::test]
143    async fn try_get() {
144        let notify = Arc::new(Notify::new());
145        let barrier = Arc::new(Barrier::new(11));
146
147        let get_notified = try_get!(fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
148            Ok((n.notified(), ()))
149        });
150
151        for _ in 0..10 {
152            let barrier = barrier.clone();
153            let notified = match get_notified.apply_to(notify.clone()) {
154                Ok((notified, ())) => notified,
155                Err((_notify, ())) => unreachable!(),
156            };
157            tokio::spawn(async move {
158                notified.await;
159                barrier.wait().await;
160            });
161        }
162
163        notify.notify_waiters();
164        barrier.wait().await;
165    }
166
167    #[cfg(feature = "async")]
168    #[tokio::test]
169    async fn async_try_get() {
170        use alloc::vec;
171
172        let notify = Arc::new(Notify::new());
173
174        let get_notified = async_try_get!(async fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
175            Ok((n.notified(), ()))
176        });
177
178        let mut waiters = vec![];
179        for _ in 0..10 {
180            let notified = match get_notified.apply_to(notify.clone()).await {
181                Ok((notified, ())) => notified,
182                Err((_notify, ())) => unreachable!(),
183            };
184            waiters.push(async move {
185                notified.await;
186            })
187        }
188
189        notify.notify_waiters();
190        for waiter in waiters {
191            waiter.await
192        }
193    }
194
195    #[tokio::test]
196    async fn async_send_try_get() {
197        let notify = Arc::new(Notify::new());
198        let barrier = Arc::new(Barrier::new(11));
199
200        let get_notified = async_send_try_get!(fn<'a, 'b>(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
201            async move {
202                let n = n.notified();
203                let v: Pin<Box<dyn 'b + Send + Future<Output = ()>>> = Box::pin(async move { n.await });
204                Ok((
205                    v,
206                    (),
207                ))
208            }
209        });
210
211        for _ in 0..10 {
212            let barrier = barrier.clone();
213            let notified = match get_notified.apply_to(notify.clone()).await {
214                Ok((notified, ())) => notified,
215                Err((_notify, ())) => unreachable!(),
216            };
217            tokio::spawn(async move {
218                notified.await;
219                barrier.wait().await;
220            });
221        }
222
223        notify.notify_waiters();
224        barrier.wait().await;
225    }
226}