owned_future/lib.rs
1//!
2//! This tiny crate contains helpers to turn a borrowed future into an owned one.
3//!
4//! # Motivation
5//!
6//! Take [`tokio::sync::Notify`] as an example. It's often useful to call [`Notify::notified`] from
7//! the main thread and then pass it to a spawned thread. Doing this guarantees the resulting
8//! [`Notified`] is watching for calls to [`Notify::notify_waiters`] prior to the thread being
9//! spawned. However this isn't possible with as `notified` borrows the `Notify`.
10//!
11//! ```compile_fail
12//! use std::sync::Arc;
13//! use tokio::sync::Notify;
14//!
15//! let notify = Arc::new(Notify::new());
16//!
17//! // Spawn a thread that waits to be notified
18//! {
19//! let notify = notify.clone();
20//! // Start listening before we spawn
21//! let notified = notify.notified();
22//! tokio::spawn(async move {
23//! // wait for our listen to complete
24//! notified.await; // <-- fails because we can't move `notified`
25//! });
26//! }
27//!
28//! // notify the waiting threads
29//! notify.notify_waiters();
30//! ```
31//!
32//! At present, there's no way to do this kind of borrow and then move, and while there are many
33//! crates available to help turn this problem into a self-borrowing one, those solutions require
34//! `unsafe` code with complicated covariance implications. This crate is instead able to solve this
35//! simple case with no `unsafe`, and only 1-2 lines of `unsafe` code for more complex cases with no
36//! covariance problems. Here is the solution to the above problem:
37//!
38//!
39//! ```
40//! # #[tokio::main(flavor = "current_thread")]
41//! # async fn main() {
42//! use std::sync::Arc;
43//! use tokio::sync::Notify;
44//! use owned_future::make;
45//!
46//! let notify = Arc::new(Notify::new());
47//!
48//! // Spawn a thread that waits to be notified
49//! {
50//! // Start listening before we spawn
51//! let get_notified = owned_future::get!(fn(n: &mut Arc<Notify>) -> () {
52//! n.notified()
53//! });
54//! let notified = make(notify.clone(), get_notified);
55//! tokio::spawn(async move {
56//! // wait for our listen to complete
57//! notified.await;
58//! });
59//! }
60//!
61//! // notify the waiting threads
62//! notify.notify_waiters();
63//! # }
64//! ```
65//!
66//! # Technical Details
67//!
68//! So how does this work exactly? Well, while rust usually doesn't let you move a borrowed value,
69//! there's one exception. Pinned futures. Once `async` code has been transformed into a `Pin`ed
70//! future, it can invoke the borrow operation, but still be freely moved around. Essentially what
71//! this crate does is a prettied up version of this:
72//!
73//! ```skip
74//! let mut wrapped_notified = Box::pin(async move {
75//! let notified = notify.notified();
76//!
77//! // This prevents us from driving the future to completion on the first poll
78//! force_pause().await;
79//!
80//! future.await
81//! });
82//!
83//! // Drive the future up to just past our `force_pause`
84//! wrapped_notified.poll_once()
85//!
86//! tokio::spawn(async move {
87//! // wait for our listen to complete
88//! wrapped_notified.await;
89//! });
90//! ```
91//!
92//! The more complex wrappers have a little bit more machinery to handle auxiliary values and
93//! errors, and the `Async*` helpers need a little bit of pin-projection and poll handling, but
94//! ultimately the core logic boils down to something like the above.
95//!
96//! [`tokio::sync::Notify`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
97//! [`Notify::notified`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notified
98//! [`Notified`]: https://docs.rs/tokio/latest/tokio/sync/futures/struct.Notified.html
99//! [`Notify::notify_waiters`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_waiters
100
101#![no_std]
102
103extern crate alloc;
104
105mod funcs;
106mod macros;
107mod traits;
108
109pub use funcs::*;
110pub use traits::*;
111
112#[cfg(test)]
113mod tests {
114 use alloc::sync::Arc;
115
116 use tokio::sync::{Barrier, Notify};
117
118 use super::*;
119
120 #[tokio::test]
121 async fn get() {
122 let notify = Arc::new(Notify::new());
123 let barrier = Arc::new(Barrier::new(11));
124
125 let get_notified = get!(fn(n: &mut Arc<Notify>) -> () {
126 n.notified()
127 });
128
129 for _ in 0..10 {
130 let barrier = barrier.clone();
131 let notified = get_notified.apply_to(notify.clone());
132 tokio::spawn(async move {
133 notified.await;
134 barrier.wait().await;
135 });
136 }
137
138 notify.notify_waiters();
139 barrier.wait().await;
140 }
141
142 #[tokio::test]
143 async fn try_get() {
144 let notify = Arc::new(Notify::new());
145 let barrier = Arc::new(Barrier::new(11));
146
147 let get_notified = try_get!(fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
148 Ok((n.notified(), ()))
149 });
150
151 for _ in 0..10 {
152 let barrier = barrier.clone();
153 let notified = match get_notified.apply_to(notify.clone()) {
154 Ok((notified, ())) => notified,
155 Err((_notify, ())) => unreachable!(),
156 };
157 tokio::spawn(async move {
158 notified.await;
159 barrier.wait().await;
160 });
161 }
162
163 notify.notify_waiters();
164 barrier.wait().await;
165 }
166
167 #[cfg(feature = "async")]
168 #[tokio::test]
169 async fn async_try_get() {
170 use alloc::vec;
171
172 let notify = Arc::new(Notify::new());
173
174 let get_notified = async_try_get!(async fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
175 Ok((n.notified(), ()))
176 });
177
178 let mut waiters = vec![];
179 for _ in 0..10 {
180 let notified = match get_notified.apply_to(notify.clone()).await {
181 Ok((notified, ())) => notified,
182 Err((_notify, ())) => unreachable!(),
183 };
184 waiters.push(async move {
185 notified.await;
186 })
187 }
188
189 notify.notify_waiters();
190 for waiter in waiters {
191 waiter.await
192 }
193 }
194
195 #[tokio::test]
196 async fn async_send_try_get() {
197 let notify = Arc::new(Notify::new());
198 let barrier = Arc::new(Barrier::new(11));
199
200 let get_notified = async_send_try_get!(fn<'a, 'b>(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
201 async move {
202 let n = n.notified();
203 let v: Pin<Box<dyn 'b + Send + Future<Output = ()>>> = Box::pin(async move { n.await });
204 Ok((
205 v,
206 (),
207 ))
208 }
209 });
210
211 for _ in 0..10 {
212 let barrier = barrier.clone();
213 let notified = match get_notified.apply_to(notify.clone()).await {
214 Ok((notified, ())) => notified,
215 Err((_notify, ())) => unreachable!(),
216 };
217 tokio::spawn(async move {
218 notified.await;
219 barrier.wait().await;
220 });
221 }
222
223 notify.notify_waiters();
224 barrier.wait().await;
225 }
226}