owned_future/lib.rs
1//!
2//! This tiny crate contains helpers to turn a borrowed future into an owned one.
3//!
4//! # Motivation
5//!
6//! Take [`tokio::sync::Notify`] as an example. A common paradigm is to call [`Notify::notified`]
7//! before a relevant threading update/check, then perform the update/check, and then wait on the
8//! resulting [`Notified`] future. Doing this guarantees the `Notified` is watching for calls to
9//! [`Notify::notify_waiters`] prior to the update/check. This paradigm would be useful when dealing
10//! with thread spawning (i.e. calling `notified` and moving the resulting future into the thread),
11//! but this isn't possible with `notified` as it borrows the `Notify`.
12//!
13//! ```compile_fail
14//! use std::sync::Arc;
15//! use tokio::sync::Notify;
16//!
17//! let notify = Arc::new(Notify::new());
18//!
19//! // Spawn a thread that waits to be notified
20//! {
21//! // Copy the Arc
22//! let notify = notify.clone();
23//!
24//! // Start listening before we spawn
25//! let notified = notify.notified();
26//!
27//! // Spawn the thread
28//! tokio::spawn(async move {
29//! // Wait for our listen to complete
30//! notified.await; // <-- fails because we can't move `notified`
31//! });
32//! }
33//!
34//! // Notify the waiting threads
35//! notify.notify_waiters();
36//! ```
37//!
38//! At present, there's no easy way to do this kind of borrow-then-move. While there are many
39//! crates available to help turn this problem into a self-borrowing one, those solutions require
40//! `unsafe` code with complicated covariance implications. This crate is instead able to solve this
41//! simple case with no `unsafe`, and more complex cases are solved with only 1-2 lines of `unsafe`
42//! code with no covariance meddling. Here is the solution to the above problem:
43//!
44//!
45//! ```
46//! # #[tokio::main(flavor = "current_thread")]
47//! # async fn main() {
48//! use std::sync::Arc;
49//! use tokio::sync::Notify;
50//! use owned_future::make;
51//!
52//! // Make the constructor for our future
53//! let get_notified = owned_future::get!(fn(n: &mut Arc<Notify>) -> () {
54//! n.notified()
55//! });
56//!
57//! let notify = Arc::new(Notify::new());
58//!
59//! // Spawn a thread that waits to be notified
60//! {
61//! // Copy the Arc
62//! let notify = notify.clone();
63//!
64//! // Start listening before we spawn
65//! let notified = make(notify, get_notified);
66//!
67//! // Spawn the thread
68//! tokio::spawn(async move {
69//! // wait for our listen to complete
70//! notified.await;
71//! });
72//! }
73//!
74//! // notify the waiting threads
75//! notify.notify_waiters();
76//! # }
77//! ```
78//!
79//! # Technical Details
80//!
//! So how does this work exactly? Rust doesn't usually let you move a borrowed value, but there's
//! one exception: pinned `async` blocks. Once a value has been moved into an `async` block and the
//! block has been transformed into a `Pin`ned `Future`, the borrow can be taken while the future
//! executes, but the pointer anchoring the future can still be freely moved around. All that may
//! sound a little confusing, but essentially what this crate does is a prettied-up version of
//! this:
87//!
88//! ```skip
89//! // Copy the Arc
90//! let notify = notify.clone();
91//!
92//! let mut wrapped_notified = Box::pin(async move {
93//! let notified = notify.notified();
94//!
95//! // This prevents us from driving the future to completion on the first poll
96//! force_pause().await;
97//!
//! notified.await
99//! });
100//!
101//! // Drive the future up to just past our `force_pause`.
102//! // This will start listening before we spawn
//! wrapped_notified.poll_once();
104//!
105//! // Spawn the thread
106//! tokio::spawn(async move {
107//! // wait for our listen to complete
108//! wrapped_notified.await;
109//! });
110//!
111//! // notify the waiting threads
112//! notify.notify_waiters();
113//! ```
114//!
115//! The more complex wrappers have a little bit more machinery to handle auxiliary values and
116//! errors, and the `Async*` helpers do a little bit of pin-projection and poll handling, but
117//! ultimately the core logic boils down to something like the above.
118//!
119//! [`tokio::sync::Notify`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
120//! [`Notify::notified`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notified
121//! [`Notified`]: https://docs.rs/tokio/latest/tokio/sync/futures/struct.Notified.html
122//! [`Notify::notify_waiters`]: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html#method.notify_waiters
123
124#![no_std]
125
126extern crate alloc;
127
128mod funcs;
129mod macros;
130mod traits;
131
132pub use funcs::*;
133pub use traits::*;
134
#[cfg(test)]
mod tests {
    //! Smoke tests for the constructor macros, all exercised against `tokio::sync::Notify`.
    //!
    //! Every test obtains its futures through `apply_to` and only afterwards calls
    //! `notify_waiters` once, so a test can only finish if each future completes after
    //! that single notification.

    use alloc::sync::Arc;

    use tokio::sync::{Barrier, Notify};

    use super::*;

    /// How many waiting futures each test creates.
    const WAITERS: usize = 10;

    /// `get!`: ten spawned tasks each await a future produced by `apply_to`, then meet the
    /// test body at a barrier sized for all of them plus the test itself.
    #[tokio::test]
    async fn get() {
        let get_notified = get!(fn(n: &mut Arc<Notify>) -> () {
            n.notified()
        });

        let notify = Arc::new(Notify::new());
        // One slot per spawned task plus one for this test body.
        let rendezvous = Arc::new(Barrier::new(WAITERS + 1));

        for _ in 0..WAITERS {
            // The future is built here, before the task that awaits it is spawned.
            let listening = get_notified.apply_to(Arc::clone(&notify));
            let rendezvous = Arc::clone(&rendezvous);
            tokio::spawn(async move {
                listening.await;
                rendezvous.wait().await;
            });
        }

        notify.notify_waiters();
        rendezvous.wait().await;
    }

    /// `try_get!`: same scenario as `get`, but the constructor returns a `Result` carrying
    /// the future together with a unit auxiliary value.
    #[tokio::test]
    async fn try_get() {
        let get_notified = try_get!(fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
            Ok((n.notified(), ()))
        });

        let notify = Arc::new(Notify::new());
        // One slot per spawned task plus one for this test body.
        let rendezvous = Arc::new(Barrier::new(WAITERS + 1));

        for _ in 0..WAITERS {
            let listening = match get_notified.apply_to(Arc::clone(&notify)) {
                // The constructor above always returns `Ok`.
                Err((_returned, ())) => unreachable!(),
                Ok((fut, ())) => fut,
            };
            let rendezvous = Arc::clone(&rendezvous);
            tokio::spawn(async move {
                listening.await;
                rendezvous.wait().await;
            });
        }

        notify.notify_waiters();
        rendezvous.wait().await;
    }

    /// `async_try_get!`: the resulting futures are not spawned; they are collected and
    /// awaited one after another on the test task once the notification has been sent.
    #[cfg(feature = "async")]
    #[tokio::test]
    async fn async_try_get() {
        use alloc::vec;

        let get_notified = async_try_get!(async fn(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
            Ok((n.notified(), ()))
        });

        let notify = Arc::new(Notify::new());

        let mut pending = vec![];
        for _ in 0..WAITERS {
            let listening = match get_notified.apply_to(Arc::clone(&notify)).await {
                // The constructor above always returns `Ok`.
                Err((_returned, ())) => unreachable!(),
                Ok((fut, ())) => fut,
            };
            pending.push(async move {
                listening.await;
            });
        }

        notify.notify_waiters();
        for waiter in pending {
            waiter.await;
        }
    }

    /// `async_send_try_get!`: the constructor hands back a boxed `Send` future, which is
    /// moved into spawned tasks just like in the synchronous tests.
    #[tokio::test]
    async fn async_send_try_get() {
        let get_notified = async_send_try_get!(fn<'a, 'b>(n: &mut Arc<Notify>) -> Result<((), ()), ()> {
            async move {
                let listener = n.notified();
                let boxed: Pin<Box<dyn 'b + Send + Future<Output = ()>>> =
                    Box::pin(async move { listener.await });
                Ok((boxed, ()))
            }
        });

        let notify = Arc::new(Notify::new());
        // One slot per spawned task plus one for this test body.
        let rendezvous = Arc::new(Barrier::new(WAITERS + 1));

        for _ in 0..WAITERS {
            let listening = match get_notified.apply_to(Arc::clone(&notify)).await {
                // The constructor above always returns `Ok`.
                Err((_returned, ())) => unreachable!(),
                Ok((fut, ())) => fut,
            };
            let rendezvous = Arc::clone(&rendezvous);
            tokio::spawn(async move {
                listening.await;
                rendezvous.wait().await;
            });
        }

        notify.notify_waiters();
        rendezvous.wait().await;
    }
}