futures_scopes/relay/mod.rs
1use std::sync::Arc;
2
3mod relay_future;
4mod relay_pad;
5mod waker_park;
6
7use futures::task::{FutureObj, LocalSpawn, Spawn, SpawnError};
8pub use relay_pad::UntilEmpty;
9
10use self::relay_pad::RelayPad;
11use crate::{ScopedSpawn, SpawnScope};
12
13/// A local spawn that can be spawned onto a [`RelayScope`].
14pub trait RelayScopeLocalSpawning: LocalSpawn + Clone {
15 /// Add this spawn to `scope` and relay spawned futures to it.
16 fn spawn_scope<'sc>(&self, scope: &RelayScope<'sc>) -> Result<(), SpawnError>
17 where
18 Self: 'sc,
19 {
20 scope.relay_to_local(self)
21 }
22}
23
24impl<Sp: LocalSpawn + Clone> RelayScopeLocalSpawning for Sp {}
25
26/// A spawn that can be spawned onto a [`RelayScope`].
27pub trait RelayScopeSpawning: Spawn + Clone + Send {
28 /// Add this spawn to `scope` and relay spawned futures to it.
29 fn spawn_scope<'sc>(&self, scope: &RelayScope<'sc>) -> Result<(), SpawnError>
30 where
31 Self: 'sc,
32 {
33 scope.relay_to(self)
34 }
35}
36
37impl<Sp: Spawn + Clone + Send> RelayScopeSpawning for Sp {}
38
39/// A spawn scope that can be used to spawn futures of lifetime `'sc` onto multiple underlying spawns.
40///
41/// To poll the spawned futures, `RelayScope` will spawn *relay-tasks* onto all the underlying spawns
42/// registered with [`relay_to`](RelayScope::relay_to) or [`relay_to_local`](RelayScope::relay_to_local).
43/// These *relay-tasks* will replicate themselves to fill up the underlying spawns, but not overwhelm them.
44/// This behavior makes it possible to consolidate multiple spawns into one.
45///
46/// **Important**: Spawned futures will not be polled until [`relay_to`](RelayScope::relay_to)
47/// or [`relay_to_local`](RelayScope::relay_to_local) is used to spawn this scope
48/// onto another spawn. Especially [`until_empty`](RelayScope::until_empty) will not poll
49/// any future that was spawned onto the scope
50/// (unlike [`LocalScope::until_empty`](crate::local::LocalScope::until_empty)).
51///
52/// To safely create a `RelayScope`, use [`new_relay_scope!`].
53///
54/// # Dropping
55///
56/// If the scope is dropped, all futures that were spawned on it will be dropped as well.
57/// For this to happen, the scope may momentarily block the current thread to wait for a future to return from its current polling operation.
58/// Note that this does not mean the scope will block until all futures are polled to completion,
59/// but only until the currently running futures are done polling, regardless of whether they return ready or pending.
60///
61/// # Example
62///
63/// ```
64/// # use futures::executor::{LocalPool,ThreadPool};
65/// # use std::sync::Mutex;
66/// # use futures_scopes::*;
67/// # use futures_scopes::relay::*;
68/// let thread_pool = ThreadPool::new().unwrap();
69/// let mut local_pool = LocalPool::new();
70///
71/// let task_done = Mutex::new(false);
72///
73/// let scope = new_relay_scope!();
74///
75/// // Spawn the scope on both the thread pool and the local pool
76/// scope.relay_to(&thread_pool).unwrap();
77/// scope.relay_to_local(&local_pool.spawner()).unwrap();
78///
79/// scope.spawner().spawn_scoped(async {
80/// // This is either executed on the thread pool or the local pool
81/// println!("Hello from the scope!");
82///
83/// // task_done can safely be referenced without using Arc
84/// *task_done.lock().unwrap() = true;
85/// }).unwrap();
86///
87/// // Run the local pool until we are done for sure
88/// local_pool.run_until(scope.until_empty());
89///
90/// assert!(task_done.lock().unwrap().clone());
91/// ```
92///
93/// # How relaying works
94///
95/// When [`relay_to`](RelayScope::relay_to) or [`relay_to_local`](RelayScope::relay_to_local) is used,
96/// `RelayScope` will spawn a single *relay-task* on the given spawn.
97/// Each *relay-task* (potentially on different executors) will take a future from the scope and try to complete it.
98/// After completing the future, it will grab a new one and repeat the process.
99///
100/// When the scope is dropped, all futures within the *relay-tasks* will be dropped as well.
101/// The *relay-tasks* themselves will continue to live but complete [`Poll::Ready`](std::task::Poll::Ready) the next time they are polled.
102///
103/// To process multiple tasks on the same spawn, the *relay-tasks* will replicate themselves,
104/// by spawning new *relay-tasks* on the same spawn from time to time.
105/// To not overwhelm a single spawn and leave room for other tasks,
106/// the `RelayScope` will monitor the amount of currently not processed tasks for each of its underlying spawns.
107/// If there are too many tasks that are not processed, it will not spawn new *relay-tasks* on that spawn.
108///
109///
110#[derive(Debug)]
111pub struct RelayScope<'sc> {
112 pad: Arc<RelayPad<'sc>>,
113}
114
115/// Creates a new RelayScope in the current scope and returns a reference to it.
116///
117/// The macro can take any number of
118/// [`LocalSpawn`](futures::task::LocalSpawn)s
119/// or
120/// [`Spawn`](futures::task::Spawn)s
121/// as arguments.
122/// The scope will then be spawned onto all of them.
123///
124/// Because the actual value cannot be accessed, it cannot be moved out of scope or be [`std::mem::forget`]ten,
125/// which could lead to undefined behavior.
126///
127/// # Examples
128///
129/// ```
130/// use futures::executor::{LocalPool, ThreadPool};
131/// use futures_scopes::relay::new_relay_scope;
132///
133/// let thread_pool = ThreadPool::new().unwrap();
134/// let pool = LocalPool::new();
135///
136/// let scope1 = new_relay_scope!();
137/// let scope2 = new_relay_scope!(thread_pool, pool.spawner());
138/// ```
139#[doc(hidden)]
140#[macro_export]
141macro_rules! __new_relay_scope__ {
142 () => {{
143 &unsafe { $crate::relay::RelayScope::unchecked_new() }
144 }};
145 ($($scopes:expr),+) => {{
146 let create_custom_scope = || {
147 use $crate::relay::{RelayScopeLocalSpawning, RelayScopeSpawning};
148 let scope = unsafe { $crate::relay::RelayScope::unchecked_new() };
149 $(
150 $scopes.spawn_scope(&scope).ok();
151 )+
152 scope
153 };
154 &create_custom_scope()
155 }};
156}
157
158#[doc(inline)]
159pub use __new_relay_scope__ as new_relay_scope;
160
161impl RelayScope<'static> {
162 /// Creates a new `RelayScope` that can only spawn static futures.
163 pub fn new() -> Self {
164 unsafe { Self::unchecked_new() }
165 }
166}
167
168impl Default for RelayScope<'static> {
169 fn default() -> Self {
170 Self::new()
171 }
172}
173
174impl<'sc> RelayScope<'sc> {
175 /// Creates a new `RelayScope`
176 ///
177 /// Spawned futures can reference everything covered by `'sc`.
178 ///
179 /// # Safety
180 /// It is of utmost important that the created scope is dropped at the end of `'sc`.
181 /// Especially [`std::mem::forget`] should not be used on this type.
182 /// Failing to drop this correctly can lead to spawned futures having references
183 /// into undefined memory (namely when they reference something on the stack that is already popped).
184 /// Use [`new_relay_scope`] to safely create a `RelayScope`, that cannot **not** be dropped.
185 pub unsafe fn unchecked_new() -> Self {
186 Self {
187 pad: Arc::new(RelayPad::new()),
188 }
189 }
190
191 /// Relay this scope to the given `spawn`.
192 ///
193 /// This will spawn a *relay-task* onto the given [`Spawn`] that will process the futures of this scope.
194 /// For more details see [here](RelayScope#how-relaying-works).
195 pub fn relay_to(&self, spawn: &(impl Spawn + Clone + Send + 'sc)) -> Result<(), SpawnError> {
196 relay_future::spawn_on_global(self.pad.clone(), spawn.clone(), self.pad.next_spawn_id())
197 }
198
199 /// Relay this scope to the given local `spawn`.
200 ///
201 /// This will spawn a *relay-task* onto the given [`LocalSpawn`] that will process the futures of this scope.
202 /// For more details see [here](RelayScope#how-relaying-works).
203 pub fn relay_to_local(&self, spawn: &(impl LocalSpawn + Clone + 'sc)) -> Result<(), SpawnError> {
204 relay_future::spawn_on_local(self.pad.clone(), spawn.clone(), self.pad.next_spawn_id())
205 }
206
207 /// Returns a future that will complete the moment there are no more spawned futures in the scope.
208 ///
209 /// This does not effect the ability to spawn new futures,
210 /// and new futures may have been spawned onto the scope before `.until_empty().await` even returns.
211 ///
212 /// Note that unlike [`LocalScope::until_empty`](crate::local::LocalScope::until_empty), the returned `UntilEmpty` future
213 /// does not poll any future that was spawned onto the scope.
214 /// Use [`relay_to`](RelayScope::relay_to) or [`relay_to_local`](RelayScope::relay_to_local), otherwise this future will never complete.
215 ///
216 /// # Example
217 ///
218 /// ```
219 /// # use futures_scopes::*;
220 /// # use futures_scopes::relay::*;
221 /// # use futures::executor::{ThreadPool, block_on};
222 /// let scope = new_relay_scope!();
223 ///
224 /// // Relay the scope to a thread pool
225 /// let thread_pool = ThreadPool::new().unwrap();
226 /// scope.relay_to(&thread_pool).unwrap();
227 ///
228 /// // Spawn some task
229 /// scope.spawner().spawn_scoped(async { /* ... */ }).unwrap();
230 ///
231 /// // Because task will be executed on another thread, we won't run into a deadlock
232 /// // when blocking this thread.
233 /// block_on(scope.until_empty());
234 /// ```
235 pub fn until_empty(&self) -> UntilEmpty {
236 self.pad.until_empty()
237 }
238
239 /// Prevents new tasks from being spawned and cleans up all existing tasks.
240 ///
241 /// Calling this function is effectively the same as dropping the Scope.
242 /// Afterwards the scope can be used normally, but all attempts to spawn
243 /// new futures will fail with a [`shutdown error`].
244 ///
245 ///
246 /// [`shutdown error`]: futures::task::SpawnError
247 ///
248 /// # Blocking
249 ///
250 /// This method blocks the current thread to cleanup the remaining tasks.
251 /// Like drop it will wait for currently running, non-pending tasks to finish
252 /// execution, before dropping them.
253 ///
254 /// # Example
255 ///
256 /// ```
257 /// # use futures_scopes::*;
258 /// # use futures_scopes::relay::*;
259 /// # use futures::executor::block_on;
260 /// let scope = new_relay_scope!();
261 ///
262 /// // stop the scope
263 /// scope.stop();
264 ///
265 /// // New spawn attempts will fail
266 /// scope.spawner().spawn_scoped(async { /* ... */ }).unwrap_err();
267 ///
268 /// // The future returned by until_empty will continue to work
269 /// // and return immediately
270 /// block_on(scope.until_empty());
271 /// ```
272 pub fn stop(&self) {
273 self.pad.destroy();
274 }
275}
276
277impl<'sc> Drop for RelayScope<'sc> {
278 fn drop(&mut self) {
279 self.stop();
280 }
281}
282
283impl<'sc> SpawnScope<'sc, ()> for RelayScope<'sc> {
284 type Spawner = RelayScopeSpawner<'sc>;
285
286 fn spawner(&self) -> Self::Spawner {
287 RelayScopeSpawner {
288 sender: self.pad.clone(),
289 }
290 }
291}
292
293/// A spawner that can be obtained from [`RelayScope::spawner`].
294///
295/// This spawner may live longer then the scope.
296/// In case a future is spawned after the scope has been destroyed,
297/// the spawner will return [`SpawnError::shutdown`].
298#[derive(Debug)]
299pub struct RelayScopeSpawner<'sc> {
300 sender: Arc<RelayPad<'sc>>,
301}
302
303impl<'sc> Clone for RelayScopeSpawner<'sc> {
304 fn clone(&self) -> Self {
305 Self {
306 sender: self.sender.clone(),
307 }
308 }
309}
310
311impl<'sc> ScopedSpawn<'sc, ()> for RelayScopeSpawner<'sc> {
312 fn spawn_obj_scoped(&self, future: FutureObj<'sc, ()>) -> Result<(), SpawnError> {
313 self.sender.enqueue_task(future)
314 }
315
316 fn status_scoped(&self) -> Result<(), SpawnError> {
317 if self.sender.is_destroyed() {
318 Err(SpawnError::shutdown())
319 } else {
320 Ok(())
321 }
322 }
323}
324
325impl<'sc> Spawn for RelayScopeSpawner<'sc> {
326 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
327 self.spawn_obj_scoped(future)
328 }
329
330 fn status(&self) -> Result<(), SpawnError> {
331 self.status_scoped()
332 }
333}