async_executor/
static_executors.rs

1use crate::{debug_state, Executor, LocalExecutor, State};
2use async_task::{Builder, Runnable, Task};
3use slab::Slab;
4use std::{
5    cell::UnsafeCell,
6    fmt,
7    future::Future,
8    marker::PhantomData,
9    panic::{RefUnwindSafe, UnwindSafe},
10};
11
12impl Executor<'static> {
13    /// Consumes the [`Executor`] and intentionally leaks it.
14    ///
15    /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
16    /// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
17    /// when spawning, running, and finishing tasks.
18    ///
19    /// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
20    /// irreversible without the use of unsafe.
21    ///
22    /// # Example
23    ///
24    /// ```
25    /// use async_executor::Executor;
26    /// use futures_lite::future;
27    ///
28    /// let ex = Executor::new().leak();
29    ///
30    /// let task = ex.spawn(async {
31    ///     println!("Hello world");
32    /// });
33    ///
34    /// future::block_on(ex.run(task));
35    /// ```
36    pub fn leak(self) -> &'static StaticExecutor {
37        let ptr = self.state_ptr();
38        // SAFETY: So long as an Executor lives, it's state pointer will always be valid
39        // when accessed through state_ptr. This executor will live for the full 'static
40        // lifetime so this isn't an arbitrary lifetime extension.
41        let state: &'static State = unsafe { &*ptr };
42
43        std::mem::forget(self);
44
45        let mut active = state.active.lock().unwrap();
46        if !active.is_empty() {
47            // Reschedule all of the active tasks.
48            for waker in active.drain() {
49                waker.wake();
50            }
51            // Overwrite to ensure that the slab is deallocated.
52            *active = Slab::new();
53        }
54
55        // SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
56        // The lifetime is not altered: 'static -> 'static.
57        let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) };
58        static_executor
59    }
60}
61
62impl LocalExecutor<'static> {
63    /// Consumes the [`LocalExecutor`] and intentionally leaks it.
64    ///
65    /// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
66    /// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
67    /// when spawning, running, and finishing tasks.
68    ///
69    /// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
70    /// irreversible without the use of unsafe.
71    ///
72    /// # Example
73    ///
74    /// ```
75    /// use async_executor::LocalExecutor;
76    /// use futures_lite::future;
77    ///
78    /// let ex = LocalExecutor::new().leak();
79    ///
80    /// let task = ex.spawn(async {
81    ///     println!("Hello world");
82    /// });
83    ///
84    /// future::block_on(ex.run(task));
85    /// ```
86    pub fn leak(self) -> &'static StaticLocalExecutor {
87        let ptr = self.inner.state_ptr();
88        // SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
89        // when accessed through state_ptr. This executor will live for the full 'static
90        // lifetime so this isn't an arbitrary lifetime extension.
91        let state: &'static State = unsafe { &*ptr };
92
93        std::mem::forget(self);
94
95        let mut active = state.active.lock().unwrap();
96        if !active.is_empty() {
97            // Reschedule all of the active tasks.
98            for waker in active.drain() {
99                waker.wake();
100            }
101            // Overwrite to ensure that the slab is deallocated.
102            *active = Slab::new();
103        }
104
105        // SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
106        // The lifetime is not altered: 'static -> 'static.
107        let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) };
108        static_executor
109    }
110}
111
/// A static-lifetimed async [`Executor`].
///
/// This is primarily intended to be used in [`static`] variables, or can be created in
/// non-static contexts via [`Executor::leak`].
///
/// Spawning, running, and finishing tasks are optimized with the assumption that the executor
/// will never be `Drop`'ed. A static executor may require significantly less overhead in both
/// single-threaded and multithreaded use cases.
///
/// As this type does not implement `Drop`, losing the handle to the executor or failing
/// to consistently drive the executor with [`StaticExecutor::tick`] or
/// [`StaticExecutor::run`] will cause all spawned tasks to permanently leak. Any
/// tasks pending at that time will not be cancelled.
///
/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html
#[repr(transparent)]
pub struct StaticExecutor {
    // The only field. `#[repr(transparent)]` gives this struct the same layout as `State`,
    // which `Executor::leak` relies on when it transmutes `&State` into `&StaticExecutor`.
    state: State,
}
130
// SAFETY: Executor stores no thread local state that can be accessed via another thread.
unsafe impl Send for StaticExecutor {}
// SAFETY: Executor internally synchronizes all of its operations.
unsafe impl Sync for StaticExecutor {}

// Opt in to the unwind-safety marker traits so that a `StaticExecutor` (or a reference to
// one) can be used across a `std::panic::catch_unwind` boundary.
impl UnwindSafe for StaticExecutor {}
impl RefUnwindSafe for StaticExecutor {}
138
impl fmt::Debug for StaticExecutor {
    /// Formats the executor by delegating to the crate's `debug_state` helper, passing
    /// the inner state together with the label `"StaticExecutor"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_state(&self.state, "StaticExecutor", f)
    }
}
144
145impl StaticExecutor {
146    /// Creates a new StaticExecutor.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// use async_executor::StaticExecutor;
152    ///
153    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
154    /// ```
155    pub const fn new() -> Self {
156        Self {
157            state: State::new(),
158        }
159    }
160
161    /// Spawns a task onto the executor.
162    ///
163    /// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
164    /// borrow on the executor.
165    ///
166    /// # Examples
167    ///
168    /// ```
169    /// use async_executor::StaticExecutor;
170    ///
171    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
172    ///
173    /// let task = EXECUTOR.spawn(async {
174    ///     println!("Hello world");
175    /// });
176    /// ```
177    pub fn spawn<T: Send + 'static>(
178        &'static self,
179        future: impl Future<Output = T> + Send + 'static,
180    ) -> Task<T> {
181        let (runnable, task) = Builder::new()
182            .propagate_panic(true)
183            .spawn(|()| future, self.schedule());
184        runnable.schedule();
185        task
186    }
187
188    /// Spawns a non-`'static` task onto the executor.
189    ///
190    /// ## Safety
191    ///
192    /// The caller must ensure that the returned task terminates
193    /// or is cancelled before the end of 'a.
194    pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
195        &'static self,
196        future: impl Future<Output = T> + Send + 'a,
197    ) -> Task<T> {
198        // SAFETY:
199        //
200        // - `future` is `Send`
201        // - `future` is not `'static`, but the caller guarantees that the
202        //    task, and thus its `Runnable` must not live longer than `'a`.
203        // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
204        //    Therefore we do not need to worry about what is done with the
205        //    `Waker`.
206        let (runnable, task) = unsafe {
207            Builder::new()
208                .propagate_panic(true)
209                .spawn_unchecked(|()| future, self.schedule())
210        };
211        runnable.schedule();
212        task
213    }
214
215    /// Attempts to run a task if at least one is scheduled.
216    ///
217    /// Running a scheduled task means simply polling its future once.
218    ///
219    /// # Examples
220    ///
221    /// ```
222    /// use async_executor::StaticExecutor;
223    ///
224    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
225    ///
226    /// assert!(!EXECUTOR.try_tick()); // no tasks to run
227    ///
228    /// let task = EXECUTOR.spawn(async {
229    ///     println!("Hello world");
230    /// });
231    ///
232    /// assert!(EXECUTOR.try_tick()); // a task was found
233    /// ```
234    pub fn try_tick(&self) -> bool {
235        self.state.try_tick()
236    }
237
238    /// Runs a single task.
239    ///
240    /// Running a task means simply polling its future once.
241    ///
242    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
243    ///
244    /// # Examples
245    ///
246    /// ```
247    /// use async_executor::StaticExecutor;
248    /// use futures_lite::future;
249    ///
250    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
251    ///
252    /// let task = EXECUTOR.spawn(async {
253    ///     println!("Hello world");
254    /// });
255    ///
256    /// future::block_on(EXECUTOR.tick()); // runs the task
257    /// ```
258    pub async fn tick(&self) {
259        self.state.tick().await;
260    }
261
262    /// Runs the executor until the given future completes.
263    ///
264    /// # Examples
265    ///
266    /// ```
267    /// use async_executor::StaticExecutor;
268    /// use futures_lite::future;
269    ///
270    /// static EXECUTOR: StaticExecutor = StaticExecutor::new();
271    ///
272    /// let task = EXECUTOR.spawn(async { 1 + 2 });
273    /// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
274    ///
275    /// assert_eq!(res, 6);
276    /// ```
277    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
278        self.state.run(future).await
279    }
280
281    /// Returns a function that schedules a runnable task when it gets woken up.
282    fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
283        let state: &'static State = &self.state;
284        // TODO: If possible, push into the current local queue and notify the ticker.
285        move |runnable| {
286            state.queue.push(runnable).unwrap();
287            state.notify();
288        }
289    }
290}
291
impl Default for StaticExecutor {
    /// Returns a new executor; identical to calling [`StaticExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}
297
/// A static-lifetimed async [`LocalExecutor`].
///
/// This is primarily intended to be used in [`thread_local`] variables, or can be created in
/// non-static contexts via [`LocalExecutor::leak`].
///
/// Spawning, running, and finishing tasks are optimized with the assumption that the executor
/// will never be `Drop`'ed. A static executor may require significantly less overhead in both
/// single-threaded and multithreaded use cases.
///
/// As this type does not implement `Drop`, losing the handle to the executor or failing
/// to consistently drive the executor with [`StaticLocalExecutor::tick`] or
/// [`StaticLocalExecutor::run`] will cause all spawned tasks to permanently leak. Any
/// tasks pending at that time will not be cancelled.
///
/// [`thread_local`]: https://doc.rust-lang.org/std/macro.thread_local.html
#[repr(transparent)]
pub struct StaticLocalExecutor {
    // The only non-zero-sized field. `#[repr(transparent)]` gives this struct the same
    // layout as `State`, which `LocalExecutor::leak` relies on when it transmutes
    // `&State` into `&StaticLocalExecutor`.
    state: State,
    // Zero-sized marker. `UnsafeCell` is `!Sync`, so this keeps the executor `!Sync`;
    // the safety argument in `spawn_scoped` depends on that.
    marker_: PhantomData<UnsafeCell<()>>,
}
317
// Opt in to the unwind-safety marker traits so that a `StaticLocalExecutor` (or a reference
// to one) can be used across a `std::panic::catch_unwind` boundary.
impl UnwindSafe for StaticLocalExecutor {}
impl RefUnwindSafe for StaticLocalExecutor {}
320
impl fmt::Debug for StaticLocalExecutor {
    /// Formats the executor by delegating to the crate's `debug_state` helper, passing
    /// the inner state together with the label `"StaticLocalExecutor"`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        debug_state(&self.state, "StaticLocalExecutor", f)
    }
}
326
327impl StaticLocalExecutor {
328    /// Creates a new StaticLocalExecutor.
329    ///
330    /// # Examples
331    ///
332    /// ```
333    /// use async_executor::StaticLocalExecutor;
334    ///
335    /// thread_local! {
336    ///     static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
337    /// }
338    /// ```
339    pub const fn new() -> Self {
340        Self {
341            state: State::new(),
342            marker_: PhantomData,
343        }
344    }
345
346    /// Spawns a task onto the executor.
347    ///
348    /// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
349    /// borrow on the executor.
350    ///
351    /// # Examples
352    ///
353    /// ```
354    /// use async_executor::LocalExecutor;
355    ///
356    /// let ex = LocalExecutor::new().leak();
357    ///
358    /// let task = ex.spawn(async {
359    ///     println!("Hello world");
360    /// });
361    /// ```
362    pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
363        let (runnable, task) = Builder::new()
364            .propagate_panic(true)
365            .spawn_local(|()| future, self.schedule());
366        runnable.schedule();
367        task
368    }
369
370    /// Spawns a non-`'static` task onto the executor.
371    ///
372    /// ## Safety
373    ///
374    /// The caller must ensure that the returned task terminates
375    /// or is cancelled before the end of 'a.
376    pub unsafe fn spawn_scoped<'a, T: 'a>(
377        &'static self,
378        future: impl Future<Output = T> + 'a,
379    ) -> Task<T> {
380        // SAFETY:
381        //
382        // - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
383        //   `try_tick`, `tick` and `run` can only be called from the origin
384        //    thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
385        //    be called from the origin thread, ensuring that `future` and the executor
386        //    share the same origin thread. The `Runnable` can be scheduled from other
387        //    threads, but because of the above `Runnable` can only be called or
388        //    dropped on the origin thread.
389        // - `future` is not `'static`, but the caller guarantees that the
390        //    task, and thus its `Runnable` must not live longer than `'a`.
391        // - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
392        //    Therefore we do not need to worry about what is done with the
393        //    `Waker`.
394        let (runnable, task) = unsafe {
395            Builder::new()
396                .propagate_panic(true)
397                .spawn_unchecked(|()| future, self.schedule())
398        };
399        runnable.schedule();
400        task
401    }
402
403    /// Attempts to run a task if at least one is scheduled.
404    ///
405    /// Running a scheduled task means simply polling its future once.
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// use async_executor::LocalExecutor;
411    ///
412    /// let ex = LocalExecutor::new().leak();
413    /// assert!(!ex.try_tick()); // no tasks to run
414    ///
415    /// let task = ex.spawn(async {
416    ///     println!("Hello world");
417    /// });
418    /// assert!(ex.try_tick()); // a task was found
419    /// ```
420    pub fn try_tick(&self) -> bool {
421        self.state.try_tick()
422    }
423
424    /// Runs a single task.
425    ///
426    /// Running a task means simply polling its future once.
427    ///
428    /// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
429    ///
430    /// # Examples
431    ///
432    /// ```
433    /// use async_executor::LocalExecutor;
434    /// use futures_lite::future;
435    ///
436    /// let ex = LocalExecutor::new().leak();
437    ///
438    /// let task = ex.spawn(async {
439    ///     println!("Hello world");
440    /// });
441    /// future::block_on(ex.tick()); // runs the task
442    /// ```
443    pub async fn tick(&self) {
444        self.state.tick().await;
445    }
446
447    /// Runs the executor until the given future completes.
448    ///
449    /// # Examples
450    ///
451    /// ```
452    /// use async_executor::LocalExecutor;
453    /// use futures_lite::future;
454    ///
455    /// let ex = LocalExecutor::new().leak();
456    ///
457    /// let task = ex.spawn(async { 1 + 2 });
458    /// let res = future::block_on(ex.run(async { task.await * 2 }));
459    ///
460    /// assert_eq!(res, 6);
461    /// ```
462    pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
463        self.state.run(future).await
464    }
465
466    /// Returns a function that schedules a runnable task when it gets woken up.
467    fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
468        let state: &'static State = &self.state;
469        // TODO: If possible, push into the current local queue and notify the ticker.
470        move |runnable| {
471            state.queue.push(runnable).unwrap();
472            state.notify();
473        }
474    }
475}
476
impl Default for StaticLocalExecutor {
    /// Returns a new executor; identical to calling [`StaticLocalExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}