futures_scopes/relay/
mod.rs

1use std::sync::Arc;
2
3mod relay_future;
4mod relay_pad;
5mod waker_park;
6
7use futures::task::{FutureObj, LocalSpawn, Spawn, SpawnError};
8pub use relay_pad::UntilEmpty;
9
10use self::relay_pad::RelayPad;
11use crate::{ScopedSpawn, SpawnScope};
12
13/// A local spawn that can be spawned onto a [`RelayScope`].
14pub trait RelayScopeLocalSpawning: LocalSpawn + Clone {
15    /// Add this spawn to `scope` and relay spawned futures to it.
16    fn spawn_scope<'sc>(&self, scope: &RelayScope<'sc>) -> Result<(), SpawnError>
17    where
18        Self: 'sc,
19    {
20        scope.relay_to_local(self)
21    }
22}
23
24impl<Sp: LocalSpawn + Clone> RelayScopeLocalSpawning for Sp {}
25
26/// A spawn that can be spawned onto a [`RelayScope`].
27pub trait RelayScopeSpawning: Spawn + Clone + Send {
28    /// Add this spawn to `scope` and relay spawned futures to it.
29    fn spawn_scope<'sc>(&self, scope: &RelayScope<'sc>) -> Result<(), SpawnError>
30    where
31        Self: 'sc,
32    {
33        scope.relay_to(self)
34    }
35}
36
37impl<Sp: Spawn + Clone + Send> RelayScopeSpawning for Sp {}
38
39/// A spawn scope that can be used to spawn futures of lifetime `'sc` onto multiple underlying spawns.
40///
41/// To poll the spawned futures, `RelayScope` will spawn *relay-tasks* onto all the underlying spawns
42/// registered with [`relay_to`](RelayScope::relay_to) or [`relay_to_local`](RelayScope::relay_to_local).
43/// These *relay-tasks* will replicate themselves to fill up the underlying spawns, but not overwhelm them.
44/// This behavior makes it possible to consolidate multiple spawns into one.
45///
46/// **Important**: Spawned futures will not be polled until [`relay_to`](RelayScope::relay_to)
47///                or [`relay_to_local`](RelayScope::relay_to_local) is used to spawn this scope
48///                onto another spawn. Especially [`until_empty`](RelayScope::until_empty) will not poll
49///                any future that was spawned onto the scope
50///                (unlike [`LocalScope::until_empty`](crate::local::LocalScope::until_empty)).
51///
52/// To safely create a `RelayScope`, use [`new_relay_scope!`].
53///
54/// # Dropping
55///
56/// If the scope is dropped, all futures that were spawned on it will be dropped as well.
57/// For this to happen, the scope may momentarily block the current thread to wait for a future to return from its current polling operation.
58/// Note that this does not mean the scope will block until all futures are polled to completion,
59/// but only until the currently running futures are done polling, regardless of whether they return ready or pending.
60///
61/// # Example
62///
63/// ```
64/// # use futures::executor::{LocalPool,ThreadPool};
65/// # use std::sync::Mutex;
66/// # use futures_scopes::*;
67/// # use futures_scopes::relay::*;
68/// let thread_pool = ThreadPool::new().unwrap();
69/// let mut local_pool = LocalPool::new();
70///
71/// let task_done = Mutex::new(false);
72///
73/// let scope = new_relay_scope!();
74///
75/// // Spawn the scope on both the thread pool and the local pool
76/// scope.relay_to(&thread_pool).unwrap();
77/// scope.relay_to_local(&local_pool.spawner()).unwrap();
78///
79/// scope.spawner().spawn_scoped(async {
80///   // This is either executed on the thread pool or the local pool
81///   println!("Hello from the scope!");
82///
83///   // task_done can safely be referenced without using Arc
84///   *task_done.lock().unwrap() = true;
85/// }).unwrap();
86///
87/// // Run the local pool until we are done for sure
88/// local_pool.run_until(scope.until_empty());
89///
90/// assert!(task_done.lock().unwrap().clone());
91/// ```
92///
93/// # How relaying works
94///
95/// When [`relay_to`](RelayScope::relay_to) or [`relay_to_local`](RelayScope::relay_to_local) is used,
96/// `RelayScope` will spawn a single *relay-task* on the given spawn.
97/// Each *relay-task* (potentially on different executors) will take a future from the scope and try to complete it.
98/// After completing the future, it will grab a new one and repeat the process.
99///
100/// When the scope is dropped, all futures within the *relay-tasks* will be dropped as well.
101/// The *relay-tasks* themselves will continue to live but complete [`Poll::Ready`](std::task::Poll::Ready) the next time they are polled.
102///
103/// To process multiple tasks on the same spawn, the *relay-tasks* will replicate themselves,
104/// by spawning new *relay-tasks* on the same spawn from time to time.
105/// To not overwhelm a single spawn and leave room for other tasks,
106/// the `RelayScope` will monitor the amount of currently not processed tasks for each of its underlying spawns.
107/// If there are too many tasks that are not processed, it will not spawn new *relay-tasks* on that spawn.
108///
109///
110#[derive(Debug)]
111pub struct RelayScope<'sc> {
112    pad: Arc<RelayPad<'sc>>,
113}
114
/// Creates a new `RelayScope` in the current scope and returns a reference to it.
///
/// The macro can take any number of
/// [`LocalSpawn`](futures::task::LocalSpawn)s
/// or
/// [`Spawn`](futures::task::Spawn)s
/// as arguments (a trailing comma is accepted).
/// The scope will then be spawned onto all of them.
/// Errors reported while spawning the scope onto an argument are discarded.
///
/// Because the actual value cannot be accessed, it cannot be moved out of scope or be [`std::mem::forget`]ten,
/// which could lead to undefined behavior.
///
/// # Examples
///
/// ```
/// use futures::executor::{LocalPool, ThreadPool};
/// use futures_scopes::relay::new_relay_scope;
///
/// let thread_pool = ThreadPool::new().unwrap();
/// let pool = LocalPool::new();
///
/// let scope1 = new_relay_scope!();
/// let scope2 = new_relay_scope!(thread_pool, pool.spawner());
/// let scope3 = new_relay_scope!(thread_pool, pool.spawner(),);
/// ```
#[doc(hidden)]
#[macro_export]
macro_rules! __new_relay_scope__ {
    () => {{
        // SAFETY: the caller only ever receives a reference to the temporary scope,
        // so the value itself cannot be moved away or forgotten.
        &unsafe { $crate::relay::RelayScope::unchecked_new() }
    }};
    ($($scopes:expr),+ $(,)?) => {{
        // The closure keeps the trait imports and the owned `scope` binding out of the
        // caller's namespace; only a reference to its return value escapes.
        let create_custom_scope = || {
            use $crate::relay::{RelayScopeLocalSpawning, RelayScopeSpawning};
            // SAFETY: as above, the scope is only exposed through the reference
            // returned by the macro, so it cannot be moved away or forgotten.
            let scope = unsafe { $crate::relay::RelayScope::unchecked_new() };
            $(
                // Best effort: a spawn that rejects the relay-task is skipped.
                $scopes.spawn_scope(&scope).ok();
            )+
            scope
        };
        &create_custom_scope()
    }};
}
157
158#[doc(inline)]
159pub use __new_relay_scope__ as new_relay_scope;
160
161impl RelayScope<'static> {
162    /// Creates a new `RelayScope` that can only spawn static futures.
163    pub fn new() -> Self {
164        unsafe { Self::unchecked_new() }
165    }
166}
167
168impl Default for RelayScope<'static> {
169    fn default() -> Self {
170        Self::new()
171    }
172}
173
174impl<'sc> RelayScope<'sc> {
175    /// Creates a new `RelayScope`
176    ///
177    /// Spawned futures can reference everything covered by `'sc`.
178    ///
179    /// # Safety
180    /// It is of utmost important that the created scope is dropped at the end of `'sc`.
181    /// Especially [`std::mem::forget`] should not be used on this type.
182    /// Failing to drop this correctly can lead to spawned futures having references
183    /// into undefined memory (namely when they reference something on the stack that is already popped).
184    /// Use [`new_relay_scope`] to safely create a `RelayScope`, that cannot **not** be dropped.
185    pub unsafe fn unchecked_new() -> Self {
186        Self {
187            pad: Arc::new(RelayPad::new()),
188        }
189    }
190
191    /// Relay this scope to the given `spawn`.
192    ///
193    /// This will spawn a *relay-task* onto the given [`Spawn`] that will process the futures of this scope.
194    /// For more details see [here](RelayScope#how-relaying-works).
195    pub fn relay_to(&self, spawn: &(impl Spawn + Clone + Send + 'sc)) -> Result<(), SpawnError> {
196        relay_future::spawn_on_global(self.pad.clone(), spawn.clone(), self.pad.next_spawn_id())
197    }
198
199    /// Relay this scope to the given local `spawn`.
200    ///
201    /// This will spawn a *relay-task* onto the given [`LocalSpawn`] that will process the futures of this scope.
202    /// For more details see [here](RelayScope#how-relaying-works).
203    pub fn relay_to_local(&self, spawn: &(impl LocalSpawn + Clone + 'sc)) -> Result<(), SpawnError> {
204        relay_future::spawn_on_local(self.pad.clone(), spawn.clone(), self.pad.next_spawn_id())
205    }
206
207    /// Returns a future that will complete the moment there are no more spawned futures in the scope.
208    ///
209    /// This does not effect the ability to spawn new futures,
210    /// and new futures may have been spawned onto the scope before `.until_empty().await` even returns.
211    ///
212    /// Note that unlike [`LocalScope::until_empty`](crate::local::LocalScope::until_empty), the returned `UntilEmpty` future
213    /// does not poll any future that was spawned onto the scope.
214    /// Use [`relay_to`](RelayScope::relay_to) or [`relay_to_local`](RelayScope::relay_to_local), otherwise this future will never complete.
215    ///
216    /// # Example
217    ///
218    /// ```
219    /// # use futures_scopes::*;
220    /// # use futures_scopes::relay::*;
221    /// # use futures::executor::{ThreadPool, block_on};
222    /// let scope = new_relay_scope!();
223    ///
224    /// // Relay the scope to a thread pool
225    /// let thread_pool = ThreadPool::new().unwrap();
226    /// scope.relay_to(&thread_pool).unwrap();
227    ///
228    /// // Spawn some task
229    /// scope.spawner().spawn_scoped(async { /* ... */ }).unwrap();
230    ///
231    /// // Because task will be executed on another thread, we won't run into a deadlock
232    /// // when blocking this thread.
233    /// block_on(scope.until_empty());
234    /// ```
235    pub fn until_empty(&self) -> UntilEmpty {
236        self.pad.until_empty()
237    }
238
239    /// Prevents new tasks from being spawned and cleans up all existing tasks.
240    ///
241    /// Calling this function is effectively the same as dropping the Scope.
242    /// Afterwards the scope can be used normally, but all attempts to spawn
243    /// new futures will fail with a [`shutdown error`].
244    ///
245    ///
246    /// [`shutdown error`]: futures::task::SpawnError
247    ///
248    /// # Blocking
249    ///
250    /// This method blocks the current thread to cleanup the remaining tasks.
251    /// Like drop it will wait for currently running, non-pending tasks to finish
252    /// execution, before dropping them.
253    ///
254    /// # Example
255    ///
256    /// ```
257    /// # use futures_scopes::*;
258    /// # use futures_scopes::relay::*;
259    /// # use futures::executor::block_on;
260    /// let scope = new_relay_scope!();
261    ///
262    /// // stop the scope
263    /// scope.stop();
264    ///
265    /// // New spawn attempts will fail
266    /// scope.spawner().spawn_scoped(async { /* ... */ }).unwrap_err();
267    ///
268    /// // The future returned by until_empty will continue to work
269    /// // and return immediately
270    /// block_on(scope.until_empty());
271    /// ```
272    pub fn stop(&self) {
273        self.pad.destroy();
274    }
275}
276
277impl<'sc> Drop for RelayScope<'sc> {
278    fn drop(&mut self) {
279        self.stop();
280    }
281}
282
283impl<'sc> SpawnScope<'sc, ()> for RelayScope<'sc> {
284    type Spawner = RelayScopeSpawner<'sc>;
285
286    fn spawner(&self) -> Self::Spawner {
287        RelayScopeSpawner {
288            sender: self.pad.clone(),
289        }
290    }
291}
292
293/// A spawner that can be obtained from [`RelayScope::spawner`].
294///
295/// This spawner may live longer then the scope.
296/// In case a future is spawned after the scope has been destroyed,
297/// the spawner will return [`SpawnError::shutdown`].
298#[derive(Debug)]
299pub struct RelayScopeSpawner<'sc> {
300    sender: Arc<RelayPad<'sc>>,
301}
302
303impl<'sc> Clone for RelayScopeSpawner<'sc> {
304    fn clone(&self) -> Self {
305        Self {
306            sender: self.sender.clone(),
307        }
308    }
309}
310
311impl<'sc> ScopedSpawn<'sc, ()> for RelayScopeSpawner<'sc> {
312    fn spawn_obj_scoped(&self, future: FutureObj<'sc, ()>) -> Result<(), SpawnError> {
313        self.sender.enqueue_task(future)
314    }
315
316    fn status_scoped(&self) -> Result<(), SpawnError> {
317        if self.sender.is_destroyed() {
318            Err(SpawnError::shutdown())
319        } else {
320            Ok(())
321        }
322    }
323}
324
325impl<'sc> Spawn for RelayScopeSpawner<'sc> {
326    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
327        self.spawn_obj_scoped(future)
328    }
329
330    fn status(&self) -> Result<(), SpawnError> {
331        self.status_scoped()
332    }
333}